diff --git a/checkpoint.go b/checkpoint.go index d1cad4c10..1c6239232 100644 --- a/checkpoint.go +++ b/checkpoint.go @@ -128,7 +128,7 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) defer sgmReader.Close() } - cpdir := filepath.Join(w.Dir(), fmt.Sprintf("checkpoint.%06d", to)) + cpdir := filepath.Join(w.Dir(), fmt.Sprintf(checkpointPrefix+"%06d", to)) cpdirtmp := cpdir + ".tmp" if err := os.MkdirAll(cpdirtmp, 0777); err != nil { @@ -139,6 +139,12 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) return nil, errors.Wrap(err, "open checkpoint") } + // Ensures that an early return caused by an error doesn't leave any tmp files. + defer func() { + cp.Close() + os.RemoveAll(cpdirtmp) + }() + r := wal.NewReader(sgmReader) var ( diff --git a/checkpoint_test.go b/checkpoint_test.go index 8538cf0c6..8b13c152a 100644 --- a/checkpoint_test.go +++ b/checkpoint_test.go @@ -15,11 +15,14 @@ package tsdb import ( + "fmt" "io/ioutil" "os" "path/filepath" + "strings" "testing" + "github.com/pkg/errors" "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/testutil" @@ -180,3 +183,30 @@ func TestCheckpoint(t *testing.T) { {Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")}, }, series) } + +func TestCheckpointNoTmpFolderAfterError(t *testing.T) { + // Create a new wal with an invalid records. + dir, err := ioutil.TempDir("", "test_checkpoint") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + w, err := wal.NewSize(nil, nil, dir, 64*1024) + testutil.Ok(t, err) + testutil.Ok(t, w.Log([]byte{99})) + w.Close() + + // Run the checkpoint and since the wal contains an invalid records this should return an error. + _, err = Checkpoint(w, 0, 1, nil, 0) + testutil.NotOk(t, err) + + // Walk the wal dir to make sure there are no tmp folder left behind after the error. + err = filepath.Walk(w.Dir(), func(path string, info os.FileInfo, err error) error { + if err != nil { + return errors.Wrapf(err, "access err %q: %v\n", path, err) + } + if info.IsDir() && strings.HasSuffix(info.Name(), ".tmp") { + return fmt.Errorf("wal dir contains temporary folder:%s", info.Name()) + } + return nil + }) + testutil.Ok(t, err) +} diff --git a/wal/wal.go b/wal/wal.go index 92374e312..5433fd042 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -164,6 +164,7 @@ type WAL struct { page *page // active page stopc chan chan struct{} actorc chan func() + closed bool // To allow calling Close() more than once without blocking. fsyncDuration prometheus.Summary pageFlushes prometheus.Counter @@ -584,6 +585,10 @@ func (w *WAL) Close() (err error) { w.mtx.Lock() defer w.mtx.Unlock() + if w.closed { + return nil + } + // Flush the last page and zero out all its remaining size. // We must not flush an empty page as it would falsely signal // the segment is done if we start writing to it again after opening. @@ -603,7 +608,7 @@ func (w *WAL) Close() (err error) { if err := w.segment.Close(); err != nil { level.Error(w.logger).Log("msg", "close previous segment", "err", err) } - + w.closed = true return nil }