diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 57c21c6d8..c883ab47e 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -337,6 +337,9 @@ func main() { serverOnlyFlag(a, "storage.tsdb.head-chunks-write-queue-size", "Size of the queue through which head chunks are written to the disk to be m-mapped, 0 disables the queue completely. Experimental."). Default("0").IntVar(&cfg.tsdb.HeadChunksWriteQueueSize) + serverOnlyFlag(a, "storage.tsdb.samples-per-chunk", "Target number of samples per chunk."). + Default("120").Hidden().IntVar(&cfg.tsdb.SamplesPerChunk) + agentOnlyFlag(a, "storage.agent.path", "Base path for metrics storage."). Default("data-agent/").StringVar(&cfg.agentStoragePath) @@ -1543,6 +1546,7 @@ type tsdbOptions struct { NoLockfile bool WALCompression bool HeadChunksWriteQueueSize int + SamplesPerChunk int StripeSize int MinBlockDuration model.Duration MaxBlockDuration model.Duration @@ -1563,6 +1567,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { AllowOverlappingCompaction: true, WALCompression: opts.WALCompression, HeadChunksWriteQueueSize: opts.HeadChunksWriteQueueSize, + SamplesPerChunk: opts.SamplesPerChunk, StripeSize: opts.StripeSize, MinBlockDuration: int64(time.Duration(opts.MinBlockDuration) / time.Millisecond), MaxBlockDuration: int64(time.Duration(opts.MaxBlockDuration) / time.Millisecond), diff --git a/tsdb/db.go b/tsdb/db.go index 61333c972..0cd00c2c5 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -78,6 +78,7 @@ func DefaultOptions() *Options { NoLockfile: false, AllowOverlappingCompaction: true, WALCompression: false, + SamplesPerChunk: DefaultSamplesPerChunk, StripeSize: DefaultStripeSize, HeadChunksWriteBufferSize: chunks.DefaultWriteBufferSize, IsolationDisabled: defaultIsolationDisabled, @@ -149,6 +150,9 @@ type Options struct { // HeadChunksWriteQueueSize configures the size of the chunk write queue used in the head chunks mapper. 
HeadChunksWriteQueueSize int + // SamplesPerChunk configures the target number of samples per chunk. + SamplesPerChunk int + // SeriesLifecycleCallback specifies a list of callbacks that will be called during a lifecycle of a series. // It is always a no-op in Prometheus and mainly meant for external users who import TSDB. SeriesLifecycleCallback SeriesLifecycleCallback @@ -778,6 +782,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs headOpts.ChunkPool = db.chunkPool headOpts.ChunkWriteBufferSize = opts.HeadChunksWriteBufferSize headOpts.ChunkWriteQueueSize = opts.HeadChunksWriteQueueSize + headOpts.SamplesPerChunk = opts.SamplesPerChunk headOpts.StripeSize = opts.StripeSize headOpts.SeriesCallback = opts.SeriesLifecycleCallback headOpts.EnableExemplarStorage = opts.EnableExemplarStorage diff --git a/tsdb/head.go b/tsdb/head.go index f839adb72..a3c135300 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -150,6 +150,8 @@ type HeadOptions struct { ChunkWriteBufferSize int ChunkWriteQueueSize int + SamplesPerChunk int + // StripeSize sets the number of entries in the hash map, it must be a power of 2. // A larger StripeSize will allocate more memory up-front, but will increase performance when handling a large number of series. // A smaller StripeSize reduces the memory allocated, but can decrease performance with large number of series. @@ -169,6 +171,8 @@ type HeadOptions struct { const ( // DefaultOutOfOrderCapMax is the default maximum size of an in-memory out-of-order chunk. DefaultOutOfOrderCapMax int64 = 32 + // DefaultSamplesPerChunk provides a default target number of samples per chunk. Based on the Gorilla white paper, 120 offers a near-optimal compression ratio; anything bigger has diminishing returns and increases the time range within which all samples must be decompressed.
+ DefaultSamplesPerChunk = 120 ) func DefaultHeadOptions() *HeadOptions { @@ -178,6 +182,7 @@ func DefaultHeadOptions() *HeadOptions { ChunkPool: chunkenc.NewPool(), ChunkWriteBufferSize: chunks.DefaultWriteBufferSize, ChunkWriteQueueSize: chunks.DefaultWriteQueueSize, + SamplesPerChunk: DefaultSamplesPerChunk, StripeSize: DefaultStripeSize, SeriesCallback: &noopSeriesLifecycleCallback{}, IsolationDisabled: defaultIsolationDisabled, @@ -1607,7 +1612,7 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, e func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels) (*memSeries, bool, error) { s, created, err := h.series.getOrSet(hash, lset, func() *memSeries { - return newMemSeries(lset, id, h.opts.IsolationDisabled) + return newMemSeries(lset, id, h.opts.IsolationDisabled, h.opts.SamplesPerChunk) }) if err != nil { return nil, false, err @@ -1915,7 +1920,8 @@ type memSeries struct { mmMaxTime int64 // Max time of any mmapped chunk, only used during WAL replay. - nextAt int64 // Timestamp at which to cut the next chunk. + samplesPerChunk int // Target number of samples per chunk. + nextAt int64 // Timestamp at which to cut the next chunk. // We keep the last value here (in addition to appending it to the chunk) so we can check for duplicates. lastValue float64 @@ -1943,11 +1949,12 @@ type memSeriesOOOFields struct { firstOOOChunkID chunks.HeadChunkID // HeadOOOChunkID for oooMmappedChunks[0]. 
} -func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, isolationDisabled bool) *memSeries { +func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, isolationDisabled bool, samplesPerChunk int) *memSeries { s := &memSeries{ - lset: lset, - ref: id, - nextAt: math.MinInt64, + lset: lset, + ref: id, + nextAt: math.MinInt64, + samplesPerChunk: samplesPerChunk, } if !isolationDisabled { s.txs = newTxRing(4) diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 86cb09751..2af01a2d6 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -1332,11 +1332,6 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, func (s *memSeries) appendPreprocessor( t int64, e chunkenc.Encoding, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64, ) (c *memChunk, sampleInOrder, chunkCreated bool) { - // Based on Gorilla white papers this offers near-optimal compression ratio - // so anything bigger that this has diminishing returns and increases - // the time range within which we have to decompress all samples. - const samplesPerChunk = 120 - c = s.head() if c == nil { @@ -1373,7 +1368,7 @@ func (s *memSeries) appendPreprocessor( // for this chunk that will try to make samples equally distributed within // the remaining chunks in the current chunk range. // At latest it must happen at the timestamp set when the chunk was cut. - if numSamples == samplesPerChunk/4 { + if numSamples == s.samplesPerChunk/4 { s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt) } // If numSamples > samplesPerChunk*2 then our previous prediction was invalid, @@ -1381,7 +1376,7 @@ func (s *memSeries) appendPreprocessor( // Since we assume that the rate is higher, we're being conservative and cutting at 2*samplesPerChunk // as we expect more chunks to come. // Note that next chunk will have its nextAt recalculated for the new rate. 
- if t >= s.nextAt || numSamples >= samplesPerChunk*2 { + if t >= s.nextAt || numSamples >= s.samplesPerChunk*2 { c = s.cutNewHeadChunk(t, e, chunkDiskMapper, chunkRange) chunkCreated = true } diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 45b587405..af3df378e 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -285,7 +285,7 @@ func BenchmarkLoadWAL(b *testing.B) { require.NoError(b, err) for k := 0; k < c.batches*c.seriesPerBatch; k++ { // Create one mmapped chunk per series, with one sample at the given time. - s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, defaultIsolationDisabled) + s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, defaultIsolationDisabled, DefaultSamplesPerChunk) s.append(c.mmappedChunkT, 42, 0, chunkDiskMapper, c.mmappedChunkT) s.mmapCurrentHeadChunk(chunkDiskMapper) } @@ -807,7 +807,7 @@ func TestMemSeries_truncateChunks(t *testing.T) { }, } - s := newMemSeries(labels.FromStrings("a", "b"), 1, defaultIsolationDisabled) + s := newMemSeries(labels.FromStrings("a", "b"), 1, defaultIsolationDisabled, DefaultSamplesPerChunk) for i := 0; i < 4000; i += 5 { ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper, chunkRange) @@ -1338,7 +1338,7 @@ func TestMemSeries_append(t *testing.T) { }() const chunkRange = 500 - s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled) + s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled, DefaultSamplesPerChunk) // Add first two samples at the very end of a chunk range and the next two // on and after it. 
@@ -1392,7 +1392,7 @@ func TestMemSeries_appendHistogram(t *testing.T) { }() chunkRange := int64(1000) - s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled) + s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled, DefaultSamplesPerChunk) histograms := tsdbutil.GenerateTestHistograms(4) histogramWithOneMoreBucket := histograms[3].Copy() @@ -1448,7 +1448,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) { }) chunkRange := DefaultBlockDuration - s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled) + s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled, DefaultSamplesPerChunk) // At this slow rate, we will fill the chunk in two block durations. slowRate := (DefaultBlockDuration * 2) / samplesPerChunk @@ -2610,7 +2610,7 @@ func TestIteratorSeekIntoBuffer(t *testing.T) { }() const chunkRange = 500 - s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled) + s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled, DefaultSamplesPerChunk) for i := 0; i < 7; i++ { ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper, chunkRange)