prometheus/retrieval/target.go

679 lines
17 KiB
Go
Raw Normal View History

// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package retrieval
import (
"errors"
2013-01-04 13:41:47 +00:00
"fmt"
2015-08-20 17:02:29 +00:00
"io"
"io/ioutil"
2013-01-04 13:41:47 +00:00
"net/http"
"net/url"
"strings"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
2015-08-20 17:02:29 +00:00
"github.com/prometheus/common/expfmt"
2015-10-03 08:21:43 +00:00
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
"golang.org/x/net/context/ctxhttp"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/storage"
2016-02-02 13:01:44 +00:00
"github.com/prometheus/prometheus/storage/local"
2015-05-29 11:30:30 +00:00
"github.com/prometheus/prometheus/util/httputil"
)
const (
	// Metric names of the synthetic samples recorded for every scrape:
	// the target's health and the time the scrape took.
	scrapeHealthMetricName   = "up"
	scrapeDurationMetricName = "scrape_duration_seconds"

	// Capacity of the channel to buffer samples during ingestion.
	ingestedSamplesCap = 256

	// Constants for instrumentation: the metric namespace and the name of
	// the label that carries the scrape interval.
	namespace = "prometheus"
	interval  = "interval"
)
var (
errSkippedScrape = errors.New("scrape skipped due to throttled ingestion")
targetIntervalLength = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Name: "target_interval_length_seconds",
Help: "Actual intervals between scrapes.",
2015-01-21 14:42:25 +00:00
Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
},
[]string{interval},
)
targetSkippedScrapes = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "target_skipped_scrapes_total",
Help: "Total number of scrapes that were skipped because the metric storage was throttled.",
},
[]string{interval},
)
)
// init registers the scrape instrumentation with the default Prometheus
// registry. Registration failures panic via MustRegister.
func init() {
	collectors := []prometheus.Collector{
		targetIntervalLength,
		targetSkippedScrapes,
	}
	for _, c := range collectors {
		prometheus.MustRegister(c)
	}
}
// TargetHealth describes the health state of a target. It is an integer
// enumeration; the valid values are HealthUnknown, HealthGood and HealthBad.
type TargetHealth int
// String returns the human-readable name of the health state: "unknown",
// "up" or "down". It panics for any value outside the declared states.
func (t TargetHealth) String() string {
	if t == HealthUnknown {
		return "unknown"
	}
	if t == HealthGood {
		return "up"
	}
	if t == HealthBad {
		return "down"
	}
	panic("unknown state")
}
2015-08-21 21:01:08 +00:00
// value converts the health state into a sample value: 1 for HealthGood and
// 0 for every other state.
func (t TargetHealth) value() model.SampleValue {
	var v model.SampleValue
	if t == HealthGood {
		v = 1
	}
	return v
}
// The possible TargetHealth states. HealthUnknown is the zero value, so a
// freshly created TargetStatus reports it until a scrape result is recorded.
const (
	// HealthUnknown is the state of a Target before it is first scraped.
	HealthUnknown TargetHealth = iota
	// HealthGood is the state of a Target that has been successfully scraped.
	HealthGood
	// HealthBad is the state of a Target that was scraped unsuccessfully.
	HealthBad
)
// TargetStatus contains information about the current status of a scrape target.
// All fields are guarded by mu and are accessed through the methods below.
type TargetStatus struct {
	// lastError is the error recorded by the most recent setLastError call; nil means success.
	lastError error
	// lastScrape is the start time recorded by the most recent setLastScrape call.
	lastScrape time.Time
	// health is derived from lastError whenever setLastError is called.
	health TargetHealth
	// mu protects all fields above.
	mu sync.RWMutex
}
// LastError returns the error encountered during the last scrape.
func (ts *TargetStatus) LastError() error {
ts.mu.RLock()
defer ts.mu.RUnlock()
2016-02-12 14:43:27 +00:00
return ts.lastError
}
// LastScrape returns the time of the last scrape.
func (ts *TargetStatus) LastScrape() time.Time {
ts.mu.RLock()
defer ts.mu.RUnlock()
2016-02-12 14:43:27 +00:00
return ts.lastScrape
}
// Health returns the last known health state of the target.
func (ts *TargetStatus) Health() TargetHealth {
ts.mu.RLock()
defer ts.mu.RUnlock()
2016-02-12 14:43:27 +00:00
return ts.health
}
func (ts *TargetStatus) setLastScrape(t time.Time) {
ts.mu.Lock()
defer ts.mu.Unlock()
2016-02-12 14:43:27 +00:00
ts.lastScrape = t
}
func (ts *TargetStatus) setLastError(err error) {
ts.mu.Lock()
defer ts.mu.Unlock()
2016-02-12 14:43:27 +00:00
if err == nil {
ts.health = HealthGood
} else {
ts.health = HealthBad
}
ts.lastError = err
}
// Target refers to a singular HTTP or HTTPS endpoint.
type Target struct {
// The status object for the target. It is only set once on initialization.
status *TargetStatus
// Closing scraperStopping signals that scraping should stop.
scraperStopping chan struct{}
// Closing scraperStopped signals that scraping has been stopped.
scraperStopped chan struct{}
// Mutex protects the members below.
sync.RWMutex
2016-02-12 14:43:27 +00:00
scrapeConfig *config.ScrapeConfig
// Labels before any processing.
metaLabels model.LabelSet
2016-02-15 09:31:38 +00:00
// Any labels that are added to this target and its metrics.
2016-02-12 14:43:27 +00:00
labels model.LabelSet
// The HTTP client used to scrape the target's endpoint.
httpClient *http.Client
}
// NewTarget creates a reasonably configured target for querying.
2016-02-12 14:43:27 +00:00
func NewTarget(cfg *config.ScrapeConfig, labels, metaLabels model.LabelSet) (*Target, error) {
t := &Target{
status: &TargetStatus{},
scraperStopping: make(chan struct{}),
scraperStopped: make(chan struct{}),
2013-01-04 13:41:47 +00:00
}
2016-02-12 14:43:27 +00:00
err := t.Update(cfg, labels, metaLabels)
return t, err
}
// Status returns the status object of the target. The same pointer is
// returned on every call.
func (t *Target) Status() *TargetStatus {
	status := t.status
	return status
}
// Update overwrites settings in the target that are derived from the job config
// it belongs to.
2016-02-15 09:31:38 +00:00
func (t *Target) Update(cfg *config.ScrapeConfig, labels, metaLabels model.LabelSet) error {
t.Lock()
2016-02-12 14:43:27 +00:00
t.scrapeConfig = cfg
2016-02-15 09:31:38 +00:00
t.labels = labels
2016-02-12 14:43:27 +00:00
t.metaLabels = metaLabels
2016-02-12 14:43:27 +00:00
t.Unlock()
2016-02-12 14:43:27 +00:00
httpClient, err := t.client()
if err != nil {
return fmt.Errorf("cannot create HTTP client: %s", err)
}
2016-02-12 14:43:27 +00:00
t.Lock()
t.httpClient = httpClient
t.Unlock()
return nil
}
// newHTTPClient builds the HTTP client used to scrape targets of the given
// scrape configuration. TLS settings, proxy URL, bearer token (inline or read
// from a file) and basic auth credentials are taken from cfg.
//
// An error is returned if the TLS configuration cannot be built or the bearer
// token file cannot be read.
func newHTTPClient(cfg *config.ScrapeConfig) (*http.Client, error) {
	tlsOpts := httputil.TLSOptions{
		InsecureSkipVerify: cfg.TLSConfig.InsecureSkipVerify,
		CAFile:             cfg.TLSConfig.CAFile,
	}
	// A client certificate is only configured if both halves are given.
	if len(cfg.TLSConfig.CertFile) > 0 && len(cfg.TLSConfig.KeyFile) > 0 {
		tlsOpts.CertFile = cfg.TLSConfig.CertFile
		tlsOpts.KeyFile = cfg.TLSConfig.KeyFile
	}
	tlsConfig, err := httputil.NewTLSConfig(tlsOpts)
	if err != nil {
		return nil, err
	}

	// The only timeout we care about is the configured scrape timeout.
	// It is applied on request. So we leave out any timings here.
	var rt http.RoundTripper = &http.Transport{
		Proxy:             http.ProxyURL(cfg.ProxyURL.URL),
		DisableKeepAlives: true,
		TLSClientConfig:   tlsConfig,
	}

	// If a bearer token is provided, create a round tripper that will set the
	// Authorization header correctly on each request.
	bearerToken := cfg.BearerToken
	if len(bearerToken) == 0 && len(cfg.BearerTokenFile) > 0 {
		b, err := ioutil.ReadFile(cfg.BearerTokenFile)
		if err != nil {
			return nil, fmt.Errorf("unable to read bearer token file %s: %s", cfg.BearerTokenFile, err)
		}
		// Token files commonly end with a newline. Surrounding whitespace is
		// not part of the token and would make the header value invalid.
		bearerToken = strings.TrimSpace(string(b))
	}
	if len(bearerToken) > 0 {
		rt = httputil.NewBearerAuthRoundTripper(bearerToken, rt)
	}

	if cfg.BasicAuth != nil {
		rt = httputil.NewBasicAuthRoundTripper(cfg.BasicAuth.Username, cfg.BasicAuth.Password, rt)
	}

	// Return a new client with the configured round tripper.
	return httputil.NewClient(rt), nil
}
func (t *Target) String() string {
2016-02-12 14:43:27 +00:00
return t.host()
}
// fingerprint returns an identifying hash for the target, computed from its
// label set while holding the read lock.
func (t *Target) fingerprint() model.Fingerprint {
	t.RLock()
	defer t.RUnlock()

	fp := t.labels.Fingerprint()
	return fp
}
// offset returns the time until the next scrape cycle for the target.
//
// Each target owns a fixed slot within the interval, derived from its
// fingerprint. The result is the time from now until that slot comes up, so
// scrapes of one target stay aligned to absolute time regardless of when
// offset is called. The result lies in (0, interval].
func (t *Target) offset(interval time.Duration) time.Duration {
	now := time.Now().UnixNano()

	var (
		// base is the time remaining until the next interval boundary. Using
		// the remaining time rather than the elapsed time is what pins the
		// target's slot to absolute time.
		base   = int64(interval) - now%int64(interval)
		offset = uint64(t.fingerprint()) % uint64(interval)
		next   = base + int64(offset)
	)

	// The slot within the current interval is still ahead of us.
	if next > int64(interval) {
		next -= int64(interval)
	}
	return time.Duration(next)
}
2016-02-12 14:43:27 +00:00
// client builds a new HTTP client from the target's current scrape
// configuration. The read lock is held while the client is created.
func (t *Target) client() (*http.Client, error) {
	t.RLock()
	defer t.RUnlock()

	cfg := t.scrapeConfig
	return newHTTPClient(cfg)
}
// interval returns the scrape interval from the target's current scrape
// configuration.
func (t *Target) interval() time.Duration {
	t.RLock()
	defer t.RUnlock()

	cfg := t.scrapeConfig
	return time.Duration(cfg.ScrapeInterval)
}
// timeout returns the scrape timeout from the target's current scrape
// configuration.
func (t *Target) timeout() time.Duration {
	t.RLock()
	defer t.RUnlock()

	cfg := t.scrapeConfig
	return time.Duration(cfg.ScrapeTimeout)
}
// scheme returns the value of the target's scheme label.
func (t *Target) scheme() string {
	t.RLock()
	defer t.RUnlock()

	v := t.labels[model.SchemeLabel]
	return string(v)
}
// host returns the value of the target's address label.
func (t *Target) host() string {
	t.RLock()
	defer t.RUnlock()

	v := t.labels[model.AddressLabel]
	return string(v)
}
// path returns the value of the target's metrics path label.
func (t *Target) path() string {
	t.RLock()
	defer t.RUnlock()

	v := t.labels[model.MetricsPathLabel]
	return string(v)
}
2016-02-15 15:10:29 +00:00
// wrapAppender wraps a SampleAppender for samples ingested from the target.
// RLock must be acquired by the caller.
2016-02-15 15:10:29 +00:00
func (t *Target) wrapAppender(app storage.SampleAppender) storage.SampleAppender {
// The relabelAppender has to be inside the label-modifying appenders
// so the relabeling rules are applied to the correct label set.
if mrc := t.scrapeConfig.MetricRelabelConfigs; len(mrc) > 0 {
app = relabelAppender{
SampleAppender: app,
relabelings: mrc,
}
}
if t.scrapeConfig.HonorLabels {
app = honorLabelsAppender{
SampleAppender: app,
labels: t.unlockedLabels(),
2016-02-15 15:10:29 +00:00
}
} else {
app = ruleLabelsAppender{
SampleAppender: app,
labels: t.unlockedLabels(),
2016-02-15 15:10:29 +00:00
}
}
return app
}
2016-02-15 15:21:03 +00:00
// wrapReportingAppender wraps an appender for target status report samples.
// It ignores any relabeling rules set for the target.
// RLock must not be acquired by the caller.
2016-02-15 15:21:03 +00:00
func (t *Target) wrapReportingAppender(app storage.SampleAppender) storage.SampleAppender {
return ruleLabelsAppender{
SampleAppender: app,
labels: t.Labels(),
}
}
2016-02-12 14:43:27 +00:00
// URL returns a copy of the target's URL. Scheme, host and path are taken
// from the corresponding target labels. The query string starts out as a
// copy of the configured params; every label carrying the param prefix then
// overrides the first value of the parameter named by the rest of the label.
func (t *Target) URL() *url.URL {
	t.RLock()
	defer t.RUnlock()

	query := url.Values{}

	// Copy the configured values so the config's slices are never aliased.
	for name, values := range t.scrapeConfig.Params {
		query[name] = append(make([]string, 0, len(values)), values...)
	}

	for ln, lv := range t.labels {
		name := string(ln)
		if !strings.HasPrefix(name, model.ParamLabelPrefix) {
			continue
		}
		param := name[len(model.ParamLabelPrefix):]
		if existing := query[param]; len(existing) > 0 {
			existing[0] = string(lv)
			continue
		}
		query[param] = []string{string(lv)}
	}

	u := &url.URL{
		Scheme:   string(t.labels[model.SchemeLabel]),
		Host:     string(t.labels[model.AddressLabel]),
		Path:     string(t.labels[model.MetricsPathLabel]),
		RawQuery: query.Encode(),
	}
	return u
}
// InstanceIdentifier returns the identifier for the target, which is the
// value of its address label.
func (t *Target) InstanceIdentifier() string {
	id := t.host()
	return id
}
// RunScraper implements Target.
func (t *Target) RunScraper(sampleAppender storage.SampleAppender) {
defer close(t.scraperStopped)
2016-02-12 14:43:27 +00:00
lastScrapeInterval := t.interval()
log.Debugf("Starting scraper for target %v...", t)
select {
case <-time.After(t.offset(lastScrapeInterval)):
// Continue after scraping offset.
case <-t.scraperStopping:
return
}
ticker := time.NewTicker(lastScrapeInterval)
defer ticker.Stop()
t.scrape(sampleAppender)
// Explanation of the contraption below:
//
// In case t.scraperStopping has something to receive, we want to read
// from that channel rather than starting a new scrape (which might take very
// long). That's why the outer select has no ticker.C. Should t.scraperStopping
// not have anything to receive, we go into the inner select, where ticker.C
// is in the mix.
for {
select {
case <-t.scraperStopping:
return
default:
select {
case <-t.scraperStopping:
return
case <-ticker.C:
took := time.Since(t.status.LastScrape())
intervalStr := lastScrapeInterval.String()
// On changed scrape interval the new interval becomes effective
// after the next scrape.
2016-02-12 14:43:27 +00:00
if iv := t.interval(); iv != lastScrapeInterval {
ticker.Stop()
2016-02-12 14:43:27 +00:00
ticker = time.NewTicker(iv)
lastScrapeInterval = iv
}
targetIntervalLength.WithLabelValues(intervalStr).Observe(
float64(took) / float64(time.Second), // Sub-second precision.
)
if sampleAppender.NeedsThrottling() {
targetSkippedScrapes.WithLabelValues(intervalStr).Inc()
t.status.setLastError(errSkippedScrape)
continue
}
t.scrape(sampleAppender)
}
}
}
}
// StopScraper implements Target. It signals the scraper to stop and blocks
// until the scraper has confirmed that it stopped.
func (t *Target) StopScraper() {
	log.Debugf("Stopping scraper for target %v...", t)

	// Signal first, then wait for the acknowledgement.
	close(t.scraperStopping)
	<-t.scraperStopped

	log.Debugf("Scraper for target %v stopped.", t)
}
// acceptHeader is the value of the Accept header sent with every scrape
// request. It lists the accepted exposition formats, one per line below,
// each with its quality value.
const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,` +
	`text/plain;version=0.0.4;q=0.3,` +
	`application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,` +
	`*/*;q=0.1`
2016-02-12 14:43:27 +00:00
func (t *Target) scrape(appender storage.SampleAppender) error {
var (
2016-02-15 15:21:03 +00:00
err error
start = time.Now()
2016-02-12 14:43:27 +00:00
)
defer func(appender storage.SampleAppender) {
2016-02-15 15:21:03 +00:00
t.report(appender, start, time.Since(start), err)
}(appender)
2015-07-08 19:27:52 +00:00
t.RLock()
2016-02-15 15:10:29 +00:00
appender = t.wrapAppender(appender)
client := t.httpClient
t.RUnlock()
2013-01-04 13:41:47 +00:00
req, err := http.NewRequest("GET", t.URL().String(), nil)
if err != nil {
2015-09-10 10:10:12 +00:00
return err
}
req.Header.Add("Accept", acceptHeader)
ctx, _ := context.WithTimeout(context.Background(), t.timeout())
resp, err := ctxhttp.Do(ctx, client, req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("server returned HTTP status %s", resp.Status)
}
2013-01-04 13:41:47 +00:00
2015-09-19 09:49:40 +00:00
dec := expfmt.NewDecoder(resp.Body, expfmt.ResponseFormat(resp.Header))
2013-01-04 13:41:47 +00:00
2015-08-20 17:02:29 +00:00
sdec := expfmt.SampleDecoder{
Dec: dec,
Opts: &expfmt.DecodeOptions{
Timestamp: model.TimeFromUnixNano(start.UnixNano()),
},
}
2016-02-02 13:01:44 +00:00
var (
samples model.Vector
numOutOfOrder int
2016-02-03 09:39:34 +00:00
logger = log.With("target", t.InstanceIdentifier())
2016-02-02 13:01:44 +00:00
)
for {
if err = sdec.Decode(&samples); err != nil {
break
2015-08-20 17:02:29 +00:00
}
for _, s := range samples {
2016-02-02 13:01:44 +00:00
err := appender.Append(s)
if err != nil {
if err == local.ErrOutOfOrderSample {
numOutOfOrder++
} else {
2016-02-03 09:39:34 +00:00
logger.With("sample", s).Warnf("Error inserting sample: %s", err)
2016-02-02 13:01:44 +00:00
}
}
}
}
2016-02-02 13:01:44 +00:00
if numOutOfOrder > 0 {
2016-02-03 09:39:34 +00:00
logger.With("numDropped", numOutOfOrder).Warn("Error on ingesting out-of-order samples")
2016-02-02 13:01:44 +00:00
}
2015-08-20 17:02:29 +00:00
if err == io.EOF {
// Set err to nil since it is used in the scrape health recording.
err = nil
2015-08-20 17:02:29 +00:00
}
return err
}
2016-02-15 15:21:03 +00:00
// report records the outcome of a scrape. It updates the target's status with
// the scrape start time and error, then appends two synthetic samples
// timestamped with the scrape start: the health sample (1 if err is nil, 0
// otherwise) and the scrape duration in seconds. Errors from appending these
// samples are ignored.
func (t *Target) report(app storage.SampleAppender, start time.Time, duration time.Duration, err error) {
	t.status.setLastScrape(start)
	t.status.setLastError(err)

	var (
		ts     = model.TimeFromUnixNano(start.UnixNano())
		health model.SampleValue
	)
	if err == nil {
		health = 1
	}

	// newSample builds a report sample carrying only the metric name.
	newSample := func(name model.LabelValue, v model.SampleValue) *model.Sample {
		return &model.Sample{
			Metric: model.Metric{
				model.MetricNameLabel: name,
			},
			Timestamp: ts,
			Value:     v,
		}
	}
	seconds := model.SampleValue(float64(duration) / float64(time.Second))

	reporter := t.wrapReportingAppender(app)
	reporter.Append(newSample(scrapeHealthMetricName, health))
	reporter.Append(newSample(scrapeDurationMetricName, seconds))
}
// Merges the ingested sample's metric with the label set. On a collision the
// value of the ingested label is stored in a label prefixed with 'exported_'.
type ruleLabelsAppender struct {
storage.SampleAppender
labels model.LabelSet
}
2016-02-02 13:01:44 +00:00
func (app ruleLabelsAppender) Append(s *model.Sample) error {
for ln, lv := range app.labels {
if v, ok := s.Metric[ln]; ok && v != "" {
s.Metric[model.ExportedLabelPrefix+ln] = v
}
s.Metric[ln] = lv
}
2016-02-02 13:01:44 +00:00
return app.SampleAppender.Append(s)
}
type honorLabelsAppender struct {
storage.SampleAppender
labels model.LabelSet
}
// Merges the sample's metric with the given labels if the label is not
// already present in the metric.
// This also considers labels explicitly set to the empty string.
2016-02-02 13:01:44 +00:00
func (app honorLabelsAppender) Append(s *model.Sample) error {
for ln, lv := range app.labels {
if _, ok := s.Metric[ln]; !ok {
s.Metric[ln] = lv
}
}
2016-02-02 13:01:44 +00:00
return app.SampleAppender.Append(s)
}
// Applies a set of relabel configurations to the sample's metric
// before actually appending it.
type relabelAppender struct {
storage.SampleAppender
relabelings []*config.RelabelConfig
}
2016-02-02 13:01:44 +00:00
func (app relabelAppender) Append(s *model.Sample) error {
labels, err := Relabel(model.LabelSet(s.Metric), app.relabelings...)
if err != nil {
2016-02-02 13:01:44 +00:00
return fmt.Errorf("metric relabeling error %s: %s", s.Metric, err)
}
// Check if the timeseries was dropped.
if labels == nil {
2016-02-02 13:01:44 +00:00
return nil
}
s.Metric = model.Metric(labels)
2016-02-02 13:01:44 +00:00
return app.SampleAppender.Append(s)
}
2016-02-15 09:31:38 +00:00
// Labels returns a copy of the set of all public labels of the target. It
// acquires the read lock, so callers must not hold it already.
func (t *Target) Labels() model.LabelSet {
	t.RLock()
	defer t.RUnlock()

	lset := t.unlockedLabels()
	return lset
}
// unlockedLabels does the same as Labels but does not lock the mutex (useful
// for internal usage when the mutex is already locked).
func (t *Target) unlockedLabels() model.LabelSet {
2016-02-12 14:43:27 +00:00
lset := make(model.LabelSet, len(t.labels))
for ln, lv := range t.labels {
if !strings.HasPrefix(string(ln), model.ReservedLabelPrefix) {
lset[ln] = lv
}
}
2016-02-12 14:43:27 +00:00
if _, ok := lset[model.InstanceLabel]; !ok {
lset[model.InstanceLabel] = t.labels[model.AddressLabel]
}
2016-02-12 14:43:27 +00:00
return lset
}
// MetaLabels returns a copy of the target's labels before any processing.
func (t *Target) MetaLabels() model.LabelSet {
t.RLock()
defer t.RUnlock()
2016-02-12 14:43:27 +00:00
return t.metaLabels.Clone()
}