diff --git a/discovery/manager.go b/discovery/manager.go index 0a3d107a3..d3cab05c4 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -73,7 +73,7 @@ func NewManager(logger log.Logger) *Manager { logger: logger, actionCh: make(chan func(context.Context)), syncCh: make(chan map[string][]*targetgroup.Group), - targets: make(map[poolKey][]*targetgroup.Group), + targets: make(map[poolKey]map[string]*targetgroup.Group), discoverCancel: []context.CancelFunc{}, } } @@ -83,7 +83,8 @@ type Manager struct { logger log.Logger actionCh chan func(context.Context) discoverCancel []context.CancelFunc - targets map[poolKey][]*targetgroup.Group + // We use map[string]*targetgroup.Group to handle Discoverers that send only updates instead of all targets on every update. + targets map[poolKey]map[string]*targetgroup.Group // The sync channels sends the updates in map[targetSetName] where targetSetName is the job value from the scrape config. syncCh chan map[string][]*targetgroup.Group } @@ -143,8 +144,8 @@ func (m *Manager) runProvider(ctx context.Context, poolKey poolKey, updates chan if !ok { return } - m.addGroup(poolKey, tgs) - m.syncCh <- m.allGroups(poolKey) + m.updateGroup(poolKey, tgs) + m.syncCh <- m.allGroups() } } } @@ -153,16 +154,21 @@ func (m *Manager) cancelDiscoverers() { for _, c := range m.discoverCancel { c() } - m.targets = make(map[poolKey][]*targetgroup.Group) + m.targets = make(map[poolKey]map[string]*targetgroup.Group) m.discoverCancel = nil } -func (m *Manager) addGroup(poolKey poolKey, tg []*targetgroup.Group) { +func (m *Manager) updateGroup(poolKey poolKey, tg []*targetgroup.Group) { done := make(chan struct{}) m.actionCh <- func(ctx context.Context) { if tg != nil { - m.targets[poolKey] = tg + for _, t := range tg { + if _, ok := m.targets[poolKey]; !ok { + m.targets[poolKey] = make(map[string]*targetgroup.Group) + } + m.targets[poolKey][t.Source] = t + } } close(done) @@ -170,7 +176,7 @@ func (m *Manager) addGroup(poolKey poolKey, tg []*targetgroup.Group) { <-done } -func (m *Manager) allGroups(pk poolKey) map[string][]*targetgroup.Group { +func (m *Manager) allGroups() map[string][]*targetgroup.Group { tSets := make(chan map[string][]*targetgroup.Group) m.actionCh <- func(ctx context.Context) { @@ -185,7 +191,9 @@ func (m *Manager) allGroups(pk poolKey) map[string][]*targetgroup.Group { tSetsAll := map[string][]*targetgroup.Group{} for _, pk := range pKeys { for _, tg := range m.targets[pk] { - if tg.Source != "" { // Don't add empty targets. + // Don't add empty targets. + // Some Discoverers(eg. k8s) send only the updates so removed targets will be updated with an empty Source value. + if tg.Source != "" { tSetsAll[pk.setName] = append(tSetsAll[pk.setName], tg) } }