From 41b54585d9b6afd995419e77265a5e63c607e95e Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Thu, 15 Nov 2018 12:20:54 +0000 Subject: [PATCH] Use already open blocks while compacting. (#441) This roughly halves the RAM requirements of compaction. Signed-off-by: Brian Brazil --- compact.go | 34 ++++++++++++++++++++++++++-------- db.go | 2 +- db_test.go | 2 +- 3 files changed, 28 insertions(+), 10 deletions(-) diff --git a/compact.go b/compact.go index ca027ad20..f8e6ff545 100644 --- a/compact.go +++ b/compact.go @@ -59,7 +59,9 @@ type Compactor interface { // Compact runs compaction against the provided directories. Must // only be called concurrently with results of Plan(). - Compact(dest string, dirs ...string) (ulid.ULID, error) + // Can optionally pass a list of already open blocks, + // to avoid having to reopen them. + Compact(dest string, dirs []string, open []*Block) (ulid.ULID, error) } // LeveledCompactor implements the Compactor interface. @@ -317,26 +319,41 @@ func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta { // Compact creates a new block in the compactor's directory from the blocks in the // provided directories. -func (c *LeveledCompactor) Compact(dest string, dirs ...string) (uid ulid.ULID, err error) { +func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) { var ( blocks []BlockReader bs []*Block metas []*BlockMeta uids []string ) + start := time.Now() for _, d := range dirs { - b, err := OpenBlock(d, c.chunkPool) - if err != nil { - return uid, err - } - defer b.Close() - meta, err := readMetaFile(d) if err != nil { return uid, err } + var b *Block + + // Use already open blocks if we can, to avoid + // having the index data in memory twice. + for _, o := range open { + if meta.ULID == o.Meta().ULID { + b = o + break + } + } + + if b == nil { + var err error + b, err = OpenBlock(d, c.chunkPool) + if err != nil { + return uid, err + } + defer b.Close() + } + metas = append(metas, meta) blocks = append(blocks, b) bs = append(bs, b) @@ -356,6 +373,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (uid ulid.ULID, "maxt", meta.MaxTime, "ulid", meta.ULID, "sources", fmt.Sprintf("%v", uids), + "duration", time.Since(start), ) return uid, nil } diff --git a/db.go b/db.go index 8f452137e..731e6191a 100644 --- a/db.go +++ b/db.go @@ -429,7 +429,7 @@ func (db *DB) compact() (err error) { default: } - if _, err := db.compactor.Compact(db.dir, plan...); err != nil { + if _, err := db.compactor.Compact(db.dir, plan, db.blocks); err != nil { return errors.Wrapf(err, "compact %s", plan) } runtime.GC() diff --git a/db_test.go b/db_test.go index 014a126c2..46cc5cad6 100644 --- a/db_test.go +++ b/db_test.go @@ -876,7 +876,7 @@ func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int6 return block.Meta().ULID, nil } -func (*mockCompactorFailing) Compact(dest string, dirs ...string) (ulid.ULID, error) { +func (*mockCompactorFailing) Compact(dest string, dirs []string, open []*Block) (ulid.ULID, error) { return ulid.ULID{}, nil }