From 9c4235532efcb6f4e9f57ea770d95e15857b33a0 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 13 Jul 2017 16:15:13 +0200 Subject: [PATCH] Fix compaction selection after creating new heads This fixes the case where between block creations no compaction plans are ran. We were not compacting anything in these cases since the on creation the most recent head block always had a high timestamp of 0. --- cmd/tsdb/main.go | 5 +-- db.go | 79 +++++++++++++++++++++++++----------------------- 2 files changed, 44 insertions(+), 40 deletions(-) diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index dbe92f3df..d87b1d150 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -112,8 +112,8 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) { st, err := tsdb.Open(dir, nil, nil, &tsdb.Options{ WALFlushInterval: 200 * time.Millisecond, - RetentionDuration: 2 * 24 * 60 * 60 * 1000, // 1 days in milliseconds - BlockRanges: tsdb.ExponentialBlockRanges(int64(2*time.Hour), 3, 5), + RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds + BlockRanges: tsdb.ExponentialBlockRanges(3*60*60*1000, 3, 5), }) if err != nil { exitWithError(err) @@ -188,6 +188,7 @@ func (b *writeBenchmark) ingestScrapes(lbls []labels.Labels, scrapeCount int) (u } wg.Wait() } + fmt.Println("ingestion completed") return total, nil } diff --git a/db.go b/db.go index 2204de6c9..4f10fe9e8 100644 --- a/db.go +++ b/db.go @@ -325,37 +325,52 @@ func headFullness(h headBlock) float64 { return a / b } +// appendableHeads returns a copy of a slice of HeadBlocks that can still be appended to. +func (db *DB) appendableHeads() (r []headBlock) { + switch l := len(db.heads); l { + case 0: + case 1: + r = append(r, db.heads[0]) + default: + if headFullness(db.heads[l-1]) < 0.5 { + r = append(r, db.heads[l-2]) + } + r = append(r, db.heads[l-1]) + } + return r +} + +func (db *DB) completedHeads() (r []headBlock) { + db.headmtx.RLock() + defer db.headmtx.RUnlock() + + if len(db.heads) < 2 { + return nil + } + + // Select all old heads unless they still have pending appenders. + for _, h := range db.heads[:len(db.heads)-2] { + if h.ActiveWriters() > 0 { + return r + } + r = append(r, h) + } + // Add the 2nd last head if the last head is more than 50% filled. + // Compacting it early allows us to free its memory before allocating + // more for the next block and thus reduces spikes. + if h2 := db.heads[len(db.heads)-2]; headFullness(h2) >= 0.5 && h2.ActiveWriters() == 0 { + r = append(r, h2) + } + return r +} + func (db *DB) compact() (changes bool, err error) { db.cmtx.Lock() defer db.cmtx.Unlock() - db.headmtx.RLock() - // Check whether we have pending head blocks that are ready to be persisted. // They have the highest priority. - var singles []Block - - // Collect head blocks that are ready for compaction. Write them after - // returning the lock to not block Appenders. - // Selected blocks are semantically ensured to not be written to afterwards - // by appendable(). - if len(db.heads) > 1 { - f := headFullness(db.heads[len(db.heads)-1]) - - for _, h := range db.heads[:len(db.heads)-1] { - // Blocks that won't be appendable when instantiating a new appender - // might still have active appenders on them. - // Abort at the first one we encounter. - if h.ActiveWriters() > 0 || f < 0.5 { - break - } - singles = append(singles, h) - } - } - - db.headmtx.RUnlock() - - for _, h := range singles { + for _, h := range db.completedHeads() { select { case <-db.stopc: return changes, nil @@ -677,7 +692,7 @@ func (a *dbAppender) appenderAt(t int64) (*metaAppender, error) { } var hb headBlock - for _, h := range a.db.appendable() { + for _, h := range a.db.appendableHeads() { m := h.Meta() if intervalContains(m.MinTime, m.MaxTime-1, t) { @@ -809,18 +824,6 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error { return nil } -// appendable returns a copy of a slice of HeadBlocks that can still be appended to. -func (db *DB) appendable() (r []headBlock) { - switch len(db.heads) { - case 0: - case 1: - r = append(r, db.heads[0]) - default: - r = append(r, db.heads[len(db.heads)-2:]...) - } - return r -} - func intervalOverlap(amin, amax, bmin, bmax int64) bool { // Checks Overlap: http://stackoverflow.com/questions/3269434/ return amin <= bmax && bmin <= amax