Refactor compactor

This commit is contained in:
Fabian Reinartz 2017-08-09 11:10:29 +02:00
parent 66ff7b12e9
commit 905af27cf9
7 changed files with 80 additions and 133 deletions

View File

@ -113,7 +113,7 @@ type BlockStats struct {
type BlockMetaCompaction struct {
// Maximum number of compaction cycles any source block has
// gone through.
Generation int `json:"generation"`
Level int `json:"level"`
// ULIDs of all source head blocks that went into the block.
Sources []ulid.ULID `json:"sources,omitempty"`
}

View File

@ -104,6 +104,9 @@ func (p *pool) Put(c Chunk) error {
switch c.Encoding() {
case EncXOR:
xc, ok := c.(*XORChunk)
// This may happen often with wrapped chunks. Nothing we can really do about
// it but returning an error would cause a lot of allocations again. Thus,
// we just skip it.
if !ok {
return nil
}

View File

@ -48,22 +48,22 @@ type Compactor interface {
// Plan returns a set of non-overlapping directories that can
// be compacted concurrently.
// Results returned when compactions are in progress are undefined.
Plan() ([][]string, error)
Plan(dir string) ([]string, error)
// Write persists a Block into a directory.
Write(b Block) error
Write(dest string, b Block) error
// Compact runs compaction against the provided directories. Must
// only be called concurrently with results of Plan().
Compact(dirs ...string) error
Compact(dest string, dirs ...string) error
}
// compactor implements the Compactor interface.
type compactor struct {
// LeveledCompactor implements the Compactor interface.
type LeveledCompactor struct {
dir string
metrics *compactorMetrics
logger log.Logger
opts *compactorOptions
opts *LeveledCompactorOptions
}
type compactorMetrics struct {
@ -98,19 +98,18 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
return m
}
type compactorOptions struct {
type LeveledCompactorOptions struct {
blockRanges []int64
chunkPool chunks.Pool
}
func NewCompactor(dir string, r prometheus.Registerer, l log.Logger, opts *compactorOptions) *compactor {
func NewLeveledCompactor(r prometheus.Registerer, l log.Logger, opts *LeveledCompactorOptions) *LeveledCompactor {
if opts == nil {
opts = &compactorOptions{
opts = &LeveledCompactorOptions{
chunkPool: chunks.NewPool(),
}
}
return &compactor{
dir: dir,
return &LeveledCompactor{
opts: opts,
logger: l,
metrics: newCompactorMetrics(r),
@ -130,8 +129,9 @@ type dirMeta struct {
meta *BlockMeta
}
func (c *compactor) Plan() ([][]string, error) {
dirs, err := blockDirs(c.dir)
// Plan returns a list of compactable blocks in the provided directory.
func (c *LeveledCompactor) Plan(dir string) ([]string, error) {
dirs, err := blockDirs(dir)
if err != nil {
return nil, err
}
@ -143,7 +143,7 @@ func (c *compactor) Plan() ([][]string, error) {
if err != nil {
return nil, err
}
if meta.Compaction.Generation > 0 {
if meta.Compaction.Level > 0 {
dms = append(dms, dirMeta{dir, meta})
}
}
@ -155,20 +155,12 @@ func (c *compactor) Plan() ([][]string, error) {
return nil, nil
}
sliceDirs := func(dms []dirMeta) [][]string {
if len(dms) == 0 {
return nil
}
var res []string
for _, dm := range dms {
res = append(res, dm.dir)
}
return [][]string{res}
var res []string
for _, dm := range c.selectDirs(dms) {
res = append(res, dm.dir)
}
planDirs := sliceDirs(c.selectDirs(dms))
if len(dirs) > 1 {
return planDirs, nil
if len(res) > 0 {
return res, nil
}
// Compact any blocks that have >5% tombstones.
@ -179,7 +171,7 @@ func (c *compactor) Plan() ([][]string, error) {
}
if meta.Stats.NumSeries/meta.Stats.NumTombstones <= 20 { // 5%
return [][]string{{dms[i].dir}}, nil
return []string{dms[i].dir}, nil
}
}
@ -188,7 +180,7 @@ func (c *compactor) Plan() ([][]string, error) {
// selectDirs returns the dir metas that should be compacted into a single new block.
// If only a single block range is configured, the result is always nil.
func (c *compactor) selectDirs(ds []dirMeta) []dirMeta {
func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta {
if len(c.opts.blockRanges) < 2 || len(ds) < 1 {
return nil
}
@ -267,18 +259,18 @@ func compactBlockMetas(blocks ...BlockMeta) (res BlockMeta) {
sources := map[ulid.ULID]struct{}{}
for _, b := range blocks {
if b.Compaction.Generation > res.Compaction.Generation {
res.Compaction.Generation = b.Compaction.Generation
if b.Compaction.Level > res.Compaction.Level {
res.Compaction.Level = b.Compaction.Level
}
for _, s := range b.Compaction.Sources {
sources[s] = struct{}{}
}
// If it's an in memory block, its ULID goes into the sources.
if b.Compaction.Generation == 0 {
if b.Compaction.Level == 0 {
sources[b.ULID] = struct{}{}
}
}
res.Compaction.Generation++
res.Compaction.Level++
for s := range sources {
res.Compaction.Sources = append(res.Compaction.Sources, s)
@ -290,7 +282,9 @@ func compactBlockMetas(blocks ...BlockMeta) (res BlockMeta) {
return res
}
func (c *compactor) Compact(dirs ...string) (err error) {
// Compact creates a new block in the compactor's directory from the blocks in the
// provided directories.
func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) {
var blocks []Block
for _, d := range dirs {
@ -306,24 +300,24 @@ func (c *compactor) Compact(dirs ...string) (err error) {
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
uid := ulid.MustNew(ulid.Now(), entropy)
return c.write(uid, blocks...)
return c.write(dest, uid, blocks...)
}
func (c *compactor) Write(b Block) error {
func (c *LeveledCompactor) Write(dest string, b Block) error {
// Buffering blocks might have been created that often have no data.
if b.Meta().Stats.NumSeries == 0 {
return errors.Wrap(os.RemoveAll(b.Dir()), "remove empty block")
return nil
}
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
uid := ulid.MustNew(ulid.Now(), entropy)
return c.write(uid, b)
return c.write(dest, uid, b)
}
// write creates a new block that is the union of the provided blocks into dir.
// It cleans up all files of the old blocks after completing successfully.
func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) {
func (c *LeveledCompactor) write(dest string, uid ulid.ULID, blocks ...Block) (err error) {
c.logger.Log("msg", "compact blocks", "blocks", fmt.Sprintf("%v", blocks))
defer func(t time.Time) {
@ -334,7 +328,7 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) {
c.metrics.duration.Observe(time.Since(t).Seconds())
}(time.Now())
dir := filepath.Join(c.dir, uid.String())
dir := filepath.Join(dest, uid.String())
tmp := dir + ".tmp"
if err = os.RemoveAll(tmp); err != nil {
@ -382,11 +376,6 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) {
if err := renameFile(tmp, dir); err != nil {
return errors.Wrap(err, "rename block dir")
}
for _, b := range blocks {
if err := os.RemoveAll(b.Dir()); err != nil {
return err
}
}
// Properly sync parent dir to ensure changes are visible.
df, err := fileutil.OpenDir(dir)
if err != nil {
@ -403,7 +392,7 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) {
// populateBlock fills the index and chunk writers with new data gathered as the union
// of the provided blocks. It returns meta information for the new block.
func (c *compactor) populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) {
func (c *LeveledCompactor) populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) {
var (
set compactionSet
metas []BlockMeta

View File

@ -19,8 +19,8 @@ import (
"github.com/stretchr/testify/require"
)
func TestCompactionSelect(t *testing.T) {
opts := &compactorOptions{
func TestLeveledCompactor_Select(t *testing.T) {
opts := &LeveledCompactorOptions{
blockRanges: []int64{
20,
60,
@ -173,7 +173,7 @@ func TestCompactionSelect(t *testing.T) {
},
}
c := &compactor{
c := &LeveledCompactor{
opts: opts,
}
sliceDirs := func(dms []dirMeta) [][]string {

43
db.go
View File

@ -224,7 +224,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
db.lockf = &lockf
}
copts := &compactorOptions{
copts := &LeveledCompactorOptions{
blockRanges: opts.BlockRanges,
chunkPool: db.chunkPool,
}
@ -242,7 +242,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
copts.blockRanges = copts.blockRanges[:len(copts.blockRanges)-1]
}
db.compactor = NewCompactor(dir, r, l, copts)
db.compactor = NewLeveledCompactor(r, l, copts)
if err := db.reloadBlocks(); err != nil {
return nil, err
@ -390,20 +390,24 @@ func (db *DB) compact() (changes bool, err error) {
default:
}
if err = db.compactor.Write(h); err != nil {
if err = db.compactor.Write(db.dir, h); err != nil {
return changes, errors.Wrap(err, "persist head block")
}
changes = true
if err := os.RemoveAll(h.Dir()); err != nil {
return changes, errors.Wrap(err, "delete compacted head block")
}
runtime.GC()
}
// Check for compactions of multiple blocks.
for {
plans, err := db.compactor.Plan()
plan, err := db.compactor.Plan(db.dir)
if err != nil {
return changes, errors.Wrap(err, "plan compaction")
}
if len(plans) == 0 {
if len(plan) == 0 {
break
}
@ -413,17 +417,17 @@ func (db *DB) compact() (changes bool, err error) {
default:
}
// We just execute compactions sequentially to not cause too extreme
// CPU and memory spikes.
// TODO(fabxc): return more descriptive plans in the future that allow
// estimation of resource usage and conditional parallelization?
for _, p := range plans {
if err := db.compactor.Compact(p...); err != nil {
return changes, errors.Wrapf(err, "compact %s", p)
}
changes = true
runtime.GC()
if err := db.compactor.Compact(db.dir, plan...); err != nil {
return changes, errors.Wrapf(err, "compact %s", plan)
}
changes = true
for _, pd := range plan {
if err := os.RemoveAll(pd); err != nil {
return changes, errors.Wrap(err, "delete compacted block")
}
}
runtime.GC()
}
return changes, nil
@ -509,7 +513,7 @@ func (db *DB) reloadBlocks() (err error) {
b, ok := db.getBlock(meta.ULID)
if !ok {
if meta.Compaction.Generation == 0 {
if meta.Compaction.Level == 0 {
b, err = db.openHeadBlock(dir)
} else {
b, err = newPersistedBlock(dir, db.chunkPool)
@ -538,7 +542,7 @@ func (db *DB) reloadBlocks() (err error) {
db.heads = nil
for _, b := range blocks {
if b.Meta().Compaction.Generation == 0 {
if b.Meta().Compaction.Level == 0 {
db.heads = append(db.heads, b.(*HeadBlock))
}
}
@ -607,6 +611,9 @@ func (db *DB) EnableCompactions() {
// Snapshot writes the current data to the directory.
func (db *DB) Snapshot(dir string) error {
if dir == db.dir {
return errors.Errorf("cannot snapshot into base directory")
}
db.cmtx.Lock()
defer db.cmtx.Unlock()
@ -873,7 +880,7 @@ func (db *DB) openHeadBlock(dir string) (*HeadBlock, error) {
return nil, errors.Wrap(err, "open WAL %s")
}
h, err := OpenHeadBlock(dir, log.With(db.logger, "block", dir), wal)
h, err := OpenHeadBlock(dir, log.With(db.logger, "block", dir), wal, db.compactor)
if err != nil {
return nil, errors.Wrapf(err, "open head block %s", dir)
}

76
head.go
View File

@ -52,9 +52,10 @@ var (
// HeadBlock handles reads and writes of time series data within a time window.
type HeadBlock struct {
mtx sync.RWMutex
dir string
wal WAL
mtx sync.RWMutex
dir string
wal WAL
compactor Compactor
activeWriters uint64
highTimestamp int64
@ -106,7 +107,7 @@ func TouchHeadBlock(dir string, mint, maxt int64) (string, error) {
}
// OpenHeadBlock opens the head block in dir.
func OpenHeadBlock(dir string, l log.Logger, wal WAL) (*HeadBlock, error) {
func OpenHeadBlock(dir string, l log.Logger, wal WAL, c Compactor) (*HeadBlock, error) {
meta, err := readMetaFile(dir)
if err != nil {
return nil, err
@ -115,6 +116,7 @@ func OpenHeadBlock(dir string, l log.Logger, wal WAL) (*HeadBlock, error) {
h := &HeadBlock{
dir: dir,
wal: wal,
compactor: c,
series: []*memSeries{nil}, // 0 is not a valid posting, filled with nil.
hashes: map[uint64][]*memSeries{},
values: map[string]stringset{},
@ -266,68 +268,14 @@ Outer:
}
// Snapshot persists the current state of the headblock to the given directory.
// TODO(gouthamve): Snapshot must be called when there are no active appenders.
// This has been ensured by acquiring a Lock on DB.mtx, but this limitation should
// be removed in the future.
// Callers must ensure that there are no active appenders against the block.
// DB does this by acquiring its own write lock.
func (h *HeadBlock) Snapshot(snapshotDir string) error {
// if h.meta.Stats.NumSeries == 0 {
// return nil
// }
if h.meta.Stats.NumSeries == 0 {
return nil
}
// entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
// uid := ulid.MustNew(ulid.Now(), entropy)
// dir := filepath.Join(snapshotDir, uid.String())
// tmp := dir + ".tmp"
// if err := os.RemoveAll(tmp); err != nil {
// return err
// }
// if err := os.MkdirAll(tmp, 0777); err != nil {
// return err
// }
// // Populate chunk and index files into temporary directory with
// // data of all blocks.
// chunkw, err := newChunkWriter(chunkDir(tmp))
// if err != nil {
// return errors.Wrap(err, "open chunk writer")
// }
// indexw, err := newIndexWriter(tmp)
// if err != nil {
// return errors.Wrap(err, "open index writer")
// }
// meta, err := h.compactor.populateBlock([]Block{h}, indexw, chunkw, nil)
// if err != nil {
// return errors.Wrap(err, "write snapshot")
// }
// meta.ULID = uid
// meta.MaxTime = h.highTimestamp
// if err = writeMetaFile(tmp, meta); err != nil {
// return errors.Wrap(err, "write merged meta")
// }
// if err = chunkw.Close(); err != nil {
// return errors.Wrap(err, "close chunk writer")
// }
// if err = indexw.Close(); err != nil {
// return errors.Wrap(err, "close index writer")
// }
// // Create an empty tombstones file.
// if err := writeTombstoneFile(tmp, newEmptyTombstoneReader()); err != nil {
// return errors.Wrap(err, "write new tombstones file")
// }
// // Block successfully written, make visible
// if err := renameFile(tmp, dir); err != nil {
// return errors.Wrap(err, "rename block dir")
// }
return nil
return h.compactor.Write(snapshotDir, h)
}
// Dir returns the directory of the block.

View File

@ -43,7 +43,7 @@ func openTestHeadBlock(t testing.TB, dir string) *HeadBlock {
wal, err := OpenSegmentWAL(dir, nil, 5*time.Second)
require.NoError(t, err)
h, err := OpenHeadBlock(dir, nil, wal)
h, err := OpenHeadBlock(dir, nil, wal, nil)
require.NoError(t, err)
return h
}