diff --git a/model/textparse/openmetricsparse.go b/model/textparse/openmetricsparse.go index 3fc80b5d62..15a95a9592 100644 --- a/model/textparse/openmetricsparse.go +++ b/model/textparse/openmetricsparse.go @@ -113,8 +113,8 @@ func (p *OpenMetricsParser) Series() ([]byte, *int64, float64) { return p.series, nil, p.val } -// Histogram always returns (nil, nil, nil, nil) because OpenMetrics does not support -// sparse histograms. +// Histogram returns (nil, nil, nil, nil) for now because OpenMetrics does not +// support sparse histograms yet. func (p *OpenMetricsParser) Histogram() ([]byte, *int64, *histogram.Histogram, *histogram.FloatHistogram) { return nil, nil, nil, nil } diff --git a/model/textparse/promparse.go b/model/textparse/promparse.go index b137e53056..b0c963392d 100644 --- a/model/textparse/promparse.go +++ b/model/textparse/promparse.go @@ -168,8 +168,8 @@ func (p *PromParser) Series() ([]byte, *int64, float64) { return p.series, nil, p.val } -// Histogram always returns (nil, nil, nil, nil) because the Prometheus text format -// does not support sparse histograms. +// Histogram returns (nil, nil, nil, nil) for now because the Prometheus text +// format does not support sparse histograms yet. func (p *PromParser) Histogram() ([]byte, *int64, *histogram.Histogram, *histogram.FloatHistogram) { return nil, nil, nil, nil } diff --git a/tsdb/chunkenc/histogram.go b/tsdb/chunkenc/histogram.go index 9b26e5472b..b962e45a4a 100644 --- a/tsdb/chunkenc/histogram.go +++ b/tsdb/chunkenc/histogram.go @@ -177,6 +177,7 @@ func newHistogramIterator(b []byte) *histogramIterator { // The first 3 bytes contain chunk headers. // We skip that for actual samples. _, _ = it.br.readBits(24) + it.counterResetHeader = CounterResetHeader(b[2] & 0b11000000) return it } @@ -222,6 +223,14 @@ type HistogramAppender struct { trailing uint8 } +func (a *HistogramAppender) GetCounterResetHeader() CounterResetHeader { + return CounterResetHeader(a.b.bytes()[2] & 0b11000000) +} + +func (a *HistogramAppender) NumSamples() int { + return int(binary.BigEndian.Uint16(a.b.bytes())) +} + // Append implements Appender. This implementation panics because normal float // samples must never be appended to a histogram chunk. func (a *HistogramAppender) Append(int64, float64) { @@ -237,19 +246,16 @@ func (a *HistogramAppender) AppendFloatHistogram(int64, *histogram.FloatHistogra // Appendable returns whether the chunk can be appended to, and if so // whether any recoding needs to happen using the provided interjections // (in case of any new buckets, positive or negative range, respectively). +// If the sample is a gauge histogram, AppendableGauge must be used instead. // // The chunk is not appendable in the following cases: // -// • The schema has changed. -// -// • The threshold for the zero bucket has changed. -// -// • Any buckets have disappeared. -// -// • There was a counter reset in the count of observations or in any bucket, -// including the zero bucket. -// -// • The last sample in the chunk was stale while the current sample is not stale. +// - The schema has changed. +// - The threshold for the zero bucket has changed. +// - Any buckets have disappeared. +// - There was a counter reset in the count of observations or in any bucket, +// including the zero bucket. +// - The last sample in the chunk was stale while the current sample is not stale. // // The method returns an additional boolean set to true if it is not appendable // because of a counter reset. If the given sample is stale, it is always ok to @@ -258,6 +264,9 @@ func (a *HistogramAppender) Appendable(h *histogram.Histogram) ( positiveInterjections, negativeInterjections []Interjection, okToAppend, counterReset bool, ) { + if a.NumSamples() > 0 && a.GetCounterResetHeader() == GaugeType { + return + } if value.IsStaleNaN(h.Sum) { // This is a stale sample whose buckets and spans don't matter. okToAppend = true @@ -307,8 +316,47 @@ func (a *HistogramAppender) Appendable(h *histogram.Histogram) ( return } -type bucketValue interface { - int64 | float64 +// AppendableGauge returns whether the chunk can be appended to, and if so +// whether: +// 1. Any recoding needs to happen to the chunk using the provided interjections +// (in case of any new buckets, positive or negative range, respectively). +// 2. Any recoding needs to happen for the histogram being appended, using the backward interjections +// (in case of any missing buckets, positive or negative range, respectively). +// +// This method must be only used for gauge histograms. +// +// The chunk is not appendable in the following cases: +// - The schema has changed. +// - The threshold for the zero bucket has changed. +// - The last sample in the chunk was stale while the current sample is not stale. +func (a *HistogramAppender) AppendableGauge(h *histogram.Histogram) ( + positiveInterjections, negativeInterjections []Interjection, + backwardPositiveInterjections, backwardNegativeInterjections []Interjection, + positiveSpans, negativeSpans []histogram.Span, + okToAppend bool, +) { + if a.NumSamples() > 0 && a.GetCounterResetHeader() != GaugeType { + return + } + if value.IsStaleNaN(h.Sum) { + // This is a stale sample whose buckets and spans don't matter. + okToAppend = true + return + } + if value.IsStaleNaN(a.sum) { + // If the last sample was stale, then we can only accept stale + // samples in this chunk. + return + } + + if h.Schema != a.schema || h.ZeroThreshold != a.zThreshold { + return + } + + positiveInterjections, backwardPositiveInterjections, positiveSpans = bidirectionalCompareSpans(a.pSpans, h.PositiveSpans) + negativeInterjections, backwardNegativeInterjections, negativeSpans = bidirectionalCompareSpans(a.nSpans, h.NegativeSpans) + okToAppend = true + return } // counterResetInAnyBucket returns true if there was a counter reset for any @@ -542,6 +590,22 @@ func (a *HistogramAppender) Recode( return hc, app } +// RecodeHistogramm converts the current histogram (in-place) to accommodate an expansion of the set of +// (positive and/or negative) buckets used. +func (a *HistogramAppender) RecodeHistogramm( + h *histogram.Histogram, + pBackwardInter, nBackwardInter []Interjection, +) { + if len(pBackwardInter) > 0 { + numPositiveBuckets := countSpans(h.PositiveSpans) + h.PositiveBuckets = interject(h.PositiveBuckets, make([]int64, numPositiveBuckets), pBackwardInter, true) + } + if len(nBackwardInter) > 0 { + numNegativeBuckets := countSpans(h.NegativeSpans) + h.NegativeBuckets = interject(h.NegativeBuckets, make([]int64, numNegativeBuckets), nBackwardInter, true) + } +} + func (a *HistogramAppender) writeSumDelta(v float64) { xorWrite(a.b, v, a.sum, &a.leading, &a.trailing) } @@ -551,6 +615,8 @@ type histogramIterator struct { numTotal uint16 numRead uint16 + counterResetHeader CounterResetHeader + // Layout: schema int32 zThreshold float64 @@ -599,16 +665,21 @@ func (it *histogramIterator) AtHistogram() (int64, *histogram.Histogram) { return it.t, &histogram.Histogram{Sum: it.sum} } it.atHistogramCalled = true + crHint := histogram.UnknownCounterReset + if it.counterResetHeader == GaugeType { + crHint = histogram.GaugeType + } return it.t, &histogram.Histogram{ - Count: it.cnt, - ZeroCount: it.zCnt, - Sum: it.sum, - ZeroThreshold: it.zThreshold, - Schema: it.schema, - PositiveSpans: it.pSpans, - NegativeSpans: it.nSpans, - PositiveBuckets: it.pBuckets, - NegativeBuckets: it.nBuckets, + CounterResetHint: crHint, + Count: it.cnt, + ZeroCount: it.zCnt, + Sum: it.sum, + ZeroThreshold: it.zThreshold, + Schema: it.schema, + PositiveSpans: it.pSpans, + NegativeSpans: it.nSpans, + PositiveBuckets: it.pBuckets, + NegativeBuckets: it.nBuckets, } } @@ -617,16 +688,21 @@ func (it *histogramIterator) AtFloatHistogram() (int64, *histogram.FloatHistogra return it.t, &histogram.FloatHistogram{Sum: it.sum} } it.atFloatHistogramCalled = true + crHint := histogram.UnknownCounterReset + if it.counterResetHeader == GaugeType { + crHint = histogram.GaugeType + } return it.t, &histogram.FloatHistogram{ - Count: float64(it.cnt), - ZeroCount: float64(it.zCnt), - Sum: it.sum, - ZeroThreshold: it.zThreshold, - Schema: it.schema, - PositiveSpans: it.pSpans, - NegativeSpans: it.nSpans, - PositiveBuckets: it.pFloatBuckets, - NegativeBuckets: it.nFloatBuckets, + CounterResetHint: crHint, + Count: float64(it.cnt), + ZeroCount: float64(it.zCnt), + Sum: it.sum, + ZeroThreshold: it.zThreshold, + Schema: it.schema, + PositiveSpans: it.pSpans, + NegativeSpans: it.nSpans, + PositiveBuckets: it.pFloatBuckets, + NegativeBuckets: it.nFloatBuckets, } } @@ -645,6 +721,8 @@ func (it *histogramIterator) Reset(b []byte) { it.numTotal = binary.BigEndian.Uint16(b) it.numRead = 0 + it.counterResetHeader = CounterResetHeader(b[2] & 0b11000000) + it.t, it.cnt, it.zCnt = 0, 0, 0 it.tDelta, it.cntDelta, it.zCntDelta = 0, 0, 0 diff --git a/tsdb/chunkenc/histogram_meta.go b/tsdb/chunkenc/histogram_meta.go index f923dcb919..345b8cc519 100644 --- a/tsdb/chunkenc/histogram_meta.go +++ b/tsdb/chunkenc/histogram_meta.go @@ -376,6 +376,10 @@ loop: return interjections, bInterjections, mergedSpans } +type bucketValue interface { + int64 | float64 +} + // interject merges 'in' with the provided interjections and writes them into // 'out', which must already have the appropriate length. func interject[BV bucketValue](in, out []BV, interjections []Interjection, deltas bool) []BV { diff --git a/tsdb/chunkenc/histogram_test.go b/tsdb/chunkenc/histogram_test.go index 73831c6de5..73851c9dfb 100644 --- a/tsdb/chunkenc/histogram_test.go +++ b/tsdb/chunkenc/histogram_test.go @@ -517,3 +517,171 @@ func TestAtFloatHistogram(t *testing.T) { i++ } } + +func TestHistogramChunkAppendableGauge(t *testing.T) { + c := Chunk(NewHistogramChunk()) + + // Create fresh appender and add the first histogram. + app, err := c.Appender() + require.NoError(t, err) + require.Equal(t, 0, c.NumSamples()) + + ts := int64(1234567890) + h1 := &histogram.Histogram{ + Count: 5, + ZeroCount: 2, + Sum: 18.4, + ZeroThreshold: 1e-125, + Schema: 1, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 2, Length: 1}, + {Offset: 3, Length: 2}, + {Offset: 3, Length: 1}, + {Offset: 1, Length: 1}, + }, + PositiveBuckets: []int64{6, -3, 0, -1, 2, 1, -4}, // {6, 3, 3, 2, 4, 5, 1} + } + + app.AppendHistogram(ts, h1.Copy()) + require.Equal(t, 1, c.NumSamples()) + c.(*HistogramChunk).SetCounterResetHeader(GaugeType) + + { // Schema change. + h2 := h1.Copy() + h2.Schema++ + hApp, _ := app.(*HistogramAppender) + _, _, _, _, _, _, ok := hApp.AppendableGauge(h2) + require.False(t, ok) + } + + { // Zero threshold change. + h2 := h1.Copy() + h2.ZeroThreshold += 0.1 + hApp, _ := app.(*HistogramAppender) + _, _, _, _, _, _, ok := hApp.AppendableGauge(h2) + require.False(t, ok) + } + + { // New histogram that has more buckets. + h2 := h1.Copy() + h2.PositiveSpans = []histogram.Span{ + {Offset: 0, Length: 3}, + {Offset: 1, Length: 1}, + {Offset: 1, Length: 4}, + {Offset: 3, Length: 3}, + } + h2.Count += 9 + h2.ZeroCount++ + h2.Sum = 30 + h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 1} // {7, 5, 1, 3, 1, 0, 2, 5, 5, 0, 1} + + hApp, _ := app.(*HistogramAppender) + pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2) + require.Greater(t, len(pI), 0) + require.Len(t, nI, 0) + require.Len(t, pBackwardI, 0) + require.Len(t, nBackwardI, 0) + require.True(t, ok) + } + + { // New histogram that has buckets missing. + h2 := h1.Copy() + h2.PositiveSpans = []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 2, Length: 1}, + {Offset: 3, Length: 1}, + {Offset: 4, Length: 1}, + {Offset: 1, Length: 1}, + } + h2.Count -= 4 + h2.Sum-- + h2.PositiveBuckets = []int64{6, -3, 0, -1, 3, -4} // {6, 3, 3, 2, 5, 1} + + hApp, _ := app.(*HistogramAppender) + pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2) + require.Len(t, pI, 0) + require.Len(t, nI, 0) + require.Greater(t, len(pBackwardI), 0) + require.Len(t, nBackwardI, 0) + require.True(t, ok) + } + + { // New histogram that has a bucket missing and new buckets. + h2 := h1.Copy() + h2.PositiveSpans = []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 5, Length: 2}, + {Offset: 3, Length: 1}, + {Offset: 1, Length: 1}, + } + h2.Sum = 21 + h2.PositiveBuckets = []int64{6, -3, -1, 2, 1, -4} // {6, 3, 2, 4, 5, 1} + + hApp, _ := app.(*HistogramAppender) + pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2) + require.Greater(t, len(pI), 0) + require.Greater(t, len(pBackwardI), 0) + require.Len(t, nI, 0) + require.Len(t, nBackwardI, 0) + require.True(t, ok) + } + + { // New histogram that has a counter reset while buckets are same. + h2 := h1.Copy() + h2.Sum = 23 + h2.PositiveBuckets = []int64{6, -4, 1, -1, 2, 1, -4} // {6, 2, 3, 2, 4, 5, 1} + + hApp, _ := app.(*HistogramAppender) + pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2) + require.Len(t, pI, 0) + require.Len(t, nI, 0) + require.Len(t, pBackwardI, 0) + require.Len(t, nBackwardI, 0) + require.True(t, ok) + } + + { // New histogram that has a counter reset while new buckets were added. + h2 := h1.Copy() + h2.PositiveSpans = []histogram.Span{ + {Offset: 0, Length: 3}, + {Offset: 1, Length: 1}, + {Offset: 1, Length: 4}, + {Offset: 3, Length: 3}, + } + h2.Sum = 29 + h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 0} // {7, 5, 1, 3, 1, 0, 2, 5, 5, 0, 0} + + hApp, _ := app.(*HistogramAppender) + pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2) + require.Greater(t, len(pI), 0) + require.Len(t, nI, 0) + require.Len(t, pBackwardI, 0) + require.Len(t, nBackwardI, 0) + require.True(t, ok) + } + + { + // New histogram that has a counter reset while new buckets were + // added before the first bucket and reset on first bucket. + h2 := h1.Copy() + h2.PositiveSpans = []histogram.Span{ + {Offset: -3, Length: 2}, + {Offset: 1, Length: 2}, + {Offset: 2, Length: 1}, + {Offset: 3, Length: 2}, + {Offset: 3, Length: 1}, + {Offset: 1, Length: 1}, + } + h2.Sum = 26 + h2.PositiveBuckets = []int64{1, 1, 3, -2, 0, -1, 2, 1, -4} // {1, 2, 5, 3, 3, 2, 4, 5, 1} + + hApp, _ := app.(*HistogramAppender) + pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2) + require.Greater(t, len(pI), 0) + require.Len(t, nI, 0) + require.Len(t, pBackwardI, 0) + require.Len(t, nBackwardI, 0) + require.True(t, ok) + } +} diff --git a/tsdb/head.go b/tsdb/head.go index 379bb222d5..e951e9eced 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -2048,6 +2048,32 @@ func GenerateTestHistograms(n int) (r []*histogram.Histogram) { return r } +func GenerateTestGaugeHistograms(n int) (r []*histogram.Histogram) { + for x := 0; x < n; x++ { + i := rand.Intn(n) + r = append(r, &histogram.Histogram{ + CounterResetHint: histogram.GaugeType, + Count: 10 + uint64(i*8), + ZeroCount: 2 + uint64(i), + ZeroThreshold: 0.001, + Sum: 18.4 * float64(i+1), + Schema: 1, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + PositiveBuckets: []int64{int64(i + 1), 1, -1, 0}, + NegativeSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + NegativeBuckets: []int64{int64(i + 1), 1, -1, 0}, + }) + } + + return r +} + func GenerateTestFloatHistograms(n int) (r []*histogram.FloatHistogram) { for i := 0; i < n; i++ { r = append(r, &histogram.FloatHistogram{ @@ -2072,7 +2098,7 @@ func GenerateTestFloatHistograms(n int) (r []*histogram.FloatHistogram) { return r } -func GenerateTestGaugeHistograms(n int) (r []*histogram.FloatHistogram) { +func GenerateTestGaugeFloatHistograms(n int) (r []*histogram.FloatHistogram) { for x := 0; x < n; x++ { i := rand.Intn(n) r = append(r, &histogram.FloatHistogram{ diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 0785e99e80..46235ce933 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -1148,18 +1148,29 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui app, _ := s.app.(*chunkenc.HistogramAppender) var ( positiveInterjections, negativeInterjections []chunkenc.Interjection + pBackwardInter, nBackwardInter []chunkenc.Interjection + pMergedSpans, nMergedSpans []histogram.Span okToAppend, counterReset bool ) c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncHistogram, chunkDiskMapper, chunkRange) if !sampleInOrder { return sampleInOrder, chunkCreated } - + gauge := h.CounterResetHint == histogram.GaugeType if app != nil { - positiveInterjections, negativeInterjections, okToAppend, counterReset = app.Appendable(h) + if gauge { + positiveInterjections, negativeInterjections, pBackwardInter, nBackwardInter, pMergedSpans, nMergedSpans, okToAppend = app.AppendableGauge(h) + } else { + positiveInterjections, negativeInterjections, okToAppend, counterReset = app.Appendable(h) + } } if !chunkCreated { + if len(pBackwardInter)+len(nBackwardInter) > 0 { + h.PositiveSpans = pMergedSpans + h.NegativeSpans = nMergedSpans + app.RecodeHistogramm(h, pBackwardInter, nBackwardInter) + } // We have 3 cases here // - !okToAppend -> We need to cut a new chunk. // - okToAppend but we have interjections → Existing chunk needs @@ -1184,9 +1195,12 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui if chunkCreated { hc := s.headChunk.chunk.(*chunkenc.HistogramChunk) header := chunkenc.UnknownCounterReset - if counterReset { + switch { + case gauge: + header = chunkenc.GaugeType + case counterReset: header = chunkenc.CounterReset - } else if okToAppend { + case okToAppend: header = chunkenc.NotCounterReset } hc.SetCounterResetHeader(header) @@ -1265,11 +1279,12 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, if chunkCreated { hc := s.headChunk.chunk.(*chunkenc.FloatHistogramChunk) header := chunkenc.UnknownCounterReset - if gauge { + switch { + case gauge: header = chunkenc.GaugeType - } else if counterReset { + case counterReset: header = chunkenc.CounterReset - } else if okToAppend { + case okToAppend: header = chunkenc.NotCounterReset } hc.SetCounterResetHeader(header) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 545bf5046e..5e92cd4b8e 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -2834,12 +2834,13 @@ func TestAppendHistogram(t *testing.T) { ingestTs := int64(0) app := head.Appender(context.Background()) - // Integer histograms. type timedHistogram struct { t int64 h *histogram.Histogram } - expHistograms := make([]timedHistogram, 0, numHistograms) + expHistograms := make([]timedHistogram, 0, 2*numHistograms) + + // Counter integer histograms. for _, h := range GenerateTestHistograms(numHistograms) { _, err := app.AppendHistogram(0, l, ingestTs, h, nil) require.NoError(t, err) @@ -2851,12 +2852,25 @@ func TestAppendHistogram(t *testing.T) { } } + // Gauge integer histograms. + for _, h := range GenerateTestGaugeHistograms(numHistograms) { + _, err := app.AppendHistogram(0, l, ingestTs, h, nil) + require.NoError(t, err) + expHistograms = append(expHistograms, timedHistogram{ingestTs, h}) + ingestTs++ + if ingestTs%50 == 0 { + require.NoError(t, app.Commit()) + app = head.Appender(context.Background()) + } + } + type timedFloatHistogram struct { t int64 h *histogram.FloatHistogram } - // Float counter histograms. - expFloatHistograms := make([]timedFloatHistogram, 0, numHistograms) + expFloatHistograms := make([]timedFloatHistogram, 0, 2*numHistograms) + + // Counter float histograms. for _, fh := range GenerateTestFloatHistograms(numHistograms) { _, err := app.AppendHistogram(0, l, ingestTs, nil, fh) require.NoError(t, err) @@ -2868,8 +2882,8 @@ func TestAppendHistogram(t *testing.T) { } } - // Float gauge histograms. - for _, fh := range GenerateTestGaugeHistograms(numHistograms) { + // Gauge float histograms. + for _, fh := range GenerateTestGaugeFloatHistograms(numHistograms) { _, err := app.AppendHistogram(0, l, ingestTs, nil, fh) require.NoError(t, err) expFloatHistograms = append(expFloatHistograms, timedFloatHistogram{ingestTs, fh}) @@ -2879,6 +2893,7 @@ func TestAppendHistogram(t *testing.T) { app = head.Appender(context.Background()) } } + require.NoError(t, app.Commit()) q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime()) @@ -2913,7 +2928,7 @@ func TestAppendHistogram(t *testing.T) { } func TestHistogramInWALAndMmapChunk(t *testing.T) { - head, _ := newTestHead(t, 2000, false, false) + head, _ := newTestHead(t, 3000, false, false) t.Cleanup(func() { require.NoError(t, head.Close()) }) @@ -2924,27 +2939,36 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) { k1 := s1.String() numHistograms := 300 exp := map[string][]tsdbutil.Sample{} - app := head.Appender(context.Background()) ts := int64(0) - for _, h := range GenerateTestHistograms(numHistograms) { - h.Count = h.Count * 2 - h.NegativeSpans = h.PositiveSpans - h.NegativeBuckets = h.PositiveBuckets - _, err := app.AppendHistogram(0, s1, ts, h, nil) - require.NoError(t, err) - exp[k1] = append(exp[k1], sample{t: ts, h: h.Copy()}) - ts++ - if ts%5 == 0 { - require.NoError(t, app.Commit()) - app = head.Appender(context.Background()) + var app storage.Appender + for _, gauge := range []bool{true, false} { + app = head.Appender(context.Background()) + var hists []*histogram.Histogram + if gauge { + hists = GenerateTestGaugeHistograms(numHistograms) + } else { + hists = GenerateTestHistograms(numHistograms) } + for _, h := range hists { + h.Count = h.Count * 2 + h.NegativeSpans = h.PositiveSpans + h.NegativeBuckets = h.PositiveBuckets + _, err := app.AppendHistogram(0, s1, ts, h, nil) + require.NoError(t, err) + exp[k1] = append(exp[k1], sample{t: ts, h: h.Copy()}) + ts++ + if ts%5 == 0 { + require.NoError(t, app.Commit()) + app = head.Appender(context.Background()) + } + } + require.NoError(t, app.Commit()) } - require.NoError(t, app.Commit()) for _, gauge := range []bool{true, false} { app = head.Appender(context.Background()) var hists []*histogram.FloatHistogram if gauge { - hists = GenerateTestGaugeHistograms(numHistograms) + hists = GenerateTestGaugeFloatHistograms(numHistograms) } else { hists = GenerateTestFloatHistograms(numHistograms) } @@ -2964,10 +2988,10 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) { require.NoError(t, app.Commit()) } - // There should be 7 mmap chunks in s1. + // There should be 11 mmap chunks in s1. ms := head.series.getByHash(s1.Hash(), s1) - require.Len(t, ms.mmappedChunks, 8) - expMmapChunks := make([]*mmappedChunk, 0, 8) + require.Len(t, ms.mmappedChunks, 11) + expMmapChunks := make([]*mmappedChunk, 0, 11) for _, mmap := range ms.mmappedChunks { require.Greater(t, mmap.numSamples, uint16(0)) cpy := *mmap @@ -2979,36 +3003,44 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) { // Series with mix of histograms and float. s2 := labels.FromStrings("a", "b2") k2 := s2.String() - app = head.Appender(context.Background()) ts = 0 - for _, h := range GenerateTestHistograms(100) { - ts++ - h.Count = h.Count * 2 - h.NegativeSpans = h.PositiveSpans - h.NegativeBuckets = h.PositiveBuckets - _, err := app.AppendHistogram(0, s2, int64(ts), h, nil) - require.NoError(t, err) - exp[k2] = append(exp[k2], sample{t: int64(ts), h: h.Copy()}) - if ts%20 == 0 { - require.NoError(t, app.Commit()) - app = head.Appender(context.Background()) - // Add some float. - for i := 0; i < 10; i++ { - ts++ - _, err := app.Append(0, s2, int64(ts), float64(ts)) - require.NoError(t, err) - exp[k2] = append(exp[k2], sample{t: int64(ts), v: float64(ts)}) - } - require.NoError(t, app.Commit()) - app = head.Appender(context.Background()) + for _, gauge := range []bool{true, false} { + app = head.Appender(context.Background()) + var hists []*histogram.Histogram + if gauge { + hists = GenerateTestGaugeHistograms(100) + } else { + hists = GenerateTestHistograms(100) } + for _, h := range hists { + ts++ + h.Count = h.Count * 2 + h.NegativeSpans = h.PositiveSpans + h.NegativeBuckets = h.PositiveBuckets + _, err := app.AppendHistogram(0, s2, int64(ts), h, nil) + require.NoError(t, err) + exp[k2] = append(exp[k2], sample{t: int64(ts), h: h.Copy()}) + if ts%20 == 0 { + require.NoError(t, app.Commit()) + app = head.Appender(context.Background()) + // Add some float. + for i := 0; i < 10; i++ { + ts++ + _, err := app.Append(0, s2, int64(ts), float64(ts)) + require.NoError(t, err) + exp[k2] = append(exp[k2], sample{t: int64(ts), v: float64(ts)}) + } + require.NoError(t, app.Commit()) + app = head.Appender(context.Background()) + } + } + require.NoError(t, app.Commit()) } - require.NoError(t, app.Commit()) for _, gauge := range []bool{true, false} { app = head.Appender(context.Background()) var hists []*histogram.FloatHistogram if gauge { - hists = GenerateTestGaugeHistograms(100) + hists = GenerateTestGaugeFloatHistograms(100) } else { hists = GenerateTestFloatHistograms(100) } @@ -4571,6 +4603,81 @@ func TestGaugeHistogramWALAndChunkHeader(t *testing.T) { }) require.NoError(t, head.Init(0)) + ts := int64(0) + appendHistogram := func(h *histogram.Histogram) { + ts++ + app := head.Appender(context.Background()) + _, err := app.AppendHistogram(0, l, ts, h.Copy(), nil) + require.NoError(t, err) + require.NoError(t, app.Commit()) + } + + hists := GenerateTestGaugeHistograms(5) + hists[0].CounterResetHint = histogram.UnknownCounterReset + appendHistogram(hists[0]) + appendHistogram(hists[1]) + appendHistogram(hists[2]) + hists[3].CounterResetHint = histogram.UnknownCounterReset + appendHistogram(hists[3]) + appendHistogram(hists[3]) + appendHistogram(hists[4]) + + checkHeaders := func() { + ms, _, err := head.getOrCreate(l.Hash(), l) + require.NoError(t, err) + require.Len(t, ms.mmappedChunks, 3) + expHeaders := []chunkenc.CounterResetHeader{ + chunkenc.UnknownCounterReset, + chunkenc.GaugeType, + chunkenc.UnknownCounterReset, + chunkenc.GaugeType, + } + for i, mmapChunk := range ms.mmappedChunks { + chk, err := head.chunkDiskMapper.Chunk(mmapChunk.ref) + require.NoError(t, err) + require.Equal(t, expHeaders[i], chk.(*chunkenc.HistogramChunk).GetCounterResetHeader()) + } + require.Equal(t, expHeaders[len(expHeaders)-1], ms.headChunk.chunk.(*chunkenc.HistogramChunk).GetCounterResetHeader()) + } + checkHeaders() + + recs := readTestWAL(t, head.wal.Dir()) + require.Equal(t, []interface{}{ + []record.RefSeries{ + { + Ref: 1, + Labels: labels.FromStrings("a", "b"), + }, + }, + []record.RefHistogramSample{{Ref: 1, T: 1, H: hists[0]}}, + []record.RefHistogramSample{{Ref: 1, T: 2, H: hists[1]}}, + []record.RefHistogramSample{{Ref: 1, T: 3, H: hists[2]}}, + []record.RefHistogramSample{{Ref: 1, T: 4, H: hists[3]}}, + []record.RefHistogramSample{{Ref: 1, T: 5, H: hists[3]}}, + []record.RefHistogramSample{{Ref: 1, T: 6, H: hists[4]}}, + }, recs) + + // Restart Head without mmap chunks to expect the WAL replay to recognize gauge histograms. + require.NoError(t, head.Close()) + require.NoError(t, os.RemoveAll(mmappedChunksDir(head.opts.ChunkDirRoot))) + + w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false) + require.NoError(t, err) + head, err = NewHead(nil, nil, w, nil, head.opts, nil) + require.NoError(t, err) + require.NoError(t, head.Init(0)) + + checkHeaders() +} + +func TestGaugeFloatHistogramWALAndChunkHeader(t *testing.T) { + l := labels.FromStrings("a", "b") + head, _ := newTestHead(t, 1000, false, false) + t.Cleanup(func() { + require.NoError(t, head.Close()) + }) + require.NoError(t, head.Init(0)) + ts := int64(0) appendHistogram := func(h *histogram.FloatHistogram) { ts++ @@ -4580,7 +4687,7 @@ func TestGaugeHistogramWALAndChunkHeader(t *testing.T) { require.NoError(t, app.Commit()) } - hists := GenerateTestGaugeHistograms(5) + hists := GenerateTestGaugeFloatHistograms(5) hists[0].CounterResetHint = histogram.UnknownCounterReset appendHistogram(hists[0]) appendHistogram(hists[1])