Merge pull request #11840 from prometheus/beorn7/histogram-gauge

tsdb: Add integer gauge histogram support
This commit is contained in:
Björn Rabenstein 2023-01-11 15:10:51 +01:00 committed by GitHub
commit ac96da3726
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 488 additions and 90 deletions

View File

@ -113,8 +113,8 @@ func (p *OpenMetricsParser) Series() ([]byte, *int64, float64) {
return p.series, nil, p.val
}
// Histogram always returns (nil, nil, nil, nil) because OpenMetrics does not support
// sparse histograms.
// Histogram returns (nil, nil, nil, nil) for now because OpenMetrics does not
// support sparse histograms yet.
func (p *OpenMetricsParser) Histogram() ([]byte, *int64, *histogram.Histogram, *histogram.FloatHistogram) {
return nil, nil, nil, nil
}

View File

@ -168,8 +168,8 @@ func (p *PromParser) Series() ([]byte, *int64, float64) {
return p.series, nil, p.val
}
// Histogram always returns (nil, nil, nil, nil) because the Prometheus text format
// does not support sparse histograms.
// Histogram returns (nil, nil, nil, nil) for now because the Prometheus text
// format does not support sparse histograms yet.
func (p *PromParser) Histogram() ([]byte, *int64, *histogram.Histogram, *histogram.FloatHistogram) {
return nil, nil, nil, nil
}

View File

