diff --git a/retrieval/discovery/consul.go b/retrieval/discovery/consul.go index 512008170..7c7f7349b 100644 --- a/retrieval/discovery/consul.go +++ b/retrieval/discovery/consul.go @@ -1,4 +1,4 @@ -// Copyright 2015 The Prometheus Authors +// Copyright 2016 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -14,257 +14,10 @@ package discovery import ( - "fmt" - "strconv" - "strings" - "sync" - "time" - - consul "github.com/hashicorp/consul/api" - "github.com/prometheus/common/log" - "github.com/prometheus/common/model" - "golang.org/x/net/context" - "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/retrieval/discovery/consul" ) -const ( - consulWatchTimeout = 30 * time.Second - consulRetryInterval = 15 * time.Second - - // consulAddressLabel is the name for the label containing a target's address. - consulAddressLabel = model.MetaLabelPrefix + "consul_address" - // consulNodeLabel is the name for the label containing a target's node name. - consulNodeLabel = model.MetaLabelPrefix + "consul_node" - // consulTagsLabel is the name of the label containing the tags assigned to the target. - consulTagsLabel = model.MetaLabelPrefix + "consul_tags" - // consulServiceLabel is the name of the label containing the service name. - consulServiceLabel = model.MetaLabelPrefix + "consul_service" - // consulServiceAddressLabel is the name of the label containing the (optional) service address. - consulServiceAddressLabel = model.MetaLabelPrefix + "consul_service_address" - // consulServicePortLabel is the name of the label containing the service port. - consulServicePortLabel = model.MetaLabelPrefix + "consul_service_port" - // consulDCLabel is the name of the label containing the datacenter ID. - consulDCLabel = model.MetaLabelPrefix + "consul_dc" - // consulServiceIDLabel is the name of the label containing the service ID. - consulServiceIDLabel = model.MetaLabelPrefix + "consul_service_id" -) - -// ConsulDiscovery retrieves target information from a Consul server -// and updates them via watches. -type ConsulDiscovery struct { - client *consul.Client - clientConf *consul.Config - clientDatacenter string - tagSeparator string - scrapedServices map[string]struct{} - - mu sync.RWMutex - services map[string]*consulService -} - -// consulService contains data belonging to the same service. -type consulService struct { - name string - tgroup config.TargetGroup - lastIndex uint64 - removed bool - running bool - done chan struct{} -} - -// NewConsulDiscovery returns a new ConsulDiscovery for the given config. -func NewConsulDiscovery(conf *config.ConsulSDConfig) (*ConsulDiscovery, error) { - clientConf := &consul.Config{ - Address: conf.Server, - Scheme: conf.Scheme, - Datacenter: conf.Datacenter, - Token: conf.Token, - HttpAuth: &consul.HttpBasicAuth{ - Username: conf.Username, - Password: conf.Password, - }, - } - client, err := consul.NewClient(clientConf) - if err != nil { - return nil, err - } - cd := &ConsulDiscovery{ - client: client, - clientConf: clientConf, - tagSeparator: conf.TagSeparator, - scrapedServices: map[string]struct{}{}, - services: map[string]*consulService{}, - } - // If the datacenter isn't set in the clientConf, let's get it from the local Consul agent - // (Consul default is to use local node's datacenter if one isn't given for a query). - if clientConf.Datacenter == "" { - info, err := client.Agent().Self() - if err != nil { - return nil, err - } - cd.clientDatacenter = info["Config"]["Datacenter"].(string) - } else { - cd.clientDatacenter = clientConf.Datacenter - } - for _, name := range conf.Services { - cd.scrapedServices[name] = struct{}{} - } - return cd, nil -} - -// Run implements the TargetProvider interface. -func (cd *ConsulDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { - defer close(ch) - defer cd.stop() - - update := make(chan *consulService, 10) - go cd.watchServices(update, ctx.Done()) - - for { - select { - case <-ctx.Done(): - return - case srv := <-update: - if srv.removed { - close(srv.done) - - // Send clearing update. - ch <- []*config.TargetGroup{{Source: srv.name}} - break - } - // Launch watcher for the service. - if !srv.running { - go cd.watchService(srv, ch) - srv.running = true - } - } - } -} - -func (cd *ConsulDiscovery) stop() { - // The lock prevents Run from terminating while the watchers attempt - // to send on their channels. - cd.mu.Lock() - defer cd.mu.Unlock() - - for _, srv := range cd.services { - close(srv.done) - } -} - -// watchServices retrieves updates from Consul's services endpoint and sends -// potential updates to the update channel. -func (cd *ConsulDiscovery) watchServices(update chan<- *consulService, done <-chan struct{}) { - var lastIndex uint64 - for { - catalog := cd.client.Catalog() - srvs, meta, err := catalog.Services(&consul.QueryOptions{ - WaitIndex: lastIndex, - WaitTime: consulWatchTimeout, - }) - if err != nil { - log.Errorf("Error refreshing service list: %s", err) - time.Sleep(consulRetryInterval) - continue - } - // If the index equals the previous one, the watch timed out with no update. - if meta.LastIndex == lastIndex { - continue - } - lastIndex = meta.LastIndex - - cd.mu.Lock() - select { - case <-done: - cd.mu.Unlock() - return - default: - // Continue. - } - // Check for new services. - for name := range srvs { - if _, ok := cd.scrapedServices[name]; len(cd.scrapedServices) > 0 && !ok { - continue - } - srv, ok := cd.services[name] - if !ok { - srv = &consulService{ - name: name, - done: make(chan struct{}), - } - srv.tgroup.Source = name - cd.services[name] = srv - } - srv.tgroup.Labels = model.LabelSet{ - consulServiceLabel: model.LabelValue(name), - consulDCLabel: model.LabelValue(cd.clientDatacenter), - } - update <- srv - } - // Check for removed services. - for name, srv := range cd.services { - if _, ok := srvs[name]; !ok { - srv.removed = true - update <- srv - delete(cd.services, name) - } - } - cd.mu.Unlock() - } -} - -// watchService retrieves updates about srv from Consul's service endpoint. -// On a potential update the resulting target group is sent to ch. -func (cd *ConsulDiscovery) watchService(srv *consulService, ch chan<- []*config.TargetGroup) { - catalog := cd.client.Catalog() - for { - nodes, meta, err := catalog.Service(srv.name, "", &consul.QueryOptions{ - WaitIndex: srv.lastIndex, - WaitTime: consulWatchTimeout, - }) - if err != nil { - log.Errorf("Error refreshing service %s: %s", srv.name, err) - time.Sleep(consulRetryInterval) - continue - } - // If the index equals the previous one, the watch timed out with no update. - if meta.LastIndex == srv.lastIndex { - continue - } - srv.lastIndex = meta.LastIndex - srv.tgroup.Targets = make([]model.LabelSet, 0, len(nodes)) - - for _, node := range nodes { - addr := fmt.Sprintf("%s:%d", node.Address, node.ServicePort) - // We surround the separated list with the separator as well. This way regular expressions - // in relabeling rules don't have to consider tag positions. - tags := cd.tagSeparator + strings.Join(node.ServiceTags, cd.tagSeparator) + cd.tagSeparator - - srv.tgroup.Targets = append(srv.tgroup.Targets, model.LabelSet{ - model.AddressLabel: model.LabelValue(addr), - consulAddressLabel: model.LabelValue(node.Address), - consulNodeLabel: model.LabelValue(node.Node), - consulTagsLabel: model.LabelValue(tags), - consulServiceAddressLabel: model.LabelValue(node.ServiceAddress), - consulServicePortLabel: model.LabelValue(strconv.Itoa(node.ServicePort)), - consulServiceIDLabel: model.LabelValue(node.ServiceID), - }) - } - - cd.mu.Lock() - select { - case <-srv.done: - cd.mu.Unlock() - return - default: - // Continue. - } - // TODO(fabxc): do a copy for now to avoid races. The integration - // needs needs some general cleanup. - tg := srv.tgroup - ch <- []*config.TargetGroup{&tg} - - cd.mu.Unlock() - } +func NewConsul(cfg *config.ConsulSDConfig) (*consul.Discovery, error) { + return consul.NewDiscovery(cfg) } diff --git a/retrieval/discovery/consul/consul.go b/retrieval/discovery/consul/consul.go new file mode 100644 index 000000000..daf288eea --- /dev/null +++ b/retrieval/discovery/consul/consul.go @@ -0,0 +1,260 @@ +// Copyright 2015 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consul + +import ( + "fmt" + "strconv" + "strings" + "time" + + consul "github.com/hashicorp/consul/api" + "github.com/prometheus/common/log" + "github.com/prometheus/common/model" + "golang.org/x/net/context" + + "github.com/prometheus/prometheus/config" +) + +const ( + watchTimeout = 30 * time.Second + retryInterval = 15 * time.Second + + // addressLabel is the name for the label containing a target's address. + addressLabel = model.MetaLabelPrefix + "consul_address" + // nodeLabel is the name for the label containing a target's node name. + nodeLabel = model.MetaLabelPrefix + "consul_node" + // tagsLabel is the name of the label containing the tags assigned to the target. + tagsLabel = model.MetaLabelPrefix + "consul_tags" + // serviceLabel is the name of the label containing the service name. + serviceLabel = model.MetaLabelPrefix + "consul_service" + // serviceAddressLabel is the name of the label containing the (optional) service address. + serviceAddressLabel = model.MetaLabelPrefix + "consul_service_address" + //servicePortLabel is the name of the label containing the service port. + servicePortLabel = model.MetaLabelPrefix + "consul_service_port" + // datacenterLabel is the name of the label containing the datacenter ID. + datacenterLabel = model.MetaLabelPrefix + "consul_dc" + // serviceIDLabel is the name of the label containing the service ID. + serviceIDLabel = model.MetaLabelPrefix + "consul_service_id" +) + +// Discovery retrieves target information from a Consul server +// and updates them via watches. +type Discovery struct { + client *consul.Client + clientConf *consul.Config + clientDatacenter string + tagSeparator string + watchedServices []string // Set of services which will be discovered. +} + +// NewDiscovery returns a new Discovery for the given config. +func NewDiscovery(conf *config.ConsulSDConfig) (*Discovery, error) { + clientConf := &consul.Config{ + Address: conf.Server, + Scheme: conf.Scheme, + Datacenter: conf.Datacenter, + Token: conf.Token, + HttpAuth: &consul.HttpBasicAuth{ + Username: conf.Username, + Password: conf.Password, + }, + } + client, err := consul.NewClient(clientConf) + if err != nil { + return nil, err + } + cd := &Discovery{ + client: client, + clientConf: clientConf, + tagSeparator: conf.TagSeparator, + watchedServices: conf.Services, + } + // If the datacenter isn't set in the clientConf, let's get it from the local Consul agent + // (Consul default is to use local node's datacenter if one isn't given for a query). + if clientConf.Datacenter == "" { + info, err := client.Agent().Self() + if err != nil { + return nil, err + } + cd.clientDatacenter = info["Config"]["Datacenter"].(string) + } else { + cd.clientDatacenter = clientConf.Datacenter + } + return cd, nil +} + +// shouldWatch returns whether the service of the given name should be watched. +func (cd *Discovery) shouldWatch(name string) bool { + // If there's no fixed set of watched services, we watch everything. + if len(cd.watchedServices) == 0 { + return true + } + for _, sn := range cd.watchedServices { + if sn == name { + return true + } + } + return false +} + +// Run implements the TargetProvider interface. +func (cd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { + defer close(ch) + + // Watched services and their cancelation functions. + services := map[string]func(){} + + var lastIndex uint64 + for { + catalog := cd.client.Catalog() + srvs, meta, err := catalog.Services(&consul.QueryOptions{ + WaitIndex: lastIndex, + WaitTime: watchTimeout, + }) + + // We have to check the context at least once. The checks during channel sends + // do not guarantee that. + select { + case <-ctx.Done(): + return + default: + } + + if err != nil { + log.Errorf("Error refreshing service list: %s", err) + time.Sleep(retryInterval) + continue + } + // If the index equals the previous one, the watch timed out with no update. + if meta.LastIndex == lastIndex { + continue + } + lastIndex = meta.LastIndex + + // Check for new services. + for name := range srvs { + if !cd.shouldWatch(name) { + continue + } + if _, ok := services[name]; ok { + continue // We are already watching the service. + } + + srv := &consulService{ + client: cd.client, + name: name, + labels: model.LabelSet{ + serviceLabel: model.LabelValue(name), + datacenterLabel: model.LabelValue(cd.clientDatacenter), + }, + tagSeparator: cd.tagSeparator, + } + + wctx, cancel := context.WithCancel(ctx) + go srv.watch(wctx, ch) + + services[name] = cancel + } + + // Check for removed services. + for name, cancel := range services { + if _, ok := srvs[name]; !ok { + // Call the watch cancelation function. + cancel() + delete(services, name) + + // Send clearing target group. + select { + case <-ctx.Done(): + return + case ch <- []*config.TargetGroup{{Source: name}}: + } + } + } + } +} + +// consulService contains data belonging to the same service. +type consulService struct { + name string + labels model.LabelSet + client *consul.Client + tagSeparator string +} + +func (srv *consulService) watch(ctx context.Context, ch chan<- []*config.TargetGroup) { + catalog := srv.client.Catalog() + + lastIndex := uint64(0) + for { + nodes, meta, err := catalog.Service(srv.name, "", &consul.QueryOptions{ + WaitIndex: lastIndex, + WaitTime: watchTimeout, + }) + // Check the context before potentially falling in a continue-loop. + select { + case <-ctx.Done(): + return + default: + // Continue. + } + + if err != nil { + log.Errorf("Error refreshing service %s: %s", srv.name, err) + time.Sleep(retryInterval) + continue + } + // If the index equals the previous one, the watch timed out with no update. + if meta.LastIndex == lastIndex { + continue + } + lastIndex = meta.LastIndex + + tgroup := config.TargetGroup{ + Source: srv.name, + Labels: srv.labels, + Targets: make([]model.LabelSet, 0, len(nodes)), + } + + for _, node := range nodes { + var ( + addr = fmt.Sprintf("%s:%d", node.Address, node.ServicePort) + // We surround the separated list with the separator as well. This way regular expressions + // in relabeling rules don't have to consider tag positions. + tags = srv.tagSeparator + strings.Join(node.ServiceTags, srv.tagSeparator) + srv.tagSeparator + ) + tgroup.Targets = append(tgroup.Targets, model.LabelSet{ + model.AddressLabel: model.LabelValue(addr), + addressLabel: model.LabelValue(node.Address), + nodeLabel: model.LabelValue(node.Node), + tagsLabel: model.LabelValue(tags), + serviceAddressLabel: model.LabelValue(node.ServiceAddress), + servicePortLabel: model.LabelValue(strconv.Itoa(node.ServicePort)), + serviceIDLabel: model.LabelValue(node.ServiceID), + }) + } + // Check context twice to ensure we always catch cancelation. + select { + case <-ctx.Done(): + return + default: + } + select { + case <-ctx.Done(): + return + case ch <- []*config.TargetGroup{&tgroup}: + } + } +} diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index 68051d95f..4d0e7af74 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -366,7 +366,7 @@ func providersFromConfig(cfg *config.ScrapeConfig) map[string]TargetProvider { app("file", i, discovery.NewFileDiscovery(c)) } for i, c := range cfg.ConsulSDConfigs { - k, err := discovery.NewConsulDiscovery(c) + k, err := discovery.NewConsul(c) if err != nil { log.Errorf("Cannot create Consul discovery: %s", err) continue