diff --git a/compact.go b/compact.go index 0c42bda62..16a3bd747 100644 --- a/compact.go +++ b/compact.go @@ -14,6 +14,7 @@ package tsdb import ( + "fmt" "io" "math/rand" "os" @@ -33,7 +34,7 @@ import ( "github.com/prometheus/tsdb/labels" ) -// ExponentialBlockRanges returns the time ranges based on the stepSize +// ExponentialBlockRanges returns the time ranges based on the stepSize. func ExponentialBlockRanges(minSize int64, steps, stepSize int) []int64 { ranges := make([]int64, 0, steps) curRange := minSize @@ -215,7 +216,7 @@ func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta { Outer: for _, p := range parts { - // Donot select the range if it has a block whose compaction failed. + // Do not select the range if it has a block whose compaction failed. for _, dm := range p { if dm.meta.Compaction.Failed { continue Outer @@ -312,9 +313,12 @@ func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta { // Compact creates a new block in the compactor's directory from the blocks in the // provided directories. func (c *LeveledCompactor) Compact(dest string, dirs ...string) (uid ulid.ULID, err error) { - var blocks []BlockReader - var bs []*Block - var metas []*BlockMeta + var ( + blocks []BlockReader + bs []*Block + metas []*BlockMeta + uids []string + ) for _, d := range dirs { b, err := OpenBlock(d, c.chunkPool) @@ -331,13 +335,23 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (uid ulid.ULID, metas = append(metas, meta) blocks = append(blocks, b) bs = append(bs, b) + uids = append(uids, meta.ULID.String()) } entropy := rand.New(rand.NewSource(time.Now().UnixNano())) uid = ulid.MustNew(ulid.Now(), entropy) - err = c.write(dest, compactBlockMetas(uid, metas...), blocks...) + meta := compactBlockMetas(uid, metas...) + err = c.write(dest, meta, blocks...) if err == nil { + level.Info(c.logger).Log( + "msg", "compact blocks", + "count", len(blocks), + "mint", meta.MinTime, + "maxt", meta.MaxTime, + "ulid", meta.ULID, + "sources", fmt.Sprintf("%v", uids), + ) return uid, nil } @@ -365,7 +379,13 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) ( meta.Compaction.Level = 1 meta.Compaction.Sources = []ulid.ULID{uid} - return uid, c.write(dest, meta, b) + err := c.write(dest, meta, b) + if err != nil { + return uid, err + } + + level.Info(c.logger).Log("msg", "write block", "mint", meta.MinTime, "maxt", meta.MaxTime, "ulid", meta.ULID) + return uid, nil } // instrumentedChunkWriter is used for level 1 compactions to record statistics @@ -390,8 +410,6 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error { // write creates a new block that is the union of the provided blocks into dir. // It cleans up all files of the old blocks after completing successfully. func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) { - level.Info(c.logger).Log("msg", "compact blocks", "count", len(blocks), "mint", meta.MinTime, "maxt", meta.MaxTime) - dir := filepath.Join(dest, meta.ULID.String()) tmp := dir + ".tmp" @@ -472,7 +490,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe return errors.Wrap(err, "sync temporary dir file") } - // close temp dir before rename block dir(for windows platform) + // Close temp dir before rename block dir (for windows platform). if err = df.Close(); err != nil { return errors.Wrap(err, "close temporary dir") } @@ -482,6 +500,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe if err := renameFile(tmp, dir); err != nil { return errors.Wrap(err, "rename block dir") } + return nil } @@ -718,11 +737,6 @@ type compactionMerger struct { intervals Intervals } -type compactionSeries struct { - labels labels.Labels - chunks []*chunks.Meta -} - func newCompactionMerger(a, b ChunkSeriesSet) (*compactionMerger, error) { c := &compactionMerger{ a: a, diff --git a/repair.go b/repair.go index cc0f6e4a9..2722609a7 100644 --- a/repair.go +++ b/repair.go @@ -36,9 +36,20 @@ func repairBadIndexVersion(logger log.Logger, dir string) error { return err } if meta.Version == 1 { + level.Info(logger).Log( + "msg", "found healthy block", + "mint", meta.MinTime, + "maxt", meta.MaxTime, + "ulid", meta.ULID, + ) continue } - level.Info(logger).Log("msg", "fixing broken block", "ulid", meta.ULID) + level.Info(logger).Log( + "msg", "fixing broken block", + "mint", meta.MinTime, + "maxt", meta.MaxTime, + "ulid", meta.ULID, + ) repl, err := os.Create(filepath.Join(d, "index.repaired")) if err != nil {