retrieval: avoid race conditions

This commit is contained in:
Fabian Reinartz 2015-07-08 21:27:52 +02:00
parent 1d6d39a9ed
commit d53cc7935d
1 changed files with 17 additions and 10 deletions

View File

@ -142,20 +142,17 @@ func (ts *TargetStatus) setLastError(err error) {
type Target struct {
// The status object for the target. It is only set once on initialization.
status *TargetStatus
// The HTTP client used to scrape the target's endpoint.
httpClient *http.Client
// Closing scraperStopping signals that scraping should stop.
scraperStopping chan struct{}
// Closing scraperStopped signals that scraping has been stopped.
scraperStopped chan struct{}
// Channel to buffer ingested samples.
ingestedSamples chan clientmodel.Samples
// Metric relabel configuration.
metricRelabelConfigs []*config.RelabelConfig
// Mutex protects the members below.
sync.RWMutex
// The HTTP client used to scrape the target's endpoint.
httpClient *http.Client
// url is the URL to be scraped. Its host is immutable.
url *url.URL
// Labels before any processing.
@ -169,6 +166,8 @@ type Target struct {
// Whether the target's labels have precedence over the base labels
// assigned by the scraping instance.
honorLabels bool
// Metric relabel configuration.
metricRelabelConfigs []*config.RelabelConfig
}
// NewTarget creates a reasonably configured target for querying.
@ -331,6 +330,14 @@ func (t *Target) scrape(sampleAppender storage.SampleAppender) (err error) {
start := time.Now()
baseLabels := t.BaseLabels()
t.RLock()
var (
honorLabels = t.honorLabels
httpClient = t.httpClient
metricRelabelConfigs = t.metricRelabelConfigs
)
t.RUnlock()
defer func() {
t.status.setLastError(err)
recordScrapeHealth(sampleAppender, clientmodel.TimestampFromTime(start), baseLabels, t.status.Health(), time.Since(start))
@ -342,7 +349,7 @@ func (t *Target) scrape(sampleAppender storage.SampleAppender) (err error) {
}
req.Header.Add("Accept", acceptHeader)
resp, err := t.httpClient.Do(req)
resp, err := httpClient.Do(req)
if err != nil {
return err
}
@ -368,7 +375,7 @@ func (t *Target) scrape(sampleAppender storage.SampleAppender) (err error) {
for samples := range t.ingestedSamples {
for _, s := range samples {
if t.honorLabels {
if honorLabels {
// Merge the metric with the baseLabels for labels not already set in the
// metric. This also considers labels explicitly set to the empty string.
for ln, lv := range baseLabels {
@ -387,10 +394,10 @@ func (t *Target) scrape(sampleAppender storage.SampleAppender) (err error) {
}
}
// Avoid the copy in Relabel if there are no configs.
if len(t.metricRelabelConfigs) > 0 {
labels, err := Relabel(clientmodel.LabelSet(s.Metric), t.metricRelabelConfigs...)
if len(metricRelabelConfigs) > 0 {
labels, err := Relabel(clientmodel.LabelSet(s.Metric), metricRelabelConfigs...)
if err != nil {
log.Errorf("Error while relabeling metric %s of instance %s: %s", s.Metric, t.url, err)
log.Errorf("Error while relabeling metric %s of instance %s: %s", s.Metric, req.URL, err)
continue
}
// Check if the timeseries was dropped.