diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 000000000..32d134f7d --- /dev/null +++ b/.dockerignore @@ -0,0 +1,3 @@ +data/ +prometheus +promtool diff --git a/config/config.go b/config/config.go index 1c5970092..5d20f0811 100644 --- a/config/config.go +++ b/config/config.go @@ -98,6 +98,11 @@ var ( DefaultServersetSDConfig = ServersetSDConfig{ Timeout: Duration(10 * time.Second), } + + // DefaultMarathonSDConfig is the default Marathon SD configuration. + DefaultMarathonSDConfig = MarathonSDConfig{ + RefreshInterval: Duration(30 * time.Second), + } ) // This custom URL type allows validating at configuration load time. @@ -278,6 +283,8 @@ type ScrapeConfig struct { ConsulSDConfigs []*ConsulSDConfig `yaml:"consul_sd_configs,omitempty"` // List of Serverset service discovery configurations. ServersetSDConfigs []*ServersetSDConfig `yaml:"serverset_sd_configs,omitempty"` + // MarathonSDConfigs is a list of Marathon service discovery configurations. + MarathonSDConfigs []*MarathonSDConfig `yaml:"marathon_sd_configs,omitempty"` // List of target relabel configurations. RelabelConfigs []*RelabelConfig `yaml:"relabel_configs,omitempty"` @@ -538,6 +545,29 @@ func (c *ServersetSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) err return checkOverflow(c.XXX, "serverset_sd_config") } +// MarathonSDConfig is the configuration for services running on Marathon. +type MarathonSDConfig struct { + Servers []string `yaml:"servers,omitempty"` + RefreshInterval Duration `yaml:"refresh_interval,omitempty"` + + // Catches all undefined fields and must be empty after parsing. + XXX map[string]interface{} `yaml:",inline"` +} + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (c *MarathonSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + *c = DefaultMarathonSDConfig + type plain MarathonSDConfig + err := unmarshal((*plain)(c)) + if err != nil { + return err + } + if len(c.Servers) == 0 { + return fmt.Errorf("Marathon SD config must contain at least one Marathon server") + } + return checkOverflow(c.XXX, "marathon_sd_config") +} + // RelabelAction is the action to be performed on relabeling. type RelabelAction string @@ -574,7 +604,7 @@ type RelabelConfig struct { // Separator is the string between concatenated values from the source labels. Separator string `yaml:"separator,omitempty"` // Regex against which the concatenation is matched. - Regex *Regexp `yaml:"regex",omitempty` + Regex *Regexp `yaml:"regex,omitempty"` // Modulus to take of the hash of concatenated values from the source labels. Modulus uint64 `yaml:"modulus,omitempty"` // The label to which the resulting string is written in a replacement. diff --git a/retrieval/discovery/marathon.go b/retrieval/discovery/marathon.go new file mode 100644 index 000000000..5fc4d07a9 --- /dev/null +++ b/retrieval/discovery/marathon.go @@ -0,0 +1,98 @@ +package discovery + +import ( + "time" + + "github.com/prometheus/log" + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/retrieval/discovery/marathon" +) + +// MarathonDiscovery provides service discovery based on a Marathon instance. +type MarathonDiscovery struct { + servers []string + refreshInterval time.Duration + done chan struct{} + lastRefresh map[string]*config.TargetGroup + client marathon.AppListClient +} + +// NewMarathonDiscovery creates a new Marathon based discovery. +func NewMarathonDiscovery(conf *config.MarathonSDConfig) *MarathonDiscovery { + return &MarathonDiscovery{ + servers: conf.Servers, + refreshInterval: time.Duration(conf.RefreshInterval), + done: make(chan struct{}), + client: marathon.FetchMarathonApps, + } +} + +// Sources implements the TargetProvider interface. +func (md *MarathonDiscovery) Sources() []string { + var sources []string + tgroups, err := md.fetchTargetGroups() + if err == nil { + for source := range tgroups { + sources = append(sources, source) + } + } + return sources +} + +// Run implements the TargetProvider interface. +func (md *MarathonDiscovery) Run(ch chan<- *config.TargetGroup) { + defer close(ch) + + for { + select { + case <-md.done: + log.Debug("Shutting down marathon discovery.") + return + case <-time.After(md.refreshInterval): + err := md.updateServices(ch) + if err != nil { + log.Errorf("Error while updating services: %s", err) + } + } + } +} + +// Stop implements the TargetProvider interface. +func (md *MarathonDiscovery) Stop() { + md.done <- struct{}{} +} + +func (md *MarathonDiscovery) updateServices(ch chan<- *config.TargetGroup) error { + targetMap, err := md.fetchTargetGroups() + if err != nil { + return err + } + + // Update services which are still present + for _, tg := range targetMap { + ch <- tg + } + + // Remove services which did disappear + for source := range md.lastRefresh { + _, ok := targetMap[source] + if !ok { + log.Debugf("Removing group for %s", source) + ch <- &config.TargetGroup{Source: source} + } + } + + md.lastRefresh = targetMap + return nil +} + +func (md *MarathonDiscovery) fetchTargetGroups() (map[string]*config.TargetGroup, error) { + url := marathon.RandomAppsURL(md.servers) + apps, err := md.client(url) + if err != nil { + return nil, err + } + + groups := marathon.AppsToTargetGroups(apps) + return groups, nil +} diff --git a/retrieval/discovery/marathon/client.go b/retrieval/discovery/marathon/client.go new file mode 100644 index 000000000..52ccbbf7a --- /dev/null +++ b/retrieval/discovery/marathon/client.go @@ -0,0 +1,34 @@ +package marathon + +import ( + "encoding/json" + "io/ioutil" + "net/http" +) + +// AppListClient defines a function that can be used to get an application list from marathon. +type AppListClient func(url string) (*AppList, error) + +// FetchMarathonApps requests a list of applications from a marathon server. +func FetchMarathonApps(url string) (*AppList, error) { + resp, err := http.Get(url) + if err != nil { + return nil, err + } + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + return parseAppJSON(body) +} + +func parseAppJSON(body []byte) (*AppList, error) { + apps := &AppList{} + err := json.Unmarshal(body, apps) + if err != nil { + return nil, err + } + return apps, nil +} diff --git a/retrieval/discovery/marathon/constants.go b/retrieval/discovery/marathon/constants.go new file mode 100644 index 000000000..7b761a660 --- /dev/null +++ b/retrieval/discovery/marathon/constants.go @@ -0,0 +1,16 @@ +package marathon + +import ( + "github.com/prometheus/client_golang/model" +) + +const ( + // AppLabel is used for the name of the app in Marathon. + AppLabel model.LabelName = "__meta_marathon_app" + + // ImageLabel is the label that is used for the docker image running the service. + ImageLabel model.LabelName = "__meta_marathon_image" + + // TaskLabel contains the mesos task name of the app instance. + TaskLabel model.LabelName = "__meta_marathon_task" +) diff --git a/retrieval/discovery/marathon/conversion.go b/retrieval/discovery/marathon/conversion.go new file mode 100644 index 000000000..eace6cb2d --- /dev/null +++ b/retrieval/discovery/marathon/conversion.go @@ -0,0 +1,76 @@ +package marathon + +import ( + "fmt" + "strings" + + clientmodel "github.com/prometheus/client_golang/model" + + "github.com/prometheus/prometheus/config" +) + +// AppsToTargetGroups takes an array of Marathon apps and converts them into target groups. +func AppsToTargetGroups(apps *AppList) map[string]*config.TargetGroup { + tgroups := map[string]*config.TargetGroup{} + for _, a := range apps.Apps { + if isValidApp(&a) { + group := createTargetGroup(&a) + tgroups[group.Source] = group + } + } + return tgroups +} + +func createTargetGroup(app *App) *config.TargetGroup { + var ( + targets = targetsForApp(app) + source = targetGroupName(app) + appName = clientmodel.LabelValue(sanitizeName(app.ID)) + image = clientmodel.LabelValue(imageName(app)) + ) + return &config.TargetGroup{ + Targets: targets, + Labels: clientmodel.LabelSet{ + AppLabel: appName, + ImageLabel: image, + }, + Source: source, + } +} + +func targetsForApp(app *App) []clientmodel.LabelSet { + targets := make([]clientmodel.LabelSet, 0, len(app.Tasks)) + for _, t := range app.Tasks { + target := targetForTask(&t) + targets = append(targets, clientmodel.LabelSet{ + clientmodel.AddressLabel: clientmodel.LabelValue(target), + TaskLabel: clientmodel.LabelValue(sanitizeName(t.ID)), + }) + } + return targets +} + +func imageName(app *App) string { + return app.Container.Docker.Image +} + +func targetForTask(task *Task) string { + return fmt.Sprintf("%s:%d", task.Host, task.Ports[0]) +} + +func isValidApp(app *App) bool { + if app.RunningTasks > 0 { + _, ok := app.Labels["prometheus"] + return ok + } + return false +} + +func targetGroupName(app *App) string { + return fmt.Sprintf("marathon:%s", sanitizeName(app.ID)) +} + +func sanitizeName(id string) string { + trimID := strings.TrimLeft(id, " -/.") + return strings.Replace(trimID, "/", "-", -1) +} diff --git a/retrieval/discovery/marathon/objects.go b/retrieval/discovery/marathon/objects.go new file mode 100644 index 000000000..7da743a51 --- /dev/null +++ b/retrieval/discovery/marathon/objects.go @@ -0,0 +1,32 @@ +package marathon + +// Task describes one instance of a service running on Marathon. +type Task struct { + ID string `json:"id"` + Host string `json:"host"` + Ports []uint32 `json:"ports"` +} + +// DockerContainer describes a container which uses the docker runtime. +type DockerContainer struct { + Image string `json:"image"` +} + +// Container describes the runtime an app in running in. +type Container struct { + Docker DockerContainer `json:"docker"` +} + +// App describes a service running on Marathon. +type App struct { + ID string `json:"id"` + Tasks []Task `json:"tasks"` + RunningTasks int `json:"tasksRunning"` + Labels map[string]string `json:"labels"` + Container Container `json:"container"` +} + +// AppList is a list of Marathon apps. +type AppList struct { + Apps []App `json:"apps"` +} diff --git a/retrieval/discovery/marathon/url.go b/retrieval/discovery/marathon/url.go new file mode 100644 index 000000000..61f5041f9 --- /dev/null +++ b/retrieval/discovery/marathon/url.go @@ -0,0 +1,15 @@ +package marathon + +import ( + "fmt" + "math/rand" +) + +const appListPath string = "/v2/apps/?embed=apps.tasks" + +// RandomAppsURL randomly selects a server from an array and creates an URL pointing to the app list. +func RandomAppsURL(servers []string) string { + // TODO If possible update server list from Marathon at some point + server := servers[rand.Intn(len(servers))] + return fmt.Sprintf("%s%s", server, appListPath) +} diff --git a/retrieval/discovery/marathon_test.go b/retrieval/discovery/marathon_test.go new file mode 100644 index 000000000..7cf6b74b6 --- /dev/null +++ b/retrieval/discovery/marathon_test.go @@ -0,0 +1,200 @@ +package discovery + +import ( + "errors" + "testing" + "time" + + clientmodel "github.com/prometheus/client_golang/model" + + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/retrieval/discovery/marathon" +) + +var marathonValidLabel = map[string]string{"prometheus": "yes"} + +func newTestDiscovery(client marathon.AppListClient) (chan *config.TargetGroup, *MarathonDiscovery) { + ch := make(chan *config.TargetGroup) + md := NewMarathonDiscovery(&config.MarathonSDConfig{ + Servers: []string{"http://localhost:8080"}, + }) + md.client = client + return ch, md +} + +func TestMarathonSDHandleError(t *testing.T) { + var errTesting = errors.New("testing failure") + ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) { + return nil, errTesting + }) + go func() { + select { + case tg := <-ch: + t.Fatalf("Got group: %s", tg) + default: + } + }() + err := md.updateServices(ch) + if err != errTesting { + t.Fatalf("Expected error: %s", err) + } +} + +func TestMarathonSDEmptyList(t *testing.T) { + ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) { + return &marathon.AppList{}, nil + }) + go func() { + select { + case tg := <-ch: + t.Fatalf("Got group: %v", tg) + default: + } + }() + err := md.updateServices(ch) + if err != nil { + t.Fatalf("Got error: %s", err) + } +} + +func marathonTestAppList(labels map[string]string, runningTasks int) *marathon.AppList { + task := marathon.Task{ + ID: "test-task-1", + Host: "mesos-slave1", + Ports: []uint32{31000}, + } + docker := marathon.DockerContainer{Image: "repo/image:tag"} + container := marathon.Container{Docker: docker} + app := marathon.App{ + ID: "test-service", + Tasks: []marathon.Task{task}, + RunningTasks: runningTasks, + Labels: labels, + Container: container, + } + return &marathon.AppList{ + Apps: []marathon.App{app}, + } +} + +func TestMarathonSDSendGroup(t *testing.T) { + ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) { + return marathonTestAppList(marathonValidLabel, 1), nil + }) + go func() { + select { + case tg := <-ch: + if tg.Source != "marathon:test-service" { + t.Fatalf("Wrong target group name: %s", tg.Source) + } + if len(tg.Targets) != 1 { + t.Fatalf("Wrong number of targets: %v", tg.Targets) + } + tgt := tg.Targets[0] + if tgt[clientmodel.AddressLabel] != "mesos-slave1:31000" { + t.Fatalf("Wrong target address: %s", tgt[clientmodel.AddressLabel]) + } + default: + t.Fatal("Did not get a target group.") + } + }() + err := md.updateServices(ch) + if err != nil { + t.Fatalf("Got error: %s", err) + } +} + +func TestMarathonSDNoLabel(t *testing.T) { + ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) { + return marathonTestAppList(map[string]string{}, 1), nil + }) + go func() { + select { + case tg := <-ch: + t.Fatalf("Got group: %s", tg) + default: + } + }() + err := md.updateServices(ch) + if err != nil { + t.Fatalf("Got error: %s", err) + } +} + +func TestMarathonSDNotRunning(t *testing.T) { + ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) { + return marathonTestAppList(marathonValidLabel, 0), nil + }) + go func() { + select { + case tg := <-ch: + t.Fatalf("Got group: %s", tg) + default: + } + }() + err := md.updateServices(ch) + if err != nil { + t.Fatalf("Got error: %s", err) + } +} + +func TestMarathonSDRemoveApp(t *testing.T) { + ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) { + return marathonTestAppList(marathonValidLabel, 1), nil + }) + go func() { + up1 := <-ch + up2 := <-ch + if up2.Source != up1.Source { + t.Fatalf("Source is different: %s", up2) + if len(up2.Targets) > 0 { + t.Fatalf("Got a non-empty target set: %s", up2.Targets) + } + } + }() + err := md.updateServices(ch) + if err != nil { + t.Fatalf("Got error on first update: %s", err) + } + + md.client = func(url string) (*marathon.AppList, error) { + return marathonTestAppList(marathonValidLabel, 0), nil + } + err = md.updateServices(ch) + if err != nil { + t.Fatalf("Got error on second update: %s", err) + } +} + +func TestMarathonSDSources(t *testing.T) { + _, md := newTestDiscovery(func(url string) (*marathon.AppList, error) { + return marathonTestAppList(marathonValidLabel, 1), nil + }) + sources := md.Sources() + if len(sources) != 1 { + t.Fatalf("Wrong number of sources: %s", sources) + } +} + +func TestMarathonSDRunAndStop(t *testing.T) { + ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) { + return marathonTestAppList(marathonValidLabel, 1), nil + }) + md.refreshInterval = time.Millisecond * 10 + + go func() { + select { + case <-ch: + md.Stop() + case <-time.After(md.refreshInterval * 3): + md.Stop() + t.Fatalf("Update took too long.") + } + }() + md.Run(ch) + select { + case <-ch: + default: + t.Fatalf("Channel not closed.") + } +} diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index 271307b74..1474169a0 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -398,6 +398,9 @@ func ProvidersFromConfig(cfg *config.ScrapeConfig) []TargetProvider { for _, c := range cfg.ServersetSDConfigs { providers = append(providers, discovery.NewServersetDiscovery(c)) } + for _, c := range cfg.MarathonSDConfigs { + providers = append(providers, discovery.NewMarathonDiscovery(c)) + } if len(cfg.TargetGroups) > 0 { providers = append(providers, NewStaticProvider(cfg.TargetGroups)) }