Remote Write: Queue Manager specific metrics shouldn't exist if the queue no longer exists (#5445)

* Unregister remote write queue manager specific metrics when stopping the
queue manager.

* Use DeleteLabelValues instead of Unregister to remove queue and watcher
related metrics when we stop them. Create those metrics in the structs
start functions rather than in their constructors because of the
ordering of creation, start, and stop in remote storage ApplyConfig.

* Add setMetrics function to WAL watcher so we can set
the watchers metrics in it's Start function, but not
have to call Start in some tests (causes data race).

Signed-off-by: Callum Styan <callumstyan@gmail.com>
This commit is contained in:
Callum Styan 2019-04-23 01:49:17 -07:00 committed by Brian Brazil
parent b7538e7b49
commit e87449b59d
3 changed files with 60 additions and 25 deletions

View File

@ -189,6 +189,7 @@ type QueueManager struct {
sentBatchDuration prometheus.Observer sentBatchDuration prometheus.Observer
succeededSamplesTotal prometheus.Counter succeededSamplesTotal prometheus.Counter
retriedSamplesTotal prometheus.Counter retriedSamplesTotal prometheus.Counter
shardCapacity prometheus.Gauge
} }
// NewQueueManager builds a new QueueManager. // NewQueueManager builds a new QueueManager.
@ -219,27 +220,11 @@ func NewQueueManager(logger log.Logger, walDir string, samplesIn *ewmaRate, cfg
samplesDropped: newEWMARate(ewmaWeight, shardUpdateDuration), samplesDropped: newEWMARate(ewmaWeight, shardUpdateDuration),
samplesOut: newEWMARate(ewmaWeight, shardUpdateDuration), samplesOut: newEWMARate(ewmaWeight, shardUpdateDuration),
samplesOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration), samplesOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration),
highestSentTimestampMetric: &maxGauge{
Gauge: queueHighestSentTimestamp.WithLabelValues(name),
},
pendingSamplesMetric: queuePendingSamples.WithLabelValues(name),
enqueueRetriesMetric: enqueueRetriesTotal.WithLabelValues(name),
droppedSamplesTotal: droppedSamplesTotal.WithLabelValues(name),
numShardsMetric: numShards.WithLabelValues(name),
failedSamplesTotal: failedSamplesTotal.WithLabelValues(name),
sentBatchDuration: sentBatchDuration.WithLabelValues(name),
succeededSamplesTotal: succeededSamplesTotal.WithLabelValues(name),
retriedSamplesTotal: retriedSamplesTotal.WithLabelValues(name),
} }
t.watcher = NewWALWatcher(logger, name, t, walDir) t.watcher = NewWALWatcher(logger, name, t, walDir)
t.shards = t.newShards() t.shards = t.newShards()
// Initialise some metrics.
shardCapacity.WithLabelValues(name).Set(float64(t.cfg.Capacity))
t.pendingSamplesMetric.Set(0)
return t return t
} }
@ -307,6 +292,27 @@ outer:
// Start the queue manager sending samples to the remote storage. // Start the queue manager sending samples to the remote storage.
// Does not block. // Does not block.
func (t *QueueManager) Start() { func (t *QueueManager) Start() {
// Setup the QueueManagers metrics. We do this here rather than in the
// constructor because of the ordering of creating Queue Managers's, stopping them,
// and then starting new ones in storage/remote/storage.go ApplyConfig.
name := t.client.Name()
t.highestSentTimestampMetric = &maxGauge{
Gauge: queueHighestSentTimestamp.WithLabelValues(name),
}
t.pendingSamplesMetric = queuePendingSamples.WithLabelValues(name)
t.enqueueRetriesMetric = enqueueRetriesTotal.WithLabelValues(name)
t.droppedSamplesTotal = droppedSamplesTotal.WithLabelValues(name)
t.numShardsMetric = numShards.WithLabelValues(name)
t.failedSamplesTotal = failedSamplesTotal.WithLabelValues(name)
t.sentBatchDuration = sentBatchDuration.WithLabelValues(name)
t.succeededSamplesTotal = succeededSamplesTotal.WithLabelValues(name)
t.retriedSamplesTotal = retriedSamplesTotal.WithLabelValues(name)
t.shardCapacity = shardCapacity.WithLabelValues(name)
// Initialise some metrics.
t.shardCapacity.Set(float64(t.cfg.Capacity))
t.pendingSamplesMetric.Set(0)
t.shards.start(t.numShards) t.shards.start(t.numShards)
t.watcher.Start() t.watcher.Start()
@ -335,6 +341,18 @@ func (t *QueueManager) Stop() {
for _, labels := range t.seriesLabels { for _, labels := range t.seriesLabels {
release(labels) release(labels)
} }
// Delete metrics so we don't have alerts for queues that are gone.
name := t.client.Name()
queueHighestSentTimestamp.DeleteLabelValues(name)
queuePendingSamples.DeleteLabelValues(name)
enqueueRetriesTotal.DeleteLabelValues(name)
droppedSamplesTotal.DeleteLabelValues(name)
numShards.DeleteLabelValues(name)
failedSamplesTotal.DeleteLabelValues(name)
sentBatchDuration.DeleteLabelValues(name)
succeededSamplesTotal.DeleteLabelValues(name)
retriedSamplesTotal.DeleteLabelValues(name)
shardCapacity.DeleteLabelValues(name)
} }
// StoreSeries keeps track of which series we know about for lookups when sending samples to remote. // StoreSeries keeps track of which series we know about for lookups when sending samples to remote.

View File

@ -128,18 +128,25 @@ func NewWALWatcher(logger log.Logger, name string, writer writeTo, walDir string
quit: make(chan struct{}), quit: make(chan struct{}),
done: make(chan struct{}), done: make(chan struct{}),
recordsReadMetric: watcherRecordsRead.MustCurryWith(prometheus.Labels{queue: name}),
recordDecodeFailsMetric: watcherRecordDecodeFails.WithLabelValues(name),
samplesSentPreTailing: watcherSamplesSentPreTailing.WithLabelValues(name),
currentSegmentMetric: watcherCurrentSegment.WithLabelValues(name),
maxSegment: -1, maxSegment: -1,
} }
} }
func (w *WALWatcher) setMetrics() {
// Setup the WAL Watchers metrics. We do this here rather than in the
// constructor because of the ordering of creating Queue Managers's,
// stopping them, and then starting new ones in storage/remote/storage.go ApplyConfig.
w.recordsReadMetric = watcherRecordsRead.MustCurryWith(prometheus.Labels{queue: w.name})
w.recordDecodeFailsMetric = watcherRecordDecodeFails.WithLabelValues(w.name)
w.samplesSentPreTailing = watcherSamplesSentPreTailing.WithLabelValues(w.name)
w.currentSegmentMetric = watcherCurrentSegment.WithLabelValues(w.name)
}
// Start the WALWatcher. // Start the WALWatcher.
func (w *WALWatcher) Start() { func (w *WALWatcher) Start() {
w.setMetrics()
level.Info(w.logger).Log("msg", "starting WAL watcher", "queue", w.name) level.Info(w.logger).Log("msg", "starting WAL watcher", "queue", w.name)
go w.loop() go w.loop()
} }
@ -147,6 +154,14 @@ func (w *WALWatcher) Start() {
func (w *WALWatcher) Stop() { func (w *WALWatcher) Stop() {
close(w.quit) close(w.quit)
<-w.done <-w.done
// Records read metric has series and samples.
watcherRecordsRead.DeleteLabelValues(w.name, "series")
watcherRecordsRead.DeleteLabelValues(w.name, "samples")
watcherRecordDecodeFails.DeleteLabelValues(w.name)
watcherSamplesSentPreTailing.DeleteLabelValues(w.name)
watcherCurrentSegment.DeleteLabelValues(w.name)
level.Info(w.logger).Log("msg", "WAL watcher stopped", "queue", w.name) level.Info(w.logger).Log("msg", "WAL watcher stopped", "queue", w.name)
} }

View File

@ -102,8 +102,6 @@ func TestTailSamples(t *testing.T) {
err = os.Mkdir(wdir, 0777) err = os.Mkdir(wdir, 0777)
testutil.Ok(t, err) testutil.Ok(t, err)
// os.Create(wal.SegmentName(wdir, 30))
enc := tsdb.RecordEncoder{} enc := tsdb.RecordEncoder{}
w, err := wal.NewSize(nil, nil, wdir, 128*pageSize) w, err := wal.NewSize(nil, nil, wdir, 128*pageSize)
testutil.Ok(t, err) testutil.Ok(t, err)
@ -139,6 +137,9 @@ func TestTailSamples(t *testing.T) {
wt := newWriteToMock() wt := newWriteToMock()
watcher := NewWALWatcher(nil, "", wt, dir) watcher := NewWALWatcher(nil, "", wt, dir)
watcher.startTime = now.UnixNano() watcher.startTime = now.UnixNano()
// Set the Watcher's metrics so they're not nil pointers.
watcher.setMetrics()
for i := first; i <= last; i++ { for i := first; i <= last; i++ {
segment, err := wal.OpenReadSegment(wal.SegmentName(watcher.walDir, i)) segment, err := wal.OpenReadSegment(wal.SegmentName(watcher.walDir, i))
testutil.Ok(t, err) testutil.Ok(t, err)
@ -148,14 +149,12 @@ func TestTailSamples(t *testing.T) {
// Use tail true so we can ensure we got the right number of samples. // Use tail true so we can ensure we got the right number of samples.
watcher.readSegment(reader, i, true) watcher.readSegment(reader, i, true)
} }
go watcher.Start()
expectedSeries := seriesCount expectedSeries := seriesCount
expectedSamples := seriesCount * samplesCount expectedSamples := seriesCount * samplesCount
retry(t, defaultRetryInterval, defaultRetries, func() bool { retry(t, defaultRetryInterval, defaultRetries, func() bool {
return wt.checkNumLabels() >= expectedSeries return wt.checkNumLabels() >= expectedSeries
}) })
watcher.Stop()
testutil.Equals(t, expectedSeries, wt.checkNumLabels()) testutil.Equals(t, expectedSeries, wt.checkNumLabels())
testutil.Equals(t, expectedSamples, wt.samplesAppended) testutil.Equals(t, expectedSamples, wt.samplesAppended)
} }
@ -424,6 +423,9 @@ func TestReadCheckpointMultipleSegments(t *testing.T) {
watcher := NewWALWatcher(nil, "", wt, dir) watcher := NewWALWatcher(nil, "", wt, dir)
watcher.maxSegment = -1 watcher.maxSegment = -1
// Set the Watcher's metrics so they're not nil pointers.
watcher.setMetrics()
lastCheckpoint, _, err := tsdb.LastCheckpoint(watcher.walDir) lastCheckpoint, _, err := tsdb.LastCheckpoint(watcher.walDir)
testutil.Ok(t, err) testutil.Ok(t, err)