diff --git a/discovery/kubernetes/endpointslice_test.go b/discovery/kubernetes/endpointslice_test.go index f23c9e655..a6579b954 100644 --- a/discovery/kubernetes/endpointslice_test.go +++ b/discovery/kubernetes/endpointslice_test.go @@ -18,6 +18,7 @@ import ( "testing" "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/discovery/v1" "k8s.io/api/discovery/v1beta1" @@ -1405,3 +1406,41 @@ func TestEndpointSliceDiscoveryEmptyPodStatus(t *testing.T) { expectedRes: map[string]*targetgroup.Group{}, }.Run(t) } + +// TestEndpointSliceInfIndexersCount makes sure that RoleEndpointSlice discovery +// sets up indexing for the main Kube informer only when needed. +// See: https://github.com/prometheus/prometheus/pull/13554#discussion_r1490965817 +func TestEndpointSliceInfIndexersCount(t *testing.T) { + tests := []struct { + name string + withNodeMetadata bool + }{ + {"with node metadata", true}, + {"without node metadata", false}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + var ( + n *Discovery + mainInfIndexersCount int + ) + if tc.withNodeMetadata { + mainInfIndexersCount = 1 + n, _ = makeDiscoveryWithMetadata(RoleEndpointSlice, NamespaceDiscovery{}, AttachMetadataConfig{Node: true}) + } else { + n, _ = makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{}) + } + + k8sDiscoveryTest{ + discovery: n, + afterStart: func() { + n.RLock() + defer n.RUnlock() + require.Len(t, n.discoverers, 1) + require.Len(t, n.discoverers[0].(*EndpointSlice).endpointSliceInf.GetIndexer().GetIndexers(), mainInfIndexersCount) + }, + }.Run(t) + }) + } +} diff --git a/discovery/kubernetes/kubernetes.go b/discovery/kubernetes/kubernetes.go index 489365fa4..d6b811584 100644 --- a/discovery/kubernetes/kubernetes.go +++ b/discovery/kubernetes/kubernetes.go @@ -815,7 +815,7 @@ func (d *Discovery) newEndpointsByNodeInformer(plw *cache.ListWatch) cache.Share func (d *Discovery) newEndpointSlicesByNodeInformer(plw *cache.ListWatch, object runtime.Object) cache.SharedIndexInformer { indexers := make(map[string]cache.IndexFunc) if !d.attachMetadata.Node { - cache.NewSharedIndexInformer(plw, &disv1.EndpointSlice{}, resyncDisabled, indexers) + return cache.NewSharedIndexInformer(plw, object, resyncDisabled, indexers) } indexers[nodeIndex] = func(obj interface{}) ([]string, error) { diff --git a/discovery/kubernetes/kubernetes_test.go b/discovery/kubernetes/kubernetes_test.go index f1f8070e8..e3dd09300 100644 --- a/discovery/kubernetes/kubernetes_test.go +++ b/discovery/kubernetes/kubernetes_test.go @@ -128,7 +128,7 @@ func (d k8sDiscoveryTest) Run(t *testing.T) { } resChan := make(chan map[string]*targetgroup.Group) - go readResultWithTimeout(t, ch, d.expectedMaxItems, time.Second, resChan) + go readResultWithTimeout(t, ctx, ch, d.expectedMaxItems, time.Second, resChan) dd, ok := d.discovery.(hasSynced) require.True(t, ok, "discoverer does not implement hasSynced interface") @@ -141,13 +141,18 @@ func (d k8sDiscoveryTest) Run(t *testing.T) { if d.expectedRes != nil { res := <-resChan requireTargetGroups(t, d.expectedRes, res) + } else { + // Stop readResultWithTimeout and wait for it. + cancel() + <-resChan } } // readResultWithTimeout reads all targetgroups from channel with timeout. // It merges targetgroups by source and sends the result to result channel. -func readResultWithTimeout(t *testing.T, ch <-chan []*targetgroup.Group, max int, timeout time.Duration, resChan chan<- map[string]*targetgroup.Group) { +func readResultWithTimeout(t *testing.T, ctx context.Context, ch <-chan []*targetgroup.Group, max int, stopAfter time.Duration, resChan chan<- map[string]*targetgroup.Group) { res := make(map[string]*targetgroup.Group) + timeout := time.After(stopAfter) Loop: for { select { @@ -162,12 +167,15 @@ Loop: // Reached max target groups we may get, break fast. break Loop } - case <-time.After(timeout): + case <-timeout: // Because we use queue, an object that is created then // deleted or updated may be processed only once. // So possibly we may skip events, timed out here. t.Logf("timed out, got %d (max: %d) items, some events are skipped", len(res), max) break Loop + case <-ctx.Done(): + t.Logf("stopped, got %d (max: %d) items", len(res), max) + break Loop } }