mirror of
https://github.com/prometheus/prometheus
synced 2025-01-26 01:13:33 +00:00
Kubernetes SD: Refactor to handle missing Kubernetes events
This commit is contained in:
parent
308d447cd7
commit
9faa7515c6
@ -72,9 +72,6 @@ type Discovery struct {
|
||||
|
||||
apiServers []config.URL
|
||||
apiServersMu sync.RWMutex
|
||||
nodesResourceVersion string
|
||||
servicesResourceVersion string
|
||||
endpointsResourceVersion string
|
||||
nodes map[string]*Node
|
||||
services map[string]map[string]*Service
|
||||
nodesMu sync.RWMutex
|
||||
@ -92,8 +89,6 @@ func (kd *Discovery) Initialize() error {
|
||||
|
||||
kd.apiServers = kd.Conf.APIServers
|
||||
kd.client = client
|
||||
kd.nodes = map[string]*Node{}
|
||||
kd.services = map[string]map[string]*Service{}
|
||||
kd.runDone = make(chan struct{})
|
||||
|
||||
return nil
|
||||
@ -106,67 +101,42 @@ func (kd *Discovery) Sources() []string {
|
||||
sourceNames = append(sourceNames, apiServersTargetGroupName+":"+apiServer.Host)
|
||||
}
|
||||
|
||||
res, err := kd.queryAPIServerPath(nodesURL)
|
||||
nodes, _, err := kd.getNodes()
|
||||
if err != nil {
|
||||
// If we can't list nodes then we can't watch them. Assume this is a misconfiguration
|
||||
// & log & return empty.
|
||||
log.Errorf("Unable to list Kubernetes nodes: %s", err)
|
||||
return []string{}
|
||||
}
|
||||
defer res.Body.Close()
|
||||
if res.StatusCode != http.StatusOK {
|
||||
log.Errorf("Unable to list Kubernetes nodes. Unexpected response: %d %s", res.StatusCode, res.Status)
|
||||
log.Errorf("Unable to initialize Kubernetes nodes: %s", err)
|
||||
return []string{}
|
||||
}
|
||||
sourceNames = append(sourceNames, kd.nodeSources(nodes)...)
|
||||
|
||||
var nodes NodeList
|
||||
if err := json.NewDecoder(res.Body).Decode(&nodes); err != nil {
|
||||
body, _ := ioutil.ReadAll(res.Body)
|
||||
log.Errorf("Unable to list Kubernetes nodes. Unexpected response body: %s", string(body))
|
||||
return []string{}
|
||||
}
|
||||
|
||||
kd.nodesMu.Lock()
|
||||
defer kd.nodesMu.Unlock()
|
||||
|
||||
kd.nodesResourceVersion = nodes.ResourceVersion
|
||||
for idx, node := range nodes.Items {
|
||||
sourceNames = append(sourceNames, nodesTargetGroupName+":"+node.ObjectMeta.Name)
|
||||
kd.nodes[node.ObjectMeta.Name] = &nodes.Items[idx]
|
||||
}
|
||||
|
||||
res, err = kd.queryAPIServerPath(servicesURL)
|
||||
services, _, err := kd.getServices()
|
||||
if err != nil {
|
||||
// If we can't list services then we can't watch them. Assume this is a misconfiguration
|
||||
// & log & return empty.
|
||||
log.Errorf("Unable to list Kubernetes services: %s", err)
|
||||
log.Errorf("Unable to initialize Kubernetes services: %s", err)
|
||||
return []string{}
|
||||
}
|
||||
defer res.Body.Close()
|
||||
if res.StatusCode != http.StatusOK {
|
||||
log.Errorf("Unable to list Kubernetes services. Unexpected response: %d %s", res.StatusCode, res.Status)
|
||||
return []string{}
|
||||
}
|
||||
var services ServiceList
|
||||
if err := json.NewDecoder(res.Body).Decode(&services); err != nil {
|
||||
body, _ := ioutil.ReadAll(res.Body)
|
||||
log.Errorf("Unable to list Kubernetes services. Unexpected response body: %s", string(body))
|
||||
return []string{}
|
||||
}
|
||||
kd.servicesMu.Lock()
|
||||
defer kd.servicesMu.Unlock()
|
||||
sourceNames = append(sourceNames, kd.serviceSources(services)...)
|
||||
|
||||
kd.servicesResourceVersion = services.ResourceVersion
|
||||
for idx, service := range services.Items {
|
||||
sourceNames = append(sourceNames, serviceSource(&service))
|
||||
namespace, ok := kd.services[service.ObjectMeta.Namespace]
|
||||
if !ok {
|
||||
namespace = map[string]*Service{}
|
||||
kd.services[service.ObjectMeta.Namespace] = namespace
|
||||
}
|
||||
namespace[service.ObjectMeta.Name] = &services.Items[idx]
|
||||
return sourceNames
|
||||
}
|
||||
|
||||
func (kd *Discovery) nodeSources(nodes map[string]*Node) []string {
|
||||
var sourceNames []string
|
||||
for name := range nodes {
|
||||
sourceNames = append(sourceNames, nodesTargetGroupName+":"+name)
|
||||
}
|
||||
return sourceNames
|
||||
}
|
||||
|
||||
func (kd *Discovery) serviceSources(services map[string]map[string]*Service) []string {
|
||||
var sourceNames []string
|
||||
for _, ns := range services {
|
||||
for _, service := range ns {
|
||||
sourceNames = append(sourceNames, serviceSource(service))
|
||||
}
|
||||
}
|
||||
return sourceNames
|
||||
}
|
||||
|
||||
@ -182,37 +152,12 @@ func (kd *Discovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
|
||||
}
|
||||
}
|
||||
|
||||
if tg := kd.updateNodesTargetGroup(); tg != nil {
|
||||
select {
|
||||
case ch <- *tg:
|
||||
case <-done:
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
for _, ns := range kd.services {
|
||||
for _, service := range ns {
|
||||
tg := kd.addService(service)
|
||||
|
||||
if tg == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
select {
|
||||
case ch <- *tg:
|
||||
case <-done:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
retryInterval := time.Duration(kd.Conf.RetryInterval)
|
||||
|
||||
update := make(chan interface{}, 10)
|
||||
|
||||
go kd.watchNodes(update, done, retryInterval)
|
||||
go kd.watchServices(update, done, retryInterval)
|
||||
go kd.watchServiceEndpoints(update, done, retryInterval)
|
||||
go kd.startServiceWatch(update, done, retryInterval)
|
||||
|
||||
var tg *config.TargetGroup
|
||||
for {
|
||||
@ -308,8 +253,8 @@ func (kd *Discovery) updateAPIServersTargetGroup() *config.TargetGroup {
|
||||
}
|
||||
|
||||
func (kd *Discovery) updateNodesTargetGroup() *config.TargetGroup {
|
||||
kd.nodesMu.Lock()
|
||||
defer kd.nodesMu.Unlock()
|
||||
kd.nodesMu.RLock()
|
||||
defer kd.nodesMu.RUnlock()
|
||||
|
||||
tg := &config.TargetGroup{
|
||||
Source: nodesTargetGroupName,
|
||||
@ -350,17 +295,86 @@ func (kd *Discovery) updateNode(node *Node, eventType EventType) {
|
||||
}
|
||||
}
|
||||
|
||||
func (kd *Discovery) getNodes() (map[string]*Node, string, error) {
|
||||
res, err := kd.queryAPIServerPath(nodesURL)
|
||||
if err != nil {
|
||||
// If we can't list nodes then we can't watch them. Assume this is a misconfiguration
|
||||
// & return error.
|
||||
return nil, "", fmt.Errorf("Unable to list Kubernetes nodes: %s", err)
|
||||
}
|
||||
defer res.Body.Close()
|
||||
if res.StatusCode != http.StatusOK {
|
||||
return nil, "", fmt.Errorf("Unable to list Kubernetes nodes. Unexpected response: %d %s", res.StatusCode, res.Status)
|
||||
}
|
||||
|
||||
var nodes NodeList
|
||||
if err := json.NewDecoder(res.Body).Decode(&nodes); err != nil {
|
||||
body, _ := ioutil.ReadAll(res.Body)
|
||||
return nil, "", fmt.Errorf("Unable to list Kubernetes nodes. Unexpected response body: %s", string(body))
|
||||
}
|
||||
|
||||
nodeMap := map[string]*Node{}
|
||||
for idx, node := range nodes.Items {
|
||||
nodeMap[node.ObjectMeta.Name] = &nodes.Items[idx]
|
||||
}
|
||||
|
||||
return nodeMap, nodes.ResourceVersion, nil
|
||||
}
|
||||
|
||||
func (kd *Discovery) getServices() (map[string]map[string]*Service, string, error) {
|
||||
res, err := kd.queryAPIServerPath(servicesURL)
|
||||
if err != nil {
|
||||
// If we can't list services then we can't watch them. Assume this is a misconfiguration
|
||||
// & return error.
|
||||
return nil, "", fmt.Errorf("Unable to list Kubernetes services: %s", err)
|
||||
}
|
||||
defer res.Body.Close()
|
||||
if res.StatusCode != http.StatusOK {
|
||||
return nil, "", fmt.Errorf("Unable to list Kubernetes services. Unexpected response: %d %s", res.StatusCode, res.Status)
|
||||
}
|
||||
var services ServiceList
|
||||
if err := json.NewDecoder(res.Body).Decode(&services); err != nil {
|
||||
body, _ := ioutil.ReadAll(res.Body)
|
||||
return nil, "", fmt.Errorf("Unable to list Kubernetes services. Unexpected response body: %s", string(body))
|
||||
}
|
||||
|
||||
serviceMap := map[string]map[string]*Service{}
|
||||
for idx, service := range services.Items {
|
||||
namespace, ok := serviceMap[service.ObjectMeta.Namespace]
|
||||
if !ok {
|
||||
namespace = map[string]*Service{}
|
||||
serviceMap[service.ObjectMeta.Namespace] = namespace
|
||||
}
|
||||
namespace[service.ObjectMeta.Name] = &services.Items[idx]
|
||||
}
|
||||
|
||||
return serviceMap, services.ResourceVersion, nil
|
||||
}
|
||||
|
||||
// watchNodes watches nodes as they come & go.
|
||||
func (kd *Discovery) watchNodes(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) {
|
||||
until(func() {
|
||||
nodes, resourceVersion, err := kd.getNodes()
|
||||
if err != nil {
|
||||
log.Errorf("Cannot initialize nodes collection: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Reset the known nodes.
|
||||
kd.nodes = map[string]*Node{}
|
||||
|
||||
for _, node := range nodes {
|
||||
events <- &nodeEvent{added, node}
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("GET", nodesURL, nil)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to watch nodes: %s", err)
|
||||
log.Errorf("Cannot create nodes request: %s", err)
|
||||
return
|
||||
}
|
||||
values := req.URL.Query()
|
||||
values.Add("watch", "true")
|
||||
values.Add("resourceVersion", kd.nodesResourceVersion)
|
||||
values.Add("resourceVersion", resourceVersion)
|
||||
req.URL.RawQuery = values.Encode()
|
||||
res, err := kd.queryAPIServerReq(req)
|
||||
if err != nil {
|
||||
@ -378,10 +392,9 @@ func (kd *Discovery) watchNodes(events chan interface{}, done <-chan struct{}, r
|
||||
for {
|
||||
var event nodeEvent
|
||||
if err := d.Decode(&event); err != nil {
|
||||
log.Errorf("Failed to watch nodes: %s", err)
|
||||
log.Errorf("Watch nodes unexpectedly closed: %s", err)
|
||||
return
|
||||
}
|
||||
kd.nodesResourceVersion = event.Node.ObjectMeta.ResourceVersion
|
||||
|
||||
select {
|
||||
case events <- &event:
|
||||
@ -392,16 +405,70 @@ func (kd *Discovery) watchNodes(events chan interface{}, done <-chan struct{}, r
|
||||
}
|
||||
|
||||
// watchServices watches services as they come & go.
|
||||
func (kd *Discovery) watchServices(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) {
|
||||
func (kd *Discovery) startServiceWatch(events chan<- interface{}, done <-chan struct{}, retryInterval time.Duration) {
|
||||
until(func() {
|
||||
// We use separate target groups for each discovered service so we'll need to clean up any if they've been deleted
|
||||
// in Kubernetes while we couldn't connect - small chance of this, but worth dealing with.
|
||||
existingServices := kd.services
|
||||
|
||||
// Reset the known services.
|
||||
kd.services = map[string]map[string]*Service{}
|
||||
|
||||
services, resourceVersion, err := kd.getServices()
|
||||
if err != nil {
|
||||
log.Errorf("Cannot initialize services collection: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Now let's loop through the old services & see if they still exist in here
|
||||
for oldNSName, oldNS := range existingServices {
|
||||
if ns, ok := services[oldNSName]; !ok {
|
||||
for _, service := range existingServices[oldNSName] {
|
||||
events <- &serviceEvent{deleted, service}
|
||||
}
|
||||
} else {
|
||||
for oldServiceName, oldService := range oldNS {
|
||||
if _, ok := ns[oldServiceName]; !ok {
|
||||
events <- &serviceEvent{deleted, oldService}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Discard the existing services map for GC.
|
||||
existingServices = nil
|
||||
|
||||
for _, ns := range services {
|
||||
for _, service := range ns {
|
||||
events <- &serviceEvent{added, service}
|
||||
}
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
|
||||
go func() {
|
||||
kd.watchServices(resourceVersion, events, done)
|
||||
wg.Done()
|
||||
}()
|
||||
go func() {
|
||||
kd.watchServiceEndpoints(resourceVersion, events, done)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
}, retryInterval, done)
|
||||
}
|
||||
|
||||
func (kd *Discovery) watchServices(resourceVersion string, events chan<- interface{}, done <-chan struct{}) {
|
||||
req, err := http.NewRequest("GET", servicesURL, nil)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to watch services: %s", err)
|
||||
log.Errorf("Failed to create services request: %s", err)
|
||||
return
|
||||
}
|
||||
values := req.URL.Query()
|
||||
values.Add("watch", "true")
|
||||
values.Add("resourceVersion", kd.servicesResourceVersion)
|
||||
values.Add("resourceVersion", resourceVersion)
|
||||
req.URL.RawQuery = values.Encode()
|
||||
|
||||
res, err := kd.queryAPIServerReq(req)
|
||||
@ -420,34 +487,64 @@ func (kd *Discovery) watchServices(events chan interface{}, done <-chan struct{}
|
||||
for {
|
||||
var event serviceEvent
|
||||
if err := d.Decode(&event); err != nil {
|
||||
log.Errorf("Unable to watch services: %s", err)
|
||||
log.Errorf("Watch services unexpectedly closed: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case events <- &event:
|
||||
case <-done:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// watchServiceEndpoints watches service endpoints as they come & go.
|
||||
func (kd *Discovery) watchServiceEndpoints(resourceVersion string, events chan<- interface{}, done <-chan struct{}) {
|
||||
req, err := http.NewRequest("GET", endpointsURL, nil)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to create service endpoints request: %s", err)
|
||||
return
|
||||
}
|
||||
values := req.URL.Query()
|
||||
values.Add("watch", "true")
|
||||
values.Add("resourceVersion", resourceVersion)
|
||||
req.URL.RawQuery = values.Encode()
|
||||
|
||||
res, err := kd.queryAPIServerReq(req)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to watch service endpoints: %s", err)
|
||||
return
|
||||
}
|
||||
defer res.Body.Close()
|
||||
if res.StatusCode != http.StatusOK {
|
||||
log.Errorf("Failed to watch service endpoints: %d", res.StatusCode)
|
||||
return
|
||||
}
|
||||
|
||||
d := json.NewDecoder(res.Body)
|
||||
|
||||
for {
|
||||
var event endpointsEvent
|
||||
if err := d.Decode(&event); err != nil {
|
||||
log.Errorf("Watch service endpoints unexpectedly closed: %s", err)
|
||||
return
|
||||
}
|
||||
kd.servicesResourceVersion = event.Service.ObjectMeta.ResourceVersion
|
||||
|
||||
select {
|
||||
case events <- &event:
|
||||
case <-done:
|
||||
}
|
||||
}
|
||||
}, retryInterval, done)
|
||||
}
|
||||
|
||||
func (kd *Discovery) updateService(service *Service, eventType EventType) *config.TargetGroup {
|
||||
kd.servicesMu.Lock()
|
||||
defer kd.servicesMu.Unlock()
|
||||
|
||||
var (
|
||||
name = service.ObjectMeta.Name
|
||||
namespace = service.ObjectMeta.Namespace
|
||||
_, exists = kd.services[namespace][name]
|
||||
)
|
||||
|
||||
switch eventType {
|
||||
case deleted:
|
||||
if exists {
|
||||
return kd.deleteService(service)
|
||||
}
|
||||
case added, modified:
|
||||
return kd.addService(service)
|
||||
}
|
||||
@ -553,48 +650,6 @@ func (kd *Discovery) updateServiceTargetGroup(service *Service, eps *Endpoints)
|
||||
return tg
|
||||
}
|
||||
|
||||
// watchServiceEndpoints watches service endpoints as they come & go.
|
||||
func (kd *Discovery) watchServiceEndpoints(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) {
|
||||
until(func() {
|
||||
req, err := http.NewRequest("GET", endpointsURL, nil)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to watch service endpoints: %s", err)
|
||||
return
|
||||
}
|
||||
values := req.URL.Query()
|
||||
values.Add("watch", "true")
|
||||
values.Add("resourceVersion", kd.servicesResourceVersion)
|
||||
req.URL.RawQuery = values.Encode()
|
||||
|
||||
res, err := kd.queryAPIServerReq(req)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to watch service endpoints: %s", err)
|
||||
return
|
||||
}
|
||||
defer res.Body.Close()
|
||||
if res.StatusCode != http.StatusOK {
|
||||
log.Errorf("Failed to watch service endpoints: %d", res.StatusCode)
|
||||
return
|
||||
}
|
||||
|
||||
d := json.NewDecoder(res.Body)
|
||||
|
||||
for {
|
||||
var event endpointsEvent
|
||||
if err := d.Decode(&event); err != nil {
|
||||
log.Errorf("Unable to watch service endpoints: %s", err)
|
||||
return
|
||||
}
|
||||
kd.servicesResourceVersion = event.Endpoints.ObjectMeta.ResourceVersion
|
||||
|
||||
select {
|
||||
case events <- &event:
|
||||
case <-done:
|
||||
}
|
||||
}
|
||||
}, retryInterval, done)
|
||||
}
|
||||
|
||||
func (kd *Discovery) updateServiceEndpoints(endpoints *Endpoints, eventType EventType) *config.TargetGroup {
|
||||
kd.servicesMu.Lock()
|
||||
defer kd.servicesMu.Unlock()
|
||||
|
Loading…
Reference in New Issue
Block a user