refactored discovery

This commit is contained in:
Krasi Georgiev 2017-11-25 13:13:54 +00:00
parent a8cce41882
commit e405e2f1ea
10 changed files with 481 additions and 552 deletions

View File

@ -43,6 +43,7 @@ import (
"github.com/prometheus/common/promlog"
promlogflag "github.com/prometheus/common/promlog/flag"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/retrieval"
@ -230,12 +231,17 @@ func main() {
cfg.queryEngine.Logger = log.With(logger, "component", "query engine")
var (
notifier = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier"))
targetManager = retrieval.NewTargetManager(fanoutStorage, log.With(logger, "component", "target manager"))
queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine)
ctx, cancelCtx = context.WithCancel(context.Background())
notifier = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier"))
ctxDiscovery, cancelDiscovery = context.WithCancel(context.Background())
discoveryManager = discovery.NewDiscoveryManager(ctxDiscovery, log.With(logger, "component", "discovery manager"))
ctxScrape, cancelScrape = context.WithCancel(context.Background())
scrapeManager = retrieval.NewScrapeManager(ctxScrape, log.With(logger, "component", "scrape manager"), fanoutStorage)
queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine)
ctxWeb, cancelWeb = context.WithCancel(context.Background())
webHandler = web.New(log.With(logger, "component", "web"), &cfg.web)
)
ctx := context.Background()
ruleManager := rules.NewManager(&rules.ManagerOptions{
Appendable: fanoutStorage,
QueryFunc: rules.EngineQueryFunc(queryEngine),
@ -250,7 +256,7 @@ func main() {
cfg.web.TSDB = localStorage.Get
cfg.web.Storage = fanoutStorage
cfg.web.QueryEngine = queryEngine
cfg.web.TargetManager = targetManager
cfg.web.ScrapeManager = scrapeManager
cfg.web.RuleManager = ruleManager
cfg.web.Notifier = notifier
@ -268,8 +274,6 @@ func main() {
cfg.web.Flags[f.Name] = f.Value.String()
}
webHandler := web.New(log.With(logger, "component", "web"), &cfg.web)
// Monitor outgoing connections on default transport with conntrack.
http.DefaultTransport.(*http.Transport).DialContext = conntrack.NewDialContextFunc(
conntrack.DialWithTracing(),
@ -306,6 +310,17 @@ func main() {
var g group.Group
{
g.Add(
func() error {
err := discoveryManager.Run()
level.Info(logger).Log("msg", "Discovery manager stopped")
return err
},
func(err error) {
level.Info(logger).Log("msg", "Stopping discovery manager...")
cancelDiscovery()
},
)
term := make(chan os.Signal)
signal.Notify(term, os.Interrupt, syscall.SIGTERM)
cancel := make(chan struct{})
@ -426,7 +441,7 @@ func main() {
{
g.Add(
func() error {
if err := webHandler.Run(ctx); err != nil {
if err := webHandler.Run(ctxWeb); err != nil {
return fmt.Errorf("Error starting web server: %s", err)
}
return nil
@ -435,7 +450,7 @@ func main() {
// Keep this interrupt before the ruleManager.Stop().
// Shutting down the query engine before the rule manager will cause pending queries
// to be canceled and ensures a quick shutdown of the rule manager.
cancelCtx()
cancelWeb()
},
)
}
@ -468,17 +483,15 @@ func main() {
)
}
{
// TODO(krasi) refactor targetManager.Run() to be blocking to avoid using an extra blocking channel.
cancel := make(chan struct{})
g.Add(
func() error {
targetManager.Run()
<-cancel
return nil
err := scrapeManager.Run(discoveryManager.SyncCh())
level.Info(logger).Log("msg", "Scrape manager stopped")
return err
},
func(err error) {
targetManager.Stop()
close(cancel)
level.Info(logger).Log("msg", "Stopping scrape manager...")
cancelScrape()
},
)
}

View File

@ -721,7 +721,7 @@ func (a *BasicAuth) UnmarshalYAML(unmarshal func(interface{}) error) error {
return checkOverflow(a.XXX, "basic_auth")
}
// TargetGroup is a set of targets with a common label set.
// TargetGroup is a set of targets with a common label set(production , test, staging etc.).
type TargetGroup struct {
// Targets is a list of targets identified by a label set. Each target is
// uniquely identifiable in the group by its address label.

View File

@ -1,319 +0,0 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package discovery
import (
"context"
"fmt"
"sync"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery/azure"
"github.com/prometheus/prometheus/discovery/consul"
"github.com/prometheus/prometheus/discovery/dns"
"github.com/prometheus/prometheus/discovery/ec2"
"github.com/prometheus/prometheus/discovery/file"
"github.com/prometheus/prometheus/discovery/gce"
"github.com/prometheus/prometheus/discovery/kubernetes"
"github.com/prometheus/prometheus/discovery/marathon"
"github.com/prometheus/prometheus/discovery/openstack"
"github.com/prometheus/prometheus/discovery/triton"
"github.com/prometheus/prometheus/discovery/zookeeper"
)
// A TargetProvider provides information about target groups. It maintains a set
// of sources from which TargetGroups can originate. Whenever a target provider
// detects a potential change, it sends the TargetGroup through its provided channel.
//
// The TargetProvider does not have to guarantee that an actual change happened.
// It does guarantee that it sends the new TargetGroup whenever a change happens.
//
// TargetProviders should initially send a full set of all discoverable TargetGroups.
type TargetProvider interface {
// Run hands a channel to the target provider through which it can send
// updated target groups.
// Must returns if the context gets canceled. It should not close the update
// channel on returning.
Run(ctx context.Context, up chan<- []*config.TargetGroup)
}
// ProvidersFromConfig returns all TargetProviders configured in cfg.
func ProvidersFromConfig(cfg config.ServiceDiscoveryConfig, logger log.Logger) map[string]TargetProvider {
providers := map[string]TargetProvider{}
app := func(mech string, i int, tp TargetProvider) {
providers[fmt.Sprintf("%s/%d", mech, i)] = tp
}
for i, c := range cfg.DNSSDConfigs {
app("dns", i, dns.NewDiscovery(c, log.With(logger, "discovery", "dns")))
}
for i, c := range cfg.FileSDConfigs {
app("file", i, file.NewDiscovery(c, log.With(logger, "discovery", "file")))
}
for i, c := range cfg.ConsulSDConfigs {
k, err := consul.NewDiscovery(c, log.With(logger, "discovery", "consul"))
if err != nil {
level.Error(logger).Log("msg", "Cannot create Consul discovery", "err", err)
continue
}
app("consul", i, k)
}
for i, c := range cfg.MarathonSDConfigs {
m, err := marathon.NewDiscovery(c, log.With(logger, "discovery", "marathon"))
if err != nil {
level.Error(logger).Log("msg", "Cannot create Marathon discovery", "err", err)
continue
}
app("marathon", i, m)
}
for i, c := range cfg.KubernetesSDConfigs {
k, err := kubernetes.New(log.With(logger, "discovery", "k8s"), c)
if err != nil {
level.Error(logger).Log("msg", "Cannot create Kubernetes discovery", "err", err)
continue
}
app("kubernetes", i, k)
}
for i, c := range cfg.ServersetSDConfigs {
app("serverset", i, zookeeper.NewServersetDiscovery(c, log.With(logger, "discovery", "zookeeper")))
}
for i, c := range cfg.NerveSDConfigs {
app("nerve", i, zookeeper.NewNerveDiscovery(c, log.With(logger, "discovery", "nerve")))
}
for i, c := range cfg.EC2SDConfigs {
app("ec2", i, ec2.NewDiscovery(c, log.With(logger, "discovery", "ec2")))
}
for i, c := range cfg.OpenstackSDConfigs {
openstackd, err := openstack.NewDiscovery(c, log.With(logger, "discovery", "openstack"))
if err != nil {
level.Error(logger).Log("msg", "Cannot initialize OpenStack discovery", "err", err)
continue
}
app("openstack", i, openstackd)
}
for i, c := range cfg.GCESDConfigs {
gced, err := gce.NewDiscovery(c, log.With(logger, "discovery", "gce"))
if err != nil {
level.Error(logger).Log("msg", "Cannot initialize GCE discovery", "err", err)
continue
}
app("gce", i, gced)
}
for i, c := range cfg.AzureSDConfigs {
app("azure", i, azure.NewDiscovery(c, log.With(logger, "discovery", "azure")))
}
for i, c := range cfg.TritonSDConfigs {
t, err := triton.New(log.With(logger, "discovery", "trition"), c)
if err != nil {
level.Error(logger).Log("msg", "Cannot create Triton discovery", "err", err)
continue
}
app("triton", i, t)
}
if len(cfg.StaticConfigs) > 0 {
app("static", 0, NewStaticProvider(cfg.StaticConfigs))
}
return providers
}
// StaticProvider holds a list of target groups that never change.
type StaticProvider struct {
TargetGroups []*config.TargetGroup
}
// NewStaticProvider returns a StaticProvider configured with the given
// target groups.
func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider {
for i, tg := range groups {
tg.Source = fmt.Sprintf("%d", i)
}
return &StaticProvider{groups}
}
// Run implements the TargetProvider interface.
func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
// We still have to consider that the consumer exits right away in which case
// the context will be canceled.
select {
case ch <- sd.TargetGroups:
case <-ctx.Done():
}
close(ch)
}
// TargetSet handles multiple TargetProviders and sends a full overview of their
// discovered TargetGroups to a Syncer.
type TargetSet struct {
mtx sync.RWMutex
// Sets of targets by a source string that is unique across target providers.
tgroups map[string]*config.TargetGroup
syncer Syncer
syncCh chan struct{}
providerCh chan map[string]TargetProvider
cancelProviders func()
}
// Syncer receives updates complete sets of TargetGroups.
type Syncer interface {
Sync([]*config.TargetGroup)
}
// NewTargetSet returns a new target sending TargetGroups to the Syncer.
func NewTargetSet(s Syncer) *TargetSet {
return &TargetSet{
syncCh: make(chan struct{}, 1),
providerCh: make(chan map[string]TargetProvider),
syncer: s,
}
}
// Run starts the processing of target providers and their updates.
// It blocks until the context gets canceled.
func (ts *TargetSet) Run(ctx context.Context) {
Loop:
for {
// Throttle syncing to once per five seconds.
select {
case <-ctx.Done():
break Loop
case p := <-ts.providerCh:
ts.updateProviders(ctx, p)
case <-time.After(5 * time.Second):
}
select {
case <-ctx.Done():
break Loop
case <-ts.syncCh:
ts.sync()
case p := <-ts.providerCh:
ts.updateProviders(ctx, p)
}
}
}
func (ts *TargetSet) sync() {
ts.mtx.RLock()
var all []*config.TargetGroup
for _, tg := range ts.tgroups {
all = append(all, tg)
}
ts.mtx.RUnlock()
ts.syncer.Sync(all)
}
// UpdateProviders sets new target providers for the target set.
func (ts *TargetSet) UpdateProviders(p map[string]TargetProvider) {
ts.providerCh <- p
}
func (ts *TargetSet) updateProviders(ctx context.Context, providers map[string]TargetProvider) {
// Stop all previous target providers of the target set.
if ts.cancelProviders != nil {
ts.cancelProviders()
}
ctx, ts.cancelProviders = context.WithCancel(ctx)
var wg sync.WaitGroup
// (Re-)create a fresh tgroups map to not keep stale targets around. We
// will retrieve all targets below anyway, so cleaning up everything is
// safe and doesn't inflict any additional cost.
ts.mtx.Lock()
ts.tgroups = map[string]*config.TargetGroup{}
ts.mtx.Unlock()
for name, prov := range providers {
wg.Add(1)
updates := make(chan []*config.TargetGroup)
go prov.Run(ctx, updates)
go func(name string, prov TargetProvider) {
select {
case <-ctx.Done():
case initial, ok := <-updates:
// Handle the case that a target provider exits and closes the channel
// before the context is done.
if !ok {
break
}
// First set of all targets the provider knows.
for _, tgroup := range initial {
ts.setTargetGroup(name, tgroup)
}
case <-time.After(5 * time.Second):
// Initial set didn't arrive. Act as if it was empty
// and wait for updates later on.
}
wg.Done()
// Start listening for further updates.
for {
select {
case <-ctx.Done():
return
case tgs, ok := <-updates:
// Handle the case that a target provider exits and closes the channel
// before the context is done.
if !ok {
return
}
for _, tg := range tgs {
ts.update(name, tg)
}
}
}
}(name, prov)
}
// We wait for a full initial set of target groups before releasing the mutex
// to ensure the initial sync is complete and there are no races with subsequent updates.
wg.Wait()
// Just signal that there are initial sets to sync now. Actual syncing must only
// happen in the runScraping loop.
select {
case ts.syncCh <- struct{}{}:
default:
}
}
// update handles a target group update from a target provider identified by the name.
func (ts *TargetSet) update(name string, tgroup *config.TargetGroup) {
ts.setTargetGroup(name, tgroup)
select {
case ts.syncCh <- struct{}{}:
default:
}
}
func (ts *TargetSet) setTargetGroup(name string, tg *config.TargetGroup) {
ts.mtx.Lock()
defer ts.mtx.Unlock()
if tg == nil {
return
}
ts.tgroups[name+"/"+tg.Source] = tg
}

293
discovery/manager.go Normal file
View File

@ -0,0 +1,293 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package discovery
import (
"context"
"fmt"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery/azure"
"github.com/prometheus/prometheus/discovery/consul"
"github.com/prometheus/prometheus/discovery/dns"
"github.com/prometheus/prometheus/discovery/ec2"
"github.com/prometheus/prometheus/discovery/file"
"github.com/prometheus/prometheus/discovery/gce"
"github.com/prometheus/prometheus/discovery/kubernetes"
"github.com/prometheus/prometheus/discovery/marathon"
"github.com/prometheus/prometheus/discovery/openstack"
"github.com/prometheus/prometheus/discovery/triton"
"github.com/prometheus/prometheus/discovery/zookeeper"
)
// DiscoveryProvider provides information about target groups. It maintains a set
// of sources from which TargetGroups can originate. Whenever a discovery provider
// detects a potential change, it sends the TargetGroup through its provided channel.
//
// The DiscoveryProvider does not have to guarantee that an actual change happened.
// It does guarantee that it sends the new TargetGroup whenever a change happens.
//
// DiscoveryProviders should initially send a full set of all discoverable TargetGroups.
type DiscoveryProvider interface {
// Run hands a channel to the discovery provider(consul,dns etc) through which it can send
// updated target groups.
// Must returns if the context gets canceled. It should not close the update
// channel on returning.
Run(ctx context.Context, up chan<- []*config.TargetGroup)
}
type targetSetProvider struct {
cancel func()
tgroups []*config.TargetGroup
}
// NewDiscoveryManager is the DiscoveryManager constructor
func NewDiscoveryManager(ctx context.Context, logger log.Logger) *DiscoveryManager {
return &DiscoveryManager{
ctx: ctx,
logger: logger,
actionCh: make(chan func()),
syncCh: make(chan map[string][]*config.TargetGroup),
targetSetProviders: make(map[string]map[string]*targetSetProvider),
}
}
// DiscoveryManager maintains a set of discovery providers and sends each update to a channel used by other packages.
type DiscoveryManager struct {
ctx context.Context
logger log.Logger
syncCh chan map[string][]*config.TargetGroup // map[targetSetName]
actionCh chan func()
targetSetProviders map[string]map[string]*targetSetProvider // map[targetSetName]map[providerName]
}
// Run starts the background processing
func (m *DiscoveryManager) Run() error {
for {
select {
case f := <-m.actionCh:
f()
case <-m.ctx.Done():
return m.ctx.Err()
}
}
}
// SyncCh returns a read only channel used by all DiscoveryProviders targetSet updates
func (m *DiscoveryManager) SyncCh() <-chan map[string][]*config.TargetGroup {
return m.syncCh
}
// ApplyConfig removes all running discovery providers and starts new ones using the provided config.
func (m *DiscoveryManager) ApplyConfig(cfg *config.Config) error {
err := make(chan error)
m.actionCh <- func() {
m.cancelDiscoveryProviders()
for _, scfg := range cfg.ScrapeConfigs {
for provName, prov := range m.providersFromConfig(scfg.ServiceDiscoveryConfig) {
ctx, cancel := context.WithCancel(m.ctx)
updates := make(chan []*config.TargetGroup)
m.createProvider(cancel, scfg.JobName, provName)
go prov.Run(ctx, updates)
go func(provName string) {
select {
case <-ctx.Done():
// First set of all targets the provider knows.
case tgs, ok := <-updates:
// Handle the case that a target provider exits and closes the channel
// before the context is done.
if !ok {
break
}
m.syncCh <- m.mergeGroups(scfg.JobName, provName, tgs)
case <-time.After(5 * time.Second):
// Initial set didn't arrive. Act as if it was empty
// and wait for updates later on.
}
// Start listening for further updates.
for {
select {
case <-ctx.Done():
return
case tgs, ok := <-updates:
// Handle the case that a target provider exits and closes the channel
// before the context is done.
if !ok {
return
}
m.syncCh <- m.mergeGroups(scfg.JobName, provName, tgs)
}
}
}(provName)
}
}
close(err)
}
return <-err
}
func (m *DiscoveryManager) cancelDiscoveryProviders() {
for targetSetName, targetSetProviders := range m.targetSetProviders {
for _, discoveryProvider := range targetSetProviders {
discoveryProvider.cancel()
}
delete(m.targetSetProviders, targetSetName)
}
}
func (m *DiscoveryManager) createProvider(cancel context.CancelFunc, tsName, provName string) {
if m.targetSetProviders[tsName] == nil {
m.targetSetProviders[tsName] = make(map[string]*targetSetProvider)
}
m.targetSetProviders[tsName][provName] = &targetSetProvider{
cancel: cancel,
tgroups: []*config.TargetGroup{},
}
}
// mergeGroups adds a new target group for a named discovery provider and returns all target groups for a given target set
func (m *DiscoveryManager) mergeGroups(tsName, provName string, tg []*config.TargetGroup) map[string][]*config.TargetGroup {
tset := make(chan map[string][]*config.TargetGroup)
m.actionCh <- func() {
if tg != nil {
m.targetSetProviders[tsName][provName].tgroups = tg
}
var tgAll []*config.TargetGroup
for _, prov := range m.targetSetProviders[tsName] {
for _, tg := range prov.tgroups {
tgAll = append(tgAll, tg)
}
}
t := make(map[string][]*config.TargetGroup)
t[tsName] = tgAll
tset <- t
}
return <-tset
}
func (m *DiscoveryManager) providersFromConfig(cfg config.ServiceDiscoveryConfig) map[string]DiscoveryProvider {
providers := map[string]DiscoveryProvider{}
app := func(mech string, i int, tp DiscoveryProvider) {
providers[fmt.Sprintf("%s/%d", mech, i)] = tp
}
for i, c := range cfg.DNSSDConfigs {
app("dns", i, dns.NewDiscovery(c, log.With(m.logger, "discovery", "dns")))
}
for i, c := range cfg.FileSDConfigs {
app("file", i, file.NewDiscovery(c, log.With(m.logger, "discovery", "file")))
}
for i, c := range cfg.ConsulSDConfigs {
k, err := consul.NewDiscovery(c, log.With(m.logger, "discovery", "consul"))
if err != nil {
level.Error(m.logger).Log("msg", "Cannot create Consul discovery", "err", err)
continue
}
app("consul", i, k)
}
for i, c := range cfg.MarathonSDConfigs {
t, err := marathon.NewDiscovery(c, log.With(m.logger, "discovery", "marathon"))
if err != nil {
level.Error(m.logger).Log("msg", "Cannot create Marathon discovery", "err", err)
continue
}
app("marathon", i, t)
}
for i, c := range cfg.KubernetesSDConfigs {
k, err := kubernetes.New(log.With(m.logger, "discovery", "k8s"), c)
if err != nil {
level.Error(m.logger).Log("msg", "Cannot create Kubernetes discovery", "err", err)
continue
}
app("kubernetes", i, k)
}
for i, c := range cfg.ServersetSDConfigs {
app("serverset", i, zookeeper.NewServersetDiscovery(c, log.With(m.logger, "discovery", "zookeeper")))
}
for i, c := range cfg.NerveSDConfigs {
app("nerve", i, zookeeper.NewNerveDiscovery(c, log.With(m.logger, "discovery", "nerve")))
}
for i, c := range cfg.EC2SDConfigs {
app("ec2", i, ec2.NewDiscovery(c, log.With(m.logger, "discovery", "ec2")))
}
for i, c := range cfg.OpenstackSDConfigs {
openstackd, err := openstack.NewDiscovery(c, log.With(m.logger, "discovery", "openstack"))
if err != nil {
level.Error(m.logger).Log("msg", "Cannot initialize OpenStack discovery", "err", err)
continue
}
app("openstack", i, openstackd)
}
for i, c := range cfg.GCESDConfigs {
gced, err := gce.NewDiscovery(c, log.With(m.logger, "discovery", "gce"))
if err != nil {
level.Error(m.logger).Log("msg", "Cannot initialize GCE discovery", "err", err)
continue
}
app("gce", i, gced)
}
for i, c := range cfg.AzureSDConfigs {
app("azure", i, azure.NewDiscovery(c, log.With(m.logger, "discovery", "azure")))
}
for i, c := range cfg.TritonSDConfigs {
t, err := triton.New(log.With(m.logger, "discovery", "trition"), c)
if err != nil {
level.Error(m.logger).Log("msg", "Cannot create Triton discovery", "err", err)
continue
}
app("triton", i, t)
}
if len(cfg.StaticConfigs) > 0 {
app("static", 0, NewStaticProvider(cfg.StaticConfigs))
}
return providers
}
// StaticProvider holds a list of target groups that never change.
type StaticProvider struct {
TargetGroups []*config.TargetGroup
}
// NewStaticProvider returns a StaticProvider configured with the given
// target groups.
func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider {
for i, tg := range groups {
tg.Source = fmt.Sprintf("%d", i)
}
return &StaticProvider{groups}
}
// Run implements the DiscoveryProvider interface.
func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
// We still have to consider that the consumer exits right away in which case
// the context will be canceled.
select {
case ch <- sd.TargetGroups:
case <-ctx.Done():
}
close(ch)
}

View File

@ -35,7 +35,6 @@ import (
"golang.org/x/net/context/ctxhttp"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/util/httputil"
@ -248,7 +247,6 @@ func (n *Notifier) ApplyConfig(conf *config.Config) error {
n.opts.RelabelConfigs = conf.AlertingConfig.AlertRelabelConfigs
amSets := []*alertmanagerSet{}
ctx, cancel := context.WithCancel(n.ctx)
for _, cfg := range conf.AlertingConfig.AlertmanagerConfigs {
ams, err := newAlertmanagerSet(cfg, n.logger)
@ -261,17 +259,6 @@ func (n *Notifier) ApplyConfig(conf *config.Config) error {
amSets = append(amSets, ams)
}
// After all sets were created successfully, start them and cancel the
// old ones.
for _, ams := range amSets {
go ams.ts.Run(ctx)
ams.ts.UpdateProviders(discovery.ProvidersFromConfig(ams.cfg.ServiceDiscoveryConfig, n.logger))
}
if n.cancelDiscovery != nil {
n.cancelDiscovery()
}
n.cancelDiscovery = cancel
n.alertmanagers = amSets
return nil
@ -504,7 +491,6 @@ func (a alertmanagerLabels) url() *url.URL {
// alertmanagerSet contains a set of Alertmanagers discovered via a group of service
// discovery definitions that have a common configuration on how alerts should be sent.
type alertmanagerSet struct {
ts *discovery.TargetSet
cfg *config.AlertmanagerConfig
client *http.Client
@ -525,8 +511,6 @@ func newAlertmanagerSet(cfg *config.AlertmanagerConfig, logger log.Logger) (*ale
cfg: cfg,
logger: logger,
}
s.ts = discovery.NewTargetSet(s)
return s, nil
}

152
retrieval/manager.go Normal file
View File

@ -0,0 +1,152 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package retrieval
import (
"context"
"fmt"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/storage"
)
// Appendable returns an Appender.
type Appendable interface {
Appender() (storage.Appender, error)
}
// NewScrapeManager is the ScrapeManager constructor
func NewScrapeManager(ctx context.Context, logger log.Logger, app Appendable) *ScrapeManager {
return &ScrapeManager{
ctx: ctx,
append: app,
logger: logger,
actionCh: make(chan func()),
scrapeConfigs: make(map[string]*config.ScrapeConfig),
scrapePools: make(map[string]*scrapePool),
}
}
// ScrapeManager maintains a set of scrape pools and manages start/stop cicles
// when receiving new target groups form the discovery manager.
type ScrapeManager struct {
ctx context.Context
logger log.Logger
append Appendable
scrapeConfigs map[string]*config.ScrapeConfig
scrapePools map[string]*scrapePool
actionCh chan func()
}
// Run starts background processing to handle target updates and reload the scraping loops.
func (m *ScrapeManager) Run(tsets <-chan map[string][]*config.TargetGroup) error {
level.Info(m.logger).Log("msg", "Starting scrape manager...")
for {
select {
case f := <-m.actionCh:
f()
case ts := <-tsets:
m.reload(ts)
case <-m.ctx.Done():
return m.ctx.Err()
}
}
}
// ApplyConfig resets the manager's target providers and job configurations as defined by the new cfg.
func (m *ScrapeManager) ApplyConfig(cfg *config.Config) error {
done := make(chan struct{})
m.actionCh <- func() {
for _, scfg := range cfg.ScrapeConfigs {
m.scrapeConfigs[scfg.JobName] = scfg
}
close(done)
}
<-done
return nil
}
// TargetMap returns map of active and dropped targets and their corresponding scrape config job name.
func (tm *TargetManager) TargetMap() map[string][]*Target {
tm.mtx.RLock()
defer tm.mtx.RUnlock()
targetsMap := make(map[string][]*Target)
for jobName, ps := range tm.targetSets {
ps.sp.mtx.RLock()
for _, t := range ps.sp.targets {
targetsMap[jobName] = append(targetsMap[jobName], t)
}
targetsMap[jobName] = append(targetsMap[jobName], ps.sp.droppedTargets...)
ps.sp.mtx.RUnlock()
}
return targetsMap
}
// Targets returns the targets currently being scraped.
func (m *ScrapeManager) Targets() []*Target {
targets := make(chan []*Target)
m.actionCh <- func() {
var t []*Target
for _, p := range m.scrapePools {
p.mtx.RLock()
for _, tt := range p.targets {
t = append(t, tt)
}
p.mtx.RUnlock()
}
targets <- t
}
return <-targets
}
func (m *ScrapeManager) reload(t map[string][]*config.TargetGroup) error {
for tsetName, tgroup := range t {
scrapeConfig, ok := m.scrapeConfigs[tsetName]
if !ok {
return fmt.Errorf("target set '%v' doesn't have valid config", tsetName)
}
// scrape pool doesn't exist so start a new one
existing, ok := m.scrapePools[tsetName]
if !ok {
sp := newScrapePool(m.ctx, scrapeConfig, m.append, log.With(m.logger, "scrape_pool", tsetName))
m.scrapePools[tsetName] = sp
sp.Sync(tgroup)
} else {
existing.Sync(tgroup)
}
// cleanup - check config and cancel the scrape loops if it don't exist in the scrape config
jobs := make(map[string]struct{})
for k := range m.scrapeConfigs {
jobs[k] = struct{}{}
}
for name, sp := range m.scrapePools {
if _, ok := jobs[name]; !ok {
sp.stop()
delete(m.scrapePools, name)
}
}
}
return nil
}

View File

@ -1,194 +0,0 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package retrieval
import (
"context"
"sync"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/storage"
)
// TargetManager maintains a set of targets, starts and stops their scraping and
// creates the new targets based on the target groups it receives from various
// target providers.
type TargetManager struct {
append Appendable
scrapeConfigs []*config.ScrapeConfig
mtx sync.RWMutex
ctx context.Context
cancel func()
wg sync.WaitGroup
// Set of unqiue targets by scrape configuration.
targetSets map[string]*targetSet
logger log.Logger
starting chan struct{}
}
type targetSet struct {
ctx context.Context
cancel func()
ts *discovery.TargetSet
sp *scrapePool
}
// Appendable returns an Appender.
type Appendable interface {
Appender() (storage.Appender, error)
}
// NewTargetManager creates a new TargetManager.
func NewTargetManager(app Appendable, logger log.Logger) *TargetManager {
return &TargetManager{
append: app,
targetSets: map[string]*targetSet{},
logger: logger,
starting: make(chan struct{}),
}
}
// Run starts background processing to handle target updates.
func (tm *TargetManager) Run() {
level.Info(tm.logger).Log("msg", "Starting target manager...")
tm.mtx.Lock()
tm.ctx, tm.cancel = context.WithCancel(context.Background())
tm.reload()
tm.mtx.Unlock()
close(tm.starting)
tm.wg.Wait()
}
// Stop all background processing.
func (tm *TargetManager) Stop() {
<-tm.starting
level.Info(tm.logger).Log("msg", "Stopping target manager...")
tm.mtx.Lock()
// Cancel the base context, this will cause all target providers to shut down
// and all in-flight scrapes to abort immmediately.
// Started inserts will be finished before terminating.
tm.cancel()
tm.mtx.Unlock()
// Wait for all scrape inserts to complete.
tm.wg.Wait()
level.Info(tm.logger).Log("msg", "Target manager stopped")
}
func (tm *TargetManager) reload() {
jobs := map[string]struct{}{}
// Start new target sets and update existing ones.
for _, scfg := range tm.scrapeConfigs {
jobs[scfg.JobName] = struct{}{}
ts, ok := tm.targetSets[scfg.JobName]
if !ok {
ctx, cancel := context.WithCancel(tm.ctx)
ts = &targetSet{
ctx: ctx,
cancel: cancel,
sp: newScrapePool(ctx, scfg, tm.append, log.With(tm.logger, "scrape_pool", scfg.JobName)),
}
ts.ts = discovery.NewTargetSet(ts.sp)
tm.targetSets[scfg.JobName] = ts
tm.wg.Add(1)
go func(ts *targetSet) {
// Run target set, which blocks until its context is canceled.
// Gracefully shut down pending scrapes in the scrape pool afterwards.
ts.ts.Run(ctx)
ts.sp.stop()
tm.wg.Done()
}(ts)
} else {
ts.sp.reload(scfg)
}
ts.ts.UpdateProviders(discovery.ProvidersFromConfig(scfg.ServiceDiscoveryConfig, tm.logger))
}
// Remove old target sets. Waiting for scrape pools to complete pending
// scrape inserts is already guaranteed by the goroutine that started the target set.
for name, ts := range tm.targetSets {
if _, ok := jobs[name]; !ok {
ts.cancel()
delete(tm.targetSets, name)
}
}
}
// TargetMap returns map of active and dropped targets and their corresponding scrape config job name.
func (tm *TargetManager) TargetMap() map[string][]*Target {
tm.mtx.RLock()
defer tm.mtx.RUnlock()
targetsMap := make(map[string][]*Target)
for jobName, ps := range tm.targetSets {
ps.sp.mtx.RLock()
for _, t := range ps.sp.targets {
targetsMap[jobName] = append(targetsMap[jobName], t)
}
targetsMap[jobName] = append(targetsMap[jobName], ps.sp.droppedTargets...)
ps.sp.mtx.RUnlock()
}
return targetsMap
}
// Targets returns the targets currently being scraped.
func (tm *TargetManager) Targets() []*Target {
tm.mtx.RLock()
defer tm.mtx.RUnlock()
targets := []*Target{}
for _, ps := range tm.targetSets {
ps.sp.mtx.RLock()
for _, t := range ps.sp.targets {
targets = append(targets, t)
}
ps.sp.mtx.RUnlock()
}
return targets
}
// ApplyConfig resets the manager's target providers and job configurations as defined
// by the new cfg. The state of targets that are valid in the new configuration remains unchanged.
func (tm *TargetManager) ApplyConfig(cfg *config.Config) error {
tm.mtx.Lock()
defer tm.mtx.Unlock()
tm.scrapeConfigs = cfg.ScrapeConfigs
if tm.ctx != nil {
tm.reload()
}
return nil
}

View File

@ -71,7 +71,7 @@ var localhostRepresentations = []string{"127.0.0.1", "localhost"}
type Handler struct {
logger log.Logger
targetManager *retrieval.TargetManager
scrapeManager *retrieval.ScrapeManager
ruleManager *rules.Manager
queryEngine *promql.Engine
context context.Context
@ -125,7 +125,7 @@ type Options struct {
TSDB func() *tsdb.DB
Storage storage.Storage
QueryEngine *promql.Engine
TargetManager *retrieval.TargetManager
ScrapeManager *retrieval.ScrapeManager
RuleManager *rules.Manager
Notifier *notifier.Notifier
Version *PrometheusVersion
@ -169,7 +169,7 @@ func New(logger log.Logger, o *Options) *Handler {
flagsMap: o.Flags,
context: o.Context,
targetManager: o.TargetManager,
scrapeManager: o.ScrapeManager,
ruleManager: o.RuleManager,
queryEngine: o.QueryEngine,
tsdb: o.TSDB,
@ -181,7 +181,7 @@ func New(logger log.Logger, o *Options) *Handler {
ready: 0,
}
h.apiV1 = api_v1.NewAPI(h.queryEngine, h.storage, h.targetManager, h.notifier,
h.apiV1 = api_v1.NewAPI(h.queryEngine, h.storage, h.scrapeManager, h.notifier,
func() config.Config {
h.mtx.RLock()
defer h.mtx.RUnlock()
@ -405,7 +405,7 @@ func (h *Handler) Run(ctx context.Context) error {
h.options.QueryEngine,
h.options.Storage.Querier,
func() []*retrieval.Target {
return h.options.TargetManager.Targets()
return h.options.ScrapeManager.Targets()
},
func() []*url.URL {
return h.options.Notifier.Alertmanagers()
@ -605,7 +605,7 @@ func (h *Handler) serviceDiscovery(w http.ResponseWriter, r *http.Request) {
func (h *Handler) targets(w http.ResponseWriter, r *http.Request) {
// Bucket targets by job label
tps := map[string][]*retrieval.Target{}
for _, t := range h.targetManager.Targets() {
for _, t := range h.scrapeManager.Targets() {
job := t.Labels().Get(model.JobLabel)
tps[job] = append(tps[job], t)
}