From 710f3f418fe9ad5b5ed43e8c807be3878a7b8ecf Mon Sep 17 00:00:00 2001
From: Fabian Reinartz
Date: Fri, 3 Mar 2017 10:06:45 +0100
Subject: [PATCH] vendor: tsdb compaction changes

---
 vendor/github.com/fabxc/tsdb/compact.go | 179 +++++++++------
 vendor/github.com/fabxc/tsdb/db.go      | 282 ++++++++++++------------
 vendor/github.com/fabxc/tsdb/head.go    |  15 +-
 vendor/github.com/fabxc/tsdb/wal.go     |   2 +-
 vendor/vendor.json                      |   6 +-
 5 files changed, 269 insertions(+), 215 deletions(-)

diff --git a/vendor/github.com/fabxc/tsdb/compact.go b/vendor/github.com/fabxc/tsdb/compact.go
index b58acba43..51d4ce0d1 100644
--- a/vendor/github.com/fabxc/tsdb/compact.go
+++ b/vendor/github.com/fabxc/tsdb/compact.go
@@ -13,6 +13,23 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 )
 
+// Compactor provides compaction against an underlying storage
+// of time series data.
+type Compactor interface {
+	// Plan returns a set of non-overlapping directories that can
+	// be compacted concurrently.
+	// Results returned when compactions are in progress are undefined.
+	Plan(dir string) ([][]string, error)
+
+	// Write persists a Block into a directory.
+	Write(dir string, b Block) error
+
+	// Compact runs compaction against the provided directories. Must only
+	// be called with results of Plan(), which may be compacted concurrently.
+	Compact(dirs ...string) error
+}
+
+// compactor implements the Compactor interface.
 type compactor struct {
 	metrics *compactorMetrics
 	opts    *compactorOptions
@@ -69,61 +86,55 @@ type compactionInfo struct {
 
 const compactionBlocksLen = 3
 
-// pick returns a range [i, j) in the blocks that are suitable to be compacted
-// into a single block at position i.
-func (c *compactor) pick(bs []compactionInfo) (i, j int, ok bool) {
-	if len(bs) == 0 {
-		return 0, 0, false
+func (c *compactor) Plan(dir string) ([][]string, error) {
+	dirs, err := blockDirs(dir)
+	if err != nil {
+		return nil, err
 	}
 
-	// First, we always compact pending in-memory blocks – oldest first.
-	for i, b := range bs {
-		if b.generation > 0 {
-			continue
-		}
-		// Directly compact into 2nd generation with previous generation 1 blocks.
-		if i+1 >= compactionBlocksLen {
-			match := true
-			for _, pb := range bs[i-compactionBlocksLen+1 : i] {
-				match = match && pb.generation == 1
-			}
-			if match {
-				return i - compactionBlocksLen + 1, i + 1, true
-			}
-		}
-		// If we have enough generation 0 blocks to directly move to the
-		// 2nd generation, skip generation 1.
-		if len(bs)-i >= compactionBlocksLen {
-			// Guard against the newly compacted block becoming larger than
-			// the previous one.
-			if i == 0 || bs[i-1].generation >= 2 {
-				return i, i + compactionBlocksLen, true
-			}
-		}
+	var bs []*BlockMeta
 
-		// No optimizations possible, naiively compact the new block.
-		return i, i + 1, true
+	for _, dir := range dirs {
+		meta, err := readMetaFile(dir)
+		if err != nil {
+			return nil, err
+		}
+		if meta.Compaction.Generation > 0 {
+			bs = append(bs, meta)
+		}
+	}
+
+	if len(bs) == 0 {
+		return nil, nil
+	}
+
+	sliceDirs := func(i, j int) [][]string {
+		var res []string
+		for k := i; k < j; k++ {
+			res = append(res, dirs[k])
+		}
+		return [][]string{res}
 	}
 
 	// Then we care about compacting multiple blocks, starting with the oldest.
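For orientation before the diff continues: the rewritten loop below slides a window of compactionBlocksLen over the time-ordered block metas and picks the first window whose blocks share a compaction generation and whose combined time span stays within the configured maximum. A self-contained sketch of that policy, with a stripped-down meta type and a plain uint64 limit standing in for opts.maxBlockRange (an illustration of the selection rule, not the vendored code):

package main

import "fmt"

// meta is a stripped-down stand-in for tsdb's BlockMeta.
type meta struct {
	generation int
	mint, maxt int64
}

const compactionBlocksLen = 3

// pickWindow returns the first window of compactionBlocksLen adjacent
// blocks that share a generation and span at most maxRange of time.
func pickWindow(bs []meta, maxRange uint64) (int, bool) {
	for i := 0; i < len(bs)-compactionBlocksLen+1; i++ {
		w := bs[i : i+compactionBlocksLen]
		match := true
		for _, b := range w {
			match = match && b.generation == w[0].generation
		}
		if match && uint64(w[len(w)-1].maxt-w[0].mint) <= maxRange {
			return i, true
		}
	}
	return 0, false
}

func main() {
	bs := []meta{{1, 0, 100}, {1, 100, 200}, {1, 200, 300}, {2, 300, 400}}
	if i, ok := pickWindow(bs, 1000); ok {
		fmt.Printf("compact blocks [%d, %d)\n", i, i+compactionBlocksLen)
	}
}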
-	for i := 0; i < len(bs)-compactionBlocksLen+1; i += compactionBlocksLen {
+	for i := 0; i < len(bs)-compactionBlocksLen+1; i++ {
 		if c.match(bs[i : i+3]) {
-			return i, i + compactionBlocksLen, true
+			return sliceDirs(i, i+compactionBlocksLen), nil
 		}
 	}
 
-	return 0, 0, false
+	return nil, nil
 }
 
-func (c *compactor) match(bs []compactionInfo) bool {
-	g := bs[0].generation
+func (c *compactor) match(bs []*BlockMeta) bool {
+	g := bs[0].Compaction.Generation
 
 	for _, b := range bs {
-		if b.generation != g {
+		if b.Compaction.Generation != g {
 			return false
 		}
 	}
-	return uint64(bs[len(bs)-1].maxt-bs[0].mint) <= c.opts.maxBlockRange
+	return uint64(bs[len(bs)-1].MaxTime-bs[0].MinTime) <= c.opts.maxBlockRange
 }
 
 var entropy = rand.New(rand.NewSource(time.Now().UnixNano()))
@@ -136,11 +147,7 @@ func mergeBlockMetas(blocks ...Block) (res BlockMeta) {
 	res.MaxTime = blocks[len(blocks)-1].Meta().MaxTime
 	res.ULID = ulid.MustNew(ulid.Now(), entropy)
 
-	g := m0.Compaction.Generation
-	if g == 0 && len(blocks) > 1 {
-		g++
-	}
-	res.Compaction.Generation = g + 1
+	res.Compaction.Generation = m0.Compaction.Generation + 1
 
 	for _, b := range blocks {
 		res.Stats.NumSamples += b.Meta().Stats.NumSamples
@@ -148,35 +155,62 @@ func mergeBlockMetas(blocks ...Block) (res BlockMeta) {
 	return res
 }
 
-func (c *compactor) compact(dir string, blocks ...Block) (err error) {
-	start := time.Now()
-	defer func() {
+func (c *compactor) Compact(dirs ...string) (err error) {
+	var blocks []Block
+
+	for _, d := range dirs {
+		b, err := newPersistedBlock(d)
+		if err != nil {
+			return err
+		}
+		blocks = append(blocks, b)
+	}
+
+	return c.write(dirs[0], blocks...)
+}
+
+func (c *compactor) Write(dir string, b Block) error {
+	return c.write(dir, b)
+}
+
+// write creates a new block in dir that is the union of the provided blocks.
+// It cleans up all files of the old blocks after completing successfully.
+func (c *compactor) write(dir string, blocks ...Block) (err error) {
+	defer func(t time.Time) {
 		if err != nil {
 			c.metrics.failed.Inc()
 		}
-		c.metrics.duration.Observe(time.Since(start).Seconds())
-	}()
+		c.metrics.duration.Observe(time.Since(t).Seconds())
+	}(time.Now())
 
-	if err = os.RemoveAll(dir); err != nil {
+	tmp := dir + ".tmp"
+
+	if err = os.RemoveAll(tmp); err != nil {
 		return err
 	}
-	if err = os.MkdirAll(dir, 0777); err != nil {
+	if err = os.MkdirAll(tmp, 0777); err != nil {
 		return err
 	}
 
-	chunkw, err := newChunkWriter(chunkDir(dir))
+	// Populate chunk and index files into temporary directory with
+	// data of all blocks.
+	chunkw, err := newChunkWriter(chunkDir(tmp))
 	if err != nil {
 		return errors.Wrap(err, "open chunk writer")
 	}
-	indexw, err := newIndexWriter(dir)
+	indexw, err := newIndexWriter(tmp)
 	if err != nil {
 		return errors.Wrap(err, "open index writer")
 	}
 
-	if err = c.write(dir, blocks, indexw, chunkw); err != nil {
+	meta, err := c.populate(blocks, indexw, chunkw)
+	if err != nil {
 		return errors.Wrap(err, "write compaction")
 	}
+	if err = writeMetaFile(tmp, meta); err != nil {
+		return errors.Wrap(err, "write merged meta")
+	}
 
 	if err = chunkw.Close(); err != nil {
 		return errors.Wrap(err, "close chunk writer")
@@ -184,16 +218,37 @@ func (c *compactor) compact(dir string, blocks ...Block) (err error) {
 	if err = indexw.Close(); err != nil {
 		return errors.Wrap(err, "close index writer")
 	}
+
+	// Block successfully written, make visible and remove old ones.
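The make-visible step that follows is the classic build-in-temp, rename-into-place recipe. A minimal sketch of the recipe on its own, using only the standard library; os.Rename and a plain directory fsync stand in for the patch's renameFile and fileutil helpers, and note that os.Rename fails on a non-empty existing destination, a case the patch's helper has to deal with:

package main

import (
	"os"
	"path/filepath"
)

// publishDir builds a directory's contents in "<dir>.tmp" via populate,
// renames it into place, and fsyncs the parent so the rename itself is
// durable across crashes.
func publishDir(dir string, populate func(tmp string) error) error {
	tmp := dir + ".tmp"
	if err := os.RemoveAll(tmp); err != nil { // clear leftovers from a crash
		return err
	}
	if err := os.MkdirAll(tmp, 0777); err != nil {
		return err
	}
	if err := populate(tmp); err != nil {
		return err
	}
	if err := os.Rename(tmp, dir); err != nil {
		return err
	}
	pd, err := os.Open(filepath.Dir(dir))
	if err != nil {
		return err
	}
	defer pd.Close()
	return pd.Sync() // persist the directory entry created by the rename
}

func main() {
	_ = publishDir("demo-block", func(tmp string) error {
		return os.WriteFile(filepath.Join(tmp, "meta.json"), []byte("{}"), 0666)
	})
}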
+	if err := renameFile(tmp, dir); err != nil {
+		return errors.Wrap(err, "rename block dir")
+	}
+	for _, b := range blocks[1:] {
+		if err := os.RemoveAll(b.Dir()); err != nil {
+			return err
+		}
+	}
+	// Properly sync parent dir to ensure changes are visible.
+	df, err := fileutil.OpenDir(dir)
+	if err != nil {
+		return errors.Wrap(err, "sync block dir")
+	}
+	if err := fileutil.Fsync(df); err != nil {
+		return errors.Wrap(err, "sync block dir")
+	}
+
 	return nil
 }
 
-func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw ChunkWriter) error {
+// populate fills the index and chunk writers with new data gathered as the union
+// of the provided blocks. It returns meta information for the new block.
+func (c *compactor) populate(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) {
 	var set compactionSet
 
 	for i, b := range blocks {
 		all, err := b.Index().Postings("", "")
 		if err != nil {
-			return err
+			return nil, err
 		}
 		// TODO(fabxc): find more transparent way of handling this.
 		if hb, ok := b.(*headBlock); ok {
@@ -207,7 +262,7 @@ func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw
 		}
 		set, err = newCompactionMerger(set, s)
 		if err != nil {
-			return err
+			return nil, err
 		}
 	}
 
@@ -222,7 +277,7 @@ func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw
 	for set.Next() {
 		lset, chunks := set.At()
 		if err := chunkw.WriteChunks(chunks...); err != nil {
-			return err
+			return nil, err
 		}
 
 		indexw.AddSeries(i, lset, chunks...)
@@ -243,7 +298,7 @@ func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw
 		i++
 	}
 	if set.Err() != nil {
-		return set.Err()
+		return nil, set.Err()
 	}
 
 	s := make([]string, 0, 256)
@@ -254,13 +309,13 @@ func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw
 			s = append(s, x)
 		}
 		if err := indexw.WriteLabelIndex([]string{n}, s); err != nil {
-			return err
+			return nil, err
 		}
 	}
 
 	for t := range postings.m {
 		if err := indexw.WritePostings(t.name, t.value, postings.get(t)); err != nil {
-			return err
+			return nil, err
 		}
 	}
 	// Write a postings list containing all series.
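The "all series" postings written in the next hunk is simply the identity list 0..n-1 registered under the empty label pair ("", ""). For reference, a toy iterator in the same shape as the listPostings value the code constructs (a sketch mirroring the vendored type, not imported from it):

package main

import "fmt"

// listPostings iterates an in-memory, ordered list of series IDs.
type listPostings struct {
	list []uint32
	idx  int
}

func newListPostings(l []uint32) *listPostings { return &listPostings{list: l, idx: -1} }

func (p *listPostings) Next() bool { p.idx++; return p.idx < len(p.list) }
func (p *listPostings) At() uint32 { return p.list[p.idx] }

func main() {
	// The "all" list: one entry per series written to the new block.
	all := make([]uint32, 4)
	for i := range all {
		all[i] = uint32(i)
	}
	for p := newListPostings(all); p.Next(); {
		fmt.Println("series", p.At())
	}
}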
@@ -269,10 +324,10 @@ func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw
 		all[i] = uint32(i)
 	}
 	if err := indexw.WritePostings("", "", newListPostings(all)); err != nil {
-		return err
+		return nil, err
 	}
 
-	return writeMetaFile(dir, &meta)
+	return &meta, nil
 }
 
 type compactionSet interface {
diff --git a/vendor/github.com/fabxc/tsdb/db.go b/vendor/github.com/fabxc/tsdb/db.go
index f967b044c..064dd2e2a 100644
--- a/vendor/github.com/fabxc/tsdb/db.go
+++ b/vendor/github.com/fabxc/tsdb/db.go
@@ -96,9 +96,10 @@ type DB struct {
 	mtx       sync.RWMutex
 	persisted []*persistedBlock
 	heads     []*headBlock
+	seqBlocks map[int]Block
 	headGen   uint8
 
-	compactor *compactor
+	compactor Compactor
 
 	compactc chan struct{}
 	donec    chan struct{}
@@ -175,10 +176,9 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
 		maxBlockRange: opts.MaxBlockDuration,
 	})
 
-	if err := db.initBlocks(); err != nil {
+	if err := db.reloadBlocks(); err != nil {
 		return nil, err
 	}
-
 	go db.run()
 
 	return db, nil
@@ -200,35 +200,8 @@ func (db *DB) run() {
 		case <-db.compactc:
 			db.metrics.compactionsTriggered.Inc()
 
-			var seqs []int
-			var infos []compactionInfo
-			for _, b := range db.compactable() {
-				m := b.Meta()
-
-				infos = append(infos, compactionInfo{
-					generation: m.Compaction.Generation,
-					mint:       m.MinTime,
-					maxt:       m.MaxTime,
-					seq:        m.Sequence,
-				})
-				seqs = append(seqs, m.Sequence)
-			}
-
-			i, j, ok := db.compactor.pick(infos)
-			if !ok {
-				continue
-			}
-			db.logger.Log("msg", "compact", "seqs", fmt.Sprintf("%v", seqs[i:j]))
-
-			if err := db.compact(i, j); err != nil {
+			if err := db.compact(); err != nil {
 				db.logger.Log("msg", "compaction failed", "err", err)
-				continue
-			}
-			db.logger.Log("msg", "compaction completed")
-			// Trigger another compaction in case there's more work to do.
-			select {
-			case db.compactc <- struct{}{}:
-			default:
 			}
 
 		case <-db.stopc:
@@ -237,150 +210,166 @@ func (db *DB) run() {
 	}
 }
 
-func (db *DB) getBlock(i int) Block {
-	if i < len(db.persisted) {
-		return db.persisted[i]
-	}
-	return db.heads[i-len(db.persisted)]
-}
+func (db *DB) compact() error {
+	changes := false
+	// Check whether we have pending head blocks that are ready to be persisted.
+	// They have the highest priority.
+	db.mtx.RLock()
 
-// removeBlocks removes the blocks in range [i, j) from the list of persisted
-// and head blocks. The blocks are not closed and their files not deleted.
-func (db *DB) removeBlocks(i, j int) {
-	for k := i; k < j; k++ {
-		if i < len(db.persisted) {
-			db.persisted = append(db.persisted[:i], db.persisted[i+1:]...)
-		} else {
-			l := i - len(db.persisted)
-			db.heads = append(db.heads[:l], db.heads[l+1:]...)
-		}
-	}
-}
+	if len(db.heads) > db.opts.AppendableBlocks {
+		for _, h := range db.heads[:len(db.heads)-db.opts.AppendableBlocks] {
+			// Blocks that won't be appendable when instantiating a new appender
+			// might still have active appenders on them.
+			// Abort at the first one we encounter.
+			if atomic.LoadUint64(&h.activeWriters) > 0 {
+				break
+			}
 
-func (db *DB) blocks() (bs []Block) {
-	for _, b := range db.persisted {
-		bs = append(bs, b)
-	}
-	for _, b := range db.heads {
-		bs = append(bs, b)
-	}
-	return bs
-}
+			db.logger.Log("msg", "write head", "seq", h.Meta().Sequence)
 
-// compact block in range [i, j) into a temporary directory and atomically
-// swap the blocks out on successful completion.
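Worth pausing on the slimmed-down db.run above: it is now a standard two-channel goroutine in which one channel coalesces compaction triggers and the other requests shutdown. The pattern in isolation, with hypothetical stand-in names rather than the vendored ones:

package main

import "fmt"

// runLoop services compaction triggers until stopc is closed. When both
// channels are ready, select picks one at random, just as in db.run.
func runLoop(compactc, stopc <-chan struct{}, compact func() error, donec chan<- struct{}) {
	defer close(donec)
	for {
		select {
		case <-compactc:
			if err := compact(); err != nil {
				fmt.Println("msg", "compaction failed", "err", err)
			}
		case <-stopc:
			return
		}
	}
}

func main() {
	compactc := make(chan struct{}, 1) // buffer of one coalesces repeated triggers
	stopc := make(chan struct{})
	donec := make(chan struct{})
	go runLoop(compactc, stopc, func() error { fmt.Println("compacting"); return nil }, donec)

	compactc <- struct{}{} // request a compaction
	close(stopc)           // request shutdown; the pending trigger may or may not win
	<-donec                // wait for the loop to exit
}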
-func (db *DB) compact(i, j int) error {
-	if j <= i {
-		return errors.New("invalid compaction block range")
-	}
-	var blocks []Block
-	for k := i; k < j; k++ {
-		blocks = append(blocks, db.getBlock(k))
-	}
-	var (
-		dir    = blocks[0].Dir()
-		tmpdir = dir + ".tmp"
-	)
+		select {
+		case <-db.stopc:
+			db.mtx.RUnlock()
+			return nil
+		default:
+		}
 
-	if err := db.compactor.compact(tmpdir, blocks...); err != nil {
-		return err
-	}
-
-	pb, err := newPersistedBlock(tmpdir)
-	if err != nil {
-		return err
-	}
-
-	db.mtx.Lock()
-	defer db.mtx.Unlock()
-
-	for _, b := range blocks {
-		if err := b.Close(); err != nil {
-			return errors.Wrapf(err, "close old block %s", b.Dir())
+		if err := db.compactor.Write(h.Dir(), h); err != nil {
+			db.mtx.RUnlock()
+			return errors.Wrap(err, "persist head block")
+		}
+		changes = true
 		}
 	}
 
-	if err := renameFile(tmpdir, dir); err != nil {
-		return errors.Wrap(err, "rename dir")
-	}
-	pb.dir = dir
+	db.mtx.RUnlock()
 
-	db.removeBlocks(i, j)
-	db.persisted = append(db.persisted, pb)
-
-	for _, b := range blocks[1:] {
-		db.logger.Log("msg", "remove old dir", "dir", b.Dir())
-		if err := os.RemoveAll(b.Dir()); err != nil {
-			return errors.Wrap(err, "removing old block")
+	// Check for compactions of multiple blocks.
+	for {
+		plans, err := db.compactor.Plan(db.dir)
+		if err != nil {
+			return errors.Wrap(err, "plan compaction")
 		}
-	}
 
-	if err := db.retentionCutoff(); err != nil {
-		return err
-	}
-
-	return nil
-}
+		select {
+		case <-db.stopc:
+			return nil
+		default:
+		}
+		// We execute compactions sequentially to avoid causing extreme
+		// CPU and memory spikes.
+		// TODO(fabxc): return more descriptive plans in the future that allow
+		// estimation of resource usage and conditional parallelization?
+		for _, p := range plans {
+			db.logger.Log("msg", "compact blocks", "seq", fmt.Sprintf("%v", p))
 
-func (db *DB) retentionCutoff() error {
-	if db.opts.RetentionDuration == 0 {
-		return nil
-	}
-	h := db.heads[len(db.heads)-1]
-	t := h.meta.MinTime - int64(db.opts.RetentionDuration)
-
-	var (
-		blocks = db.blocks()
-		i      int
-		b      Block
-	)
-	for i, b = range blocks {
-		if b.Meta().MinTime >= t {
+			if err := db.compactor.Compact(p...); err != nil {
+				return errors.Wrapf(err, "compact %s", p)
+			}
+			changes = true
+		}
+		// If we didn't compact anything, there's nothing left to do.
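Together with the break that follows, db.compact becomes a plan-until-fixpoint loop: run every planned compaction, re-plan, and stop once the planner returns no work. The control flow on its own, with a stubbed planner and compactor (hypothetical names; the real Plan returns sets of block directories):

package main

import "fmt"

// compactToFixpoint runs plan/compact rounds until the planner is empty,
// reporting whether anything changed (which triggers a reload in the patch).
func compactToFixpoint(plan func() [][]string, compact func([]string) error) (changed bool, err error) {
	for {
		plans := plan()
		if len(plans) == 0 {
			return changed, nil
		}
		// Execute sequentially, as the patch does, to bound resource spikes.
		for _, p := range plans {
			if err := compact(p); err != nil {
				return changed, err
			}
			changed = true
		}
	}
}

func main() {
	rounds := [][][]string{{{"b1", "b2", "b3"}}, {}} // one round of work, then done
	i := 0
	changed, _ := compactToFixpoint(
		func() [][]string { p := rounds[i]; i++; return p },
		func(p []string) error { fmt.Println("compact", p); return nil },
	)
	fmt.Println("changed:", changed)
}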
+		if len(plans) == 0 {
 			break
 		}
 	}
 
-	if i <= 1 {
-		return nil
-	}
-	db.logger.Log("msg", "retention cutoff", "idx", i-1)
-	db.removeBlocks(0, i)
 
-	for _, b := range blocks[:i] {
-		if err := os.RemoveAll(b.Dir()); err != nil {
-			return errors.Wrap(err, "removing old block")
+	if changes {
+		return errors.Wrap(db.reloadBlocks(), "reload blocks")
 	}
 	return nil
 }
 
-func (db *DB) initBlocks() error {
-	var (
-		persisted []*persistedBlock
-		heads     []*headBlock
-	)
+// func (db *DB) retentionCutoff() error {
+// 	if db.opts.RetentionDuration == 0 {
+// 		return nil
+// 	}
+// 	h := db.heads[len(db.heads)-1]
+// 	t := h.meta.MinTime - int64(db.opts.RetentionDuration)

+// 	var (
+// 		blocks = db.blocks()
+// 		i      int
+// 		b      Block
+// 	)
+// 	for i, b = range blocks {
+// 		if b.Meta().MinTime >= t {
+// 			break
+// 		}
+// 	}
+// 	if i <= 1 {
+// 		return nil
+// 	}
+// 	db.logger.Log("msg", "retention cutoff", "idx", i-1)
+// 	db.removeBlocks(0, i)

+// 	for _, b := range blocks[:i] {
+// 		if err := os.RemoveAll(b.Dir()); err != nil {
+// 			return errors.Wrap(err, "removing old block")
+// 		}
+// 	}
+// 	return nil
+// }

+func (db *DB) reloadBlocks() error {
+	db.mtx.Lock()
+	defer db.mtx.Unlock()
 
 	dirs, err := blockDirs(db.dir)
 	if err != nil {
-		return err
+		return errors.Wrap(err, "find blocks")
 	}
+	var (
+		metas     []*BlockMeta
+		persisted []*persistedBlock
+		heads     []*headBlock
+		seqBlocks = make(map[int]Block, len(dirs))
+	)
 
 	for _, dir := range dirs {
-		if fileutil.Exist(filepath.Join(dir, walDirName)) {
-			h, err := openHeadBlock(dir, db.logger)
-			if err != nil {
-				return err
-			}
-			h.generation = db.headGen
-			db.headGen++
-			heads = append(heads, h)
-			continue
-		}
-		b, err := newPersistedBlock(dir)
+		meta, err := readMetaFile(dir)
 		if err != nil {
-			return err
+			return errors.Wrapf(err, "read meta information %s", dir)
 		}
-		persisted = append(persisted, b)
+		metas = append(metas, meta)
 	}
 
+	for i, meta := range metas {
+		b, ok := db.seqBlocks[meta.Sequence]
+		if !ok {
+			return errors.Errorf("missing block for sequence %d", meta.Sequence)
+		}
+
+		if meta.Compaction.Generation == 0 {
+			if meta.ULID != b.Meta().ULID {
+				return errors.Errorf("head block ULID changed unexpectedly")
+			}
+			heads = append(heads, b.(*headBlock))
+		} else {
+			if meta.ULID != b.Meta().ULID {
+				if err := b.Close(); err != nil {
+					return err
+				}
+				b, err = newPersistedBlock(dirs[i])
+				if err != nil {
+					return errors.Wrapf(err, "open persisted block %s", dirs[i])
+				}
+			}
+			persisted = append(persisted, b.(*persistedBlock))
+		}
+
+		seqBlocks[meta.Sequence] = b
+	}
+
+	for seq, b := range db.seqBlocks {
+		if _, ok := seqBlocks[seq]; !ok {
+			if err := b.Close(); err != nil {
+				return errors.Wrapf(err, "closing removed block %d", b.Meta().Sequence)
+			}
+		}
+	}
+
+	db.seqBlocks = seqBlocks
 	db.persisted = persisted
 	db.heads = heads
 
@@ -643,6 +632,7 @@ func (db *DB) cut(mint int64) (*headBlock, error) {
 	}
 
 	db.heads = append(db.heads, newHead)
+	db.seqBlocks[seq] = newHead
 	db.headGen++
 
 	newHead.generation = db.headGen
diff --git a/vendor/github.com/fabxc/tsdb/head.go b/vendor/github.com/fabxc/tsdb/head.go
index 260fb962e..969bf3416 100644
--- a/vendor/github.com/fabxc/tsdb/head.go
+++ b/vendor/github.com/fabxc/tsdb/head.go
@@ -63,7 +63,10 @@ type headBlock struct {
 }
 
 func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*headBlock, error) {
-	if err := os.MkdirAll(dir, 0777); err != nil {
+	// Make head block creation appear atomic.
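Stepping back to reloadBlocks above: it reconciles the set of open blocks with the metas found on disk, keyed by sequence number. An unchanged ULID keeps the already-open block, a changed ULID forces a reopen, and sequences that vanished from disk get closed. A condensed sketch of that reconciliation over a simplified block type (simplified: the vendored code additionally errors on sequences it has never seen):

package main

import "fmt"

// block stands in for an open tsdb Block: a sequence number plus the ULID
// of the meta currently backing it.
type block struct {
	seq  int
	ulid string
}

// reconcile returns the next seq->block map given the previously open
// blocks and the metas just read from disk.
func reconcile(open map[int]block, onDisk []block) map[int]block {
	next := make(map[int]block, len(onDisk))
	for _, m := range onDisk {
		if b, ok := open[m.seq]; ok && b.ulid == m.ulid {
			next[m.seq] = b // unchanged; keep the open block
			continue
		}
		fmt.Println("reopen block seq", m.seq) // stands in for newPersistedBlock
		next[m.seq] = m
	}
	for seq := range open {
		if _, ok := next[seq]; !ok {
			fmt.Println("close removed block seq", seq) // stands in for Close
		}
	}
	return next
}

func main() {
	open := map[int]block{1: {1, "a"}, 2: {2, "b"}, 3: {3, "c"}}
	disk := []block{{1, "a"}, {2, "b2"}} // seq 2 rewritten by compaction, seq 3 gone
	_ = reconcile(open, disk)
}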
+	tmp := dir + ".tmp"
+
+	if err := os.MkdirAll(tmp, 0777); err != nil {
 		return nil, err
 	}
 	ulid, err := ulid.New(ulid.Now(), entropy)
@@ -71,7 +74,7 @@ func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*head
 		return nil, err
 	}
 
-	if err := writeMetaFile(dir, &BlockMeta{
+	if err := writeMetaFile(tmp, &BlockMeta{
 		ULID:     ulid,
 		Sequence: seq,
 		MinTime:  mint,
@@ -79,6 +82,9 @@ func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*head
 	}); err != nil {
 		return nil, err
 	}
+	if err := renameFile(tmp, dir); err != nil {
+		return nil, err
+	}
 	return openHeadBlock(dir, l)
 }
 
@@ -143,8 +149,11 @@ func (h *headBlock) Close() error {
 		return err
 	}
 	// Check whether the head block still exists in the underlying dir
-	// or has already been replaced with a compacted version
+	// or has already been replaced with a compacted version or removed.
 	meta, err := readMetaFile(h.dir)
+	if os.IsNotExist(err) {
+		return nil
+	}
 	if err != nil {
 		return err
 	}
diff --git a/vendor/github.com/fabxc/tsdb/wal.go b/vendor/github.com/fabxc/tsdb/wal.go
index a923f5b3d..5962711f9 100644
--- a/vendor/github.com/fabxc/tsdb/wal.go
+++ b/vendor/github.com/fabxc/tsdb/wal.go
@@ -58,7 +58,7 @@ type WAL struct {
 const (
 	walDirName          = "wal"
-	walSegmentSizeBytes = 64 * 1000 * 1000 // 64 MB
+	walSegmentSizeBytes = 256 * 1024 * 1024 // 256 MB
 )
 
 // OpenWAL opens or creates a write ahead log in the given directory.
diff --git a/vendor/vendor.json b/vendor/vendor.json
index 54bfb885f..288561628 100644
--- a/vendor/vendor.json
+++ b/vendor/vendor.json
@@ -368,10 +368,10 @@
 			"revisionTime": "2016-09-30T00:14:02Z"
 		},
 		{
-			"checksumSHA1": "E5C5z6CV6JeIA2cpT3KVWeFgZdM=",
+			"checksumSHA1": "L/5bfnMJXzbLXj+vN7Ph1F23+T4=",
 			"path": "github.com/fabxc/tsdb",
-			"revision": "2c3b56350a6d75a15484494c5a87145828cb34ef",
-			"revisionTime": "2017-03-01T16:19:57Z"
+			"revision": "cc0a7c82793515d6311801b7eb3ef8562e61f4c3",
+			"revisionTime": "2017-03-02T20:54:30Z"
 		},
 		{
 			"checksumSHA1": "uVzWuLvF646YjiKomsc2CR1ua58=",