From 0e1b9dd308b5cef8aaaa161114ad4855209c1cbb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bj=C3=B6rn=20Rabenstein?=
Date: Mon, 6 Dec 2021 13:49:18 +0100
Subject: [PATCH] Promql: Initial rate implementation for sparse histograms (#9926)

Signed-off-by: beorn7
---
 model/histogram/float_histogram.go      | 289 +++++++++
 model/histogram/float_histogram_test.go | 772 ++++++++++++++++++++++++
 promql/engine_test.go                   |  25 +-
 promql/functions.go                     |  92 ++-
 4 files changed, 1160 insertions(+), 18 deletions(-)
 create mode 100644 model/histogram/float_histogram_test.go

diff --git a/model/histogram/float_histogram.go b/model/histogram/float_histogram.go
index 576f8a6e9..a1cfebdd5 100644
--- a/model/histogram/float_histogram.go
+++ b/model/histogram/float_histogram.go
@@ -115,6 +115,295 @@ func (h *FloatHistogram) ZeroBucket() FloatBucket {
 	}
 }
 
+// Scale scales the FloatHistogram by the provided factor, i.e. it scales all
+// bucket counts including the zero bucket and the count and the sum of
+// observations. The bucket layout stays the same. This method changes the
+// receiving histogram directly (rather than acting on a copy). It returns a
+// pointer to the receiving histogram for convenience.
+func (h *FloatHistogram) Scale(factor float64) *FloatHistogram {
+	h.ZeroCount *= factor
+	h.Count *= factor
+	h.Sum *= factor
+	for i := range h.PositiveBuckets {
+		h.PositiveBuckets[i] *= factor
+	}
+	for i := range h.NegativeBuckets {
+		h.NegativeBuckets[i] *= factor
+	}
+	return h
+}
+
+// Add adds the provided other histogram to the receiving histogram. Count, Sum,
+// and buckets from the other histogram are added to the corresponding
+// components of the receiving histogram. Buckets in the other histogram that do
+// not exist in the receiving histogram are inserted into the latter. The
+// resulting histogram might have buckets with a population of zero or directly
+// adjacent spans (offset=0). To normalize those, call the Compact method.
+//
+// This method returns a pointer to the receiving histogram for convenience.
+//
+// IMPORTANT: This method requires the Schema and the ZeroThreshold to be the
+// same in both histograms. Otherwise, its behavior is undefined.
+// TODO(beorn7): Change that!
+func (h *FloatHistogram) Add(other *FloatHistogram) *FloatHistogram {
+	h.ZeroCount += other.ZeroCount
+	h.Count += other.Count
+	h.Sum += other.Sum
+
+	// TODO(beorn7): If needed, this can be optimized by inspecting the
+	// spans in other and create missing buckets in h in batches.
+	iSpan, iBucket := -1, -1
+	var iInSpan, index int32
+	for it := other.PositiveBucketIterator(); it.Next(); {
+		b := it.At()
+		h.PositiveSpans, h.PositiveBuckets, iSpan, iBucket, iInSpan = addBucket(
+			b, h.PositiveSpans, h.PositiveBuckets, iSpan, iBucket, iInSpan, index,
+		)
+		index = b.Index
+	}
+	iSpan, iBucket = -1, -1
+	for it := other.NegativeBucketIterator(); it.Next(); {
+		b := it.At()
+		h.NegativeSpans, h.NegativeBuckets, iSpan, iBucket, iInSpan = addBucket(
+			b, h.NegativeSpans, h.NegativeBuckets, iSpan, iBucket, iInSpan, index,
+		)
+		index = b.Index
+	}
+	return h
+}
+
+// Sub works like Add but subtracts the other histogram.
+//
+// IMPORTANT: This method requires the Schema and the ZeroThreshold to be the
+// same in both histograms. Otherwise, its behavior is undefined.
+// TODO(beorn7): Change that!
+func (h *FloatHistogram) Sub(other *FloatHistogram) *FloatHistogram {
+	h.ZeroCount -= other.ZeroCount
+	h.Count -= other.Count
+	h.Sum -= other.Sum
+
+	// TODO(beorn7): If needed, this can be optimized by inspecting the
+	// spans in other and create missing buckets in h in batches.
+	iSpan, iBucket := -1, -1
+	var iInSpan, index int32
+	for it := other.PositiveBucketIterator(); it.Next(); {
+		b := it.At()
+		b.Count *= -1
+		h.PositiveSpans, h.PositiveBuckets, iSpan, iBucket, iInSpan = addBucket(
+			b, h.PositiveSpans, h.PositiveBuckets, iSpan, iBucket, iInSpan, index,
+		)
+		index = b.Index
+	}
+	iSpan, iBucket = -1, -1
+	for it := other.NegativeBucketIterator(); it.Next(); {
+		b := it.At()
+		b.Count *= -1
+		h.NegativeSpans, h.NegativeBuckets, iSpan, iBucket, iInSpan = addBucket(
+			b, h.NegativeSpans, h.NegativeBuckets, iSpan, iBucket, iInSpan, index,
+		)
+		index = b.Index
+	}
+	return h
+}
+
+// addBucket takes the "coordinates" of the last bucket that was handled and
+// adds the provided bucket after it. If a corresponding bucket exists, the
+// count is added. If not, the bucket is inserted. The updated slices and the
+// coordinates of the inserted or added-to bucket are returned.
+func addBucket(
+	b FloatBucket,
+	spans []Span, buckets []float64,
+	iSpan, iBucket int,
+	iInSpan, index int32,
+) (
+	newSpans []Span, newBuckets []float64,
+	newISpan, newIBucket int, newIInSpan int32,
+) {
+	if iSpan == -1 {
+		// First add, check if it is before all spans.
+		if len(spans) == 0 || spans[0].Offset > b.Index {
+			// Add bucket before all others (spans may still be empty here).
+			buckets = append(buckets, 0)
+			copy(buckets[1:], buckets)
+			buckets[0] = b.Count
+			if len(spans) > 0 && spans[0].Offset == b.Index+1 {
+				spans[0].Length++
+				spans[0].Offset--
+				return spans, buckets, 0, 0, 0
+			}
+			spans = append(spans, Span{})
+			copy(spans[1:], spans)
+			spans[0] = Span{Offset: b.Index, Length: 1}
+			if len(spans) > 1 {
+				// Convert the absolute offset in the formerly
+				// first span to a relative offset.
+				spans[1].Offset -= b.Index + 1
+			}
+			return spans, buckets, 0, 0, 0
+		}
+		if spans[0].Offset == b.Index {
+			// Just add to first bucket.
+			buckets[0] += b.Count
+			return spans, buckets, 0, 0, 0
+		}
+		// We are behind the first bucket, so set everything to the
+		// first bucket and continue normally.
+		iSpan, iBucket, iInSpan = 0, 0, 0
+		index = spans[0].Offset
+	}
+	deltaIndex := b.Index - index
+	for {
+		remainingInSpan := int32(spans[iSpan].Length) - iInSpan
+		if deltaIndex < remainingInSpan {
+			// Bucket is in current span.
+			iBucket += int(deltaIndex)
+			iInSpan += deltaIndex
+			buckets[iBucket] += b.Count
+			return spans, buckets, iSpan, iBucket, iInSpan
+		}
+		deltaIndex -= remainingInSpan
+		iBucket += int(remainingInSpan)
+		iSpan++
+		if iSpan == len(spans) || deltaIndex < spans[iSpan].Offset {
+			// Bucket is in gap behind previous span (or there are no further spans).
+			buckets = append(buckets, 0)
+			copy(buckets[iBucket+1:], buckets[iBucket:])
+			buckets[iBucket] = b.Count
+			if deltaIndex == 0 {
+				// Directly after previous span, extend previous span.
+				if iSpan < len(spans) {
+					spans[iSpan].Offset--
+				}
+				iSpan--
+				iInSpan = int32(spans[iSpan].Length)
+				spans[iSpan].Length++
+				return spans, buckets, iSpan, iBucket, iInSpan
+			}
+			if iSpan < len(spans) && deltaIndex == spans[iSpan].Offset-1 {
+				// Directly before next span, extend next span.
+				iInSpan = 0
+				spans[iSpan].Offset--
+				spans[iSpan].Length++
+				return spans, buckets, iSpan, iBucket, iInSpan
+			}
+			// No next span, or next span is not directly adjacent to new bucket.
+			// Add new span.
+			iInSpan = 0
+			if iSpan < len(spans) {
+				spans[iSpan].Offset -= deltaIndex + 1
+			}
+			spans = append(spans, Span{})
+			copy(spans[iSpan+1:], spans[iSpan:])
+			spans[iSpan] = Span{Length: 1, Offset: deltaIndex}
+			return spans, buckets, iSpan, iBucket, iInSpan
+		}
+		// Try start of next span.
+		deltaIndex -= spans[iSpan].Offset
+		iInSpan = 0
+	}
+}
+
+// Compact eliminates empty buckets at the beginning and end of each span, then
+// merges spans that are consecutive or at most maxEmptyBuckets apart, and
+// finally splits spans that contain more than maxEmptyBuckets. The compaction
+// happens "in place" in the receiving histogram, but a pointer to it is
+// returned for convenience.
+func (h *FloatHistogram) Compact(maxEmptyBuckets int) *FloatHistogram {
+	// TODO(beorn7): Implement.
+	return h
+}
+
+// DetectReset returns true if the receiving histogram is missing any buckets
+// that have a non-zero population in the provided previous histogram. It also
+// returns true if any count (in any bucket, in the zero count, or in the count
+// of observations, but NOT the sum of observations) is smaller in the receiving
+// histogram compared to the previous histogram. Otherwise, it returns false.
+//
+// IMPORTANT: This method requires the Schema and the ZeroThreshold to be the
+// same in both histograms. Otherwise, its behavior is undefined.
+// TODO(beorn7): Change that!
+//
+// Note that this kind of reset detection is quite expensive. Ideally, resets
+// are detected at ingest time and stored in the TSDB, so that the reset
+// information can be read directly from there rather than be detected each time
+// again.
+func (h *FloatHistogram) DetectReset(previous *FloatHistogram) bool {
+	if h.Count < previous.Count || h.ZeroCount < previous.ZeroCount {
+		return true
+	}
+	currIt := h.PositiveBucketIterator()
+	prevIt := previous.PositiveBucketIterator()
+	if detectReset(currIt, prevIt) {
+		return true
+	}
+	currIt = h.NegativeBucketIterator()
+	prevIt = previous.NegativeBucketIterator()
+	return detectReset(currIt, prevIt)
+}
+
+// detectReset is the bucket-level part of DetectReset. It reports whether any
+// populated bucket in prevIt is missing from currIt or has a higher count
+// than its counterpart in currIt. Both iterators must be freshly created.
+func detectReset(currIt, prevIt FloatBucketIterator) bool {
+	if !prevIt.Next() {
+		return false // If no buckets in previous histogram, nothing can be reset.
+	}
+	prevBucket := prevIt.At()
+	if !currIt.Next() {
+		// No bucket in current, but at least one in previous
+		// histogram. Check if any of those are non-zero, in which case
+		// this is a reset.
+		for prevBucket.Count == 0 {
+			if !prevIt.Next() {
+				return false
+			}
+			// Re-read the bucket, so that the next check sees a new one.
+			prevBucket = prevIt.At()
+		}
+		return true
+	}
+	currBucket := currIt.At()
+	for {
+		// Forward currIt until we find the bucket corresponding to prevBucket.
+		for currBucket.Index < prevBucket.Index {
+			if !currIt.Next() {
+				// Reached end of currIt early, therefore
+				// previous histogram has a bucket that the
+				// current one does not have. Unless all
+				// remaining buckets in the previous histogram
+				// are unpopulated, this is a reset.
+				for prevBucket.Count == 0 {
+					if !prevIt.Next() {
+						return false
+					}
+					// Re-read the bucket, so that the next check sees a new one.
+					prevBucket = prevIt.At()
+				}
+				return true
+			}
+			currBucket = currIt.At()
+		}
+		if currBucket.Index > prevBucket.Index {
+			// Previous histogram has a bucket the current one does
+			// not have. If it's populated, it's a reset.
+			if prevBucket.Count != 0 {
+				return true
+			}
+		} else {
+			// We have reached corresponding buckets in both iterators.
+			// We can finally compare the counts.
+			if currBucket.Count < prevBucket.Count {
+				return true
+			}
+		}
+		if !prevIt.Next() {
+			// Reached end of prevIt without finding offending buckets.
+			return false
+		}
+		prevBucket = prevIt.At()
+	}
+}
+
 // PositiveBucketIterator returns a FloatBucketIterator to iterate over all
 // positive buckets in ascending order (starting next to the zero bucket and
 // going up).
