diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 805f66714..da6e54d27 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -14,6 +14,7 @@ package tsdb import ( + "bufio" "context" "encoding/binary" "fmt" @@ -155,6 +156,67 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) { testutil.Equals(t, map[string][]tsdbutil.Sample{`{foo="bar"}`: {sample{t: 0, v: 0}}}, seriesSet) } +// TestNoPanicAfterWALCorrutpion ensures that querying the db after a WAL corruption doesn't cause a panic. +// https://github.com/prometheus/prometheus/issues/7548 +func TestNoPanicAfterWALCorrutpion(t *testing.T) { + db, closeFn := openTestDB(t, &Options{WALSegmentSize: 32 * 1024}, nil) + t.Cleanup(closeFn) + + // Append until the the first mmaped head chunk. + // This is to ensure that all samples can be read from the mmaped chunks when the WAL is corrupted. + var expSamples []tsdbutil.Sample + var maxt int64 + { + for { + app := db.Appender() + _, err := app.Add(labels.FromStrings("foo", "bar"), maxt, 0) + expSamples = append(expSamples, sample{t: maxt, v: 0}) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + mmapedChunks, err := ioutil.ReadDir(mmappedChunksDir(db.Dir())) + testutil.Ok(t, err) + if len(mmapedChunks) > 0 { + break + } + maxt++ + } + testutil.Ok(t, db.Close()) + } + + // Corrupt the WAL after the first sample of the series so that it has at least one sample and + // it is not garbage collected. + // The repair deletes all WAL records after the corrupted record and these are read from the mmaped chunk. + { + walFiles, err := ioutil.ReadDir(path.Join(db.Dir(), "wal")) + testutil.Ok(t, err) + f, err := os.OpenFile(path.Join(db.Dir(), "wal", walFiles[0].Name()), os.O_RDWR, 0666) + testutil.Ok(t, err) + r := wal.NewReader(bufio.NewReader(f)) + testutil.Assert(t, r.Next(), "reading the series record") + testutil.Assert(t, r.Next(), "reading the first sample record") + // Write an invalid record header to corrupt everything after the first wal sample. + _, err = f.WriteAt([]byte{99}, r.Offset()) + testutil.Ok(t, err) + f.Close() + } + + // Query the data. + { + db, err := Open(db.Dir(), nil, nil, nil) + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, db.Close()) + }() + testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.walCorruptionsTotal), "WAL corruption count mismatch") + + querier, err := db.Querier(context.TODO(), 0, maxt) + testutil.Ok(t, err) + seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "", "")) + // The last sample should be missing as it was after the WAL segment corruption. + testutil.Equals(t, map[string][]tsdbutil.Sample{`{foo="bar"}`: expSamples[0 : len(expSamples)-1]}, seriesSet) + } +} + func TestDataNotAvailableAfterRollback(t *testing.T) { db, closeFn := openTestDB(t, nil, nil) defer func() { diff --git a/tsdb/head.go b/tsdb/head.go index d45307dd5..1c547ffe6 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -2078,8 +2078,9 @@ func (s *memSeries) chunkID(pos int) int { return pos + s.firstChunkID } -// truncateChunksBefore removes all chunks from the series that have not timestamp -// at or after mint. Chunk IDs remain unchanged. +// truncateChunksBefore removes all chunks from the series that +// have no timestamp at or after mint. +// Chunk IDs remain unchanged. func (s *memSeries) truncateChunksBefore(mint int64) (removed int) { var k int if s.headChunk != nil && s.headChunk.maxTime < mint {