From 05380aa0ac3d2df2c549d4d4c863d850da7a9886 Mon Sep 17 00:00:00 2001 From: Sebastian Rabenhorst <4246554+rabenhorst@users.noreply.github.com> Date: Wed, 12 Jun 2024 16:07:42 +0200 Subject: [PATCH] agent db: make rejecting ooo samples configurable (#14094) feat: Make OOO ingestion time window configurable for Prometheus Agent. Signed-off-by: Sebastian Rabenhorst --- cmd/prometheus/main.go | 24 ++++++++----- docs/configuration/configuration.md | 4 +++ tsdb/agent/db.go | 38 ++++++++++++++++---- tsdb/agent/db_test.go | 55 ++++++++++++++++++++++++++++- 4 files changed, 104 insertions(+), 17 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 8db2f2c5e..cd7f533d1 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -1197,7 +1197,7 @@ func main() { } if agentMode { // WAL storage. - opts := cfg.agent.ToAgentOptions() + opts := cfg.agent.ToAgentOptions(cfg.tsdb.OutOfOrderTimeWindow) cancel := make(chan struct{}) g.Add( func() error { @@ -1233,6 +1233,7 @@ func main() { "TruncateFrequency", cfg.agent.TruncateFrequency, "MinWALTime", cfg.agent.MinWALTime, "MaxWALTime", cfg.agent.MaxWALTime, + "OutOfOrderTimeWindow", cfg.agent.OutOfOrderTimeWindow, ) localStorage.Set(db, 0) @@ -1736,17 +1737,22 @@ type agentOptions struct { TruncateFrequency model.Duration MinWALTime, MaxWALTime model.Duration NoLockfile bool + OutOfOrderTimeWindow int64 } -func (opts agentOptions) ToAgentOptions() agent.Options { +func (opts agentOptions) ToAgentOptions(outOfOrderTimeWindow int64) agent.Options { + if outOfOrderTimeWindow < 0 { + outOfOrderTimeWindow = 0 + } return agent.Options{ - WALSegmentSize: int(opts.WALSegmentSize), - WALCompression: wlog.ParseCompressionType(opts.WALCompression, opts.WALCompressionType), - StripeSize: opts.StripeSize, - TruncateFrequency: time.Duration(opts.TruncateFrequency), - MinWALTime: durationToInt64Millis(time.Duration(opts.MinWALTime)), - MaxWALTime: durationToInt64Millis(time.Duration(opts.MaxWALTime)), - 
NoLockfile: opts.NoLockfile, + WALSegmentSize: int(opts.WALSegmentSize), + WALCompression: wlog.ParseCompressionType(opts.WALCompression, opts.WALCompressionType), + StripeSize: opts.StripeSize, + TruncateFrequency: time.Duration(opts.TruncateFrequency), + MinWALTime: durationToInt64Millis(time.Duration(opts.MinWALTime)), + MaxWALTime: durationToInt64Millis(time.Duration(opts.MaxWALTime)), + NoLockfile: opts.NoLockfile, + OutOfOrderTimeWindow: outOfOrderTimeWindow, } } diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index f0e13cf13..b83219700 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -3813,6 +3813,10 @@ NOTE: Out-of-order ingestion is an experimental feature, but you do not need any # into the TSDB, i.e. it is an in-order sample or an out-of-order/out-of-bounds sample # that is within the out-of-order window, or (b) too-old, i.e. not in-order # and before the out-of-order window. +# +# When out_of_order_time_window is greater than 0, it also affects the experimental agent. It allows +# the agent's WAL to accept out-of-order samples that fall within the specified time window relative +# to the timestamp of the last appended sample for the same series. [ out_of_order_time_window: <duration> | default = 0s ] ``` diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index 513c2ed5a..1b6df3af0 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -81,19 +81,23 @@ type Options struct { // NoLockfile disables creation and consideration of a lock file. NoLockfile bool + + // OutOfOrderTimeWindow specifies how much out of order is allowed, if any. + OutOfOrderTimeWindow int64 } // DefaultOptions used for the WAL storage. They are reasonable for setups using // millisecond-precision timestamps. 
func DefaultOptions() *Options { return &Options{ - WALSegmentSize: wlog.DefaultSegmentSize, - WALCompression: wlog.CompressionNone, - StripeSize: tsdb.DefaultStripeSize, - TruncateFrequency: DefaultTruncateFrequency, - MinWALTime: DefaultMinWALTime, - MaxWALTime: DefaultMaxWALTime, - NoLockfile: false, + WALSegmentSize: wlog.DefaultSegmentSize, + WALCompression: wlog.CompressionNone, + StripeSize: tsdb.DefaultStripeSize, + TruncateFrequency: DefaultTruncateFrequency, + MinWALTime: DefaultMinWALTime, + MaxWALTime: DefaultMaxWALTime, + NoLockfile: false, + OutOfOrderTimeWindow: 0, } } @@ -812,6 +816,11 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo series.Lock() defer series.Unlock() + if t <= a.minValidTime(series.lastTs) { + a.metrics.totalOutOfOrderSamples.Inc() + return 0, storage.ErrOutOfOrderSample + } + // NOTE: always modify pendingSamples and sampleSeries together. a.pendingSamples = append(a.pendingSamples, record.RefSample{ Ref: series.ref, @@ -935,6 +944,11 @@ func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int series.Lock() defer series.Unlock() + if t <= a.minValidTime(series.lastTs) { + a.metrics.totalOutOfOrderSamples.Inc() + return 0, storage.ErrOutOfOrderSample + } + switch { case h != nil: // NOTE: always modify pendingHistograms and histogramSeries together @@ -1103,3 +1117,13 @@ func (a *appender) logSeries() error { return nil } + +// minValidTime returns the timestamp at or below which a sample is rejected as out of order, +// i.e. lastTs minus the out-of-order time window, clamped to math.MinInt64 to prevent underflow. 
+func (a *appender) minValidTime(lastTs int64) int64 { + if lastTs < math.MinInt64+a.opts.OutOfOrderTimeWindow { + return math.MinInt64 + } + + return lastTs - a.opts.OutOfOrderTimeWindow +} diff --git a/tsdb/agent/db_test.go b/tsdb/agent/db_test.go index a7dae3220..b984e6bc0 100644 --- a/tsdb/agent/db_test.go +++ b/tsdb/agent/db_test.go @@ -16,6 +16,7 @@ package agent import ( "context" "fmt" + "math" "path/filepath" "strconv" "testing" @@ -761,7 +762,9 @@ func TestDBAllowOOOSamples(t *testing.T) { ) reg := prometheus.NewRegistry() - s := createTestAgentDB(t, reg, DefaultOptions()) + opts := DefaultOptions() + opts.OutOfOrderTimeWindow = math.MaxInt64 + s := createTestAgentDB(t, reg, opts) app := s.Appender(context.TODO()) // Let's add some samples in the [offset, offset+numDatapoints) range. @@ -879,6 +882,56 @@ func TestDBAllowOOOSamples(t *testing.T) { require.NoError(t, db.Close()) } +func TestDBOutOfOrderTimeWindow(t *testing.T) { + tc := []struct { + outOfOrderTimeWindow, firstTs, secondTs int64 + expectedError error + }{ + {0, 100, 101, nil}, + {0, 100, 100, storage.ErrOutOfOrderSample}, + {0, 100, 99, storage.ErrOutOfOrderSample}, + {100, 100, 1, nil}, + {100, 100, 0, storage.ErrOutOfOrderSample}, + } + + for _, c := range tc { + t.Run(fmt.Sprintf("outOfOrderTimeWindow=%d, firstTs=%d, secondTs=%d, expectedError=%s", c.outOfOrderTimeWindow, c.firstTs, c.secondTs, c.expectedError), func(t *testing.T) { + reg := prometheus.NewRegistry() + opts := DefaultOptions() + opts.OutOfOrderTimeWindow = c.outOfOrderTimeWindow + s := createTestAgentDB(t, reg, opts) + app := s.Appender(context.TODO()) + + lbls := labelsForTest(t.Name()+"_histogram", 1) + lset := labels.New(lbls[0]...) 
+ _, err := app.AppendHistogram(0, lset, c.firstTs, tsdbutil.GenerateTestHistograms(1)[0], nil) + require.NoError(t, err) + err = app.Commit() + require.NoError(t, err) + _, err = app.AppendHistogram(0, lset, c.secondTs, tsdbutil.GenerateTestHistograms(1)[0], nil) + require.ErrorIs(t, err, c.expectedError) + + lbls = labelsForTest(t.Name(), 1) + lset = labels.New(lbls[0]...) + _, err = app.Append(0, lset, c.firstTs, 0) + require.NoError(t, err) + err = app.Commit() + require.NoError(t, err) + _, err = app.Append(0, lset, c.secondTs, 0) + require.ErrorIs(t, err, c.expectedError) + + expectedAppendedSamples := float64(2) + if c.expectedError != nil { + expectedAppendedSamples = 1 + } + m := gatherFamily(t, reg, "prometheus_agent_samples_appended_total") + require.Equal(t, expectedAppendedSamples, m.Metric[0].Counter.GetValue(), "agent wal mismatch of total appended samples") + require.Equal(t, expectedAppendedSamples, m.Metric[1].Counter.GetValue(), "agent wal mismatch of total appended histograms") + require.NoError(t, s.Close()) + }) + } +} + func BenchmarkCreateSeries(b *testing.B) { s := createTestAgentDB(b, nil, DefaultOptions()) defer s.Close()