From 902817e17f194931ebe7e5181c6aeacbbc6223de Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Date: Thu, 9 Dec 2021 16:06:38 +0530 Subject: [PATCH 1/2] Fix queries after a failed snapshot replay (#9980) (#9987) Signed-off-by: Ganesh Vernekar --- tsdb/head.go | 4 +- tsdb/head_test.go | 104 ++++++++++++++++++++++++++++++++++++++++++++++ tsdb/head_wal.go | 3 +- 3 files changed, 109 insertions(+), 2 deletions(-) diff --git a/tsdb/head.go b/tsdb/head.go index 72387236d..f2dab2208 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -466,7 +466,9 @@ const cardinalityCacheExpirationTime = time.Duration(30) * time.Second // limits the ingested samples to the head min valid time. func (h *Head) Init(minValidTime int64) error { h.minValidTime.Store(minValidTime) - defer h.postings.EnsureOrder() + defer func() { + h.postings.EnsureOrder() + }() defer h.gc() // After loading the wal remove the obsolete data from the head. defer func() { // Loading of m-mapped chunks and snapshot can make the mint of the Head diff --git a/tsdb/head_test.go b/tsdb/head_test.go index e404e94f6..6bdefd32a 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -2827,3 +2827,107 @@ func TestSnapshotError(t *testing.T) { require.NoError(t, err) require.Equal(t, 0, len(tm)) } + +// Tests https://github.com/prometheus/prometheus/issues/9725. +func TestChunkSnapshotReplayBug(t *testing.T) { + dir := t.TempDir() + wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) + require.NoError(t, err) + + // Write few series records and samples such that the series references are not in order in the WAL + // for status_code="200". + var buf []byte + for i := 1; i <= 1000; i++ { + var ref uint64 + if i <= 500 { + ref = uint64(i * 100) + } else { + ref = uint64((i - 500) * 50) + } + seriesRec := record.RefSeries{ + Ref: ref, + Labels: labels.Labels{ + {Name: "__name__", Value: "request_duration"}, + {Name: "status_code", Value: "200"}, + {Name: "foo", Value: fmt.Sprintf("baz%d", rand.Int())}, + }, + } + // Add a sample so that the series is not garbage collected. + samplesRec := record.RefSample{Ref: ref, T: 1000, V: 1000} + var enc record.Encoder + + rec := enc.Series([]record.RefSeries{seriesRec}, buf) + buf = rec[:0] + require.NoError(t, wlog.Log(rec)) + rec = enc.Samples([]record.RefSample{samplesRec}, buf) + buf = rec[:0] + require.NoError(t, wlog.Log(rec)) + } + + // Write a corrupt snapshot to fail the replay on startup. + snapshotName := chunkSnapshotDir(0, 100) + cpdir := filepath.Join(dir, snapshotName) + require.NoError(t, os.MkdirAll(cpdir, 0o777)) + + err = ioutil.WriteFile(filepath.Join(cpdir, "00000000"), []byte{1, 5, 3, 5, 6, 7, 4, 2, 2}, 0o777) + require.NoError(t, err) + + opts := DefaultHeadOptions() + opts.ChunkDirRoot = dir + opts.EnableMemorySnapshotOnShutdown = true + head, err := NewHead(nil, nil, wlog, opts, nil) + require.NoError(t, err) + require.NoError(t, head.Init(math.MinInt64)) + defer func() { + require.NoError(t, head.Close()) + }() + + // Snapshot replay should error out. + require.Equal(t, 1.0, prom_testutil.ToFloat64(head.metrics.snapshotReplayErrorTotal)) + + // Querying `request_duration{status_code!="200"}` should return no series since all of + // them have status_code="200". + q, err := NewBlockQuerier(head, math.MinInt64, math.MaxInt64) + require.NoError(t, err) + series := query(t, q, + labels.MustNewMatcher(labels.MatchEqual, "__name__", "request_duration"), + labels.MustNewMatcher(labels.MatchNotEqual, "status_code", "200"), + ) + require.Len(t, series, 0, "there should be no series found") +} + +func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) { + dir := t.TempDir() + wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) + require.NoError(t, err) + + // Write a snapshot with .tmp suffix. This used to fail taking any further snapshots or replay of snapshots. + snapshotName := chunkSnapshotDir(0, 100) + ".tmp" + cpdir := filepath.Join(dir, snapshotName) + require.NoError(t, os.MkdirAll(cpdir, 0o777)) + + opts := DefaultHeadOptions() + opts.ChunkDirRoot = dir + opts.EnableMemorySnapshotOnShutdown = true + head, err := NewHead(nil, nil, wlog, opts, nil) + require.NoError(t, err) + require.NoError(t, head.Init(math.MinInt64)) + + require.Equal(t, 0.0, prom_testutil.ToFloat64(head.metrics.snapshotReplayErrorTotal)) + + // Add some samples for the snapshot. + app := head.Appender(context.Background()) + _, err = app.Append(0, labels.Labels{{Name: "foo", Value: "bar"}}, 10, 10) + require.NoError(t, err) + require.NoError(t, app.Commit()) + + // Should not return any error for a successful snapshot. + require.NoError(t, head.Close()) + + // Verify the snapshot. + name, idx, offset, err := LastChunkSnapshot(dir) + require.NoError(t, err) + require.True(t, name != "") + require.Equal(t, 0, idx) + require.Greater(t, offset, 0) +} diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 478829824..b9a0b5ba4 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -752,7 +752,8 @@ func LastChunkSnapshot(dir string) (string, int, int, error) { splits := strings.Split(fi.Name()[len(chunkSnapshotPrefix):], ".") if len(splits) != 2 { - return "", 0, 0, errors.Errorf("chunk snapshot %s is not in the right format", fi.Name()) + // Chunk snapshots is not in the right format, we do not care about it. + continue } idx, err := strconv.Atoi(splits[0]) From c0a8b0cd405531820d928b84d2e5ed8e7ba34dcb Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Date: Thu, 9 Dec 2021 16:47:47 +0530 Subject: [PATCH 2/2] Cut v2.30.4 (#9988) Signed-off-by: Ganesh Vernekar --- CHANGELOG.md | 4 ++++ VERSION | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 00919b057..0dce254e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 2.30.4 / 2021-12-09 + +* [BUGFIX] TSDB: Fix queries after a failed snapshot replay. #9980 + ## 2.30.3 / 2021-10-05 * [BUGFIX] TSDB: Fix panic on failed snapshot replay. #9438 diff --git a/VERSION b/VERSION index e88ba89ba..ea5b93e74 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.30.3 +2.30.4