Refactor and add unittests to scrape result handling.
Parent: b5ded43594
Commit: 06b9df65ec
@@ -427,21 +427,11 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
 			}

 			samples, err := sl.scraper.scrape(scrapeCtx, start)
-			if err == nil {
-				// Collect samples post-relabelling and label handling in a buffer.
-				buf := &bufferAppender{buffer: make(model.Samples, 0, len(samples))}
-				app := sl.mutator(buf)
-				for _, sample := range samples {
-					app.Append(sample)
-				}
-
-				// Send samples to storage.
-				sl.append(buf.buffer)
-			} else if errc != nil {
+			err = sl.processScrapeResult(samples, err, start)
+			if err != nil && errc != nil {
 				errc <- err
 			}

-			sl.report(start, time.Since(start), len(samples), err)
 			last = start
 		} else {
 			targetSkippedScrapes.WithLabelValues(interval.String()).Inc()
@@ -460,6 +450,23 @@ func (sl *scrapeLoop) stop() {
 	<-sl.done
 }

+func (sl *scrapeLoop) processScrapeResult(samples model.Samples, scrapeErr error, start time.Time) error {
+	if scrapeErr == nil {
+		// Collect samples post-relabelling and label handling in a buffer.
+		buf := &bufferAppender{buffer: make(model.Samples, 0, len(samples))}
+		app := sl.mutator(buf)
+		for _, sample := range samples {
+			app.Append(sample)
+		}
+
+		// Send samples to storage.
+		sl.append(buf.buffer)
+	}
+
+	sl.report(start, time.Since(start), len(samples), scrapeErr)
+	return scrapeErr
+}
+
 func (sl *scrapeLoop) append(samples model.Samples) {
 	var (
 		numOutOfOrder = 0
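The two hunks above move the post-scrape handling out of the run loop and into processScrapeResult, which takes the scraped samples, the scrape error, and the start time explicitly and returns the error, so the success and failure paths can be driven directly from a unit test. A minimal, self-contained sketch of that pattern, with hypothetical names that are not code from this repository (reporting of the scrape duration is omitted):

package main

import (
	"errors"
	"fmt"
	"time"
)

// sample and appender stand in for model.Sample and the storage appender;
// they are illustrative only.
type sample struct {
	name  string
	value float64
}

type appender struct{ buffer []sample }

func (a *appender) Append(s sample) { a.buffer = append(a.buffer, s) }

type loop struct {
	ingested *appender // receives scraped samples
	reported *appender // receives the synthetic report samples such as "up"
}

// processResult mirrors the shape of processScrapeResult: on success it
// forwards the samples to the ingestion appender, and in every case it
// reports health and returns the scrape error to the caller.
func (l *loop) processResult(samples []sample, scrapeErr error, start time.Time) error {
	if scrapeErr == nil {
		for _, s := range samples {
			l.ingested.Append(s)
		}
	}
	up := 0.0
	if scrapeErr == nil {
		up = 1.0
	}
	l.reported.Append(sample{name: "up", value: up})
	return scrapeErr
}

func main() {
	l := &loop{ingested: &appender{}, reported: &appender{}}

	// Success path: samples are ingested and "up" is reported as 1.
	_ = l.processResult([]sample{{name: "a_metric", value: 1}}, nil, time.Now())

	// Error path: nothing is ingested and "up" is reported as 0.
	_ = l.processResult(nil, errors.New("scrape failed"), time.Now())

	fmt.Println(len(l.ingested.buffer), len(l.reported.buffer)) // 1 2
}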
@@ -299,6 +299,118 @@ func TestScrapePoolSampleAppender(t *testing.T) {
 	}
 }

+func TestScrapeLoopSampleProcessing(t *testing.T) {
+	readSamples := model.Samples{
+		{
+			Metric: model.Metric{"__name__": "a_metric"},
+		},
+		{
+			Metric: model.Metric{"__name__": "b_metric"},
+		},
+	}
+
+	testCases := []struct {
+		scrapedSamples               model.Samples
+		scrapeError                  error
+		metricRelabelConfigs         []*config.RelabelConfig
+		expectedReportedSamples      model.Samples
+		expectedIngestedSamplesCount int
+	}{
+		{
+			scrapedSamples:       readSamples,
+			scrapeError:          nil,
+			metricRelabelConfigs: []*config.RelabelConfig{},
+			expectedReportedSamples: model.Samples{
+				{
+					Metric: model.Metric{"__name__": "up"},
+					Value:  1,
+				},
+				{
+					Metric: model.Metric{"__name__": "scrape_duration_seconds"},
+				},
+				{
+					Metric: model.Metric{"__name__": "scrape_samples_scraped"},
+					Value:  2,
+				},
+			},
+			expectedIngestedSamplesCount: 2,
+		},
+		{
+			scrapedSamples: readSamples,
+			scrapeError:    nil,
+			metricRelabelConfigs: []*config.RelabelConfig{
+				{
+					Action:       config.RelabelDrop,
+					SourceLabels: model.LabelNames{"__name__"},
+					Regex:        config.MustNewRegexp("a.*"),
+				},
+			},
+			expectedReportedSamples: model.Samples{
+				{
+					Metric: model.Metric{"__name__": "up"},
+					Value:  1,
+				},
+				{
+					Metric: model.Metric{"__name__": "scrape_duration_seconds"},
+				},
+				{
+					Metric: model.Metric{"__name__": "scrape_samples_scraped"},
+					Value:  2,
+				},
+			},
+			expectedIngestedSamplesCount: 1,
+		},
+		{
+			scrapedSamples:       model.Samples{},
+			scrapeError:          fmt.Errorf("error"),
+			metricRelabelConfigs: []*config.RelabelConfig{},
+			expectedReportedSamples: model.Samples{
+				{
+					Metric: model.Metric{"__name__": "up"},
+					Value:  0,
+				},
+				{
+					Metric: model.Metric{"__name__": "scrape_duration_seconds"},
+				},
+				{
+					Metric: model.Metric{"__name__": "scrape_samples_scraped"},
+					Value:  0,
+				},
+			},
+			expectedIngestedSamplesCount: 0,
+		},
+	}
+
+	for _, test := range testCases {
+		ingestedSamples := &bufferAppender{buffer: model.Samples{}}
+		reportedSamples := &bufferAppender{buffer: model.Samples{}}
+
+		target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
+		cfg := &config.ScrapeConfig{
+			MetricRelabelConfigs: test.metricRelabelConfigs,
+		}
+
+		sp := newScrapePool(context.Background(), cfg, ingestedSamples)
+
+		scraper := &testScraper{}
+		sl := newScrapeLoop(context.Background(), scraper, ingestedSamples, sp.sampleMutator(target), reportedSamples).(*scrapeLoop)
+		sl.processScrapeResult(test.scrapedSamples, test.scrapeError, time.Unix(0, 0))
+
+		// Ignore value of scrape_duration_seconds, as it's time dependant.
+		reportedSamples.buffer[1].Value = 0
+
+		if !reflect.DeepEqual(reportedSamples.buffer, test.expectedReportedSamples) {
+			t.Errorf("Reported samples did not match expected metrics")
+			t.Errorf("Expected: %v", test.expectedReportedSamples)
+			t.Fatalf("Got: %v", reportedSamples.buffer)
+		}
+		if test.expectedIngestedSamplesCount != len(ingestedSamples.buffer) {
+			t.Fatalf("Ingested samples %d did not match expected value %d", len(ingestedSamples.buffer), test.expectedIngestedSamplesCount)
+		}
+	}
+
+}
+
 func TestScrapeLoopStop(t *testing.T) {
 	scraper := &testScraper{}
 	sl := newScrapeLoop(context.Background(), scraper, nil, nil, nil)
@@ -284,7 +284,7 @@ type bufferAppender struct {
 	buffer model.Samples
 }

-func (app bufferAppender) Append(s *model.Sample) error {
+func (app *bufferAppender) Append(s *model.Sample) error {
 	app.buffer = append(app.buffer, s)
 	return nil
 }
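The receiver change in the hunk above is what makes bufferAppender usable for the new assertions: with a value receiver, Append mutates a copy of the struct, so the caller's buffer never grows. A small standalone illustration of the difference, using hypothetical type names that are not from this commit:

package main

import "fmt"

type valueBuf struct{ items []int }

// Append on a value receiver reassigns the slice on a copy of the struct;
// the caller's buffer is left unchanged.
func (b valueBuf) Append(x int) { b.items = append(b.items, x) }

type ptrBuf struct{ items []int }

// Append on a pointer receiver mutates the caller's struct.
func (b *ptrBuf) Append(x int) { b.items = append(b.items, x) }

func main() {
	v := valueBuf{}
	v.Append(1)
	fmt.Println(len(v.items)) // 0: the append was lost

	p := &ptrBuf{}
	p.Append(1)
	fmt.Println(len(p.items)) // 1: the append is visible to the caller
}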