diff --git a/storage/merge.go b/storage/merge.go index 7726f9bdc..2f175d3e7 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -717,3 +717,56 @@ func (h *chunkIteratorHeap) Pop() interface{} { *h = old[0 : n-1] return x } + +// NewConcatenatingChunkSeriesMerger returns a VerticalChunkSeriesMergeFunc that simply concatenates the +// chunks from the series. The resultant stream of chunks for a series might be overlapping and unsorted. +func NewConcatenatingChunkSeriesMerger() VerticalChunkSeriesMergeFunc { + return func(series ...ChunkSeries) ChunkSeries { + if len(series) == 0 { + return nil + } + return &ChunkSeriesEntry{ + Lset: series[0].Labels(), + ChunkIteratorFn: func() chunks.Iterator { + iterators := make([]chunks.Iterator, 0, len(series)) + for _, s := range series { + iterators = append(iterators, s.Iterator()) + } + return &concatenatingChunkIterator{ + iterators: iterators, + } + }, + } + } +} + +type concatenatingChunkIterator struct { + iterators []chunks.Iterator + idx int + + curr chunks.Meta +} + +func (c *concatenatingChunkIterator) At() chunks.Meta { + return c.curr +} + +func (c *concatenatingChunkIterator) Next() bool { + if c.idx >= len(c.iterators) { + return false + } + if c.iterators[c.idx].Next() { + c.curr = c.iterators[c.idx].At() + return true + } + c.idx++ + return c.Next() +} + +func (c *concatenatingChunkIterator) Err() error { + errs := tsdb_errors.NewMulti() + for _, iter := range c.iterators { + errs.Add(iter.Err()) + } + return errs.Err() +} diff --git a/storage/merge_test.go b/storage/merge_test.go index 933e5b1bd..acd5a41e5 100644 --- a/storage/merge_test.go +++ b/storage/merge_test.go @@ -499,6 +499,140 @@ func TestCompactingChunkSeriesMerger(t *testing.T) { } } +func TestConcatenatingChunkSeriesMerger(t *testing.T) { + m := NewConcatenatingChunkSeriesMerger() + + for _, tc := range []struct { + name string + input []ChunkSeries + expected ChunkSeries + }{ + { + name: "single empty series", + input: []ChunkSeries{ + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), nil), + }, + expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), nil), + }, + { + name: "single series", + input: []ChunkSeries{ + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}), + }, + expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}), + }, + { + name: "two empty series", + input: []ChunkSeries{ + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), nil), + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), nil), + }, + expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), nil, nil), + }, + { + name: "two non overlapping", + input: []ChunkSeries{ + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{5, 5}}), + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}), + }, + expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{5, 5}}, []tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}), + }, + { + name: "two overlapping", + input: []ChunkSeries{ + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{8, 8}}), + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}), + }, + expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{8, 8}}, + []tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}, + ), + }, + { + name: "two duplicated", + input: []ChunkSeries{ + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}), + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{5, 5}}), + }, + expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}, + []tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{5, 5}}, + ), + }, + { + name: "three overlapping", + input: []ChunkSeries{ + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}), + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{6, 6}}), + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{0, 0}, sample{4, 4}}), + }, + expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}, + []tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{6, 6}}, + []tsdbutil.Sample{sample{0, 0}, sample{4, 4}}, + ), + }, + { + name: "three in chained overlap", + input: []ChunkSeries{ + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}), + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{4, 4}, sample{6, 66}}), + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{6, 6}, sample{10, 10}}), + }, + expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}, + []tsdbutil.Sample{sample{4, 4}, sample{6, 66}}, + []tsdbutil.Sample{sample{6, 6}, sample{10, 10}}, + ), + }, + { + name: "three in chained overlap complex", + input: []ChunkSeries{ + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{0, 0}, sample{5, 5}}, []tsdbutil.Sample{sample{10, 10}, sample{15, 15}}), + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 2}, sample{20, 20}}, []tsdbutil.Sample{sample{25, 25}, sample{30, 30}}), + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{18, 18}, sample{26, 26}}, []tsdbutil.Sample{sample{31, 31}, sample{35, 35}}), + }, + expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + []tsdbutil.Sample{sample{0, 0}, sample{5, 5}}, []tsdbutil.Sample{sample{10, 10}, sample{15, 15}}, + []tsdbutil.Sample{sample{2, 2}, sample{20, 20}}, []tsdbutil.Sample{sample{25, 25}, sample{30, 30}}, + []tsdbutil.Sample{sample{18, 18}, sample{26, 26}}, []tsdbutil.Sample{sample{31, 31}, sample{35, 35}}, + ), + }, + { + name: "110 overlapping", + input: []ChunkSeries{ + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), tsdbutil.GenerateSamples(0, 110)), // [0 - 110) + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), tsdbutil.GenerateSamples(60, 50)), // [60 - 110) + }, + expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + tsdbutil.GenerateSamples(0, 110), + tsdbutil.GenerateSamples(60, 50), + ), + }, + { + name: "150 overlapping samples, simply concatenated and no splits", + input: []ChunkSeries{ + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), tsdbutil.GenerateSamples(0, 90)), // [0 - 90) + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), tsdbutil.GenerateSamples(60, 90)), // [90 - 150) + }, + expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + tsdbutil.GenerateSamples(0, 90), + tsdbutil.GenerateSamples(60, 90), + ), + }, + } { + t.Run(tc.name, func(t *testing.T) { + merged := m(tc.input...) + require.Equal(t, tc.expected.Labels(), merged.Labels()) + actChks, actErr := ExpandChunks(merged.Iterator()) + expChks, expErr := ExpandChunks(tc.expected.Iterator()) + + require.Equal(t, expErr, actErr) + require.Equal(t, expChks, actChks) + }) + } +} + type mockQuerier struct { LabelQuerier diff --git a/tsdb/blockwriter.go b/tsdb/blockwriter.go index 4e9afda61..fe798a350 100644 --- a/tsdb/blockwriter.go +++ b/tsdb/blockwriter.go @@ -39,7 +39,7 @@ type BlockWriter struct { } // ErrNoSeriesAppended is returned if the series count is zero while flushing blocks. -var ErrNoSeriesAppended error = errors.New("no series appended, aborting") +var ErrNoSeriesAppended = errors.New("no series appended, aborting") // NewBlockWriter create a new block writer. // diff --git a/tsdb/db.go b/tsdb/db.go index f14cb974f..9a2d3b38c 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -1817,8 +1817,9 @@ func (db *DB) Querier(_ context.Context, mint, maxt int64) (storage.Querier, err return storage.NewMergeQuerier(blockQueriers, nil, storage.ChainedSeriesMerge), nil } -// ChunkQuerier returns a new chunk querier over the data partition for the given time range. -func (db *DB) ChunkQuerier(_ context.Context, mint, maxt int64) (storage.ChunkQuerier, error) { +// blockQueriersForRange returns individual block chunk queriers from the persistent blocks, in-order head block, and the +// out-of-order head block, overlapping with the given time range. +func (db *DB) blockChunkQuerierForRange(mint, maxt int64) ([]storage.ChunkQuerier, error) { var blocks []BlockReader db.mtx.RLock() @@ -1888,9 +1889,28 @@ func (db *DB) ChunkQuerier(_ context.Context, mint, maxt int64) (storage.ChunkQu blockQueriers = append(blockQueriers, outOfOrderHeadQuerier) } + return blockQueriers, nil +} + +// ChunkQuerier returns a new chunk querier over the data partition for the given time range. +func (db *DB) ChunkQuerier(_ context.Context, mint, maxt int64) (storage.ChunkQuerier, error) { + blockQueriers, err := db.blockChunkQuerierForRange(mint, maxt) + if err != nil { + return nil, err + } return storage.NewMergeChunkQuerier(blockQueriers, nil, storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)), nil } +// UnorderedChunkQuerier returns a new chunk querier over the data partition for the given time range. +// The chunks can be overlapping and not sorted. +func (db *DB) UnorderedChunkQuerier(_ context.Context, mint, maxt int64) (storage.ChunkQuerier, error) { + blockQueriers, err := db.blockChunkQuerierForRange(mint, maxt) + if err != nil { + return nil, err + } + return storage.NewMergeChunkQuerier(blockQueriers, nil, storage.NewConcatenatingChunkSeriesMerger()), nil +} + func (db *DB) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) { return db.head.exemplars.ExemplarQuerier(ctx) }