Add tls support

This commit is contained in:
bekbulatov 2016-09-29 13:57:28 +01:00
parent 05d3d6d9f7
commit 01b53c1180
7 changed files with 91 additions and 53 deletions

View File

@ -227,6 +227,13 @@ func resolveFilepaths(baseDir string, cfg *Config) {
kcfg.TLSConfig.CertFile = join(kcfg.TLSConfig.CertFile)
kcfg.TLSConfig.KeyFile = join(kcfg.TLSConfig.KeyFile)
}
for _, mcfg := range scfg.MarathonSDConfigs {
mcfg.TLSConfig.CAFile = join(mcfg.TLSConfig.CAFile)
mcfg.TLSConfig.CertFile = join(mcfg.TLSConfig.CertFile)
mcfg.TLSConfig.KeyFile = join(mcfg.TLSConfig.KeyFile)
}
}
}
@ -773,6 +780,7 @@ func (c *NerveSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
type MarathonSDConfig struct {
Servers []string `yaml:"servers,omitempty"`
RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"`
TLSConfig TLSConfig `yaml:"tls_config,omitempty"`
// Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline"`

View File

@ -258,9 +258,13 @@ var expectedConf = &Config{
MarathonSDConfigs: []*MarathonSDConfig{
{
Servers: []string{
"http://marathon.example.com:8080",
"https://marathon.example.com:443",
},
RefreshInterval: model.Duration(30 * time.Second),
TLSConfig: TLSConfig{
CertFile: "testdata/valid_cert_file",
KeyFile: "testdata/valid_key_file",
},
},
},
},

View File

@ -126,7 +126,11 @@ scrape_configs:
- job_name: service-marathon
marathon_sd_configs:
- servers:
- 'http://marathon.example.com:8080'
- 'https://marathon.example.com:443'
tls_config:
cert_file: valid_cert_file
key_file: valid_key_file
- job_name: service-ec2
ec2_sd_configs:

View File

@ -14,8 +14,6 @@
package discovery
import (
"time"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/retrieval/discovery/consul"
"github.com/prometheus/prometheus/retrieval/discovery/dns"
@ -41,12 +39,8 @@ func NewKubernetesDiscovery(conf *config.KubernetesSDConfig) (*kubernetes.Discov
}
// NewMarathon creates a new Marathon based discovery.
func NewMarathon(conf *config.MarathonSDConfig) *marathon.Discovery {
return &marathon.Discovery{
Servers: conf.Servers,
RefreshInterval: time.Duration(conf.RefreshInterval),
Client: marathon.FetchApps,
}
func NewMarathon(conf *config.MarathonSDConfig) (*marathon.Discovery, error) {
return marathon.NewDiscovery(conf)
}
// NewDNS creates a new DNS based discovery.

View File

@ -27,6 +27,7 @@ import (
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/httputil"
)
const (
@ -47,10 +48,32 @@ const appListPath string = "/v2/apps/?embed=apps.tasks"
// Discovery provides service discovery based on a Marathon instance.
type Discovery struct {
Servers []string
RefreshInterval time.Duration
client *http.Client
servers []string
refreshInterval time.Duration
lastRefresh map[string]*config.TargetGroup
Client AppListClient
appsClient AppListClient
}
// Initialize sets up the discovery for usage.
func NewDiscovery(conf *config.MarathonSDConfig) (*Discovery, error) {
tls, err := httputil.NewTLSConfig(conf.TLSConfig)
if err != nil {
return nil, err
}
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tls,
},
}
return &Discovery{
client: client,
servers: conf.Servers,
refreshInterval: time.Duration(conf.RefreshInterval),
appsClient: fetchApps,
}, nil
}
// Run implements the TargetProvider interface.
@ -61,7 +84,7 @@ func (md *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
select {
case <-ctx.Done():
return
case <-time.After(md.RefreshInterval):
case <-time.After(md.refreshInterval):
err := md.updateServices(ctx, ch)
if err != nil {
log.Errorf("Error while updating services: %s", err)
@ -105,8 +128,8 @@ func (md *Discovery) updateServices(ctx context.Context, ch chan<- []*config.Tar
}
func (md *Discovery) fetchTargetGroups() (map[string]*config.TargetGroup, error) {
url := RandomAppsURL(md.Servers)
apps, err := md.Client(url)
url := RandomAppsURL(md.servers)
apps, err := md.appsClient(md.client, url)
if err != nil {
return nil, err
}
@ -147,11 +170,11 @@ type AppList struct {
}
// AppListClient defines a function that can be used to get an application list from marathon.
type AppListClient func(url string) (*AppList, error)
type AppListClient func(client *http.Client, url string) (*AppList, error)
// FetchApps requests a list of applications from a marathon server.
func FetchApps(url string) (*AppList, error) {
resp, err := http.Get(url)
// fetchApps requests a list of applications from a marathon server.
func fetchApps(client *http.Client, url string) (*AppList, error) {
resp, err := client.Get(url)
if err != nil {
return nil, err
}

View File

@ -15,6 +15,7 @@ package marathon
import (
"errors"
"net/http"
"testing"
"time"
@ -27,13 +28,15 @@ import (
var (
marathonValidLabel = map[string]string{"prometheus": "yes"}
testServers = []string{"http://localhost:8080"}
conf = config.MarathonSDConfig{Servers: testServers}
)
func testUpdateServices(client AppListClient, ch chan []*config.TargetGroup) error {
md := Discovery{
Servers: testServers,
Client: client,
md, err := NewDiscovery(&conf)
if err != nil {
return err
}
md.appsClient = client
return md.updateServices(context.Background(), ch)
}
@ -41,7 +44,7 @@ func TestMarathonSDHandleError(t *testing.T) {
var (
errTesting = errors.New("testing failure")
ch = make(chan []*config.TargetGroup, 1)
client = func(url string) (*AppList, error) { return nil, errTesting }
client = func(client *http.Client, url string) (*AppList, error) { return nil, errTesting }
)
if err := testUpdateServices(client, ch); err != errTesting {
t.Fatalf("Expected error: %s", err)
@ -56,7 +59,7 @@ func TestMarathonSDHandleError(t *testing.T) {
func TestMarathonSDEmptyList(t *testing.T) {
var (
ch = make(chan []*config.TargetGroup, 1)
client = func(url string) (*AppList, error) { return &AppList{}, nil }
client = func(client *http.Client, url string) (*AppList, error) { return &AppList{}, nil }
)
if err := testUpdateServices(client, ch); err != nil {
t.Fatalf("Got error: %s", err)
@ -95,7 +98,7 @@ func marathonTestAppList(labels map[string]string, runningTasks int) *AppList {
func TestMarathonSDSendGroup(t *testing.T) {
var (
ch = make(chan []*config.TargetGroup, 1)
client = func(url string) (*AppList, error) {
client = func(client *http.Client, url string) (*AppList, error) {
return marathonTestAppList(marathonValidLabel, 1), nil
}
)
@ -122,16 +125,14 @@ func TestMarathonSDSendGroup(t *testing.T) {
}
func TestMarathonSDRemoveApp(t *testing.T) {
var (
ch = make(chan []*config.TargetGroup)
client = func(url string) (*AppList, error) {
return marathonTestAppList(marathonValidLabel, 1), nil
}
md = Discovery{
Servers: testServers,
Client: client,
}
)
var ch = make(chan []*config.TargetGroup)
md, err := NewDiscovery(&conf)
if err != nil {
t.Fatalf("%s", err)
}
md.appsClient = func(client *http.Client, url string) (*AppList, error) {
return marathonTestAppList(marathonValidLabel, 1), nil
}
go func() {
up1 := (<-ch)[0]
up2 := (<-ch)[0]
@ -142,32 +143,31 @@ func TestMarathonSDRemoveApp(t *testing.T) {
}
}
}()
err := md.updateServices(context.Background(), ch)
if err != nil {
if err := md.updateServices(context.Background(), ch); err != nil {
t.Fatalf("Got error on first update: %s", err)
}
md.Client = func(url string) (*AppList, error) {
md.appsClient = func(client *http.Client, url string) (*AppList, error) {
return marathonTestAppList(marathonValidLabel, 0), nil
}
err = md.updateServices(context.Background(), ch)
if err != nil {
if err := md.updateServices(context.Background(), ch); err != nil {
t.Fatalf("Got error on second update: %s", err)
}
}
func TestMarathonSDRunAndStop(t *testing.T) {
var (
ch = make(chan []*config.TargetGroup)
client = func(url string) (*AppList, error) {
return marathonTestAppList(marathonValidLabel, 1), nil
}
md = Discovery{
Servers: testServers,
Client: client,
RefreshInterval: time.Millisecond * 10,
}
refreshInterval = model.Duration(time.Millisecond * 10)
conf = config.MarathonSDConfig{Servers: testServers, RefreshInterval: refreshInterval}
ch = make(chan []*config.TargetGroup)
)
md, err := NewDiscovery(&conf)
if err != nil {
t.Fatalf("%s", err)
}
md.appsClient = func(client *http.Client, url string) (*AppList, error) {
return marathonTestAppList(marathonValidLabel, 1), nil
}
ctx, cancel := context.WithCancel(context.Background())
go func() {
@ -178,7 +178,7 @@ func TestMarathonSDRunAndStop(t *testing.T) {
return
}
cancel()
case <-time.After(md.RefreshInterval * 3):
case <-time.After(md.refreshInterval * 3):
cancel()
t.Fatalf("Update took too long.")
}
@ -213,7 +213,7 @@ func marathonTestZeroTaskPortAppList(labels map[string]string, runningTasks int)
func TestMarathonZeroTaskPorts(t *testing.T) {
var (
ch = make(chan []*config.TargetGroup, 1)
client = func(url string) (*AppList, error) {
client = func(client *http.Client, url string) (*AppList, error) {
return marathonTestZeroTaskPortAppList(marathonValidLabel, 1), nil
}
)

View File

@ -389,7 +389,12 @@ func providersFromConfig(cfg *config.ScrapeConfig) map[string]TargetProvider {
app("consul", i, k)
}
for i, c := range cfg.MarathonSDConfigs {
app("marathon", i, discovery.NewMarathon(c))
m, err := discovery.NewMarathon(c)
if err != nil {
log.Errorf("Cannot create Marathon discovery: %s", err)
continue
}
app("marathon", i, m)
}
for i, c := range cfg.KubernetesSDConfigs {
k, err := discovery.NewKubernetesDiscovery(c)