diff --git a/head.go b/head.go index e0fd3c076..37126363d 100644 --- a/head.go +++ b/head.go @@ -239,16 +239,20 @@ func (h *Head) ReadWAL() error { // They are connected through a ring of channels which ensures that all sample batches // read from the WAL are processed in order. var ( + wg sync.WaitGroup n = runtime.GOMAXPROCS(0) firstInput = make(chan []RefSample, 300) input = firstInput ) + wg.Add(n) + for i := 0; i < n; i++ { output := make(chan []RefSample, 300) go func(i int, input <-chan []RefSample, output chan<- []RefSample) { unknown := h.processWALSamples(mint, uint64(i), uint64(n), input, output) atomic.AddUint64(&unknownRefs, unknown) + wg.Done() }(i, input, output) // The output feeds the next worker goroutine. For the last worker, @@ -294,6 +298,8 @@ func (h *Head) ReadWAL() error { close(firstInput) for range input { } + wg.Wait() + if err != nil { return errors.Wrap(err, "consume WAL") } @@ -756,10 +762,11 @@ func (h *headChunkReader) Chunk(ref uint64) (chunks.Chunk, error) { s.Lock() c := s.chunk(int(cid)) + mint, maxt := c.minTime, c.maxTime s.Unlock() // Do not expose chunks that are outside of the specified range. - if c == nil || !intervalOverlap(c.minTime, c.maxTime, h.mint, h.maxt) { + if c == nil || !intervalOverlap(mint, maxt, h.mint, h.maxt) { return nil, ErrNotFound } return &safeChunk{