discovery: Schedule updates to throttle

This commit is contained in:
Frederic Branczyk 2018-01-26 14:57:33 +01:00
parent 40acc632bb
commit cfa0253ed8
No known key found for this signature in database
GPG Key ID: 7741A52782A90069
1 changed files with 26 additions and 1 deletions

View File

@ -17,6 +17,7 @@ import (
"context" "context"
"fmt" "fmt"
"sync" "sync"
"time"
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level" "github.com/go-kit/kit/log/level"
@ -81,6 +82,10 @@ type Manager struct {
targets map[poolKey]map[string]*targetgroup.Group targets map[poolKey]map[string]*targetgroup.Group
// The sync channels sends the updates in map[targetSetName] where targetSetName is the job value from the scrape config. // The sync channels sends the updates in map[targetSetName] where targetSetName is the job value from the scrape config.
syncCh chan map[string][]*targetgroup.Group syncCh chan map[string][]*targetgroup.Group
// True if updates were received in the last 5 seconds.
recentlyUpdated bool
// Protects recentlyUpdated.
recentlyUpdatedMtx sync.Mutex
} }
// Run starts the background processing // Run starts the background processing
@ -123,6 +128,7 @@ func (m *Manager) startProvider(ctx context.Context, poolKey poolKey, worker Dis
go worker.Run(ctx, updates) go worker.Run(ctx, updates)
go m.runProvider(ctx, poolKey, updates) go m.runProvider(ctx, poolKey, updates)
go m.runUpdater(ctx)
} }
func (m *Manager) runProvider(ctx context.Context, poolKey poolKey, updates chan []*targetgroup.Group) { func (m *Manager) runProvider(ctx context.Context, poolKey poolKey, updates chan []*targetgroup.Group) {
@ -137,7 +143,26 @@ func (m *Manager) runProvider(ctx context.Context, poolKey poolKey, updates chan
return return
} }
m.updateGroup(poolKey, tgs) m.updateGroup(poolKey, tgs)
m.syncCh <- m.allGroups() m.recentlyUpdatedMtx.Lock()
m.recentlyUpdated = true
m.recentlyUpdatedMtx.Unlock()
}
}
}
func (m *Manager) runUpdater(ctx context.Context) {
ticker := time.NewTicker(5 * time.Second)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
m.recentlyUpdatedMtx.Lock()
if m.recentlyUpdated {
m.syncCh <- m.allGroups()
m.recentlyUpdated = false
}
m.recentlyUpdatedMtx.Unlock()
} }
} }
} }