diff --git a/model/histogram/float_histogram_test.go b/model/histogram/float_histogram_test.go
new file mode 100644
index 000000000..b372685d8
--- /dev/null
+++ b/model/histogram/float_histogram_test.go
@@ -0,0 +1,772 @@
+// Copyright 2021 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package histogram + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestFloatHistogramScale(t *testing.T) { + cases := []struct { + name string + in *FloatHistogram + scale float64 + expected *FloatHistogram + }{ + { + "zero value", + &FloatHistogram{}, + 3.1415, + &FloatHistogram{}, + }, + { + "no-op", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 1}, {2, 3}}, + PositiveBuckets: []float64{1, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + 1, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 1}, {2, 3}}, + PositiveBuckets: []float64{1, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + }, + { + "double", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 1}, {2, 3}}, + PositiveBuckets: []float64{1, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + 2, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 11, + Count: 3493.3 * 2, + Sum: 2349209.324 * 2, + PositiveSpans: []Span{{-2, 1}, {2, 3}}, + PositiveBuckets: []float64{2, 6.6, 8.4, 0.2}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{6.2, 6, 1.234e5 * 2, 2000}, + }, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + require.Equal(t, c.expected, c.in.Scale(c.scale)) + // Has it also happened in-place? 
+ require.Equal(t, c.expected, c.in) + }) + } +} + +func TestFloatHistogramDetectReset(t *testing.T) { + cases := []struct { + name string + previous, current *FloatHistogram + resetExpected bool + }{ + { + "zero values", + &FloatHistogram{}, + &FloatHistogram{}, + false, + }, + { + "no buckets to some buckets", + &FloatHistogram{}, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 1}, {2, 3}}, + PositiveBuckets: []float64{1, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + false, + }, + { + "some buckets to no buckets", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 1}, {2, 3}}, + PositiveBuckets: []float64{1, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + &FloatHistogram{}, + true, + }, + { + "one bucket appears, nothing else changes", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 1}, {2, 3}}, + PositiveBuckets: []float64{1, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 1.23, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + false, + }, + { + "one bucket disappears, nothing else changes", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 1.23, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + &FloatHistogram{ + 
ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 1}, {2, 3}}, + PositiveBuckets: []float64{1, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + true, + }, + { + "an unpopulated bucket disappears, nothing else changes", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 1}, {2, 3}}, + PositiveBuckets: []float64{1, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + false, + }, + { + "an unpopulated bucket at the end disappears, nothing else changes", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 1}, {1, 3}}, + PositiveBuckets: []float64{1, 3.3, 4.2, 0}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 1}, {1, 2}}, + PositiveBuckets: []float64{1, 3.3, 4.2}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + false, + }, + { + "an unpopulated bucket disappears in a histogram with nothing else", + &FloatHistogram{ + PositiveSpans: []Span{{23, 1}}, + PositiveBuckets: []float64{0}, + }, + &FloatHistogram{}, + false, + }, + { + "zero count goes up", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 0, 3.3, 4.2, 
0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.6, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + false, + }, + { + "zero count goes down", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.4, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + true, + }, + { + "count goes up", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.4, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + false, + }, + { + "count goes down", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, 
+ ZeroCount: 5.5, + Count: 3493.2, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + true, + }, + { + "sum goes up", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349210, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + false, + }, + { + "sum goes down", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349200, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + false, + }, + { + "one positive bucket goes up", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 0, 3.3, 4.3, 0.1}, + 
NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + false, + }, + { + "one positive bucket goes down", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 0, 3.3, 4.1, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + true, + }, + { + "one negative bucket goes up", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3.1, 1.234e5, 1000}, + }, + false, + }, + { + "one negative bucket goes down", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 5.5, + Count: 3493.3, + Sum: 2349209.324, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3.1, 2.9, 1.234e5, 1000}, + }, + true, 
+ }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + require.Equal(t, c.resetExpected, c.current.DetectReset(c.previous)) + }) + } +} + +func TestFloatHistogramCompact(t *testing.T) { + cases := []struct { + name string + in *FloatHistogram + maxEmptyBuckets int + expected *FloatHistogram + }{ + // TODO(beorn7): Add test cases. + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + require.Equal(t, c.expected, c.in.Compact(c.maxEmptyBuckets)) + // Has it also happened in-place? + require.Equal(t, c.expected, c.in) + }) + } +} + +func TestFloatHistogramAdd(t *testing.T) { + cases := []struct { + name string + in1, in2, expected *FloatHistogram + }{ + { + "same bucket layout", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 11, + Count: 30, + Sum: 2.345, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 0, 3, 4, 7}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3, 1, 5, 6}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 8, + Count: 21, + Sum: 1.234, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{0, 0, 2, 3, 6}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{1, 1, 4, 4}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 19, + Count: 51, + Sum: 3.579, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 0, 5, 7, 13}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{4, 2, 9, 10}, + }, + }, + { + "same bucket layout, defined differently", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 11, + Count: 30, + Sum: 2.345, + PositiveSpans: []Span{{-2, 2}, {1, 1}, {0, 2}}, + PositiveBuckets: []float64{1, 0, 3, 4, 7}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3, 1, 5, 6}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 8, + Count: 21, + Sum: 1.234, + PositiveSpans: []Span{{-2, 2}, {1, 2}, {0, 1}}, + PositiveBuckets: 
[]float64{0, 0, 2, 3, 6}, + NegativeSpans: []Span{{3, 7}}, + NegativeBuckets: []float64{1, 1, 0, 0, 0, 4, 4}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 19, + Count: 51, + Sum: 3.579, + PositiveSpans: []Span{{-2, 2}, {1, 1}, {0, 2}}, + PositiveBuckets: []float64{1, 0, 5, 7, 13}, + NegativeSpans: []Span{{3, 5}, {0, 2}}, + NegativeBuckets: []float64{4, 2, 0, 0, 0, 9, 10}, + }, + }, + { + "non-overlapping spans", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 11, + Count: 30, + Sum: 2.345, + PositiveSpans: []Span{{-2, 2}, {2, 3}}, + PositiveBuckets: []float64{1, 0, 3, 4, 7}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3, 1, 5, 6}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 8, + Count: 21, + Sum: 1.234, + PositiveSpans: []Span{{0, 2}, {3, 3}}, + PositiveBuckets: []float64{5, 4, 2, 3, 6}, + NegativeSpans: []Span{{-9, 2}, {3, 2}}, + NegativeBuckets: []float64{1, 1, 4, 4}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 19, + Count: 51, + Sum: 3.579, + PositiveSpans: []Span{{-2, 4}, {0, 6}}, + PositiveBuckets: []float64{1, 0, 5, 4, 3, 4, 7, 2, 3, 6}, + NegativeSpans: []Span{{-9, 2}, {3, 2}, {5, 2}, {3, 2}}, + NegativeBuckets: []float64{1, 1, 4, 4, 3, 1, 5, 6}, + }, + }, + { + "non-overlapping inverted order", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 8, + Count: 21, + Sum: 1.234, + PositiveSpans: []Span{{0, 2}, {3, 3}}, + PositiveBuckets: []float64{5, 4, 2, 3, 6}, + NegativeSpans: []Span{{-9, 2}, {3, 2}}, + NegativeBuckets: []float64{1, 1, 4, 4}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 11, + Count: 30, + Sum: 2.345, + PositiveSpans: []Span{{-2, 2}, {2, 3}}, + PositiveBuckets: []float64{1, 0, 3, 4, 7}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3, 1, 5, 6}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 19, + Count: 51, + Sum: 3.579, + PositiveSpans: []Span{{-2, 2}, {0, 5}, {0, 3}}, + PositiveBuckets: []float64{1, 0, 5, 4, 
3, 4, 7, 2, 3, 6}, + NegativeSpans: []Span{{-9, 2}, {3, 2}, {5, 2}, {3, 2}}, + NegativeBuckets: []float64{1, 1, 4, 4, 3, 1, 5, 6}, + }, + }, + { + "overlapping spans", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 11, + Count: 30, + Sum: 2.345, + PositiveSpans: []Span{{-2, 2}, {2, 3}}, + PositiveBuckets: []float64{1, 0, 3, 4, 7}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3, 1, 5, 6}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 8, + Count: 21, + Sum: 1.234, + PositiveSpans: []Span{{-1, 4}, {0, 3}}, + PositiveBuckets: []float64{5, 4, 2, 3, 6, 2, 5}, + NegativeSpans: []Span{{4, 2}, {1, 2}}, + NegativeBuckets: []float64{1, 1, 4, 4}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 19, + Count: 51, + Sum: 3.579, + PositiveSpans: []Span{{-2, 4}, {0, 4}}, + PositiveBuckets: []float64{1, 5, 4, 2, 6, 10, 9, 5}, + NegativeSpans: []Span{{3, 3}, {1, 3}}, + NegativeBuckets: []float64{3, 2, 1, 4, 9, 6}, + }, + }, + { + "overlapping spans inverted order", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 8, + Count: 21, + Sum: 1.234, + PositiveSpans: []Span{{-1, 4}, {0, 3}}, + PositiveBuckets: []float64{5, 4, 2, 3, 6, 2, 5}, + NegativeSpans: []Span{{4, 2}, {1, 2}}, + NegativeBuckets: []float64{1, 1, 4, 4}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 11, + Count: 30, + Sum: 2.345, + PositiveSpans: []Span{{-2, 2}, {2, 3}}, + PositiveBuckets: []float64{1, 0, 3, 4, 7}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3, 1, 5, 6}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 19, + Count: 51, + Sum: 3.579, + PositiveSpans: []Span{{-2, 5}, {0, 3}}, + PositiveBuckets: []float64{1, 5, 4, 2, 6, 10, 9, 5}, + NegativeSpans: []Span{{3, 3}, {1, 3}}, + NegativeBuckets: []float64{3, 2, 1, 4, 9, 6}, + }, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + require.Equal(t, c.expected, c.in1.Add(c.in2)) + // Has it also happened in-place? 
+ require.Equal(t, c.expected, c.in1) + }) + } +} + +func TestFloatHistogramSub(t *testing.T) { + // This has fewer test cases than TestFloatHistogramAdd because Add and + // Sub share most of the trickier code. + cases := []struct { + name string + in1, in2, expected *FloatHistogram + }{ + { + "same bucket layout", + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 11, + Count: 30, + Sum: 23, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 0, 3, 4, 7}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{3, 1, 5, 6}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 8, + Count: 21, + Sum: 12, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{0, 0, 2, 3, 6}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{1, 1, 4, 4}, + }, + &FloatHistogram{ + ZeroThreshold: 0.01, + ZeroCount: 3, + Count: 9, + Sum: 11, + PositiveSpans: []Span{{-2, 2}, {1, 3}}, + PositiveBuckets: []float64{1, 0, 1, 1, 1}, + NegativeSpans: []Span{{3, 2}, {3, 2}}, + NegativeBuckets: []float64{2, 0, 1, 2}, + }, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + require.Equal(t, c.expected, c.in1.Sub(c.in2)) + // Has it also happened in-place? + require.Equal(t, c.expected, c.in1) + }) + } +} diff --git a/promql/engine_test.go b/promql/engine_test.go index 06577095e..d646514df 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -27,6 +27,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/goleak" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/promql/parser" @@ -2619,11 +2620,8 @@ func TestRangeQuery(t *testing.T) { } func TestSparseHistogramRate(t *testing.T) { - // Currently, this test it to only find panics or errors in the engine execution path. 
- // The panic stack trace will mostly tell you what code path is breaking and needs fixing for - // fetching the raw histograms and passing it rightly upto the rate() function implementation. - // TODO: Check the result for correctness once implementation is ready. - + // TODO(beorn7): Integrate histograms into the PromQL testing framework + // and write more tests there. test, err := NewTest(t, "") require.NoError(t, err) defer test.Close() @@ -2646,4 +2644,21 @@ func TestSparseHistogramRate(t *testing.T) { require.NoError(t, err) res := qry.Exec(test.Context()) require.NoError(t, res.Err) + vector, err := res.Vector() + require.NoError(t, err) + require.Len(t, vector, 1) + actualHistogram := vector[0].H + expectedHistogram := &histogram.FloatHistogram{ + Schema: 1, + ZeroThreshold: 0.001, + ZeroCount: 1. / 15., + Count: 4. / 15., + Sum: 1.226666666666667, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + PositiveBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. 
/ 15.}, + } + require.Equal(t, expectedHistogram, actualHistogram) } diff --git a/promql/functions.go b/promql/functions.go index fbafc7864..899d5c624 100644 --- a/promql/functions.go +++ b/promql/functions.go @@ -24,6 +24,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql/parser" ) @@ -60,9 +61,11 @@ func extrapolatedRate(vals []parser.Value, args parser.Expressions, enh *EvalNod ms := args[0].(*parser.MatrixSelector) vs := ms.VectorSelector.(*parser.VectorSelector) var ( - samples = vals[0].(Matrix)[0] - rangeStart = enh.Ts - durationMilliseconds(ms.Range+vs.Offset) - rangeEnd = enh.Ts - durationMilliseconds(vs.Offset) + samples = vals[0].(Matrix)[0] + rangeStart = enh.Ts - durationMilliseconds(ms.Range+vs.Offset) + rangeEnd = enh.Ts - durationMilliseconds(vs.Offset) + resultValue float64 + resultHistogram *histogram.FloatHistogram ) // No sense in trying to compute a rate without at least two points. Drop @@ -71,14 +74,32 @@ func extrapolatedRate(vals []parser.Value, args parser.Expressions, enh *EvalNod return enh.Out } - resultValue := samples.Points[len(samples.Points)-1].V - samples.Points[0].V - if isCounter { - var lastValue float64 - for _, sample := range samples.Points { - if sample.V < lastValue { - resultValue += lastValue + if samples.Points[0].H != nil { + resultHistogram = histogramRate(samples.Points, isCounter) + if resultHistogram == nil { + // Points are a mix of floats and histograms, or the histograms + // are not compatible with each other. + // TODO(beorn7): find a way of communicating the exact reason + return enh.Out + } + } else { + resultValue = samples.Points[len(samples.Points)-1].V - samples.Points[0].V + prevValue := samples.Points[0].V + // We have to iterate through everything even in the non-counter + // case because we have to check that everything is a float. 
+		// TODO(beorn7): Find a way to check that earlier, e.g. by
+		// handing in a []FloatPoint and a []HistogramPoint separately.
+		for _, currPoint := range samples.Points[1:] {
+			if currPoint.H != nil {
+				return enh.Out // Range contains a mix of histograms and floats.
 			}
-			lastValue = sample.V
+			if !isCounter {
+				continue
+			}
+			if currPoint.V < prevValue {
+				resultValue += prevValue
+			}
+			prevValue = currPoint.V
 		}
 	}
 
