Cleanup new blocks on 'CleanTombstones' faliure.
Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
This commit is contained in:
parent
ae33d7873d
commit
528439aa93
15
block.go
15
block.go
|
@ -468,9 +468,9 @@ Outer:
|
|||
return writeMetaFile(pb.dir, &pb.meta)
|
||||
}
|
||||
|
||||
// CleanTombstones will rewrite the block if there any tombstones to remove them
|
||||
// and returns if there was a re-write.
|
||||
func (pb *Block) CleanTombstones(dest string, c Compactor) (bool, error) {
|
||||
// CleanTombstones will remove the tombstones and rewrite the block (only if there are any tombstones).
|
||||
// If there was a rewrite, then it returns the ULID of the new block written, else nil.
|
||||
func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, error) {
|
||||
numStones := 0
|
||||
|
||||
pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
|
||||
|
@ -480,14 +480,15 @@ func (pb *Block) CleanTombstones(dest string, c Compactor) (bool, error) {
|
|||
})
|
||||
|
||||
if numStones == 0 {
|
||||
return false, nil
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if _, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime); err != nil {
|
||||
return false, err
|
||||
uid, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return true, nil
|
||||
return &uid, nil
|
||||
}
|
||||
|
||||
// Snapshot creates snapshot of the block into dir.
|
||||
|
|
33
db.go
33
db.go
|
@ -835,34 +835,49 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
|
|||
}
|
||||
|
||||
// CleanTombstones re-writes any blocks with tombstones.
|
||||
func (db *DB) CleanTombstones() error {
|
||||
func (db *DB) CleanTombstones() (err error) {
|
||||
db.cmtx.Lock()
|
||||
defer db.cmtx.Unlock()
|
||||
|
||||
start := time.Now()
|
||||
defer db.metrics.tombCleanTimer.Observe(time.Since(start).Seconds())
|
||||
|
||||
newUIDs := []ulid.ULID{}
|
||||
defer func() {
|
||||
// If any error is caused, we need to delete all the new directory created.
|
||||
if err != nil {
|
||||
for _, uid := range newUIDs {
|
||||
dir := filepath.Join(db.Dir(), uid.String())
|
||||
if err := os.RemoveAll(dir); err != nil {
|
||||
level.Error(db.logger).Log("msg", "failed to delete block after failed `CleanTombstones`", "dir", dir, "err", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
db.mtx.RLock()
|
||||
blocks := db.blocks[:]
|
||||
db.mtx.RUnlock()
|
||||
|
||||
deleted := []string{}
|
||||
deletable := []string{}
|
||||
for _, b := range blocks {
|
||||
ok, err := b.CleanTombstones(db.Dir(), db.compactor)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "clean tombstones: %s", b.Dir())
|
||||
uid, er := b.CleanTombstones(db.Dir(), db.compactor)
|
||||
if er != nil {
|
||||
err = errors.Wrapf(er, "clean tombstones: %s", b.Dir())
|
||||
return err
|
||||
}
|
||||
|
||||
if ok {
|
||||
deleted = append(deleted, b.Dir())
|
||||
if uid != nil { // New block was created.
|
||||
deletable = append(deletable, b.Dir())
|
||||
newUIDs = append(newUIDs, *uid)
|
||||
}
|
||||
}
|
||||
|
||||
if len(deleted) == 0 {
|
||||
if len(deletable) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
return errors.Wrap(db.reload(deleted...), "reload blocks")
|
||||
return errors.Wrap(db.reload(deletable...), "reload blocks")
|
||||
}
|
||||
|
||||
func intervalOverlap(amin, amax, bmin, bmax int64) bool {
|
||||
|
|
Loading…
Reference in New Issue