From 01834fa528abbe380a89141ba31cfca90f1da4c4 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 21 Aug 2015 23:34:51 +0200 Subject: [PATCH] Move metric modifications into SampleAppenders --- retrieval/target.go | 139 ++++++++++++++++++++++++++++++-------------- 1 file changed, 94 insertions(+), 45 deletions(-) diff --git a/retrieval/target.go b/retrieval/target.go index d175336cd..b778359ab 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -212,7 +212,9 @@ func (t *Target) Update(cfg *config.ScrapeConfig, baseLabels, metaLabels model.L t.url.Scheme = string(baseLabels[model.SchemeLabel]) t.url.Path = string(baseLabels[model.MetricsPathLabel]) + params := url.Values{} + for k, v := range cfg.Params { params[k] = make([]string, len(v)) copy(params[k], v) @@ -402,22 +404,41 @@ func (t *Target) ingest(s model.Vector) error { const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1` -func (t *Target) scrape(sampleAppender storage.SampleAppender) (err error) { +func (t *Target) scrape(appender storage.SampleAppender) (err error) { start := time.Now() baseLabels := t.BaseLabels() - t.RLock() - var ( - honorLabels = t.honorLabels - httpClient = t.httpClient - metricRelabelConfigs = t.metricRelabelConfigs - ) - t.RUnlock() - - defer func() { + defer func(appender storage.SampleAppender) { t.status.setLastError(err) - recordScrapeHealth(sampleAppender, start, baseLabels, t.status.Health(), time.Since(start)) - }() + recordScrapeHealth(appender, start, baseLabels, t.status.Health(), time.Since(start)) + }(appender) + + t.RLock() + + // The relabelAppender has to be inside the label-modifying appenders + // so the relabeling rules are applied to the correct label set. + if len(t.metricRelabelConfigs) > 0 { + appender = relabelAppender{ + app: appender, + relabelings: t.metricRelabelConfigs, + } + } + + if t.honorLabels { + appender = honorLabelsAppender{ + app: appender, + labels: baseLabels, + } + } else { + appender = ruleLabelsAppender{ + app: appender, + labels: baseLabels, + } + } + + httpClient := t.httpClient + + t.RUnlock() req, err := http.NewRequest("GET", t.URL().String(), nil) if err != nil { @@ -450,7 +471,7 @@ func (t *Target) scrape(sampleAppender storage.SampleAppender) (err error) { go func() { for { - // TODO(fabxc): Changex the SampleAppender interface to return an error + // TODO(fabxc): Change the SampleAppender interface to return an error // so we can proceed based on the status and don't leak goroutines trying // to append a single sample after dropping all the other ones. // @@ -468,38 +489,7 @@ func (t *Target) scrape(sampleAppender storage.SampleAppender) (err error) { for samples := range t.ingestedSamples { for _, s := range samples { - if honorLabels { - // Merge the metric with the baseLabels for labels not already set in the - // metric. This also considers labels explicitly set to the empty string. - for ln, lv := range baseLabels { - if _, ok := s.Metric[ln]; !ok { - s.Metric[ln] = lv - } - } - } else { - // Merge the ingested metric with the base label set. On a collision the - // value of the label is stored in a label prefixed with the exported prefix. - for ln, lv := range baseLabels { - if v, ok := s.Metric[ln]; ok && v != "" { - s.Metric[model.ExportedLabelPrefix+ln] = v - } - s.Metric[ln] = lv - } - } - // Avoid the copy in Relabel if there are no configs. - if len(metricRelabelConfigs) > 0 { - labels, err := Relabel(model.LabelSet(s.Metric), metricRelabelConfigs...) - if err != nil { - log.Errorf("Error while relabeling metric %s of instance %s: %s", s.Metric, req.URL, err) - continue - } - // Check if the timeseries was dropped. - if labels == nil { - continue - } - s.Metric = model.Metric(labels) - } - sampleAppender.Append(s) + appender.Append(s) } } @@ -509,10 +499,69 @@ func (t *Target) scrape(sampleAppender storage.SampleAppender) (err error) { return err } +// Merges the ingested sample's metric with the label set. On a collision the +// value of the ingested label is stored in a label prefixed with 'exported_'. +type ruleLabelsAppender struct { + app storage.SampleAppender + labels model.LabelSet +} + +func (app ruleLabelsAppender) Append(s *model.Sample) { + for ln, lv := range app.labels { + if v, ok := s.Metric[ln]; ok && v != "" { + s.Metric[model.ExportedLabelPrefix+ln] = v + } + s.Metric[ln] = lv + } + + app.app.Append(s) +} + +type honorLabelsAppender struct { + app storage.SampleAppender + labels model.LabelSet +} + +// Merges the sample's metric with the given labels if the label is not +// already present in the metric. +// This also considers labels explicitly set to the empty string. +func (app honorLabelsAppender) Append(s *model.Sample) { + for ln, lv := range app.labels { + if _, ok := s.Metric[ln]; !ok { + s.Metric[ln] = lv + } + } + + app.app.Append(s) +} + +// Applies a set of relabel configurations to the sample's metric +// before actually appending it. +type relabelAppender struct { + app storage.SampleAppender + relabelings []*config.RelabelConfig +} + +func (app relabelAppender) Append(s *model.Sample) { + labels, err := Relabel(model.LabelSet(s.Metric), app.relabelings...) + if err != nil { + log.Errorf("Error while relabeling metric %s: %s", s.Metric, err) + return + } + // Check if the timeseries was dropped. + if labels == nil { + return + } + s.Metric = model.Metric(labels) + + app.app.Append(s) +} + // URL returns a copy of the target's URL. func (t *Target) URL() *url.URL { t.RLock() defer t.RUnlock() + u := &url.URL{} *u = *t.url return u