TSDB: Simplify OOO Select by copying the head chunk (#14396)

Instead of carrying around extra fields in `Meta` structs which let us
approximate what was in the chunk at the time, take a copy of the chunk.

This simplifies lots of code, and lets us correct a couple of tests which
were embedding the wrong answer.

We can also remove boundedIterator, which was only used to constrain
the OOO head chunk.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
Bryan Boreham 2024-07-03 15:08:07 +01:00 committed by GitHub
parent 3d54bcc018
commit 134e8dc7af
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 35 additions and 340 deletions

View File

@ -133,15 +133,6 @@ type Meta struct {
// Time range the data covers.
// When MaxTime == math.MaxInt64 the chunk is still open and being appended to.
MinTime, MaxTime int64
// OOOLastRef, OOOLastMinTime and OOOLastMaxTime are kept as markers for
// overlapping chunks.
// These fields point to the last created out of order Chunk (the head) that existed
// when Series() was called and was overlapping.
// Series() and Chunk() method responses should be consistent for the same
// query even if new data is added in between the calls.
OOOLastRef ChunkRef
OOOLastMinTime, OOOLastMaxTime int64
}
// ChunkFromSamples requires all samples to have the same type.

View File

@ -487,55 +487,24 @@ func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMappe
// We create a temporary slice of chunk metas to hold the information of all
// possible chunks that may overlap with the requested chunk.
tmpChks := make([]chunkMetaAndChunkDiskMapperRef, 0, len(s.ooo.oooMmappedChunks))
oooHeadRef := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.ooo.oooMmappedChunks))))
if s.ooo.oooHeadChunk != nil && s.ooo.oooHeadChunk.OverlapsClosedInterval(mint, maxt) {
// We only want to append the head chunk if this chunk existed when
// Series() was called. This brings consistency in case new data
// is added in between Series() and Chunk() calls.
if oooHeadRef == meta.OOOLastRef {
tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{
meta: chunks.Meta{
// Ignoring samples added before and after the last known min and max time for this chunk.
MinTime: meta.OOOLastMinTime,
MaxTime: meta.OOOLastMaxTime,
Ref: oooHeadRef,
},
})
}
}
tmpChks := make([]chunkMetaAndChunkDiskMapperRef, 0, len(s.ooo.oooMmappedChunks)+1)
for i, c := range s.ooo.oooMmappedChunks {
chunkRef := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(i)))
// We can skip chunks that came in later than the last known OOOLastRef.
if chunkRef > meta.OOOLastRef {
break
}
switch {
case chunkRef == meta.OOOLastRef:
tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{
meta: chunks.Meta{
MinTime: meta.OOOLastMinTime,
MaxTime: meta.OOOLastMaxTime,
Ref: chunkRef,
},
ref: c.ref,
origMinT: c.minTime,
origMaxT: c.maxTime,
})
case c.OverlapsClosedInterval(mint, maxt):
if c.OverlapsClosedInterval(mint, maxt) {
tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{
meta: chunks.Meta{
MinTime: c.minTime,
MaxTime: c.maxTime,
Ref: chunkRef,
Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(i))),
},
ref: c.ref,
})
}
}
// Add in data copied from the head OOO chunk.
if meta.Chunk != nil {
tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{meta: meta})
}
// Next we want to sort all the collected chunks by min time so we can find
// those that overlap and stop when we know the rest don't.
@ -548,22 +517,8 @@ func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMappe
continue
}
var iterable chunkenc.Iterable
if c.meta.Ref == oooHeadRef {
var xor *chunkenc.XORChunk
var err error
// If head chunk min and max time match the meta OOO markers
// that means that the chunk has not expanded so we can append
// it as it is.
if s.ooo.oooHeadChunk.minTime == meta.OOOLastMinTime && s.ooo.oooHeadChunk.maxTime == meta.OOOLastMaxTime {
xor, err = s.ooo.oooHeadChunk.chunk.ToXOR() // TODO(jesus.vazquez) (This is an optimization idea that has no priority and might not be that useful) See if we could use a copy of the underlying slice. That would leave the more expensive ToXOR() function only for the usecase where Bytes() is called.
} else {
// We need to remove samples that are outside of the markers
xor, err = s.ooo.oooHeadChunk.chunk.ToXORBetweenTimestamps(meta.OOOLastMinTime, meta.OOOLastMaxTime)
}
if err != nil {
return nil, fmt.Errorf("failed to convert ooo head chunk to xor chunk: %w", err)
}
iterable = xor
if c.meta.Chunk != nil {
iterable = c.meta.Chunk
} else {
chk, err := cdm.Chunk(c.ref)
if err != nil {
@ -573,17 +528,8 @@ func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMappe
}
return nil, err
}
if c.meta.Ref == meta.OOOLastRef &&
(c.origMinT != meta.OOOLastMinTime || c.origMaxT != meta.OOOLastMaxTime) {
// The head expanded and was memory mapped so now we need to
// wrap the chunk within a chunk that doesnt allows us to iterate
// through samples out of the OOOLastMinT and OOOLastMaxT
// markers.
iterable = boundedIterable{chk, meta.OOOLastMinTime, meta.OOOLastMaxTime}
} else {
iterable = chk
}
}
mc.chunkIterables = append(mc.chunkIterables, iterable)
if c.meta.MaxTime > absoluteMax {
absoluteMax = c.meta.MaxTime
@ -593,74 +539,6 @@ func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMappe
return mc, nil
}
var _ chunkenc.Iterable = &boundedIterable{}
// boundedIterable is an implementation of chunkenc.Iterable that uses a
// boundedIterator that only iterates through samples which timestamps are
// >= minT and <= maxT.
type boundedIterable struct {
chunk chunkenc.Chunk
minT int64
maxT int64
}
func (b boundedIterable) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator {
it := b.chunk.Iterator(iterator)
if it == nil {
panic("iterator shouldn't be nil")
}
return boundedIterator{it, b.minT, b.maxT}
}
var _ chunkenc.Iterator = &boundedIterator{}
// boundedIterator is an implementation of Iterator that only iterates through
// samples which timestamps are >= minT and <= maxT.
type boundedIterator struct {
chunkenc.Iterator
minT int64
maxT int64
}
// Next the first time its called it will advance as many positions as necessary
// until its able to find a sample within the bounds minT and maxT.
// If there are samples within bounds it will advance one by one amongst them.
// If there are no samples within bounds it will return false.
func (b boundedIterator) Next() chunkenc.ValueType {
for b.Iterator.Next() == chunkenc.ValFloat {
t, _ := b.Iterator.At()
switch {
case t < b.minT:
continue
case t > b.maxT:
return chunkenc.ValNone
default:
return chunkenc.ValFloat
}
}
return chunkenc.ValNone
}
func (b boundedIterator) Seek(t int64) chunkenc.ValueType {
if t < b.minT {
// We must seek at least up to b.minT if it is asked for something before that.
val := b.Iterator.Seek(b.minT)
if !(val == chunkenc.ValFloat) {
return chunkenc.ValNone
}
t, _ := b.Iterator.At()
if t <= b.maxT {
return chunkenc.ValFloat
}
}
if t > b.maxT {
// We seek anyway so that the subsequent Next() calls will also return false.
b.Iterator.Seek(t)
return chunkenc.ValNone
}
return b.Iterator.Seek(t)
}
// safeHeadChunk makes sure that the chunk can be accessed without a race condition.
type safeHeadChunk struct {
chunkenc.Chunk

View File

@ -15,7 +15,6 @@ package tsdb
import (
"context"
"fmt"
"sync"
"testing"
@ -26,150 +25,6 @@ import (
"github.com/prometheus/prometheus/tsdb/chunks"
)
func TestBoundedChunk(t *testing.T) {
tests := []struct {
name string
inputChunk chunkenc.Chunk
inputMinT int64
inputMaxT int64
initialSeek int64
seekIsASuccess bool
expSamples []sample
}{
{
name: "if there are no samples it returns nothing",
inputChunk: newTestChunk(0),
expSamples: nil,
},
{
name: "bounds represent a single sample",
inputChunk: newTestChunk(10),
expSamples: []sample{
{0, 0, nil, nil},
},
},
{
name: "if there are bounds set only samples within them are returned",
inputChunk: newTestChunk(10),
inputMinT: 1,
inputMaxT: 8,
expSamples: []sample{
{1, 1, nil, nil},
{2, 2, nil, nil},
{3, 3, nil, nil},
{4, 4, nil, nil},
{5, 5, nil, nil},
{6, 6, nil, nil},
{7, 7, nil, nil},
{8, 8, nil, nil},
},
},
{
name: "if bounds set and only maxt is less than actual maxt",
inputChunk: newTestChunk(10),
inputMinT: 0,
inputMaxT: 5,
expSamples: []sample{
{0, 0, nil, nil},
{1, 1, nil, nil},
{2, 2, nil, nil},
{3, 3, nil, nil},
{4, 4, nil, nil},
{5, 5, nil, nil},
},
},
{
name: "if bounds set and only mint is more than actual mint",
inputChunk: newTestChunk(10),
inputMinT: 5,
inputMaxT: 9,
expSamples: []sample{
{5, 5, nil, nil},
{6, 6, nil, nil},
{7, 7, nil, nil},
{8, 8, nil, nil},
{9, 9, nil, nil},
},
},
{
name: "if there are bounds set with seek before mint",
inputChunk: newTestChunk(10),
inputMinT: 3,
inputMaxT: 7,
initialSeek: 1,
seekIsASuccess: true,
expSamples: []sample{
{3, 3, nil, nil},
{4, 4, nil, nil},
{5, 5, nil, nil},
{6, 6, nil, nil},
{7, 7, nil, nil},
},
},
{
name: "if there are bounds set with seek between mint and maxt",
inputChunk: newTestChunk(10),
inputMinT: 3,
inputMaxT: 7,
initialSeek: 5,
seekIsASuccess: true,
expSamples: []sample{
{5, 5, nil, nil},
{6, 6, nil, nil},
{7, 7, nil, nil},
},
},
{
name: "if there are bounds set with seek after maxt",
inputChunk: newTestChunk(10),
inputMinT: 3,
inputMaxT: 7,
initialSeek: 8,
seekIsASuccess: false,
},
}
for _, tc := range tests {
t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) {
iterable := boundedIterable{tc.inputChunk, tc.inputMinT, tc.inputMaxT}
var samples []sample
it := iterable.Iterator(nil)
if tc.initialSeek != 0 {
// Testing Seek()
val := it.Seek(tc.initialSeek)
require.Equal(t, tc.seekIsASuccess, val == chunkenc.ValFloat)
if val == chunkenc.ValFloat {
t, v := it.At()
samples = append(samples, sample{t, v, nil, nil})
}
}
// Testing Next()
for it.Next() == chunkenc.ValFloat {
t, v := it.At()
samples = append(samples, sample{t, v, nil, nil})
}
// it.Next() should keep returning no value.
for i := 0; i < 10; i++ {
require.Equal(t, chunkenc.ValNone, it.Next())
}
require.Equal(t, tc.expSamples, samples)
})
}
}
func newTestChunk(numSamples int) chunkenc.Chunk {
xor := chunkenc.NewXORChunk()
a, _ := xor.Appender()
for i := 0; i < numSamples; i++ {
a.Append(int64(i), float64(i))
}
return xor
}
// TestMemSeries_chunk runs a series of tests on memSeries.chunk() calls.
// It will simulate various conditions to ensure all code paths in that function are covered.
func TestMemSeries_chunk(t *testing.T) {

View File

@ -94,48 +94,32 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra
tmpChks := make([]chunks.Meta, 0, len(s.ooo.oooMmappedChunks))
// We define these markers to track the last chunk reference while we
// fill the chunk meta.
// These markers are useful to give consistent responses to repeated queries
// even if new chunks that might be overlapping or not are added afterwards.
// Also, lastMinT and lastMaxT are initialized to the max int as a sentinel
// value to know they are unset.
var lastChunkRef chunks.ChunkRef
lastMinT, lastMaxT := int64(math.MaxInt64), int64(math.MaxInt64)
addChunk := func(minT, maxT int64, ref chunks.ChunkRef) {
// the first time we get called is for the last included chunk.
// set the markers accordingly
if lastMinT == int64(math.MaxInt64) {
lastChunkRef = ref
lastMinT = minT
lastMaxT = maxT
}
addChunk := func(minT, maxT int64, ref chunks.ChunkRef, chunk chunkenc.Chunk) {
tmpChks = append(tmpChks, chunks.Meta{
MinTime: minT,
MaxTime: maxT,
Ref: ref,
OOOLastRef: lastChunkRef,
OOOLastMinTime: lastMinT,
OOOLastMaxTime: lastMaxT,
Chunk: chunk,
})
}
// Collect all chunks that overlap the query range, in order from most recent to most old,
// so we can set the correct markers.
// Collect all chunks that overlap the query range.
if s.ooo.oooHeadChunk != nil {
c := s.ooo.oooHeadChunk
if c.OverlapsClosedInterval(oh.mint, oh.maxt) && maxMmapRef == 0 {
ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.ooo.oooMmappedChunks))))
addChunk(c.minTime, c.maxTime, ref)
var xor chunkenc.Chunk
if len(c.chunk.samples) > 0 { // Empty samples happens in tests, at least.
xor, _ = c.chunk.ToXOR() // Ignoring error because it can't fail.
}
addChunk(c.minTime, c.maxTime, ref, xor)
}
}
for i := len(s.ooo.oooMmappedChunks) - 1; i >= 0; i-- {
c := s.ooo.oooMmappedChunks[i]
if c.OverlapsClosedInterval(oh.mint, oh.maxt) && (maxMmapRef == 0 || maxMmapRef.GreaterThanOrEqualTo(c.ref)) && (lastGarbageCollectedMmapRef == 0 || c.ref.GreaterThan(lastGarbageCollectedMmapRef)) {
ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(i)))
addChunk(c.minTime, c.maxTime, ref)
addChunk(c.minTime, c.maxTime, ref, nil)
}
}
@ -163,6 +147,12 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra
case c.MaxTime > maxTime:
maxTime = c.MaxTime
(*chks)[len(*chks)-1].MaxTime = c.MaxTime
fallthrough
default:
// If the head OOO chunk is part of an output chunk, copy the chunk pointer.
if c.Chunk != nil {
(*chks)[len(*chks)-1].Chunk = c.Chunk
}
}
}
@ -187,8 +177,6 @@ func (oh *OOOHeadIndexReader) LabelValues(ctx context.Context, name string, matc
type chunkMetaAndChunkDiskMapperRef struct {
meta chunks.Meta
ref chunks.ChunkDiskMapperRef
origMinT int64
origMaxT int64
}
func refLessByMinTimeAndMinRef(a, b chunkMetaAndChunkDiskMapperRef) int {

View File

@ -304,18 +304,6 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
s1, _, _ := h.getOrCreate(s1ID, s1Lset)
s1.ooo = &memSeriesOOOFields{}
var lastChunk chunkInterval
var lastChunkPos int
// the marker should be set based on whichever is the last chunk/interval that overlaps with the query range
for i, interv := range intervals {
if overlapsClosedInterval(interv.mint, interv.maxt, tc.queryMinT, tc.queryMaxT) {
lastChunk = interv
lastChunkPos = i
}
}
lastChunkRef := chunks.ChunkRef(chunks.NewHeadChunkRef(1, chunks.HeadChunkID(uint64(lastChunkPos))))
// define our expected chunks, by looking at the expected ChunkIntervals and setting...
var expChunks []chunks.Meta
for _, e := range tc.expChunks {
@ -323,10 +311,6 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
Chunk: chunkenc.Chunk(nil),
MinTime: e.mint,
MaxTime: e.maxt,
// markers based on the last chunk we found above
OOOLastMinTime: lastChunk.mint,
OOOLastMaxTime: lastChunk.maxt,
OOOLastRef: lastChunkRef,
}
// Ref to whatever Ref the chunk has, that we refer to by ID
@ -343,6 +327,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
if headChunk && len(intervals) > 0 {
// Put the last interval in the head chunk
s1.ooo.oooHeadChunk = &oooHeadChunk{
chunk: NewOOOChunk(),
minTime: intervals[len(intervals)-1].mint,
maxTime: intervals[len(intervals)-1].maxt,
}
@ -842,8 +827,8 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
}
require.NoError(t, app.Commit())
// The Series method is the one that populates the chunk meta OOO
// markers like OOOLastRef. These are then used by the ChunkReader.
// The Series method populates the chunk metas, taking a copy of the
// head OOO chunk if necessary. These are then used by the ChunkReader.
ir := NewOOOHeadIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0)
var chks []chunks.Meta
var b labels.ScratchBuilder
@ -939,7 +924,6 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
sample{t: minutes(25), f: float64(1)},
sample{t: minutes(26), f: float64(0)},
sample{t: minutes(30), f: float64(0)},
sample{t: minutes(32), f: float64(1)}, // This sample was added after Series() but before Chunk() and its in between the lastmint and maxt so it should be kept
sample{t: minutes(35), f: float64(1)},
},
},
@ -985,7 +969,6 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
sample{t: minutes(25), f: float64(1)},
sample{t: minutes(26), f: float64(0)},
sample{t: minutes(30), f: float64(0)},
sample{t: minutes(32), f: float64(1)}, // This sample was added after Series() but before Chunk() and its in between the lastmint and maxt so it should be kept
sample{t: minutes(35), f: float64(1)},
},
},
@ -1007,8 +990,8 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
}
require.NoError(t, app.Commit())
// The Series method is the one that populates the chunk meta OOO
// markers like OOOLastRef. These are then used by the ChunkReader.
// The Series method populates the chunk metas, taking a copy of the
// head OOO chunk if necessary. These are then used by the ChunkReader.
ir := NewOOOHeadIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0)
var chks []chunks.Meta
var b labels.ScratchBuilder