diff --git a/scrape/scrape.go b/scrape/scrape.go index d52b0ac7f..f6aa49810 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -192,7 +192,13 @@ type scrapePool struct { appendable storage.Appendable logger log.Logger - mtx sync.Mutex + // targetMtx protects activeTargets and droppedTargets from concurrent reads + // and writes. Only one of Sync/stop/reload may be called at once due to + // manager.mtxScrape so we only need to protect from concurrent reads from + // the ActiveTargets and DroppedTargets methods. This allows those two + // methods to always complete without having to wait on scrape loops to gracefull stop. + targetMtx sync.Mutex + config *config.ScrapeConfig client *http.Client // Targets and loops must always be synchronized to have the same @@ -273,8 +279,8 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed } func (sp *scrapePool) ActiveTargets() []*Target { - sp.mtx.Lock() - defer sp.mtx.Unlock() + sp.targetMtx.Lock() + defer sp.targetMtx.Unlock() var tActive []*Target for _, t := range sp.activeTargets { @@ -284,8 +290,8 @@ func (sp *scrapePool) ActiveTargets() []*Target { } func (sp *scrapePool) DroppedTargets() []*Target { - sp.mtx.Lock() - defer sp.mtx.Unlock() + sp.targetMtx.Lock() + defer sp.targetMtx.Unlock() return sp.droppedTargets } @@ -294,8 +300,7 @@ func (sp *scrapePool) stop() { sp.cancel() var wg sync.WaitGroup - sp.mtx.Lock() - defer sp.mtx.Unlock() + sp.targetMtx.Lock() for fp, l := range sp.loops { wg.Add(1) @@ -308,6 +313,9 @@ func (sp *scrapePool) stop() { delete(sp.loops, fp) delete(sp.activeTargets, fp) } + + sp.targetMtx.Unlock() + wg.Wait() sp.client.CloseIdleConnections() @@ -326,9 +334,6 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error { targetScrapePoolReloads.Inc() start := time.Now() - sp.mtx.Lock() - defer sp.mtx.Unlock() - client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName, false, false) if err != nil { targetScrapePoolReloadsFailed.Inc() @@ -352,6 +357,8 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error { mrc = sp.config.MetricRelabelConfigs ) + sp.targetMtx.Lock() + forcedErr := sp.refreshTargetLimitErr() for fp, oldLoop := range sp.loops { var cache *scrapeCache @@ -387,6 +394,8 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error { sp.loops[fp] = newLoop } + sp.targetMtx.Unlock() + wg.Wait() oldClient.CloseIdleConnections() targetReloadIntervalLength.WithLabelValues(interval.String()).Observe( @@ -398,11 +407,9 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error { // Sync converts target groups into actual scrape targets and synchronizes // the currently running scraper with the resulting set and returns all scraped and dropped targets. func (sp *scrapePool) Sync(tgs []*targetgroup.Group) { - sp.mtx.Lock() - defer sp.mtx.Unlock() - start := time.Now() + sp.targetMtx.Lock() var all []*Target sp.droppedTargets = []*Target{} for _, tg := range tgs { @@ -419,6 +426,7 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) { } } } + sp.targetMtx.Unlock() sp.sync(all) targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe( @@ -431,7 +439,6 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) { // scrape loops for new targets, and stops scrape loops for disappeared targets. // It returns after all stopped scrape loops terminated. func (sp *scrapePool) sync(targets []*Target) { - // This function expects that you have acquired the sp.mtx lock. var ( uniqueLoops = make(map[uint64]loop) interval = time.Duration(sp.config.ScrapeInterval) @@ -442,6 +449,7 @@ func (sp *scrapePool) sync(targets []*Target) { mrc = sp.config.MetricRelabelConfigs ) + sp.targetMtx.Lock() for _, t := range targets { hash := t.hash() @@ -487,6 +495,8 @@ func (sp *scrapePool) sync(targets []*Target) { } } + sp.targetMtx.Unlock() + targetScrapePoolTargetsAdded.WithLabelValues(sp.config.JobName).Set(float64(len(uniqueLoops))) forcedErr := sp.refreshTargetLimitErr() for _, l := range sp.loops { @@ -507,7 +517,6 @@ func (sp *scrapePool) sync(targets []*Target) { // refreshTargetLimitErr returns an error that can be passed to the scrape loops // if the number of targets exceeds the configured limit. func (sp *scrapePool) refreshTargetLimitErr() error { - // This function expects that you have acquired the sp.mtx lock. if sp.config == nil || sp.config.TargetLimit == 0 && !sp.targetLimitHit { return nil }