diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 132605c36..c60f5150b 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -231,16 +231,24 @@ func main() { cfg.queryEngine.Logger = log.With(logger, "component", "query engine") var ( - notifier = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier")) - ctxDiscovery, cancelDiscovery = context.WithCancel(context.Background()) - discoveryManager = discovery.NewDiscoveryManager(ctxDiscovery, log.With(logger, "component", "discovery manager")) - ctxScrape, cancelScrape = context.WithCancel(context.Background()) - scrapeManager = retrieval.NewScrapeManager(ctxScrape, log.With(logger, "component", "scrape manager"), fanoutStorage) - queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine) ctxWeb, cancelWeb = context.WithCancel(context.Background()) - webHandler = web.New(log.With(logger, "component", "web"), &cfg.web) + ctxDiscovery, cancelDiscovery = context.WithCancel(context.Background()) + ctxRule = context.Background() + + notifier = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier")) + discoveryManager = discovery.NewDiscoveryManager(ctxDiscovery, log.With(logger, "component", "discovery manager")) + scrapeManager = retrieval.NewScrapeManager(log.With(logger, "component", "scrape manager"), fanoutStorage) + queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine) + ruleManager = rules.NewManager(&rules.ManagerOptions{Appendable: fanoutStorage, + Notifier: notifier, + QueryEngine: queryEngine, + Context: ctxRule, + ExternalURL: cfg.web.ExternalURL, + Logger: log.With(logger, "component", "rule manager"), + }) ) +<<<<<<< HEAD ctx := context.Background() ruleManager := rules.NewManager(&rules.ManagerOptions{ Appendable: fanoutStorage, @@ -253,6 +261,9 @@ func main() { }) cfg.web.Context = ctx +======= + cfg.web.Context = ctxWeb +>>>>>>> 95b1dec3... scrape pool doesn't rely on context as Stop() needs to be blocking to prevent Scrape loops trying to write to a closed TSDB storage. cfg.web.TSDB = localStorage.Get cfg.web.Storage = fanoutStorage cfg.web.QueryEngine = queryEngine @@ -274,6 +285,9 @@ func main() { cfg.web.Flags[f.Name] = f.Value.String() } + // Depend on cfg.web.ScrapeManager so needs to be after cfg.web.ScrapeManager = scrapeManager + webHandler := web.New(log.With(logger, "component", "web"), &cfg.web) + // Monitor outgoing connections on default transport with conntrack. http.DefaultTransport.(*http.Transport).DialContext = conntrack.NewDialContextFunc( conntrack.DialWithTracing(), @@ -310,17 +324,6 @@ func main() { var g group.Group { - g.Add( - func() error { - err := discoveryManager.Run() - level.Info(logger).Log("msg", "Discovery manager stopped") - return err - }, - func(err error) { - level.Info(logger).Log("msg", "Stopping discovery manager...") - cancelDiscovery() - }, - ) term := make(chan os.Signal) signal.Notify(term, os.Interrupt, syscall.SIGTERM) cancel := make(chan struct{}) @@ -341,6 +344,34 @@ func main() { }, ) } + { + g.Add( + func() error { + err := discoveryManager.Run() + level.Info(logger).Log("msg", "Discovery manager stopped") + return err + }, + func(err error) { + level.Info(logger).Log("msg", "Stopping discovery manager...") + cancelDiscovery() + }, + ) + } + { + g.Add( + func() error { + err := scrapeManager.Run(discoveryManager.SyncCh()) + level.Info(logger).Log("msg", "Scrape manager stopped") + return err + }, + func(err error) { + // Scrape manager needs to be stopped before closing the local TSDB + // so that it doesn't try to write samples to a closed storage. + level.Info(logger).Log("msg", "Stopping scrape manager...") + scrapeManager.Stop() + }, + ) + } { // Make sure that sighup handler is registered with a redirect to the channel before the potentially // long and synchronous tsdb init. @@ -482,19 +513,6 @@ func main() { }, ) } - { - g.Add( - func() error { - err := scrapeManager.Run(discoveryManager.SyncCh()) - level.Info(logger).Log("msg", "Scrape manager stopped") - return err - }, - func(err error) { - level.Info(logger).Log("msg", "Stopping scrape manager...") - cancelScrape() - }, - ) - } if err := g.Run(); err != nil { level.Error(logger).Log("err", err) } diff --git a/discovery/manager.go b/discovery/manager.go index 7119c5470..070c5b900 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -87,7 +87,6 @@ func (m *DiscoveryManager) Run() error { return m.ctx.Err() } } - } // SyncCh returns a read only channel used by all DiscoveryProviders targetSet updates diff --git a/retrieval/manager.go b/retrieval/manager.go index 694d14b97..dec7f8d9b 100644 --- a/retrieval/manager.go +++ b/retrieval/manager.go @@ -14,7 +14,6 @@ package retrieval import ( - "context" "fmt" "github.com/go-kit/kit/log" @@ -30,27 +29,27 @@ type Appendable interface { } // NewScrapeManager is the ScrapeManager constructor -func NewScrapeManager(ctx context.Context, logger log.Logger, app Appendable) *ScrapeManager { +func NewScrapeManager(logger log.Logger, app Appendable) *ScrapeManager { return &ScrapeManager{ - ctx: ctx, append: app, logger: logger, actionCh: make(chan func()), scrapeConfigs: make(map[string]*config.ScrapeConfig), scrapePools: make(map[string]*scrapePool), + graceShut: make(chan struct{}), } } // ScrapeManager maintains a set of scrape pools and manages start/stop cicles // when receiving new target groups form the discovery manager. type ScrapeManager struct { - ctx context.Context logger log.Logger append Appendable scrapeConfigs map[string]*config.ScrapeConfig scrapePools map[string]*scrapePool actionCh chan func() + graceShut chan struct{} } // Run starts background processing to handle target updates and reload the scraping loops. @@ -63,12 +62,20 @@ func (m *ScrapeManager) Run(tsets <-chan map[string][]*config.TargetGroup) error f() case ts := <-tsets: m.reload(ts) - case <-m.ctx.Done(): - return m.ctx.Err() + case <-m.graceShut: + return nil } } } +// Stop cancels all running scrape pools and blocks until all have exited. +func (m *ScrapeManager) Stop() { + for _, sp := range m.scrapePools { + sp.stop() + } + close(m.graceShut) +} + // ApplyConfig resets the manager's target providers and job configurations as defined by the new cfg. func (m *ScrapeManager) ApplyConfig(cfg *config.Config) error { done := make(chan struct{}) @@ -123,10 +130,10 @@ func (m *ScrapeManager) reload(t map[string][]*config.TargetGroup) error { return fmt.Errorf("target set '%v' doesn't have valid config", tsetName) } - // scrape pool doesn't exist so start a new one + // Scrape pool doesn't exist so start a new one. existing, ok := m.scrapePools[tsetName] if !ok { - sp := newScrapePool(m.ctx, scrapeConfig, m.append, log.With(m.logger, "scrape_pool", tsetName)) + sp := newScrapePool(scrapeConfig, m.append, log.With(m.logger, "scrape_pool", tsetName)) m.scrapePools[tsetName] = sp sp.Sync(tgroup) @@ -134,7 +141,7 @@ func (m *ScrapeManager) reload(t map[string][]*config.TargetGroup) error { existing.Sync(tgroup) } - // cleanup - check config and cancel the scrape loops if it don't exist in the scrape config + // Cleanup - check the config and cancel the scrape loops if it don't exist in the scrape config. jobs := make(map[string]struct{}) for k := range m.scrapeConfigs { diff --git a/retrieval/scrape.go b/retrieval/scrape.go index ed1aac14d..62caadade 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -117,7 +117,6 @@ func init() { type scrapePool struct { appendable Appendable logger log.Logger - ctx context.Context mtx sync.RWMutex config *config.ScrapeConfig @@ -136,7 +135,7 @@ const maxAheadTime = 10 * time.Minute type labelsMutator func(labels.Labels) labels.Labels -func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable, logger log.Logger) *scrapePool { +func newScrapePool(cfg *config.ScrapeConfig, app Appendable, logger log.Logger) *scrapePool { if logger == nil { logger = log.NewNopLogger() } @@ -152,14 +151,15 @@ func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable sp := &scrapePool{ appendable: app, config: cfg, - ctx: ctx, client: client, targets: map[uint64]*Target{}, loops: map[uint64]loop{}, logger: logger, } sp.newLoop = func(t *Target, s scraper) loop { - return newScrapeLoop(sp.ctx, s, + return newScrapeLoop( + context.Background(), + s, log.With(logger, "target", t), buffers, func(l labels.Labels) labels.Labels { return sp.mutateSampleLabels(l, t) }, @@ -189,7 +189,6 @@ func (sp *scrapePool) stop() { delete(sp.loops, fp) delete(sp.targets, fp) } - wg.Wait() } @@ -582,8 +581,7 @@ func (c *scrapeCache) forEachStale(f func(labels.Labels) bool) { } } -func newScrapeLoop( - ctx context.Context, +func newScrapeLoop(ctx context.Context, sc scraper, l log.Logger, buffers *pool.BytesPool, @@ -605,8 +603,8 @@ func newScrapeLoop( sampleMutator: sampleMutator, reportSampleMutator: reportSampleMutator, stopped: make(chan struct{}), - ctx: ctx, l: l, + ctx: ctx, } sl.scrapeCtx, sl.cancel = context.WithCancel(ctx) diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go index e43ed80a7..cc2cfc121 100644 --- a/retrieval/scrape_test.go +++ b/retrieval/scrape_test.go @@ -44,7 +44,7 @@ func TestNewScrapePool(t *testing.T) { var ( app = &nopAppendable{} cfg = &config.ScrapeConfig{} - sp = newScrapePool(context.Background(), cfg, app, nil) + sp = newScrapePool(cfg, app, nil) ) if a, ok := sp.appendable.(*nopAppendable); !ok || a != app { @@ -231,7 +231,7 @@ func TestScrapePoolReload(t *testing.T) { func TestScrapePoolAppender(t *testing.T) { cfg := &config.ScrapeConfig{} app := &nopAppendable{} - sp := newScrapePool(context.Background(), cfg, app, nil) + sp := newScrapePool(cfg, app, nil) wrapped := sp.appender()