Simplify code.

- Unified `send` function.
- Pass InformerSynced functions to `cache.WaitForCacheSync`.
- Use `Role\w+` constants instead of literal string.

Signed-off-by: Yecheng Fu <cofyc.jackson@gmail.com>
This commit is contained in:
Yecheng Fu 2018-04-10 16:53:00 +08:00 committed by beorn7
parent 3a253f796c
commit 46683dd67d
6 changed files with 37 additions and 83 deletions

View File

@ -126,30 +126,13 @@ func (e *Endpoints) enqueue(obj interface{}) {
func (e *Endpoints) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
defer e.queue.ShutDown()
cacheSyncs := []cache.InformerSynced{
e.endpointsInf.HasSynced,
e.serviceInf.HasSynced,
e.podInf.HasSynced,
}
if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
if !cache.WaitForCacheSync(ctx.Done(), e.endpointsInf.HasSynced, e.serviceInf.HasSynced, e.podInf.HasSynced) {
level.Error(e.logger).Log("msg", "endpoints informer unable to sync cache")
return
}
// Send target groups for pod updates.
send := func(tg *targetgroup.Group) {
if tg == nil {
return
}
level.Debug(e.logger).Log("msg", "endpoints update", "tg", fmt.Sprintf("%#v", tg))
select {
case <-ctx.Done():
case ch <- []*targetgroup.Group{tg}:
}
}
go func() {
for e.process(send) {
for e.process(ctx, ch) {
}
}()
@ -157,7 +140,7 @@ func (e *Endpoints) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
<-ctx.Done()
}
func (e *Endpoints) process(send func(tg *targetgroup.Group)) bool {
func (e *Endpoints) process(ctx context.Context, ch chan<- []*targetgroup.Group) bool {
keyObj, quit := e.queue.Get()
if quit {
return false
@ -177,7 +160,7 @@ func (e *Endpoints) process(send func(tg *targetgroup.Group)) bool {
return true
}
if !exists {
send(&targetgroup.Group{Source: endpointsSourceFromNamespaceAndName(namespace, name)})
send(ctx, e.logger, RoleEndpoint, ch, &targetgroup.Group{Source: endpointsSourceFromNamespaceAndName(namespace, name)})
return true
}
eps, err := convertToEndpoints(o)
@ -185,7 +168,7 @@ func (e *Endpoints) process(send func(tg *targetgroup.Group)) bool {
level.Error(e.logger).Log("msg", "converting to Endpoints object failed", "err", err)
return true
}
send(e.buildEndpoints(eps))
send(ctx, e.logger, RoleEndpoint, ch, e.buildEndpoints(eps))
return true
}

View File

@ -73,16 +73,8 @@ func (s *Ingress) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
return
}
// Send target groups for ingress updates.
send := func(tg *targetgroup.Group) {
select {
case <-ctx.Done():
case ch <- []*targetgroup.Group{tg}:
}
}
go func() {
for s.process(send) {
for s.process(ctx, ch) {
}
}()
@ -90,8 +82,7 @@ func (s *Ingress) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
<-ctx.Done()
}
func (s *Ingress) process(send func(tg *targetgroup.Group)) bool {
func (s *Ingress) process(ctx context.Context, ch chan<- []*targetgroup.Group) bool {
keyObj, quit := s.queue.Get()
if quit {
return false
@ -109,7 +100,7 @@ func (s *Ingress) process(send func(tg *targetgroup.Group)) bool {
return true
}
if !exists {
send(&targetgroup.Group{Source: ingressSourceFromNamespaceAndName(namespace, name)})
send(ctx, s.logger, RoleIngress, ch, &targetgroup.Group{Source: ingressSourceFromNamespaceAndName(namespace, name)})
return true
}
eps, err := convertToIngress(o)
@ -117,7 +108,7 @@ func (s *Ingress) process(send func(tg *targetgroup.Group)) bool {
level.Error(s.logger).Log("msg", "converting to Ingress object failed", "err", err)
return true
}
send(s.buildIngress(eps))
send(ctx, s.logger, RoleIngress, ch, s.buildIngress(eps))
return true
}

View File

@ -262,7 +262,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
namespaces := d.getNamespaces()
switch d.role {
case "endpoints":
case RoleEndpoint:
for _, namespace := range namespaces {
elw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
@ -299,7 +299,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
go eps.serviceInf.Run(ctx.Done())
go eps.podInf.Run(ctx.Done())
}
case "pod":
case RolePod:
for _, namespace := range namespaces {
plw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
@ -316,7 +316,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
d.discoverers = append(d.discoverers, pod)
go pod.informer.Run(ctx.Done())
}
case "service":
case RoleService:
for _, namespace := range namespaces {
slw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
@ -333,7 +333,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
d.discoverers = append(d.discoverers, svc)
go svc.informer.Run(ctx.Done())
}
case "ingress":
case RoleIngress:
for _, namespace := range namespaces {
ilw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
@ -350,7 +350,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
d.discoverers = append(d.discoverers, ingress)
go ingress.informer.Run(ctx.Done())
}
case "node":
case RoleNode:
nlw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return d.client.CoreV1().Nodes().List(options)
@ -385,3 +385,14 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
func lv(s string) model.LabelValue {
return model.LabelValue(s)
}
func send(ctx context.Context, l log.Logger, role Role, ch chan<- []*targetgroup.Group, tg *targetgroup.Group) {
if tg == nil {
return
}
level.Debug(l).Log("msg", "kubernetes discovery update", "role", string(role), "tg", fmt.Sprintf("%#v", tg))
select {
case <-ctx.Done():
case ch <- []*targetgroup.Group{tg}:
}
}

View File

@ -81,19 +81,8 @@ func (n *Node) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
return
}
// Send target groups for service updates.
send := func(tg *targetgroup.Group) {
if tg == nil {
return
}
select {
case <-ctx.Done():
case ch <- []*targetgroup.Group{tg}:
}
}
go func() {
for n.process(send) {
for n.process(ctx, ch) {
}
}()
@ -101,7 +90,7 @@ func (n *Node) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
<-ctx.Done()
}
func (n *Node) process(send func(tg *targetgroup.Group)) bool {
func (n *Node) process(ctx context.Context, ch chan<- []*targetgroup.Group) bool {
keyObj, quit := n.queue.Get()
if quit {
return false
@ -119,7 +108,7 @@ func (n *Node) process(send func(tg *targetgroup.Group)) bool {
return true
}
if !exists {
send(&targetgroup.Group{Source: nodeSourceFromName(name)})
send(ctx, n.logger, RoleNode, ch, &targetgroup.Group{Source: nodeSourceFromName(name)})
return true
}
node, err := convertToNode(o)
@ -127,7 +116,7 @@ func (n *Node) process(send func(tg *targetgroup.Group)) bool {
level.Error(n.logger).Log("msg", "converting to Node object failed", "err", err)
return true
}
send(n.buildNode(node))
send(ctx, n.logger, RoleNode, ch, n.buildNode(node))
return true
}

View File

@ -86,20 +86,8 @@ func (p *Pod) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
return
}
// Send target groups for pod updates.
send := func(tg *targetgroup.Group) {
if tg == nil {
return
}
level.Debug(p.logger).Log("msg", "pod update", "tg", fmt.Sprintf("%#v", tg))
select {
case <-ctx.Done():
case ch <- []*targetgroup.Group{tg}:
}
}
go func() {
for p.process(send) {
for p.process(ctx, ch) {
}
}()
@ -107,7 +95,7 @@ func (p *Pod) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
<-ctx.Done()
}
func (p *Pod) process(send func(tg *targetgroup.Group)) bool {
func (p *Pod) process(ctx context.Context, ch chan<- []*targetgroup.Group) bool {
keyObj, quit := p.queue.Get()
if quit {
return false
@ -125,7 +113,7 @@ func (p *Pod) process(send func(tg *targetgroup.Group)) bool {
return true
}
if !exists {
send(&targetgroup.Group{Source: podSourceFromNamespaceAndName(namespace, name)})
send(ctx, p.logger, RolePod, ch, &targetgroup.Group{Source: podSourceFromNamespaceAndName(namespace, name)})
return true
}
eps, err := convertToPod(o)
@ -133,7 +121,7 @@ func (p *Pod) process(send func(tg *targetgroup.Group)) bool {
level.Error(p.logger).Log("msg", "converting to Pod object failed", "err", err)
return true
}
send(p.buildPod(eps))
send(ctx, p.logger, RolePod, ch, p.buildPod(eps))
return true
}

View File

@ -79,16 +79,8 @@ func (s *Service) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
return
}
// Send target groups for service updates.
send := func(tg *targetgroup.Group) {
select {
case <-ctx.Done():
case ch <- []*targetgroup.Group{tg}:
}
}
go func() {
for s.process(send) {
for s.process(ctx, ch) {
}
}()
@ -96,7 +88,7 @@ func (s *Service) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
<-ctx.Done()
}
func (s *Service) process(send func(tg *targetgroup.Group)) bool {
func (s *Service) process(ctx context.Context, ch chan<- []*targetgroup.Group) bool {
keyObj, quit := s.queue.Get()
if quit {
return false
@ -114,7 +106,7 @@ func (s *Service) process(send func(tg *targetgroup.Group)) bool {
return true
}
if !exists {
send(&targetgroup.Group{Source: serviceSourceFromNamespaceAndName(namespace, name)})
send(ctx, s.logger, RoleService, ch, &targetgroup.Group{Source: serviceSourceFromNamespaceAndName(namespace, name)})
return true
}
eps, err := convertToService(o)
@ -122,7 +114,7 @@ func (s *Service) process(send func(tg *targetgroup.Group)) bool {
level.Error(s.logger).Log("msg", "converting to Service object failed", "err", err)
return true
}
send(s.buildService(eps))
send(ctx, s.logger, RoleService, ch, s.buildService(eps))
return true
}