Add unittest for rule staleness, and rules generally.
This commit is contained in:
parent
0400f3cfd2
commit
0451d6d31b
|
@ -163,7 +163,7 @@ func (g *Group) run() {
|
|||
iterationsScheduled.Inc()
|
||||
|
||||
start := time.Now()
|
||||
g.Eval()
|
||||
g.Eval(start)
|
||||
|
||||
iterationDuration.Observe(time.Since(start).Seconds())
|
||||
}
|
||||
|
@ -257,9 +257,8 @@ func typeForRule(r Rule) ruleType {
|
|||
// Eval runs a single evaluation cycle in which all rules are evaluated in parallel.
|
||||
// In the future a single group will be evaluated sequentially to properly handle
|
||||
// rule dependency.
|
||||
func (g *Group) Eval() {
|
||||
func (g *Group) Eval(ts time.Time) {
|
||||
var (
|
||||
now = time.Now()
|
||||
wg sync.WaitGroup
|
||||
mu sync.Mutex
|
||||
seriesReturned = make(map[string]labels.Labels, len(g.seriesInPreviousEval))
|
||||
|
@ -279,7 +278,7 @@ func (g *Group) Eval() {
|
|||
|
||||
evalTotal.WithLabelValues(rtyp).Inc()
|
||||
|
||||
vector, err := rule.Eval(g.opts.Context, now, g.opts.QueryEngine, g.opts.ExternalURL)
|
||||
vector, err := rule.Eval(g.opts.Context, ts, g.opts.QueryEngine, g.opts.ExternalURL)
|
||||
if err != nil {
|
||||
// Canceled queries are intentional termination of queries. This normally
|
||||
// happens on shutdown and thus we skip logging of any errors here.
|
||||
|
@ -344,7 +343,7 @@ func (g *Group) Eval() {
|
|||
for metric, lset := range g.seriesInPreviousEval {
|
||||
if _, ok := seriesReturned[metric]; !ok {
|
||||
// Series no longer exposed, mark it stale.
|
||||
_, err = app.Add(lset, timestamp.FromTime(now), math.Float64frombits(value.StaleNaN))
|
||||
_, err = app.Add(lset, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
|
||||
switch err {
|
||||
case nil:
|
||||
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
|
||||
|
|
|
@ -14,7 +14,10 @@
|
|||
package rules
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -23,7 +26,10 @@ import (
|
|||
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
"github.com/prometheus/prometheus/pkg/timestamp"
|
||||
"github.com/prometheus/prometheus/pkg/value"
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/util/testutil"
|
||||
)
|
||||
|
||||
func TestAlertingRule(t *testing.T) {
|
||||
|
@ -156,3 +162,96 @@ func annotateWithTime(lines []string, ts time.Time) []string {
|
|||
}
|
||||
return annotatedLines
|
||||
}
|
||||
|
||||
func TestStaleness(t *testing.T) {
|
||||
storage := testutil.NewStorage(t)
|
||||
defer storage.Close()
|
||||
engine := promql.NewEngine(storage, nil)
|
||||
opts := &ManagerOptions{
|
||||
QueryEngine: engine,
|
||||
Appendable: storage,
|
||||
Context: context.Background(),
|
||||
}
|
||||
|
||||
expr, err := promql.ParseExpr("a + 1")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
rule := NewRecordingRule("a_plus_one", expr, labels.Labels{})
|
||||
group := NewGroup("default", time.Second, []Rule{rule}, opts)
|
||||
|
||||
// A time series that has two samples and then goes stale.
|
||||
app, _ := storage.Appender()
|
||||
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 0, 1)
|
||||
if err = app.Commit(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
app, _ = storage.Appender()
|
||||
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 1000, 2)
|
||||
if err = app.Commit(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
app, _ = storage.Appender()
|
||||
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 2000, math.Float64frombits(value.StaleNaN))
|
||||
if err = app.Commit(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Execute 3 times, 1 second apart.
|
||||
group.Eval(time.Unix(0, 0))
|
||||
group.Eval(time.Unix(1, 0))
|
||||
group.Eval(time.Unix(2, 0))
|
||||
|
||||
querier, err := storage.Querier(0, 2000)
|
||||
defer querier.Close()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
matcher, _ := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a_plus_one")
|
||||
seriesSet := querier.Select(matcher)
|
||||
samples, err := readSeriesSet(seriesSet)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
metric := labels.FromStrings(model.MetricNameLabel, "a_plus_one").String()
|
||||
metricSample, ok := samples[metric]
|
||||
if !ok {
|
||||
t.Fatalf("Series %s not returned.", metric)
|
||||
}
|
||||
if !value.IsStaleNaN(metricSample[2].V) {
|
||||
t.Fatalf("Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(metricSample[2].V))
|
||||
}
|
||||
metricSample[2].V = 42 // reflect.DeepEqual cannot handle NaN.
|
||||
|
||||
want := map[string][]promql.Point{
|
||||
metric: []promql.Point{{0, 2}, {1000, 3}, {2000, 42}},
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(want, samples) {
|
||||
t.Fatalf("Returned samples not as expected. Wanted: %+v Got: %+v", want, samples)
|
||||
}
|
||||
}
|
||||
|
||||
// Convert a SeriesSet into a form useable with reflect.DeepEqual.
|
||||
func readSeriesSet(ss storage.SeriesSet) (map[string][]promql.Point, error) {
|
||||
result := map[string][]promql.Point{}
|
||||
|
||||
for ss.Next() {
|
||||
series := ss.At()
|
||||
|
||||
points := []promql.Point{}
|
||||
it := series.Iterator()
|
||||
for it.Next() {
|
||||
t, v := it.At()
|
||||
points = append(points, promql.Point{T: t, V: v})
|
||||
}
|
||||
|
||||
name := series.Labels().String()
|
||||
result[name] = points
|
||||
if err := ss.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue