tsdb: Support gauge float histogram with recoding of chunk

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
Ganesh Vernekar 2023-01-04 15:43:37 +05:30
parent 8ad0d2d5d7
commit 609b12d719
No known key found for this signature in database
GPG Key ID: F056451B52F1DC34
2 changed files with 75 additions and 17 deletions

View File

@ -174,6 +174,7 @@ func newFloatHistogramIterator(b []byte) *floatHistogramIterator {
// The first 3 bytes contain chunk headers. // The first 3 bytes contain chunk headers.
// We skip that for actual samples. // We skip that for actual samples.
_, _ = it.br.readBits(24) _, _ = it.br.readBits(24)
it.counterResetHeader = CounterResetHeader(b[2] & 0b11000000)
return it return it
} }
@ -196,6 +197,14 @@ type FloatHistogramAppender struct {
pBuckets, nBuckets []xorValue pBuckets, nBuckets []xorValue
} }
func (a *FloatHistogramAppender) GetCounterResetHeader() CounterResetHeader {
return CounterResetHeader(a.b.bytes()[2] & 0b11000000)
}
func (a *FloatHistogramAppender) NumSamples() int {
return int(binary.BigEndian.Uint16(a.b.bytes()))
}
// Append implements Appender. This implementation panics because normal float // Append implements Appender. This implementation panics because normal float
// samples must never be appended to a histogram chunk. // samples must never be appended to a histogram chunk.
func (a *FloatHistogramAppender) Append(int64, float64) { func (a *FloatHistogramAppender) Append(int64, float64) {
@ -211,6 +220,7 @@ func (a *FloatHistogramAppender) AppendHistogram(int64, *histogram.Histogram) {
// Appendable returns whether the chunk can be appended to, and if so // Appendable returns whether the chunk can be appended to, and if so
// whether any recoding needs to happen using the provided interjections // whether any recoding needs to happen using the provided interjections
// (in case of any new buckets, positive or negative range, respectively). // (in case of any new buckets, positive or negative range, respectively).
// If the sample is a gauge histogram, AppendableGauge must be used instead.
// //
// The chunk is not appendable in the following cases: // The chunk is not appendable in the following cases:
// //
@ -228,6 +238,9 @@ func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) (
positiveInterjections, negativeInterjections []Interjection, positiveInterjections, negativeInterjections []Interjection,
okToAppend, counterReset bool, okToAppend, counterReset bool,
) { ) {
if a.NumSamples() > 0 && a.GetCounterResetHeader() == GaugeType {
return
}
if value.IsStaleNaN(h.Sum) { if value.IsStaleNaN(h.Sum) {
// This is a stale sample whose buckets and spans don't matter. // This is a stale sample whose buckets and spans don't matter.
okToAppend = true okToAppend = true
@ -284,17 +297,21 @@ func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) (
// 2. Any recoding needs to happen for the histogram being appended, using the backward interjections // 2. Any recoding needs to happen for the histogram being appended, using the backward interjections
// (in case of any missing buckets, positive or negative range, respectively). // (in case of any missing buckets, positive or negative range, respectively).
// //
// The chunk is not appendable in the following cases: // This method must be only used for gauge histograms.
// //
// • The schema has changed. // The chunk is not appendable in the following cases:
// • The threshold for the zero bucket has changed. // - The schema has changed.
// • The last sample in the chunk was stale while the current sample is not stale. // - The threshold for the zero bucket has changed.
// - The last sample in the chunk was stale while the current sample is not stale.
func (a *FloatHistogramAppender) AppendableGauge(h *histogram.FloatHistogram) ( func (a *FloatHistogramAppender) AppendableGauge(h *histogram.FloatHistogram) (
positiveInterjections, negativeInterjections []Interjection, positiveInterjections, negativeInterjections []Interjection,
backwardPositiveInterjections, backwardNegativeInterjections []Interjection, backwardPositiveInterjections, backwardNegativeInterjections []Interjection,
positiveSpans, negativeSpans []histogram.Span, positiveSpans, negativeSpans []histogram.Span,
okToAppend bool, okToAppend bool,
) { ) {
if a.NumSamples() > 0 && a.GetCounterResetHeader() != GaugeType {
return
}
if value.IsStaleNaN(h.Sum) { if value.IsStaleNaN(h.Sum) {
// This is a stale sample whose buckets and spans don't matter. // This is a stale sample whose buckets and spans don't matter.
okToAppend = true okToAppend = true
@ -537,11 +554,29 @@ func (a *FloatHistogramAppender) Recode(
return hc, app return hc, app
} }
// RecodeHistogramm converts the current histogram (in-place) to accommodate an expansion of the set of
// (positive and/or negative) buckets used.
func (a *FloatHistogramAppender) RecodeHistogramm(
fh *histogram.FloatHistogram,
pBackwardInter, nBackwardInter []Interjection,
) {
if len(pBackwardInter) > 0 {
numPositiveBuckets := countSpans(fh.PositiveSpans)
fh.PositiveBuckets = interject(fh.PositiveBuckets, make([]float64, numPositiveBuckets), pBackwardInter, false)
}
if len(nBackwardInter) > 0 {
numNegativeBuckets := countSpans(fh.NegativeSpans)
fh.NegativeBuckets = interject(fh.NegativeBuckets, make([]float64, numNegativeBuckets), nBackwardInter, false)
}
}
type floatHistogramIterator struct { type floatHistogramIterator struct {
br bstreamReader br bstreamReader
numTotal uint16 numTotal uint16
numRead uint16 numRead uint16
counterResetHeader CounterResetHeader
// Layout: // Layout:
schema int32 schema int32
zThreshold float64 zThreshold float64
@ -594,16 +629,21 @@ func (it *floatHistogramIterator) AtFloatHistogram() (int64, *histogram.FloatHis
return it.t, &histogram.FloatHistogram{Sum: it.sum.value} return it.t, &histogram.FloatHistogram{Sum: it.sum.value}
} }
it.atFloatHistogramCalled = true it.atFloatHistogramCalled = true
crHint := histogram.UnknownCounterReset
if it.counterResetHeader == GaugeType {
crHint = histogram.GaugeType
}
return it.t, &histogram.FloatHistogram{ return it.t, &histogram.FloatHistogram{
Count: it.cnt.value, CounterResetHint: crHint,
ZeroCount: it.zCnt.value, Count: it.cnt.value,
Sum: it.sum.value, ZeroCount: it.zCnt.value,
ZeroThreshold: it.zThreshold, Sum: it.sum.value,
Schema: it.schema, ZeroThreshold: it.zThreshold,
PositiveSpans: it.pSpans, Schema: it.schema,
NegativeSpans: it.nSpans, PositiveSpans: it.pSpans,
PositiveBuckets: it.pBuckets, NegativeSpans: it.nSpans,
NegativeBuckets: it.nBuckets, PositiveBuckets: it.pBuckets,
NegativeBuckets: it.nBuckets,
} }
} }
@ -622,6 +662,8 @@ func (it *floatHistogramIterator) Reset(b []byte) {
it.numTotal = binary.BigEndian.Uint16(b) it.numTotal = binary.BigEndian.Uint16(b)
it.numRead = 0 it.numRead = 0
it.counterResetHeader = CounterResetHeader(b[2] & 0b11000000)
it.t, it.tDelta = 0, 0 it.t, it.tDelta = 0, 0
it.cnt, it.zCnt, it.sum = xorValue{}, xorValue{}, xorValue{} it.cnt, it.zCnt, it.sum = xorValue{}, xorValue{}, xorValue{}

View File

@ -1138,9 +1138,10 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
// appendHistogram adds the histogram. // appendHistogram adds the histogram.
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
// TODO(codesome): Support gauge histograms here.
func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64) (sampleInOrder, chunkCreated bool) { func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64) (sampleInOrder, chunkCreated bool) {
// Head controls the execution of recoding, so that we own the proper // Head controls the execution of recoding, so that we own the proper
// chunk reference afterwards. We check for Appendable before // chunk reference afterwards. We check for Appendable from appender before
// appendPreprocessor because in case it ends up creating a new chunk, // appendPreprocessor because in case it ends up creating a new chunk,
// we need to know if there was also a counter reset or not to set the // we need to know if there was also a counter reset or not to set the
// meta properly. // meta properly.
@ -1209,24 +1210,37 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64) (sampleInOrder, chunkCreated bool) { func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64) (sampleInOrder, chunkCreated bool) {
// Head controls the execution of recoding, so that we own the proper // Head controls the execution of recoding, so that we own the proper
// chunk reference afterwards. We check for Appendable before // chunk reference afterwards. We check for Appendable from appender before
// appendPreprocessor because in case it ends up creating a new chunk, // appendPreprocessor because in case it ends up creating a new chunk,
// we need to know if there was also a counter reset or not to set the // we need to know if there was also a counter reset or not to set the
// meta properly. // meta properly.
app, _ := s.app.(*chunkenc.FloatHistogramAppender) app, _ := s.app.(*chunkenc.FloatHistogramAppender)
var ( var (
positiveInterjections, negativeInterjections []chunkenc.Interjection positiveInterjections, negativeInterjections []chunkenc.Interjection
pBackwardInter, nBackwardInter []chunkenc.Interjection
pMergedSpans, nMergedSpans []histogram.Span
okToAppend, counterReset bool okToAppend, counterReset bool
) )
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncFloatHistogram, chunkDiskMapper, chunkRange) c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncFloatHistogram, chunkDiskMapper, chunkRange)
if !sampleInOrder { if !sampleInOrder {
return sampleInOrder, chunkCreated return sampleInOrder, chunkCreated
} }
gauge := fh.CounterResetHint == histogram.GaugeType
if app != nil { if app != nil {
positiveInterjections, negativeInterjections, okToAppend, counterReset = app.Appendable(fh) if gauge {
positiveInterjections, negativeInterjections, pBackwardInter, nBackwardInter,
pMergedSpans, nMergedSpans, okToAppend = app.AppendableGauge(fh)
} else {
positiveInterjections, negativeInterjections, okToAppend, counterReset = app.Appendable(fh)
}
} }
if !chunkCreated { if !chunkCreated {
if len(pBackwardInter)+len(nBackwardInter) > 0 {
fh.PositiveSpans = pMergedSpans
fh.NegativeSpans = nMergedSpans
app.RecodeHistogramm(fh, pBackwardInter, nBackwardInter)
}
// We have 3 cases here // We have 3 cases here
// - !okToAppend -> We need to cut a new chunk. // - !okToAppend -> We need to cut a new chunk.
// - okToAppend but we have interjections → Existing chunk needs // - okToAppend but we have interjections → Existing chunk needs
@ -1251,7 +1265,9 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram,
if chunkCreated { if chunkCreated {
hc := s.headChunk.chunk.(*chunkenc.FloatHistogramChunk) hc := s.headChunk.chunk.(*chunkenc.FloatHistogramChunk)
header := chunkenc.UnknownCounterReset header := chunkenc.UnknownCounterReset
if counterReset { if gauge {
header = chunkenc.GaugeType
} else if counterReset {
header = chunkenc.CounterReset header = chunkenc.CounterReset
} else if okToAppend { } else if okToAppend {
header = chunkenc.NotCounterReset header = chunkenc.NotCounterReset