Promql: Initial rate implementation for sparse histograms (#9926)

Signed-off-by: beorn7 <beorn@grafana.com>
This commit is contained in:
Björn Rabenstein 2021-12-06 13:49:18 +01:00 committed by GitHub
parent e8e9155a11
commit 0e1b9dd308
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 1160 additions and 18 deletions

View File

@ -115,6 +115,295 @@ func (h *FloatHistogram) ZeroBucket() FloatBucket {
}
}
// Scale scales the FloatHistogram by the provided factor, i.e. it scales all
// bucket counts including the zero bucket and the count and the sum of
// observations. The bucket layout stays the same. This method changes the
// receiving histogram directly (rather than acting on a copy). It returns a
// pointer to the receiving histogram for convenience.
func (h *FloatHistogram) Scale(factor float64) *FloatHistogram {
h.ZeroCount *= factor
h.Count *= factor
h.Sum *= factor
for i := range h.PositiveBuckets {
h.PositiveBuckets[i] *= factor
}
for i := range h.NegativeBuckets {
h.NegativeBuckets[i] *= factor
}
return h
}
// Add adds the provided other histogram to the receiving histogram. Count, Sum,
// and buckets from the other histogram are added to the corresponding
// components of the receiving histogram. Buckets in the other histogram that do
// not exist in the receiving histogram are inserted into the latter. The
// resulting histogram might have buckets with a population of zero or directly
// adjacent spans (offset=0). To normalize those, call the Compact method.
//
// This method returns a pointer to the receiving histogram for convenience.
//
// IMPORTANT: This method requires the Schema and the ZeroThreshold to be the
// same in both histograms. Otherwise, its behavior is undefined.
// TODO(beorn7): Change that!
func (h *FloatHistogram) Add(other *FloatHistogram) *FloatHistogram {
h.ZeroCount += other.ZeroCount
h.Count += other.Count
h.Sum += other.Sum
// TODO(beorn7): If needed, this can be optimized by inspecting the
// spans in other and create missing buckets in h in batches.
iSpan, iBucket := -1, -1
var iInSpan, index int32
for it := other.PositiveBucketIterator(); it.Next(); {
b := it.At()
h.PositiveSpans, h.PositiveBuckets, iSpan, iBucket, iInSpan = addBucket(
b, h.PositiveSpans, h.PositiveBuckets, iSpan, iBucket, iInSpan, index,
)
index = b.Index
}
iSpan, iBucket = -1, -1
for it := other.NegativeBucketIterator(); it.Next(); {
b := it.At()
h.NegativeSpans, h.NegativeBuckets, iSpan, iBucket, iInSpan = addBucket(
b, h.NegativeSpans, h.NegativeBuckets, iSpan, iBucket, iInSpan, index,
)
index = b.Index
}
return h
}
// Sub works like Add but subtracts the other histogram.
//
// IMPORTANT: This method requires the Schema and the ZeroThreshold to be the
// same in both histograms. Otherwise, its behavior is undefined.
// TODO(beorn7): Change that!
func (h *FloatHistogram) Sub(other *FloatHistogram) *FloatHistogram {
h.ZeroCount -= other.ZeroCount
h.Count -= other.Count
h.Sum -= other.Sum
// TODO(beorn7): If needed, this can be optimized by inspecting the
// spans in other and create missing buckets in h in batches.
iSpan, iBucket := -1, -1
var iInSpan, index int32
for it := other.PositiveBucketIterator(); it.Next(); {
b := it.At()
b.Count *= -1
h.PositiveSpans, h.PositiveBuckets, iSpan, iBucket, iInSpan = addBucket(
b, h.PositiveSpans, h.PositiveBuckets, iSpan, iBucket, iInSpan, index,
)
index = b.Index
}
iSpan, iBucket = -1, -1
for it := other.NegativeBucketIterator(); it.Next(); {
b := it.At()
b.Count *= -1
h.NegativeSpans, h.NegativeBuckets, iSpan, iBucket, iInSpan = addBucket(
b, h.NegativeSpans, h.NegativeBuckets, iSpan, iBucket, iInSpan, index,
)
index = b.Index
}
return h
}
// addBucket takes the "coordinates" of the last bucket that was handled and
// adds the provided bucket after it. If a corresponding bucket exists, the
// count is added. If not, the bucket is inserted. The updated slices and the
// coordinates of the inserted or added-to bucket are returned.
func addBucket(
b FloatBucket,
spans []Span, buckets []float64,
iSpan, iBucket int,
iInSpan, index int32,
) (
newSpans []Span, newBuckets []float64,
newISpan, newIBucket int, newIInSpan int32,
) {
if iSpan == -1 {
// First add, check if it is before all spans.
if len(spans) == 0 || spans[0].Offset > b.Index {
// Add bucket before all others.
buckets = append(buckets, 0)
copy(buckets[1:], buckets)
buckets[0] = b.Count
if spans[0].Offset == b.Index+1 {
spans[0].Length++
spans[0].Offset--
return spans, buckets, 0, 0, 0
}
spans = append(spans, Span{})
copy(spans[1:], spans)
spans[0] = Span{Offset: b.Index, Length: 1}
if len(spans) > 1 {
// Convert the absolute offset in the formerly
// first span to a relative offset.
spans[1].Offset -= b.Index + 1
}
return spans, buckets, 0, 0, 0
}
if spans[0].Offset == b.Index {
// Just add to first bucket.
buckets[0] += b.Count
return spans, buckets, 0, 0, 0
}
// We are behind the first bucket, so set everything to the
// first bucket and continue normally.
iSpan, iBucket, iInSpan = 0, 0, 0
index = spans[0].Offset
}
deltaIndex := b.Index - index
for {
remainingInSpan := int32(spans[iSpan].Length) - iInSpan
if deltaIndex < remainingInSpan {
// Bucket is in current span.
iBucket += int(deltaIndex)
iInSpan += deltaIndex
buckets[iBucket] += b.Count
return spans, buckets, iSpan, iBucket, iInSpan
}
deltaIndex -= remainingInSpan
iBucket += int(remainingInSpan)
iSpan++
if iSpan == len(spans) || deltaIndex < spans[iSpan].Offset {
// Bucket is in gap behind previous span (or there are no further spans).
buckets = append(buckets, 0)
copy(buckets[iBucket+1:], buckets[iBucket:])
buckets[iBucket] = b.Count
if deltaIndex == 0 {
// Directly after previous span, extend previous span.
if iSpan < len(spans) {
spans[iSpan].Offset--
}
iSpan--
iInSpan = int32(spans[iSpan].Length)
spans[iSpan].Length++
return spans, buckets, iSpan, iBucket, iInSpan
}
if iSpan < len(spans) && deltaIndex == spans[iSpan].Offset-1 {
// Directly before next span, extend next span.
iInSpan = 0
spans[iSpan].Offset--
spans[iSpan].Length++
return spans, buckets, iSpan, iBucket, iInSpan
}
// No next span, or next span is not directly adjacent to new bucket.
// Add new span.
iInSpan = 0
if iSpan < len(spans) {
spans[iSpan].Offset -= deltaIndex + 1
}
spans = append(spans, Span{})
copy(spans[iSpan+1:], spans[iSpan:])
spans[iSpan] = Span{Length: 1, Offset: deltaIndex}
return spans, buckets, iSpan, iBucket, iInSpan
}
// Try start of next span.
deltaIndex -= spans[iSpan].Offset
iInSpan = 0
}
}
// Compact eliminates empty buckets at the beginning and end of each span, then
// merges spans that are consecutive or at most maxEmptyBuckets apart, and
// finally splits spans that contain more than maxEmptyBuckets. The compaction
// happens "in place" in the receiving histogram, but a pointer to it is
// returned for convenience.
func (h *FloatHistogram) Compact(maxEmptyBuckets int) *FloatHistogram {
// TODO(beorn7): Implement.
return h
}
// DetectReset returns true if the receiving histogram is missing any buckets
// that have a non-zero population in the provided previous histogram. It also
// returns true if any count (in any bucket, in the zero count, or in the count
// of observations, but NOT the sum of observations) is smaller in the receiving
// histogram compared to the previous histogram. Otherwise, it returns false.
//
// IMPORTANT: This method requires the Schema and the ZeroThreshold to be the
// same in both histograms. Otherwise, its behavior is undefined.
// TODO(beorn7): Change that!
//
// Note that this kind of reset detection is quite expensive. Ideally, resets
// are detected at ingest time and stored in the TSDB, so that the reset
// information can be read directly from there rather than be detected each time
// again.
func (h *FloatHistogram) DetectReset(previous *FloatHistogram) bool {
if h.Count < previous.Count {
return true
}
if h.ZeroCount < previous.ZeroCount {
return true
}
currIt := h.PositiveBucketIterator()
prevIt := previous.PositiveBucketIterator()
if detectReset(currIt, prevIt) {
return true
}
currIt = h.NegativeBucketIterator()
prevIt = previous.NegativeBucketIterator()
return detectReset(currIt, prevIt)
}
func detectReset(currIt, prevIt FloatBucketIterator) bool {
if !prevIt.Next() {
return false // If no buckets in previous histogram, nothing can be reset.
}
prevBucket := prevIt.At()
if !currIt.Next() {
// No bucket in current, but at least one in previous
// histogram. Check if any of those are non-zero, in which case
// this is a reset.
for {
if prevBucket.Count != 0 {
return true
}
if !prevIt.Next() {
return false
}
}
}
currBucket := currIt.At()
for {
// Forward currIt until we find the bucket corresponding to prevBucket.
for currBucket.Index < prevBucket.Index {
if !currIt.Next() {
// Reached end of currIt early, therefore
// previous histogram has a bucket that the
// current one does not have. Unlass all
// remaining buckets in the previous histogram
// are unpopulated, this is a reset.
for {
if prevBucket.Count != 0 {
return true
}
if !prevIt.Next() {
return false
}
}
}
currBucket = currIt.At()
}
if currBucket.Index > prevBucket.Index {
// Previous histogram has a bucket the current one does
// not have. If it's populated, it's a reset.
if prevBucket.Count != 0 {
return true
}
} else {
// We have reached corresponding buckets in both iterators.
// We can finally compare the counts.
if currBucket.Count < prevBucket.Count {
return true
}
}
if !prevIt.Next() {
// Reached end of prevIt without finding offending buckets.
return false
}
prevBucket = prevIt.At()
}
}
// PositiveBucketIterator returns a FloatBucketIterator to iterate over all
// positive buckets in ascending order (starting next to the zero bucket and
// going up).

