Garbage collect asynchronously in the WAL Watcher
The WAL Watcher replays a checkpoint after it is created in order to garbage collect series that no longer exist in the WAL. Currently the garbage collection process is done serially with reading from the tip of the WAL, which can cause large delays in writing samples to remote storage just after compaction occurs.

This also fixes a memory leak where dropped series are not cleaned up as part of the SeriesReset process.

Signed-off-by: Chris Marchbanks <csmarchbanks@gmail.com>
parent 895abbb7d0
commit 8df4bca470
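The asynchronous collection in this change is built around a single-slot semaphore: a buffered channel of capacity one plus a select with a default case, so a new garbage collection only starts once the previous one has finished and the WAL-tailing loop is never blocked. A minimal, standalone sketch of that pattern follows; the ticker interval, the slowGC helper, and every other name here are illustrative, not taken from the patch.

package main

import (
	"fmt"
	"time"
)

// slowGC stands in for a long-running checkpoint read; purely illustrative.
func slowGC(n int) {
	time.Sleep(300 * time.Millisecond)
	fmt.Println("gc finished for tick", n)
}

func main() {
	// Single-slot semaphore: at most one GC goroutine runs at a time.
	gcSem := make(chan struct{}, 1)
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	for n := 0; n < 10; n++ {
		<-ticker.C
		select {
		case gcSem <- struct{}{}:
			// Slot acquired: run GC in the background so the main loop
			// (reading the WAL tip in the real code) is never blocked.
			go func(n int) {
				defer func() { <-gcSem }()
				slowGC(n)
			}(n)
		default:
			// A GC is already in flight; skip this tick and try again later.
			fmt.Println("gc busy, skipping tick", n)
		}
	}
	time.Sleep(500 * time.Millisecond)
}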
@@ -194,6 +194,7 @@ type QueueManager struct {
 	client  StorageClient
 	watcher *wal.Watcher
 
+	seriesMtx            sync.Mutex
 	seriesLabels         map[uint64]labels.Labels
 	seriesSegmentIndexes map[uint64]int
 	droppedSeries        map[uint64]struct{}
@@ -264,6 +265,7 @@ func NewQueueManager(reg prometheus.Registerer, logger log.Logger, walDir string
 func (t *QueueManager) Append(samples []record.RefSample) bool {
 outer:
 	for _, s := range samples {
+		t.seriesMtx.Lock()
 		lbls, ok := t.seriesLabels[s.Ref]
 		if !ok {
 			t.droppedSamplesTotal.Inc()
@@ -271,8 +273,10 @@ outer:
 			if _, ok := t.droppedSeries[s.Ref]; !ok {
 				level.Info(t.logger).Log("msg", "dropped sample for series that was not explicitly dropped via relabelling", "ref", s.Ref)
 			}
+			t.seriesMtx.Unlock()
 			continue
 		}
+		t.seriesMtx.Unlock()
 		// This will only loop if the queues are being resharded.
 		backoff := t.cfg.MinBackoff
 		for {
@@ -356,9 +360,11 @@ func (t *QueueManager) Stop() {
 	t.watcher.Stop()
 
 	// On shutdown, release the strings in the labels from the intern pool.
+	t.seriesMtx.Lock()
 	for _, labels := range t.seriesLabels {
 		releaseLabels(labels)
 	}
+	t.seriesMtx.Unlock()
 	// Delete metrics so we don't have alerts for queues that are gone.
 	name := t.client.Name()
 	queueHighestSentTimestamp.DeleteLabelValues(name)
@@ -378,6 +384,8 @@ func (t *QueueManager) Stop() {
 
 // StoreSeries keeps track of which series we know about for lookups when sending samples to remote.
 func (t *QueueManager) StoreSeries(series []record.RefSeries, index int) {
+	t.seriesMtx.Lock()
+	defer t.seriesMtx.Unlock()
 	for _, s := range series {
 		ls := processExternalLabels(s.Labels, t.externalLabels)
 		lbls := relabel.Process(ls, t.relabelConfigs...)
@@ -402,6 +410,8 @@ func (t *QueueManager) StoreSeries(series []record.RefSeries, index int) {
 // stored series records with the checkpoints index number, so we can now
 // delete any ref ID's lower than that # from the two maps.
 func (t *QueueManager) SeriesReset(index int) {
+	t.seriesMtx.Lock()
+	defer t.seriesMtx.Unlock()
 	// Check for series that are in segments older than the checkpoint
 	// that were not also present in the checkpoint.
 	for k, v := range t.seriesSegmentIndexes {
@@ -409,6 +419,7 @@ func (t *QueueManager) SeriesReset(index int) {
 			delete(t.seriesSegmentIndexes, k)
 			releaseLabels(t.seriesLabels[k])
 			delete(t.seriesLabels, k)
+			delete(t.droppedSeries, k)
 		}
 	}
 }
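Because garbageCollectSeries now runs on its own goroutine, StoreSeries and SeriesReset can mutate the series maps while Append is still reading them, which is why the hunks above route every access to seriesLabels, seriesSegmentIndexes and droppedSeries through the new seriesMtx, and why SeriesReset now also deletes droppedSeries entries so they cannot accumulate. The reduced sketch below shows only that access pattern; seriesCache and its fields are simplified stand-ins, not the real QueueManager.

package main

import (
	"fmt"
	"sync"
)

// seriesCache mimics the three QueueManager maps guarded by one mutex;
// names and field types are simplified for illustration.
type seriesCache struct {
	mtx            sync.Mutex
	labels         map[uint64]string
	segmentIndexes map[uint64]int
	dropped        map[uint64]struct{}
}

// lookup is the Append-side read path.
func (c *seriesCache) lookup(ref uint64) (string, bool) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	lbls, ok := c.labels[ref]
	return lbls, ok
}

// reset is the SeriesReset-side cleanup: delete everything stored for a
// segment lower than the checkpoint index, including dropped entries.
func (c *seriesCache) reset(index int) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	for ref, seg := range c.segmentIndexes {
		if seg < index {
			delete(c.segmentIndexes, ref)
			delete(c.labels, ref)
			delete(c.dropped, ref)
		}
	}
}

func main() {
	c := &seriesCache{
		labels:         map[uint64]string{1: `{job="a"}`},
		segmentIndexes: map[uint64]int{1: 0},
		dropped:        map[uint64]struct{}{},
	}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); c.reset(1) }()               // GC goroutine
	go func() { defer wg.Done(); fmt.Println(c.lookup(1)) }() // Append path
	wg.Wait()
}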
@@ -41,10 +41,13 @@ const (
 )
 
 // WriteTo is an interface used by the Watcher to send the samples it's read
-// from the WAL on to somewhere else.
+// from the WAL on to somewhere else. Functions will be called concurrently
+// and it is left to the implementer to make sure they are safe.
 type WriteTo interface {
 	Append([]record.RefSample) bool
 	StoreSeries([]record.RefSeries, int)
+	// SeriesReset is called after reading a checkpoint to allow the deletion
+	// of all series created in a segment lower than the argument.
 	SeriesReset(int)
 }
 
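With the comment added above, any WriteTo implementation has to tolerate Append, StoreSeries and SeriesReset being called from different goroutines. The schematic below (not the real remote-write code) shows one receiver that honours that contract; refSample and refSeries are simplified stand-ins for record.RefSample and record.RefSeries, and the sync.RWMutex is used only to make the read/write split explicit, whereas the patch itself guards QueueManager with a plain sync.Mutex.

package walsketch

import "sync"

// Stand-in types; the real interface uses record.RefSample and record.RefSeries.
type refSample struct {
	Ref uint64
	T   int64
	V   float64
}

type refSeries struct {
	Ref    uint64
	Labels string
}

// safeWriteTo sketches a receiver that honours the concurrency contract:
// every method may be called from a different goroutine.
type safeWriteTo struct {
	mtx    sync.RWMutex
	series map[uint64]refSeries
}

// Append only reads the series map, so a read lock is enough.
func (w *safeWriteTo) Append(samples []refSample) bool {
	w.mtx.RLock()
	defer w.mtx.RUnlock()
	for _, s := range samples {
		_ = w.series[s.Ref] // look up labels and enqueue the sample
	}
	return true
}

// StoreSeries mutates the map and therefore takes the write lock.
func (w *safeWriteTo) StoreSeries(series []refSeries, segment int) {
	w.mtx.Lock()
	defer w.mtx.Unlock()
	for _, s := range series {
		w.series[s.Ref] = s
	}
}

// SeriesReset is called after a checkpoint has been read; here it simply
// clears everything, whereas the real code only drops refs from segments
// lower than the given index.
func (w *safeWriteTo) SeriesReset(index int) {
	w.mtx.Lock()
	defer w.mtx.Unlock()
	w.series = map[uint64]refSeries{}
}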
@@ -337,6 +340,7 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
 		}
 	}
 
+	gcSem := make(chan struct{}, 1)
 	for {
 		select {
 		case <-w.quit:
@@ -345,9 +349,21 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
 		case <-checkpointTicker.C:
 			// Periodically check if there is a new checkpoint so we can garbage
 			// collect labels. As this is considered an optimisation, we ignore
-			// errors during checkpoint processing.
-			if err := w.garbageCollectSeries(segmentNum); err != nil {
-				level.Warn(w.logger).Log("msg", "error process checkpoint", "err", err)
+			// errors during checkpoint processing. Doing the process asynchronously
+			// allows the current WAL segment to be processed while reading the
+			// checkpoint.
+			select {
+			case gcSem <- struct{}{}:
+				go func() {
+					defer func() {
+						<-gcSem
+					}()
+					if err := w.garbageCollectSeries(segmentNum); err != nil {
+						level.Warn(w.logger).Log("msg", "error process checkpoint", "err", err)
+					}
+				}()
+			default:
+				// Currently doing a garbage collect, try again later.
 			}
 
 		case <-segmentTicker.C: