From 33f880d123177377c03daf1d60019962b4b3361e Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Mon, 9 Jan 2023 17:06:15 +0530 Subject: [PATCH 1/5] Add native histogram support in federation Signed-off-by: Ganesh Vernekar --- web/federate.go | 100 ++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 80 insertions(+), 20 deletions(-) diff --git a/web/federate.go b/web/federate.go index 85472bb44..1589c893e 100644 --- a/web/federate.go +++ b/web/federate.go @@ -26,6 +26,7 @@ import ( "github.com/prometheus/common/expfmt" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/model/value" @@ -103,6 +104,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { set := storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge) it := storage.NewBuffer(int64(h.lookbackDelta / 1e6)) var chkIter chunkenc.Iterator +Loop: for set.Next() { s := set.At() @@ -111,18 +113,22 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { chkIter = s.Iterator(chkIter) it.Reset(chkIter) - var t int64 - var v float64 - var ok bool - + var ( + t int64 + v float64 + h *histogram.FloatHistogram + ok bool + ) valueType := it.Seek(maxt) - if valueType == chunkenc.ValFloat { + switch valueType { + case chunkenc.ValFloat: t, v = it.At() - } else { - // TODO(beorn7): Handle histograms. + case chunkenc.ValFloatHistogram, chunkenc.ValHistogram: + t, h = it.AtFloatHistogram() + default: t, v, _, ok = it.PeekBack(1) if !ok { - continue + continue Loop } } // The exposition formats do not support stale markers, so drop them. This @@ -135,7 +141,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { vec = append(vec, promql.Sample{ Metric: s.Labels(), - Point: promql.Point{T: t, V: v}, + Point: promql.Point{T: t, V: v, H: h}, }) } if ws := set.Warnings(); len(ws) > 0 { @@ -161,15 +167,22 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { sort.Strings(externalLabelNames) var ( - lastMetricName string - protMetricFam *dto.MetricFamily + lastMetricName string + lastWasHistogram, lastHistogramWasGauge bool + protMetricFam *dto.MetricFamily ) for _, s := range vec { + isHistogram := s.H != nil + if isHistogram && + format != expfmt.FmtProtoDelim && format != expfmt.FmtProtoText && format != expfmt.FmtProtoCompact { + // Can't serve the native histogram. + // TODO(codesome): Serve them when other protocols get the native histogram support. + continue + } + nameSeen := false globalUsed := map[string]struct{}{} - protMetric := &dto.Metric{ - Untyped: &dto.Untyped{}, - } + protMetric := &dto.Metric{} err := s.Metric.Validate(func(l labels.Label) error { if l.Value == "" { @@ -179,11 +192,18 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { } if l.Name == labels.MetricName { nameSeen = true - if l.Value == lastMetricName { - // We already have the name in the current MetricFamily, - // and we ignore nameless metrics. + if l.Value == lastMetricName && // We already have the name in the current MetricFamily, and we ignore nameless metrics. + lastWasHistogram == isHistogram && // The sample type matches (float vs histogram). + // If it was a histogram, the histogram type (counter vs gauge) also matches. + (!isHistogram || lastHistogramWasGauge == (s.H.CounterResetHint == histogram.GaugeType)) { return nil } + + // Since we now check for the sample type and type of histogram above, we will end up + // creating multiple metric families for the same metric name. This would technically be + // an invalid exposition. But since the consumer of this is Prometheus, and Prometheus can + // parse it fine, we allow it and bend the rules to make federation possible in those cases. + // Need to start a new MetricFamily. Ship off the old one (if any) before // creating the new one. if protMetricFam != nil { @@ -195,6 +215,13 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { Type: dto.MetricType_UNTYPED.Enum(), Name: proto.String(l.Value), } + if isHistogram { + if s.H.CounterResetHint == histogram.GaugeType { + protMetricFam.Type = dto.MetricType_GAUGE_HISTOGRAM.Enum() + } else { + protMetricFam.Type = dto.MetricType_HISTOGRAM.Enum() + } + } lastMetricName = l.Value return nil } @@ -228,9 +255,42 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { } protMetric.TimestampMs = proto.Int64(s.T) - protMetric.Untyped.Value = proto.Float64(s.V) - // TODO(beorn7): Handle histograms. - + if !isHistogram { + lastHistogramWasGauge = false + protMetric.Untyped = &dto.Untyped{ + Value: proto.Float64(s.V), + } + } else { + lastHistogramWasGauge = s.H.CounterResetHint == histogram.GaugeType + protMetric.Histogram = &dto.Histogram{ + SampleCountFloat: proto.Float64(s.H.Count), + SampleSum: proto.Float64(s.H.Sum), + Schema: proto.Int32(s.H.Schema), + ZeroThreshold: proto.Float64(s.H.ZeroThreshold), + ZeroCountFloat: proto.Float64(s.H.ZeroCount), + NegativeCount: s.H.NegativeBuckets, + PositiveCount: s.H.PositiveBuckets, + } + if len(s.H.PositiveSpans) > 0 { + protMetric.Histogram.PositiveSpan = make([]*dto.BucketSpan, len(s.H.PositiveSpans)) + for i, sp := range s.H.PositiveSpans { + protMetric.Histogram.PositiveSpan[i] = &dto.BucketSpan{ + Offset: proto.Int32(sp.Offset), + Length: proto.Uint32(sp.Length), + } + } + } + if len(s.H.NegativeSpans) > 0 { + protMetric.Histogram.NegativeSpan = make([]*dto.BucketSpan, len(s.H.NegativeSpans)) + for i, sp := range s.H.NegativeSpans { + protMetric.Histogram.NegativeSpan[i] = &dto.BucketSpan{ + Offset: proto.Int32(sp.Offset), + Length: proto.Uint32(sp.Length), + } + } + } + } + lastWasHistogram = isHistogram protMetricFam.Metric = append(protMetricFam.Metric, protMetric) } // Still have to ship off the last MetricFamily, if any. From 7a88bc3581f40827bdf848897c7241699f2c8089 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Mon, 9 Jan 2023 18:35:04 +0530 Subject: [PATCH 2/5] Test federation with native histograms Signed-off-by: Ganesh Vernekar --- web/federate_test.go | 115 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) diff --git a/web/federate_test.go b/web/federate_test.go index f39daedf1..b0def6f79 100644 --- a/web/federate_test.go +++ b/web/federate_test.go @@ -16,6 +16,8 @@ package web import ( "bytes" "context" + "fmt" + "io" "net/http" "net/http/httptest" "sort" @@ -28,7 +30,9 @@ import ( "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/textparse" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" @@ -299,3 +303,114 @@ func normalizeBody(body *bytes.Buffer) string { } return strings.Join(lines, "") } + +func TestFederationWithNativeHistograms(t *testing.T) { + suite, err := promql.NewTest(t, "") + if err != nil { + t.Fatal(err) + } + defer suite.Close() + + if err := suite.Run(); err != nil { + t.Fatal(err) + } + + var expVec promql.Vector + + db := suite.TSDB() + hist := &histogram.Histogram{ + Count: 10, + ZeroCount: 2, + ZeroThreshold: 0.001, + Sum: 39.4, + Schema: 1, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + PositiveBuckets: []int64{1, 1, -1, 0}, + NegativeSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + NegativeBuckets: []int64{1, 1, -1, 0}, + } + app := db.Appender(context.Background()) + for i := 0; i < 6; i++ { + l := labels.FromStrings("__name__", "test_metric", "foo", fmt.Sprintf("%d", i)) + expL := labels.FromStrings("__name__", "test_metric", "instance", "", "foo", fmt.Sprintf("%d", i)) + if i%3 == 0 { + _, err = app.Append(0, l, 101*60*1000, float64(i*100)) + expVec = append(expVec, promql.Sample{ + Point: promql.Point{T: 101 * 60 * 1000, V: float64(i * 100)}, + Metric: expL, + }) + } else { + hist.ZeroCount++ + _, err = app.AppendHistogram(0, l, 101*60*1000, hist.Copy(), nil) + expVec = append(expVec, promql.Sample{ + Point: promql.Point{T: 101 * 60 * 1000, H: hist.ToFloat()}, + Metric: expL, + }) + } + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + + h := &Handler{ + localStorage: &dbAdapter{suite.TSDB()}, + lookbackDelta: 5 * time.Minute, + now: func() model.Time { return 101 * 60 * 1000 }, // 101min after epoch. + config: &config.Config{ + GlobalConfig: config.GlobalConfig{}, + }, + } + + req := httptest.NewRequest("GET", "http://example.org/federate?match[]=test_metric", nil) + req.Header.Add("Accept", `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited,application/openmetrics-text;version=1.0.0;q=0.8,application/openmetrics-text;version=0.0.1;q=0.75,text/plain;version=0.0.4;q=0.5,*/*;q=0.1`) + res := httptest.NewRecorder() + + h.federation(res, req) + + require.Equal(t, http.StatusOK, res.Code) + body, err := io.ReadAll(res.Body) + require.NoError(t, err) + + p := textparse.NewProtobufParser(body) + var actVec promql.Vector + metricFamilies := 0 + for { + et, err := p.Next() + if err == io.EOF { + break + } + require.NoError(t, err) + if et == textparse.EntryHelp { + metricFamilies++ + } + if et == textparse.EntryHistogram || et == textparse.EntrySeries { + l := labels.Labels{} + p.Metric(&l) + actVec = append(actVec, promql.Sample{Metric: l}) + } + if et == textparse.EntryHistogram { + _, parsedTimestamp, h, fh := p.Histogram() + require.Nil(t, h) + actVec[len(actVec)-1].Point = promql.Point{ + T: *parsedTimestamp, + H: fh, + } + } else if et == textparse.EntrySeries { + _, parsedTimestamp, v := p.Series() + actVec[len(actVec)-1].Point = promql.Point{ + T: *parsedTimestamp, + V: v, + } + } + } + + // TODO(codesome): Once PromQL is able to set the CounterResetHint on histograms, + // test it with switching histogram types for metric families. + require.Equal(t, 4, metricFamilies) + require.Equal(t, expVec, actVec) +} From 2e538be5d7474a0624cd11b3b33be7c6107ce33c Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Thu, 12 Jan 2023 13:33:45 +0530 Subject: [PATCH 3/5] docs: Update federation docs for native histograms Signed-off-by: Ganesh Vernekar --- docs/federation.md | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/docs/federation.md b/docs/federation.md index 0a241144a..3344a0ed0 100644 --- a/docs/federation.md +++ b/docs/federation.md @@ -8,8 +8,15 @@ sort_rank: 6 Federation allows a Prometheus server to scrape selected time series from another Prometheus server. -_Note about native histograms (experimental feature): Federation does not -support native histograms yet._ +_Note about native histograms (experimental feature): To scrape native histograms +via federation, the scraping Prometheus server needs to run with native histograms +enabled (via the command line flag `--enable-feature=native-histograms`), implying +that the protobuf format is used for scraping. Should the federated metrics contain +a mix of different sample types (float64, counter histogram, gauge histogram) for +the same metric name, the federation payload will contain multiple metric families +with the same name (but different types). Technically, this violates the rules of +the protobuf exposition format, but Prometheus is nevertheless able to ingest all +metrics correctly._ ## Use cases From b4e15899d1481bfcb9497bdc920cccb93a017559 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Thu, 12 Jan 2023 13:35:41 +0530 Subject: [PATCH 4/5] docs: Update recording rule docs about native histograms Signed-off-by: Ganesh Vernekar --- docs/configuration/recording_rules.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/configuration/recording_rules.md b/docs/configuration/recording_rules.md index 7c10971a4..d70ffa0cb 100644 --- a/docs/configuration/recording_rules.md +++ b/docs/configuration/recording_rules.md @@ -17,9 +17,9 @@ Rule files use YAML. The rule files can be reloaded at runtime by sending `SIGHUP` to the Prometheus process. The changes are only applied if all rule files are well-formatted. -_Note about native histograms (experimental feature): Rules evaluating to -native histograms do not yet work as expected. Instead of a native histogram, -the sample stored is just a floating point value of zero._ +_Note about native histograms (experimental feature): Native histogram are always +recorded as gauge histograms (for now). Most cases will create gauge histograms +naturally, e.g. after `rate()`._ ## Syntax-checking rules From d121db7a65fe76650ed177c4a4ad44f1e3c6a145 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Thu, 12 Jan 2023 15:20:50 +0100 Subject: [PATCH 5/5] federate: Fix PeekBack usage In most cases, there is no sample at `maxt`, so `PeekBack` has to be used. So far, `PeekBack` did not return a float histogram, and we disregarded even any returned normal histogram. This fixes both, and also tweaks the unit test to discover the problem (by using an earlier timestamp than "now" for the samples in the TSDB). Signed-off-by: beorn7 --- storage/buffer.go | 6 ++++-- storage/buffer_test.go | 2 +- web/federate.go | 12 ++++++++---- web/federate_test.go | 8 ++++---- 4 files changed, 17 insertions(+), 11 deletions(-) diff --git a/storage/buffer.go b/storage/buffer.go index dc9e9bca3..92767cdd7 100644 --- a/storage/buffer.go +++ b/storage/buffer.go @@ -68,9 +68,11 @@ func (b *BufferedSeriesIterator) ReduceDelta(delta int64) bool { // PeekBack returns the nth previous element of the iterator. If there is none buffered, // ok is false. -func (b *BufferedSeriesIterator) PeekBack(n int) (t int64, v float64, h *histogram.Histogram, ok bool) { +func (b *BufferedSeriesIterator) PeekBack(n int) ( + t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, ok bool, +) { s, ok := b.buf.nthLast(n) - return s.t, s.v, s.h, ok + return s.t, s.v, s.h, s.fh, ok } // Buffer returns an iterator over the buffered data. Invalidates previously diff --git a/storage/buffer_test.go b/storage/buffer_test.go index aac958397..44d11f0ed 100644 --- a/storage/buffer_test.go +++ b/storage/buffer_test.go @@ -107,7 +107,7 @@ func TestBufferedSeriesIterator(t *testing.T) { require.Equal(t, ev, v, "value mismatch") } prevSampleEq := func(ets int64, ev float64, eok bool) { - ts, v, _, ok := it.PeekBack(1) + ts, v, _, _, ok := it.PeekBack(1) require.Equal(t, eok, ok, "exist mismatch") require.Equal(t, ets, ts, "timestamp mismatch") require.Equal(t, ev, v, "value mismatch") diff --git a/web/federate.go b/web/federate.go index 1589c893e..93526771b 100644 --- a/web/federate.go +++ b/web/federate.go @@ -116,7 +116,8 @@ Loop: var ( t int64 v float64 - h *histogram.FloatHistogram + h *histogram.Histogram + fh *histogram.FloatHistogram ok bool ) valueType := it.Seek(maxt) @@ -124,12 +125,15 @@ Loop: case chunkenc.ValFloat: t, v = it.At() case chunkenc.ValFloatHistogram, chunkenc.ValHistogram: - t, h = it.AtFloatHistogram() + t, fh = it.AtFloatHistogram() default: - t, v, _, ok = it.PeekBack(1) + t, v, h, fh, ok = it.PeekBack(1) if !ok { continue Loop } + if h != nil { + fh = h.ToFloat() + } } // The exposition formats do not support stale markers, so drop them. This // is good enough for staleness handling of federated data, as the @@ -141,7 +145,7 @@ Loop: vec = append(vec, promql.Sample{ Metric: s.Labels(), - Point: promql.Point{T: t, V: v, H: h}, + Point: promql.Point{T: t, V: v, H: fh}, }) } if ws := set.Warnings(); len(ws) > 0 { diff --git a/web/federate_test.go b/web/federate_test.go index b0def6f79..944b0f1ec 100644 --- a/web/federate_test.go +++ b/web/federate_test.go @@ -340,16 +340,16 @@ func TestFederationWithNativeHistograms(t *testing.T) { l := labels.FromStrings("__name__", "test_metric", "foo", fmt.Sprintf("%d", i)) expL := labels.FromStrings("__name__", "test_metric", "instance", "", "foo", fmt.Sprintf("%d", i)) if i%3 == 0 { - _, err = app.Append(0, l, 101*60*1000, float64(i*100)) + _, err = app.Append(0, l, 100*60*1000, float64(i*100)) expVec = append(expVec, promql.Sample{ - Point: promql.Point{T: 101 * 60 * 1000, V: float64(i * 100)}, + Point: promql.Point{T: 100 * 60 * 1000, V: float64(i * 100)}, Metric: expL, }) } else { hist.ZeroCount++ - _, err = app.AppendHistogram(0, l, 101*60*1000, hist.Copy(), nil) + _, err = app.AppendHistogram(0, l, 100*60*1000, hist.Copy(), nil) expVec = append(expVec, promql.Sample{ - Point: promql.Point{T: 101 * 60 * 1000, H: hist.ToFloat()}, + Point: promql.Point{T: 100 * 60 * 1000, H: hist.ToFloat()}, Metric: expL, }) }