Merge pull request #12127 from codesome/ooo-mmap-replay

Update OOO min/max time properly after replaying m-map chunks
This commit is contained in:
Ganesh Vernekar 2023-04-04 12:05:57 +05:30 committed by GitHub
commit e709b0b36e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 138 additions and 16 deletions

View File

@ -4451,6 +4451,115 @@ func TestOOOCompactionWithDisabledWriteLog(t *testing.T) {
verifySamples(db.Blocks()[1], 250, 350) verifySamples(db.Blocks()[1], 250, 350)
} }
// TestOOOQueryAfterRestartWithSnapshotAndRemovedWBL tests the scenario where the WBL goes
// missing after a restart while snapshot was enabled, but the query still returns the right
// data from the mmap chunks.
func TestOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T) {
	dir := t.TempDir()

	opts := DefaultOptions()
	opts.OutOfOrderCapMax = 10
	opts.OutOfOrderTimeWindow = 300 * time.Minute.Milliseconds()
	opts.EnableMemorySnapshotOnShutdown = true

	db, err := Open(dir, nil, nil, opts, nil)
	require.NoError(t, err)
	db.DisableCompactions() // We want to manually call it.
	t.Cleanup(func() {
		require.NoError(t, db.Close())
	})

	series1 := labels.FromStrings("foo", "bar1")
	series2 := labels.FromStrings("foo", "bar2")

	// addSamples appends one sample per minute in [fromMins, toMins] to both series
	// and commits them in a single appender transaction.
	addSamples := func(fromMins, toMins int64) {
		app := db.Appender(context.Background())
		// NOTE: loop variable renamed from `min` to avoid shadowing the
		// Go 1.21 builtin of the same name.
		for m := fromMins; m <= toMins; m++ {
			ts := m * time.Minute.Milliseconds()
			_, err := app.Append(0, series1, ts, float64(ts))
			require.NoError(t, err)
			_, err = app.Append(0, series2, ts, float64(2*ts))
			require.NoError(t, err)
		}
		require.NoError(t, app.Commit())
	}

	// Add in-order samples.
	addSamples(250, 350)

	// Add ooo samples that will result in a single block.
	addSamples(90, 110) // The sample 110 will not be in m-map chunks.

	// Checking that there are some ooo m-map chunks.
	for _, lbls := range []labels.Labels{series1, series2} {
		ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
		require.NoError(t, err)
		require.False(t, created)
		require.Len(t, ms.ooo.oooMmappedChunks, 2)
		require.NotNil(t, ms.ooo.oooHeadChunk)
	}

	// Restart DB.
	require.NoError(t, db.Close())

	// For some reason wbl goes missing.
	// NOTE(review): `path.Join` is slash-based; `filepath.Join` would be the
	// OS-correct choice for a filesystem path — kept as-is to avoid an import change.
	require.NoError(t, os.RemoveAll(path.Join(dir, "wbl")))

	db, err = Open(dir, nil, nil, opts, nil)
	require.NoError(t, err)
	db.DisableCompactions() // We want to manually call it.

	// Check ooo m-map chunks again.
	for _, lbls := range []labels.Labels{series1, series2} {
		ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
		require.NoError(t, err)
		require.False(t, created)
		require.Len(t, ms.ooo.oooMmappedChunks, 2)
		require.Equal(t, 109*time.Minute.Milliseconds(), ms.ooo.oooMmappedChunks[1].maxTime)
		require.Nil(t, ms.ooo.oooHeadChunk) // Because of missing wbl.
	}

	// verifySamples queries [fromMins, toMins] for both series and checks the
	// returned samples against the expected per-minute values.
	verifySamples := func(fromMins, toMins int64) {
		series1Samples := make([]tsdbutil.Sample, 0, toMins-fromMins+1)
		series2Samples := make([]tsdbutil.Sample, 0, toMins-fromMins+1)
		for m := fromMins; m <= toMins; m++ {
			ts := m * time.Minute.Milliseconds()
			series1Samples = append(series1Samples, sample{ts, float64(ts), nil, nil})
			series2Samples = append(series2Samples, sample{ts, float64(2 * ts), nil, nil})
		}
		expRes := map[string][]tsdbutil.Sample{
			series1.String(): series1Samples,
			series2.String(): series2Samples,
		}

		q, err := db.Querier(context.Background(), fromMins*time.Minute.Milliseconds(), toMins*time.Minute.Milliseconds())
		require.NoError(t, err)

		actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
		require.Equal(t, expRes, actRes)
	}

	// Checking for expected ooo data from mmap chunks.
	verifySamples(90, 109)

	// Compaction should also work fine.
	// Fixed: require.Equal takes (t, expected, actual) — the expected value goes
	// first, consistent with the MinTime/MaxTime assertions below.
	require.Equal(t, 0, len(db.Blocks()))
	require.NoError(t, db.CompactOOOHead())
	require.Equal(t, 1, len(db.Blocks())) // One block from OOO data.
	require.Equal(t, int64(0), db.Blocks()[0].MinTime())
	require.Equal(t, 120*time.Minute.Milliseconds(), db.Blocks()[0].MaxTime())

	// Checking that ooo chunk is empty in Head.
	for _, lbls := range []labels.Labels{series1, series2} {
		ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
		require.NoError(t, err)
		require.False(t, created)
		require.Nil(t, ms.ooo)
	}

	verifySamples(90, 109)
}
func Test_Querier_OOOQuery(t *testing.T) { func Test_Querier_OOOQuery(t *testing.T) {
opts := DefaultOptions() opts := DefaultOptions()
opts.OutOfOrderCapMax = 30 opts.OutOfOrderCapMax = 30

View File

@ -591,6 +591,7 @@ func (h *Head) Init(minValidTime int64) error {
snapIdx, snapOffset := -1, 0 snapIdx, snapOffset := -1, 0
refSeries := make(map[chunks.HeadSeriesRef]*memSeries) refSeries := make(map[chunks.HeadSeriesRef]*memSeries)
snapshotLoaded := false
if h.opts.EnableMemorySnapshotOnShutdown { if h.opts.EnableMemorySnapshotOnShutdown {
level.Info(h.logger).Log("msg", "Chunk snapshot is enabled, replaying from the snapshot") level.Info(h.logger).Log("msg", "Chunk snapshot is enabled, replaying from the snapshot")
// If there are any WAL files, there should be at least one WAL file with an index that is current or newer // If there are any WAL files, there should be at least one WAL file with an index that is current or newer
@ -620,6 +621,7 @@ func (h *Head) Init(minValidTime int64) error {
var err error var err error
snapIdx, snapOffset, refSeries, err = h.loadChunkSnapshot() snapIdx, snapOffset, refSeries, err = h.loadChunkSnapshot()
if err == nil { if err == nil {
snapshotLoaded = true
level.Info(h.logger).Log("msg", "Chunk snapshot loading time", "duration", time.Since(start).String()) level.Info(h.logger).Log("msg", "Chunk snapshot loading time", "duration", time.Since(start).String())
} }
if err != nil { if err != nil {
@ -637,26 +639,36 @@ func (h *Head) Init(minValidTime int64) error {
} }
mmapChunkReplayStart := time.Now() mmapChunkReplayStart := time.Now()
mmappedChunks, oooMmappedChunks, lastMmapRef, err := h.loadMmappedChunks(refSeries) var (
if err != nil { mmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk
// TODO(codesome): clear out all m-map chunks here for refSeries. oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk
level.Error(h.logger).Log("msg", "Loading on-disk chunks failed", "err", err) lastMmapRef chunks.ChunkDiskMapperRef
if _, ok := errors.Cause(err).(*chunks.CorruptionErr); ok { err error
h.metrics.mmapChunkCorruptionTotal.Inc() )
} if snapshotLoaded || h.wal != nil {
// If snapshot was not loaded and if there is no WAL, then m-map chunks will be discarded
// Discard snapshot data since we need to replay the WAL for the missed m-map chunks data. // anyway. So we only load m-map chunks when it won't be discarded.
snapIdx, snapOffset = -1, 0 mmappedChunks, oooMmappedChunks, lastMmapRef, err = h.loadMmappedChunks(refSeries)
// If this fails, data will be recovered from WAL.
// Hence we wont lose any data (given WAL is not corrupt).
mmappedChunks, oooMmappedChunks, lastMmapRef, err = h.removeCorruptedMmappedChunks(err)
if err != nil { if err != nil {
return err // TODO(codesome): clear out all m-map chunks here for refSeries.
level.Error(h.logger).Log("msg", "Loading on-disk chunks failed", "err", err)
if _, ok := errors.Cause(err).(*chunks.CorruptionErr); ok {
h.metrics.mmapChunkCorruptionTotal.Inc()
}
// Discard snapshot data since we need to replay the WAL for the missed m-map chunks data.
snapIdx, snapOffset = -1, 0
// If this fails, data will be recovered from WAL.
// Hence we wont lose any data (given WAL is not corrupt).
mmappedChunks, oooMmappedChunks, lastMmapRef, err = h.removeCorruptedMmappedChunks(err)
if err != nil {
return err
}
} }
level.Info(h.logger).Log("msg", "On-disk memory mappable chunks replay completed", "duration", time.Since(mmapChunkReplayStart).String())
} }
level.Info(h.logger).Log("msg", "On-disk memory mappable chunks replay completed", "duration", time.Since(mmapChunkReplayStart).String())
if h.wal == nil { if h.wal == nil {
level.Info(h.logger).Log("msg", "WAL not found") level.Info(h.logger).Log("msg", "WAL not found")
return nil return nil
@ -824,6 +836,7 @@ func (h *Head) loadMmappedChunks(refSeries map[chunks.HeadSeriesRef]*memSeries)
numSamples: numSamples, numSamples: numSamples,
}) })
h.updateMinOOOMaxOOOTime(mint, maxt)
return nil return nil
} }