compaction: add fast-path for compacting mem blocks
This commit is contained in:
parent
c20cc44b06
commit
a61a31a5d7
47
compact.go
47
compact.go
|
@ -50,7 +50,6 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
|
||||||
|
|
||||||
type compactorOptions struct {
|
type compactorOptions struct {
|
||||||
maxBlockRange uint64
|
maxBlockRange uint64
|
||||||
maxSize uint64
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newCompactor(r prometheus.Registerer, opts *compactorOptions) *compactor {
|
func newCompactor(r prometheus.Registerer, opts *compactorOptions) *compactor {
|
||||||
|
@ -65,29 +64,51 @@ type compactionInfo struct {
|
||||||
mint, maxt int64
|
mint, maxt int64
|
||||||
}
|
}
|
||||||
|
|
||||||
// pick returns a range [i, j] in the blocks that are suitable to be compacted
|
const compactionBlocksLen = 4
|
||||||
|
|
||||||
|
// pick returns a range [i, j) in the blocks that are suitable to be compacted
|
||||||
// into a single block at position i.
|
// into a single block at position i.
|
||||||
func (c *compactor) pick(bs []compactionInfo) (i, j int, ok bool) {
|
func (c *compactor) pick(bs []compactionInfo) (i, j int, ok bool) {
|
||||||
|
|
||||||
last := len(bs) - 1
|
|
||||||
if len(bs) == 0 {
|
if len(bs) == 0 {
|
||||||
return 0, 0, false
|
return 0, 0, false
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make sure we always compact the last block if unpersisted.
|
// First, we always compact pending in-memory blocks – oldest first.
|
||||||
if bs[last].generation == 0 {
|
for i, b := range bs {
|
||||||
if len(bs) >= 3 && c.match(bs[last-2:last+1]) {
|
if b.generation > 0 {
|
||||||
return last - 2, last, true
|
continue
|
||||||
}
|
}
|
||||||
return last, last, true
|
// Directly compact into 2nd generation with previous generation 1 blocks.
|
||||||
|
if i+1 >= compactionBlocksLen {
|
||||||
|
match := true
|
||||||
|
for _, pb := range bs[i-compactionBlocksLen+1 : i] {
|
||||||
|
match = match && pb.generation == 1
|
||||||
|
}
|
||||||
|
if match {
|
||||||
|
return i - compactionBlocksLen + 1, i + 1, true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// If we have enough generation 0 blocks to directly move to the
|
||||||
|
// 2nd generation, skip generation 1.
|
||||||
|
if len(bs)-i >= compactionBlocksLen {
|
||||||
|
// Guard against the newly compacted block becoming larger than
|
||||||
|
// the previous one.
|
||||||
|
if i == 0 || bs[i-1].generation >= 2 {
|
||||||
|
return i, i + compactionBlocksLen, true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// No optimizations possible, naiively compact the new block.
|
||||||
|
return i, i + 1, true
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := len(bs); i-3 >= 0; i -= 3 {
|
// Then we care about compacting multiple blocks, starting with the oldest.
|
||||||
tpl := bs[i-3 : i]
|
for i := 0; i < len(bs)-compactionBlocksLen; i += compactionBlocksLen {
|
||||||
if c.match(tpl) {
|
if c.match(bs[i : i+2]) {
|
||||||
return i - 3, i - 1, true
|
return i, i + compactionBlocksLen, true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0, 0, false
|
return 0, 0, false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
13
db.go
13
db.go
|
@ -140,7 +140,6 @@ func Open(dir string, logger log.Logger, opts *Options) (db *DB, err error) {
|
||||||
}
|
}
|
||||||
db.compactor = newCompactor(r, &compactorOptions{
|
db.compactor = newCompactor(r, &compactorOptions{
|
||||||
maxBlockRange: opts.MaxBlockRange,
|
maxBlockRange: opts.MaxBlockRange,
|
||||||
maxSize: 1 << 29, // 512MB
|
|
||||||
})
|
})
|
||||||
|
|
||||||
if err := db.initBlocks(); err != nil {
|
if err := db.initBlocks(); err != nil {
|
||||||
|
@ -202,7 +201,7 @@ func (db *DB) run() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
db.logger.Log("msg", "picked", "i", i, "j", j)
|
db.logger.Log("msg", "picked", "i", i, "j", j)
|
||||||
for k := i; k <= j; k++ {
|
for k := i; k < j; k++ {
|
||||||
db.logger.Log("k", k, "generation", infos[k].generation)
|
db.logger.Log("k", k, "generation", infos[k].generation)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -230,10 +229,10 @@ func (db *DB) getBlock(i int) Block {
|
||||||
return db.heads[i-len(db.persisted)]
|
return db.heads[i-len(db.persisted)]
|
||||||
}
|
}
|
||||||
|
|
||||||
// removeBlocks removes the blocks in range [i, j] from the list of persisted
|
// removeBlocks removes the blocks in range [i, j) from the list of persisted
|
||||||
// and head blocks. The blocks are not closed and their files not deleted.
|
// and head blocks. The blocks are not closed and their files not deleted.
|
||||||
func (db *DB) removeBlocks(i, j int) {
|
func (db *DB) removeBlocks(i, j int) {
|
||||||
for k := i; k <= j; k++ {
|
for k := i; k < j; k++ {
|
||||||
if i < len(db.persisted) {
|
if i < len(db.persisted) {
|
||||||
db.persisted = append(db.persisted[:i], db.persisted[i+1:]...)
|
db.persisted = append(db.persisted[:i], db.persisted[i+1:]...)
|
||||||
} else {
|
} else {
|
||||||
|
@ -253,14 +252,14 @@ func (db *DB) blocks() (bs []Block) {
|
||||||
return bs
|
return bs
|
||||||
}
|
}
|
||||||
|
|
||||||
// compact block in range [i, j] into a temporary directory and atomically
|
// compact block in range [i, j) into a temporary directory and atomically
|
||||||
// swap the blocks out on successful completion.
|
// swap the blocks out on successful completion.
|
||||||
func (db *DB) compact(i, j int) error {
|
func (db *DB) compact(i, j int) error {
|
||||||
if j < i {
|
if j <= i {
|
||||||
return errors.New("invalid compaction block range")
|
return errors.New("invalid compaction block range")
|
||||||
}
|
}
|
||||||
var blocks []Block
|
var blocks []Block
|
||||||
for k := i; k <= j; k++ {
|
for k := i; k < j; k++ {
|
||||||
blocks = append(blocks, db.getBlock(k))
|
blocks = append(blocks, db.getBlock(k))
|
||||||
}
|
}
|
||||||
var (
|
var (
|
||||||
|
|
Loading…
Reference in New Issue