From f5c2c5ff8fc8e21f9bfaa23d0f81c8245ba8632e Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Mon, 27 Nov 2017 01:59:34 +0000 Subject: [PATCH] brake the start provider func so that can run unit tests against it. --- discovery/manager.go | 78 +++++++++++++++++++++++--------------------- 1 file changed, 41 insertions(+), 37 deletions(-) diff --git a/discovery/manager.go b/discovery/manager.go index ea24dc9e7..77c9598d3 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -103,43 +103,7 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error { m.cancelDiscoverers() for _, scfg := range cfg.ScrapeConfigs { for provName, prov := range m.providersFromConfig(scfg.ServiceDiscoveryConfig) { - ctx, cancel := context.WithCancel(m.ctx) - updates := make(chan []*config.TargetGroup) - - m.discoverCancel = append(m.discoverCancel, cancel) - - go prov.Run(ctx, updates) - go func(provName string) { - select { - case <-ctx.Done(): - // First set of all endpoints the provider knows. - case tgs, ok := <-updates: - // Handle the case that a target provider exits and closes the channel - // before the context is done. - if !ok { - break - } - m.syncCh <- m.mergeGroups(scfg.JobName, provName, tgs) - case <-time.After(5 * time.Second): - // Initial set didn't arrive. Act as if it was empty - // and wait for updates later on. - } - - // Start listening for further updates. - for { - select { - case <-ctx.Done(): - return - case tgs, ok := <-updates: - // Handle the case that a target provider exits and closes the channel - // before the context is done. - if !ok { - return - } - m.syncCh <- m.mergeGroups(scfg.JobName, provName, tgs) - } - } - }(provName) + m.startProvider(scfg.JobName, provName, prov) } } close(err) @@ -148,6 +112,46 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error { return <-err } +func (m *Manager) startProvider(jobName, provName string, worker Discoverer) { + ctx, cancel := context.WithCancel(m.ctx) + updates := make(chan []*config.TargetGroup) + + m.discoverCancel = append(m.discoverCancel, cancel) + + go worker.Run(ctx, updates) + go func(provName string) { + select { + case <-ctx.Done(): + // First set of all endpoints the provider knows. + case tgs, ok := <-updates: + // Handle the case that a target provider exits and closes the channel + // before the context is done. + if !ok { + break + } + m.syncCh <- m.mergeGroups(jobName, provName, tgs) + case <-time.After(5 * time.Second): + // Initial set didn't arrive. Act as if it was empty + // and wait for updates later on. + } + + // Start listening for further updates. + for { + select { + case <-ctx.Done(): + return + case tgs, ok := <-updates: + // Handle the case that a target provider exits and closes the channel + // before the context is done. + if !ok { + return + } + m.syncCh <- m.mergeGroups(jobName, provName, tgs) + } + } + }(provName) +} + func (m *Manager) cancelDiscoverers() { for _, c := range m.discoverCancel { c()