Update MinOOOTime and MaxOOOTime properly after restart (#275)

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
Ganesh Vernekar 2022-06-24 15:56:33 +05:30 committed by GitHub
parent df59320886
commit abde1e0ba1
2 changed files with 53 additions and 5 deletions


@ -4236,6 +4236,11 @@ func TestOOOAppendAndQuery(t *testing.T) {
require.Equal(t, float64(totalSamples-2), prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamplesAppended), "number of ooo appended samples mismatch")
}
verifyOOOMinMaxTimes := func(expMin, expMax int64) {
require.Equal(t, minutes(expMin), db.head.MinOOOTime())
require.Equal(t, minutes(expMax), db.head.MaxOOOTime())
}
// In-order samples.
addSample(s1, 300, 300, false)
addSample(s2, 290, 290, false)
@ -4245,16 +4250,19 @@ func TestOOOAppendAndQuery(t *testing.T) {
// Some ooo samples.
addSample(s1, 250, 260, false)
addSample(s2, 255, 265, false)
verifyOOOMinMaxTimes(250, 265)
testQuery()
// Out of allowance.
addSample(s1, 59, 59, true)
addSample(s2, 49, 49, true)
verifyOOOMinMaxTimes(250, 265)
testQuery()
// At the edge of the allowance; these would also be "out of bound" without the ooo support.
addSample(s1, 60, 65, false)
addSample(s2, 50, 55, false)
verifyOOOMinMaxTimes(50, 265)
testQuery()
// Out of allowance again.
@ -4268,6 +4276,7 @@ func TestOOOAppendAndQuery(t *testing.T) {
require.Equal(t, float64(4), prom_testutil.ToFloat64(db.head.metrics.chunksCreated))
addSample(s1, 180, 249, false)
require.Equal(t, float64(6), prom_testutil.ToFloat64(db.head.metrics.chunksCreated))
verifyOOOMinMaxTimes(50, 265)
testQuery()
}
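
For context on the new verifyOOOMinMaxTimes helper: the test expresses timestamps as minute offsets and converts them with a minutes helper before comparing against Head.MinOOOTime and Head.MaxOOOTime. The definition of minutes is not part of this diff; the following is a minimal, self-contained sketch of the pattern, assuming minutes converts minute offsets to millisecond timestamps (the unit the head works in) and using stand-in values instead of a real DB.

package main

import (
    "fmt"
    "time"
)

func main() {
    // Assumed shape of the test's minutes helper: minute offsets converted to
    // millisecond timestamps.
    minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() }

    // Hypothetical stand-ins for db.head.MinOOOTime() and db.head.MaxOOOTime()
    // after the appends above.
    gotMin, gotMax := minutes(50), minutes(265)

    verifyOOOMinMaxTimes := func(expMin, expMax int64) {
        if gotMin != minutes(expMin) || gotMax != minutes(expMax) {
            panic(fmt.Sprintf("ooo min/max mismatch: got [%d, %d], want [%d, %d]",
                gotMin, gotMax, minutes(expMin), minutes(expMax)))
        }
    }

    verifyOOOMinMaxTimes(50, 265)
    fmt.Println("ooo min/max times match")
}
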
@ -4393,6 +4402,7 @@ func TestWBLAndMmapReplay(t *testing.T) {
addSample(s1, 195, 249) // This creates some m-map chunks.
require.Equal(t, float64(4), prom_testutil.ToFloat64(db.head.metrics.chunksCreated))
testQuery(expSamples)
oooMint, oooMaxt := minutes(195), minutes(260)
// Collect the samples only present in the ooo m-map chunks.
ms, created, err := db.head.getOrCreate(s1.Hash(), s1)
@ -4436,6 +4446,8 @@ func TestWBLAndMmapReplay(t *testing.T) {
t.Run("Restart DB with both WBL and M-map files for ooo data", func(t *testing.T) {
db, err = Open(db.dir, nil, nil, opts, nil)
require.NoError(t, err)
require.Equal(t, oooMint, db.head.MinOOOTime())
require.Equal(t, oooMaxt, db.head.MaxOOOTime())
testQuery(expSamples)
require.NoError(t, db.Close())
})
@ -4445,6 +4457,8 @@ func TestWBLAndMmapReplay(t *testing.T) {
db, err = Open(db.dir, nil, nil, opts, nil)
require.NoError(t, err)
require.Equal(t, oooMint, db.head.MinOOOTime())
require.Equal(t, oooMaxt, db.head.MaxOOOTime())
testQuery(expSamples)
require.NoError(t, db.Close())
})
@ -4455,6 +4469,8 @@ func TestWBLAndMmapReplay(t *testing.T) {
db, err = Open(db.dir, nil, nil, opts, nil)
require.NoError(t, err)
require.Equal(t, oooMint, db.head.MinOOOTime())
require.Equal(t, oooMaxt, db.head.MaxOOOTime())
inOrderSample := expSamples[s1.String()][len(expSamples[s1.String()])-1]
testQuery(map[string][]tsdbutil.Sample{
s1.String(): append(s1MmapSamples, inOrderSample),
@ -4469,6 +4485,8 @@ func TestWBLAndMmapReplay(t *testing.T) {
opts.OutOfOrderCapMax = 60
db, err = Open(db.dir, nil, nil, opts, nil)
require.NoError(t, err)
require.Equal(t, oooMint, db.head.MinOOOTime())
require.Equal(t, oooMaxt, db.head.MaxOOOTime())
testQuery(expSamples)
require.NoError(t, db.Close())
})
@ -4479,6 +4497,8 @@ func TestWBLAndMmapReplay(t *testing.T) {
opts.OutOfOrderCapMax = 10
db, err = Open(db.dir, nil, nil, opts, nil)
require.NoError(t, err)
require.Equal(t, oooMint, db.head.MinOOOTime())
require.Equal(t, oooMaxt, db.head.MaxOOOTime())
testQuery(expSamples)
require.NoError(t, db.Close())
})
@ -4511,6 +4531,8 @@ func TestWBLAndMmapReplay(t *testing.T) {
opts.OutOfOrderCapMax = 30
db, err = Open(db.dir, nil, nil, opts, nil)
require.NoError(t, err)
require.Equal(t, oooMint, db.head.MinOOOTime())
require.Equal(t, oooMaxt, db.head.MaxOOOTime())
testQuery(expSamples)
require.NoError(t, db.Close())
})
@ -4521,6 +4543,8 @@ func TestWBLAndMmapReplay(t *testing.T) {
db, err = Open(db.dir, nil, nil, opts, nil)
require.NoError(t, err)
require.Equal(t, oooMint, db.head.MinOOOTime())
require.Equal(t, oooMaxt, db.head.MaxOOOTime())
testQuery(expSamples)
})
}
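
The restart subtests above all assert that MinOOOTime and MaxOOOTime are reconstructed after Open rather than left at their initial sentinel values. The following sketch shows the accessor side only, under the assumption that the head keeps these bounds in atomic fields named minOOOTime and maxOOOTime, initialized to math.MaxInt64 and math.MinInt64; the field names are inferred from the accessors exercised in the test, not confirmed by this diff.

package main

import (
    "fmt"
    "math"
    "sync/atomic"
)

// head is a stand-in for the TSDB Head; field names are assumptions based on
// the MinOOOTime/MaxOOOTime accessors used in the test above.
type head struct {
    minOOOTime atomic.Int64 // smallest out-of-order sample timestamp seen
    maxOOOTime atomic.Int64 // largest out-of-order sample timestamp seen
}

func newHead() *head {
    h := &head{}
    // Sentinels meaning "no ooo samples seen yet". Without updating these
    // during replay, a reopened head would keep reporting the sentinels.
    h.minOOOTime.Store(math.MaxInt64)
    h.maxOOOTime.Store(math.MinInt64)
    return h
}

func (h *head) MinOOOTime() int64 { return h.minOOOTime.Load() }
func (h *head) MaxOOOTime() int64 { return h.maxOOOTime.Load() }

func main() {
    h := newHead()
    fmt.Println(h.MinOOOTime(), h.MaxOOOTime()) // sentinel values before replay
}

The second file's changes below show where replay feeds freshly observed bounds back into these fields via h.updateMinOOOMaxOOOTime.
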


@ -357,6 +357,19 @@ func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc, oooMmc []*m
mSeries.mmMaxTime = mmc[len(mmc)-1].maxTime
h.updateMinMaxTime(mmc[0].minTime, mSeries.mmMaxTime)
}
if len(oooMmc) != 0 {
// The chunks are not sorted, so mint and maxt can come from any chunk.
mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
for _, ch := range oooMmc {
if ch.minTime < mint {
mint = ch.minTime
}
if ch.maxTime > maxt {
maxt = ch.maxTime
}
}
h.updateMinOOOMaxOOOTime(mint, maxt)
}
// Any samples replayed till now would already be compacted. Resetting the head chunk.
// We do not reset oooHeadChunk because that is being replayed from a different WAL
@ -497,7 +510,7 @@ func (h *Head) loadWbl(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
processors[i].setup()
go func(wp *wblSubsetProcessor) {
unknown := wp.processWALSamples(h)
unknown := wp.processWBLSamples(h)
unknownRefs.Add(unknown)
wg.Done()
}(&processors[i])
@ -679,14 +692,14 @@ func (wp *wblSubsetProcessor) reuseBuf() []record.RefSample {
return nil
}
// processWALSamples adds the samples it receives to the head and passes
// processWBLSamples adds the samples it receives to the head and passes
// the buffer received to an output channel for reuse.
// Samples before the minValidTime timestamp are discarded.
func (wp *wblSubsetProcessor) processWALSamples(h *Head) (unknownRefs uint64) {
func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs uint64) {
defer close(wp.output)
// We don't check for minValidTime for ooo samples.
mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
for samples := range wp.input {
wp.mx.Lock()
for _, s := range samples {
@ -695,15 +708,26 @@ func (wp *wblSubsetProcessor) processWALSamples(h *Head) (unknownRefs uint64) {
unknownRefs++
continue
}
if _, chunkCreated, _ := ms.insert(s.T, s.V, h.chunkDiskMapper); chunkCreated {
ok, chunkCreated, _ := ms.insert(s.T, s.V, h.chunkDiskMapper)
if chunkCreated {
h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc()
}
if ok {
if s.T < mint {
mint = s.T
}
if s.T > maxt {
maxt = s.T
}
}
}
wp.mx.Unlock()
wp.output <- samples
}
h.updateMinOOOMaxOOOTime(mint, maxt)
return unknownRefs
}
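
Both replay paths above end by calling h.updateMinOOOMaxOOOTime with the bounds they observed. That method's body is not part of this diff; as an illustration only, a lock-free min/max fold in the style of the head's existing updateMinMaxTime could look like the sketch below, reusing the assumed atomic fields from the earlier sketch.

package main

import (
    "fmt"
    "math"
    "sync/atomic"
)

type head struct {
    minOOOTime atomic.Int64
    maxOOOTime atomic.Int64
}

// updateMinOOOMaxOOOTime widens the tracked out-of-order time range to include
// [mint, maxt]. Sketch only: the CAS loops keep the update safe when several
// WBL subset processors report their bounds concurrently.
func (h *head) updateMinOOOMaxOOOTime(mint, maxt int64) {
    for {
        lt := h.minOOOTime.Load()
        if mint >= lt || h.minOOOTime.CompareAndSwap(lt, mint) {
            break
        }
    }
    for {
        ht := h.maxOOOTime.Load()
        if maxt <= ht || h.maxOOOTime.CompareAndSwap(ht, maxt) {
            break
        }
    }
}

func main() {
    h := &head{}
    h.minOOOTime.Store(math.MaxInt64)
    h.maxOOOTime.Store(math.MinInt64)

    h.updateMinOOOMaxOOOTime(250, 265) // bounds reported by a WBL subset processor
    h.updateMinOOOMaxOOOTime(50, 55)   // bounds from a replayed ooo m-map chunk
    fmt.Println(h.minOOOTime.Load(), h.maxOOOTime.Load()) // 50 265
}

Note that when a processor saw no usable samples, mint and maxt remain at their MaxInt64/MinInt64 initial values and the calls above are no-ops, so the tracked range is never widened spuriously.
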