mirror of
https://github.com/prometheus/prometheus
synced 2024-12-26 00:23:18 +00:00
Pod status changes not discovered by Kube Endpoints SD (#13337)
* fix(discovery/kubernetes/endpoints): react to changes on Pods because some modifications can occur on them without triggering an update on the related Endpoints (The Pod phase changing from Pending to Running e.g.). --------- Signed-off-by: machine424 <ayoubmrini424@gmail.com> Co-authored-by: Guillermo Sanchez Gavier <gsanchez@newrelic.com>
This commit is contained in:
parent
34875ae8c7
commit
581d8d86b4
@ -62,6 +62,8 @@ func NewEndpoints(l log.Logger, eps cache.SharedIndexInformer, svc, pod, node ca
|
||||
svcUpdateCount := eventCount.WithLabelValues(RoleService.String(), MetricLabelRoleUpdate)
|
||||
svcDeleteCount := eventCount.WithLabelValues(RoleService.String(), MetricLabelRoleDelete)
|
||||
|
||||
podUpdateCount := eventCount.WithLabelValues(RolePod.String(), MetricLabelRoleUpdate)
|
||||
|
||||
e := &Endpoints{
|
||||
logger: l,
|
||||
endpointsInf: eps,
|
||||
@ -131,6 +133,29 @@ func NewEndpoints(l log.Logger, eps cache.SharedIndexInformer, svc, pod, node ca
|
||||
if err != nil {
|
||||
level.Error(l).Log("msg", "Error adding services event handler.", "err", err)
|
||||
}
|
||||
_, err = e.podInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
UpdateFunc: func(old, cur interface{}) {
|
||||
podUpdateCount.Inc()
|
||||
oldPod, ok := old.(*apiv1.Pod)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
curPod, ok := cur.(*apiv1.Pod)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
// the Pod's phase may change without triggering an update on the Endpoints/Service.
|
||||
// https://github.com/prometheus/prometheus/issues/11305.
|
||||
if curPod.Status.Phase != oldPod.Status.Phase {
|
||||
e.enqueuePod(namespacedName(curPod.Namespace, curPod.Name))
|
||||
}
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
level.Error(l).Log("msg", "Error adding pods event handler.", "err", err)
|
||||
}
|
||||
if e.withNodeMetadata {
|
||||
_, err = e.nodeInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(o interface{}) {
|
||||
@ -166,6 +191,18 @@ func (e *Endpoints) enqueueNode(nodeName string) {
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Endpoints) enqueuePod(podNamespacedName string) {
|
||||
endpoints, err := e.endpointsInf.GetIndexer().ByIndex(podIndex, podNamespacedName)
|
||||
if err != nil {
|
||||
level.Error(e.logger).Log("msg", "Error getting endpoints for pod", "pod", podNamespacedName, "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, endpoint := range endpoints {
|
||||
e.enqueue(endpoint)
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Endpoints) enqueue(obj interface{}) {
|
||||
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
|
||||
if err != nil {
|
||||
@ -312,7 +349,7 @@ func (e *Endpoints) buildEndpoints(eps *apiv1.Endpoints) *targetgroup.Group {
|
||||
tg.Targets = append(tg.Targets, target)
|
||||
return
|
||||
}
|
||||
s := pod.Namespace + "/" + pod.Name
|
||||
s := namespacedName(pod.Namespace, pod.Name)
|
||||
|
||||
sp, ok := seenPods[s]
|
||||
if !ok {
|
||||
|
@ -969,3 +969,123 @@ func TestEndpointsDiscoveryEmptyPodStatus(t *testing.T) {
|
||||
expectedRes: map[string]*targetgroup.Group{},
|
||||
}.Run(t)
|
||||
}
|
||||
|
||||
// TestEndpointsUpdatePod makes sure that Endpoints discovery detects underlying Pods changes.
|
||||
// See https://github.com/prometheus/prometheus/issues/11305 for more details.
|
||||
func TestEndpointsDiscoveryUpdatePod(t *testing.T) {
|
||||
pod := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "testpod",
|
||||
Namespace: "default",
|
||||
UID: types.UID("deadbeef"),
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
NodeName: "testnode",
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Name: "c1",
|
||||
Image: "c1:latest",
|
||||
Ports: []v1.ContainerPort{
|
||||
{
|
||||
Name: "mainport",
|
||||
ContainerPort: 9000,
|
||||
Protocol: v1.ProtocolTCP,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Status: v1.PodStatus{
|
||||
// Pod is in Pending phase when discovered for first time.
|
||||
Phase: "Pending",
|
||||
Conditions: []v1.PodCondition{
|
||||
{
|
||||
Type: v1.PodReady,
|
||||
Status: v1.ConditionFalse,
|
||||
},
|
||||
},
|
||||
HostIP: "2.3.4.5",
|
||||
PodIP: "4.3.2.1",
|
||||
},
|
||||
}
|
||||
objs := []runtime.Object{
|
||||
&v1.Endpoints{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "testendpoints",
|
||||
Namespace: "default",
|
||||
},
|
||||
Subsets: []v1.EndpointSubset{
|
||||
{
|
||||
Addresses: []v1.EndpointAddress{
|
||||
{
|
||||
IP: "4.3.2.1",
|
||||
// The Pending Pod may be included because the Endpoints was created manually.
|
||||
// Or because the corresponding service has ".spec.publishNotReadyAddresses: true".
|
||||
TargetRef: &v1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
Name: "testpod",
|
||||
Namespace: "default",
|
||||
},
|
||||
},
|
||||
},
|
||||
Ports: []v1.EndpointPort{
|
||||
{
|
||||
Name: "mainport",
|
||||
Port: 9000,
|
||||
Protocol: v1.ProtocolTCP,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
pod,
|
||||
}
|
||||
n, c := makeDiscovery(RoleEndpoint, NamespaceDiscovery{}, objs...)
|
||||
|
||||
k8sDiscoveryTest{
|
||||
discovery: n,
|
||||
afterStart: func() {
|
||||
// the Pod becomes Ready.
|
||||
pod.Status.Phase = "Running"
|
||||
pod.Status.Conditions = []v1.PodCondition{
|
||||
{
|
||||
Type: v1.PodReady,
|
||||
Status: v1.ConditionTrue,
|
||||
},
|
||||
}
|
||||
c.CoreV1().Pods(pod.Namespace).Update(context.Background(), pod, metav1.UpdateOptions{})
|
||||
},
|
||||
expectedMaxItems: 2,
|
||||
expectedRes: map[string]*targetgroup.Group{
|
||||
"endpoints/default/testendpoints": {
|
||||
Targets: []model.LabelSet{
|
||||
{
|
||||
"__address__": "4.3.2.1:9000",
|
||||
"__meta_kubernetes_endpoint_port_name": "mainport",
|
||||
"__meta_kubernetes_endpoint_port_protocol": "TCP",
|
||||
"__meta_kubernetes_endpoint_ready": "true",
|
||||
"__meta_kubernetes_endpoint_address_target_kind": "Pod",
|
||||
"__meta_kubernetes_endpoint_address_target_name": "testpod",
|
||||
"__meta_kubernetes_pod_name": "testpod",
|
||||
"__meta_kubernetes_pod_ip": "4.3.2.1",
|
||||
"__meta_kubernetes_pod_ready": "true",
|
||||
"__meta_kubernetes_pod_phase": "Running",
|
||||
"__meta_kubernetes_pod_node_name": "testnode",
|
||||
"__meta_kubernetes_pod_host_ip": "2.3.4.5",
|
||||
"__meta_kubernetes_pod_container_name": "c1",
|
||||
"__meta_kubernetes_pod_container_image": "c1:latest",
|
||||
"__meta_kubernetes_pod_container_port_name": "mainport",
|
||||
"__meta_kubernetes_pod_container_port_number": "9000",
|
||||
"__meta_kubernetes_pod_container_port_protocol": "TCP",
|
||||
"__meta_kubernetes_pod_uid": "deadbeef",
|
||||
},
|
||||
},
|
||||
Labels: model.LabelSet{
|
||||
"__meta_kubernetes_namespace": "default",
|
||||
"__meta_kubernetes_endpoints_name": "testendpoints",
|
||||
},
|
||||
Source: "endpoints/default/testendpoints",
|
||||
},
|
||||
},
|
||||
}.Run(t)
|
||||
}
|
||||
|
@ -358,7 +358,7 @@ func (e *EndpointSlice) buildEndpointSlice(eps endpointSliceAdaptor) *targetgrou
|
||||
tg.Targets = append(tg.Targets, target)
|
||||
return
|
||||
}
|
||||
s := pod.Namespace + "/" + pod.Name
|
||||
s := namespacedName(pod.Namespace, pod.Name)
|
||||
|
||||
sp, ok := seenPods[s]
|
||||
if !ok {
|
||||
|
@ -767,6 +767,21 @@ func (d *Discovery) newPodsByNodeInformer(plw *cache.ListWatch) cache.SharedInde
|
||||
|
||||
func (d *Discovery) newEndpointsByNodeInformer(plw *cache.ListWatch) cache.SharedIndexInformer {
|
||||
indexers := make(map[string]cache.IndexFunc)
|
||||
indexers[podIndex] = func(obj interface{}) ([]string, error) {
|
||||
e, ok := obj.(*apiv1.Endpoints)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("object is not endpoints")
|
||||
}
|
||||
var pods []string
|
||||
for _, target := range e.Subsets {
|
||||
for _, addr := range target.Addresses {
|
||||
if addr.TargetRef != nil && addr.TargetRef.Kind == "Pod" {
|
||||
pods = append(pods, namespacedName(addr.TargetRef.Namespace, addr.TargetRef.Name))
|
||||
}
|
||||
}
|
||||
}
|
||||
return pods, nil
|
||||
}
|
||||
if !d.attachMetadata.Node {
|
||||
return cache.NewSharedIndexInformer(plw, &apiv1.Endpoints{}, resyncDisabled, indexers)
|
||||
}
|
||||
@ -872,3 +887,7 @@ func addObjectMetaLabels(labelSet model.LabelSet, objectMeta metav1.ObjectMeta,
|
||||
labelSet[model.LabelName(metaLabelPrefix+string(role)+"_annotationpresent_"+ln)] = presentValue
|
||||
}
|
||||
}
|
||||
|
||||
func namespacedName(namespace, name string) string {
|
||||
return namespace + "/" + name
|
||||
}
|
||||
|
@ -33,7 +33,10 @@ import (
|
||||
"github.com/prometheus/prometheus/discovery/targetgroup"
|
||||
)
|
||||
|
||||
const nodeIndex = "node"
|
||||
const (
|
||||
nodeIndex = "node"
|
||||
podIndex = "pod"
|
||||
)
|
||||
|
||||
// Pod discovers new pod targets.
|
||||
type Pod struct {
|
||||
@ -326,7 +329,7 @@ func podSource(pod *apiv1.Pod) string {
|
||||
}
|
||||
|
||||
func podSourceFromNamespaceAndName(namespace, name string) string {
|
||||
return "pod/" + namespace + "/" + name
|
||||
return "pod/" + namespacedName(namespace, name)
|
||||
}
|
||||
|
||||
func podReady(pod *apiv1.Pod) model.LabelValue {
|
||||
|
Loading…
Reference in New Issue
Block a user