diff --git a/tsdb/chunkenc/histo.go b/tsdb/chunkenc/histo.go index 039bc4dd3..82700b8b4 100644 --- a/tsdb/chunkenc/histo.go +++ b/tsdb/chunkenc/histo.go @@ -246,6 +246,7 @@ func (a *HistoAppender) Append(int64, float64) {} // * the schema has changed // * the zerobucket threshold has changed // * any buckets disappeared +// * there was a counter reset in the count of observations or in any bucket, including the zero bucket func (a *HistoAppender) Appendable(h histogram.SparseHistogram) ([]Interjection, []Interjection, bool) { if h.Schema != a.schema || h.ZeroThreshold != a.zeroThreshold { return nil, nil, false @@ -258,9 +259,89 @@ func (a *HistoAppender) Appendable(h histogram.SparseHistogram) ([]Interjection, if !ok { return nil, nil, false } + + if h.Count < a.cnt || h.ZeroCount < a.zcnt { + // There has been a counter reset. + return nil, nil, false + } + + if counterResetInAnyBucket(a.posbuckets, h.PositiveBuckets, a.posSpans, h.PositiveSpans) { + return nil, nil, false + } + if counterResetInAnyBucket(a.negbuckets, h.NegativeBuckets, a.negSpans, h.NegativeSpans) { + return nil, nil, false + } + return posInterjections, negInterjections, ok } +// counterResetInAnyBucket returns true if there was a counter reset for any bucket. +// This should be called only when buckets are same or new buckets were added, +// and does not handle the case of buckets missing. +func counterResetInAnyBucket(oldBuckets, newBuckets []int64, oldSpans, newSpans []histogram.Span) bool { + if len(oldSpans) == 0 || len(oldBuckets) == 0 { + return false + } + + oldSpanSliceIdx, newSpanSliceIdx := 0, 0 // Index for the span slices. + oldInsideSpanIdx, newInsideSpanIdx := uint32(0), uint32(0) // Index inside a span. + oldIdx, newIdx := oldSpans[0].Offset, newSpans[0].Offset + + oldBucketSliceIdx, newBucketSliceIdx := 0, 0 // Index inside bucket slice. + oldVal, newVal := oldBuckets[0], newBuckets[0] + + // Since we assume that new spans won't have missing buckets, there will never be a case + // where the old index will not find a matching new index. + for { + if oldIdx == newIdx { + if newVal < oldVal { + return true + } + } + + if oldIdx <= newIdx { + // Moving ahead old bucket and span by 1 index. + if oldInsideSpanIdx == oldSpans[oldSpanSliceIdx].Length-1 { + // Current span is over. + oldSpanSliceIdx++ + oldInsideSpanIdx = 0 + if oldSpanSliceIdx >= len(oldSpans) { + // All old spans are over. + break + } + oldIdx += 1 + oldSpans[oldSpanSliceIdx].Offset + } else { + oldInsideSpanIdx++ + oldIdx++ + } + oldBucketSliceIdx++ + oldVal += oldBuckets[oldBucketSliceIdx] + } + + if oldIdx > newIdx { + // Moving ahead new bucket and span by 1 index. + if newInsideSpanIdx == newSpans[newSpanSliceIdx].Length-1 { + // Current span is over. + newSpanSliceIdx++ + newInsideSpanIdx = 0 + if newSpanSliceIdx >= len(newSpans) { + // All new spans are over. + // This should not happen, old spans above should catch this first. + panic("new spans over before old spans in counterReset") + } + newIdx += 1 + newSpans[newSpanSliceIdx].Offset + } else { + newInsideSpanIdx++ + newIdx++ + } + newBucketSliceIdx++ + newVal += newBuckets[newBucketSliceIdx] + } + } + + return false +} + // AppendHistogram appends a SparseHistogram to the chunk. We assume the // histogram is properly structured. E.g. that the number of pos/neg buckets // used corresponds to the number conveyed by the pos/neg span structures. diff --git a/tsdb/chunkenc/histo_test.go b/tsdb/chunkenc/histo_test.go index 5a6e657ca..7b9ba7bde 100644 --- a/tsdb/chunkenc/histo_test.go +++ b/tsdb/chunkenc/histo_test.go @@ -193,3 +193,129 @@ func TestHistoChunkBucketChanges(t *testing.T) { require.NoError(t, it.Err()) require.Equal(t, exp, act) } + +func TestHistoChunkAppendable(t *testing.T) { + c := Chunk(NewHistoChunk()) + + // Create fresh appender and add the first histogram. + app, err := c.Appender() + require.NoError(t, err) + require.Equal(t, 0, c.NumSamples()) + + ts := int64(1234567890) + h1 := histogram.SparseHistogram{ + Count: 5, + ZeroCount: 2, + Sum: 18.4, + ZeroThreshold: 1e-125, + Schema: 1, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 2, Length: 1}, + {Offset: 3, Length: 2}, + {Offset: 3, Length: 1}, + {Offset: 1, Length: 1}, + }, + PositiveBuckets: []int64{6, -3, 0, -1, 2, 1, -4}, // counts: 6, 3, 3, 2, 4, 5, 1 (total 24) + NegativeSpans: nil, + NegativeBuckets: []int64{}, + } + + app.AppendHistogram(ts, h1) + require.Equal(t, 1, c.NumSamples()) + + { // New histogram that has more buckets. + h2 := h1 + h2.PositiveSpans = []histogram.Span{ + {Offset: 0, Length: 3}, + {Offset: 1, Length: 1}, + {Offset: 1, Length: 4}, + {Offset: 3, Length: 3}, + } + h2.Count += 9 + h2.ZeroCount++ + h2.Sum = 30 + // Existing histogram should get values converted from the above to: 6 3 0 3 0 0 2 4 5 0 1 (previous values with some new empty buckets in between) + // so the new histogram should have new counts >= these per-bucket counts, e.g.: + h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 1} // 7 5 1 3 1 0 2 5 5 0 1 (total 30) + + histoApp, _ := app.(*HistoAppender) + posInterjections, negInterjections, ok := histoApp.Appendable(h2) + require.Greater(t, len(posInterjections), 0) + require.Equal(t, 0, len(negInterjections)) + require.True(t, ok) // Only new buckets came in. + } + + { // New histogram that has a bucket missing. + h2 := h1 + h2.PositiveSpans = []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 5, Length: 2}, + {Offset: 3, Length: 1}, + {Offset: 1, Length: 1}, + } + h2.Sum = 21 + h2.PositiveBuckets = []int64{6, -3, -1, 2, 1, -4} // counts: 6, 3, 2, 4, 5, 1 (total 21) + + histoApp, _ := app.(*HistoAppender) + posInterjections, negInterjections, ok := histoApp.Appendable(h2) + require.Equal(t, 0, len(posInterjections)) + require.Equal(t, 0, len(negInterjections)) + require.False(t, ok) // Need to cut a new chunk. + } + + { // New histogram that has a counter reset while buckets are same. + h2 := h1 + h2.Sum = 23 + h2.PositiveBuckets = []int64{6, -4, 1, -1, 2, 1, -4} // counts: 6, 2, 3, 2, 4, 5, 1 (total 23) + + histoApp, _ := app.(*HistoAppender) + posInterjections, negInterjections, ok := histoApp.Appendable(h2) + require.Equal(t, 0, len(posInterjections)) + require.Equal(t, 0, len(negInterjections)) + require.False(t, ok) // Need to cut a new chunk. + } + + { // New histogram that has a counter reset while new buckets were added. + h2 := h1 + h2.PositiveSpans = []histogram.Span{ + {Offset: 0, Length: 3}, + {Offset: 1, Length: 1}, + {Offset: 1, Length: 4}, + {Offset: 3, Length: 3}, + } + h2.Sum = 29 + // Existing histogram should get values converted from the above to: 6 3 0 3 0 0 2 4 5 0 1 (previous values with some new empty buckets in between) + // so the new histogram should have new counts >= these per-bucket counts, e.g.: + h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 0} // 7 5 1 3 1 0 2 5 5 0 0 (total 29) + + histoApp, _ := app.(*HistoAppender) + posInterjections, negInterjections, ok := histoApp.Appendable(h2) + require.Equal(t, 0, len(posInterjections)) + require.Equal(t, 0, len(negInterjections)) + require.False(t, ok) // Need to cut a new chunk. + } + + { // New histogram that has a counter reset while new buckets were added before the first bucket and reset on first bucket. + // (to catch the edge case where the new bucket should be forwarded ahead until first old bucket at start) + h2 := h1 + h2.PositiveSpans = []histogram.Span{ + {Offset: -3, Length: 2}, + {Offset: 1, Length: 2}, + {Offset: 2, Length: 1}, + {Offset: 3, Length: 2}, + {Offset: 3, Length: 1}, + {Offset: 1, Length: 1}, + } + h2.Sum = 26 + // Existing histogram should get values converted from the above to: 0, 0, 6, 3, 3, 2, 4, 5, 1 + // so the new histogram should have new counts >= these per-bucket counts, e.g.: + h2.PositiveBuckets = []int64{1, 1, 3, -2, 0, -1, 2, 1, -4} // counts: 1, 2, 5, 3, 3, 2, 4, 5, 1 (total 26) + + histoApp, _ := app.(*HistoAppender) + posInterjections, negInterjections, ok := histoApp.Appendable(h2) + require.Equal(t, 0, len(posInterjections)) + require.Equal(t, 0, len(negInterjections)) + require.False(t, ok) // Need to cut a new chunk. + } +}