diff --git a/tsdb/chunks/chunks.go b/tsdb/chunks/chunks.go index ec0f6d403..69201c6db 100644 --- a/tsdb/chunks/chunks.go +++ b/tsdb/chunks/chunks.go @@ -133,6 +133,9 @@ type Meta struct { // Time range the data covers. // When MaxTime == math.MaxInt64 the chunk is still open and being appended to. MinTime, MaxTime int64 + + // Flag to indicate that this meta needs merge with OOO data. + MergeOOO bool } // ChunkFromSamples requires all samples to have the same type. diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index f1d06a421..a50decd62 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -91,10 +91,11 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap addChunk := func(minT, maxT int64, ref chunks.ChunkRef, chunk chunkenc.Chunk) { tmpChks = append(tmpChks, chunks.Meta{ - MinTime: minT, - MaxTime: maxT, - Ref: ref, - Chunk: chunk, + MinTime: minT, + MaxTime: maxT, + Ref: ref, + Chunk: chunk, + MergeOOO: true, }) } @@ -160,6 +161,7 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap if c.Chunk != nil { (*chks)[len(*chks)-1].Chunk = c.Chunk } + (*chks)[len(*chks)-1].MergeOOO = (*chks)[len(*chks)-1].MergeOOO || c.MergeOOO } } @@ -241,8 +243,8 @@ func NewHeadAndOOOChunkReader(head *Head, mint, maxt int64, cr *headChunkReader, } func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) { - sid, _, isOOO := unpackHeadChunkRef(meta.Ref) - if !isOOO && meta.Chunk == nil { // meta.Chunk can have a copy of OOO head samples, even on non-OOO chunk ID. + sid, _, _ := unpackHeadChunkRef(meta.Ref) + if !meta.MergeOOO { return cr.cr.ChunkOrIterable(meta) } @@ -266,8 +268,7 @@ func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chu // ChunkOrIterableWithCopy implements ChunkReaderWithCopy. The special Copy // behaviour is only implemented for the in-order head chunk. func (cr *HeadAndOOOChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) { - _, _, isOOO := unpackHeadChunkRef(meta.Ref) - if !isOOO { + if !meta.MergeOOO { return cr.cr.ChunkOrIterableWithCopy(meta) } chk, iter, err := cr.ChunkOrIterable(meta) diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go index f71d49732..e565933f8 100644 --- a/tsdb/ooo_head_read_test.go +++ b/tsdb/ooo_head_read_test.go @@ -308,9 +308,10 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { var expChunks []chunks.Meta for _, e := range tc.expChunks { meta := chunks.Meta{ - Chunk: chunkenc.Chunk(nil), - MinTime: e.mint, - MaxTime: e.maxt, + Chunk: chunkenc.Chunk(nil), + MinTime: e.mint, + MaxTime: e.maxt, + MergeOOO: true, // Only OOO chunks are tested here, so we always request merge from OOO head. } // Ref to whatever Ref the chunk has, that we refer to by ID @@ -484,7 +485,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) { cr := NewHeadAndOOOChunkReader(db.head, 0, 1000, nil, nil, 0) defer cr.Close() c, iterable, err := cr.ChunkOrIterable(chunks.Meta{ - Ref: 0x1800000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300, + Ref: 0x1800000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300, MergeOOO: true, }) require.Nil(t, iterable) require.Equal(t, err, fmt.Errorf("not found"))