diff --git a/docs/configuration/recording_rules.md b/docs/configuration/recording_rules.md index 7c10971a4..d70ffa0cb 100644 --- a/docs/configuration/recording_rules.md +++ b/docs/configuration/recording_rules.md @@ -17,9 +17,9 @@ Rule files use YAML. The rule files can be reloaded at runtime by sending `SIGHUP` to the Prometheus process. The changes are only applied if all rule files are well-formatted. -_Note about native histograms (experimental feature): Rules evaluating to -native histograms do not yet work as expected. Instead of a native histogram, -the sample stored is just a floating point value of zero._ +_Note about native histograms (experimental feature): Native histogram are always +recorded as gauge histograms (for now). Most cases will create gauge histograms +naturally, e.g. after `rate()`._ ## Syntax-checking rules diff --git a/docs/federation.md b/docs/federation.md index 0a241144a..3344a0ed0 100644 --- a/docs/federation.md +++ b/docs/federation.md @@ -8,8 +8,15 @@ sort_rank: 6 Federation allows a Prometheus server to scrape selected time series from another Prometheus server. -_Note about native histograms (experimental feature): Federation does not -support native histograms yet._ +_Note about native histograms (experimental feature): To scrape native histograms +via federation, the scraping Prometheus server needs to run with native histograms +enabled (via the command line flag `--enable-feature=native-histograms`), implying +that the protobuf format is used for scraping. Should the federated metrics contain +a mix of different sample types (float64, counter histogram, gauge histogram) for +the same metric name, the federation payload will contain multiple metric families +with the same name (but different types). Technically, this violates the rules of +the protobuf exposition format, but Prometheus is nevertheless able to ingest all +metrics correctly._ ## Use cases diff --git a/storage/buffer.go b/storage/buffer.go index dc9e9bca3..92767cdd7 100644 --- a/storage/buffer.go +++ b/storage/buffer.go @@ -68,9 +68,11 @@ func (b *BufferedSeriesIterator) ReduceDelta(delta int64) bool { // PeekBack returns the nth previous element of the iterator. If there is none buffered, // ok is false. -func (b *BufferedSeriesIterator) PeekBack(n int) (t int64, v float64, h *histogram.Histogram, ok bool) { +func (b *BufferedSeriesIterator) PeekBack(n int) ( + t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, ok bool, +) { s, ok := b.buf.nthLast(n) - return s.t, s.v, s.h, ok + return s.t, s.v, s.h, s.fh, ok } // Buffer returns an iterator over the buffered data. Invalidates previously diff --git a/storage/buffer_test.go b/storage/buffer_test.go index aac958397..44d11f0ed 100644 --- a/storage/buffer_test.go +++ b/storage/buffer_test.go @@ -107,7 +107,7 @@ func TestBufferedSeriesIterator(t *testing.T) { require.Equal(t, ev, v, "value mismatch") } prevSampleEq := func(ets int64, ev float64, eok bool) { - ts, v, _, ok := it.PeekBack(1) + ts, v, _, _, ok := it.PeekBack(1) require.Equal(t, eok, ok, "exist mismatch") require.Equal(t, ets, ts, "timestamp mismatch") require.Equal(t, ev, v, "value mismatch") diff --git a/web/federate.go b/web/federate.go index 85472bb44..93526771b 100644 --- a/web/federate.go +++ b/web/federate.go @@ -26,6 +26,7 @@ import ( "github.com/prometheus/common/expfmt" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/model/value" @@ -103,6 +104,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { set := storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge) it := storage.NewBuffer(int64(h.lookbackDelta / 1e6)) var chkIter chunkenc.Iterator +Loop: for set.Next() { s := set.At() @@ -111,18 +113,26 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { chkIter = s.Iterator(chkIter) it.Reset(chkIter) - var t int64 - var v float64 - var ok bool - + var ( + t int64 + v float64 + h *histogram.Histogram + fh *histogram.FloatHistogram + ok bool + ) valueType := it.Seek(maxt) - if valueType == chunkenc.ValFloat { + switch valueType { + case chunkenc.ValFloat: t, v = it.At() - } else { - // TODO(beorn7): Handle histograms. - t, v, _, ok = it.PeekBack(1) + case chunkenc.ValFloatHistogram, chunkenc.ValHistogram: + t, fh = it.AtFloatHistogram() + default: + t, v, h, fh, ok = it.PeekBack(1) if !ok { - continue + continue Loop + } + if h != nil { + fh = h.ToFloat() } } // The exposition formats do not support stale markers, so drop them. This @@ -135,7 +145,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { vec = append(vec, promql.Sample{ Metric: s.Labels(), - Point: promql.Point{T: t, V: v}, + Point: promql.Point{T: t, V: v, H: fh}, }) } if ws := set.Warnings(); len(ws) > 0 { @@ -161,15 +171,22 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { sort.Strings(externalLabelNames) var ( - lastMetricName string - protMetricFam *dto.MetricFamily + lastMetricName string + lastWasHistogram, lastHistogramWasGauge bool + protMetricFam *dto.MetricFamily ) for _, s := range vec { + isHistogram := s.H != nil + if isHistogram && + format != expfmt.FmtProtoDelim && format != expfmt.FmtProtoText && format != expfmt.FmtProtoCompact { + // Can't serve the native histogram. + // TODO(codesome): Serve them when other protocols get the native histogram support. + continue + } + nameSeen := false globalUsed := map[string]struct{}{} - protMetric := &dto.Metric{ - Untyped: &dto.Untyped{}, - } + protMetric := &dto.Metric{} err := s.Metric.Validate(func(l labels.Label) error { if l.Value == "" { @@ -179,11 +196,18 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { } if l.Name == labels.MetricName { nameSeen = true - if l.Value == lastMetricName { - // We already have the name in the current MetricFamily, - // and we ignore nameless metrics. + if l.Value == lastMetricName && // We already have the name in the current MetricFamily, and we ignore nameless metrics. + lastWasHistogram == isHistogram && // The sample type matches (float vs histogram). + // If it was a histogram, the histogram type (counter vs gauge) also matches. + (!isHistogram || lastHistogramWasGauge == (s.H.CounterResetHint == histogram.GaugeType)) { return nil } + + // Since we now check for the sample type and type of histogram above, we will end up + // creating multiple metric families for the same metric name. This would technically be + // an invalid exposition. But since the consumer of this is Prometheus, and Prometheus can + // parse it fine, we allow it and bend the rules to make federation possible in those cases. + // Need to start a new MetricFamily. Ship off the old one (if any) before // creating the new one. if protMetricFam != nil { @@ -195,6 +219,13 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { Type: dto.MetricType_UNTYPED.Enum(), Name: proto.String(l.Value), } + if isHistogram { + if s.H.CounterResetHint == histogram.GaugeType { + protMetricFam.Type = dto.MetricType_GAUGE_HISTOGRAM.Enum() + } else { + protMetricFam.Type = dto.MetricType_HISTOGRAM.Enum() + } + } lastMetricName = l.Value return nil } @@ -228,9 +259,42 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { } protMetric.TimestampMs = proto.Int64(s.T) - protMetric.Untyped.Value = proto.Float64(s.V) - // TODO(beorn7): Handle histograms. - + if !isHistogram { + lastHistogramWasGauge = false + protMetric.Untyped = &dto.Untyped{ + Value: proto.Float64(s.V), + } + } else { + lastHistogramWasGauge = s.H.CounterResetHint == histogram.GaugeType + protMetric.Histogram = &dto.Histogram{ + SampleCountFloat: proto.Float64(s.H.Count), + SampleSum: proto.Float64(s.H.Sum), + Schema: proto.Int32(s.H.Schema), + ZeroThreshold: proto.Float64(s.H.ZeroThreshold), + ZeroCountFloat: proto.Float64(s.H.ZeroCount), + NegativeCount: s.H.NegativeBuckets, + PositiveCount: s.H.PositiveBuckets, + } + if len(s.H.PositiveSpans) > 0 { + protMetric.Histogram.PositiveSpan = make([]*dto.BucketSpan, len(s.H.PositiveSpans)) + for i, sp := range s.H.PositiveSpans { + protMetric.Histogram.PositiveSpan[i] = &dto.BucketSpan{ + Offset: proto.Int32(sp.Offset), + Length: proto.Uint32(sp.Length), + } + } + } + if len(s.H.NegativeSpans) > 0 { + protMetric.Histogram.NegativeSpan = make([]*dto.BucketSpan, len(s.H.NegativeSpans)) + for i, sp := range s.H.NegativeSpans { + protMetric.Histogram.NegativeSpan[i] = &dto.BucketSpan{ + Offset: proto.Int32(sp.Offset), + Length: proto.Uint32(sp.Length), + } + } + } + } + lastWasHistogram = isHistogram protMetricFam.Metric = append(protMetricFam.Metric, protMetric) } // Still have to ship off the last MetricFamily, if any. diff --git a/web/federate_test.go b/web/federate_test.go index f39daedf1..944b0f1ec 100644 --- a/web/federate_test.go +++ b/web/federate_test.go @@ -16,6 +16,8 @@ package web import ( "bytes" "context" + "fmt" + "io" "net/http" "net/http/httptest" "sort" @@ -28,7 +30,9 @@ import ( "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/textparse" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" @@ -299,3 +303,114 @@ func normalizeBody(body *bytes.Buffer) string { } return strings.Join(lines, "") } + +func TestFederationWithNativeHistograms(t *testing.T) { + suite, err := promql.NewTest(t, "") + if err != nil { + t.Fatal(err) + } + defer suite.Close() + + if err := suite.Run(); err != nil { + t.Fatal(err) + } + + var expVec promql.Vector + + db := suite.TSDB() + hist := &histogram.Histogram{ + Count: 10, + ZeroCount: 2, + ZeroThreshold: 0.001, + Sum: 39.4, + Schema: 1, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + PositiveBuckets: []int64{1, 1, -1, 0}, + NegativeSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + NegativeBuckets: []int64{1, 1, -1, 0}, + } + app := db.Appender(context.Background()) + for i := 0; i < 6; i++ { + l := labels.FromStrings("__name__", "test_metric", "foo", fmt.Sprintf("%d", i)) + expL := labels.FromStrings("__name__", "test_metric", "instance", "", "foo", fmt.Sprintf("%d", i)) + if i%3 == 0 { + _, err = app.Append(0, l, 100*60*1000, float64(i*100)) + expVec = append(expVec, promql.Sample{ + Point: promql.Point{T: 100 * 60 * 1000, V: float64(i * 100)}, + Metric: expL, + }) + } else { + hist.ZeroCount++ + _, err = app.AppendHistogram(0, l, 100*60*1000, hist.Copy(), nil) + expVec = append(expVec, promql.Sample{ + Point: promql.Point{T: 100 * 60 * 1000, H: hist.ToFloat()}, + Metric: expL, + }) + } + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + + h := &Handler{ + localStorage: &dbAdapter{suite.TSDB()}, + lookbackDelta: 5 * time.Minute, + now: func() model.Time { return 101 * 60 * 1000 }, // 101min after epoch. + config: &config.Config{ + GlobalConfig: config.GlobalConfig{}, + }, + } + + req := httptest.NewRequest("GET", "http://example.org/federate?match[]=test_metric", nil) + req.Header.Add("Accept", `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited,application/openmetrics-text;version=1.0.0;q=0.8,application/openmetrics-text;version=0.0.1;q=0.75,text/plain;version=0.0.4;q=0.5,*/*;q=0.1`) + res := httptest.NewRecorder() + + h.federation(res, req) + + require.Equal(t, http.StatusOK, res.Code) + body, err := io.ReadAll(res.Body) + require.NoError(t, err) + + p := textparse.NewProtobufParser(body) + var actVec promql.Vector + metricFamilies := 0 + for { + et, err := p.Next() + if err == io.EOF { + break + } + require.NoError(t, err) + if et == textparse.EntryHelp { + metricFamilies++ + } + if et == textparse.EntryHistogram || et == textparse.EntrySeries { + l := labels.Labels{} + p.Metric(&l) + actVec = append(actVec, promql.Sample{Metric: l}) + } + if et == textparse.EntryHistogram { + _, parsedTimestamp, h, fh := p.Histogram() + require.Nil(t, h) + actVec[len(actVec)-1].Point = promql.Point{ + T: *parsedTimestamp, + H: fh, + } + } else if et == textparse.EntrySeries { + _, parsedTimestamp, v := p.Series() + actVec[len(actVec)-1].Point = promql.Point{ + T: *parsedTimestamp, + V: v, + } + } + } + + // TODO(codesome): Once PromQL is able to set the CounterResetHint on histograms, + // test it with switching histogram types for metric families. + require.Equal(t, 4, metricFamilies) + require.Equal(t, expVec, actVec) +}