Merge pull request #207 from Gouthamve/compact-fail

Don't retry failed compactions.
This commit is contained in:
Goutham Veeramachaneni 2017-11-30 12:58:19 +05:30 committed by GitHub
commit 24ff293dc9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 170 additions and 18 deletions

View File

@ -77,6 +77,7 @@ type BlockMetaCompaction struct {
Level int `json:"level"`
// ULIDs of all source head blocks that went into the block.
Sources []ulid.ULID `json:"sources,omitempty"`
Failed bool `json:"failed,omitempty"`
}
const (
@ -244,6 +245,11 @@ func (pb *Block) Tombstones() (TombstoneReader, error) {
return blockTombstoneReader{TombstoneReader: pb.tombstones, b: pb}, nil
}
func (pb *Block) setCompactionFailed() error {
pb.meta.Compaction.Failed = true
return writeMetaFile(pb.dir, &pb.meta)
}
type blockIndexReader struct {
IndexReader
b *Block

View File

@ -12,3 +12,43 @@
// limitations under the License.
package tsdb
import (
"io/ioutil"
"os"
"testing"
)
func TestSetCompactionFailed(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "test-tsdb")
Ok(t, err)
b := createEmptyBlock(t, tmpdir)
Equals(t, false, b.meta.Compaction.Failed)
Ok(t, b.setCompactionFailed())
Equals(t, true, b.meta.Compaction.Failed)
Ok(t, b.Close())
b, err = OpenBlock(tmpdir, nil)
Ok(t, err)
Equals(t, true, b.meta.Compaction.Failed)
}
func createEmptyBlock(t *testing.T, dir string) *Block {
Ok(t, os.MkdirAll(dir, 0777))
Ok(t, writeMetaFile(dir, &BlockMeta{}))
ir, err := newIndexWriter(dir)
Ok(t, err)
Ok(t, ir.Close())
Ok(t, os.MkdirAll(chunkDir(dir), 0777))
Ok(t, writeTombstoneFile(dir, newEmptyTombstoneReader()))
b, err := OpenBlock(dir, nil)
Ok(t, err)
return b
}

View File

@ -205,7 +205,15 @@ func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta {
continue
}
Outer:
for _, p := range parts {
// Donot select the range if it has a block whose compaction failed.
for _, dm := range p {
if dm.meta.Compaction.Failed {
continue Outer
}
}
mint := p[0].meta.MinTime
maxt := p[len(p)-1].meta.MaxTime
// Pick the range of blocks if it spans the full range (potentially with gaps)
@ -297,6 +305,7 @@ func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
// provided directories.
func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) {
var blocks []BlockReader
var bs []*Block
var metas []*BlockMeta
for _, d := range dirs {
@ -313,12 +322,27 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) {
metas = append(metas, meta)
blocks = append(blocks, b)
bs = append(bs, b)
}
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
uid := ulid.MustNew(ulid.Now(), entropy)
return c.write(dest, compactBlockMetas(uid, metas...), blocks...)
err = c.write(dest, compactBlockMetas(uid, metas...), blocks...)
if err == nil {
return nil
}
var merr MultiError
merr.Add(err)
for _, b := range bs {
if err := b.setCompactionFailed(); err != nil {
merr.Add(errors.Wrapf(err, "setting compaction failed for block: %s", b.Dir()))
}
}
return merr
}
func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error) {
@ -360,17 +384,21 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...ChunkMeta) error {
func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) {
level.Info(c.logger).Log("msg", "compact blocks", "count", len(blocks), "mint", meta.MinTime, "maxt", meta.MaxTime)
dir := filepath.Join(dest, meta.ULID.String())
tmp := dir + ".tmp"
defer func(t time.Time) {
if err != nil {
c.metrics.failed.Inc()
// TODO(gouthamve): Handle error how?
if err := os.RemoveAll(tmp); err != nil {
level.Error(c.logger).Log("msg", "removed tmp folder after failed compaction", "err", err.Error())
}
}
c.metrics.ran.Inc()
c.metrics.duration.Observe(time.Since(t).Seconds())
}(time.Now())
dir := filepath.Join(dest, meta.ULID.String())
tmp := dir + ".tmp"
if err = os.RemoveAll(tmp); err != nil {
return err
}

View File

@ -14,8 +14,13 @@
package tsdb
import (
"io/ioutil"
"os"
"path/filepath"
"testing"
"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
)
@ -157,17 +162,6 @@ func TestLeveledCompactor_plan(t *testing.T) {
}, nil)
require.NoError(t, err)
metaRange := func(name string, mint, maxt int64, stats *BlockStats) dirMeta {
meta := &BlockMeta{MinTime: mint, MaxTime: maxt}
if stats != nil {
meta.Stats = *stats
}
return dirMeta{
dir: name,
meta: meta,
}
}
cases := []struct {
metas []dirMeta
expected []string
@ -274,3 +268,85 @@ func TestLeveledCompactor_plan(t *testing.T) {
require.Equal(t, c.expected, res, "test case %d", i)
}
}
func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) {
compactor, err := NewLeveledCompactor(nil, nil, []int64{
20,
60,
240,
720,
2160,
}, nil)
Ok(t, err)
cases := []struct {
metas []dirMeta
}{
{
metas: []dirMeta{
metaRange("1", 0, 20, nil),
metaRange("2", 20, 40, nil),
metaRange("3", 40, 60, nil),
},
},
{
metas: []dirMeta{
metaRange("1", 0, 20, nil),
metaRange("2", 20, 40, nil),
metaRange("3", 60, 80, nil),
},
},
{
metas: []dirMeta{
metaRange("1", 0, 20, nil),
metaRange("2", 20, 40, nil),
metaRange("3", 40, 60, nil),
metaRange("4", 60, 120, nil),
metaRange("5", 120, 180, nil),
},
},
}
for _, c := range cases {
c.metas[1].meta.Compaction.Failed = true
res, err := compactor.plan(c.metas)
Ok(t, err)
Equals(t, []string(nil), res)
}
}
func TestCompactionFailWillCleanUpTempDir(t *testing.T) {
compactor, err := NewLeveledCompactor(nil, log.NewNopLogger(), []int64{
20,
60,
240,
720,
2160,
}, nil)
Ok(t, err)
tmpdir, err := ioutil.TempDir("", "test")
Ok(t, err)
NotOk(t, compactor.write(tmpdir, &BlockMeta{}, erringBReader{}))
_, err = os.Stat(filepath.Join(tmpdir, BlockMeta{}.ULID.String()) + ".tmp")
Assert(t, os.IsNotExist(err), "directory is not cleaned up")
}
func metaRange(name string, mint, maxt int64, stats *BlockStats) dirMeta {
meta := &BlockMeta{MinTime: mint, MaxTime: maxt}
if stats != nil {
meta.Stats = *stats
}
return dirMeta{
dir: name,
meta: meta,
}
}
type erringBReader struct{}
func (erringBReader) Index() (IndexReader, error) { return nil, errors.New("index") }
func (erringBReader) Chunks() (ChunkReader, error) { return nil, errors.New("chunks") }
func (erringBReader) Tombstones() (TombstoneReader, error) { return nil, errors.New("tombstones") }

View File

@ -27,9 +27,10 @@ import (
)
func openTestDB(t testing.TB, opts *Options) (db *DB, close func()) {
tmpdir, _ := ioutil.TempDir("", "test")
tmpdir, err := ioutil.TempDir("", "test")
Ok(t, err)
db, err := Open(tmpdir, nil, nil, opts)
db, err = Open(tmpdir, nil, nil, opts)
require.NoError(t, err)
// Do not close the test database by default as it will deadlock on test failures.
@ -531,7 +532,8 @@ func TestDB_e2e(t *testing.T) {
}
func TestWALFlushedOnDBClose(t *testing.T) {
tmpdir, _ := ioutil.TempDir("", "test")
tmpdir, err := ioutil.TempDir("", "test")
Ok(t, err)
defer os.RemoveAll(tmpdir)
db, err := Open(tmpdir, nil, nil, nil)