Close the retention blocks before deleting them.
Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>
This commit is contained in:
parent
a00d700d48
commit
41fd9c66ef
36
db.go
36
db.go
|
@ -277,16 +277,23 @@ func (db *DB) retentionCutoff() (bool, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
db.mtx.RLock()
|
db.mtx.RLock()
|
||||||
defer db.mtx.RUnlock()
|
blocks := db.blocks[:]
|
||||||
|
db.mtx.RUnlock()
|
||||||
|
|
||||||
if len(db.blocks) == 0 {
|
if len(blocks) == 0 {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
last := db.blocks[len(db.blocks)-1]
|
last := blocks[len(db.blocks)-1]
|
||||||
mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration)
|
|
||||||
|
|
||||||
return retentionCutoff(db.dir, mint)
|
mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration)
|
||||||
|
dirs, err := retentionCutoffDirs(db.dir, mint)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// This will close the dirs and then delete the dirs.
|
||||||
|
return len(dirs) > 0, db.reload(dirs...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Appender opens a new appender against the database.
|
// Appender opens a new appender against the database.
|
||||||
|
@ -388,40 +395,37 @@ func (db *DB) compact() (changes bool, err error) {
|
||||||
return changes, nil
|
return changes, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// retentionCutoff deletes all directories of blocks in dir that are strictly
|
// retentionCutoffDirs returns all directories of blocks in dir that are strictly
|
||||||
// before mint.
|
// before mint.
|
||||||
func retentionCutoff(dir string, mint int64) (bool, error) {
|
func retentionCutoffDirs(dir string, mint int64) ([]string, error) {
|
||||||
df, err := fileutil.OpenDir(dir)
|
df, err := fileutil.OpenDir(dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, errors.Wrapf(err, "open directory")
|
return nil, errors.Wrapf(err, "open directory")
|
||||||
}
|
}
|
||||||
defer df.Close()
|
defer df.Close()
|
||||||
|
|
||||||
dirs, err := blockDirs(dir)
|
dirs, err := blockDirs(dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, errors.Wrapf(err, "list block dirs %s", dir)
|
return nil, errors.Wrapf(err, "list block dirs %s", dir)
|
||||||
}
|
}
|
||||||
|
|
||||||
changes := false
|
delDirs := []string{}
|
||||||
|
|
||||||
for _, dir := range dirs {
|
for _, dir := range dirs {
|
||||||
meta, err := readMetaFile(dir)
|
meta, err := readMetaFile(dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return changes, errors.Wrapf(err, "read block meta %s", dir)
|
return nil, errors.Wrapf(err, "read block meta %s", dir)
|
||||||
}
|
}
|
||||||
// The first block we encounter marks that we crossed the boundary
|
// The first block we encounter marks that we crossed the boundary
|
||||||
// of deletable blocks.
|
// of deletable blocks.
|
||||||
if meta.MaxTime >= mint {
|
if meta.MaxTime >= mint {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
changes = true
|
|
||||||
|
|
||||||
if err := os.RemoveAll(dir); err != nil {
|
delDirs = append(delDirs, dir)
|
||||||
return changes, err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return changes, fileutil.Fsync(df)
|
return delDirs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) getBlock(id ulid.ULID) (*Block, bool) {
|
func (db *DB) getBlock(id ulid.ULID) (*Block, bool) {
|
||||||
|
|
57
db_test.go
57
db_test.go
|
@ -551,3 +551,60 @@ func TestWALFlushedOnDBClose(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, values, []string{"labelvalue"})
|
require.Equal(t, values, []string{"labelvalue"})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDB_Retention(t *testing.T) {
|
||||||
|
tmpdir, _ := ioutil.TempDir("", "test")
|
||||||
|
defer os.RemoveAll(tmpdir)
|
||||||
|
|
||||||
|
db, err := Open(tmpdir, nil, nil, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
lbls := labels.Labels{labels.Label{Name: "labelname", Value: "labelvalue"}}
|
||||||
|
|
||||||
|
app := db.Appender()
|
||||||
|
_, err = app.Add(lbls, 0, 1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, app.Commit())
|
||||||
|
|
||||||
|
// create snapshot to make it create a block.
|
||||||
|
// TODO(gouthamve): Add a method to compact headblock.
|
||||||
|
snap, err := ioutil.TempDir("", "snap")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, db.Snapshot(snap))
|
||||||
|
require.NoError(t, db.Close())
|
||||||
|
defer os.RemoveAll(snap)
|
||||||
|
|
||||||
|
// reopen DB from snapshot
|
||||||
|
db, err = Open(snap, nil, nil, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
Equals(t, 1, len(db.blocks))
|
||||||
|
|
||||||
|
app = db.Appender()
|
||||||
|
_, err = app.Add(lbls, 100, 1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, app.Commit())
|
||||||
|
|
||||||
|
// Snapshot again to create another block.
|
||||||
|
snap, err = ioutil.TempDir("", "snap")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, db.Snapshot(snap))
|
||||||
|
require.NoError(t, db.Close())
|
||||||
|
defer os.RemoveAll(snap)
|
||||||
|
|
||||||
|
// reopen DB from snapshot
|
||||||
|
db, err = Open(snap, nil, nil, &Options{
|
||||||
|
RetentionDuration: 10,
|
||||||
|
BlockRanges: []int64{50},
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
Equals(t, 2, len(db.blocks))
|
||||||
|
|
||||||
|
// Now call rentention.
|
||||||
|
changes, err := db.retentionCutoff()
|
||||||
|
Ok(t, err)
|
||||||
|
Assert(t, changes, "there should be changes")
|
||||||
|
Equals(t, 1, len(db.blocks))
|
||||||
|
Equals(t, int64(100), db.blocks[0].meta.MaxTime) // To verify its the right block.
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue