tsdb: Add logic to determine appendable gauge float histograms

This is to check if a gauge histogram can be appended to the given chunk.
If not, it tells what changes to make to the chunk and the histogram
if possible.

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
Ganesh Vernekar 2023-01-04 15:30:06 +05:30
parent a87e7e9e33
commit d7f5129042
No known key found for this signature in database
GPG Key ID: F056451B52F1DC34
5 changed files with 375 additions and 28 deletions

View File

@ -215,14 +215,10 @@ func (a *FloatHistogramAppender) AppendHistogram(int64, *histogram.Histogram) {
// The chunk is not appendable in the following cases:
//
// • The schema has changed.
//
// • The threshold for the zero bucket has changed.
//
// • Any buckets have disappeared.
//
// • There was a counter reset in the count of observations or in any bucket,
// including the zero bucket.
//
// • The last sample in the chunk was stale while the current sample is not stale.
//
// The method returns an additional boolean set to true if it is not appendable
@ -260,12 +256,12 @@ func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) (
}
var ok bool
positiveInterjections, ok = compareSpans(a.pSpans, h.PositiveSpans)
positiveInterjections, ok = forwardCompareSpans(a.pSpans, h.PositiveSpans)
if !ok {
counterReset = true
return
}
negativeInterjections, ok = compareSpans(a.nSpans, h.NegativeSpans)
negativeInterjections, ok = forwardCompareSpans(a.nSpans, h.NegativeSpans)
if !ok {
counterReset = true
return
@ -281,6 +277,44 @@ func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) (
return
}
// AppendableGauge returns whether the chunk can be appended to, and if so
// whether:
// 1. Any recoding needs to happen to the chunk using the provided interjections
// (in case of any new buckets, positive or negative range, respectively).
// 2. Any recoding needs to happen for the histogram being appended, using the backward interjections
// (in case of any missing buckets, positive or negative range, respectively).
//
// The chunk is not appendable in the following cases:
//
// • The schema has changed.
// • The threshold for the zero bucket has changed.
// • The last sample in the chunk was stale while the current sample is not stale.
func (a *FloatHistogramAppender) AppendableGauge(h *histogram.FloatHistogram) (
positiveInterjections, negativeInterjections []Interjection,
backwardPositiveInterjections, backwardNegativeInterjections []Interjection,
okToAppend bool,
) {
if value.IsStaleNaN(h.Sum) {
// This is a stale sample whose buckets and spans don't matter.
okToAppend = true
return
}
if value.IsStaleNaN(a.sum.value) {
// If the last sample was stale, then we can only accept stale
// samples in this chunk.
return
}
if h.Schema != a.schema || h.ZeroThreshold != a.zThreshold {
return
}
positiveInterjections, backwardPositiveInterjections = bidirectionalCompareSpans(a.pSpans, h.PositiveSpans)
negativeInterjections, backwardNegativeInterjections = bidirectionalCompareSpans(a.nSpans, h.NegativeSpans)
okToAppend = true
return
}
// counterResetInAnyFloatBucket returns true if there was a counter reset for any
// bucket. This should be called only when the bucket layout is the same or new
// buckets were added. It does not handle the case of buckets missing.

View File

