From 2624d827fadc604323db979e1d7b59415531f3d3 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Date: Wed, 21 Oct 2020 18:27:13 +0530 Subject: [PATCH] Read repair empty last file in chunks_head (#8061) * Read repair empty file in chunks_head Signed-off-by: Ganesh Vernekar * Refactor and introduce repairLastChunkFile Signed-off-by: Ganesh Vernekar * Attempt windows test fix Signed-off-by: Ganesh Vernekar * Fix review comments Signed-off-by: Ganesh Vernekar * Fix review comments Signed-off-by: Ganesh Vernekar --- tsdb/chunks/head_chunks.go | 36 +++++++++++++++++ tsdb/chunks/head_chunks_test.go | 70 +++++++++++++++++++++++++++++++++ 2 files changed, 106 insertions(+) diff --git a/tsdb/chunks/head_chunks.go b/tsdb/chunks/head_chunks.go index 632682218..154149d0c 100644 --- a/tsdb/chunks/head_chunks.go +++ b/tsdb/chunks/head_chunks.go @@ -167,6 +167,11 @@ func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) { return err } + files, err = repairLastChunkFile(files) + if err != nil { + return err + } + chkFileIndices := make([]int, 0, len(files)) for seq, fn := range files { f, err := fileutil.OpenMmapFile(fn) @@ -226,9 +231,40 @@ func listChunkFiles(dir string) (map[int]string, error) { } res[int(seq)] = filepath.Join(dir, fi.Name()) } + return res, nil } +// repairLastChunkFile deletes the last file (the one with the highest sequence number) if it's empty and removes its entry from files. +// Because we don't fsync when creating these files, we could end +// up with an empty file at the end during an abrupt shutdown. +func repairLastChunkFile(files map[int]string) (_ map[int]string, returnErr error) { + lastFile := -1 + for seq := range files { + if seq > lastFile { + lastFile = seq + } + } + + if lastFile <= 0 { + return files, nil + } + + info, err := os.Stat(files[lastFile]) + if err != nil { + return files, errors.Wrap(err, "file stat during last head chunk file repair") + } + if info.Size() == 0 { + // A zero-size file is treated as corrupt, hence remove it. 
+ if err := os.RemoveAll(files[lastFile]); err != nil { + return files, errors.Wrap(err, "delete corrupted, empty head chunk file during last file repair") + } + delete(files, lastFile) + } + + return files, nil +} + // WriteChunk writes the chunk to the disk. // The returned chunk ref is the reference from where the chunk encoding starts for the chunk. func (cdm *ChunkDiskMapper) WriteChunk(seriesRef uint64, mint, maxt int64, chk chunkenc.Chunk) (chkRef uint64, err error) { diff --git a/tsdb/chunks/head_chunks_test.go b/tsdb/chunks/head_chunks_test.go index bfb4262ad..f8ed26cf1 100644 --- a/tsdb/chunks/head_chunks_test.go +++ b/tsdb/chunks/head_chunks_test.go @@ -19,6 +19,7 @@ import ( "io/ioutil" "math/rand" "os" + "strconv" "testing" "github.com/prometheus/prometheus/tsdb/chunkenc" @@ -363,6 +364,75 @@ func TestHeadReadWriter_TruncateAfterFailedIterateChunks(t *testing.T) { testutil.Ok(t, hrw.Truncate(2000)) } +func TestHeadReadWriter_ReadRepairOnEmptyLastFile(t *testing.T) { + hrw := testHeadReadWriter(t) + defer func() { + testutil.Ok(t, hrw.Close()) + }() + + timeRange := 0 + addChunk := func() { + step := 100 + mint, maxt := timeRange+1, timeRange+step-1 + _, err := hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t)) + testutil.Ok(t, err) + timeRange += step + } + nonEmptyFile := func() { + testutil.Ok(t, hrw.CutNewFile()) + addChunk() + } + + addChunk() // 1. Created with the first chunk. + nonEmptyFile() // 2. + nonEmptyFile() // 3. + + testutil.Equals(t, 3, len(hrw.mmappedChunkFiles)) + lastFile := 0 + for idx := range hrw.mmappedChunkFiles { + if idx > lastFile { + lastFile = idx + } + } + testutil.Equals(t, 3, lastFile) + dir := hrw.dir.Name() + testutil.Ok(t, hrw.Close()) + + // Write an empty last file mimicking an abrupt shutdown on file creation. 
+ emptyFileName := segmentFile(dir, lastFile+1) + f, err := os.OpenFile(emptyFileName, os.O_WRONLY|os.O_CREATE, 0666) + testutil.Ok(t, err) + testutil.Ok(t, f.Sync()) + stat, err := f.Stat() + testutil.Ok(t, err) + testutil.Equals(t, int64(0), stat.Size()) + testutil.Ok(t, f.Close()) + + // Open chunk disk mapper again, corrupt file should be removed. + hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool()) + testutil.Ok(t, err) + testutil.Assert(t, !hrw.fileMaxtSet, "") + testutil.Ok(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil })) + testutil.Assert(t, hrw.fileMaxtSet, "") + + // Removed from memory. + testutil.Equals(t, 3, len(hrw.mmappedChunkFiles)) + for idx := range hrw.mmappedChunkFiles { + testutil.Assert(t, idx <= lastFile, "file index is bigger than previous last file") + } + + // Removed even from disk. + files, err := ioutil.ReadDir(dir) + testutil.Ok(t, err) + testutil.Equals(t, 3, len(files)) + for _, fi := range files { + seq, err := strconv.ParseUint(fi.Name(), 10, 64) + testutil.Ok(t, err) + testutil.Assert(t, seq <= uint64(lastFile), "file index on disk is bigger than previous last file") + } + +} + func testHeadReadWriter(t *testing.T) *ChunkDiskMapper { tmpdir, err := ioutil.TempDir("", "data") testutil.Ok(t, err)