From cfa0253ed8ca449d803fc34aa4461f49c2de0db9 Mon Sep 17 00:00:00 2001 From: Frederic Branczyk Date: Fri, 26 Jan 2018 14:57:33 +0100 Subject: [PATCH] discovery: Schedule updates to throttle --- discovery/manager.go | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/discovery/manager.go b/discovery/manager.go index ae10de5cb..dd1e9b533 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "sync" + "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -81,6 +82,10 @@ type Manager struct { targets map[poolKey]map[string]*targetgroup.Group // The sync channels sends the updates in map[targetSetName] where targetSetName is the job value from the scrape config. syncCh chan map[string][]*targetgroup.Group + // True if updates were received in the last 5 seconds. + recentlyUpdated bool + // Protects recentlyUpdated. + recentlyUpdatedMtx sync.Mutex } // Run starts the background processing @@ -123,6 +128,7 @@ func (m *Manager) startProvider(ctx context.Context, poolKey poolKey, worker Dis go worker.Run(ctx, updates) go m.runProvider(ctx, poolKey, updates) + go m.runUpdater(ctx) } func (m *Manager) runProvider(ctx context.Context, poolKey poolKey, updates chan []*targetgroup.Group) { @@ -137,7 +143,26 @@ func (m *Manager) runProvider(ctx context.Context, poolKey poolKey, updates chan return } m.updateGroup(poolKey, tgs) - m.syncCh <- m.allGroups() + m.recentlyUpdatedMtx.Lock() + m.recentlyUpdated = true + m.recentlyUpdatedMtx.Unlock() + } + } +} + +func (m *Manager) runUpdater(ctx context.Context) { + ticker := time.NewTicker(5 * time.Second) + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + m.recentlyUpdatedMtx.Lock() + if m.recentlyUpdated { + m.syncCh <- m.allGroups() + m.recentlyUpdated = false + } + m.recentlyUpdatedMtx.Unlock() } } }