From f8b20f30accd559c3bc14bc8c5a22bceafc0a1c4 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Mon, 12 Aug 2013 15:15:41 +0200 Subject: [PATCH] Make retrieval work with client's new Ingester interface. --- retrieval/target.go | 31 +++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/retrieval/target.go b/retrieval/target.go index 8bff1d7fd..9f90406e7 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -202,6 +202,19 @@ func (t *target) Scrape(earliest time.Time, results chan<- *extraction.Result) e const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,application/json;schema=prometheus/telemetry;version=0.0.2;q=0.2,*/*;q=0.1` +type extendLabelsIngester struct { + baseLabels clientmodel.LabelSet + results chan<- *extraction.Result +} + +func (i *extendLabelsIngester) Ingest(r *extraction.Result) error { + for _, s := range r.Samples { + s.Metric.MergeFromLabelSet(i.baseLabels, clientmodel.ExporterLabelPrefix) + } + i.results <- r + return nil +} + func (t *target) scrape(timestamp time.Time, results chan<- *extraction.Result) (err error) { defer func(start time.Time) { ms := float64(time.Since(start)) / float64(time.Millisecond) @@ -238,20 +251,22 @@ func (t *target) scrape(timestamp time.Time, results chan<- *extraction.Result) baseLabels[baseLabel] = baseValue } - processOptions := &extraction.ProcessOptions{ - Timestamp: timestamp, - BaseLabels: baseLabels, - } - // N.B. - It is explicitly required to extract the entire payload before - // attempting to deserialize, as the underlying reader expects will - // interpret pending data as a truncated message. + // attempting to deserialize, as the underlying reader will interpret + // pending data as a truncated message. buf := new(bytes.Buffer) if _, err := buf.ReadFrom(resp.Body); err != nil { return err } - return processor.ProcessSingle(buf, results, processOptions) + ingester := &extendLabelsIngester{ + baseLabels: baseLabels, + results: results, + } + processOptions := &extraction.ProcessOptions{ + Timestamp: timestamp, + } + return processor.ProcessSingle(buf, ingester, processOptions) } func (t *target) State() TargetState {