From 4c4454e4c9e12ad08e765335ba11199f296ad986 Mon Sep 17 00:00:00 2001 From: Justin Lei Date: Thu, 25 May 2023 13:12:32 -0700 Subject: [PATCH 1/2] Group args to append to memSeries in chunkOpts Signed-off-by: Justin Lei --- tsdb/head_append.go | 37 ++++++++++----- tsdb/head_test.go | 113 +++++++++++++++++++++++++++++++------------- tsdb/head_wal.go | 12 +++-- 3 files changed, 113 insertions(+), 49 deletions(-) diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 44847dceb2..a77c8a4ebc 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -881,9 +881,13 @@ func (a *headAppender) Commit() (err error) { oooMmapMarkers map[chunks.HeadSeriesRef]chunks.ChunkDiskMapperRef oooRecords [][]byte oooCapMax = a.head.opts.OutOfOrderCapMax.Load() - chunkRange = a.head.chunkRange.Load() series *memSeries - enc record.Encoder + appendChunkOpts = chunkOpts{ + chunkDiskMapper: a.head.chunkDiskMapper, + chunkRange: a.head.chunkRange.Load(), + samplesPerChunk: a.head.opts.SamplesPerChunk, + } + enc record.Encoder ) defer func() { for i := range oooRecords { @@ -987,7 +991,7 @@ func (a *headAppender) Commit() (err error) { samplesAppended-- } default: - ok, chunkCreated = series.append(s.T, s.V, a.appendID, a.head.chunkDiskMapper, chunkRange, a.head.opts.SamplesPerChunk) + ok, chunkCreated = series.append(s.T, s.V, a.appendID, appendChunkOpts) if ok { if s.T < inOrderMint { inOrderMint = s.T @@ -1016,7 +1020,7 @@ func (a *headAppender) Commit() (err error) { for i, s := range a.histograms { series = a.histogramSeries[i] series.Lock() - ok, chunkCreated := series.appendHistogram(s.T, s.H, a.appendID, a.head.chunkDiskMapper, chunkRange, a.head.opts.SamplesPerChunk) + ok, chunkCreated := series.appendHistogram(s.T, s.H, a.appendID, appendChunkOpts) series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) series.pendingCommit = false series.Unlock() @@ -1042,7 +1046,7 @@ func (a *headAppender) Commit() (err error) { for i, s := range a.floatHistograms { series = a.floatHistogramSeries[i] series.Lock() - ok, chunkCreated := series.appendFloatHistogram(s.T, s.FH, a.appendID, a.head.chunkDiskMapper, chunkRange, a.head.opts.SamplesPerChunk) + ok, chunkCreated := series.appendFloatHistogram(s.T, s.FH, a.appendID, appendChunkOpts) series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) series.pendingCommit = false series.Unlock() @@ -1118,12 +1122,19 @@ func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDisk return ok, chunkCreated, mmapRef } +// chunkOpts are chunk-level options that are passed when appending to a memSeries. +type chunkOpts struct { + chunkDiskMapper *chunks.ChunkDiskMapper + chunkRange int64 + samplesPerChunk int +} + // append adds the sample (t, v) to the series. The caller also has to provide // the appendID for isolation. (The appendID can be zero, which results in no // isolation for this append.) // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. 
-func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64, samplesPerChunk int) (sampleInOrder, chunkCreated bool) { - c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncXOR, chunkDiskMapper, chunkRange, samplesPerChunk) +func (s *memSeries) append(t int64, v float64, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) { + c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncXOR, o.chunkDiskMapper, o.chunkRange, o.samplesPerChunk) if !sampleInOrder { return sampleInOrder, chunkCreated } @@ -1144,7 +1155,7 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper // appendHistogram adds the histogram. // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. -func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64, samplesPerChunk int) (sampleInOrder, chunkCreated bool) { +func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) { // Head controls the execution of recoding, so that we own the proper // chunk reference afterwards. We check for Appendable from appender before // appendPreprocessor because in case it ends up creating a new chunk, @@ -1157,7 +1168,7 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui pMergedSpans, nMergedSpans []histogram.Span okToAppend, counterReset, gauge bool ) - c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncHistogram, chunkDiskMapper, chunkRange, samplesPerChunk) + c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncHistogram, o.chunkDiskMapper, o.chunkRange, o.samplesPerChunk) if !sampleInOrder { return sampleInOrder, chunkCreated } @@ -1193,7 +1204,7 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui // - okToAppend and no inserts → Chunk is ready to support our histogram. switch { case !okToAppend || counterReset: - c = s.cutNewHeadChunk(t, chunkenc.EncHistogram, chunkDiskMapper, chunkRange) + c = s.cutNewHeadChunk(t, chunkenc.EncHistogram, o.chunkDiskMapper, o.chunkRange) chunkCreated = true case len(pForwardInserts) > 0 || len(nForwardInserts) > 0: // New buckets have appeared. We need to recode all @@ -1238,7 +1249,7 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui // appendFloatHistogram adds the float histogram. // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. -func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64, samplesPerChunk int) (sampleInOrder, chunkCreated bool) { +func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) { // Head controls the execution of recoding, so that we own the proper // chunk reference afterwards. 
We check for Appendable from appender before // appendPreprocessor because in case it ends up creating a new chunk, @@ -1251,7 +1262,7 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, pMergedSpans, nMergedSpans []histogram.Span okToAppend, counterReset, gauge bool ) - c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncFloatHistogram, chunkDiskMapper, chunkRange, samplesPerChunk) + c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncFloatHistogram, o.chunkDiskMapper, o.chunkRange, o.samplesPerChunk) if !sampleInOrder { return sampleInOrder, chunkCreated } @@ -1287,7 +1298,7 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, // - okToAppend and no inserts → Chunk is ready to support our histogram. switch { case !okToAppend || counterReset: - c = s.cutNewHeadChunk(t, chunkenc.EncFloatHistogram, chunkDiskMapper, chunkRange) + c = s.cutNewHeadChunk(t, chunkenc.EncFloatHistogram, o.chunkDiskMapper, o.chunkRange) chunkCreated = true case len(pForwardInserts) > 0 || len(nForwardInserts) > 0: // New buckets have appeared. We need to recode all diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 14468e0716..8eb218b5ac 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -283,10 +283,15 @@ func BenchmarkLoadWAL(b *testing.B) { if c.mmappedChunkT != 0 { chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, mmappedChunksDir(dir), chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize) require.NoError(b, err) + cOpts := chunkOpts{ + chunkDiskMapper: chunkDiskMapper, + chunkRange: c.mmappedChunkT, + samplesPerChunk: DefaultSamplesPerChunk, + } for k := 0; k < c.batches*c.seriesPerBatch; k++ { // Create one mmapped chunk per series, with one sample at the given time. s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, defaultIsolationDisabled) - s.append(c.mmappedChunkT, 42, 0, chunkDiskMapper, c.mmappedChunkT, DefaultSamplesPerChunk) + s.append(c.mmappedChunkT, 42, 0, cOpts) s.mmapCurrentHeadChunk(chunkDiskMapper) } require.NoError(b, chunkDiskMapper.Close()) @@ -799,7 +804,11 @@ func TestMemSeries_truncateChunks(t *testing.T) { defer func() { require.NoError(t, chunkDiskMapper.Close()) }() - const chunkRange = 2000 + cOpts := chunkOpts{ + chunkDiskMapper: chunkDiskMapper, + chunkRange: 2000, + samplesPerChunk: DefaultSamplesPerChunk, + } memChunkPool := sync.Pool{ New: func() interface{} { @@ -810,7 +819,7 @@ func TestMemSeries_truncateChunks(t *testing.T) { s := newMemSeries(labels.FromStrings("a", "b"), 1, defaultIsolationDisabled) for i := 0; i < 4000; i += 5 { - ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) + ok, _ := s.append(int64(i), float64(i), 0, cOpts) require.True(t, ok, "sample append failed") } @@ -1336,26 +1345,30 @@ func TestMemSeries_append(t *testing.T) { defer func() { require.NoError(t, chunkDiskMapper.Close()) }() - const chunkRange = 500 + cOpts := chunkOpts{ + chunkDiskMapper: chunkDiskMapper, + chunkRange: 500, + samplesPerChunk: DefaultSamplesPerChunk, + } s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled) // Add first two samples at the very end of a chunk range and the next two // on and after it. // New chunk must correctly be cut at 1000. 
- ok, chunkCreated := s.append(998, 1, 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) + ok, chunkCreated := s.append(998, 1, 0, cOpts) require.True(t, ok, "append failed") require.True(t, chunkCreated, "first sample created chunk") - ok, chunkCreated = s.append(999, 2, 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) + ok, chunkCreated = s.append(999, 2, 0, cOpts) require.True(t, ok, "append failed") require.False(t, chunkCreated, "second sample should use same chunk") - ok, chunkCreated = s.append(1000, 3, 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) + ok, chunkCreated = s.append(1000, 3, 0, cOpts) require.True(t, ok, "append failed") require.True(t, chunkCreated, "expected new chunk on boundary") - ok, chunkCreated = s.append(1001, 4, 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) + ok, chunkCreated = s.append(1001, 4, 0, cOpts) require.True(t, ok, "append failed") require.False(t, chunkCreated, "second sample should use same chunk") @@ -1368,7 +1381,7 @@ func TestMemSeries_append(t *testing.T) { // Fill the range [1000,2000) with many samples. Intermediate chunks should be cut // at approximately 120 samples per chunk. for i := 1; i < 1000; i++ { - ok, _ := s.append(1001+int64(i), float64(i), 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) + ok, _ := s.append(1001+int64(i), float64(i), 0, cOpts) require.True(t, ok, "append failed") } @@ -1390,7 +1403,11 @@ func TestMemSeries_appendHistogram(t *testing.T) { defer func() { require.NoError(t, chunkDiskMapper.Close()) }() - chunkRange := int64(1000) + cOpts := chunkOpts{ + chunkDiskMapper: chunkDiskMapper, + chunkRange: int64(1000), + samplesPerChunk: DefaultSamplesPerChunk, + } s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled) @@ -1404,19 +1421,19 @@ func TestMemSeries_appendHistogram(t *testing.T) { // Add first two samples at the very end of a chunk range and the next two // on and after it. // New chunk must correctly be cut at 1000. 
- ok, chunkCreated := s.appendHistogram(998, histograms[0], 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) + ok, chunkCreated := s.appendHistogram(998, histograms[0], 0, cOpts) require.True(t, ok, "append failed") require.True(t, chunkCreated, "first sample created chunk") - ok, chunkCreated = s.appendHistogram(999, histograms[1], 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) + ok, chunkCreated = s.appendHistogram(999, histograms[1], 0, cOpts) require.True(t, ok, "append failed") require.False(t, chunkCreated, "second sample should use same chunk") - ok, chunkCreated = s.appendHistogram(1000, histograms[2], 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) + ok, chunkCreated = s.appendHistogram(1000, histograms[2], 0, cOpts) require.True(t, ok, "append failed") require.True(t, chunkCreated, "expected new chunk on boundary") - ok, chunkCreated = s.appendHistogram(1001, histograms[3], 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) + ok, chunkCreated = s.appendHistogram(1001, histograms[3], 0, cOpts) require.True(t, ok, "append failed") require.False(t, chunkCreated, "second sample should use same chunk") @@ -1426,7 +1443,7 @@ func TestMemSeries_appendHistogram(t *testing.T) { require.Equal(t, int64(1000), s.headChunk.minTime, "wrong chunk range") require.Equal(t, int64(1001), s.headChunk.maxTime, "wrong chunk range") - ok, chunkCreated = s.appendHistogram(1002, histogramWithOneMoreBucket, 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) + ok, chunkCreated = s.appendHistogram(1002, histogramWithOneMoreBucket, 0, cOpts) require.True(t, ok, "append failed") require.False(t, chunkCreated, "third sample should trigger a re-encoded chunk") @@ -1446,7 +1463,11 @@ func TestMemSeries_append_atVariableRate(t *testing.T) { t.Cleanup(func() { require.NoError(t, chunkDiskMapper.Close()) }) - chunkRange := DefaultBlockDuration + cOpts := chunkOpts{ + chunkDiskMapper: chunkDiskMapper, + chunkRange: DefaultBlockDuration, + samplesPerChunk: samplesPerChunk, + } s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled) @@ -1456,7 +1477,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) { var nextTs int64 var totalAppendedSamples int for i := 0; i < samplesPerChunk/4; i++ { - ok, _ := s.append(nextTs, float64(i), 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) + ok, _ := s.append(nextTs, float64(i), 0, cOpts) require.Truef(t, ok, "slow sample %d was not appended", i) nextTs += slowRate totalAppendedSamples++ @@ -1465,12 +1486,12 @@ func TestMemSeries_append_atVariableRate(t *testing.T) { // Suddenly, the rate increases and we receive a sample every millisecond. 
for i := 0; i < math.MaxUint16; i++ { - ok, _ := s.append(nextTs, float64(i), 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) + ok, _ := s.append(nextTs, float64(i), 0, cOpts) require.Truef(t, ok, "quick sample %d was not appended", i) nextTs++ totalAppendedSamples++ } - ok, chunkCreated := s.append(DefaultBlockDuration, float64(0), 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) + ok, chunkCreated := s.append(DefaultBlockDuration, float64(0), 0, cOpts) require.True(t, ok, "new chunk sample was not appended") require.True(t, chunkCreated, "sample at block duration timestamp should create a new chunk") @@ -1490,23 +1511,29 @@ func TestGCChunkAccess(t *testing.T) { require.NoError(t, h.Close()) }() + cOpts := chunkOpts{ + chunkDiskMapper: h.chunkDiskMapper, + chunkRange: chunkRange, + samplesPerChunk: DefaultSamplesPerChunk, + } + h.initTime(0) s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) // Appending 2 samples for the first chunk. - ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) + ok, chunkCreated := s.append(0, 0, 0, cOpts) require.True(t, ok, "series append failed") require.True(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) + ok, chunkCreated = s.append(999, 999, 0, cOpts) require.True(t, ok, "series append failed") require.False(t, chunkCreated, "chunks was created") // A new chunks should be created here as it's beyond the chunk range. - ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) + ok, chunkCreated = s.append(1000, 1000, 0, cOpts) require.True(t, ok, "series append failed") require.True(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) + ok, chunkCreated = s.append(1999, 1999, 0, cOpts) require.True(t, ok, "series append failed") require.False(t, chunkCreated, "chunks was created") @@ -1543,23 +1570,29 @@ func TestGCSeriesAccess(t *testing.T) { require.NoError(t, h.Close()) }() + cOpts := chunkOpts{ + chunkDiskMapper: h.chunkDiskMapper, + chunkRange: chunkRange, + samplesPerChunk: DefaultSamplesPerChunk, + } + h.initTime(0) s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) // Appending 2 samples for the first chunk. - ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) + ok, chunkCreated := s.append(0, 0, 0, cOpts) require.True(t, ok, "series append failed") require.True(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) + ok, chunkCreated = s.append(999, 999, 0, cOpts) require.True(t, ok, "series append failed") require.False(t, chunkCreated, "chunks was created") // A new chunks should be created here as it's beyond the chunk range. 
- ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) + ok, chunkCreated = s.append(1000, 1000, 0, cOpts) require.True(t, ok, "series append failed") require.True(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) + ok, chunkCreated = s.append(1999, 1999, 0, cOpts) require.True(t, ok, "series append failed") require.False(t, chunkCreated, "chunks was created") @@ -1791,14 +1824,20 @@ func TestHeadReadWriterRepair(t *testing.T) { require.Equal(t, 0.0, prom_testutil.ToFloat64(h.metrics.mmapChunkCorruptionTotal)) require.NoError(t, h.Init(math.MinInt64)) + cOpts := chunkOpts{ + chunkDiskMapper: h.chunkDiskMapper, + chunkRange: chunkRange, + samplesPerChunk: DefaultSamplesPerChunk, + } + s, created, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) require.True(t, created, "series was not created") for i := 0; i < 7; i++ { - ok, chunkCreated := s.append(int64(i*chunkRange), float64(i*chunkRange), 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) + ok, chunkCreated := s.append(int64(i*chunkRange), float64(i*chunkRange), 0, cOpts) require.True(t, ok, "series append failed") require.True(t, chunkCreated, "chunk was not created") - ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) + ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, cOpts) require.True(t, ok, "series append failed") require.False(t, chunkCreated, "chunk was created") h.chunkDiskMapper.CutNewFile() @@ -2144,9 +2183,15 @@ func TestIsolationAppendIDZeroIsNoop(t *testing.T) { h.initTime(0) + cOpts := chunkOpts{ + chunkDiskMapper: h.chunkDiskMapper, + chunkRange: h.chunkRange.Load(), + samplesPerChunk: DefaultSamplesPerChunk, + } + s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) - ok, _ := s.append(0, 0, 0, h.chunkDiskMapper, h.chunkRange.Load(), DefaultSamplesPerChunk) + ok, _ := s.append(0, 0, 0, cOpts) require.True(t, ok, "Series append failed.") require.Equal(t, 0, s.txs.txIDCount, "Series should not have an appendID after append with appendID=0.") } @@ -2608,12 +2653,16 @@ func TestIteratorSeekIntoBuffer(t *testing.T) { defer func() { require.NoError(t, chunkDiskMapper.Close()) }() - const chunkRange = 500 + cOpts := chunkOpts{ + chunkDiskMapper: chunkDiskMapper, + chunkRange: 500, + samplesPerChunk: DefaultSamplesPerChunk, + } s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled) for i := 0; i < 7; i++ { - ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) + ok, _ := s.append(int64(i), float64(i), 0, cOpts) require.True(t, ok, "sample append failed") } diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 9741d1da04..71120c55e1 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -564,7 +564,11 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp minValidTime := h.minValidTime.Load() mint, maxt := int64(math.MaxInt64), int64(math.MinInt64) - chunkRange := h.chunkRange.Load() + appendChunkOpts := chunkOpts{ + chunkDiskMapper: h.chunkDiskMapper, + chunkRange: h.chunkRange.Load(), + samplesPerChunk: h.opts.SamplesPerChunk, + } for in := range wp.input { if in.existingSeries != nil { @@ -588,7 +592,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp if s.T <= ms.mmMaxTime { continue } - if _, chunkCreated := ms.append(s.T, s.V, 
0, h.chunkDiskMapper, chunkRange, h.opts.SamplesPerChunk); chunkCreated { + if _, chunkCreated := ms.append(s.T, s.V, 0, appendChunkOpts); chunkCreated { h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc() } @@ -618,9 +622,9 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp } var chunkCreated bool if s.h != nil { - _, chunkCreated = ms.appendHistogram(s.t, s.h, 0, h.chunkDiskMapper, chunkRange, h.opts.SamplesPerChunk) + _, chunkCreated = ms.appendHistogram(s.t, s.h, 0, appendChunkOpts) } else { - _, chunkCreated = ms.appendFloatHistogram(s.t, s.fh, 0, h.chunkDiskMapper, chunkRange, h.opts.SamplesPerChunk) + _, chunkCreated = ms.appendFloatHistogram(s.t, s.fh, 0, appendChunkOpts) } if chunkCreated { h.metrics.chunksCreated.Inc() From e73d8b208408cf5566541a68c610776935f8789d Mon Sep 17 00:00:00 2001 From: Justin Lei Date: Thu, 25 May 2023 13:35:09 -0700 Subject: [PATCH 2/2] Also pass chunkOpts into appendPreprocessor Signed-off-by: Justin Lei --- tsdb/head_append.go | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/tsdb/head_append.go b/tsdb/head_append.go index a77c8a4ebc..3d828d0667 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -1134,7 +1134,7 @@ type chunkOpts struct { // isolation for this append.) // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. func (s *memSeries) append(t int64, v float64, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) { - c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncXOR, o.chunkDiskMapper, o.chunkRange, o.samplesPerChunk) + c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncXOR, o) if !sampleInOrder { return sampleInOrder, chunkCreated } @@ -1168,7 +1168,7 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui pMergedSpans, nMergedSpans []histogram.Span okToAppend, counterReset, gauge bool ) - c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncHistogram, o.chunkDiskMapper, o.chunkRange, o.samplesPerChunk) + c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncHistogram, o) if !sampleInOrder { return sampleInOrder, chunkCreated } @@ -1262,7 +1262,7 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, pMergedSpans, nMergedSpans []histogram.Span okToAppend, counterReset, gauge bool ) - c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncFloatHistogram, o.chunkDiskMapper, o.chunkRange, o.samplesPerChunk) + c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncFloatHistogram, o) if !sampleInOrder { return sampleInOrder, chunkCreated } @@ -1344,9 +1344,7 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, // appendPreprocessor takes care of cutting new chunks and m-mapping old chunks. // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. // This should be called only when appending data. 
-func (s *memSeries) appendPreprocessor( - t int64, e chunkenc.Encoding, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64, samplesPerChunk int, -) (c *memChunk, sampleInOrder, chunkCreated bool) { +func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts) (c *memChunk, sampleInOrder, chunkCreated bool) { c = s.head() if c == nil { @@ -1355,7 +1353,7 @@ func (s *memSeries) appendPreprocessor( return c, false, false } // There is no head chunk in this series yet, create the first chunk for the sample. - c = s.cutNewHeadChunk(t, e, chunkDiskMapper, chunkRange) + c = s.cutNewHeadChunk(t, e, o.chunkDiskMapper, o.chunkRange) chunkCreated = true } @@ -1367,7 +1365,7 @@ func (s *memSeries) appendPreprocessor( if c.chunk.Encoding() != e { // The chunk encoding expected by this append is different than the head chunk's // encoding. So we cut a new chunk with the expected encoding. - c = s.cutNewHeadChunk(t, e, chunkDiskMapper, chunkRange) + c = s.cutNewHeadChunk(t, e, o.chunkDiskMapper, o.chunkRange) chunkCreated = true } @@ -1376,14 +1374,14 @@ func (s *memSeries) appendPreprocessor( // It could be the new chunk created after reading the chunk snapshot, // hence we fix the minTime of the chunk here. c.minTime = t - s.nextAt = rangeForTimestamp(c.minTime, chunkRange) + s.nextAt = rangeForTimestamp(c.minTime, o.chunkRange) } // If we reach 25% of a chunk's desired sample count, predict an end time // for this chunk that will try to make samples equally distributed within // the remaining chunks in the current chunk range. // At latest it must happen at the timestamp set when the chunk was cut. - if numSamples == samplesPerChunk/4 { + if numSamples == o.samplesPerChunk/4 { s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt) } // If numSamples > samplesPerChunk*2 then our previous prediction was invalid, @@ -1391,8 +1389,8 @@ func (s *memSeries) appendPreprocessor( // Since we assume that the rate is higher, we're being conservative and cutting at 2*samplesPerChunk // as we expect more chunks to come. // Note that next chunk will have its nextAt recalculated for the new rate. - if t >= s.nextAt || numSamples >= samplesPerChunk*2 { - c = s.cutNewHeadChunk(t, e, chunkDiskMapper, chunkRange) + if t >= s.nextAt || numSamples >= o.samplesPerChunk*2 { + c = s.cutNewHeadChunk(t, e, o.chunkDiskMapper, o.chunkRange) chunkCreated = true }
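
The refactor in the two patches above is easier to see isolated from the diff context. Below is a minimal, self-contained Go sketch of the pattern they introduce: building a chunkOpts value once and threading it through the append path, including the chunk-cutting heuristic that appendPreprocessor applies. This is an illustration, not the upstream code: the memChunk and memSeries types, the cut helper, and the rejection of out-of-order samples are simplifications invented for this sketch, while chunkOpts, rangeForTimestamp, and computeChunkEndTime mirror the upstream names. The real chunkOpts also carries a *chunks.ChunkDiskMapper, omitted here because nothing is m-mapped.

package main

import "fmt"

// chunkOpts mirrors the struct added in tsdb/head_append.go: the chunk-level
// knobs previously passed as separate positional arguments.
type chunkOpts struct {
	chunkRange      int64 // width of a head chunk's time window (ms)
	samplesPerChunk int   // target samples per chunk (upstream default: 120)
}

// memChunk is a simplified stand-in for the in-memory head chunk.
type memChunk struct {
	minTime, maxTime int64
	numSamples       int
}

// memSeries keeps only the fields this sketch needs.
type memSeries struct {
	head   *memChunk
	nextAt int64 // latest timestamp at which the head chunk must be cut
}

// rangeForTimestamp returns the end of the chunkRange-aligned window
// containing t, like the helper of the same name in tsdb/head.go.
func rangeForTimestamp(t, width int64) int64 {
	return (t/width)*width + width
}

// computeChunkEndTime re-predicts the cut time so the remaining samples are
// spread roughly evenly over the chunks left in the current range
// (simplified from the upstream helper).
func computeChunkEndTime(start, cur, max int64) int64 {
	n := (max - start) / ((cur - start + 1) * 4)
	if n <= 1 {
		return max
	}
	return start + (max-start)/n
}

// append follows the control flow of appendPreprocessor after the patch:
// every chunk-level decision reads from o instead of loose parameters.
func (s *memSeries) append(t int64, v float64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
	c := s.head
	switch {
	case c == nil:
		// First sample ever: cut the initial chunk.
		c, chunkCreated = s.cut(t, o), true
	case t <= c.maxTime:
		// Out-of-order sample; rejected in this sketch.
		return false, false
	case t >= s.nextAt || c.numSamples >= o.samplesPerChunk*2:
		// Crossed the predicted end time, or the rate prediction was so far
		// off that the chunk grew to twice the target size.
		c, chunkCreated = s.cut(t, o), true
	}
	c.numSamples++
	c.maxTime = t
	_ = v // a real implementation would encode v into the chunk here
	// At 25% of the target sample count, refine the predicted cut time.
	if c.numSamples == o.samplesPerChunk/4 {
		s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt)
	}
	return true, chunkCreated
}

// cut starts a fresh head chunk at t; upstream this also hands the old chunk
// to the ChunkDiskMapper for m-mapping.
func (s *memSeries) cut(t int64, o chunkOpts) *memChunk {
	s.head = &memChunk{minTime: t, maxTime: t}
	s.nextAt = rangeForTimestamp(t, o.chunkRange)
	return s.head
}

func main() {
	opts := chunkOpts{chunkRange: 500, samplesPerChunk: 120}
	s := &memSeries{}
	for _, ts := range []int64{998, 999, 1000, 1001} {
		ok, created := s.append(ts, float64(ts), opts)
		fmt.Printf("t=%d ok=%v newChunk=%v\n", ts, ok, created)
	}
	// Prints newChunk=true at t=998 (first chunk) and t=1000 (the range
	// boundary), matching the expectations in TestMemSeries_append.
}

One practical effect of the grouping, visible in Commit() and processWALSamples above: the options are built once per batch (chunkRange and SamplesPerChunk are loaded a single time) instead of being re-read and re-passed at every call site, and any future chunk-level knob can be added to the struct without touching every append signature.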