From 4fd2556baa8bc11d49529abb92163feca33d1a58 Mon Sep 17 00:00:00 2001
From: Oleg Zaytsev
Date: Thu, 26 Sep 2024 15:43:19 +0200
Subject: [PATCH] Extract processWithBoundedParallelismAndConsistentWorkers

Signed-off-by: Oleg Zaytsev
---
 tsdb/index/postings.go | 93 +++++++++++++++++++++++-------------------
 1 file changed, 52 insertions(+), 41 deletions(-)

diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go
index e6a6c708ff..f8415407ef 100644
--- a/tsdb/index/postings.go
+++ b/tsdb/index/postings.go
@@ -300,52 +300,34 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
 	// Deleting label names mutates p.m map, so it should be done from a single goroutine after nobody else is reading it.
 	deleteLabelNames := make(chan string, len(p.m))
 
-	process := func(l labels.Label) {
-		orig := p.m[l.Name][l.Value]
-		repl := make([]storage.SeriesRef, 0, len(orig))
-		for _, id := range orig {
-			if _, ok := deleted[id]; !ok {
-				repl = append(repl, id)
+	process, wait := processWithBoundedParallelismAndConsistentWorkers(
+		runtime.GOMAXPROCS(0),
+		func(l labels.Label) uint64 { return xxhash.Sum64String(l.Name) },
+		func(l labels.Label) {
+			orig := p.m[l.Name][l.Value]
+			repl := make([]storage.SeriesRef, 0, len(orig))
+			for _, id := range orig {
+				if _, ok := deleted[id]; !ok {
+					repl = append(repl, id)
+				}
 			}
-		}
-		if len(repl) > 0 {
-			p.m[l.Name][l.Value] = repl
-		} else {
-			delete(p.m[l.Name], l.Value)
-			if len(p.m[l.Name]) == 0 {
-				// Delete the key if we removed all values.
-				deleteLabelNames <- l.Name
+			if len(repl) > 0 {
+				p.m[l.Name][l.Value] = repl
+			} else {
+				delete(p.m[l.Name], l.Value)
+				if len(p.m[l.Name]) == 0 {
+					// Delete the key if we removed all values.
+					deleteLabelNames <- l.Name
+				}
 			}
-		}
-	}
+		},
+	)
 
-	// Create GOMAXPROCS workers.
-	wg := sync.WaitGroup{}
-	jobs := make([]chan labels.Label, runtime.GOMAXPROCS(0))
-	for i := range jobs {
-		jobs[i] = make(chan labels.Label, 128)
-		wg.Add(1)
-		go func(jobs chan labels.Label) {
-			defer wg.Done()
-			for l := range jobs {
-				process(l)
-			}
-		}(jobs[i])
-	}
-
-	// Process all affected labels and the allPostingsKey.
 	for l := range affected {
-		j := int(xxhash.Sum64String(l.Name) % uint64(len(jobs)))
-		jobs[j] <- l
+		process(l)
 	}
-	j := int(xxhash.Sum64String(allPostingsKey.Name) % uint64(len(jobs)))
-	jobs[j] <- allPostingsKey
-
-	// Close jobs channels and wait all workers to finish.
-	for i := range jobs {
-		close(jobs[i])
-	}
-	wg.Wait()
+	process(allPostingsKey)
+	wait()
 
 	// Close deleteLabelNames channel and delete the label names requested.
 	close(deleteLabelNames)
@@ -354,6 +336,35 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
 	}
 }
 
+// processWithBoundedParallelismAndConsistentWorkers will call f() with bounded parallelism,
+// making sure that elements with same hash(T) will always be processed by the same worker.
+// Call process() to add more jobs to process, and once finished adding, call wait() to ensure that all jobs are processed.
+func processWithBoundedParallelismAndConsistentWorkers[T any](workers int, hash func(T) uint64, f func(T)) (process func(T), wait func()) {
+	wg := &sync.WaitGroup{}
+	jobs := make([]chan T, workers)
+	for i := 0; i < workers; i++ {
+		wg.Add(1)
+		jobs[i] = make(chan T, 128)
+		go func(jobs <-chan T) {
+			defer wg.Done()
+			for l := range jobs {
+				f(l)
+			}
+		}(jobs[i])
+	}
+
+	process = func(job T) {
+		jobs[hash(job)%uint64(workers)] <- job
+	}
+	wait = func() {
+		for i := range jobs {
+			close(jobs[i])
+		}
+		wg.Wait()
+	}
+	return process, wait
+}
+
 // Iter calls f for each postings list. It aborts if f returns an error and returns it.
 func (p *MemPostings) Iter(f func(labels.Label, Postings) error) error {
 	p.mtx.RLock()
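-- 
A minimal, hypothetical usage sketch of the helper extracted above, to make its
guarantee concrete: all jobs with the same hash are serialized on one worker, so
per-key state can be mutated without locks. The sketch assumes it lives in the
same package (the helper is unexported) and reuses the xxhash import already
present in tsdb/index; the word-count scenario and the countWords name are
invented for illustration.

	func countWords(words []string) map[string]*int {
		// Pre-populate the map up front: the workers below never write to
		// the map itself, they only read it and mutate the *int values.
		counts := make(map[string]*int, len(words))
		for _, w := range words {
			if _, ok := counts[w]; !ok {
				counts[w] = new(int)
			}
		}

		process, wait := processWithBoundedParallelismAndConsistentWorkers(
			4, // bounded parallelism: at most 4 worker goroutines
			func(w string) uint64 { return xxhash.Sum64String(w) },
			// Safe without a mutex: equal words hash equally, so every
			// increment of a given counter happens on the same worker.
			func(w string) { *counts[w]++ },
		)
		for _, w := range words {
			process(w)
		}
		wait() // close the job channels and block until the workers drain
		return counts
	}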