tidy up the discovery logs,updating loops and selects (#4556)
* tidy up the discovery logs,updating loops and selects few objects renamings removed a very noise debug log on the k8s discovery. It would be usefull to show some summary rather than every update as this is impossible to follow. added most comments as debug logs so each block becomes self explanatory. when the discovery receiving channel is full will retry again on the next cycle. Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com> * add noop logger for the SD manager tests. Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com> * spelling nits Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>
This commit is contained in:
parent
f708fd5c99
commit
ba7eb733e8
|
@ -383,7 +383,6 @@ func send(ctx context.Context, l log.Logger, role Role, ch chan<- []*targetgroup
|
|||
if tg == nil {
|
||||
return
|
||||
}
|
||||
level.Debug(l).Log("msg", "kubernetes discovery update", "role", string(role), "tg", fmt.Sprintf("%#v", tg))
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case ch <- []*targetgroup.Group{tg}:
|
||||
|
|
|
@ -162,47 +162,50 @@ func (m *Manager) startProvider(ctx context.Context, p *provider) {
|
|||
m.discoverCancel = append(m.discoverCancel, cancel)
|
||||
|
||||
go p.d.Run(ctx, updates)
|
||||
go m.runProvider(ctx, p, updates)
|
||||
go m.updater(ctx, p, updates)
|
||||
}
|
||||
|
||||
func (m *Manager) runProvider(ctx context.Context, p *provider, updates chan []*targetgroup.Group) {
|
||||
func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targetgroup.Group) {
|
||||
ticker := time.NewTicker(5 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
updateReceived := make(chan struct{}, 1)
|
||||
triggerUpdate := make(chan struct{}, 1)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case tgs, ok := <-updates:
|
||||
// Handle the case that a target provider(E.g. StaticProvider) exits and
|
||||
// closes the channel before the context is done.
|
||||
// This will prevent sending the updates to the receiver so we send them before exiting.
|
||||
if !ok {
|
||||
level.Debug(m.logger).Log("msg", "discoverer channel closed, sending the last update", "provider", p.name)
|
||||
select {
|
||||
case m.syncCh <- m.allGroups():
|
||||
default:
|
||||
level.Debug(m.logger).Log("msg", "discovery receiver's channel was full")
|
||||
}
|
||||
case m.syncCh <- m.allGroups(): // Waiting until the receiver can accept the last update.
|
||||
level.Debug(m.logger).Log("msg", "discoverer exited", "provider", p.name)
|
||||
return
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
}
|
||||
for _, s := range p.subs {
|
||||
m.updateGroup(poolKey{setName: s, provider: p.name}, tgs)
|
||||
}
|
||||
|
||||
// Signal that there was an update.
|
||||
select {
|
||||
case updateReceived <- struct{}{}:
|
||||
case triggerUpdate <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
case <-ticker.C: // Some discoverers send updates too often so we send these to the receiver once every 5 seconds.
|
||||
case <-ticker.C: // Some discoverers send updates too often so we throttle these with the ticker.
|
||||
select {
|
||||
case <-updateReceived: // Send only when there is a new update.
|
||||
case <-triggerUpdate:
|
||||
select {
|
||||
case m.syncCh <- m.allGroups():
|
||||
default:
|
||||
level.Debug(m.logger).Log("msg", "discovery receiver's channel was full")
|
||||
level.Debug(m.logger).Log("msg", "discovery receiver's channel was full so will retry the next cycle", "provider", p.name)
|
||||
select {
|
||||
case triggerUpdate <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
default:
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/go-kit/kit/log"
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/prometheus/prometheus/config"
|
||||
sd_config "github.com/prometheus/prometheus/discovery/config"
|
||||
|
@ -657,7 +658,7 @@ func TestTargetUpdatesOrder(t *testing.T) {
|
|||
for testIndex, testCase := range testCases {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
discoveryManager := NewManager(ctx, nil)
|
||||
discoveryManager := NewManager(ctx, log.NewNopLogger())
|
||||
|
||||
var totalUpdatesCount int
|
||||
|
||||
|
@ -743,7 +744,7 @@ scrape_configs:
|
|||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
discoveryManager := NewManager(ctx, nil)
|
||||
discoveryManager := NewManager(ctx, log.NewNopLogger())
|
||||
go discoveryManager.Run()
|
||||
|
||||
c := make(map[string]sd_config.ServiceDiscoveryConfig)
|
||||
|
@ -849,7 +850,7 @@ scrape_configs:
|
|||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
discoveryManager := NewManager(ctx, nil)
|
||||
discoveryManager := NewManager(ctx, log.NewNopLogger())
|
||||
go discoveryManager.Run()
|
||||
|
||||
c := make(map[string]sd_config.ServiceDiscoveryConfig)
|
||||
|
|
Loading…
Reference in New Issue