View File

@ -0,0 +1,772 @@
// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package histogram
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestFloatHistogramScale(t *testing.T) {
cases := []struct {
name string
in *FloatHistogram
scale float64
expected *FloatHistogram
}{
{
"zero value",
&FloatHistogram{},
3.1415,
&FloatHistogram{},
},
{
"no-op",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 1}, {2, 3}},
PositiveBuckets: []float64{1, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
1,
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 1}, {2, 3}},
PositiveBuckets: []float64{1, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
},
{
"double",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 1}, {2, 3}},
PositiveBuckets: []float64{1, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
2,
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 11,
Count: 3493.3 * 2,
Sum: 2349209.324 * 2,
PositiveSpans: []Span{{-2, 1}, {2, 3}},
PositiveBuckets: []float64{2, 6.6, 8.4, 0.2},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{6.2, 6, 1.234e5 * 2, 2000},
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
require.Equal(t, c.expected, c.in.Scale(c.scale))
// Has it also happened in-place?
require.Equal(t, c.expected, c.in)
})
}
}
func TestFloatHistogramDetectReset(t *testing.T) {
cases := []struct {
name string
previous, current *FloatHistogram
resetExpected bool
}{
{
"zero values",
&FloatHistogram{},
&FloatHistogram{},
false,
},
{
"no buckets to some buckets",
&FloatHistogram{},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 1}, {2, 3}},
PositiveBuckets: []float64{1, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
false,
},
{
"some buckets to no buckets",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 1}, {2, 3}},
PositiveBuckets: []float64{1, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
&FloatHistogram{},
true,
},
{
"one bucket appears, nothing else changes",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 1}, {2, 3}},
PositiveBuckets: []float64{1, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 1.23, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
false,
},
{
"one bucket disappears, nothing else changes",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 1.23, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 1}, {2, 3}},
PositiveBuckets: []float64{1, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
true,
},
{
"an unpopulated bucket disappears, nothing else changes",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 1}, {2, 3}},
PositiveBuckets: []float64{1, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
false,
},
{
"an unpopulated bucket at the end disappears, nothing else changes",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 1}, {1, 3}},
PositiveBuckets: []float64{1, 3.3, 4.2, 0},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 1}, {1, 2}},
PositiveBuckets: []float64{1, 3.3, 4.2},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
false,
},
{
"an unpopulated bucket disappears in a histogram with nothing else",
&FloatHistogram{
PositiveSpans: []Span{{23, 1}},
PositiveBuckets: []float64{0},
},
&FloatHistogram{},
false,
},
{
"zero count goes up",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.6,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
false,
},
{
"zero count goes down",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.4,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
true,
},
{
"count goes up",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.4,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
false,
},
{
"count goes down",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.2,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
true,
},
{
"sum goes up",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349210,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
false,
},
{
"sum goes down",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349200,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
false,
},
{
"one positive bucket goes up",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 0, 3.3, 4.3, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
false,
},
{
"one positive bucket goes down",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 0, 3.3, 4.1, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
true,
},
{
"one negative bucket goes up",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3.1, 1.234e5, 1000},
},
false,
},
{
"one negative bucket goes down",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 3493.3,
Sum: 2349209.324,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 0, 3.3, 4.2, 0.1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3.1, 2.9, 1.234e5, 1000},
},
true,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
require.Equal(t, c.resetExpected, c.current.DetectReset(c.previous))
})
}
}
func TestFloatHistogramCompact(t *testing.T) {
cases := []struct {
name string
in *FloatHistogram
maxEmptyBuckets int
expected *FloatHistogram
}{
// TODO(beorn7): Add test cases.
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
require.Equal(t, c.expected, c.in.Compact(c.maxEmptyBuckets))
// Has it also happened in-place?
require.Equal(t, c.expected, c.in)
})
}
}
func TestFloatHistogramAdd(t *testing.T) {
cases := []struct {
name string
in1, in2, expected *FloatHistogram
}{
{
"same bucket layout",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 11,
Count: 30,
Sum: 2.345,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 0, 3, 4, 7},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3, 1, 5, 6},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 8,
Count: 21,
Sum: 1.234,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{0, 0, 2, 3, 6},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{1, 1, 4, 4},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 19,
Count: 51,
Sum: 3.579,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 0, 5, 7, 13},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{4, 2, 9, 10},
},
},
{
"same bucket layout, defined differently",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 11,
Count: 30,
Sum: 2.345,
PositiveSpans: []Span{{-2, 2}, {1, 1}, {0, 2}},
PositiveBuckets: []float64{1, 0, 3, 4, 7},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3, 1, 5, 6},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 8,
Count: 21,
Sum: 1.234,
PositiveSpans: []Span{{-2, 2}, {1, 2}, {0, 1}},
PositiveBuckets: []float64{0, 0, 2, 3, 6},
NegativeSpans: []Span{{3, 7}},
NegativeBuckets: []float64{1, 1, 0, 0, 0, 4, 4},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 19,
Count: 51,
Sum: 3.579,
PositiveSpans: []Span{{-2, 2}, {1, 1}, {0, 2}},
PositiveBuckets: []float64{1, 0, 5, 7, 13},
NegativeSpans: []Span{{3, 5}, {0, 2}},
NegativeBuckets: []float64{4, 2, 0, 0, 0, 9, 10},
},
},
{
"non-overlapping spans",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 11,
Count: 30,
Sum: 2.345,
PositiveSpans: []Span{{-2, 2}, {2, 3}},
PositiveBuckets: []float64{1, 0, 3, 4, 7},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3, 1, 5, 6},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 8,
Count: 21,
Sum: 1.234,
PositiveSpans: []Span{{0, 2}, {3, 3}},
PositiveBuckets: []float64{5, 4, 2, 3, 6},
NegativeSpans: []Span{{-9, 2}, {3, 2}},
NegativeBuckets: []float64{1, 1, 4, 4},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 19,
Count: 51,
Sum: 3.579,
PositiveSpans: []Span{{-2, 4}, {0, 6}},
PositiveBuckets: []float64{1, 0, 5, 4, 3, 4, 7, 2, 3, 6},
NegativeSpans: []Span{{-9, 2}, {3, 2}, {5, 2}, {3, 2}},
NegativeBuckets: []float64{1, 1, 4, 4, 3, 1, 5, 6},
},
},
{
"non-overlapping inverted order",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 8,
Count: 21,
Sum: 1.234,
PositiveSpans: []Span{{0, 2}, {3, 3}},
PositiveBuckets: []float64{5, 4, 2, 3, 6},
NegativeSpans: []Span{{-9, 2}, {3, 2}},
NegativeBuckets: []float64{1, 1, 4, 4},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 11,
Count: 30,
Sum: 2.345,
PositiveSpans: []Span{{-2, 2}, {2, 3}},
PositiveBuckets: []float64{1, 0, 3, 4, 7},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3, 1, 5, 6},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 19,
Count: 51,
Sum: 3.579,
PositiveSpans: []Span{{-2, 2}, {0, 5}, {0, 3}},
PositiveBuckets: []float64{1, 0, 5, 4, 3, 4, 7, 2, 3, 6},
NegativeSpans: []Span{{-9, 2}, {3, 2}, {5, 2}, {3, 2}},
NegativeBuckets: []float64{1, 1, 4, 4, 3, 1, 5, 6},
},
},
{
"overlapping spans",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 11,
Count: 30,
Sum: 2.345,
PositiveSpans: []Span{{-2, 2}, {2, 3}},
PositiveBuckets: []float64{1, 0, 3, 4, 7},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3, 1, 5, 6},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 8,
Count: 21,
Sum: 1.234,
PositiveSpans: []Span{{-1, 4}, {0, 3}},
PositiveBuckets: []float64{5, 4, 2, 3, 6, 2, 5},
NegativeSpans: []Span{{4, 2}, {1, 2}},
NegativeBuckets: []float64{1, 1, 4, 4},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 19,
Count: 51,
Sum: 3.579,
PositiveSpans: []Span{{-2, 4}, {0, 4}},
PositiveBuckets: []float64{1, 5, 4, 2, 6, 10, 9, 5},
NegativeSpans: []Span{{3, 3}, {1, 3}},
NegativeBuckets: []float64{3, 2, 1, 4, 9, 6},
},
},
{
"overlapping spans inverted order",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 8,
Count: 21,
Sum: 1.234,
PositiveSpans: []Span{{-1, 4}, {0, 3}},
PositiveBuckets: []float64{5, 4, 2, 3, 6, 2, 5},
NegativeSpans: []Span{{4, 2}, {1, 2}},
NegativeBuckets: []float64{1, 1, 4, 4},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 11,
Count: 30,
Sum: 2.345,
PositiveSpans: []Span{{-2, 2}, {2, 3}},
PositiveBuckets: []float64{1, 0, 3, 4, 7},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3, 1, 5, 6},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 19,
Count: 51,
Sum: 3.579,
PositiveSpans: []Span{{-2, 5}, {0, 3}},
PositiveBuckets: []float64{1, 5, 4, 2, 6, 10, 9, 5},
NegativeSpans: []Span{{3, 3}, {1, 3}},
NegativeBuckets: []float64{3, 2, 1, 4, 9, 6},
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
require.Equal(t, c.expected, c.in1.Add(c.in2))
// Has it also happened in-place?
require.Equal(t, c.expected, c.in1)
})
}
}
func TestFloatHistogramSub(t *testing.T) {
// This has fewer test cases than TestFloatHistogramAdd because Add and
// Sub share most of the trickier code.
cases := []struct {
name string
in1, in2, expected *FloatHistogram
}{
{
"same bucket layout",
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 11,
Count: 30,
Sum: 23,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 0, 3, 4, 7},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{3, 1, 5, 6},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 8,
Count: 21,
Sum: 12,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{0, 0, 2, 3, 6},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{1, 1, 4, 4},
},
&FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 3,
Count: 9,
Sum: 11,
PositiveSpans: []Span{{-2, 2}, {1, 3}},
PositiveBuckets: []float64{1, 0, 1, 1, 1},
NegativeSpans: []Span{{3, 2}, {3, 2}},
NegativeBuckets: []float64{2, 0, 1, 2},
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
require.Equal(t, c.expected, c.in1.Sub(c.in2))
// Has it also happened in-place?
require.Equal(t, c.expected, c.in1)
})
}
}

