Use already open blocks while compacting. (#441)

This roughly halves the RAM requirements of compaction.

Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
This commit is contained in:
Brian Brazil 2018-11-15 12:20:54 +00:00 committed by GitHub
parent 3385571ddf
commit 41b54585d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 28 additions and 10 deletions

View File

@@ -59,7 +59,9 @@ type Compactor interface {
// Compact runs compaction against the provided directories. Must // Compact runs compaction against the provided directories. Must
// only be called concurrently with results of Plan(). // only be called concurrently with results of Plan().
Compact(dest string, dirs ...string) (ulid.ULID, error) // Can optionally pass a list of already open blocks,
// to avoid having to reopen them.
Compact(dest string, dirs []string, open []*Block) (ulid.ULID, error)
} }
// LeveledCompactor implements the Compactor interface. // LeveledCompactor implements the Compactor interface.
@@ -317,26 +319,41 @@ func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
// Compact creates a new block in the compactor's directory from the blocks in the // Compact creates a new block in the compactor's directory from the blocks in the
// provided directories. // provided directories.
func (c *LeveledCompactor) Compact(dest string, dirs ...string) (uid ulid.ULID, err error) { func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) {
var ( var (
blocks []BlockReader blocks []BlockReader
bs []*Block bs []*Block
metas []*BlockMeta metas []*BlockMeta
uids []string uids []string
) )
start := time.Now()
for _, d := range dirs { for _, d := range dirs {
b, err := OpenBlock(d, c.chunkPool)
if err != nil {
return uid, err
}
defer b.Close()
meta, err := readMetaFile(d) meta, err := readMetaFile(d)
if err != nil { if err != nil {
return uid, err return uid, err
} }
var b *Block
// Use already open blocks if we can, to avoid
// having the index data in memory twice.
for _, o := range open {
if meta.ULID == o.Meta().ULID {
b = o
break
}
}
if b == nil {
var err error
b, err = OpenBlock(d, c.chunkPool)
if err != nil {
return uid, err
}
defer b.Close()
}
metas = append(metas, meta) metas = append(metas, meta)
blocks = append(blocks, b) blocks = append(blocks, b)
bs = append(bs, b) bs = append(bs, b)
@@ -356,6 +373,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (uid ulid.ULID,
"maxt", meta.MaxTime, "maxt", meta.MaxTime,
"ulid", meta.ULID, "ulid", meta.ULID,
"sources", fmt.Sprintf("%v", uids), "sources", fmt.Sprintf("%v", uids),
"duration", time.Since(start),
) )
return uid, nil return uid, nil
} }

2
db.go
View File

@@ -429,7 +429,7 @@ func (db *DB) compact() (err error) {
default: default:
} }
if _, err := db.compactor.Compact(db.dir, plan...); err != nil { if _, err := db.compactor.Compact(db.dir, plan, db.blocks); err != nil {
return errors.Wrapf(err, "compact %s", plan) return errors.Wrapf(err, "compact %s", plan)
} }
runtime.GC() runtime.GC()

View File

@@ -876,7 +876,7 @@ func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int6
return block.Meta().ULID, nil return block.Meta().ULID, nil
} }
func (*mockCompactorFailing) Compact(dest string, dirs ...string) (ulid.ULID, error) { func (*mockCompactorFailing) Compact(dest string, dirs []string, open []*Block) (ulid.ULID, error) {
return ulid.ULID{}, nil return ulid.ULID{}, nil
} }