@ -177,6 +177,7 @@ func newHistogramIterator(b []byte) *histogramIterator {
// The first 3 bytes contain chunk headers.
// We skip that for actual samples.
_, _ = it.br.readBits(24)
it.counterResetHeader = CounterResetHeader(b[2] & 0b11000000)
return it
}
@ -222,6 +223,14 @@ type HistogramAppender struct {
trailing uint8
}
func (a *HistogramAppender) GetCounterResetHeader() CounterResetHeader {
return CounterResetHeader(a.b.bytes()[2] & 0b11000000)
}
func (a *HistogramAppender) NumSamples() int {
return int(binary.BigEndian.Uint16(a.b.bytes()))
}
// Append implements Appender. This implementation panics because normal float
// samples must never be appended to a histogram chunk.
func (a *HistogramAppender) Append(int64, float64) {
@ -237,19 +246,16 @@ func (a *HistogramAppender) AppendFloatHistogram(int64, *histogram.FloatHistogra
// Appendable returns whether the chunk can be appended to, and if so
// whether any recoding needs to happen using the provided interjections
// (in case of any new buckets, positive or negative range, respectively).
// If the sample is a gauge histogram, AppendableGauge must be used instead.
//
// The chunk is not appendable in the following cases:
//
// • The schema has changed.
//
// • The threshold for the zero bucket has changed.
//
// • Any buckets have disappeared.
//
// • There was a counter reset in the count of observations or in any bucket,
// including the zero bucket.
//
// • The last sample in the chunk was stale while the current sample is not stale.
// - The schema has changed.
// - The threshold for the zero bucket has changed.
// - Any buckets have disappeared.
// - There was a counter reset in the count of observations or in any bucket,
// including the zero bucket.
// - The last sample in the chunk was stale while the current sample is not stale.
//
// The method returns an additional boolean set to true if it is not appendable
// because of a counter reset. If the given sample is stale, it is always ok to
@ -258,6 +264,9 @@ func (a *HistogramAppender) Appendable(h *histogram.Histogram) (
positiveInterjections, negativeInterjections []Interjection,
okToAppend, counterReset bool,
) {
if a.NumSamples() > 0 && a.GetCounterResetHeader() == GaugeType {
return
}
if value.IsStaleNaN(h.Sum) {
// This is a stale sample whose buckets and spans don't matter.
okToAppend = true
@ -307,8 +316,47 @@ func (a *HistogramAppender) Appendable(h *histogram.Histogram) (
return
}
type bucketValue interface {
int64 | float64
// AppendableGauge returns whether the chunk can be appended to, and if so
// whether:
// 1. Any recoding needs to happen to the chunk using the provided interjections
// (in case of any new buckets, positive or negative range, respectively).
// 2. Any recoding needs to happen for the histogram being appended, using the backward interjections
// (in case of any missing buckets, positive or negative range, respectively).
//
// This method must be only used for gauge histograms.
//
// The chunk is not appendable in the following cases:
// - The schema has changed.
// - The threshold for the zero bucket has changed.
// - The last sample in the chunk was stale while the current sample is not stale.
func (a *HistogramAppender) AppendableGauge(h *histogram.Histogram) (
positiveInterjections, negativeInterjections []Interjection,
backwardPositiveInterjections, backwardNegativeInterjections []Interjection,
positiveSpans, negativeSpans []histogram.Span,
okToAppend bool,
) {
if a.NumSamples() > 0 && a.GetCounterResetHeader() != GaugeType {
return
}
if value.IsStaleNaN(h.Sum) {
// This is a stale sample whose buckets and spans don't matter.
okToAppend = true
return
}
if value.IsStaleNaN(a.sum) {
// If the last sample was stale, then we can only accept stale
// samples in this chunk.
return
}
if h.Schema != a.schema || h.ZeroThreshold != a.zThreshold {
return
}
positiveInterjections, backwardPositiveInterjections, positiveSpans = bidirectionalCompareSpans(a.pSpans, h.PositiveSpans)
negativeInterjections, backwardNegativeInterjections, negativeSpans = bidirectionalCompareSpans(a.nSpans, h.NegativeSpans)
okToAppend = true
return
}
// counterResetInAnyBucket returns true if there was a counter reset for any
@ -542,6 +590,22 @@ func (a *HistogramAppender) Recode(
return hc, app
}
// RecodeHistogramm converts the current histogram (in-place) to accommodate an expansion of the set of
// (positive and/or negative) buckets used.
func (a *HistogramAppender) RecodeHistogramm(
h *histogram.Histogram,
pBackwardInter, nBackwardInter []Interjection,
) {
if len(pBackwardInter) > 0 {
numPositiveBuckets := countSpans(h.PositiveSpans)
h.PositiveBuckets = interject(h.PositiveBuckets, make([]int64, numPositiveBuckets), pBackwardInter, true)
}
if len(nBackwardInter) > 0 {
numNegativeBuckets := countSpans(h.NegativeSpans)
h.NegativeBuckets = interject(h.NegativeBuckets, make([]int64, numNegativeBuckets), nBackwardInter, true)
}
}
func (a *HistogramAppender) writeSumDelta(v float64) {
xorWrite(a.b, v, a.sum, &a.leading, &a.trailing)
}
@ -551,6 +615,8 @@ type histogramIterator struct {
numTotal uint16
numRead uint16
counterResetHeader CounterResetHeader
// Layout:
schema int32
zThreshold float64
@ -599,16 +665,21 @@ func (it *histogramIterator) AtHistogram() (int64, *histogram.Histogram) {
return it.t, &histogram.Histogram{Sum: it.sum}
}
it.atHistogramCalled = true
crHint := histogram.UnknownCounterReset
if it.counterResetHeader == GaugeType {
crHint = histogram.GaugeType
}
return it.t, &histogram.Histogram{
Count: it.cnt,
ZeroCount: it.zCnt,
Sum: it.sum,
ZeroThreshold: it.zThreshold,
Schema: it.schema,
PositiveSpans: it.pSpans,
NegativeSpans: it.nSpans,
PositiveBuckets: it.pBuckets,
NegativeBuckets: it.nBuckets,
CounterResetHint: crHint,
Count: it.cnt,
ZeroCount: it.zCnt,
Sum: it.sum,
ZeroThreshold: it.zThreshold,
Schema: it.schema,
PositiveSpans: it.pSpans,
NegativeSpans: it.nSpans,
PositiveBuckets: it.pBuckets,
NegativeBuckets: it.nBuckets,
}
}
@ -617,16 +688,21 @@ func (it *histogramIterator) AtFloatHistogram() (int64, *histogram.FloatHistogra
return it.t, &histogram.FloatHistogram{Sum: it.sum}
}
it.atFloatHistogramCalled = true
crHint := histogram.UnknownCounterReset
if it.counterResetHeader == GaugeType {
crHint = histogram.GaugeType
}
return it.t, &histogram.FloatHistogram{
Count: float64(it.cnt),
ZeroCount: float64(it.zCnt),
Sum: it.sum,
ZeroThreshold: it.zThreshold,
Schema: it.schema,
PositiveSpans: it.pSpans,
NegativeSpans: it.nSpans,
PositiveBuckets: it.pFloatBuckets,
NegativeBuckets: it.nFloatBuckets,
CounterResetHint: crHint,
Count: float64(it.cnt),
ZeroCount: float64(it.zCnt),
Sum: it.sum,
ZeroThreshold: it.zThreshold,
Schema: it.schema,
PositiveSpans: it.pSpans,
NegativeSpans: it.nSpans,
PositiveBuckets: it.pFloatBuckets,
NegativeBuckets: it.nFloatBuckets,
}
}
@ -645,6 +721,8 @@ func (it *histogramIterator) Reset(b []byte) {
it.numTotal = binary.BigEndian.Uint16(b)
it.numRead = 0
it.counterResetHeader = CounterResetHeader(b[2] & 0b11000000)
it.t, it.cnt, it.zCnt = 0, 0, 0
it.tDelta, it.cntDelta, it.zCntDelta = 0, 0, 0

View File

@ -376,6 +376,10 @@ loop:
return interjections, bInterjections, mergedSpans
}
type bucketValue interface {
int64 | float64
}
// interject merges 'in' with the provided interjections and writes them into
// 'out', which must already have the appropriate length.
func interject[BV bucketValue](in, out []BV, interjections []Interjection, deltas bool) []BV {

View File

@ -517,3 +517,171 @@ func TestAtFloatHistogram(t *testing.T) {
i++
}
}
func TestHistogramChunkAppendableGauge(t *testing.T) {
c := Chunk(NewHistogramChunk())
// Create fresh appender and add the first histogram.
app, err := c.Appender()
require.NoError(t, err)
require.Equal(t, 0, c.NumSamples())
ts := int64(1234567890)
h1 := &histogram.Histogram{
Count: 5,
ZeroCount: 2,
Sum: 18.4,
ZeroThreshold: 1e-125,
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 2, Length: 1},
{Offset: 3, Length: 2},
{Offset: 3, Length: 1},
{Offset: 1, Length: 1},
},
PositiveBuckets: []int64{6, -3, 0, -1, 2, 1, -4}, // {6, 3, 3, 2, 4, 5, 1}
}
app.AppendHistogram(ts, h1.Copy())
require.Equal(t, 1, c.NumSamples())
c.(*HistogramChunk).SetCounterResetHeader(GaugeType)
{ // Schema change.
h2 := h1.Copy()
h2.Schema++
hApp, _ := app.(*HistogramAppender)
_, _, _, _, _, _, ok := hApp.AppendableGauge(h2)
require.False(t, ok)
}
{ // Zero threshold change.
h2 := h1.Copy()
h2.ZeroThreshold += 0.1
hApp, _ := app.(*HistogramAppender)
_, _, _, _, _, _, ok := hApp.AppendableGauge(h2)
require.False(t, ok)
}
{ // New histogram that has more buckets.
h2 := h1.Copy()
h2.PositiveSpans = []histogram.Span{
{Offset: 0, Length: 3},
{Offset: 1, Length: 1},
{Offset: 1, Length: 4},
{Offset: 3, Length: 3},
}
h2.Count += 9
h2.ZeroCount++
h2.Sum = 30
h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 1} // {7, 5, 1, 3, 1, 0, 2, 5, 5, 0, 1}
hApp, _ := app.(*HistogramAppender)
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
require.Greater(t, len(pI), 0)
require.Len(t, nI, 0)
require.Len(t, pBackwardI, 0)
require.Len(t, nBackwardI, 0)
require.True(t, ok)
}
{ // New histogram that has buckets missing.
h2 := h1.Copy()
h2.PositiveSpans = []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 2, Length: 1},
{Offset: 3, Length: 1},
{Offset: 4, Length: 1},
{Offset: 1, Length: 1},
}
h2.Count -= 4
h2.Sum--
h2.PositiveBuckets = []int64{6, -3, 0, -1, 3, -4} // {6, 3, 3, 2, 5, 1}
hApp, _ := app.(*HistogramAppender)
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
require.Len(t, pI, 0)
require.Len(t, nI, 0)
require.Greater(t, len(pBackwardI), 0)
require.Len(t, nBackwardI, 0)
require.True(t, ok)
}
{ // New histogram that has a bucket missing and new buckets.
h2 := h1.Copy()
h2.PositiveSpans = []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 5, Length: 2},
{Offset: 3, Length: 1},
{Offset: 1, Length: 1},
}
h2.Sum = 21
h2.PositiveBuckets = []int64{6, -3, -1, 2, 1, -4} // {6, 3, 2, 4, 5, 1}
hApp, _ := app.(*HistogramAppender)
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
require.Greater(t, len(pI), 0)
require.Greater(t, len(pBackwardI), 0)
require.Len(t, nI, 0)
require.Len(t, nBackwardI, 0)
require.True(t, ok)
}
{ // New histogram that has a counter reset while buckets are same.
h2 := h1.Copy()
h2.Sum = 23
h2.PositiveBuckets = []int64{6, -4, 1, -1, 2, 1, -4} // {6, 2, 3, 2, 4, 5, 1}
hApp, _ := app.(*HistogramAppender)
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
require.Len(t, pI, 0)
require.Len(t, nI, 0)
require.Len(t, pBackwardI, 0)
require.Len(t, nBackwardI, 0)
require.True(t, ok)
}
{ // New histogram that has a counter reset while new buckets were added.
h2 := h1.Copy()
h2.PositiveSpans = []histogram.Span{
{Offset: 0, Length: 3},
{Offset: 1, Length: 1},
{Offset: 1, Length: 4},
{Offset: 3, Length: 3},
}
h2.Sum = 29
h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 0} // {7, 5, 1, 3, 1, 0, 2, 5, 5, 0, 0}
hApp, _ := app.(*HistogramAppender)
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
require.Greater(t, len(pI), 0)
require.Len(t, nI, 0)
require.Len(t, pBackwardI, 0)
require.Len(t, nBackwardI, 0)
require.True(t, ok)
}
{
// New histogram that has a counter reset while new buckets were
// added before the first bucket and reset on first bucket.
h2 := h1.Copy()
h2.PositiveSpans = []histogram.Span{
{Offset: -3, Length: 2},
{Offset: 1, Length: 2},
{Offset: 2, Length: 1},
{Offset: 3, Length: 2},
{Offset: 3, Length: 1},
{Offset: 1, Length: 1},
}
h2.Sum = 26
h2.PositiveBuckets = []int64{1, 1, 3, -2, 0, -1, 2, 1, -4} // {1, 2, 5, 3, 3, 2, 4, 5, 1}
hApp, _ := app.(*HistogramAppender)
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
require.Greater(t, len(pI), 0)
require.Len(t, nI, 0)
require.Len(t, pBackwardI, 0)
require.Len(t, nBackwardI, 0)
require.True(t, ok)
}
}

