Merge pull request #11830 from codesome/histo-fed

Support native histograms in federation
This commit is contained in:
Ganesh Vernekar 2023-01-12 21:54:44 +05:30 committed by GitHub
commit 72f20d949a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 217 additions and 29 deletions

View File

@ -17,9 +17,9 @@ Rule files use YAML.
The rule files can be reloaded at runtime by sending `SIGHUP` to the Prometheus
process. The changes are only applied if all rule files are well-formatted.
_Note about native histograms (experimental feature): Rules evaluating to
native histograms do not yet work as expected. Instead of a native histogram,
the sample stored is just a floating point value of zero._
_Note about native histograms (experimental feature): Native histogram are always
recorded as gauge histograms (for now). Most cases will create gauge histograms
naturally, e.g. after `rate()`._
## Syntax-checking rules

View File

@ -8,8 +8,15 @@ sort_rank: 6
Federation allows a Prometheus server to scrape selected time series from
another Prometheus server.
_Note about native histograms (experimental feature): Federation does not
support native histograms yet._
_Note about native histograms (experimental feature): To scrape native histograms
via federation, the scraping Prometheus server needs to run with native histograms
enabled (via the command line flag `--enable-feature=native-histograms`), implying
that the protobuf format is used for scraping. Should the federated metrics contain
a mix of different sample types (float64, counter histogram, gauge histogram) for
the same metric name, the federation payload will contain multiple metric families
with the same name (but different types). Technically, this violates the rules of
the protobuf exposition format, but Prometheus is nevertheless able to ingest all
metrics correctly._
## Use cases

View File

