diff --git a/discovery/manager.go b/discovery/manager.go index 639a24180..ad1e5ad06 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -74,12 +74,26 @@ type poolKey struct { provider string } -// provider holds a Discoverer instance, its configuration and its subscribers. +// provider holds a Discoverer instance, its configuration, cancel func and its subscribers. type provider struct { name string d Discoverer - subs []string config interface{} + + cancel context.CancelFunc + // done should be called after cleaning up resources associated with cancelled provider. + done func() + + mu sync.RWMutex + subs map[string]struct{} + + // newSubs is used to temporary store subs to be used upon config reload completion. + newSubs map[string]struct{} +} + +// IsStarted return true if Discoverer is started. +func (p *provider) IsStarted() bool { + return p.cancel != nil } // NewManager is the Discovery Manager constructor. @@ -88,13 +102,12 @@ func NewManager(ctx context.Context, logger log.Logger, options ...func(*Manager logger = log.NewNopLogger() } mgr := &Manager{ - logger: logger, - syncCh: make(chan map[string][]*targetgroup.Group), - targets: make(map[poolKey]map[string]*targetgroup.Group), - discoverCancel: []context.CancelFunc{}, - ctx: ctx, - updatert: 5 * time.Second, - triggerSend: make(chan struct{}, 1), + logger: logger, + syncCh: make(chan map[string][]*targetgroup.Group), + targets: make(map[poolKey]map[string]*targetgroup.Group), + ctx: ctx, + updatert: 5 * time.Second, + triggerSend: make(chan struct{}, 1), } for _, option := range options { option(mgr) @@ -114,15 +127,16 @@ func Name(n string) func(*Manager) { // Manager maintains a set of discovery providers and sends each update to a map channel. // Targets are grouped by the target set name. type Manager struct { - logger log.Logger - name string - mtx sync.RWMutex - ctx context.Context - discoverCancel []context.CancelFunc + logger log.Logger + name string + mtx sync.RWMutex + ctx context.Context - // Some Discoverers(eg. k8s) send only the updates for a given target group + // Some Discoverers(e.g. k8s) send only the updates for a given target group, // so we use map[tg.Source]*targetgroup.Group to know which group to update. - targets map[poolKey]map[string]*targetgroup.Group + targets map[poolKey]map[string]*targetgroup.Group + targetsMtx sync.Mutex + // providers keeps track of SD providers. providers []*provider // The sync channel sends the updates as a map where the key is the job value from the scrape config. @@ -132,11 +146,14 @@ type Manager struct { // should only be modified in unit tests. updatert time.Duration - // The triggerSend channel signals to the manager that new updates have been received from providers. + // The triggerSend channel signals to the Manager that new updates have been received from providers. triggerSend chan struct{} + + // lastProvider counts providers registered during Manager's lifetime. + lastProvider uint } -// Run starts the background processing +// Run starts the background processing. func (m *Manager) Run() error { go m.sender() for range m.ctx.Done() { @@ -151,31 +168,80 @@ func (m *Manager) SyncCh() <-chan map[string][]*targetgroup.Group { return m.syncCh } -// ApplyConfig removes all running discovery providers and starts new ones using the provided config. +// ApplyConfig checks if discovery provider with supplied config is already running and keeps them as is. +// Remaining providers are then stopped and new required providers are started using the provided config. func (m *Manager) ApplyConfig(cfg map[string]Configs) error { m.mtx.Lock() defer m.mtx.Unlock() - for pk := range m.targets { - if _, ok := cfg[pk.setName]; !ok { - discoveredTargets.DeleteLabelValues(m.name, pk.setName) - } - } - m.cancelDiscoverers() - m.targets = make(map[poolKey]map[string]*targetgroup.Group) - m.providers = nil - m.discoverCancel = nil - - failedCount := 0 + var failedCount int for name, scfg := range cfg { failedCount += m.registerProviders(scfg, name) - discoveredTargets.WithLabelValues(m.name, name).Set(0) } failedConfigs.WithLabelValues(m.name).Set(float64(failedCount)) + var ( + wg sync.WaitGroup + // keep shows if we keep any providers after reload. + keep bool + newProviders []*provider + ) for _, prov := range m.providers { - m.startProvider(m.ctx, prov) + // Cancel obsolete providers. + if len(prov.newSubs) == 0 { + wg.Add(1) + prov.done = func() { + wg.Done() + } + prov.cancel() + continue + } + newProviders = append(newProviders, prov) + // refTargets keeps reference targets used to populate new subs' targets + var refTargets map[string]*targetgroup.Group + prov.mu.Lock() + for s := range prov.subs { + keep = true + refTargets = m.targets[poolKey{s, prov.name}] + // Remove obsolete subs' targets. + if _, ok := prov.newSubs[s]; !ok { + m.targetsMtx.Lock() + delete(m.targets, poolKey{s, prov.name}) + m.targetsMtx.Unlock() + discoveredTargets.DeleteLabelValues(m.name, s) + } + } + // Set metrics and targets for new subs. + for s := range prov.newSubs { + if _, ok := prov.subs[s]; !ok { + discoveredTargets.WithLabelValues(m.name, s).Set(0) + } + if l := len(refTargets); l > 0 { + m.targets[poolKey{s, prov.name}] = make(map[string]*targetgroup.Group, l) + for k, v := range refTargets { + m.targets[poolKey{s, prov.name}][k] = v + } + } + } + prov.subs = prov.newSubs + prov.newSubs = map[string]struct{}{} + prov.mu.Unlock() + if !prov.IsStarted() { + m.startProvider(m.ctx, prov) + } } + // Currently downstream managers expect full target state upon config reload, so we must oblige. + // While startProvider does pull the trigger, it may take some time to do so, therefore + // we pull the trigger as soon as possible so that downstream managers can populate their state. + // See https://github.com/prometheus/prometheus/pull/8639 for details. + if keep { + select { + case m.triggerSend <- struct{}{}: + default: + } + } + m.providers = newProviders + wg.Wait() return nil } @@ -185,7 +251,9 @@ func (m *Manager) StartCustomProvider(ctx context.Context, name string, worker D p := &provider{ name: name, d: worker, - subs: []string{name}, + subs: map[string]struct{}{ + name: {}, + }, } m.providers = append(m.providers, p) m.startProvider(ctx, p) @@ -196,13 +264,29 @@ func (m *Manager) startProvider(ctx context.Context, p *provider) { ctx, cancel := context.WithCancel(ctx) updates := make(chan []*targetgroup.Group) - m.discoverCancel = append(m.discoverCancel, cancel) + p.cancel = cancel go p.d.Run(ctx, updates) go m.updater(ctx, p, updates) } +// cleaner cleans resources associated with provider. +func (m *Manager) cleaner(p *provider) { + m.targetsMtx.Lock() + p.mu.RLock() + for s := range p.subs { + delete(m.targets, poolKey{s, p.name}) + } + p.mu.RUnlock() + m.targetsMtx.Unlock() + if p.done != nil { + p.done() + } +} + func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targetgroup.Group) { + // Ensure targets from this provider are cleaned up. + defer m.cleaner(p) for { select { case <-ctx.Done(): @@ -211,12 +295,16 @@ func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targ receivedUpdates.WithLabelValues(m.name).Inc() if !ok { level.Debug(m.logger).Log("msg", "Discoverer channel closed", "provider", p.name) + // Wait for provider cancellation to ensure targets are cleaned up when expected. + <-ctx.Done() return } - for _, s := range p.subs { + p.mu.RLock() + for s := range p.subs { m.updateGroup(poolKey{setName: s, provider: p.name}, tgs) } + p.mu.RUnlock() select { case m.triggerSend <- struct{}{}: @@ -234,7 +322,7 @@ func (m *Manager) sender() { select { case <-m.ctx.Done(): return - case <-ticker.C: // Some discoverers send updates too often so we throttle these with the ticker. + case <-ticker.C: // Some discoverers send updates too often, so we throttle these with the ticker. select { case <-m.triggerSend: sentUpdates.WithLabelValues(m.name).Inc() @@ -255,14 +343,18 @@ func (m *Manager) sender() { } func (m *Manager) cancelDiscoverers() { - for _, c := range m.discoverCancel { - c() + m.mtx.RLock() + defer m.mtx.RUnlock() + for _, p := range m.providers { + if p.cancel != nil { + p.cancel() + } } } func (m *Manager) updateGroup(poolKey poolKey, tgs []*targetgroup.Group) { - m.mtx.Lock() - defer m.mtx.Unlock() + m.targetsMtx.Lock() + defer m.targetsMtx.Unlock() if _, ok := m.targets[poolKey]; !ok { m.targets[poolKey] = make(map[string]*targetgroup.Group) @@ -275,11 +367,11 @@ func (m *Manager) updateGroup(poolKey poolKey, tgs []*targetgroup.Group) { } func (m *Manager) allGroups() map[string][]*targetgroup.Group { - m.mtx.RLock() - defer m.mtx.RUnlock() - tSets := map[string][]*targetgroup.Group{} n := map[string]int{} + + m.targetsMtx.Lock() + defer m.targetsMtx.Unlock() for pkey, tsets := range m.targets { for _, tg := range tsets { // Even if the target group 'tg' is empty we still need to send it to the 'Scrape manager' @@ -303,7 +395,7 @@ func (m *Manager) registerProviders(cfgs Configs, setName string) int { add := func(cfg Config) { for _, p := range m.providers { if reflect.DeepEqual(cfg, p.config) { - p.subs = append(p.subs, setName) + p.newSubs[setName] = struct{}{} added = true return } @@ -318,11 +410,14 @@ func (m *Manager) registerProviders(cfgs Configs, setName string) int { return } m.providers = append(m.providers, &provider{ - name: fmt.Sprintf("%s/%d", typ, len(m.providers)), + name: fmt.Sprintf("%s/%d", typ, m.lastProvider), d: d, config: cfg, - subs: []string{setName}, + newSubs: map[string]struct{}{ + setName: {}, + }, }) + m.lastProvider++ added = true } for _, cfg := range cfgs { diff --git a/discovery/manager_test.go b/discovery/manager_test.go index 0a438306e..2caae0080 100644 --- a/discovery/manager_test.go +++ b/discovery/manager_test.go @@ -18,6 +18,7 @@ import ( "fmt" "sort" "strconv" + "sync" "testing" "time" @@ -718,6 +719,32 @@ func staticConfig(addrs ...string) StaticConfig { return cfg } +func verifySyncedPresence(t *testing.T, tGroups map[string][]*targetgroup.Group, key string, label string, present bool) { + t.Helper() + if _, ok := tGroups[key]; !ok { + t.Fatalf("'%s' should be present in Group map keys: %v", key, tGroups) + return + } + match := false + var mergedTargets string + for _, targetGroups := range tGroups[key] { + for _, l := range targetGroups.Targets { + mergedTargets = mergedTargets + " " + l.String() + if l.String() == label { + match = true + } + } + + } + if match != present { + msg := "" + if !present { + msg = "not" + } + t.Fatalf("%q should %s be present in Group labels: %q", label, msg, mergedTargets) + } +} + func verifyPresence(t *testing.T, tSets map[poolKey]map[string]*targetgroup.Group, poolKey poolKey, label string, present bool) { t.Helper() if _, ok := tSets[poolKey]; !ok { @@ -746,7 +773,180 @@ func verifyPresence(t *testing.T, tSets map[poolKey]map[string]*targetgroup.Grou } } -func TestTargetSetRecreatesTargetGroupsEveryRun(t *testing.T) { +func pk(provider, setName string, n int) poolKey { + return poolKey{ + setName: setName, + provider: fmt.Sprintf("%s/%d", provider, n), + } +} + +func TestTargetSetTargetGroupsPresentOnConfigReload(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + discoveryManager := NewManager(ctx, log.NewNopLogger()) + discoveryManager.updatert = 100 * time.Millisecond + go discoveryManager.Run() + + c := map[string]Configs{ + "prometheus": { + staticConfig("foo:9090"), + }, + } + discoveryManager.ApplyConfig(c) + + syncedTargets := <-discoveryManager.SyncCh() + require.Equal(t, 1, len(syncedTargets)) + verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true) + require.Equal(t, 1, len(syncedTargets["prometheus"])) + p := pk("static", "prometheus", 0) + verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true) + require.Equal(t, 1, len(discoveryManager.targets)) + + discoveryManager.ApplyConfig(c) + + syncedTargets = <-discoveryManager.SyncCh() + verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true) + require.Equal(t, 1, len(discoveryManager.targets)) + require.Equal(t, 1, len(syncedTargets)) + verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true) + require.Equal(t, 1, len(syncedTargets["prometheus"])) +} + +func TestTargetSetTargetGroupsPresentOnConfigRename(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + discoveryManager := NewManager(ctx, log.NewNopLogger()) + discoveryManager.updatert = 100 * time.Millisecond + go discoveryManager.Run() + + c := map[string]Configs{ + "prometheus": { + staticConfig("foo:9090"), + }, + } + discoveryManager.ApplyConfig(c) + + syncedTargets := <-discoveryManager.SyncCh() + require.Equal(t, 1, len(syncedTargets)) + verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true) + require.Equal(t, 1, len(syncedTargets["prometheus"])) + p := pk("static", "prometheus", 0) + verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true) + require.Equal(t, 1, len(discoveryManager.targets)) + + c["prometheus2"] = c["prometheus"] + delete(c, "prometheus") + discoveryManager.ApplyConfig(c) + + syncedTargets = <-discoveryManager.SyncCh() + p = pk("static", "prometheus2", 0) + verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true) + require.Equal(t, 1, len(discoveryManager.targets)) + require.Equal(t, 1, len(syncedTargets)) + verifySyncedPresence(t, syncedTargets, "prometheus2", "{__address__=\"foo:9090\"}", true) + require.Equal(t, 1, len(syncedTargets["prometheus2"])) +} + +func TestTargetSetTargetGroupsPresentOnConfigDuplicateAndDeleteOriginal(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + discoveryManager := NewManager(ctx, log.NewNopLogger()) + discoveryManager.updatert = 100 * time.Millisecond + go discoveryManager.Run() + + c := map[string]Configs{ + "prometheus": { + staticConfig("foo:9090"), + }, + } + discoveryManager.ApplyConfig(c) + <-discoveryManager.SyncCh() + + c["prometheus2"] = c["prometheus"] + discoveryManager.ApplyConfig(c) + syncedTargets := <-discoveryManager.SyncCh() + require.Equal(t, 2, len(syncedTargets)) + verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true) + require.Equal(t, 1, len(syncedTargets["prometheus"])) + verifySyncedPresence(t, syncedTargets, "prometheus2", "{__address__=\"foo:9090\"}", true) + require.Equal(t, 1, len(syncedTargets["prometheus2"])) + p := pk("static", "prometheus", 0) + verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true) + require.Equal(t, 2, len(discoveryManager.targets)) + + delete(c, "prometheus") + discoveryManager.ApplyConfig(c) + syncedTargets = <-discoveryManager.SyncCh() + p = pk("static", "prometheus2", 0) + verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true) + require.Equal(t, 1, len(discoveryManager.targets)) + require.Equal(t, 1, len(syncedTargets)) + verifySyncedPresence(t, syncedTargets, "prometheus2", "{__address__=\"foo:9090\"}", true) + require.Equal(t, 1, len(syncedTargets["prometheus2"])) +} + +func TestTargetSetTargetGroupsPresentOnConfigChange(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + discoveryManager := NewManager(ctx, log.NewNopLogger()) + discoveryManager.updatert = 100 * time.Millisecond + go discoveryManager.Run() + + c := map[string]Configs{ + "prometheus": { + staticConfig("foo:9090"), + }, + } + discoveryManager.ApplyConfig(c) + + syncedTargets := <-discoveryManager.SyncCh() + require.Equal(t, 1, len(syncedTargets)) + verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true) + require.Equal(t, 1, len(syncedTargets["prometheus"])) + + var mu sync.Mutex + c["prometheus2"] = Configs{ + lockStaticConfig{ + mu: &mu, + config: staticConfig("bar:9090"), + }, + } + mu.Lock() + discoveryManager.ApplyConfig(c) + + // Original targets should be present as soon as possible. + syncedTargets = <-discoveryManager.SyncCh() + mu.Unlock() + require.Equal(t, 1, len(syncedTargets)) + verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true) + require.Equal(t, 1, len(syncedTargets["prometheus"])) + + // prometheus2 configs should be ready on second sync. + syncedTargets = <-discoveryManager.SyncCh() + require.Equal(t, 2, len(syncedTargets)) + verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true) + require.Equal(t, 1, len(syncedTargets["prometheus"])) + verifySyncedPresence(t, syncedTargets, "prometheus2", "{__address__=\"bar:9090\"}", true) + require.Equal(t, 1, len(syncedTargets["prometheus2"])) + + p := pk("static", "prometheus", 0) + verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true) + p = pk("lockstatic", "prometheus2", 1) + verifyPresence(t, discoveryManager.targets, p, "{__address__=\"bar:9090\"}", true) + require.Equal(t, 2, len(discoveryManager.targets)) + + // Delete part of config and ensure only original targets exist. + delete(c, "prometheus2") + discoveryManager.ApplyConfig(c) + syncedTargets = <-discoveryManager.SyncCh() + require.Equal(t, 1, len(discoveryManager.targets)) + verifyPresence(t, discoveryManager.targets, pk("static", "prometheus", 0), "{__address__=\"foo:9090\"}", true) + require.Equal(t, 1, len(syncedTargets)) + verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true) + require.Equal(t, 1, len(syncedTargets["prometheus"])) +} + +func TestTargetSetRecreatesTargetGroupsOnConfigChange(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() discoveryManager := NewManager(ctx, log.NewNopLogger()) @@ -760,18 +960,29 @@ func TestTargetSetRecreatesTargetGroupsEveryRun(t *testing.T) { } discoveryManager.ApplyConfig(c) - <-discoveryManager.SyncCh() - verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true) - verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"bar:9090\"}", true) + syncedTargets := <-discoveryManager.SyncCh() + p := pk("static", "prometheus", 0) + verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true) + verifyPresence(t, discoveryManager.targets, p, "{__address__=\"bar:9090\"}", true) + require.Equal(t, 1, len(discoveryManager.targets)) + require.Equal(t, 1, len(syncedTargets)) + verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true) + verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"bar:9090\"}", true) + require.Equal(t, 2, len(syncedTargets["prometheus"])) c["prometheus"] = Configs{ staticConfig("foo:9090"), } discoveryManager.ApplyConfig(c) - - <-discoveryManager.SyncCh() - verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true) - verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"bar:9090\"}", false) + syncedTargets = <-discoveryManager.SyncCh() + require.Equal(t, 1, len(discoveryManager.targets)) + p = pk("static", "prometheus", 1) + verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true) + verifyPresence(t, discoveryManager.targets, p, "{__address__=\"bar:9090\"}", false) + require.Equal(t, 1, len(discoveryManager.targets)) + require.Equal(t, 1, len(syncedTargets)) + verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true) + require.Equal(t, 1, len(syncedTargets["prometheus"])) } func TestDiscovererConfigs(t *testing.T) { @@ -789,10 +1000,18 @@ func TestDiscovererConfigs(t *testing.T) { } discoveryManager.ApplyConfig(c) - <-discoveryManager.SyncCh() - verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true) - verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"bar:9090\"}", true) - verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/1"}, "{__address__=\"baz:9090\"}", true) + syncedTargets := <-discoveryManager.SyncCh() + p := pk("static", "prometheus", 0) + verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true) + verifyPresence(t, discoveryManager.targets, p, "{__address__=\"bar:9090\"}", true) + p = pk("static", "prometheus", 1) + verifyPresence(t, discoveryManager.targets, p, "{__address__=\"baz:9090\"}", true) + require.Equal(t, 2, len(discoveryManager.targets)) + require.Equal(t, 1, len(syncedTargets)) + verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true) + verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"bar:9090\"}", true) + verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"baz:9090\"}", true) + require.Equal(t, 3, len(syncedTargets["prometheus"])) } // TestTargetSetRecreatesEmptyStaticConfigs ensures that reloading a config file after @@ -812,20 +1031,23 @@ func TestTargetSetRecreatesEmptyStaticConfigs(t *testing.T) { } discoveryManager.ApplyConfig(c) - <-discoveryManager.SyncCh() - verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true) + syncedTargets := <-discoveryManager.SyncCh() + p := pk("static", "prometheus", 0) + verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true) + require.Equal(t, 1, len(syncedTargets)) + verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true) + require.Equal(t, 1, len(syncedTargets["prometheus"])) c["prometheus"] = Configs{ StaticConfig{{}}, } discoveryManager.ApplyConfig(c) - <-discoveryManager.SyncCh() - - pkey := poolKey{setName: "prometheus", provider: "static/0"} - targetGroups, ok := discoveryManager.targets[pkey] + syncedTargets = <-discoveryManager.SyncCh() + p = pk("static", "prometheus", 1) + targetGroups, ok := discoveryManager.targets[p] if !ok { - t.Fatalf("'%v' should be present in target groups", pkey) + t.Fatalf("'%v' should be present in target groups", p) } group, ok := targetGroups[""] if !ok { @@ -835,6 +1057,12 @@ func TestTargetSetRecreatesEmptyStaticConfigs(t *testing.T) { if len(group.Targets) != 0 { t.Fatalf("Invalid number of targets: expected 0, got %d", len(group.Targets)) } + require.Equal(t, 1, len(syncedTargets)) + require.Equal(t, 1, len(syncedTargets["prometheus"])) + if lbls := syncedTargets["prometheus"][0].Labels; lbls != nil { + t.Fatalf("Unexpected Group: expected nil Labels, got %v", lbls) + } + } func TestIdenticalConfigurationsAreCoalesced(t *testing.T) { @@ -854,12 +1082,17 @@ func TestIdenticalConfigurationsAreCoalesced(t *testing.T) { } discoveryManager.ApplyConfig(c) - <-discoveryManager.SyncCh() - verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true) - verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus2", provider: "static/0"}, "{__address__=\"foo:9090\"}", true) + syncedTargets := <-discoveryManager.SyncCh() + verifyPresence(t, discoveryManager.targets, pk("static", "prometheus", 0), "{__address__=\"foo:9090\"}", true) + verifyPresence(t, discoveryManager.targets, pk("static", "prometheus2", 0), "{__address__=\"foo:9090\"}", true) if len(discoveryManager.providers) != 1 { t.Fatalf("Invalid number of providers: expected 1, got %d", len(discoveryManager.providers)) } + require.Equal(t, 2, len(syncedTargets)) + verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true) + require.Equal(t, 1, len(syncedTargets["prometheus"])) + verifySyncedPresence(t, syncedTargets, "prometheus2", "{__address__=\"foo:9090\"}", true) + require.Equal(t, 1, len(syncedTargets["prometheus2"])) } func TestApplyConfigDoesNotModifyStaticTargets(t *testing.T) { @@ -891,6 +1124,29 @@ type errorConfig struct{ err error } func (e errorConfig) Name() string { return "error" } func (e errorConfig) NewDiscoverer(DiscovererOptions) (Discoverer, error) { return nil, e.err } +type lockStaticConfig struct { + mu *sync.Mutex + config StaticConfig +} + +func (s lockStaticConfig) Name() string { return "lockstatic" } +func (s lockStaticConfig) NewDiscoverer(options DiscovererOptions) (Discoverer, error) { + return (lockStaticDiscoverer)(s), nil +} + +type lockStaticDiscoverer lockStaticConfig + +func (s lockStaticDiscoverer) Run(ctx context.Context, up chan<- []*targetgroup.Group) { + // TODO: existing implementation closes up chan, but documentation explicitly forbids it...? + defer close(up) + s.mu.Lock() + defer s.mu.Unlock() + select { + case <-ctx.Done(): + case up <- s.config: + } +} + func TestGaugeFailedConfigs(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel()