From f482c7bdd734ba012f6cedd68ddc24b1416efb35 Mon Sep 17 00:00:00 2001
From: Julien Pivotto
Date: Thu, 30 Jul 2020 14:20:24 +0200
Subject: [PATCH] Add per scrape-config targets limit (#7554)

* Add per scrape-config targets limit

Signed-off-by: Julien Pivotto
---
 config/config.go                              |   5 +-
 docs/configuration/configuration.md           |   9 +-
 .../prometheus-mixin/alerts.libsonnet         |  14 ++
 scrape/scrape.go                              | 181 +++++++++++++-----
 scrape/scrape_test.go                         | 155 ++++++++++++++-
 5 files changed, 317 insertions(+), 47 deletions(-)

diff --git a/config/config.go b/config/config.go
index f8866ff5d..b87245d2a 100644
--- a/config/config.go
+++ b/config/config.go
@@ -379,8 +379,11 @@ type ScrapeConfig struct {
 	MetricsPath string `yaml:"metrics_path,omitempty"`
 	// The URL scheme with which to fetch metrics from targets.
 	Scheme string `yaml:"scheme,omitempty"`
-	// More than this many samples post metric-relabelling will cause the scrape to fail.
+	// More than this many samples post metric-relabeling will cause the scrape to fail.
 	SampleLimit uint `yaml:"sample_limit,omitempty"`
+	// More than this many targets after the target relabeling will cause the
+	// scrapes to fail.
+	TargetLimit uint `yaml:"target_limit,omitempty"`

 	// We cannot do proper Go type embedding below as the parser will then parse
 	// values arbitrarily into the overflow maps of further-down types.
diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md
index 24b32619c..c3857bdf7 100644
--- a/docs/configuration/configuration.md
+++ b/docs/configuration/configuration.md
@@ -252,9 +252,16 @@ metric_relabel_configs:
 [ - <relabel_config> ... ]

 # Per-scrape limit on number of scraped samples that will be accepted.
-# If more than this number of samples are present after metric relabelling
+# If more than this number of samples are present after metric relabeling
 # the entire scrape will be treated as failed. 0 means no limit.
 [ sample_limit: <int> | default = 0 ]
+
+# Per-scrape config limit on number of unique targets that will be
+# accepted. If more than this number of targets are present after target
+# relabeling, Prometheus will mark the targets as failed without scraping them.
+# 0 means no limit. This is an experimental feature; this behaviour could
+# change in the future.
+[ target_limit: <int> | default = 0 ]
 ```

 Where `<job_name>` must be unique across all scrape configurations.
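Taken together, `sample_limit` and the new `target_limit` cap a scrape pool in two dimensions: samples per scrape and targets per pool. For orientation, a minimal scrape configuration exercising the new setting might look like the snippet below; the job name, limit values, and target address are illustrative, not part of this patch:

```yaml
scrape_configs:
  - job_name: "node"                 # illustrative job name
    # If more than 100 targets remain after target relabeling, every scrape
    # in this pool is marked failed without being attempted (0 = no limit).
    target_limit: 100
    # Existing per-scrape cap on accepted samples, shown for contrast.
    sample_limit: 50000
    static_configs:
      - targets: ["localhost:9100"]  # illustrative target
```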
diff --git a/documentation/prometheus-mixin/alerts.libsonnet b/documentation/prometheus-mixin/alerts.libsonnet
index ee88c096b..e0ddf4152 100644
--- a/documentation/prometheus-mixin/alerts.libsonnet
+++ b/documentation/prometheus-mixin/alerts.libsonnet
@@ -259,6 +259,20 @@
             description: 'Prometheus %(prometheusName)s has missed {{ printf "%%.0f" $value }} rule group evaluations in the last 5m.' % $._config,
           },
         },
+        {
+          alert: 'PrometheusTargetLimitHit',
+          expr: |||
+            increase(prometheus_target_scrape_pool_exceeded_target_limit_total{%(prometheusSelector)s}[5m]) > 0
+          ||| % $._config,
+          'for': '15m',
+          labels: {
+            severity: 'warning',
+          },
+          annotations: {
+            summary: 'Prometheus has dropped targets because some scrape configs have exceeded the targets limit.',
+            description: 'Prometheus %(prometheusName)s has dropped {{ printf "%%.0f" $value }} targets because the number of targets exceeded the configured target_limit.' % $._config,
+          },
+        },
       ],
     },
   ],
diff --git a/scrape/scrape.go b/scrape/scrape.go
index ce0e33332..91c8e6274 100644
--- a/scrape/scrape.go
+++ b/scrape/scrape.go
@@ -81,15 +81,35 @@ var (
 	targetScrapePoolReloads = prometheus.NewCounter(
 		prometheus.CounterOpts{
 			Name: "prometheus_target_scrape_pool_reloads_total",
-			Help: "Total number of scrape loop reloads.",
+			Help: "Total number of scrape pool reloads.",
 		},
 	)
 	targetScrapePoolReloadsFailed = prometheus.NewCounter(
 		prometheus.CounterOpts{
 			Name: "prometheus_target_scrape_pool_reloads_failed_total",
-			Help: "Total number of failed scrape loop reloads.",
+			Help: "Total number of failed scrape pool reloads.",
 		},
 	)
+	targetScrapePoolExceededTargetLimit = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Name: "prometheus_target_scrape_pool_exceeded_target_limit_total",
+			Help: "Total number of times scrape pools hit the target limit, during sync or config reload.",
+		},
+	)
+	targetScrapePoolTargetLimit = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "prometheus_target_scrape_pool_target_limit",
+			Help: "Maximum number of targets allowed in this scrape pool.",
+		},
+		[]string{"scrape_job"},
+	)
+	targetScrapePoolTargetsAdded = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "prometheus_target_scrape_pool_targets",
+			Help: "Current number of targets in this scrape pool.",
+		},
+		[]string{"scrape_job"},
+	)
 	targetSyncIntervalLength = prometheus.NewSummaryVec(
 		prometheus.SummaryOpts{
 			Name: "prometheus_target_sync_length_seconds",
@@ -151,6 +171,9 @@ func init() {
 		targetScrapeSampleDuplicate,
 		targetScrapeSampleOutOfOrder,
 		targetScrapeSampleOutOfBounds,
+		targetScrapePoolExceededTargetLimit,
+		targetScrapePoolTargetLimit,
+		targetScrapePoolTargetsAdded,
 		targetScrapeCacheFlushForced,
 		targetMetadataCache,
 	)
@@ -170,6 +193,7 @@ type scrapePool struct {
 	droppedTargets []*Target
 	loops          map[uint64]loop
 	cancel         context.CancelFunc
+	targetLimitHit bool // Internal state to speed up the target_limit checks.

 	// Constructor for new scrape loops. This is settable for testing convenience.
 	newLoop func(scrapeLoopOptions) loop
@@ -278,6 +302,13 @@ func (sp *scrapePool) stop() {
 	}
 	wg.Wait()
 	sp.client.CloseIdleConnections()
+
+	if sp.config != nil {
+		targetScrapePoolSyncsCounter.DeleteLabelValues(sp.config.JobName)
+		targetScrapePoolTargetLimit.DeleteLabelValues(sp.config.JobName)
+		targetScrapePoolTargetsAdded.DeleteLabelValues(sp.config.JobName)
+		targetSyncIntervalLength.DeleteLabelValues(sp.config.JobName)
+	}
 }
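Because the two new gauges are labeled by `scrape_job`, a stopped pool would otherwise leave stale per-job series behind, which is why `stop()` now deletes its label values. A minimal standalone sketch of that register/set/delete lifecycle, using only the client_golang API the patch already relies on (the metric name and job name here are illustrative):

```go
package main

import "github.com/prometheus/client_golang/prometheus"

// A per-job gauge, mirroring targetScrapePoolTargetLimit in the patch.
var poolTargetLimit = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "example_scrape_pool_target_limit",
		Help: "Maximum number of targets allowed in this scrape pool.",
	},
	[]string{"scrape_job"},
)

func main() {
	prometheus.MustRegister(poolTargetLimit)

	// On pool creation/reload: publish the configured limit for this job.
	poolTargetLimit.WithLabelValues("node").Set(100)

	// On pool stop: drop this job's series so it no longer appears
	// in /metrics after the scrape config is removed.
	poolTargetLimit.DeleteLabelValues("node")
}
```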
 // reload the scrape pool with the given scrape configuration. The target state is preserved
@@ -301,6 +332,8 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
 	oldClient := sp.client
 	sp.client = client

+	targetScrapePoolTargetLimit.WithLabelValues(sp.config.JobName).Set(float64(sp.config.TargetLimit))
+
 	var (
 		wg       sync.WaitGroup
 		interval = time.Duration(sp.config.ScrapeInterval)
@@ -311,6 +344,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
 		mrc      = sp.config.MetricRelabelConfigs
 	)

+	forcedErr := sp.refreshTargetLimitErr()
 	for fp, oldLoop := range sp.loops {
 		var cache *scrapeCache
 		if oc := oldLoop.getCache(); reuseCache && oc != nil {
@@ -338,6 +372,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {

 			oldLoop.stop()
 			wg.Done()

+			newLoop.setForcedError(forcedErr)
 			go newLoop.run(interval, timeout, nil)
 		}(oldLoop, newLoop)

@@ -355,10 +390,12 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
 // Sync converts target groups into actual scrape targets and synchronizes
 // the currently running scraper with the resulting set and returns all scraped and dropped targets.
 func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
+	sp.mtx.Lock()
+	defer sp.mtx.Unlock()
+
 	start := time.Now()

 	var all []*Target
-	sp.mtx.Lock()
 	sp.droppedTargets = []*Target{}
 	for _, tg := range tgs {
 		targets, err := targetsFromGroup(tg, sp.config)
@@ -374,7 +411,6 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
 			}
 		}
 	}
-	sp.mtx.Unlock()

 	sp.sync(all)

 	targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
@@ -387,11 +423,9 @@
 // scrape loops for new targets, and stops scrape loops for disappeared targets.
 // It returns after all stopped scrape loops terminated.
 func (sp *scrapePool) sync(targets []*Target) {
-	sp.mtx.Lock()
-	defer sp.mtx.Unlock()
-
+	// This function expects that you have acquired the sp.mtx lock.
 	var (
-		uniqueTargets = map[uint64]struct{}{}
+		uniqueLoops   = make(map[uint64]loop)
 		interval      = time.Duration(sp.config.ScrapeInterval)
 		timeout       = time.Duration(sp.config.ScrapeTimeout)
 		limit         = int(sp.config.SampleLimit)
@@ -403,7 +437,6 @@ func (sp *scrapePool) sync(targets []*Target) {
 	for _, t := range targets {
 		t := t
 		hash := t.hash()
-		uniqueTargets[hash] = struct{}{}

 		if _, ok := sp.activeTargets[hash]; !ok {
 			s := &targetScraper{Target: t, client: sp.client, timeout: timeout}
@@ -419,8 +452,12 @@
 			sp.activeTargets[hash] = t
 			sp.loops[hash] = l

-			go l.run(interval, timeout, nil)
+			uniqueLoops[hash] = l
 		} else {
+			// This might be a duplicated target.
+			if _, ok := uniqueLoops[hash]; !ok {
+				uniqueLoops[hash] = nil
+			}
 			// Need to keep the most updated labels information
 			// for displaying it in the Service Discovery web page.
 			sp.activeTargets[hash].SetDiscoveredLabels(t.DiscoveredLabels())
@@ -431,7 +468,7 @@

 	// Stop and remove old targets and scraper loops.
 	for hash := range sp.activeTargets {
-		if _, ok := uniqueTargets[hash]; !ok {
+		if _, ok := uniqueLoops[hash]; !ok {
 			wg.Add(1)
 			go func(l loop) {
 				l.stop()
@@ -443,6 +480,16 @@
 		}
 	}

+	targetScrapePoolTargetsAdded.WithLabelValues(sp.config.JobName).Set(float64(len(uniqueLoops)))
+	forcedErr := sp.refreshTargetLimitErr()
+	for _, l := range sp.loops {
+		l.setForcedError(forcedErr)
+	}
+	for _, l := range uniqueLoops {
+		if l != nil {
+			go l.run(interval, timeout, nil)
+		}
+	}
 	// Wait for all potentially stopped scrapers to terminate.
// This covers the case of flapping targets. If the server is under high load, a new scraper // may be active and tries to insert. The old scraper that didn't terminate yet could still @@ -450,6 +497,26 @@ func (sp *scrapePool) sync(targets []*Target) { wg.Wait() } +// refreshTargetLimitErr returns an error that can be passed to the scrape loops +// if the number of targets exceeds the configured limit. +func (sp *scrapePool) refreshTargetLimitErr() error { + // This function expects that you have acquired the sp.mtx lock. + if sp.config == nil || sp.config.TargetLimit == 0 && !sp.targetLimitHit { + return nil + } + l := len(sp.activeTargets) + if l <= int(sp.config.TargetLimit) && !sp.targetLimitHit { + return nil + } + var err error + sp.targetLimitHit = l > int(sp.config.TargetLimit) + if sp.targetLimitHit { + targetScrapePoolExceededTargetLimit.Inc() + err = fmt.Errorf("target_limit exceeded (number of targets: %d, limit: %d)", l, sp.config.TargetLimit) + } + return err +} + func mutateSampleLabels(lset labels.Labels, target *Target, honor bool, rc []*relabel.Config) labels.Labels { lb := labels.NewBuilder(lset) @@ -590,6 +657,7 @@ func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error) // A loop can run and be stopped again. It must not be reused after it was stopped. type loop interface { run(interval, timeout time.Duration, errc chan<- error) + setForcedError(err error) stop() getCache() *scrapeCache disableEndOfRunStalenessMarkers() @@ -610,6 +678,8 @@ type scrapeLoop struct { buffers *pool.Pool jitterSeed uint64 honorTimestamps bool + forcedErr error + forcedErrMtx sync.Mutex appender func() storage.Appender sampleMutator labelsMutator @@ -953,10 +1023,7 @@ mainLoop: // together with reporting metrics, by using as few appenders as possible. // In the happy scenario, a single appender is used. func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time.Time, errc chan<- error) time.Time { - var ( - start = time.Now() - scrapeCtx, cancel = context.WithTimeout(sl.ctx, timeout) - ) + start := time.Now() // Only record after the first scrape. if !last.IsZero() { @@ -968,26 +1035,9 @@ func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time b := sl.buffers.Get(sl.lastScrapeSize).([]byte) buf := bytes.NewBuffer(b) - contentType, scrapeErr := sl.scraper.scrape(scrapeCtx, buf) - cancel() - - if scrapeErr == nil { - b = buf.Bytes() - // NOTE: There were issues with misbehaving clients in the past - // that occasionally returned empty results. We don't want those - // to falsely reset our buffer size. - if len(b) > 0 { - sl.lastScrapeSize = len(b) - } - } else { - level.Debug(sl.l).Log("msg", "Scrape failed", "err", scrapeErr.Error()) - if errc != nil { - errc <- scrapeErr - } - } - app := sl.appender() - var err error + var total, added, seriesAdded int + var err, appErr, scrapeErr error defer func() { if err != nil { app.Rollback() @@ -998,20 +1048,53 @@ func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time level.Error(sl.l).Log("msg", "Scrape commit failed", "err", err) } }() - // A failed scrape is the same as an empty scrape, - // we still call sl.append to trigger stale markers. - total, added, seriesAdded, appErr := sl.append(app, b, contentType, start) - if appErr != nil { - app.Rollback() - app = sl.appender() - level.Debug(sl.l).Log("msg", "Append failed", "err", appErr) - // The append failed, probably due to a parse error or sample limit. 
- // Call sl.append again with an empty scrape to trigger stale markers. + if forcedErr := sl.getForcedError(); forcedErr != nil { + appErr = forcedErr + // Add stale markers. if _, _, _, err := sl.append(app, []byte{}, "", start); err != nil { app.Rollback() app = sl.appender() level.Warn(sl.l).Log("msg", "Append failed", "err", err) } + if errc != nil { + errc <- forcedErr + } + } else { + var contentType string + scrapeCtx, cancel := context.WithTimeout(sl.ctx, timeout) + contentType, scrapeErr = sl.scraper.scrape(scrapeCtx, buf) + cancel() + + if scrapeErr == nil { + b = buf.Bytes() + // NOTE: There were issues with misbehaving clients in the past + // that occasionally returned empty results. We don't want those + // to falsely reset our buffer size. + if len(b) > 0 { + sl.lastScrapeSize = len(b) + } + } else { + level.Debug(sl.l).Log("msg", "Scrape failed", "err", scrapeErr.Error()) + if errc != nil { + errc <- scrapeErr + } + } + + // A failed scrape is the same as an empty scrape, + // we still call sl.append to trigger stale markers. + total, added, seriesAdded, appErr = sl.append(app, b, contentType, start) + if appErr != nil { + app.Rollback() + app = sl.appender() + level.Debug(sl.l).Log("msg", "Append failed", "err", appErr) + // The append failed, probably due to a parse error or sample limit. + // Call sl.append again with an empty scrape to trigger stale markers. + if _, _, _, err := sl.append(app, []byte{}, "", start); err != nil { + app.Rollback() + app = sl.appender() + level.Warn(sl.l).Log("msg", "Append failed", "err", err) + } + } } sl.buffers.Put(b) @@ -1026,6 +1109,18 @@ func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time return start } +func (sl *scrapeLoop) setForcedError(err error) { + sl.forcedErrMtx.Lock() + defer sl.forcedErrMtx.Unlock() + sl.forcedErr = err +} + +func (sl *scrapeLoop) getForcedError() error { + sl.forcedErrMtx.Lock() + defer sl.forcedErrMtx.Unlock() + return sl.forcedErr +} + func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, interval time.Duration) { // Scraping has stopped. We want to write stale markers but // the target may be recreated, so we wait just over 2 scrape intervals diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index e9e1cfe36..7b0e69dd1 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -138,8 +138,10 @@ func TestDiscoveredLabelsUpdate(t *testing.T) { } type testLoop struct { - startFunc func(interval, timeout time.Duration, errc chan<- error) - stopFunc func() + startFunc func(interval, timeout time.Duration, errc chan<- error) + stopFunc func() + forcedErr error + forcedErrMtx sync.Mutex } func (l *testLoop) run(interval, timeout time.Duration, errc chan<- error) { @@ -149,6 +151,18 @@ func (l *testLoop) run(interval, timeout time.Duration, errc chan<- error) { func (l *testLoop) disableEndOfRunStalenessMarkers() { } +func (l *testLoop) setForcedError(err error) { + l.forcedErrMtx.Lock() + defer l.forcedErrMtx.Unlock() + l.forcedErr = err +} + +func (l *testLoop) getForcedError() error { + l.forcedErrMtx.Lock() + defer l.forcedErrMtx.Unlock() + return l.forcedErr +} + func (l *testLoop) stop() { l.stopFunc() } @@ -301,6 +315,92 @@ func TestScrapePoolReload(t *testing.T) { testutil.Equals(t, numTargets, len(sp.loops), "Unexpected number of stopped loops after reload") } +func TestScrapePoolTargetLimit(t *testing.T) { + // On starting to run, new loops created on reload check whether their preceding + // equivalents have been stopped. 
+ newLoop := func(opts scrapeLoopOptions) loop { + l := &testLoop{ + startFunc: func(interval, timeout time.Duration, errc chan<- error) {}, + stopFunc: func() {}, + } + return l + } + sp := &scrapePool{ + appendable: &nopAppendable{}, + activeTargets: map[uint64]*Target{}, + loops: map[uint64]loop{}, + newLoop: newLoop, + logger: nil, + client: http.DefaultClient, + } + + var tgs = []*targetgroup.Group{} + for i := 0; i < 50; i++ { + tgs = append(tgs, + &targetgroup.Group{ + Targets: []model.LabelSet{ + {model.AddressLabel: model.LabelValue(fmt.Sprintf("127.0.0.1:%d", 9090+i))}, + }, + }, + ) + } + + var limit uint + reloadWithLimit := func(l uint) { + limit = l + testutil.Ok(t, sp.reload(&config.ScrapeConfig{ + ScrapeInterval: model.Duration(3 * time.Second), + ScrapeTimeout: model.Duration(2 * time.Second), + TargetLimit: l, + })) + } + + var targets int + loadTargets := func(n int) { + targets = n + sp.Sync(tgs[:n]) + } + + validateErrorMessage := func(shouldErr bool) { + for _, l := range sp.loops { + lerr := l.(*testLoop).getForcedError() + if shouldErr { + testutil.Assert(t, lerr != nil, "error was expected for %d targets with a limit of %d", targets, limit) + testutil.Equals(t, fmt.Sprintf("target_limit exceeded (number of targets: %d, limit: %d)", targets, limit), lerr.Error()) + } else { + testutil.Equals(t, nil, lerr) + } + } + } + + reloadWithLimit(0) + loadTargets(50) + + // Simulate an initial config with a limit. + sp.config.TargetLimit = 30 + limit = 30 + loadTargets(50) + validateErrorMessage(true) + + reloadWithLimit(50) + validateErrorMessage(false) + + reloadWithLimit(40) + validateErrorMessage(true) + + loadTargets(30) + validateErrorMessage(false) + + loadTargets(40) + validateErrorMessage(false) + + loadTargets(41) + validateErrorMessage(true) + + reloadWithLimit(51) + validateErrorMessage(false) +} + func TestScrapePoolAppender(t *testing.T) { cfg := &config.ScrapeConfig{} app := &nopAppendable{} @@ -595,6 +695,57 @@ func TestScrapeLoopRun(t *testing.T) { } } +func TestScrapeLoopForcedErr(t *testing.T) { + var ( + signal = make(chan struct{}, 1) + errc = make(chan error) + + scraper = &testScraper{} + app = func() storage.Appender { return &nopAppender{} } + ) + + ctx, cancel := context.WithCancel(context.Background()) + sl := newScrapeLoop(ctx, + scraper, + nil, nil, + nopMutator, + nopMutator, + app, + nil, + 0, + true, + ) + + forcedErr := fmt.Errorf("forced err") + sl.setForcedError(forcedErr) + + scraper.scrapeFunc = func(context.Context, io.Writer) error { + t.Fatalf("should not be scraped") + return nil + } + + go func() { + sl.run(time.Second, time.Hour, errc) + signal <- struct{}{} + }() + + select { + case err := <-errc: + if err != forcedErr { + t.Fatalf("Expected forced error but got: %s", err) + } + case <-time.After(3 * time.Second): + t.Fatalf("Expected forced error but got none") + } + cancel() + + select { + case <-signal: + case <-time.After(5 * time.Second): + t.Fatalf("Scrape not stopped") + } +} + func TestScrapeLoopMetadata(t *testing.T) { var ( signal = make(chan struct{})
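To see the forced-error plumbing exercised by these tests in isolation: the pool computes a single error via refreshTargetLimitErr and pushes it into every loop with setForcedError; each loop then consults that error at the top of every iteration and reports it instead of scraping. A distilled, standalone sketch of the pattern, with all names illustrative rather than taken from Prometheus:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// worker mimics scrapeLoop's forced-error mechanics: an owner may set or
// clear an error at any time, and the loop checks it before doing work.
type worker struct {
	mtx       sync.Mutex
	forcedErr error
}

// setForcedError mirrors scrapeLoop.setForcedError; pass nil to clear.
func (w *worker) setForcedError(err error) {
	w.mtx.Lock()
	defer w.mtx.Unlock()
	w.forcedErr = err
}

func (w *worker) getForcedError() error {
	w.mtx.Lock()
	defer w.mtx.Unlock()
	return w.forcedErr
}

// run ticks like a scrape loop: while a forced error is set, the real work
// (the scrape) is skipped but the failure is still surfaced.
func (w *worker) run(interval time.Duration, iterations int, done chan<- struct{}) {
	for i := 0; i < iterations; i++ {
		if err := w.getForcedError(); err != nil {
			fmt.Println("iteration skipped:", err)
		} else {
			fmt.Println("iteration scraped")
		}
		time.Sleep(interval)
	}
	close(done)
}

func main() {
	w := &worker{}
	done := make(chan struct{})
	go w.run(10*time.Millisecond, 6, done)

	// After a few healthy iterations, simulate the pool detecting that the
	// configured limit was exceeded.
	time.Sleep(25 * time.Millisecond)
	w.setForcedError(errors.New("target_limit exceeded (number of targets: 50, limit: 40)"))
	<-done
}
```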