give each tree cache its unique channel to avoid multiple close on the same channel

Signed-off-by: Ye Ji <ye@hioscar.com>
This commit is contained in:
Ye Ji 2019-07-09 16:18:30 -04:00 committed by Frederic Branczyk
parent 4ef66003d9
commit 8873e49a45
No known key found for this signature in database
GPG Key ID: 7741A52782A90069
1 changed files with 22 additions and 5 deletions

View File

@ -108,8 +108,9 @@ type Discovery struct {
sources map[string]*targetgroup.Group
updates chan treecache.ZookeeperTreeCacheEvent
treeCaches []*treecache.ZookeeperTreeCache
updates chan treecache.ZookeeperTreeCacheEvent
pathUpdates []chan treecache.ZookeeperTreeCacheEvent
treeCaches []*treecache.ZookeeperTreeCache
parse func(data []byte, path string) (model.LabelSet, error)
logger log.Logger
@ -155,7 +156,9 @@ func NewDiscovery(
logger: logger,
}
for _, path := range paths {
sd.treeCaches = append(sd.treeCaches, treecache.NewZookeeperTreeCache(conn, path, updates, logger))
pathUpdate := make(chan treecache.ZookeeperTreeCacheEvent)
sd.pathUpdates = append(sd.pathUpdates, pathUpdate)
sd.treeCaches = append(sd.treeCaches, treecache.NewZookeeperTreeCache(conn, path, pathUpdate, logger))
}
return sd, nil
}
@ -166,12 +169,26 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
for _, tc := range d.treeCaches {
tc.Stop()
}
// Drain event channel in case the treecache leaks goroutines otherwise.
for range d.updates {
for _, pathUpdate := range d.pathUpdates {
// Drain event channel in case the treecache leaks goroutines otherwise.
for range pathUpdate {
}
}
d.conn.Close()
}()
for _, pathUpdate := range d.pathUpdates {
go func(update chan treecache.ZookeeperTreeCacheEvent) {
for event := range update {
select {
case d.updates <- event:
case <-ctx.Done():
return
}
}
}(pathUpdate)
}
for {
select {
case <-ctx.Done():