Mirror of https://github.com/prometheus/prometheus (synced 2024-12-26 08:33:06 +00:00)
Merge pull request #2667 from goller/go-discovery-logger
Add logger injection into discovery services
Commit a391156dfb
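The change applies one pattern throughout: each discovery constructor gains a log.Logger parameter, stores it on its struct, and replaces package-level log.* calls with the injected logger, while callers such as Main() and the tests pass log.Base(). The sketch below illustrates that pattern in isolation; the exampleDiscovery type and all of its names are hypothetical and are not part of this diff.

package main

import (
	"time"

	"github.com/prometheus/common/log"
)

// exampleDiscovery stands in for one of the discovery mechanisms touched by
// this change: the logger is a struct field instead of the package-level log.
type exampleDiscovery struct {
	interval time.Duration
	logger   log.Logger
}

// newExampleDiscovery mirrors the NewDiscovery(conf, logger) signatures in the diff.
func newExampleDiscovery(interval time.Duration, logger log.Logger) *exampleDiscovery {
	return &exampleDiscovery{
		interval: interval,
		logger:   logger,
	}
}

func (d *exampleDiscovery) refresh() error {
	// Use the injected logger rather than the global one.
	d.logger.Debugf("refreshing targets every %s", d.interval)
	return nil
}

func main() {
	// Callers with no special requirements pass log.Base(), optionally with context attached.
	d := newExampleDiscovery(30*time.Second, log.Base().With("sd", "example"))
	if err := d.refresh(); err != nil {
		d.logger.Errorf("refresh failed: %s", err)
	}
}

Injecting the logger this way lets a caller attach context once (for example via With("sd", ...)) and lets tests supply their own logger without touching global state, which is why the test files below add the github.com/prometheus/common/log import. The full diff follows.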
@@ -117,8 +117,8 @@ func Main() int {
 	}

 	var (
-		notifier = notifier.New(&cfg.notifier)
-		targetManager = retrieval.NewTargetManager(sampleAppender)
+		notifier = notifier.New(&cfg.notifier, log.Base())
+		targetManager = retrieval.NewTargetManager(sampleAppender, log.Base())
 		queryEngine = promql.NewEngine(queryable, &cfg.queryEngine)
 		ctx, cancelCtx = context.WithCancel(context.Background())
 	)
@@ -66,14 +66,16 @@ type Discovery struct {
 	cfg      *config.AzureSDConfig
 	interval time.Duration
 	port     int
+	logger   log.Logger
 }

 // NewDiscovery returns a new AzureDiscovery which periodically refreshes its targets.
-func NewDiscovery(cfg *config.AzureSDConfig) *Discovery {
+func NewDiscovery(cfg *config.AzureSDConfig, logger log.Logger) *Discovery {
 	return &Discovery{
 		cfg:      cfg,
 		interval: time.Duration(cfg.RefreshInterval),
 		port:     cfg.Port,
+		logger:   logger,
 	}
 }

@@ -91,7 +93,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {

 		tg, err := d.refresh()
 		if err != nil {
-			log.Errorf("unable to refresh during Azure discovery: %s", err)
+			d.logger.Errorf("unable to refresh during Azure discovery: %s", err)
 		} else {
 			select {
 			case <-ctx.Done():
@@ -141,13 +143,13 @@ type azureResource struct {
 }

 // Create a new azureResource object from an ID string.
-func newAzureResourceFromID(id string) (azureResource, error) {
+func newAzureResourceFromID(id string, logger log.Logger) (azureResource, error) {
 	// Resource IDs have the following format.
 	// /subscriptions/SUBSCRIPTION_ID/resourceGroups/RESOURCE_GROUP/providers/PROVIDER/TYPE/NAME
 	s := strings.Split(id, "/")
 	if len(s) != 9 {
 		err := fmt.Errorf("invalid ID '%s'. Refusing to create azureResource", id)
-		log.Error(err)
+		logger.Error(err)
 		return azureResource{}, err
 	}
 	return azureResource{
@@ -185,7 +187,7 @@ func (d *Discovery) refresh() (tg *config.TargetGroup, err error) {
 		}
 		machines = append(machines, *result.Value...)
 	}
-	log.Debugf("Found %d virtual machines during Azure discovery.", len(machines))
+	d.logger.Debugf("Found %d virtual machines during Azure discovery.", len(machines))

 	// We have the slice of machines. Now turn them into targets.
 	// Doing them in go routines because the network interface calls are slow.
@@ -197,7 +199,7 @@ func (d *Discovery) refresh() (tg *config.TargetGroup, err error) {
 	ch := make(chan target, len(machines))
 	for i, vm := range machines {
 		go func(i int, vm compute.VirtualMachine) {
-			r, err := newAzureResourceFromID(*vm.ID)
+			r, err := newAzureResourceFromID(*vm.ID, d.logger)
 			if err != nil {
 				ch <- target{labelSet: nil, err: err}
 				return
@@ -219,14 +221,14 @@ func (d *Discovery) refresh() (tg *config.TargetGroup, err error) {

 			// Get the IP address information via separate call to the network provider.
 			for _, nic := range *vm.Properties.NetworkProfile.NetworkInterfaces {
-				r, err := newAzureResourceFromID(*nic.ID)
+				r, err := newAzureResourceFromID(*nic.ID, d.logger)
 				if err != nil {
 					ch <- target{labelSet: nil, err: err}
 					return
 				}
 				networkInterface, err := client.nic.Get(r.ResourceGroup, r.Name, "")
 				if err != nil {
-					log.Errorf("Unable to get network interface %s: %s", r.Name, err)
+					d.logger.Errorf("Unable to get network interface %s: %s", r.Name, err)
 					ch <- target{labelSet: nil, err: err}
 					// Get out of this routine because we cannot continue without a network interface.
 					return
@@ -237,7 +239,7 @@ func (d *Discovery) refresh() (tg *config.TargetGroup, err error) {
 				// yet support this. On deallocated machines, this value happens to be nil so it
 				// is a cheap and easy way to determine if a machine is allocated or not.
 				if networkInterface.Properties.Primary == nil {
-					log.Debugf("Virtual machine %s is deallocated. Skipping during Azure SD.", *vm.Name)
+					d.logger.Debugf("Virtual machine %s is deallocated. Skipping during Azure SD.", *vm.Name)
 					ch <- target{}
 					return
 				}
@@ -272,6 +274,6 @@ func (d *Discovery) refresh() (tg *config.TargetGroup, err error) {
 		}
 	}

-	log.Debugf("Azure discovery completed.")
+	d.logger.Debugf("Azure discovery completed.")
 	return tg, nil
 }
@@ -89,10 +89,11 @@ type Discovery struct {
 	clientDatacenter string
 	tagSeparator     string
 	watchedServices  []string // Set of services which will be discovered.
+	logger           log.Logger
 }

 // NewDiscovery returns a new Discovery for the given config.
-func NewDiscovery(conf *config.ConsulSDConfig) (*Discovery, error) {
+func NewDiscovery(conf *config.ConsulSDConfig, logger log.Logger) (*Discovery, error) {
 	tls, err := httputil.NewTLSConfig(conf.TLSConfig)
 	if err != nil {
 		return nil, err
@@ -121,6 +122,7 @@ func NewDiscovery(conf *config.ConsulSDConfig) (*Discovery, error) {
 		tagSeparator:     conf.TagSeparator,
 		watchedServices:  conf.Services,
 		clientDatacenter: clientConf.Datacenter,
+		logger:           logger,
 	}
 	return cd, nil
 }
@@ -163,7 +165,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
 		}

 		if err != nil {
-			log.Errorf("Error refreshing service list: %s", err)
+			d.logger.Errorf("Error refreshing service list: %s", err)
 			rpcFailuresCount.Inc()
 			time.Sleep(retryInterval)
 			continue
@@ -179,7 +181,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
 		if d.clientDatacenter == "" {
 			info, err := d.client.Agent().Self()
 			if err != nil {
-				log.Errorf("Error retrieving datacenter name: %s", err)
+				d.logger.Errorf("Error retrieving datacenter name: %s", err)
 				time.Sleep(retryInterval)
 				continue
 			}
@@ -203,6 +205,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
 					datacenterLabel: model.LabelValue(d.clientDatacenter),
 				},
 				tagSeparator: d.tagSeparator,
+				logger:       d.logger,
 			}

 			wctx, cancel := context.WithCancel(ctx)
@@ -235,6 +238,7 @@ type consulService struct {
 	labels       model.LabelSet
 	client       *consul.Client
 	tagSeparator string
+	logger       log.Logger
 }

 func (srv *consulService) watch(ctx context.Context, ch chan<- []*config.TargetGroup) {
@@ -258,7 +262,7 @@ func (srv *consulService) watch(ctx context.Context, ch chan<- []*config.TargetG
 		}

 		if err != nil {
-			log.Errorf("Error refreshing service %s: %s", srv.name, err)
+			srv.logger.Errorf("Error refreshing service %s: %s", srv.name, err)
 			rpcFailuresCount.Inc()
 			time.Sleep(retryInterval)
 			continue
@@ -16,13 +16,14 @@ package consul
 import (
 	"testing"

+	"github.com/prometheus/common/log"
 	"github.com/prometheus/prometheus/config"
 )

 func TestConfiguredService(t *testing.T) {
 	conf := &config.ConsulSDConfig{
 		Services: []string{"configuredServiceName"}}
-	consulDiscovery, err := NewDiscovery(conf)
+	consulDiscovery, err := NewDiscovery(conf, log.Base())

 	if err != nil {
 		t.Errorf("Unexpected error when initialising discovery %v", err)
@@ -37,7 +38,7 @@ func TestConfiguredService(t *testing.T) {

 func TestNonConfiguredService(t *testing.T) {
 	conf := &config.ConsulSDConfig{}
-	consulDiscovery, err := NewDiscovery(conf)
+	consulDiscovery, err := NewDiscovery(conf, log.Base())

 	if err != nil {
 		t.Errorf("Unexpected error when initialising discovery %v", err)
@@ -50,7 +50,7 @@ type TargetProvider interface {
 }

 // ProvidersFromConfig returns all TargetProviders configured in cfg.
-func ProvidersFromConfig(cfg config.ServiceDiscoveryConfig) map[string]TargetProvider {
+func ProvidersFromConfig(cfg config.ServiceDiscoveryConfig, logger log.Logger) map[string]TargetProvider {
 	providers := map[string]TargetProvider{}

 	app := func(mech string, i int, tp TargetProvider) {
@@ -58,59 +58,59 @@ func ProvidersFromConfig(cfg config.ServiceDiscoveryConfig) map[string]TargetPro
 	}

 	for i, c := range cfg.DNSSDConfigs {
-		app("dns", i, dns.NewDiscovery(c))
+		app("dns", i, dns.NewDiscovery(c, logger))
 	}
 	for i, c := range cfg.FileSDConfigs {
-		app("file", i, file.NewDiscovery(c))
+		app("file", i, file.NewDiscovery(c, logger))
 	}
 	for i, c := range cfg.ConsulSDConfigs {
-		k, err := consul.NewDiscovery(c)
+		k, err := consul.NewDiscovery(c, logger)
 		if err != nil {
-			log.Errorf("Cannot create Consul discovery: %s", err)
+			logger.Errorf("Cannot create Consul discovery: %s", err)
 			continue
 		}
 		app("consul", i, k)
 	}
 	for i, c := range cfg.MarathonSDConfigs {
-		m, err := marathon.NewDiscovery(c)
+		m, err := marathon.NewDiscovery(c, logger)
 		if err != nil {
-			log.Errorf("Cannot create Marathon discovery: %s", err)
+			logger.Errorf("Cannot create Marathon discovery: %s", err)
 			continue
 		}
 		app("marathon", i, m)
 	}
 	for i, c := range cfg.KubernetesSDConfigs {
-		k, err := kubernetes.New(log.Base(), c)
+		k, err := kubernetes.New(logger, c)
 		if err != nil {
-			log.Errorf("Cannot create Kubernetes discovery: %s", err)
+			logger.Errorf("Cannot create Kubernetes discovery: %s", err)
 			continue
 		}
 		app("kubernetes", i, k)
 	}
 	for i, c := range cfg.ServersetSDConfigs {
-		app("serverset", i, zookeeper.NewServersetDiscovery(c))
+		app("serverset", i, zookeeper.NewServersetDiscovery(c, logger))
 	}
 	for i, c := range cfg.NerveSDConfigs {
-		app("nerve", i, zookeeper.NewNerveDiscovery(c))
+		app("nerve", i, zookeeper.NewNerveDiscovery(c, logger))
 	}
 	for i, c := range cfg.EC2SDConfigs {
-		app("ec2", i, ec2.NewDiscovery(c))
+		app("ec2", i, ec2.NewDiscovery(c, logger))
 	}
 	for i, c := range cfg.GCESDConfigs {
-		gced, err := gce.NewDiscovery(c)
+		gced, err := gce.NewDiscovery(c, logger)
 		if err != nil {
-			log.Errorf("Cannot initialize GCE discovery: %s", err)
+			logger.Errorf("Cannot initialize GCE discovery: %s", err)
 			continue
 		}
 		app("gce", i, gced)
 	}
 	for i, c := range cfg.AzureSDConfigs {
-		app("azure", i, azure.NewDiscovery(c))
+		app("azure", i, azure.NewDiscovery(c, logger))
 	}
 	for i, c := range cfg.TritonSDConfigs {
-		t, err := triton.New(log.With("sd", "triton"), c)
+		t, err := triton.New(logger.With("sd", "triton"), c)
 		if err != nil {
-			log.Errorf("Cannot create Triton discovery: %s", err)
+			logger.Errorf("Cannot create Triton discovery: %s", err)
 			continue
 		}
 		app("triton", i, t)
@@ -16,6 +16,7 @@ package discovery
 import (
 	"testing"

+	"github.com/prometheus/common/log"
 	"github.com/prometheus/prometheus/config"
 	"golang.org/x/net/context"
 	yaml "gopkg.in/yaml.v2"
@@ -53,7 +54,7 @@ static_configs:

 	go ts.Run(ctx)

-	ts.UpdateProviders(ProvidersFromConfig(*cfg))
+	ts.UpdateProviders(ProvidersFromConfig(*cfg, log.Base()))
 	<-called

 	verifyPresence(ts.tgroups, "static/0/0", true)
@@ -67,7 +68,7 @@ static_configs:
 		t.Fatalf("Unable to load YAML config sTwo: %s", err)
 	}

-	ts.UpdateProviders(ProvidersFromConfig(*cfg))
+	ts.UpdateProviders(ProvidersFromConfig(*cfg, log.Base()))
 	<-called

 	verifyPresence(ts.tgroups, "static/0/0", true)
@@ -66,10 +66,11 @@ type Discovery struct {
 	interval time.Duration
 	port     int
 	qtype    uint16
+	logger   log.Logger
 }

 // NewDiscovery returns a new Discovery which periodically refreshes its targets.
-func NewDiscovery(conf *config.DNSSDConfig) *Discovery {
+func NewDiscovery(conf *config.DNSSDConfig, logger log.Logger) *Discovery {
 	qtype := dns.TypeSRV
 	switch strings.ToUpper(conf.Type) {
 	case "A":
@@ -84,6 +85,7 @@ func NewDiscovery(conf *config.DNSSDConfig) *Discovery {
 		interval: time.Duration(conf.RefreshInterval),
 		qtype:    qtype,
 		port:     conf.Port,
+		logger:   logger,
 	}
 }

@@ -112,7 +114,7 @@ func (d *Discovery) refreshAll(ctx context.Context, ch chan<- []*config.TargetGr
 	for _, name := range d.names {
 		go func(n string) {
 			if err := d.refresh(ctx, n, ch); err != nil {
-				log.Errorf("Error refreshing DNS targets: %s", err)
+				d.logger.Errorf("Error refreshing DNS targets: %s", err)
 			}
 			wg.Done()
 		}(name)
@@ -122,7 +124,7 @@ func (d *Discovery) refreshAll(ctx context.Context, ch chan<- []*config.TargetGr
 }

 func (d *Discovery) refresh(ctx context.Context, name string, ch chan<- []*config.TargetGroup) error {
-	response, err := lookupAll(name, d.qtype)
+	response, err := lookupAll(name, d.qtype, d.logger)
 	dnsSDLookupsCount.Inc()
 	if err != nil {
 		dnsSDLookupFailuresCount.Inc()
@@ -147,7 +149,7 @@ func (d *Discovery) refresh(ctx context.Context, name string, ch chan<- []*confi
 		case *dns.AAAA:
 			target = hostPort(addr.AAAA.String(), d.port)
 		default:
-			log.Warnf("%q is not a valid SRV record", record)
+			d.logger.Warnf("%q is not a valid SRV record", record)
 			continue

 		}
@@ -167,7 +169,7 @@ func (d *Discovery) refresh(ctx context.Context, name string, ch chan<- []*confi
 	return nil
 }

-func lookupAll(name string, qtype uint16) (*dns.Msg, error) {
+func lookupAll(name string, qtype uint16, logger log.Logger) (*dns.Msg, error) {
 	conf, err := dns.ClientConfigFromFile(resolvConf)
 	if err != nil {
 		return nil, fmt.Errorf("could not load resolv.conf: %s", err)
@@ -181,7 +183,7 @@ func lookupAll(name string, qtype uint16) (*dns.Msg, error) {
 	for _, lname := range conf.NameList(name) {
 		response, err = lookup(lname, qtype, client, servAddr, false)
 		if err != nil {
-			log.
+			logger.
 				With("server", server).
 				With("name", name).
 				With("reason", err).
@@ -72,10 +72,11 @@ type Discovery struct {
 	interval time.Duration
 	profile  string
 	port     int
+	logger   log.Logger
 }

 // NewDiscovery returns a new EC2Discovery which periodically refreshes its targets.
-func NewDiscovery(conf *config.EC2SDConfig) *Discovery {
+func NewDiscovery(conf *config.EC2SDConfig, logger log.Logger) *Discovery {
 	creds := credentials.NewStaticCredentials(conf.AccessKey, string(conf.SecretKey), "")
 	if conf.AccessKey == "" && conf.SecretKey == "" {
 		creds = nil
@@ -88,6 +89,7 @@ func NewDiscovery(conf *config.EC2SDConfig) *Discovery {
 		profile:  conf.Profile,
 		interval: time.Duration(conf.RefreshInterval),
 		port:     conf.Port,
+		logger:   logger,
 	}
 }

@@ -99,7 +101,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
 	// Get an initial set right away.
 	tg, err := d.refresh()
 	if err != nil {
-		log.Error(err)
+		d.logger.Error(err)
 	} else {
 		select {
 		case ch <- []*config.TargetGroup{tg}:
@@ -113,7 +115,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
 		case <-ticker.C:
 			tg, err := d.refresh()
 			if err != nil {
-				log.Error(err)
+				d.logger.Error(err)
 				continue
 			}

@@ -63,13 +63,15 @@ type Discovery struct {
 	// and how many target groups they contained.
 	// This is used to detect deleted target groups.
 	lastRefresh map[string]int
+	logger      log.Logger
 }

 // NewDiscovery returns a new file discovery for the given paths.
-func NewDiscovery(conf *config.FileSDConfig) *Discovery {
+func NewDiscovery(conf *config.FileSDConfig, logger log.Logger) *Discovery {
 	return &Discovery{
 		paths:    conf.Files,
 		interval: time.Duration(conf.RefreshInterval),
+		logger:   logger,
 	}
 }

@@ -79,7 +81,7 @@ func (d *Discovery) listFiles() []string {
 	for _, p := range d.paths {
 		files, err := filepath.Glob(p)
 		if err != nil {
-			log.Errorf("Error expanding glob %q: %s", p, err)
+			d.logger.Errorf("Error expanding glob %q: %s", p, err)
 			continue
 		}
 		paths = append(paths, files...)
@@ -100,7 +102,7 @@ func (d *Discovery) watchFiles() {
 			p = "./"
 		}
 		if err := d.watcher.Add(p); err != nil {
-			log.Errorf("Error adding file watch for %q: %s", p, err)
+			d.logger.Errorf("Error adding file watch for %q: %s", p, err)
 		}
 	}
 }
@@ -111,7 +113,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {

 	watcher, err := fsnotify.NewWatcher()
 	if err != nil {
-		log.Errorf("Error creating file watcher: %s", err)
+		d.logger.Errorf("Error creating file watcher: %s", err)
 		return
 	}
 	d.watcher = watcher
@@ -149,7 +151,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {

 		case err := <-d.watcher.Errors:
 			if err != nil {
-				log.Errorf("Error on file watch: %s", err)
+				d.logger.Errorf("Error on file watch: %s", err)
 			}
 		}
 	}
@@ -157,7 +159,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {

 // stop shuts down the file watcher.
 func (d *Discovery) stop() {
-	log.Debugf("Stopping file discovery for %s...", d.paths)
+	d.logger.Debugf("Stopping file discovery for %s...", d.paths)

 	done := make(chan struct{})
 	defer close(done)
@@ -175,10 +177,10 @@ func (d *Discovery) stop() {
 		}
 	}()
 	if err := d.watcher.Close(); err != nil {
-		log.Errorf("Error closing file watcher for %s: %s", d.paths, err)
+		d.logger.Errorf("Error closing file watcher for %s: %s", d.paths, err)
 	}

-	log.Debugf("File discovery for %s stopped.", d.paths)
+	d.logger.Debugf("File discovery for %s stopped.", d.paths)
 }

 // refresh reads all files matching the discovery's patterns and sends the respective
@@ -194,7 +196,7 @@ func (d *Discovery) refresh(ctx context.Context, ch chan<- []*config.TargetGroup
 		tgroups, err := readFile(p)
 		if err != nil {
 			fileSDReadErrorsCount.Inc()
-			log.Errorf("Error reading file %q: %s", p, err)
+			d.logger.Errorf("Error reading file %q: %s", p, err)
 			// Prevent deletion down below.
 			ref[p] = d.lastRefresh[p]
 			continue
@@ -20,6 +20,7 @@ import (
 	"testing"
 	"time"

+	"github.com/prometheus/common/log"
 	"github.com/prometheus/common/model"
 	"golang.org/x/net/context"

@@ -41,7 +42,7 @@ func testFileSD(t *testing.T, ext string) {
 	conf.RefreshInterval = model.Duration(1 * time.Hour)

 	var (
-		fsd         = NewDiscovery(&conf)
+		fsd         = NewDiscovery(&conf, log.Base())
 		ch          = make(chan []*config.TargetGroup)
 		ctx, cancel = context.WithCancel(context.Background())
 	)
@@ -76,10 +76,11 @@ type Discovery struct {
 	interval     time.Duration
 	port         int
 	tagSeparator string
+	logger       log.Logger
 }

 // NewDiscovery returns a new Discovery which periodically refreshes its targets.
-func NewDiscovery(conf *config.GCESDConfig) (*Discovery, error) {
+func NewDiscovery(conf *config.GCESDConfig, logger log.Logger) (*Discovery, error) {
 	gd := &Discovery{
 		project: conf.Project,
 		zone:    conf.Zone,
@@ -87,6 +88,7 @@ func NewDiscovery(conf *config.GCESDConfig) (*Discovery, error) {
 		interval:     time.Duration(conf.RefreshInterval),
 		port:         conf.Port,
 		tagSeparator: conf.TagSeparator,
+		logger:       logger,
 	}
 	var err error
 	gd.client, err = google.DefaultClient(oauth2.NoContext, compute.ComputeReadonlyScope)
@@ -106,7 +108,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
 	// Get an initial set right away.
 	tg, err := d.refresh()
 	if err != nil {
-		log.Error(err)
+		d.logger.Error(err)
 	} else {
 		select {
 		case ch <- []*config.TargetGroup{tg}:
@@ -122,7 +124,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
 		case <-ticker.C:
 			tg, err := d.refresh()
 			if err != nil {
-				log.Error(err)
+				d.logger.Error(err)
 				continue
 			}
 			select {
@@ -85,10 +85,11 @@ type Discovery struct {
 	lastRefresh map[string]*config.TargetGroup
 	appsClient  AppListClient
 	token       string
+	logger      log.Logger
 }

 // NewDiscovery returns a new Marathon Discovery.
-func NewDiscovery(conf *config.MarathonSDConfig) (*Discovery, error) {
+func NewDiscovery(conf *config.MarathonSDConfig, logger log.Logger) (*Discovery, error) {
 	tls, err := httputil.NewTLSConfig(conf.TLSConfig)
 	if err != nil {
 		return nil, err
@@ -116,6 +117,7 @@ func NewDiscovery(conf *config.MarathonSDConfig) (*Discovery, error) {
 		refreshInterval: time.Duration(conf.RefreshInterval),
 		appsClient:      fetchApps,
 		token:           token,
+		logger:          logger,
 	}, nil
 }

@@ -128,7 +130,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
 		case <-time.After(d.refreshInterval):
 			err := d.updateServices(ctx, ch)
 			if err != nil {
-				log.Errorf("Error while updating services: %s", err)
+				d.logger.Errorf("Error while updating services: %s", err)
 			}
 		}
 	}
@@ -167,7 +169,7 @@ func (d *Discovery) updateServices(ctx context.Context, ch chan<- []*config.Targ
 		case <-ctx.Done():
 			return ctx.Err()
 		case ch <- []*config.TargetGroup{{Source: source}}:
-			log.Debugf("Removing group for %s", source)
+			d.logger.Debugf("Removing group for %s", source)
 		}
 	}
 }
@@ -19,6 +19,7 @@ import (
 	"testing"
 	"time"

+	"github.com/prometheus/common/log"
 	"github.com/prometheus/common/model"
 	"golang.org/x/net/context"

@@ -32,7 +33,7 @@ var (
 )

 func testUpdateServices(client AppListClient, ch chan []*config.TargetGroup) error {
-	md, err := NewDiscovery(&conf)
+	md, err := NewDiscovery(&conf, log.Base())
 	if err != nil {
 		return err
 	}
@@ -140,7 +141,7 @@ func TestMarathonSDSendGroup(t *testing.T) {

 func TestMarathonSDRemoveApp(t *testing.T) {
 	var ch = make(chan []*config.TargetGroup, 1)
-	md, err := NewDiscovery(&conf)
+	md, err := NewDiscovery(&conf, log.Base())
 	if err != nil {
 		t.Fatalf("%s", err)
 	}
@@ -176,7 +177,7 @@ func TestMarathonSDRunAndStop(t *testing.T) {
 		ch     = make(chan []*config.TargetGroup)
 		doneCh = make(chan error)
 	)
-	md, err := NewDiscovery(&conf)
+	md, err := NewDiscovery(&conf, log.Base())
 	if err != nil {
 		t.Fatalf("%s", err)
 	}
@@ -24,6 +24,7 @@ import (
 	"github.com/samuel/go-zookeeper/zk"
 	"golang.org/x/net/context"

+	"github.com/prometheus/common/log"
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/util/strutil"
 	"github.com/prometheus/prometheus/util/treecache"
@@ -39,17 +40,18 @@ type Discovery struct {
 	updates    chan treecache.ZookeeperTreeCacheEvent
 	treeCaches []*treecache.ZookeeperTreeCache

-	parse func(data []byte, path string) (model.LabelSet, error)
+	parse  func(data []byte, path string) (model.LabelSet, error)
+	logger log.Logger
 }

 // NewNerveDiscovery returns a new Discovery for the given Nerve config.
-func NewNerveDiscovery(conf *config.NerveSDConfig) *Discovery {
-	return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, parseNerveMember)
+func NewNerveDiscovery(conf *config.NerveSDConfig, logger log.Logger) *Discovery {
+	return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, logger, parseNerveMember)
 }

 // NewServersetDiscovery returns a new Discovery for the given serverset config.
-func NewServersetDiscovery(conf *config.ServersetSDConfig) *Discovery {
-	return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, parseServersetMember)
+func NewServersetDiscovery(conf *config.ServersetSDConfig, logger log.Logger) *Discovery {
+	return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, logger, parseServersetMember)
 }

 // NewDiscovery returns a new discovery along Zookeeper parses with
@@ -58,6 +60,7 @@ func NewDiscovery(
 	srvs []string,
 	timeout time.Duration,
 	paths []string,
+	logger log.Logger,
 	pf func(data []byte, path string) (model.LabelSet, error),
 ) *Discovery {
 	conn, _, err := zk.Connect(srvs, timeout)
@@ -71,6 +74,7 @@ func NewDiscovery(
 		updates: updates,
 		sources: map[string]*config.TargetGroup{},
 		parse:   pf,
+		logger:  logger,
 	}
 	for _, path := range paths {
 		sd.treeCaches = append(sd.treeCaches, treecache.NewZookeeperTreeCache(conn, path, updates))
@@ -65,6 +65,7 @@ type Notifier struct {

 	alertmanagers   []*alertmanagerSet
 	cancelDiscovery func()
+	logger          log.Logger
 }

 // Options are the configurable parameters of a Handler.
@@ -156,7 +157,7 @@ func newAlertMetrics(r prometheus.Registerer, queueCap int, queueLen, alertmanag
 }

 // New constructs a new Notifier.
-func New(o *Options) *Notifier {
+func New(o *Options, logger log.Logger) *Notifier {
 	ctx, cancel := context.WithCancel(context.Background())

 	if o.Do == nil {
@@ -169,6 +170,7 @@ func New(o *Options) *Notifier {
 		cancel: cancel,
 		more:   make(chan struct{}, 1),
 		opts:   o,
+		logger: logger,
 	}

 	queueLenFunc := func() float64 { return float64(n.queueLen()) }
@@ -189,7 +191,7 @@ func (n *Notifier) ApplyConfig(conf *config.Config) error {
 	ctx, cancel := context.WithCancel(n.ctx)

 	for _, cfg := range conf.AlertingConfig.AlertmanagerConfigs {
-		ams, err := newAlertmanagerSet(cfg)
+		ams, err := newAlertmanagerSet(cfg, n.logger)
 		if err != nil {
 			return err
 		}
@@ -203,7 +205,7 @@ func (n *Notifier) ApplyConfig(conf *config.Config) error {
 	// old ones.
 	for _, ams := range amSets {
 		go ams.ts.Run(ctx)
-		ams.ts.UpdateProviders(discovery.ProvidersFromConfig(ams.cfg.ServiceDiscoveryConfig))
+		ams.ts.UpdateProviders(discovery.ProvidersFromConfig(ams.cfg.ServiceDiscoveryConfig, n.logger))
 	}
 	if n.cancelDiscovery != nil {
 		n.cancelDiscovery()
@@ -283,7 +285,7 @@ func (n *Notifier) Send(alerts ...*model.Alert) {
 	if d := len(alerts) - n.opts.QueueCapacity; d > 0 {
 		alerts = alerts[d:]

-		log.Warnf("Alert batch larger than queue capacity, dropping %d alerts", d)
+		n.logger.Warnf("Alert batch larger than queue capacity, dropping %d alerts", d)
 		n.metrics.dropped.Add(float64(d))
 	}

@@ -292,7 +294,7 @@ func (n *Notifier) Send(alerts ...*model.Alert) {
 	if d := (len(n.queue) + len(alerts)) - n.opts.QueueCapacity; d > 0 {
 		n.queue = n.queue[d:]

-		log.Warnf("Alert notification queue full, dropping %d alerts", d)
+		n.logger.Warnf("Alert notification queue full, dropping %d alerts", d)
 		n.metrics.dropped.Add(float64(d))
 	}
 	n.queue = append(n.queue, alerts...)
@@ -349,7 +351,7 @@ func (n *Notifier) sendAll(alerts ...*model.Alert) bool {

 	b, err := json.Marshal(alerts)
 	if err != nil {
-		log.Errorf("Encoding alerts failed: %s", err)
+		n.logger.Errorf("Encoding alerts failed: %s", err)
 		return false
 	}

@@ -374,7 +376,7 @@ func (n *Notifier) sendAll(alerts ...*model.Alert) bool {
 		u := am.url().String()

 		if err := n.sendOne(ctx, ams.client, u, b); err != nil {
-			log.With("alertmanager", u).With("count", len(alerts)).Errorf("Error sending alerts: %s", err)
+			n.logger.With("alertmanager", u).With("count", len(alerts)).Errorf("Error sending alerts: %s", err)
 			n.metrics.errors.WithLabelValues(u).Inc()
 		} else {
 			atomic.AddUint64(&numSuccess, 1)
@@ -413,7 +415,7 @@ func (n *Notifier) sendOne(ctx context.Context, c *http.Client, url string, b []

 // Stop shuts down the notification handler.
 func (n *Notifier) Stop() {
-	log.Info("Stopping notification handler...")
+	n.logger.Info("Stopping notification handler...")
 	n.cancel()
 }

@@ -443,11 +445,12 @@ type alertmanagerSet struct {

 	metrics *alertMetrics

-	mtx sync.RWMutex
-	ams []alertmanager
+	mtx    sync.RWMutex
+	ams    []alertmanager
+	logger log.Logger
 }

-func newAlertmanagerSet(cfg *config.AlertmanagerConfig) (*alertmanagerSet, error) {
+func newAlertmanagerSet(cfg *config.AlertmanagerConfig, logger log.Logger) (*alertmanagerSet, error) {
 	client, err := httputil.NewClientFromConfig(cfg.HTTPClientConfig)
 	if err != nil {
 		return nil, err
@@ -455,6 +458,7 @@ func newAlertmanagerSet(cfg *config.AlertmanagerConfig) (*alertmanagerSet, error
 	s := &alertmanagerSet{
 		client: client,
 		cfg:    cfg,
+		logger: logger,
 	}
 	s.ts = discovery.NewTargetSet(s)

@@ -469,7 +473,7 @@ func (s *alertmanagerSet) Sync(tgs []*config.TargetGroup) {
 	for _, tg := range tgs {
 		ams, err := alertmanagerFromGroup(tg, s.cfg)
 		if err != nil {
-			log.With("err", err).Error("generating discovered Alertmanagers failed")
+			s.logger.With("err", err).Error("generating discovered Alertmanagers failed")
 			continue
 		}
 		all = append(all, ams...)
@@ -26,6 +26,7 @@ import (

 	"golang.org/x/net/context"

+	"github.com/prometheus/common/log"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/config"
 )
@@ -63,7 +64,7 @@ func TestPostPath(t *testing.T) {
 }

 func TestHandlerNextBatch(t *testing.T) {
-	h := New(&Options{})
+	h := New(&Options{}, log.Base())

 	for i := range make([]struct{}, 2*maxBatchSize+1) {
 		h.queue = append(h.queue, &model.Alert{
@@ -150,7 +151,7 @@ func TestHandlerSendAll(t *testing.T) {
 	defer server1.Close()
 	defer server2.Close()

-	h := New(&Options{})
+	h := New(&Options{}, log.Base())
 	h.alertmanagers = append(h.alertmanagers, &alertmanagerSet{
 		ams: []alertmanager{
 			alertmanagerMock{
@@ -217,7 +218,7 @@ func TestCustomDo(t *testing.T) {
 				Body: ioutil.NopCloser(nil),
 			}, nil
 		},
-	})
+	}, log.Base())

 	h.sendOne(context.Background(), nil, testURL, []byte(testBody))

@@ -239,7 +240,7 @@ func TestExternalLabels(t *testing.T) {
 				Replacement: "c",
 			},
 		},
-	})
+	}, log.Base())

 	// This alert should get the external label attached.
 	h.Send(&model.Alert{
@@ -293,7 +294,7 @@ func TestHandlerRelabel(t *testing.T) {
 				Replacement: "renamed",
 			},
 		},
-	})
+	}, log.Base())

 	// This alert should be dropped due to the configuration
 	h.Send(&model.Alert{
@@ -347,7 +348,9 @@ func TestHandlerQueueing(t *testing.T) {

 	h := New(&Options{
 		QueueCapacity: 3 * maxBatchSize,
-	})
+	},
+		log.Base(),
+	)
 	h.alertmanagers = append(h.alertmanagers, &alertmanagerSet{
 		ams: []alertmanager{
 			alertmanagerMock{
@@ -38,6 +38,7 @@ type TargetManager struct {

 	// Set of unqiue targets by scrape configuration.
 	targetSets map[string]*targetSet
+	logger     log.Logger
 }

 type targetSet struct {
@@ -49,16 +50,17 @@ type targetSet struct {
 }

 // NewTargetManager creates a new TargetManager.
-func NewTargetManager(app storage.SampleAppender) *TargetManager {
+func NewTargetManager(app storage.SampleAppender, logger log.Logger) *TargetManager {
 	return &TargetManager{
 		appender:   app,
 		targetSets: map[string]*targetSet{},
+		logger:     logger,
 	}
 }

 // Run starts background processing to handle target updates.
 func (tm *TargetManager) Run() {
-	log.Info("Starting target manager...")
+	tm.logger.Info("Starting target manager...")

 	tm.mtx.Lock()

@@ -72,7 +74,7 @@ func (tm *TargetManager) Run() {

 // Stop all background processing.
 func (tm *TargetManager) Stop() {
-	log.Infoln("Stopping target manager...")
+	tm.logger.Infoln("Stopping target manager...")

 	tm.mtx.Lock()
 	// Cancel the base context, this will cause all target providers to shut down
@@ -84,7 +86,7 @@ func (tm *TargetManager) Stop() {
 	// Wait for all scrape inserts to complete.
 	tm.wg.Wait()

-	log.Debugln("Target manager stopped")
+	tm.logger.Debugln("Target manager stopped")
 }

 func (tm *TargetManager) reload() {
@@ -118,7 +120,7 @@ func (tm *TargetManager) reload() {
 		} else {
 			ts.sp.reload(scfg)
 		}
-		ts.ts.UpdateProviders(discovery.ProvidersFromConfig(scfg.ServiceDiscoveryConfig))
+		ts.ts.UpdateProviders(discovery.ProvidersFromConfig(scfg.ServiceDiscoveryConfig, tm.logger))
 	}

 	// Remove old target sets. Waiting for scrape pools to complete pending