From 8df4bca470a92a83a478b5e4cf319ff3a22eee24 Mon Sep 17 00:00:00 2001
From: Chris Marchbanks
Date: Fri, 13 Sep 2019 11:23:58 -0600
Subject: [PATCH] Garbage collect asynchronously in the WAL Watcher

The WAL Watcher replays a checkpoint after it is created in order to
garbage collect series that no longer exist in the WAL. Currently the
garbage collection process runs serially with reading from the tip of
the WAL, which can cause large delays in writing samples to remote
storage just after a compaction occurs.

This also fixes a memory leak where dropped series were not cleaned up
as part of the SeriesReset process.

Signed-off-by: Chris Marchbanks
---
 storage/remote/queue_manager.go | 11 +++++++++++
 tsdb/wal/watcher.go             | 24 ++++++++++++++++++++----
 2 files changed, 31 insertions(+), 4 deletions(-)

diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index 5aad119ae..8d426cace 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -194,6 +194,7 @@ type QueueManager struct {
 	client         StorageClient
 	watcher        *wal.Watcher
 
+	seriesMtx            sync.Mutex
 	seriesLabels         map[uint64]labels.Labels
 	seriesSegmentIndexes map[uint64]int
 	droppedSeries        map[uint64]struct{}
@@ -264,6 +265,7 @@ func NewQueueManager(reg prometheus.Registerer, logger log.Logger, walDir string
 func (t *QueueManager) Append(samples []record.RefSample) bool {
 outer:
 	for _, s := range samples {
+		t.seriesMtx.Lock()
 		lbls, ok := t.seriesLabels[s.Ref]
 		if !ok {
 			t.droppedSamplesTotal.Inc()
@@ -271,8 +273,10 @@ outer:
 			if _, ok := t.droppedSeries[s.Ref]; !ok {
 				level.Info(t.logger).Log("msg", "dropped sample for series that was not explicitly dropped via relabelling", "ref", s.Ref)
 			}
+			t.seriesMtx.Unlock()
 			continue
 		}
+		t.seriesMtx.Unlock()
 		// This will only loop if the queues are being resharded.
 		backoff := t.cfg.MinBackoff
 		for {
@@ -356,9 +360,11 @@ func (t *QueueManager) Stop() {
 	t.watcher.Stop()
 
 	// On shutdown, release the strings in the labels from the intern pool.
+	t.seriesMtx.Lock()
 	for _, labels := range t.seriesLabels {
 		releaseLabels(labels)
 	}
+	t.seriesMtx.Unlock()
 	// Delete metrics so we don't have alerts for queues that are gone.
 	name := t.client.Name()
 	queueHighestSentTimestamp.DeleteLabelValues(name)
@@ -378,6 +384,8 @@ func (t *QueueManager) Stop() {
 
 // StoreSeries keeps track of which series we know about for lookups when sending samples to remote.
 func (t *QueueManager) StoreSeries(series []record.RefSeries, index int) {
+	t.seriesMtx.Lock()
+	defer t.seriesMtx.Unlock()
 	for _, s := range series {
 		ls := processExternalLabels(s.Labels, t.externalLabels)
 		lbls := relabel.Process(ls, t.relabelConfigs...)
@@ -402,6 +410,8 @@ func (t *QueueManager) StoreSeries(series []record.RefSeries, index int) {
 // stored series records with the checkpoints index number, so we can now
 // delete any ref ID's lower than that # from the two maps.
 func (t *QueueManager) SeriesReset(index int) {
+	t.seriesMtx.Lock()
+	defer t.seriesMtx.Unlock()
 	// Check for series that are in segments older than the checkpoint
 	// that were not also present in the checkpoint.
 	for k, v := range t.seriesSegmentIndexes {
@@ -409,6 +419,7 @@ func (t *QueueManager) SeriesReset(index int) {
 			delete(t.seriesSegmentIndexes, k)
 			releaseLabels(t.seriesLabels[k])
 			delete(t.seriesLabels, k)
+			delete(t.droppedSeries, k)
 		}
 	}
 }
diff --git a/tsdb/wal/watcher.go b/tsdb/wal/watcher.go
index 2167e3324..11c9bfddc 100644
--- a/tsdb/wal/watcher.go
+++ b/tsdb/wal/watcher.go
@@ -41,10 +41,13 @@ const (
 )
 
 // WriteTo is an interface used by the Watcher to send the samples it's read
-// from the WAL on to somewhere else.
+// from the WAL on to somewhere else. Functions will be called concurrently
+// and it is left to the implementer to make sure they are safe.
 type WriteTo interface {
 	Append([]record.RefSample) bool
 	StoreSeries([]record.RefSeries, int)
+	// SeriesReset is called after reading a checkpoint to allow the deletion
+	// of all series created in a segment lower than the argument.
 	SeriesReset(int)
 }
 
@@ -337,6 +340,7 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
 		}
 	}
 
+	gcSem := make(chan struct{}, 1)
 	for {
 		select {
 		case <-w.quit:
@@ -345,9 +349,21 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
 			return errors.New("quit channel")
 		case <-checkpointTicker.C:
 			// Periodically check if there is a new checkpoint so we can garbage
 			// collect labels. As this is considered an optimisation, we ignore
-			// errors during checkpoint processing.
-			if err := w.garbageCollectSeries(segmentNum); err != nil {
-				level.Warn(w.logger).Log("msg", "error process checkpoint", "err", err)
+			// errors during checkpoint processing. Doing the process asynchronously
+			// allows the current WAL segment to be processed while reading the
+			// checkpoint.
+			select {
+			case gcSem <- struct{}{}:
+				go func() {
+					defer func() {
+						<-gcSem
+					}()
+					if err := w.garbageCollectSeries(segmentNum); err != nil {
+						level.Warn(w.logger).Log("msg", "error process checkpoint", "err", err)
+					}
+				}()
+			default:
+				// Currently doing a garbage collect, try again later.
 			}
 		case <-segmentTicker.C:
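
Note: the watcher.go change relies on a buffered channel of capacity one used
as a try-acquire semaphore, so at most one garbage-collection pass runs in the
background while the watcher keeps tailing the WAL. Below is a minimal,
standalone Go sketch of that pattern only; it is not the Prometheus code, and
runGC plus the sleep are hypothetical stand-ins for the checkpoint tick and the
slow garbageCollectSeries call.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Buffered channel of capacity one used as a try-acquire semaphore:
	// sending succeeds only when no other garbage-collection pass holds it.
	gcSem := make(chan struct{}, 1)

	// runGC stands in for the checkpointTicker case; the sleep stands in
	// for a slow checkpoint replay.
	runGC := func(id int) {
		select {
		case gcSem <- struct{}{}:
			go func() {
				defer func() { <-gcSem }()
				time.Sleep(100 * time.Millisecond)
				fmt.Println("gc pass", id, "finished")
			}()
		default:
			// A collection is already running; skip this tick rather
			// than blocking the caller.
			fmt.Println("gc pass", id, "skipped")
		}
	}

	for i := 0; i < 5; i++ {
		runGC(i)
		time.Sleep(30 * time.Millisecond)
	}
	time.Sleep(200 * time.Millisecond) // let the last pass finish
}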
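
The seriesMtx added to QueueManager is needed because StoreSeries and
SeriesReset may now run on the watcher's garbage-collection goroutine
concurrently with Append. The following simplified sketch, with hypothetical
names (seriesCache, store, reset) rather than the real QueueManager fields,
shows that locking discipline and the droppedSeries cleanup that fixes the
leak.

package main

import (
	"fmt"
	"sync"
)

// seriesCache is a simplified, hypothetical stand-in for the series maps
// guarded by seriesMtx in QueueManager.
type seriesCache struct {
	mtx            sync.Mutex
	labels         map[uint64]string // stand-in for map[uint64]labels.Labels
	segmentIndexes map[uint64]int
	dropped        map[uint64]struct{}
}

// store records the labels for a series ref and the segment it was read from,
// mirroring StoreSeries.
func (c *seriesCache) store(ref uint64, lbls string, segment int) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	c.labels[ref] = lbls
	c.segmentIndexes[ref] = segment
}

// reset drops every series recorded in a segment lower than index, mirroring
// SeriesReset, including the dropped-series entries (the leak fixed above).
func (c *seriesCache) reset(index int) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	for ref, seg := range c.segmentIndexes {
		if seg < index {
			delete(c.segmentIndexes, ref)
			delete(c.labels, ref)
			delete(c.dropped, ref)
		}
	}
}

func main() {
	c := &seriesCache{
		labels:         map[uint64]string{},
		segmentIndexes: map[uint64]int{},
		dropped:        map[uint64]struct{}{},
	}
	c.store(1, `{__name__="up"}`, 3)
	c.store(2, `{__name__="go_goroutines"}`, 7)
	c.reset(5)                 // removes ref 1, keeps ref 2
	fmt.Println(len(c.labels)) // prints 1
}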