@ -68,9 +68,11 @@ func (b *BufferedSeriesIterator) ReduceDelta(delta int64) bool {
// PeekBack returns the nth previous element of the iterator. If there is none buffered,
// ok is false.
func (b *BufferedSeriesIterator) PeekBack(n int) (t int64, v float64, h *histogram.Histogram, ok bool) {
func (b *BufferedSeriesIterator) PeekBack(n int) (
t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, ok bool,
) {
s, ok := b.buf.nthLast(n)
return s.t, s.v, s.h, ok
return s.t, s.v, s.h, s.fh, ok
}
// Buffer returns an iterator over the buffered data. Invalidates previously

View File

@ -107,7 +107,7 @@ func TestBufferedSeriesIterator(t *testing.T) {
require.Equal(t, ev, v, "value mismatch")
}
prevSampleEq := func(ets int64, ev float64, eok bool) {
ts, v, _, ok := it.PeekBack(1)
ts, v, _, _, ok := it.PeekBack(1)
require.Equal(t, eok, ok, "exist mismatch")
require.Equal(t, ets, ts, "timestamp mismatch")
require.Equal(t, ev, v, "value mismatch")

View File

@ -26,6 +26,7 @@ import (
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/model/value"
@ -103,6 +104,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
set := storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
it := storage.NewBuffer(int64(h.lookbackDelta / 1e6))
var chkIter chunkenc.Iterator
Loop:
for set.Next() {
s := set.At()
@ -111,18 +113,26 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
chkIter = s.Iterator(chkIter)
it.Reset(chkIter)
var t int64
var v float64
var ok bool
var (
t int64
v float64
h *histogram.Histogram
fh *histogram.FloatHistogram
ok bool
)
valueType := it.Seek(maxt)
if valueType == chunkenc.ValFloat {
switch valueType {
case chunkenc.ValFloat:
t, v = it.At()
} else {
// TODO(beorn7): Handle histograms.
t, v, _, ok = it.PeekBack(1)
case chunkenc.ValFloatHistogram, chunkenc.ValHistogram:
t, fh = it.AtFloatHistogram()
default:
t, v, h, fh, ok = it.PeekBack(1)
if !ok {
continue
continue Loop
}
if h != nil {
fh = h.ToFloat()
}
}
// The exposition formats do not support stale markers, so drop them. This
@ -135,7 +145,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
vec = append(vec, promql.Sample{
Metric: s.Labels(),
Point: promql.Point{T: t, V: v},
Point: promql.Point{T: t, V: v, H: fh},
})
}
if ws := set.Warnings(); len(ws) > 0 {
@ -161,15 +171,22 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
sort.Strings(externalLabelNames)
var (
lastMetricName string
protMetricFam *dto.MetricFamily
lastMetricName string
lastWasHistogram, lastHistogramWasGauge bool
protMetricFam *dto.MetricFamily
)
for _, s := range vec {
isHistogram := s.H != nil
if isHistogram &&
format != expfmt.FmtProtoDelim && format != expfmt.FmtProtoText && format != expfmt.FmtProtoCompact {
// Can't serve the native histogram.
// TODO(codesome): Serve them when other protocols get the native histogram support.
continue
}
nameSeen := false
globalUsed := map[string]struct{}{}
protMetric := &dto.Metric{
Untyped: &dto.Untyped{},
}
protMetric := &dto.Metric{}
err := s.Metric.Validate(func(l labels.Label) error {
if l.Value == "" {
@ -179,11 +196,18 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
}
if l.Name == labels.MetricName {
nameSeen = true
if l.Value == lastMetricName {
// We already have the name in the current MetricFamily,
// and we ignore nameless metrics.
if l.Value == lastMetricName && // We already have the name in the current MetricFamily, and we ignore nameless metrics.
lastWasHistogram == isHistogram && // The sample type matches (float vs histogram).
// If it was a histogram, the histogram type (counter vs gauge) also matches.
(!isHistogram || lastHistogramWasGauge == (s.H.CounterResetHint == histogram.GaugeType)) {
return nil
}
// Since we now check for the sample type and type of histogram above, we will end up
// creating multiple metric families for the same metric name. This would technically be
// an invalid exposition. But since the consumer of this is Prometheus, and Prometheus can
// parse it fine, we allow it and bend the rules to make federation possible in those cases.
// Need to start a new MetricFamily. Ship off the old one (if any) before
// creating the new one.
if protMetricFam != nil {
@ -195,6 +219,13 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
Type: dto.MetricType_UNTYPED.Enum(),
Name: proto.String(l.Value),
}
if isHistogram {
if s.H.CounterResetHint == histogram.GaugeType {
protMetricFam.Type = dto.MetricType_GAUGE_HISTOGRAM.Enum()
} else {
protMetricFam.Type = dto.MetricType_HISTOGRAM.Enum()
}
}
lastMetricName = l.Value
return nil
}
@ -228,9 +259,42 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
}
protMetric.TimestampMs = proto.Int64(s.T)
protMetric.Untyped.Value = proto.Float64(s.V)
// TODO(beorn7): Handle histograms.
if !isHistogram {
lastHistogramWasGauge = false
protMetric.Untyped = &dto.Untyped{
Value: proto.Float64(s.V),
}
} else {
lastHistogramWasGauge = s.H.CounterResetHint == histogram.GaugeType
protMetric.Histogram = &dto.Histogram{
SampleCountFloat: proto.Float64(s.H.Count),
SampleSum: proto.Float64(s.H.Sum),
Schema: proto.Int32(s.H.Schema),
ZeroThreshold: proto.Float64(s.H.ZeroThreshold),
ZeroCountFloat: proto.Float64(s.H.ZeroCount),
NegativeCount: s.H.NegativeBuckets,
PositiveCount: s.H.PositiveBuckets,
}
if len(s.H.PositiveSpans) > 0 {
protMetric.Histogram.PositiveSpan = make([]*dto.BucketSpan, len(s.H.PositiveSpans))
for i, sp := range s.H.PositiveSpans {
protMetric.Histogram.PositiveSpan[i] = &dto.BucketSpan{
Offset: proto.Int32(sp.Offset),
Length: proto.Uint32(sp.Length),
}
}
}
if len(s.H.NegativeSpans) > 0 {
protMetric.Histogram.NegativeSpan = make([]*dto.BucketSpan, len(s.H.NegativeSpans))
for i, sp := range s.H.NegativeSpans {
protMetric.Histogram.NegativeSpan[i] = &dto.BucketSpan{
Offset: proto.Int32(sp.Offset),
Length: proto.Uint32(sp.Length),
}
}
}
}
lastWasHistogram = isHistogram
protMetricFam.Metric = append(protMetricFam.Metric, protMetric)
}
// Still have to ship off the last MetricFamily, if any.

View File

@ -16,6 +16,8 @@ package web
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"sort"
@ -28,7 +30,9 @@ import (
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/textparse"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
@ -299,3 +303,114 @@ func normalizeBody(body *bytes.Buffer) string {
}
return strings.Join(lines, "")
}
func TestFederationWithNativeHistograms(t *testing.T) {
suite, err := promql.NewTest(t, "")
if err != nil {
t.Fatal(err)
}
defer suite.Close()
if err := suite.Run(); err != nil {
t.Fatal(err)
}
var expVec promql.Vector
db := suite.TSDB()
hist := &histogram.Histogram{
Count: 10,
ZeroCount: 2,
ZeroThreshold: 0.001,
Sum: 39.4,
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
PositiveBuckets: []int64{1, 1, -1, 0},
NegativeSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
NegativeBuckets: []int64{1, 1, -1, 0},
}
app := db.Appender(context.Background())
for i := 0; i < 6; i++ {
l := labels.FromStrings("__name__", "test_metric", "foo", fmt.Sprintf("%d", i))
expL := labels.FromStrings("__name__", "test_metric", "instance", "", "foo", fmt.Sprintf("%d", i))
if i%3 == 0 {
_, err = app.Append(0, l, 100*60*1000, float64(i*100))
expVec = append(expVec, promql.Sample{
Point: promql.Point{T: 100 * 60 * 1000, V: float64(i * 100)},
Metric: expL,
})
} else {
hist.ZeroCount++
_, err = app.AppendHistogram(0, l, 100*60*1000, hist.Copy(), nil)
expVec = append(expVec, promql.Sample{
Point: promql.Point{T: 100 * 60 * 1000, H: hist.ToFloat()},
Metric: expL,
})
}
require.NoError(t, err)
}
require.NoError(t, app.Commit())
h := &Handler{
localStorage: &dbAdapter{suite.TSDB()},
lookbackDelta: 5 * time.Minute,
now: func() model.Time { return 101 * 60 * 1000 }, // 101min after epoch.
config: &config.Config{
GlobalConfig: config.GlobalConfig{},
},
}
req := httptest.NewRequest("GET", "http://example.org/federate?match[]=test_metric", nil)
req.Header.Add("Accept", `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited,application/openmetrics-text;version=1.0.0;q=0.8,application/openmetrics-text;version=0.0.1;q=0.75,text/plain;version=0.0.4;q=0.5,*/*;q=0.1`)
res := httptest.NewRecorder()
h.federation(res, req)
require.Equal(t, http.StatusOK, res.Code)
body, err := io.ReadAll(res.Body)
require.NoError(t, err)
p := textparse.NewProtobufParser(body)
var actVec promql.Vector
metricFamilies := 0
for {
et, err := p.Next()
if err == io.EOF {
break
}
require.NoError(t, err)
if et == textparse.EntryHelp {
metricFamilies++
}
if et == textparse.EntryHistogram || et == textparse.EntrySeries {
l := labels.Labels{}
p.Metric(&l)
actVec = append(actVec, promql.Sample{Metric: l})
}
if et == textparse.EntryHistogram {
_, parsedTimestamp, h, fh := p.Histogram()
require.Nil(t, h)
actVec[len(actVec)-1].Point = promql.Point{
T: *parsedTimestamp,
H: fh,
}
} else if et == textparse.EntrySeries {
_, parsedTimestamp, v := p.Series()
actVec[len(actVec)-1].Point = promql.Point{
T: *parsedTimestamp,
V: v,
}
}
}
// TODO(codesome): Once PromQL is able to set the CounterResetHint on histograms,
// test it with switching histogram types for metric families.
require.Equal(t, 4, metricFamilies)
require.Equal(t, expVec, actVec)
}