diff --git a/rules/group.go b/rules/group.go index 7f53e1b47..8de0900d1 100644 --- a/rules/group.go +++ b/rules/group.go @@ -71,7 +71,6 @@ type Group struct { // Rule group evaluation iteration function, // defaults to DefaultEvalIterationFunc. evalIterationFunc GroupEvalIterationFunc - dependencyMap dependencyMap } // GroupEvalIterationFunc is used to implement and extend rule group @@ -130,7 +129,6 @@ func NewGroup(o GroupOptions) *Group { logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name), metrics: metrics, evalIterationFunc: evalIterationFunc, - dependencyMap: buildDependencyMap(o.Rules), } } @@ -437,7 +435,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { eval := func(i int, rule Rule, async bool) { defer func() { if async { - g.opts.ConcurrentEvalsController.Done() + g.opts.RuleConcurrencyController.Done() } }() @@ -569,7 +567,8 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { // If the rule has no dependencies, it can run concurrently because no other rules in this group depend on its output. // Try run concurrently if there are slots available. - if g.dependencyMap.isIndependent(rule) && g.opts.ConcurrentEvalsController != nil && g.opts.ConcurrentEvalsController.Allow() { + ctrl := g.opts.RuleConcurrencyController + if ctrl != nil && ctrl.RuleEligible(g, rule) && ctrl.Allow() { go eval(i, rule, true) } else { eval(i, rule, false) diff --git a/rules/manager.go b/rules/manager.go index b982f2375..9ac95cdbd 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -118,7 +118,7 @@ type ManagerOptions struct { GroupLoader GroupLoader MaxConcurrentEvals int64 ConcurrentEvalsEnabled bool - ConcurrentEvalsController ConcurrentRuleEvalController + RuleConcurrencyController RuleConcurrencyController Metrics *Metrics } @@ -134,7 +134,9 @@ func NewManager(o *ManagerOptions) *Manager { o.GroupLoader = FileLoader{} } - o.ConcurrentEvalsController = NewConcurrentRuleEvalController(o.ConcurrentEvalsEnabled, o.MaxConcurrentEvals) + if o.RuleConcurrencyController == nil { + o.RuleConcurrencyController = newRuleConcurrencyController(o.ConcurrentEvalsEnabled, o.MaxConcurrentEvals) + } m := &Manager{ groups: map[string]*Group{}, @@ -182,6 +184,10 @@ func (m *Manager) Update(interval time.Duration, files []string, externalLabels m.mtx.Lock() defer m.mtx.Unlock() + if m.opts.RuleConcurrencyController != nil { + m.opts.RuleConcurrencyController.Invalidate() + } + groups, errs := m.LoadGroups(interval, externalLabels, externalURL, groupEvalIterationFunc, files...) if errs != nil { @@ -410,26 +416,55 @@ func SendAlerts(s Sender, externalURL string) NotifyFunc { } } -// ConcurrentRuleEvalController controls whether rules can be evaluated concurrently. Its purpose it to bound the amount -// of concurrency in rule evaluations so they do not overwhelm the Prometheus server with additional query load. +// RuleConcurrencyController controls whether rules can be evaluated concurrently. Its purpose it to bound the amount +// of concurrency in rule evaluations, to not overwhelm the Prometheus server with additional query load. // Concurrency is controlled globally, not on a per-group basis. -type ConcurrentRuleEvalController interface { +type RuleConcurrencyController interface { + // RuleEligible determines if a rule can be run concurrently. + RuleEligible(g *Group, r Rule) bool + + // Allow determines whether any concurrent evaluation slots are available. Allow() bool + + // Done releases a concurrent evaluation slot. Done() + + // Invalidate instructs the controller to invalidate its state. + // This should be called when groups are modified (during a reload, for instance), because the controller may + // store some state about each group in order to more efficiently determine rule eligibility. + Invalidate() +} + +func newRuleConcurrencyController(enabled bool, maxConcurrency int64) RuleConcurrencyController { + return &concurrentRuleEvalController{ + enabled: enabled, + sema: semaphore.NewWeighted(maxConcurrency), + depMaps: map[*Group]dependencyMap{}, + } } // concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules. type concurrentRuleEvalController struct { + mu sync.Mutex enabled bool sema *semaphore.Weighted + depMaps map[*Group]dependencyMap } -func NewConcurrentRuleEvalController(enabled bool, maxConcurrency int64) ConcurrentRuleEvalController { - return concurrentRuleEvalController{enabled: enabled, sema: semaphore.NewWeighted(maxConcurrency)} +func (c *concurrentRuleEvalController) RuleEligible(g *Group, r Rule) bool { + c.mu.Lock() + defer c.mu.Unlock() + + depMap, found := c.depMaps[g] + if !found { + depMap = buildDependencyMap(g.rules) + c.depMaps[g] = depMap + } + + return depMap.isIndependent(r) } -// Allow determines whether any concurrency slots are available. -func (c concurrentRuleEvalController) Allow() bool { +func (c *concurrentRuleEvalController) Allow() bool { if !c.enabled { return false } @@ -437,11 +472,18 @@ func (c concurrentRuleEvalController) Allow() bool { return c.sema.TryAcquire(1) } -// Done releases a concurrent evaluation slot. -func (c concurrentRuleEvalController) Done() { +func (c *concurrentRuleEvalController) Done() { if !c.enabled { return } c.sema.Release(1) } + +func (c *concurrentRuleEvalController) Invalidate() { + c.mu.Lock() + defer c.mu.Unlock() + + // Clear out the memoized dependency maps because some or all groups may have been updated. + c.depMaps = map[*Group]dependencyMap{} +} diff --git a/rules/manager_test.go b/rules/manager_test.go index 8b3b9c08f..47f0248eb 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -1435,21 +1435,23 @@ func TestDependencyMap(t *testing.T) { Opts: opts, }) - require.Zero(t, group.dependencyMap.dependencies(rule)) - require.Equal(t, 2, group.dependencyMap.dependents(rule)) - require.False(t, group.dependencyMap.isIndependent(rule)) + depMap := buildDependencyMap(group.rules) - require.Zero(t, group.dependencyMap.dependents(rule2)) - require.Equal(t, 1, group.dependencyMap.dependencies(rule2)) - require.False(t, group.dependencyMap.isIndependent(rule2)) + require.Zero(t, depMap.dependencies(rule)) + require.Equal(t, 2, depMap.dependents(rule)) + require.False(t, depMap.isIndependent(rule)) - require.Zero(t, group.dependencyMap.dependents(rule3)) - require.Zero(t, group.dependencyMap.dependencies(rule3)) - require.True(t, group.dependencyMap.isIndependent(rule3)) + require.Zero(t, depMap.dependents(rule2)) + require.Equal(t, 1, depMap.dependencies(rule2)) + require.False(t, depMap.isIndependent(rule2)) - require.Zero(t, group.dependencyMap.dependents(rule4)) - require.Equal(t, 1, group.dependencyMap.dependencies(rule4)) - require.False(t, group.dependencyMap.isIndependent(rule4)) + require.Zero(t, depMap.dependents(rule3)) + require.Zero(t, depMap.dependencies(rule3)) + require.True(t, depMap.isIndependent(rule3)) + + require.Zero(t, depMap.dependents(rule4)) + require.Equal(t, 1, depMap.dependencies(rule4)) + require.False(t, depMap.isIndependent(rule4)) } func TestNoDependency(t *testing.T) { @@ -1470,8 +1472,9 @@ func TestNoDependency(t *testing.T) { Opts: opts, }) + depMap := buildDependencyMap(group.rules) // A group with only one rule cannot have dependencies. - require.Empty(t, group.dependencyMap) + require.Empty(t, depMap) } func TestDependenciesEdgeCases(t *testing.T) { @@ -1493,9 +1496,10 @@ func TestDependenciesEdgeCases(t *testing.T) { require.NoError(t, err) rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{}) + depMap := buildDependencyMap(group.rules) // A group with no rules has no dependency map, but doesn't panic if the map is queried. - require.Nil(t, group.dependencyMap) - require.False(t, group.dependencyMap.isIndependent(rule)) + require.Nil(t, depMap) + require.False(t, depMap.isIndependent(rule)) }) t.Run("rules which reference no series", func(t *testing.T) { @@ -1514,9 +1518,10 @@ func TestDependenciesEdgeCases(t *testing.T) { Opts: opts, }) + depMap := buildDependencyMap(group.rules) // A group with rules which reference no series will still produce a dependency map - require.True(t, group.dependencyMap.isIndependent(rule1)) - require.True(t, group.dependencyMap.isIndependent(rule2)) + require.True(t, depMap.isIndependent(rule1)) + require.True(t, depMap.isIndependent(rule2)) }) } @@ -1542,10 +1547,11 @@ func TestNoMetricSelector(t *testing.T) { Opts: opts, }) + depMap := buildDependencyMap(group.rules) // A rule with no metric selector cannot be reliably determined to have no dependencies on other rules, and therefore // all rules are not considered independent. - require.False(t, group.dependencyMap.isIndependent(rule)) - require.False(t, group.dependencyMap.isIndependent(rule2)) + require.False(t, depMap.isIndependent(rule)) + require.False(t, depMap.isIndependent(rule2)) } func TestDependentRulesWithNonMetricExpression(t *testing.T) { @@ -1574,9 +1580,10 @@ func TestDependentRulesWithNonMetricExpression(t *testing.T) { Opts: opts, }) - require.False(t, group.dependencyMap.isIndependent(rule)) - require.False(t, group.dependencyMap.isIndependent(rule2)) - require.True(t, group.dependencyMap.isIndependent(rule3)) + depMap := buildDependencyMap(group.rules) + require.False(t, depMap.isIndependent(rule)) + require.False(t, depMap.isIndependent(rule2)) + require.True(t, depMap.isIndependent(rule3)) } func TestRulesDependentOnMetaMetrics(t *testing.T) { @@ -1604,7 +1611,8 @@ func TestRulesDependentOnMetaMetrics(t *testing.T) { Opts: opts, }) - require.False(t, group.dependencyMap.isIndependent(rule)) + depMap := buildDependencyMap(group.rules) + require.False(t, depMap.isIndependent(rule)) } func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) { @@ -1623,17 +1631,19 @@ func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) { orig := make(map[string]dependencyMap, len(ruleManager.groups)) for _, g := range ruleManager.groups { + depMap := buildDependencyMap(g.rules) // No dependency map is expected because there is only one rule in the group. - require.Empty(t, g.dependencyMap) - orig[g.Name()] = g.dependencyMap + require.Empty(t, depMap) + orig[g.Name()] = depMap } // Update once without changing groups. err = ruleManager.Update(10*time.Second, files, labels.EmptyLabels(), "", nil) require.NoError(t, err) for h, g := range ruleManager.groups { + depMap := buildDependencyMap(g.rules) // Dependency maps are the same because of no updates. - require.Equal(t, orig[h], g.dependencyMap) + require.Equal(t, orig[h], depMap) } // Groups will be recreated when updated. @@ -1653,12 +1663,13 @@ func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) { require.NotEmptyf(t, rr, "expected to find %q recording rule in fixture", ruleName) + depMap := buildDependencyMap(g.rules) // Dependency maps must change because the groups would've been updated. - require.NotEqual(t, orig[h], g.dependencyMap) + require.NotEqual(t, orig[h], depMap) // We expect there to be some dependencies since the new rule group contains a dependency. - require.Greater(t, len(g.dependencyMap), 0) - require.Equal(t, 1, g.dependencyMap.dependents(rr)) - require.Zero(t, g.dependencyMap.dependencies(rr)) + require.Greater(t, len(depMap), 0) + require.Equal(t, 1, depMap.dependents(rr)) + require.Zero(t, depMap.dependencies(rr)) } } @@ -1674,7 +1685,7 @@ func TestAsyncRuleEvaluation(t *testing.T) { ) files := []string{"fixtures/rules_multiple.yaml"} - ruleManager := NewManager(&ManagerOptions{ + opts := &ManagerOptions{ Context: context.Background(), Logger: log.NewNopLogger(), Appendable: storage, @@ -1692,39 +1703,42 @@ func TestAsyncRuleEvaluation(t *testing.T) { promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345}, }, nil }, - }) + } - groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...) - require.Empty(t, errs) - require.Len(t, groups, 1) + inflightTracker := func(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + default: + highWatermark := maxInflight.Load() + current := inflightQueries.Load() + if current > highWatermark { + maxInflight.Store(current) + } + + time.Sleep(time.Millisecond) + } + } + } expectedRules := 4 t.Run("synchronous evaluation with independent rules", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) + ruleManager := NewManager(opts) + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...) + require.Empty(t, errs) + require.Len(t, groups, 1) + for _, group := range groups { require.Len(t, group.rules, expectedRules) start := time.Now() // Never expect more than 1 inflight query at a time. - go func() { - for { - select { - case <-ctx.Done(): - return - default: - highWatermark := maxInflight.Load() - current := inflightQueries.Load() - if current > highWatermark { - maxInflight.Store(current) - } - - time.Sleep(time.Millisecond) - } - } - }() + go inflightTracker(ctx) group.Eval(ctx, start) @@ -1744,33 +1758,27 @@ func TestAsyncRuleEvaluation(t *testing.T) { maxInflight.Store(0) ctx, cancel := context.WithCancel(context.Background()) + // Configure concurrency settings. + opts.ConcurrentEvalsEnabled = true + opts.MaxConcurrentEvals = 2 + opts.RuleConcurrencyController = nil + ruleManager := NewManager(opts) + + groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...) + require.Empty(t, errs) + require.Len(t, groups, 1) + for _, group := range groups { - // Allow up to 2 concurrent rule evaluations. - group.opts.ConcurrentEvalsController = NewConcurrentRuleEvalController(true, 2) require.Len(t, group.rules, expectedRules) start := time.Now() - go func() { - for { - select { - case <-ctx.Done(): - return - default: - highWatermark := maxInflight.Load() - current := inflightQueries.Load() - if current > highWatermark { - maxInflight.Store(current) - } - - time.Sleep(time.Millisecond) - } - } - }() + go inflightTracker(ctx) group.Eval(ctx, start) - require.EqualValues(t, 3, maxInflight.Load()) + // Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals. + require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load()) // Some rules should execute concurrently so should complete quicker. require.Less(t, time.Since(start).Seconds(), (time.Duration(expectedRules) * artificialDelay).Seconds()) // Each rule produces one vector.