From c7f5590a71c5b07b1059ac143e7cdd5421e2d25b Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 13 Jan 2017 15:25:11 +0100 Subject: [PATCH] Ensure order of postings when adding new series --- db.go | 3 ++- head.go | 23 +++++++++++++++++------ postings.go | 18 +++++++++++++++++- 3 files changed, 36 insertions(+), 8 deletions(-) diff --git a/db.go b/db.go index 32732f6a1..2e225f3e3 100644 --- a/db.go +++ b/db.go @@ -123,7 +123,8 @@ func Open(dir string, logger log.Logger) (db *DB, err error) { return nil, err } } - r := prometheus.DefaultRegisterer + var r prometheus.Registerer + // r := prometheus.DefaultRegisterer db = &DB{ dir: dir, diff --git a/head.go b/head.go index 42dda760f..3fc39b854 100644 --- a/head.go +++ b/head.go @@ -128,8 +128,11 @@ type headAppender struct { *headBlock newSeries map[uint32]hashedLabels + newHashes map[uint64]uint32 newLabels []labels.Labels - samples []hashedSample + newRefs []uint32 + + samples []hashedSample } type hashedLabels struct { @@ -145,12 +148,18 @@ func (a *headAppender) setSeries(hash uint64, lset labels.Labels) (uint64, error if ms := a.get(hash, lset); ms != nil { return uint64(ms.ref), nil } + if ref, ok := a.newHashes[hash]; ok { + return uint64(ref), nil + } id := atomic.AddUint64(&a.nextSeriesID, 1) - 1 if a.newSeries == nil { a.newSeries = map[uint32]hashedLabels{} + a.newHashes = map[uint64]uint32{} } a.newSeries[uint32(id)] = hashedLabels{hash: hash, labels: lset} + a.newHashes[hash] = uint32(id) + a.newRefs = append(a.newRefs, uint32(id)) return id, nil } @@ -210,16 +219,17 @@ func (a *headAppender) createSeries() { a.mtx.RUnlock() a.mtx.Lock() - for id, l := range a.newSeries { + for _, ref := range a.newRefs { + l := a.newSeries[ref] // We switched locks and have to re-validate that the series were not // created by another goroutine in the meantime. - if int(id) < len(a.series) && a.series[id] != nil { + if int(ref) < len(a.series) && a.series[ref] != nil { continue } // Series is still new. a.newLabels = append(a.newLabels, l.labels) - a.create(id, l.hash, l.labels) + a.create(ref, l.hash, l.labels) } a.mtx.Unlock() @@ -228,15 +238,15 @@ func (a *headAppender) createSeries() { func (a *headAppender) Commit() error { defer putHeadAppendBuffer(a.samples) - defer a.mtx.RUnlock() - // Write all new series and samples to the WAL and add it to the // in-mem database on success. if err := a.wal.Log(a.newLabels, a.samples); err != nil { + a.mtx.RUnlock() return err } a.createSeries() + var ( total = uint64(len(a.samples)) mint = int64(math.MaxInt64) @@ -248,6 +258,7 @@ func (a *headAppender) Commit() error { total-- } } + a.mtx.RUnlock() a.stats.mtx.Lock() defer a.stats.mtx.Unlock() diff --git a/postings.go b/postings.go index b3afe831a..e8a4a4621 100644 --- a/postings.go +++ b/postings.go @@ -26,7 +26,23 @@ func (p *memPostings) get(t term) Postings { // term argument appears twice. func (p *memPostings) add(id uint32, terms ...term) { for _, t := range terms { - p.m[t] = append(p.m[t], id) + // We expect IDs to roughly be appended in order but some concurrency + // related out of order at the end. We do insertion sort from the end + // to account for it. + l := p.m[t] + i := len(l) - 1 + + for ; i >= 0; i-- { + if id > l[i] { + break + } + } + l = append(l, 0) + + copy(l[i+2:], l[i+1:]) + l[i+1] = id + + p.m[t] = l } }