@@ -89,6 +110,7 @@ func extrapolatedRate(vals []parser.Value, args parser.Expressions, enh *EvalNod
 	sampledInterval := float64(samples.Points[len(samples.Points)-1].T-samples.Points[0].T) / 1000
 	averageDurationBetweenSamples := sampledInterval / float64(len(samples.Points)-1)
 
+	// TODO(beorn7): Do this for histograms, too.
 	if isCounter && resultValue > 0 && samples.Points[0].V >= 0 {
 		// Counters cannot be negative. If we have any slope at
 		// all (i.e. resultValue went up), we can extrapolate
@@ -120,16 +142,78 @@ func extrapolatedRate(vals []parser.Value, args parser.Expressions, enh *EvalNod
 	} else {
 		extrapolateToInterval += averageDurationBetweenSamples / 2
 	}
-	resultValue = resultValue * (extrapolateToInterval / sampledInterval)
+	factor := extrapolateToInterval / sampledInterval
 	if isRate {
-		resultValue = resultValue / ms.Range.Seconds()
+		factor /= ms.Range.Seconds()
+	}
+	if resultHistogram == nil {
+		resultValue *= factor
+	} else {
+		resultHistogram.Scale(factor)
 	}
 
 	return append(enh.Out, Sample{
-		Point: Point{V: resultValue},
+		Point: Point{V: resultValue, H: resultHistogram},
 	})
 }
 
+// histogramRate is a helper function for extrapolatedRate. It requires
+// points[0] to be a histogram. The result is the last histogram in points
+// minus the first one. If isCounter is true, the histogram seen right before
+// each detected counter reset is added back in.
+//
+// It returns nil if any other Point in points is not a histogram. Currently,
+// it also returns nil if Schema or ZeroThreshold differ between the first and
+// the last histogram or, if isCounter is true, between any two consecutive
+// histograms, because it cannot handle those schema changes yet.
+func histogramRate(points []Point, isCounter bool) *histogram.FloatHistogram {
+	first := points[0].H // We already know that this is a histogram.
+	last := points[len(points)-1].H
+	if last == nil {
+		return nil // Last point in range is not a histogram.
+	}
+	if last.Schema != first.Schema || last.ZeroThreshold != first.ZeroThreshold {
+		return nil // TODO(beorn7): Handle schema changes properly.
+	}
+	// Validate the whole range before doing any arithmetic so that a range
+	// that is rejected anyway does not pay for copying and subtracting
+	// histograms. We have to iterate through everything even in the
+	// non-counter case because we have to check that everything is a
+	// histogram.
+	// TODO(beorn7): Find a way to check that earlier, e.g. by handing in a
+	// []FloatPoint and a []HistogramPoint separately.
+	prev := first
+	for _, currPoint := range points[1:] {
+		curr := currPoint.H
+		if curr == nil {
+			return nil // Range contains a mix of histograms and floats.
+		}
+		if !isCounter {
+			continue
+		}
+		if curr.Schema != prev.Schema || curr.ZeroThreshold != prev.ZeroThreshold {
+			return nil // TODO(beorn7): Handle schema changes properly.
+		}
+		prev = curr
+	}
+
+	h := last.Copy()
+	h.Sub(first)
+	if isCounter {
+		// Compensate for counter resets: whatever had been counted up to
+		// the histogram right before a reset has to be added back in.
+		prev = first
+		for _, currPoint := range points[1:] {
+			curr := currPoint.H
+			if curr.DetectReset(prev) {
+				h.Add(prev)
+			}
+			prev = curr
+		}
+	}
+	return h.Compact(3)
+}
+
 // === delta(Matrix parser.ValueTypeMatrix) Vector ===
 func funcDelta(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
 	return extrapolatedRate(vals, args, enh, false, false)