diff --git a/retrieval/target.go b/retrieval/target.go
index 7dd8cbd2d..43c19ca4e 100644
--- a/retrieval/target.go
+++ b/retrieval/target.go
@@ -167,7 +167,6 @@ type Target struct {
 	scraperStopping chan struct{}
 	// Closing scraperStopped signals that scraping has been stopped.
 	scraperStopped chan struct{}
-	running bool
 
 	// Mutex protects the members below.
 	sync.RWMutex
@@ -388,11 +387,9 @@ func (t *Target) InstanceIdentifier() string {
 
 // RunScraper implements Target.
 func (t *Target) RunScraper(sampleAppender storage.SampleAppender) {
-	defer close(t.scraperStopped)
+	log.Debugf("Running scraper for %v", t)
 
-	t.Lock()
-	t.running = true
-	t.Unlock()
+	defer close(t.scraperStopped)
 
 	lastScrapeInterval := t.interval()
 
@@ -452,14 +449,6 @@ func (t *Target) RunScraper(sampleAppender storage.SampleAppender) {
 
 // StopScraper implements Target.
 func (t *Target) StopScraper() {
-	t.Lock()
-	if !t.running {
-		t.Unlock()
-		return
-	}
-	t.running = false
-	t.Unlock()
-
 	log.Debugf("Stopping scraper for target %v...", t)
 
 	close(t.scraperStopping)
diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go
index ceb880b0d..6f2cd67f5 100644
--- a/retrieval/targetmanager.go
+++ b/retrieval/targetmanager.go
@@ -73,38 +73,9 @@ func (tm *TargetManager) Run() {
 	log.Info("Starting target manager...")
 
 	tm.mtx.Lock()
+
 	tm.ctx, tm.cancel = context.WithCancel(context.Background())
-
-	jobs := map[string]struct{}{}
-
-	// Start new target sets and update existing ones.
-	for _, scfg := range tm.scrapeConfigs {
-		jobs[scfg.JobName] = struct{}{}
-
-		ts, ok := tm.targetSets[scfg.JobName]
-		if !ok {
-			ts = newTargetSet(scfg, tm.appender)
-			tm.targetSets[scfg.JobName] = ts
-		}
-		ts.runProviders(tm.ctx, providersFromConfig(scfg))
-	}
-
-	// Stop old target sets.
-	for name := range tm.targetSets {
-		if _, ok := jobs[name]; !ok {
-			delete(tm.targetSets, name)
-		}
-	}
-
-	// Run target sets.
-	for _, ts := range tm.targetSets {
-		tm.wg.Add(1)
-
-		go func(ts *targetSet) {
-			ts.run(tm.ctx)
-			tm.wg.Done()
-		}(ts)
-	}
+	tm.reload()
 
 	tm.mtx.Unlock()
 
@@ -128,6 +99,38 @@ func (tm *TargetManager) Stop() {
 	log.Debugln("Target manager stopped")
 }
 
+func (tm *TargetManager) reload() {
+	jobs := map[string]struct{}{}
+
+	// Start new target sets and update existing ones.
+	for _, scfg := range tm.scrapeConfigs {
+		jobs[scfg.JobName] = struct{}{}
+
+		ts, ok := tm.targetSets[scfg.JobName]
+		if !ok {
+			ts = newTargetSet(scfg, tm.appender)
+			tm.targetSets[scfg.JobName] = ts
+
+			tm.wg.Add(1)
+
+			go func(ts *targetSet) {
+				ts.runScraping(tm.ctx)
+				tm.wg.Done()
+			}(ts)
+		}
+		ts.runProviders(tm.ctx, providersFromConfig(scfg))
+	}
+
+	// Remove old target sets. Waiting for stopping is already guaranteed
+	// by the goroutine that started the target set.
+	for name, ts := range tm.targetSets {
+		if _, ok := jobs[name]; !ok {
+			ts.cancel()
+			delete(tm.targetSets, name)
+		}
+	}
+}
+
 // Pools returns the targets currently being scraped bucketed by their job name.
 func (tm *TargetManager) Pools() map[string][]*Target {
 	tm.mtx.RLock()
@@ -151,21 +154,14 @@ func (tm *TargetManager) Pools() map[string][]*Target {
 // by the new cfg. The state of targets that are valid in the new configuration remains unchanged.
 // Returns true on success.
 func (tm *TargetManager) ApplyConfig(cfg *config.Config) bool {
-	tm.mtx.RLock()
-	running := tm.ctx != nil
-	tm.mtx.RUnlock()
-
-	if running {
-		tm.Stop()
-		defer func() {
-			go tm.Run()
-		}()
-	}
-
 	tm.mtx.Lock()
-	tm.scrapeConfigs = cfg.ScrapeConfigs
-	tm.mtx.Unlock()
+	defer tm.mtx.Unlock()
 
+	tm.scrapeConfigs = cfg.ScrapeConfigs
+
+	if tm.ctx != nil {
+		tm.reload()
+	}
 	return true
 }
 
@@ -180,8 +176,9 @@ type targetSet struct {
 	scrapePool *scrapePool
 	config     *config.ScrapeConfig
 
-	stopProviders func()
-	syncCh        chan struct{}
+	syncCh          chan struct{}
+	cancelScraping  func()
+	cancelProviders func()
 }
 
 func newTargetSet(cfg *config.ScrapeConfig, app storage.SampleAppender) *targetSet {
@@ -194,7 +191,21 @@ func newTargetSet(cfg *config.ScrapeConfig, app storage.SampleAppender) *targetS
 	return ts
 }
 
-func (ts *targetSet) run(ctx context.Context) {
+func (ts *targetSet) cancel() {
+	ts.mtx.RLock()
+	defer ts.mtx.RUnlock()
+
+	if ts.cancelScraping != nil {
+		ts.cancelScraping()
+	}
+	if ts.cancelProviders != nil {
+		ts.cancelProviders()
+	}
+}
+
+func (ts *targetSet) runScraping(ctx context.Context) {
+	ctx, ts.cancelScraping = context.WithCancel(ctx)
+
 	ts.scrapePool.ctx = ctx
 
 Loop:
@@ -234,10 +245,10 @@ func (ts *targetSet) runProviders(ctx context.Context, providers map[string]Targ
 
 	var wg sync.WaitGroup
 
-	if ts.stopProviders != nil {
-		ts.stopProviders()
+	if ts.cancelProviders != nil {
+		ts.cancelProviders()
 	}
-	ctx, ts.stopProviders = context.WithCancel(ctx)
+	ctx, ts.cancelProviders = context.WithCancel(ctx)
 
 	for name, prov := range providers {
 		wg.Add(1)
@@ -373,7 +384,7 @@ func (sp *scrapePool) sync(tgroups map[string]map[model.Fingerprint]*Target) {
 				go tnew.RunScraper(sp.appender)
 			}
 		}
-		for fp, told := range targets {
+		for fp, told := range prevTargets {
 			// A previous target is no longer in the group.
 			if _, ok := targets[fp]; !ok {
 				wg.Add(1)
@@ -388,7 +399,7 @@ func (sp *scrapePool) sync(tgroups map[string]map[model.Fingerprint]*Target) {
 
 	// Stop scrapers for target groups that disappeared completely.
 	for source, targets := range sp.tgroups {
-		if _, ok := tgroups[source]; !ok {
+		if _, ok := tgroups[source]; ok {
 			continue
 		}
 		for _, told := range targets {
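In short, the patch drops the mutex-guarded `running` flag from `Target`, moves the start/stop logic shared by `Run()` and `ApplyConfig()` into `reload()`, and stops target sets through `context` cancellation via the stored `cancelScraping`/`cancelProviders` functions. Below is a rough, self-contained sketch of that cancellation pattern (an illustration only, not the Prometheus code; every type and helper name in it is invented for the example):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// set mirrors the shape of the patched targetSet: no running flag, just
// stored cancel functions for the two long-running parts.
type set struct {
	mtx             sync.Mutex
	cancelScraping  func()
	cancelProviders func()
}

// runScraping derives a cancellable context and blocks until it is cancelled,
// standing in for the scraping loop.
func (s *set) runScraping(ctx context.Context) {
	s.mtx.Lock()
	ctx, s.cancelScraping = context.WithCancel(ctx)
	s.mtx.Unlock()

	<-ctx.Done()
	fmt.Println("scraping stopped")
}

// runProviders cancels any previous provider context before deriving a fresh
// one, mirroring the cancelProviders handling in the diff.
func (s *set) runProviders(ctx context.Context, wg *sync.WaitGroup) {
	s.mtx.Lock()
	if s.cancelProviders != nil {
		s.cancelProviders()
	}
	ctx, s.cancelProviders = context.WithCancel(ctx)
	s.mtx.Unlock()

	wg.Add(1)
	go func() {
		defer wg.Done()
		<-ctx.Done()
		fmt.Println("providers stopped")
	}()
}

// cancel stops both parts. Cancelling a context twice is a no-op, so no extra
// bookkeeping (like the removed running flag) is needed here.
func (s *set) cancel() {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	if s.cancelScraping != nil {
		s.cancelScraping()
	}
	if s.cancelProviders != nil {
		s.cancelProviders()
	}
}

func main() {
	var wg sync.WaitGroup
	s := &set{}

	wg.Add(1)
	go func() {
		defer wg.Done()
		s.runScraping(context.Background())
	}()
	s.runProviders(context.Background(), &wg)

	time.Sleep(100 * time.Millisecond) // give both goroutines time to start
	s.cancel()
	wg.Wait()
}
```

Because cancelling a context more than once is harmless, `targetSet.cancel()` in the patch can stay a pair of nil-checked calls instead of tracking an explicit running state.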