From 5837e6a97fb47af2b43cbcd5dae0fc4b27728191 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 25 Apr 2016 16:56:27 +0200 Subject: [PATCH 1/3] discovery: move consul SD into own package --- retrieval/discovery/consul.go | 255 +------------------------ retrieval/discovery/consul/consul.go | 270 +++++++++++++++++++++++++++ retrieval/targetmanager.go | 2 +- 3 files changed, 275 insertions(+), 252 deletions(-) create mode 100644 retrieval/discovery/consul/consul.go diff --git a/retrieval/discovery/consul.go b/retrieval/discovery/consul.go index 512008170..33560fb23 100644 --- a/retrieval/discovery/consul.go +++ b/retrieval/discovery/consul.go @@ -1,4 +1,4 @@ -// Copyright 2015 The Prometheus Authors +// Copyright 2016 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -14,257 +14,10 @@ package discovery import ( - "fmt" - "strconv" - "strings" - "sync" - "time" - - consul "github.com/hashicorp/consul/api" - "github.com/prometheus/common/log" - "github.com/prometheus/common/model" - "golang.org/x/net/context" - "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/retrieval/discovery/consul" ) -const ( - consulWatchTimeout = 30 * time.Second - consulRetryInterval = 15 * time.Second - - // consulAddressLabel is the name for the label containing a target's address. - consulAddressLabel = model.MetaLabelPrefix + "consul_address" - // consulNodeLabel is the name for the label containing a target's node name. - consulNodeLabel = model.MetaLabelPrefix + "consul_node" - // consulTagsLabel is the name of the label containing the tags assigned to the target. - consulTagsLabel = model.MetaLabelPrefix + "consul_tags" - // consulServiceLabel is the name of the label containing the service name. - consulServiceLabel = model.MetaLabelPrefix + "consul_service" - // consulServiceAddressLabel is the name of the label containing the (optional) service address. - consulServiceAddressLabel = model.MetaLabelPrefix + "consul_service_address" - // consulServicePortLabel is the name of the label containing the service port. - consulServicePortLabel = model.MetaLabelPrefix + "consul_service_port" - // consulDCLabel is the name of the label containing the datacenter ID. - consulDCLabel = model.MetaLabelPrefix + "consul_dc" - // consulServiceIDLabel is the name of the label containing the service ID. - consulServiceIDLabel = model.MetaLabelPrefix + "consul_service_id" -) - -// ConsulDiscovery retrieves target information from a Consul server -// and updates them via watches. -type ConsulDiscovery struct { - client *consul.Client - clientConf *consul.Config - clientDatacenter string - tagSeparator string - scrapedServices map[string]struct{} - - mu sync.RWMutex - services map[string]*consulService -} - -// consulService contains data belonging to the same service. -type consulService struct { - name string - tgroup config.TargetGroup - lastIndex uint64 - removed bool - running bool - done chan struct{} -} - -// NewConsulDiscovery returns a new ConsulDiscovery for the given config. -func NewConsulDiscovery(conf *config.ConsulSDConfig) (*ConsulDiscovery, error) { - clientConf := &consul.Config{ - Address: conf.Server, - Scheme: conf.Scheme, - Datacenter: conf.Datacenter, - Token: conf.Token, - HttpAuth: &consul.HttpBasicAuth{ - Username: conf.Username, - Password: conf.Password, - }, - } - client, err := consul.NewClient(clientConf) - if err != nil { - return nil, err - } - cd := &ConsulDiscovery{ - client: client, - clientConf: clientConf, - tagSeparator: conf.TagSeparator, - scrapedServices: map[string]struct{}{}, - services: map[string]*consulService{}, - } - // If the datacenter isn't set in the clientConf, let's get it from the local Consul agent - // (Consul default is to use local node's datacenter if one isn't given for a query). - if clientConf.Datacenter == "" { - info, err := client.Agent().Self() - if err != nil { - return nil, err - } - cd.clientDatacenter = info["Config"]["Datacenter"].(string) - } else { - cd.clientDatacenter = clientConf.Datacenter - } - for _, name := range conf.Services { - cd.scrapedServices[name] = struct{}{} - } - return cd, nil -} - -// Run implements the TargetProvider interface. -func (cd *ConsulDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { - defer close(ch) - defer cd.stop() - - update := make(chan *consulService, 10) - go cd.watchServices(update, ctx.Done()) - - for { - select { - case <-ctx.Done(): - return - case srv := <-update: - if srv.removed { - close(srv.done) - - // Send clearing update. - ch <- []*config.TargetGroup{{Source: srv.name}} - break - } - // Launch watcher for the service. - if !srv.running { - go cd.watchService(srv, ch) - srv.running = true - } - } - } -} - -func (cd *ConsulDiscovery) stop() { - // The lock prevents Run from terminating while the watchers attempt - // to send on their channels. - cd.mu.Lock() - defer cd.mu.Unlock() - - for _, srv := range cd.services { - close(srv.done) - } -} - -// watchServices retrieves updates from Consul's services endpoint and sends -// potential updates to the update channel. -func (cd *ConsulDiscovery) watchServices(update chan<- *consulService, done <-chan struct{}) { - var lastIndex uint64 - for { - catalog := cd.client.Catalog() - srvs, meta, err := catalog.Services(&consul.QueryOptions{ - WaitIndex: lastIndex, - WaitTime: consulWatchTimeout, - }) - if err != nil { - log.Errorf("Error refreshing service list: %s", err) - time.Sleep(consulRetryInterval) - continue - } - // If the index equals the previous one, the watch timed out with no update. - if meta.LastIndex == lastIndex { - continue - } - lastIndex = meta.LastIndex - - cd.mu.Lock() - select { - case <-done: - cd.mu.Unlock() - return - default: - // Continue. - } - // Check for new services. - for name := range srvs { - if _, ok := cd.scrapedServices[name]; len(cd.scrapedServices) > 0 && !ok { - continue - } - srv, ok := cd.services[name] - if !ok { - srv = &consulService{ - name: name, - done: make(chan struct{}), - } - srv.tgroup.Source = name - cd.services[name] = srv - } - srv.tgroup.Labels = model.LabelSet{ - consulServiceLabel: model.LabelValue(name), - consulDCLabel: model.LabelValue(cd.clientDatacenter), - } - update <- srv - } - // Check for removed services. - for name, srv := range cd.services { - if _, ok := srvs[name]; !ok { - srv.removed = true - update <- srv - delete(cd.services, name) - } - } - cd.mu.Unlock() - } -} - -// watchService retrieves updates about srv from Consul's service endpoint. -// On a potential update the resulting target group is sent to ch. -func (cd *ConsulDiscovery) watchService(srv *consulService, ch chan<- []*config.TargetGroup) { - catalog := cd.client.Catalog() - for { - nodes, meta, err := catalog.Service(srv.name, "", &consul.QueryOptions{ - WaitIndex: srv.lastIndex, - WaitTime: consulWatchTimeout, - }) - if err != nil { - log.Errorf("Error refreshing service %s: %s", srv.name, err) - time.Sleep(consulRetryInterval) - continue - } - // If the index equals the previous one, the watch timed out with no update. - if meta.LastIndex == srv.lastIndex { - continue - } - srv.lastIndex = meta.LastIndex - srv.tgroup.Targets = make([]model.LabelSet, 0, len(nodes)) - - for _, node := range nodes { - addr := fmt.Sprintf("%s:%d", node.Address, node.ServicePort) - // We surround the separated list with the separator as well. This way regular expressions - // in relabeling rules don't have to consider tag positions. - tags := cd.tagSeparator + strings.Join(node.ServiceTags, cd.tagSeparator) + cd.tagSeparator - - srv.tgroup.Targets = append(srv.tgroup.Targets, model.LabelSet{ - model.AddressLabel: model.LabelValue(addr), - consulAddressLabel: model.LabelValue(node.Address), - consulNodeLabel: model.LabelValue(node.Node), - consulTagsLabel: model.LabelValue(tags), - consulServiceAddressLabel: model.LabelValue(node.ServiceAddress), - consulServicePortLabel: model.LabelValue(strconv.Itoa(node.ServicePort)), - consulServiceIDLabel: model.LabelValue(node.ServiceID), - }) - } - - cd.mu.Lock() - select { - case <-srv.done: - cd.mu.Unlock() - return - default: - // Continue. - } - // TODO(fabxc): do a copy for now to avoid races. The integration - // needs needs some general cleanup. - tg := srv.tgroup - ch <- []*config.TargetGroup{&tg} - - cd.mu.Unlock() - } +func NewConsul(cfg *config.ConsulSDConfig) (*consul.Discovery, error) { + return nil, nil } diff --git a/retrieval/discovery/consul/consul.go b/retrieval/discovery/consul/consul.go new file mode 100644 index 000000000..871a291a9 --- /dev/null +++ b/retrieval/discovery/consul/consul.go @@ -0,0 +1,270 @@ +// Copyright 2015 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consul + +import ( + "fmt" + "strconv" + "strings" + "sync" + "time" + + consul "github.com/hashicorp/consul/api" + "github.com/prometheus/common/log" + "github.com/prometheus/common/model" + "golang.org/x/net/context" + + "github.com/prometheus/prometheus/config" +) + +const ( + watchTimeout = 30 * time.Second + retryInterval = 15 * time.Second + + // addressLabel is the name for the label containing a target's address. + addressLabel = model.MetaLabelPrefix + "consul_address" + // nodeLabel is the name for the label containing a target's node name. + nodeLabel = model.MetaLabelPrefix + "consul_node" + // tagsLabel is the name of the label containing the tags assigned to the target. + tagsLabel = model.MetaLabelPrefix + "consul_tags" + // serviceLabel is the name of the label containing the service name. + serviceLabel = model.MetaLabelPrefix + "consul_service" + // serviceAddressLabel is the name of the label containing the (optional) service address. + serviceAddressLabel = model.MetaLabelPrefix + "consul_service_address" + //servicePortLabel is the name of the label containing the service port. + servicePortLabel = model.MetaLabelPrefix + "consul_service_port" + // datacenterLabel is the name of the label containing the datacenter ID. + datacenterLabel = model.MetaLabelPrefix + "consul_dc" + // serviceIDLabel is the name of the label containing the service ID. + serviceIDLabel = model.MetaLabelPrefix + "consul_service_id" +) + +// Discovery retrieves target information from a Consul server +// and updates them via watches. +type Discovery struct { + client *consul.Client + clientConf *consul.Config + clientDatacenter string + tagSeparator string + scrapedServices map[string]struct{} + + mu sync.RWMutex + services map[string]*consulService +} + +// consulService contains data belonging to the same service. +type consulService struct { + name string + tgroup config.TargetGroup + lastIndex uint64 + removed bool + running bool + done chan struct{} +} + +// NewDiscovery returns a new Discovery for the given config. +func NewDiscovery(conf *config.ConsulSDConfig) (*Discovery, error) { + clientConf := &consul.Config{ + Address: conf.Server, + Scheme: conf.Scheme, + Datacenter: conf.Datacenter, + Token: conf.Token, + HttpAuth: &consul.HttpBasicAuth{ + Username: conf.Username, + Password: conf.Password, + }, + } + client, err := consul.NewClient(clientConf) + if err != nil { + return nil, err + } + cd := &Discovery{ + client: client, + clientConf: clientConf, + tagSeparator: conf.TagSeparator, + scrapedServices: map[string]struct{}{}, + services: map[string]*consulService{}, + } + // If the datacenter isn't set in the clientConf, let's get it from the local Consul agent + // (Consul default is to use local node's datacenter if one isn't given for a query). + if clientConf.Datacenter == "" { + info, err := client.Agent().Self() + if err != nil { + return nil, err + } + cd.clientDatacenter = info["Config"]["Datacenter"].(string) + } else { + cd.clientDatacenter = clientConf.Datacenter + } + for _, name := range conf.Services { + cd.scrapedServices[name] = struct{}{} + } + return cd, nil +} + +// Run implements the TargetProvider interface. +func (cd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { + defer close(ch) + defer cd.stop() + + update := make(chan *consulService, 10) + go cd.watchServices(update, ctx.Done()) + + for { + select { + case <-ctx.Done(): + return + case srv := <-update: + if srv.removed { + close(srv.done) + + // Send clearing update. + ch <- []*config.TargetGroup{{Source: srv.name}} + break + } + // Launch watcher for the service. + if !srv.running { + go cd.watchService(srv, ch) + srv.running = true + } + } + } +} + +func (cd *Discovery) stop() { + // The lock prevents Run from terminating while the watchers attempt + // to send on their channels. + cd.mu.Lock() + defer cd.mu.Unlock() + + for _, srv := range cd.services { + close(srv.done) + } +} + +// watchServices retrieves updates from Consul's services endpoint and sends +// potential updates to the update channel. +func (cd *Discovery) watchServices(update chan<- *consulService, done <-chan struct{}) { + var lastIndex uint64 + for { + catalog := cd.client.Catalog() + srvs, meta, err := catalog.Services(&consul.QueryOptions{ + WaitIndex: lastIndex, + WaitTime: watchTimeout, + }) + if err != nil { + log.Errorf("Error refreshing service list: %s", err) + time.Sleep(retryInterval) + continue + } + // If the index equals the previous one, the watch timed out with no update. + if meta.LastIndex == lastIndex { + continue + } + lastIndex = meta.LastIndex + + cd.mu.Lock() + select { + case <-done: + cd.mu.Unlock() + return + default: + // Continue. + } + // Check for new services. + for name := range srvs { + if _, ok := cd.scrapedServices[name]; len(cd.scrapedServices) > 0 && !ok { + continue + } + srv, ok := cd.services[name] + if !ok { + srv = &consulService{ + name: name, + done: make(chan struct{}), + } + srv.tgroup.Source = name + cd.services[name] = srv + } + srv.tgroup.Labels = model.LabelSet{ + serviceLabel: model.LabelValue(name), + datacenterLabel: model.LabelValue(cd.clientDatacenter), + } + update <- srv + } + // Check for removed services. + for name, srv := range cd.services { + if _, ok := srvs[name]; !ok { + srv.removed = true + update <- srv + delete(cd.services, name) + } + } + cd.mu.Unlock() + } +} + +// watchService retrieves updates about srv from Consul's service endpoint. +// On a potential update the resulting target group is sent to ch. +func (cd *Discovery) watchService(srv *consulService, ch chan<- []*config.TargetGroup) { + catalog := cd.client.Catalog() + for { + nodes, meta, err := catalog.Service(srv.name, "", &consul.QueryOptions{ + WaitIndex: srv.lastIndex, + WaitTime: watchTimeout, + }) + if err != nil { + log.Errorf("Error refreshing service %s: %s", srv.name, err) + time.Sleep(retryInterval) + continue + } + // If the index equals the previous one, the watch timed out with no update. + if meta.LastIndex == srv.lastIndex { + continue + } + srv.lastIndex = meta.LastIndex + srv.tgroup.Targets = make([]model.LabelSet, 0, len(nodes)) + + for _, node := range nodes { + addr := fmt.Sprintf("%s:%d", node.Address, node.ServicePort) + // We surround the separated list with the separator as well. This way regular expressions + // in relabeling rules don't have to consider tag positions. + tags := cd.tagSeparator + strings.Join(node.ServiceTags, cd.tagSeparator) + cd.tagSeparator + + srv.tgroup.Targets = append(srv.tgroup.Targets, model.LabelSet{ + model.AddressLabel: model.LabelValue(addr), + addressLabel: model.LabelValue(node.Address), + nodeLabel: model.LabelValue(node.Node), + tagsLabel: model.LabelValue(tags), + serviceAddressLabel: model.LabelValue(node.ServiceAddress), + servicePortLabel: model.LabelValue(strconv.Itoa(node.ServicePort)), + serviceIDLabel: model.LabelValue(node.ServiceID), + }) + } + + cd.mu.Lock() + select { + case <-srv.done: + cd.mu.Unlock() + return + default: + // Continue. + } + // TODO(fabxc): do a copy for now to avoid races. The integration + // needs needs some general cleanup. + tg := srv.tgroup + ch <- []*config.TargetGroup{&tg} + + cd.mu.Unlock() + } +} diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index 77443e31e..6886d503e 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -366,7 +366,7 @@ func providersFromConfig(cfg *config.ScrapeConfig) map[string]TargetProvider { app("file", i, discovery.NewFileDiscovery(c)) } for i, c := range cfg.ConsulSDConfigs { - k, err := discovery.NewConsulDiscovery(c) + k, err := discovery.NewConsul(c) if err != nil { log.Errorf("Cannot create Consul discovery: %s", err) continue From e805e68c012a9df25cb0ba1bf3b77eac5920db5b Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 25 Apr 2016 18:40:39 +0200 Subject: [PATCH 2/3] discovery: sanitize Consul service discovery This commits simplifies the SD's structure and ensures that all channel sends are checked against a canceled context. --- retrieval/discovery/consul/consul.go | 214 ++++++++++++--------------- 1 file changed, 97 insertions(+), 117 deletions(-) diff --git a/retrieval/discovery/consul/consul.go b/retrieval/discovery/consul/consul.go index 871a291a9..78c43e16d 100644 --- a/retrieval/discovery/consul/consul.go +++ b/retrieval/discovery/consul/consul.go @@ -17,7 +17,6 @@ import ( "fmt" "strconv" "strings" - "sync" "time" consul "github.com/hashicorp/consul/api" @@ -57,20 +56,7 @@ type Discovery struct { clientConf *consul.Config clientDatacenter string tagSeparator string - scrapedServices map[string]struct{} - - mu sync.RWMutex - services map[string]*consulService -} - -// consulService contains data belonging to the same service. -type consulService struct { - name string - tgroup config.TargetGroup - lastIndex uint64 - removed bool - running bool - done chan struct{} + scrapedServices map[string]struct{} // Set of services which will be discovered. } // NewDiscovery returns a new Discovery for the given config. @@ -94,7 +80,6 @@ func NewDiscovery(conf *config.ConsulSDConfig) (*Discovery, error) { clientConf: clientConf, tagSeparator: conf.TagSeparator, scrapedServices: map[string]struct{}{}, - services: map[string]*consulService{}, } // If the datacenter isn't set in the clientConf, let's get it from the local Consul agent // (Consul default is to use local node's datacenter if one isn't given for a query). @@ -116,46 +101,10 @@ func NewDiscovery(conf *config.ConsulSDConfig) (*Discovery, error) { // Run implements the TargetProvider interface. func (cd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { defer close(ch) - defer cd.stop() - update := make(chan *consulService, 10) - go cd.watchServices(update, ctx.Done()) + // Watched services and their cancelation functions. + services := map[string]func(){} - for { - select { - case <-ctx.Done(): - return - case srv := <-update: - if srv.removed { - close(srv.done) - - // Send clearing update. - ch <- []*config.TargetGroup{{Source: srv.name}} - break - } - // Launch watcher for the service. - if !srv.running { - go cd.watchService(srv, ch) - srv.running = true - } - } - } -} - -func (cd *Discovery) stop() { - // The lock prevents Run from terminating while the watchers attempt - // to send on their channels. - cd.mu.Lock() - defer cd.mu.Unlock() - - for _, srv := range cd.services { - close(srv.done) - } -} - -// watchServices retrieves updates from Consul's services endpoint and sends -// potential updates to the update channel. -func (cd *Discovery) watchServices(update chan<- *consulService, done <-chan struct{}) { var lastIndex uint64 for { catalog := cd.client.Catalog() @@ -163,6 +112,15 @@ func (cd *Discovery) watchServices(update chan<- *consulService, done <-chan str WaitIndex: lastIndex, WaitTime: watchTimeout, }) + + // We have to check the context at least once. The checks during channel sends + // do not guarantee that. + select { + case <-ctx.Done(): + return + default: + } + if err != nil { log.Errorf("Error refreshing service list: %s", err) time.Sleep(retryInterval) @@ -174,74 +132,100 @@ func (cd *Discovery) watchServices(update chan<- *consulService, done <-chan str } lastIndex = meta.LastIndex - cd.mu.Lock() + // Check for new services. + for name := range srvs { + // If no restriction on scraped services is set, we scrape everything. + if _, ok := cd.scrapedServices[name]; len(cd.scrapedServices) > 0 && !ok { + continue + } + if _, ok := services[name]; ok { + continue // We are already watching the service. + } + + srv := &consulService{ + client: cd.client, + name: name, + labels: model.LabelSet{ + serviceLabel: model.LabelValue(name), + datacenterLabel: model.LabelValue(cd.clientDatacenter), + }, + tagSeparator: cd.tagSeparator, + } + + wctx, cancel := context.WithCancel(ctx) + go srv.watch(wctx, ch) + + services[name] = cancel + } + + // Check for removed services. + for name, cancel := range services { + if _, ok := srvs[name]; !ok { + // Call the watch cancelation function. + cancel() + delete(services, name) + + // Send clearing target group. + select { + case <-ctx.Done(): + return + case ch <- []*config.TargetGroup{{Source: name}}: + } + } + } + } +} + +// consulService contains data belonging to the same service. +type consulService struct { + name string + labels model.LabelSet + client *consul.Client + tagSeparator string +} + +func (srv *consulService) watch(ctx context.Context, ch chan<- []*config.TargetGroup) { + catalog := srv.client.Catalog() + + lastIndex := uint64(0) + for { + nodes, meta, err := catalog.Service(srv.name, "", &consul.QueryOptions{ + WaitIndex: lastIndex, + WaitTime: watchTimeout, + }) + // Check the context before potentially falling in a continue-loop. select { - case <-done: - cd.mu.Unlock() + case <-ctx.Done(): return default: // Continue. } - // Check for new services. - for name := range srvs { - if _, ok := cd.scrapedServices[name]; len(cd.scrapedServices) > 0 && !ok { - continue - } - srv, ok := cd.services[name] - if !ok { - srv = &consulService{ - name: name, - done: make(chan struct{}), - } - srv.tgroup.Source = name - cd.services[name] = srv - } - srv.tgroup.Labels = model.LabelSet{ - serviceLabel: model.LabelValue(name), - datacenterLabel: model.LabelValue(cd.clientDatacenter), - } - update <- srv - } - // Check for removed services. - for name, srv := range cd.services { - if _, ok := srvs[name]; !ok { - srv.removed = true - update <- srv - delete(cd.services, name) - } - } - cd.mu.Unlock() - } -} -// watchService retrieves updates about srv from Consul's service endpoint. -// On a potential update the resulting target group is sent to ch. -func (cd *Discovery) watchService(srv *consulService, ch chan<- []*config.TargetGroup) { - catalog := cd.client.Catalog() - for { - nodes, meta, err := catalog.Service(srv.name, "", &consul.QueryOptions{ - WaitIndex: srv.lastIndex, - WaitTime: watchTimeout, - }) if err != nil { log.Errorf("Error refreshing service %s: %s", srv.name, err) time.Sleep(retryInterval) continue } // If the index equals the previous one, the watch timed out with no update. - if meta.LastIndex == srv.lastIndex { + if meta.LastIndex == lastIndex { continue } - srv.lastIndex = meta.LastIndex - srv.tgroup.Targets = make([]model.LabelSet, 0, len(nodes)) + lastIndex = meta.LastIndex + + tgroup := config.TargetGroup{ + Source: srv.name, + Labels: srv.labels, + Targets: make([]model.LabelSet, 0, len(nodes)), + } for _, node := range nodes { - addr := fmt.Sprintf("%s:%d", node.Address, node.ServicePort) - // We surround the separated list with the separator as well. This way regular expressions - // in relabeling rules don't have to consider tag positions. - tags := cd.tagSeparator + strings.Join(node.ServiceTags, cd.tagSeparator) + cd.tagSeparator - - srv.tgroup.Targets = append(srv.tgroup.Targets, model.LabelSet{ + var ( + addr = fmt.Sprintf("%s:%d", node.Address, node.ServicePort) + // We surround the separated list with the separator as well. This way regular expressions + // in relabeling rules don't have to consider tag positions. + tags = srv.tagSeparator + strings.Join(node.ServiceTags, srv.tagSeparator) + srv.tagSeparator + ) + tgroup.Targets = append(tgroup.Targets, model.LabelSet{ model.AddressLabel: model.LabelValue(addr), addressLabel: model.LabelValue(node.Address), nodeLabel: model.LabelValue(node.Node), @@ -251,20 +235,16 @@ func (cd *Discovery) watchService(srv *consulService, ch chan<- []*config.Target serviceIDLabel: model.LabelValue(node.ServiceID), }) } - - cd.mu.Lock() + // Check context twice to ensure we always catch cancelation. select { - case <-srv.done: - cd.mu.Unlock() + case <-ctx.Done(): return default: - // Continue. } - // TODO(fabxc): do a copy for now to avoid races. The integration - // needs needs some general cleanup. - tg := srv.tgroup - ch <- []*config.TargetGroup{&tg} - - cd.mu.Unlock() + select { + case <-ctx.Done(): + return + case ch <- []*config.TargetGroup{&tgroup}: + } } } From 086f7caceb3daa81fdabe9ba63792d764ebb0131 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 25 Apr 2016 18:47:21 +0200 Subject: [PATCH 3/3] discovery: extract Consul shouldWatch logic --- retrieval/discovery/consul.go | 2 +- retrieval/discovery/consul/consul.go | 24 +++++++++++++++++------- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/retrieval/discovery/consul.go b/retrieval/discovery/consul.go index 33560fb23..7c7f7349b 100644 --- a/retrieval/discovery/consul.go +++ b/retrieval/discovery/consul.go @@ -19,5 +19,5 @@ import ( ) func NewConsul(cfg *config.ConsulSDConfig) (*consul.Discovery, error) { - return nil, nil + return consul.NewDiscovery(cfg) } diff --git a/retrieval/discovery/consul/consul.go b/retrieval/discovery/consul/consul.go index 78c43e16d..daf288eea 100644 --- a/retrieval/discovery/consul/consul.go +++ b/retrieval/discovery/consul/consul.go @@ -56,7 +56,7 @@ type Discovery struct { clientConf *consul.Config clientDatacenter string tagSeparator string - scrapedServices map[string]struct{} // Set of services which will be discovered. + watchedServices []string // Set of services which will be discovered. } // NewDiscovery returns a new Discovery for the given config. @@ -79,7 +79,7 @@ func NewDiscovery(conf *config.ConsulSDConfig) (*Discovery, error) { client: client, clientConf: clientConf, tagSeparator: conf.TagSeparator, - scrapedServices: map[string]struct{}{}, + watchedServices: conf.Services, } // If the datacenter isn't set in the clientConf, let's get it from the local Consul agent // (Consul default is to use local node's datacenter if one isn't given for a query). @@ -92,12 +92,23 @@ func NewDiscovery(conf *config.ConsulSDConfig) (*Discovery, error) { } else { cd.clientDatacenter = clientConf.Datacenter } - for _, name := range conf.Services { - cd.scrapedServices[name] = struct{}{} - } return cd, nil } +// shouldWatch returns whether the service of the given name should be watched. +func (cd *Discovery) shouldWatch(name string) bool { + // If there's no fixed set of watched services, we watch everything. + if len(cd.watchedServices) == 0 { + return true + } + for _, sn := range cd.watchedServices { + if sn == name { + return true + } + } + return false +} + // Run implements the TargetProvider interface. func (cd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { defer close(ch) @@ -134,8 +145,7 @@ func (cd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { // Check for new services. for name := range srvs { - // If no restriction on scraped services is set, we scrape everything. - if _, ok := cd.scrapedServices[name]; len(cd.scrapedServices) > 0 && !ok { + if !cd.shouldWatch(name) { continue } if _, ok := services[name]; ok {