Run rule evaluation with timestamps precisely evaluation_interval apart (#4201)

* Run rule evaluation with timestamps precisely evaluation_interval apart from one another.

Signed-off-by: Alin Sinpalean <alin.sinpalean@gmail.com>
This commit is contained in:
Alin Sinpalean 2018-06-01 16:23:07 +02:00 committed by Brian Brazil
parent 05cf674279
commit 9dc763cc03
1 changed files with 19 additions and 19 deletions

View File

@ -192,8 +192,9 @@ func (g *Group) run(ctx context.Context) {
defer close(g.terminated) defer close(g.terminated)
// Wait an initial amount to have consistently slotted intervals. // Wait an initial amount to have consistently slotted intervals.
evalTimestamp := g.evalTimestamp().Add(g.interval)
select { select {
case <-time.After(g.offset()): case <-time.After(time.Until(evalTimestamp)):
case <-g.done: case <-g.done:
return return
} }
@ -202,17 +203,20 @@ func (g *Group) run(ctx context.Context) {
iterationsScheduled.Inc() iterationsScheduled.Inc()
start := time.Now() start := time.Now()
g.Eval(ctx, start) g.Eval(ctx, evalTimestamp)
timeSinceStart := time.Since(start)
iterationDuration.Observe(time.Since(start).Seconds()) iterationDuration.Observe(timeSinceStart.Seconds())
g.SetEvaluationTime(time.Since(start)) g.SetEvaluationTime(timeSinceStart)
} }
lastTriggered := time.Now()
iter()
// The assumption here is that since the ticker was started after having
// waited for `evalTimestamp` to pass, the ticks will trigger soon
// after each `evalTimestamp + N * g.interval` occurrence.
tick := time.NewTicker(g.interval) tick := time.NewTicker(g.interval)
defer tick.Stop() defer tick.Stop()
iter()
for { for {
select { select {
case <-g.done: case <-g.done:
@ -222,12 +226,12 @@ func (g *Group) run(ctx context.Context) {
case <-g.done: case <-g.done:
return return
case <-tick.C: case <-tick.C:
missed := (time.Since(lastTriggered).Nanoseconds() / g.interval.Nanoseconds()) - 1 missed := (time.Since(evalTimestamp) / g.interval) - 1
if missed > 0 { if missed > 0 {
iterationsMissed.Add(float64(missed)) iterationsMissed.Add(float64(missed))
iterationsScheduled.Add(float64(missed)) iterationsScheduled.Add(float64(missed))
} }
lastTriggered = time.Now() evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval)
iter() iter()
} }
} }
@ -261,20 +265,16 @@ func (g *Group) SetEvaluationTime(dur time.Duration) {
g.evaluationTime = dur g.evaluationTime = dur
} }
// offset returns until the next consistently slotted evaluation interval. // evalTimestamp returns the immediately preceding consistently slotted evaluation time.
func (g *Group) offset() time.Duration { func (g *Group) evalTimestamp() time.Time {
now := time.Now().UnixNano()
var ( var (
base = now - (now % int64(g.interval)) offset = int64(g.hash() % uint64(g.interval))
offset = g.hash() % uint64(g.interval) now = time.Now().UnixNano()
next = base + int64(offset) adjNow = now - offset
base = adjNow - (adjNow % int64(g.interval))
) )
if next < now { return time.Unix(0, base+offset)
next += int64(g.interval)
}
return time.Duration(next - now)
} }
// copyState copies the alerting rule and staleness related state from the given group. // copyState copies the alerting rule and staleness related state from the given group.