View File

@ -2048,6 +2048,32 @@ func GenerateTestHistograms(n int) (r []*histogram.Histogram) {
return r
}
func GenerateTestGaugeHistograms(n int) (r []*histogram.Histogram) {
for x := 0; x < n; x++ {
i := rand.Intn(n)
r = append(r, &histogram.Histogram{
CounterResetHint: histogram.GaugeType,
Count: 10 + uint64(i*8),
ZeroCount: 2 + uint64(i),
ZeroThreshold: 0.001,
Sum: 18.4 * float64(i+1),
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
PositiveBuckets: []int64{int64(i + 1), 1, -1, 0},
NegativeSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
NegativeBuckets: []int64{int64(i + 1), 1, -1, 0},
})
}
return r
}
func GenerateTestFloatHistograms(n int) (r []*histogram.FloatHistogram) {
for i := 0; i < n; i++ {
r = append(r, &histogram.FloatHistogram{
@ -2072,7 +2098,7 @@ func GenerateTestFloatHistograms(n int) (r []*histogram.FloatHistogram) {
return r
}
func GenerateTestGaugeHistograms(n int) (r []*histogram.FloatHistogram) {
func GenerateTestGaugeFloatHistograms(n int) (r []*histogram.FloatHistogram) {
for x := 0; x < n; x++ {
i := rand.Intn(n)
r = append(r, &histogram.FloatHistogram{

View File

@ -1148,18 +1148,29 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
app, _ := s.app.(*chunkenc.HistogramAppender)
var (
positiveInterjections, negativeInterjections []chunkenc.Interjection
pBackwardInter, nBackwardInter []chunkenc.Interjection
pMergedSpans, nMergedSpans []histogram.Span
okToAppend, counterReset bool
)
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncHistogram, chunkDiskMapper, chunkRange)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
gauge := h.CounterResetHint == histogram.GaugeType
if app != nil {
positiveInterjections, negativeInterjections, okToAppend, counterReset = app.Appendable(h)
if gauge {
positiveInterjections, negativeInterjections, pBackwardInter, nBackwardInter, pMergedSpans, nMergedSpans, okToAppend = app.AppendableGauge(h)
} else {
positiveInterjections, negativeInterjections, okToAppend, counterReset = app.Appendable(h)
}
}
if !chunkCreated {
if len(pBackwardInter)+len(nBackwardInter) > 0 {
h.PositiveSpans = pMergedSpans
h.NegativeSpans = nMergedSpans
app.RecodeHistogramm(h, pBackwardInter, nBackwardInter)
}
// We have 3 cases here
// - !okToAppend -> We need to cut a new chunk.
// - okToAppend but we have interjections → Existing chunk needs
@ -1184,9 +1195,12 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
if chunkCreated {
hc := s.headChunk.chunk.(*chunkenc.HistogramChunk)
header := chunkenc.UnknownCounterReset
if counterReset {
switch {
case gauge:
header = chunkenc.GaugeType
case counterReset:
header = chunkenc.CounterReset
} else if okToAppend {
case okToAppend:
header = chunkenc.NotCounterReset
}
hc.SetCounterResetHeader(header)
@ -1265,11 +1279,12 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram,
if chunkCreated {
hc := s.headChunk.chunk.(*chunkenc.FloatHistogramChunk)
header := chunkenc.UnknownCounterReset
if gauge {
switch {
case gauge:
header = chunkenc.GaugeType
} else if counterReset {
case counterReset:
header = chunkenc.CounterReset
} else if okToAppend {
case okToAppend:
header = chunkenc.NotCounterReset
}
hc.SetCounterResetHeader(header)

View File

@ -2834,12 +2834,13 @@ func TestAppendHistogram(t *testing.T) {
ingestTs := int64(0)
app := head.Appender(context.Background())
// Integer histograms.
type timedHistogram struct {
t int64
h *histogram.Histogram
}
expHistograms := make([]timedHistogram, 0, numHistograms)
expHistograms := make([]timedHistogram, 0, 2*numHistograms)
// Counter integer histograms.
for _, h := range GenerateTestHistograms(numHistograms) {
_, err := app.AppendHistogram(0, l, ingestTs, h, nil)
require.NoError(t, err)
@ -2851,12 +2852,25 @@ func TestAppendHistogram(t *testing.T) {
}
}
// Gauge integer histograms.
for _, h := range GenerateTestGaugeHistograms(numHistograms) {
_, err := app.AppendHistogram(0, l, ingestTs, h, nil)
require.NoError(t, err)
expHistograms = append(expHistograms, timedHistogram{ingestTs, h})
ingestTs++
if ingestTs%50 == 0 {
require.NoError(t, app.Commit())
app = head.Appender(context.Background())
}
}
type timedFloatHistogram struct {
t int64
h *histogram.FloatHistogram
}
// Float counter histograms.
expFloatHistograms := make([]timedFloatHistogram, 0, numHistograms)
expFloatHistograms := make([]timedFloatHistogram, 0, 2*numHistograms)
// Counter float histograms.
for _, fh := range GenerateTestFloatHistograms(numHistograms) {
_, err := app.AppendHistogram(0, l, ingestTs, nil, fh)
require.NoError(t, err)
@ -2868,8 +2882,8 @@ func TestAppendHistogram(t *testing.T) {
}
}
// Float gauge histograms.
for _, fh := range GenerateTestGaugeHistograms(numHistograms) {
// Gauge float histograms.
for _, fh := range GenerateTestGaugeFloatHistograms(numHistograms) {
_, err := app.AppendHistogram(0, l, ingestTs, nil, fh)
require.NoError(t, err)
expFloatHistograms = append(expFloatHistograms, timedFloatHistogram{ingestTs, fh})
@ -2879,6 +2893,7 @@ func TestAppendHistogram(t *testing.T) {
app = head.Appender(context.Background())
}
}
require.NoError(t, app.Commit())
q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime())
@ -2913,7 +2928,7 @@ func TestAppendHistogram(t *testing.T) {
}
func TestHistogramInWALAndMmapChunk(t *testing.T) {
head, _ := newTestHead(t, 2000, false, false)
head, _ := newTestHead(t, 3000, false, false)
t.Cleanup(func() {
require.NoError(t, head.Close())
})
@ -2924,27 +2939,36 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) {
k1 := s1.String()
numHistograms := 300
exp := map[string][]tsdbutil.Sample{}
app := head.Appender(context.Background())
ts := int64(0)
for _, h := range GenerateTestHistograms(numHistograms) {
h.Count = h.Count * 2
h.NegativeSpans = h.PositiveSpans
h.NegativeBuckets = h.PositiveBuckets
_, err := app.AppendHistogram(0, s1, ts, h, nil)
require.NoError(t, err)
exp[k1] = append(exp[k1], sample{t: ts, h: h.Copy()})
ts++
if ts%5 == 0 {
require.NoError(t, app.Commit())
app = head.Appender(context.Background())
var app storage.Appender
for _, gauge := range []bool{true, false} {
app = head.Appender(context.Background())
var hists []*histogram.Histogram
if gauge {
hists = GenerateTestGaugeHistograms(numHistograms)
} else {
hists = GenerateTestHistograms(numHistograms)
}
for _, h := range hists {
h.Count = h.Count * 2
h.NegativeSpans = h.PositiveSpans
h.NegativeBuckets = h.PositiveBuckets
_, err := app.AppendHistogram(0, s1, ts, h, nil)
require.NoError(t, err)
exp[k1] = append(exp[k1], sample{t: ts, h: h.Copy()})
ts++
if ts%5 == 0 {
require.NoError(t, app.Commit())
app = head.Appender(context.Background())
}
}
require.NoError(t, app.Commit())
}
require.NoError(t, app.Commit())
for _, gauge := range []bool{true, false} {
app = head.Appender(context.Background())
var hists []*histogram.FloatHistogram
if gauge {
hists = GenerateTestGaugeHistograms(numHistograms)
hists = GenerateTestGaugeFloatHistograms(numHistograms)
} else {
hists = GenerateTestFloatHistograms(numHistograms)
}
@ -2964,10 +2988,10 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) {
require.NoError(t, app.Commit())
}
// There should be 7 mmap chunks in s1.
// There should be 11 mmap chunks in s1.
ms := head.series.getByHash(s1.Hash(), s1)
require.Len(t, ms.mmappedChunks, 8)
expMmapChunks := make([]*mmappedChunk, 0, 8)
require.Len(t, ms.mmappedChunks, 11)
expMmapChunks := make([]*mmappedChunk, 0, 11)
for _, mmap := range ms.mmappedChunks {
require.Greater(t, mmap.numSamples, uint16(0))
cpy := *mmap
@ -2979,36 +3003,44 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) {
// Series with mix of histograms and float.
s2 := labels.FromStrings("a", "b2")
k2 := s2.String()
app = head.Appender(context.Background())
ts = 0
for _, h := range GenerateTestHistograms(100) {
ts++
h.Count = h.Count * 2
h.NegativeSpans = h.PositiveSpans
h.NegativeBuckets = h.PositiveBuckets
_, err := app.AppendHistogram(0, s2, int64(ts), h, nil)
require.NoError(t, err)
exp[k2] = append(exp[k2], sample{t: int64(ts), h: h.Copy()})
if ts%20 == 0 {
require.NoError(t, app.Commit())
app = head.Appender(context.Background())
// Add some float.
for i := 0; i < 10; i++ {
ts++
_, err := app.Append(0, s2, int64(ts), float64(ts))
require.NoError(t, err)
exp[k2] = append(exp[k2], sample{t: int64(ts), v: float64(ts)})
}
require.NoError(t, app.Commit())
app = head.Appender(context.Background())
for _, gauge := range []bool{true, false} {
app = head.Appender(context.Background())
var hists []*histogram.Histogram
if gauge {
hists = GenerateTestGaugeHistograms(100)
} else {
hists = GenerateTestHistograms(100)
}
for _, h := range hists {
ts++
h.Count = h.Count * 2
h.NegativeSpans = h.PositiveSpans
h.NegativeBuckets = h.PositiveBuckets
_, err := app.AppendHistogram(0, s2, int64(ts), h, nil)
require.NoError(t, err)
exp[k2] = append(exp[k2], sample{t: int64(ts), h: h.Copy()})
if ts%20 == 0 {
require.NoError(t, app.Commit())
app = head.Appender(context.Background())
// Add some float.
for i := 0; i < 10; i++ {
ts++
_, err := app.Append(0, s2, int64(ts), float64(ts))
require.NoError(t, err)
exp[k2] = append(exp[k2], sample{t: int64(ts), v: float64(ts)})
}
require.NoError(t, app.Commit())
app = head.Appender(context.Background())
}
}
require.NoError(t, app.Commit())
}
require.NoError(t, app.Commit())
for _, gauge := range []bool{true, false} {
app = head.Appender(context.Background())
var hists []*histogram.FloatHistogram
if gauge {
hists = GenerateTestGaugeHistograms(100)
hists = GenerateTestGaugeFloatHistograms(100)
} else {
hists = GenerateTestFloatHistograms(100)
}
@ -4571,6 +4603,81 @@ func TestGaugeHistogramWALAndChunkHeader(t *testing.T) {
})
require.NoError(t, head.Init(0))
ts := int64(0)
appendHistogram := func(h *histogram.Histogram) {
ts++
app := head.Appender(context.Background())
_, err := app.AppendHistogram(0, l, ts, h.Copy(), nil)
require.NoError(t, err)
require.NoError(t, app.Commit())
}
hists := GenerateTestGaugeHistograms(5)
hists[0].CounterResetHint = histogram.UnknownCounterReset
appendHistogram(hists[0])
appendHistogram(hists[1])
appendHistogram(hists[2])
hists[3].CounterResetHint = histogram.UnknownCounterReset
appendHistogram(hists[3])
appendHistogram(hists[3])
appendHistogram(hists[4])
checkHeaders := func() {
ms, _, err := head.getOrCreate(l.Hash(), l)
require.NoError(t, err)
require.Len(t, ms.mmappedChunks, 3)
expHeaders := []chunkenc.CounterResetHeader{
chunkenc.UnknownCounterReset,
chunkenc.GaugeType,
chunkenc.UnknownCounterReset,
chunkenc.GaugeType,
}
for i, mmapChunk := range ms.mmappedChunks {
chk, err := head.chunkDiskMapper.Chunk(mmapChunk.ref)
require.NoError(t, err)
require.Equal(t, expHeaders[i], chk.(*chunkenc.HistogramChunk).GetCounterResetHeader())
}
require.Equal(t, expHeaders[len(expHeaders)-1], ms.headChunk.chunk.(*chunkenc.HistogramChunk).GetCounterResetHeader())
}
checkHeaders()
recs := readTestWAL(t, head.wal.Dir())
require.Equal(t, []interface{}{
[]record.RefSeries{
{
Ref: 1,
Labels: labels.FromStrings("a", "b"),
},
},
[]record.RefHistogramSample{{Ref: 1, T: 1, H: hists[0]}},
[]record.RefHistogramSample{{Ref: 1, T: 2, H: hists[1]}},
[]record.RefHistogramSample{{Ref: 1, T: 3, H: hists[2]}},
[]record.RefHistogramSample{{Ref: 1, T: 4, H: hists[3]}},
[]record.RefHistogramSample{{Ref: 1, T: 5, H: hists[3]}},
[]record.RefHistogramSample{{Ref: 1, T: 6, H: hists[4]}},
}, recs)
// Restart Head without mmap chunks to expect the WAL replay to recognize gauge histograms.
require.NoError(t, head.Close())
require.NoError(t, os.RemoveAll(mmappedChunksDir(head.opts.ChunkDirRoot)))
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false)
require.NoError(t, err)
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
require.NoError(t, err)
require.NoError(t, head.Init(0))
checkHeaders()
}
func TestGaugeFloatHistogramWALAndChunkHeader(t *testing.T) {
l := labels.FromStrings("a", "b")
head, _ := newTestHead(t, 1000, false, false)
t.Cleanup(func() {
require.NoError(t, head.Close())
})
require.NoError(t, head.Init(0))
ts := int64(0)
appendHistogram := func(h *histogram.FloatHistogram) {
ts++
@ -4580,7 +4687,7 @@ func TestGaugeHistogramWALAndChunkHeader(t *testing.T) {
require.NoError(t, app.Commit())
}
hists := GenerateTestGaugeHistograms(5)
hists := GenerateTestGaugeFloatHistograms(5)
hists[0].CounterResetHint = histogram.UnknownCounterReset
appendHistogram(hists[0])
appendHistogram(hists[1])