From 1fc94a02d12a5ed976993ea8ad33f4f5fd554143 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Tue, 21 Nov 2017 16:45:02 +0530 Subject: [PATCH] Don't retry failed compactions. Fixes prometheus/prometheus#3487 Signed-off-by: Goutham Veeramachaneni --- block.go | 6 +++ block_test.go | 40 ++++++++++++++++++++ compact.go | 36 ++++++++++++++++-- compact_test.go | 98 +++++++++++++++++++++++++++++++++++++++++++------ db_test.go | 8 ++-- 5 files changed, 170 insertions(+), 18 deletions(-) diff --git a/block.go b/block.go index 034f33673..a904008af 100644 --- a/block.go +++ b/block.go @@ -77,6 +77,7 @@ type BlockMetaCompaction struct { Level int `json:"level"` // ULIDs of all source head blocks that went into the block. Sources []ulid.ULID `json:"sources,omitempty"` + Failed bool `json:"failed,omitempty"` } const ( @@ -245,6 +246,11 @@ func (pb *Block) Tombstones() (TombstoneReader, error) { return blockTombstoneReader{TombstoneReader: pb.tombstones, b: pb}, nil } +func (pb *Block) setCompactionFailed() error { + pb.meta.Compaction.Failed = true + return writeMetaFile(pb.dir, &pb.meta) +} + type blockIndexReader struct { IndexReader b *Block diff --git a/block_test.go b/block_test.go index e75d4ac3f..e7b662528 100644 --- a/block_test.go +++ b/block_test.go @@ -12,3 +12,43 @@ // limitations under the License. 
package tsdb + +import ( + "io/ioutil" + "os" + "testing" +) + +func TestSetCompactionFailed(t *testing.T) { + tmpdir, err := ioutil.TempDir("", "test-tsdb") + Ok(t, err) + + b := createEmptyBlock(t, tmpdir) + + Equals(t, false, b.meta.Compaction.Failed) + Ok(t, b.setCompactionFailed()) + Equals(t, true, b.meta.Compaction.Failed) + Ok(t, b.Close()) + + b, err = OpenBlock(tmpdir, nil) + Ok(t, err) + Equals(t, true, b.meta.Compaction.Failed) +} + +func createEmptyBlock(t *testing.T, dir string) *Block { + Ok(t, os.MkdirAll(dir, 0777)) + + Ok(t, writeMetaFile(dir, &BlockMeta{})) + + ir, err := newIndexWriter(dir) + Ok(t, err) + Ok(t, ir.Close()) + + Ok(t, os.MkdirAll(chunkDir(dir), 0777)) + + Ok(t, writeTombstoneFile(dir, newEmptyTombstoneReader())) + + b, err := OpenBlock(dir, nil) + Ok(t, err) + return b +} diff --git a/compact.go b/compact.go index 955ba3caf..62868d38c 100644 --- a/compact.go +++ b/compact.go @@ -205,7 +205,15 @@ func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta { continue } + Outer: for _, p := range parts { + // Do not select the range if it has a block whose compaction failed. + for _, dm := range p { + if dm.meta.Compaction.Failed { + continue Outer + } + } + mint := p[0].meta.MinTime maxt := p[len(p)-1].meta.MaxTime // Pick the range of blocks if it spans the full range (potentially with gaps) @@ -297,6 +305,7 @@ func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta { // provided directories. func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) { var blocks []BlockReader + var bs []*Block var metas []*BlockMeta for _, d := range dirs { @@ -313,12 +322,27 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) { metas = append(metas, meta) blocks = append(blocks, b) + bs = append(bs, b) } entropy := rand.New(rand.NewSource(time.Now().UnixNano())) uid := ulid.MustNew(ulid.Now(), entropy) - return c.write(dest, compactBlockMetas(uid, metas...), blocks...) 
+ err = c.write(dest, compactBlockMetas(uid, metas...), blocks...) + if err == nil { + return nil + } + + var merr MultiError + merr.Add(err) + + for _, b := range bs { + if err := b.setCompactionFailed(); err != nil { + merr.Add(errors.Wrapf(err, "setting compaction failed for block: %s", b.Dir())) + } + } + + return merr } func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) error { @@ -360,17 +384,21 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...ChunkMeta) error { func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) { level.Info(c.logger).Log("msg", "compact blocks", "count", len(blocks), "mint", meta.MinTime, "maxt", meta.MaxTime) + dir := filepath.Join(dest, meta.ULID.String()) + tmp := dir + ".tmp" + defer func(t time.Time) { if err != nil { c.metrics.failed.Inc() + // TODO(gouthamve): Handle error how? + if err := os.RemoveAll(tmp); err != nil { + level.Error(c.logger).Log("msg", "removed tmp folder after failed compaction", "err", err.Error()) + } } c.metrics.ran.Inc() c.metrics.duration.Observe(time.Since(t).Seconds()) }(time.Now()) - dir := filepath.Join(dest, meta.ULID.String()) - tmp := dir + ".tmp" - if err = os.RemoveAll(tmp); err != nil { return err } diff --git a/compact_test.go b/compact_test.go index d1650cd16..a8ae48fce 100644 --- a/compact_test.go +++ b/compact_test.go @@ -14,8 +14,13 @@ package tsdb import ( + "io/ioutil" + "os" + "path/filepath" "testing" + "github.com/go-kit/kit/log" + "github.com/pkg/errors" "github.com/stretchr/testify/require" ) @@ -157,17 +162,6 @@ func TestLeveledCompactor_plan(t *testing.T) { }, nil) require.NoError(t, err) - metaRange := func(name string, mint, maxt int64, stats *BlockStats) dirMeta { - meta := &BlockMeta{MinTime: mint, MaxTime: maxt} - if stats != nil { - meta.Stats = *stats - } - return dirMeta{ - dir: name, - meta: meta, - } - } - cases := []struct { metas []dirMeta expected []string @@ -274,3 +268,85 @@ func 
TestLeveledCompactor_plan(t *testing.T) { require.Equal(t, c.expected, res, "test case %d", i) } } + +func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) { + compactor, err := NewLeveledCompactor(nil, nil, []int64{ + 20, + 60, + 240, + 720, + 2160, + }, nil) + Ok(t, err) + + cases := []struct { + metas []dirMeta + }{ + { + metas: []dirMeta{ + metaRange("1", 0, 20, nil), + metaRange("2", 20, 40, nil), + metaRange("3", 40, 60, nil), + }, + }, + { + metas: []dirMeta{ + metaRange("1", 0, 20, nil), + metaRange("2", 20, 40, nil), + metaRange("3", 60, 80, nil), + }, + }, + { + metas: []dirMeta{ + metaRange("1", 0, 20, nil), + metaRange("2", 20, 40, nil), + metaRange("3", 40, 60, nil), + metaRange("4", 60, 120, nil), + metaRange("5", 120, 180, nil), + }, + }, + } + + for _, c := range cases { + c.metas[1].meta.Compaction.Failed = true + res, err := compactor.plan(c.metas) + Ok(t, err) + + Equals(t, []string(nil), res) + } +} + +func TestCompactionFailWillCleanUpTempDir(t *testing.T) { + compactor, err := NewLeveledCompactor(nil, log.NewNopLogger(), []int64{ + 20, + 60, + 240, + 720, + 2160, + }, nil) + Ok(t, err) + + tmpdir, err := ioutil.TempDir("", "test") + Ok(t, err) + + NotOk(t, compactor.write(tmpdir, &BlockMeta{}, erringBReader{})) + _, err = os.Stat(filepath.Join(tmpdir, BlockMeta{}.ULID.String()) + ".tmp") + Assert(t, os.IsNotExist(err), "directory is not cleaned up") +} + +func metaRange(name string, mint, maxt int64, stats *BlockStats) dirMeta { + meta := &BlockMeta{MinTime: mint, MaxTime: maxt} + if stats != nil { + meta.Stats = *stats + } + return dirMeta{ + dir: name, + meta: meta, + } +} + +type erringBReader struct{} + +func (erringBReader) Index() (IndexReader, error) { return nil, errors.New("index") } +func (erringBReader) Chunks() (ChunkReader, error) { return nil, errors.New("chunks") } +func (erringBReader) Tombstones() (TombstoneReader, error) { return nil, errors.New("tombstones") } diff --git a/db_test.go b/db_test.go index 
9f91705e7..4be2d889e 100644 --- a/db_test.go +++ b/db_test.go @@ -27,9 +27,10 @@ import ( ) func openTestDB(t testing.TB, opts *Options) (db *DB, close func()) { - tmpdir, _ := ioutil.TempDir("", "test") + tmpdir, err := ioutil.TempDir("", "test") + Ok(t, err) - db, err := Open(tmpdir, nil, nil, opts) + db, err = Open(tmpdir, nil, nil, opts) require.NoError(t, err) // Do not close the test database by default as it will deadlock on test failures. @@ -526,7 +527,8 @@ func TestDB_e2e(t *testing.T) { } func TestWALFlushedOnDBClose(t *testing.T) { - tmpdir, _ := ioutil.TempDir("", "test") + tmpdir, err := ioutil.TempDir("", "test") + Ok(t, err) defer os.RemoveAll(tmpdir) db, err := Open(tmpdir, nil, nil, nil)