diff --git a/retrieval/discovery/kubernetes/discovery.go b/retrieval/discovery/kubernetes/discovery.go index 979f7b148..76e79843e 100644 --- a/retrieval/discovery/kubernetes/discovery.go +++ b/retrieval/discovery/kubernetes/discovery.go @@ -70,16 +70,13 @@ type Discovery struct { client *http.Client Conf *config.KubernetesSDConfig - apiServers []config.URL - apiServersMu sync.RWMutex - nodesResourceVersion string - servicesResourceVersion string - endpointsResourceVersion string - nodes map[string]*Node - services map[string]map[string]*Service - nodesMu sync.RWMutex - servicesMu sync.RWMutex - runDone chan struct{} + apiServers []config.URL + apiServersMu sync.RWMutex + nodes map[string]*Node + services map[string]map[string]*Service + nodesMu sync.RWMutex + servicesMu sync.RWMutex + runDone chan struct{} } // Initialize sets up the discovery for usage. @@ -92,8 +89,6 @@ func (kd *Discovery) Initialize() error { kd.apiServers = kd.Conf.APIServers kd.client = client - kd.nodes = map[string]*Node{} - kd.services = map[string]map[string]*Service{} kd.runDone = make(chan struct{}) return nil @@ -106,67 +101,42 @@ func (kd *Discovery) Sources() []string { sourceNames = append(sourceNames, apiServersTargetGroupName+":"+apiServer.Host) } - res, err := kd.queryAPIServerPath(nodesURL) + nodes, _, err := kd.getNodes() if err != nil { // If we can't list nodes then we can't watch them. Assume this is a misconfiguration // & log & return empty. - log.Errorf("Unable to list Kubernetes nodes: %s", err) - return []string{} - } - defer res.Body.Close() - if res.StatusCode != http.StatusOK { - log.Errorf("Unable to list Kubernetes nodes. Unexpected response: %d %s", res.StatusCode, res.Status) + log.Errorf("Unable to initialize Kubernetes nodes: %s", err) return []string{} } + sourceNames = append(sourceNames, kd.nodeSources(nodes)...) - var nodes NodeList - if err := json.NewDecoder(res.Body).Decode(&nodes); err != nil { - body, _ := ioutil.ReadAll(res.Body) - log.Errorf("Unable to list Kubernetes nodes. Unexpected response body: %s", string(body)) - return []string{} - } - - kd.nodesMu.Lock() - defer kd.nodesMu.Unlock() - - kd.nodesResourceVersion = nodes.ResourceVersion - for idx, node := range nodes.Items { - sourceNames = append(sourceNames, nodesTargetGroupName+":"+node.ObjectMeta.Name) - kd.nodes[node.ObjectMeta.Name] = &nodes.Items[idx] - } - - res, err = kd.queryAPIServerPath(servicesURL) + services, _, err := kd.getServices() if err != nil { // If we can't list services then we can't watch them. Assume this is a misconfiguration // & log & return empty. - log.Errorf("Unable to list Kubernetes services: %s", err) + log.Errorf("Unable to initialize Kubernetes services: %s", err) return []string{} } - defer res.Body.Close() - if res.StatusCode != http.StatusOK { - log.Errorf("Unable to list Kubernetes services. Unexpected response: %d %s", res.StatusCode, res.Status) - return []string{} - } - var services ServiceList - if err := json.NewDecoder(res.Body).Decode(&services); err != nil { - body, _ := ioutil.ReadAll(res.Body) - log.Errorf("Unable to list Kubernetes services. Unexpected response body: %s", string(body)) - return []string{} - } - kd.servicesMu.Lock() - defer kd.servicesMu.Unlock() + sourceNames = append(sourceNames, kd.serviceSources(services)...) - kd.servicesResourceVersion = services.ResourceVersion - for idx, service := range services.Items { - sourceNames = append(sourceNames, serviceSource(&service)) - namespace, ok := kd.services[service.ObjectMeta.Namespace] - if !ok { - namespace = map[string]*Service{} - kd.services[service.ObjectMeta.Namespace] = namespace + return sourceNames +} + +func (kd *Discovery) nodeSources(nodes map[string]*Node) []string { + var sourceNames []string + for name := range nodes { + sourceNames = append(sourceNames, nodesTargetGroupName+":"+name) + } + return sourceNames +} + +func (kd *Discovery) serviceSources(services map[string]map[string]*Service) []string { + var sourceNames []string + for _, ns := range services { + for _, service := range ns { + sourceNames = append(sourceNames, serviceSource(service)) } - namespace[service.ObjectMeta.Name] = &services.Items[idx] } - return sourceNames } @@ -182,37 +152,12 @@ func (kd *Discovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { } } - if tg := kd.updateNodesTargetGroup(); tg != nil { - select { - case ch <- *tg: - case <-done: - return - } - } - - for _, ns := range kd.services { - for _, service := range ns { - tg := kd.addService(service) - - if tg == nil { - continue - } - - select { - case ch <- *tg: - case <-done: - return - } - } - } - retryInterval := time.Duration(kd.Conf.RetryInterval) update := make(chan interface{}, 10) go kd.watchNodes(update, done, retryInterval) - go kd.watchServices(update, done, retryInterval) - go kd.watchServiceEndpoints(update, done, retryInterval) + go kd.startServiceWatch(update, done, retryInterval) var tg *config.TargetGroup for { @@ -308,8 +253,8 @@ func (kd *Discovery) updateAPIServersTargetGroup() *config.TargetGroup { } func (kd *Discovery) updateNodesTargetGroup() *config.TargetGroup { - kd.nodesMu.Lock() - defer kd.nodesMu.Unlock() + kd.nodesMu.RLock() + defer kd.nodesMu.RUnlock() tg := &config.TargetGroup{ Source: nodesTargetGroupName, @@ -350,17 +295,86 @@ func (kd *Discovery) updateNode(node *Node, eventType EventType) { } } +func (kd *Discovery) getNodes() (map[string]*Node, string, error) { + res, err := kd.queryAPIServerPath(nodesURL) + if err != nil { + // If we can't list nodes then we can't watch them. Assume this is a misconfiguration + // & return error. + return nil, "", fmt.Errorf("Unable to list Kubernetes nodes: %s", err) + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + return nil, "", fmt.Errorf("Unable to list Kubernetes nodes. Unexpected response: %d %s", res.StatusCode, res.Status) + } + + var nodes NodeList + if err := json.NewDecoder(res.Body).Decode(&nodes); err != nil { + body, _ := ioutil.ReadAll(res.Body) + return nil, "", fmt.Errorf("Unable to list Kubernetes nodes. Unexpected response body: %s", string(body)) + } + + nodeMap := map[string]*Node{} + for idx, node := range nodes.Items { + nodeMap[node.ObjectMeta.Name] = &nodes.Items[idx] + } + + return nodeMap, nodes.ResourceVersion, nil +} + +func (kd *Discovery) getServices() (map[string]map[string]*Service, string, error) { + res, err := kd.queryAPIServerPath(servicesURL) + if err != nil { + // If we can't list services then we can't watch them. Assume this is a misconfiguration + // & return error. + return nil, "", fmt.Errorf("Unable to list Kubernetes services: %s", err) + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + return nil, "", fmt.Errorf("Unable to list Kubernetes services. Unexpected response: %d %s", res.StatusCode, res.Status) + } + var services ServiceList + if err := json.NewDecoder(res.Body).Decode(&services); err != nil { + body, _ := ioutil.ReadAll(res.Body) + return nil, "", fmt.Errorf("Unable to list Kubernetes services. Unexpected response body: %s", string(body)) + } + + serviceMap := map[string]map[string]*Service{} + for idx, service := range services.Items { + namespace, ok := serviceMap[service.ObjectMeta.Namespace] + if !ok { + namespace = map[string]*Service{} + serviceMap[service.ObjectMeta.Namespace] = namespace + } + namespace[service.ObjectMeta.Name] = &services.Items[idx] + } + + return serviceMap, services.ResourceVersion, nil +} + // watchNodes watches nodes as they come & go. func (kd *Discovery) watchNodes(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) { until(func() { + nodes, resourceVersion, err := kd.getNodes() + if err != nil { + log.Errorf("Cannot initialize nodes collection: %s", err) + return + } + + // Reset the known nodes. + kd.nodes = map[string]*Node{} + + for _, node := range nodes { + events <- &nodeEvent{added, node} + } + req, err := http.NewRequest("GET", nodesURL, nil) if err != nil { - log.Errorf("Failed to watch nodes: %s", err) + log.Errorf("Cannot create nodes request: %s", err) return } values := req.URL.Query() values.Add("watch", "true") - values.Add("resourceVersion", kd.nodesResourceVersion) + values.Add("resourceVersion", resourceVersion) req.URL.RawQuery = values.Encode() res, err := kd.queryAPIServerReq(req) if err != nil { @@ -378,10 +392,9 @@ func (kd *Discovery) watchNodes(events chan interface{}, done <-chan struct{}, r for { var event nodeEvent if err := d.Decode(&event); err != nil { - log.Errorf("Failed to watch nodes: %s", err) + log.Errorf("Watch nodes unexpectedly closed: %s", err) return } - kd.nodesResourceVersion = event.Node.ObjectMeta.ResourceVersion select { case events <- &event: @@ -392,62 +405,146 @@ func (kd *Discovery) watchNodes(events chan interface{}, done <-chan struct{}, r } // watchServices watches services as they come & go. -func (kd *Discovery) watchServices(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) { +func (kd *Discovery) startServiceWatch(events chan<- interface{}, done <-chan struct{}, retryInterval time.Duration) { until(func() { - req, err := http.NewRequest("GET", servicesURL, nil) + // We use separate target groups for each discovered service so we'll need to clean up any if they've been deleted + // in Kubernetes while we couldn't connect - small chance of this, but worth dealing with. + existingServices := kd.services + + // Reset the known services. + kd.services = map[string]map[string]*Service{} + + services, resourceVersion, err := kd.getServices() if err != nil { - log.Errorf("Failed to watch services: %s", err) - return - } - values := req.URL.Query() - values.Add("watch", "true") - values.Add("resourceVersion", kd.servicesResourceVersion) - req.URL.RawQuery = values.Encode() - - res, err := kd.queryAPIServerReq(req) - if err != nil { - log.Errorf("Failed to watch services: %s", err) - return - } - defer res.Body.Close() - if res.StatusCode != http.StatusOK { - log.Errorf("Failed to watch services: %d", res.StatusCode) + log.Errorf("Cannot initialize services collection: %s", err) return } - d := json.NewDecoder(res.Body) - - for { - var event serviceEvent - if err := d.Decode(&event); err != nil { - log.Errorf("Unable to watch services: %s", err) - return - } - kd.servicesResourceVersion = event.Service.ObjectMeta.ResourceVersion - - select { - case events <- &event: - case <-done: + // Now let's loop through the old services & see if they still exist in here + for oldNSName, oldNS := range existingServices { + if ns, ok := services[oldNSName]; !ok { + for _, service := range existingServices[oldNSName] { + events <- &serviceEvent{deleted, service} + } + } else { + for oldServiceName, oldService := range oldNS { + if _, ok := ns[oldServiceName]; !ok { + events <- &serviceEvent{deleted, oldService} + } + } } } + + // Discard the existing services map for GC. + existingServices = nil + + for _, ns := range services { + for _, service := range ns { + events <- &serviceEvent{added, service} + } + } + + var wg sync.WaitGroup + wg.Add(2) + + go func() { + kd.watchServices(resourceVersion, events, done) + wg.Done() + }() + go func() { + kd.watchServiceEndpoints(resourceVersion, events, done) + wg.Done() + }() + + wg.Wait() }, retryInterval, done) } +func (kd *Discovery) watchServices(resourceVersion string, events chan<- interface{}, done <-chan struct{}) { + req, err := http.NewRequest("GET", servicesURL, nil) + if err != nil { + log.Errorf("Failed to create services request: %s", err) + return + } + values := req.URL.Query() + values.Add("watch", "true") + values.Add("resourceVersion", resourceVersion) + req.URL.RawQuery = values.Encode() + + res, err := kd.queryAPIServerReq(req) + if err != nil { + log.Errorf("Failed to watch services: %s", err) + return + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + log.Errorf("Failed to watch services: %d", res.StatusCode) + return + } + + d := json.NewDecoder(res.Body) + + for { + var event serviceEvent + if err := d.Decode(&event); err != nil { + log.Errorf("Watch services unexpectedly closed: %s", err) + return + } + + select { + case events <- &event: + case <-done: + return + } + } +} + +// watchServiceEndpoints watches service endpoints as they come & go. +func (kd *Discovery) watchServiceEndpoints(resourceVersion string, events chan<- interface{}, done <-chan struct{}) { + req, err := http.NewRequest("GET", endpointsURL, nil) + if err != nil { + log.Errorf("Failed to create service endpoints request: %s", err) + return + } + values := req.URL.Query() + values.Add("watch", "true") + values.Add("resourceVersion", resourceVersion) + req.URL.RawQuery = values.Encode() + + res, err := kd.queryAPIServerReq(req) + if err != nil { + log.Errorf("Failed to watch service endpoints: %s", err) + return + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + log.Errorf("Failed to watch service endpoints: %d", res.StatusCode) + return + } + + d := json.NewDecoder(res.Body) + + for { + var event endpointsEvent + if err := d.Decode(&event); err != nil { + log.Errorf("Watch service endpoints unexpectedly closed: %s", err) + return + } + + select { + case events <- &event: + case <-done: + } + } +} + func (kd *Discovery) updateService(service *Service, eventType EventType) *config.TargetGroup { kd.servicesMu.Lock() defer kd.servicesMu.Unlock() - var ( - name = service.ObjectMeta.Name - namespace = service.ObjectMeta.Namespace - _, exists = kd.services[namespace][name] - ) - switch eventType { case deleted: - if exists { - return kd.deleteService(service) - } + return kd.deleteService(service) case added, modified: return kd.addService(service) } @@ -553,48 +650,6 @@ func (kd *Discovery) updateServiceTargetGroup(service *Service, eps *Endpoints) return tg } -// watchServiceEndpoints watches service endpoints as they come & go. -func (kd *Discovery) watchServiceEndpoints(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) { - until(func() { - req, err := http.NewRequest("GET", endpointsURL, nil) - if err != nil { - log.Errorf("Failed to watch service endpoints: %s", err) - return - } - values := req.URL.Query() - values.Add("watch", "true") - values.Add("resourceVersion", kd.servicesResourceVersion) - req.URL.RawQuery = values.Encode() - - res, err := kd.queryAPIServerReq(req) - if err != nil { - log.Errorf("Failed to watch service endpoints: %s", err) - return - } - defer res.Body.Close() - if res.StatusCode != http.StatusOK { - log.Errorf("Failed to watch service endpoints: %d", res.StatusCode) - return - } - - d := json.NewDecoder(res.Body) - - for { - var event endpointsEvent - if err := d.Decode(&event); err != nil { - log.Errorf("Unable to watch service endpoints: %s", err) - return - } - kd.servicesResourceVersion = event.Endpoints.ObjectMeta.ResourceVersion - - select { - case events <- &event: - case <-done: - } - } - }, retryInterval, done) -} - func (kd *Discovery) updateServiceEndpoints(endpoints *Endpoints, eventType EventType) *config.TargetGroup { kd.servicesMu.Lock() defer kd.servicesMu.Unlock()