diff --git a/discovery/kubernetes/kubernetes.go b/discovery/kubernetes/kubernetes.go index 4ab431fc5..2b2bb9ae7 100644 --- a/discovery/kubernetes/kubernetes.go +++ b/discovery/kubernetes/kubernetes.go @@ -383,7 +383,6 @@ func send(ctx context.Context, l log.Logger, role Role, ch chan<- []*targetgroup if tg == nil { return } - level.Debug(l).Log("msg", "kubernetes discovery update", "role", string(role), "tg", fmt.Sprintf("%#v", tg)) select { case <-ctx.Done(): case ch <- []*targetgroup.Group{tg}: diff --git a/discovery/manager.go b/discovery/manager.go index 11be4b4c3..bb4409fea 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -162,47 +162,50 @@ func (m *Manager) startProvider(ctx context.Context, p *provider) { m.discoverCancel = append(m.discoverCancel, cancel) go p.d.Run(ctx, updates) - go m.runProvider(ctx, p, updates) + go m.updater(ctx, p, updates) } -func (m *Manager) runProvider(ctx context.Context, p *provider, updates chan []*targetgroup.Group) { +func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targetgroup.Group) { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() - updateReceived := make(chan struct{}, 1) + triggerUpdate := make(chan struct{}, 1) for { select { case <-ctx.Done(): return case tgs, ok := <-updates: - // Handle the case that a target provider(E.g. StaticProvider) exits and - // closes the channel before the context is done. - // This will prevent sending the updates to the receiver so we send them before exiting. if !ok { + level.Debug(m.logger).Log("msg", "discoverer channel closed, sending the last update", "provider", p.name) select { - case m.syncCh <- m.allGroups(): - default: - level.Debug(m.logger).Log("msg", "discovery receiver's channel was full") + case m.syncCh <- m.allGroups(): // Waiting until the receiver can accept the last update. + level.Debug(m.logger).Log("msg", "discoverer exited", "provider", p.name) + return + case <-ctx.Done(): + return } - return + } for _, s := range p.subs { m.updateGroup(poolKey{setName: s, provider: p.name}, tgs) } - // Signal that there was an update. select { - case updateReceived <- struct{}{}: + case triggerUpdate <- struct{}{}: default: } - case <-ticker.C: // Some discoverers send updates too often so we send these to the receiver once every 5 seconds. + case <-ticker.C: // Some discoverers send updates too often so we throttle these with the ticker. select { - case <-updateReceived: // Send only when there is a new update. + case <-triggerUpdate: select { case m.syncCh <- m.allGroups(): default: - level.Debug(m.logger).Log("msg", "discovery receiver's channel was full") + level.Debug(m.logger).Log("msg", "discovery receiver's channel was full so will retry the next cycle", "provider", p.name) + select { + case triggerUpdate <- struct{}{}: + default: + } } default: } diff --git a/discovery/manager_test.go b/discovery/manager_test.go index 402c75b9d..d9566201f 100644 --- a/discovery/manager_test.go +++ b/discovery/manager_test.go @@ -24,6 +24,7 @@ import ( "testing" "time" + "github.com/go-kit/kit/log" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" sd_config "github.com/prometheus/prometheus/discovery/config" @@ -657,7 +658,7 @@ func TestTargetUpdatesOrder(t *testing.T) { for testIndex, testCase := range testCases { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - discoveryManager := NewManager(ctx, nil) + discoveryManager := NewManager(ctx, log.NewNopLogger()) var totalUpdatesCount int @@ -743,7 +744,7 @@ scrape_configs: } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - discoveryManager := NewManager(ctx, nil) + discoveryManager := NewManager(ctx, log.NewNopLogger()) go discoveryManager.Run() c := make(map[string]sd_config.ServiceDiscoveryConfig) @@ -849,7 +850,7 @@ scrape_configs: } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - discoveryManager := NewManager(ctx, nil) + discoveryManager := NewManager(ctx, log.NewNopLogger()) go discoveryManager.Run() c := make(map[string]sd_config.ServiceDiscoveryConfig)