diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index 5aad119ae..8d426cace 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -194,6 +194,7 @@ type QueueManager struct {
 	client         StorageClient
 	watcher        *wal.Watcher
 
+	seriesMtx            sync.Mutex
 	seriesLabels         map[uint64]labels.Labels
 	seriesSegmentIndexes map[uint64]int
 	droppedSeries        map[uint64]struct{}
@@ -264,6 +265,7 @@ func NewQueueManager(reg prometheus.Registerer, logger log.Logger, walDir string
 func (t *QueueManager) Append(samples []record.RefSample) bool {
 outer:
 	for _, s := range samples {
+		t.seriesMtx.Lock()
 		lbls, ok := t.seriesLabels[s.Ref]
 		if !ok {
 			t.droppedSamplesTotal.Inc()
@@ -271,8 +273,10 @@ outer:
 			if _, ok := t.droppedSeries[s.Ref]; !ok {
 				level.Info(t.logger).Log("msg", "dropped sample for series that was not explicitly dropped via relabelling", "ref", s.Ref)
 			}
+			t.seriesMtx.Unlock()
 			continue
 		}
+		t.seriesMtx.Unlock()
 		// This will only loop if the queues are being resharded.
 		backoff := t.cfg.MinBackoff
 		for {
@@ -356,9 +360,11 @@ func (t *QueueManager) Stop() {
 	t.watcher.Stop()
 
 	// On shutdown, release the strings in the labels from the intern pool.
+	t.seriesMtx.Lock()
 	for _, labels := range t.seriesLabels {
 		releaseLabels(labels)
 	}
+	t.seriesMtx.Unlock()
 	// Delete metrics so we don't have alerts for queues that are gone.
 	name := t.client.Name()
 	queueHighestSentTimestamp.DeleteLabelValues(name)
@@ -378,6 +384,8 @@ func (t *QueueManager) Stop() {
 
 // StoreSeries keeps track of which series we know about for lookups when sending samples to remote.
 func (t *QueueManager) StoreSeries(series []record.RefSeries, index int) {
+	t.seriesMtx.Lock()
+	defer t.seriesMtx.Unlock()
 	for _, s := range series {
 		ls := processExternalLabels(s.Labels, t.externalLabels)
 		lbls := relabel.Process(ls, t.relabelConfigs...)
@@ -402,6 +410,8 @@ func (t *QueueManager) StoreSeries(series []record.RefSeries, index int) {
 // stored series records with the checkpoints index number, so we can now
 // delete any ref ID's lower than that # from the two maps.
 func (t *QueueManager) SeriesReset(index int) {
+	t.seriesMtx.Lock()
+	defer t.seriesMtx.Unlock()
 	// Check for series that are in segments older than the checkpoint
 	// that were not also present in the checkpoint.
 	for k, v := range t.seriesSegmentIndexes {
@@ -409,6 +419,7 @@ func (t *QueueManager) SeriesReset(index int) {
 			delete(t.seriesSegmentIndexes, k)
 			releaseLabels(t.seriesLabels[k])
 			delete(t.seriesLabels, k)
+			delete(t.droppedSeries, k)
 		}
 	}
 }
diff --git a/tsdb/wal/watcher.go b/tsdb/wal/watcher.go
index 2167e3324..11c9bfddc 100644
--- a/tsdb/wal/watcher.go
+++ b/tsdb/wal/watcher.go
@@ -41,10 +41,13 @@ const (
 )
 
 // WriteTo is an interface used by the Watcher to send the samples it's read
-// from the WAL on to somewhere else.
+// from the WAL on to somewhere else. Functions will be called concurrently
+// and it is left to the implementer to make sure they are safe.
 type WriteTo interface {
 	Append([]record.RefSample) bool
 	StoreSeries([]record.RefSeries, int)
+	// SeriesReset is called after reading a checkpoint to allow the deletion
+	// of all series created in a segment lower than the argument.
 	SeriesReset(int)
 }
 
@@ -337,6 +340,7 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
 		}
 	}
 
+	gcSem := make(chan struct{}, 1)
 	for {
 		select {
 		case <-w.quit:
@@ -345,9 +349,21 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
 		case <-checkpointTicker.C:
 			// Periodically check if there is a new checkpoint so we can garbage
 			// collect labels. As this is considered an optimisation, we ignore
-			// errors during checkpoint processing.
-			if err := w.garbageCollectSeries(segmentNum); err != nil {
-				level.Warn(w.logger).Log("msg", "error process checkpoint", "err", err)
+			// errors during checkpoint processing. Doing the process asynchronously
+			// allows the current WAL segment to be processed while reading the
+			// checkpoint.
+			select {
+			case gcSem <- struct{}{}:
+				go func() {
+					defer func() {
+						<-gcSem
+					}()
+					if err := w.garbageCollectSeries(segmentNum); err != nil {
+						level.Warn(w.logger).Log("msg", "error process checkpoint", "err", err)
+					}
+				}()
+			default:
+				// Currently doing a garbage collect, try again later.
 			}
 
 		case <-segmentTicker.C:
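
Note on the gcSem channel introduced in watcher.go: a buffered channel of capacity one is used as a try-lock, so a checkpoint garbage collection is started only when the previous one has finished, and the checkpoint tick is otherwise skipped. The stand-alone sketch below shows the same non-blocking semaphore pattern in isolation; doGC is a hypothetical stand-in for garbageCollectSeries and is not Prometheus code.

package main

import (
	"fmt"
	"time"
)

func main() {
	// A buffered channel of capacity one acts as a try-lock: the send in the
	// select below succeeds only when no garbage collection is in flight.
	gcSem := make(chan struct{}, 1)

	// doGC is a hypothetical stand-in for the Watcher's garbageCollectSeries.
	doGC := func(tick int) error {
		time.Sleep(50 * time.Millisecond) // simulate reading a checkpoint
		fmt.Println("gc finished for tick", tick)
		return nil
	}

	ticker := time.NewTicker(20 * time.Millisecond)
	defer ticker.Stop()

	for tick := 0; tick < 5; tick++ {
		<-ticker.C
		select {
		case gcSem <- struct{}{}:
			go func(tick int) {
				defer func() { <-gcSem }() // release the slot when done
				if err := doGC(tick); err != nil {
					fmt.Println("gc error:", err)
				}
			}(tick)
		default:
			// A collection is already running; skip this tick and retry on
			// the next one, exactly as the watcher now does.
			fmt.Println("gc already in progress, skipping tick", tick)
		}
	}

	time.Sleep(100 * time.Millisecond) // let the last goroutine finish
}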
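
Why seriesMtx is needed in queue_manager.go: once garbage collection runs on its own goroutine, StoreSeries and SeriesReset can be called while Append is reading the series maps, which is a data race on plain Go maps. The sketch below illustrates the locking pattern in isolation; seriesCache and its string-valued map are illustrative stand-ins, not the Prometheus types.

package main

import (
	"fmt"
	"sync"
)

// seriesCache mirrors the QueueManager maps guarded by seriesMtx in the diff:
// lookups (Append), inserts (StoreSeries) and deletes (SeriesReset) may now
// run on different goroutines, so every access takes the mutex.
type seriesCache struct {
	mtx    sync.Mutex
	labels map[uint64]string // stand-in for map[uint64]labels.Labels
	index  map[uint64]int    // stand-in for seriesSegmentIndexes
}

func newSeriesCache() *seriesCache {
	return &seriesCache{
		labels: map[uint64]string{},
		index:  map[uint64]int{},
	}
}

// StoreSeries records a series and the WAL segment it was read from.
func (c *seriesCache) StoreSeries(ref uint64, lbls string, segment int) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	c.labels[ref] = lbls
	c.index[ref] = segment
}

// Lookup is the hot path (like Append): lock only around the map read.
func (c *seriesCache) Lookup(ref uint64) (string, bool) {
	c.mtx.Lock()
	lbls, ok := c.labels[ref]
	c.mtx.Unlock()
	return lbls, ok
}

// SeriesReset drops everything created in a segment lower than the argument,
// matching the contract documented on WriteTo.SeriesReset.
func (c *seriesCache) SeriesReset(segment int) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	for ref, seg := range c.index {
		if seg < segment {
			delete(c.index, ref)
			delete(c.labels, ref)
		}
	}
}

func main() {
	c := newSeriesCache()
	c.StoreSeries(1, `{__name__="up"}`, 3)
	c.StoreSeries(2, `{__name__="go_goroutines"}`, 5)
	c.SeriesReset(4) // drops ref 1, keeps ref 2
	_, ok := c.Lookup(1)
	fmt.Println("ref 1 present:", ok)
}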