diff --git a/discovery/manager.go b/discovery/manager.go
index 94b8caf98..11be4b4c3 100644
--- a/discovery/manager.go
+++ b/discovery/manager.go
@@ -16,11 +16,13 @@ package discovery
 import (
 	"context"
 	"fmt"
+	"reflect"
 	"sync"
 	"time"
 
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
+	"github.com/prometheus/client_golang/prometheus"
 	sd_config "github.com/prometheus/prometheus/discovery/config"
 	"github.com/prometheus/prometheus/discovery/targetgroup"
 
@@ -38,6 +40,19 @@ import (
 	"github.com/prometheus/prometheus/discovery/zookeeper"
 )
 
+var (
+	failedConfigs = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Name: "prometheus_sd_configs_failed_total",
+			Help: "Total number of service discovery configurations that failed to load.",
+		},
+	)
+)
+
+func init() {
+	prometheus.MustRegister(failedConfigs)
+}
+
 // Discoverer provides information about target groups. It maintains a set
 // of sources from which TargetGroups can originate. Whenever a discovery provider
 // detects a potential change, it sends the TargetGroup through its channel.
@@ -59,8 +74,19 @@ type poolKey struct {
 	provider string
 }
 
+// provider holds a Discoverer instance, its configuration and its subscribers.
+type provider struct {
+	name   string
+	d      Discoverer
+	subs   []string
+	config interface{}
+}
+
 // NewManager is the Discovery Manager constructor
 func NewManager(ctx context.Context, logger log.Logger) *Manager {
+	if logger == nil {
+		logger = log.NewNopLogger()
+	}
 	return &Manager{
 		logger:  logger,
 		syncCh:  make(chan map[string][]*targetgroup.Group),
@@ -77,9 +103,12 @@ type Manager struct {
 	mtx            sync.RWMutex
 	ctx            context.Context
 	discoverCancel []context.CancelFunc
+
 	// Some Discoverers(eg. k8s) send only the updates for a given target group
 	// so we use map[tg.Source]*targetgroup.Group to know which group to update.
 	targets map[poolKey]map[string]*targetgroup.Group
+	// providers keeps track of SD providers.
+	providers []*provider
 	// The sync channels sends the updates in map[targetSetName] where targetSetName is the job value from the scrape config.
 	syncCh chan map[string][]*targetgroup.Group
 }
@@ -105,9 +134,10 @@ func (m *Manager) ApplyConfig(cfg map[string]sd_config.ServiceDiscoveryConfig) e
 	m.cancelDiscoverers()
 	for name, scfg := range cfg {
-		for provName, prov := range m.providersFromConfig(scfg) {
-			m.startProvider(m.ctx, poolKey{setName: name, provider: provName}, prov)
-		}
+		m.registerProviders(scfg, name)
+	}
+	for _, prov := range m.providers {
+		m.startProvider(m.ctx, prov)
 	}
 
 	return nil
@@ -115,22 +145,27 @@ func (m *Manager) ApplyConfig(cfg map[string]sd_config.ServiceDiscoveryConfig) e
 
 // StartCustomProvider is used for sdtool. Only use this if you know what you're doing.
 func (m *Manager) StartCustomProvider(ctx context.Context, name string, worker Discoverer) {
-	// Pool key for non-standard SD implementations are unknown.
-	poolKey := poolKey{setName: name, provider: name}
-	m.startProvider(ctx, poolKey, worker)
+	p := &provider{
+		name: name,
+		d:    worker,
+		subs: []string{name},
+	}
+	m.providers = append(m.providers, p)
+	m.startProvider(ctx, p)
 }
 
-func (m *Manager) startProvider(ctx context.Context, poolKey poolKey, worker Discoverer) {
+func (m *Manager) startProvider(ctx context.Context, p *provider) {
+	level.Debug(m.logger).Log("msg", "Starting provider", "provider", p.name, "subs", fmt.Sprintf("%v", p.subs))
 	ctx, cancel := context.WithCancel(ctx)
 	updates := make(chan []*targetgroup.Group)
 
 	m.discoverCancel = append(m.discoverCancel, cancel)
 
-	go worker.Run(ctx, updates)
-	go m.runProvider(ctx, poolKey, updates)
+	go p.d.Run(ctx, updates)
+	go m.runProvider(ctx, p, updates)
 }
 
-func (m *Manager) runProvider(ctx context.Context, poolKey poolKey, updates chan []*targetgroup.Group) {
+func (m *Manager) runProvider(ctx context.Context, p *provider, updates chan []*targetgroup.Group) {
 	ticker := time.NewTicker(5 * time.Second)
 	defer ticker.Stop()
 
@@ -152,7 +187,9 @@ func (m *Manager) runProvider(ctx context.Context, poolKey poolKey, updates chan
 				}
 				return
 			}
-			m.updateGroup(poolKey, tgs)
+			for _, s := range p.subs {
+				m.updateGroup(poolKey{setName: s, provider: p.name}, tgs)
+			}
 
 			// Signal that there was an update.
 			select {
@@ -178,6 +215,7 @@ func (m *Manager) cancelDiscoverers() {
 		c()
 	}
 	m.targets = make(map[poolKey]map[string]*targetgroup.Group)
+	m.providers = nil
 	m.discoverCancel = nil
 }
 
@@ -210,85 +248,97 @@ func (m *Manager) allGroups() map[string][]*targetgroup.Group {
 	return tSets
 }
 
-func (m *Manager) providersFromConfig(cfg sd_config.ServiceDiscoveryConfig) map[string]Discoverer {
-	providers := map[string]Discoverer{}
+func (m *Manager) registerProviders(cfg sd_config.ServiceDiscoveryConfig, setName string) {
+	add := func(cfg interface{}, newDiscoverer func() (Discoverer, error)) {
+		t := reflect.TypeOf(cfg).String()
+		for _, p := range m.providers {
+			if reflect.DeepEqual(cfg, p.config) {
+				p.subs = append(p.subs, setName)
+				return
+			}
+		}
 
-	app := func(mech string, i int, tp Discoverer) {
-		providers[fmt.Sprintf("%s/%d", mech, i)] = tp
+		d, err := newDiscoverer()
+		if err != nil {
+			level.Error(m.logger).Log("msg", "Cannot create service discovery", "err", err, "type", t)
+			failedConfigs.Inc()
+			return
+		}
+
+		provider := provider{
+			name:   fmt.Sprintf("%s/%d", t, len(m.providers)),
+			d:      d,
+			config: cfg,
+			subs:   []string{setName},
+		}
+		m.providers = append(m.providers, &provider)
 	}
 
-	for i, c := range cfg.DNSSDConfigs {
-		app("dns", i, dns.NewDiscovery(*c, log.With(m.logger, "discovery", "dns")))
+	for _, c := range cfg.DNSSDConfigs {
+		add(c, func() (Discoverer, error) {
+			return dns.NewDiscovery(*c, log.With(m.logger, "discovery", "dns")), nil
+		})
 	}
-	for i, c := range cfg.FileSDConfigs {
-		app("file", i, file.NewDiscovery(c, log.With(m.logger, "discovery", "file")))
+	for _, c := range cfg.FileSDConfigs {
+		add(c, func() (Discoverer, error) {
+			return file.NewDiscovery(c, log.With(m.logger, "discovery", "file")), nil
+		})
 	}
-	for i, c := range cfg.ConsulSDConfigs {
-		k, err := consul.NewDiscovery(c, log.With(m.logger, "discovery", "consul"))
-		if err != nil {
-			level.Error(m.logger).Log("msg", "Cannot create Consul discovery", "err", err)
-			continue
-		}
-		app("consul", i, k)
+	for _, c := range cfg.ConsulSDConfigs {
+		add(c, func() (Discoverer, error) {
+			return consul.NewDiscovery(c, log.With(m.logger, "discovery", "consul"))
+		})
 	}
-	for i, c := range cfg.MarathonSDConfigs {
-		t, err := marathon.NewDiscovery(*c, log.With(m.logger, "discovery", "marathon"))
-		if err != nil {
-			level.Error(m.logger).Log("msg", "Cannot create Marathon discovery", "err", err)
-			continue
-		}
-		app("marathon", i, t)
+	for _, c := range cfg.MarathonSDConfigs {
+		add(c, func() (Discoverer, error) {
+			return marathon.NewDiscovery(*c, log.With(m.logger, "discovery", "marathon"))
+		})
 	}
-	for i, c := range cfg.KubernetesSDConfigs {
-		k, err := kubernetes.New(log.With(m.logger, "discovery", "k8s"), c)
-		if err != nil {
-			level.Error(m.logger).Log("msg", "Cannot create Kubernetes discovery", "err", err)
-			continue
-		}
-		app("kubernetes", i, k)
+	for _, c := range cfg.KubernetesSDConfigs {
+		add(c, func() (Discoverer, error) {
+			return kubernetes.New(log.With(m.logger, "discovery", "k8s"), c)
+		})
 	}
-	for i, c := range cfg.ServersetSDConfigs {
-		app("serverset", i, zookeeper.NewServersetDiscovery(c, log.With(m.logger, "discovery", "zookeeper")))
+	for _, c := range cfg.ServersetSDConfigs {
+		add(c, func() (Discoverer, error) {
+			return zookeeper.NewServersetDiscovery(c, log.With(m.logger, "discovery", "zookeeper")), nil
+		})
 	}
-	for i, c := range cfg.NerveSDConfigs {
-		app("nerve", i, zookeeper.NewNerveDiscovery(c, log.With(m.logger, "discovery", "nerve")))
+	for _, c := range cfg.NerveSDConfigs {
+		add(c, func() (Discoverer, error) {
+			return zookeeper.NewNerveDiscovery(c, log.With(m.logger, "discovery", "nerve")), nil
+		})
 	}
-	for i, c := range cfg.EC2SDConfigs {
-		app("ec2", i, ec2.NewDiscovery(c, log.With(m.logger, "discovery", "ec2")))
+	for _, c := range cfg.EC2SDConfigs {
+		add(c, func() (Discoverer, error) {
+			return ec2.NewDiscovery(c, log.With(m.logger, "discovery", "ec2")), nil
+		})
 	}
-	for i, c := range cfg.OpenstackSDConfigs {
-		openstackd, err := openstack.NewDiscovery(c, log.With(m.logger, "discovery", "openstack"))
-		if err != nil {
-			level.Error(m.logger).Log("msg", "Cannot initialize OpenStack discovery", "err", err)
-			continue
-		}
-		app("openstack", i, openstackd)
+	for _, c := range cfg.OpenstackSDConfigs {
+		add(c, func() (Discoverer, error) {
+			return openstack.NewDiscovery(c, log.With(m.logger, "discovery", "openstack"))
+		})
 	}
-
-	for i, c := range cfg.GCESDConfigs {
-		gced, err := gce.NewDiscovery(*c, log.With(m.logger, "discovery", "gce"))
-		if err != nil {
-			level.Error(m.logger).Log("msg", "Cannot initialize GCE discovery", "err", err)
-			continue
-		}
-		app("gce", i, gced)
+	for _, c := range cfg.GCESDConfigs {
+		add(c, func() (Discoverer, error) {
+			return gce.NewDiscovery(*c, log.With(m.logger, "discovery", "gce"))
+		})
 	}
-	for i, c := range cfg.AzureSDConfigs {
-		app("azure", i, azure.NewDiscovery(c, log.With(m.logger, "discovery", "azure")))
+	for _, c := range cfg.AzureSDConfigs {
+		add(c, func() (Discoverer, error) {
+			return azure.NewDiscovery(c, log.With(m.logger, "discovery", "azure")), nil
+		})
 	}
-	for i, c := range cfg.TritonSDConfigs {
-		t, err := triton.New(log.With(m.logger, "discovery", "trition"), c)
-		if err != nil {
-			level.Error(m.logger).Log("msg", "Cannot create Triton discovery", "err", err)
-			continue
-		}
-		app("triton", i, t)
+	for _, c := range cfg.TritonSDConfigs {
+		add(c, func() (Discoverer, error) {
+			return triton.New(log.With(m.logger, "discovery", "triton"), c)
+		})
 	}
 	if len(cfg.StaticConfigs) > 0 {
-		app("static", 0, &StaticProvider{cfg.StaticConfigs})
+		add(setName, func() (Discoverer, error) {
+			return &StaticProvider{cfg.StaticConfigs}, nil
+		})
 	}
-
-	return providers
 }
 
 // StaticProvider holds a list of target groups that never change.
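The core of the manager.go change is the dedup rule in `registerProviders`: a service discovery config that is `reflect.DeepEqual` to one already registered only gains an extra subscriber (a `subs` entry) instead of spawning a second discoverer, and each new provider is named `<config type>/<index>`. Below is a minimal, self-contained sketch of that rule; `fileSDConfig`, `register`, and the `main` driver are illustrative stand-ins, not code from this patch.

```go
package main

import (
	"fmt"
	"reflect"
)

// provider mirrors the struct added in manager.go: one running discoverer,
// the config it was built from, and the scrape configs subscribed to it.
type provider struct {
	name   string
	config interface{}
	subs   []string
}

// register applies the same coalescing rule as registerProviders: a config
// that is reflect.DeepEqual to an existing provider's config only adds a
// subscriber; otherwise a new provider named "<config type>/<index>" is
// appended.
func register(providers []*provider, cfg interface{}, setName string) []*provider {
	for _, p := range providers {
		if reflect.DeepEqual(cfg, p.config) {
			p.subs = append(p.subs, setName)
			return providers
		}
	}
	return append(providers, &provider{
		name:   fmt.Sprintf("%s/%d", reflect.TypeOf(cfg).String(), len(providers)),
		config: cfg,
		subs:   []string{setName},
	})
}

func main() {
	// Illustrative stand-in for e.g. *file.SDConfig.
	type fileSDConfig struct{ Files []string }

	var providers []*provider
	providers = register(providers, &fileSDConfig{Files: []string{"sd.json"}}, "prometheus")
	providers = register(providers, &fileSDConfig{Files: []string{"sd.json"}}, "prometheus2")
	providers = register(providers, &fileSDConfig{Files: []string{"other.json"}}, "prometheus3")

	for _, p := range providers {
		fmt.Printf("%s -> %v\n", p.name, p.subs)
		// *main.fileSDConfig/0 -> [prometheus prometheus2]
		// *main.fileSDConfig/1 -> [prometheus3]
	}
}
```

Because the provider name embeds `reflect.TypeOf(cfg).String()`, the static-config path (which calls `add(setName, ...)` with a plain string) yields providers named `string/0`, and file SD configs yield `*file.SDConfig/0`; these are exactly the pool keys the updated tests assert on.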
diff --git a/discovery/manager_test.go b/discovery/manager_test.go
index 19efb7d62..402c75b9d 100644
--- a/discovery/manager_test.go
+++ b/discovery/manager_test.go
@@ -16,6 +16,8 @@ package discovery
 import (
 	"context"
 	"fmt"
+	"io/ioutil"
+	"os"
 	"reflect"
 	"sort"
 	"strconv"
@@ -32,7 +34,7 @@ import (
 
 // TestTargetUpdatesOrder checks that the target updates are received in the expected order.
 func TestTargetUpdatesOrder(t *testing.T) {
-	// The order by which the updates are send is detirmened by the interval passed to the mock discovery adapter
+	// The order in which the updates are sent is determined by the interval passed to the mock discovery adapter.
 	// Final targets array is ordered alphabetically by the name of the discoverer.
 	// For example discoverer "A" with targets "t2,t3" and discoverer "B" with targets "t1,t2" will result in "t2,t3,t1,t2" after the merge.
 	testCases := []struct {
@@ -699,34 +701,34 @@ func TestTargetUpdatesOrder(t *testing.T) {
 	}
 }
 
-func TestTargetSetRecreatesTargetGroupsEveryRun(t *testing.T) {
-	verifyPresence := func(tSets map[poolKey]map[string]*targetgroup.Group, poolKey poolKey, label string, present bool) {
-		if _, ok := tSets[poolKey]; !ok {
-			t.Fatalf("'%s' should be present in Pool keys: %v", poolKey, tSets)
-			return
-		}
-
-		match := false
-		var mergedTargets string
-		for _, targetGroup := range tSets[poolKey] {
-
-			for _, l := range targetGroup.Targets {
-				mergedTargets = mergedTargets + " " + l.String()
-				if l.String() == label {
-					match = true
-				}
-			}
-
-		}
-		if match != present {
-			msg := ""
-			if !present {
-				msg = "not"
-			}
-			t.Fatalf("'%s' should %s be present in Targets labels: %v", label, msg, mergedTargets)
-		}
+func verifyPresence(t *testing.T, tSets map[poolKey]map[string]*targetgroup.Group, poolKey poolKey, label string, present bool) {
+	if _, ok := tSets[poolKey]; !ok {
+		t.Fatalf("'%s' should be present in Pool keys: %v", poolKey, tSets)
+		return
 	}
+	match := false
+	var mergedTargets string
+	for _, targetGroup := range tSets[poolKey] {
+
+		for _, l := range targetGroup.Targets {
+			mergedTargets = mergedTargets + " " + l.String()
+			if l.String() == label {
+				match = true
+			}
+		}
+
+	}
+	if match != present {
+		msg := ""
+		if !present {
+			msg = "not"
+		}
+		t.Fatalf("'%s' should %s be present in Targets labels: %v", label, msg, mergedTargets)
+	}
+}
+
+func TestTargetSetRecreatesTargetGroupsEveryRun(t *testing.T) {
 	cfg := &config.Config{}
 
 	sOne := `
@@ -751,8 +753,8 @@ scrape_configs:
 	discoveryManager.ApplyConfig(c)
 
 	<-discoveryManager.SyncCh()
-	verifyPresence(discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true)
-	verifyPresence(discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"bar:9090\"}", true)
+	verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "string/0"}, "{__address__=\"foo:9090\"}", true)
+	verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "string/0"}, "{__address__=\"bar:9090\"}", true)
 
 	sTwo := `
 scrape_configs:
@@ -770,8 +772,60 @@ scrape_configs:
 	discoveryManager.ApplyConfig(c)
 
 	<-discoveryManager.SyncCh()
-	verifyPresence(discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true)
-	verifyPresence(discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"bar:9090\"}", false)
+	verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "string/0"}, "{__address__=\"foo:9090\"}", true)
+	verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "string/0"}, "{__address__=\"bar:9090\"}", false)
+}
+
+func TestIdenticalConfigurationsAreCoalesced(t *testing.T) {
+	tmpFile, err := ioutil.TempFile("", "sd")
+	if err != nil {
+		t.Fatalf("error creating temporary file: %v", err)
+	}
+	defer os.Remove(tmpFile.Name())
+	if _, err := tmpFile.Write([]byte(`[{"targets": ["foo:9090"]}]`)); err != nil {
+		t.Fatalf("error writing temporary file: %v", err)
+	}
+	if err := tmpFile.Close(); err != nil {
+		t.Fatalf("error closing temporary file: %v", err)
+	}
+	tmpFile2 := fmt.Sprintf("%s.json", tmpFile.Name())
+	if err = os.Link(tmpFile.Name(), tmpFile2); err != nil {
+		t.Fatalf("error linking temporary file: %v", err)
+	}
+	defer os.Remove(tmpFile2)
+
+	cfg := &config.Config{}
+
+	sOne := `
+scrape_configs:
+ - job_name: 'prometheus'
+   file_sd_configs:
+   - files: ["%s"]
+ - job_name: 'prometheus2'
+   file_sd_configs:
+   - files: ["%s"]
+`
+	sOne = fmt.Sprintf(sOne, tmpFile2, tmpFile2)
+	if err := yaml.UnmarshalStrict([]byte(sOne), cfg); err != nil {
+		t.Fatalf("Unable to load YAML config sOne: %s", err)
+	}
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	discoveryManager := NewManager(ctx, nil)
+	go discoveryManager.Run()
+
+	c := make(map[string]sd_config.ServiceDiscoveryConfig)
+	for _, v := range cfg.ScrapeConfigs {
+		c[v.JobName] = v.ServiceDiscoveryConfig
+	}
+	discoveryManager.ApplyConfig(c)
+
+	<-discoveryManager.SyncCh()
+	verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "*file.SDConfig/0"}, "{__address__=\"foo:9090\"}", true)
+	verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus2", provider: "*file.SDConfig/0"}, "{__address__=\"foo:9090\"}", true)
+	if len(discoveryManager.providers) != 1 {
+		t.Fatalf("Invalid number of providers: expected 1, got %d", len(discoveryManager.providers))
+	}
 }
 
 func TestApplyConfigDoesNotModifyStaticProviderTargets(t *testing.T) {