diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go
index e52780925..4e4e18975 100644
--- a/storage/remote/queue_manager_test.go
+++ b/storage/remote/queue_manager_test.go
@@ -35,6 +35,7 @@ import (
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/prometheus/prometheus/pkg/timestamp"
 	"github.com/prometheus/prometheus/prompb"
 	"github.com/prometheus/prometheus/tsdb/record"
 	"github.com/prometheus/prometheus/util/testutil"
@@ -531,7 +532,7 @@ func BenchmarkStartup(b *testing.B) {
 		m := NewQueueManager(nil, logger, dir,
 			newEWMARate(ewmaWeight, shardUpdateDuration),
 			config.DefaultQueueConfig, nil, nil, c, 1*time.Minute)
-		m.watcher.StartTime = math.MaxInt64
+		m.watcher.SetStartTime(timestamp.Time(math.MaxInt64))
 		m.watcher.MaxSegment = segments[len(segments)-2]
 		err := m.watcher.Run()
 		testutil.Ok(b, err)
diff --git a/tsdb/wal/watcher.go b/tsdb/wal/watcher.go
index 11c9bfddc..f92386f0d 100644
--- a/tsdb/wal/watcher.go
+++ b/tsdb/wal/watcher.go
@@ -68,7 +68,9 @@ type Watcher struct {
 	metrics        *WatcherMetrics
 	readerMetrics  *liveReaderMetrics
 
-	StartTime int64
+	startTime      time.Time
+	startTimestamp int64 // the start time as a Prometheus timestamp
+	sendSamples    bool
 
 	recordsReadMetric       *prometheus.CounterVec
 	recordDecodeFailsMetric prometheus.Counter
@@ -191,7 +193,7 @@ func (w *Watcher) loop() {
 
 	// We may encounter failures processing the WAL; we should wait and retry.
 	for !isClosed(w.quit) {
-		w.StartTime = timestamp.FromTime(time.Now())
+		w.SetStartTime(time.Now())
 		if err := w.Run(); err != nil {
 			level.Error(w.logger).Log("msg", "error tailing WAL", "err", err)
 		}
@@ -212,6 +214,12 @@ func (w *Watcher) Run() error {
 		return errors.Wrap(err, "wal.Segments")
 	}
 
+	// We want to ensure this is false across iterations since
+	// Run will be called again if there was a failure to read the WAL.
+	w.sendSamples = false
+
+	level.Info(w.logger).Log("msg", "replaying WAL", "queue", w.name)
+
 	// Backfill from the checkpoint first if it exists.
 	lastCheckpoint, checkpointIndex, err := LastCheckpoint(w.walDir)
 	if err != nil && err != record.ErrNotFound {
@@ -456,7 +464,6 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
 		samples []record.RefSample
 		send    []record.RefSample
 	)
-
 	for r.Next() && !isClosed(w.quit) {
 		rec := r.Record()
 		w.recordsReadMetric.WithLabelValues(recordType(dec.Type(rec))).Inc()
@@ -482,7 +489,12 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
 				return err
 			}
 			for _, s := range samples {
-				if s.T > w.StartTime {
+				if s.T > w.startTimestamp {
+					if !w.sendSamples {
+						w.sendSamples = true
+						duration := time.Since(w.startTime)
+						level.Info(w.logger).Log("msg", "done replaying WAL", "duration", duration)
+					}
 					send = append(send, s)
 				}
 			}
@@ -505,6 +517,11 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
 	return r.Err()
 }
 
+func (w *Watcher) SetStartTime(t time.Time) {
+	w.startTime = t
+	w.startTimestamp = timestamp.FromTime(t)
+}
+
 func recordType(rt record.Type) string {
 	switch rt {
 	case record.Invalid:
diff --git a/tsdb/wal/watcher_test.go b/tsdb/wal/watcher_test.go
index 9cd9286a2..8378e997d 100644
--- a/tsdb/wal/watcher_test.go
+++ b/tsdb/wal/watcher_test.go
@@ -139,7 +139,7 @@ func TestTailSamples(t *testing.T) {
 
 	wt := newWriteToMock()
 	watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir)
-	watcher.StartTime = now.UnixNano()
+	watcher.SetStartTime(now)
 
 	// Set the Watcher's metrics so they're not nil pointers.
 	watcher.setMetrics()
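
For reference, a minimal standalone sketch of the two clock representations that SetStartTime keeps in sync. timestamp.FromTime and timestamp.Time are the real pkg/timestamp helpers used in the hunks above; everything else in the snippet is illustrative only, not part of this patch.

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/prometheus/pkg/timestamp"
)

func main() {
	// The Watcher now stores both forms: a time.Time for measuring replay
	// duration with time.Since, and an int64 millisecond timestamp for
	// comparing against record.RefSample.T (also milliseconds).
	now := time.Now()
	ms := timestamp.FromTime(now) // milliseconds since the Unix epoch
	fmt.Println("start timestamp:", ms)
	fmt.Println("round-tripped:", timestamp.Time(ms))
}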