View File

@ -27,6 +27,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/promql/parser"
@ -2619,11 +2620,8 @@ func TestRangeQuery(t *testing.T) {
}
func TestSparseHistogramRate(t *testing.T) {
// Currently, this test it to only find panics or errors in the engine execution path.
// The panic stack trace will mostly tell you what code path is breaking and needs fixing for
// fetching the raw histograms and passing it rightly upto the rate() function implementation.
// TODO: Check the result for correctness once implementation is ready.
// TODO(beorn7): Integrate histograms into the PromQL testing framework
// and write more tests there.
test, err := NewTest(t, "")
require.NoError(t, err)
defer test.Close()
@ -2646,4 +2644,21 @@ func TestSparseHistogramRate(t *testing.T) {
require.NoError(t, err)
res := qry.Exec(test.Context())
require.NoError(t, res.Err)
vector, err := res.Vector()
require.NoError(t, err)
require.Len(t, vector, 1)
actualHistogram := vector[0].H
expectedHistogram := &histogram.FloatHistogram{
Schema: 1,
ZeroThreshold: 0.001,
ZeroCount: 1. / 15.,
Count: 4. / 15.,
Sum: 1.226666666666667,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
PositiveBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.},
}
require.Equal(t, expectedHistogram, actualHistogram)
}

