mirror of https://github.com/prometheus/prometheus
More granular locking for scrapeLoop. (#8104)
Don't lock for all of Sync/stop/reload as that holds up /metrics and the UI when they want a list of active/dropped targets. Instead, take advantage of the fact that Sync/stop/reload cannot be called concurrently by the scrape Manager and lock just on the targets themselves.

Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
parent e7e60623ff
commit 3f8e51738c
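The pattern described above, as a minimal standalone sketch (hypothetical pool type and method names, not the actual scrapePool API): the writer takes targetMtx only while mutating the target map and releases it before waiting on the scrape loops, so a reader such as ActiveTargets returns immediately instead of queueing behind a slow shutdown.

package main

import (
    "fmt"
    "sync"
    "time"
)

// pool is a toy stand-in for scrapePool: targetMtx guards only the target
// map, not the (potentially slow) shutdown of the scrape loops.
type pool struct {
    targetMtx sync.Mutex
    active    map[uint64]string
}

// stop removes targets under the lock, then waits for the loops outside it,
// so a concurrent reader is never stuck behind wg.Wait().
func (p *pool) stop() {
    var wg sync.WaitGroup

    p.targetMtx.Lock()
    for fp := range p.active {
        wg.Add(1)
        go func() {
            defer wg.Done()
            time.Sleep(500 * time.Millisecond) // stand-in for loop.stop()
        }()
        delete(p.active, fp)
    }
    p.targetMtx.Unlock() // released before waiting

    wg.Wait()
}

// activeTargets is what /metrics and the UI ask for; it only needs targetMtx.
func (p *pool) activeTargets() []string {
    p.targetMtx.Lock()
    defer p.targetMtx.Unlock()
    out := make([]string, 0, len(p.active))
    for _, t := range p.active {
        out = append(out, t)
    }
    return out
}

func main() {
    p := &pool{active: map[uint64]string{1: "a", 2: "b"}}
    go p.stop()
    time.Sleep(10 * time.Millisecond)
    fmt.Println(p.activeTargets()) // returns promptly while stop() is still waiting
}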
@@ -192,7 +192,13 @@ type scrapePool struct {
 	appendable storage.Appendable
 	logger     log.Logger

-	mtx            sync.Mutex
+	// targetMtx protects activeTargets and droppedTargets from concurrent reads
+	// and writes. Only one of Sync/stop/reload may be called at once due to
+	// manager.mtxScrape so we only need to protect from concurrent reads from
+	// the ActiveTargets and DroppedTargets methods. This allows those two
+	// methods to always complete without having to wait on scrape loops to gracefully stop.
+	targetMtx sync.Mutex
+
 	config         *config.ScrapeConfig
 	client         *http.Client
 	// Targets and loops must always be synchronized to have the same
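The new comment relies on an existing guarantee: the scrape Manager's mtxScrape ensures only one of Sync, stop, or reload runs at a time, so targetMtx never has to arbitrate between two writers. A rough sketch of that division of labour, using hypothetical stub types rather than the real Manager and scrapePool:

package main

import "sync"

// Hypothetical stand-ins for the real Manager and scrapePool; only the
// locking shape matters here.
type scrapePoolStub struct{ targetMtx sync.Mutex }

func (sp *scrapePoolStub) Sync() {
    sp.targetMtx.Lock()
    // ...mutate activeTargets/droppedTargets under the narrow lock...
    sp.targetMtx.Unlock()
    // ...start/stop loops and wait on them outside the lock...
}

type managerStub struct {
    mtxScrape sync.Mutex // serializes Sync/stop/reload for the pools
    pools     []*scrapePoolStub
}

// Every writer entry point goes through mtxScrape, so a pool never runs two
// of Sync/stop/reload at once; targetMtx only has to exclude the readers.
func (m *managerStub) reload() {
    m.mtxScrape.Lock()
    defer m.mtxScrape.Unlock()
    for _, sp := range m.pools {
        sp.Sync()
    }
}

func main() {
    m := &managerStub{pools: []*scrapePoolStub{{}}}
    m.reload()
}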
@@ -273,8 +279,8 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed
 }

 func (sp *scrapePool) ActiveTargets() []*Target {
-	sp.mtx.Lock()
-	defer sp.mtx.Unlock()
+	sp.targetMtx.Lock()
+	defer sp.targetMtx.Unlock()

 	var tActive []*Target
 	for _, t := range sp.activeTargets {
@@ -284,8 +290,8 @@ func (sp *scrapePool) ActiveTargets() []*Target {
 }

 func (sp *scrapePool) DroppedTargets() []*Target {
-	sp.mtx.Lock()
-	defer sp.mtx.Unlock()
+	sp.targetMtx.Lock()
+	defer sp.targetMtx.Unlock()
 	return sp.droppedTargets
 }

@@ -294,8 +300,7 @@ func (sp *scrapePool) stop() {
 	sp.cancel()
 	var wg sync.WaitGroup

-	sp.mtx.Lock()
-	defer sp.mtx.Unlock()
+	sp.targetMtx.Lock()

 	for fp, l := range sp.loops {
 		wg.Add(1)
@@ -308,6 +313,9 @@ func (sp *scrapePool) stop() {
 		delete(sp.loops, fp)
 		delete(sp.activeTargets, fp)
 	}
+
+	sp.targetMtx.Unlock()
+
 	wg.Wait()
 	sp.client.CloseIdleConnections()

@@ -326,9 +334,6 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
 	targetScrapePoolReloads.Inc()
 	start := time.Now()

-	sp.mtx.Lock()
-	defer sp.mtx.Unlock()
-
 	client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName, false, false)
 	if err != nil {
 		targetScrapePoolReloadsFailed.Inc()
@@ -352,6 +357,8 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
 		mrc = sp.config.MetricRelabelConfigs
 	)

+	sp.targetMtx.Lock()
+
 	forcedErr := sp.refreshTargetLimitErr()
 	for fp, oldLoop := range sp.loops {
 		var cache *scrapeCache
@@ -387,6 +394,8 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
 		sp.loops[fp] = newLoop
 	}

+	sp.targetMtx.Unlock()
+
 	wg.Wait()
 	oldClient.CloseIdleConnections()
 	targetReloadIntervalLength.WithLabelValues(interval.String()).Observe(
@@ -398,11 +407,9 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
 // Sync converts target groups into actual scrape targets and synchronizes
 // the currently running scraper with the resulting set and returns all scraped and dropped targets.
 func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
-	sp.mtx.Lock()
-	defer sp.mtx.Unlock()
-
 	start := time.Now()

+	sp.targetMtx.Lock()
 	var all []*Target
 	sp.droppedTargets = []*Target{}
 	for _, tg := range tgs {
@@ -419,6 +426,7 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
 			}
 		}
 	}
+	sp.targetMtx.Unlock()
 	sp.sync(all)

 	targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
@@ -431,7 +439,6 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
 // scrape loops for new targets, and stops scrape loops for disappeared targets.
 // It returns after all stopped scrape loops terminated.
 func (sp *scrapePool) sync(targets []*Target) {
-	// This function expects that you have acquired the sp.mtx lock.
 	var (
 		uniqueLoops = make(map[uint64]loop)
 		interval    = time.Duration(sp.config.ScrapeInterval)
@@ -442,6 +449,7 @@ func (sp *scrapePool) sync(targets []*Target) {
 		mrc = sp.config.MetricRelabelConfigs
 	)

+	sp.targetMtx.Lock()
 	for _, t := range targets {
 		hash := t.hash()

@@ -487,6 +495,8 @@ func (sp *scrapePool) sync(targets []*Target) {
 		}
 	}

+	sp.targetMtx.Unlock()
+
 	targetScrapePoolTargetsAdded.WithLabelValues(sp.config.JobName).Set(float64(len(uniqueLoops)))
 	forcedErr := sp.refreshTargetLimitErr()
 	for _, l := range sp.loops {
@@ -507,7 +517,6 @@ func (sp *scrapePool) sync(targets []*Target) {
 // refreshTargetLimitErr returns an error that can be passed to the scrape loops
 // if the number of targets exceeds the configured limit.
 func (sp *scrapePool) refreshTargetLimitErr() error {
-	// This function expects that you have acquired the sp.mtx lock.
 	if sp.config == nil || sp.config.TargetLimit == 0 && !sp.targetLimitHit {
 		return nil
 	}