Cut a new chunk on counter resets for any bucket

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
Ganesh Vernekar 2021-08-17 22:04:45 +05:30
parent eedb86783e
commit c373200b75
No known key found for this signature in database
GPG Key ID: 0F8729A5EB59B965
2 changed files with 207 additions and 0 deletions

View File

@ -246,6 +246,7 @@ func (a *HistoAppender) Append(int64, float64) {}
// * the schema has changed // * the schema has changed
// * the zerobucket threshold has changed // * the zerobucket threshold has changed
// * any buckets disappeared // * any buckets disappeared
// * there was a counter reset in the count of observations or in any bucket, including the zero bucket
func (a *HistoAppender) Appendable(h histogram.SparseHistogram) ([]Interjection, []Interjection, bool) { func (a *HistoAppender) Appendable(h histogram.SparseHistogram) ([]Interjection, []Interjection, bool) {
if h.Schema != a.schema || h.ZeroThreshold != a.zeroThreshold { if h.Schema != a.schema || h.ZeroThreshold != a.zeroThreshold {
return nil, nil, false return nil, nil, false
@ -258,9 +259,89 @@ func (a *HistoAppender) Appendable(h histogram.SparseHistogram) ([]Interjection,
if !ok { if !ok {
return nil, nil, false return nil, nil, false
} }
if h.Count < a.cnt || h.ZeroCount < a.zcnt {
// There has been a counter reset.
return nil, nil, false
}
if counterResetInAnyBucket(a.posbuckets, h.PositiveBuckets, a.posSpans, h.PositiveSpans) {
return nil, nil, false
}
if counterResetInAnyBucket(a.negbuckets, h.NegativeBuckets, a.negSpans, h.NegativeSpans) {
return nil, nil, false
}
return posInterjections, negInterjections, ok return posInterjections, negInterjections, ok
} }
// counterResetInAnyBucket returns true if there was a counter reset for any bucket.
// This should be called only when buckets are same or new buckets were added,
// and does not handle the case of buckets missing.
func counterResetInAnyBucket(oldBuckets, newBuckets []int64, oldSpans, newSpans []histogram.Span) bool {
if len(oldSpans) == 0 || len(oldBuckets) == 0 {
return false
}
oldSpanSliceIdx, newSpanSliceIdx := 0, 0 // Index for the span slices.
oldInsideSpanIdx, newInsideSpanIdx := uint32(0), uint32(0) // Index inside a span.
oldIdx, newIdx := oldSpans[0].Offset, newSpans[0].Offset
oldBucketSliceIdx, newBucketSliceIdx := 0, 0 // Index inside bucket slice.
oldVal, newVal := oldBuckets[0], newBuckets[0]
// Since we assume that new spans won't have missing buckets, there will never be a case
// where the old index will not find a matching new index.
for {
if oldIdx == newIdx {
if newVal < oldVal {
return true
}
}
if oldIdx <= newIdx {
// Moving ahead old bucket and span by 1 index.
if oldInsideSpanIdx == oldSpans[oldSpanSliceIdx].Length-1 {
// Current span is over.
oldSpanSliceIdx++
oldInsideSpanIdx = 0
if oldSpanSliceIdx >= len(oldSpans) {
// All old spans are over.
break
}
oldIdx += 1 + oldSpans[oldSpanSliceIdx].Offset
} else {
oldInsideSpanIdx++
oldIdx++
}
oldBucketSliceIdx++
oldVal += oldBuckets[oldBucketSliceIdx]
}
if oldIdx > newIdx {
// Moving ahead new bucket and span by 1 index.
if newInsideSpanIdx == newSpans[newSpanSliceIdx].Length-1 {
// Current span is over.
newSpanSliceIdx++
newInsideSpanIdx = 0
if newSpanSliceIdx >= len(newSpans) {
// All new spans are over.
// This should not happen, old spans above should catch this first.
panic("new spans over before old spans in counterReset")
}
newIdx += 1 + newSpans[newSpanSliceIdx].Offset
} else {
newInsideSpanIdx++
newIdx++
}
newBucketSliceIdx++
newVal += newBuckets[newBucketSliceIdx]
}
}
return false
}
// AppendHistogram appends a SparseHistogram to the chunk. We assume the // AppendHistogram appends a SparseHistogram to the chunk. We assume the
// histogram is properly structured. E.g. that the number of pos/neg buckets // histogram is properly structured. E.g. that the number of pos/neg buckets
// used corresponds to the number conveyed by the pos/neg span structures. // used corresponds to the number conveyed by the pos/neg span structures.

View File

@ -193,3 +193,129 @@ func TestHistoChunkBucketChanges(t *testing.T) {
require.NoError(t, it.Err()) require.NoError(t, it.Err())
require.Equal(t, exp, act) require.Equal(t, exp, act)
} }
func TestHistoChunkAppendable(t *testing.T) {
c := Chunk(NewHistoChunk())
// Create fresh appender and add the first histogram.
app, err := c.Appender()
require.NoError(t, err)
require.Equal(t, 0, c.NumSamples())
ts := int64(1234567890)
h1 := histogram.SparseHistogram{
Count: 5,
ZeroCount: 2,
Sum: 18.4,
ZeroThreshold: 1e-125,
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 2, Length: 1},
{Offset: 3, Length: 2},
{Offset: 3, Length: 1},
{Offset: 1, Length: 1},
},
PositiveBuckets: []int64{6, -3, 0, -1, 2, 1, -4}, // counts: 6, 3, 3, 2, 4, 5, 1 (total 24)
NegativeSpans: nil,
NegativeBuckets: []int64{},
}
app.AppendHistogram(ts, h1)
require.Equal(t, 1, c.NumSamples())
{ // New histogram that has more buckets.
h2 := h1
h2.PositiveSpans = []histogram.Span{
{Offset: 0, Length: 3},
{Offset: 1, Length: 1},
{Offset: 1, Length: 4},
{Offset: 3, Length: 3},
}
h2.Count += 9
h2.ZeroCount++
h2.Sum = 30
// Existing histogram should get values converted from the above to: 6 3 0 3 0 0 2 4 5 0 1 (previous values with some new empty buckets in between)
// so the new histogram should have new counts >= these per-bucket counts, e.g.:
h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 1} // 7 5 1 3 1 0 2 5 5 0 1 (total 30)
histoApp, _ := app.(*HistoAppender)
posInterjections, negInterjections, ok := histoApp.Appendable(h2)
require.Greater(t, len(posInterjections), 0)
require.Equal(t, 0, len(negInterjections))
require.True(t, ok) // Only new buckets came in.
}
{ // New histogram that has a bucket missing.
h2 := h1
h2.PositiveSpans = []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 5, Length: 2},
{Offset: 3, Length: 1},
{Offset: 1, Length: 1},
}
h2.Sum = 21
h2.PositiveBuckets = []int64{6, -3, -1, 2, 1, -4} // counts: 6, 3, 2, 4, 5, 1 (total 21)
histoApp, _ := app.(*HistoAppender)
posInterjections, negInterjections, ok := histoApp.Appendable(h2)
require.Equal(t, 0, len(posInterjections))
require.Equal(t, 0, len(negInterjections))
require.False(t, ok) // Need to cut a new chunk.
}
{ // New histogram that has a counter reset while buckets are same.
h2 := h1
h2.Sum = 23
h2.PositiveBuckets = []int64{6, -4, 1, -1, 2, 1, -4} // counts: 6, 2, 3, 2, 4, 5, 1 (total 23)
histoApp, _ := app.(*HistoAppender)
posInterjections, negInterjections, ok := histoApp.Appendable(h2)
require.Equal(t, 0, len(posInterjections))
require.Equal(t, 0, len(negInterjections))
require.False(t, ok) // Need to cut a new chunk.
}
{ // New histogram that has a counter reset while new buckets were added.
h2 := h1
h2.PositiveSpans = []histogram.Span{
{Offset: 0, Length: 3},
{Offset: 1, Length: 1},
{Offset: 1, Length: 4},
{Offset: 3, Length: 3},
}
h2.Sum = 29
// Existing histogram should get values converted from the above to: 6 3 0 3 0 0 2 4 5 0 1 (previous values with some new empty buckets in between)
// so the new histogram should have new counts >= these per-bucket counts, e.g.:
h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 0} // 7 5 1 3 1 0 2 5 5 0 0 (total 29)
histoApp, _ := app.(*HistoAppender)
posInterjections, negInterjections, ok := histoApp.Appendable(h2)
require.Equal(t, 0, len(posInterjections))
require.Equal(t, 0, len(negInterjections))
require.False(t, ok) // Need to cut a new chunk.
}
{ // New histogram that has a counter reset while new buckets were added before the first bucket and reset on first bucket.
// (to catch the edge case where the new bucket should be forwarded ahead until first old bucket at start)
h2 := h1
h2.PositiveSpans = []histogram.Span{
{Offset: -3, Length: 2},
{Offset: 1, Length: 2},
{Offset: 2, Length: 1},
{Offset: 3, Length: 2},
{Offset: 3, Length: 1},
{Offset: 1, Length: 1},
}
h2.Sum = 26
// Existing histogram should get values converted from the above to: 0, 0, 6, 3, 3, 2, 4, 5, 1
// so the new histogram should have new counts >= these per-bucket counts, e.g.:
h2.PositiveBuckets = []int64{1, 1, 3, -2, 0, -1, 2, 1, -4} // counts: 1, 2, 5, 3, 3, 2, 4, 5, 1 (total 26)
histoApp, _ := app.(*HistoAppender)
posInterjections, negInterjections, ok := histoApp.Appendable(h2)
require.Equal(t, 0, len(posInterjections))
require.Equal(t, 0, len(negInterjections))
require.False(t, ok) // Need to cut a new chunk.
}
}