diff --git a/retrieval/format/processor.go b/retrieval/format/processor.go index cff9012d4..150838dc9 100644 --- a/retrieval/format/processor.go +++ b/retrieval/format/processor.go @@ -16,6 +16,7 @@ package format import ( "github.com/prometheus/prometheus/model" "io" + "time" ) // Processor is responsible for decoding the actual message responses from @@ -23,5 +24,5 @@ import ( // to the results channel. type Processor interface { // Process performs the work on the input and closes the incoming stream. - Process(stream io.ReadCloser, baseLabels model.LabelSet, results chan Result) (err error) + Process(stream io.ReadCloser, timestamp time.Time, baseLabels model.LabelSet, results chan Result) (err error) } diff --git a/retrieval/format/processor0_0_1.go b/retrieval/format/processor0_0_1.go index 062cb1c2a..043736909 100644 --- a/retrieval/format/processor0_0_1.go +++ b/retrieval/format/processor0_0_1.go @@ -20,6 +20,7 @@ import ( "github.com/prometheus/prometheus/utility" "io" "io/ioutil" + "time" ) const ( @@ -57,7 +58,7 @@ type entity001 []struct { } `json:"metric"` } -func (p *processor001) Process(stream io.ReadCloser, baseLabels model.LabelSet, results chan Result) (err error) { +func (p *processor001) Process(stream io.ReadCloser, timestamp time.Time, baseLabels model.LabelSet, results chan Result) (err error) { // TODO(matt): Replace with plain-jane JSON unmarshalling. defer stream.Close() @@ -73,8 +74,6 @@ func (p *processor001) Process(stream io.ReadCloser, baseLabels model.LabelSet, return } - now := p.time.Now() - // TODO(matt): This outer loop is a great basis for parallelization. for _, entity := range entities { for _, value := range entity.Metric.Value { @@ -101,7 +100,7 @@ func (p *processor001) Process(stream io.ReadCloser, baseLabels model.LabelSet, sample := model.Sample{ Metric: metric, - Timestamp: now, + Timestamp: timestamp, Value: model.SampleValue(sampleValue), } @@ -136,7 +135,7 @@ func (p *processor001) Process(stream io.ReadCloser, baseLabels model.LabelSet, sample := model.Sample{ Metric: childMetric, - Timestamp: now, + Timestamp: timestamp, Value: model.SampleValue(individualValue), } diff --git a/retrieval/format/processor0_0_1_test.go b/retrieval/format/processor0_0_1_test.go index 75cc10c5f..8e0ba5ed7 100644 --- a/retrieval/format/processor0_0_1_test.go +++ b/retrieval/format/processor0_0_1_test.go @@ -21,6 +21,7 @@ import ( "io/ioutil" "strings" "testing" + "time" ) func testProcessor001Process(t test.Tester) { @@ -172,7 +173,7 @@ func testProcessor001Process(t test.Tester) { reader := strings.NewReader(scenario.in) - err := Processor001.Process(ioutil.NopCloser(reader), model.LabelSet{}, inputChannel) + err := Processor001.Process(ioutil.NopCloser(reader), time.Now(), model.LabelSet{}, inputChannel) if !test.ErrorEqual(scenario.err, err) { t.Errorf("%d. expected err of %s, got %s", i, scenario.err, err) continue diff --git a/retrieval/target.go b/retrieval/target.go index d4a4f181a..21feba610 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -162,6 +162,8 @@ func (t *target) Scrape(earliest time.Time, results chan format.Result) (err err done <- true }() + now := time.Now() + var resp *http.Response // Don't shadow "err" from the enclosing function. resp, err = http.Get(t.Address()) if err != nil { @@ -182,7 +184,7 @@ func (t *target) Scrape(earliest time.Time, results chan format.Result) (err err baseLabels[baseLabel] = baseValue } - err = processor.Process(resp.Body, baseLabels, results) + err = processor.Process(resp.Body, now, baseLabels, results) if err != nil { return }