View File

@ -24,6 +24,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"
)
@ -60,9 +61,11 @@ func extrapolatedRate(vals []parser.Value, args parser.Expressions, enh *EvalNod
ms := args[0].(*parser.MatrixSelector)
vs := ms.VectorSelector.(*parser.VectorSelector)
var (
samples = vals[0].(Matrix)[0]
rangeStart = enh.Ts - durationMilliseconds(ms.Range+vs.Offset)
rangeEnd = enh.Ts - durationMilliseconds(vs.Offset)
samples = vals[0].(Matrix)[0]
rangeStart = enh.Ts - durationMilliseconds(ms.Range+vs.Offset)
rangeEnd = enh.Ts - durationMilliseconds(vs.Offset)
resultValue float64
resultHistogram *histogram.FloatHistogram
)
// No sense in trying to compute a rate without at least two points. Drop
@ -71,14 +74,32 @@ func extrapolatedRate(vals []parser.Value, args parser.Expressions, enh *EvalNod
return enh.Out
}
resultValue := samples.Points[len(samples.Points)-1].V - samples.Points[0].V
if isCounter {
var lastValue float64
for _, sample := range samples.Points {
if sample.V < lastValue {
resultValue += lastValue
if samples.Points[0].H != nil {
resultHistogram = histogramRate(samples.Points, isCounter)
if resultHistogram == nil {
// Points are a mix of floats and histograms, or the histograms
// are not compatible with each other.
// TODO(beorn7): find a way of communicating the exact reason
return enh.Out
}
} else {
resultValue = samples.Points[len(samples.Points)-1].V - samples.Points[0].V
prevValue := samples.Points[0].V
// We have to iterate through everything even in the non-counter
// case because we have to check that everything is a float.
// TODO(beorn7): Find a way to check that earlier, e.g. by
// handing in a []FloatPoint and a []HistogramPoint separately.
for _, currPoint := range samples.Points[1:] {
if currPoint.H != nil {
return nil // Range contains a mix of histograms and floats.
}
lastValue = sample.V
if !isCounter {
continue
}
if currPoint.V < prevValue {
resultValue += prevValue
}
prevValue = currPoint.V
}
}
@ -89,6 +110,7 @@ func extrapolatedRate(vals []parser.Value, args parser.Expressions, enh *EvalNod
sampledInterval := float64(samples.Points[len(samples.Points)-1].T-samples.Points[0].T) / 1000
averageDurationBetweenSamples := sampledInterval / float64(len(samples.Points)-1)
// TODO(beorn7): Do this for histograms, too.
if isCounter && resultValue > 0 && samples.Points[0].V >= 0 {
// Counters cannot be negative. If we have any slope at
// all (i.e. resultValue went up), we can extrapolate
@ -120,16 +142,60 @@ func extrapolatedRate(vals []parser.Value, args parser.Expressions, enh *EvalNod
} else {
extrapolateToInterval += averageDurationBetweenSamples / 2
}
resultValue = resultValue * (extrapolateToInterval / sampledInterval)
factor := extrapolateToInterval / sampledInterval
if isRate {
resultValue = resultValue / ms.Range.Seconds()
factor /= ms.Range.Seconds()
}
if resultHistogram == nil {
resultValue *= factor
} else {
resultHistogram.Scale(factor)
}
return append(enh.Out, Sample{
Point: Point{V: resultValue},
Point: Point{V: resultValue, H: resultHistogram},
})
}
// histogramRate is a helper function for extrapolatedRate. It requires
// points[0] to be a histogram. It returns nil if any other Point in points is
// not a histogram. Currently, it also returns nil on mixed schemas or zero
// thresholds in the histograms, because it cannot handle those schema changes
// yet.
func histogramRate(points []Point, isCounter bool) *histogram.FloatHistogram {
prev := points[0].H // We already know that this is a histogram.
last := points[len(points)-1].H
if last == nil {
return nil // Last point in range is not a histogram.
}
if last.Schema != prev.Schema || last.ZeroThreshold != prev.ZeroThreshold {
return nil // TODO(beorn7): Handle schema changes properly.
}
h := last.Copy()
h.Sub(prev)
// We have to iterate through everything even in the non-counter case
// because we have to check that everything is a histogram.
// TODO(beorn7): Find a way to check that earlier, e.g. by handing in a
// []FloatPoint and a []HistogramPoint separately.
for _, currPoint := range points[1:] {
curr := currPoint.H
if curr == nil {
return nil // Range contains a mix of histograms and floats.
}
if !isCounter {
continue
}
if curr.Schema != prev.Schema || curr.ZeroThreshold != prev.ZeroThreshold {
return nil // TODO(beorn7): Handle schema changes properly.
}
if curr.DetectReset(prev) {
h.Add(prev)
}
prev = curr
}
return h.Compact(3)
}
// === delta(Matrix parser.ValueTypeMatrix) Vector ===
func funcDelta(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
return extrapolatedRate(vals, args, enh, false, false)