Refactoring
Signed-off-by: Danny Kopping <danny.kopping@grafana.com>
parent 0dc7036db3
commit 94cdfa30cd
@@ -437,7 +437,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
 		eval := func(i int, rule Rule, async bool) {
 			defer func() {
 				if async {
-					g.opts.ConcurrencyController.Done()
+					g.opts.ConcurrentEvalsController.Done()
 				}
 			}()
 
@@ -569,7 +569,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
 
 		// If the rule has no dependencies, it can run concurrently because no other rules in this group depend on its output.
 		// Try run concurrently if there are slots available.
-		if g.dependencyMap.isIndependent(rule) && g.opts.ConcurrencyController.Allow() {
+		if g.dependencyMap.isIndependent(rule) && g.opts.ConcurrentEvalsController != nil && g.opts.ConcurrentEvalsController.Allow() {
 			go eval(i, rule, true)
 		} else {
 			eval(i, rule, false)
@@ -888,8 +888,8 @@ func NewGroupMetrics(reg prometheus.Registerer) *Metrics {
 }
 
 // dependencyMap is a data-structure which contains the relationships between rules within a group.
-// It is used to describe the dependency associations between recording rules in a group whereby one rule uses the
-// output metric produced by another recording rule in its expression (i.e. as its "input").
+// It is used to describe the dependency associations between rules in a group whereby one rule uses the
+// output metric produced by another rule in its expression (i.e. as its "input").
 type dependencyMap map[Rule][]Rule
 
 // dependents returns the count of rules which use the output of the given rule as one of their inputs.
@@ -905,10 +905,6 @@ func (m dependencyMap) dependencies(r Rule) int {
 
 	var count int
 	for _, children := range m {
-		if len(children) == 0 {
-			continue
-		}
-
 		for _, child := range children {
 			if child == r {
 				count++
@@ -919,6 +915,8 @@ func (m dependencyMap) dependencies(r Rule) int {
 	return count
 }
 
+// isIndependent determines whether the given rule is not dependent on another rule for its input, nor is any other rule
+// dependent on its output.
 func (m dependencyMap) isIndependent(r Rule) bool {
 	if m == nil {
 		return false
@@ -928,6 +926,16 @@ func (m dependencyMap) isIndependent(r Rule) bool {
 }
 
 // buildDependencyMap builds a data-structure which contains the relationships between rules within a group.
+//
+// Alert rules, by definition, cannot have any dependents - but they can have dependencies. Any recording rule on whose
+// output an Alert rule depends will not be able to run concurrently.
+//
+// There is a class of rule expressions which are considered "indeterminate", because either relationships cannot be
+// inferred, or concurrent evaluation of rules depending on these series would produce undefined/unexpected behaviour:
+//   - wildcard queriers like {cluster="prod1"} which would match every series with that label selector
+//   - any "meta" series (series produced by Prometheus itself) like ALERTS, ALERTS_FOR_STATE
+//
+// Rules which are independent can run concurrently with no side-effects.
 func buildDependencyMap(rules []Rule) dependencyMap {
 	dependencies := make(dependencyMap)
 
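Note: the dependencyMap idea above is easiest to see in isolation. Below is a minimal, self-contained sketch of the same shape (the rule type, the rule names, and the buildDeps-style wiring here are hypothetical illustrations, not code from this commit): one map from each rule to the rules that consume its output, from which dependents, dependencies, and independence checks follow.

```go
package main

import "fmt"

// rule is a stand-in for the rules.Rule interface; only the name matters here.
type rule string

// depMap maps a rule to the rules that use its output as input,
// mirroring the dependencyMap shape shown in the diff above.
type depMap map[rule][]rule

// dependents counts rules that consume r's output.
func (m depMap) dependents(r rule) int { return len(m[r]) }

// dependencies counts rules whose output r consumes.
func (m depMap) dependencies(r rule) int {
	count := 0
	for _, children := range m {
		for _, child := range children {
			if child == r {
				count++
			}
		}
	}
	return count
}

// isIndependent reports whether r neither feeds nor consumes another rule.
func (m depMap) isIndependent(r rule) bool {
	if m == nil {
		return false
	}
	return m.dependents(r) == 0 && m.dependencies(r) == 0
}

func main() {
	// Hypothetical group: "requests:rate5m" feeds "requests:rate5m:avg";
	// "up:count" touches neither.
	m := depMap{
		"requests:rate5m": {"requests:rate5m:avg"},
	}
	fmt.Println(m.dependents("requests:rate5m"))       // 1
	fmt.Println(m.dependencies("requests:rate5m:avg")) // 1
	fmt.Println(m.isIndependent("up:count"))           // true
}
```

In this shape a rule is independent exactly when it appears neither as a key with children nor inside any other rule's child list, which is what makes it safe to evaluate concurrently.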
@@ -104,21 +104,21 @@ type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert)
 
 // ManagerOptions bundles options for the Manager.
 type ManagerOptions struct {
-	ExternalURL            *url.URL
-	QueryFunc              QueryFunc
-	NotifyFunc             NotifyFunc
-	Context                context.Context
-	Appendable             storage.Appendable
-	Queryable              storage.Queryable
-	Logger                 log.Logger
-	Registerer             prometheus.Registerer
-	OutageTolerance        time.Duration
-	ForGracePeriod         time.Duration
-	ResendDelay            time.Duration
-	MaxConcurrentEvals     int64
-	ConcurrentEvalsEnabled bool
-	ConcurrencyController  ConcurrencyController
-	GroupLoader            GroupLoader
+	ExternalURL               *url.URL
+	QueryFunc                 QueryFunc
+	NotifyFunc                NotifyFunc
+	Context                   context.Context
+	Appendable                storage.Appendable
+	Queryable                 storage.Queryable
+	Logger                    log.Logger
+	Registerer                prometheus.Registerer
+	OutageTolerance           time.Duration
+	ForGracePeriod            time.Duration
+	ResendDelay               time.Duration
+	GroupLoader               GroupLoader
+	MaxConcurrentEvals        int64
+	ConcurrentEvalsEnabled    bool
+	ConcurrentEvalsController ConcurrentRuleEvalController
 
 	Metrics *Metrics
 }
@@ -134,7 +134,7 @@ func NewManager(o *ManagerOptions) *Manager {
 		o.GroupLoader = FileLoader{}
 	}
 
-	o.ConcurrencyController = NewConcurrencyController(o.ConcurrentEvalsEnabled, o.MaxConcurrentEvals)
+	o.ConcurrentEvalsController = NewConcurrentRuleEvalController(o.ConcurrentEvalsEnabled, o.MaxConcurrentEvals)
 
 	m := &Manager{
 		groups: map[string]*Group{},
@@ -410,16 +410,26 @@ func SendAlerts(s Sender, externalURL string) NotifyFunc {
 	}
 }
 
-type ConcurrencyController struct {
+// ConcurrentRuleEvalController controls whether rules can be evaluated concurrently. Its purpose it to bound the amount
+// of concurrency in rule evaluations so they do not overwhelm the Prometheus server with additional query load.
+// Concurrency is controlled globally, not on a per-group basis.
+type ConcurrentRuleEvalController interface {
+	Allow() bool
+	Done()
+}
+
+// concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules.
+type concurrentRuleEvalController struct {
 	enabled bool
 	sema    *semaphore.Weighted
 }
 
-func NewConcurrencyController(enabled bool, maxConcurrency int64) ConcurrencyController {
-	return ConcurrencyController{enabled: enabled, sema: semaphore.NewWeighted(maxConcurrency)}
+func NewConcurrentRuleEvalController(enabled bool, maxConcurrency int64) ConcurrentRuleEvalController {
+	return concurrentRuleEvalController{enabled: enabled, sema: semaphore.NewWeighted(maxConcurrency)}
 }
 
-func (c ConcurrencyController) Allow() bool {
+// Allow determines whether any concurrency slots are available.
+func (c concurrentRuleEvalController) Allow() bool {
 	if !c.enabled {
 		return false
 	}
@@ -427,7 +437,8 @@ func (c ConcurrencyController) Allow() bool {
 	return c.sema.TryAcquire(1)
 }
 
-func (c ConcurrencyController) Done() {
+// Done releases a concurrent evaluation slot.
+func (c concurrentRuleEvalController) Done() {
 	if !c.enabled {
 		return
 	}
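Note: the controller introduced above is essentially a gate around a weighted semaphore. A rough, self-contained sketch of that pattern follows (the ctrl type, the limit of 2, and the worker loop are illustrative assumptions, not code from this commit): TryAcquire hands out at most N slots without blocking, and Done returns a slot.

```go
package main

import (
	"fmt"
	"sync"

	"golang.org/x/sync/semaphore"
)

// ctrl bounds concurrency with a weighted semaphore, mirroring the shape of
// concurrentRuleEvalController in the diff above.
type ctrl struct {
	enabled bool
	sema    *semaphore.Weighted
}

// Allow returns true only if concurrency is enabled and a slot is free.
func (c ctrl) Allow() bool {
	if !c.enabled {
		return false
	}
	return c.sema.TryAcquire(1)
}

// Done releases a previously acquired slot.
func (c ctrl) Done() {
	if !c.enabled {
		return
	}
	c.sema.Release(1)
}

func main() {
	c := ctrl{enabled: true, sema: semaphore.NewWeighted(2)} // at most 2 concurrent evaluations

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		if c.Allow() {
			// Slot available: evaluate asynchronously.
			wg.Add(1)
			go func(i int) {
				defer wg.Done()
				defer c.Done()
				fmt.Println("async eval", i)
			}(i)
		} else {
			// No slot: fall back to synchronous evaluation.
			fmt.Println("sync eval", i)
		}
	}
	wg.Wait()
}
```

Because TryAcquire never blocks, a caller that finds all slots taken can simply evaluate synchronously instead of queueing, which is the behaviour the Eval change above relies on.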
@@ -1471,7 +1471,53 @@ func TestNoDependency(t *testing.T) {
 	})
 
 	// A group with only one rule cannot have dependencies.
 	require.False(t, group.dependencyMap.isIndependent(rule))
+	require.Empty(t, group.dependencyMap)
 }
 
+func TestDependenciesEdgeCases(t *testing.T) {
+	ctx := context.Background()
+	opts := &ManagerOptions{
+		Context: ctx,
+		Logger:  log.NewNopLogger(),
+	}
+
+	t.Run("empty group", func(t *testing.T) {
+		group := NewGroup(GroupOptions{
+			Name:     "rule_group",
+			Interval: time.Second,
+			Rules:    []Rule{}, // empty group
+			Opts:     opts,
+		})
+
+		expr, err := parser.ParseExpr("sum by (user) (rate(requests[1m]))")
+		require.NoError(t, err)
+		rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{})
+
+		// A group with no rules has no dependency map, but doesn't panic if the map is queried.
+		require.Nil(t, group.dependencyMap)
+		require.False(t, group.dependencyMap.isIndependent(rule))
+	})
+
+	t.Run("rules which reference no series", func(t *testing.T) {
+		expr, err := parser.ParseExpr("one")
+		require.NoError(t, err)
+		rule1 := NewRecordingRule("1", expr, labels.Labels{})
+
+		expr, err = parser.ParseExpr("two")
+		require.NoError(t, err)
+		rule2 := NewRecordingRule("2", expr, labels.Labels{})
+
+		group := NewGroup(GroupOptions{
+			Name:     "rule_group",
+			Interval: time.Second,
+			Rules:    []Rule{rule1, rule2},
+			Opts:     opts,
+		})
+
+		// A group with rules which reference no series will still produce a dependency map
+		require.True(t, group.dependencyMap.isIndependent(rule1))
+		require.True(t, group.dependencyMap.isIndependent(rule2))
+	})
+}
+
 func TestNoMetricSelector(t *testing.T) {
@@ -1596,10 +1642,23 @@ func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) {
 	require.NoError(t, err)
 
 	for h, g := range ruleManager.groups {
+		const ruleName = "job:http_requests:rate5m"
+		var rr *RecordingRule
+
+		for _, r := range g.rules {
+			if r.Name() == ruleName {
+				rr = r.(*RecordingRule)
+			}
+		}
+
+		require.NotEmptyf(t, rr, "expected to find %q recording rule in fixture", ruleName)
+
 		// Dependency maps must change because the groups would've been updated.
 		require.NotEqual(t, orig[h], g.dependencyMap)
 		// We expect there to be some dependencies since the new rule group contains a dependency.
 		require.Greater(t, len(g.dependencyMap), 0)
+		require.Equal(t, 1, g.dependencyMap.dependents(rr))
+		require.Zero(t, g.dependencyMap.dependencies(rr))
 	}
 }
 
@@ -1625,17 +1684,16 @@ func TestAsyncRuleEvaluation(t *testing.T) {
 				inflightQueries.Add(-1)
 			}()
 
-			// Artificially delay all query executions to highly concurrent execution improvement.
+			// Artificially delay all query executions to highlight concurrent execution improvement.
 			time.Sleep(artificialDelay)
 
-			// return a stub sample
+			// Return a stub sample.
 			return promql.Vector{
 				promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345},
 			}, nil
 		},
 	})
 
 	// Evaluate groups manually to show the impact of async rule evaluations.
 	groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...)
 	require.Empty(t, errs)
 	require.Len(t, groups, 1)
@@ -1688,7 +1746,7 @@ func TestAsyncRuleEvaluation(t *testing.T) {
 
 	for _, group := range groups {
 		// Allow up to 2 concurrent rule evaluations.
-		group.opts.ConcurrencyController = NewConcurrencyController(true, 2)
+		group.opts.ConcurrentEvalsController = NewConcurrentRuleEvalController(true, 2)
 		require.Len(t, group.rules, expectedRules)
 
 		start := time.Now()
@@ -1749,17 +1807,16 @@ func TestBoundedRuleEvalConcurrency(t *testing.T) {
 				inflightQueries.Add(-1)
 			}()
 
-			// Artificially delay all query executions to highly concurrent execution improvement.
+			// Artificially delay all query executions to highlight concurrent execution improvement.
 			time.Sleep(artificialDelay)
 
-			// return a stub sample
+			// Return a stub sample.
 			return promql.Vector{
 				promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345},
 			}, nil
 		},
 	})
 
 	// Evaluate groups manually to show the impact of async rule evaluations.
 	groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...)
 	require.Empty(t, errs)
 	require.Len(t, groups, groupCount)
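Note: the tests above infer concurrency from an in-flight counter that is incremented inside an artificially delayed query function. A stripped-down sketch of that measurement idea (the counter names, the delay, and the peak tracking are illustrative, not the test code itself):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

func main() {
	var (
		inflight    atomic.Int32 // queries currently executing
		maxInflight atomic.Int32 // highest concurrency observed
		wg          sync.WaitGroup
	)

	// query stands in for the artificially delayed QueryFunc used by the tests.
	query := func() {
		n := inflight.Add(1)
		defer inflight.Add(-1)

		// Record the peak number of concurrent executions.
		for {
			prev := maxInflight.Load()
			if n <= prev || maxInflight.CompareAndSwap(prev, n) {
				break
			}
		}

		// Artificial delay so overlapping executions become visible.
		time.Sleep(50 * time.Millisecond)
	}

	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			query()
		}()
	}
	wg.Wait()

	fmt.Println("peak concurrent queries:", maxInflight.Load())
}
```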