Mirror of https://github.com/prometheus/prometheus
Merge pull request #15669 from rajagopalanand/restore-for-state

rules: Add an option to restore new rule groups added to existing rule manager

Commit: 39d7242cfe
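
For context, a minimal sketch (not part of this PR) of how a caller might enable the new option. Only the RestoreNewRuleGroups field is introduced by this change; the helper function name is hypothetical, the other ManagerOptions fields shown (Appendable, Queryable, Logger) are existing API, and a complete setup would also supply options such as QueryFunc, Context, and NotifyFunc before calling Run or Update.

package example

import (
	"log/slog"
	"os"

	"github.com/prometheus/prometheus/rules"
	"github.com/prometheus/prometheus/storage"
)

// newRuleManager is a hypothetical helper showing where the new option fits.
// Only RestoreNewRuleGroups is added by this PR; the remaining fields are
// existing ManagerOptions fields, and a real setup would set more of them.
func newRuleManager(st storage.Storage) *rules.Manager {
	return rules.NewManager(&rules.ManagerOptions{
		Appendable: st,
		Queryable:  st,
		Logger:     slog.New(slog.NewTextHandler(os.Stderr, nil)),
		// Restore the `for` state of alerting rules in groups that are added
		// to this manager after it has started, not only the groups loaded
		// when the manager is first created.
		RestoreNewRuleGroups: true,
	})
}

With the option unset, LoadGroups only restores `for` state when the manager itself is new (shouldRestore := !m.restored); with it set, groups added by a later Update call are restored as well, which the two tests added below exercise.
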
rules/fixtures/alert_rule.yaml (new file, 6 lines)
@@ -0,0 +1,6 @@
+groups:
+  - name: test
+    interval: 1s
+    rules:
+      - alert: rule1
+        expr: 1 < bool 2
rules/fixtures/alert_rule1.yaml (new file, 6 lines)
@@ -0,0 +1,6 @@
+groups:
+  - name: test2
+    interval: 1s
+    rules:
+      - alert: rule2
+        expr: 1 < bool 2
@@ -96,6 +96,7 @@ type Manager struct {
 	block    chan struct{}
 	done     chan struct{}
 	restored bool
+	restoreNewRuleGroups bool

 	logger *slog.Logger
 }
@@ -122,6 +123,10 @@ type ManagerOptions struct {
 	ConcurrentEvalsEnabled    bool
 	RuleConcurrencyController RuleConcurrencyController
 	RuleDependencyController  RuleDependencyController
+	// At present, manager only restores `for` state when manager is newly created which happens
+	// during restarts. This flag provides an option to restore the `for` state when new rule groups are
+	// added to an existing manager
+	RestoreNewRuleGroups bool

 	Metrics *Metrics
 }
@@ -159,6 +164,7 @@ func NewManager(o *ManagerOptions) *Manager {
 		block:  make(chan struct{}),
 		done:   make(chan struct{}),
 		logger: o.Logger,
+		restoreNewRuleGroups: o.RestoreNewRuleGroups,
 	}

 	return m
@@ -296,7 +302,7 @@ func (m *Manager) LoadGroups(
 ) (map[string]*Group, []error) {
 	groups := make(map[string]*Group)

-	shouldRestore := !m.restored
+	shouldRestore := !m.restored || m.restoreNewRuleGroups

 	for _, fn := range filenames {
 		rgs, errs := m.opts.GroupLoader.Load(fn, ignoreUnknownFields)
@@ -329,7 +335,7 @@ func (m *Manager) LoadGroups(
 				labels.FromMap(r.Annotations),
 				externalLabels,
 				externalURL,
-				m.restored,
+				!shouldRestore,
 				m.logger.With("alert", r.Alert),
 			))
 			continue
@@ -2264,6 +2264,139 @@ func TestAsyncRuleEvaluation(t *testing.T) {
 		})
 	}
 }

+func TestNewRuleGroupRestoration(t *testing.T) {
+	store := teststorage.New(t)
+	t.Cleanup(func() { store.Close() })
+	var (
+		inflightQueries atomic.Int32
+		maxInflight     atomic.Int32
+		maxConcurrency  int64
+		interval        = 60 * time.Second
+	)
+
+	waitForEvaluations := func(t *testing.T, ch <-chan int32, targetCount int32) {
+		for {
+			select {
+			case cnt := <-ch:
+				if cnt == targetCount {
+					return
+				}
+			case <-time.After(5 * time.Second):
+				return
+			}
+		}
+	}
+
+	files := []string{"fixtures/alert_rule.yaml"}
+
+	option := optsFactory(store, &maxInflight, &inflightQueries, maxConcurrency)
+	option.Queryable = store
+	option.Appendable = store
+	option.NotifyFunc = func(ctx context.Context, expr string, alerts ...*Alert) {}
+
+	var evalCount atomic.Int32
+	ch := make(chan int32)
+	noopEvalIterFunc := func(ctx context.Context, g *Group, evalTimestamp time.Time) {
+		evalCount.Inc()
+		ch <- evalCount.Load()
+	}
+
+	ruleManager := NewManager(option)
+	go ruleManager.Run()
+	err := ruleManager.Update(interval, files, labels.EmptyLabels(), "", noopEvalIterFunc)
+	require.NoError(t, err)
+
+	waitForEvaluations(t, ch, 3)
+	require.Equal(t, int32(3), evalCount.Load())
+	ruleGroups := make(map[string]struct{})
+	for _, group := range ruleManager.groups {
+		ruleGroups[group.Name()] = struct{}{}
+		require.False(t, group.shouldRestore)
+		for _, rule := range group.rules {
+			require.True(t, rule.(*AlertingRule).restored.Load())
+		}
+	}
+
+	files = append(files, "fixtures/alert_rule1.yaml")
+	err = ruleManager.Update(interval, files, labels.EmptyLabels(), "", nil)
+	require.NoError(t, err)
+	ruleManager.Stop()
+	for _, group := range ruleManager.groups {
+		// new rule groups added to existing manager will not be restored
+		require.False(t, group.shouldRestore)
+	}
+}
+
+func TestNewRuleGroupRestorationWithRestoreNewGroupOption(t *testing.T) {
+	store := teststorage.New(t)
+	t.Cleanup(func() { store.Close() })
+	var (
+		inflightQueries atomic.Int32
+		maxInflight     atomic.Int32
+		maxConcurrency  int64
+		interval        = 60 * time.Second
+	)
+
+	waitForEvaluations := func(t *testing.T, ch <-chan int32, targetCount int32) {
+		for {
+			select {
+			case cnt := <-ch:
+				if cnt == targetCount {
+					return
+				}
+			case <-time.After(5 * time.Second):
+				return
+			}
+		}
+	}
+
+	files := []string{"fixtures/alert_rule.yaml"}
+
+	option := optsFactory(store, &maxInflight, &inflightQueries, maxConcurrency)
+	option.Queryable = store
+	option.Appendable = store
+	option.RestoreNewRuleGroups = true
+	option.NotifyFunc = func(ctx context.Context, expr string, alerts ...*Alert) {}
+
+	var evalCount atomic.Int32
+	ch := make(chan int32)
+	noopEvalIterFunc := func(ctx context.Context, g *Group, evalTimestamp time.Time) {
+		evalCount.Inc()
+		ch <- evalCount.Load()
+	}
+
+	ruleManager := NewManager(option)
+	go ruleManager.Run()
+	err := ruleManager.Update(interval, files, labels.EmptyLabels(), "", noopEvalIterFunc)
+	require.NoError(t, err)
+
+	waitForEvaluations(t, ch, 3)
+	require.Equal(t, int32(3), evalCount.Load())
+	ruleGroups := make(map[string]struct{})
+	for _, group := range ruleManager.groups {
+		ruleGroups[group.Name()] = struct{}{}
+		require.False(t, group.shouldRestore)
+		for _, rule := range group.rules {
+			require.True(t, rule.(*AlertingRule).restored.Load())
+		}
+	}
+
+	files = append(files, "fixtures/alert_rule1.yaml")
+	err = ruleManager.Update(interval, files, labels.EmptyLabels(), "", nil)
+	require.NoError(t, err)
+	// stop eval
+	ruleManager.Stop()
+	for _, group := range ruleManager.groups {
+		if _, OK := ruleGroups[group.Name()]; OK {
+			// already restored
+			require.False(t, group.shouldRestore)
+			continue
+		}
+		// new rule groups added to existing manager will be restored
+		require.True(t, group.shouldRestore)
+	}
+}
+
 func TestBoundedRuleEvalConcurrency(t *testing.T) {
 	storage := teststorage.New(t)
 	t.Cleanup(func() { storage.Close() })