diff --git a/block.go b/block.go index 733b1bd6b..bc9f581ab 100644 --- a/block.go +++ b/block.go @@ -113,7 +113,7 @@ type BlockStats struct { type BlockMetaCompaction struct { // Maximum number of compaction cycles any source block has // gone through. - Generation int `json:"generation"` + Level int `json:"level"` // ULIDs of all source head blocks that went into the block. Sources []ulid.ULID `json:"sources,omitempty"` } diff --git a/chunks/chunk.go b/chunks/chunk.go index 21b00b3eb..181693fae 100644 --- a/chunks/chunk.go +++ b/chunks/chunk.go @@ -104,6 +104,9 @@ func (p *pool) Put(c Chunk) error { switch c.Encoding() { case EncXOR: xc, ok := c.(*XORChunk) + // This may happen often with wrapped chunks. Nothing we can really do about + // it but returning an error would cause a lot of allocations again. Thus, + // we just skip it. if !ok { return nil } diff --git a/compact.go b/compact.go index 5d2e4c4ce..3860509d8 100644 --- a/compact.go +++ b/compact.go @@ -48,22 +48,22 @@ type Compactor interface { // Plan returns a set of non-overlapping directories that can // be compacted concurrently. // Results returned when compactions are in progress are undefined. - Plan() ([][]string, error) + Plan(dir string) ([]string, error) // Write persists a Block into a directory. - Write(b Block) error + Write(dest string, b Block) error // Compact runs compaction against the provided directories. Must // only be called concurrently with results of Plan(). - Compact(dirs ...string) error + Compact(dest string, dirs ...string) error } -// compactor implements the Compactor interface. -type compactor struct { +// LeveledCompactor implements the Compactor interface. 
+type LeveledCompactor struct { dir string metrics *compactorMetrics logger log.Logger - opts *compactorOptions + opts *LeveledCompactorOptions } type compactorMetrics struct { @@ -98,19 +98,18 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { return m } -type compactorOptions struct { +type LeveledCompactorOptions struct { blockRanges []int64 chunkPool chunks.Pool } -func NewCompactor(dir string, r prometheus.Registerer, l log.Logger, opts *compactorOptions) *compactor { +func NewLeveledCompactor(r prometheus.Registerer, l log.Logger, opts *LeveledCompactorOptions) *LeveledCompactor { if opts == nil { - opts = &compactorOptions{ + opts = &LeveledCompactorOptions{ chunkPool: chunks.NewPool(), } } - return &compactor{ - dir: dir, + return &LeveledCompactor{ opts: opts, logger: l, metrics: newCompactorMetrics(r), @@ -130,8 +129,9 @@ type dirMeta struct { meta *BlockMeta } -func (c *compactor) Plan() ([][]string, error) { - dirs, err := blockDirs(c.dir) +// Plan returns a list of compactable blocks in the provided directory. +func (c *LeveledCompactor) Plan(dir string) ([]string, error) { + dirs, err := blockDirs(dir) if err != nil { return nil, err } @@ -143,7 +143,7 @@ func (c *compactor) Plan() ([][]string, error) { if err != nil { return nil, err } - if meta.Compaction.Generation > 0 { + if meta.Compaction.Level > 0 { dms = append(dms, dirMeta{dir, meta}) } } @@ -155,20 +155,12 @@ func (c *compactor) Plan() ([][]string, error) { return nil, nil } - sliceDirs := func(dms []dirMeta) [][]string { - if len(dms) == 0 { - return nil - } - var res []string - for _, dm := range dms { - res = append(res, dm.dir) - } - return [][]string{res} + var res []string + for _, dm := range c.selectDirs(dms) { + res = append(res, dm.dir) } - - planDirs := sliceDirs(c.selectDirs(dms)) - if len(dirs) > 1 { - return planDirs, nil + if len(res) > 0 { + return res, nil } // Compact any blocks that have >5% tombstones. 
@@ -179,7 +171,7 @@ func (c *compactor) Plan() ([][]string, error) { } if meta.Stats.NumSeries/meta.Stats.NumTombstones <= 20 { // 5% - return [][]string{{dms[i].dir}}, nil + return []string{dms[i].dir}, nil } } @@ -188,7 +180,7 @@ func (c *compactor) Plan() ([][]string, error) { // selectDirs returns the dir metas that should be compacted into a single new block. // If only a single block range is configured, the result is always nil. -func (c *compactor) selectDirs(ds []dirMeta) []dirMeta { +func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta { if len(c.opts.blockRanges) < 2 || len(ds) < 1 { return nil } @@ -267,18 +259,18 @@ func compactBlockMetas(blocks ...BlockMeta) (res BlockMeta) { sources := map[ulid.ULID]struct{}{} for _, b := range blocks { - if b.Compaction.Generation > res.Compaction.Generation { - res.Compaction.Generation = b.Compaction.Generation + if b.Compaction.Level > res.Compaction.Level { + res.Compaction.Level = b.Compaction.Level } for _, s := range b.Compaction.Sources { sources[s] = struct{}{} } // If it's an in memory block, its ULID goes into the sources. - if b.Compaction.Generation == 0 { + if b.Compaction.Level == 0 { sources[b.ULID] = struct{}{} } } - res.Compaction.Generation++ + res.Compaction.Level++ for s := range sources { res.Compaction.Sources = append(res.Compaction.Sources, s) @@ -290,7 +282,9 @@ func compactBlockMetas(blocks ...BlockMeta) (res BlockMeta) { return res } -func (c *compactor) Compact(dirs ...string) (err error) { +// Compact creates a new block in the destination directory from the blocks in the +// provided directories. +func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) { var blocks []Block for _, d := range dirs { @@ -306,24 +300,24 @@ func (c *compactor) Compact(dirs ...string) (err error) { entropy := rand.New(rand.NewSource(time.Now().UnixNano())) uid := ulid.MustNew(ulid.Now(), entropy) - return c.write(uid, blocks...) + return c.write(dest, uid, blocks...)
} -func (c *compactor) Write(b Block) error { +func (c *LeveledCompactor) Write(dest string, b Block) error { // Buffering blocks might have been created that often have no data. if b.Meta().Stats.NumSeries == 0 { - return errors.Wrap(os.RemoveAll(b.Dir()), "remove empty block") + return nil } entropy := rand.New(rand.NewSource(time.Now().UnixNano())) uid := ulid.MustNew(ulid.Now(), entropy) - return c.write(uid, b) + return c.write(dest, uid, b) } // write creates a new block that is the union of the provided blocks into dir. // It cleans up all files of the old blocks after completing successfully. -func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) { +func (c *LeveledCompactor) write(dest string, uid ulid.ULID, blocks ...Block) (err error) { c.logger.Log("msg", "compact blocks", "blocks", fmt.Sprintf("%v", blocks)) defer func(t time.Time) { @@ -334,7 +328,7 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) { c.metrics.duration.Observe(time.Since(t).Seconds()) }(time.Now()) - dir := filepath.Join(c.dir, uid.String()) + dir := filepath.Join(dest, uid.String()) tmp := dir + ".tmp" if err = os.RemoveAll(tmp); err != nil { @@ -382,11 +376,6 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) { if err := renameFile(tmp, dir); err != nil { return errors.Wrap(err, "rename block dir") } - for _, b := range blocks { - if err := os.RemoveAll(b.Dir()); err != nil { - return err - } - } // Properly sync parent dir to ensure changes are visible. df, err := fileutil.OpenDir(dir) if err != nil { @@ -403,7 +392,7 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) { // populateBlock fills the index and chunk writers with new data gathered as the union // of the provided blocks. It returns meta information for the new block. 
-func (c *compactor) populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) { +func (c *LeveledCompactor) populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) { var ( set compactionSet metas []BlockMeta diff --git a/compact_test.go b/compact_test.go index 91a2812ca..d9791a080 100644 --- a/compact_test.go +++ b/compact_test.go @@ -19,8 +19,8 @@ import ( "github.com/stretchr/testify/require" ) -func TestCompactionSelect(t *testing.T) { - opts := &compactorOptions{ +func TestLeveledCompactor_Select(t *testing.T) { + opts := &LeveledCompactorOptions{ blockRanges: []int64{ 20, 60, @@ -173,7 +173,7 @@ func TestCompactionSelect(t *testing.T) { }, } - c := &compactor{ + c := &LeveledCompactor{ opts: opts, } sliceDirs := func(dms []dirMeta) [][]string { diff --git a/db.go b/db.go index fe48b3ce1..8d581cdfa 100644 --- a/db.go +++ b/db.go @@ -224,7 +224,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db db.lockf = &lockf } - copts := &compactorOptions{ + copts := &LeveledCompactorOptions{ blockRanges: opts.BlockRanges, chunkPool: db.chunkPool, } @@ -242,7 +242,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db copts.blockRanges = copts.blockRanges[:len(copts.blockRanges)-1] } - db.compactor = NewCompactor(dir, r, l, copts) + db.compactor = NewLeveledCompactor(r, l, copts) if err := db.reloadBlocks(); err != nil { return nil, err @@ -390,20 +390,24 @@ func (db *DB) compact() (changes bool, err error) { default: } - if err = db.compactor.Write(h); err != nil { + if err = db.compactor.Write(db.dir, h); err != nil { return changes, errors.Wrap(err, "persist head block") } changes = true + + if err := os.RemoveAll(h.Dir()); err != nil { + return changes, errors.Wrap(err, "delete compacted head block") + } runtime.GC() } // Check for compactions of multiple blocks. 
for { - plans, err := db.compactor.Plan() + plan, err := db.compactor.Plan(db.dir) if err != nil { return changes, errors.Wrap(err, "plan compaction") } - if len(plans) == 0 { + if len(plan) == 0 { break } @@ -413,17 +417,17 @@ func (db *DB) compact() (changes bool, err error) { default: } - // We just execute compactions sequentially to not cause too extreme - // CPU and memory spikes. - // TODO(fabxc): return more descriptive plans in the future that allow - // estimation of resource usage and conditional parallelization? - for _, p := range plans { - if err := db.compactor.Compact(p...); err != nil { - return changes, errors.Wrapf(err, "compact %s", p) - } - changes = true - runtime.GC() + if err := db.compactor.Compact(db.dir, plan...); err != nil { + return changes, errors.Wrapf(err, "compact %s", plan) } + changes = true + + for _, pd := range plan { + if err := os.RemoveAll(pd); err != nil { + return changes, errors.Wrap(err, "delete compacted block") + } + } + runtime.GC() } return changes, nil @@ -509,7 +513,7 @@ func (db *DB) reloadBlocks() (err error) { b, ok := db.getBlock(meta.ULID) if !ok { - if meta.Compaction.Generation == 0 { + if meta.Compaction.Level == 0 { b, err = db.openHeadBlock(dir) } else { b, err = newPersistedBlock(dir, db.chunkPool) @@ -538,7 +542,7 @@ func (db *DB) reloadBlocks() (err error) { db.heads = nil for _, b := range blocks { - if b.Meta().Compaction.Generation == 0 { + if b.Meta().Compaction.Level == 0 { db.heads = append(db.heads, b.(*HeadBlock)) } } @@ -607,6 +611,9 @@ func (db *DB) EnableCompactions() { // Snapshot writes the current data to the directory. 
func (db *DB) Snapshot(dir string) error { + if dir == db.dir { + return errors.Errorf("cannot snapshot into base directory") + } db.cmtx.Lock() defer db.cmtx.Unlock() @@ -873,7 +880,7 @@ func (db *DB) openHeadBlock(dir string) (*HeadBlock, error) { return nil, errors.Wrap(err, "open WAL %s") } - h, err := OpenHeadBlock(dir, log.With(db.logger, "block", dir), wal) + h, err := OpenHeadBlock(dir, log.With(db.logger, "block", dir), wal, db.compactor) if err != nil { return nil, errors.Wrapf(err, "open head block %s", dir) } diff --git a/head.go b/head.go index fae0937ec..045378d9c 100644 --- a/head.go +++ b/head.go @@ -52,9 +52,10 @@ var ( // HeadBlock handles reads and writes of time series data within a time window. type HeadBlock struct { - mtx sync.RWMutex - dir string - wal WAL + mtx sync.RWMutex + dir string + wal WAL + compactor Compactor activeWriters uint64 highTimestamp int64 @@ -106,7 +107,7 @@ func TouchHeadBlock(dir string, mint, maxt int64) (string, error) { } // OpenHeadBlock opens the head block in dir. -func OpenHeadBlock(dir string, l log.Logger, wal WAL) (*HeadBlock, error) { +func OpenHeadBlock(dir string, l log.Logger, wal WAL, c Compactor) (*HeadBlock, error) { meta, err := readMetaFile(dir) if err != nil { return nil, err @@ -115,6 +116,7 @@ func OpenHeadBlock(dir string, l log.Logger, wal WAL) (*HeadBlock, error) { h := &HeadBlock{ dir: dir, wal: wal, + compactor: c, series: []*memSeries{nil}, // 0 is not a valid posting, filled with nil. hashes: map[uint64][]*memSeries{}, values: map[string]stringset{}, @@ -266,68 +268,14 @@ Outer: } // Snapshot persists the current state of the headblock to the given directory. -// TODO(gouthamve): Snapshot must be called when there are no active appenders. -// This has been ensured by acquiring a Lock on DB.mtx, but this limitation should -// be removed in the future. +// Callers must ensure that there are no active appenders against the block. +// DB does this by acquiring its own write lock. 
func (h *HeadBlock) Snapshot(snapshotDir string) error { - // if h.meta.Stats.NumSeries == 0 { - // return nil - // } + if h.meta.Stats.NumSeries == 0 { + return nil + } - // entropy := rand.New(rand.NewSource(time.Now().UnixNano())) - // uid := ulid.MustNew(ulid.Now(), entropy) - - // dir := filepath.Join(snapshotDir, uid.String()) - // tmp := dir + ".tmp" - - // if err := os.RemoveAll(tmp); err != nil { - // return err - // } - - // if err := os.MkdirAll(tmp, 0777); err != nil { - // return err - // } - - // // Populate chunk and index files into temporary directory with - // // data of all blocks. - // chunkw, err := newChunkWriter(chunkDir(tmp)) - // if err != nil { - // return errors.Wrap(err, "open chunk writer") - // } - // indexw, err := newIndexWriter(tmp) - // if err != nil { - // return errors.Wrap(err, "open index writer") - // } - - // meta, err := h.compactor.populateBlock([]Block{h}, indexw, chunkw, nil) - // if err != nil { - // return errors.Wrap(err, "write snapshot") - // } - // meta.ULID = uid - // meta.MaxTime = h.highTimestamp - - // if err = writeMetaFile(tmp, meta); err != nil { - // return errors.Wrap(err, "write merged meta") - // } - - // if err = chunkw.Close(); err != nil { - // return errors.Wrap(err, "close chunk writer") - // } - // if err = indexw.Close(); err != nil { - // return errors.Wrap(err, "close index writer") - // } - - // // Create an empty tombstones file. - // if err := writeTombstoneFile(tmp, newEmptyTombstoneReader()); err != nil { - // return errors.Wrap(err, "write new tombstones file") - // } - - // // Block successfully written, make visible - // if err := renameFile(tmp, dir); err != nil { - // return errors.Wrap(err, "rename block dir") - // } - - return nil + return h.compactor.Write(snapshotDir, h) } // Dir returns the directory of the block. 
diff --git a/head_test.go b/head_test.go index c86c40768..eee730774 100644 --- a/head_test.go +++ b/head_test.go @@ -43,7 +43,7 @@ func openTestHeadBlock(t testing.TB, dir string) *HeadBlock { wal, err := OpenSegmentWAL(dir, nil, 5*time.Second) require.NoError(t, err) - h, err := OpenHeadBlock(dir, nil, wal) + h, err := OpenHeadBlock(dir, nil, wal, nil) require.NoError(t, err) return h }