From 609b12d719f29f2783059e480fe19d599a98d2bf Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Wed, 4 Jan 2023 15:43:37 +0530 Subject: [PATCH] tsdb: Support gauge float histogram with recoding of chunk Signed-off-by: Ganesh Vernekar --- tsdb/chunkenc/float_histogram.go | 68 ++++++++++++++++++++++++++------ tsdb/head_append.go | 24 +++++++++-- 2 files changed, 75 insertions(+), 17 deletions(-) diff --git a/tsdb/chunkenc/float_histogram.go b/tsdb/chunkenc/float_histogram.go index 35b0e889e..10b20a1ca 100644 --- a/tsdb/chunkenc/float_histogram.go +++ b/tsdb/chunkenc/float_histogram.go @@ -174,6 +174,7 @@ func newFloatHistogramIterator(b []byte) *floatHistogramIterator { // The first 3 bytes contain chunk headers. // We skip that for actual samples. _, _ = it.br.readBits(24) + it.counterResetHeader = CounterResetHeader(b[2] & 0b11000000) return it } @@ -196,6 +197,14 @@ type FloatHistogramAppender struct { pBuckets, nBuckets []xorValue } +func (a *FloatHistogramAppender) GetCounterResetHeader() CounterResetHeader { + return CounterResetHeader(a.b.bytes()[2] & 0b11000000) +} + +func (a *FloatHistogramAppender) NumSamples() int { + return int(binary.BigEndian.Uint16(a.b.bytes())) +} + // Append implements Appender. This implementation panics because normal float // samples must never be appended to a histogram chunk. func (a *FloatHistogramAppender) Append(int64, float64) { @@ -211,6 +220,7 @@ func (a *FloatHistogramAppender) AppendHistogram(int64, *histogram.Histogram) { // Appendable returns whether the chunk can be appended to, and if so // whether any recoding needs to happen using the provided interjections // (in case of any new buckets, positive or negative range, respectively). +// If the sample is a gauge histogram, AppendableGauge must be used instead. // // The chunk is not appendable in the following cases: // @@ -228,6 +238,9 @@ func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) ( positiveInterjections, negativeInterjections []Interjection, okToAppend, counterReset bool, ) { + if a.NumSamples() > 0 && a.GetCounterResetHeader() == GaugeType { + return + } if value.IsStaleNaN(h.Sum) { // This is a stale sample whose buckets and spans don't matter. okToAppend = true @@ -284,17 +297,21 @@ func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) ( // 2. Any recoding needs to happen for the histogram being appended, using the backward interjections // (in case of any missing buckets, positive or negative range, respectively). // -// The chunk is not appendable in the following cases: +// This method must be only used for gauge histograms. // -// • The schema has changed. -// • The threshold for the zero bucket has changed. -// • The last sample in the chunk was stale while the current sample is not stale. +// The chunk is not appendable in the following cases: +// - The schema has changed. +// - The threshold for the zero bucket has changed. +// - The last sample in the chunk was stale while the current sample is not stale. func (a *FloatHistogramAppender) AppendableGauge(h *histogram.FloatHistogram) ( positiveInterjections, negativeInterjections []Interjection, backwardPositiveInterjections, backwardNegativeInterjections []Interjection, positiveSpans, negativeSpans []histogram.Span, okToAppend bool, ) { + if a.NumSamples() > 0 && a.GetCounterResetHeader() != GaugeType { + return + } if value.IsStaleNaN(h.Sum) { // This is a stale sample whose buckets and spans don't matter. okToAppend = true @@ -537,11 +554,29 @@ func (a *FloatHistogramAppender) Recode( return hc, app } +// RecodeHistogramm converts the current histogram (in-place) to accommodate an expansion of the set of +// (positive and/or negative) buckets used. +func (a *FloatHistogramAppender) RecodeHistogramm( + fh *histogram.FloatHistogram, + pBackwardInter, nBackwardInter []Interjection, +) { + if len(pBackwardInter) > 0 { + numPositiveBuckets := countSpans(fh.PositiveSpans) + fh.PositiveBuckets = interject(fh.PositiveBuckets, make([]float64, numPositiveBuckets), pBackwardInter, false) + } + if len(nBackwardInter) > 0 { + numNegativeBuckets := countSpans(fh.NegativeSpans) + fh.NegativeBuckets = interject(fh.NegativeBuckets, make([]float64, numNegativeBuckets), nBackwardInter, false) + } +} + type floatHistogramIterator struct { br bstreamReader numTotal uint16 numRead uint16 + counterResetHeader CounterResetHeader + // Layout: schema int32 zThreshold float64 @@ -594,16 +629,21 @@ func (it *floatHistogramIterator) AtFloatHistogram() (int64, *histogram.FloatHis return it.t, &histogram.FloatHistogram{Sum: it.sum.value} } it.atFloatHistogramCalled = true + crHint := histogram.UnknownCounterReset + if it.counterResetHeader == GaugeType { + crHint = histogram.GaugeType + } return it.t, &histogram.FloatHistogram{ - Count: it.cnt.value, - ZeroCount: it.zCnt.value, - Sum: it.sum.value, - ZeroThreshold: it.zThreshold, - Schema: it.schema, - PositiveSpans: it.pSpans, - NegativeSpans: it.nSpans, - PositiveBuckets: it.pBuckets, - NegativeBuckets: it.nBuckets, + CounterResetHint: crHint, + Count: it.cnt.value, + ZeroCount: it.zCnt.value, + Sum: it.sum.value, + ZeroThreshold: it.zThreshold, + Schema: it.schema, + PositiveSpans: it.pSpans, + NegativeSpans: it.nSpans, + PositiveBuckets: it.pBuckets, + NegativeBuckets: it.nBuckets, } } @@ -622,6 +662,8 @@ func (it *floatHistogramIterator) Reset(b []byte) { it.numTotal = binary.BigEndian.Uint16(b) it.numRead = 0 + it.counterResetHeader = CounterResetHeader(b[2] & 0b11000000) + it.t, it.tDelta = 0, 0 it.cnt, it.zCnt, it.sum = xorValue{}, xorValue{}, xorValue{} diff --git a/tsdb/head_append.go b/tsdb/head_append.go index c09027ae4..0785e99e8 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -1138,9 +1138,10 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper // appendHistogram adds the histogram. // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. +// TODO(codesome): Support gauge histograms here. func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64) (sampleInOrder, chunkCreated bool) { // Head controls the execution of recoding, so that we own the proper - // chunk reference afterwards. We check for Appendable before + // chunk reference afterwards. We check for Appendable from appender before // appendPreprocessor because in case it ends up creating a new chunk, // we need to know if there was also a counter reset or not to set the // meta properly. @@ -1209,24 +1210,37 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64) (sampleInOrder, chunkCreated bool) { // Head controls the execution of recoding, so that we own the proper - // chunk reference afterwards. We check for Appendable before + // chunk reference afterwards. We check for Appendable from appender before // appendPreprocessor because in case it ends up creating a new chunk, // we need to know if there was also a counter reset or not to set the // meta properly. app, _ := s.app.(*chunkenc.FloatHistogramAppender) var ( positiveInterjections, negativeInterjections []chunkenc.Interjection + pBackwardInter, nBackwardInter []chunkenc.Interjection + pMergedSpans, nMergedSpans []histogram.Span okToAppend, counterReset bool ) c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncFloatHistogram, chunkDiskMapper, chunkRange) if !sampleInOrder { return sampleInOrder, chunkCreated } + gauge := fh.CounterResetHint == histogram.GaugeType if app != nil { - positiveInterjections, negativeInterjections, okToAppend, counterReset = app.Appendable(fh) + if gauge { + positiveInterjections, negativeInterjections, pBackwardInter, nBackwardInter, + pMergedSpans, nMergedSpans, okToAppend = app.AppendableGauge(fh) + } else { + positiveInterjections, negativeInterjections, okToAppend, counterReset = app.Appendable(fh) + } } if !chunkCreated { + if len(pBackwardInter)+len(nBackwardInter) > 0 { + fh.PositiveSpans = pMergedSpans + fh.NegativeSpans = nMergedSpans + app.RecodeHistogramm(fh, pBackwardInter, nBackwardInter) + } // We have 3 cases here // - !okToAppend -> We need to cut a new chunk. // - okToAppend but we have interjections → Existing chunk needs @@ -1251,7 +1265,9 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, if chunkCreated { hc := s.headChunk.chunk.(*chunkenc.FloatHistogramChunk) header := chunkenc.UnknownCounterReset - if counterReset { + if gauge { + header = chunkenc.GaugeType + } else if counterReset { header = chunkenc.CounterReset } else if okToAppend { header = chunkenc.NotCounterReset