Refactor target scrape reporting.
This commit is contained in:
parent
27d71b08d1
commit
463dd3ea06
|
@ -357,6 +357,15 @@ func (t *Target) wrapAppender(app storage.SampleAppender) storage.SampleAppender
|
|||
return app
|
||||
}
|
||||
|
||||
// wrapReportingAppender wraps an appender for target status report samples.
|
||||
// It ignores any relabeling rules set for the target.
|
||||
func (t *Target) wrapReportingAppender(app storage.SampleAppender) storage.SampleAppender {
|
||||
return ruleLabelsAppender{
|
||||
SampleAppender: app,
|
||||
labels: t.Labels(),
|
||||
}
|
||||
}
|
||||
|
||||
// URL returns a copy of the target's URL.
|
||||
func (t *Target) URL() *url.URL {
|
||||
t.RLock()
|
||||
|
@ -484,13 +493,11 @@ const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client
|
|||
|
||||
func (t *Target) scrape(appender storage.SampleAppender) error {
|
||||
var (
|
||||
err error
|
||||
start = time.Now()
|
||||
labels = t.Labels()
|
||||
err error
|
||||
start = time.Now()
|
||||
)
|
||||
defer func(appender storage.SampleAppender) {
|
||||
t.status.setLastError(err)
|
||||
recordScrapeHealth(appender, start, labels, t.status.Health(), time.Since(start))
|
||||
t.report(appender, start, time.Since(start), err)
|
||||
}(appender)
|
||||
|
||||
t.RLock()
|
||||
|
@ -556,6 +563,38 @@ func (t *Target) scrape(appender storage.SampleAppender) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (t *Target) report(app storage.SampleAppender, start time.Time, duration time.Duration, err error) {
|
||||
t.status.setLastScrape(start)
|
||||
t.status.setLastError(err)
|
||||
|
||||
ts := model.TimeFromUnixNano(start.UnixNano())
|
||||
|
||||
var health model.SampleValue
|
||||
if err == nil {
|
||||
health = 1
|
||||
}
|
||||
|
||||
healthSample := &model.Sample{
|
||||
Metric: model.Metric{
|
||||
model.MetricNameLabel: scrapeHealthMetricName,
|
||||
},
|
||||
Timestamp: ts,
|
||||
Value: health,
|
||||
}
|
||||
durationSample := &model.Sample{
|
||||
Metric: model.Metric{
|
||||
model.MetricNameLabel: scrapeDurationMetricName,
|
||||
},
|
||||
Timestamp: ts,
|
||||
Value: model.SampleValue(float64(duration) / float64(time.Second)),
|
||||
}
|
||||
|
||||
app = t.wrapReportingAppender(app)
|
||||
|
||||
app.Append(healthSample)
|
||||
app.Append(durationSample)
|
||||
}
|
||||
|
||||
// Merges the ingested sample's metric with the label set. On a collision the
|
||||
// value of the ingested label is stored in a label prefixed with 'exported_'.
|
||||
type ruleLabelsAppender struct {
|
||||
|
@ -639,38 +678,3 @@ func (t *Target) MetaLabels() model.LabelSet {
|
|||
|
||||
return t.metaLabels.Clone()
|
||||
}
|
||||
|
||||
func recordScrapeHealth(
|
||||
sampleAppender storage.SampleAppender,
|
||||
timestamp time.Time,
|
||||
labels model.LabelSet,
|
||||
health TargetHealth,
|
||||
scrapeDuration time.Duration,
|
||||
) {
|
||||
healthMetric := make(model.Metric, len(labels)+1)
|
||||
durationMetric := make(model.Metric, len(labels)+1)
|
||||
|
||||
healthMetric[model.MetricNameLabel] = scrapeHealthMetricName
|
||||
durationMetric[model.MetricNameLabel] = scrapeDurationMetricName
|
||||
|
||||
for ln, lv := range labels {
|
||||
healthMetric[ln] = lv
|
||||
durationMetric[ln] = lv
|
||||
}
|
||||
|
||||
ts := model.TimeFromUnixNano(timestamp.UnixNano())
|
||||
|
||||
healthSample := &model.Sample{
|
||||
Metric: healthMetric,
|
||||
Timestamp: ts,
|
||||
Value: health.value(),
|
||||
}
|
||||
durationSample := &model.Sample{
|
||||
Metric: durationMetric,
|
||||
Timestamp: ts,
|
||||
Value: model.SampleValue(float64(scrapeDuration) / float64(time.Second)),
|
||||
}
|
||||
|
||||
sampleAppender.Append(healthSample)
|
||||
sampleAppender.Append(durationSample)
|
||||
}
|
||||
|
|
|
@ -91,6 +91,40 @@ func TestTargetOffset(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestTargetWrapReportingAppender(t *testing.T) {
|
||||
cfg := &config.ScrapeConfig{
|
||||
MetricRelabelConfigs: []*config.RelabelConfig{
|
||||
{}, {}, {},
|
||||
},
|
||||
}
|
||||
|
||||
target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
|
||||
target.scrapeConfig = cfg
|
||||
app := &nopAppender{}
|
||||
|
||||
cfg.HonorLabels = false
|
||||
wrapped := target.wrapReportingAppender(app)
|
||||
|
||||
rl, ok := wrapped.(ruleLabelsAppender)
|
||||
if !ok {
|
||||
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
|
||||
}
|
||||
if rl.SampleAppender != app {
|
||||
t.Fatalf("Expected base appender but got %T", rl.SampleAppender)
|
||||
}
|
||||
|
||||
cfg.HonorLabels = true
|
||||
wrapped = target.wrapReportingAppender(app)
|
||||
|
||||
hl, ok := wrapped.(ruleLabelsAppender)
|
||||
if !ok {
|
||||
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
|
||||
}
|
||||
if hl.SampleAppender != app {
|
||||
t.Fatalf("Expected base appender but got %T", hl.SampleAppender)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTargetWrapAppender(t *testing.T) {
|
||||
cfg := &config.ScrapeConfig{
|
||||
MetricRelabelConfigs: []*config.RelabelConfig{
|
||||
|
@ -335,12 +369,13 @@ func TestTargetScrapeMetricRelabelConfigs(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestTargetRecordScrapeHealth(t *testing.T) {
|
||||
testTarget := newTestTarget("example.url:80", 0, model.LabelSet{model.JobLabel: "testjob"})
|
||||
var (
|
||||
testTarget = newTestTarget("example.url:80", 0, model.LabelSet{model.JobLabel: "testjob"})
|
||||
now = model.Now()
|
||||
appender = &collectResultAppender{}
|
||||
)
|
||||
|
||||
now := model.Now()
|
||||
appender := &collectResultAppender{}
|
||||
testTarget.status.setLastError(nil)
|
||||
recordScrapeHealth(appender, now.Time(), testTarget.Labels(), testTarget.status.Health(), 2*time.Second)
|
||||
testTarget.report(appender, now.Time(), 2*time.Second, nil)
|
||||
|
||||
result := appender.result
|
||||
|
||||
|
|
Loading…
Reference in New Issue