diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index a4384e03e..932406217 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -271,7 +271,7 @@ type QueueManager struct { } // NewQueueManager builds a new QueueManager. -func NewQueueManager(reg prometheus.Registerer, metrics *queueManagerMetrics, logger log.Logger, walDir string, samplesIn *ewmaRate, cfg config.QueueConfig, externalLabels labels.Labels, relabelConfigs []*relabel.Config, client StorageClient, flushDeadline time.Duration) *QueueManager { +func NewQueueManager(metrics *queueManagerMetrics, watcherMetrics *wal.WatcherMetrics, readerMetrics *wal.LiveReaderMetrics, logger log.Logger, walDir string, samplesIn *ewmaRate, cfg config.QueueConfig, externalLabels labels.Labels, relabelConfigs []*relabel.Config, client StorageClient, flushDeadline time.Duration) *QueueManager { if logger == nil { logger = log.NewNopLogger() } @@ -301,7 +301,7 @@ func NewQueueManager(reg prometheus.Registerer, metrics *queueManagerMetrics, lo metrics: metrics, } - t.watcher = wal.NewWatcher(reg, wal.NewWatcherMetrics(reg), logger, client.Name(), t, walDir) + t.watcher = wal.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, walDir) t.shards = t.newShards() return t diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index e4917441f..74a7fddbc 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -61,7 +61,7 @@ func TestSampleDelivery(t *testing.T) { defer os.RemoveAll(dir) metrics := newQueueManagerMetrics(nil) - m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) m.StoreSeries(series, 0) // These should be received by the client. @@ -90,7 +90,7 @@ func TestSampleDeliveryTimeout(t *testing.T) { defer os.RemoveAll(dir) metrics := newQueueManagerMetrics(nil) - m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) m.StoreSeries(series, 0) m.Start() defer m.Stop() @@ -131,7 +131,7 @@ func TestSampleDeliveryOrder(t *testing.T) { defer os.RemoveAll(dir) metrics := newQueueManagerMetrics(nil) - m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) m.StoreSeries(series, 0) m.Start() @@ -150,7 +150,8 @@ func TestShutdown(t *testing.T) { defer os.RemoveAll(dir) metrics := newQueueManagerMetrics(nil) - m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline) + + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline) n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend samples, series := createTimeseries(n, n) m.StoreSeries(series, 0) @@ -188,7 +189,7 @@ func TestSeriesReset(t *testing.T) { defer os.RemoveAll(dir) metrics := newQueueManagerMetrics(nil) - m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline) for i := 0; i < numSegments; i++ { series := []record.RefSeries{} for j := 0; j < numSeries; j++ { @@ -218,7 +219,7 @@ func TestReshard(t *testing.T) { defer os.RemoveAll(dir) metrics := newQueueManagerMetrics(nil) - m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) m.StoreSeries(series, 0) m.Start() @@ -251,7 +252,7 @@ func TestReshardRaceWithStop(t *testing.T) { go func() { for { metrics := newQueueManagerMetrics(nil) - m = NewQueueManager(nil, metrics, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) + m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) m.Start() h.Unlock() h.Lock() @@ -269,7 +270,7 @@ func TestReshardRaceWithStop(t *testing.T) { func TestReleaseNoninternedString(t *testing.T) { metrics := newQueueManagerMetrics(nil) c := NewTestStorageClient() - m := NewQueueManager(nil, metrics, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) + m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) m.Start() for i := 1; i < 1000; i++ { @@ -316,7 +317,7 @@ func TestCalculateDesiredsShards(t *testing.T) { for _, c := range cases { metrics := newQueueManagerMetrics(nil) client := NewTestStorageClient() - m := NewQueueManager(nil, metrics, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, client, defaultFlushDeadline) + m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, client, defaultFlushDeadline) m.numShards = c.startingShards m.samplesIn.incr(c.samplesIn) m.samplesOut.incr(c.samplesOut) @@ -527,7 +528,7 @@ func BenchmarkSampleDelivery(b *testing.B) { defer os.RemoveAll(dir) metrics := newQueueManagerMetrics(nil) - m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) m.StoreSeries(series, 0) // These should be received by the client. @@ -569,7 +570,7 @@ func BenchmarkStartup(b *testing.B) { for n := 0; n < b.N; n++ { metrics := newQueueManagerMetrics(nil) c := NewTestBlockedStorageClient() - m := NewQueueManager(nil, metrics, logger, dir, + m := NewQueueManager(metrics, nil, nil, logger, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, 1*time.Minute) m.watcher.SetStartTime(timestamp.Time(math.MaxInt64)) @@ -620,7 +621,7 @@ func TestCalculateDesiredShards(t *testing.T) { metrics := newQueueManagerMetrics(nil) samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) - m := NewQueueManager(nil, metrics, nil, dir, samplesIn, cfg, nil, nil, c, defaultFlushDeadline) + m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, nil, nil, c, defaultFlushDeadline) // Need to start the queue manager so the proper metrics are initialized. // However we can stop it right away since we don't need to do any actual diff --git a/storage/remote/write.go b/storage/remote/write.go index 665eb2b07..5d77ec751 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -25,6 +25,7 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/wal" ) var ( @@ -46,11 +47,12 @@ var ( // WriteStorage represents all the remote write storage. type WriteStorage struct { - reg prometheus.Registerer logger log.Logger mtx sync.Mutex queueMetrics *queueManagerMetrics + watcherMetrics *wal.WatcherMetrics + liveReaderMetrics *wal.LiveReaderMetrics configHash string externalLabelHash string walDir string @@ -65,13 +67,14 @@ func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, walDir string logger = log.NewNopLogger() } rws := &WriteStorage{ - queues: make(map[string]*QueueManager), - reg: reg, - queueMetrics: newQueueManagerMetrics(reg), - logger: logger, - flushDeadline: flushDeadline, - samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration), - walDir: walDir, + queues: make(map[string]*QueueManager), + queueMetrics: newQueueManagerMetrics(reg), + watcherMetrics: wal.NewWatcherMetrics(reg), + liveReaderMetrics: wal.NewLiveReaderMetrics(reg), + logger: logger, + flushDeadline: flushDeadline, + samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration), + walDir: walDir, } go rws.run() return rws @@ -152,8 +155,9 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { return err } newQueues[hash] = NewQueueManager( - rws.reg, rws.queueMetrics, + rws.watcherMetrics, + rws.liveReaderMetrics, rws.logger, rws.walDir, rws.samplesIn, diff --git a/tsdb/wal/live_reader.go b/tsdb/wal/live_reader.go index 446e85994..7124f6408 100644 --- a/tsdb/wal/live_reader.go +++ b/tsdb/wal/live_reader.go @@ -28,14 +28,14 @@ import ( ) // liveReaderMetrics holds all metrics exposed by the LiveReader. -type liveReaderMetrics struct { +type LiveReaderMetrics struct { readerCorruptionErrors *prometheus.CounterVec } // NewLiveReaderMetrics instantiates, registers and returns metrics to be injected // at LiveReader instantiation. -func NewLiveReaderMetrics(reg prometheus.Registerer) *liveReaderMetrics { - m := &liveReaderMetrics{ +func NewLiveReaderMetrics(reg prometheus.Registerer) *LiveReaderMetrics { + m := &LiveReaderMetrics{ readerCorruptionErrors: prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "prometheus_tsdb_wal_reader_corruption_errors_total", Help: "Errors encountered when reading the WAL.", @@ -43,15 +43,14 @@ func NewLiveReaderMetrics(reg prometheus.Registerer) *liveReaderMetrics { } if reg != nil { - // TODO(codesome): log error. - _ = reg.Register(m.readerCorruptionErrors) + reg.MustRegister(m.readerCorruptionErrors) } return m } // NewLiveReader returns a new live reader. -func NewLiveReader(logger log.Logger, metrics *liveReaderMetrics, r io.Reader) *LiveReader { +func NewLiveReader(logger log.Logger, metrics *LiveReaderMetrics, r io.Reader) *LiveReader { lr := &LiveReader{ logger: logger, rdr: r, @@ -89,7 +88,7 @@ type LiveReader struct { // NB the non-ive Reader implementation allows for this. permissive bool - metrics *liveReaderMetrics + metrics *LiveReaderMetrics } // Err returns any errors encountered reading the WAL. io.EOFs are not terminal diff --git a/tsdb/wal/watcher.go b/tsdb/wal/watcher.go index f92386f0d..4a9e7f455 100644 --- a/tsdb/wal/watcher.go +++ b/tsdb/wal/watcher.go @@ -66,7 +66,7 @@ type Watcher struct { walDir string lastCheckpoint string metrics *WatcherMetrics - readerMetrics *liveReaderMetrics + readerMetrics *LiveReaderMetrics startTime time.Time startTimestamp int64 // the start time as a Prometheus timestamp @@ -125,17 +125,17 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics { } if reg != nil { - _ = reg.Register(m.recordsRead) - _ = reg.Register(m.recordDecodeFails) - _ = reg.Register(m.samplesSentPreTailing) - _ = reg.Register(m.currentSegment) + reg.MustRegister(m.recordsRead) + reg.MustRegister(m.recordDecodeFails) + reg.MustRegister(m.samplesSentPreTailing) + reg.MustRegister(m.currentSegment) } return m } // NewWatcher creates a new WAL watcher for a given WriteTo. -func NewWatcher(reg prometheus.Registerer, metrics *WatcherMetrics, logger log.Logger, name string, writer WriteTo, walDir string) *Watcher { +func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger log.Logger, name string, writer WriteTo, walDir string) *Watcher { if logger == nil { logger = log.NewNopLogger() } @@ -143,7 +143,7 @@ func NewWatcher(reg prometheus.Registerer, metrics *WatcherMetrics, logger log.L logger: logger, writer: writer, metrics: metrics, - readerMetrics: NewLiveReaderMetrics(reg), + readerMetrics: readerMetrics, walDir: path.Join(walDir, "wal"), name: name, quit: make(chan struct{}), @@ -179,11 +179,13 @@ func (w *Watcher) Stop() { <-w.done // Records read metric has series and samples. - w.metrics.recordsRead.DeleteLabelValues(w.name, "series") - w.metrics.recordsRead.DeleteLabelValues(w.name, "samples") - w.metrics.recordDecodeFails.DeleteLabelValues(w.name) - w.metrics.samplesSentPreTailing.DeleteLabelValues(w.name) - w.metrics.currentSegment.DeleteLabelValues(w.name) + if w.metrics != nil { + w.metrics.recordsRead.DeleteLabelValues(w.name, "series") + w.metrics.recordsRead.DeleteLabelValues(w.name, "samples") + w.metrics.recordDecodeFails.DeleteLabelValues(w.name) + w.metrics.samplesSentPreTailing.DeleteLabelValues(w.name) + w.metrics.currentSegment.DeleteLabelValues(w.name) + } level.Info(w.logger).Log("msg", "WAL watcher stopped", "queue", w.name) } diff --git a/tsdb/wal/watcher_test.go b/tsdb/wal/watcher_test.go index db8e3e89f..482d96551 100644 --- a/tsdb/wal/watcher_test.go +++ b/tsdb/wal/watcher_test.go @@ -138,7 +138,7 @@ func TestTailSamples(t *testing.T) { testutil.Ok(t, err) wt := newWriteToMock() - watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir) watcher.SetStartTime(now) // Set the Watcher's metrics so they're not nil pointers. @@ -148,7 +148,7 @@ func TestTailSamples(t *testing.T) { testutil.Ok(t, err) defer segment.Close() - reader := NewLiveReader(nil, NewLiveReaderMetrics(prometheus.DefaultRegisterer), segment) + reader := NewLiveReader(nil, NewLiveReaderMetrics(nil), segment) // Use tail true so we can ensure we got the right number of samples. watcher.readSegment(reader, i, true) } @@ -217,7 +217,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) { testutil.Ok(t, err) wt := newWriteToMock() - watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir) go watcher.Start() expected := seriesCount @@ -303,7 +303,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) { _, _, err = w.Segments() testutil.Ok(t, err) wt := newWriteToMock() - watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir) go watcher.Start() expected := seriesCount * 2 @@ -368,7 +368,7 @@ func TestReadCheckpoint(t *testing.T) { testutil.Ok(t, err) wt := newWriteToMock() - watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir) go watcher.Start() expectedSeries := seriesCount @@ -439,7 +439,7 @@ func TestReadCheckpointMultipleSegments(t *testing.T) { } wt := newWriteToMock() - watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir) watcher.MaxSegment = -1 // Set the Watcher's metrics so they're not nil pointers. @@ -510,7 +510,7 @@ func TestCheckpointSeriesReset(t *testing.T) { testutil.Ok(t, err) wt := newWriteToMock() - watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir) watcher.MaxSegment = -1 go watcher.Start()