diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index f09f0ce42..757ec5e85 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -231,12 +231,11 @@ func main() { cfg.queryEngine.Logger = log.With(logger, "component", "query engine") var ( - ctxWeb, cancelWeb = context.WithCancel(context.Background()) - ctxDiscovery, cancelDiscovery = context.WithCancel(context.Background()) - ctxRule = context.Background() + ctxWeb, cancelWeb = context.WithCancel(context.Background()) + ctxRule = context.Background() notifier = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier")) - discoveryManager = discovery.NewManager(ctxDiscovery, log.With(logger, "component", "discovery manager")) + discoveryManager = discovery.NewManager(log.With(logger, "component", "discovery manager")) scrapeManager = retrieval.NewScrapeManager(log.With(logger, "component", "scrape manager"), fanoutStorage) queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine) ruleManager := rules.NewManager(&rules.ManagerOptions{ @@ -333,9 +332,10 @@ func main() { ) } { + ctxDiscovery, cancelDiscovery := context.WithCancel(context.Background()) g.Add( func() error { - err := discoveryManager.Run() + err := discoveryManager.Run(ctxDiscovery) level.Info(logger).Log("msg", "Discovery manager stopped") return err }, diff --git a/discovery/manager.go b/discovery/manager.go index fa8cb183e..3cb68d060 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -58,11 +58,10 @@ type Discoverer interface { // } // NewManager is the Discovery Manager constructor -func NewManager(ctx context.Context, logger log.Logger) *Manager { +func NewManager(logger log.Logger) *Manager { return &Manager{ - ctx: ctx, logger: logger, - actionCh: make(chan func()), + actionCh: make(chan func(context.Context)), syncCh: make(chan map[string][]*config.TargetGroup), targets: make(map[string]map[string][]*config.TargetGroup), discoverCancel: []context.CancelFunc{}, @@ -70,24 +69,25 @@ func NewManager(ctx context.Context, logger log.Logger) *Manager { } // Manager maintains a set of discovery providers and sends each update to a channel used by other packages. +// The sync channels sends the updates in map[targetSetName] where targetSetName is the job value from the scrape config. +// Targets pool is kept in a map with a format map[targetSetName]map[providerName]. type Manager struct { - ctx context.Context logger log.Logger - syncCh chan map[string][]*config.TargetGroup // map[targetSetName] - actionCh chan func() + syncCh chan map[string][]*config.TargetGroup + actionCh chan func(context.Context) discoverCancel []context.CancelFunc - targets map[string]map[string][]*config.TargetGroup // map[targetSetName]map[providerName] + targets map[string]map[string][]*config.TargetGroup } // Run starts the background processing -func (m *Manager) Run() error { +func (m *Manager) Run(ctx context.Context) error { for { select { case f := <-m.actionCh: - f() - case <-m.ctx.Done(): + f(ctx) + case <-ctx.Done(): m.cancelDiscoverers() - return m.ctx.Err() + return ctx.Err() } } } @@ -100,11 +100,11 @@ func (m *Manager) SyncCh() <-chan map[string][]*config.TargetGroup { // ApplyConfig removes all running discovery providers and starts new ones using the provided config. func (m *Manager) ApplyConfig(cfg *config.Config) error { err := make(chan error) - m.actionCh <- func() { + m.actionCh <- func(ctx context.Context) { m.cancelDiscoverers() for _, scfg := range cfg.ScrapeConfigs { for provName, prov := range m.providersFromConfig(scfg.ServiceDiscoveryConfig) { - m.startProvider(scfg.JobName, provName, prov) + m.startProvider(ctx, scfg.JobName, provName, prov) } } close(err) @@ -113,28 +113,31 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error { return <-err } -func (m *Manager) startProvider(jobName, provName string, worker Discoverer) { - ctx, cancel := context.WithCancel(m.ctx) +func (m *Manager) startProvider(ctx context.Context, jobName, provName string, worker Discoverer) { + ctx, cancel := context.WithCancel(ctx) updates := make(chan []*config.TargetGroup) m.discoverCancel = append(m.discoverCancel, cancel) go worker.Run(ctx, updates) - go func(provName string) { - for { - select { - case <-ctx.Done(): + go m.runProvider(ctx, provName, jobName, updates) +} + +func (m *Manager) runProvider(ctx context.Context, provName, jobName string, updates chan []*config.TargetGroup) { + for { + select { + case <-ctx.Done(): + return + case tgs, ok := <-updates: + // Handle the case that a target provider exits and closes the channel + // before the context is done. + if !ok { return - case tgs, ok := <-updates: - // Handle the case that a target provider exits and closes the channel - // before the context is done. - if !ok { - return - } - m.syncCh <- m.mergeGroups(jobName, provName, tgs) } + m.addGroup(jobName, provName, tgs) + m.syncCh <- m.allGroups(jobName) } - }(provName) + } } func (m *Manager) cancelDiscoverers() { @@ -142,25 +145,33 @@ func (m *Manager) cancelDiscoverers() { c() } m.targets = make(map[string]map[string][]*config.TargetGroup) - m.discoverCancel = []context.CancelFunc{} + m.discoverCancel = nil } -// mergeGroups adds a new target group for a given discovery provider and returns all target groups for a given target set -func (m *Manager) mergeGroups(tsName, provName string, tg []*config.TargetGroup) map[string][]*config.TargetGroup { - tset := make(chan map[string][]*config.TargetGroup) +func (m *Manager) addGroup(tsName, provName string, tg []*config.TargetGroup) { + done := make(chan struct{}) - m.actionCh <- func() { + m.actionCh <- func(ctx context.Context) { if m.targets[tsName] == nil { m.targets[tsName] = make(map[string][]*config.TargetGroup) } - m.targets[tsName][provName] = []*config.TargetGroup{} if tg != nil { m.targets[tsName][provName] = tg } + close(done) + + } + <-done +} + +func (m *Manager) allGroups(tsName string) map[string][]*config.TargetGroup { + tset := make(chan map[string][]*config.TargetGroup) + + m.actionCh <- func(ctx context.Context) { tgAll := []*config.TargetGroup{} - // Sort the providers alphabetically. + // Sorting the providers is needed so that we can have predictable tests. // Maps cannot be sorted so need to extract the keys to a slice and sort the string slice. var providerNames []string for providerName := range m.targets[tsName] { @@ -179,6 +190,7 @@ func (m *Manager) mergeGroups(tsName, provName string, tg []*config.TargetGroup) tset <- t } return <-tset + } func (m *Manager) providersFromConfig(cfg config.ServiceDiscoveryConfig) map[string]Discoverer { diff --git a/discovery/manager_test.go b/discovery/manager_test.go index c4ddac462..3e5b41f47 100644 --- a/discovery/manager_test.go +++ b/discovery/manager_test.go @@ -113,13 +113,16 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { }, }, expectedTargets: [][]*config.TargetGroup{ - {{ - Source: "initial1", - Targets: []model.LabelSet{{"__instance__": "1"}}, - }, { - Source: "initial2", - Targets: []model.LabelSet{{"__instance__": "2"}}, - }}, + { + { + Source: "initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "initial2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, }, }, { @@ -131,7 +134,8 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { { Source: "tp1-initial1", Targets: []model.LabelSet{{"__instance__": "1"}}, - }, { + }, + { Source: "tp1-initial2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, @@ -140,10 +144,12 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { }, "tp2": { { - targetGroups: []config.TargetGroup{{ - Source: "tp2-initial1", - Targets: []model.LabelSet{{"__instance__": "3"}}, - }}, + targetGroups: []config.TargetGroup{ + { + Source: "tp2-initial1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + }, interval: 10, }, }, @@ -153,7 +159,8 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { { Source: "tp1-initial1", Targets: []model.LabelSet{{"__instance__": "1"}}, - }, { + }, + { Source: "tp1-initial2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, @@ -161,7 +168,8 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { { Source: "tp1-initial1", Targets: []model.LabelSet{{"__instance__": "1"}}, - }, { + }, + { Source: "tp1-initial2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, @@ -576,13 +584,13 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { for testIndex, testCase := range testCases { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - discoveryManager := NewManager(ctx, nil) - go discoveryManager.Run() + discoveryManager := NewManager(nil) + go discoveryManager.Run(ctx) var totalUpdatesCount int for tpName, update := range testCase.updates { provider := newMockDiscoveryProvider(update) - discoveryManager.startProvider(strconv.Itoa(testIndex), tpName, provider) + discoveryManager.startProvider(ctx, strconv.Itoa(testIndex), tpName, provider) if len(update) > 0 { totalUpdatesCount = totalUpdatesCount + len(update) @@ -603,8 +611,8 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { receivedFormated = receivedFormated + receivedTargets.Source + ":" + fmt.Sprint(receivedTargets.Targets) } var expectedFormated string - for _, receivedTargets := range testCase.expectedTargets[x] { - expectedFormated = expectedFormated + receivedTargets.Source + ":" + fmt.Sprint(receivedTargets.Targets) + for _, expectedTargets := range testCase.expectedTargets[x] { + expectedFormated = expectedFormated + expectedTargets.Source + ":" + fmt.Sprint(expectedTargets.Targets) } t.Errorf("%v. %v: \ntargets mismatch \nreceived: %v \nexpected: %v", @@ -664,8 +672,8 @@ scrape_configs: } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - discoveryManager := NewManager(ctx, nil) - go discoveryManager.Run() + discoveryManager := NewManager(nil) + go discoveryManager.Run(ctx) discoveryManager.ApplyConfig(cfg) diff --git a/retrieval/manager.go b/retrieval/manager.go index 4f1f9775c..e1bfc0ff9 100644 --- a/retrieval/manager.go +++ b/retrieval/manager.go @@ -41,7 +41,7 @@ func NewScrapeManager(logger log.Logger, app Appendable) *ScrapeManager { } } -// ScrapeManager maintains a set of scrape pools and manages start/stop cicles +// ScrapeManager maintains a set of scrape pools and manages start/stop cycles // when receiving new target groups form the discovery manager. type ScrapeManager struct { logger log.Logger