From 581d8d86b4c3a33ec1e4a7099456ab370d1b6d34 Mon Sep 17 00:00:00 2001 From: Ayoub Mrini Date: Thu, 1 Feb 2024 13:34:37 +0100 Subject: [PATCH] Pod status changes not discovered by Kube Endpoints SD (#13337) * fix(discovery/kubernetes/endpoints): react to changes on Pods because some modifications can occur on them without triggering an update on the related Endpoints (The Pod phase changing from Pending to Running e.g.). --------- Signed-off-by: machine424 Co-authored-by: Guillermo Sanchez Gavier --- discovery/kubernetes/endpoints.go | 39 +++++++- discovery/kubernetes/endpoints_test.go | 120 +++++++++++++++++++++++++ discovery/kubernetes/endpointslice.go | 2 +- discovery/kubernetes/kubernetes.go | 19 ++++ discovery/kubernetes/pod.go | 7 +- 5 files changed, 183 insertions(+), 4 deletions(-) diff --git a/discovery/kubernetes/endpoints.go b/discovery/kubernetes/endpoints.go index d8c9689ca..c7a60ae6d 100644 --- a/discovery/kubernetes/endpoints.go +++ b/discovery/kubernetes/endpoints.go @@ -62,6 +62,8 @@ func NewEndpoints(l log.Logger, eps cache.SharedIndexInformer, svc, pod, node ca svcUpdateCount := eventCount.WithLabelValues(RoleService.String(), MetricLabelRoleUpdate) svcDeleteCount := eventCount.WithLabelValues(RoleService.String(), MetricLabelRoleDelete) + podUpdateCount := eventCount.WithLabelValues(RolePod.String(), MetricLabelRoleUpdate) + e := &Endpoints{ logger: l, endpointsInf: eps, @@ -131,6 +133,29 @@ func NewEndpoints(l log.Logger, eps cache.SharedIndexInformer, svc, pod, node ca if err != nil { level.Error(l).Log("msg", "Error adding services event handler.", "err", err) } + _, err = e.podInf.AddEventHandler(cache.ResourceEventHandlerFuncs{ + UpdateFunc: func(old, cur interface{}) { + podUpdateCount.Inc() + oldPod, ok := old.(*apiv1.Pod) + if !ok { + return + } + + curPod, ok := cur.(*apiv1.Pod) + if !ok { + return + } + + // the Pod's phase may change without triggering an update on the Endpoints/Service. + // https://github.com/prometheus/prometheus/issues/11305. + if curPod.Status.Phase != oldPod.Status.Phase { + e.enqueuePod(namespacedName(curPod.Namespace, curPod.Name)) + } + }, + }) + if err != nil { + level.Error(l).Log("msg", "Error adding pods event handler.", "err", err) + } if e.withNodeMetadata { _, err = e.nodeInf.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(o interface{}) { @@ -166,6 +191,18 @@ func (e *Endpoints) enqueueNode(nodeName string) { } } +func (e *Endpoints) enqueuePod(podNamespacedName string) { + endpoints, err := e.endpointsInf.GetIndexer().ByIndex(podIndex, podNamespacedName) + if err != nil { + level.Error(e.logger).Log("msg", "Error getting endpoints for pod", "pod", podNamespacedName, "err", err) + return + } + + for _, endpoint := range endpoints { + e.enqueue(endpoint) + } +} + func (e *Endpoints) enqueue(obj interface{}) { key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil { @@ -312,7 +349,7 @@ func (e *Endpoints) buildEndpoints(eps *apiv1.Endpoints) *targetgroup.Group { tg.Targets = append(tg.Targets, target) return } - s := pod.Namespace + "/" + pod.Name + s := namespacedName(pod.Namespace, pod.Name) sp, ok := seenPods[s] if !ok { diff --git a/discovery/kubernetes/endpoints_test.go b/discovery/kubernetes/endpoints_test.go index cf7fd9aee..e877657db 100644 --- a/discovery/kubernetes/endpoints_test.go +++ b/discovery/kubernetes/endpoints_test.go @@ -969,3 +969,123 @@ func TestEndpointsDiscoveryEmptyPodStatus(t *testing.T) { expectedRes: map[string]*targetgroup.Group{}, }.Run(t) } + +// TestEndpointsUpdatePod makes sure that Endpoints discovery detects underlying Pods changes. +// See https://github.com/prometheus/prometheus/issues/11305 for more details. +func TestEndpointsDiscoveryUpdatePod(t *testing.T) { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testpod", + Namespace: "default", + UID: types.UID("deadbeef"), + }, + Spec: v1.PodSpec{ + NodeName: "testnode", + Containers: []v1.Container{ + { + Name: "c1", + Image: "c1:latest", + Ports: []v1.ContainerPort{ + { + Name: "mainport", + ContainerPort: 9000, + Protocol: v1.ProtocolTCP, + }, + }, + }, + }, + }, + Status: v1.PodStatus{ + // Pod is in Pending phase when discovered for first time. + Phase: "Pending", + Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + Status: v1.ConditionFalse, + }, + }, + HostIP: "2.3.4.5", + PodIP: "4.3.2.1", + }, + } + objs := []runtime.Object{ + &v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testendpoints", + Namespace: "default", + }, + Subsets: []v1.EndpointSubset{ + { + Addresses: []v1.EndpointAddress{ + { + IP: "4.3.2.1", + // The Pending Pod may be included because the Endpoints was created manually. + // Or because the corresponding service has ".spec.publishNotReadyAddresses: true". + TargetRef: &v1.ObjectReference{ + Kind: "Pod", + Name: "testpod", + Namespace: "default", + }, + }, + }, + Ports: []v1.EndpointPort{ + { + Name: "mainport", + Port: 9000, + Protocol: v1.ProtocolTCP, + }, + }, + }, + }, + }, + pod, + } + n, c := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, objs...) + + k8sDiscoveryTest{ + discovery: n, + afterStart: func() { + // the Pod becomes Ready. + pod.Status.Phase = "Running" + pod.Status.Conditions = []v1.PodCondition{ + { + Type: v1.PodReady, + Status: v1.ConditionTrue, + }, + } + c.CoreV1().Pods(pod.Namespace).Update(context.Background(), pod, metav1.UpdateOptions{}) + }, + expectedMaxItems: 2, + expectedRes: map[string]*targetgroup.Group{ + "endpoints/default/testendpoints": { + Targets: []model.LabelSet{ + { + "__address__": "4.3.2.1:9000", + "__meta_kubernetes_endpoint_port_name": "mainport", + "__meta_kubernetes_endpoint_port_protocol": "TCP", + "__meta_kubernetes_endpoint_ready": "true", + "__meta_kubernetes_endpoint_address_target_kind": "Pod", + "__meta_kubernetes_endpoint_address_target_name": "testpod", + "__meta_kubernetes_pod_name": "testpod", + "__meta_kubernetes_pod_ip": "4.3.2.1", + "__meta_kubernetes_pod_ready": "true", + "__meta_kubernetes_pod_phase": "Running", + "__meta_kubernetes_pod_node_name": "testnode", + "__meta_kubernetes_pod_host_ip": "2.3.4.5", + "__meta_kubernetes_pod_container_name": "c1", + "__meta_kubernetes_pod_container_image": "c1:latest", + "__meta_kubernetes_pod_container_port_name": "mainport", + "__meta_kubernetes_pod_container_port_number": "9000", + "__meta_kubernetes_pod_container_port_protocol": "TCP", + "__meta_kubernetes_pod_uid": "deadbeef", + }, + }, + Labels: model.LabelSet{ + "__meta_kubernetes_namespace": "default", + "__meta_kubernetes_endpoints_name": "testendpoints", + }, + Source: "endpoints/default/testendpoints", + }, + }, + }.Run(t) +} diff --git a/discovery/kubernetes/endpointslice.go b/discovery/kubernetes/endpointslice.go index f977ce262..116f02076 100644 --- a/discovery/kubernetes/endpointslice.go +++ b/discovery/kubernetes/endpointslice.go @@ -358,7 +358,7 @@ func (e *EndpointSlice) buildEndpointSlice(eps endpointSliceAdaptor) *targetgrou tg.Targets = append(tg.Targets, target) return } - s := pod.Namespace + "/" + pod.Name + s := namespacedName(pod.Namespace, pod.Name) sp, ok := seenPods[s] if !ok { diff --git a/discovery/kubernetes/kubernetes.go b/discovery/kubernetes/kubernetes.go index 362e05326..489365fa4 100644 --- a/discovery/kubernetes/kubernetes.go +++ b/discovery/kubernetes/kubernetes.go @@ -767,6 +767,21 @@ func (d *Discovery) newPodsByNodeInformer(plw *cache.ListWatch) cache.SharedInde func (d *Discovery) newEndpointsByNodeInformer(plw *cache.ListWatch) cache.SharedIndexInformer { indexers := make(map[string]cache.IndexFunc) + indexers[podIndex] = func(obj interface{}) ([]string, error) { + e, ok := obj.(*apiv1.Endpoints) + if !ok { + return nil, fmt.Errorf("object is not endpoints") + } + var pods []string + for _, target := range e.Subsets { + for _, addr := range target.Addresses { + if addr.TargetRef != nil && addr.TargetRef.Kind == "Pod" { + pods = append(pods, namespacedName(addr.TargetRef.Namespace, addr.TargetRef.Name)) + } + } + } + return pods, nil + } if !d.attachMetadata.Node { return cache.NewSharedIndexInformer(plw, &apiv1.Endpoints{}, resyncDisabled, indexers) } @@ -872,3 +887,7 @@ func addObjectMetaLabels(labelSet model.LabelSet, objectMeta metav1.ObjectMeta, labelSet[model.LabelName(metaLabelPrefix+string(role)+"_annotationpresent_"+ln)] = presentValue } } + +func namespacedName(namespace, name string) string { + return namespace + "/" + name +} diff --git a/discovery/kubernetes/pod.go b/discovery/kubernetes/pod.go index 615717c13..02990e415 100644 --- a/discovery/kubernetes/pod.go +++ b/discovery/kubernetes/pod.go @@ -33,7 +33,10 @@ import ( "github.com/prometheus/prometheus/discovery/targetgroup" ) -const nodeIndex = "node" +const ( + nodeIndex = "node" + podIndex = "pod" +) // Pod discovers new pod targets. type Pod struct { @@ -326,7 +329,7 @@ func podSource(pod *apiv1.Pod) string { } func podSourceFromNamespaceAndName(namespace, name string) string { - return "pod/" + namespace + "/" + name + return "pod/" + namespacedName(namespace, name) } func podReady(pod *apiv1.Pod) model.LabelValue {