Move metric modifications into SampleAppenders
This commit is contained in:
parent
6d0f58dcf3
commit
01834fa528
|
@ -212,7 +212,9 @@ func (t *Target) Update(cfg *config.ScrapeConfig, baseLabels, metaLabels model.L
|
|||
|
||||
t.url.Scheme = string(baseLabels[model.SchemeLabel])
|
||||
t.url.Path = string(baseLabels[model.MetricsPathLabel])
|
||||
|
||||
params := url.Values{}
|
||||
|
||||
for k, v := range cfg.Params {
|
||||
params[k] = make([]string, len(v))
|
||||
copy(params[k], v)
|
||||
|
@ -402,22 +404,41 @@ func (t *Target) ingest(s model.Vector) error {
|
|||
|
||||
const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1`
|
||||
|
||||
func (t *Target) scrape(sampleAppender storage.SampleAppender) (err error) {
|
||||
func (t *Target) scrape(appender storage.SampleAppender) (err error) {
|
||||
start := time.Now()
|
||||
baseLabels := t.BaseLabels()
|
||||
|
||||
t.RLock()
|
||||
var (
|
||||
honorLabels = t.honorLabels
|
||||
httpClient = t.httpClient
|
||||
metricRelabelConfigs = t.metricRelabelConfigs
|
||||
)
|
||||
t.RUnlock()
|
||||
|
||||
defer func() {
|
||||
defer func(appender storage.SampleAppender) {
|
||||
t.status.setLastError(err)
|
||||
recordScrapeHealth(sampleAppender, start, baseLabels, t.status.Health(), time.Since(start))
|
||||
}()
|
||||
recordScrapeHealth(appender, start, baseLabels, t.status.Health(), time.Since(start))
|
||||
}(appender)
|
||||
|
||||
t.RLock()
|
||||
|
||||
// The relabelAppender has to be inside the label-modifying appenders
|
||||
// so the relabeling rules are applied to the correct label set.
|
||||
if len(t.metricRelabelConfigs) > 0 {
|
||||
appender = relabelAppender{
|
||||
app: appender,
|
||||
relabelings: t.metricRelabelConfigs,
|
||||
}
|
||||
}
|
||||
|
||||
if t.honorLabels {
|
||||
appender = honorLabelsAppender{
|
||||
app: appender,
|
||||
labels: baseLabels,
|
||||
}
|
||||
} else {
|
||||
appender = ruleLabelsAppender{
|
||||
app: appender,
|
||||
labels: baseLabels,
|
||||
}
|
||||
}
|
||||
|
||||
httpClient := t.httpClient
|
||||
|
||||
t.RUnlock()
|
||||
|
||||
req, err := http.NewRequest("GET", t.URL().String(), nil)
|
||||
if err != nil {
|
||||
|
@ -450,7 +471,7 @@ func (t *Target) scrape(sampleAppender storage.SampleAppender) (err error) {
|
|||
|
||||
go func() {
|
||||
for {
|
||||
// TODO(fabxc): Changex the SampleAppender interface to return an error
|
||||
// TODO(fabxc): Change the SampleAppender interface to return an error
|
||||
// so we can proceed based on the status and don't leak goroutines trying
|
||||
// to append a single sample after dropping all the other ones.
|
||||
//
|
||||
|
@ -468,38 +489,7 @@ func (t *Target) scrape(sampleAppender storage.SampleAppender) (err error) {
|
|||
|
||||
for samples := range t.ingestedSamples {
|
||||
for _, s := range samples {
|
||||
if honorLabels {
|
||||
// Merge the metric with the baseLabels for labels not already set in the
|
||||
// metric. This also considers labels explicitly set to the empty string.
|
||||
for ln, lv := range baseLabels {
|
||||
if _, ok := s.Metric[ln]; !ok {
|
||||
s.Metric[ln] = lv
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Merge the ingested metric with the base label set. On a collision the
|
||||
// value of the label is stored in a label prefixed with the exported prefix.
|
||||
for ln, lv := range baseLabels {
|
||||
if v, ok := s.Metric[ln]; ok && v != "" {
|
||||
s.Metric[model.ExportedLabelPrefix+ln] = v
|
||||
}
|
||||
s.Metric[ln] = lv
|
||||
}
|
||||
}
|
||||
// Avoid the copy in Relabel if there are no configs.
|
||||
if len(metricRelabelConfigs) > 0 {
|
||||
labels, err := Relabel(model.LabelSet(s.Metric), metricRelabelConfigs...)
|
||||
if err != nil {
|
||||
log.Errorf("Error while relabeling metric %s of instance %s: %s", s.Metric, req.URL, err)
|
||||
continue
|
||||
}
|
||||
// Check if the timeseries was dropped.
|
||||
if labels == nil {
|
||||
continue
|
||||
}
|
||||
s.Metric = model.Metric(labels)
|
||||
}
|
||||
sampleAppender.Append(s)
|
||||
appender.Append(s)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -509,10 +499,69 @@ func (t *Target) scrape(sampleAppender storage.SampleAppender) (err error) {
|
|||
return err
|
||||
}
|
||||
|
||||
// Merges the ingested sample's metric with the label set. On a collision the
|
||||
// value of the ingested label is stored in a label prefixed with 'exported_'.
|
||||
type ruleLabelsAppender struct {
|
||||
app storage.SampleAppender
|
||||
labels model.LabelSet
|
||||
}
|
||||
|
||||
func (app ruleLabelsAppender) Append(s *model.Sample) {
|
||||
for ln, lv := range app.labels {
|
||||
if v, ok := s.Metric[ln]; ok && v != "" {
|
||||
s.Metric[model.ExportedLabelPrefix+ln] = v
|
||||
}
|
||||
s.Metric[ln] = lv
|
||||
}
|
||||
|
||||
app.app.Append(s)
|
||||
}
|
||||
|
||||
type honorLabelsAppender struct {
|
||||
app storage.SampleAppender
|
||||
labels model.LabelSet
|
||||
}
|
||||
|
||||
// Merges the sample's metric with the given labels if the label is not
|
||||
// already present in the metric.
|
||||
// This also considers labels explicitly set to the empty string.
|
||||
func (app honorLabelsAppender) Append(s *model.Sample) {
|
||||
for ln, lv := range app.labels {
|
||||
if _, ok := s.Metric[ln]; !ok {
|
||||
s.Metric[ln] = lv
|
||||
}
|
||||
}
|
||||
|
||||
app.app.Append(s)
|
||||
}
|
||||
|
||||
// Applies a set of relabel configurations to the sample's metric
|
||||
// before actually appending it.
|
||||
type relabelAppender struct {
|
||||
app storage.SampleAppender
|
||||
relabelings []*config.RelabelConfig
|
||||
}
|
||||
|
||||
func (app relabelAppender) Append(s *model.Sample) {
|
||||
labels, err := Relabel(model.LabelSet(s.Metric), app.relabelings...)
|
||||
if err != nil {
|
||||
log.Errorf("Error while relabeling metric %s: %s", s.Metric, err)
|
||||
return
|
||||
}
|
||||
// Check if the timeseries was dropped.
|
||||
if labels == nil {
|
||||
return
|
||||
}
|
||||
s.Metric = model.Metric(labels)
|
||||
|
||||
app.app.Append(s)
|
||||
}
|
||||
|
||||
// URL returns a copy of the target's URL.
|
||||
func (t *Target) URL() *url.URL {
|
||||
t.RLock()
|
||||
defer t.RUnlock()
|
||||
|
||||
u := &url.URL{}
|
||||
*u = *t.url
|
||||
return u
|
||||
|
|
Loading…
Reference in New Issue