From 426125298e7ecb822273165b5e2c2b5562a53ded Mon Sep 17 00:00:00 2001
From: Fabian Reinartz
Date: Mon, 3 Jul 2017 14:41:02 +0200
Subject: [PATCH] vendor: update prometheus/tsdb

---
 vendor/github.com/prometheus/tsdb/block.go    |  8 +-
 vendor/github.com/prometheus/tsdb/db.go       | 23 +++++-
 vendor/github.com/prometheus/tsdb/head.go     | 35 ++++++--
 vendor/github.com/prometheus/tsdb/postings.go | 18 ++---
 vendor/github.com/prometheus/tsdb/querier.go  | 81 +++++++++----------
 vendor/vendor.json                            | 14 ++--
 6 files changed, 105 insertions(+), 74 deletions(-)

diff --git a/vendor/github.com/prometheus/tsdb/block.go b/vendor/github.com/prometheus/tsdb/block.go
index 9ccc6f78b..45c8b60cf 100644
--- a/vendor/github.com/prometheus/tsdb/block.go
+++ b/vendor/github.com/prometheus/tsdb/block.go
@@ -60,6 +60,11 @@ type Block interface {
 type headBlock interface {
 	Block
 	Appendable
+
+	// ActiveWriters returns the number of currently active appenders.
+	ActiveWriters() int
+	// HighTimestamp returns the highest currently inserted timestamp.
+	HighTimestamp() int64
 }
 
 // Snapshottable defines an entity that can be backedup online.
@@ -71,9 +76,6 @@ type Snapshottable interface {
 type Appendable interface {
 	// Appender returns a new Appender against an underlying store.
 	Appender() Appender
-
-	// Busy returns whether there are any currently active appenders.
-	Busy() bool
 }
 
 // Queryable defines an entity which provides a Querier.
diff --git a/vendor/github.com/prometheus/tsdb/db.go b/vendor/github.com/prometheus/tsdb/db.go
index e0e8ff173..17f3e5f5f 100644
--- a/vendor/github.com/prometheus/tsdb/db.go
+++ b/vendor/github.com/prometheus/tsdb/db.go
@@ -80,6 +80,7 @@ type Appender interface {
 	// Returned reference numbers are ephemeral and may be rejected in calls
 	// to AddFast() at any point. Adding the sample via Add() returns a new
 	// reference number.
+	// If the reference is the empty string, it must not be used for caching.
 	Add(l labels.Labels, t int64, v float64) (string, error)
 
 	// Add adds a sample pair for the referenced series. It is generally faster
@@ -305,6 +306,15 @@ func (db *DB) retentionCutoff() (bool, error) {
 	return retentionCutoff(db.dir, mint)
 }
 
+// headFullness returns the fraction of a block's time range for which
+// samples have already been inserted.
+func headFullness(h headBlock) float64 {
+	m := h.Meta()
+	a := float64(h.HighTimestamp() - m.MinTime)
+	b := float64(m.MaxTime - m.MinTime)
+	return a / b
+}
+
 func (db *DB) compact() (changes bool, err error) {
 	db.cmtx.Lock()
 	defer db.cmtx.Unlock()
@@ -319,12 +329,14 @@ func (db *DB) compact() (changes bool, err error) {
 	// returning the lock to not block Appenders.
 	// Selected blocks are semantically ensured to not be written to afterwards
 	// by appendable().
-	if len(db.heads) > 2 {
-		for _, h := range db.heads[:len(db.heads)-2] {
+	if len(db.heads) > 1 {
+		f := headFullness(db.heads[len(db.heads)-1])
+
+		for _, h := range db.heads[:len(db.heads)-1] {
 			// Blocks that won't be appendable when instantiating a new appender
 			// might still have active appenders on them.
 			// Abort at the first one we encounter.
-			if h.Busy() {
+			if h.ActiveWriters() > 0 || f < 0.5 {
 				break
 			}
 			singles = append(singles, h)
 		}
@@ -605,6 +617,9 @@ func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (string, error)
 	}
 	a.samples++
 
+	if ref == "" {
+		return "", nil
+	}
 	return string(append(h.meta.ULID[:], ref...)), nil
 }
 
@@ -849,6 +864,8 @@ func (db *DB) createHeadBlock(mint, maxt int64) (headBlock, error) {
 		return nil, err
 	}
 
+	db.logger.Log("msg", "created head block", "ulid", newHead.meta.ULID, "mint", mint, "maxt", maxt)
+
 	db.blocks = append(db.blocks, newHead) // TODO(fabxc): this is a race!
 	db.heads = append(db.heads, newHead)
 
diff --git a/vendor/github.com/prometheus/tsdb/head.go b/vendor/github.com/prometheus/tsdb/head.go
index d37cd7ef5..90e03f727 100644
--- a/vendor/github.com/prometheus/tsdb/head.go
+++ b/vendor/github.com/prometheus/tsdb/head.go
@@ -57,6 +57,7 @@ type HeadBlock struct {
 	wal WAL
 
 	activeWriters uint64
+	highTimestamp int64
 	closed        bool
 
 	// descs holds all chunk descs for the head block. Each chunk implicitly
@@ -389,9 +390,14 @@ func (h *HeadBlock) Appender() Appender {
 	return &headAppender{HeadBlock: h, samples: getHeadAppendBuffer()}
 }
 
-// Busy returns true if the block has open write transactions.
-func (h *HeadBlock) Busy() bool {
-	return atomic.LoadUint64(&h.activeWriters) > 0
+// ActiveWriters returns the number of open write transactions.
+func (h *HeadBlock) ActiveWriters() int {
+	return int(atomic.LoadUint64(&h.activeWriters))
+}
+
+// HighTimestamp returns the highest inserted sample timestamp.
+func (h *HeadBlock) HighTimestamp() int64 {
+	return atomic.LoadInt64(&h.highTimestamp)
 }
 
 var headPool = sync.Pool{}
@@ -415,7 +421,8 @@ type headAppender struct {
 	newLabels []labels.Labels
 	newHashes map[uint64]uint64
 
-	samples []RefSample
+	samples       []RefSample
+	highTimestamp int64
 }
 
 type hashedLabels struct {
@@ -443,7 +450,7 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (string, erro
 		// XXX(fabxc): there's no fast path for multiple samples for the same new series
 		// in the same transaction. We always return the invalid empty ref. It's has not
 		// been a relevant use case so far and is not worth the trouble.
-		return nullRef, a.AddFast(string(refb), t, v)
+		return "", a.AddFast(string(refb), t, v)
 	}
 
 	// The series is completely new.
@@ -464,11 +471,9 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (string, erro
 	a.newHashes[hash] = ref
 	binary.BigEndian.PutUint64(refb, ref)
 
-	return nullRef, a.AddFast(string(refb), t, v)
+	return "", a.AddFast(string(refb), t, v)
 }
 
-var nullRef = string([]byte{0, 0, 0, 0, 0, 0, 0, 0})
-
 func (a *headAppender) AddFast(ref string, t int64, v float64) error {
 	if len(ref) != 8 {
 		return errors.Wrap(ErrNotFound, "invalid ref length")
@@ -513,6 +518,10 @@ func (a *headAppender) AddFast(ref string, t int64, v float64) error {
 		}
 	}
 
+	if t > a.highTimestamp {
+		a.highTimestamp = t
+	}
+
 	a.samples = append(a.samples, RefSample{
 		Ref: refn,
 		T:   t,
@@ -593,6 +602,16 @@ func (a *headAppender) Commit() error {
 	atomic.AddUint64(&a.meta.Stats.NumSamples, total)
 	atomic.AddUint64(&a.meta.Stats.NumSeries, uint64(len(a.newSeries)))
 
+	for {
+		ht := a.HeadBlock.HighTimestamp()
+		if a.highTimestamp <= ht {
+			break
+		}
+		if atomic.CompareAndSwapInt64(&a.HeadBlock.highTimestamp, ht, a.highTimestamp) {
+			break
+		}
+	}
+
 	return nil
 }
 
diff --git a/vendor/github.com/prometheus/tsdb/postings.go b/vendor/github.com/prometheus/tsdb/postings.go
index f2f452ac1..f2f1eb5b8 100644
--- a/vendor/github.com/prometheus/tsdb/postings.go
+++ b/vendor/github.com/prometheus/tsdb/postings.go
@@ -78,12 +78,11 @@ func Intersect(its ...Postings) Postings {
 	if len(its) == 0 {
 		return emptyPostings
 	}
-	a := its[0]
-
-	for _, b := range its[1:] {
-		a = newIntersectPostings(a, b)
+	if len(its) == 1 {
+		return its[0]
 	}
-	return a
+	l := len(its) / 2
+	return newIntersectPostings(Intersect(its[:l]...), Intersect(its[l:]...))
 }
 
 type intersectPostings struct {
@@ -145,12 +144,11 @@ func Merge(its ...Postings) Postings {
 	if len(its) == 0 {
 		return nil
 	}
-	a := its[0]
-
-	for _, b := range its[1:] {
-		a = newMergedPostings(a, b)
+	if len(its) == 1 {
+		return its[0]
 	}
-	return a
+	l := len(its) / 2
+	return newMergedPostings(Merge(its[:l]...), Merge(its[l:]...))
 }
 
 type mergedPostings struct {
diff --git a/vendor/github.com/prometheus/tsdb/querier.go b/vendor/github.com/prometheus/tsdb/querier.go
index 601fc7440..523a67398 100644
--- a/vendor/github.com/prometheus/tsdb/querier.go
+++ b/vendor/github.com/prometheus/tsdb/querier.go
@@ -15,7 +15,6 @@ package tsdb
 
 import (
 	"fmt"
-	"sort"
 	"strings"
 
 	"github.com/prometheus/tsdb/chunks"
@@ -38,7 +37,7 @@ type Querier interface {
 	Close() error
 }
 
-// Series represents a single time series.
+// Series exposes a single time series.
 type Series interface {
 	// Labels returns the complete set of labels identifying the series.
 	Labels() labels.Labels
@@ -75,22 +74,26 @@ func (s *DB) Querier(mint, maxt int64) Querier {
 }
 
 func (q *querier) LabelValues(n string) ([]string, error) {
-	if len(q.blocks) == 0 {
+	return q.lvals(q.blocks, n)
+}
+
+func (q *querier) lvals(qs []Querier, n string) ([]string, error) {
+	if len(qs) == 0 {
 		return nil, nil
 	}
-	res, err := q.blocks[0].LabelValues(n)
+	if len(qs) == 1 {
+		return qs[0].LabelValues(n)
+	}
+	l := len(qs) / 2
+	s1, err := q.lvals(qs[:l], n)
 	if err != nil {
 		return nil, err
 	}
-	for _, bq := range q.blocks[1:] {
-		pr, err := bq.LabelValues(n)
-		if err != nil {
-			return nil, err
-		}
-		// Merge new values into deduplicated result.
-		res = mergeStrings(res, pr)
+	s2, err := q.lvals(qs[l:], n)
+	if err != nil {
+		return nil, err
 	}
-	return res, nil
+	return mergeStrings(s1, s2), nil
 }
 
 func (q *querier) LabelValuesFor(string, labels.Label) ([]string, error) {
@@ -98,19 +101,19 @@ func (q *querier) LabelValuesFor(string, labels.Label) ([]string, error) {
 }
 
 func (q *querier) Select(ms ...labels.Matcher) SeriesSet {
-	// Sets from different blocks have no time overlap. The reference numbers
-	// they emit point to series sorted in lexicographic order.
-	// We can fully connect partial series by simply comparing with the previous
-	// label set.
-	if len(q.blocks) == 0 {
+	return q.sel(q.blocks, ms)
+
+}
+
+func (q *querier) sel(qs []Querier, ms []labels.Matcher) SeriesSet {
+	if len(qs) == 0 {
 		return nopSeriesSet{}
 	}
-	r := q.blocks[0].Select(ms...)
-
-	for _, s := range q.blocks[1:] {
-		r = newMergedSeriesSet(r, s.Select(ms...))
+	if len(qs) == 1 {
+		return qs[0].Select(ms...)
 	}
-	return r
+	l := len(qs) / 2
+	return newMergedSeriesSet(q.sel(qs[:l], ms), q.sel(qs[l:], ms))
 }
 
 func (q *querier) Close() error {
@@ -657,10 +660,6 @@ func newChunkSeriesIterator(cs []*ChunkMeta, dranges intervals, mint, maxt int64
 	}
 }
 
-func (it *chunkSeriesIterator) inBounds(t int64) bool {
-	return t >= it.mint && t <= it.maxt
-}
-
 func (it *chunkSeriesIterator) Seek(t int64) (ok bool) {
 	if t > it.maxt {
 		return false
@@ -671,23 +670,13 @@ func (it *chunkSeriesIterator) Seek(t int64) (ok bool) {
 	if t < it.mint {
 		t = it.mint
 	}
 
-	// Only do binary search forward to stay in line with other iterators
-	// that can only move forward.
-	x := sort.Search(len(it.chunks[it.i:]), func(i int) bool { return it.chunks[i].MinTime >= t })
-	x += it.i
-
-	// If the timestamp was not found, it might be in the last chunk.
-	if x == len(it.chunks) {
-		x--
-
-		// Go to previous chunk if the chunk doesn't exactly start with t.
-		// If we are already at the first chunk, we use it as it's the best we have.
-	} else if x > 0 && it.chunks[x].MinTime > t {
-		x--
+	for ; it.chunks[it.i].MaxTime < t; it.i++ {
+		if it.i == len(it.chunks)-1 {
+			return false
+		}
 	}
-	it.i = x
-	it.cur = it.chunks[x].Chunk.Iterator()
+	it.cur = it.chunks[it.i].Chunk.Iterator()
 	if len(it.intervals) > 0 {
 		it.cur = &deletedIterator{it: it.cur, intervals: it.intervals}
 	}
@@ -708,9 +697,15 @@ func (it *chunkSeriesIterator) At() (t int64, v float64) {
 func (it *chunkSeriesIterator) Next() bool {
 	for it.cur.Next() {
 		t, _ := it.cur.At()
-		if it.inBounds(t) {
-			return true
+		if t < it.mint {
+			return it.Seek(it.mint)
 		}
+
+		if t > it.maxt {
+			return false
+		}
+
+		return true
 	}
 
 	if err := it.cur.Err(); err != nil {
diff --git a/vendor/vendor.json b/vendor/vendor.json
index b742bef11..d7ccaa9b7 100644
--- a/vendor/vendor.json
+++ b/vendor/vendor.json
@@ -763,22 +763,22 @@
 			"revisionTime": "2016-04-11T19:08:41Z"
 		},
 		{
-			"checksumSHA1": "XXXDHMZe3Y3gosaF/1staHm3INc=",
+			"checksumSHA1": "kT9X/dKXjFCoxV48N2C9NZhPRvA=",
 			"path": "github.com/prometheus/tsdb",
-			"revision": "9963a4c7c3b2a742e00a63c54084b051e3174b06",
-			"revisionTime": "2017-06-12T09:17:49Z"
+			"revision": "d492bfd973c24026ab784c1c1821af426bc80e90",
+			"revisionTime": "2017-06-30T13:17:34Z"
 		},
 		{
 			"checksumSHA1": "9EH3v+JdbikCUJAgD4VEOPIaWfs=",
 			"path": "github.com/prometheus/tsdb/chunks",
-			"revision": "4f2eb2057ee0a7f2b984503886bff970a9dab1a8",
-			"revisionTime": "2017-05-22T06:49:09Z"
+			"revision": "d492bfd973c24026ab784c1c1821af426bc80e90",
+			"revisionTime": "2017-06-30T13:17:34Z"
 		},
 		{
 			"checksumSHA1": "3RHZcB/ZvIae9K0tJxNlajJg0jA=",
 			"path": "github.com/prometheus/tsdb/labels",
-			"revision": "4f2eb2057ee0a7f2b984503886bff970a9dab1a8",
-			"revisionTime": "2017-05-22T06:49:09Z"
+			"revision": "d492bfd973c24026ab784c1c1821af426bc80e90",
+			"revisionTime": "2017-06-30T13:17:34Z"
 		},
 		{
 			"checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",
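
A note on the postings/querier change above: Intersect, Merge, lvals, and sel now split their inputs in half and recurse instead of folding left-to-right. A left-deep chain of two-way merge iterators is up to n-1 wrappers deep, so every Next()/Seek() pays for O(n) indirections; the balanced tree caps the depth at ceil(log2 n). Below is a minimal, self-contained Go sketch of the same pattern; plain sorted int slices stand in for tsdb's lazy Postings iterators, and mergeAll/merge2 are illustrative names, not part of the tsdb API.

package main

import "fmt"

// mergeAll combines sorted slices by splitting the input in half and
// recursing, mirroring the shape of tsdb's Merge(its ...Postings): the
// resulting merge tree is balanced, so each element crosses O(log n)
// merge nodes instead of O(n) with left-to-right chaining.
func mergeAll(xs ...[]int) []int {
	if len(xs) == 0 {
		return nil
	}
	if len(xs) == 1 {
		return xs[0]
	}
	l := len(xs) / 2
	return merge2(mergeAll(xs[:l]...), mergeAll(xs[l:]...))
}

// merge2 merges two sorted slices, deduplicating common elements the way
// a two-way postings merge would.
func merge2(a, b []int) []int {
	out := make([]int, 0, len(a)+len(b))
	for len(a) > 0 && len(b) > 0 {
		switch {
		case a[0] < b[0]:
			out, a = append(out, a[0]), a[1:]
		case a[0] > b[0]:
			out, b = append(out, b[0]), b[1:]
		default: // equal: emit once, advance both sides
			out, a, b = append(out, a[0]), a[1:], b[1:]
		}
	}
	out = append(out, a...)
	return append(out, b...)
}

func main() {
	// Three sorted "postings lists" merged through a balanced tree.
	fmt.Println(mergeAll([]int{1, 4, 7}, []int{2, 4, 8}, []int{3, 6, 9}))
	// Prints: [1 2 3 4 6 7 8 9]
}

The halving shape built here over materialized slices is what Intersect and Merge build over lazy iterators with newIntersectPostings and newMergedPostings, and what the querier's lvals and sel build over per-block results.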