diff --git a/db.go b/db.go index f1972f00b..c6c31cd2c 100644 --- a/db.go +++ b/db.go @@ -277,16 +277,23 @@ func (db *DB) retentionCutoff() (bool, error) { } db.mtx.RLock() - defer db.mtx.RUnlock() + blocks := db.blocks[:] + db.mtx.RUnlock() - if len(db.blocks) == 0 { + if len(blocks) == 0 { return false, nil } - last := db.blocks[len(db.blocks)-1] - mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration) + last := blocks[len(db.blocks)-1] - return retentionCutoff(db.dir, mint) + mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration) + dirs, err := retentionCutoffDirs(db.dir, mint) + if err != nil { + return false, err + } + + // This will close the dirs and then delete the dirs. + return len(dirs) > 0, db.reload(dirs...) } // Appender opens a new appender against the database. @@ -388,40 +395,37 @@ func (db *DB) compact() (changes bool, err error) { return changes, nil } -// retentionCutoff deletes all directories of blocks in dir that are strictly +// retentionCutoffDirs returns all directories of blocks in dir that are strictly // before mint. -func retentionCutoff(dir string, mint int64) (bool, error) { +func retentionCutoffDirs(dir string, mint int64) ([]string, error) { df, err := fileutil.OpenDir(dir) if err != nil { - return false, errors.Wrapf(err, "open directory") + return nil, errors.Wrapf(err, "open directory") } defer df.Close() dirs, err := blockDirs(dir) if err != nil { - return false, errors.Wrapf(err, "list block dirs %s", dir) + return nil, errors.Wrapf(err, "list block dirs %s", dir) } - changes := false + delDirs := []string{} for _, dir := range dirs { meta, err := readMetaFile(dir) if err != nil { - return changes, errors.Wrapf(err, "read block meta %s", dir) + return nil, errors.Wrapf(err, "read block meta %s", dir) } // The first block we encounter marks that we crossed the boundary // of deletable blocks. if meta.MaxTime >= mint { break } - changes = true - if err := os.RemoveAll(dir); err != nil { - return changes, err - } + delDirs = append(delDirs, dir) } - return changes, fileutil.Fsync(df) + return delDirs, nil } func (db *DB) getBlock(id ulid.ULID) (*Block, bool) { diff --git a/db_test.go b/db_test.go index 8eb676af8..d2ad55129 100644 --- a/db_test.go +++ b/db_test.go @@ -556,3 +556,60 @@ func TestWALFlushedOnDBClose(t *testing.T) { require.NoError(t, err) require.Equal(t, values, []string{"labelvalue"}) } + +func TestDB_Retention(t *testing.T) { + tmpdir, _ := ioutil.TempDir("", "test") + defer os.RemoveAll(tmpdir) + + db, err := Open(tmpdir, nil, nil, nil) + require.NoError(t, err) + + lbls := labels.Labels{labels.Label{Name: "labelname", Value: "labelvalue"}} + + app := db.Appender() + _, err = app.Add(lbls, 0, 1) + require.NoError(t, err) + require.NoError(t, app.Commit()) + + // create snapshot to make it create a block. + // TODO(gouthamve): Add a method to compact headblock. + snap, err := ioutil.TempDir("", "snap") + require.NoError(t, err) + require.NoError(t, db.Snapshot(snap)) + require.NoError(t, db.Close()) + defer os.RemoveAll(snap) + + // reopen DB from snapshot + db, err = Open(snap, nil, nil, nil) + require.NoError(t, err) + + Equals(t, 1, len(db.blocks)) + + app = db.Appender() + _, err = app.Add(lbls, 100, 1) + require.NoError(t, err) + require.NoError(t, app.Commit()) + + // Snapshot again to create another block. + snap, err = ioutil.TempDir("", "snap") + require.NoError(t, err) + require.NoError(t, db.Snapshot(snap)) + require.NoError(t, db.Close()) + defer os.RemoveAll(snap) + + // reopen DB from snapshot + db, err = Open(snap, nil, nil, &Options{ + RetentionDuration: 10, + BlockRanges: []int64{50}, + }) + require.NoError(t, err) + + Equals(t, 2, len(db.blocks)) + + // Now call rentention. + changes, err := db.retentionCutoff() + Ok(t, err) + Assert(t, changes, "there should be changes") + Equals(t, 1, len(db.blocks)) + Equals(t, int64(100), db.blocks[0].meta.MaxTime) // To verify its the right block. +}