diff --git a/storage/merge_test.go b/storage/merge_test.go
index 2ed68879a..90f5725e1 100644
--- a/storage/merge_test.go
+++ b/storage/merge_test.go
@@ -465,6 +465,27 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
 				[]tsdbutil.Sample{sample{31, 31}, sample{35, 35}},
 			),
 		},
+		{
+			name: "110 overlapping",
+			input: []ChunkSeries{
+				NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), tsdbutil.GenerateSamples(0, 110)), // [0 - 110)
+				NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), tsdbutil.GenerateSamples(60, 50)), // [60 - 110)
+			},
+			expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
+				tsdbutil.GenerateSamples(0, 110),
+			),
+		},
+		{
+			name: "150 overlapping samples, split chunk",
+			input: []ChunkSeries{
+				NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), tsdbutil.GenerateSamples(0, 90)),  // [0 - 90)
+				NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), tsdbutil.GenerateSamples(60, 90)), // [90 - 150)
+			},
+			expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
+				tsdbutil.GenerateSamples(0, 120),
+				tsdbutil.GenerateSamples(120, 30),
+			),
+		},
 	} {
 		t.Run(tc.name, func(t *testing.T) {
 			merged := m(tc.input...)
diff --git a/storage/series.go b/storage/series.go
index c5440a45a..5f16d053c 100644
--- a/storage/series.go
+++ b/storage/series.go
@@ -217,7 +217,8 @@ type seriesToChunkEncoder struct {
 	Series
 }
 
-// TODO(bwplotka): Currently encoder will just naively build one chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670
+const seriesToChunkEncoderSplit = 120
+
 func (s *seriesToChunkEncoder) Iterator() chunks.Iterator {
 	chk := chunkenc.NewXORChunk()
 	app, err := chk.Appender()
@@ -227,8 +228,28 @@ func (s *seriesToChunkEncoder) Iterator() chunks.Iterator {
 	mint := int64(math.MaxInt64)
 	maxt := int64(math.MinInt64)
 
+	chks := []chunks.Meta{}
+
+	i := 0
 	seriesIter := s.Series.Iterator()
 	for seriesIter.Next() {
+		// Create a new chunk if too many samples in the current one.
+		if i >= seriesToChunkEncoderSplit {
+			chks = append(chks, chunks.Meta{
+				MinTime: mint,
+				MaxTime: maxt,
+				Chunk:   chk,
+			})
+			chk = chunkenc.NewXORChunk()
+			app, err = chk.Appender()
+			if err != nil {
+				return errChunksIterator{err: err}
+			}
+			mint = int64(math.MaxInt64)
+			// maxt is immediately overwritten below which is why setting it here won't make a difference.
+			i = 0
+		}
+
 		t, v := seriesIter.At()
 		app.Append(t, v)
 
@@ -236,16 +257,19 @@ func (s *seriesToChunkEncoder) Iterator() chunks.Iterator {
 		if mint == math.MaxInt64 {
 			mint = t
 		}
+		i++
 	}
 	if err := seriesIter.Err(); err != nil {
 		return errChunksIterator{err: err}
 	}
 
-	return NewListChunkSeriesIterator(chunks.Meta{
+	chks = append(chks, chunks.Meta{
 		MinTime: mint,
 		MaxTime: maxt,
 		Chunk:   chk,
 	})
+
+	return NewListChunkSeriesIterator(chks...)
 }
 
 type errChunksIterator struct {
diff --git a/tsdb/tsdbutil/chunks.go b/tsdb/tsdbutil/chunks.go
index 47760453e..5ae58b0a8 100644
--- a/tsdb/tsdbutil/chunks.go
+++ b/tsdb/tsdbutil/chunks.go
@@ -65,3 +65,15 @@ func PopulatedChunk(numSamples int, minTime int64) chunks.Meta {
 	}
 	return ChunkFromSamples(samples)
 }
+
+// GenerateSamples starting at start and counting up numSamples.
+func GenerateSamples(start int, numSamples int) []Sample {
+	samples := make([]Sample, 0, numSamples)
+	for i := start; i < start+numSamples; i++ {
+		samples = append(samples, sample{
+			t: int64(i),
+			v: float64(i),
+		})
+	}
+	return samples
+}
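
Illustration of the intended behaviour (not part of the diff above): with seriesToChunkEncoderSplit = 120, encoding a 150-sample series should yield one chunk of 120 samples followed by one of 30, mirroring the "150 overlapping samples, split chunk" test case. The sketch below assumes the package's existing NewListSeries and NewSeriesToChunkEncoder helpers and testify's require; the test function name is made up and import paths may differ between Prometheus versions.

// Hypothetical sketch, not part of this change.
package storage

import (
	"testing"

	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/tsdb/tsdbutil"
	"github.com/stretchr/testify/require"
)

func TestSeriesToChunkEncoderSplitSketch(t *testing.T) {
	// Encode 150 samples; the encoder should cut a new chunk after 120.
	series := NewListSeries(labels.FromStrings("bar", "baz"), tsdbutil.GenerateSamples(0, 150))
	iter := NewSeriesToChunkEncoder(series).Iterator()

	var sizes []int
	for iter.Next() {
		sizes = append(sizes, iter.At().Chunk.NumSamples())
	}
	require.NoError(t, iter.Err())
	require.Equal(t, []int{120, 30}, sizes)
}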