@ -357,3 +357,170 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
require.True(t, cr)
}
}
func TestFloatHistogramChunkAppendableGauge(t *testing.T) {
c := Chunk(NewFloatHistogramChunk())
// Create fresh appender and add the first histogram.
app, err := c.Appender()
require.NoError(t, err)
require.Equal(t, 0, c.NumSamples())
ts := int64(1234567890)
h1 := &histogram.FloatHistogram{
Count: 5,
ZeroCount: 2,
Sum: 18.4,
ZeroThreshold: 1e-125,
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 2, Length: 1},
{Offset: 3, Length: 2},
{Offset: 3, Length: 1},
{Offset: 1, Length: 1},
},
PositiveBuckets: []float64{6, 3, 3, 2, 4, 5, 1},
}
app.AppendFloatHistogram(ts, h1.Copy())
require.Equal(t, 1, c.NumSamples())
{ // Schema change.
h2 := h1.Copy()
h2.Schema++
hApp, _ := app.(*FloatHistogramAppender)
_, _, _, _, ok := hApp.AppendableGauge(h2)
require.False(t, ok)
}
{ // Zero threshold change.
h2 := h1.Copy()
h2.ZeroThreshold += 0.1
hApp, _ := app.(*FloatHistogramAppender)
_, _, _, _, ok := hApp.AppendableGauge(h2)
require.False(t, ok)
}
{ // New histogram that has more buckets.
h2 := h1.Copy()
h2.PositiveSpans = []histogram.Span{
{Offset: 0, Length: 3},
{Offset: 1, Length: 1},
{Offset: 1, Length: 4},
{Offset: 3, Length: 3},
}
h2.Count += 9
h2.ZeroCount++
h2.Sum = 30
h2.PositiveBuckets = []float64{7, 5, 1, 3, 1, 0, 2, 5, 5, 0, 1}
hApp, _ := app.(*FloatHistogramAppender)
pI, nI, pBackwardI, nBackwardI, ok := hApp.AppendableGauge(h2)
require.Greater(t, len(pI), 0)
require.Len(t, nI, 0)
require.Len(t, pBackwardI, 0)
require.Len(t, nBackwardI, 0)
require.True(t, ok)
}
{ // New histogram that has buckets missing.
h2 := h1.Copy()
h2.PositiveSpans = []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 2, Length: 1},
{Offset: 3, Length: 1},
{Offset: 4, Length: 1},
{Offset: 1, Length: 1},
}
h2.Count -= 4
h2.Sum--
h2.PositiveBuckets = []float64{6, 3, 3, 2, 5, 1}
hApp, _ := app.(*FloatHistogramAppender)
pI, nI, pBackwardI, nBackwardI, ok := hApp.AppendableGauge(h2)
require.Len(t, pI, 0)
require.Len(t, nI, 0)
require.Greater(t, len(pBackwardI), 0)
require.Len(t, nBackwardI, 0)
require.True(t, ok)
}
{ // New histogram that has a bucket missing and new buckets.
h2 := h1.Copy()
h2.PositiveSpans = []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 5, Length: 2},
{Offset: 3, Length: 1},
{Offset: 1, Length: 1},
}
h2.Sum = 21
h2.PositiveBuckets = []float64{6, 3, 2, 4, 5, 1}
hApp, _ := app.(*FloatHistogramAppender)
pI, nI, pBackwardI, nBackwardI, ok := hApp.AppendableGauge(h2)
require.Greater(t, len(pI), 0)
require.Greater(t, len(pBackwardI), 0)
require.Len(t, nI, 0)
require.Len(t, nBackwardI, 0)
require.True(t, ok)
}
{ // New histogram that has a counter reset while buckets are same.
h2 := h1.Copy()
h2.Sum = 23
h2.PositiveBuckets = []float64{6, 2, 3, 2, 4, 5, 1}
hApp, _ := app.(*FloatHistogramAppender)
pI, nI, pBackwardI, nBackwardI, ok := hApp.AppendableGauge(h2)
require.Len(t, pI, 0)
require.Len(t, nI, 0)
require.Len(t, pBackwardI, 0)
require.Len(t, nBackwardI, 0)
require.True(t, ok)
}
{ // New histogram that has a counter reset while new buckets were added.
h2 := h1.Copy()
h2.PositiveSpans = []histogram.Span{
{Offset: 0, Length: 3},
{Offset: 1, Length: 1},
{Offset: 1, Length: 4},
{Offset: 3, Length: 3},
}
h2.Sum = 29
h2.PositiveBuckets = []float64{7, 5, 1, 3, 1, 0, 2, 5, 5, 0, 0}
hApp, _ := app.(*FloatHistogramAppender)
pI, nI, pBackwardI, nBackwardI, ok := hApp.AppendableGauge(h2)
require.Greater(t, len(pI), 0)
require.Len(t, nI, 0)
require.Len(t, pBackwardI, 0)
require.Len(t, nBackwardI, 0)
require.True(t, ok)
}
{
// New histogram that has a counter reset while new buckets were
// added before the first bucket and reset on first bucket.
h2 := h1.Copy()
h2.PositiveSpans = []histogram.Span{
{Offset: -3, Length: 2},
{Offset: 1, Length: 2},
{Offset: 2, Length: 1},
{Offset: 3, Length: 2},
{Offset: 3, Length: 1},
{Offset: 1, Length: 1},
}
h2.Sum = 26
h2.PositiveBuckets = []float64{1, 2, 5, 3, 3, 2, 4, 5, 1}
hApp, _ := app.(*FloatHistogramAppender)
pI, nI, pBackwardI, nBackwardI, ok := hApp.AppendableGauge(h2)
require.Greater(t, len(pI), 0)
require.Len(t, nI, 0)
require.Len(t, pBackwardI, 0)
require.Len(t, nBackwardI, 0)
require.True(t, ok)
}
}

View File

