From ddd46de6f48fef82ade7d2a6f0a3cf35118c5383 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Mon, 9 Apr 2018 17:18:25 +0300 Subject: [PATCH] Races/3994 (#4005) Fix race by properly locking access to scrape pools. Use separate mutex for information needed by UI so that UI isn't blocked when targets are being updated. --- scrape/manager.go | 126 +++++++++++++++++++++-------------------- scrape/scrape.go | 13 ++++- web/api/v1/api.go | 16 +++--- web/api/v1/api_test.go | 4 +- web/web.go | 6 +- 5 files changed, 90 insertions(+), 75 deletions(-) diff --git a/scrape/manager.go b/scrape/manager.go index 64fc100b0..0f2c0d497 100644 --- a/scrape/manager.go +++ b/scrape/manager.go @@ -40,18 +40,25 @@ func NewManager(logger log.Logger, app Appendable) *Manager { scrapeConfigs: make(map[string]*config.ScrapeConfig), scrapePools: make(map[string]*scrapePool), graceShut: make(chan struct{}), + targetsAll: make(map[string][]*Target), } } // Manager maintains a set of scrape pools and manages start/stop cycles // when receiving new target groups form the discovery manager. type Manager struct { - logger log.Logger - append Appendable + logger log.Logger + append Appendable + graceShut chan struct{} + + mtxTargets sync.Mutex // Guards the fields below. + targetsActive []*Target + targetsDropped []*Target + targetsAll map[string][]*Target + + mtxScrape sync.Mutex // Guards the fields below. scrapeConfigs map[string]*config.ScrapeConfig scrapePools map[string]*scrapePool - mtx sync.RWMutex - graceShut chan struct{} } // Run starts background processing to handle target updates and reload the scraping loops. @@ -68,6 +75,9 @@ func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error { // Stop cancels all running scrape pools and blocks until all have exited. func (m *Manager) Stop() { + m.mtxScrape.Lock() + defer m.mtxScrape.Unlock() + for _, sp := range m.scrapePools { sp.stop() } @@ -76,8 +86,9 @@ func (m *Manager) Stop() { // ApplyConfig resets the manager's target providers and job configurations as defined by the new cfg. func (m *Manager) ApplyConfig(cfg *config.Config) error { - m.mtx.Lock() - defer m.mtx.Unlock() + m.mtxScrape.Lock() + defer m.mtxScrape.Unlock() + c := make(map[string]*config.ScrapeConfig) for _, scfg := range cfg.ScrapeConfigs { c[scfg.JobName] = scfg @@ -97,71 +108,66 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error { return nil } -// TargetMap returns map of active and dropped targets and their corresponding scrape config job name. -func (m *Manager) TargetMap() map[string][]*Target { - m.mtx.Lock() - defer m.mtx.Unlock() - - targets := make(map[string][]*Target) - for jobName, sp := range m.scrapePools { - sp.mtx.RLock() - for _, t := range sp.targets { - targets[jobName] = append(targets[jobName], t) - } - targets[jobName] = append(targets[jobName], sp.droppedTargets...) - sp.mtx.RUnlock() - } - - return targets +// TargetsAll returns active and dropped targets grouped by job_name. +func (m *Manager) TargetsAll() map[string][]*Target { + m.mtxTargets.Lock() + defer m.mtxTargets.Unlock() + return m.targetsAll } -// Targets returns the targets currently being scraped. -func (m *Manager) Targets() []*Target { - m.mtx.Lock() - defer m.mtx.Unlock() - - var targets []*Target - for _, p := range m.scrapePools { - p.mtx.RLock() - for _, tt := range p.targets { - targets = append(targets, tt) - } - p.mtx.RUnlock() - } - - return targets +// TargetsActive returns the active targets currently being scraped. 
+func (m *Manager) TargetsActive() []*Target { + m.mtxTargets.Lock() + defer m.mtxTargets.Unlock() + return m.targetsActive } -// DroppedTargets returns the targets dropped during relabelling. -func (m *Manager) DroppedTargets() []*Target { - m.mtx.Lock() - defer m.mtx.Unlock() - var droppedTargets []*Target - for _, p := range m.scrapePools { - p.mtx.RLock() - droppedTargets = append(droppedTargets, p.droppedTargets...) - p.mtx.RUnlock() +// TargetsDropped returns the dropped targets during relabelling. +func (m *Manager) TargetsDropped() []*Target { + m.mtxTargets.Lock() + defer m.mtxTargets.Unlock() + return m.targetsDropped +} + +func (m *Manager) targetsUpdate(active, dropped map[string][]*Target) { + m.mtxTargets.Lock() + defer m.mtxTargets.Unlock() + + m.targetsAll = make(map[string][]*Target) + m.targetsActive = nil + m.targetsDropped = nil + for jobName, targets := range active { + m.targetsAll[jobName] = append(m.targetsAll[jobName], targets...) + m.targetsActive = append(m.targetsActive, targets...) + + } + for jobName, targets := range dropped { + m.targetsAll[jobName] = append(m.targetsAll[jobName], targets...) + m.targetsDropped = append(m.targetsDropped, targets...) } - return droppedTargets } func (m *Manager) reload(t map[string][]*targetgroup.Group) { + m.mtxScrape.Lock() + defer m.mtxScrape.Unlock() + + tDropped := make(map[string][]*Target) + tActive := make(map[string][]*Target) + for tsetName, tgroup := range t { - scrapeConfig, ok := m.scrapeConfigs[tsetName] - if !ok { - level.Error(m.logger).Log("msg", "error reloading target set", "err", fmt.Sprintf("invalid config id:%v", tsetName)) - continue - } - - // Scrape pool doesn't exist so start a new one. - existing, ok := m.scrapePools[tsetName] - if !ok { - sp := newScrapePool(scrapeConfig, m.append, log.With(m.logger, "scrape_pool", tsetName)) + var sp *scrapePool + if existing, ok := m.scrapePools[tsetName]; !ok { + scrapeConfig, ok := m.scrapeConfigs[tsetName] + if !ok { + level.Error(m.logger).Log("msg", "error reloading target set", "err", fmt.Sprintf("invalid config id:%v", tsetName)) + continue + } + sp = newScrapePool(scrapeConfig, m.append, log.With(m.logger, "scrape_pool", tsetName)) m.scrapePools[tsetName] = sp - sp.Sync(tgroup) - } else { - existing.Sync(tgroup) + sp = existing } + tActive[tsetName], tDropped[tsetName] = sp.Sync(tgroup) } + m.targetsUpdate(tActive, tDropped) } diff --git a/scrape/scrape.go b/scrape/scrape.go index 1c3adfb72..609ff058e 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -245,8 +245,8 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) { } // Sync converts target groups into actual scrape targets and synchronizes -// the currently running scraper with the resulting set. -func (sp *scrapePool) Sync(tgs []*targetgroup.Group) { +// the currently running scraper with the resulting set and returns all scraped and dropped targets. 
+func (sp *scrapePool) Sync(tgs []*targetgroup.Group) (tActive []*Target, tDropped []*Target) { start := time.Now() var all []*Target @@ -273,6 +273,15 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) { time.Since(start).Seconds(), ) targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc() + + sp.mtx.RLock() + for _, t := range sp.targets { + tActive = append(tActive, t) + } + tDropped = sp.droppedTargets + sp.mtx.RUnlock() + + return tActive, tDropped } // sync takes a list of potentially duplicated targets, deduplicates them, starts diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 50f4ed505..db2da462f 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -82,8 +82,8 @@ func (e *apiError) Error() string { } type targetRetriever interface { - Targets() []*scrape.Target - DroppedTargets() []*scrape.Target + TargetsActive() []*scrape.Target + TargetsDropped() []*scrape.Target } type alertmanagerRetriever interface { @@ -452,11 +452,12 @@ type TargetDiscovery struct { } func (api *API) targets(r *http.Request) (interface{}, *apiError) { - targets := api.targetRetriever.Targets() - droppedTargets := api.targetRetriever.DroppedTargets() - res := &TargetDiscovery{ActiveTargets: make([]*Target, len(targets)), DroppedTargets: make([]*DroppedTarget, len(droppedTargets))} + tActive := api.targetRetriever.TargetsActive() + tDropped := api.targetRetriever.TargetsDropped() + res := &TargetDiscovery{ActiveTargets: make([]*Target, len(tActive)), DroppedTargets: make([]*DroppedTarget, len(tDropped))} + + for i, t := range tActive { - for i, t := range targets { lastErrStr := "" lastErr := t.LastError() if lastErr != nil { @@ -473,12 +474,11 @@ func (api *API) targets(r *http.Request) (interface{}, *apiError) { } } - for i, t := range droppedTargets { + for i, t := range tDropped { res.DroppedTargets[i] = &DroppedTarget{ DiscoveredLabels: t.DiscoveredLabels().Map(), } } - return res, nil } diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index e4128b1b4..921945b34 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -44,7 +44,7 @@ import ( type testTargetRetriever struct{} -func (t testTargetRetriever) Targets() []*scrape.Target { +func (t testTargetRetriever) TargetsActive() []*scrape.Target { return []*scrape.Target{ scrape.NewTarget( labels.FromMap(map[string]string{ @@ -57,7 +57,7 @@ func (t testTargetRetriever) Targets() []*scrape.Target { ), } } -func (t testTargetRetriever) DroppedTargets() []*scrape.Target { +func (t testTargetRetriever) TargetsDropped() []*scrape.Target { return []*scrape.Target{ scrape.NewTarget( nil, diff --git a/web/web.go b/web/web.go index fe68e62eb..d7e00441b 100644 --- a/web/web.go +++ b/web/web.go @@ -404,7 +404,7 @@ func (h *Handler) Run(ctx context.Context) error { h.options.QueryEngine, h.options.Storage.Querier, func() []*scrape.Target { - return h.options.ScrapeManager.Targets() + return h.options.ScrapeManager.TargetsActive() }, func() []*url.URL { return h.options.Notifier.Alertmanagers() @@ -592,7 +592,7 @@ func (h *Handler) rules(w http.ResponseWriter, r *http.Request) { func (h *Handler) serviceDiscovery(w http.ResponseWriter, r *http.Request) { var index []string - targets := h.scrapeManager.TargetMap() + targets := h.scrapeManager.TargetsAll() for job := range targets { index = append(index, job) } @@ -610,7 +610,7 @@ func (h *Handler) serviceDiscovery(w http.ResponseWriter, r *http.Request) { func (h *Handler) targets(w http.ResponseWriter, r *http.Request) { // Bucket targets by job label tps := 
map[string][]*scrape.Target{} - for _, t := range h.scrapeManager.Targets() { + for _, t := range h.scrapeManager.TargetsActive() { job := t.Labels().Get(model.JobLabel) tps[job] = append(tps[job], t) }
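
For reference, below is a minimal, self-contained sketch of the locking pattern this patch applies: one mutex (mtxScrape) serializes access to the scrape pools for the whole duration of a reload, while a second mutex (mtxTargets) guards only the precomputed target snapshots that the web UI and API read. The type and field names in the sketch are simplified stand-ins for illustration, not the actual Prometheus types.

package main

import (
	"fmt"
	"sync"
)

type target struct{ job, addr string }

type manager struct {
	mtxScrape sync.Mutex          // guards the scrape pools; held for the whole reload
	pools     map[string][]target // simplified stand-in for map[string]*scrapePool

	mtxTargets     sync.Mutex // guards the UI-facing snapshots below
	targetsAll     map[string][]target
	targetsActive  []target
	targetsDropped []target
}

// reload mutates the pools under mtxScrape and then publishes fresh snapshots,
// mirroring the Manager.reload -> Manager.targetsUpdate flow in the patch.
func (m *manager) reload(active, dropped map[string][]target) {
	m.mtxScrape.Lock()
	defer m.mtxScrape.Unlock()

	m.pools = active // stand-in for creating pools and calling sp.Sync

	m.targetsUpdate(active, dropped)
}

// targetsUpdate rebuilds the snapshots under the cheap mtxTargets lock.
func (m *manager) targetsUpdate(active, dropped map[string][]target) {
	m.mtxTargets.Lock()
	defer m.mtxTargets.Unlock()

	m.targetsAll = make(map[string][]target)
	m.targetsActive, m.targetsDropped = nil, nil
	for job, ts := range active {
		m.targetsAll[job] = append(m.targetsAll[job], ts...)
		m.targetsActive = append(m.targetsActive, ts...)
	}
	for job, ts := range dropped {
		m.targetsAll[job] = append(m.targetsAll[job], ts...)
		m.targetsDropped = append(m.targetsDropped, ts...)
	}
}

// TargetsActive is what a web handler would call: it takes only mtxTargets,
// so it is never blocked behind a reload holding mtxScrape.
func (m *manager) TargetsActive() []target {
	m.mtxTargets.Lock()
	defer m.mtxTargets.Unlock()
	return m.targetsActive
}

func main() {
	m := &manager{pools: map[string][]target{}}
	m.reload(
		map[string][]target{"node": {{job: "node", addr: "10.0.0.1:9100"}}},
		map[string][]target{"node": {{job: "node", addr: "10.0.0.2:9100"}}},
	)
	fmt.Println("active:", m.TargetsActive())
}

The point of splitting the mutexes is that sp.Sync can be slow and reload holds mtxScrape for its entire duration; the TargetsAll/TargetsActive/TargetsDropped accessors used by the UI and API only take mtxTargets, which is held just long enough to swap or read the snapshot slices, so target pages stay responsive while targets are being updated.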