diff --git a/discovery/kubernetes/kubernetes.go b/discovery/kubernetes/kubernetes.go index ac03eff53..592eab217 100644 --- a/discovery/kubernetes/kubernetes.go +++ b/discovery/kubernetes/kubernetes.go @@ -256,27 +256,6 @@ func New(l log.Logger, conf *SDConfig) (*Discovery, error) { const resyncPeriod = 10 * time.Minute -type hasSynced interface { - // hasSynced returns true if all informers' store has synced. - // This is only used in testing to determine when the cache stores have synced. - hasSynced() bool -} - -var _ hasSynced = &Discovery{} - -func (d *Discovery) hasSynced() bool { - d.RLock() - defer d.RUnlock() - for _, discoverer := range d.discoverers { - if hasSynceddiscoverer, ok := discoverer.(hasSynced); ok { - if !hasSynceddiscoverer.hasSynced() { - return false - } - } - } - return true -} - // Run implements the discoverer interface. func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { d.Lock() diff --git a/discovery/kubernetes/kubernetes_test.go b/discovery/kubernetes/kubernetes_test.go index 1b713c416..6e5ad3619 100644 --- a/discovery/kubernetes/kubernetes_test.go +++ b/discovery/kubernetes/kubernetes_test.go @@ -119,11 +119,14 @@ func (d k8sDiscoveryTest) Run(t *testing.T) { resChan := make(chan map[string]*targetgroup.Group) go readResultWithoutTimeout(t, ch, d.expectedMaxItems, time.Second, resChan) - if dd, ok := d.discovery.(hasSynced); ok { - if !cache.WaitForCacheSync(ctx.Done(), dd.hasSynced) { - t.Errorf("discoverer failed to sync: %v", dd) - return - } + dd, ok := d.discovery.(hasSynced) + if !ok { + t.Errorf("discoverer does not implement hasSynced interface") + return + } + if !cache.WaitForCacheSync(ctx.Done(), dd.hasSynced) { + t.Errorf("discoverer failed to sync: %v", dd) + return } if d.afterStart != nil { @@ -184,3 +187,49 @@ func requireTargetGroups(t *testing.T, expected, res map[string]*targetgroup.Gro require.JSONEq(t, string(b1), string(b2)) } + +type hasSynced interface { + // hasSynced returns true if all informers' store has synced. + // This is only used in testing to determine when the cache stores have synced. + hasSynced() bool +} + +var _ hasSynced = &Discovery{} +var _ hasSynced = &Node{} +var _ hasSynced = &Endpoints{} +var _ hasSynced = &Ingress{} +var _ hasSynced = &Pod{} +var _ hasSynced = &Service{} + +func (d *Discovery) hasSynced() bool { + d.RLock() + defer d.RUnlock() + for _, discoverer := range d.discoverers { + if hasSynceddiscoverer, ok := discoverer.(hasSynced); ok { + if !hasSynceddiscoverer.hasSynced() { + return false + } + } + } + return true +} + +func (n *Node) hasSynced() bool { + return n.informer.HasSynced() +} + +func (e *Endpoints) hasSynced() bool { + return e.endpointsInf.HasSynced() && e.serviceInf.HasSynced() && e.podInf.HasSynced() +} + +func (i *Ingress) hasSynced() bool { + return i.informer.HasSynced() +} + +func (p *Pod) hasSynced() bool { + return p.informer.HasSynced() +} + +func (s *Service) hasSynced() bool { + return s.informer.HasSynced() +} diff --git a/discovery/kubernetes/node.go b/discovery/kubernetes/node.go index f49466d2c..36e59e66f 100644 --- a/discovery/kubernetes/node.go +++ b/discovery/kubernetes/node.go @@ -39,7 +39,6 @@ type Node struct { } var _ discoverer = &Node{} -var _ hasSynced = &Node{} // NewNode returns a new node discovery. func NewNode(l log.Logger, inf cache.SharedInformer) *Node { @@ -73,10 +72,6 @@ func (e *Node) enqueue(obj interface{}) { e.queue.Add(key) } -func (n *Node) hasSynced() bool { - return n.informer.HasSynced() -} - // Run implements the Discoverer interface. func (n *Node) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { defer n.queue.ShutDown()