rearange the contexts variables and logic

split the groupsMerge function to set and get
other small nits
This commit is contained in:
Krasi Georgiev 2017-12-01 12:59:24 +00:00
parent 6ff1d5c51e
commit 1ec76d1950
4 changed files with 81 additions and 61 deletions

View File

@ -231,12 +231,11 @@ func main() {
cfg.queryEngine.Logger = log.With(logger, "component", "query engine") cfg.queryEngine.Logger = log.With(logger, "component", "query engine")
var ( var (
ctxWeb, cancelWeb = context.WithCancel(context.Background()) ctxWeb, cancelWeb = context.WithCancel(context.Background())
ctxDiscovery, cancelDiscovery = context.WithCancel(context.Background()) ctxRule = context.Background()
ctxRule = context.Background()
notifier = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier")) notifier = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier"))
discoveryManager = discovery.NewManager(ctxDiscovery, log.With(logger, "component", "discovery manager")) discoveryManager = discovery.NewManager(log.With(logger, "component", "discovery manager"))
scrapeManager = retrieval.NewScrapeManager(log.With(logger, "component", "scrape manager"), fanoutStorage) scrapeManager = retrieval.NewScrapeManager(log.With(logger, "component", "scrape manager"), fanoutStorage)
queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine) queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine)
ruleManager := rules.NewManager(&rules.ManagerOptions{ ruleManager := rules.NewManager(&rules.ManagerOptions{
@ -333,9 +332,10 @@ func main() {
) )
} }
{ {
ctxDiscovery, cancelDiscovery := context.WithCancel(context.Background())
g.Add( g.Add(
func() error { func() error {
err := discoveryManager.Run() err := discoveryManager.Run(ctxDiscovery)
level.Info(logger).Log("msg", "Discovery manager stopped") level.Info(logger).Log("msg", "Discovery manager stopped")
return err return err
}, },

View File

@ -58,11 +58,10 @@ type Discoverer interface {
// } // }
// NewManager is the Discovery Manager constructor // NewManager is the Discovery Manager constructor
func NewManager(ctx context.Context, logger log.Logger) *Manager { func NewManager(logger log.Logger) *Manager {
return &Manager{ return &Manager{
ctx: ctx,
logger: logger, logger: logger,
actionCh: make(chan func()), actionCh: make(chan func(context.Context)),
syncCh: make(chan map[string][]*config.TargetGroup), syncCh: make(chan map[string][]*config.TargetGroup),
targets: make(map[string]map[string][]*config.TargetGroup), targets: make(map[string]map[string][]*config.TargetGroup),
discoverCancel: []context.CancelFunc{}, discoverCancel: []context.CancelFunc{},
@ -70,24 +69,25 @@ func NewManager(ctx context.Context, logger log.Logger) *Manager {
} }
// Manager maintains a set of discovery providers and sends each update to a channel used by other packages. // Manager maintains a set of discovery providers and sends each update to a channel used by other packages.
// The sync channels sends the updates in map[targetSetName] where targetSetName is the job value from the scrape config.
// Targets pool is kept in a map with a format map[targetSetName]map[providerName].
type Manager struct { type Manager struct {
ctx context.Context
logger log.Logger logger log.Logger
syncCh chan map[string][]*config.TargetGroup // map[targetSetName] syncCh chan map[string][]*config.TargetGroup
actionCh chan func() actionCh chan func(context.Context)
discoverCancel []context.CancelFunc discoverCancel []context.CancelFunc
targets map[string]map[string][]*config.TargetGroup // map[targetSetName]map[providerName] targets map[string]map[string][]*config.TargetGroup
} }
// Run starts the background processing // Run starts the background processing
func (m *Manager) Run() error { func (m *Manager) Run(ctx context.Context) error {
for { for {
select { select {
case f := <-m.actionCh: case f := <-m.actionCh:
f() f(ctx)
case <-m.ctx.Done(): case <-ctx.Done():
m.cancelDiscoverers() m.cancelDiscoverers()
return m.ctx.Err() return ctx.Err()
} }
} }
} }
@ -100,11 +100,11 @@ func (m *Manager) SyncCh() <-chan map[string][]*config.TargetGroup {
// ApplyConfig removes all running discovery providers and starts new ones using the provided config. // ApplyConfig removes all running discovery providers and starts new ones using the provided config.
func (m *Manager) ApplyConfig(cfg *config.Config) error { func (m *Manager) ApplyConfig(cfg *config.Config) error {
err := make(chan error) err := make(chan error)
m.actionCh <- func() { m.actionCh <- func(ctx context.Context) {
m.cancelDiscoverers() m.cancelDiscoverers()
for _, scfg := range cfg.ScrapeConfigs { for _, scfg := range cfg.ScrapeConfigs {
for provName, prov := range m.providersFromConfig(scfg.ServiceDiscoveryConfig) { for provName, prov := range m.providersFromConfig(scfg.ServiceDiscoveryConfig) {
m.startProvider(scfg.JobName, provName, prov) m.startProvider(ctx, scfg.JobName, provName, prov)
} }
} }
close(err) close(err)
@ -113,28 +113,31 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error {
return <-err return <-err
} }
func (m *Manager) startProvider(jobName, provName string, worker Discoverer) { func (m *Manager) startProvider(ctx context.Context, jobName, provName string, worker Discoverer) {
ctx, cancel := context.WithCancel(m.ctx) ctx, cancel := context.WithCancel(ctx)
updates := make(chan []*config.TargetGroup) updates := make(chan []*config.TargetGroup)
m.discoverCancel = append(m.discoverCancel, cancel) m.discoverCancel = append(m.discoverCancel, cancel)
go worker.Run(ctx, updates) go worker.Run(ctx, updates)
go func(provName string) { go m.runProvider(ctx, provName, jobName, updates)
for { }
select {
case <-ctx.Done(): func (m *Manager) runProvider(ctx context.Context, provName, jobName string, updates chan []*config.TargetGroup) {
for {
select {
case <-ctx.Done():
return
case tgs, ok := <-updates:
// Handle the case that a target provider exits and closes the channel
// before the context is done.
if !ok {
return return
case tgs, ok := <-updates:
// Handle the case that a target provider exits and closes the channel
// before the context is done.
if !ok {
return
}
m.syncCh <- m.mergeGroups(jobName, provName, tgs)
} }
m.addGroup(jobName, provName, tgs)
m.syncCh <- m.allGroups(jobName)
} }
}(provName) }
} }
func (m *Manager) cancelDiscoverers() { func (m *Manager) cancelDiscoverers() {
@ -142,25 +145,33 @@ func (m *Manager) cancelDiscoverers() {
c() c()
} }
m.targets = make(map[string]map[string][]*config.TargetGroup) m.targets = make(map[string]map[string][]*config.TargetGroup)
m.discoverCancel = []context.CancelFunc{} m.discoverCancel = nil
} }
// mergeGroups adds a new target group for a given discovery provider and returns all target groups for a given target set func (m *Manager) addGroup(tsName, provName string, tg []*config.TargetGroup) {
func (m *Manager) mergeGroups(tsName, provName string, tg []*config.TargetGroup) map[string][]*config.TargetGroup { done := make(chan struct{})
tset := make(chan map[string][]*config.TargetGroup)
m.actionCh <- func() { m.actionCh <- func(ctx context.Context) {
if m.targets[tsName] == nil { if m.targets[tsName] == nil {
m.targets[tsName] = make(map[string][]*config.TargetGroup) m.targets[tsName] = make(map[string][]*config.TargetGroup)
} }
m.targets[tsName][provName] = []*config.TargetGroup{}
if tg != nil { if tg != nil {
m.targets[tsName][provName] = tg m.targets[tsName][provName] = tg
} }
close(done)
}
<-done
}
func (m *Manager) allGroups(tsName string) map[string][]*config.TargetGroup {
tset := make(chan map[string][]*config.TargetGroup)
m.actionCh <- func(ctx context.Context) {
tgAll := []*config.TargetGroup{} tgAll := []*config.TargetGroup{}
// Sort the providers alphabetically. // Sorting the providers is needed so that we can have predictable tests.
// Maps cannot be sorted so need to extract the keys to a slice and sort the string slice. // Maps cannot be sorted so need to extract the keys to a slice and sort the string slice.
var providerNames []string var providerNames []string
for providerName := range m.targets[tsName] { for providerName := range m.targets[tsName] {
@ -179,6 +190,7 @@ func (m *Manager) mergeGroups(tsName, provName string, tg []*config.TargetGroup)
tset <- t tset <- t
} }
return <-tset return <-tset
} }
func (m *Manager) providersFromConfig(cfg config.ServiceDiscoveryConfig) map[string]Discoverer { func (m *Manager) providersFromConfig(cfg config.ServiceDiscoveryConfig) map[string]Discoverer {

View File

@ -113,13 +113,16 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) {
}, },
}, },
expectedTargets: [][]*config.TargetGroup{ expectedTargets: [][]*config.TargetGroup{
{{ {
Source: "initial1", {
Targets: []model.LabelSet{{"__instance__": "1"}}, Source: "initial1",
}, { Targets: []model.LabelSet{{"__instance__": "1"}},
Source: "initial2", },
Targets: []model.LabelSet{{"__instance__": "2"}}, {
}}, Source: "initial2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
}, },
}, },
{ {
@ -131,7 +134,8 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) {
{ {
Source: "tp1-initial1", Source: "tp1-initial1",
Targets: []model.LabelSet{{"__instance__": "1"}}, Targets: []model.LabelSet{{"__instance__": "1"}},
}, { },
{
Source: "tp1-initial2", Source: "tp1-initial2",
Targets: []model.LabelSet{{"__instance__": "2"}}, Targets: []model.LabelSet{{"__instance__": "2"}},
}, },
@ -140,10 +144,12 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) {
}, },
"tp2": { "tp2": {
{ {
targetGroups: []config.TargetGroup{{ targetGroups: []config.TargetGroup{
Source: "tp2-initial1", {
Targets: []model.LabelSet{{"__instance__": "3"}}, Source: "tp2-initial1",
}}, Targets: []model.LabelSet{{"__instance__": "3"}},
},
},
interval: 10, interval: 10,
}, },
}, },
@ -153,7 +159,8 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) {
{ {
Source: "tp1-initial1", Source: "tp1-initial1",
Targets: []model.LabelSet{{"__instance__": "1"}}, Targets: []model.LabelSet{{"__instance__": "1"}},
}, { },
{
Source: "tp1-initial2", Source: "tp1-initial2",
Targets: []model.LabelSet{{"__instance__": "2"}}, Targets: []model.LabelSet{{"__instance__": "2"}},
}, },
@ -161,7 +168,8 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) {
{ {
Source: "tp1-initial1", Source: "tp1-initial1",
Targets: []model.LabelSet{{"__instance__": "1"}}, Targets: []model.LabelSet{{"__instance__": "1"}},
}, { },
{
Source: "tp1-initial2", Source: "tp1-initial2",
Targets: []model.LabelSet{{"__instance__": "2"}}, Targets: []model.LabelSet{{"__instance__": "2"}},
}, },
@ -576,13 +584,13 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) {
for testIndex, testCase := range testCases { for testIndex, testCase := range testCases {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
discoveryManager := NewManager(ctx, nil) discoveryManager := NewManager(nil)
go discoveryManager.Run() go discoveryManager.Run(ctx)
var totalUpdatesCount int var totalUpdatesCount int
for tpName, update := range testCase.updates { for tpName, update := range testCase.updates {
provider := newMockDiscoveryProvider(update) provider := newMockDiscoveryProvider(update)
discoveryManager.startProvider(strconv.Itoa(testIndex), tpName, provider) discoveryManager.startProvider(ctx, strconv.Itoa(testIndex), tpName, provider)
if len(update) > 0 { if len(update) > 0 {
totalUpdatesCount = totalUpdatesCount + len(update) totalUpdatesCount = totalUpdatesCount + len(update)
@ -603,8 +611,8 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) {
receivedFormated = receivedFormated + receivedTargets.Source + ":" + fmt.Sprint(receivedTargets.Targets) receivedFormated = receivedFormated + receivedTargets.Source + ":" + fmt.Sprint(receivedTargets.Targets)
} }
var expectedFormated string var expectedFormated string
for _, receivedTargets := range testCase.expectedTargets[x] { for _, expectedTargets := range testCase.expectedTargets[x] {
expectedFormated = expectedFormated + receivedTargets.Source + ":" + fmt.Sprint(receivedTargets.Targets) expectedFormated = expectedFormated + expectedTargets.Source + ":" + fmt.Sprint(expectedTargets.Targets)
} }
t.Errorf("%v. %v: \ntargets mismatch \nreceived: %v \nexpected: %v", t.Errorf("%v. %v: \ntargets mismatch \nreceived: %v \nexpected: %v",
@ -664,8 +672,8 @@ scrape_configs:
} }
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
discoveryManager := NewManager(ctx, nil) discoveryManager := NewManager(nil)
go discoveryManager.Run() go discoveryManager.Run(ctx)
discoveryManager.ApplyConfig(cfg) discoveryManager.ApplyConfig(cfg)

View File

@ -41,7 +41,7 @@ func NewScrapeManager(logger log.Logger, app Appendable) *ScrapeManager {
} }
} }
// ScrapeManager maintains a set of scrape pools and manages start/stop cicles // ScrapeManager maintains a set of scrape pools and manages start/stop cycles
// when receiving new target groups form the discovery manager. // when receiving new target groups form the discovery manager.
type ScrapeManager struct { type ScrapeManager struct {
logger log.Logger logger log.Logger