diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index c883ab47e..778b131c8 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -1053,6 +1053,7 @@ func main() {
 				startTimeMargin := int64(2 * time.Duration(cfg.tsdb.MinBlockDuration).Seconds() * 1000)
 				localStorage.Set(db, startTimeMargin)
+				db.SetWriteNotified(remoteStorage)
 				close(dbOpen)
 				<-cancel
 				return nil
diff --git a/storage/remote/storage.go b/storage/remote/storage.go
index 88892e62e..d01f96b3b 100644
--- a/storage/remote/storage.go
+++ b/storage/remote/storage.go
@@ -76,6 +76,13 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal
 	return s
 }
 
+func (s *Storage) Notify() {
+	for _, q := range s.rws.queues {
+		// These should all be non-blocking.
+		q.watcher.Notify()
+	}
+}
+
 // ApplyConfig updates the state as the new config requires.
 func (s *Storage) ApplyConfig(conf *config.Config) error {
 	s.mtx.Lock()
diff --git a/tsdb/db.go b/tsdb/db.go
index 0cd00c2c5..a0d0a4b26 100644
--- a/tsdb/db.go
+++ b/tsdb/db.go
@@ -229,6 +229,8 @@ type DB struct {
 	// out-of-order compaction and vertical queries.
 	oooWasEnabled atomic.Bool
 
+	writeNotified wlog.WriteNotified
+
 	registerer prometheus.Registerer
 }
 
@@ -802,6 +804,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
 	if err != nil {
 		return nil, err
 	}
+	db.head.writeNotified = db.writeNotified
 
 	// Register metrics after assigning the head block.
 	db.metrics = newDBMetrics(db, r)
@@ -2016,6 +2019,12 @@ func (db *DB) CleanTombstones() (err error) {
 	return nil
 }
 
+func (db *DB) SetWriteNotified(wn wlog.WriteNotified) {
+	db.writeNotified = wn
+	// It's possible we already created the head struct, so we should also set the write notifier on it.
+	db.head.writeNotified = wn
+}
+
 func isBlockDir(fi fs.DirEntry) bool {
 	if !fi.IsDir() {
 		return false
diff --git a/tsdb/head.go b/tsdb/head.go
index a3c135300..aa7a3c125 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -121,6 +121,8 @@ type Head struct {
 	stats *HeadStats
 	reg   prometheus.Registerer
 
+	writeNotified wlog.WriteNotified
+
 	memTruncationInProcess atomic.Bool
 }
 
diff --git a/tsdb/head_append.go b/tsdb/head_append.go
index 2af01a2d6..060d32b7f 100644
--- a/tsdb/head_append.go
+++ b/tsdb/head_append.go
@@ -842,6 +842,10 @@ func (a *headAppender) Commit() (err error) {
 		return errors.Wrap(err, "write to WAL")
 	}
 
+	if a.head.writeNotified != nil {
+		a.head.writeNotified.Notify()
+	}
+
 	// No errors logging to WAL, so pass the exemplars along to the in memory storage.
 	for _, e := range a.exemplars {
 		s := a.head.series.getByID(chunks.HeadSeriesRef(e.ref))
diff --git a/tsdb/wlog/watcher.go b/tsdb/wlog/watcher.go
index 31f60feab..221e9607c 100644
--- a/tsdb/wlog/watcher.go
+++ b/tsdb/wlog/watcher.go
@@ -34,12 +34,16 @@ import (
 )
 
 const (
-	readPeriod         = 10 * time.Millisecond
 	checkpointPeriod   = 5 * time.Second
 	segmentCheckPeriod = 100 * time.Millisecond
 	consumer           = "consumer"
 )
 
+var (
+	ErrIgnorable = errors.New("ignore me")
+	readTimeout  = 15 * time.Second
+)
+
 // WriteTo is an interface used by the Watcher to send the samples it's read
 // from the WAL on to somewhere else. Functions will be called concurrently
 // and it is left to the implementer to make sure they are safe.
@@ -61,11 +65,17 @@ type WriteTo interface {
 	SeriesReset(int)
 }
 
+// WriteNotified is used to notify the watcher that data has been written, so that it can read it.
+type WriteNotified interface {
+	Notify()
+}
+
 type WatcherMetrics struct {
 	recordsRead           *prometheus.CounterVec
 	recordDecodeFails     *prometheus.CounterVec
 	samplesSentPreTailing *prometheus.CounterVec
 	currentSegment        *prometheus.GaugeVec
+	notificationsSkipped  *prometheus.CounterVec
 }
 
 // Watcher watches the TSDB WAL for a given WriteTo.
@@ -88,9 +98,11 @@ type Watcher struct {
 	recordDecodeFailsMetric prometheus.Counter
 	samplesSentPreTailing   prometheus.Counter
 	currentSegmentMetric    prometheus.Gauge
+	notificationsSkipped    prometheus.Counter
 
-	quit chan struct{}
-	done chan struct{}
+	readNotify chan struct{}
+	quit       chan struct{}
+	done       chan struct{}
 
 	// For testing, stop when we hit this segment.
 	MaxSegment int
@@ -134,6 +146,15 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics {
 			},
 			[]string{consumer},
 		),
+		notificationsSkipped: prometheus.NewCounterVec(
+			prometheus.CounterOpts{
+				Namespace: "prometheus",
+				Subsystem: "wal_watcher",
+				Name:      "notifications_skipped_total",
+				Help:      "The number of WAL write notifications that the Watcher has skipped due to already being in a WAL read routine.",
+			},
+			[]string{consumer},
+		),
 	}
 
 	if reg != nil {
@@ -141,6 +162,7 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics {
 		reg.MustRegister(m.recordDecodeFails)
 		reg.MustRegister(m.samplesSentPreTailing)
 		reg.MustRegister(m.currentSegment)
+		reg.MustRegister(m.notificationsSkipped)
 	}
 
 	return m
@@ -161,13 +183,25 @@ func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logge
 		sendExemplars:  sendExemplars,
 		sendHistograms: sendHistograms,
 
-		quit: make(chan struct{}),
-		done: make(chan struct{}),
+		readNotify: make(chan struct{}),
+		quit:       make(chan struct{}),
+		done:       make(chan struct{}),
 
 		MaxSegment: -1,
 	}
 }
 
+func (w *Watcher) Notify() {
+	select {
+	case w.readNotify <- struct{}{}:
+		return
+	default: // Fall through, so the send never blocks the writer.
+		// We don't need a buffered channel or any buffering since,
+		// for each notification it receives, the watcher will read until EOF.
+		w.notificationsSkipped.Inc()
+	}
+}
+
 func (w *Watcher) setMetrics() {
 	// Setup the WAL Watchers metrics. We do this here rather than in the
 	// constructor because of the ordering of creating Queue Managers's,
@@ -177,6 +211,8 @@ func (w *Watcher) setMetrics() {
 		w.recordDecodeFailsMetric = w.metrics.recordDecodeFails.WithLabelValues(w.name)
 		w.samplesSentPreTailing = w.metrics.samplesSentPreTailing.WithLabelValues(w.name)
 		w.currentSegmentMetric = w.metrics.currentSegment.WithLabelValues(w.name)
+		w.notificationsSkipped = w.metrics.notificationsSkipped.WithLabelValues(w.name)
+
 	}
 }
 
@@ -262,7 +298,7 @@ func (w *Watcher) Run() error {
 
 	// On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment.
 	// On subsequent calls to this function, currentSegment will have been incremented and we should open that segment.
-	if err := w.watch(currentSegment, currentSegment >= lastSegment); err != nil {
+	if err := w.watch(currentSegment, currentSegment >= lastSegment); err != nil && !errors.Is(err, ErrIgnorable) {
 		return err
 	}
 
@@ -330,6 +366,26 @@ func (w *Watcher) segments(dir string) ([]int, error) {
 	return refs, nil
 }
 
+func (w *Watcher) readAndHandleError(r *LiveReader, segmentNum int, tail bool, size int64) error {
+	err := w.readSegment(r, segmentNum, tail)
+
+	// Ignore all errors reading to end of segment whilst replaying the WAL.
+	if !tail {
+		if err != nil && errors.Cause(err) != io.EOF {
+			level.Warn(w.logger).Log("msg", "Ignoring error reading to end of segment, may have dropped data", "segment", segmentNum, "err", err)
+		} else if r.Offset() != size {
+			level.Warn(w.logger).Log("msg", "Expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", r.Offset(), "size", size)
+		}
+		return ErrIgnorable
+	}
+
+	// Otherwise, when we are tailing, non-EOFs are fatal.
+	if errors.Cause(err) != io.EOF {
+		return err
+	}
+	return nil
+}
+
 // Use tail true to indicate that the reader is currently on a segment that is
 // actively being written to. If false, assume it's a full segment and we're
 // replaying it on start to cache the series records.
@@ -342,7 +398,7 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
 
 	reader := NewLiveReader(w.logger, w.readerMetrics, segment)
 
-	readTicker := time.NewTicker(readPeriod)
+	readTicker := time.NewTicker(readTimeout)
 	defer readTicker.Stop()
 
 	checkpointTicker := time.NewTicker(checkpointPeriod)
@@ -400,7 +456,6 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
 			if last <= segmentNum {
 				continue
 			}
-
 			err = w.readSegment(reader, segmentNum, tail)
 
 			// Ignore errors reading to end of segment whilst replaying the WAL.
@@ -421,24 +476,23 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
 
 			return nil
 
+		// We haven't read due to a notification in quite some time, try reading anyway.
 		case <-readTicker.C:
-			err = w.readSegment(reader, segmentNum, tail)
-
-			// Ignore all errors reading to end of segment whilst replaying the WAL.
-			if !tail {
-				switch {
-				case err != nil && errors.Cause(err) != io.EOF:
-					level.Warn(w.logger).Log("msg", "Ignoring error reading to end of segment, may have dropped data", "segment", segmentNum, "err", err)
-				case reader.Offset() != size:
-					level.Warn(w.logger).Log("msg", "Expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", reader.Offset(), "size", size)
-				}
-				return nil
-			}
-
-			// Otherwise, when we are tailing, non-EOFs are fatal.
-			if errors.Cause(err) != io.EOF {
+			level.Debug(w.logger).Log("msg", "Watcher is reading the WAL due to timeout, haven't received any write notifications recently", "timeout", readTimeout)
+			err := w.readAndHandleError(reader, segmentNum, tail, size)
+			if err != nil {
 				return err
 			}
+			// Still want to reset the ticker, so we don't read too often.
+			readTicker.Reset(readTimeout)
+
+		case <-w.readNotify:
+			err := w.readAndHandleError(reader, segmentNum, tail, size)
+			if err != nil {
+				return err
+			}
+			// Still want to reset the ticker, so we don't read too often.
+			readTicker.Reset(readTimeout)
 		}
 	}
 }
diff --git a/tsdb/wlog/watcher_test.go b/tsdb/wlog/watcher_test.go
index 530d0ffb4..94b6a92d1 100644
--- a/tsdb/wlog/watcher_test.go
+++ b/tsdb/wlog/watcher_test.go
@@ -104,7 +104,7 @@ func (wtm *writeToMock) SeriesReset(index int) {
 	}
 }
 
-func (wtm *writeToMock) checkNumLabels() int {
+func (wtm *writeToMock) checkNumSeries() int {
 	wtm.seriesLock.Lock()
 	defer wtm.seriesLock.Unlock()
 	return len(wtm.seriesSegmentIndexes)
@@ -230,9 +230,9 @@ func TestTailSamples(t *testing.T) {
 			expectedExemplars := seriesCount * exemplarsCount
 			expectedHistograms := seriesCount * histogramsCount
 			retry(t, defaultRetryInterval, defaultRetries, func() bool {
-				return wt.checkNumLabels() >= expectedSeries
+				return wt.checkNumSeries() >= expectedSeries
 			})
-			require.Equal(t, expectedSeries, wt.checkNumLabels(), "did not receive the expected number of series")
+			require.Equal(t, expectedSeries, wt.checkNumSeries(), "did not receive the expected number of series")
 			require.Equal(t, expectedSamples, wt.samplesAppended, "did not receive the expected number of samples")
 			require.Equal(t, expectedExemplars, wt.exemplarsAppended, "did not receive the expected number of exemplars")
 			require.Equal(t, expectedHistograms, wt.histogramsAppended, "did not receive the expected number of histograms")
@@ -290,7 +290,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) {
 				}
 			}
 			require.NoError(t, w.Log(recs...))
-
+			readTimeout = time.Second
 			_, _, err = Segments(w.Dir())
 			require.NoError(t, err)
 
@@ -299,11 +299,10 @@ func TestReadToEndNoCheckpoint(t *testing.T) {
 			go watcher.Start()
 
 			expected := seriesCount
-			retry(t, defaultRetryInterval, defaultRetries, func() bool {
-				return wt.checkNumLabels() >= expected
-			})
+			require.Eventually(t, func() bool {
+				return wt.checkNumSeries() == expected
+			}, 20*time.Second, 1*time.Second)
 			watcher.Stop()
-			require.Equal(t, expected, wt.checkNumLabels())
 		})
 	}
 }
@@ -383,16 +382,17 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
 			_, _, err = Segments(w.Dir())
 			require.NoError(t, err)
 
+			readTimeout = time.Second
 			wt := newWriteToMock()
 			watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
 			go watcher.Start()
 
 			expected := seriesCount * 2
-			retry(t, defaultRetryInterval, defaultRetries, func() bool {
-				return wt.checkNumLabels() >= expected
-			})
+
+			require.Eventually(t, func() bool {
+				return wt.checkNumSeries() == expected
+			}, 10*time.Second, 1*time.Second)
 			watcher.Stop()
-			require.Equal(t, expected, wt.checkNumLabels())
 		})
 	}
 }
@@ -460,10 +460,10 @@ func TestReadCheckpoint(t *testing.T) {
 
 			expectedSeries := seriesCount
 			retry(t, defaultRetryInterval, defaultRetries, func() bool {
-				return wt.checkNumLabels() >= expectedSeries
+				return wt.checkNumSeries() >= expectedSeries
 			})
 			watcher.Stop()
-			require.Equal(t, expectedSeries, wt.checkNumLabels())
+			require.Equal(t, expectedSeries, wt.checkNumSeries())
 		})
 	}
 }
@@ -595,6 +595,7 @@ func TestCheckpointSeriesReset(t *testing.T) {
 			_, _, err = Segments(w.Dir())
 			require.NoError(t, err)
 
+			readTimeout = time.Second
 			wt := newWriteToMock()
 			watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false)
 			watcher.MaxSegment = -1
@@ -602,9 +603,11 @@ func TestCheckpointSeriesReset(t *testing.T) {
 
 			expected := seriesCount
 			retry(t, defaultRetryInterval, defaultRetries, func() bool {
-				return wt.checkNumLabels() >= expected
+				return wt.checkNumSeries() >= expected
 			})
-			require.Equal(t, seriesCount, wt.checkNumLabels())
+			require.Eventually(t, func() bool {
+				return wt.checkNumSeries() == seriesCount
+			}, 10*time.Second, 1*time.Second)
 
 			_, err = Checkpoint(log.NewNopLogger(), w, 2, 4, func(x chunks.HeadSeriesRef) bool { return true }, 0)
 			require.NoError(t, err)
@@ -621,7 +624,9 @@ func TestCheckpointSeriesReset(t *testing.T) {
 			// If you modify the checkpoint and truncate segment #'s run the test to see how
 			// many series records you end up with and change the last Equals check accordingly
 			// or modify the Equals to Assert(len(wt.seriesLabels) < seriesCount*10)
-			require.Equal(t, tc.segments, wt.checkNumLabels())
+			require.Eventually(t, func() bool {
+				return wt.checkNumSeries() == tc.segments
+			}, 20*time.Second, 1*time.Second)
 		})
 	}
 }
diff --git a/tsdb/wlog/wlog.go b/tsdb/wlog/wlog.go
index df8bab53f..e38cb94cb 100644
--- a/tsdb/wlog/wlog.go
+++ b/tsdb/wlog/wlog.go
@@ -188,6 +188,8 @@ type WL struct {
 	compress  bool
 	snappyBuf []byte
 
+	WriteNotified WriteNotified
+
 	metrics *wlMetrics
 }
 
@@ -343,6 +345,10 @@ func (w *WL) Dir() string {
 	return w.dir
}
 
+func (w *WL) SetWriteNotified(wn WriteNotified) {
+	w.WriteNotified = wn
+}
+
 func (w *WL) run() {
 Loop:
 	for {
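
Notes (reviewer commentary, not part of the patch):

The core of the change is the Watcher's read loop: instead of polling every readPeriod (10ms), it now blocks until either a write notification arrives on readNotify or the readTimeout (15s) fallback fires. Below is a minimal, self-contained sketch of that pattern; the names (watcher, run, read) are illustrative stand-ins, not the Prometheus APIs.

package main

import (
	"fmt"
	"time"
)

type watcher struct {
	readNotify chan struct{}
	quit       chan struct{}
}

// Notify never blocks the writer: if a notification is already pending,
// the send is dropped, because every wake-up reads until EOF anyway.
func (w *watcher) Notify() {
	select {
	case w.readNotify <- struct{}{}:
	default:
		// Dropped; the pending read will pick this write up.
	}
}

func (w *watcher) run(readTimeout time.Duration, read func()) {
	ticker := time.NewTicker(readTimeout)
	defer ticker.Stop()
	for {
		select {
		case <-w.quit:
			return
		case <-w.readNotify:
			read()                    // woken by a write notification
			ticker.Reset(readTimeout) // push the fallback read out
		case <-ticker.C:
			read() // fallback: no notification for a whole readTimeout
		}
	}
}

func main() {
	w := &watcher{readNotify: make(chan struct{}), quit: make(chan struct{})}
	go w.run(500*time.Millisecond, func() { fmt.Println("reading WAL to EOF") })
	for i := 0; i < 3; i++ {
		w.Notify() // what headAppender.Commit does after a successful WAL write
		time.Sleep(100 * time.Millisecond)
	}
	close(w.quit)
	time.Sleep(10 * time.Millisecond) // give run a moment to observe quit
}

An unbuffered channel with a select/default send is sufficient here: a notification dropped while a read is pending is already covered by that read, and the writer never waits on a slow watcher. The patch counts those drops in notifications_skipped_total rather than buffering them.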
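The plumbing runs in two halves: main.go hands the remote-write Storage to the DB as a wlog.WriteNotified, the DB pushes it down to the Head, and Commit fires a single Notify that Storage fans out to every queue's WAL watcher. A sketch of that wiring, again with illustrative types rather than the real ones:

package main

import "fmt"

// WriteNotified matches the interface the patch adds in tsdb/wlog.
type WriteNotified interface{ Notify() }

type head struct{ writeNotified WriteNotified }

func (h *head) commit() {
	// ...the WAL write happens first; notify only on success...
	if h.writeNotified != nil { // nil check keeps notification optional
		h.writeNotified.Notify()
	}
}

type db struct {
	writeNotified WriteNotified
	head          *head
}

// setWriteNotified mirrors DB.SetWriteNotified: the head may already
// exist when this is called, so the notifier is set in both places.
func (d *db) setWriteNotified(wn WriteNotified) {
	d.writeNotified = wn
	d.head.writeNotified = wn
}

// remoteStorage stands in for storage/remote.Storage: one Notify fans
// out to every queue's watcher, each of which must be non-blocking.
type remoteStorage struct{ watchers []WriteNotified }

func (s *remoteStorage) Notify() {
	for _, w := range s.watchers {
		w.Notify()
	}
}

type fakeWatcher string

func (f fakeWatcher) Notify() { fmt.Println("notified:", string(f)) }

func main() {
	rs := &remoteStorage{watchers: []WriteNotified{fakeWatcher("q1"), fakeWatcher("q2")}}
	d := &db{head: &head{}}
	d.setWriteNotified(rs) // what cmd/prometheus/main.go does at startup
	d.head.commit()        // prints: notified: q1, notified: q2
}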
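Lastly, the readAndHandleError refactor relies on ErrIgnorable as a sentinel: during replay it means "this segment is done, any warnings were already logged", and Run checks for it with errors.Is so only real errors abort the watcher. A small sketch of that sentinel-error pattern using only the standard library (the patch's EOF checks still go through pkg/errors' errors.Cause):

package main

import (
	"errors"
	"fmt"
)

// errIgnorable plays the role of ErrIgnorable: not a failure, just a
// signal that the caller can move on to the next segment.
var errIgnorable = errors.New("ignorable")

func watchSegment(replaying bool) error {
	if replaying {
		// Replay path: any read problem was already logged as a warning.
		return errIgnorable
	}
	return errors.New("tailing: non-EOF read error")
}

func run() error {
	if err := watchSegment(true); err != nil && !errors.Is(err, errIgnorable) {
		return err // only genuine errors propagate
	}
	fmt.Println("replay finished, moving to the next segment")
	return nil
}

func main() {
	if err := run(); err != nil {
		fmt.Println("fatal:", err)
	}
}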