More granular locking for scrapeLoop. (#8104)

Don't lock for all of Sync/stop/reload as that holds up /metrics and the
UI when they want a list of active/dropped targets. Instead take
advantage of the fact that Sync/stop/reload cannot be called
concurrently by the scrape Manager and lock just on the targets
themselves.

Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
This commit is contained in:
Brian Brazil 2020-10-26 14:46:20 +00:00 committed by Frederic Branczyk
parent 601a3ef0bb
commit e7c6feabe1
No known key found for this signature in database
GPG Key ID: 576DA6AF8CB9027F
1 changed file with 24 additions and 15 deletions

View File

@ -192,7 +192,13 @@ type scrapePool struct {
appendable storage.Appendable appendable storage.Appendable
logger log.Logger logger log.Logger
mtx sync.Mutex // targetMtx protects activeTargets and droppedTargets from concurrent reads
// and writes. Only one of Sync/stop/reload may be called at once due to
// manager.mtxScrape so we only need to protect from concurrent reads from
// the ActiveTargets and DroppedTargets methods. This allows those two
// methods to always complete without having to wait on scrape loops to gracefully stop.
targetMtx sync.Mutex
config *config.ScrapeConfig config *config.ScrapeConfig
client *http.Client client *http.Client
// Targets and loops must always be synchronized to have the same // Targets and loops must always be synchronized to have the same
@ -273,8 +279,8 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed
} }
func (sp *scrapePool) ActiveTargets() []*Target { func (sp *scrapePool) ActiveTargets() []*Target {
sp.mtx.Lock() sp.targetMtx.Lock()
defer sp.mtx.Unlock() defer sp.targetMtx.Unlock()
var tActive []*Target var tActive []*Target
for _, t := range sp.activeTargets { for _, t := range sp.activeTargets {
@ -284,8 +290,8 @@ func (sp *scrapePool) ActiveTargets() []*Target {
} }
func (sp *scrapePool) DroppedTargets() []*Target { func (sp *scrapePool) DroppedTargets() []*Target {
sp.mtx.Lock() sp.targetMtx.Lock()
defer sp.mtx.Unlock() defer sp.targetMtx.Unlock()
return sp.droppedTargets return sp.droppedTargets
} }
@ -294,8 +300,7 @@ func (sp *scrapePool) stop() {
sp.cancel() sp.cancel()
var wg sync.WaitGroup var wg sync.WaitGroup
sp.mtx.Lock() sp.targetMtx.Lock()
defer sp.mtx.Unlock()
for fp, l := range sp.loops { for fp, l := range sp.loops {
wg.Add(1) wg.Add(1)
@ -308,6 +313,9 @@ func (sp *scrapePool) stop() {
delete(sp.loops, fp) delete(sp.loops, fp)
delete(sp.activeTargets, fp) delete(sp.activeTargets, fp)
} }
sp.targetMtx.Unlock()
wg.Wait() wg.Wait()
sp.client.CloseIdleConnections() sp.client.CloseIdleConnections()
@ -326,9 +334,6 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
targetScrapePoolReloads.Inc() targetScrapePoolReloads.Inc()
start := time.Now() start := time.Now()
sp.mtx.Lock()
defer sp.mtx.Unlock()
client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName, false, false) client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName, false, false)
if err != nil { if err != nil {
targetScrapePoolReloadsFailed.Inc() targetScrapePoolReloadsFailed.Inc()
@ -352,6 +357,8 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
mrc = sp.config.MetricRelabelConfigs mrc = sp.config.MetricRelabelConfigs
) )
sp.targetMtx.Lock()
forcedErr := sp.refreshTargetLimitErr() forcedErr := sp.refreshTargetLimitErr()
for fp, oldLoop := range sp.loops { for fp, oldLoop := range sp.loops {
var cache *scrapeCache var cache *scrapeCache
@ -387,6 +394,8 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
sp.loops[fp] = newLoop sp.loops[fp] = newLoop
} }
sp.targetMtx.Unlock()
wg.Wait() wg.Wait()
oldClient.CloseIdleConnections() oldClient.CloseIdleConnections()
targetReloadIntervalLength.WithLabelValues(interval.String()).Observe( targetReloadIntervalLength.WithLabelValues(interval.String()).Observe(
@ -398,11 +407,9 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
// Sync converts target groups into actual scrape targets and synchronizes // Sync converts target groups into actual scrape targets and synchronizes
// the currently running scraper with the resulting set and returns all scraped and dropped targets. // the currently running scraper with the resulting set and returns all scraped and dropped targets.
func (sp *scrapePool) Sync(tgs []*targetgroup.Group) { func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
sp.mtx.Lock()
defer sp.mtx.Unlock()
start := time.Now() start := time.Now()
sp.targetMtx.Lock()
var all []*Target var all []*Target
sp.droppedTargets = []*Target{} sp.droppedTargets = []*Target{}
for _, tg := range tgs { for _, tg := range tgs {
@ -419,6 +426,7 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
} }
} }
} }
sp.targetMtx.Unlock()
sp.sync(all) sp.sync(all)
targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe( targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
@ -431,7 +439,6 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
// scrape loops for new targets, and stops scrape loops for disappeared targets. // scrape loops for new targets, and stops scrape loops for disappeared targets.
// It returns after all stopped scrape loops terminated. // It returns after all stopped scrape loops terminated.
func (sp *scrapePool) sync(targets []*Target) { func (sp *scrapePool) sync(targets []*Target) {
// This function expects that you have acquired the sp.mtx lock.
var ( var (
uniqueLoops = make(map[uint64]loop) uniqueLoops = make(map[uint64]loop)
interval = time.Duration(sp.config.ScrapeInterval) interval = time.Duration(sp.config.ScrapeInterval)
@ -442,6 +449,7 @@ func (sp *scrapePool) sync(targets []*Target) {
mrc = sp.config.MetricRelabelConfigs mrc = sp.config.MetricRelabelConfigs
) )
sp.targetMtx.Lock()
for _, t := range targets { for _, t := range targets {
hash := t.hash() hash := t.hash()
@ -487,6 +495,8 @@ func (sp *scrapePool) sync(targets []*Target) {
} }
} }
sp.targetMtx.Unlock()
targetScrapePoolTargetsAdded.WithLabelValues(sp.config.JobName).Set(float64(len(uniqueLoops))) targetScrapePoolTargetsAdded.WithLabelValues(sp.config.JobName).Set(float64(len(uniqueLoops)))
forcedErr := sp.refreshTargetLimitErr() forcedErr := sp.refreshTargetLimitErr()
for _, l := range sp.loops { for _, l := range sp.loops {
@ -507,7 +517,6 @@ func (sp *scrapePool) sync(targets []*Target) {
// refreshTargetLimitErr returns an error that can be passed to the scrape loops // refreshTargetLimitErr returns an error that can be passed to the scrape loops
// if the number of targets exceeds the configured limit. // if the number of targets exceeds the configured limit.
func (sp *scrapePool) refreshTargetLimitErr() error { func (sp *scrapePool) refreshTargetLimitErr() error {
// This function expects that you have acquired the sp.mtx lock.
if sp.config == nil || sp.config.TargetLimit == 0 && !sp.targetLimitHit { if sp.config == nil || sp.config.TargetLimit == 0 && !sp.targetLimitHit {
return nil return nil
} }