Allow to disable trimming when querying TSDB (#9647)
* Allow to disable trimming when querying TSDB Signed-off-by: Marco Pracucci <marco@pracucci.com> * Addressed review comments Signed-off-by: Marco Pracucci <marco@pracucci.com> * Added unit test Signed-off-by: Marco Pracucci <marco@pracucci.com> * Renamed TrimDisabled to DisableTrimming Signed-off-by: Marco Pracucci <marco@pracucci.com>
This commit is contained in:
parent
f82e56fbba
commit
9f5ff5b269
|
@ -145,6 +145,11 @@ type SelectHints struct {
|
||||||
Grouping []string // List of label names used in aggregation.
|
Grouping []string // List of label names used in aggregation.
|
||||||
By bool // Indicate whether it is without or by.
|
By bool // Indicate whether it is without or by.
|
||||||
Range int64 // Range vector selector range in milliseconds.
|
Range int64 // Range vector selector range in milliseconds.
|
||||||
|
|
||||||
|
// DisableTrimming allows to disable trimming of matching series chunks based on query Start and End time.
|
||||||
|
// When disabled, the result may contain samples outside the queried time range but Select() performances
|
||||||
|
// may be improved.
|
||||||
|
DisableTrimming bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(bwplotka): Move to promql/engine_test.go?
|
// TODO(bwplotka): Move to promql/engine_test.go?
|
||||||
|
|
|
@ -726,7 +726,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
|
||||||
}
|
}
|
||||||
all = indexr.SortedPostings(all)
|
all = indexr.SortedPostings(all)
|
||||||
// Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp.
|
// Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp.
|
||||||
sets = append(sets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, meta.MinTime, meta.MaxTime-1))
|
sets = append(sets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, meta.MinTime, meta.MaxTime-1, false))
|
||||||
syms := indexr.Symbols()
|
syms := indexr.Symbols()
|
||||||
if i == 0 {
|
if i == 0 {
|
||||||
symbols = syms
|
symbols = syms
|
||||||
|
|
|
@ -123,6 +123,8 @@ func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error) {
|
||||||
func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
|
func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
|
||||||
mint := q.mint
|
mint := q.mint
|
||||||
maxt := q.maxt
|
maxt := q.maxt
|
||||||
|
disableTrimming := false
|
||||||
|
|
||||||
p, err := PostingsForMatchers(q.index, ms...)
|
p, err := PostingsForMatchers(q.index, ms...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return storage.ErrSeriesSet(err)
|
return storage.ErrSeriesSet(err)
|
||||||
|
@ -134,13 +136,14 @@ func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ..
|
||||||
if hints != nil {
|
if hints != nil {
|
||||||
mint = hints.Start
|
mint = hints.Start
|
||||||
maxt = hints.End
|
maxt = hints.End
|
||||||
|
disableTrimming = hints.DisableTrimming
|
||||||
if hints.Func == "series" {
|
if hints.Func == "series" {
|
||||||
// When you're only looking up metadata (for example series API), you don't need to load any chunks.
|
// When you're only looking up metadata (for example series API), you don't need to load any chunks.
|
||||||
return newBlockSeriesSet(q.index, newNopChunkReader(), q.tombstones, p, mint, maxt)
|
return newBlockSeriesSet(q.index, newNopChunkReader(), q.tombstones, p, mint, maxt, disableTrimming)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return newBlockSeriesSet(q.index, q.chunks, q.tombstones, p, mint, maxt)
|
return newBlockSeriesSet(q.index, q.chunks, q.tombstones, p, mint, maxt, disableTrimming)
|
||||||
}
|
}
|
||||||
|
|
||||||
// blockChunkQuerier provides chunk querying access to a single block database.
|
// blockChunkQuerier provides chunk querying access to a single block database.
|
||||||
|
@ -160,9 +163,11 @@ func NewBlockChunkQuerier(b BlockReader, mint, maxt int64) (storage.ChunkQuerier
|
||||||
func (q *blockChunkQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.ChunkSeriesSet {
|
func (q *blockChunkQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.ChunkSeriesSet {
|
||||||
mint := q.mint
|
mint := q.mint
|
||||||
maxt := q.maxt
|
maxt := q.maxt
|
||||||
|
disableTrimming := false
|
||||||
if hints != nil {
|
if hints != nil {
|
||||||
mint = hints.Start
|
mint = hints.Start
|
||||||
maxt = hints.End
|
maxt = hints.End
|
||||||
|
disableTrimming = hints.DisableTrimming
|
||||||
}
|
}
|
||||||
p, err := PostingsForMatchers(q.index, ms...)
|
p, err := PostingsForMatchers(q.index, ms...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -171,7 +176,7 @@ func (q *blockChunkQuerier) Select(sortSeries bool, hints *storage.SelectHints,
|
||||||
if sortSeries {
|
if sortSeries {
|
||||||
p = q.index.SortedPostings(p)
|
p = q.index.SortedPostings(p)
|
||||||
}
|
}
|
||||||
return newBlockChunkSeriesSet(q.index, q.chunks, q.tombstones, p, mint, maxt)
|
return newBlockChunkSeriesSet(q.index, q.chunks, q.tombstones, p, mint, maxt, disableTrimming)
|
||||||
}
|
}
|
||||||
|
|
||||||
func findSetMatches(pattern string) []string {
|
func findSetMatches(pattern string) []string {
|
||||||
|
@ -433,6 +438,7 @@ type blockBaseSeriesSet struct {
|
||||||
chunks ChunkReader
|
chunks ChunkReader
|
||||||
tombstones tombstones.Reader
|
tombstones tombstones.Reader
|
||||||
mint, maxt int64
|
mint, maxt int64
|
||||||
|
disableTrimming bool
|
||||||
|
|
||||||
currIterFn func() *populateWithDelGenericSeriesIterator
|
currIterFn func() *populateWithDelGenericSeriesIterator
|
||||||
currLabels labels.Labels
|
currLabels labels.Labels
|
||||||
|
@ -487,6 +493,7 @@ func (b *blockBaseSeriesSet) Next() bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
// If still not entirely deleted, check if trim is needed based on requested time range.
|
// If still not entirely deleted, check if trim is needed based on requested time range.
|
||||||
|
if !b.disableTrimming {
|
||||||
if chk.MinTime < b.mint {
|
if chk.MinTime < b.mint {
|
||||||
trimFront = true
|
trimFront = true
|
||||||
}
|
}
|
||||||
|
@ -494,6 +501,7 @@ func (b *blockBaseSeriesSet) Next() bool {
|
||||||
trimBack = true
|
trimBack = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if len(chks) == 0 {
|
if len(chks) == 0 {
|
||||||
continue
|
continue
|
||||||
|
@ -722,7 +730,7 @@ type blockSeriesSet struct {
|
||||||
blockBaseSeriesSet
|
blockBaseSeriesSet
|
||||||
}
|
}
|
||||||
|
|
||||||
func newBlockSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64) storage.SeriesSet {
|
func newBlockSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64, disableTrimming bool) storage.SeriesSet {
|
||||||
return &blockSeriesSet{
|
return &blockSeriesSet{
|
||||||
blockBaseSeriesSet{
|
blockBaseSeriesSet{
|
||||||
index: i,
|
index: i,
|
||||||
|
@ -731,6 +739,7 @@ func newBlockSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p inde
|
||||||
p: p,
|
p: p,
|
||||||
mint: mint,
|
mint: mint,
|
||||||
maxt: maxt,
|
maxt: maxt,
|
||||||
|
disableTrimming: disableTrimming,
|
||||||
bufLbls: make(labels.Labels, 0, 10),
|
bufLbls: make(labels.Labels, 0, 10),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -754,7 +763,7 @@ type blockChunkSeriesSet struct {
|
||||||
blockBaseSeriesSet
|
blockBaseSeriesSet
|
||||||
}
|
}
|
||||||
|
|
||||||
func newBlockChunkSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64) storage.ChunkSeriesSet {
|
func newBlockChunkSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64, disableTrimming bool) storage.ChunkSeriesSet {
|
||||||
return &blockChunkSeriesSet{
|
return &blockChunkSeriesSet{
|
||||||
blockBaseSeriesSet{
|
blockBaseSeriesSet{
|
||||||
index: i,
|
index: i,
|
||||||
|
@ -763,6 +772,7 @@ func newBlockChunkSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p
|
||||||
p: p,
|
p: p,
|
||||||
mint: mint,
|
mint: mint,
|
||||||
maxt: maxt,
|
maxt: maxt,
|
||||||
|
disableTrimming: disableTrimming,
|
||||||
bufLbls: make(labels.Labels, 0, 10),
|
bufLbls: make(labels.Labels, 0, 10),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
|
@ -162,6 +162,7 @@ func createIdxChkReaders(t *testing.T, tc []seriesSamples) (IndexReader, ChunkRe
|
||||||
type blockQuerierTestCase struct {
|
type blockQuerierTestCase struct {
|
||||||
mint, maxt int64
|
mint, maxt int64
|
||||||
ms []*labels.Matcher
|
ms []*labels.Matcher
|
||||||
|
hints *storage.SelectHints
|
||||||
exp storage.SeriesSet
|
exp storage.SeriesSet
|
||||||
expChks storage.ChunkSeriesSet
|
expChks storage.ChunkSeriesSet
|
||||||
}
|
}
|
||||||
|
@ -179,7 +180,7 @@ func testBlockQuerier(t *testing.T, c blockQuerierTestCase, ir IndexReader, cr C
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
res := q.Select(false, nil, c.ms...)
|
res := q.Select(false, c.hints, c.ms...)
|
||||||
defer func() { require.NoError(t, q.Close()) }()
|
defer func() { require.NoError(t, q.Close()) }()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
@ -214,7 +215,7 @@ func testBlockQuerier(t *testing.T, c blockQuerierTestCase, ir IndexReader, cr C
|
||||||
maxt: c.maxt,
|
maxt: c.maxt,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
res := q.Select(false, nil, c.ms...)
|
res := q.Select(false, c.hints, c.ms...)
|
||||||
defer func() { require.NoError(t, q.Close()) }()
|
defer func() { require.NoError(t, q.Close()) }()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
@ -319,6 +320,56 @@ func TestBlockQuerier(t *testing.T) {
|
||||||
),
|
),
|
||||||
}),
|
}),
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
// This test runs a query disabling trimming. All chunks containing at least 1 sample within the queried
|
||||||
|
// time range will be returned.
|
||||||
|
mint: 2,
|
||||||
|
maxt: 6,
|
||||||
|
hints: &storage.SelectHints{Start: 2, End: 6, DisableTrimming: true},
|
||||||
|
ms: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "a")},
|
||||||
|
exp: newMockSeriesSet([]storage.Series{
|
||||||
|
storage.NewListSeries(labels.Labels{{Name: "a", Value: "a"}},
|
||||||
|
[]tsdbutil.Sample{sample{1, 2}, sample{2, 3}, sample{3, 4}, sample{5, 2}, sample{6, 3}, sample{7, 4}},
|
||||||
|
),
|
||||||
|
storage.NewListSeries(labels.Labels{{Name: "a", Value: "a"}, {Name: "b", Value: "b"}},
|
||||||
|
[]tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 3}, sample{6, 6}},
|
||||||
|
),
|
||||||
|
}),
|
||||||
|
expChks: newMockChunkSeriesSet([]storage.ChunkSeries{
|
||||||
|
storage.NewListChunkSeriesFromSamples(labels.Labels{{Name: "a", Value: "a"}},
|
||||||
|
[]tsdbutil.Sample{sample{1, 2}, sample{2, 3}, sample{3, 4}},
|
||||||
|
[]tsdbutil.Sample{sample{5, 2}, sample{6, 3}, sample{7, 4}},
|
||||||
|
),
|
||||||
|
storage.NewListChunkSeriesFromSamples(labels.Labels{{Name: "a", Value: "a"}, {Name: "b", Value: "b"}},
|
||||||
|
[]tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}},
|
||||||
|
[]tsdbutil.Sample{sample{5, 3}, sample{6, 6}},
|
||||||
|
),
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// This test runs a query disabling trimming. All chunks containing at least 1 sample within the queried
|
||||||
|
// time range will be returned.
|
||||||
|
mint: 5,
|
||||||
|
maxt: 6,
|
||||||
|
hints: &storage.SelectHints{Start: 5, End: 6, DisableTrimming: true},
|
||||||
|
ms: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "a")},
|
||||||
|
exp: newMockSeriesSet([]storage.Series{
|
||||||
|
storage.NewListSeries(labels.Labels{{Name: "a", Value: "a"}},
|
||||||
|
[]tsdbutil.Sample{sample{5, 2}, sample{6, 3}, sample{7, 4}},
|
||||||
|
),
|
||||||
|
storage.NewListSeries(labels.Labels{{Name: "a", Value: "a"}, {Name: "b", Value: "b"}},
|
||||||
|
[]tsdbutil.Sample{sample{5, 3}, sample{6, 6}},
|
||||||
|
),
|
||||||
|
}),
|
||||||
|
expChks: newMockChunkSeriesSet([]storage.ChunkSeries{
|
||||||
|
storage.NewListChunkSeriesFromSamples(labels.Labels{{Name: "a", Value: "a"}},
|
||||||
|
[]tsdbutil.Sample{sample{5, 2}, sample{6, 3}, sample{7, 4}},
|
||||||
|
),
|
||||||
|
storage.NewListChunkSeriesFromSamples(labels.Labels{{Name: "a", Value: "a"}, {Name: "b", Value: "b"}},
|
||||||
|
[]tsdbutil.Sample{sample{5, 3}, sample{6, 6}},
|
||||||
|
),
|
||||||
|
}),
|
||||||
|
},
|
||||||
} {
|
} {
|
||||||
t.Run("", func(t *testing.T) {
|
t.Run("", func(t *testing.T) {
|
||||||
ir, cr, _, _ := createIdxChkReaders(t, testData)
|
ir, cr, _, _ := createIdxChkReaders(t, testData)
|
||||||
|
|
Loading…
Reference in New Issue