From 61bd698143ce43164976d0235c5a696d931f4261 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 30 Dec 2016 19:34:45 +0100 Subject: [PATCH] web: implement federation for new storage --- web/federate.go | 226 +++++++++++++++++++++++++++--------------------- 1 file changed, 126 insertions(+), 100 deletions(-) diff --git a/web/federate.go b/web/federate.go index e08c2b4e4..5b0fba7a6 100644 --- a/web/federate.go +++ b/web/federate.go @@ -15,14 +15,19 @@ package web import ( "net/http" + "sort" - "github.com/pkg/errors" + "github.com/gogo/protobuf/proto" "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/expfmt" + "github.com/prometheus/common/log" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/storage" ) var ( @@ -49,124 +54,145 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { } var ( - // minTimestamp = h.now().Add(-promql.StalenessDelta) + mint = timestamp.FromTime(h.now().Time().Add(-promql.StalenessDelta)) + maxt = timestamp.FromTime(h.now().Time()) format = expfmt.Negotiate(req.Header) - // enc = expfmt.NewEncoder(w, format) + enc = expfmt.NewEncoder(w, format) ) w.Header().Set("Content-Type", string(format)) - federationErrors.Inc() - http.Error(w, errors.Errorf("federation disabled").Error(), http.StatusInternalServerError) - return + q, err := h.storage.Querier(mint, maxt) + if err != nil { + federationErrors.Inc() + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + defer q.Close() - // q, err := h.storage.Querier() - // if err != nil { - // federationErrors.Inc() - // http.Error(w, err.Error(), http.StatusInternalServerError) - // return - // } - // defer q.Close() + // TODO(fabxc): expose merge functionality in storage interface. + // We just concatenate results for all sets of matchers, which may produce + // duplicated results. + vec := make(promql.Vector, 0, 8000) - // TODO(fabxc): support via TSDB storage. + for _, mset := range matcherSets { + series := q.Select(mset...) + for series.Next() { + s := series.Series() + // TODO(fabxc): allow fast path for most recent sample either + // in the storage itself or caching layer in Prometheus. + it := storage.NewBuffer(s.Iterator(), int64(promql.StalenessDelta/1e6)) - // var sets []tsdb.SeriesSet - // for _, matchers := range matcherSets { - // set, err := q.Select(matchers) - // sets = append(sets, set) - // } + var t int64 + var v float64 - // vector, err := q.LastSampleForLabelMatchers(h.context, minTimestamp, matcherSets...) - // if err != nil { - // federationErrors.Inc() - // http.Error(w, err.Error(), http.StatusInternalServerError) - // return - // } - // sort.Sort(byName(vector)) + ok := it.Seek(maxt) + if ok { + t, v = it.Values() + } else { + t, v, ok = it.PeekBack() + if !ok { + continue + } + } - // var ( - // lastMetricName model.LabelValue - // protMetricFam *dto.MetricFamily - // ) - // for _, s := range vector { - // nameSeen := false - // globalUsed := map[model.LabelName]struct{}{} - // protMetric := &dto.Metric{ - // Untyped: &dto.Untyped{}, - // } + vec = append(vec, promql.Sample{ + Metric: s.Labels(), + Point: promql.Point{T: t, V: v}, + }) + } + if series.Err() != nil { + federationErrors.Inc() + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } - // for ln, lv := range s.Metric { - // if lv == "" { - // // No value means unset. Never consider those labels. - // // This is also important to protect against nameless metrics. - // continue - // } - // if ln == model.MetricNameLabel { - // nameSeen = true - // if lv == lastMetricName { - // // We already have the name in the current MetricFamily, - // // and we ignore nameless metrics. - // continue - // } - // // Need to start a new MetricFamily. Ship off the old one (if any) before - // // creating the new one. - // if protMetricFam != nil { - // if err := enc.Encode(protMetricFam); err != nil { - // federationErrors.Inc() - // log.With("err", err).Error("federation failed") - // return - // } - // } - // protMetricFam = &dto.MetricFamily{ - // Type: dto.MetricType_UNTYPED.Enum(), - // Name: proto.String(string(lv)), - // } - // lastMetricName = lv - // continue - // } - // protMetric.Label = append(protMetric.Label, &dto.LabelPair{ - // Name: proto.String(string(ln)), - // Value: proto.String(string(lv)), - // }) - // if _, ok := h.externalLabels[ln]; ok { - // globalUsed[ln] = struct{}{} - // } - // } - // if !nameSeen { - // log.With("metric", s.Metric).Warn("Ignoring nameless metric during federation.") - // continue - // } - // // Attach global labels if they do not exist yet. - // for ln, lv := range h.externalLabels { - // if _, ok := globalUsed[ln]; !ok { - // protMetric.Label = append(protMetric.Label, &dto.LabelPair{ - // Name: proto.String(string(ln)), - // Value: proto.String(string(lv)), - // }) - // } - // } + sort.Sort(byName(vec)) - // protMetric.TimestampMs = proto.Int64(int64(s.Timestamp)) - // protMetric.Untyped.Value = proto.Float64(float64(s.Value)) + var ( + lastMetricName string + protMetricFam *dto.MetricFamily + ) + for _, s := range vec { + nameSeen := false + globalUsed := map[string]struct{}{} + protMetric := &dto.Metric{ + Untyped: &dto.Untyped{}, + } - // protMetricFam.Metric = append(protMetricFam.Metric, protMetric) - // } - // // Still have to ship off the last MetricFamily, if any. - // if protMetricFam != nil { - // if err := enc.Encode(protMetricFam); err != nil { - // federationErrors.Inc() - // log.With("err", err).Error("federation failed") - // } - // } + for _, l := range s.Metric { + if l.Value == "" { + // No value means unset. Never consider those labels. + // This is also important to protect against nameless metrics. + continue + } + if l.Name == labels.MetricName { + nameSeen = true + if l.Value == lastMetricName { + // We already have the name in the current MetricFamily, + // and we ignore nameless metrics. + continue + } + // Need to start a new MetricFamily. Ship off the old one (if any) before + // creating the new one. + if protMetricFam != nil { + if err := enc.Encode(protMetricFam); err != nil { + federationErrors.Inc() + log.With("err", err).Error("federation failed") + return + } + } + protMetricFam = &dto.MetricFamily{ + Type: dto.MetricType_UNTYPED.Enum(), + Name: proto.String(l.Value), + } + lastMetricName = l.Value + continue + } + protMetric.Label = append(protMetric.Label, &dto.LabelPair{ + Name: proto.String(l.Name), + Value: proto.String(l.Value), + }) + if _, ok := h.externalLabels[model.LabelName(l.Name)]; ok { + globalUsed[l.Name] = struct{}{} + } + } + if !nameSeen { + log.With("metric", s.Metric).Warn("Ignoring nameless metric during federation.") + continue + } + // Attach global labels if they do not exist yet. + for ln, lv := range h.externalLabels { + if _, ok := globalUsed[string(ln)]; !ok { + protMetric.Label = append(protMetric.Label, &dto.LabelPair{ + Name: proto.String(string(ln)), + Value: proto.String(string(lv)), + }) + } + } + + protMetric.TimestampMs = proto.Int64(s.T) + protMetric.Untyped.Value = proto.Float64(s.V) + + protMetricFam.Metric = append(protMetricFam.Metric, protMetric) + } + // Still have to ship off the last MetricFamily, if any. + if protMetricFam != nil { + if err := enc.Encode(protMetricFam); err != nil { + federationErrors.Inc() + log.With("err", err).Error("federation failed") + } + } } // byName makes a model.Vector sortable by metric name. -type byName model.Vector +type byName promql.Vector func (vec byName) Len() int { return len(vec) } func (vec byName) Swap(i, j int) { vec[i], vec[j] = vec[j], vec[i] } func (vec byName) Less(i, j int) bool { - ni := vec[i].Metric[model.MetricNameLabel] - nj := vec[j].Metric[model.MetricNameLabel] + ni := vec[i].Metric.Get(labels.MetricName) + nj := vec[j].Metric.Get(labels.MetricName) return ni < nj }