diff --git a/retrieval/discovery/consul/consul.go b/retrieval/discovery/consul/consul.go index 75d81704b..58b2da806 100644 --- a/retrieval/discovery/consul/consul.go +++ b/retrieval/discovery/consul/consul.go @@ -21,6 +21,7 @@ import ( "time" consul "github.com/hashicorp/consul/api" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/log" "github.com/prometheus/common/model" "golang.org/x/net/context" @@ -48,8 +49,37 @@ const ( datacenterLabel = model.MetaLabelPrefix + "consul_dc" // serviceIDLabel is the name of the label containing the service ID. serviceIDLabel = model.MetaLabelPrefix + "consul_service_id" + + // Constants for instrumentation. + namespace = "prometheus" ) +var ( + rpcFailuresCount = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "sd_consul_rpc_failures_total", + Help: "The number of Consul RPC call failures.", + }) + rpcDuration = prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: namespace, + Name: "sd_consul_rpc_duration_seconds", + Help: "The duration of a Consul RPC call in seconds.", + }, + []string{"endpoint", "call"}, + ) +) + +func init() { + prometheus.MustRegister(rpcFailuresCount) + prometheus.MustRegister(rpcDuration) + + // Initialize metric vectors. + rpcDuration.WithLabelValues("catalog", "service") + rpcDuration.WithLabelValues("catalog", "services") +} + // Discovery retrieves target information from a Consul server // and updates them via watches. type Discovery struct { @@ -110,10 +140,12 @@ func (cd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { var lastIndex uint64 for { catalog := cd.client.Catalog() + t0 := time.Now() srvs, meta, err := catalog.Services(&consul.QueryOptions{ WaitIndex: lastIndex, WaitTime: watchTimeout, }) + rpcDuration.WithLabelValues("catalog", "services").Observe(time.Since(t0).Seconds()) // We have to check the context at least once. The checks during channel sends // do not guarantee that. @@ -125,6 +157,7 @@ func (cd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { if err != nil { log.Errorf("Error refreshing service list: %s", err) + rpcFailuresCount.Inc() time.Sleep(retryInterval) continue } @@ -202,10 +235,13 @@ func (srv *consulService) watch(ctx context.Context, ch chan<- []*config.TargetG lastIndex := uint64(0) for { + t0 := time.Now() nodes, meta, err := catalog.Service(srv.name, "", &consul.QueryOptions{ WaitIndex: lastIndex, WaitTime: watchTimeout, }) + rpcDuration.WithLabelValues("catalog", "service").Observe(time.Since(t0).Seconds()) + // Check the context before potentially falling in a continue-loop. select { case <-ctx.Done(): @@ -216,6 +252,7 @@ func (srv *consulService) watch(ctx context.Context, ch chan<- []*config.TargetG if err != nil { log.Errorf("Error refreshing service %s: %s", srv.name, err) + rpcFailuresCount.Inc() time.Sleep(retryInterval) continue }