diff --git a/block.go b/block.go index e08503b64..1b6e79d9d 100644 --- a/block.go +++ b/block.go @@ -29,6 +29,7 @@ import ( "github.com/prometheus/tsdb/chunkenc" "github.com/prometheus/tsdb/chunks" tsdb_errors "github.com/prometheus/tsdb/errors" + "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" ) @@ -230,12 +231,17 @@ func readMetaFile(dir string) (*BlockMeta, error) { return &m, nil } -func writeMetaFile(dir string, meta *BlockMeta) error { +func writeMetaFile(logger log.Logger, dir string, meta *BlockMeta) error { meta.Version = 1 // Make any changes to the file appear atomic. path := filepath.Join(dir, metaFilename) tmp := path + ".tmp" + defer func() { + if err := os.RemoveAll(tmp); err != nil { + level.Error(logger).Log("msg", "remove tmp file", "err", err.Error()) + } + }() f, err := os.Create(tmp) if err != nil { @@ -246,7 +252,6 @@ func writeMetaFile(dir string, meta *BlockMeta) error { enc.SetIndent("", "\t") var merr tsdb_errors.MultiError - if merr.Add(enc.Encode(meta)); merr.Err() != nil { merr.Add(f.Close()) return merr.Err() @@ -259,7 +264,7 @@ func writeMetaFile(dir string, meta *BlockMeta) error { if err := f.Close(); err != nil { return err } - return renameFile(tmp, path) + return fileutil.Replace(tmp, path) } // Block represents a directory of time series data covering a continuous time range. @@ -278,6 +283,8 @@ type Block struct { chunkr ChunkReader indexr IndexReader tombstones TombstoneReader + + logger log.Logger } // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used @@ -322,7 +329,7 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, er // that would be the logical place for a block size to be calculated. bs := blockSize(cr, ir, tsr) meta.Stats.NumBytes = bs - err = writeMetaFile(dir, meta) + err = writeMetaFile(logger, dir, meta) if err != nil { level.Warn(logger).Log("msg", "couldn't write the meta file for the block size", "block", dir, "err", err) } @@ -334,6 +341,7 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, er indexr: ir, tombstones: tr, symbolTableSize: ir.SymbolTableSize(), + logger: logger, } return pb, nil } @@ -429,7 +437,7 @@ func (pb *Block) GetSymbolTableSize() uint64 { func (pb *Block) setCompactionFailed() error { pb.meta.Compaction.Failed = true - return writeMetaFile(pb.dir, &pb.meta) + return writeMetaFile(pb.logger, pb.dir, &pb.meta) } type blockIndexReader struct { @@ -553,10 +561,10 @@ Outer: pb.tombstones = stones pb.meta.Stats.NumTombstones = pb.tombstones.Total() - if err := writeTombstoneFile(pb.dir, pb.tombstones); err != nil { + if err := writeTombstoneFile(pb.logger, pb.dir, pb.tombstones); err != nil { return err } - return writeMetaFile(pb.dir, &pb.meta) + return writeMetaFile(pb.logger, pb.dir, &pb.meta) } // CleanTombstones will remove the tombstones and rewrite the block (only if there are any tombstones). diff --git a/block_test.go b/block_test.go index c6c3951df..bdfd58fbc 100644 --- a/block_test.go +++ b/block_test.go @@ -39,7 +39,7 @@ func TestBlockMetaMustNeverBeVersion2(t *testing.T) { testutil.Ok(t, os.RemoveAll(dir)) }() - testutil.Ok(t, writeMetaFile(dir, &BlockMeta{})) + testutil.Ok(t, writeMetaFile(log.NewNopLogger(), dir, &BlockMeta{})) meta, err := readMetaFile(dir) testutil.Ok(t, err) diff --git a/compact.go b/compact.go index 4a360d028..c0948bbf3 100644 --- a/compact.go +++ b/compact.go @@ -426,7 +426,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u if meta.Stats.NumSamples == 0 { for _, b := range bs { b.meta.Compaction.Deletable = true - if err = writeMetaFile(b.dir, &b.meta); err != nil { + if err = writeMetaFile(c.logger, b.dir, &b.meta); err != nil { level.Error(c.logger).Log( "msg", "Failed to write 'Deletable' to meta file after compaction", "ulid", b.meta.ULID, @@ -609,12 +609,12 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe return nil } - if err = writeMetaFile(tmp, meta); err != nil { + if err = writeMetaFile(c.logger, tmp, meta); err != nil { return errors.Wrap(err, "write merged meta") } // Create an empty tombstones file. - if err := writeTombstoneFile(tmp, newMemTombstones()); err != nil { + if err := writeTombstoneFile(c.logger, tmp, newMemTombstones()); err != nil { return errors.Wrap(err, "write new tombstones file") } @@ -639,7 +639,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe df = nil // Block successfully written, make visible and remove old ones. - if err := renameFile(tmp, dir); err != nil { + if err := fileutil.Replace(tmp, dir); err != nil { return errors.Wrap(err, "rename block dir") } @@ -1013,24 +1013,3 @@ func (c *compactionMerger) Err() error { func (c *compactionMerger) At() (labels.Labels, []chunks.Meta, Intervals) { return c.l, c.c, c.intervals } - -func renameFile(from, to string) error { - if err := os.RemoveAll(to); err != nil { - return err - } - if err := os.Rename(from, to); err != nil { - return err - } - - // Directory was renamed; sync parent dir to persist rename. - pdir, err := fileutil.OpenDir(filepath.Dir(to)) - if err != nil { - return err - } - - if err = pdir.Sync(); err != nil { - pdir.Close() - return err - } - return pdir.Close() -} diff --git a/repair.go b/repair.go index a7fa084f0..38138b12a 100644 --- a/repair.go +++ b/repair.go @@ -23,6 +23,8 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" + tsdb_errors "github.com/prometheus/tsdb/errors" + "github.com/prometheus/tsdb/fileutil" ) // repairBadIndexVersion repairs an issue in index and meta.json persistence introduced in @@ -38,6 +40,16 @@ func repairBadIndexVersion(logger log.Logger, dir string) error { wrapErr := func(err error, d string) error { return errors.Wrapf(err, "block dir: %q", d) } + + tmpFiles := make([]string, 0, len(dir)) + defer func() { + for _, tmp := range tmpFiles { + if err := os.RemoveAll(tmp); err != nil { + level.Error(logger).Log("msg", "remove tmp file", "err", err.Error()) + } + } + }() + for _, d := range dirs { meta, err := readBogusMetaFile(d) if err != nil { @@ -63,6 +75,8 @@ func repairBadIndexVersion(logger log.Logger, dir string) error { if err != nil { return wrapErr(err, d) } + tmpFiles = append(tmpFiles, repl.Name()) + broken, err := os.Open(filepath.Join(d, indexFilename)) if err != nil { return wrapErr(err, d) @@ -70,12 +84,19 @@ func repairBadIndexVersion(logger log.Logger, dir string) error { if _, err := io.Copy(repl, broken); err != nil { return wrapErr(err, d) } + + var merr tsdb_errors.MultiError + // Set the 5th byte to 2 to indicate the correct file format version. if _, err := repl.WriteAt([]byte{2}, 4); err != nil { - return wrapErr(err, d) + merr.Add(wrapErr(err, d)) + merr.Add(wrapErr(repl.Close(), d)) + return merr.Err() } if err := repl.Sync(); err != nil { - return wrapErr(err, d) + merr.Add(wrapErr(err, d)) + merr.Add(wrapErr(repl.Close(), d)) + return merr.Err() } if err := repl.Close(); err != nil { return wrapErr(err, d) @@ -83,12 +104,12 @@ func repairBadIndexVersion(logger log.Logger, dir string) error { if err := broken.Close(); err != nil { return wrapErr(err, d) } - if err := renameFile(repl.Name(), broken.Name()); err != nil { + if err := fileutil.Replace(repl.Name(), broken.Name()); err != nil { return wrapErr(err, d) } // Reset version of meta.json to 1. meta.Version = 1 - if err := writeMetaFile(d, meta); err != nil { + if err := writeMetaFile(logger, d, meta); err != nil { return wrapErr(err, d) } } diff --git a/tombstones.go b/tombstones.go index ec025544a..220af4900 100644 --- a/tombstones.go +++ b/tombstones.go @@ -22,9 +22,12 @@ import ( "path/filepath" "sync" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" "github.com/pkg/errors" "github.com/prometheus/tsdb/encoding" tsdb_errors "github.com/prometheus/tsdb/errors" + "github.com/prometheus/tsdb/fileutil" ) const tombstoneFilename = "tombstones" @@ -51,7 +54,7 @@ type TombstoneReader interface { Close() error } -func writeTombstoneFile(dir string, tr TombstoneReader) error { +func writeTombstoneFile(logger log.Logger, dir string, tr TombstoneReader) error { path := filepath.Join(dir, tombstoneFilename) tmp := path + ".tmp" hash := newCRC32() @@ -62,7 +65,12 @@ func writeTombstoneFile(dir string, tr TombstoneReader) error { } defer func() { if f != nil { - f.Close() + if err := f.Close(); err != nil { + level.Error(logger).Log("msg", "close tmp file", "err", err.Error()) + } + } + if err := os.RemoveAll(tmp); err != nil { + level.Error(logger).Log("msg", "remove tmp file", "err", err.Error()) } }() @@ -111,7 +119,7 @@ func writeTombstoneFile(dir string, tr TombstoneReader) error { return err } f = nil - return renameFile(tmp, path) + return fileutil.Replace(tmp, path) } // Stone holds the information on the posting and time-range diff --git a/tombstones_test.go b/tombstones_test.go index d60ef832d..7b0d70b6b 100644 --- a/tombstones_test.go +++ b/tombstones_test.go @@ -21,6 +21,7 @@ import ( "testing" "time" + "github.com/go-kit/kit/log" "github.com/prometheus/tsdb/testutil" ) @@ -46,7 +47,7 @@ func TestWriteAndReadbackTombStones(t *testing.T) { stones.addInterval(ref, dranges...) } - testutil.Ok(t, writeTombstoneFile(tmpdir, stones)) + testutil.Ok(t, writeTombstoneFile(log.NewNopLogger(), tmpdir, stones)) restr, _, err := readTombstones(tmpdir) testutil.Ok(t, err) diff --git a/wal.go b/wal.go index d7ffe0c1e..86b3bf79c 100644 --- a/wal.go +++ b/wal.go @@ -338,6 +338,12 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error { if err != nil { return errors.Wrap(err, "create compaction segment") } + defer func() { + if err := os.RemoveAll(f.Name()); err != nil { + level.Error(w.logger).Log("msg", "remove tmp file", "err", err.Error()) + } + }() + var ( csf = newSegmentFile(f) crc32 = newCRC32() @@ -389,7 +395,7 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error { csf.Close() candidates[0].Close() // need close before remove on platform windows - if err := renameFile(csf.Name(), candidates[0].Name()); err != nil { + if err := fileutil.Replace(csf.Name(), candidates[0].Name()); err != nil { return errors.Wrap(err, "rename compaction segment") } for _, f := range candidates[1:] {