From 0fcea6e9fb12c914bf8923cfe5ced047eaefb90e Mon Sep 17 00:00:00 2001 From: Frederic Branczyk Date: Mon, 14 Nov 2016 16:21:38 +0100 Subject: [PATCH] retrieval/discovery/kubernetes: fix cache state unknown behavior (#2180) * retrieval/discovery/kubernetes: fix cache state unknown behavior * retrieval/discovery/kubernetes: extract type casting * retrieval/discovery/kubernetes: add tests for possible regressions --- retrieval/discovery/kubernetes/endpoints.go | 57 +++++++++++++++---- .../discovery/kubernetes/endpoints_test.go | 16 ++++++ retrieval/discovery/kubernetes/node.go | 37 +++++++++++- retrieval/discovery/kubernetes/node_test.go | 30 ++++++++++ retrieval/discovery/kubernetes/pod.go | 37 +++++++++++- retrieval/discovery/kubernetes/pod_test.go | 39 ++++++++++++- retrieval/discovery/kubernetes/service.go | 38 ++++++++++++- .../discovery/kubernetes/service_test.go | 32 +++++++++++ 8 files changed, 266 insertions(+), 20 deletions(-) diff --git a/retrieval/discovery/kubernetes/endpoints.go b/retrieval/discovery/kubernetes/endpoints.go index cb7a9e551..e0c788286 100644 --- a/retrieval/discovery/kubernetes/endpoints.go +++ b/retrieval/discovery/kubernetes/endpoints.go @@ -83,17 +83,38 @@ func (e *Endpoints) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { e.endpointsInf.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(o interface{}) { - send(e.buildEndpoints(o.(*apiv1.Endpoints))) + eps, err := convertToEndpoints(o) + if err != nil { + e.logger.With("err", err).Errorln("converting to Endpoints object failed") + return + } + send(e.buildEndpoints(eps)) }, UpdateFunc: func(_, o interface{}) { - send(e.buildEndpoints(o.(*apiv1.Endpoints))) + eps, err := convertToEndpoints(o) + if err != nil { + e.logger.With("err", err).Errorln("converting to Endpoints object failed") + return + } + send(e.buildEndpoints(eps)) }, DeleteFunc: func(o interface{}) { - send(&config.TargetGroup{Source: endpointsSource(o.(*apiv1.Endpoints).ObjectMeta)}) + eps, err := convertToEndpoints(o) + if err != nil { + e.logger.With("err", err).Errorln("converting to Endpoints object failed") + return + } + send(&config.TargetGroup{Source: endpointsSource(eps)}) }, }) - serviceUpdate := func(svc *apiv1.Service) { + serviceUpdate := func(o interface{}) { + svc, err := convertToService(o) + if err != nil { + e.logger.With("err", err).Errorln("converting to Service object failed") + return + } + ep := &apiv1.Endpoints{} ep.Namespace = svc.Namespace ep.Name = svc.Name @@ -108,17 +129,33 @@ func (e *Endpoints) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { e.serviceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{ // TODO(fabxc): potentially remove add and delete event handlers. Those should // be triggered via the endpoint handlers already. - AddFunc: func(o interface{}) { serviceUpdate(o.(*apiv1.Service)) }, - UpdateFunc: func(_, o interface{}) { serviceUpdate(o.(*apiv1.Service)) }, - DeleteFunc: func(o interface{}) { serviceUpdate(o.(*apiv1.Service)) }, + AddFunc: func(o interface{}) { serviceUpdate(o) }, + UpdateFunc: func(_, o interface{}) { serviceUpdate(o) }, + DeleteFunc: func(o interface{}) { serviceUpdate(o) }, }) // Block until the target provider is explicitly canceled. <-ctx.Done() } -func endpointsSource(ep apiv1.ObjectMeta) string { - return "endpoints/" + ep.Namespace + "/" + ep.Name +func convertToEndpoints(o interface{}) (*apiv1.Endpoints, error) { + endpoints, isEndpoints := o.(*apiv1.Endpoints) + if !isEndpoints { + deletedState, ok := o.(cache.DeletedFinalStateUnknown) + if !ok { + return nil, fmt.Errorf("Received unexpected object: %v", o) + } + endpoints, ok = deletedState.Obj.(*apiv1.Endpoints) + if !ok { + return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Endpoints object: %v", deletedState.Obj) + } + } + + return endpoints, nil +} + +func endpointsSource(ep *apiv1.Endpoints) string { + return "endpoints/" + ep.ObjectMeta.Namespace + "/" + ep.ObjectMeta.Name } const ( @@ -134,7 +171,7 @@ func (e *Endpoints) buildEndpoints(eps *apiv1.Endpoints) *config.TargetGroup { } tg := &config.TargetGroup{ - Source: endpointsSource(eps.ObjectMeta), + Source: endpointsSource(eps), } tg.Labels = model.LabelSet{ namespaceLabel: lv(eps.Namespace), diff --git a/retrieval/discovery/kubernetes/endpoints_test.go b/retrieval/discovery/kubernetes/endpoints_test.go index 8ea304a19..296249f7e 100644 --- a/retrieval/discovery/kubernetes/endpoints_test.go +++ b/retrieval/discovery/kubernetes/endpoints_test.go @@ -20,6 +20,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" "k8s.io/client-go/1.5/pkg/api/v1" + "k8s.io/client-go/1.5/tools/cache" ) func endpointsStoreKeyFunc(obj interface{}) (string, error) { @@ -248,6 +249,21 @@ func TestEndpointsDiscoveryDelete(t *testing.T) { }.Run(t) } +func TestEndpointsDiscoveryDeleteUnknownCacheState(t *testing.T) { + n, _, eps, _ := makeTestEndpointsDiscovery() + eps.GetStore().Add(makeEndpoints()) + + k8sDiscoveryTest{ + discovery: n, + afterStart: func() { go func() { eps.Delete(cache.DeletedFinalStateUnknown{Obj: makeEndpoints()}) }() }, + expectedRes: []*config.TargetGroup{ + &config.TargetGroup{ + Source: "endpoints/default/testendpoints", + }, + }, + }.Run(t) +} + func TestEndpointsDiscoveryUpdate(t *testing.T) { n, _, eps, _ := makeTestEndpointsDiscovery() eps.GetStore().Add(makeEndpoints()) diff --git a/retrieval/discovery/kubernetes/node.go b/retrieval/discovery/kubernetes/node.go index e2d684b84..81f579e75 100644 --- a/retrieval/discovery/kubernetes/node.go +++ b/retrieval/discovery/kubernetes/node.go @@ -63,13 +63,28 @@ func (n *Node) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { } n.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(o interface{}) { - send(n.buildNode(o.(*apiv1.Node))) + node, err := convertToNode(o) + if err != nil { + n.logger.With("err", err).Errorln("converting to Node object failed") + return + } + send(n.buildNode(node)) }, DeleteFunc: func(o interface{}) { - send(&config.TargetGroup{Source: nodeSource(o.(*apiv1.Node))}) + node, err := convertToNode(o) + if err != nil { + n.logger.With("err", err).Errorln("converting to Node object failed") + return + } + send(&config.TargetGroup{Source: nodeSource(node)}) }, UpdateFunc: func(_, o interface{}) { - send(n.buildNode(o.(*apiv1.Node))) + node, err := convertToNode(o) + if err != nil { + n.logger.With("err", err).Errorln("converting to Node object failed") + return + } + send(n.buildNode(node)) }, }) @@ -77,6 +92,22 @@ func (n *Node) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { <-ctx.Done() } +func convertToNode(o interface{}) (*apiv1.Node, error) { + node, isNode := o.(*apiv1.Node) + if !isNode { + deletedState, ok := o.(cache.DeletedFinalStateUnknown) + if !ok { + return nil, fmt.Errorf("Received unexpected object: %v", o) + } + node, ok = deletedState.Obj.(*apiv1.Node) + if !ok { + return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj) + } + } + + return node, nil +} + func nodeSource(n *apiv1.Node) string { return "node/" + n.Name } diff --git a/retrieval/discovery/kubernetes/node_test.go b/retrieval/discovery/kubernetes/node_test.go index 0bb69cec9..9f78f7b02 100644 --- a/retrieval/discovery/kubernetes/node_test.go +++ b/retrieval/discovery/kubernetes/node_test.go @@ -270,6 +270,36 @@ func TestNodeDiscoveryDelete(t *testing.T) { }.Run(t) } +func TestNodeDiscoveryDeleteUnknownCacheState(t *testing.T) { + n, i := makeTestNodeDiscovery() + i.GetStore().Add(makeEnumeratedNode(0)) + + k8sDiscoveryTest{ + discovery: n, + afterStart: func() { go func() { i.Delete(cache.DeletedFinalStateUnknown{Obj: makeEnumeratedNode(0)}) }() }, + expectedInitial: []*config.TargetGroup{ + &config.TargetGroup{ + Targets: []model.LabelSet{ + model.LabelSet{ + "__address__": "1.2.3.4:10250", + "instance": "test0", + "__meta_kubernetes_node_address_InternalIP": "1.2.3.4", + }, + }, + Labels: model.LabelSet{ + "__meta_kubernetes_node_name": "test0", + }, + Source: "node/test0", + }, + }, + expectedRes: []*config.TargetGroup{ + &config.TargetGroup{ + Source: "node/test0", + }, + }, + }.Run(t) +} + func TestNodeDiscoveryUpdate(t *testing.T) { n, i := makeTestNodeDiscovery() i.GetStore().Add(makeEnumeratedNode(0)) diff --git a/retrieval/discovery/kubernetes/pod.go b/retrieval/discovery/kubernetes/pod.go index 3d16ea8b9..654d47eff 100644 --- a/retrieval/discovery/kubernetes/pod.go +++ b/retrieval/discovery/kubernetes/pod.go @@ -71,13 +71,28 @@ func (p *Pod) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { } p.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(o interface{}) { - send(p.buildPod(o.(*apiv1.Pod))) + pod, err := convertToPod(o) + if err != nil { + p.logger.With("err", err).Errorln("converting to Pod object failed") + return + } + send(p.buildPod(pod)) }, DeleteFunc: func(o interface{}) { - send(&config.TargetGroup{Source: podSource(o.(*apiv1.Pod))}) + pod, err := convertToPod(o) + if err != nil { + p.logger.With("err", err).Errorln("converting to Pod object failed") + return + } + send(&config.TargetGroup{Source: podSource(pod)}) }, UpdateFunc: func(_, o interface{}) { - send(p.buildPod(o.(*apiv1.Pod))) + pod, err := convertToPod(o) + if err != nil { + p.logger.With("err", err).Errorln("converting to Pod object failed") + return + } + send(p.buildPod(pod)) }, }) @@ -85,6 +100,22 @@ func (p *Pod) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { <-ctx.Done() } +func convertToPod(o interface{}) (*apiv1.Pod, error) { + pod, isPod := o.(*apiv1.Pod) + if !isPod { + deletedState, ok := o.(cache.DeletedFinalStateUnknown) + if !ok { + return nil, fmt.Errorf("Received unexpected object: %v", o) + } + pod, ok = deletedState.Obj.(*apiv1.Pod) + if !ok { + return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Pod object: %v", deletedState.Obj) + } + } + + return pod, nil +} + const ( podNameLabel = metaLabelPrefix + "pod_name" podIPLabel = metaLabelPrefix + "pod_ip" diff --git a/retrieval/discovery/kubernetes/pod_test.go b/retrieval/discovery/kubernetes/pod_test.go index 8b3112e52..c683e825b 100644 --- a/retrieval/discovery/kubernetes/pod_test.go +++ b/retrieval/discovery/kubernetes/pod_test.go @@ -14,13 +14,13 @@ package kubernetes import ( - //"fmt" "testing" "github.com/prometheus/common/log" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" "k8s.io/client-go/1.5/pkg/api/v1" + "k8s.io/client-go/1.5/tools/cache" ) func podStoreKeyFunc(obj interface{}) (string, error) { @@ -226,6 +226,43 @@ func TestPodDiscoveryDelete(t *testing.T) { }.Run(t) } +func TestPodDiscoveryDeleteUnknownCacheState(t *testing.T) { + n, i := makeTestPodDiscovery() + i.GetStore().Add(makePod()) + + k8sDiscoveryTest{ + discovery: n, + afterStart: func() { go func() { i.Delete(cache.DeletedFinalStateUnknown{Obj: makePod()}) }() }, + expectedInitial: []*config.TargetGroup{ + &config.TargetGroup{ + Targets: []model.LabelSet{ + model.LabelSet{ + "__address__": "1.2.3.4:9000", + "__meta_kubernetes_pod_container_name": "testcontainer", + "__meta_kubernetes_pod_container_port_name": "testport", + "__meta_kubernetes_pod_container_port_number": "9000", + "__meta_kubernetes_pod_container_port_protocol": "TCP", + }, + }, + Labels: model.LabelSet{ + "__meta_kubernetes_pod_name": "testpod", + "__meta_kubernetes_namespace": "default", + "__meta_kubernetes_pod_node_name": "testnode", + "__meta_kubernetes_pod_ip": "1.2.3.4", + "__meta_kubernetes_pod_host_ip": "2.3.4.5", + "__meta_kubernetes_pod_ready": "true", + }, + Source: "pod/default/testpod", + }, + }, + expectedRes: []*config.TargetGroup{ + &config.TargetGroup{ + Source: "pod/default/testpod", + }, + }, + }.Run(t) +} + func TestPodDiscoveryUpdate(t *testing.T) { n, i := makeTestPodDiscovery() i.GetStore().Add(&v1.Pod{ diff --git a/retrieval/discovery/kubernetes/service.go b/retrieval/discovery/kubernetes/service.go index bb6cb6631..3acca8d55 100644 --- a/retrieval/discovery/kubernetes/service.go +++ b/retrieval/discovery/kubernetes/service.go @@ -14,6 +14,7 @@ package kubernetes import ( + "fmt" "net" "strconv" @@ -61,13 +62,28 @@ func (s *Service) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { } s.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(o interface{}) { - send(s.buildService(o.(*apiv1.Service))) + svc, err := convertToService(o) + if err != nil { + s.logger.With("err", err).Errorln("converting to Service object failed") + return + } + send(s.buildService(svc)) }, DeleteFunc: func(o interface{}) { - send(&config.TargetGroup{Source: serviceSource(o.(*apiv1.Service))}) + svc, err := convertToService(o) + if err != nil { + s.logger.With("err", err).Errorln("converting to Service object failed") + return + } + send(&config.TargetGroup{Source: serviceSource(svc)}) }, UpdateFunc: func(_, o interface{}) { - send(s.buildService(o.(*apiv1.Service))) + svc, err := convertToService(o) + if err != nil { + s.logger.With("err", err).Errorln("converting to Service object failed") + return + } + send(s.buildService(svc)) }, }) @@ -75,6 +91,22 @@ func (s *Service) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { <-ctx.Done() } +func convertToService(o interface{}) (*apiv1.Service, error) { + service, isService := o.(*apiv1.Service) + if !isService { + deletedState, ok := o.(cache.DeletedFinalStateUnknown) + if !ok { + return nil, fmt.Errorf("Received unexpected object: %v", o) + } + service, ok = deletedState.Obj.(*apiv1.Service) + if !ok { + return nil, fmt.Errorf("DeletedFinalStateUnknown contained non-Service object: %v", deletedState.Obj) + } + } + + return service, nil +} + func serviceSource(s *apiv1.Service) string { return "svc/" + s.Namespace + "/" + s.Name } diff --git a/retrieval/discovery/kubernetes/service_test.go b/retrieval/discovery/kubernetes/service_test.go index d12084ac8..fc4fce73b 100644 --- a/retrieval/discovery/kubernetes/service_test.go +++ b/retrieval/discovery/kubernetes/service_test.go @@ -21,6 +21,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" "k8s.io/client-go/1.5/pkg/api/v1" + "k8s.io/client-go/1.5/tools/cache" ) func serviceStoreKeyFunc(obj interface{}) (string, error) { @@ -171,6 +172,37 @@ func TestServiceDiscoveryDelete(t *testing.T) { }.Run(t) } +func TestServiceDiscoveryDeleteUnknownCacheState(t *testing.T) { + n, i := makeTestServiceDiscovery() + i.GetStore().Add(makeService()) + + k8sDiscoveryTest{ + discovery: n, + afterStart: func() { go func() { i.Delete(cache.DeletedFinalStateUnknown{Obj: makeService()}) }() }, + expectedInitial: []*config.TargetGroup{ + &config.TargetGroup{ + Targets: []model.LabelSet{ + model.LabelSet{ + "__meta_kubernetes_service_port_protocol": "TCP", + "__address__": "testservice.default.svc:30900", + "__meta_kubernetes_service_port_name": "testport", + }, + }, + Labels: model.LabelSet{ + "__meta_kubernetes_service_name": "testservice", + "__meta_kubernetes_namespace": "default", + }, + Source: "svc/default/testservice", + }, + }, + expectedRes: []*config.TargetGroup{ + &config.TargetGroup{ + Source: "svc/default/testservice", + }, + }, + }.Run(t) +} + func TestServiceDiscoveryUpdate(t *testing.T) { n, i := makeTestServiceDiscovery() i.GetStore().Add(makeService())