From cd2e26b7fc21360b97d632a4cdd094d8771cab2a Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 5 Oct 2017 22:22:14 +0200 Subject: [PATCH] Load postings in batch on startup This allows IDs to be inserted into postings out of order until a trigger function is called. This avoids the insertion sort we usually do, which can be very costly since WAL entries are more out of order than regular adds. --- head.go | 4 +++- postings.go | 60 +++++++++++++++++++++++++++++++++++++++++++++--- postings_test.go | 28 ++++++++++++++++++++++ 3 files changed, 88 insertions(+), 4 deletions(-) diff --git a/head.go b/head.go index 82a3459bd1..4f3c60c39f 100644 --- a/head.go +++ b/head.go @@ -178,7 +178,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) ( series: newStripeSeries(), values: map[string]stringset{}, symbols: map[string]struct{}{}, - postings: newMemPostings(), + postings: newUnorderedMemPostings(), tombstones: newEmptyTombstoneReader(), } h.metrics = newHeadMetrics(h, r) @@ -188,6 +188,8 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) ( // ReadWAL initializes the head by consuming the write ahead log. func (h *Head) ReadWAL() error { + defer h.postings.ensureOrder() + r := h.wal.Reader() mint := h.MinTime() diff --git a/postings.go b/postings.go index 0e51b221b3..2647f4dd8e 100644 --- a/postings.go +++ b/postings.go @@ -15,6 +15,7 @@ package tsdb import ( "encoding/binary" + "runtime" "sort" "strings" "sync" @@ -22,14 +23,30 @@ import ( "github.com/prometheus/tsdb/labels" ) +// memPostings holds the postings lists of series IDs per label pair. They may be written +// to out of order. +// ensureOrder() must be called once before any reads are done. This allows for quick +// unordered batch fills on startup. 
type memPostings struct { - mtx sync.RWMutex - m map[labels.Label][]uint64 + mtx sync.RWMutex + m map[labels.Label][]uint64 + ordered bool } +// newMemPostings returns a memPostings that's ready for reads and writes. func newMemPostings() *memPostings { return &memPostings{ - m: make(map[labels.Label][]uint64, 512), + m: make(map[labels.Label][]uint64, 512), + ordered: true, + } +} + +// newUnorderedMemPostings returns a memPostings that is not safe to be read from +// until ensureOrder has been called once. +func newUnorderedMemPostings() *memPostings { + return &memPostings{ + m: make(map[labels.Label][]uint64, 512), + ordered: false, } } @@ -47,6 +64,40 @@ func (p *memPostings) get(name, value string) Postings { var allPostingsKey = labels.Label{} +// ensureOrder ensures that all postings lists are sorted. After it returns, all further +// calls to add and addFor will insert new IDs in a sorted manner. +func (p *memPostings) ensureOrder() { + p.mtx.Lock() + defer p.mtx.Unlock() + + if p.ordered { + return + } + + n := runtime.GOMAXPROCS(0) + workc := make(chan []uint64) + + var wg sync.WaitGroup + wg.Add(n) + + for i := 0; i < n; i++ { + go func() { + for l := range workc { + sort.Slice(l, func(i, j int) bool { return l[i] < l[j] }) + } + wg.Done() + }() + } + + for _, l := range p.m { + workc <- l + } + close(workc) + wg.Wait() + + p.ordered = true +} + // add adds a document to the index. The caller has to ensure that no // term argument appears twice. func (p *memPostings) add(id uint64, lset labels.Labels) { @@ -64,6 +115,9 @@ func (p *memPostings) addFor(id uint64, l labels.Label) { list := append(p.m[l], id) p.m[l] = list + if !p.ordered { + return + } // There is no guarantee that no higher ID was inserted before as they may // be generated independently before adding them to postings. // We repair order violations on insert. 
The invariant is that the first n-1 diff --git a/postings_test.go b/postings_test.go index 48cd2b6088..694ba4d95e 100644 --- a/postings_test.go +++ b/postings_test.go @@ -15,9 +15,12 @@ package tsdb import ( "encoding/binary" + "fmt" "math/rand" + "sort" "testing" + "github.com/prometheus/tsdb/labels" "github.com/stretchr/testify/require" ) @@ -30,6 +33,31 @@ func TestMemPostings_addFor(t *testing.T) { require.Equal(t, []uint64{1, 2, 3, 4, 5, 6, 7, 8}, p.m[allPostingsKey]) } +func TestMemPostings_ensureOrder(t *testing.T) { + p := newUnorderedMemPostings() + + for i := 0; i < 100; i++ { + l := make([]uint64, 100) + for j := range l { + l[j] = rand.Uint64() + } + v := fmt.Sprintf("%d", i) + + p.m[labels.Label{"a", v}] = l + } + + p.ensureOrder() + + for _, l := range p.m { + ok := sort.SliceIsSorted(l, func(i, j int) bool { + return l[i] < l[j] + }) + if !ok { + t.Fatalf("postings list %v is not sorted", l) + } + } +} + type mockPostings struct { next func() bool seek func(uint64) bool