enhance histogram_quantile to get min/max value

Signed-off-by: Ziqi Zhao <zhaoziqi9146@gmail.com>
This commit is contained in:
Ziqi Zhao 2023-07-05 19:05:53 +08:00
parent f93ac97867
commit 42d9169ba1
6 changed files with 52 additions and 376 deletions

View File

@ -200,28 +200,6 @@ observed values (in this case corresponding to “average request duration”):
/
histogram_count(rate(http_request_duration_seconds[10m]))
## `histogram_min()`
_This function only acts on native histograms, which are an experimental
feature. The behavior of this function may change in future versions of
Prometheus, including its removal from PromQL._
`histogram_min(v instant-vector)` returns the estimated minimum value stored in
a native histogram. This estimation is based on the lower boundary of the lowest
bucket that contains values in the native histogram. Samples that are not native
histograms are ignored and do not show up in the returned vector.
## `histogram_max()`
_This function only acts on native histograms, which are an experimental
feature. The behavior of this function may change in future versions of
Prometheus, including its removal from PromQL._
`histogram_max(v instant-vector)` returns the estimated maximum value stored in
a native histogram. This estimation is based on the upper boundary of the highest
bucket that contains values in the native histogram. Samples that are not native
histograms are ignored and do not show up in the returned vector.
## `histogram_fraction()`
_This function only acts on native histograms, which are an experimental
@ -339,6 +317,12 @@ bound of that bucket is greater than
bucket. Otherwise, the upper bound of the lowest bucket is returned for
quantiles located in the lowest bucket.
You can use `histogram_quantile(0, v instant-vector)` to get the estimated minimum value stored in
a histogram.
You can use `histogram_quantile(1, v instant-vector)` to get the estimated maximum value stored in
a histogram.
## `holt_winters()`

View File

@ -615,10 +615,10 @@ func (h *FloatHistogram) NegativeReverseBucketIterator() BucketIterator[float64]
// set to the zero threshold.
func (h *FloatHistogram) AllBucketIterator() BucketIterator[float64] {
return &allFloatBucketIterator{
h: h,
negIter: h.NegativeReverseBucketIterator(),
posIter: h.PositiveBucketIterator(),
state: -1,
h: h,
leftIter: h.NegativeReverseBucketIterator(),
rightIter: h.PositiveBucketIterator(),
state: -1,
}
}
@ -628,11 +628,11 @@ func (h *FloatHistogram) AllBucketIterator() BucketIterator[float64] {
// overlap with the zero bucket, their upper or lower boundary, respectively, is
// set to the zero threshold.
func (h *FloatHistogram) AllReverseBucketIterator() BucketIterator[float64] {
return &allReverseFloatBucketIterator{
h: h,
negIter: h.NegativeBucketIterator(),
posIter: h.PositiveReverseBucketIterator(),
state: 1,
return &allFloatBucketIterator{
h: h,
leftIter: h.PositiveReverseBucketIterator(),
rightIter: h.NegativeBucketIterator(),
state: -1,
}
}
@ -917,8 +917,8 @@ func (i *reverseFloatBucketIterator) Next() bool {
}
type allFloatBucketIterator struct {
h *FloatHistogram
negIter, posIter BucketIterator[float64]
h *FloatHistogram
leftIter, rightIter BucketIterator[float64]
// -1 means we are iterating negative buckets.
// 0 means it is time for the zero bucket.
// 1 means we are iterating positive buckets.
@ -930,10 +930,13 @@ type allFloatBucketIterator struct {
func (i *allFloatBucketIterator) Next() bool {
switch i.state {
case -1:
if i.negIter.Next() {
i.currBucket = i.negIter.At()
if i.currBucket.Upper > -i.h.ZeroThreshold {
if i.leftIter.Next() {
i.currBucket = i.leftIter.At()
switch {
case i.currBucket.Upper < 0 && i.currBucket.Upper > -i.h.ZeroThreshold:
i.currBucket.Upper = -i.h.ZeroThreshold
case i.currBucket.Lower > 0 && i.currBucket.Lower < i.h.ZeroThreshold:
i.currBucket.Lower = i.h.ZeroThreshold
}
return true
}
@ -954,10 +957,13 @@ func (i *allFloatBucketIterator) Next() bool {
}
return i.Next()
case 1:
if i.posIter.Next() {
i.currBucket = i.posIter.At()
if i.currBucket.Lower < i.h.ZeroThreshold {
if i.rightIter.Next() {
i.currBucket = i.rightIter.At()
switch {
case i.currBucket.Lower > 0 && i.currBucket.Lower < i.h.ZeroThreshold:
i.currBucket.Lower = i.h.ZeroThreshold
case i.currBucket.Upper < 0 && i.currBucket.Upper > -i.h.ZeroThreshold:
i.currBucket.Upper = -i.h.ZeroThreshold
}
return true
}
@ -971,59 +977,3 @@ func (i *allFloatBucketIterator) Next() bool {
// At returns the bucket held in currBucket, i.e. the bucket selected by the
// most recent call to Next that returned true.
func (i *allFloatBucketIterator) At() Bucket[float64] {
return i.currBucket
}
// allReverseFloatBucketIterator chains three bucket sources of a
// FloatHistogram into a single iteration: first everything yielded by
// posIter, then the zero bucket (only if it is populated), and finally
// everything yielded by negIter. Boundaries of regular buckets that reach
// into the zero bucket are clamped to the zero threshold.
type allReverseFloatBucketIterator struct {
	h       *FloatHistogram
	negIter BucketIterator[float64]
	posIter BucketIterator[float64]

	// state selects the source Next currently draws from:
	//    1: positive buckets (posIter),
	//    0: the zero bucket,
	//   -1: negative buckets (negIter),
	// any other value: the iteration has finished.
	state int8
	// currBucket is the bucket handed out by At.
	currBucket Bucket[float64]
}

// Next advances to the next bucket and reports whether there is one. After it
// has returned false once, every further call returns false as well.
func (i *allReverseFloatBucketIterator) Next() bool {
	for {
		switch i.state {
		case 1:
			if i.posIter.Next() {
				b := i.posIter.At()
				// A positive bucket must not start inside the zero bucket.
				if b.Lower < i.h.ZeroThreshold {
					b.Lower = i.h.ZeroThreshold
				}
				i.currBucket = b
				return true
			}
			// Positive buckets are exhausted, the zero bucket is next.
			i.state = 0
		case 0:
			// The zero bucket is visited at most once, so move on right away.
			i.state = -1
			if i.h.ZeroCount > 0 {
				i.currBucket = Bucket[float64]{
					Lower:          -i.h.ZeroThreshold,
					Upper:          i.h.ZeroThreshold,
					LowerInclusive: true,
					UpperInclusive: true,
					Count:          i.h.ZeroCount,
					// Index is irrelevant for the zero bucket.
				}
				return true
			}
		case -1:
			if i.negIter.Next() {
				b := i.negIter.At()
				// A negative bucket must not end inside the zero bucket.
				if b.Upper > -i.h.ZeroThreshold {
					b.Upper = -i.h.ZeroThreshold
				}
				i.currBucket = b
				return true
			}
			// Any value outside of {-1, 0, 1} marks the iteration as done.
			i.state = 42
			return false
		default:
			return false
		}
	}
}

// At returns the bucket selected by the most recent successful call to Next.
func (i *allReverseFloatBucketIterator) At() Bucket[float64] {
	return i.currBucket
}

View File

@ -3295,212 +3295,6 @@ func TestNativeHistogram_HistogramCountAndSum(t *testing.T) {
}
}
// TestNativeHistogram_HistogramMinAndMax appends a single native histogram
// sample per sub-test (once as an integer histogram, once converted to a float
// histogram) and checks that histogram_min() and histogram_max() evaluate to
// the expected float values. NaN expectations are compared with math.IsNaN
// because NaN never compares equal to itself.
func TestNativeHistogram_HistogramMinAndMax(t *testing.T) {
	// TODO(carrieedwards): Integrate histograms into the PromQL testing framework
	// and write more tests there.

	// All populated histograms share the same bucket layout. The helpers hand
	// out fresh slices so that no two histograms alias each other's data.
	spans := func() []histogram.Span {
		return []histogram.Span{
			{Offset: 0, Length: 2},
			{Offset: 1, Length: 2},
		}
	}
	buckets := func() []int64 { return []int64{2, 1, -2, 3} }

	cases := []struct {
		text string
		// Histogram to test.
		h *histogram.Histogram
		// Expected results of histogram_min() and histogram_max().
		expectedMin float64
		expectedMax float64
	}{
		{
			text: "all negative buckets",
			h: &histogram.Histogram{
				Count:           12,
				ZeroThreshold:   0.001,
				Sum:             100, // Does not matter.
				Schema:          0,
				NegativeSpans:   spans(),
				NegativeBuckets: buckets(),
			},
			expectedMin: -16,
			expectedMax: -0.5,
		},
		{
			text: "all positive buckets",
			h: &histogram.Histogram{
				Count:           12,
				ZeroThreshold:   0.001,
				Sum:             100, // Does not matter.
				Schema:          0,
				PositiveSpans:   spans(),
				PositiveBuckets: buckets(),
			},
			expectedMin: 0.5,
			expectedMax: 16,
		},
		{
			text: "both positive and negative buckets",
			h: &histogram.Histogram{
				Count:           24,
				ZeroThreshold:   0.001,
				Sum:             100, // Does not matter.
				Schema:          0,
				PositiveSpans:   spans(),
				PositiveBuckets: buckets(),
				NegativeSpans:   spans(),
				NegativeBuckets: buckets(),
			},
			expectedMin: -16,
			expectedMax: 16,
		},
		{
			text: "all positive buckets with zero bucket count",
			h: &histogram.Histogram{
				Count:           12,
				ZeroCount:       2,
				ZeroThreshold:   0.001,
				Sum:             100, // Does not matter.
				Schema:          0,
				PositiveSpans:   spans(),
				PositiveBuckets: buckets(),
			},
			expectedMin: -0.001,
			expectedMax: 16,
		},
		{
			text: "all negative buckets with zero bucket count",
			h: &histogram.Histogram{
				Count:           12,
				ZeroCount:       2,
				ZeroThreshold:   0.001,
				Sum:             100, // Does not matter.
				Schema:          0,
				NegativeSpans:   spans(),
				NegativeBuckets: buckets(),
			},
			expectedMin: -16,
			expectedMax: 0.001,
		},
		{
			text: "both positive and negative buckets with zero bucket count",
			h: &histogram.Histogram{
				Count:           24,
				ZeroCount:       4,
				ZeroThreshold:   0.001,
				Sum:             100, // Does not matter.
				Schema:          0,
				PositiveSpans:   spans(),
				PositiveBuckets: buckets(),
				NegativeSpans:   spans(),
				NegativeBuckets: buckets(),
			},
			expectedMin: -16,
			expectedMax: 16,
		},
		{
			text:        "empty histogram",
			h:           &histogram.Histogram{},
			expectedMin: math.NaN(),
			expectedMax: math.NaN(),
		},
	}

	test, err := NewTest(t, "")
	require.NoError(t, err)
	t.Cleanup(test.Close)

	const seriesName = "sparse_histogram_series"
	lbls := labels.FromStrings("__name__", seriesName)

	// checkQuery evaluates fn(seriesName) as an instant query at ts and
	// requires a single float sample equal to expected (or NaN if expected is
	// NaN).
	checkQuery := func(t *testing.T, fn string, ts int64, expected float64) {
		t.Helper()

		queryString := fmt.Sprintf("%s(%s)", fn, seriesName)
		qry, err := test.QueryEngine().NewInstantQuery(test.Queryable(), nil, queryString, timestamp.Time(ts))
		require.NoError(t, err)

		res := qry.Exec(test.Context())
		require.NoError(t, res.Err)

		vector, err := res.Vector()
		require.NoError(t, err)

		require.Len(t, vector, 1)
		require.Nil(t, vector[0].H)
		if math.IsNaN(expected) {
			require.True(t, math.IsNaN(vector[0].V))
			return
		}
		require.Equal(t, expected, vector[0].V)
	}

	idx := int64(0)
	for _, floatHisto := range []bool{true, false} {
		for _, c := range cases {
			t.Run(fmt.Sprintf("%s floatHistogram=%t", c.text, floatHisto), func(t *testing.T) {
				// Claim the timestamp slot up front: a failing require aborts
				// the sub-test, and later sub-tests must still get their own
				// timestamp.
				ts := idx * int64(10*time.Minute/time.Millisecond)
				idx++

				app := test.Storage().Appender(context.TODO())
				var err error
				if floatHisto {
					_, err = app.AppendHistogram(0, lbls, ts, nil, c.h.ToFloat())
				} else {
					_, err = app.AppendHistogram(0, lbls, ts, c.h, nil)
				}
				require.NoError(t, err)
				require.NoError(t, app.Commit())

				checkQuery(t, "histogram_min", ts, c.expectedMin)
				checkQuery(t, "histogram_max", ts, c.expectedMax)
			})
		}
	}
}
func TestNativeHistogram_HistogramQuantile(t *testing.T) {
// TODO(codesome): Integrate histograms into the PromQL testing framework
// and write more tests there.

View File

@ -996,66 +996,6 @@ func funcHistogramSum(vals []parser.Value, args parser.Expressions, enh *EvalNod
return enh.Out
}
// === histogram_min(Vector parser.ValueTypeVector) Vector ===
//
// funcHistogramMin appends one float sample to enh.Out for every histogram
// sample in the input vector. The value is the lower boundary of the first
// bucket with a non-zero count yielded by AllBucketIterator, or NaN if no
// bucket is populated. Samples without a histogram are skipped.
func funcHistogramMin(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
	for _, s := range vals[0].(Vector) {
		if s.H == nil {
			// Not a histogram sample, nothing to report.
			continue
		}

		// NaN is the result for a histogram without any populated bucket.
		lowest := math.NaN()
		// AllBucketIterator starts at the lowest bucket in the native
		// histogram, so the first populated bucket determines the result.
		for it := s.H.AllBucketIterator(); it.Next(); {
			if b := it.At(); b.Count > 0 {
				lowest = b.Lower
				break
			}
		}

		enh.Out = append(enh.Out, Sample{
			Metric: enh.DropMetricName(s.Metric),
			Point:  Point{V: lowest},
		})
	}
	return enh.Out
}
// === histogram_max(Vector parser.ValueTypeVector) Vector ===
//
// funcHistogramMax appends one float sample to enh.Out for every histogram
// sample in the input vector. The value is the upper boundary of the first
// bucket with a non-zero count yielded by AllReverseBucketIterator, or NaN if
// no bucket is populated. Samples without a histogram are skipped.
func funcHistogramMax(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
	for _, s := range vals[0].(Vector) {
		if s.H == nil {
			// Not a histogram sample, nothing to report.
			continue
		}

		// NaN is the result for a histogram without any populated bucket.
		highest := math.NaN()
		// AllReverseBucketIterator starts at the highest bucket in the native
		// histogram, so the first populated bucket determines the result.
		for it := s.H.AllReverseBucketIterator(); it.Next(); {
			if b := it.At(); b.Count > 0 {
				highest = b.Upper
				break
			}
		}

		enh.Out = append(enh.Out, Sample{
			Metric: enh.DropMetricName(s.Metric),
			Point:  Point{V: highest},
		})
	}
	return enh.Out
}
// === histogram_fraction(lower, upper parser.ValueTypeScalar, Vector parser.ValueTypeVector) Vector ===
func funcHistogramFraction(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
lower := vals[0].(Vector)[0].F
@ -1435,8 +1375,6 @@ var FunctionCalls = map[string]FunctionCall{
"floor": funcFloor,
"histogram_count": funcHistogramCount,
"histogram_fraction": funcHistogramFraction,
"histogram_max": funcHistogramMax,
"histogram_min": funcHistogramMin,
"histogram_quantile": funcHistogramQuantile,
"histogram_sum": funcHistogramSum,
"holt_winters": funcHoltWinters,

View File

@ -173,16 +173,6 @@ var Functions = map[string]*Function{
ArgTypes: []ValueType{ValueTypeVector},
ReturnType: ValueTypeVector,
},
"histogram_min": {
Name: "histogram_min",
ArgTypes: []ValueType{ValueTypeVector},
ReturnType: ValueTypeVector,
},
"histogram_max": {
Name: "histogram_max",
ArgTypes: []ValueType{ValueTypeVector},
ReturnType: ValueTypeVector,
},
"histogram_fraction": {
Name: "histogram_fraction",
ArgTypes: []ValueType{ValueTypeScalar, ValueTypeScalar, ValueTypeVector},

View File

@ -158,9 +158,21 @@ func histogramQuantile(q float64, h *histogram.FloatHistogram) float64 {
var (
bucket histogram.Bucket[float64]
count float64
it = h.AllBucketIterator()
rank = q * h.Count
it histogram.BucketIterator[float64]
rank float64
)
// Choose the iteration direction. Use the forward iterator if the histogram
// contains NaN observations (h.Sum is NaN) or if q < 0.5. Otherwise
// (q >= 0.5), use the reverse iterator and count the rank from the top,
// i.e. based on 1-q instead of q.
if math.IsNaN(h.Sum) || q < 0.5 {
it = h.AllBucketIterator()
rank = q * h.Count
} else {
it = h.AllReverseBucketIterator()
rank = (1 - q) * h.Count
}
for it.Next() {
bucket = it.At()
count += bucket.Count
@ -193,7 +205,15 @@ func histogramQuantile(q float64, h *histogram.FloatHistogram) float64 {
return bucket.Upper
}
rank -= count - bucket.Count
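// Convert rank into the rank within the current bucket. With the forward
// iterator (h.Sum is NaN or q < 0.5), subtract the count of all buckets
// visited before the current one. With the reverse iterator (q >= 0.5), rank
// was counted from the top, so mirror it to count from the bucket's lower end.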
if math.IsNaN(h.Sum) || q < 0.5 {
rank -= count - bucket.Count
} else {
rank = count - rank
}
// TODO(codesome): Use a better estimation than linear.
return bucket.Lower + (bucket.Upper-bucket.Lower)*(rank/bucket.Count)
}