@ -286,12 +286,12 @@ func (a *HistogramAppender) Appendable(h *histogram.Histogram) (
}
var ok bool
positiveInterjections, ok = compareSpans(a.pSpans, h.PositiveSpans)
positiveInterjections, ok = forwardCompareSpans(a.pSpans, h.PositiveSpans)
if !ok {
counterReset = true
return
}
negativeInterjections, ok = compareSpans(a.nSpans, h.NegativeSpans)
negativeInterjections, ok = forwardCompareSpans(a.nSpans, h.NegativeSpans)
if !ok {
counterReset = true
return

View File

@ -191,7 +191,7 @@ type Interjection struct {
num int
}
// compareSpans returns the interjections to convert a slice of deltas to a new
// forwardCompareSpans returns the interjections to convert a slice of deltas to a new
// slice representing an expanded set of buckets, or false if incompatible
// (e.g. if buckets were removed).
//
@ -226,11 +226,11 @@ type Interjection struct {
// match a new span layout that adds buckets, we simply need to generate a list
// of interjections.
//
// Note: Within compareSpans we don't have to worry about the changes to the
// Note: Within forwardCompareSpans we don't have to worry about the changes to the
// spans themselves, thanks to the iterators we get to work with the more useful
// bucket indices (which of course directly correspond to the buckets we have to
// adjust).
func compareSpans(a, b []histogram.Span) ([]Interjection, bool) {
func forwardCompareSpans(a, b []histogram.Span) (forward []Interjection, ok bool) {
ai := newBucketIterator(a)
bi := newBucketIterator(b)
@ -278,6 +278,80 @@ loop:
return interjections, true
}
// bidirectionalCompareSpans does everything that forwardCompareSpans does and
// also returns interjections in the other direction (i.e. buckets missing in b that are missing in a).
func bidirectionalCompareSpans(a, b []histogram.Span) (forward, backward []Interjection) {
ai := newBucketIterator(a)
bi := newBucketIterator(b)
var interjections, bInterjections []Interjection
// When inter.num becomes > 0, this becomes a valid interjection that
// should be yielded when we finish a streak of new buckets.
var inter, bInter Interjection
av, aOK := ai.Next()
bv, bOK := bi.Next()
loop:
for {
switch {
case aOK && bOK:
switch {
case av == bv: // Both have an identical value. move on!
// Finish WIP interjection and reset.
if inter.num > 0 {
interjections = append(interjections, inter)
inter.num = 0
}
if bInter.num > 0 {
bInterjections = append(bInterjections, bInter)
bInter.num = 0
}
av, aOK = ai.Next()
bv, bOK = bi.Next()
inter.pos++
bInter.pos++
case av < bv: // b misses a value that is in a.
bInter.num++
// Collect the forward interjection before advancing the
// position of 'a'.
if inter.num > 0 {
interjections = append(interjections, inter)
inter.num = 0
}
inter.pos++
av, aOK = ai.Next()
case av > bv: // a misses a value that is in b. Forward b and recompare.
inter.num++
// Collect the backward interjection before advancing the
// position of 'b'.
if bInter.num > 0 {
bInterjections = append(bInterjections, bInter)
bInter.num = 0
}
bInter.pos++
bv, bOK = bi.Next()
}
case aOK && !bOK: // b misses a value that is in a.
bInter.num++
av, aOK = ai.Next()
case !aOK && bOK: // a misses a value that is in b. Forward b and recompare.
inter.num++
bv, bOK = bi.Next()
default: // Both iterators ran out. We're done.
if inter.num > 0 {
interjections = append(interjections, inter)
}
if bInter.num > 0 {
bInterjections = append(bInterjections, bInter)
}
break loop
}
}
return interjections, bInterjections
}
// interject merges 'in' with the provided interjections and writes them into
// 'out', which must already have the appropriate length.
func interject[BV bucketValue](in, out []BV, interjections []Interjection, deltas bool) []BV {

View File

@ -111,13 +111,12 @@ func TestBucketIterator(t *testing.T) {
}
}
func TestInterjection(t *testing.T) {
func TestCompareSpansAndInterject(t *testing.T) {
scenarios := []struct {
description string
spansA, spansB []histogram.Span
valid bool
interjections []Interjection
bucketsIn, bucketsOut []int64
description string
spansA, spansB []histogram.Span
interjections, backwardInterjections []Interjection
bucketsIn, bucketsOut []int64
}{
{
description: "single prepend at the beginning",
@ -127,7 +126,6 @@ func TestInterjection(t *testing.T) {
spansB: []histogram.Span{
{Offset: -11, Length: 4},
},
valid: true,
interjections: []Interjection{
{
pos: 0,
@ -145,7 +143,6 @@ func TestInterjection(t *testing.T) {
spansB: []histogram.Span{
{Offset: -10, Length: 4},
},
valid: true,
interjections: []Interjection{
{
pos: 3,
@ -163,7 +160,6 @@ func TestInterjection(t *testing.T) {
spansB: []histogram.Span{
{Offset: -12, Length: 5},
},
valid: true,
interjections: []Interjection{
{
pos: 0,
@ -181,7 +177,6 @@ func TestInterjection(t *testing.T) {
spansB: []histogram.Span{
{Offset: -10, Length: 5},
},
valid: true,
interjections: []Interjection{
{
pos: 3,
@ -199,7 +194,6 @@ func TestInterjection(t *testing.T) {
spansB: []histogram.Span{
{Offset: -12, Length: 7},
},
valid: true,
interjections: []Interjection{
{
pos: 0,
@ -221,7 +215,9 @@ func TestInterjection(t *testing.T) {
spansB: []histogram.Span{
{Offset: -9, Length: 3},
},
valid: false,
backwardInterjections: []Interjection{
{pos: 0, num: 1},
},
},
{
description: "single removal of bucket in the middle",
@ -232,7 +228,9 @@ func TestInterjection(t *testing.T) {
{Offset: -10, Length: 2},
{Offset: 1, Length: 1},
},
valid: false,
backwardInterjections: []Interjection{
{pos: 2, num: 1},
},
},
{
description: "single removal of bucket at the end",
@ -242,7 +240,9 @@ func TestInterjection(t *testing.T) {
spansB: []histogram.Span{
{Offset: -10, Length: 3},
},
valid: false,
backwardInterjections: []Interjection{
{pos: 3, num: 1},
},
},
{
description: "as described in doc comment",
@ -259,7 +259,6 @@ func TestInterjection(t *testing.T) {
{Offset: 1, Length: 4},
{Offset: 3, Length: 3},
},
valid: true,
interjections: []Interjection{
{
pos: 2,
@ -277,12 +276,67 @@ func TestInterjection(t *testing.T) {
bucketsIn: []int64{6, -3, 0, -1, 2, 1, -4},
bucketsOut: []int64{6, -3, -3, 3, -3, 0, 2, 2, 1, -5, 1},
},
{
description: "both forward and backward interjections, complex case",
spansA: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 2, Length: 1},
{Offset: 3, Length: 2},
{Offset: 3, Length: 1},
{Offset: 1, Length: 1},
},
spansB: []histogram.Span{
{Offset: 1, Length: 2},
{Offset: 1, Length: 1},
{Offset: 1, Length: 2},
{Offset: 1, Length: 1},
{Offset: 4, Length: 1},
},
interjections: []Interjection{
{
pos: 2,
num: 1,
},
{
pos: 3,
num: 2,
},
{
pos: 6,
num: 1,
},
},
backwardInterjections: []Interjection{
{
pos: 0,
num: 1,
},
{
pos: 5,
num: 1,
},
{
pos: 6,
num: 1,
},
{
pos: 7,
num: 1,
},
},
},
}
for _, s := range scenarios {
t.Run(s.description, func(t *testing.T) {
interjections, valid := compareSpans(s.spansA, s.spansB)
if !s.valid {
if len(s.backwardInterjections) > 0 {
interjections, bInterjections := bidirectionalCompareSpans(s.spansA, s.spansB)
require.Equal(t, s.interjections, interjections)
require.Equal(t, s.backwardInterjections, bInterjections)
}
interjections, valid := forwardCompareSpans(s.spansA, s.spansB)
if len(s.backwardInterjections) > 0 {
require.False(t, valid, "compareScan unexpectedly returned true")
return
}
@ -292,6 +346,24 @@ func TestInterjection(t *testing.T) {
gotBuckets := make([]int64, len(s.bucketsOut))
interject(s.bucketsIn, gotBuckets, interjections, true)
require.Equal(t, s.bucketsOut, gotBuckets)
floatBucketsIn := make([]float64, len(s.bucketsIn))
last := s.bucketsIn[0]
floatBucketsIn[0] = float64(last)
for i := 1; i < len(floatBucketsIn); i++ {
last += s.bucketsIn[i]
floatBucketsIn[i] = float64(last)
}
floatBucketsOut := make([]float64, len(s.bucketsOut))
last = s.bucketsOut[0]
floatBucketsOut[0] = float64(last)
for i := 1; i < len(floatBucketsOut); i++ {
last += s.bucketsOut[i]
floatBucketsOut[i] = float64(last)
}
gotFloatBuckets := make([]float64, len(floatBucketsOut))
interject(floatBucketsIn, gotFloatBuckets, interjections, false)
require.Equal(t, floatBucketsOut, gotFloatBuckets)
})
}
}