mirror of
https://github.com/prometheus/prometheus
synced 2025-01-16 11:53:16 +00:00
Merge pull request #166 from prometheus/batchpostings
Load postings in batch on startup
This commit is contained in:
commit
dc87103807
4
head.go
4
head.go
@ -178,7 +178,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
|
||||
series: newStripeSeries(),
|
||||
values: map[string]stringset{},
|
||||
symbols: map[string]struct{}{},
|
||||
postings: newMemPostings(),
|
||||
postings: newUnorderedMemPostings(),
|
||||
tombstones: newEmptyTombstoneReader(),
|
||||
}
|
||||
h.metrics = newHeadMetrics(h, r)
|
||||
@ -188,6 +188,8 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
|
||||
|
||||
// ReadWAL initializes the head by consuming the write ahead log.
|
||||
func (h *Head) ReadWAL() error {
|
||||
defer h.postings.ensureOrder()
|
||||
|
||||
r := h.wal.Reader()
|
||||
mint := h.MinTime()
|
||||
|
||||
|
60
postings.go
60
postings.go
@ -15,6 +15,7 @@ package tsdb
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"runtime"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
@ -22,14 +23,30 @@ import (
|
||||
"github.com/prometheus/tsdb/labels"
|
||||
)
|
||||
|
||||
// memPostings holds postings list for series ID per label pair. They may be written
|
||||
// to out of order.
|
||||
// ensureOrder() must be called once before any reads are done. This allows for quick
|
||||
// unordered batch fills on startup.
|
||||
type memPostings struct {
|
||||
mtx sync.RWMutex
|
||||
m map[labels.Label][]uint64
|
||||
mtx sync.RWMutex
|
||||
m map[labels.Label][]uint64
|
||||
ordered bool
|
||||
}
|
||||
|
||||
// newMemPoistings returns a memPostings that's ready for reads and writes.
|
||||
func newMemPostings() *memPostings {
|
||||
return &memPostings{
|
||||
m: make(map[labels.Label][]uint64, 512),
|
||||
m: make(map[labels.Label][]uint64, 512),
|
||||
ordered: true,
|
||||
}
|
||||
}
|
||||
|
||||
// newUnorderedMemPostings returns a memPostings that is not safe to be read from
|
||||
// until ensureOrder was called once.
|
||||
func newUnorderedMemPostings() *memPostings {
|
||||
return &memPostings{
|
||||
m: make(map[labels.Label][]uint64, 512),
|
||||
ordered: false,
|
||||
}
|
||||
}
|
||||
|
||||
@ -47,6 +64,40 @@ func (p *memPostings) get(name, value string) Postings {
|
||||
|
||||
var allPostingsKey = labels.Label{}
|
||||
|
||||
// ensurePostings ensures that all postings lists are sorted. After it returns all further
|
||||
// calls to add and addFor will insert new IDs in a sorted manner.
|
||||
func (p *memPostings) ensureOrder() {
|
||||
p.mtx.Lock()
|
||||
defer p.mtx.Unlock()
|
||||
|
||||
if p.ordered {
|
||||
return
|
||||
}
|
||||
|
||||
n := runtime.GOMAXPROCS(0)
|
||||
workc := make(chan []uint64)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(n)
|
||||
|
||||
for i := 0; i < n; i++ {
|
||||
go func() {
|
||||
for l := range workc {
|
||||
sort.Slice(l, func(i, j int) bool { return l[i] < l[j] })
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
for _, l := range p.m {
|
||||
workc <- l
|
||||
}
|
||||
close(workc)
|
||||
wg.Wait()
|
||||
|
||||
p.ordered = true
|
||||
}
|
||||
|
||||
// add adds a document to the index. The caller has to ensure that no
|
||||
// term argument appears twice.
|
||||
func (p *memPostings) add(id uint64, lset labels.Labels) {
|
||||
@ -64,6 +115,9 @@ func (p *memPostings) addFor(id uint64, l labels.Label) {
|
||||
list := append(p.m[l], id)
|
||||
p.m[l] = list
|
||||
|
||||
if !p.ordered {
|
||||
return
|
||||
}
|
||||
// There is no guarantee that no higher ID was inserted before as they may
|
||||
// be generated independently before adding them to postings.
|
||||
// We repair order violations on insert. The invariant is that the first n-1
|
||||
|
@ -15,9 +15,12 @@ package tsdb
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/prometheus/tsdb/labels"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
@ -30,6 +33,31 @@ func TestMemPostings_addFor(t *testing.T) {
|
||||
require.Equal(t, []uint64{1, 2, 3, 4, 5, 6, 7, 8}, p.m[allPostingsKey])
|
||||
}
|
||||
|
||||
func TestMemPostings_ensureOrder(t *testing.T) {
|
||||
p := newUnorderedMemPostings()
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
l := make([]uint64, 100)
|
||||
for j := range l {
|
||||
l[j] = rand.Uint64()
|
||||
}
|
||||
v := fmt.Sprintf("%d", i)
|
||||
|
||||
p.m[labels.Label{"a", v}] = l
|
||||
}
|
||||
|
||||
p.ensureOrder()
|
||||
|
||||
for _, l := range p.m {
|
||||
ok := sort.SliceIsSorted(l, func(i, j int) bool {
|
||||
return l[i] < l[j]
|
||||
})
|
||||
if !ok {
|
||||
t.Fatalf("postings list %v is not sorted", l)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type mockPostings struct {
|
||||
next func() bool
|
||||
seek func(uint64) bool
|
||||
|
Loading…
Reference in New Issue
Block a user