Add retention deletion
This commit is contained in:
parent
4962175218
commit
d0770302ed
88
db.go
88
db.go
|
@ -207,10 +207,13 @@ func (db *DB) run() {
|
|||
|
||||
var merr MultiError
|
||||
|
||||
changes, err := db.compact()
|
||||
changes1, err := db.retentionCutoff()
|
||||
merr.Add(err)
|
||||
|
||||
if changes {
|
||||
changes2, err := db.compact()
|
||||
merr.Add(err)
|
||||
|
||||
if changes1 || changes2 {
|
||||
merr.Add(db.reloadBlocks())
|
||||
}
|
||||
if err := merr.Err(); err != nil {
|
||||
|
@ -223,11 +226,31 @@ func (db *DB) run() {
|
|||
}
|
||||
}
|
||||
|
||||
func (db *DB) compact() (changes bool, err error) {
|
||||
// Check whether we have pending head blocks that are ready to be persisted.
|
||||
// They have the highest priority.
|
||||
func (db *DB) retentionCutoff() (bool, error) {
|
||||
db.headmtx.RLock()
|
||||
|
||||
if db.opts.RetentionDuration == 0 {
|
||||
return false, nil
|
||||
}
|
||||
// We don't count the span covered by head blocks towards the
|
||||
// retention time as it generally makes up a fraction of it.
|
||||
if len(db.persisted) == 0 {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
last := db.persisted[len(db.persisted)-1]
|
||||
mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration)
|
||||
|
||||
db.headmtx.RUnlock()
|
||||
|
||||
return retentionCutoff(db.dir, mint)
|
||||
}
|
||||
|
||||
func (db *DB) compact() (changes bool, err error) {
|
||||
db.headmtx.RLock()
|
||||
|
||||
// Check whether we have pending head blocks that are ready to be persisted.
|
||||
// They have the highest priority.
|
||||
var singles []*headBlock
|
||||
|
||||
// Collect head blocks that are ready for compaction. Write them after
|
||||
|
@ -297,36 +320,35 @@ Loop:
|
|||
return changes, nil
|
||||
}
|
||||
|
||||
// func (db *DB) retentionCutoff() error {
|
||||
// if db.opts.RetentionDuration == 0 {
|
||||
// return nil
|
||||
// }
|
||||
// h := db.heads[len(db.heads)-1]
|
||||
// t := h.meta.MinTime - int64(db.opts.RetentionDuration)
|
||||
// retentionCutoff deletes all directories of blocks in dir that are strictly
|
||||
// before mint.
|
||||
func retentionCutoff(dir string, mint int64) (bool, error) {
|
||||
dirs, err := blockDirs(dir)
|
||||
if err != nil {
|
||||
return false, errors.Wrapf(err, "list block dirs %s", dir)
|
||||
}
|
||||
|
||||
// var (
|
||||
// blocks = db.blocks()
|
||||
// i int
|
||||
// b Block
|
||||
// )
|
||||
// for i, b = range blocks {
|
||||
// if b.Meta().MinTime >= t {
|
||||
// break
|
||||
// }
|
||||
// }
|
||||
// if i <= 1 {
|
||||
// return nil
|
||||
// }
|
||||
// db.logger.Log("msg", "retention cutoff", "idx", i-1)
|
||||
// db.removeBlocks(0, i)
|
||||
changes := false
|
||||
|
||||
// for _, b := range blocks[:i] {
|
||||
// if err := os.RemoveAll(b.Dir()); err != nil {
|
||||
// return errors.Wrap(err, "removing old block")
|
||||
// }
|
||||
// }
|
||||
// return nil
|
||||
// }
|
||||
for _, dir := range dirs {
|
||||
meta, err := readMetaFile(dir)
|
||||
if err != nil {
|
||||
return changes, errors.Wrapf(err, "read block meta %s", dir)
|
||||
}
|
||||
// The first block we encounter marks that we crossed the boundary
|
||||
// of deletable blocks.
|
||||
if meta.MaxTime >= mint {
|
||||
break
|
||||
}
|
||||
changes = true
|
||||
|
||||
if err := os.RemoveAll(dir); err != nil {
|
||||
return changes, err
|
||||
}
|
||||
}
|
||||
|
||||
return changes, nil
|
||||
}
|
||||
|
||||
func (db *DB) reloadBlocks() error {
|
||||
var cs []io.Closer
|
||||
|
|
Loading…
Reference in New Issue