From d3a104435448b0a1d5dc7cf6fdb133880ff571ac Mon Sep 17 00:00:00 2001 From: Shirley <4163034+fridgepoet@users.noreply.github.com> Date: Tue, 12 Sep 2023 16:26:02 +0200 Subject: [PATCH] WBL loading: don't send empty buffers over chan (#12808) Signed-off-by: Shirley Leu <4163034+fridgepoet@users.noreply.github.com> Co-authored-by: Fiona Liao --- tsdb/head_wal.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index bfd49b47a..56e9884d4 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -749,7 +749,9 @@ func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks. m = len(samples) } for i := 0; i < concurrency; i++ { - shards[i] = processors[i].reuseBuf() + if shards[i] == nil { + shards[i] = processors[i].reuseBuf() + } } for _, sam := range samples[:m] { if r, ok := multiRef[sam.Ref]; ok { @@ -759,7 +761,10 @@ func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks. shards[mod] = append(shards[mod], sam) } for i := 0; i < concurrency; i++ { - processors[i].input <- shards[i] + if len(shards[i]) > 0 { + processors[i].input <- shards[i] + shards[i] = nil + } } samples = samples[m:] } @@ -881,7 +886,6 @@ func (wp *wblSubsetProcessor) reuseBuf() []record.RefSample { // processWBLSamples adds the samples it receives to the head and passes // the buffer received to an output channel for reuse. -// Samples before the minValidTime timestamp are discarded. func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs uint64) { defer close(wp.output)