Only send WAL read workers the samples they need.

Calculating the modulus in each worker was a hotspot, and meant that
the total work grew with the number of cores. This cuts CPU usage by
33% and wall time by 3% on my 8-core (4 physical core) machine.

Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
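
Before this change every worker received every decoded batch and dropped
the samples it did not own (s.Ref%total != partition); now the reader
computes the modulus once and sends each worker only its own shard. A
minimal, runnable sketch of that sharding step, with illustrative names
(sample, shardByRef) that are not part of the tsdb code:

package main

import "fmt"

type sample struct {
	Ref uint64
	T   int64
	V   float64
}

// shardByRef splits one batch into n shards keyed by Ref % n, so each
// worker goroutine only ever sees the series it owns.
func shardByRef(batch []sample, n int) [][]sample {
	shards := make([][]sample, n)
	for _, s := range batch {
		i := s.Ref % uint64(n)
		shards[i] = append(shards[i], s)
	}
	return shards
}

func main() {
	batch := []sample{{Ref: 1}, {Ref: 2}, {Ref: 5}, {Ref: 6}}
	for i, shard := range shardByRef(batch, 4) {
		fmt.Printf("worker %d gets %d samples\n", i, len(shard))
	}
}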
Author: Brian Brazil
Date:   2018-10-31 22:52:26 +00:00
Commit: c7e7fd355e (parent a64b0d51c4)

1 changed file with 34 additions and 26 deletions

head.go

@@ -237,7 +237,6 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int
 // Samples before the mint timestamp are discarded.
 func (h *Head) processWALSamples(
 	minValidTime int64,
-	partition, total uint64,
 	input <-chan []RefSample, output chan<- []RefSample,
 ) (unknownRefs uint64) {
 	defer close(output)
@@ -249,7 +248,7 @@ func (h *Head) processWALSamples(
 
 	for samples := range input {
 		for _, s := range samples {
-			if s.T < minValidTime || s.Ref%total != partition {
+			if s.T < minValidTime {
 				continue
 			}
 			ms := refSeries[s.Ref]
@@ -319,23 +318,20 @@ func (h *Head) loadWAL(r *wal.Reader) error {
 	var (
-		wg         sync.WaitGroup
-		n          = runtime.GOMAXPROCS(0)
-		firstInput = make(chan []RefSample, 300)
-		input      = firstInput
+		wg      sync.WaitGroup
+		n       = runtime.GOMAXPROCS(0)
+		inputs  = make([]chan []RefSample, n)
+		outputs = make([]chan []RefSample, n)
 	)
 	wg.Add(n)
 
 	for i := 0; i < n; i++ {
-		output := make(chan []RefSample, 300)
+		outputs[i] = make(chan []RefSample, 300)
+		inputs[i] = make(chan []RefSample, 300)
 
-		go func(i int, input <-chan []RefSample, output chan<- []RefSample) {
-			unknown := h.processWALSamples(minValidTime, uint64(i), uint64(n), input, output)
+		go func(input <-chan []RefSample, output chan<- []RefSample) {
+			unknown := h.processWALSamples(minValidTime, input, output)
 			atomic.AddUint64(&unknownRefs, unknown)
 			wg.Done()
-		}(i, input, output)
-
-		// The output feeds the next worker goroutine. For the last worker,
-		// it feeds the initial input again to reuse the RefSample slices.
-		input = output
+		}(inputs[i], outputs[i])
 	}
 
 	var (
@@ -373,17 +369,27 @@ func (h *Head) loadWAL(r *wal.Reader) error {
 			// cause thousands of very large in flight buffers occupying large amounts
 			// of unused memory.
 			for len(samples) > 0 {
-				n := 5000
-				if len(samples) < n {
-					n = len(samples)
+				m := 5000
+				if len(samples) < m {
+					m = len(samples)
 				}
-				var buf []RefSample
-				select {
-				case buf = <-input:
-				default:
+				shards := make([][]RefSample, n)
+				for i := 0; i < n; i++ {
+					var buf []RefSample
+					select {
+					case buf = <-outputs[i]:
+					default:
+					}
+					shards[i] = buf[:0]
 				}
-				firstInput <- append(buf[:0], samples[:n]...)
-				samples = samples[n:]
+				for _, sam := range samples[:m] {
+					mod := sam.Ref % uint64(n)
+					shards[mod] = append(shards[mod], sam)
+				}
+				for i := 0; i < n; i++ {
+					inputs[i] <- shards[i]
+				}
+				samples = samples[m:]
 			}
 			samples = s // Keep whole slice for reuse.
 		case RecordTombstones:
@@ -407,9 +413,11 @@ func (h *Head) loadWAL(r *wal.Reader) error {
 		return errors.Wrap(r.Err(), "read records")
 	}
 
-	// Signal termination to first worker and wait for last one to close its output channel.
-	close(firstInput)
-	for range input {
+	// Signal termination to each worker and wait for it to close its output channel.
+	for i := 0; i < n; i++ {
+		close(inputs[i])
+		for range outputs[i] {
+		}
 	}
 	wg.Wait()
 
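
One detail of the new loadWAL code worth spelling out: each worker hands
processed slices back on its output channel, and the reader does a
non-blocking receive so it can reuse that slice's backing array for the
next shard. A self-contained sketch of that handshake for a single
worker, with a simplified refSample type that stands in for the tsdb one:

package main

import "fmt"

type refSample struct {
	Ref uint64
	T   int64
	V   float64
}

func main() {
	// One worker's channel pair: the reader sends shards on input, the
	// worker sends finished (reusable) slices back on output.
	input := make(chan []refSample, 1)
	output := make(chan []refSample, 1)

	// Pretend the worker has already handed back a processed batch.
	output <- make([]refSample, 0, 5000)

	// Non-blocking receive: reuse the returned slice's capacity if one is
	// ready, otherwise start from a nil slice and let append allocate.
	var buf []refSample
	select {
	case buf = <-output:
	default:
	}
	shard := append(buf[:0], refSample{Ref: 7, T: 1, V: 2.5})
	input <- shard

	fmt.Println("reused capacity:", cap(shard) == 5000)
}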