wal: parallelize sample processing
parent d3682d701c
commit 7efb830d70
head.go: 109 changed lines
@@ -15,6 +15,7 @@ package tsdb
 
 import (
 	"math"
+	"runtime"
 	"sort"
 	"sync"
 	"sync/atomic"
@@ -186,29 +187,19 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
 	return h, nil
 }
 
-// ReadWAL initializes the head by consuming the write ahead log.
-func (h *Head) ReadWAL() error {
-	defer h.postings.ensureOrder()
+// processWALSamples adds a partition of samples it receives to the head and passes
+// them on to other workers.
+// Samples before the mint timestamp are discarded.
+func (h *Head) processWALSamples(
+	mint int64,
+	partition, total uint64,
+	input <-chan []RefSample, output chan<- []RefSample,
+) (unknownRefs uint64) {
+	defer close(output)
 
-	r := h.wal.Reader()
-	mint := h.MinTime()
-
-	// Track number of samples that referenced a series we don't know about
-	// for error reporting.
-	var unknownRefs int
-
-	seriesFunc := func(series []RefSeries) {
-		for _, s := range series {
-			h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
-
-			if h.lastSeriesID < s.Ref {
-				h.lastSeriesID = s.Ref
-			}
-		}
-	}
-	samplesFunc := func(samples []RefSample) {
+	for samples := range input {
 		for _, s := range samples {
-			if s.T < mint {
+			if s.T < mint || s.Ref%total != partition {
 				continue
 			}
 			ms := h.series.getByID(s.Ref)
@@ -222,6 +213,63 @@ func (h *Head) ReadWAL() error {
 				h.metrics.chunks.Inc()
 			}
 		}
+		output <- samples
 	}
+	return unknownRefs
+}
+
+// ReadWAL initializes the head by consuming the write ahead log.
+func (h *Head) ReadWAL() error {
+	defer h.postings.ensureOrder()
+
+	r := h.wal.Reader()
+	mint := h.MinTime()
+
+	// Track number of samples that referenced a series we don't know about
+	// for error reporting.
+	var unknownRefs uint64
+
+	// Start workers that each process samples for a partition of the series ID space.
+	// They are connected through a ring of channels which ensures that all sample batches
+	// read from the WAL are processed in order.
+	var (
+		n          = runtime.GOMAXPROCS(0)
+		firstInput = make(chan []RefSample, 300)
+		input      = firstInput
+	)
+	for i := 0; i < n; i++ {
+		output := make(chan []RefSample, 300)
+
+		go func(i int, input <-chan []RefSample, output chan<- []RefSample) {
+			unknown := h.processWALSamples(mint, uint64(i), uint64(n), input, output)
+			atomic.AddUint64(&unknownRefs, unknown)
+		}(i, input, output)
+
+		// The output feeds the next worker goroutine. For the last worker,
+		// it feeds the initial input again to reuse the RefSample slices.
+		input = output
+	}
+
+	// TODO(fabxc): series entries spread between samples can starve the sample workers.
+	// Even with buffered channels, this can impact startup time with lots of series churn.
+	// We must not parallelize series creation itself but could make the indexing asynchronous.
+	seriesFunc := func(series []RefSeries) {
+		for _, s := range series {
+			h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
+
+			if h.lastSeriesID < s.Ref {
+				h.lastSeriesID = s.Ref
+			}
+		}
+	}
+	samplesFunc := func(samples []RefSample) {
+		var buf []RefSample
+		select {
+		case buf = <-input:
+		default:
+			buf = make([]RefSample, 0, len(samples)*11/10)
+		}
+		firstInput <- append(buf[:0], samples...)
+	}
 	deletesFunc := func(stones []Stone) {
 		for _, s := range stones {
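The comments above describe the new wiring: one worker per GOMAXPROCS, connected in a ring of buffered channels so that each batch read from the WAL visits every worker in order, with the last output feeding slices back to the reader for reuse. Below is a reduced, self-contained sketch of that pattern with simplified types; it illustrates the scheme, it is not the tsdb implementation:

// Sketch only: a ring of sample-processing workers with slice recycling.
package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
)

type sample struct {
	ref uint64
	t   int64
}

// process applies the samples of one partition and forwards each batch to the
// next worker. Closing its input cascades the shutdown through the ring.
func process(partition, total uint64, in <-chan []sample, out chan<- []sample, applied *uint64) {
	defer close(out)
	for batch := range in {
		for _, s := range batch {
			if s.ref%total != partition {
				continue // another worker owns this series
			}
			atomic.AddUint64(applied, 1) // stand-in for appending to the head
		}
		out <- batch // hand the slice on unchanged
	}
}

func main() {
	var applied uint64
	n := runtime.GOMAXPROCS(0)
	first := make(chan []sample, 8)
	input := first
	for i := 0; i < n; i++ {
		output := make(chan []sample, 8)
		go process(uint64(i), uint64(n), input, output, &applied)
		// The last worker's output loops back to the producer as a slice pool.
		input = output
	}

	// Producer: reuse a recycled slice when one is available, as samplesFunc does.
	for b := uint64(0); b < 3; b++ {
		var buf []sample
		select {
		case buf = <-input:
		default:
			buf = make([]sample, 0, 8)
		}
		first <- append(buf[:0], sample{ref: b, t: int64(b)})
	}

	// Shut down: close the first input and drain the last output until it closes.
	close(first)
	for range input {
	}
	fmt.Println("samples applied:", atomic.LoadUint64(&applied))
}

Because batches travel through the ring in the order the reader emits them, per-series append order is preserved even though the work is spread across goroutines.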
@@ -234,13 +282,18 @@ func (h *Head) ReadWAL() error {
 		}
 	}
 
+	err := r.Read(seriesFunc, samplesFunc, deletesFunc)
+
+	// Signal termination to first worker and wait for last one to close its output channel.
+	close(firstInput)
+	for range input {
+	}
+	if err != nil {
+		return errors.Wrap(err, "consume WAL")
+	}
 	if unknownRefs > 0 {
 		level.Warn(h.logger).Log("msg", "unknown series references in WAL samples", "count", unknownRefs)
 	}
-
-	if err := r.Read(seriesFunc, samplesFunc, deletesFunc); err != nil {
-		return errors.Wrap(err, "consume WAL")
-	}
 	return nil
 }
 
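The termination handling above leans on close propagation: closing firstInput ends the first worker's range loop, its deferred close ends the next worker, and so on, so draining the final channel is enough to know every worker has exited before unknownRefs is read. A stripped-down illustration of that idiom (not the tsdb code):

// Sketch only: shutdown by close propagation through a pipeline.
package main

import "fmt"

func stage(name string, in <-chan int, out chan<- int) {
	// Closing our output tells the next stage we are done.
	defer close(out)
	for v := range in {
		out <- v
	}
	fmt.Println(name, "finished")
}

func main() {
	a := make(chan int, 1)
	b := make(chan int, 1)
	c := make(chan int, 1)
	go stage("stage 1", a, b)
	go stage("stage 2", b, c)

	a <- 42
	close(a) // signal termination to the first stage

	// Draining the final channel returns only after every stage has exited.
	for v := range c {
		fmt.Println("drained", v)
	}
	fmt.Println("all stages done")
}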
@@ -1168,10 +1221,12 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
 		c = s.cut(t)
 		chunkCreated = true
 	}
+	numSamples := c.chunk.NumSamples()
+
 	if c.maxTime >= t {
 		return false, chunkCreated
 	}
-	if c.chunk.NumSamples() > samplesPerChunk/4 && t >= s.nextAt {
+	if numSamples > samplesPerChunk/4 && t >= s.nextAt {
 		c = s.cut(t)
 		chunkCreated = true
 	}
@@ -1179,7 +1234,7 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
 
 	c.maxTime = t
 
-	if c.chunk.NumSamples() == samplesPerChunk/4 {
+	if numSamples == samplesPerChunk/4 {
 		_, maxt := rangeForTimestamp(c.minTime, s.chunkRange)
 		s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, maxt)
 	}