Cleanup internal target data

This commit is contained in:
Fabian Reinartz 2016-02-12 15:43:27 +01:00
parent e26e4b6e89
commit 65eba080a0
2 changed files with 167 additions and 144 deletions

View File

@ -119,6 +119,7 @@ type TargetStatus struct {
func (ts *TargetStatus) LastError() error {
ts.mu.RLock()
defer ts.mu.RUnlock()
return ts.lastError
}
@ -126,6 +127,7 @@ func (ts *TargetStatus) LastError() error {
func (ts *TargetStatus) LastScrape() time.Time {
ts.mu.RLock()
defer ts.mu.RUnlock()
return ts.lastScrape
}
@ -133,18 +135,21 @@ func (ts *TargetStatus) LastScrape() time.Time {
func (ts *TargetStatus) Health() TargetHealth {
ts.mu.RLock()
defer ts.mu.RUnlock()
return ts.health
}
func (ts *TargetStatus) setLastScrape(t time.Time) {
ts.mu.Lock()
defer ts.mu.Unlock()
ts.lastScrape = t
}
func (ts *TargetStatus) setLastError(err error) {
ts.mu.Lock()
defer ts.mu.Unlock()
if err == nil {
ts.health = HealthGood
} else {
@ -164,37 +169,26 @@ type Target struct {
// Mutex protects the members below.
sync.RWMutex
// The HTTP client used to scrape the target's endpoint.
httpClient *http.Client
// url is the URL to be scraped. Its host is immutable.
url *url.URL
scrapeConfig *config.ScrapeConfig
// Labels before any processing.
metaLabels model.LabelSet
// Any base labels that are added to this target and its metrics.
baseLabels model.LabelSet
// Internal labels, such as scheme.
internalLabels model.LabelSet
// The time between two scrapes.
scrapeInterval time.Duration
// Whether the target's labels have precedence over the base labels
// assigned by the scraping instance.
honorLabels bool
// Metric relabel configuration.
metricRelabelConfigs []*config.RelabelConfig
labels model.LabelSet
// The HTTP client used to scrape the target's endpoint.
httpClient *http.Client
}
// NewTarget creates a reasonably configured target for querying.
func NewTarget(cfg *config.ScrapeConfig, baseLabels, metaLabels model.LabelSet) (*Target, error) {
func NewTarget(cfg *config.ScrapeConfig, labels, metaLabels model.LabelSet) (*Target, error) {
t := &Target{
url: &url.URL{
Scheme: string(baseLabels[model.SchemeLabel]),
Host: string(baseLabels[model.AddressLabel]),
},
status: &TargetStatus{},
scraperStopping: make(chan struct{}),
scraperStopped: make(chan struct{}),
}
err := t.Update(cfg, baseLabels, metaLabels)
err := t.Update(cfg, labels, metaLabels)
return t, err
}
@ -207,55 +201,21 @@ func (t *Target) Status() *TargetStatus {
// it belongs to.
func (t *Target) Update(cfg *config.ScrapeConfig, baseLabels, metaLabels model.LabelSet) error {
t.Lock()
defer t.Unlock()
httpClient, err := newHTTPClient(cfg)
if err != nil {
return fmt.Errorf("cannot create HTTP client: %v", err)
}
t.httpClient = httpClient
t.url.Scheme = string(baseLabels[model.SchemeLabel])
t.url.Path = string(baseLabels[model.MetricsPathLabel])
t.internalLabels = model.LabelSet{}
t.internalLabels[model.SchemeLabel] = baseLabels[model.SchemeLabel]
t.internalLabels[model.MetricsPathLabel] = baseLabels[model.MetricsPathLabel]
t.internalLabels[model.AddressLabel] = model.LabelValue(t.url.Host)
params := url.Values{}
for k, v := range cfg.Params {
params[k] = make([]string, len(v))
copy(params[k], v)
}
for k, v := range baseLabels {
if strings.HasPrefix(string(k), model.ParamLabelPrefix) {
if len(params[string(k[len(model.ParamLabelPrefix):])]) > 0 {
params[string(k[len(model.ParamLabelPrefix):])][0] = string(v)
} else {
params[string(k[len(model.ParamLabelPrefix):])] = []string{string(v)}
}
t.internalLabels[model.ParamLabelPrefix+k[len(model.ParamLabelPrefix):]] = v
}
}
t.url.RawQuery = params.Encode()
t.scrapeInterval = time.Duration(cfg.ScrapeInterval)
t.honorLabels = cfg.HonorLabels
t.scrapeConfig = cfg
t.labels = baseLabels
t.metaLabels = metaLabels
t.baseLabels = model.LabelSet{}
// All remaining internal labels will not be part of the label set.
for name, val := range baseLabels {
if !strings.HasPrefix(string(name), model.ReservedLabelPrefix) {
t.baseLabels[name] = val
}
t.Unlock()
httpClient, err := t.client()
if err != nil {
return fmt.Errorf("cannot create HTTP client: %s", err)
}
if _, ok := t.baseLabels[model.InstanceLabel]; !ok {
t.baseLabels[model.InstanceLabel] = model.LabelValue(t.InstanceIdentifier())
}
t.metricRelabelConfigs = cfg.MetricRelabelConfigs
t.Lock()
t.httpClient = httpClient
t.Unlock()
return nil
}
@ -304,16 +264,105 @@ func newHTTPClient(cfg *config.ScrapeConfig) (*http.Client, error) {
}
func (t *Target) String() string {
return t.url.Host
return t.host()
}
func (t *Target) client() (*http.Client, error) {
t.RLock()
defer t.RUnlock()
return newHTTPClient(t.scrapeConfig)
}
func (t *Target) interval() time.Duration {
t.RLock()
defer t.RUnlock()
return time.Duration(t.scrapeConfig.ScrapeInterval)
}
func (t *Target) timeout() time.Duration {
t.RLock()
defer t.RUnlock()
return time.Duration(t.scrapeConfig.ScrapeTimeout)
}
func (t *Target) scheme() string {
t.RLock()
defer t.RUnlock()
return string(t.labels[model.SchemeLabel])
}
func (t *Target) host() string {
t.RLock()
defer t.RUnlock()
return string(t.labels[model.AddressLabel])
}
func (t *Target) path() string {
t.RLock()
defer t.RUnlock()
return string(t.labels[model.MetricsPathLabel])
}
// URL returns a copy of the target's URL.
func (t *Target) URL() *url.URL {
t.RLock()
defer t.RUnlock()
params := url.Values{}
for k, v := range t.scrapeConfig.Params {
params[k] = make([]string, len(v))
copy(params[k], v)
}
for k, v := range t.labels {
if !strings.HasPrefix(string(k), model.ParamLabelPrefix) {
continue
}
ks := string(k[len(model.ParamLabelPrefix):])
if len(params[ks]) > 0 {
params[ks][0] = string(v)
} else {
params[ks] = []string{string(v)}
}
}
return &url.URL{
Scheme: string(t.labels[model.SchemeLabel]),
Host: string(t.labels[model.AddressLabel]),
Path: string(t.labels[model.MetricsPathLabel]),
RawQuery: params.Encode(),
}
}
// InstanceIdentifier returns the identifier for the target.
func (t *Target) InstanceIdentifier() string {
return t.host()
}
func (t *Target) fullLabels() model.LabelSet {
t.RLock()
defer t.RUnlock()
lset := t.labels.Clone()
if _, ok := lset[model.InstanceLabel]; !ok {
lset[model.InstanceLabel] = t.labels[model.AddressLabel]
}
return lset
}
// RunScraper implements Target.
func (t *Target) RunScraper(sampleAppender storage.SampleAppender) {
defer close(t.scraperStopped)
t.RLock()
lastScrapeInterval := t.scrapeInterval
t.RUnlock()
lastScrapeInterval := t.interval()
log.Debugf("Starting scraper for target %v...", t)
@ -353,15 +402,13 @@ func (t *Target) RunScraper(sampleAppender storage.SampleAppender) {
intervalStr := lastScrapeInterval.String()
t.RLock()
// On changed scrape interval the new interval becomes effective
// after the next scrape.
if lastScrapeInterval != t.scrapeInterval {
if iv := t.interval(); iv != lastScrapeInterval {
ticker.Stop()
ticker = time.NewTicker(t.scrapeInterval)
lastScrapeInterval = t.scrapeInterval
ticker = time.NewTicker(iv)
lastScrapeInterval = iv
}
t.RUnlock()
targetIntervalLength.WithLabelValues(intervalStr).Observe(
float64(took) / float64(time.Second), // Sub-second precision.
@ -389,27 +436,28 @@ func (t *Target) StopScraper() {
const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1`
func (t *Target) scrape(appender storage.SampleAppender) (err error) {
start := time.Now()
baseLabels := t.BaseLabels()
func (t *Target) scrape(appender storage.SampleAppender) error {
var (
err error
start = time.Now()
baseLabels = t.BaseLabels()
)
defer func(appender storage.SampleAppender) {
t.status.setLastError(err)
recordScrapeHealth(appender, start, baseLabels, t.status.Health(), time.Since(start))
}(appender)
t.RLock()
// The relabelAppender has to be inside the label-modifying appenders
// so the relabeling rules are applied to the correct label set.
if len(t.metricRelabelConfigs) > 0 {
if len(t.scrapeConfig.MetricRelabelConfigs) > 0 {
appender = relabelAppender{
SampleAppender: appender,
relabelings: t.metricRelabelConfigs,
relabelings: t.scrapeConfig.MetricRelabelConfigs,
}
}
if t.honorLabels {
if t.scrapeConfig.HonorLabels {
appender = honorLabelsAppender{
SampleAppender: appender,
labels: baseLabels,
@ -422,7 +470,6 @@ func (t *Target) scrape(appender storage.SampleAppender) (err error) {
}
httpClient := t.httpClient
t.RUnlock()
req, err := http.NewRequest("GET", t.URL().String(), nil)
@ -538,43 +585,22 @@ func (app relabelAppender) Append(s *model.Sample) error {
return app.SampleAppender.Append(s)
}
// URL returns a copy of the target's URL.
func (t *Target) URL() *url.URL {
t.RLock()
defer t.RUnlock()
u := &url.URL{}
*u = *t.url
return u
}
// InstanceIdentifier returns the identifier for the target.
func (t *Target) InstanceIdentifier() string {
return t.url.Host
}
// fullLabels returns the base labels plus internal labels defining the target.
func (t *Target) fullLabels() model.LabelSet {
t.RLock()
defer t.RUnlock()
lset := make(model.LabelSet, len(t.baseLabels)+len(t.internalLabels))
for ln, lv := range t.baseLabels {
lset[ln] = lv
}
for k, v := range t.internalLabels {
lset[k] = v
}
return lset
}
// BaseLabels returns a copy of the target's base labels.
func (t *Target) BaseLabels() model.LabelSet {
t.RLock()
defer t.RUnlock()
lset := make(model.LabelSet, len(t.baseLabels))
for ln, lv := range t.baseLabels {
lset[ln] = lv
lset := make(model.LabelSet, len(t.labels))
for ln, lv := range t.labels {
if !strings.HasPrefix(string(ln), model.ReservedLabelPrefix) {
lset[ln] = lv
}
}
if _, ok := lset[model.InstanceLabel]; !ok {
lset[model.InstanceLabel] = t.labels[model.AddressLabel]
}
return lset
}
@ -582,11 +608,8 @@ func (t *Target) BaseLabels() model.LabelSet {
func (t *Target) MetaLabels() model.LabelSet {
t.RLock()
defer t.RUnlock()
lset := make(model.LabelSet, len(t.metaLabels))
for ln, lv := range t.metaLabels {
lset[ln] = lv
}
return lset
return t.metaLabels.Clone()
}
func recordScrapeHealth(

View File

@ -105,7 +105,7 @@ func TestOverwriteLabels(t *testing.T) {
target := newTestTarget(server.URL, time.Second, nil)
target.honorLabels = false
target.scrapeConfig.HonorLabels = false
app := &collectResultAppender{}
if err := target.scrape(app); err != nil {
t.Fatal(err)
@ -117,7 +117,7 @@ func TestOverwriteLabels(t *testing.T) {
}
}
target.honorLabels = true
target.scrapeConfig.HonorLabels = true
app = &collectResultAppender{}
if err := target.scrape(app); err != nil {
t.Fatal(err)
@ -185,7 +185,7 @@ func TestTargetScrapeMetricRelabelConfigs(t *testing.T) {
)
defer server.Close()
testTarget := newTestTarget(server.URL, time.Second, model.LabelSet{})
testTarget.metricRelabelConfigs = []*config.RelabelConfig{
testTarget.scrapeConfig.MetricRelabelConfigs = []*config.RelabelConfig{
{
SourceLabels: model.LabelNames{"__name__"},
Regex: config.MustNewRegexp(".*drop.*"),
@ -216,7 +216,7 @@ func TestTargetScrapeMetricRelabelConfigs(t *testing.T) {
Metric: model.Metric{
model.MetricNameLabel: "test_metric_relabel",
"foo": "bar",
model.InstanceLabel: model.LabelValue(testTarget.url.Host),
model.InstanceLabel: model.LabelValue(testTarget.host()),
},
Timestamp: 0,
Value: 0,
@ -225,7 +225,7 @@ func TestTargetScrapeMetricRelabelConfigs(t *testing.T) {
{
Metric: model.Metric{
model.MetricNameLabel: scrapeHealthMetricName,
model.InstanceLabel: model.LabelValue(testTarget.url.Host),
model.InstanceLabel: model.LabelValue(testTarget.host()),
},
Timestamp: 0,
Value: 0,
@ -233,7 +233,7 @@ func TestTargetScrapeMetricRelabelConfigs(t *testing.T) {
{
Metric: model.Metric{
model.MetricNameLabel: scrapeDurationMetricName,
model.InstanceLabel: model.LabelValue(testTarget.url.Host),
model.InstanceLabel: model.LabelValue(testTarget.host()),
},
Timestamp: 0,
Value: 0,
@ -438,7 +438,8 @@ func TestURLParams(t *testing.T) {
model.AddressLabel: model.LabelValue(serverURL.Host),
"__param_foo": "bar",
},
nil)
nil,
)
if err != nil {
t.Fatal(err)
}
@ -448,29 +449,28 @@ func TestURLParams(t *testing.T) {
}
}
func newTestTarget(targetURL string, deadline time.Duration, baseLabels model.LabelSet) *Target {
cfg := &config.ScrapeConfig{
ScrapeTimeout: model.Duration(deadline),
}
c, _ := newHTTPClient(cfg)
func newTestTarget(targetURL string, deadline time.Duration, labels model.LabelSet) *Target {
labels = labels.Clone()
labels[model.SchemeLabel] = "http"
labels[model.AddressLabel] = model.LabelValue(strings.TrimLeft(targetURL, "http://"))
labels[model.MetricsPathLabel] = "/metrics"
t := &Target{
url: &url.URL{
Scheme: "http",
Host: strings.TrimLeft(targetURL, "http://"),
Path: "/metrics",
scrapeConfig: &config.ScrapeConfig{
ScrapeInterval: model.Duration(time.Millisecond),
ScrapeTimeout: model.Duration(deadline),
},
labels: labels,
status: &TargetStatus{},
scrapeInterval: 1 * time.Millisecond,
httpClient: c,
scraperStopping: make(chan struct{}),
scraperStopped: make(chan struct{}),
}
t.baseLabels = model.LabelSet{
model.InstanceLabel: model.LabelValue(t.InstanceIdentifier()),
}
for baseLabel, baseValue := range baseLabels {
t.baseLabels[baseLabel] = baseValue
var err error
if t.httpClient, err = t.client(); err != nil {
panic(err)
}
return t
}