Making the number of CPUs used for WAL replay configurable (#12066)

Adds `WALReplayConcurrency` as an option on tsdb `Options` and `HeadOptions`.
If it is not set, or is set to a value <= 0, then `GOMAXPROCS` is used, which matches the previous behaviour.
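
As a usage illustration (not part of this commit), a downstream consumer of the `tsdb` package could cap replay concurrency when opening a database. The sketch below is hypothetical; it assumes the `tsdb.DefaultOptions`, `tsdb.Open` and `tsdb.NewDBStats` signatures of this Prometheus version and a go-kit no-op logger:

```go
package example

import (
	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/tsdb"
)

// openDB opens a TSDB whose WAL replay uses at most four worker goroutines.
// Leaving WALReplayConcurrency at zero (or any value <= 0) keeps the previous
// behaviour of replaying with GOMAXPROCS workers.
func openDB(dir string) (*tsdb.DB, error) {
	opts := tsdb.DefaultOptions()
	opts.WALReplayConcurrency = 4

	return tsdb.Open(dir, log.NewNopLogger(), prometheus.NewRegistry(), opts, tsdb.NewDBStats())
}
```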

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>
Đurica Yuri Nikolić, 2023-03-07 17:41:33 +01:00 (committed by GitHub)
commit c9b85afd93, parent ff993b279a
3 changed files with 56 additions and 37 deletions

tsdb/db.go

@@ -125,6 +125,10 @@ type Options struct {
 	// WALCompression will turn on Snappy compression for records on the WAL.
 	WALCompression bool
+	// Maximum number of CPUs that can simultaneously processes WAL replay.
+	// If it is <=0, then GOMAXPROCS is used.
+	WALReplayConcurrency int
 	// StripeSize is the size in entries of the series hash map. Reducing the size will save memory but impact performance.
 	StripeSize int
@@ -782,6 +786,9 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
 	headOpts.EnableNativeHistograms.Store(opts.EnableNativeHistograms)
 	headOpts.OutOfOrderTimeWindow.Store(opts.OutOfOrderTimeWindow)
 	headOpts.OutOfOrderCapMax.Store(opts.OutOfOrderCapMax)
+	if opts.WALReplayConcurrency > 0 {
+		headOpts.WALReplayConcurrency = opts.WALReplayConcurrency
+	}
 	if opts.IsolationDisabled {
 		// We only override this flag if isolation is disabled at DB level. We use the default otherwise.
 		headOpts.IsolationDisabled = opts.IsolationDisabled

tsdb/head.go

@@ -18,6 +18,7 @@ import (
 	"io"
 	"math"
 	"path/filepath"
+	"runtime"
 	"sync"
 	"time"
@ -58,6 +59,8 @@ var (
// defaultIsolationDisabled is true if isolation is disabled by default. // defaultIsolationDisabled is true if isolation is disabled by default.
defaultIsolationDisabled = false defaultIsolationDisabled = false
defaultWALReplayConcurrency = runtime.GOMAXPROCS(0)
) )
// Head handles reads and writes of time series data within a time window. // Head handles reads and writes of time series data within a time window.
@@ -155,6 +158,11 @@ type HeadOptions struct {
 	EnableMemorySnapshotOnShutdown bool
 	IsolationDisabled              bool
+	// Maximum number of CPUs that can simultaneously processes WAL replay.
+	// The default value is GOMAXPROCS.
+	// If it is set to a negative value or zero, the default value is used.
+	WALReplayConcurrency int
 }

 const (
@@ -172,6 +180,7 @@ func DefaultHeadOptions() *HeadOptions {
 		StripeSize:           DefaultStripeSize,
 		SeriesCallback:       &noopSeriesLifecycleCallback{},
 		IsolationDisabled:    defaultIsolationDisabled,
+		WALReplayConcurrency: defaultWALReplayConcurrency,
 	}
 	ho.OutOfOrderCapMax.Store(DefaultOutOfOrderCapMax)
 	return ho
@@ -247,6 +256,10 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal, wbl *wlog.WL, opts *Hea
 		opts.ChunkPool = chunkenc.NewPool()
 	}
+	if opts.WALReplayConcurrency <= 0 {
+		opts.WALReplayConcurrency = defaultWALReplayConcurrency
+	}
 	h.chunkDiskMapper, err = chunks.NewChunkDiskMapper(
 		r,
 		mmappedChunksDir(opts.ChunkDirRoot),

tsdb/head_wal.go

@@ -18,7 +18,6 @@ import (
 	"math"
 	"os"
 	"path/filepath"
-	"runtime"
 	"strconv"
 	"strings"
 	"sync"
@@ -65,13 +64,13 @@ func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
 	// Start workers that each process samples for a partition of the series ID space.
 	var (
 		wg              sync.WaitGroup
-		n               = runtime.GOMAXPROCS(0)
-		processors      = make([]walSubsetProcessor, n)
+		concurrency     = h.opts.WALReplayConcurrency
+		processors      = make([]walSubsetProcessor, concurrency)
 		exemplarsInput  chan record.RefExemplar
 		dec             record.Decoder
-		shards          = make([][]record.RefSample, n)
-		histogramShards = make([][]histogramRecord, n)
+		shards          = make([][]record.RefSample, concurrency)
+		histogramShards = make([][]histogramRecord, concurrency)
 		decoded         = make(chan interface{}, 10)
 		decodeErr, seriesCreationErr error
@@ -116,7 +115,7 @@ func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
 		// For CorruptionErr ensure to terminate all workers before exiting.
 		_, ok := err.(*wlog.CorruptionErr)
 		if ok || seriesCreationErr != nil {
-			for i := 0; i < n; i++ {
+			for i := 0; i < concurrency; i++ {
 				processors[i].closeAndDrain()
 			}
 			close(exemplarsInput)
@@ -124,8 +123,8 @@ func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
 		}
 	}()

-	wg.Add(n)
-	for i := 0; i < n; i++ {
+	wg.Add(concurrency)
+	for i := 0; i < concurrency; i++ {
 		processors[i].setup()

 		go func(wp *walSubsetProcessor) {
@@ -276,7 +275,7 @@ Outer:
 				multiRef[walSeries.Ref] = mSeries.ref
 			}
-			idx := uint64(mSeries.ref) % uint64(n)
+			idx := uint64(mSeries.ref) % uint64(concurrency)
 			processors[idx].input <- walSubsetProcessorInputItem{walSeriesRef: walSeries.Ref, existingSeries: mSeries}
 		}
 		//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
@@ -293,7 +292,7 @@ Outer:
 			if len(samples) < m {
 				m = len(samples)
 			}
-			for i := 0; i < n; i++ {
+			for i := 0; i < concurrency; i++ {
 				if shards[i] == nil {
 					shards[i] = processors[i].reuseBuf()
 				}
@@ -305,10 +304,10 @@ Outer:
 				if r, ok := multiRef[sam.Ref]; ok {
 					sam.Ref = r
 				}
-				mod := uint64(sam.Ref) % uint64(n)
+				mod := uint64(sam.Ref) % uint64(concurrency)
 				shards[mod] = append(shards[mod], sam)
 			}
-			for i := 0; i < n; i++ {
+			for i := 0; i < concurrency; i++ {
 				if len(shards[i]) > 0 {
 					processors[i].input <- walSubsetProcessorInputItem{samples: shards[i]}
 					shards[i] = nil
@@ -351,7 +350,7 @@ Outer:
 			if len(samples) < m {
 				m = len(samples)
 			}
-			for i := 0; i < n; i++ {
+			for i := 0; i < concurrency; i++ {
 				if histogramShards[i] == nil {
 					histogramShards[i] = processors[i].reuseHistogramBuf()
 				}
@@ -363,10 +362,10 @@ Outer:
 				if r, ok := multiRef[sam.Ref]; ok {
 					sam.Ref = r
 				}
-				mod := uint64(sam.Ref) % uint64(n)
+				mod := uint64(sam.Ref) % uint64(concurrency)
 				histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, h: sam.H})
 			}
-			for i := 0; i < n; i++ {
+			for i := 0; i < concurrency; i++ {
 				if len(histogramShards[i]) > 0 {
 					processors[i].input <- walSubsetProcessorInputItem{histogramSamples: histogramShards[i]}
 					histogramShards[i] = nil
@@ -388,7 +387,7 @@ Outer:
 			if len(samples) < m {
 				m = len(samples)
 			}
-			for i := 0; i < n; i++ {
+			for i := 0; i < concurrency; i++ {
 				if histogramShards[i] == nil {
 					histogramShards[i] = processors[i].reuseHistogramBuf()
 				}
@@ -400,10 +399,10 @@ Outer:
 				if r, ok := multiRef[sam.Ref]; ok {
 					sam.Ref = r
 				}
-				mod := uint64(sam.Ref) % uint64(n)
+				mod := uint64(sam.Ref) % uint64(concurrency)
 				histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, fh: sam.FH})
 			}
-			for i := 0; i < n; i++ {
+			for i := 0; i < concurrency; i++ {
 				if len(histogramShards[i]) > 0 {
 					processors[i].input <- walSubsetProcessorInputItem{histogramSamples: histogramShards[i]}
 					histogramShards[i] = nil
@@ -444,7 +443,7 @@ Outer:
 	}

 	// Signal termination to each worker and wait for it to close its output channel.
-	for i := 0; i < n; i++ {
+	for i := 0; i < concurrency; i++ {
 		processors[i].closeAndDrain()
 	}
 	close(exemplarsInput)
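
For readers unfamiliar with the replay path: the loops above dispatch each record to the worker chosen by the series reference modulo the worker count, so all samples of a given series are always handled by the same goroutine. The following self-contained sketch (hypothetical types, not code from this commit) illustrates that sharding pattern in isolation:

```go
package example

import (
	"fmt"
	"sync"
)

// sample is a stand-in for the record types replayed from the WAL.
type sample struct {
	ref uint64 // series reference
	t   int64  // timestamp
	v   float64
}

// replay fans samples out to `concurrency` workers. A series reference is
// always routed to the same worker (ref % concurrency), which lets each
// worker own a disjoint partition of the series ID space without locking.
func replay(samples []sample, concurrency int) {
	shards := make([]chan sample, concurrency)
	var wg sync.WaitGroup
	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		shards[i] = make(chan sample, 128)
		go func(idx int, in <-chan sample) {
			defer wg.Done()
			for s := range in {
				// A real worker would append to the in-memory series here.
				fmt.Printf("worker %d: series %d t=%d v=%g\n", idx, s.ref, s.t, s.v)
			}
		}(i, shards[i])
	}
	for _, s := range samples {
		shards[s.ref%uint64(concurrency)] <- s
	}
	for i := 0; i < concurrency; i++ {
		close(shards[i])
	}
	wg.Wait()
}
```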
@@ -685,12 +684,12 @@ func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
 	lastSeq, lastOff := lastMmapRef.Unpack()
 	// Start workers that each process samples for a partition of the series ID space.
 	var (
 		wg          sync.WaitGroup
-		n           = runtime.GOMAXPROCS(0)
-		processors  = make([]wblSubsetProcessor, n)
+		concurrency = h.opts.WALReplayConcurrency
+		processors  = make([]wblSubsetProcessor, concurrency)
 		dec         record.Decoder
-		shards      = make([][]record.RefSample, n)
+		shards      = make([][]record.RefSample, concurrency)
 		decodedCh   = make(chan interface{}, 10)
 		decodeErr   error
@@ -712,15 +711,15 @@ func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
 		_, ok := err.(*wlog.CorruptionErr)
 		if ok {
 			err = &errLoadWbl{err: err}
-			for i := 0; i < n; i++ {
+			for i := 0; i < concurrency; i++ {
 				processors[i].closeAndDrain()
 			}
 			wg.Wait()
 		}
 	}()

-	wg.Add(n)
-	for i := 0; i < n; i++ {
+	wg.Add(concurrency)
+	for i := 0; i < concurrency; i++ {
 		processors[i].setup()

 		go func(wp *wblSubsetProcessor) {
@@ -779,17 +778,17 @@ func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
 			if len(samples) < m {
 				m = len(samples)
 			}
-			for i := 0; i < n; i++ {
+			for i := 0; i < concurrency; i++ {
 				shards[i] = processors[i].reuseBuf()
 			}
 			for _, sam := range samples[:m] {
 				if r, ok := multiRef[sam.Ref]; ok {
 					sam.Ref = r
 				}
-				mod := uint64(sam.Ref) % uint64(n)
+				mod := uint64(sam.Ref) % uint64(concurrency)
 				shards[mod] = append(shards[mod], sam)
 			}
-			for i := 0; i < n; i++ {
+			for i := 0; i < concurrency; i++ {
 				processors[i].input <- shards[i]
 			}
 			samples = samples[m:]
@@ -816,7 +815,7 @@ func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
 				mmapMarkerUnknownRefs.Inc()
 				continue
 			}
-			idx := uint64(ms.ref) % uint64(n)
+			idx := uint64(ms.ref) % uint64(concurrency)
 			// It is possible that some old sample is being processed in processWALSamples that
 			// could cause race below. So we wait for the goroutine to empty input the buffer and finish
 			// processing all old samples after emptying the buffer.
@@ -845,7 +844,7 @@ func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
 	}

 	// Signal termination to each worker and wait for it to close its output channel.
-	for i := 0; i < n; i++ {
+	for i := 0; i < concurrency; i++ {
 		processors[i].closeAndDrain()
 	}
 	wg.Wait()
@@ -1381,18 +1380,18 @@ func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSerie
 	var (
 		numSeries        = 0
 		unknownRefs      = int64(0)
-		n                = runtime.GOMAXPROCS(0)
+		concurrency      = h.opts.WALReplayConcurrency
 		wg               sync.WaitGroup
-		recordChan       = make(chan chunkSnapshotRecord, 5*n)
-		shardedRefSeries = make([]map[chunks.HeadSeriesRef]*memSeries, n)
-		errChan          = make(chan error, n)
+		recordChan       = make(chan chunkSnapshotRecord, 5*concurrency)
+		shardedRefSeries = make([]map[chunks.HeadSeriesRef]*memSeries, concurrency)
+		errChan          = make(chan error, concurrency)
 		refSeries        map[chunks.HeadSeriesRef]*memSeries
 		exemplarBuf      []record.RefExemplar
 		dec              record.Decoder
 	)

-	wg.Add(n)
-	for i := 0; i < n; i++ {
+	wg.Add(concurrency)
+	for i := 0; i < concurrency; i++ {
 		go func(idx int, rc <-chan chunkSnapshotRecord) {
 			defer wg.Done()
 			defer func() {