Log sparse histograms into WAL and replay from it (#9191)

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
Ganesh Vernekar 2021-08-11 17:38:48 +05:30 committed by GitHub
parent 095f572d4a
commit f0688c21d6
7 changed files with 281 additions and 31 deletions

View File

@ -30,9 +30,10 @@ import (
// neg bucket idx 3 2 1 0 -1 ...
// actively used bucket indices themselves are represented by the spans
type SparseHistogram struct {
Schema int32
ZeroThreshold float64
ZeroCount, Count uint64
Sum float64
PositiveSpans, NegativeSpans []Span
PositiveBuckets, NegativeBuckets []int64
}
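
As an aside on the layout above (not part of the diff): the spans describe which bucket indices are in use, and the int64 bucket slices hold delta-encoded counts. The following is a minimal sketch under those assumptions; expandBuckets and its map return type are illustrative only.

// expandBuckets is an illustrative helper (not in this commit) that expands the
// span/delta encoding into absolute counts keyed by bucket index.
func expandBuckets(spans []Span, deltas []int64) map[int32]int64 {
	out := make(map[int32]int64)
	var (
		idx   int32 // current bucket index
		count int64 // running absolute count
		d     int   // position in deltas
	)
	for _, s := range spans {
		idx += s.Offset // gap to the previous span (or start index for the first span)
		for j := uint32(0); j < s.Length; j++ {
			count += deltas[d] // each delta is relative to the previous bucket's count
			out[idx] = count
			idx++
			d++
		}
	}
	return out
}

For example, spans {0,2} and {1,2} with deltas {1,1,-1,0} expand to counts 1 and 2 at indices 0 and 1, and counts 1 and 1 at indices 3 and 4.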

View File

@ -179,9 +179,10 @@ func NewDecbufRaw(bs ByteSlice, length int) Decbuf {
return Decbuf{B: bs.Range(0, length)}
}
func (d *Decbuf) Uvarint() int { return int(d.Uvarint64()) }
func (d *Decbuf) Uvarint32() uint32 { return uint32(d.Uvarint64()) }
func (d *Decbuf) Be32int() int { return int(d.Be32()) }
func (d *Decbuf) Be64int64() int64 { return int64(d.Be64()) }
// Crc32 returns a CRC32 checksum over the remaining bytes.
func (d *Decbuf) Crc32(castagnoliTable *crc32.Table) uint32 {

View File

@ -63,9 +63,6 @@ type Head struct {
lastWALTruncationTime atomic.Int64
lastMemoryTruncationTime atomic.Int64
lastSeriesID atomic.Uint64
metrics *headMetrics
opts *HeadOptions
@ -1158,16 +1155,6 @@ func (h *Head) Close() error {
defer h.closedMtx.Unlock()
h.closed = true
errs := tsdb_errors.NewMulti(h.chunkDiskMapper.Close())
if errs.Err() == nil && h.opts.EnableMemorySnapshotOnShutdown {
errs.Add(h.performChunkSnapshot())

View File

@ -470,6 +470,13 @@ func (a *headAppender) log() error {
return errors.Wrap(err, "log exemplars")
}
}
if len(a.histograms) > 0 {
rec = enc.Histograms(a.histograms, buf)
buf = rec[:0]
if err := a.head.wal.Log(rec); err != nil {
return errors.Wrap(err, "log histograms")
}
}
return nil
}
@ -539,7 +546,6 @@ func (a *headAppender) Commit() (err error) {
for i, s := range a.histograms {
series = a.histogramSeries[i]
series.Lock()
ok, chunkCreated := series.appendHistogram(s.T, s.H, a.appendID, a.head.chunkDiskMapper)
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
series.pendingCommit = false

View File

@ -2537,13 +2537,69 @@ func TestAppendHistogram(t *testing.T) {
}
}
func TestHistogramInWAL(t *testing.T) {
l := labels.Labels{{Name: "a", Value: "b"}}
numHistograms := 10
head, _ := newTestHead(t, 1000, false)
t.Cleanup(func() {
require.NoError(t, head.Close())
})
require.NoError(t, head.Init(0))
app := head.Appender(context.Background())
type timedHist struct {
t int64
h histogram.SparseHistogram
}
expHists := make([]timedHist, 0, numHistograms)
for i, h := range generateHistograms(numHistograms) {
h.NegativeSpans = h.PositiveSpans
h.NegativeBuckets = h.PositiveBuckets
_, err := app.AppendHistogram(0, l, int64(i), h)
require.NoError(t, err)
expHists = append(expHists, timedHist{int64(i), h})
}
require.NoError(t, app.Commit())
// Restart head.
require.NoError(t, head.Close())
w, err := wal.NewSize(nil, nil, head.wal.Dir(), 32768, false)
require.NoError(t, err)
head, err = NewHead(nil, nil, w, head.opts, nil)
require.NoError(t, err)
require.NoError(t, head.Init(0))
q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, q.Close())
})
ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
require.True(t, ss.Next())
s := ss.At()
require.False(t, ss.Next())
it := s.Iterator()
actHists := make([]timedHist, 0, len(expHists))
for it.Next() {
t, h := it.AtHistogram()
actHists = append(actHists, timedHist{t, h.Copy()})
}
require.Equal(t, expHists, actHists)
}
func generateHistograms(n int) (r []histogram.SparseHistogram) {
for i := 0; i < n; i++ {
r = append(r, histogram.SparseHistogram{
Count: 5 + uint64(i*4),
ZeroCount: 2 + uint64(i),
ZeroThreshold: 0.001,
Sum: 18.4 * float64(i+1),
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},

View File

@ -46,16 +46,18 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks
// for error reporting.
var unknownRefs atomic.Uint64
var unknownExemplarRefs atomic.Uint64
var unknownHistogramRefs atomic.Uint64
// Start workers that each process samples for a partition of the series ID space.
// They are connected through a ring of channels which ensures that all sample batches
// read from the WAL are processed in order.
var (
wg sync.WaitGroup
n = runtime.GOMAXPROCS(0)
inputs = make([]chan []record.RefSample, n)
outputs = make([]chan []record.RefSample, n)
exemplarsInput chan record.RefExemplar
histogramsInput chan record.RefHistogram
dec record.Decoder
shards = make([][]record.RefSample, n)
@ -82,6 +84,11 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks
return []record.RefExemplar{}
},
}
histogramsPool = sync.Pool{
New: func() interface{} {
return []record.RefHistogram{}
},
}
)
defer func() {
@ -94,6 +101,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks
}
}
close(exemplarsInput)
close(histogramsInput)
wg.Wait()
}
}()
@ -133,6 +141,42 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks
}
}(exemplarsInput)
wg.Add(1)
histogramsInput = make(chan record.RefHistogram, 300)
go func(input <-chan record.RefHistogram) {
defer wg.Done()
mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
for rh := range input {
ms := h.series.getByID(rh.Ref)
if ms == nil {
unknownHistogramRefs.Inc()
continue
}
if rh.T < h.minValidTime.Load() {
continue
}
// Out-of-order histograms are not expected when replaying the WAL; appendHistogram
// drops them silently, so only chunk creation is checked here.
_, chunkCreated := ms.appendHistogram(rh.T, rh.H, 0, h.chunkDiskMapper)
if chunkCreated {
h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc()
}
if rh.T > maxt {
maxt = rh.T
}
if rh.T < mint {
mint = rh.T
}
}
h.updateMinMaxTime(mint, maxt)
}(histogramsInput)
go func() {
defer close(decoded)
for r.Next() {
@ -186,6 +230,18 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks
return
}
decoded <- exemplars
case record.Histograms:
hists := histogramsPool.Get().([]record.RefHistogram)[:0]
hists, err = dec.Histograms(rec, hists)
if err != nil {
decodeErr = &wal.CorruptionErr{
Err: errors.Wrap(err, "decode histograms"),
Segment: r.Segment(),
Offset: r.Offset(),
}
return
}
decoded <- hists
default:
// Noop.
}
@ -319,6 +375,13 @@ Outer:
}
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
exemplarsPool.Put(v)
case []record.RefHistogram:
// TODO: split into multiple slices and have multiple workers processing the histograms like we do for samples.
for _, rh := range v {
histogramsInput <- rh
}
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
histogramsPool.Put(v)
default:
panic(fmt.Errorf("unexpected decoded type: %T", d))
}
@ -341,14 +404,15 @@ Outer:
}
}
close(exemplarsInput)
close(histogramsInput)
wg.Wait()
if r.Err() != nil {
return errors.Wrap(r.Err(), "read records")
}
if unknownRefs.Load() > 0 || unknownExemplarRefs.Load() > 0 || unknownHistogramRefs.Load() > 0 {
level.Warn(h.logger).Log("msg", "Unknown series references", "samples", unknownRefs.Load(), "exemplars", unknownExemplarRefs.Load(), "histograms", unknownHistogramRefs.Load())
}
return nil
}

View File

@ -40,6 +40,8 @@ const (
Tombstones Type = 3
// Exemplars is used to match WAL records of type Exemplars.
Exemplars Type = 4
// Histograms is used to match WAL records of type Histograms.
Histograms Type = 5
)
var (
@ -87,7 +89,7 @@ func (d *Decoder) Type(rec []byte) Type {
return Unknown
}
switch t := Type(rec[0]); t {
case Series, Samples, Tombstones, Exemplars, Histograms:
return t
}
return Unknown
@ -226,6 +228,88 @@ func (d *Decoder) Exemplars(rec []byte, exemplars []RefExemplar) ([]RefExemplar,
return exemplars, nil
}
func (d *Decoder) Histograms(rec []byte, hists []RefHistogram) ([]RefHistogram, error) {
dec := encoding.Decbuf{B: rec}
t := Type(dec.Byte())
if t != Histograms {
return nil, errors.New("invalid record type")
}
if dec.Len() == 0 {
return hists, nil
}
var (
baseRef = dec.Be64()
baseTime = dec.Be64int64()
)
for len(dec.B) > 0 && dec.Err() == nil {
dref := dec.Varint64()
dtime := dec.Varint64()
rh := RefHistogram{
Ref: baseRef + uint64(dref),
T: baseTime + dtime,
}
rh.H.Schema = int32(dec.Varint64())
rh.H.ZeroThreshold = math.Float64frombits(dec.Be64())
rh.H.ZeroCount = dec.Uvarint64()
rh.H.Count = dec.Uvarint64()
rh.H.Sum = math.Float64frombits(dec.Be64())
l := dec.Uvarint()
if l > 0 {
rh.H.PositiveSpans = make([]histogram.Span, l)
}
for i := range rh.H.PositiveSpans {
rh.H.PositiveSpans[i].Offset = int32(dec.Varint64())
rh.H.PositiveSpans[i].Length = dec.Uvarint32()
}
l = dec.Uvarint()
if l > 0 {
rh.H.NegativeSpans = make([]histogram.Span, l)
}
for i := range rh.H.NegativeSpans {
rh.H.NegativeSpans[i].Offset = int32(dec.Varint64())
rh.H.NegativeSpans[i].Length = dec.Uvarint32()
}
l = dec.Uvarint()
if l > 0 {
rh.H.PositiveBuckets = make([]int64, l)
}
for i := range rh.H.PositiveBuckets {
rh.H.PositiveBuckets[i] = dec.Varint64()
}
l = dec.Uvarint()
if l > 0 {
rh.H.NegativeBuckets = make([]int64, l)
}
for i := range rh.H.NegativeBuckets {
rh.H.NegativeBuckets[i] = dec.Varint64()
}
hists = append(hists, rh)
}
if dec.Err() != nil {
return nil, errors.Wrapf(dec.Err(), "decode error after %d histograms", len(hists))
}
if len(dec.B) > 0 {
return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.B))
}
return hists, nil
}
// Encoder encodes series, sample, tombstone, exemplar, and histogram records.
// The zero value is ready to use.
type Encoder struct {
@ -316,3 +400,54 @@ func (e *Encoder) Exemplars(exemplars []RefExemplar, b []byte) []byte {
return buf.Get()
}
func (e *Encoder) Histograms(hists []RefHistogram, b []byte) []byte {
buf := encoding.Encbuf{B: b}
buf.PutByte(byte(Histograms))
if len(hists) == 0 {
return buf.Get()
}
// Store base timestamp and base reference number of first histogram.
// All histograms encode their timestamp and ref as delta to those.
first := hists[0]
buf.PutBE64(first.Ref)
buf.PutBE64int64(first.T)
for _, h := range hists {
buf.PutVarint64(int64(h.Ref) - int64(first.Ref))
buf.PutVarint64(h.T - first.T)
buf.PutVarint64(int64(h.H.Schema))
buf.PutBE64(math.Float64bits(h.H.ZeroThreshold))
buf.PutUvarint64(h.H.ZeroCount)
buf.PutUvarint64(h.H.Count)
buf.PutBE64(math.Float64bits(h.H.Sum))
buf.PutUvarint(len(h.H.PositiveSpans))
for _, s := range h.H.PositiveSpans {
buf.PutVarint64(int64(s.Offset))
buf.PutUvarint32(s.Length)
}
buf.PutUvarint(len(h.H.NegativeSpans))
for _, s := range h.H.NegativeSpans {
buf.PutVarint64(int64(s.Offset))
buf.PutUvarint32(s.Length)
}
buf.PutUvarint(len(h.H.PositiveBuckets))
for _, b := range h.H.PositiveBuckets {
buf.PutVarint64(b)
}
buf.PutUvarint(len(h.H.NegativeBuckets))
for _, b := range h.H.NegativeBuckets {
buf.PutVarint64(b)
}
}
return buf.Get()
}
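
A minimal round-trip sketch of the new record type (a hypothetical test, not part of the diff; the test name is made up and the import paths assume the tree layout as of this commit):

package record_test

import (
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/prometheus/prometheus/pkg/histogram"
	"github.com/prometheus/prometheus/tsdb/record"
)

// TestHistogramRecordRoundTrip encodes one histogram record and decodes it back.
func TestHistogramRecordRoundTrip(t *testing.T) {
	hists := []record.RefHistogram{{
		Ref: 42,
		T:   1000,
		H: histogram.SparseHistogram{
			Schema:          1,
			ZeroThreshold:   0.001,
			ZeroCount:       2,
			Count:           9,
			Sum:             18.4,
			PositiveSpans:   []histogram.Span{{Offset: 0, Length: 2}},
			PositiveBuckets: []int64{1, 1}, // delta encoded: absolute counts 1 and 2
		},
	}}

	var enc record.Encoder
	rec := enc.Histograms(hists, nil) // first byte is the Histograms record type

	var dec record.Decoder
	require.Equal(t, record.Histograms, dec.Type(rec))

	decoded, err := dec.Histograms(rec, nil)
	require.NoError(t, err)
	require.Equal(t, hists, decoded)
}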