WBL loading: don't send empty buffers over chan (#12808)

Signed-off-by: Shirley Leu <4163034+fridgepoet@users.noreply.github.com>
Co-authored-by: Fiona Liao <fiona.y.liao@gmail.com>
This commit is contained in:
Shirley 2023-09-12 16:26:02 +02:00 committed by GitHub
parent 6daee89e5f
commit d3a1044354
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 7 additions and 3 deletions

View File

@ -749,8 +749,10 @@ func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
m = len(samples) m = len(samples)
} }
for i := 0; i < concurrency; i++ { for i := 0; i < concurrency; i++ {
if shards[i] == nil {
shards[i] = processors[i].reuseBuf() shards[i] = processors[i].reuseBuf()
} }
}
for _, sam := range samples[:m] { for _, sam := range samples[:m] {
if r, ok := multiRef[sam.Ref]; ok { if r, ok := multiRef[sam.Ref]; ok {
sam.Ref = r sam.Ref = r
@ -759,7 +761,10 @@ func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
shards[mod] = append(shards[mod], sam) shards[mod] = append(shards[mod], sam)
} }
for i := 0; i < concurrency; i++ { for i := 0; i < concurrency; i++ {
if len(shards[i]) > 0 {
processors[i].input <- shards[i] processors[i].input <- shards[i]
shards[i] = nil
}
} }
samples = samples[m:] samples = samples[m:]
} }
@ -881,7 +886,6 @@ func (wp *wblSubsetProcessor) reuseBuf() []record.RefSample {
// processWBLSamples adds the samples it receives to the head and passes // processWBLSamples adds the samples it receives to the head and passes
// the buffer received to an output channel for reuse. // the buffer received to an output channel for reuse.
// Samples before the minValidTime timestamp are discarded.
func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs uint64) { func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs uint64) {
defer close(wp.output) defer close(wp.output)