From dbc08d390e38fefc98e8be84b7088e5b7a4fc81f Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 18 May 2015 11:13:13 +0200 Subject: [PATCH 1/3] Move target status data into its own object --- retrieval/target.go | 126 +++++++++++++++++++++----------------- retrieval/target_test.go | 25 ++++---- web/templates/status.html | 10 +-- 3 files changed, 90 insertions(+), 71 deletions(-) diff --git a/retrieval/target.go b/retrieval/target.go index f0342bae2..8ad2d8f0c 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -110,12 +110,8 @@ const ( type Target interface { extraction.Ingester - // Return the last encountered scrape error, if any. - LastError() error - // Return the health of the target. - State() TargetState - // Return the last time a scrape was attempted. - LastScrape() time.Time + // Status returns the current status of the target. + Status() *TargetStatus // The URL to which the Target corresponds. Out of all of the available // points in this interface, this one is the best candidate to change given // the ways to express the endpoint. @@ -141,6 +137,53 @@ type Target interface { Update(*config.ScrapeConfig, clientmodel.LabelSet) } +// TargetStatus contains information about the current status of a scrape target. +type TargetStatus struct { + lastError error + lastScrape time.Time + state TargetState + + mu sync.RWMutex +} + +// LastError returns the error encountered during the last scrape. +func (ts *TargetStatus) LastError() error { + ts.mu.RLock() + defer ts.mu.RUnlock() + return ts.lastError +} + +// LastScrape returns the time of the last scrape. +func (ts *TargetStatus) LastScrape() time.Time { + ts.mu.RLock() + defer ts.mu.RUnlock() + return ts.lastScrape +} + +// State returns the last known health state of the target. 
+func (ts *TargetStatus) State() TargetState { + ts.mu.RLock() + defer ts.mu.RUnlock() + return ts.state +} + +func (ts *TargetStatus) setLastScrape(t time.Time) { + ts.mu.Lock() + defer ts.mu.Unlock() + ts.lastScrape = t +} + +func (ts *TargetStatus) setLastError(err error) { + ts.mu.Lock() + defer ts.mu.Unlock() + if err == nil { + ts.state = Healthy + } else { + ts.state = Unhealthy + } + ts.lastError = err +} + // target is a Target that refers to a singular HTTP or HTTPS endpoint. type target struct { // Closing scraperStopping signals that scraping should stop. @@ -150,6 +193,9 @@ type target struct { // Channel to buffer ingested samples. ingestedSamples chan clientmodel.Samples + // The status object for the target. It is only set once on initialization. + status *TargetStatus + // The HTTP client used to scrape the target's endpoint. httpClient *http.Client @@ -159,12 +205,6 @@ type target struct { url *url.URL // Any base labels that are added to this target and its metrics. baseLabels clientmodel.LabelSet - // The current health state of the target. - state TargetState - // The last encountered scrape error, if any. - lastError error - // The last time a scrape was attempted. - lastScrape time.Time // What is the deadline for the HTTP or HTTPS against this endpoint. deadline time.Duration // The time between two scrapes. @@ -177,6 +217,7 @@ func NewTarget(cfg *config.ScrapeConfig, baseLabels clientmodel.LabelSet) Target url: &url.URL{ Host: string(baseLabels[clientmodel.AddressLabel]), }, + status: &TargetStatus{}, scraperStopping: make(chan struct{}), scraperStopped: make(chan struct{}), } @@ -184,6 +225,11 @@ func NewTarget(cfg *config.ScrapeConfig, baseLabels clientmodel.LabelSet) Target return t } +// Status implements the Target interface. +func (t *target) Status() *TargetStatus { + return t.status +} + // Update overwrites settings in the target that are derived from the job config // it belongs to. 
func (t *target) Update(cfg *config.ScrapeConfig, baseLabels clientmodel.LabelSet) { @@ -256,9 +302,7 @@ func (t *target) RunScraper(sampleAppender storage.SampleAppender) { ticker := time.NewTicker(lastScrapeInterval) defer ticker.Stop() - t.Lock() // Writing t.lastScrape requires the lock. - t.lastScrape = time.Now() - t.Unlock() + t.status.setLastScrape(time.Now()) t.scrape(sampleAppender) // Explanation of the contraption below: @@ -277,12 +321,12 @@ func (t *target) RunScraper(sampleAppender storage.SampleAppender) { case <-t.scraperStopping: return case <-ticker.C: - t.Lock() - took := time.Since(t.lastScrape) - t.lastScrape = time.Now() + took := time.Since(t.status.LastScrape()) + t.status.setLastScrape(time.Now()) intervalStr := lastScrapeInterval.String() + t.Lock() // On changed scrape interval the new interval becomes effective // after the next scrape. if lastScrapeInterval != t.scrapeInterval { @@ -315,21 +359,14 @@ const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client func (t *target) scrape(sampleAppender storage.SampleAppender) (err error) { t.RLock() - timestamp := clientmodel.Now() + start := time.Now() - defer func(start time.Time) { - t.recordScrapeHealth(sampleAppender, timestamp, err == nil, time.Since(start)) + defer func() { t.RUnlock() - t.Lock() // Writing t.state and t.lastError requires the lock. 
- if err == nil { - t.state = Healthy - } else { - t.state = Unhealthy - } - t.lastError = err - t.Unlock() - }(time.Now()) + t.status.setLastError(err) + t.recordScrapeHealth(sampleAppender, clientmodel.TimestampFromTime(start), time.Since(start)) + }() req, err := http.NewRequest("GET", t.url.String(), nil) if err != nil { @@ -354,7 +391,7 @@ func (t *target) scrape(sampleAppender storage.SampleAppender) (err error) { t.ingestedSamples = make(chan clientmodel.Samples, ingestedSamplesCap) processOptions := &extraction.ProcessOptions{ - Timestamp: timestamp, + Timestamp: clientmodel.TimestampFromTime(start), } go func() { err = processor.ProcessSingle(resp.Body, t, processOptions) @@ -370,27 +407,6 @@ func (t *target) scrape(sampleAppender storage.SampleAppender) (err error) { return err } -// LastError implements Target. -func (t *target) LastError() error { - t.RLock() - defer t.RUnlock() - return t.lastError -} - -// State implements Target. -func (t *target) State() TargetState { - t.RLock() - defer t.RUnlock() - return t.state -} - -// LastScrape implements Target. -func (t *target) LastScrape() time.Time { - t.RLock() - defer t.RUnlock() - return t.lastScrape -} - // URL implements Target. 
func (t *target) URL() string { t.RLock() @@ -454,10 +470,10 @@ func (t *target) BaseLabelsWithoutJobAndInstance() clientmodel.LabelSet { return ls } -func (t *target) recordScrapeHealth(sampleAppender storage.SampleAppender, timestamp clientmodel.Timestamp, healthy bool, scrapeDuration time.Duration) { +func (t *target) recordScrapeHealth(sampleAppender storage.SampleAppender, timestamp clientmodel.Timestamp, scrapeDuration time.Duration) { healthMetric := clientmodel.Metric{} durationMetric := clientmodel.Metric{} - for label, value := range t.baseLabels { + for label, value := range t.BaseLabels() { healthMetric[label] = value durationMetric[label] = value } @@ -465,7 +481,7 @@ func (t *target) recordScrapeHealth(sampleAppender storage.SampleAppender, times durationMetric[clientmodel.MetricNameLabel] = clientmodel.LabelValue(scrapeDurationMetricName) healthValue := clientmodel.SampleValue(0) - if healthy { + if t.status.State() == Healthy { healthValue = clientmodel.SampleValue(1) } diff --git a/retrieval/target_test.go b/retrieval/target_test.go index 857bd7dd8..7d2c1327d 100644 --- a/retrieval/target_test.go +++ b/retrieval/target_test.go @@ -57,8 +57,8 @@ func TestTargetScrapeUpdatesState(t *testing.T) { testTarget := newTestTarget("bad schema", 0, nil) testTarget.scrape(nopAppender{}) - if testTarget.state != Unhealthy { - t.Errorf("Expected target state %v, actual: %v", Unhealthy, testTarget.state) + if testTarget.status.State() != Unhealthy { + t.Errorf("Expected target state %v, actual: %v", Unhealthy, testTarget.status.State()) } } @@ -80,11 +80,11 @@ func TestTargetScrapeWithFullChannel(t *testing.T) { testTarget := newTestTarget(server.URL, 10*time.Millisecond, clientmodel.LabelSet{"dings": "bums"}) testTarget.scrape(slowAppender{}) - if testTarget.state != Unhealthy { - t.Errorf("Expected target state %v, actual: %v", Unhealthy, testTarget.state) + if testTarget.status.State() != Unhealthy { + t.Errorf("Expected target state %v, actual: %v", 
Unhealthy, testTarget.status.State()) } - if testTarget.lastError != errIngestChannelFull { - t.Errorf("Expected target error %q, actual: %q", errIngestChannelFull, testTarget.lastError) + if testTarget.status.LastError() != errIngestChannelFull { + t.Errorf("Expected target error %q, actual: %q", errIngestChannelFull, testTarget.status.LastError()) } } @@ -93,7 +93,7 @@ func TestTargetRecordScrapeHealth(t *testing.T) { now := clientmodel.Now() appender := &collectResultAppender{} - testTarget.recordScrapeHealth(appender, now, true, 2*time.Second) + testTarget.recordScrapeHealth(appender, now, 2*time.Second) result := appender.result @@ -205,17 +205,17 @@ func TestTargetRunScraperScrapes(t *testing.T) { // Enough time for a scrape to happen. time.Sleep(2 * time.Millisecond) - if testTarget.lastScrape.IsZero() { + if testTarget.status.LastScrape().IsZero() { t.Errorf("Scrape hasn't occured.") } testTarget.StopScraper() // Wait for it to take effect. time.Sleep(2 * time.Millisecond) - last := testTarget.lastScrape + last := testTarget.status.LastScrape() // Enough time for a scrape to happen. 
time.Sleep(2 * time.Millisecond) - if testTarget.lastScrape != last { + if testTarget.status.LastScrape() != last { t.Errorf("Scrape occured after it was stopped.") } } @@ -249,7 +249,10 @@ func newTestTarget(targetURL string, deadline time.Duration, baseLabels clientmo Host: strings.TrimLeft(targetURL, "http://"), Path: "/metrics", }, - deadline: deadline, + deadline: deadline, + status: &TargetStatus{ + state: Healthy, + }, scrapeInterval: 1 * time.Millisecond, httpClient: utility.NewDeadlineClient(deadline), scraperStopping: make(chan struct{}), diff --git a/web/templates/status.html b/web/templates/status.html index c6e2de472..900470978 100644 --- a/web/templates/status.html +++ b/web/templates/status.html @@ -51,19 +51,19 @@ {{.URL}} - - {{.State}} + + {{.Status.State}} {{.BaseLabelsWithoutJobAndInstance}} - {{if .LastScrape.IsZero}}Never{{else}}{{since .LastScrape}} ago{{end}} + {{if .Status.LastScrape.IsZero}}Never{{else}}{{since .Status.LastScrape}} ago{{end}} - {{if .LastError}} - {{.LastError}} + {{if .Status.LastError}} + {{.Status.LastError}} {{end}} From 1a2d57b45ca6986c5c298407731044c356c91115 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 18 May 2015 12:16:25 +0200 Subject: [PATCH 2/3] Move template functionality out of target. The target implementation and interface contain methods only serving a specific purpose of the templates. They were moved to the template as they operate on more fundamental target data. 
--- retrieval/target.go | 73 +++++++++++--------------------------- retrieval/target_test.go | 7 ---- retrieval/targetmanager.go | 2 +- web/templates/status.html | 4 +-- web/web.go | 43 ++++++++++++++++++---- 5 files changed, 59 insertions(+), 70 deletions(-) diff --git a/retrieval/target.go b/retrieval/target.go index 8ad2d8f0c..aae723789 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -19,7 +19,6 @@ import ( "math/rand" "net/http" "net/url" - "os" "strings" "sync" "time" @@ -53,8 +52,6 @@ const ( var ( errIngestChannelFull = errors.New("ingestion channel full") - localhostRepresentations = []string{"127.0.0.1", "localhost"} - targetIntervalLength = prometheus.NewSummaryVec( prometheus.SummaryOpts{ Namespace: namespace, @@ -118,17 +115,11 @@ type Target interface { URL() string // Used to populate the `instance` label in metrics. InstanceIdentifier() string - // The URL as seen from other hosts. References to localhost are resolved - // to the address of the prometheus server. - GlobalURL() string // Return the labels describing the targets. These are the base labels // as well as internal labels. - Labels() clientmodel.LabelSet + fullLabels() clientmodel.LabelSet // Return the target's base labels. BaseLabels() clientmodel.LabelSet - // Return the target's base labels without job and instance label. That's - // useful for display purposes. - BaseLabelsWithoutJobAndInstance() clientmodel.LabelSet // Start scraping the target in regular intervals. RunScraper(storage.SampleAppender) // Stop scraping, synchronous. @@ -195,7 +186,6 @@ type target struct { // The status object for the target. It is only set once on initialization. status *TargetStatus - // The HTTP client used to scrape the target's endpoint. httpClient *http.Client @@ -419,66 +409,43 @@ func (t *target) InstanceIdentifier() string { return t.url.Host } -// GlobalURL implements Target. 
-func (t *target) GlobalURL() string { - url := t.URL() - - hostname, err := os.Hostname() - if err != nil { - glog.Warningf("Couldn't get hostname: %s, returning target.URL()", err) - return url - } - for _, localhostRepresentation := range localhostRepresentations { - url = strings.Replace(url, "//"+localhostRepresentation, "//"+hostname, 1) - } - return url -} - -// Labels implements Target. -func (t *target) Labels() clientmodel.LabelSet { +// fullLabels implements Target. +func (t *target) fullLabels() clientmodel.LabelSet { t.RLock() defer t.RUnlock() - ls := clientmodel.LabelSet{} + lset := make(clientmodel.LabelSet, len(t.baseLabels)+2) for ln, lv := range t.baseLabels { - ls[ln] = lv + lset[ln] = lv } - ls[clientmodel.MetricsPathLabel] = clientmodel.LabelValue(t.url.Path) - ls[clientmodel.AddressLabel] = clientmodel.LabelValue(t.url.Host) - return ls + lset[clientmodel.MetricsPathLabel] = clientmodel.LabelValue(t.url.Path) + lset[clientmodel.AddressLabel] = clientmodel.LabelValue(t.url.Host) + return lset } // BaseLabels implements Target. func (t *target) BaseLabels() clientmodel.LabelSet { t.RLock() defer t.RUnlock() - return t.baseLabels -} - -// BaseLabelsWithoutJobAndInstance implements Target. -// -// TODO(fabxc): This method does not have to be part of the interface. Implement this -// as a template filter func for the single use case. 
-func (t *target) BaseLabelsWithoutJobAndInstance() clientmodel.LabelSet { - t.RLock() - defer t.RUnlock() - ls := clientmodel.LabelSet{} + lset := make(clientmodel.LabelSet, len(t.baseLabels)) for ln, lv := range t.baseLabels { - if ln != clientmodel.JobLabel && ln != clientmodel.InstanceLabel { - ls[ln] = lv - } + lset[ln] = lv } - return ls + return lset } func (t *target) recordScrapeHealth(sampleAppender storage.SampleAppender, timestamp clientmodel.Timestamp, scrapeDuration time.Duration) { - healthMetric := clientmodel.Metric{} - durationMetric := clientmodel.Metric{} - for label, value := range t.BaseLabels() { + t.RLock() + healthMetric := make(clientmodel.Metric, len(t.baseLabels)+1) + durationMetric := make(clientmodel.Metric, len(t.baseLabels)+1) + + healthMetric[clientmodel.MetricNameLabel] = clientmodel.LabelValue(scrapeHealthMetricName) + durationMetric[clientmodel.MetricNameLabel] = clientmodel.LabelValue(scrapeDurationMetricName) + + for label, value := range t.baseLabels { healthMetric[label] = value durationMetric[label] = value } - healthMetric[clientmodel.MetricNameLabel] = clientmodel.LabelValue(scrapeHealthMetricName) - durationMetric[clientmodel.MetricNameLabel] = clientmodel.LabelValue(scrapeDurationMetricName) + t.RUnlock() healthValue := clientmodel.SampleValue(0) if t.status.State() == Healthy { diff --git a/retrieval/target_test.go b/retrieval/target_test.go index 7d2c1327d..a70837791 100644 --- a/retrieval/target_test.go +++ b/retrieval/target_test.go @@ -44,13 +44,6 @@ func TestBaseLabels(t *testing.T) { if !reflect.DeepEqual(want, got) { t.Errorf("want base labels %v, got %v", want, got) } - delete(want, clientmodel.JobLabel) - delete(want, clientmodel.InstanceLabel) - - got = target.BaseLabelsWithoutJobAndInstance() - if !reflect.DeepEqual(want, got) { - t.Errorf("want base labels %v, got %v", want, got) - } } func TestTargetScrapeUpdatesState(t *testing.T) { diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index 
cacc2126e..8b71790bc 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -215,7 +215,7 @@ func (tm *TargetManager) updateTargetGroup(tgroup *config.TargetGroup, cfg *conf // to build up. wg.Add(1) go func(t Target) { - match.Update(cfg, t.Labels()) + match.Update(cfg, t.fullLabels()) wg.Done() }(tnew) newTargets[i] = match diff --git a/web/templates/status.html b/web/templates/status.html index 900470978..ba37f458b 100644 --- a/web/templates/status.html +++ b/web/templates/status.html @@ -48,7 +48,7 @@ {{range $pool}} - {{.URL}} + {{.URL}} @@ -56,7 +56,7 @@ - {{.BaseLabelsWithoutJobAndInstance}} + {{stripLabels .BaseLabels "job" "instance"}} {{if .Status.LastScrape.IsZero}}Never{{else}}{{since .Status.LastScrape}} ago{{end}} diff --git a/web/web.go b/web/web.go index bd53b0de5..1548046c3 100644 --- a/web/web.go +++ b/web/web.go @@ -29,10 +29,14 @@ import ( "github.com/golang/glog" "github.com/prometheus/client_golang/prometheus" + clientmodel "github.com/prometheus/client_golang/model" + "github.com/prometheus/prometheus/web/api" "github.com/prometheus/prometheus/web/blob" ) +var localhostRepresentations = []string{"127.0.0.1", "localhost"} + // Commandline flags. 
var ( listenAddress = flag.String("web.listen-address", ":9090", "Address to listen on for the web interface, API, and telemetry.") @@ -150,28 +154,53 @@ func getConsoles(pathPrefix string) string { return "" } -func getTemplate(name string, pathPrefix string) (t *template.Template, err error) { - t = template.New("_base") +func getTemplate(name string, pathPrefix string) (*template.Template, error) { + t := template.New("_base") + var err error t.Funcs(template.FuncMap{ "since": time.Since, "getConsoles": func() string { return getConsoles(pathPrefix) }, "pathPrefix": func() string { return pathPrefix }, + "stripLabels": func(lset clientmodel.LabelSet, labels ...clientmodel.LabelName) clientmodel.LabelSet { + for _, ln := range labels { + delete(lset, ln) + } + return lset + }, + "globalURL": func(url string) string { + hostname, err := os.Hostname() + if err != nil { + glog.Warningf("Couldn't get hostname: %s, returning target.URL()", err) + return url + } + for _, localhostRepresentation := range localhostRepresentations { + url = strings.Replace(url, "//"+localhostRepresentation, "//"+hostname, 1) + } + return url + }, }) + file, err := getTemplateFile("_base") if err != nil { - glog.Error("Could not read base template: ", err) + glog.Errorln("Could not read base template:", err) return nil, err } - t.Parse(file) + t, err = t.Parse(file) + if err != nil { + glog.Errorln("Could not parse base template:", err) + } file, err = getTemplateFile(name) if err != nil { - glog.Error("Could not read base template: ", err) + glog.Error("Could not read template %d: ", name, err) return nil, err } - t.Parse(file) - return + t, err = t.Parse(file) + if err != nil { + glog.Errorf("Could not parse template %s: %s", name, err) + } + return t, err } func executeTemplate(w http.ResponseWriter, name string, data interface{}, pathPrefix string) { From 385919a65a284c340fa471a75f23a6746555199d Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 18 May 2015 13:14:41 +0200 
Subject: [PATCH 3/3] Avoid inter-component blocking if ingestion/scraping blocks. Appending to the storage can block for a long time. Timing out scrapes can also cause longer blocks. This commit avoids that those blocks affect other components than the target itself. Also the Target interface was removed. --- retrieval/target.go | 159 +++++++++++++------------------- retrieval/target_test.go | 39 ++++---- retrieval/targetmanager.go | 20 ++-- retrieval/targetmanager_test.go | 2 +- web/status.go | 14 +-- web/templates/status.html | 3 +- web/web.go | 2 +- 7 files changed, 102 insertions(+), 137 deletions(-) diff --git a/retrieval/target.go b/retrieval/target.go index aae723789..4600cefb6 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -67,72 +67,35 @@ func init() { prometheus.MustRegister(targetIntervalLength) } -// TargetState describes the state of a Target. -type TargetState int +// TargetHealth describes the health state of a target. +type TargetHealth int -func (t TargetState) String() string { +func (t TargetHealth) String() string { switch t { - case Unknown: + case HealthUnknown: return "UNKNOWN" - case Healthy: + case HealthGood: return "HEALTHY" - case Unhealthy: + case HealthBad: return "UNHEALTHY" } - panic("unknown state") } const ( // Unknown is the state of a Target before it is first scraped. - Unknown TargetState = iota + HealthUnknown TargetHealth = iota // Healthy is the state of a Target that has been successfully scraped. - Healthy + HealthGood // Unhealthy is the state of a Target that was scraped unsuccessfully. - Unhealthy + HealthBad ) -// A Target represents an endpoint that should be interrogated for metrics. -// -// The protocol described by this type will likely change in future iterations, -// as it offers no good support for aggregated targets and fan out. Thusly, -// it is likely that the current Target and target uses will be -// wrapped with some resolver type. 
-// -// For the future, the Target protocol will abstract away the exact means that -// metrics are retrieved and deserialized from the given instance to which it -// refers. -// -// Target implements extraction.Ingester. -type Target interface { - extraction.Ingester - - // Status returns the current status of the target. - Status() *TargetStatus - // The URL to which the Target corresponds. Out of all of the available - // points in this interface, this one is the best candidate to change given - // the ways to express the endpoint. - URL() string - // Used to populate the `instance` label in metrics. - InstanceIdentifier() string - // Return the labels describing the targets. These are the base labels - // as well as internal labels. - fullLabels() clientmodel.LabelSet - // Return the target's base labels. - BaseLabels() clientmodel.LabelSet - // Start scraping the target in regular intervals. - RunScraper(storage.SampleAppender) - // Stop scraping, synchronous. - StopScraper() - // Update the target's state. - Update(*config.ScrapeConfig, clientmodel.LabelSet) -} - // TargetStatus contains information about the current status of a scrape target. type TargetStatus struct { lastError error lastScrape time.Time - state TargetState + health TargetHealth mu sync.RWMutex } @@ -151,11 +114,11 @@ func (ts *TargetStatus) LastScrape() time.Time { return ts.lastScrape } -// State returns the last known health state of the target. -func (ts *TargetStatus) State() TargetState { +// Health returns the last known health state of the target. 
+func (ts *TargetStatus) Health() TargetHealth { ts.mu.RLock() defer ts.mu.RUnlock() - return ts.state + return ts.health } func (ts *TargetStatus) setLastScrape(t time.Time) { @@ -168,15 +131,20 @@ func (ts *TargetStatus) setLastError(err error) { ts.mu.Lock() defer ts.mu.Unlock() if err == nil { - ts.state = Healthy + ts.health = HealthGood } else { - ts.state = Unhealthy + ts.health = HealthBad } ts.lastError = err } -// target is a Target that refers to a singular HTTP or HTTPS endpoint. -type target struct { +// Target refers to a singular HTTP or HTTPS endpoint. +type Target struct { + // The status object for the target. It is only set once on initialization. + status *TargetStatus + // The HTTP client used to scrape the target's endpoint. + httpClient *http.Client + // Closing scraperStopping signals that scraping should stop. scraperStopping chan struct{} // Closing scraperStopped signals that scraping has been stopped. @@ -184,14 +152,9 @@ type target struct { // Channel to buffer ingested samples. ingestedSamples chan clientmodel.Samples - // The status object for the target. It is only set once on initialization. - status *TargetStatus - // The HTTP client used to scrape the target's endpoint. - httpClient *http.Client - // Mutex protects the members below. sync.RWMutex - + // url is the URL to be scraped. Its host is immutable. url *url.URL // Any base labels that are added to this target and its metrics. baseLabels clientmodel.LabelSet @@ -202,8 +165,8 @@ type target struct { } // NewTarget creates a reasonably configured target for querying. 
-func NewTarget(cfg *config.ScrapeConfig, baseLabels clientmodel.LabelSet) Target { - t := &target{ +func NewTarget(cfg *config.ScrapeConfig, baseLabels clientmodel.LabelSet) *Target { + t := &Target{ url: &url.URL{ Host: string(baseLabels[clientmodel.AddressLabel]), }, @@ -215,14 +178,14 @@ func NewTarget(cfg *config.ScrapeConfig, baseLabels clientmodel.LabelSet) Target return t } -// Status implements the Target interface. -func (t *target) Status() *TargetStatus { +// Status returns the status of the target. +func (t *Target) Status() *TargetStatus { return t.status } // Update overwrites settings in the target that are derived from the job config // it belongs to. -func (t *target) Update(cfg *config.ScrapeConfig, baseLabels clientmodel.LabelSet) { +func (t *Target) Update(cfg *config.ScrapeConfig, baseLabels clientmodel.LabelSet) { t.Lock() defer t.Unlock() @@ -248,12 +211,15 @@ func (t *target) Update(cfg *config.ScrapeConfig, baseLabels clientmodel.LabelSe } } -func (t *target) String() string { +func (t *Target) String() string { return t.url.Host } -// Ingest implements Target and extraction.Ingester. -func (t *target) Ingest(s clientmodel.Samples) error { +// Ingest implements an extraction.Ingester. +func (t *Target) Ingest(s clientmodel.Samples) error { + t.RLock() + deadline := t.deadline + t.RUnlock() // Since the regular case is that ingestedSamples is ready to receive, // first try without setting a timeout so that we don't need to allocate // a timer most of the time. @@ -264,14 +230,17 @@ func (t *target) Ingest(s clientmodel.Samples) error { select { case t.ingestedSamples <- s: return nil - case <-time.After(t.deadline / 10): + case <-time.After(deadline / 10): return errIngestChannelFull } } } +// Ensure that Target implements extraction.Ingester at compile time. +var _ extraction.Ingester = (*Target)(nil) + // RunScraper implements Target. 
-func (t *target) RunScraper(sampleAppender storage.SampleAppender) { +func (t *Target) RunScraper(sampleAppender storage.SampleAppender) { defer close(t.scraperStopped) t.RLock() @@ -316,7 +285,7 @@ func (t *target) RunScraper(sampleAppender storage.SampleAppender) { intervalStr := lastScrapeInterval.String() - t.Lock() + t.RLock() // On changed scrape interval the new interval becomes effective // after the next scrape. if lastScrapeInterval != t.scrapeInterval { @@ -324,7 +293,7 @@ func (t *target) RunScraper(sampleAppender storage.SampleAppender) { ticker = time.NewTicker(t.scrapeInterval) lastScrapeInterval = t.scrapeInterval } - t.Unlock() + t.RUnlock() targetIntervalLength.WithLabelValues(intervalStr).Observe( float64(took) / float64(time.Second), // Sub-second precision. @@ -336,7 +305,7 @@ func (t *target) RunScraper(sampleAppender storage.SampleAppender) { } // StopScraper implements Target. -func (t *target) StopScraper() { +func (t *Target) StopScraper() { glog.V(1).Infof("Stopping scraper for target %v...", t) close(t.scraperStopping) @@ -347,18 +316,16 @@ func (t *target) StopScraper() { const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1` -func (t *target) scrape(sampleAppender storage.SampleAppender) (err error) { - t.RLock() +func (t *Target) scrape(sampleAppender storage.SampleAppender) (err error) { start := time.Now() + baseLabels := t.BaseLabels() defer func() { - t.RUnlock() - t.status.setLastError(err) - t.recordScrapeHealth(sampleAppender, clientmodel.TimestampFromTime(start), time.Since(start)) + recordScrapeHealth(sampleAppender, clientmodel.TimestampFromTime(start), baseLabels, t.status.Health(), time.Since(start)) }() - req, err := http.NewRequest("GET", t.url.String(), nil) + req, err := http.NewRequest("GET", t.URL(), nil) if err != nil { panic(err) } @@ -390,7 
+357,7 @@ func (t *target) scrape(sampleAppender storage.SampleAppender) (err error) { for samples := range t.ingestedSamples { for _, s := range samples { - s.Metric.MergeFromLabelSet(t.baseLabels, clientmodel.ExporterLabelPrefix) + s.Metric.MergeFromLabelSet(baseLabels, clientmodel.ExporterLabelPrefix) sampleAppender.Append(s) } } @@ -398,19 +365,19 @@ func (t *target) scrape(sampleAppender storage.SampleAppender) (err error) { } // URL implements Target. -func (t *target) URL() string { +func (t *Target) URL() string { t.RLock() defer t.RUnlock() return t.url.String() } -// InstanceIdentifier implements Target. -func (t *target) InstanceIdentifier() string { +// InstanceIdentifier returns the identifier for the target. +func (t *Target) InstanceIdentifier() string { return t.url.Host } -// fullLabels implements Target. -func (t *target) fullLabels() clientmodel.LabelSet { +// fullLabels returns the base labels plus internal labels defining the target. +func (t *Target) fullLabels() clientmodel.LabelSet { t.RLock() defer t.RUnlock() lset := make(clientmodel.LabelSet, len(t.baseLabels)+2) @@ -422,8 +389,8 @@ func (t *target) fullLabels() clientmodel.LabelSet { return lset } -// BaseLabels implements Target. -func (t *target) BaseLabels() clientmodel.LabelSet { +// BaseLabels returns a copy of the target's base labels. 
+func (t *Target) BaseLabels() clientmodel.LabelSet { t.RLock() defer t.RUnlock() lset := make(clientmodel.LabelSet, len(t.baseLabels)) @@ -433,22 +400,26 @@ func (t *target) BaseLabels() clientmodel.LabelSet { return lset } -func (t *target) recordScrapeHealth(sampleAppender storage.SampleAppender, timestamp clientmodel.Timestamp, scrapeDuration time.Duration) { - t.RLock() - healthMetric := make(clientmodel.Metric, len(t.baseLabels)+1) - durationMetric := make(clientmodel.Metric, len(t.baseLabels)+1) +func recordScrapeHealth( + sampleAppender storage.SampleAppender, + timestamp clientmodel.Timestamp, + baseLabels clientmodel.LabelSet, + health TargetHealth, + scrapeDuration time.Duration, +) { + healthMetric := make(clientmodel.Metric, len(baseLabels)+1) + durationMetric := make(clientmodel.Metric, len(baseLabels)+1) healthMetric[clientmodel.MetricNameLabel] = clientmodel.LabelValue(scrapeHealthMetricName) durationMetric[clientmodel.MetricNameLabel] = clientmodel.LabelValue(scrapeDurationMetricName) - for label, value := range t.baseLabels { + for label, value := range baseLabels { healthMetric[label] = value durationMetric[label] = value } - t.RUnlock() healthValue := clientmodel.SampleValue(0) - if t.status.State() == Healthy { + if health == HealthGood { healthValue = clientmodel.SampleValue(1) } diff --git a/retrieval/target_test.go b/retrieval/target_test.go index a70837791..6a411f86a 100644 --- a/retrieval/target_test.go +++ b/retrieval/target_test.go @@ -29,10 +29,6 @@ import ( "github.com/prometheus/prometheus/utility" ) -func TestTargetInterface(t *testing.T) { - var _ Target = &target{} -} - func TestBaseLabels(t *testing.T) { target := newTestTarget("example.com:80", 0, clientmodel.LabelSet{"job": "some_job", "foo": "bar"}) want := clientmodel.LabelSet{ @@ -50,8 +46,8 @@ func TestTargetScrapeUpdatesState(t *testing.T) { testTarget := newTestTarget("bad schema", 0, nil) testTarget.scrape(nopAppender{}) - if testTarget.status.State() != Unhealthy { - 
t.Errorf("Expected target state %v, actual: %v", Unhealthy, testTarget.status.State()) + if testTarget.status.Health() != HealthBad { + t.Errorf("Expected target state %v, actual: %v", HealthBad, testTarget.status.Health()) } } @@ -73,8 +69,8 @@ func TestTargetScrapeWithFullChannel(t *testing.T) { testTarget := newTestTarget(server.URL, 10*time.Millisecond, clientmodel.LabelSet{"dings": "bums"}) testTarget.scrape(slowAppender{}) - if testTarget.status.State() != Unhealthy { - t.Errorf("Expected target state %v, actual: %v", Unhealthy, testTarget.status.State()) + if testTarget.status.Health() != HealthBad { + t.Errorf("Expected target state %v, actual: %v", HealthBad, testTarget.status.Health()) } if testTarget.status.LastError() != errIngestChannelFull { t.Errorf("Expected target error %q, actual: %q", errIngestChannelFull, testTarget.status.LastError()) @@ -86,7 +82,8 @@ func TestTargetRecordScrapeHealth(t *testing.T) { now := clientmodel.Now() appender := &collectResultAppender{} - testTarget.recordScrapeHealth(appender, now, 2*time.Second) + testTarget.status.setLastError(nil) + recordScrapeHealth(appender, now, testTarget.BaseLabels(), testTarget.status.Health(), 2*time.Second) result := appender.result @@ -138,13 +135,13 @@ func TestTargetScrapeTimeout(t *testing.T) { ) defer server.Close() - var testTarget Target = newTestTarget(server.URL, 10*time.Millisecond, clientmodel.LabelSet{}) + testTarget := newTestTarget(server.URL, 10*time.Millisecond, clientmodel.LabelSet{}) appender := nopAppender{} // scrape once without timeout signal <- true - if err := testTarget.(*target).scrape(appender); err != nil { + if err := testTarget.scrape(appender); err != nil { t.Fatal(err) } @@ -153,12 +150,12 @@ func TestTargetScrapeTimeout(t *testing.T) { // now scrape again signal <- true - if err := testTarget.(*target).scrape(appender); err != nil { + if err := testTarget.scrape(appender); err != nil { t.Fatal(err) } // now timeout - if err := 
testTarget.(*target).scrape(appender); err == nil { + if err := testTarget.scrape(appender); err == nil { t.Fatal("expected scrape to timeout") } else { signal <- true // let handler continue @@ -166,7 +163,7 @@ func TestTargetScrapeTimeout(t *testing.T) { // now scrape again without timeout signal <- true - if err := testTarget.(*target).scrape(appender); err != nil { + if err := testTarget.scrape(appender); err != nil { t.Fatal(err) } } @@ -224,28 +221,26 @@ func BenchmarkScrape(b *testing.B) { ) defer server.Close() - var testTarget Target = newTestTarget(server.URL, 100*time.Millisecond, clientmodel.LabelSet{"dings": "bums"}) + testTarget := newTestTarget(server.URL, 100*time.Millisecond, clientmodel.LabelSet{"dings": "bums"}) appender := nopAppender{} b.ResetTimer() for i := 0; i < b.N; i++ { - if err := testTarget.(*target).scrape(appender); err != nil { + if err := testTarget.scrape(appender); err != nil { b.Fatal(err) } } } -func newTestTarget(targetURL string, deadline time.Duration, baseLabels clientmodel.LabelSet) *target { - t := &target{ +func newTestTarget(targetURL string, deadline time.Duration, baseLabels clientmodel.LabelSet) *Target { + t := &Target{ url: &url.URL{ Scheme: "http", Host: strings.TrimLeft(targetURL, "http://"), Path: "/metrics", }, - deadline: deadline, - status: &TargetStatus{ - state: Healthy, - }, + deadline: deadline, + status: &TargetStatus{}, scrapeInterval: 1 * time.Millisecond, httpClient: utility.NewDeadlineClient(deadline), scraperStopping: make(chan struct{}), diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index 8b71790bc..7763a5604 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -56,7 +56,7 @@ type TargetManager struct { running bool // Targets by their source ID. - targets map[string][]Target + targets map[string][]*Target // Providers by the scrape configs they are derived from. 
providers map[*config.ScrapeConfig][]TargetProvider } @@ -65,7 +65,7 @@ type TargetManager struct { func NewTargetManager(sampleAppender storage.SampleAppender) *TargetManager { tm := &TargetManager{ sampleAppender: sampleAppender, - targets: make(map[string][]Target), + targets: make(map[string][]*Target), } return tm } @@ -165,7 +165,7 @@ func (tm *TargetManager) removeTargets(f func(string) bool) { } wg.Add(len(targets)) for _, target := range targets { - go func(t Target) { + go func(t *Target) { t.StopScraper() wg.Done() }(target) @@ -197,7 +197,7 @@ func (tm *TargetManager) updateTargetGroup(tgroup *config.TargetGroup, cfg *conf // Replace the old targets with the new ones while keeping the state // of intersecting targets. for i, tnew := range newTargets { - var match Target + var match *Target for j, told := range oldTargets { if told == nil { continue @@ -214,7 +214,7 @@ func (tm *TargetManager) updateTargetGroup(tgroup *config.TargetGroup, cfg *conf // Updating is blocked during a scrape. We don't want those wait times // to build up. wg.Add(1) - go func(t Target) { + go func(t *Target) { match.Update(cfg, t.fullLabels()) wg.Done() }(tnew) @@ -227,7 +227,7 @@ func (tm *TargetManager) updateTargetGroup(tgroup *config.TargetGroup, cfg *conf for _, told := range oldTargets { if told != nil { wg.Add(1) - go func(t Target) { + go func(t *Target) { t.StopScraper() wg.Done() }(told) @@ -250,11 +250,11 @@ func (tm *TargetManager) updateTargetGroup(tgroup *config.TargetGroup, cfg *conf } // Pools returns the targets currently being scraped bucketed by their job name. 
-func (tm *TargetManager) Pools() map[string][]Target { +func (tm *TargetManager) Pools() map[string][]*Target { tm.m.RLock() defer tm.m.RUnlock() - pools := map[string][]Target{} + pools := map[string][]*Target{} for _, ts := range tm.targets { for _, t := range ts { @@ -287,11 +287,11 @@ func (tm *TargetManager) ApplyConfig(cfg *config.Config) { } // targetsFromGroup builds targets based on the given TargetGroup and config. -func (tm *TargetManager) targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) ([]Target, error) { +func (tm *TargetManager) targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) ([]*Target, error) { tm.m.RLock() defer tm.m.RUnlock() - targets := make([]Target, 0, len(tg.Targets)) + targets := make([]*Target, 0, len(tg.Targets)) for i, labels := range tg.Targets { addr := string(labels[clientmodel.AddressLabel]) // If no port was provided, infer it based on the used scheme. diff --git a/retrieval/targetmanager_test.go b/retrieval/targetmanager_test.go index f6aea2a40..aaa251539 100644 --- a/retrieval/targetmanager_test.go +++ b/retrieval/targetmanager_test.go @@ -45,7 +45,7 @@ func TestTargetManagerChan(t *testing.T) { providers: map[*config.ScrapeConfig][]TargetProvider{ testJob1: []TargetProvider{prov1}, }, - targets: make(map[string][]Target), + targets: make(map[string][]*Target), } go targetManager.Run() defer targetManager.Stop() diff --git a/web/status.go b/web/status.go index 6840505b9..f536f5561 100644 --- a/web/status.go +++ b/web/status.go @@ -32,18 +32,18 @@ type PrometheusStatusHandler struct { Flags map[string]string RuleManager *rules.Manager - TargetPools func() map[string][]retrieval.Target + TargetPools func() map[string][]*retrieval.Target Birth time.Time PathPrefix string } -// TargetStateToClass returns a map of TargetState to the name of a Bootstrap CSS class. 
-func (h *PrometheusStatusHandler) TargetStateToClass() map[retrieval.TargetState]string {
-	return map[retrieval.TargetState]string{
-		retrieval.Unknown:   "warning",
-		retrieval.Healthy:   "success",
-		retrieval.Unhealthy: "danger",
+// TargetHealthToClass returns a map of TargetHealth to the name of a Bootstrap CSS class.
+func (h *PrometheusStatusHandler) TargetHealthToClass() map[retrieval.TargetHealth]string {
+	return map[retrieval.TargetHealth]string{
+		retrieval.HealthUnknown: "warning",
+		retrieval.HealthBad:     "danger",
+		retrieval.HealthGood:    "success",
	}
}

diff --git a/web/templates/status.html b/web/templates/status.html
index ba37f458b..c9609e786 100644
--- a/web/templates/status.html
+++ b/web/templates/status.html
@@ -32,7 +32,6 @@

Targets

-  {{$stateToClass := .TargetStateToClass}}
   {{range $job, $pool := call .TargetPools}}
@@ -51,7 +50,7 @@
             {{.URL}}
diff --git a/web/web.go b/web/web.go
index 1548046c3..ff0917b2d 100644
--- a/web/web.go
+++ b/web/web.go
@@ -193,7 +193,7 @@ func getTemplate(name string, pathPrefix string) (*template.Template, error) {
 	file, err = getTemplateFile(name)
 	if err != nil {
-		glog.Error("Could not read template %d: ", name, err)
+		glog.Errorf("Could not read template %s: %s", name, err)
 		return nil, err
 	}
 	t, err = t.Parse(file)
{{$job}}
- + {{.Status.State}}