diff --git a/block.go b/block.go index 9ccc6f78b..45c8b60cf 100644 --- a/block.go +++ b/block.go @@ -60,6 +60,11 @@ type Block interface { type headBlock interface { Block Appendable + + // ActiveWriters returns the number of currently active appenders. + ActiveWriters() int + // HighTimestamp returns the highest currently inserted timestamp. + HighTimestamp() int64 } // Snapshottable defines an entity that can be backedup online. @@ -71,9 +76,6 @@ type Snapshottable interface { type Appendable interface { // Appender returns a new Appender against an underlying store. Appender() Appender - - // Busy returns whether there are any currently active appenders. - Busy() bool } // Queryable defines an entity which provides a Querier. diff --git a/db.go b/db.go index e0e8ff173..80a335c17 100644 --- a/db.go +++ b/db.go @@ -305,6 +305,15 @@ func (db *DB) retentionCutoff() (bool, error) { return retentionCutoff(db.dir, mint) } +// headFullness returns up to which fraction of a blocks time range samples +// were already inserted. +func headFullness(h headBlock) float64 { + m := h.Meta() + a := float64(h.HighTimestamp() - m.MinTime) + b := float64(m.MaxTime - m.MinTime) + return a / b +} + func (db *DB) compact() (changes bool, err error) { db.cmtx.Lock() defer db.cmtx.Unlock() @@ -319,12 +328,14 @@ func (db *DB) compact() (changes bool, err error) { // returning the lock to not block Appenders. // Selected blocks are semantically ensured to not be written to afterwards // by appendable(). - if len(db.heads) > 2 { - for _, h := range db.heads[:len(db.heads)-2] { + if len(db.heads) > 1 { + f := headFullness(db.heads[len(db.heads)-1]) + + for _, h := range db.heads[:len(db.heads)-1] { // Blocks that won't be appendable when instantiating a new appender // might still have active appenders on them. // Abort at the first one we encounter. - if h.Busy() { + if h.ActiveWriters() > 0 || f < 0.5 { break } singles = append(singles, h) @@ -849,6 +860,8 @@ func (db *DB) createHeadBlock(mint, maxt int64) (headBlock, error) { return nil, err } + db.logger.Log("msg", "created head block", "ulid", newHead.meta.ULID, "mint", mint, "maxt", maxt) + db.blocks = append(db.blocks, newHead) // TODO(fabxc): this is a race! db.heads = append(db.heads, newHead) diff --git a/head.go b/head.go index d37cd7ef5..d64c4eda5 100644 --- a/head.go +++ b/head.go @@ -57,6 +57,7 @@ type HeadBlock struct { wal WAL activeWriters uint64 + highTimestamp int64 closed bool // descs holds all chunk descs for the head block. Each chunk implicitly @@ -389,9 +390,14 @@ func (h *HeadBlock) Appender() Appender { return &headAppender{HeadBlock: h, samples: getHeadAppendBuffer()} } -// Busy returns true if the block has open write transactions. -func (h *HeadBlock) Busy() bool { - return atomic.LoadUint64(&h.activeWriters) > 0 +// ActiveWriters returns true if the block has open write transactions. +func (h *HeadBlock) ActiveWriters() int { + return int(atomic.LoadUint64(&h.activeWriters)) +} + +// HighTimestamp returns the highest inserted sample timestamp. +func (h *HeadBlock) HighTimestamp() int64 { + return atomic.LoadInt64(&h.highTimestamp) } var headPool = sync.Pool{} @@ -415,7 +421,8 @@ type headAppender struct { newLabels []labels.Labels newHashes map[uint64]uint64 - samples []RefSample + samples []RefSample + highTimestamp int64 } type hashedLabels struct { @@ -513,6 +520,10 @@ func (a *headAppender) AddFast(ref string, t int64, v float64) error { } } + if t > a.highTimestamp { + a.highTimestamp = t + } + a.samples = append(a.samples, RefSample{ Ref: refn, T: t, @@ -593,6 +604,16 @@ func (a *headAppender) Commit() error { atomic.AddUint64(&a.meta.Stats.NumSamples, total) atomic.AddUint64(&a.meta.Stats.NumSeries, uint64(len(a.newSeries))) + for { + ht := a.HeadBlock.HighTimestamp() + if a.highTimestamp <= ht { + break + } + if atomic.CompareAndSwapInt64(&a.HeadBlock.highTimestamp, ht, a.highTimestamp) { + break + } + } + return nil } diff --git a/querier.go b/querier.go index 10ae20d01..68c35a918 100644 --- a/querier.go +++ b/querier.go @@ -38,7 +38,7 @@ type Querier interface { Close() error } -// Series represents a single time series. +// Series exposes a single time series. type Series interface { // Labels returns the complete set of labels identifying the series. Labels() labels.Labels