From c9b85afd93d03df31f1b320353521ee7ac9e51d0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C4=90urica=20Yuri=20Nikoli=C4=87?=
Date: Tue, 7 Mar 2023 17:41:33 +0100
Subject: [PATCH] Making the number of CPUs used for WAL replay configurable
 (#12066)

Adds `WALReplayConcurrency` as an option on tsdb `Options` and
`HeadOptions`. If it is not set, or is set to a value <= 0, then
`GOMAXPROCS` is used, which matches the previous behaviour.

Signed-off-by: Yuri Nikolic
---
 tsdb/db.go       |  7 +++++
 tsdb/head.go     | 13 +++++++++
 tsdb/head_wal.go | 73 ++++++++++++++++++++++++------------------------
 3 files changed, 56 insertions(+), 37 deletions(-)

diff --git a/tsdb/db.go b/tsdb/db.go
index 616213b03..561867025 100644
--- a/tsdb/db.go
+++ b/tsdb/db.go
@@ -125,6 +125,10 @@ type Options struct {
 	// WALCompression will turn on Snappy compression for records on the WAL.
 	WALCompression bool
 
+	// Maximum number of CPUs that can simultaneously process WAL replay.
+	// If it is <=0, then GOMAXPROCS is used.
+	WALReplayConcurrency int
+
 	// StripeSize is the size in entries of the series hash map. Reducing the size will save memory but impact performance.
 	StripeSize int
 
@@ -782,6 +786,9 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
 	headOpts.EnableNativeHistograms.Store(opts.EnableNativeHistograms)
 	headOpts.OutOfOrderTimeWindow.Store(opts.OutOfOrderTimeWindow)
 	headOpts.OutOfOrderCapMax.Store(opts.OutOfOrderCapMax)
+	if opts.WALReplayConcurrency > 0 {
+		headOpts.WALReplayConcurrency = opts.WALReplayConcurrency
+	}
 	if opts.IsolationDisabled {
 		// We only override this flag if isolation is disabled at DB level. We use the default otherwise.
 		headOpts.IsolationDisabled = opts.IsolationDisabled
diff --git a/tsdb/head.go b/tsdb/head.go
index cf2c15d79..ef176d1c5 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -18,6 +18,7 @@ import (
 	"io"
 	"math"
 	"path/filepath"
+	"runtime"
 	"sync"
 	"time"
 
@@ -58,6 +59,8 @@ var (
 
 	// defaultIsolationDisabled is true if isolation is disabled by default.
 	defaultIsolationDisabled = false
+
+	defaultWALReplayConcurrency = runtime.GOMAXPROCS(0)
 )
 
 // Head handles reads and writes of time series data within a time window.
@@ -155,6 +158,11 @@ type HeadOptions struct {
 	EnableMemorySnapshotOnShutdown bool
 
 	IsolationDisabled bool
+
+	// Maximum number of CPUs that can simultaneously process WAL replay.
+	// The default value is GOMAXPROCS.
+	// If it is set to a negative value or zero, the default value is used.
+	WALReplayConcurrency int
 }
 
 const (
@@ -172,6 +180,7 @@ func DefaultHeadOptions() *HeadOptions {
 		StripeSize:           DefaultStripeSize,
 		SeriesCallback:       &noopSeriesLifecycleCallback{},
 		IsolationDisabled:    defaultIsolationDisabled,
+		WALReplayConcurrency: defaultWALReplayConcurrency,
 	}
 	ho.OutOfOrderCapMax.Store(DefaultOutOfOrderCapMax)
 	return ho
@@ -247,6 +256,10 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal, wbl *wlog.WL, opts *Hea
 		opts.ChunkPool = chunkenc.NewPool()
 	}
 
+	if opts.WALReplayConcurrency <= 0 {
+		opts.WALReplayConcurrency = defaultWALReplayConcurrency
+	}
+
 	h.chunkDiskMapper, err = chunks.NewChunkDiskMapper(
 		r,
 		mmappedChunksDir(opts.ChunkDirRoot),
diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go
index f2ef4c1e0..dd55f438d 100644
--- a/tsdb/head_wal.go
+++ b/tsdb/head_wal.go
@@ -18,7 +18,6 @@ import (
 	"math"
 	"os"
 	"path/filepath"
-	"runtime"
 	"strconv"
 	"strings"
 	"sync"
@@ -65,13 +64,13 @@ func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
 	// Start workers that each process samples for a partition of the series ID space.
 	var (
 		wg             sync.WaitGroup
-		n              = runtime.GOMAXPROCS(0)
-		processors     = make([]walSubsetProcessor, n)
+		concurrency    = h.opts.WALReplayConcurrency
+		processors     = make([]walSubsetProcessor, concurrency)
 		exemplarsInput chan record.RefExemplar
 
 		dec             record.Decoder
-		shards          = make([][]record.RefSample, n)
-		histogramShards = make([][]histogramRecord, n)
+		shards          = make([][]record.RefSample, concurrency)
+		histogramShards = make([][]histogramRecord, concurrency)
 
 		decoded                      = make(chan interface{}, 10)
 		decodeErr, seriesCreationErr error
@@ -116,7 +115,7 @@ func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
 		// For CorruptionErr ensure to terminate all workers before exiting.
 		_, ok := err.(*wlog.CorruptionErr)
 		if ok || seriesCreationErr != nil {
-			for i := 0; i < n; i++ {
+			for i := 0; i < concurrency; i++ {
 				processors[i].closeAndDrain()
 			}
 			close(exemplarsInput)
@@ -124,8 +123,8 @@ func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
 		}
 	}()
 
-	wg.Add(n)
-	for i := 0; i < n; i++ {
+	wg.Add(concurrency)
+	for i := 0; i < concurrency; i++ {
 		processors[i].setup()
 
 		go func(wp *walSubsetProcessor) {
@@ -276,7 +275,7 @@ Outer:
 					multiRef[walSeries.Ref] = mSeries.ref
 				}
 
-				idx := uint64(mSeries.ref) % uint64(n)
+				idx := uint64(mSeries.ref) % uint64(concurrency)
 				processors[idx].input <- walSubsetProcessorInputItem{walSeriesRef: walSeries.Ref, existingSeries: mSeries}
 			}
 			//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
@@ -293,7 +292,7 @@ Outer:
 				if len(samples) < m {
 					m = len(samples)
 				}
-				for i := 0; i < n; i++ {
+				for i := 0; i < concurrency; i++ {
 					if shards[i] == nil {
 						shards[i] = processors[i].reuseBuf()
 					}
@@ -305,10 +304,10 @@ Outer:
 					if r, ok := multiRef[sam.Ref]; ok {
 						sam.Ref = r
 					}
-					mod := uint64(sam.Ref) % uint64(n)
+					mod := uint64(sam.Ref) % uint64(concurrency)
 					shards[mod] = append(shards[mod], sam)
 				}
-				for i := 0; i < n; i++ {
+				for i := 0; i < concurrency; i++ {
 					if len(shards[i]) > 0 {
 						processors[i].input <- walSubsetProcessorInputItem{samples: shards[i]}
 						shards[i] = nil
@@ -351,7 +350,7 @@ Outer:
 				if len(samples) < m {
 					m = len(samples)
 				}
-				for i := 0; i < n; i++ {
+				for i := 0; i < concurrency; i++ {
 					if histogramShards[i] == nil {
 						histogramShards[i] = processors[i].reuseHistogramBuf()
 					}
@@ -363,10 +362,10 @@ Outer:
 					if r, ok := multiRef[sam.Ref]; ok {
 						sam.Ref = r
 					}
-					mod := uint64(sam.Ref) % uint64(n)
+					mod := uint64(sam.Ref) % uint64(concurrency)
 					histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, h: sam.H})
 				}
-				for i := 0; i < n; i++ {
+				for i := 0; i < concurrency; i++ {
 					if len(histogramShards[i]) > 0 {
 						processors[i].input <- walSubsetProcessorInputItem{histogramSamples: histogramShards[i]}
 						histogramShards[i] = nil
@@ -388,7 +387,7 @@ Outer:
 				if len(samples) < m {
 					m = len(samples)
 				}
-				for i := 0; i < n; i++ {
+				for i := 0; i < concurrency; i++ {
 					if histogramShards[i] == nil {
 						histogramShards[i] = processors[i].reuseHistogramBuf()
 					}
@@ -400,10 +399,10 @@ Outer:
 					if r, ok := multiRef[sam.Ref]; ok {
 						sam.Ref = r
 					}
-					mod := uint64(sam.Ref) % uint64(n)
+					mod := uint64(sam.Ref) % uint64(concurrency)
 					histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, fh: sam.FH})
 				}
-				for i := 0; i < n; i++ {
+				for i := 0; i < concurrency; i++ {
 					if len(histogramShards[i]) > 0 {
 						processors[i].input <- walSubsetProcessorInputItem{histogramSamples: histogramShards[i]}
 						histogramShards[i] = nil
@@ -444,7 +443,7 @@ Outer:
 	}
 
 	// Signal termination to each worker and wait for it to close its output channel.
-	for i := 0; i < n; i++ {
+	for i := 0; i < concurrency; i++ {
 		processors[i].closeAndDrain()
 	}
 	close(exemplarsInput)
@@ -685,12 +684,12 @@ func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
 	lastSeq, lastOff := lastMmapRef.Unpack()
 	// Start workers that each process samples for a partition of the series ID space.
 	var (
-		wg         sync.WaitGroup
-		n          = runtime.GOMAXPROCS(0)
-		processors = make([]wblSubsetProcessor, n)
+		wg          sync.WaitGroup
+		concurrency = h.opts.WALReplayConcurrency
+		processors  = make([]wblSubsetProcessor, concurrency)
 
 		dec    record.Decoder
-		shards = make([][]record.RefSample, n)
+		shards = make([][]record.RefSample, concurrency)
 
 		decodedCh = make(chan interface{}, 10)
 		decodeErr error
@@ -712,15 +711,15 @@ func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
 		_, ok := err.(*wlog.CorruptionErr)
 		if ok {
 			err = &errLoadWbl{err: err}
-			for i := 0; i < n; i++ {
+			for i := 0; i < concurrency; i++ {
 				processors[i].closeAndDrain()
 			}
 			wg.Wait()
 		}
 	}()
 
-	wg.Add(n)
-	for i := 0; i < n; i++ {
+	wg.Add(concurrency)
+	for i := 0; i < concurrency; i++ {
 		processors[i].setup()
 
 		go func(wp *wblSubsetProcessor) {
@@ -779,17 +778,17 @@ func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
 				if len(samples) < m {
 					m = len(samples)
 				}
-				for i := 0; i < n; i++ {
+				for i := 0; i < concurrency; i++ {
 					shards[i] = processors[i].reuseBuf()
 				}
 				for _, sam := range samples[:m] {
 					if r, ok := multiRef[sam.Ref]; ok {
 						sam.Ref = r
 					}
-					mod := uint64(sam.Ref) % uint64(n)
+					mod := uint64(sam.Ref) % uint64(concurrency)
 					shards[mod] = append(shards[mod], sam)
 				}
-				for i := 0; i < n; i++ {
+				for i := 0; i < concurrency; i++ {
 					processors[i].input <- shards[i]
 				}
 				samples = samples[m:]
@@ -816,7 +815,7 @@ func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
 					mmapMarkerUnknownRefs.Inc()
 					continue
 				}
-				idx := uint64(ms.ref) % uint64(n)
+				idx := uint64(ms.ref) % uint64(concurrency)
 				// It is possible that some old sample is being processed in processWALSamples that
 				// could cause race below. So we wait for the goroutine to empty input the buffer and finish
 				// processing all old samples after emptying the buffer.
@@ -845,7 +844,7 @@ func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
 	}
 
 	// Signal termination to each worker and wait for it to close its output channel.
-	for i := 0; i < n; i++ {
+	for i := 0; i < concurrency; i++ {
 		processors[i].closeAndDrain()
 	}
 	wg.Wait()
@@ -1381,18 +1380,18 @@ func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSerie
 	var (
 		numSeries        = 0
 		unknownRefs      = int64(0)
-		n                = runtime.GOMAXPROCS(0)
+		concurrency      = h.opts.WALReplayConcurrency
 		wg               sync.WaitGroup
-		recordChan       = make(chan chunkSnapshotRecord, 5*n)
-		shardedRefSeries = make([]map[chunks.HeadSeriesRef]*memSeries, n)
-		errChan          = make(chan error, n)
+		recordChan       = make(chan chunkSnapshotRecord, 5*concurrency)
+		shardedRefSeries = make([]map[chunks.HeadSeriesRef]*memSeries, concurrency)
+		errChan          = make(chan error, concurrency)
 		refSeries        map[chunks.HeadSeriesRef]*memSeries
 		exemplarBuf      []record.RefExemplar
 		dec              record.Decoder
 	)
 
-	wg.Add(n)
-	for i := 0; i < n; i++ {
+	wg.Add(concurrency)
+	for i := 0; i < concurrency; i++ {
 		go func(idx int, rc <-chan chunkSnapshotRecord) {
 			defer wg.Done()
 			defer func() {
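
Usage sketch (editorial note, not part of the patch): a minimal example of how a caller might set the new option when opening a TSDB with the patched `Options`. The data directory "./data" and the worker count 4 are arbitrary illustrative values; passing nil for the logger, registerer, and stats simply relies on `tsdb.Open` falling back to its defaults.

	package main

	import (
		"log"

		"github.com/prometheus/prometheus/tsdb"
	)

	func main() {
		opts := tsdb.DefaultOptions()
		// Cap WAL replay at 4 concurrent workers; any value <= 0 falls back to GOMAXPROCS.
		opts.WALReplayConcurrency = 4

		// "./data" is an arbitrary example directory for this sketch.
		db, err := tsdb.Open("./data", nil, nil, opts, nil)
		if err != nil {
			log.Fatalf("opening tsdb: %v", err)
		}
		defer db.Close()
	}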