Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>
This commit is contained in:
Krasi Georgiev 2019-01-30 11:40:55 +02:00
parent 315de4c782
commit 6c34eb8b63
2 changed files with 11 additions and 12 deletions

View File

@ -765,21 +765,21 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) {
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, app.Commit()) testutil.Ok(t, app.Commit())
return 1 return 0
}, },
"Test Block Compaction": func(db *DB) int { "Test Block Compaction": func(db *DB) int {
expBlocks := []*BlockMeta{ blocks := []*BlockMeta{
{MinTime: 0, MaxTime: 100}, {MinTime: 0, MaxTime: 100},
{MinTime: 100, MaxTime: 150}, {MinTime: 100, MaxTime: 150},
{MinTime: 150, MaxTime: 200}, {MinTime: 150, MaxTime: 200},
} }
for _, m := range expBlocks { for _, m := range blocks {
createBlock(t, db.Dir(), genSeries(1, 1, m.MinTime, m.MaxTime)) createBlock(t, db.Dir(), genSeries(1, 1, m.MinTime, m.MaxTime))
} }
testutil.Ok(t, db.reload()) testutil.Ok(t, db.reload())
testutil.Equals(t, len(expBlocks), len(db.Blocks()), "unexpected block count after a reload") testutil.Equals(t, len(blocks), len(db.Blocks()), "unexpected block count after a reload")
return len(expBlocks) + 1 return len(blocks)
}, },
} }
@ -801,15 +801,14 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) {
lastBlockIndex := path.Join(blockPath, indexFilename) lastBlockIndex := path.Join(blockPath, indexFilename)
actBlocks, err := blockDirs(db.Dir()) actBlocks, err := blockDirs(db.Dir())
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, expBlocks, len(actBlocks)) testutil.Equals(t, expBlocks, len(actBlocks)-1) // -1 to exclude the corrupted block.
testutil.Ok(t, os.RemoveAll(lastBlockIndex)) // Corrupt the block by removing the index file. testutil.Ok(t, os.RemoveAll(lastBlockIndex)) // Corrupt the block by removing the index file.
testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.metrics.reloadsFailed), "initial 'failed db reload' count metrics mismatch") testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.metrics.reloadsFailed), "initial 'failed db reload' count metrics mismatch")
testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial `compactions` count metric mismatch") testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial `compactions` count metric mismatch")
// Do the compaction and check the metrics. // Do the compaction and check the metrics.
// Since the most recent block is not included in the compaction, // Compaction should succeed, but the reload should fail and
// the compaction should succeed, but the reload should fail and
// the new block created from the compaction should be deleted. // the new block created from the compaction should be deleted.
db.EnableCompactions() db.EnableCompactions()
testutil.NotOk(t, db.compact()) testutil.NotOk(t, db.compact())
@ -817,7 +816,7 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) {
testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "`compaction` count metric mismatch") testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "`compaction` count metric mismatch")
actBlocks, err = blockDirs(db.Dir()) actBlocks, err = blockDirs(db.Dir())
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, expBlocks, len(actBlocks)) testutil.Equals(t, expBlocks, len(actBlocks)-1, "block count should be the same as before the compaction") // -1 to exclude the corrupted block.
}) })
} }
} }

4
db.go
View File

@ -426,7 +426,7 @@ func (db *DB) compact() (err error) {
if err := db.reload(); err != nil { if err := db.reload(); err != nil {
if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil { if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil {
return errors.Wrapf(err, "delete persisted head block after unsuccessful db reload:%s", uid) return errors.Wrapf(err, "delete persisted head block after failed db reload:%s", uid)
} }
return errors.Wrap(err, "reload blocks") return errors.Wrap(err, "reload blocks")
} }
@ -465,7 +465,7 @@ func (db *DB) compact() (err error) {
if err := db.reload(); err != nil { if err := db.reload(); err != nil {
if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil { if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil {
return errors.Wrapf(err, "delete compacted block after unsuccessful db reload:%s", uid) return errors.Wrapf(err, "delete compacted block after failed db reload:%s", uid)
} }
return errors.Wrap(err, "reload blocks") return errors.Wrap(err, "reload blocks")
} }