mirror of
https://github.com/prometheus/prometheus
synced 2025-01-15 19:32:05 +00:00
Merge pull request #14714 from krajorama/fix-ooo-chunkoriterablewithcopy
tsdb: Make requesting merge with OOO head explicit in chunk.Meta
This commit is contained in:
commit
1b86d54c7f
@ -133,6 +133,9 @@ type Meta struct {
|
||||
// Time range the data covers.
|
||||
// When MaxTime == math.MaxInt64 the chunk is still open and being appended to.
|
||||
MinTime, MaxTime int64
|
||||
|
||||
// Flag to indicate that this meta needs merge with OOO data.
|
||||
MergeOOO bool
|
||||
}
|
||||
|
||||
// ChunkFromSamples requires all samples to have the same type.
|
||||
|
337
tsdb/db_test.go
337
tsdb/db_test.go
@ -5036,16 +5036,15 @@ func testOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T, scenario sa
|
||||
|
||||
func Test_Querier_OOOQuery(t *testing.T) {
|
||||
opts := DefaultOptions()
|
||||
opts.OutOfOrderCapMax = 30
|
||||
opts.OutOfOrderTimeWindow = 24 * time.Hour.Milliseconds()
|
||||
|
||||
series1 := labels.FromStrings("foo", "bar1")
|
||||
|
||||
type filterFunc func(t int64) bool
|
||||
defaultFilterFunc := func(t int64) bool { return true }
|
||||
|
||||
minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() }
|
||||
addSample := func(db *DB, fromMins, toMins, queryMinT, queryMaxT int64, expSamples []chunks.Sample, filter func(int64) bool) ([]chunks.Sample, int) {
|
||||
if filter == nil {
|
||||
filter = func(int64) bool { return true }
|
||||
}
|
||||
addSample := func(db *DB, fromMins, toMins, queryMinT, queryMaxT int64, expSamples []chunks.Sample, filter filterFunc) ([]chunks.Sample, int) {
|
||||
app := db.Appender(context.Background())
|
||||
totalAppended := 0
|
||||
for m := fromMins; m <= toMins; m += time.Minute.Milliseconds() {
|
||||
@ -5060,68 +5059,173 @@ func Test_Querier_OOOQuery(t *testing.T) {
|
||||
totalAppended++
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
require.Positive(t, totalAppended, 0) // Sanity check that filter is not too zealous.
|
||||
return expSamples, totalAppended
|
||||
}
|
||||
|
||||
type sampleBatch struct {
|
||||
minT int64
|
||||
maxT int64
|
||||
filter filterFunc
|
||||
isOOO bool
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
oooCap int64
|
||||
queryMinT int64
|
||||
queryMaxT int64
|
||||
inOrderMinT int64
|
||||
inOrderMaxT int64
|
||||
oooMinT int64
|
||||
oooMaxT int64
|
||||
batches []sampleBatch
|
||||
}{
|
||||
{
|
||||
name: "query interval covering ooomint and inordermaxt returns all ingested samples",
|
||||
oooCap: 30,
|
||||
queryMinT: minutes(0),
|
||||
queryMaxT: minutes(200),
|
||||
inOrderMinT: minutes(100),
|
||||
inOrderMaxT: minutes(200),
|
||||
oooMinT: minutes(0),
|
||||
oooMaxT: minutes(99),
|
||||
batches: []sampleBatch{
|
||||
{
|
||||
minT: minutes(100),
|
||||
maxT: minutes(200),
|
||||
filter: defaultFilterFunc,
|
||||
},
|
||||
{
|
||||
minT: minutes(0),
|
||||
maxT: minutes(99),
|
||||
filter: defaultFilterFunc,
|
||||
isOOO: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "partial query interval returns only samples within interval",
|
||||
oooCap: 30,
|
||||
queryMinT: minutes(20),
|
||||
queryMaxT: minutes(180),
|
||||
inOrderMinT: minutes(100),
|
||||
inOrderMaxT: minutes(200),
|
||||
oooMinT: minutes(0),
|
||||
oooMaxT: minutes(99),
|
||||
batches: []sampleBatch{
|
||||
{
|
||||
minT: minutes(100),
|
||||
maxT: minutes(200),
|
||||
filter: defaultFilterFunc,
|
||||
},
|
||||
{
|
||||
name: "query overlapping inorder and ooo samples returns all ingested samples",
|
||||
minT: minutes(0),
|
||||
maxT: minutes(99),
|
||||
filter: defaultFilterFunc,
|
||||
isOOO: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "query overlapping inorder and ooo samples returns all ingested samples at the end of the interval",
|
||||
oooCap: 30,
|
||||
queryMinT: minutes(0),
|
||||
queryMaxT: minutes(200),
|
||||
inOrderMinT: minutes(100),
|
||||
inOrderMaxT: minutes(200),
|
||||
oooMinT: minutes(180 - opts.OutOfOrderCapMax/2), // Make sure to fit into the OOO head.
|
||||
oooMaxT: minutes(180),
|
||||
batches: []sampleBatch{
|
||||
{
|
||||
minT: minutes(100),
|
||||
maxT: minutes(200),
|
||||
filter: func(t int64) bool { return t%2 == 0 },
|
||||
isOOO: false,
|
||||
},
|
||||
{
|
||||
minT: minutes(170),
|
||||
maxT: minutes(180),
|
||||
filter: func(t int64) bool { return t%2 == 1 },
|
||||
isOOO: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "query overlapping inorder and ooo in-memory samples returns all ingested samples at the beginning of the interval",
|
||||
oooCap: 30,
|
||||
queryMinT: minutes(0),
|
||||
queryMaxT: minutes(200),
|
||||
batches: []sampleBatch{
|
||||
{
|
||||
minT: minutes(100),
|
||||
maxT: minutes(200),
|
||||
filter: func(t int64) bool { return t%2 == 0 },
|
||||
isOOO: false,
|
||||
},
|
||||
{
|
||||
minT: minutes(100),
|
||||
maxT: minutes(110),
|
||||
filter: func(t int64) bool { return t%2 == 1 },
|
||||
isOOO: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "query inorder contain ooo mmaped samples returns all ingested samples at the beginning of the interval",
|
||||
oooCap: 5,
|
||||
queryMinT: minutes(0),
|
||||
queryMaxT: minutes(200),
|
||||
batches: []sampleBatch{
|
||||
{
|
||||
minT: minutes(100),
|
||||
maxT: minutes(200),
|
||||
filter: func(t int64) bool { return t%2 == 0 },
|
||||
isOOO: false,
|
||||
},
|
||||
{
|
||||
minT: minutes(101),
|
||||
maxT: minutes(101 + (5-1)*2), // Append samples to fit in a single mmmaped OOO chunk and fit inside the first in-order mmaped chunk.
|
||||
filter: func(t int64) bool { return t%2 == 1 },
|
||||
isOOO: true,
|
||||
},
|
||||
{
|
||||
minT: minutes(191),
|
||||
maxT: minutes(193), // Append some more OOO samples to trigger mapping the OOO chunk, but use time 151 to not overlap with in-order head chunk.
|
||||
filter: func(t int64) bool { return t%2 == 1 },
|
||||
isOOO: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "query overlapping inorder and ooo mmaped samples returns all ingested samples at the beginning of the interval",
|
||||
oooCap: 30,
|
||||
queryMinT: minutes(0),
|
||||
queryMaxT: minutes(200),
|
||||
batches: []sampleBatch{
|
||||
{
|
||||
minT: minutes(100),
|
||||
maxT: minutes(200),
|
||||
filter: func(t int64) bool { return t%2 == 0 },
|
||||
isOOO: false,
|
||||
},
|
||||
{
|
||||
minT: minutes(101),
|
||||
maxT: minutes(101 + (30-1)*2), // Append samples to fit in a single mmmaped OOO chunk and overlap the first in-order mmaped chunk.
|
||||
filter: func(t int64) bool { return t%2 == 1 },
|
||||
isOOO: true,
|
||||
},
|
||||
{
|
||||
minT: minutes(191),
|
||||
maxT: minutes(193), // Append some more OOO samples to trigger mapping the OOO chunk, but use time 151 to not overlap with in-order head chunk.
|
||||
filter: func(t int64) bool { return t%2 == 1 },
|
||||
isOOO: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tc := range tests {
|
||||
t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) {
|
||||
opts.OutOfOrderCapMax = tc.oooCap
|
||||
db := openTestDB(t, opts, nil)
|
||||
db.DisableCompactions()
|
||||
defer func() {
|
||||
require.NoError(t, db.Close())
|
||||
}()
|
||||
|
||||
var (
|
||||
expSamples []chunks.Sample
|
||||
inoSamples int
|
||||
)
|
||||
var expSamples []chunks.Sample
|
||||
var oooSamples, appendedCount int
|
||||
|
||||
// Add in-order samples (at even minutes).
|
||||
expSamples, inoSamples = addSample(db, tc.inOrderMinT, tc.inOrderMaxT, tc.queryMinT, tc.queryMaxT, expSamples, func(t int64) bool { return t%2 == 0 })
|
||||
// Sanity check that filter is not too zealous.
|
||||
require.Positive(t, inoSamples, 0)
|
||||
|
||||
// Add out-of-order samples (at odd minutes).
|
||||
expSamples, oooSamples := addSample(db, tc.oooMinT, tc.oooMaxT, tc.queryMinT, tc.queryMaxT, expSamples, func(t int64) bool { return t%2 == 1 })
|
||||
// Sanity check that filter is not too zealous.
|
||||
require.Positive(t, oooSamples, 0)
|
||||
for _, batch := range tc.batches {
|
||||
expSamples, appendedCount = addSample(db, batch.minT, batch.maxT, tc.queryMinT, tc.queryMaxT, expSamples, batch.filter)
|
||||
if batch.isOOO {
|
||||
oooSamples += appendedCount
|
||||
}
|
||||
}
|
||||
|
||||
sort.Slice(expSamples, func(i, j int) bool {
|
||||
return expSamples[i].T() < expSamples[j].T()
|
||||
@ -5147,11 +5251,17 @@ func Test_ChunkQuerier_OOOQuery(t *testing.T) {
|
||||
|
||||
series1 := labels.FromStrings("foo", "bar1")
|
||||
|
||||
type filterFunc func(t int64) bool
|
||||
defaultFilterFunc := func(t int64) bool { return true }
|
||||
|
||||
minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() }
|
||||
addSample := func(db *DB, fromMins, toMins, queryMinT, queryMaxT int64, expSamples []chunks.Sample) ([]chunks.Sample, int) {
|
||||
addSample := func(db *DB, fromMins, toMins, queryMinT, queryMaxT int64, expSamples []chunks.Sample, filter filterFunc) ([]chunks.Sample, int) {
|
||||
app := db.Appender(context.Background())
|
||||
totalAppended := 0
|
||||
for m := fromMins; m <= toMins; m += time.Minute.Milliseconds() {
|
||||
if !filter(m / time.Minute.Milliseconds()) {
|
||||
continue
|
||||
}
|
||||
_, err := app.Append(0, series1, m, float64(m))
|
||||
if m >= queryMinT && m <= queryMaxT {
|
||||
expSamples = append(expSamples, sample{t: m, f: float64(m)})
|
||||
@ -5160,39 +5270,158 @@ func Test_ChunkQuerier_OOOQuery(t *testing.T) {
|
||||
totalAppended++
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
require.Positive(t, totalAppended) // Sanity check that filter is not too zealous.
|
||||
return expSamples, totalAppended
|
||||
}
|
||||
|
||||
type sampleBatch struct {
|
||||
minT int64
|
||||
maxT int64
|
||||
filter filterFunc
|
||||
isOOO bool
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
oooCap int64
|
||||
queryMinT int64
|
||||
queryMaxT int64
|
||||
inOrderMinT int64
|
||||
inOrderMaxT int64
|
||||
oooMinT int64
|
||||
oooMaxT int64
|
||||
batches []sampleBatch
|
||||
}{
|
||||
{
|
||||
name: "query interval covering ooomint and inordermaxt returns all ingested samples",
|
||||
oooCap: 30,
|
||||
queryMinT: minutes(0),
|
||||
queryMaxT: minutes(200),
|
||||
inOrderMinT: minutes(100),
|
||||
inOrderMaxT: minutes(200),
|
||||
oooMinT: minutes(0),
|
||||
oooMaxT: minutes(99),
|
||||
batches: []sampleBatch{
|
||||
{
|
||||
minT: minutes(100),
|
||||
maxT: minutes(200),
|
||||
filter: defaultFilterFunc,
|
||||
},
|
||||
{
|
||||
minT: minutes(0),
|
||||
maxT: minutes(99),
|
||||
filter: defaultFilterFunc,
|
||||
isOOO: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "partial query interval returns only samples within interval",
|
||||
oooCap: 30,
|
||||
queryMinT: minutes(20),
|
||||
queryMaxT: minutes(180),
|
||||
inOrderMinT: minutes(100),
|
||||
inOrderMaxT: minutes(200),
|
||||
oooMinT: minutes(0),
|
||||
oooMaxT: minutes(99),
|
||||
batches: []sampleBatch{
|
||||
{
|
||||
minT: minutes(100),
|
||||
maxT: minutes(200),
|
||||
filter: defaultFilterFunc,
|
||||
},
|
||||
{
|
||||
minT: minutes(0),
|
||||
maxT: minutes(99),
|
||||
filter: defaultFilterFunc,
|
||||
isOOO: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "query overlapping inorder and ooo samples returns all ingested samples at the end of the interval",
|
||||
oooCap: 30,
|
||||
queryMinT: minutes(0),
|
||||
queryMaxT: minutes(200),
|
||||
batches: []sampleBatch{
|
||||
{
|
||||
minT: minutes(100),
|
||||
maxT: minutes(200),
|
||||
filter: func(t int64) bool { return t%2 == 0 },
|
||||
isOOO: false,
|
||||
},
|
||||
{
|
||||
minT: minutes(170),
|
||||
maxT: minutes(180),
|
||||
filter: func(t int64) bool { return t%2 == 1 },
|
||||
isOOO: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "query overlapping inorder and ooo in-memory samples returns all ingested samples at the beginning of the interval",
|
||||
oooCap: 30,
|
||||
queryMinT: minutes(0),
|
||||
queryMaxT: minutes(200),
|
||||
batches: []sampleBatch{
|
||||
{
|
||||
minT: minutes(100),
|
||||
maxT: minutes(200),
|
||||
filter: func(t int64) bool { return t%2 == 0 },
|
||||
isOOO: false,
|
||||
},
|
||||
{
|
||||
minT: minutes(100),
|
||||
maxT: minutes(110),
|
||||
filter: func(t int64) bool { return t%2 == 1 },
|
||||
isOOO: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "query inorder contain ooo mmaped samples returns all ingested samples at the beginning of the interval",
|
||||
oooCap: 5,
|
||||
queryMinT: minutes(0),
|
||||
queryMaxT: minutes(200),
|
||||
batches: []sampleBatch{
|
||||
{
|
||||
minT: minutes(100),
|
||||
maxT: minutes(200),
|
||||
filter: func(t int64) bool { return t%2 == 0 },
|
||||
isOOO: false,
|
||||
},
|
||||
{
|
||||
minT: minutes(101),
|
||||
maxT: minutes(101 + (5-1)*2), // Append samples to fit in a single mmmaped OOO chunk and fit inside the first in-order mmaped chunk.
|
||||
filter: func(t int64) bool { return t%2 == 1 },
|
||||
isOOO: true,
|
||||
},
|
||||
{
|
||||
minT: minutes(191),
|
||||
maxT: minutes(193), // Append some more OOO samples to trigger mapping the OOO chunk, but use time 151 to not overlap with in-order head chunk.
|
||||
filter: func(t int64) bool { return t%2 == 1 },
|
||||
isOOO: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "query overlapping inorder and ooo mmaped samples returns all ingested samples at the beginning of the interval",
|
||||
oooCap: 30,
|
||||
queryMinT: minutes(0),
|
||||
queryMaxT: minutes(200),
|
||||
batches: []sampleBatch{
|
||||
{
|
||||
minT: minutes(100),
|
||||
maxT: minutes(200),
|
||||
filter: func(t int64) bool { return t%2 == 0 },
|
||||
isOOO: false,
|
||||
},
|
||||
{
|
||||
minT: minutes(101),
|
||||
maxT: minutes(101 + (30-1)*2), // Append samples to fit in a single mmmaped OOO chunk and overlap the first in-order mmaped chunk.
|
||||
filter: func(t int64) bool { return t%2 == 1 },
|
||||
isOOO: true,
|
||||
},
|
||||
{
|
||||
minT: minutes(191),
|
||||
maxT: minutes(193), // Append some more OOO samples to trigger mapping the OOO chunk, but use time 151 to not overlap with in-order head chunk.
|
||||
filter: func(t int64) bool { return t%2 == 1 },
|
||||
isOOO: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tc := range tests {
|
||||
t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) {
|
||||
opts.OutOfOrderCapMax = tc.oooCap
|
||||
db := openTestDB(t, opts, nil)
|
||||
db.DisableCompactions()
|
||||
defer func() {
|
||||
@ -5200,12 +5429,14 @@ func Test_ChunkQuerier_OOOQuery(t *testing.T) {
|
||||
}()
|
||||
|
||||
var expSamples []chunks.Sample
|
||||
var oooSamples, appendedCount int
|
||||
|
||||
// Add in-order samples.
|
||||
expSamples, _ = addSample(db, tc.inOrderMinT, tc.inOrderMaxT, tc.queryMinT, tc.queryMaxT, expSamples)
|
||||
|
||||
// Add out-of-order samples.
|
||||
expSamples, oooSamples := addSample(db, tc.oooMinT, tc.oooMaxT, tc.queryMinT, tc.queryMaxT, expSamples)
|
||||
for _, batch := range tc.batches {
|
||||
expSamples, appendedCount = addSample(db, batch.minT, batch.maxT, tc.queryMinT, tc.queryMaxT, expSamples, batch.filter)
|
||||
if batch.isOOO {
|
||||
oooSamples += appendedCount
|
||||
}
|
||||
}
|
||||
|
||||
sort.Slice(expSamples, func(i, j int) bool {
|
||||
return expSamples[i].T() < expSamples[j].T()
|
||||
|
@ -95,6 +95,7 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap
|
||||
MaxTime: maxT,
|
||||
Ref: ref,
|
||||
Chunk: chunk,
|
||||
MergeOOO: true,
|
||||
})
|
||||
}
|
||||
|
||||
@ -160,6 +161,7 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap
|
||||
if c.Chunk != nil {
|
||||
(*chks)[len(*chks)-1].Chunk = c.Chunk
|
||||
}
|
||||
(*chks)[len(*chks)-1].MergeOOO = (*chks)[len(*chks)-1].MergeOOO || c.MergeOOO
|
||||
}
|
||||
}
|
||||
|
||||
@ -241,8 +243,8 @@ func NewHeadAndOOOChunkReader(head *Head, mint, maxt int64, cr *headChunkReader,
|
||||
}
|
||||
|
||||
func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) {
|
||||
sid, _, isOOO := unpackHeadChunkRef(meta.Ref)
|
||||
if !isOOO && meta.Chunk == nil { // meta.Chunk can have a copy of OOO head samples, even on non-OOO chunk ID.
|
||||
sid, _, _ := unpackHeadChunkRef(meta.Ref)
|
||||
if !meta.MergeOOO {
|
||||
return cr.cr.ChunkOrIterable(meta)
|
||||
}
|
||||
|
||||
@ -266,8 +268,7 @@ func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chu
|
||||
// ChunkOrIterableWithCopy implements ChunkReaderWithCopy. The special Copy
|
||||
// behaviour is only implemented for the in-order head chunk.
|
||||
func (cr *HeadAndOOOChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) {
|
||||
_, _, isOOO := unpackHeadChunkRef(meta.Ref)
|
||||
if !isOOO {
|
||||
if !meta.MergeOOO {
|
||||
return cr.cr.ChunkOrIterableWithCopy(meta)
|
||||
}
|
||||
chk, iter, err := cr.ChunkOrIterable(meta)
|
||||
|
@ -311,6 +311,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
|
||||
Chunk: chunkenc.Chunk(nil),
|
||||
MinTime: e.mint,
|
||||
MaxTime: e.maxt,
|
||||
MergeOOO: true, // Only OOO chunks are tested here, so we always request merge from OOO head.
|
||||
}
|
||||
|
||||
// Ref to whatever Ref the chunk has, that we refer to by ID
|
||||
@ -484,7 +485,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
|
||||
cr := NewHeadAndOOOChunkReader(db.head, 0, 1000, nil, nil, 0)
|
||||
defer cr.Close()
|
||||
c, iterable, err := cr.ChunkOrIterable(chunks.Meta{
|
||||
Ref: 0x1800000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300,
|
||||
Ref: 0x1800000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300, MergeOOO: true,
|
||||
})
|
||||
require.Nil(t, iterable)
|
||||
require.Equal(t, err, fmt.Errorf("not found"))
|
||||
|
Loading…
Reference in New Issue
Block a user