diff --git a/tsdb/wlog/watcher.go b/tsdb/wlog/watcher.go index 525515221d..6994d2dd8a 100644 --- a/tsdb/wlog/watcher.go +++ b/tsdb/wlog/watcher.go @@ -262,9 +262,6 @@ func (w *Watcher) loop() { // Run the watcher, which will tail the WAL until the quit channel is closed // or an error case is hit. func (w *Watcher) Run() error { - var lastSegment int - var err error - // We want to ensure this is false across iterations since // Run will be called again if there was a failure to read the WAL. w.sendSamples = false @@ -289,21 +286,19 @@ func (w *Watcher) Run() error { return err } - level.Debug(w.logger).Log("msg", "Tailing WAL", "lastCheckpoint", lastCheckpoint, "checkpointIndex", checkpointIndex, "currentSegment", currentSegment, "lastSegment", lastSegment) + level.Debug(w.logger).Log("msg", "Tailing WAL", "lastCheckpoint", lastCheckpoint, "checkpointIndex", checkpointIndex, "currentSegment", currentSegment) for !isClosed(w.quit) { w.currentSegmentMetric.Set(float64(currentSegment)) - level.Debug(w.logger).Log("msg", "Processing segment", "currentSegment", currentSegment) - // Reset the value of lastSegment each iteration, this is to avoid having to wait too long for - // between reads if we're reading a segment that is not the most recent segment after startup. - _, lastSegment, err = w.firstAndLast() + // Re-check on each iteration in case a new segment was added, + // because watch() will wait for notifications on the last segment. + _, lastSegment, err := w.firstAndLast() if err != nil { return fmt.Errorf("wal.Segments: %w", err) } tail := currentSegment >= lastSegment - // On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment. - // On subsequent calls to this function, currentSegment will have been incremented and we should open that segment. + level.Debug(w.logger).Log("msg", "Processing segment", "currentSegment", currentSegment, "lastSegment", lastSegment) if err := w.watch(currentSegment, tail); err != nil && !errors.Is(err, ErrIgnorable) { return err } diff --git a/tsdb/wlog/watcher_test.go b/tsdb/wlog/watcher_test.go index 871640a7cf..ff006cb817 100644 --- a/tsdb/wlog/watcher_test.go +++ b/tsdb/wlog/watcher_test.go @@ -59,46 +59,36 @@ type writeToMock struct { seriesLock sync.Mutex seriesSegmentIndexes map[chunks.HeadSeriesRef]int - // delay reads with a short sleep + // If nonzero, delay reads with a short sleep. delay time.Duration } func (wtm *writeToMock) Append(s []record.RefSample) bool { - if wtm.delay > 0 { - time.Sleep(wtm.delay) - } + time.Sleep(wtm.delay) wtm.samplesAppended += len(s) return true } func (wtm *writeToMock) AppendExemplars(e []record.RefExemplar) bool { - if wtm.delay > 0 { - time.Sleep(wtm.delay) - } + time.Sleep(wtm.delay) wtm.exemplarsAppended += len(e) return true } func (wtm *writeToMock) AppendHistograms(h []record.RefHistogramSample) bool { - if wtm.delay > 0 { - time.Sleep(wtm.delay) - } + time.Sleep(wtm.delay) wtm.histogramsAppended += len(h) return true } func (wtm *writeToMock) AppendFloatHistograms(fh []record.RefFloatHistogramSample) bool { - if wtm.delay > 0 { - time.Sleep(wtm.delay) - } + time.Sleep(wtm.delay) wtm.floatHistogramsAppended += len(fh) return true } func (wtm *writeToMock) StoreSeries(series []record.RefSeries, index int) { - if wtm.delay > 0 { - time.Sleep(wtm.delay) - } + time.Sleep(wtm.delay) wtm.UpdateSeriesSegment(series, index) }