From ae413704e837d10cb18dc93348509552028430c3 Mon Sep 17 00:00:00 2001
From: Patrick Bogen
Date: Wed, 18 May 2016 17:18:52 -0700
Subject: [PATCH 1/3] kubernetes pod-level discovery

---
 .../examples/prometheus-kubernetes.yml      |  32 ++
 retrieval/discovery/kubernetes/discovery.go | 331 ++++++++++++++++--
 .../discovery/kubernetes/discovery_test.go  | 175 +++++++++
 retrieval/discovery/kubernetes/types.go     |  40 +++
 4 files changed, 554 insertions(+), 24 deletions(-)
 create mode 100644 retrieval/discovery/kubernetes/discovery_test.go

diff --git a/documentation/examples/prometheus-kubernetes.yml b/documentation/examples/prometheus-kubernetes.yml
index 8b45456a5..c5c73c894 100644
--- a/documentation/examples/prometheus-kubernetes.yml
+++ b/documentation/examples/prometheus-kubernetes.yml
@@ -114,3 +114,35 @@ scrape_configs:
     target_label: kubernetes_namespace
   - source_labels: [__meta_kubernetes_service_name]
     target_label: kubernetes_name
+
+# Example scrape config for pods
+#
+# The relabeling allows the actual pod scrape endpoint to be configured via the
+# following annotations:
+#
+# * `prometheus.io/scrape`: Only scrape pods that have a value of `true`
+# * `prometheus.io/port`: Scrape the pod on the indicated port instead of the default of `9102`.
+- job_name: 'kubernetes-pods'
+
+  kubernetes_sd_configs:
+  - api_servers:
+    - 'https://kubernetes.default.svc'
+    in_cluster: true
+
+  relabel_configs:
+  - source_labels: [__meta_kubernetes_role, __meta_kubernetes_pod_annotation_prometheus_io_scrape]
+    action: keep
+    regex: pod;true
+  - source_labels: [__address__, __meta_kubernetes_pod_annotation_prometheus_io_port]
+    action: replace
+    regex: (.+):(?:\d+);(\d+)
+    replacement: ${1}:${2}
+    target_label: __address__
+  - action: labelmap
+    regex: __meta_kubernetes_pod_label_(.+)
+  - source_labels: [__meta_kubernetes_pod_namespace]
+    action: replace
+    target_label: kubernetes_namespace
+  - source_labels: [__meta_kubernetes_pod_name]
+    action: replace
+    target_label: kubernetes_pod_name
diff --git a/retrieval/discovery/kubernetes/discovery.go b/retrieval/discovery/kubernetes/discovery.go
index fd7974f8f..c8249917e 100644
--- a/retrieval/discovery/kubernetes/discovery.go
+++ b/retrieval/discovery/kubernetes/discovery.go
@@ -14,12 +14,16 @@ package kubernetes
 
 import (
+	"bytes"
 	"encoding/json"
 	"fmt"
 	"io/ioutil"
 	"net"
 	"net/http"
 	"os"
+	"sort"
+	"strconv"
+	"strings"
 	"sync"
 	"time"
@@ -33,27 +37,52 @@ import (
 )
 
 const (
-	sourceServicePrefix = "services"
-
 	// kubernetesMetaLabelPrefix is the meta prefix used for all meta labels.
 	// in this discovery.
 	metaLabelPrefix = model.MetaLabelPrefix + "kubernetes_"
+
+	// roleLabel is the name for the label containing a target's role.
+	roleLabel = metaLabelPrefix + "role"
+
+	sourcePodPrefix = "pods"
+	// podsTargetGroupName is the name given to the target group for pods
+	podsTargetGroupName = "pods"
+	// podNamespaceLabel is the name for the label containing a target pod's namespace
+	podNamespaceLabel = metaLabelPrefix + "pod_namespace"
+	// podNameLabel is the name for the label containing a target pod's name
+	podNameLabel = metaLabelPrefix + "pod_name"
+	// podAddressLabel is the name for the label containing a target pod's IP address (the PodIP)
+	podAddressLabel = metaLabelPrefix + "pod_address"
+	// podContainerNameLabel is the name for the label containing a target's container name
+	podContainerNameLabel = metaLabelPrefix + "pod_container_name"
+	// podContainerPortNameLabel is the name for the label containing the name of the port selected for a target
+	podContainerPortNameLabel = metaLabelPrefix + "pod_container_port_name"
+	// PodContainerPortListLabel is the name for the label containing a list of all TCP ports on the target container
+	podContainerPortListLabel = metaLabelPrefix + "pod_container_port_list"
+	// podReadyLabel is the name for the label containing the 'Ready' status (true/false/unknown) for a target
+	podReadyLabel = metaLabelPrefix + "pod_ready"
+	// podLabelPrefix is the prefix for prom label names corresponding to k8s labels for a target pod
+	podLabelPrefix = metaLabelPrefix + "pod_label_"
+	// podAnnotationPrefix is the prefix for prom label names corresponding to k8s annotations for a target pod
+	podAnnotationPrefix = metaLabelPrefix + "pod_annotation_"
+
+	sourceServicePrefix = "services"
 	// serviceNamespaceLabel is the name for the label containing a target's service namespace.
 	serviceNamespaceLabel = metaLabelPrefix + "service_namespace"
 	// serviceNameLabel is the name for the label containing a target's service name.
 	serviceNameLabel = metaLabelPrefix + "service_name"
-	// nodeLabelPrefix is the prefix for the node labels.
-	nodeLabelPrefix = metaLabelPrefix + "node_label_"
 	// serviceLabelPrefix is the prefix for the service labels.
 	serviceLabelPrefix = metaLabelPrefix + "service_label_"
 	// serviceAnnotationPrefix is the prefix for the service annotations.
 	serviceAnnotationPrefix = metaLabelPrefix + "service_annotation_"
+
 	// nodesTargetGroupName is the name given to the target group for nodes.
 	nodesTargetGroupName = "nodes"
+	// nodeLabelPrefix is the prefix for the node labels.
+	nodeLabelPrefix = metaLabelPrefix + "node_label_"
+
 	// apiServersTargetGroupName is the name given to the target group for API servers.
 	apiServersTargetGroupName = "apiServers"
-	// roleLabel is the name for the label containing a target's role.
- roleLabel = metaLabelPrefix + "role" serviceAccountToken = "/var/run/secrets/kubernetes.io/serviceaccount/token" serviceAccountCACert = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" @@ -61,6 +90,7 @@ const ( apiVersion = "v1" apiPrefix = "/api/" + apiVersion nodesURL = apiPrefix + "/nodes" + podsURL = apiPrefix + "/pods" servicesURL = apiPrefix + "/services" endpointsURL = apiPrefix + "/endpoints" serviceEndpointsURL = apiPrefix + "/namespaces/%s/endpoints/%s" @@ -75,9 +105,12 @@ type Discovery struct { apiServersMu sync.RWMutex nodes map[string]*Node services map[string]map[string]*Service - nodesMu sync.RWMutex - servicesMu sync.RWMutex - runDone chan struct{} + // map of namespace to (map of pod name to pod) + pods map[string]map[string]*Pod + nodesMu sync.RWMutex + servicesMu sync.RWMutex + podsMu sync.RWMutex + runDone chan struct{} } // Initialize sets up the discovery for usage. @@ -97,6 +130,7 @@ func (kd *Discovery) Initialize() error { // Run implements the TargetProvider interface. func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { + log.Debugf("Kubernetes Discovery.Run beginning") defer close(ch) // Send an initial full view. @@ -106,6 +140,12 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { all = append(all, kd.updateAPIServersTargetGroup()) all = append(all, kd.updateNodesTargetGroup()) + all = append(all, kd.updatePodsTargetGroup()) + for _, ns := range kd.pods { + for _, pod := range ns { + all = append(all, kd.updatePodTargetGroup(pod)) + } + } select { case ch <- all: @@ -119,21 +159,32 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { go kd.watchNodes(update, ctx.Done(), retryInterval) go kd.startServiceWatch(update, ctx.Done(), retryInterval) + go kd.watchPods(update, ctx.Done(), retryInterval) - var tg *config.TargetGroup for { + tg := []*config.TargetGroup{} select { case <-ctx.Done(): return case event := <-update: switch obj := event.(type) { case *nodeEvent: + log.Debugf("k8s discovery received node event (EventType=%s, Node Name=%s)", obj.EventType, obj.Node.ObjectMeta.Name) kd.updateNode(obj.Node, obj.EventType) - tg = kd.updateNodesTargetGroup() + tg = append(tg, kd.updateNodesTargetGroup()) case *serviceEvent: - tg = kd.updateService(obj.Service, obj.EventType) + log.Debugf("k8s discovery received service event (EventType=%s, Service Name=%s)", obj.EventType, obj.Service.ObjectMeta.Name) + tg = append(tg, kd.updateService(obj.Service, obj.EventType)) case *endpointsEvent: - tg = kd.updateServiceEndpoints(obj.Endpoints, obj.EventType) + log.Debugf("k8s discovery received endpoint event (EventType=%s, Endpoint Name=%s)", obj.EventType, obj.Endpoints.ObjectMeta.Name) + tg = append(tg, kd.updateServiceEndpoints(obj.Endpoints, obj.EventType)) + case *podEvent: + log.Debugf("k8s discovery received pod event (EventType=%s, Pod Name=%s)", obj.EventType, obj.Pod.ObjectMeta.Name) + // Update the per-pod target group + kd.updatePod(obj.Pod, obj.EventType) + tg = append(tg, kd.updatePodTargetGroup(obj.Pod)) + // ...and update the all pods target group + tg = append(tg, kd.updatePodsTargetGroup()) } } @@ -141,10 +192,12 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { continue } - select { - case ch <- []*config.TargetGroup{tg}: - case <-ctx.Done(): - return + for _, t := range tg { + select { + case ch <- []*config.TargetGroup{t}: + case <-ctx.Done(): + return + } } } } @@ -173,7 +226,7 @@ func (kd *Discovery) 
queryAPIServerReq(req *http.Request) (*http.Response, error lastErr = err kd.rotateAPIServers() } - return nil, fmt.Errorf("Unable to query any API servers: %v", lastErr) + return nil, fmt.Errorf("unable to query any API servers: %v", lastErr) } func (kd *Discovery) rotateAPIServers() { @@ -267,17 +320,17 @@ func (kd *Discovery) getNodes() (map[string]*Node, string, error) { if err != nil { // If we can't list nodes then we can't watch them. Assume this is a misconfiguration // & return error. - return nil, "", fmt.Errorf("Unable to list Kubernetes nodes: %s", err) + return nil, "", fmt.Errorf("unable to list Kubernetes nodes: %s", err) } defer res.Body.Close() if res.StatusCode != http.StatusOK { - return nil, "", fmt.Errorf("Unable to list Kubernetes nodes. Unexpected response: %d %s", res.StatusCode, res.Status) + return nil, "", fmt.Errorf("unable to list Kubernetes nodes; unexpected response: %d %s", res.StatusCode, res.Status) } var nodes NodeList if err := json.NewDecoder(res.Body).Decode(&nodes); err != nil { body, _ := ioutil.ReadAll(res.Body) - return nil, "", fmt.Errorf("Unable to list Kubernetes nodes. Unexpected response body: %s", string(body)) + return nil, "", fmt.Errorf("unable to list Kubernetes nodes; unexpected response body: %s", string(body)) } nodeMap := map[string]*Node{} @@ -293,16 +346,16 @@ func (kd *Discovery) getServices() (map[string]map[string]*Service, string, erro if err != nil { // If we can't list services then we can't watch them. Assume this is a misconfiguration // & return error. - return nil, "", fmt.Errorf("Unable to list Kubernetes services: %s", err) + return nil, "", fmt.Errorf("unable to list Kubernetes services: %s", err) } defer res.Body.Close() if res.StatusCode != http.StatusOK { - return nil, "", fmt.Errorf("Unable to list Kubernetes services. Unexpected response: %d %s", res.StatusCode, res.Status) + return nil, "", fmt.Errorf("unable to list Kubernetes services; unexpected response: %d %s", res.StatusCode, res.Status) } var services ServiceList if err := json.NewDecoder(res.Body).Decode(&services); err != nil { body, _ := ioutil.ReadAll(res.Body) - return nil, "", fmt.Errorf("Unable to list Kubernetes services. Unexpected response body: %s", string(body)) + return nil, "", fmt.Errorf("unable to list Kubernetes services; unexpected response body: %s", string(body)) } serviceMap := map[string]map[string]*Service{} @@ -735,3 +788,233 @@ func nodeHostIP(node *Node) (net.IP, error) { } return nil, fmt.Errorf("host IP unknown; known addresses: %v", addresses) } + +//////////////////////////////////// +// Here there be dragons. // +// Pod discovery code lies below. 
// +//////////////////////////////////// + +func (kd *Discovery) updatePod(pod *Pod, eventType EventType) { + kd.podsMu.Lock() + defer kd.podsMu.Unlock() + + switch eventType { + case deleted: + if _, ok := kd.pods[pod.ObjectMeta.Namespace]; ok { + delete(kd.pods[pod.ObjectMeta.Namespace], pod.ObjectMeta.Name) + if len(kd.pods[pod.ObjectMeta.Namespace]) == 0 { + delete(kd.pods, pod.ObjectMeta.Namespace) + } + } + case added, modified: + if _, ok := kd.pods[pod.ObjectMeta.Namespace]; !ok { + kd.pods[pod.ObjectMeta.Namespace] = map[string]*Pod{} + } + kd.pods[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name] = pod + } +} + +func (kd *Discovery) getPods() (map[string]map[string]*Pod, string, error) { + res, err := kd.queryAPIServerPath(podsURL) + if err != nil { + return nil, "", fmt.Errorf("unable to list Kubernetes pods: %s", err) + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + return nil, "", fmt.Errorf("unable to list Kubernetes pods; unexpected response: %d %s", res.StatusCode, res.Status) + } + + var pods PodList + if err := json.NewDecoder(res.Body).Decode(&pods); err != nil { + body, _ := ioutil.ReadAll(res.Body) + return nil, "", fmt.Errorf("unable to list Kubernetes pods; unexpected response body: %s", string(body)) + } + + podMap := map[string]map[string]*Pod{} + for idx, pod := range pods.Items { + if _, ok := podMap[pod.ObjectMeta.Namespace]; !ok { + podMap[pod.ObjectMeta.Namespace] = map[string]*Pod{} + } + log.Debugf("Got pod %s in namespace %s", pod.ObjectMeta.Name, pod.ObjectMeta.Namespace) + podMap[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name] = &pods.Items[idx] + } + + return podMap, pods.ResourceVersion, nil +} + +func (kd *Discovery) watchPods(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) { + until(func() { + pods, resourceVersion, err := kd.getPods() + if err != nil { + log.Errorf("Cannot initialize pods collection: %s", err) + return + } + kd.podsMu.Lock() + kd.pods = pods + kd.podsMu.Unlock() + + req, err := http.NewRequest("GET", podsURL, nil) + if err != nil { + log.Errorf("Cannot create pods request: %s", err) + return + } + + values := req.URL.Query() + values.Add("watch", "true") + values.Add("resourceVersion", resourceVersion) + req.URL.RawQuery = values.Encode() + res, err := kd.queryAPIServerReq(req) + if err != nil { + log.Errorf("Failed to watch pods: %s", err) + return + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + log.Errorf("Failed to watch pods: %d", res.StatusCode) + return + } + + d := json.NewDecoder(res.Body) + + for { + var event podEvent + if err := d.Decode(&event); err != nil { + log.Errorf("Watch pods unexpectedly closed: %s", err) + return + } + + select { + case events <- &event: + case <-done: + } + } + }, retryInterval, done) +} + +func podSource(pod *Pod) string { + return sourcePodPrefix + ":" + pod.ObjectMeta.Namespace + ":" + pod.ObjectMeta.Name +} + +type ByContainerPort []ContainerPort + +func (a ByContainerPort) Len() int { return len(a) } +func (a ByContainerPort) Less(i, j int) bool { return a[i].ContainerPort < a[j].ContainerPort } +func (a ByContainerPort) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + +func updatePodTargets(pod *Pod, allContainers bool) []model.LabelSet { + var targets []model.LabelSet = make([]model.LabelSet, 0, len(pod.PodSpec.Containers)) + if pod.PodStatus.PodIP == "" { + log.Debugf("skipping pod %s -- PodStatus.PodIP is empty", pod.ObjectMeta.Name) + return targets + } + + if pod.PodStatus.Phase != "Running" { + log.Debugf("skipping pod %s 
-- status is not `Running`", pod.ObjectMeta.Name) + return targets + } + + ready := "unknown" + for _, cond := range pod.PodStatus.Conditions { + if strings.ToLower(cond.Type) == "ready" { + ready = strings.ToLower(cond.Status) + } + } + + for _, container := range pod.PodSpec.Containers { + // Collect a list of TCP ports + // Sort by port number, ascending + // Product a target pointed at the first port + // Include a label containing all ports (portName=port,PortName=port,...,) + var tcpPorts []ContainerPort + var portLabel bytes.Buffer + + for _, port := range container.Ports { + if port.Protocol == "TCP" { + tcpPorts = append(tcpPorts, port) + } + } + + if len(tcpPorts) == 0 { + log.Debugf("skipping container %s with no TCP ports", container.Name) + continue + } + + sort.Sort(ByContainerPort(tcpPorts)) + + for _, port := range tcpPorts { + portLabel.WriteString(port.Name) + portLabel.WriteString("=") + portLabel.WriteString(strconv.FormatInt(int64(port.ContainerPort), 10)) + portLabel.WriteString(",") + } + + t := model.LabelSet{ + model.AddressLabel: model.LabelValue(net.JoinHostPort(pod.PodIP, strconv.FormatInt(int64(tcpPorts[0].ContainerPort), 10))), + podNameLabel: model.LabelValue(pod.ObjectMeta.Name), + podAddressLabel: model.LabelValue(pod.PodStatus.PodIP), + podNamespaceLabel: model.LabelValue(pod.ObjectMeta.Namespace), + podContainerNameLabel: model.LabelValue(container.Name), + podContainerPortNameLabel: model.LabelValue(tcpPorts[0].Name), + podContainerPortListLabel: model.LabelValue(portLabel.String()), + podReadyLabel: model.LabelValue(ready), + } + + for k, v := range pod.ObjectMeta.Labels { + labelName := strutil.SanitizeLabelName(podLabelPrefix + k) + t[model.LabelName(labelName)] = model.LabelValue(v) + } + + for k, v := range pod.ObjectMeta.Annotations { + labelName := strutil.SanitizeLabelName(podAnnotationPrefix + k) + t[model.LabelName(labelName)] = model.LabelValue(v) + } + + targets = append(targets, t) + + if !allContainers { + break + } + } + return targets +} + +func (kd *Discovery) updatePodTargetGroup(pod *Pod) *config.TargetGroup { + kd.podsMu.RLock() + defer kd.podsMu.RUnlock() + + tg := &config.TargetGroup{ + Source: podSource(pod), + } + + // If this pod doesn't exist, return an empty target group + if _, ok := kd.pods[pod.ObjectMeta.Namespace]; !ok { + return tg + } + if _, ok := kd.pods[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name]; !ok { + return tg + } + + tg.Labels = model.LabelSet{ + roleLabel: model.LabelValue("container"), + } + tg.Targets = updatePodTargets(pod, true) + + return tg +} + +func (kd *Discovery) updatePodsTargetGroup() *config.TargetGroup { + tg := &config.TargetGroup{ + Source: podsTargetGroupName, + Labels: model.LabelSet{ + roleLabel: model.LabelValue("pod"), + }, + } + + for _, namespace := range kd.pods { + for _, pod := range namespace { + tg.Targets = append(tg.Targets, updatePodTargets(pod, false)...) + } + } + + return tg +} diff --git a/retrieval/discovery/kubernetes/discovery_test.go b/retrieval/discovery/kubernetes/discovery_test.go new file mode 100644 index 000000000..c7460b1fd --- /dev/null +++ b/retrieval/discovery/kubernetes/discovery_test.go @@ -0,0 +1,175 @@ +// Copyright 2015 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kubernetes + +import ( + "flag" + "os" + "testing" + + _ "github.com/prometheus/common/log" + "github.com/prometheus/common/model" +) + +func TestMain(m *testing.M) { + flag.Parse() + os.Exit(m.Run()) +} + +var containerA = Container{ + Name: "a", + Ports: []ContainerPort{ + ContainerPort{ + Name: "http", + ContainerPort: 80, + Protocol: "TCP", + }, + }, +} + +var containerB = Container{ + Name: "b", + Ports: []ContainerPort{ + ContainerPort{ + Name: "http", + ContainerPort: 80, + Protocol: "TCP", + }, + }, +} + +var containerNoTcp = Container{ + Name: "no-tcp", + Ports: []ContainerPort{ + ContainerPort{ + Name: "dns", + ContainerPort: 53, + Protocol: "UDP", + }, + }, +} + +var containerMultiA = Container{ + Name: "a", + Ports: []ContainerPort{ + ContainerPort{ + Name: "http", + ContainerPort: 80, + Protocol: "TCP", + }, + ContainerPort{ + Name: "ssh", + ContainerPort: 22, + Protocol: "TCP", + }, + }, +} + +var containerMultiB = Container{ + Name: "b", + Ports: []ContainerPort{ + ContainerPort{ + Name: "http", + ContainerPort: 80, + Protocol: "TCP", + }, + ContainerPort{ + Name: "https", + ContainerPort: 443, + Protocol: "TCP", + }, + }, +} + +func pod(name string, containers []Container) *Pod { + return &Pod{ + ObjectMeta: ObjectMeta{ + Name: name, + }, + PodStatus: PodStatus{ + PodIP: "1.1.1.1", + Phase: "Running", + Conditions: []PodCondition{ + PodCondition{ + Type: "Ready", + Status: "True", + }, + }, + }, + PodSpec: PodSpec{ + Containers: containers, + }, + } +} + +func TestUpdatePodTargets(t *testing.T) { + var result []model.LabelSet + + // Return no targets for a pod that isn't "Running" + result = updatePodTargets(&Pod{PodStatus: PodStatus{PodIP: "1.1.1.1"}}, true) + if len(result) > 0 { + t.Fatalf("expected 0 targets, received %d", len(result)) + } + + // Return no targets for a pod with no IP + result = updatePodTargets(&Pod{PodStatus: PodStatus{Phase: "Running"}}, true) + if len(result) > 0 { + t.Fatalf("expected 0 targets, received %d", len(result)) + } + + // A pod with no containers (?!) 
should not produce any targets + result = updatePodTargets(pod("empty", []Container{}), true) + if len(result) > 0 { + t.Fatalf("expected 0 targets, received %d", len(result)) + } + + // A pod with all valid containers should return one target per container with allContainers=true + result = updatePodTargets(pod("easy", []Container{containerA, containerB}), true) + if len(result) != 2 { + t.Fatalf("expected 2 targets, received %d", len(result)) + } + if result[0][podReadyLabel] != "true" { + t.Fatalf("expected result[0] podReadyLabel 'true', received '%s'", result[0][podReadyLabel]) + } + + // A pod with all valid containers should return one target with allContainers=false + result = updatePodTargets(pod("easy", []Container{containerA, containerB}), false) + if len(result) != 1 { + t.Fatalf("expected 1 targets, received %d", len(result)) + } + + // A pod with some non-targetable containers should return one target per targetable container with allContainers=true + result = updatePodTargets(pod("mixed", []Container{containerA, containerNoTcp, containerB}), true) + if len(result) != 2 { + t.Fatalf("expected 2 targets, received %d", len(result)) + } + + // A pod with a container with multiple ports should return the numerically smallest port + result = updatePodTargets(pod("hard", []Container{containerMultiA, containerMultiB}), true) + if len(result) != 2 { + t.Fatalf("expected 2 targets, received %d", len(result)) + } + if result[0][model.AddressLabel] != "1.1.1.1:22" { + t.Fatalf("expected result[0] address to be 1.1.1.1:22, received %s", result[0][model.AddressLabel]) + } + if result[0][podContainerPortListLabel] != "ssh=22,http=80," { + t.Fatalf("expected result[0] podContainerPortListLabel to be 'ssh=22,http=80,', received '%s'", result[0][podContainerPortListLabel]) + } + if result[1][model.AddressLabel] != "1.1.1.1:80" { + t.Fatalf("expected result[1] address to be 1.1.1.1:80, received %s", result[1][model.AddressLabel]) + } + if result[1][podContainerPortListLabel] != "http=80,https=443," { + t.Fatalf("expected result[1] podContainerPortListLabel to be 'http=80,https=443,', received '%s'", result[1][podContainerPortListLabel]) + } +} diff --git a/retrieval/discovery/kubernetes/types.go b/retrieval/discovery/kubernetes/types.go index 52a75887e..29c33d9c9 100644 --- a/retrieval/discovery/kubernetes/types.go +++ b/retrieval/discovery/kubernetes/types.go @@ -100,6 +100,14 @@ type Container struct { Name string `json:"name" description:"name of the container; must be a DNS_LABEL and unique within the pod; cannot be updated"` // Optional. 
Image string `json:"image,omitempty" description:"Docker image name; see http://releases.k8s.io/HEAD/docs/user-guide/images.md"` + + Ports []ContainerPort `json:"ports"` +} + +type ContainerPort struct { + Name string `json:"name"` + ContainerPort int32 `json:"containerPort"` + Protocol string `json:"protocol"` } // Service is a named abstraction of software service (for example, mysql) consisting of local port @@ -231,3 +239,35 @@ type NodeList struct { Items []Node `json:"items" description:"list of nodes"` } + +type Pod struct { + ObjectMeta `json:"metadata,omitempty" description:"standard object metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"` + PodStatus `json:"status,omitempty" description:"pod status object; see http://kubernetes.io/v1.1/docs/api-reference/v1/definitions.html#_v1_podstatus"` + PodSpec `json:"spec,omitempty" description:"pod spec object; see http://kubernetes.io/v1.1/docs/api-reference/v1/definitions.html#_v1_podspec"` +} + +type podEvent struct { + EventType EventType `json:"type"` + Pod *Pod `json:"object"` +} + +type PodList struct { + ListMeta `json:"metadata,omitempty" description:"standard list metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"` + + Items []Pod `json:"items" description:"list of pods"` +} + +type PodStatus struct { + Phase string `json:"phase" description:"Current condition of the pod. More info: http://kubernetes.io/v1.1/docs/user-guide/pod-states.html#pod-phase"` + PodIP string `json:"podIP" description:"IP address allocated to the pod. Routable at least within the cluster. Empty if not yet allocated."` + Conditions []PodCondition `json:"conditions" description:"Current service state of pod."` +} + +type PodSpec struct { + Containers []Container `json:"containers" description:"list of containers, see http://kubernetes.io/v1.1/docs/api-reference/v1/definitions.html#_v1_container"` +} + +type PodCondition struct { + Type string `json:"type" description:"Type is the type of the condition. Currently only Ready."` + Status string `json:"status" description:"Status is the status of the condition. 
Can be True, False, Unknown."` +} From b3350d872a770b06a1d6c48d335765da9c212ebe Mon Sep 17 00:00:00 2001 From: Patrick Bogen Date: Thu, 19 May 2016 10:37:11 -0700 Subject: [PATCH 2/3] Add one label named for each port name, mapping it to port number; add corresponding tests; prefix port list label with a comma --- retrieval/discovery/kubernetes/discovery.go | 23 +++++++++++------- .../discovery/kubernetes/discovery_test.go | 24 ++++++++++++++----- 2 files changed, 32 insertions(+), 15 deletions(-) diff --git a/retrieval/discovery/kubernetes/discovery.go b/retrieval/discovery/kubernetes/discovery.go index c8249917e..feba25156 100644 --- a/retrieval/discovery/kubernetes/discovery.go +++ b/retrieval/discovery/kubernetes/discovery.go @@ -59,6 +59,9 @@ const ( podContainerPortNameLabel = metaLabelPrefix + "pod_container_port_name" // PodContainerPortListLabel is the name for the label containing a list of all TCP ports on the target container podContainerPortListLabel = metaLabelPrefix + "pod_container_port_list" + // PodContainerPortMapPrefix is the prefix used to create the names of labels that associate container port names to port values + // Such labels will be named (podContainerPortMapPrefix)_(PortName) = (ContainerPort) + podContainerPortMapPrefix = metaLabelPrefix + "pod_container_port_map_" // podReadyLabel is the name for the label containing the 'Ready' status (true/false/unknown) for a target podReadyLabel = metaLabelPrefix + "pod_ready" // podLabelPrefix is the prefix for prom label names corresponding to k8s labels for a target pod @@ -926,7 +929,7 @@ func updatePodTargets(pod *Pod, allContainers bool) []model.LabelSet { // Product a target pointed at the first port // Include a label containing all ports (portName=port,PortName=port,...,) var tcpPorts []ContainerPort - var portLabel bytes.Buffer + var portLabel *bytes.Buffer = bytes.NewBufferString(",") for _, port := range container.Ports { if port.Protocol == "TCP" { @@ -941,13 +944,6 @@ func updatePodTargets(pod *Pod, allContainers bool) []model.LabelSet { sort.Sort(ByContainerPort(tcpPorts)) - for _, port := range tcpPorts { - portLabel.WriteString(port.Name) - portLabel.WriteString("=") - portLabel.WriteString(strconv.FormatInt(int64(port.ContainerPort), 10)) - portLabel.WriteString(",") - } - t := model.LabelSet{ model.AddressLabel: model.LabelValue(net.JoinHostPort(pod.PodIP, strconv.FormatInt(int64(tcpPorts[0].ContainerPort), 10))), podNameLabel: model.LabelValue(pod.ObjectMeta.Name), @@ -955,10 +951,19 @@ func updatePodTargets(pod *Pod, allContainers bool) []model.LabelSet { podNamespaceLabel: model.LabelValue(pod.ObjectMeta.Namespace), podContainerNameLabel: model.LabelValue(container.Name), podContainerPortNameLabel: model.LabelValue(tcpPorts[0].Name), - podContainerPortListLabel: model.LabelValue(portLabel.String()), podReadyLabel: model.LabelValue(ready), } + for _, port := range tcpPorts { + portLabel.WriteString(port.Name) + portLabel.WriteString("=") + portLabel.WriteString(strconv.FormatInt(int64(port.ContainerPort), 10)) + portLabel.WriteString(",") + t[model.LabelName(podContainerPortMapPrefix + port.Name)] = model.LabelValue(strconv.FormatInt(int64(port.ContainerPort), 10)) + } + + t[model.LabelName(podContainerPortListLabel)] = model.LabelValue(portLabel.String()) + for k, v := range pod.ObjectMeta.Labels { labelName := strutil.SanitizeLabelName(podLabelPrefix + k) t[model.LabelName(labelName)] = model.LabelValue(v) diff --git a/retrieval/discovery/kubernetes/discovery_test.go 
b/retrieval/discovery/kubernetes/discovery_test.go index c7460b1fd..bcce56f94 100644 --- a/retrieval/discovery/kubernetes/discovery_test.go +++ b/retrieval/discovery/kubernetes/discovery_test.go @@ -42,8 +42,8 @@ var containerB = Container{ Name: "b", Ports: []ContainerPort{ ContainerPort{ - Name: "http", - ContainerPort: 80, + Name: "https", + ContainerPort: 443, Protocol: "TCP", }, }, @@ -142,6 +142,18 @@ func TestUpdatePodTargets(t *testing.T) { if result[0][podReadyLabel] != "true" { t.Fatalf("expected result[0] podReadyLabel 'true', received '%s'", result[0][podReadyLabel]) } + if _, ok := result[0][podContainerPortMapPrefix + "http"]; !ok { + t.Fatalf("expected result[0][podContainerPortMapPrefix + 'http'] to be '80', but was missing") + } + if result[0][podContainerPortMapPrefix + "http"] != "80" { + t.Fatalf("expected result[0][podContainerPortMapPrefix + 'http'] to be '80', but was %s", result[0][podContainerPortMapPrefix + "http"]) + } + if _, ok := result[1][podContainerPortMapPrefix + "https"]; !ok { + t.Fatalf("expected result[1][podContainerPortMapPrefix + 'https'] to be '443', but was missing") + } + if result[1][podContainerPortMapPrefix + "https"] != "443" { + t.Fatalf("expected result[1][podContainerPortMapPrefix + 'https'] to be '443', but was %s", result[1][podContainerPortMapPrefix + "https"]) + } // A pod with all valid containers should return one target with allContainers=false result = updatePodTargets(pod("easy", []Container{containerA, containerB}), false) @@ -163,13 +175,13 @@ func TestUpdatePodTargets(t *testing.T) { if result[0][model.AddressLabel] != "1.1.1.1:22" { t.Fatalf("expected result[0] address to be 1.1.1.1:22, received %s", result[0][model.AddressLabel]) } - if result[0][podContainerPortListLabel] != "ssh=22,http=80," { - t.Fatalf("expected result[0] podContainerPortListLabel to be 'ssh=22,http=80,', received '%s'", result[0][podContainerPortListLabel]) + if result[0][podContainerPortListLabel] != ",ssh=22,http=80," { + t.Fatalf("expected result[0] podContainerPortListLabel to be ',ssh=22,http=80,', received '%s'", result[0][podContainerPortListLabel]) } if result[1][model.AddressLabel] != "1.1.1.1:80" { t.Fatalf("expected result[1] address to be 1.1.1.1:80, received %s", result[1][model.AddressLabel]) } - if result[1][podContainerPortListLabel] != "http=80,https=443," { - t.Fatalf("expected result[1] podContainerPortListLabel to be 'http=80,https=443,', received '%s'", result[1][podContainerPortListLabel]) + if result[1][podContainerPortListLabel] != ",http=80,https=443," { + t.Fatalf("expected result[1] podContainerPortListLabel to be ',http=80,https=443,', received '%s'", result[1][podContainerPortListLabel]) } } From 89940eb48dff5e37e42fcc18c219e440dd6c2003 Mon Sep 17 00:00:00 2001 From: Patrick Bogen Date: Thu, 19 May 2016 10:57:23 -0700 Subject: [PATCH 3/3] Write tests to include testing determinancy of various slice orders; ensure that container order is deterministic --- retrieval/discovery/kubernetes/discovery.go | 10 +- .../discovery/kubernetes/discovery_test.go | 244 ++++++++++-------- 2 files changed, 141 insertions(+), 113 deletions(-) diff --git a/retrieval/discovery/kubernetes/discovery.go b/retrieval/discovery/kubernetes/discovery.go index feba25156..781bb314a 100644 --- a/retrieval/discovery/kubernetes/discovery.go +++ b/retrieval/discovery/kubernetes/discovery.go @@ -904,6 +904,12 @@ func (a ByContainerPort) Len() int { return len(a) } func (a ByContainerPort) Less(i, j int) bool { return a[i].ContainerPort < a[j].ContainerPort } 
func (a ByContainerPort) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +type ByContainerName []Container + +func (a ByContainerName) Len() int { return len(a) } +func (a ByContainerName) Less(i, j int) bool { return a[i].Name < a[j].Name } +func (a ByContainerName) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + func updatePodTargets(pod *Pod, allContainers bool) []model.LabelSet { var targets []model.LabelSet = make([]model.LabelSet, 0, len(pod.PodSpec.Containers)) if pod.PodStatus.PodIP == "" { @@ -923,6 +929,8 @@ func updatePodTargets(pod *Pod, allContainers bool) []model.LabelSet { } } + sort.Sort(ByContainerName(pod.PodSpec.Containers)) + for _, container := range pod.PodSpec.Containers { // Collect a list of TCP ports // Sort by port number, ascending @@ -959,7 +967,7 @@ func updatePodTargets(pod *Pod, allContainers bool) []model.LabelSet { portLabel.WriteString("=") portLabel.WriteString(strconv.FormatInt(int64(port.ContainerPort), 10)) portLabel.WriteString(",") - t[model.LabelName(podContainerPortMapPrefix + port.Name)] = model.LabelValue(strconv.FormatInt(int64(port.ContainerPort), 10)) + t[model.LabelName(podContainerPortMapPrefix+port.Name)] = model.LabelValue(strconv.FormatInt(int64(port.ContainerPort), 10)) } t[model.LabelName(podContainerPortListLabel)] = model.LabelValue(portLabel.String()) diff --git a/retrieval/discovery/kubernetes/discovery_test.go b/retrieval/discovery/kubernetes/discovery_test.go index bcce56f94..78144791c 100644 --- a/retrieval/discovery/kubernetes/discovery_test.go +++ b/retrieval/discovery/kubernetes/discovery_test.go @@ -15,6 +15,7 @@ package kubernetes import ( "flag" + "math/rand" "os" "testing" @@ -27,72 +28,82 @@ func TestMain(m *testing.M) { os.Exit(m.Run()) } -var containerA = Container{ - Name: "a", - Ports: []ContainerPort{ - ContainerPort{ - Name: "http", - ContainerPort: 80, - Protocol: "TCP", - }, +var portsA = []ContainerPort{ + ContainerPort{ + Name: "http", + ContainerPort: 80, + Protocol: "TCP", }, } -var containerB = Container{ - Name: "b", - Ports: []ContainerPort{ - ContainerPort{ - Name: "https", - ContainerPort: 443, - Protocol: "TCP", - }, +var portsB = []ContainerPort{ + ContainerPort{ + Name: "https", + ContainerPort: 443, + Protocol: "TCP", }, } -var containerNoTcp = Container{ - Name: "no-tcp", - Ports: []ContainerPort{ - ContainerPort{ - Name: "dns", - ContainerPort: 53, - Protocol: "UDP", - }, +var portsNoTcp = []ContainerPort{ + ContainerPort{ + Name: "dns", + ContainerPort: 53, + Protocol: "UDP", }, } -var containerMultiA = Container{ - Name: "a", - Ports: []ContainerPort{ - ContainerPort{ - Name: "http", - ContainerPort: 80, - Protocol: "TCP", - }, - ContainerPort{ - Name: "ssh", - ContainerPort: 22, - Protocol: "TCP", - }, +var portsMultiA = []ContainerPort{ + ContainerPort{ + Name: "http", + ContainerPort: 80, + Protocol: "TCP", + }, + ContainerPort{ + Name: "ssh", + ContainerPort: 22, + Protocol: "TCP", }, } -var containerMultiB = Container{ - Name: "b", - Ports: []ContainerPort{ - ContainerPort{ - Name: "http", - ContainerPort: 80, - Protocol: "TCP", - }, - ContainerPort{ - Name: "https", - ContainerPort: 443, - Protocol: "TCP", - }, +var portsMultiB = []ContainerPort{ + ContainerPort{ + Name: "http", + ContainerPort: 80, + Protocol: "TCP", }, + ContainerPort{ + Name: "https", + ContainerPort: 443, + Protocol: "TCP", + }, +} + +func container(name string, ports []ContainerPort) Container { + p := make([]ContainerPort, len(ports)) + copy(p, ports) + + // Shuffle order of ports to ensure code enforces determinism + for i 
:= range p { + j := rand.Intn(i + 1) + p[i], p[j] = p[j], p[i] + } + + return Container{ + Name: name, + Ports: p, + } } func pod(name string, containers []Container) *Pod { + c := make([]Container, len(containers)) + copy(c, containers) + + // Shuffle order of containers to ensure code enforces determinism + for i := range c { + j := rand.Intn(i + 1) + c[i], c[j] = c[j], c[i] + } + return &Pod{ ObjectMeta: ObjectMeta{ Name: name, @@ -108,7 +119,7 @@ func pod(name string, containers []Container) *Pod { }, }, PodSpec: PodSpec{ - Containers: containers, + Containers: c, }, } } @@ -116,72 +127,81 @@ func pod(name string, containers []Container) *Pod { func TestUpdatePodTargets(t *testing.T) { var result []model.LabelSet - // Return no targets for a pod that isn't "Running" - result = updatePodTargets(&Pod{PodStatus: PodStatus{PodIP: "1.1.1.1"}}, true) - if len(result) > 0 { - t.Fatalf("expected 0 targets, received %d", len(result)) - } + // Multiple iterations help ensure that we'll see different permutations via the various randomizations that occur + for i := 0; i < 100; i++ { + // Return no targets for a pod that isn't "Running" + result = updatePodTargets(&Pod{PodStatus: PodStatus{PodIP: "1.1.1.1"}}, true) + if len(result) > 0 { + t.Fatalf("expected 0 targets, received %d", len(result)) + } - // Return no targets for a pod with no IP - result = updatePodTargets(&Pod{PodStatus: PodStatus{Phase: "Running"}}, true) - if len(result) > 0 { - t.Fatalf("expected 0 targets, received %d", len(result)) - } + // Return no targets for a pod with no IP + result = updatePodTargets(&Pod{PodStatus: PodStatus{Phase: "Running"}}, true) + if len(result) > 0 { + t.Fatalf("expected 0 targets, received %d", len(result)) + } - // A pod with no containers (?!) should not produce any targets - result = updatePodTargets(pod("empty", []Container{}), true) - if len(result) > 0 { - t.Fatalf("expected 0 targets, received %d", len(result)) - } + // A pod with no containers (?!) 
should not produce any targets + result = updatePodTargets(pod("empty", []Container{}), true) + if len(result) > 0 { + t.Fatalf("expected 0 targets, received %d", len(result)) + } - // A pod with all valid containers should return one target per container with allContainers=true - result = updatePodTargets(pod("easy", []Container{containerA, containerB}), true) - if len(result) != 2 { - t.Fatalf("expected 2 targets, received %d", len(result)) - } - if result[0][podReadyLabel] != "true" { - t.Fatalf("expected result[0] podReadyLabel 'true', received '%s'", result[0][podReadyLabel]) - } - if _, ok := result[0][podContainerPortMapPrefix + "http"]; !ok { - t.Fatalf("expected result[0][podContainerPortMapPrefix + 'http'] to be '80', but was missing") - } - if result[0][podContainerPortMapPrefix + "http"] != "80" { - t.Fatalf("expected result[0][podContainerPortMapPrefix + 'http'] to be '80', but was %s", result[0][podContainerPortMapPrefix + "http"]) - } - if _, ok := result[1][podContainerPortMapPrefix + "https"]; !ok { - t.Fatalf("expected result[1][podContainerPortMapPrefix + 'https'] to be '443', but was missing") - } - if result[1][podContainerPortMapPrefix + "https"] != "443" { - t.Fatalf("expected result[1][podContainerPortMapPrefix + 'https'] to be '443', but was %s", result[1][podContainerPortMapPrefix + "https"]) - } + // A pod with all valid containers should return one target per container with allContainers=true + result = updatePodTargets(pod("easy", []Container{container("a", portsA), container("b", portsB)}), true) + if len(result) != 2 { + t.Fatalf("expected 2 targets, received %d", len(result)) + } + if result[0][podReadyLabel] != "true" { + t.Fatalf("expected result[0] podReadyLabel 'true', received '%s'", result[0][podReadyLabel]) + } + if _, ok := result[0][podContainerPortMapPrefix+"http"]; !ok { + t.Fatalf("expected result[0][podContainerPortMapPrefix + 'http'] to be '80', but was missing") + } + if result[0][podContainerPortMapPrefix+"http"] != "80" { + t.Fatalf("expected result[0][podContainerPortMapPrefix + 'http'] to be '80', but was %s", result[0][podContainerPortMapPrefix+"http"]) + } + if _, ok := result[1][podContainerPortMapPrefix+"https"]; !ok { + t.Fatalf("expected result[1][podContainerPortMapPrefix + 'https'] to be '443', but was missing") + } + if result[1][podContainerPortMapPrefix+"https"] != "443" { + t.Fatalf("expected result[1][podContainerPortMapPrefix + 'https'] to be '443', but was %s", result[1][podContainerPortMapPrefix+"https"]) + } - // A pod with all valid containers should return one target with allContainers=false - result = updatePodTargets(pod("easy", []Container{containerA, containerB}), false) - if len(result) != 1 { - t.Fatalf("expected 1 targets, received %d", len(result)) - } + // A pod with all valid containers should return one target with allContainers=false, and it should be the alphabetically first container + result = updatePodTargets(pod("easy", []Container{container("a", portsA), container("b", portsB)}), false) + if len(result) != 1 { + t.Fatalf("expected 1 targets, received %d", len(result)) + } + if _, ok := result[0][podContainerNameLabel]; !ok { + t.Fatalf("expected result[0][podContainerNameLabel] to be 'a', but was missing") + } + if result[0][podContainerNameLabel] != "a" { + t.Fatalf("expected result[0][podContainerNameLabel] to be 'a', but was '%s'", result[0][podContainerNameLabel]) + } - // A pod with some non-targetable containers should return one target per targetable container with allContainers=true - result = 
updatePodTargets(pod("mixed", []Container{containerA, containerNoTcp, containerB}), true) - if len(result) != 2 { - t.Fatalf("expected 2 targets, received %d", len(result)) - } + // A pod with some non-targetable containers should return one target per targetable container with allContainers=true + result = updatePodTargets(pod("mixed", []Container{container("a", portsA), container("no-tcp", portsNoTcp), container("b", portsB)}), true) + if len(result) != 2 { + t.Fatalf("expected 2 targets, received %d", len(result)) + } - // A pod with a container with multiple ports should return the numerically smallest port - result = updatePodTargets(pod("hard", []Container{containerMultiA, containerMultiB}), true) - if len(result) != 2 { - t.Fatalf("expected 2 targets, received %d", len(result)) - } - if result[0][model.AddressLabel] != "1.1.1.1:22" { - t.Fatalf("expected result[0] address to be 1.1.1.1:22, received %s", result[0][model.AddressLabel]) - } - if result[0][podContainerPortListLabel] != ",ssh=22,http=80," { - t.Fatalf("expected result[0] podContainerPortListLabel to be ',ssh=22,http=80,', received '%s'", result[0][podContainerPortListLabel]) - } - if result[1][model.AddressLabel] != "1.1.1.1:80" { - t.Fatalf("expected result[1] address to be 1.1.1.1:80, received %s", result[1][model.AddressLabel]) - } - if result[1][podContainerPortListLabel] != ",http=80,https=443," { - t.Fatalf("expected result[1] podContainerPortListLabel to be ',http=80,https=443,', received '%s'", result[1][podContainerPortListLabel]) + // A pod with a container with multiple ports should return the numerically smallest port + result = updatePodTargets(pod("hard", []Container{container("multiA", portsMultiA), container("multiB", portsMultiB)}), true) + if len(result) != 2 { + t.Fatalf("expected 2 targets, received %d", len(result)) + } + if result[0][model.AddressLabel] != "1.1.1.1:22" { + t.Fatalf("expected result[0] address to be 1.1.1.1:22, received %s", result[0][model.AddressLabel]) + } + if result[0][podContainerPortListLabel] != ",ssh=22,http=80," { + t.Fatalf("expected result[0] podContainerPortListLabel to be ',ssh=22,http=80,', received '%s'", result[0][podContainerPortListLabel]) + } + if result[1][model.AddressLabel] != "1.1.1.1:80" { + t.Fatalf("expected result[1] address to be 1.1.1.1:80, received %s", result[1][model.AddressLabel]) + } + if result[1][podContainerPortListLabel] != ",http=80,https=443," { + t.Fatalf("expected result[1] podContainerPortListLabel to be ',http=80,https=443,', received '%s'", result[1][podContainerPortListLabel]) + } } }
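
A brief note on the example configuration (not part of the patches above): the second relabel rule of the `kubernetes-pods` job rewrites `__address__` from the `prometheus.io/port` annotation. Prometheus joins the source labels with the default `;` separator and anchors the regex, so the rule behaves like this minimal Go sketch; the address and port values are made up for illustration:

package main

import (
	"fmt"
	"regexp"
)

func main() {
	// __address__ and __meta_kubernetes_pod_annotation_prometheus_io_port,
	// joined with the default ";" separator, as relabeling sees them.
	joined := "10.32.0.7:9102;8080"

	// Same regex and replacement as in the example scrape config.
	re := regexp.MustCompile(`(.+):(?:\d+);(\d+)`)
	fmt.Println(re.ReplaceAllString(joined, "${1}:${2}")) // 10.32.0.7:8080
}

The leading comma that PATCH 2/3 adds to `__meta_kubernetes_pod_container_port_list` presumably serves a similar matching purpose: every `name=port` entry is then delimited on both sides, so a relabel regex can pick out a single entry with a substring such as `,https=443,` without false positives.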