mirror of
https://github.com/prometheus/prometheus
synced 2024-12-27 17:13:22 +00:00
Merge pull request #2212 from prometheus/alertingsd
Extract discovery package
This commit is contained in:
commit
dd1a656cc4
@ -11,7 +11,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package discovery
|
||||
package azure
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
@ -45,15 +45,13 @@ const (
|
||||
var (
|
||||
azureSDRefreshFailuresCount = prometheus.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Name: "sd_azure_refresh_failures_total",
|
||||
Help: "Number of Azure-SD refresh failures.",
|
||||
Name: "prometheus_sd_azure_refresh_failures_total",
|
||||
Help: "Number of Azure-SD refresh failures.",
|
||||
})
|
||||
azureSDRefreshDuration = prometheus.NewSummary(
|
||||
prometheus.SummaryOpts{
|
||||
Namespace: namespace,
|
||||
Name: "sd_azure_refresh_duration_seconds",
|
||||
Help: "The duration of a Azure-SD refresh in seconds.",
|
||||
Name: "prometheus_sd_azure_refresh_duration_seconds",
|
||||
Help: "The duration of a Azure-SD refresh in seconds.",
|
||||
})
|
||||
)
|
||||
|
||||
@ -70,8 +68,8 @@ type AzureDiscovery struct {
|
||||
port int
|
||||
}
|
||||
|
||||
// NewAzureDiscovery returns a new AzureDiscovery which periodically refreshes its targets.
|
||||
func NewAzureDiscovery(cfg *config.AzureSDConfig) *AzureDiscovery {
|
||||
// NewDiscovery returns a new AzureDiscovery which periodically refreshes its targets.
|
||||
func NewDiscovery(cfg *config.AzureSDConfig) *AzureDiscovery {
|
||||
return &AzureDiscovery{
|
||||
cfg: cfg,
|
||||
interval: time.Duration(cfg.RefreshInterval),
|
302
discovery/discovery.go
Normal file
302
discovery/discovery.go
Normal file
@ -0,0 +1,302 @@
|
||||
// Copyright 2016 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package discovery
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/common/log"
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/discovery/azure"
|
||||
"github.com/prometheus/prometheus/discovery/consul"
|
||||
"github.com/prometheus/prometheus/discovery/dns"
|
||||
"github.com/prometheus/prometheus/discovery/ec2"
|
||||
"github.com/prometheus/prometheus/discovery/file"
|
||||
"github.com/prometheus/prometheus/discovery/gce"
|
||||
"github.com/prometheus/prometheus/discovery/kubernetes"
|
||||
"github.com/prometheus/prometheus/discovery/marathon"
|
||||
"github.com/prometheus/prometheus/discovery/zookeeper"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// A TargetProvider provides information about target groups. It maintains a set
|
||||
// of sources from which TargetGroups can originate. Whenever a target provider
|
||||
// detects a potential change, it sends the TargetGroup through its provided channel.
|
||||
//
|
||||
// The TargetProvider does not have to guarantee that an actual change happened.
|
||||
// It does guarantee that it sends the new TargetGroup whenever a change happens.
|
||||
//
|
||||
// TargetProviders should initially send a full set of all discoverable TargetGroups.
|
||||
type TargetProvider interface {
|
||||
// Run hands a channel to the target provider through which it can send
|
||||
// updated target groups.
|
||||
// Must returns if the context gets canceled. It should not close the update
|
||||
// channel on returning.
|
||||
Run(ctx context.Context, up chan<- []*config.TargetGroup)
|
||||
}
|
||||
|
||||
// ProvidersFromConfig returns all TargetProviders configured in cfg.
|
||||
func ProvidersFromConfig(cfg *config.ScrapeConfig) map[string]TargetProvider {
|
||||
providers := map[string]TargetProvider{}
|
||||
|
||||
app := func(mech string, i int, tp TargetProvider) {
|
||||
providers[fmt.Sprintf("%s/%d", mech, i)] = tp
|
||||
}
|
||||
|
||||
for i, c := range cfg.DNSSDConfigs {
|
||||
app("dns", i, dns.NewDiscovery(c))
|
||||
}
|
||||
for i, c := range cfg.FileSDConfigs {
|
||||
app("file", i, file.NewDiscovery(c))
|
||||
}
|
||||
for i, c := range cfg.ConsulSDConfigs {
|
||||
k, err := consul.NewDiscovery(c)
|
||||
if err != nil {
|
||||
log.Errorf("Cannot create Consul discovery: %s", err)
|
||||
continue
|
||||
}
|
||||
app("consul", i, k)
|
||||
}
|
||||
for i, c := range cfg.MarathonSDConfigs {
|
||||
m, err := marathon.NewDiscovery(c)
|
||||
if err != nil {
|
||||
log.Errorf("Cannot create Marathon discovery: %s", err)
|
||||
continue
|
||||
}
|
||||
app("marathon", i, m)
|
||||
}
|
||||
for i, c := range cfg.KubernetesSDConfigs {
|
||||
k, err := kubernetes.New(log.Base(), c)
|
||||
if err != nil {
|
||||
log.Errorf("Cannot create Kubernetes discovery: %s", err)
|
||||
continue
|
||||
}
|
||||
app("kubernetes", i, k)
|
||||
}
|
||||
for i, c := range cfg.ServersetSDConfigs {
|
||||
app("serverset", i, zookeeper.NewServersetDiscovery(c))
|
||||
}
|
||||
for i, c := range cfg.NerveSDConfigs {
|
||||
app("nerve", i, zookeeper.NewNerveDiscovery(c))
|
||||
}
|
||||
for i, c := range cfg.EC2SDConfigs {
|
||||
app("ec2", i, ec2.NewDiscovery(c))
|
||||
}
|
||||
for i, c := range cfg.GCESDConfigs {
|
||||
gced, err := gce.NewDiscovery(c)
|
||||
if err != nil {
|
||||
log.Errorf("Cannot initialize GCE discovery: %s", err)
|
||||
continue
|
||||
}
|
||||
app("gce", i, gced)
|
||||
}
|
||||
for i, c := range cfg.AzureSDConfigs {
|
||||
app("azure", i, azure.NewDiscovery(c))
|
||||
}
|
||||
if len(cfg.StaticConfigs) > 0 {
|
||||
app("static", 0, NewStaticProvider(cfg.StaticConfigs))
|
||||
}
|
||||
|
||||
return providers
|
||||
}
|
||||
|
||||
// StaticProvider holds a list of target groups that never change.
|
||||
type StaticProvider struct {
|
||||
TargetGroups []*config.TargetGroup
|
||||
}
|
||||
|
||||
// NewStaticProvider returns a StaticProvider configured with the given
|
||||
// target groups.
|
||||
func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider {
|
||||
for i, tg := range groups {
|
||||
tg.Source = fmt.Sprintf("%d", i)
|
||||
}
|
||||
return &StaticProvider{groups}
|
||||
}
|
||||
|
||||
// Run implements the TargetProvider interface.
|
||||
func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
|
||||
// We still have to consider that the consumer exits right away in which case
|
||||
// the context will be canceled.
|
||||
select {
|
||||
case ch <- sd.TargetGroups:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
close(ch)
|
||||
}
|
||||
|
||||
// TargetSet handles multiple TargetProviders and sends a full overview of their
|
||||
// discovered TargetGroups to a Syncer.
|
||||
type TargetSet struct {
|
||||
mtx sync.RWMutex
|
||||
// Sets of targets by a source string that is unique across target providers.
|
||||
tgroups map[string]*config.TargetGroup
|
||||
|
||||
syncer Syncer
|
||||
|
||||
syncCh chan struct{}
|
||||
providerCh chan map[string]TargetProvider
|
||||
cancelProviders func()
|
||||
}
|
||||
|
||||
// Syncer receives updates complete sets of TargetGroups.
|
||||
type Syncer interface {
|
||||
Sync([]*config.TargetGroup)
|
||||
}
|
||||
|
||||
// NewTargetSet returns a new target sending TargetGroups to the Syncer.
|
||||
func NewTargetSet(s Syncer) *TargetSet {
|
||||
return &TargetSet{
|
||||
syncCh: make(chan struct{}, 1),
|
||||
providerCh: make(chan map[string]TargetProvider),
|
||||
syncer: s,
|
||||
}
|
||||
}
|
||||
|
||||
// Run starts the processing of target providers and their updates.
|
||||
// It blocks until the context gets canceled.
|
||||
func (ts *TargetSet) Run(ctx context.Context) {
|
||||
Loop:
|
||||
for {
|
||||
// Throttle syncing to once per five seconds.
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
break Loop
|
||||
case p := <-ts.providerCh:
|
||||
ts.updateProviders(ctx, p)
|
||||
case <-time.After(5 * time.Second):
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
break Loop
|
||||
case <-ts.syncCh:
|
||||
ts.sync()
|
||||
case p := <-ts.providerCh:
|
||||
ts.updateProviders(ctx, p)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ts *TargetSet) sync() {
|
||||
ts.mtx.RLock()
|
||||
var all []*config.TargetGroup
|
||||
for _, tg := range ts.tgroups {
|
||||
all = append(all, tg)
|
||||
}
|
||||
ts.mtx.RUnlock()
|
||||
|
||||
ts.syncer.Sync(all)
|
||||
}
|
||||
|
||||
// UpdateProviders sets new target providers for the target set.
|
||||
func (ts *TargetSet) UpdateProviders(p map[string]TargetProvider) {
|
||||
ts.providerCh <- p
|
||||
}
|
||||
|
||||
func (ts *TargetSet) updateProviders(ctx context.Context, providers map[string]TargetProvider) {
|
||||
// Lock for the entire time. This may mean up to 5 seconds until the full initial set
|
||||
// is retrieved and applied.
|
||||
// We could release earlier with some tweaks, but this is easier to reason about.
|
||||
ts.mtx.Lock()
|
||||
defer ts.mtx.Unlock()
|
||||
|
||||
// Stop all previous target providers of the target set.
|
||||
if ts.cancelProviders != nil {
|
||||
ts.cancelProviders()
|
||||
}
|
||||
ctx, ts.cancelProviders = context.WithCancel(ctx)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
// (Re-)create a fresh tgroups map to not keep stale targets around. We
|
||||
// will retrieve all targets below anyway, so cleaning up everything is
|
||||
// safe and doesn't inflict any additional cost.
|
||||
ts.tgroups = map[string]*config.TargetGroup{}
|
||||
|
||||
for name, prov := range providers {
|
||||
wg.Add(1)
|
||||
|
||||
updates := make(chan []*config.TargetGroup)
|
||||
go prov.Run(ctx, updates)
|
||||
|
||||
go func(name string, prov TargetProvider) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case initial, ok := <-updates:
|
||||
// Handle the case that a target provider exits and closes the channel
|
||||
// before the context is done.
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
// First set of all targets the provider knows.
|
||||
for _, tgroup := range initial {
|
||||
ts.setTargetGroup(name, tgroup)
|
||||
}
|
||||
case <-time.After(5 * time.Second):
|
||||
// Initial set didn't arrive. Act as if it was empty
|
||||
// and wait for updates later on.
|
||||
}
|
||||
wg.Done()
|
||||
|
||||
// Start listening for further updates.
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case tgs, ok := <-updates:
|
||||
// Handle the case that a target provider exits and closes the channel
|
||||
// before the context is done.
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
for _, tg := range tgs {
|
||||
ts.update(name, tg)
|
||||
}
|
||||
}
|
||||
}
|
||||
}(name, prov)
|
||||
}
|
||||
|
||||
// We wait for a full initial set of target groups before releasing the mutex
|
||||
// to ensure the initial sync is complete and there are no races with subsequent updates.
|
||||
wg.Wait()
|
||||
// Just signal that there are initial sets to sync now. Actual syncing must only
|
||||
// happen in the runScraping loop.
|
||||
select {
|
||||
case ts.syncCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// update handles a target group update from a target provider identified by the name.
|
||||
func (ts *TargetSet) update(name string, tgroup *config.TargetGroup) {
|
||||
ts.mtx.Lock()
|
||||
defer ts.mtx.Unlock()
|
||||
|
||||
ts.setTargetGroup(name, tgroup)
|
||||
|
||||
select {
|
||||
case ts.syncCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (ts *TargetSet) setTargetGroup(name string, tg *config.TargetGroup) {
|
||||
if tg == nil {
|
||||
return
|
||||
}
|
||||
ts.tgroups[name+"/"+tg.Source] = tg
|
||||
}
|
87
discovery/discovery_test.go
Normal file
87
discovery/discovery_test.go
Normal file
@ -0,0 +1,87 @@
|
||||
// Copyright 2016 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package discovery
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"golang.org/x/net/context"
|
||||
yaml "gopkg.in/yaml.v2"
|
||||
)
|
||||
|
||||
func TestTargetSetRecreatesTargetGroupsEveryRun(t *testing.T) {
|
||||
|
||||
verifyPresence := func(tgroups map[string]*config.TargetGroup, name string, present bool) {
|
||||
if _, ok := tgroups[name]; ok != present {
|
||||
msg := ""
|
||||
if !present {
|
||||
msg = "not "
|
||||
}
|
||||
t.Fatalf("'%s' should %sbe present in TargetSet.tgroups: %s", name, msg, tgroups)
|
||||
}
|
||||
}
|
||||
|
||||
scrapeConfig := &config.ScrapeConfig{}
|
||||
|
||||
sOne := `
|
||||
job_name: "foo"
|
||||
static_configs:
|
||||
- targets: ["foo:9090"]
|
||||
- targets: ["bar:9090"]
|
||||
`
|
||||
if err := yaml.Unmarshal([]byte(sOne), scrapeConfig); err != nil {
|
||||
t.Fatalf("Unable to load YAML config sOne: %s", err)
|
||||
}
|
||||
called := make(chan struct{})
|
||||
|
||||
ts := NewTargetSet(&mockSyncer{
|
||||
sync: func([]*config.TargetGroup) { called <- struct{}{} },
|
||||
})
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
go ts.Run(ctx)
|
||||
|
||||
ts.UpdateProviders(ProvidersFromConfig(scrapeConfig))
|
||||
<-called
|
||||
|
||||
verifyPresence(ts.tgroups, "static/0/0", true)
|
||||
verifyPresence(ts.tgroups, "static/0/1", true)
|
||||
|
||||
sTwo := `
|
||||
job_name: "foo"
|
||||
static_configs:
|
||||
- targets: ["foo:9090"]
|
||||
`
|
||||
if err := yaml.Unmarshal([]byte(sTwo), scrapeConfig); err != nil {
|
||||
t.Fatalf("Unable to load YAML config sTwo: %s", err)
|
||||
}
|
||||
|
||||
ts.UpdateProviders(ProvidersFromConfig(scrapeConfig))
|
||||
<-called
|
||||
|
||||
verifyPresence(ts.tgroups, "static/0/0", true)
|
||||
verifyPresence(ts.tgroups, "static/0/1", false)
|
||||
}
|
||||
|
||||
type mockSyncer struct {
|
||||
sync func(tgs []*config.TargetGroup)
|
||||
}
|
||||
|
||||
func (s *mockSyncer) Sync(tgs []*config.TargetGroup) {
|
||||
if s.sync != nil {
|
||||
s.sync(tgs)
|
||||
}
|
||||
}
|
@ -11,7 +11,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package discovery
|
||||
package ec2
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
@ -50,15 +50,13 @@ const (
|
||||
var (
|
||||
ec2SDRefreshFailuresCount = prometheus.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Name: "sd_ec2_refresh_failures_total",
|
||||
Help: "The number of EC2-SD scrape failures.",
|
||||
Name: "prometheus_sd_ec2_refresh_failures_total",
|
||||
Help: "The number of EC2-SD scrape failures.",
|
||||
})
|
||||
ec2SDRefreshDuration = prometheus.NewSummary(
|
||||
prometheus.SummaryOpts{
|
||||
Namespace: namespace,
|
||||
Name: "sd_ec2_refresh_duration_seconds",
|
||||
Help: "The duration of a EC2-SD refresh in seconds.",
|
||||
Name: "prometheus_sd_ec2_refresh_duration_seconds",
|
||||
Help: "The duration of a EC2-SD refresh in seconds.",
|
||||
})
|
||||
)
|
||||
|
||||
@ -76,8 +74,8 @@ type EC2Discovery struct {
|
||||
port int
|
||||
}
|
||||
|
||||
// NewEC2Discovery returns a new EC2Discovery which periodically refreshes its targets.
|
||||
func NewEC2Discovery(conf *config.EC2SDConfig) *EC2Discovery {
|
||||
// NewDiscovery returns a new EC2Discovery which periodically refreshes its targets.
|
||||
func NewDiscovery(conf *config.EC2SDConfig) *EC2Discovery {
|
||||
creds := credentials.NewStaticCredentials(conf.AccessKey, conf.SecretKey, "")
|
||||
if conf.AccessKey == "" && conf.SecretKey == "" {
|
||||
creds = nil
|
@ -11,7 +11,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package discovery
|
||||
package file
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
@ -36,15 +36,13 @@ const fileSDFilepathLabel = model.MetaLabelPrefix + "filepath"
|
||||
var (
|
||||
fileSDScanDuration = prometheus.NewSummary(
|
||||
prometheus.SummaryOpts{
|
||||
Namespace: namespace,
|
||||
Name: "sd_file_scan_duration_seconds",
|
||||
Help: "The duration of the File-SD scan in seconds.",
|
||||
Name: "prometheus_sd_file_scan_duration_seconds",
|
||||
Help: "The duration of the File-SD scan in seconds.",
|
||||
})
|
||||
fileSDReadErrorsCount = prometheus.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Name: "sd_file_read_errors_total",
|
||||
Help: "The number of File-SD read errors.",
|
||||
Name: "prometheus_sd_file_read_errors_total",
|
||||
Help: "The number of File-SD read errors.",
|
||||
})
|
||||
)
|
||||
|
||||
@ -67,8 +65,8 @@ type FileDiscovery struct {
|
||||
lastRefresh map[string]int
|
||||
}
|
||||
|
||||
// NewFileDiscovery returns a new file discovery for the given paths.
|
||||
func NewFileDiscovery(conf *config.FileSDConfig) *FileDiscovery {
|
||||
// NewDiscovery returns a new file discovery for the given paths.
|
||||
func NewDiscovery(conf *config.FileSDConfig) *FileDiscovery {
|
||||
return &FileDiscovery{
|
||||
paths: conf.Files,
|
||||
interval: time.Duration(conf.RefreshInterval),
|
@ -11,7 +11,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package discovery
|
||||
package file
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
@ -41,7 +41,7 @@ func testFileSD(t *testing.T, ext string) {
|
||||
conf.RefreshInterval = model.Duration(1 * time.Hour)
|
||||
|
||||
var (
|
||||
fsd = NewFileDiscovery(&conf)
|
||||
fsd = NewDiscovery(&conf)
|
||||
ch = make(chan []*config.TargetGroup)
|
||||
ctx, cancel = context.WithCancel(context.Background())
|
||||
)
|
@ -11,7 +11,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package discovery
|
||||
package gce
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
@ -52,15 +52,13 @@ const (
|
||||
var (
|
||||
gceSDRefreshFailuresCount = prometheus.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Name: "sd_gce_refresh_failures_total",
|
||||
Help: "The number of GCE-SD refresh failures.",
|
||||
Name: "prometheus_sd_gce_refresh_failures_total",
|
||||
Help: "The number of GCE-SD refresh failures.",
|
||||
})
|
||||
gceSDRefreshDuration = prometheus.NewSummary(
|
||||
prometheus.SummaryOpts{
|
||||
Namespace: namespace,
|
||||
Name: "sd_gce_refresh_duration",
|
||||
Help: "The duration of a GCE-SD refresh in seconds.",
|
||||
Name: "prometheus_sd_gce_refresh_duration",
|
||||
Help: "The duration of a GCE-SD refresh in seconds.",
|
||||
})
|
||||
)
|
||||
|
||||
@ -84,7 +82,7 @@ type GCEDiscovery struct {
|
||||
}
|
||||
|
||||
// NewGCEDiscovery returns a new GCEDiscovery which periodically refreshes its targets.
|
||||
func NewGCEDiscovery(conf *config.GCESDConfig) (*GCEDiscovery, error) {
|
||||
func NewDiscovery(conf *config.GCESDConfig) (*GCEDiscovery, error) {
|
||||
gd := &GCEDiscovery{
|
||||
project: conf.Project,
|
||||
zone: conf.Zone,
|
@ -11,7 +11,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package discovery
|
||||
package zookeeper
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
@ -42,17 +42,17 @@ type ZookeeperDiscovery struct {
|
||||
|
||||
// NewNerveDiscovery returns a new NerveDiscovery for the given config.
|
||||
func NewNerveDiscovery(conf *config.NerveSDConfig) *ZookeeperDiscovery {
|
||||
return NewZookeeperDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, parseNerveMember)
|
||||
return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, parseNerveMember)
|
||||
}
|
||||
|
||||
// NewServersetDiscovery returns a new ServersetDiscovery for the given config.
|
||||
func NewServersetDiscovery(conf *config.ServersetSDConfig) *ZookeeperDiscovery {
|
||||
return NewZookeeperDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, parseServersetMember)
|
||||
return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, parseServersetMember)
|
||||
}
|
||||
|
||||
// NewZookeeperDiscovery returns a new discovery along Zookeeper parses with
|
||||
// NewDiscovery returns a new discovery along Zookeeper parses with
|
||||
// the given parse function.
|
||||
func NewZookeeperDiscovery(
|
||||
func NewDiscovery(
|
||||
srvs []string,
|
||||
timeout time.Duration,
|
||||
paths []string,
|
@ -1,43 +0,0 @@
|
||||
// Copyright 2016 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package discovery
|
||||
|
||||
import (
|
||||
"github.com/prometheus/common/log"
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/retrieval/discovery/consul"
|
||||
"github.com/prometheus/prometheus/retrieval/discovery/dns"
|
||||
"github.com/prometheus/prometheus/retrieval/discovery/kubernetes"
|
||||
"github.com/prometheus/prometheus/retrieval/discovery/marathon"
|
||||
)
|
||||
|
||||
// NewConsul creates a new Consul based Discovery.
|
||||
func NewConsul(cfg *config.ConsulSDConfig) (*consul.Discovery, error) {
|
||||
return consul.NewDiscovery(cfg)
|
||||
}
|
||||
|
||||
// NewKubernetesDiscovery creates a Kubernetes service discovery based on the passed-in configuration.
|
||||
func NewKubernetesDiscovery(conf *config.KubernetesSDConfig) (*kubernetes.Kubernetes, error) {
|
||||
return kubernetes.New(log.Base(), conf)
|
||||
}
|
||||
|
||||
// NewMarathon creates a new Marathon based discovery.
|
||||
func NewMarathon(conf *config.MarathonSDConfig) (*marathon.Discovery, error) {
|
||||
return marathon.NewDiscovery(conf)
|
||||
}
|
||||
|
||||
// NewDNS creates a new DNS based discovery.
|
||||
func NewDNS(conf *config.DNSSDConfig) *dns.Discovery {
|
||||
return dns.NewDiscovery(conf)
|
||||
}
|
@ -36,56 +36,46 @@ const (
|
||||
scrapeHealthMetricName = "up"
|
||||
scrapeDurationMetricName = "scrape_duration_seconds"
|
||||
scrapeSamplesMetricName = "scrape_samples_scraped"
|
||||
|
||||
// Constants for instrumentation.
|
||||
namespace = "prometheus"
|
||||
interval = "interval"
|
||||
scrapeJob = "scrape_job"
|
||||
)
|
||||
|
||||
var (
|
||||
targetIntervalLength = prometheus.NewSummaryVec(
|
||||
prometheus.SummaryOpts{
|
||||
Namespace: namespace,
|
||||
Name: "target_interval_length_seconds",
|
||||
Name: "prometheus_target_interval_length_seconds",
|
||||
Help: "Actual intervals between scrapes.",
|
||||
Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
|
||||
},
|
||||
[]string{interval},
|
||||
[]string{"interval"},
|
||||
)
|
||||
targetSkippedScrapes = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Name: "target_skipped_scrapes_total",
|
||||
Help: "Total number of scrapes that were skipped because the metric storage was throttled.",
|
||||
Name: "prometheus_target_skipped_scrapes_total",
|
||||
Help: "Total number of scrapes that were skipped because the metric storage was throttled.",
|
||||
},
|
||||
[]string{interval},
|
||||
[]string{"interval"},
|
||||
)
|
||||
targetReloadIntervalLength = prometheus.NewSummaryVec(
|
||||
prometheus.SummaryOpts{
|
||||
Namespace: namespace,
|
||||
Name: "target_reload_length_seconds",
|
||||
Name: "prometheus_target_reload_length_seconds",
|
||||
Help: "Actual interval to reload the scrape pool with a given configuration.",
|
||||
Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
|
||||
},
|
||||
[]string{interval},
|
||||
[]string{"interval"},
|
||||
)
|
||||
targetSyncIntervalLength = prometheus.NewSummaryVec(
|
||||
prometheus.SummaryOpts{
|
||||
Namespace: namespace,
|
||||
Name: "target_sync_length_seconds",
|
||||
Name: "prometheus_target_sync_length_seconds",
|
||||
Help: "Actual interval to sync the scrape pool.",
|
||||
Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
|
||||
},
|
||||
[]string{scrapeJob},
|
||||
[]string{"scrape_job"},
|
||||
)
|
||||
targetScrapePoolSyncsCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Name: "target_scrape_pool_sync_total",
|
||||
Help: "Total number of syncs that were executed on a scrape pool.",
|
||||
Name: "prometheus_target_scrape_pool_sync_total",
|
||||
Help: "Total number of syncs that were executed on a scrape pool.",
|
||||
},
|
||||
[]string{scrapeJob},
|
||||
[]string{"scrape_job"},
|
||||
)
|
||||
)
|
||||
|
||||
@ -115,7 +105,7 @@ type scrapePool struct {
|
||||
newLoop func(context.Context, scraper, storage.SampleAppender, storage.SampleAppender) loop
|
||||
}
|
||||
|
||||
func newScrapePool(cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool {
|
||||
func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool {
|
||||
client, err := NewHTTPClient(cfg)
|
||||
if err != nil {
|
||||
// Any errors that could occur here should be caught during config validation.
|
||||
@ -124,6 +114,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.SampleAppender) *scrape
|
||||
return &scrapePool{
|
||||
appender: app,
|
||||
config: cfg,
|
||||
ctx: ctx,
|
||||
client: client,
|
||||
targets: map[uint64]*Target{},
|
||||
loops: map[uint64]loop{},
|
||||
@ -131,13 +122,6 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.SampleAppender) *scrape
|
||||
}
|
||||
}
|
||||
|
||||
func (sp *scrapePool) init(ctx context.Context) {
|
||||
sp.mtx.Lock()
|
||||
defer sp.mtx.Unlock()
|
||||
|
||||
sp.ctx = ctx
|
||||
}
|
||||
|
||||
// stop terminates all scrape loops and returns after they all terminated.
|
||||
func (sp *scrapePool) stop() {
|
||||
var wg sync.WaitGroup
|
||||
@ -165,6 +149,7 @@ func (sp *scrapePool) stop() {
|
||||
// This method returns after all scrape loops that were stopped have fully terminated.
|
||||
func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
|
||||
start := time.Now()
|
||||
|
||||
sp.mtx.Lock()
|
||||
defer sp.mtx.Unlock()
|
||||
|
||||
@ -206,11 +191,32 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
|
||||
)
|
||||
}
|
||||
|
||||
// Sync converts target groups into actual scrape targets and synchronizes
|
||||
// the currently running scraper with the resulting set.
|
||||
func (sp *scrapePool) Sync(tgs []*config.TargetGroup) {
|
||||
start := time.Now()
|
||||
|
||||
var all []*Target
|
||||
for _, tg := range tgs {
|
||||
targets, err := targetsFromGroup(tg, sp.config)
|
||||
if err != nil {
|
||||
log.With("err", err).Error("creating targets failed")
|
||||
continue
|
||||
}
|
||||
all = append(all, targets...)
|
||||
}
|
||||
sp.sync(all)
|
||||
|
||||
targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
|
||||
time.Since(start).Seconds(),
|
||||
)
|
||||
targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc()
|
||||
}
|
||||
|
||||
// sync takes a list of potentially duplicated targets, deduplicates them, starts
|
||||
// scrape loops for new targets, and stops scrape loops for disappeared targets.
|
||||
// It returns after all stopped scrape loops terminated.
|
||||
func (sp *scrapePool) sync(targets []*Target) {
|
||||
start := time.Now()
|
||||
sp.mtx.Lock()
|
||||
defer sp.mtx.Unlock()
|
||||
|
||||
@ -256,10 +262,6 @@ func (sp *scrapePool) sync(targets []*Target) {
|
||||
// may be active and tries to insert. The old scraper that didn't terminate yet could still
|
||||
// be inserting a previous sample set.
|
||||
wg.Wait()
|
||||
targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
|
||||
time.Since(start).Seconds(),
|
||||
)
|
||||
targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc()
|
||||
}
|
||||
|
||||
// sampleAppender returns an appender for ingested samples from the target.
|
||||
|
@ -36,7 +36,7 @@ func TestNewScrapePool(t *testing.T) {
|
||||
var (
|
||||
app = &nopAppender{}
|
||||
cfg = &config.ScrapeConfig{}
|
||||
sp = newScrapePool(cfg, app)
|
||||
sp = newScrapePool(context.Background(), cfg, app)
|
||||
)
|
||||
|
||||
if a, ok := sp.appender.(*nopAppender); !ok || a != app {
|
||||
@ -231,7 +231,7 @@ func TestScrapePoolReportAppender(t *testing.T) {
|
||||
target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
|
||||
app := &nopAppender{}
|
||||
|
||||
sp := newScrapePool(cfg, app)
|
||||
sp := newScrapePool(context.Background(), cfg, app)
|
||||
|
||||
cfg.HonorLabels = false
|
||||
wrapped := sp.reportAppender(target)
|
||||
@ -266,7 +266,7 @@ func TestScrapePoolSampleAppender(t *testing.T) {
|
||||
target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
|
||||
app := &nopAppender{}
|
||||
|
||||
sp := newScrapePool(cfg, app)
|
||||
sp := newScrapePool(context.Background(), cfg, app)
|
||||
|
||||
cfg.HonorLabels = false
|
||||
wrapped := sp.sampleAppender(target)
|
||||
|
@ -17,6 +17,7 @@ import (
|
||||
"fmt"
|
||||
"hash/fnv"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
@ -276,3 +277,103 @@ func (app relabelAppender) Append(s *model.Sample) error {
|
||||
|
||||
return app.SampleAppender.Append(s)
|
||||
}
|
||||
|
||||
// populateLabels builds a label set from the given label set and scrape configuration.
|
||||
// It returns a label set before relabeling was applied as the second return value.
|
||||
// Returns a nil label set if the target is dropped during relabeling.
|
||||
func populateLabels(lset model.LabelSet, cfg *config.ScrapeConfig) (res, orig model.LabelSet, err error) {
|
||||
if _, ok := lset[model.AddressLabel]; !ok {
|
||||
return nil, nil, fmt.Errorf("no address")
|
||||
}
|
||||
// Copy labels into the labelset for the target if they are not
|
||||
// set already. Apply the labelsets in order of decreasing precedence.
|
||||
scrapeLabels := model.LabelSet{
|
||||
model.SchemeLabel: model.LabelValue(cfg.Scheme),
|
||||
model.MetricsPathLabel: model.LabelValue(cfg.MetricsPath),
|
||||
model.JobLabel: model.LabelValue(cfg.JobName),
|
||||
}
|
||||
for ln, lv := range scrapeLabels {
|
||||
if _, ok := lset[ln]; !ok {
|
||||
lset[ln] = lv
|
||||
}
|
||||
}
|
||||
// Encode scrape query parameters as labels.
|
||||
for k, v := range cfg.Params {
|
||||
if len(v) > 0 {
|
||||
lset[model.LabelName(model.ParamLabelPrefix+k)] = model.LabelValue(v[0])
|
||||
}
|
||||
}
|
||||
|
||||
preRelabelLabels := lset
|
||||
lset = relabel.Process(lset, cfg.RelabelConfigs...)
|
||||
|
||||
// Check if the target was dropped.
|
||||
if lset == nil {
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
// addPort checks whether we should add a default port to the address.
|
||||
// If the address is not valid, we don't append a port either.
|
||||
addPort := func(s string) bool {
|
||||
// If we can split, a port exists and we don't have to add one.
|
||||
if _, _, err := net.SplitHostPort(s); err == nil {
|
||||
return false
|
||||
}
|
||||
// If adding a port makes it valid, the previous error
|
||||
// was not due to an invalid address and we can append a port.
|
||||
_, _, err := net.SplitHostPort(s + ":1234")
|
||||
return err == nil
|
||||
}
|
||||
// If it's an address with no trailing port, infer it based on the used scheme.
|
||||
if addr := string(lset[model.AddressLabel]); addPort(addr) {
|
||||
// Addresses reaching this point are already wrapped in [] if necessary.
|
||||
switch lset[model.SchemeLabel] {
|
||||
case "http", "":
|
||||
addr = addr + ":80"
|
||||
case "https":
|
||||
addr = addr + ":443"
|
||||
default:
|
||||
return nil, nil, fmt.Errorf("invalid scheme: %q", cfg.Scheme)
|
||||
}
|
||||
lset[model.AddressLabel] = model.LabelValue(addr)
|
||||
}
|
||||
if err := config.CheckTargetAddress(lset[model.AddressLabel]); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Meta labels are deleted after relabelling. Other internal labels propagate to
|
||||
// the target which decides whether they will be part of their label set.
|
||||
for ln := range lset {
|
||||
if strings.HasPrefix(string(ln), model.MetaLabelPrefix) {
|
||||
delete(lset, ln)
|
||||
}
|
||||
}
|
||||
|
||||
// Default the instance label to the target address.
|
||||
if _, ok := lset[model.InstanceLabel]; !ok {
|
||||
lset[model.InstanceLabel] = lset[model.AddressLabel]
|
||||
}
|
||||
return lset, preRelabelLabels, nil
|
||||
}
|
||||
|
||||
// targetsFromGroup builds targets based on the given TargetGroup and config.
|
||||
func targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) ([]*Target, error) {
|
||||
targets := make([]*Target, 0, len(tg.Targets))
|
||||
|
||||
for i, lset := range tg.Targets {
|
||||
// Combine target labels with target group labels.
|
||||
for ln, lv := range tg.Labels {
|
||||
if _, ok := lset[ln]; !ok {
|
||||
lset[ln] = lv
|
||||
}
|
||||
}
|
||||
labels, origLabels, err := populateLabels(lset, cfg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("instance %d in group %s: %s", i, tg, err)
|
||||
}
|
||||
if labels != nil {
|
||||
targets = append(targets, NewTarget(labels, origLabels, cfg.Params))
|
||||
}
|
||||
}
|
||||
return targets, nil
|
||||
}
|
||||
|
@ -14,39 +14,18 @@
|
||||
package retrieval
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/common/log"
|
||||
"github.com/prometheus/common/model"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/relabel"
|
||||
"github.com/prometheus/prometheus/retrieval/discovery"
|
||||
"github.com/prometheus/prometheus/discovery"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
)
|
||||
|
||||
// A TargetProvider provides information about target groups. It maintains a set
|
||||
// of sources from which TargetGroups can originate. Whenever a target provider
|
||||
// detects a potential change, it sends the TargetGroup through its provided channel.
|
||||
//
|
||||
// The TargetProvider does not have to guarantee that an actual change happened.
|
||||
// It does guarantee that it sends the new TargetGroup whenever a change happens.
|
||||
//
|
||||
// Providers must initially send all known target groups as soon as it can.
|
||||
type TargetProvider interface {
|
||||
// Run hands a channel to the target provider through which it can send
|
||||
// updated target groups. The channel must be closed by the target provider
|
||||
// if no more updates will be sent.
|
||||
// On receiving from done Run must return.
|
||||
Run(ctx context.Context, up chan<- []*config.TargetGroup)
|
||||
}
|
||||
|
||||
// TargetManager maintains a set of targets, starts and stops their scraping and
|
||||
// creates the new targets based on the target groups it receives from various
|
||||
// target providers.
|
||||
@ -63,6 +42,14 @@ type TargetManager struct {
|
||||
targetSets map[string]*targetSet
|
||||
}
|
||||
|
||||
type targetSet struct {
|
||||
ctx context.Context
|
||||
cancel func()
|
||||
|
||||
ts *discovery.TargetSet
|
||||
sp *scrapePool
|
||||
}
|
||||
|
||||
// NewTargetManager creates a new TargetManager.
|
||||
func NewTargetManager(app storage.SampleAppender) *TargetManager {
|
||||
return &TargetManager{
|
||||
@ -111,23 +98,33 @@ func (tm *TargetManager) reload() {
|
||||
|
||||
ts, ok := tm.targetSets[scfg.JobName]
|
||||
if !ok {
|
||||
ts = newTargetSet(scfg, tm.appender)
|
||||
ctx, cancel := context.WithCancel(tm.ctx)
|
||||
ts = &targetSet{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
sp: newScrapePool(ctx, scfg, tm.appender),
|
||||
}
|
||||
ts.ts = discovery.NewTargetSet(ts.sp)
|
||||
|
||||
tm.targetSets[scfg.JobName] = ts
|
||||
|
||||
tm.wg.Add(1)
|
||||
|
||||
go func(ts *targetSet) {
|
||||
ts.runScraping(tm.ctx)
|
||||
// Run target set, which blocks until its context is canceled.
|
||||
// Gracefully shut down pending scrapes in the scrape pool afterwards.
|
||||
ts.ts.Run(ctx)
|
||||
ts.sp.stop()
|
||||
tm.wg.Done()
|
||||
}(ts)
|
||||
} else {
|
||||
ts.reload(scfg)
|
||||
ts.sp.reload(scfg)
|
||||
}
|
||||
ts.runProviders(tm.ctx, providersFromConfig(scfg))
|
||||
ts.ts.UpdateProviders(discovery.ProvidersFromConfig(scfg))
|
||||
}
|
||||
|
||||
// Remove old target sets. Waiting for stopping is already guaranteed
|
||||
// by the goroutine that started the target set.
|
||||
// Remove old target sets. Waiting for scrape pools to complete pending
|
||||
// scrape inserts is already guaranteed by the goroutine that started the target set.
|
||||
for name, ts := range tm.targetSets {
|
||||
if _, ok := jobs[name]; !ok {
|
||||
ts.cancel()
|
||||
@ -145,14 +142,14 @@ func (tm *TargetManager) Pools() map[string]Targets {
|
||||
|
||||
// TODO(fabxc): this is just a hack to maintain compatibility for now.
|
||||
for _, ps := range tm.targetSets {
|
||||
ps.scrapePool.mtx.RLock()
|
||||
ps.sp.mtx.RLock()
|
||||
|
||||
for _, t := range ps.scrapePool.targets {
|
||||
for _, t := range ps.sp.targets {
|
||||
job := string(t.Labels()[model.JobLabel])
|
||||
pools[job] = append(pools[job], t)
|
||||
}
|
||||
|
||||
ps.scrapePool.mtx.RUnlock()
|
||||
ps.sp.mtx.RUnlock()
|
||||
}
|
||||
for _, targets := range pools {
|
||||
sort.Sort(targets)
|
||||
@ -173,385 +170,3 @@ func (tm *TargetManager) ApplyConfig(cfg *config.Config) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// targetSet holds several TargetProviders for which the same scrape configuration
|
||||
// is used. It maintains target groups from all given providers and sync them
|
||||
// to a scrape pool.
|
||||
type targetSet struct {
|
||||
mtx sync.RWMutex
|
||||
|
||||
// Sets of targets by a source string that is unique across target providers.
|
||||
tgroups map[string][]*Target
|
||||
|
||||
scrapePool *scrapePool
|
||||
config *config.ScrapeConfig
|
||||
|
||||
syncCh chan struct{}
|
||||
cancelScraping func()
|
||||
cancelProviders func()
|
||||
}
|
||||
|
||||
func newTargetSet(cfg *config.ScrapeConfig, app storage.SampleAppender) *targetSet {
|
||||
ts := &targetSet{
|
||||
scrapePool: newScrapePool(cfg, app),
|
||||
syncCh: make(chan struct{}, 1),
|
||||
config: cfg,
|
||||
}
|
||||
return ts
|
||||
}
|
||||
|
||||
func (ts *targetSet) cancel() {
|
||||
ts.mtx.RLock()
|
||||
defer ts.mtx.RUnlock()
|
||||
|
||||
if ts.cancelScraping != nil {
|
||||
ts.cancelScraping()
|
||||
}
|
||||
if ts.cancelProviders != nil {
|
||||
ts.cancelProviders()
|
||||
}
|
||||
}
|
||||
|
||||
func (ts *targetSet) reload(cfg *config.ScrapeConfig) {
|
||||
ts.mtx.Lock()
|
||||
ts.config = cfg
|
||||
ts.mtx.Unlock()
|
||||
|
||||
ts.scrapePool.reload(cfg)
|
||||
}
|
||||
|
||||
func (ts *targetSet) runScraping(ctx context.Context) {
|
||||
ctx, ts.cancelScraping = context.WithCancel(ctx)
|
||||
|
||||
ts.scrapePool.init(ctx)
|
||||
|
||||
Loop:
|
||||
for {
|
||||
// Throttle syncing to once per five seconds.
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
break Loop
|
||||
case <-time.After(5 * time.Second):
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
break Loop
|
||||
case <-ts.syncCh:
|
||||
ts.mtx.RLock()
|
||||
ts.sync()
|
||||
ts.mtx.RUnlock()
|
||||
}
|
||||
}
|
||||
|
||||
// We want to wait for all pending target scrapes to complete though to ensure there'll
|
||||
// be no more storage writes after this point.
|
||||
ts.scrapePool.stop()
|
||||
}
|
||||
|
||||
func (ts *targetSet) sync() {
|
||||
var all []*Target
|
||||
for _, targets := range ts.tgroups {
|
||||
all = append(all, targets...)
|
||||
}
|
||||
ts.scrapePool.sync(all)
|
||||
}
|
||||
|
||||
func (ts *targetSet) runProviders(ctx context.Context, providers map[string]TargetProvider) {
|
||||
// Lock for the entire time. This may mean up to 5 seconds until the full initial set
|
||||
// is retrieved and applied.
|
||||
// We could release earlier with some tweaks, but this is easier to reason about.
|
||||
ts.mtx.Lock()
|
||||
defer ts.mtx.Unlock()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
if ts.cancelProviders != nil {
|
||||
ts.cancelProviders()
|
||||
}
|
||||
ctx, ts.cancelProviders = context.WithCancel(ctx)
|
||||
|
||||
// (Re-)create a fresh tgroups map to not keep stale targets around. We
|
||||
// will retrieve all targets below anyway, so cleaning up everything is
|
||||
// safe and doesn't inflict any additional cost.
|
||||
ts.tgroups = map[string][]*Target{}
|
||||
|
||||
for name, prov := range providers {
|
||||
wg.Add(1)
|
||||
|
||||
updates := make(chan []*config.TargetGroup)
|
||||
|
||||
go func(name string, prov TargetProvider) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case initial, ok := <-updates:
|
||||
// Handle the case that a target provider exits and closes the channel
|
||||
// before the context is done.
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
// First set of all targets the provider knows.
|
||||
for _, tgroup := range initial {
|
||||
if tgroup == nil {
|
||||
continue
|
||||
}
|
||||
targets, err := targetsFromGroup(tgroup, ts.config)
|
||||
if err != nil {
|
||||
log.With("target_group", tgroup).Errorf("Target update failed: %s", err)
|
||||
continue
|
||||
}
|
||||
ts.tgroups[name+"/"+tgroup.Source] = targets
|
||||
}
|
||||
case <-time.After(5 * time.Second):
|
||||
// Initial set didn't arrive. Act as if it was empty
|
||||
// and wait for updates later on.
|
||||
}
|
||||
|
||||
wg.Done()
|
||||
|
||||
// Start listening for further updates.
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case tgs, ok := <-updates:
|
||||
// Handle the case that a target provider exits and closes the channel
|
||||
// before the context is done.
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
for _, tg := range tgs {
|
||||
if err := ts.update(name, tg); err != nil {
|
||||
log.With("target_group", tg).Errorf("Target update failed: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}(name, prov)
|
||||
|
||||
go prov.Run(ctx, updates)
|
||||
}
|
||||
|
||||
// We wait for a full initial set of target groups before releasing the mutex
|
||||
// to ensure the initial sync is complete and there are no races with subsequent updates.
|
||||
wg.Wait()
|
||||
// Just signal that there are initial sets to sync now. Actual syncing must only
|
||||
// happen in the runScraping loop.
|
||||
select {
|
||||
case ts.syncCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// update handles a target group update from a target provider identified by the name.
|
||||
func (ts *targetSet) update(name string, tgroup *config.TargetGroup) error {
|
||||
if tgroup == nil {
|
||||
return nil
|
||||
}
|
||||
targets, err := targetsFromGroup(tgroup, ts.config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ts.mtx.Lock()
|
||||
defer ts.mtx.Unlock()
|
||||
|
||||
ts.tgroups[name+"/"+tgroup.Source] = targets
|
||||
|
||||
select {
|
||||
case ts.syncCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// providersFromConfig returns all TargetProviders configured in cfg.
|
||||
func providersFromConfig(cfg *config.ScrapeConfig) map[string]TargetProvider {
|
||||
providers := map[string]TargetProvider{}
|
||||
|
||||
app := func(mech string, i int, tp TargetProvider) {
|
||||
providers[fmt.Sprintf("%s/%d", mech, i)] = tp
|
||||
}
|
||||
|
||||
for i, c := range cfg.DNSSDConfigs {
|
||||
app("dns", i, discovery.NewDNS(c))
|
||||
}
|
||||
for i, c := range cfg.FileSDConfigs {
|
||||
app("file", i, discovery.NewFileDiscovery(c))
|
||||
}
|
||||
for i, c := range cfg.ConsulSDConfigs {
|
||||
k, err := discovery.NewConsul(c)
|
||||
if err != nil {
|
||||
log.Errorf("Cannot create Consul discovery: %s", err)
|
||||
continue
|
||||
}
|
||||
app("consul", i, k)
|
||||
}
|
||||
for i, c := range cfg.MarathonSDConfigs {
|
||||
m, err := discovery.NewMarathon(c)
|
||||
if err != nil {
|
||||
log.Errorf("Cannot create Marathon discovery: %s", err)
|
||||
continue
|
||||
}
|
||||
app("marathon", i, m)
|
||||
}
|
||||
for i, c := range cfg.KubernetesSDConfigs {
|
||||
k, err := discovery.NewKubernetesDiscovery(c)
|
||||
if err != nil {
|
||||
log.Errorf("Cannot create Kubernetes discovery: %s", err)
|
||||
continue
|
||||
}
|
||||
app("kubernetes", i, k)
|
||||
}
|
||||
for i, c := range cfg.ServersetSDConfigs {
|
||||
app("serverset", i, discovery.NewServersetDiscovery(c))
|
||||
}
|
||||
for i, c := range cfg.NerveSDConfigs {
|
||||
app("nerve", i, discovery.NewNerveDiscovery(c))
|
||||
}
|
||||
for i, c := range cfg.EC2SDConfigs {
|
||||
app("ec2", i, discovery.NewEC2Discovery(c))
|
||||
}
|
||||
for i, c := range cfg.GCESDConfigs {
|
||||
gced, err := discovery.NewGCEDiscovery(c)
|
||||
if err != nil {
|
||||
log.Errorf("Cannot initialize GCE discovery: %s", err)
|
||||
continue
|
||||
}
|
||||
app("gce", i, gced)
|
||||
}
|
||||
for i, c := range cfg.AzureSDConfigs {
|
||||
app("azure", i, discovery.NewAzureDiscovery(c))
|
||||
}
|
||||
if len(cfg.StaticConfigs) > 0 {
|
||||
app("static", 0, NewStaticProvider(cfg.StaticConfigs))
|
||||
}
|
||||
|
||||
return providers
|
||||
}
|
||||
|
||||
// populateLabels builds a label set from the given label set and scrape configuration.
|
||||
// It returns a label set before relabeling was applied as the second return value.
|
||||
// Returns a nil label set if the target is dropped during relabeling.
|
||||
func populateLabels(lset model.LabelSet, cfg *config.ScrapeConfig) (res, orig model.LabelSet, err error) {
|
||||
if _, ok := lset[model.AddressLabel]; !ok {
|
||||
return nil, nil, fmt.Errorf("no address")
|
||||
}
|
||||
// Copy labels into the labelset for the target if they are not
|
||||
// set already. Apply the labelsets in order of decreasing precedence.
|
||||
scrapeLabels := model.LabelSet{
|
||||
model.SchemeLabel: model.LabelValue(cfg.Scheme),
|
||||
model.MetricsPathLabel: model.LabelValue(cfg.MetricsPath),
|
||||
model.JobLabel: model.LabelValue(cfg.JobName),
|
||||
}
|
||||
for ln, lv := range scrapeLabels {
|
||||
if _, ok := lset[ln]; !ok {
|
||||
lset[ln] = lv
|
||||
}
|
||||
}
|
||||
// Encode scrape query parameters as labels.
|
||||
for k, v := range cfg.Params {
|
||||
if len(v) > 0 {
|
||||
lset[model.LabelName(model.ParamLabelPrefix+k)] = model.LabelValue(v[0])
|
||||
}
|
||||
}
|
||||
|
||||
preRelabelLabels := lset
|
||||
lset = relabel.Process(lset, cfg.RelabelConfigs...)
|
||||
|
||||
// Check if the target was dropped.
|
||||
if lset == nil {
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
// addPort checks whether we should add a default port to the address.
|
||||
// If the address is not valid, we don't append a port either.
|
||||
addPort := func(s string) bool {
|
||||
// If we can split, a port exists and we don't have to add one.
|
||||
if _, _, err := net.SplitHostPort(s); err == nil {
|
||||
return false
|
||||
}
|
||||
// If adding a port makes it valid, the previous error
|
||||
// was not due to an invalid address and we can append a port.
|
||||
_, _, err := net.SplitHostPort(s + ":1234")
|
||||
return err == nil
|
||||
}
|
||||
// If it's an address with no trailing port, infer it based on the used scheme.
|
||||
if addr := string(lset[model.AddressLabel]); addPort(addr) {
|
||||
// Addresses reaching this point are already wrapped in [] if necessary.
|
||||
switch lset[model.SchemeLabel] {
|
||||
case "http", "":
|
||||
addr = addr + ":80"
|
||||
case "https":
|
||||
addr = addr + ":443"
|
||||
default:
|
||||
return nil, nil, fmt.Errorf("invalid scheme: %q", cfg.Scheme)
|
||||
}
|
||||
lset[model.AddressLabel] = model.LabelValue(addr)
|
||||
}
|
||||
if err := config.CheckTargetAddress(lset[model.AddressLabel]); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Meta labels are deleted after relabelling. Other internal labels propagate to
|
||||
// the target which decides whether they will be part of their label set.
|
||||
for ln := range lset {
|
||||
if strings.HasPrefix(string(ln), model.MetaLabelPrefix) {
|
||||
delete(lset, ln)
|
||||
}
|
||||
}
|
||||
|
||||
// Default the instance label to the target address.
|
||||
if _, ok := lset[model.InstanceLabel]; !ok {
|
||||
lset[model.InstanceLabel] = lset[model.AddressLabel]
|
||||
}
|
||||
return lset, preRelabelLabels, nil
|
||||
}
|
||||
|
||||
// targetsFromGroup builds targets based on the given TargetGroup and config.
|
||||
func targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) ([]*Target, error) {
|
||||
targets := make([]*Target, 0, len(tg.Targets))
|
||||
|
||||
for i, lset := range tg.Targets {
|
||||
// Combine target labels with target group labels.
|
||||
for ln, lv := range tg.Labels {
|
||||
if _, ok := lset[ln]; !ok {
|
||||
lset[ln] = lv
|
||||
}
|
||||
}
|
||||
labels, origLabels, err := populateLabels(lset, cfg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("instance %d in group %s: %s", i, tg, err)
|
||||
}
|
||||
if labels != nil {
|
||||
targets = append(targets, NewTarget(labels, origLabels, cfg.Params))
|
||||
}
|
||||
}
|
||||
return targets, nil
|
||||
}
|
||||
|
||||
// StaticProvider holds a list of target groups that never change.
|
||||
type StaticProvider struct {
|
||||
TargetGroups []*config.TargetGroup
|
||||
}
|
||||
|
||||
// NewStaticProvider returns a StaticProvider configured with the given
|
||||
// target groups.
|
||||
func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider {
|
||||
for i, tg := range groups {
|
||||
tg.Source = fmt.Sprintf("%d", i)
|
||||
}
|
||||
return &StaticProvider{groups}
|
||||
}
|
||||
|
||||
// Run implements the TargetProvider interface.
|
||||
func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
|
||||
// We still have to consider that the consumer exits right away in which case
|
||||
// the context will be canceled.
|
||||
select {
|
||||
case ch <- sd.TargetGroups:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
close(ch)
|
||||
}
|
||||
|
@ -17,64 +17,10 @@ import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"gopkg.in/yaml.v2"
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/storage/local"
|
||||
)
|
||||
|
||||
func TestTargetSetRecreatesTargetGroupsEveryRun(t *testing.T) {
|
||||
|
||||
verifyPresence := func(tgroups map[string][]*Target, name string, present bool) {
|
||||
if _, ok := tgroups[name]; ok != present {
|
||||
msg := ""
|
||||
if !present {
|
||||
msg = "not "
|
||||
}
|
||||
t.Fatalf("'%s' should %sbe present in TargetSet.tgroups: %s", name, msg, tgroups)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
scrapeConfig := &config.ScrapeConfig{}
|
||||
|
||||
sOne := `
|
||||
job_name: "foo"
|
||||
static_configs:
|
||||
- targets: ["foo:9090"]
|
||||
- targets: ["bar:9090"]
|
||||
`
|
||||
if err := yaml.Unmarshal([]byte(sOne), scrapeConfig); err != nil {
|
||||
t.Fatalf("Unable to load YAML config sOne: %s", err)
|
||||
}
|
||||
|
||||
// Not properly setting it up, but that seems okay
|
||||
mss := &local.MemorySeriesStorage{}
|
||||
|
||||
ts := newTargetSet(scrapeConfig, mss)
|
||||
|
||||
ts.runProviders(context.Background(), providersFromConfig(scrapeConfig))
|
||||
|
||||
verifyPresence(ts.tgroups, "static/0/0", true)
|
||||
verifyPresence(ts.tgroups, "static/0/1", true)
|
||||
|
||||
sTwo := `
|
||||
job_name: "foo"
|
||||
static_configs:
|
||||
- targets: ["foo:9090"]
|
||||
`
|
||||
if err := yaml.Unmarshal([]byte(sTwo), scrapeConfig); err != nil {
|
||||
t.Fatalf("Unable to load YAML config sTwo: %s", err)
|
||||
}
|
||||
|
||||
ts.runProviders(context.Background(), providersFromConfig(scrapeConfig))
|
||||
|
||||
verifyPresence(ts.tgroups, "static/0/0", true)
|
||||
verifyPresence(ts.tgroups, "static/0/1", false)
|
||||
}
|
||||
|
||||
func mustNewRegexp(s string) config.Regexp {
|
||||
re, err := config.NewRegexp(s)
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user