From 27d66628a1a764277479ec8f6175a0b54049cbba Mon Sep 17 00:00:00 2001 From: Brian Akins Date: Wed, 19 Apr 2017 08:36:34 -0400 Subject: [PATCH] Allow limiting Kubernetes service discover to certain namespaces Allow namespace discovery to be more easily extended in the future by using a struct rather than just a list. Rename fields for kubernetes namespace discovery --- config/config.go | 35 ++++- config/config_test.go | 27 ++++ config/testdata/conf.good.yml | 9 ++ .../kubernetes_namespace_discovery.bad.yml | 6 + discovery/kubernetes/kubernetes.go | 131 +++++++++++------- 5 files changed, 154 insertions(+), 54 deletions(-) create mode 100644 config/testdata/kubernetes_namespace_discovery.bad.yml diff --git a/config/config.go b/config/config.go index 2442f64b2..88abbb9f4 100644 --- a/config/config.go +++ b/config/config.go @@ -987,12 +987,13 @@ func (c *KubernetesRole) UnmarshalYAML(unmarshal func(interface{}) error) error // KubernetesSDConfig is the configuration for Kubernetes service discovery. type KubernetesSDConfig struct { - APIServer URL `yaml:"api_server"` - Role KubernetesRole `yaml:"role"` - BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"` - BearerToken string `yaml:"bearer_token,omitempty"` - BearerTokenFile string `yaml:"bearer_token_file,omitempty"` - TLSConfig TLSConfig `yaml:"tls_config,omitempty"` + APIServer URL `yaml:"api_server"` + Role KubernetesRole `yaml:"role"` + BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"` + BearerToken string `yaml:"bearer_token,omitempty"` + BearerTokenFile string `yaml:"bearer_token_file,omitempty"` + TLSConfig TLSConfig `yaml:"tls_config,omitempty"` + NamespaceDiscovery KubernetesNamespaceDiscovery `yaml:"namespaces"` // Catches all undefined fields and must be empty after parsing. XXX map[string]interface{} `yaml:",inline"` @@ -1026,6 +1027,28 @@ func (c *KubernetesSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) er return nil } +// KubernetesNamespaceDiscovery is the configuration for discovering +// Kubernetes namespaces. +type KubernetesNamespaceDiscovery struct { + Names []string `yaml:"names"` + // Catches all undefined fields and must be empty after parsing. + XXX map[string]interface{} `yaml:",inline"` +} + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (c *KubernetesNamespaceDiscovery) UnmarshalYAML(unmarshal func(interface{}) error) error { + *c = KubernetesNamespaceDiscovery{} + type plain KubernetesNamespaceDiscovery + err := unmarshal((*plain)(c)) + if err != nil { + return err + } + if err := checkOverflow(c.XXX, "namespaces"); err != nil { + return err + } + return nil +} + // GCESDConfig is the configuration for GCE based service discovery. type GCESDConfig struct { // Project: The Google Cloud Project ID diff --git a/config/config_test.go b/config/config_test.go index 66972cbbf..c0ed1e40b 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -305,6 +305,30 @@ var expectedConf = &Config{ Username: "myusername", Password: "mypassword", }, + NamespaceDiscovery: KubernetesNamespaceDiscovery{}, + }, + }, + }, + }, + { + JobName: "service-kubernetes-namespaces", + + ScrapeInterval: model.Duration(15 * time.Second), + ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout, + + MetricsPath: DefaultScrapeConfig.MetricsPath, + Scheme: DefaultScrapeConfig.Scheme, + + ServiceDiscoveryConfig: ServiceDiscoveryConfig{ + KubernetesSDConfigs: []*KubernetesSDConfig{ + { + APIServer: kubernetesSDHostURL(), + Role: KubernetesRoleEndpoint, + NamespaceDiscovery: KubernetesNamespaceDiscovery{ + Names: []string{ + "default", + }, + }, }, }, }, @@ -592,6 +616,9 @@ var expectedErrors = []struct { }, { filename: "kubernetes_role.bad.yml", errMsg: "role", + }, { + filename: "kubernetes_namespace_discovery.bad.yml", + errMsg: "unknown fields in namespaces", }, { filename: "kubernetes_bearertoken_basicauth.bad.yml", errMsg: "at most one of basic_auth, bearer_token & bearer_token_file must be configured", diff --git a/config/testdata/conf.good.yml b/config/testdata/conf.good.yml index 05015da9f..1825edd9b 100644 --- a/config/testdata/conf.good.yml +++ b/config/testdata/conf.good.yml @@ -146,6 +146,15 @@ scrape_configs: username: 'myusername' password: 'mypassword' +- job_name: service-kubernetes-namespaces + + kubernetes_sd_configs: + - role: endpoints + api_server: 'https://localhost:1234' + namespaces: + names: + - default + - job_name: service-marathon marathon_sd_configs: - servers: diff --git a/config/testdata/kubernetes_namespace_discovery.bad.yml b/config/testdata/kubernetes_namespace_discovery.bad.yml new file mode 100644 index 000000000..c98d65d34 --- /dev/null +++ b/config/testdata/kubernetes_namespace_discovery.bad.yml @@ -0,0 +1,6 @@ +scrape_configs: +- kubernetes_sd_configs: + - api_server: kubernetes:443 + role: endpoints + namespaces: + foo: bar diff --git a/discovery/kubernetes/kubernetes.go b/discovery/kubernetes/kubernetes.go index 8e06b8362..64430f952 100644 --- a/discovery/kubernetes/kubernetes.go +++ b/discovery/kubernetes/kubernetes.go @@ -15,6 +15,7 @@ package kubernetes import ( "io/ioutil" + "sync" "time" "github.com/prometheus/client_golang/prometheus" @@ -62,9 +63,10 @@ func init() { // Discovery implements the TargetProvider interface for discovering // targets from Kubernetes. type Discovery struct { - client kubernetes.Interface - role config.KubernetesRole - logger log.Logger + client kubernetes.Interface + role config.KubernetesRole + logger log.Logger + namespaceDiscovery *config.KubernetesNamespaceDiscovery } func init() { @@ -75,6 +77,14 @@ func init() { } } +func (d *Discovery) getNamespaces() []string { + namespaces := d.namespaceDiscovery.Names + if len(namespaces) == 0 { + namespaces = []string{api.NamespaceAll} + } + return namespaces +} + // New creates a new Kubernetes discovery for the given role. func New(l log.Logger, conf *config.KubernetesSDConfig) (*Discovery, error) { var ( @@ -137,9 +147,10 @@ func New(l log.Logger, conf *config.KubernetesSDConfig) (*Discovery, error) { return nil, err } return &Discovery{ - client: c, - logger: l, - role: conf.Role, + client: c, + logger: l, + role: conf.Role, + namespaceDiscovery: &conf.NamespaceDiscovery, }, nil } @@ -149,58 +160,82 @@ const resyncPeriod = 10 * time.Minute func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { rclient := d.client.Core().GetRESTClient() + namespaces := d.getNamespaces() + switch d.role { case "endpoints": - elw := cache.NewListWatchFromClient(rclient, "endpoints", api.NamespaceAll, nil) - slw := cache.NewListWatchFromClient(rclient, "services", api.NamespaceAll, nil) - plw := cache.NewListWatchFromClient(rclient, "pods", api.NamespaceAll, nil) - eps := NewEndpoints( - d.logger.With("kubernetes_sd", "endpoint"), - cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod), - cache.NewSharedInformer(elw, &apiv1.Endpoints{}, resyncPeriod), - cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod), - ) - go eps.endpointsInf.Run(ctx.Done()) - go eps.serviceInf.Run(ctx.Done()) - go eps.podInf.Run(ctx.Done()) + var wg sync.WaitGroup - for !eps.serviceInf.HasSynced() { - time.Sleep(100 * time.Millisecond) - } - for !eps.endpointsInf.HasSynced() { - time.Sleep(100 * time.Millisecond) - } - for !eps.podInf.HasSynced() { - time.Sleep(100 * time.Millisecond) - } - eps.Run(ctx, ch) + for _, namespace := range namespaces { + elw := cache.NewListWatchFromClient(rclient, "endpoints", namespace, nil) + slw := cache.NewListWatchFromClient(rclient, "services", namespace, nil) + plw := cache.NewListWatchFromClient(rclient, "pods", namespace, nil) + eps := NewEndpoints( + d.logger.With("kubernetes_sd", "endpoint"), + cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod), + cache.NewSharedInformer(elw, &apiv1.Endpoints{}, resyncPeriod), + cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod), + ) + go eps.endpointsInf.Run(ctx.Done()) + go eps.serviceInf.Run(ctx.Done()) + go eps.podInf.Run(ctx.Done()) + for !eps.serviceInf.HasSynced() { + time.Sleep(100 * time.Millisecond) + } + for !eps.endpointsInf.HasSynced() { + time.Sleep(100 * time.Millisecond) + } + for !eps.podInf.HasSynced() { + time.Sleep(100 * time.Millisecond) + } + wg.Add(1) + go func() { + defer wg.Done() + eps.Run(ctx, ch) + }() + } + wg.Wait() case "pod": - plw := cache.NewListWatchFromClient(rclient, "pods", api.NamespaceAll, nil) - pod := NewPod( - d.logger.With("kubernetes_sd", "pod"), - cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod), - ) - go pod.informer.Run(ctx.Done()) + var wg sync.WaitGroup + for _, namespace := range namespaces { + plw := cache.NewListWatchFromClient(rclient, "pods", namespace, nil) + pod := NewPod( + d.logger.With("kubernetes_sd", "pod"), + cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod), + ) + go pod.informer.Run(ctx.Done()) - for !pod.informer.HasSynced() { - time.Sleep(100 * time.Millisecond) + for !pod.informer.HasSynced() { + time.Sleep(100 * time.Millisecond) + } + wg.Add(1) + go func() { + defer wg.Done() + pod.Run(ctx, ch) + }() } - pod.Run(ctx, ch) - + wg.Wait() case "service": - slw := cache.NewListWatchFromClient(rclient, "services", api.NamespaceAll, nil) - svc := NewService( - d.logger.With("kubernetes_sd", "service"), - cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod), - ) - go svc.informer.Run(ctx.Done()) + var wg sync.WaitGroup + for _, namespace := range namespaces { + slw := cache.NewListWatchFromClient(rclient, "services", namespace, nil) + svc := NewService( + d.logger.With("kubernetes_sd", "service"), + cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod), + ) + go svc.informer.Run(ctx.Done()) - for !svc.informer.HasSynced() { - time.Sleep(100 * time.Millisecond) + for !svc.informer.HasSynced() { + time.Sleep(100 * time.Millisecond) + } + wg.Add(1) + go func() { + defer wg.Done() + svc.Run(ctx, ch) + }() } - svc.Run(ctx, ch) - + wg.Wait() case "node": nlw := cache.NewListWatchFromClient(rclient, "nodes", api.NamespaceAll, nil) node := NewNode(