Improve target discovery pipeline

Replace the TargetProvider Stop method with done channels
that ensure properly broadcasted shutdown of the whole pipeline.
This commit is contained in:
Fabian Reinartz 2015-08-10 16:44:32 +02:00
parent 15b4115a25
commit 4e84b86510
10 changed files with 198 additions and 165 deletions

View File

@ -57,9 +57,8 @@ type ConsulDiscovery struct {
tagSeparator string
scrapedServices map[string]struct{}
mu sync.RWMutex
services map[string]*consulService
runDone, srvsDone chan struct{}
mu sync.RWMutex
services map[string]*consulService
}
// consulService contains data belonging to the same service.
@ -93,8 +92,6 @@ func NewConsulDiscovery(conf *config.ConsulSDConfig) *ConsulDiscovery {
client: client,
clientConf: clientConf,
tagSeparator: conf.TagSeparator,
runDone: make(chan struct{}),
srvsDone: make(chan struct{}, 1),
scrapedServices: map[string]struct{}{},
services: map[string]*consulService{},
}
@ -133,18 +130,22 @@ func (cd *ConsulDiscovery) Sources() []string {
}
// Run implements the TargetProvider interface.
func (cd *ConsulDiscovery) Run(ch chan<- *config.TargetGroup) {
func (cd *ConsulDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
defer close(ch)
defer cd.stop()
update := make(chan *consulService, 10)
go cd.watchServices(update)
go cd.watchServices(update, done)
for {
select {
case <-cd.runDone:
case <-done:
return
case srv := <-update:
if srv.removed {
close(srv.done)
// Send clearing update.
ch <- &config.TargetGroup{Source: srv.name}
break
}
@ -157,31 +158,20 @@ func (cd *ConsulDiscovery) Run(ch chan<- *config.TargetGroup) {
}
}
// Stop implements the TargetProvider interface.
func (cd *ConsulDiscovery) Stop() {
log.Debugf("Stopping Consul service discovery for %s", cd.clientConf.Address)
func (cd *ConsulDiscovery) stop() {
// The lock prevents Run from terminating while the watchers attempt
// to send on their channels.
cd.mu.Lock()
defer cd.mu.Unlock()
// The watching goroutines will terminate after their next watch timeout.
// As this can take long, the channel is buffered and we do not wait.
for _, srv := range cd.services {
srv.done <- struct{}{}
close(srv.done)
}
cd.srvsDone <- struct{}{}
// Terminate Run.
cd.runDone <- struct{}{}
log.Debugf("Consul service discovery for %s stopped.", cd.clientConf.Address)
}
// watchServices retrieves updates from Consul's services endpoint and sends
// potential updates to the update channel.
func (cd *ConsulDiscovery) watchServices(update chan<- *consulService) {
func (cd *ConsulDiscovery) watchServices(update chan<- *consulService, done <-chan struct{}) {
var lastIndex uint64
for {
catalog := cd.client.Catalog()
@ -191,8 +181,7 @@ func (cd *ConsulDiscovery) watchServices(update chan<- *consulService) {
})
if err != nil {
log.Errorf("Error refreshing service list: %s", err)
<-time.After(consulRetryInterval)
continue
time.Sleep(consulRetryInterval)
}
// If the index equals the previous one, the watch timed out with no update.
if meta.LastIndex == lastIndex {
@ -202,7 +191,7 @@ func (cd *ConsulDiscovery) watchServices(update chan<- *consulService) {
cd.mu.Lock()
select {
case <-cd.srvsDone:
case <-done:
cd.mu.Unlock()
return
default:
@ -218,7 +207,7 @@ func (cd *ConsulDiscovery) watchServices(update chan<- *consulService) {
srv = &consulService{
name: name,
tgroup: &config.TargetGroup{},
done: make(chan struct{}, 1),
done: make(chan struct{}),
}
srv.tgroup.Source = name
cd.services[name] = srv
@ -234,7 +223,6 @@ func (cd *ConsulDiscovery) watchServices(update chan<- *consulService) {
if _, ok := srvs[name]; !ok {
srv.removed = true
update <- srv
srv.done <- struct{}{}
delete(cd.services, name)
}
}
@ -253,7 +241,7 @@ func (cd *ConsulDiscovery) watchService(srv *consulService, ch chan<- *config.Ta
})
if err != nil {
log.Errorf("Error refreshing service %s: %s", srv.name, err)
<-time.After(consulRetryInterval)
time.Sleep(consulRetryInterval)
continue
}
// If the index equals the previous one, the watch timed out with no update.

View File

@ -64,11 +64,11 @@ func init() {
type DNSDiscovery struct {
names []string
done chan struct{}
ticker *time.Ticker
m sync.RWMutex
port int
qtype uint16
done chan struct{}
interval time.Duration
m sync.RWMutex
port int
qtype uint16
}
// NewDNSDiscovery returns a new DNSDiscovery which periodically refreshes its targets.
@ -83,41 +83,34 @@ func NewDNSDiscovery(conf *config.DNSSDConfig) *DNSDiscovery {
qtype = dns.TypeSRV
}
return &DNSDiscovery{
names: conf.Names,
done: make(chan struct{}),
ticker: time.NewTicker(time.Duration(conf.RefreshInterval)),
qtype: qtype,
port: conf.Port,
names: conf.Names,
done: make(chan struct{}),
interval: time.Duration(conf.RefreshInterval),
qtype: qtype,
port: conf.Port,
}
}
// Run implements the TargetProvider interface.
func (dd *DNSDiscovery) Run(ch chan<- *config.TargetGroup) {
func (dd *DNSDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
defer close(ch)
ticker := time.NewTicker(dd.interval)
defer ticker.Stop()
// Get an initial set right away.
dd.refreshAll(ch)
for {
select {
case <-dd.ticker.C:
case <-ticker.C:
dd.refreshAll(ch)
case <-dd.done:
case <-done:
return
}
}
}
// Stop implements the TargetProvider interface.
func (dd *DNSDiscovery) Stop() {
log.Debug("Stopping DNS discovery for %s...", dd.names)
dd.ticker.Stop()
dd.done <- struct{}{}
log.Debug("DNS discovery for %s stopped.", dd.names)
}
// Sources implements the TargetProvider interface.
func (dd *DNSDiscovery) Sources() []string {
var srcs []string

View File

@ -39,7 +39,6 @@ type FileDiscovery struct {
paths []string
watcher *fsnotify.Watcher
interval time.Duration
done chan struct{}
// lastRefresh stores which files were found during the last refresh
// and how many target groups they contained.
@ -52,7 +51,6 @@ func NewFileDiscovery(conf *config.FileSDConfig) *FileDiscovery {
return &FileDiscovery{
paths: conf.Names,
interval: time.Duration(conf.RefreshInterval),
done: make(chan struct{}),
}
}
@ -106,8 +104,9 @@ func (fd *FileDiscovery) watchFiles() {
}
// Run implements the TargetProvider interface.
func (fd *FileDiscovery) Run(ch chan<- *config.TargetGroup) {
func (fd *FileDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
defer close(ch)
defer fd.stop()
watcher, err := fsnotify.NewWatcher()
if err != nil {
@ -125,10 +124,13 @@ func (fd *FileDiscovery) Run(ch chan<- *config.TargetGroup) {
// Stopping has priority over refreshing. Thus we wrap the actual select
// clause to always catch done signals.
select {
case <-fd.done:
case <-done:
return
default:
select {
case <-done:
return
case event := <-fd.watcher.Events:
// fsnotify sometimes sends a bunch of events without name or operation.
// It's unclear what they are and why they are sent - filter them out.
@ -154,9 +156,6 @@ func (fd *FileDiscovery) Run(ch chan<- *config.TargetGroup) {
if err != nil {
log.Errorf("Error on file watch: %s", err)
}
case <-fd.done:
return
}
}
}
@ -198,11 +197,10 @@ func fileSource(filename string, i int) string {
return fmt.Sprintf("%s:%d", filename, i)
}
// Stop implements the TargetProvider interface.
func (fd *FileDiscovery) Stop() {
// stop shuts down the file watcher.
func (fd *FileDiscovery) stop() {
log.Debugf("Stopping file discovery for %s...", fd.paths)
fd.done <- struct{}{}
// Closing the watcher will deadlock unless all events and errors are drained.
go func() {
for {
@ -210,15 +208,13 @@ func (fd *FileDiscovery) Stop() {
case <-fd.watcher.Errors:
case <-fd.watcher.Events:
// Drain all events and errors.
case <-fd.done:
default:
return
}
}
}()
fd.watcher.Close()
fd.done <- struct{}{}
log.Debugf("File discovery for %s stopped.", fd.paths)
}

View File

@ -24,11 +24,13 @@ func testFileSD(t *testing.T, ext string) {
conf.Names = []string{"fixtures/_*" + ext}
conf.RefreshInterval = config.Duration(1 * time.Hour)
fsd := NewFileDiscovery(&conf)
ch := make(chan *config.TargetGroup)
go fsd.Run(ch)
defer fsd.Stop()
var (
fsd = NewFileDiscovery(&conf)
ch = make(chan *config.TargetGroup)
done = make(chan struct{})
)
go fsd.Run(ch, done)
defer close(done)
select {
case <-time.After(25 * time.Millisecond):

View File

@ -40,13 +40,12 @@ func (md *MarathonDiscovery) Sources() []string {
}
// Run implements the TargetProvider interface.
func (md *MarathonDiscovery) Run(ch chan<- *config.TargetGroup) {
func (md *MarathonDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
defer close(ch)
for {
select {
case <-md.done:
log.Debug("Shutting down marathon discovery.")
case <-done:
return
case <-time.After(md.refreshInterval):
err := md.updateServices(ch)
@ -57,11 +56,6 @@ func (md *MarathonDiscovery) Run(ch chan<- *config.TargetGroup) {
}
}
// Stop implements the TargetProvider interface.
func (md *MarathonDiscovery) Stop() {
md.done <- struct{}{}
}
func (md *MarathonDiscovery) updateServices(ch chan<- *config.TargetGroup) error {
targetMap, err := md.fetchTargetGroups()
if err != nil {

View File

@ -181,17 +181,18 @@ func TestMarathonSDRunAndStop(t *testing.T) {
return marathonTestAppList(marathonValidLabel, 1), nil
})
md.refreshInterval = time.Millisecond * 10
done := make(chan struct{})
go func() {
select {
case <-ch:
md.Stop()
close(done)
case <-time.After(md.refreshInterval * 3):
md.Stop()
close(done)
t.Fatalf("Update took too long.")
}
}()
md.Run(ch)
md.Run(ch, done)
select {
case <-ch:
default:

View File

@ -67,7 +67,6 @@ type ServersetDiscovery struct {
sources map[string]*config.TargetGroup
sdUpdates *chan<- *config.TargetGroup
updates chan zookeeperTreeCacheEvent
runDone chan struct{}
treeCache *zookeeperTreeCache
}
@ -84,7 +83,6 @@ func NewServersetDiscovery(conf *config.ServersetSDConfig) *ServersetDiscovery {
conn: conn,
updates: updates,
sources: map[string]*config.TargetGroup{},
runDone: make(chan struct{}),
}
go sd.processUpdates()
sd.treeCache = NewZookeeperTreeCache(conn, conf.Paths[0], updates)
@ -132,7 +130,7 @@ func (sd *ServersetDiscovery) processUpdates() {
}
// Run implements the TargetProvider interface.
func (sd *ServersetDiscovery) Run(ch chan<- *config.TargetGroup) {
func (sd *ServersetDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
// Send on everything we have seen so far.
sd.mu.Lock()
for _, targetGroup := range sd.sources {
@ -142,20 +140,10 @@ func (sd *ServersetDiscovery) Run(ch chan<- *config.TargetGroup) {
sd.sdUpdates = &ch
sd.mu.Unlock()
<-sd.runDone
<-done
sd.treeCache.Stop()
}
// Stop implements the TargetProvider interface.
func (sd *ServersetDiscovery) Stop() {
log.Debugf("Stopping serverset service discovery for %s %s", sd.conf.Servers, sd.conf.Paths)
// Terminate Run.
sd.runDone <- struct{}{}
log.Debugf("Serverset service discovery for %s %s stopped", sd.conf.Servers, sd.conf.Paths)
}
func parseServersetMember(data []byte, path string) (*clientmodel.LabelSet, error) {
member := serversetMember{}
err := json.Unmarshal(data, &member)

View File

@ -53,17 +53,18 @@ type fakeTargetProvider struct {
update chan *config.TargetGroup
}
func (tp *fakeTargetProvider) Run(ch chan<- *config.TargetGroup) {
func (tp *fakeTargetProvider) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
defer close(ch)
for tg := range tp.update {
ch <- tg
for {
select {
case tg := <-tp.update:
ch <- tg
case <-done:
return
}
}
}
func (tp *fakeTargetProvider) Stop() {
close(tp.update)
}
func (tp *fakeTargetProvider) Sources() []string {
return tp.sources
}

View File

@ -43,20 +43,19 @@ type TargetProvider interface {
// Run hands a channel to the target provider through which it can send
// updated target groups. The channel must be closed by the target provider
// if no more updates will be sent.
Run(chan<- *config.TargetGroup)
// Stop terminates any potential computation of the target provider. The
// channel received on Run must be closed afterwards.
Stop()
// On receiving from done Run must return.
Run(up chan<- *config.TargetGroup, done <-chan struct{})
}
// TargetManager maintains a set of targets, starts and stops their scraping and
// creates the new targets based on the target groups it receives from various
// target providers.
type TargetManager struct {
m sync.RWMutex
mtx sync.RWMutex
globalLabels clientmodel.LabelSet
sampleAppender storage.SampleAppender
running bool
done chan struct{}
// Targets by their source ID.
targets map[string][]*Target
@ -73,31 +72,96 @@ func NewTargetManager(sampleAppender storage.SampleAppender) *TargetManager {
return tm
}
// merge multiple target group channels into a single output channel.
func merge(done <-chan struct{}, cs ...<-chan targetGroupUpdate) <-chan targetGroupUpdate {
var wg sync.WaitGroup
out := make(chan targetGroupUpdate)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c or done is closed, then calls
// wg.Done.
redir := func(c <-chan targetGroupUpdate) {
defer wg.Done()
for n := range c {
select {
case out <- n:
case <-done:
return
}
}
}
wg.Add(len(cs))
for _, c := range cs {
go redir(c)
}
// Close the out channel if all inbound channels are closed.
go func() {
wg.Wait()
close(out)
}()
return out
}
// targetGroupUpdate is a potentially changed/new target group
// for the given scrape configuration.
type targetGroupUpdate struct {
tg *config.TargetGroup
scfg *config.ScrapeConfig
}
// Run starts background processing to handle target updates.
func (tm *TargetManager) Run() {
log.Info("Starting target manager...")
tm.done = make(chan struct{})
sources := map[string]struct{}{}
updates := []<-chan targetGroupUpdate{}
for scfg, provs := range tm.providers {
for _, prov := range provs {
ch := make(chan *config.TargetGroup)
go tm.handleTargetUpdates(scfg, ch)
// Get an initial set of available sources so we don't remove
// target groups from the last run that are still available.
for _, src := range prov.Sources() {
sources[src] = struct{}{}
}
tgc := make(chan *config.TargetGroup)
// Run the target provider after cleanup of the stale targets is done.
defer func(p TargetProvider, c chan *config.TargetGroup) {
go p.Run(c)
}(prov, ch)
defer func(prov TargetProvider, tgc chan *config.TargetGroup) {
go prov.Run(tgc, tm.done)
}(prov, tgc)
tgupc := make(chan targetGroupUpdate)
updates = append(updates, tgupc)
go func(scfg *config.ScrapeConfig) {
defer close(tgupc)
for {
select {
case tg := <-tgc:
if tg == nil {
break
}
tgupc <- targetGroupUpdate{tg: tg, scfg: scfg}
case <-tm.done:
return
}
}
}(scfg)
}
}
tm.m.Lock()
defer tm.m.Unlock()
// Merge all channels of incoming target group updates into a single
// one and keep applying the updates.
go tm.handleUpdates(merge(tm.done, updates...), tm.done)
tm.mtx.Lock()
defer tm.mtx.Unlock()
// Remove old target groups that are no longer in the set of sources.
tm.removeTargets(func(src string) bool {
if _, ok := sources[src]; ok {
return false
@ -110,24 +174,32 @@ func (tm *TargetManager) Run() {
// handleTargetUpdates receives target group updates and handles them in the
// context of the given job config.
func (tm *TargetManager) handleTargetUpdates(cfg *config.ScrapeConfig, ch <-chan *config.TargetGroup) {
for tg := range ch {
log.Debugf("Received potential update for target group %q", tg.Source)
func (tm *TargetManager) handleUpdates(ch <-chan targetGroupUpdate, done <-chan struct{}) {
for {
select {
case update := <-ch:
if update.tg == nil {
break
}
log.Debugf("Received potential update for target group %q", update.tg.Source)
if err := tm.updateTargetGroup(tg, cfg); err != nil {
log.Errorf("Error updating targets: %s", err)
if err := tm.updateTargetGroup(update.tg, update.scfg); err != nil {
log.Errorf("Error updating targets: %s", err)
}
case <-done:
return
}
}
}
// Stop all background processing.
func (tm *TargetManager) Stop() {
tm.m.RLock()
tm.mtx.RLock()
if tm.running {
defer tm.stop(true)
}
// Return the lock before calling tm.stop().
defer tm.m.RUnlock()
defer tm.mtx.RUnlock()
}
// stop background processing of the target manager. If removeTargets is true,
@ -136,25 +208,10 @@ func (tm *TargetManager) stop(removeTargets bool) {
log.Info("Stopping target manager...")
defer log.Info("Target manager stopped.")
tm.m.Lock()
provs := []TargetProvider{}
for _, ps := range tm.providers {
provs = append(provs, ps...)
}
tm.m.Unlock()
close(tm.done)
var wg sync.WaitGroup
wg.Add(len(provs))
for _, prov := range provs {
go func(p TargetProvider) {
p.Stop()
wg.Done()
}(prov)
}
wg.Wait()
tm.m.Lock()
defer tm.m.Unlock()
tm.mtx.Lock()
defer tm.mtx.Unlock()
if removeTargets {
tm.removeTargets(nil)
@ -194,8 +251,8 @@ func (tm *TargetManager) updateTargetGroup(tgroup *config.TargetGroup, cfg *conf
return err
}
tm.m.Lock()
defer tm.m.Unlock()
tm.mtx.Lock()
defer tm.mtx.Unlock()
if !tm.running {
return nil
@ -261,8 +318,8 @@ func (tm *TargetManager) updateTargetGroup(tgroup *config.TargetGroup, cfg *conf
// Pools returns the targets currently being scraped bucketed by their job name.
func (tm *TargetManager) Pools() map[string][]*Target {
tm.m.RLock()
defer tm.m.RUnlock()
tm.mtx.RLock()
defer tm.mtx.RUnlock()
pools := map[string][]*Target{}
@ -279,9 +336,9 @@ func (tm *TargetManager) Pools() map[string][]*Target {
// by the new cfg. The state of targets that are valid in the new configuration remains unchanged.
// Returns true on success.
func (tm *TargetManager) ApplyConfig(cfg *config.Config) bool {
tm.m.RLock()
tm.mtx.RLock()
running := tm.running
tm.m.RUnlock()
tm.mtx.RUnlock()
if running {
tm.stop(false)
@ -294,8 +351,8 @@ func (tm *TargetManager) ApplyConfig(cfg *config.Config) bool {
providers[scfg] = providersFromConfig(scfg)
}
tm.m.Lock()
defer tm.m.Unlock()
tm.mtx.Lock()
defer tm.mtx.Unlock()
tm.globalLabels = cfg.GlobalConfig.Labels
tm.providers = providers
@ -325,15 +382,23 @@ func (tp *prefixedTargetProvider) Sources() []string {
return srcs
}
func (tp *prefixedTargetProvider) Run(ch chan<- *config.TargetGroup) {
func (tp *prefixedTargetProvider) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
defer close(ch)
ch2 := make(chan *config.TargetGroup)
go tp.TargetProvider.Run(ch2)
go tp.TargetProvider.Run(ch2, done)
for tg := range ch2 {
tg.Source = tp.prefix(tg.Source)
ch <- tg
for {
select {
case <-done:
return
case tg := <-ch2:
if tg == nil {
break
}
tg.Source = tp.prefix(tg.Source)
ch <- tg
}
}
}
@ -382,8 +447,8 @@ func providersFromConfig(cfg *config.ScrapeConfig) []TargetProvider {
// targetsFromGroup builds targets based on the given TargetGroup and config.
func (tm *TargetManager) targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) ([]*Target, error) {
tm.m.RLock()
defer tm.m.RUnlock()
tm.mtx.RLock()
defer tm.mtx.RUnlock()
targets := make([]*Target, 0, len(tg.Targets))
for i, labels := range tg.Targets {
@ -470,15 +535,17 @@ func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider {
}
// Run implements the TargetProvider interface.
func (sd *StaticProvider) Run(ch chan<- *config.TargetGroup) {
for _, tg := range sd.TargetGroups {
ch <- tg
}
close(ch) // This provider never sends any updates.
}
func (sd *StaticProvider) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
defer close(ch)
// Stop implements the TargetProvider interface.
func (sd *StaticProvider) Stop() {}
for _, tg := range sd.TargetGroups {
select {
case <-done:
return
case ch <- tg:
}
}
}
// TargetGroups returns the provider's target groups.
func (sd *StaticProvider) Sources() (srcs []string) {

View File

@ -54,7 +54,10 @@ func TestPrefixedTargetProvider(t *testing.T) {
}
ch := make(chan *config.TargetGroup)
go tp.Run(ch)
done := make(chan struct{})
defer close(done)
go tp.Run(ch, done)
expGroup1 := *targetGroups[0]
expGroup2 := *targetGroups[1]
@ -347,10 +350,10 @@ func TestTargetManagerConfigUpdate(t *testing.T) {
conf.ScrapeConfigs = step.scrapeConfigs
targetManager.ApplyConfig(conf)
<-time.After(1 * time.Millisecond)
time.Sleep(50 * time.Millisecond)
if len(targetManager.targets) != len(step.expected) {
t.Fatalf("step %d: sources mismatch %v, %v", targetManager.targets, step.expected)
t.Fatalf("step %d: sources mismatch: expected %v, got %v", i, step.expected, targetManager.targets)
}
for source, actTargets := range targetManager.targets {