From c7e7fd355e524e4212851000f1673b853fb0f3c2 Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Wed, 31 Oct 2018 22:52:26 +0000 Subject: [PATCH] Only send WAL read workers the samples they need. Calculating the modulus in each worker was a hotspot, and meant that you had more work to do the more cores you had. This cuts CPU usage (on my 8 core, 4 real core machine) by 33%, and walltime by 3% Signed-off-by: Brian Brazil --- head.go | 60 ++++++++++++++++++++++++++++++++------------------------- 1 file changed, 34 insertions(+), 26 deletions(-) diff --git a/head.go b/head.go index 690489b83..b5cc00ffb 100644 --- a/head.go +++ b/head.go @@ -237,7 +237,6 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int // Samples before the mint timestamp are discarded. func (h *Head) processWALSamples( minValidTime int64, - partition, total uint64, input <-chan []RefSample, output chan<- []RefSample, ) (unknownRefs uint64) { defer close(output) @@ -249,7 +248,7 @@ func (h *Head) processWALSamples( for samples := range input { for _, s := range samples { - if s.T < minValidTime || s.Ref%total != partition { + if s.T < minValidTime { continue } ms := refSeries[s.Ref] @@ -317,25 +316,22 @@ func (h *Head) loadWAL(r *wal.Reader) error { // They are connected through a ring of channels which ensures that all sample batches // read from the WAL are processed in order. var ( - wg sync.WaitGroup - n = runtime.GOMAXPROCS(0) - firstInput = make(chan []RefSample, 300) - input = firstInput + wg sync.WaitGroup + n = runtime.GOMAXPROCS(0) + inputs = make([]chan []RefSample, n) + outputs = make([]chan []RefSample, n) ) wg.Add(n) for i := 0; i < n; i++ { - output := make(chan []RefSample, 300) + outputs[i] = make(chan []RefSample, 300) + inputs[i] = make(chan []RefSample, 300) - go func(i int, input <-chan []RefSample, output chan<- []RefSample) { - unknown := h.processWALSamples(minValidTime, uint64(i), uint64(n), input, output) + go func(input <-chan []RefSample, output chan<- []RefSample) { + unknown := h.processWALSamples(minValidTime, input, output) atomic.AddUint64(&unknownRefs, unknown) wg.Done() - }(i, input, output) - - // The output feeds the next worker goroutine. For the last worker, - // it feeds the initial input again to reuse the RefSample slices. - input = output + }(inputs[i], outputs[i]) } var ( @@ -373,17 +369,27 @@ func (h *Head) loadWAL(r *wal.Reader) error { // cause thousands of very large in flight buffers occupying large amounts // of unused memory. for len(samples) > 0 { - n := 5000 - if len(samples) < n { - n = len(samples) + m := 5000 + if len(samples) < m { + m = len(samples) } - var buf []RefSample - select { - case buf = <-input: - default: + shards := make([][]RefSample, n) + for i := 0; i < n; i++ { + var buf []RefSample + select { + case buf = <-outputs[i]: + default: + } + shards[i] = buf[:0] } - firstInput <- append(buf[:0], samples[:n]...) - samples = samples[n:] + for _, sam := range samples[:m] { + mod := sam.Ref % uint64(n) + shards[mod] = append(shards[mod], sam) + } + for i := 0; i < n; i++ { + inputs[i] <- shards[i] + } + samples = samples[m:] } samples = s // Keep whole slice for reuse. case RecordTombstones: @@ -407,9 +413,11 @@ func (h *Head) loadWAL(r *wal.Reader) error { return errors.Wrap(r.Err(), "read records") } - // Signal termination to first worker and wait for last one to close its output channel. - close(firstInput) - for range input { + // Signal termination to each worker and wait for it to close its output channel. + for i := 0; i < n; i++ { + close(inputs[i]) + for range outputs[i] { + } } wg.Wait()