// Copyright 2013 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package retrieval import ( "fmt" "net" "sort" "strings" "sync" "time" "github.com/prometheus/common/log" "github.com/prometheus/common/model" "golang.org/x/net/context" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/relabel" "github.com/prometheus/prometheus/retrieval/discovery" "github.com/prometheus/prometheus/storage" ) // A TargetProvider provides information about target groups. It maintains a set // of sources from which TargetGroups can originate. Whenever a target provider // detects a potential change, it sends the TargetGroup through its provided channel. // // The TargetProvider does not have to guarantee that an actual change happened. // It does guarantee that it sends the new TargetGroup whenever a change happens. // // Providers must initially send all known target groups as soon as it can. type TargetProvider interface { // Run hands a channel to the target provider through which it can send // updated target groups. The channel must be closed by the target provider // if no more updates will be sent. // On receiving from done Run must return. Run(ctx context.Context, up chan<- []*config.TargetGroup) } // TargetManager maintains a set of targets, starts and stops their scraping and // creates the new targets based on the target groups it receives from various // target providers. type TargetManager struct { appender storage.SampleAppender scrapeConfigs []*config.ScrapeConfig mtx sync.RWMutex ctx context.Context cancel func() wg sync.WaitGroup // Set of unqiue targets by scrape configuration. targetSets map[string]*targetSet } // NewTargetManager creates a new TargetManager. func NewTargetManager(app storage.SampleAppender) *TargetManager { return &TargetManager{ appender: app, targetSets: map[string]*targetSet{}, } } // Run starts background processing to handle target updates. func (tm *TargetManager) Run() { log.Info("Starting target manager...") tm.mtx.Lock() tm.ctx, tm.cancel = context.WithCancel(context.Background()) tm.reload() tm.mtx.Unlock() tm.wg.Wait() } // Stop all background processing. func (tm *TargetManager) Stop() { log.Infoln("Stopping target manager...") tm.mtx.Lock() // Cancel the base context, this will cause all target providers to shut down // and all in-flight scrapes to abort immmediately. // Started inserts will be finished before terminating. tm.cancel() tm.mtx.Unlock() // Wait for all scrape inserts to complete. tm.wg.Wait() log.Debugln("Target manager stopped") } func (tm *TargetManager) reload() { jobs := map[string]struct{}{} // Start new target sets and update existing ones. for _, scfg := range tm.scrapeConfigs { jobs[scfg.JobName] = struct{}{} ts, ok := tm.targetSets[scfg.JobName] if !ok { ts = newTargetSet(scfg, tm.appender) tm.targetSets[scfg.JobName] = ts tm.wg.Add(1) go func(ts *targetSet) { ts.runScraping(tm.ctx) tm.wg.Done() }(ts) } else { ts.reload(scfg) } ts.runProviders(tm.ctx, providersFromConfig(scfg)) } // Remove old target sets. Waiting for stopping is already guaranteed // by the goroutine that started the target set. for name, ts := range tm.targetSets { if _, ok := jobs[name]; !ok { ts.cancel() delete(tm.targetSets, name) } } } // Pools returns the targets currently being scraped bucketed by their job name. func (tm *TargetManager) Pools() map[string]Targets { tm.mtx.RLock() defer tm.mtx.RUnlock() pools := map[string]Targets{} // TODO(fabxc): this is just a hack to maintain compatibility for now. for _, ps := range tm.targetSets { ps.scrapePool.mtx.RLock() for _, t := range ps.scrapePool.targets { job := string(t.Labels()[model.JobLabel]) pools[job] = append(pools[job], t) } ps.scrapePool.mtx.RUnlock() } for _, targets := range pools { sort.Sort(targets) } return pools } // ApplyConfig resets the manager's target providers and job configurations as defined // by the new cfg. The state of targets that are valid in the new configuration remains unchanged. func (tm *TargetManager) ApplyConfig(cfg *config.Config) error { tm.mtx.Lock() defer tm.mtx.Unlock() tm.scrapeConfigs = cfg.ScrapeConfigs if tm.ctx != nil { tm.reload() } return nil } // targetSet holds several TargetProviders for which the same scrape configuration // is used. It maintains target groups from all given providers and sync them // to a scrape pool. type targetSet struct { mtx sync.RWMutex // Sets of targets by a source string that is unique across target providers. tgroups map[string][]*Target scrapePool *scrapePool config *config.ScrapeConfig syncCh chan struct{} cancelScraping func() cancelProviders func() } func newTargetSet(cfg *config.ScrapeConfig, app storage.SampleAppender) *targetSet { ts := &targetSet{ scrapePool: newScrapePool(cfg, app), syncCh: make(chan struct{}, 1), config: cfg, } return ts } func (ts *targetSet) cancel() { ts.mtx.RLock() defer ts.mtx.RUnlock() if ts.cancelScraping != nil { ts.cancelScraping() } if ts.cancelProviders != nil { ts.cancelProviders() } } func (ts *targetSet) reload(cfg *config.ScrapeConfig) { ts.mtx.Lock() ts.config = cfg ts.mtx.Unlock() ts.scrapePool.reload(cfg) } func (ts *targetSet) runScraping(ctx context.Context) { ctx, ts.cancelScraping = context.WithCancel(ctx) ts.scrapePool.init(ctx) Loop: for { // Throttle syncing to once per five seconds. select { case <-ctx.Done(): break Loop case <-time.After(5 * time.Second): } select { case <-ctx.Done(): break Loop case <-ts.syncCh: ts.mtx.RLock() ts.sync() ts.mtx.RUnlock() } } // We want to wait for all pending target scrapes to complete though to ensure there'll // be no more storage writes after this point. ts.scrapePool.stop() } func (ts *targetSet) sync() { var all []*Target for _, targets := range ts.tgroups { all = append(all, targets...) } ts.scrapePool.sync(all) } func (ts *targetSet) runProviders(ctx context.Context, providers map[string]TargetProvider) { // Lock for the entire time. This may mean up to 5 seconds until the full initial set // is retrieved and applied. // We could release earlier with some tweaks, but this is easier to reason about. ts.mtx.Lock() defer ts.mtx.Unlock() var wg sync.WaitGroup if ts.cancelProviders != nil { ts.cancelProviders() } ctx, ts.cancelProviders = context.WithCancel(ctx) // (Re-)create a fresh tgroups map to not keep stale targets around. We // will retrieve all targets below anyway, so cleaning up everything is // safe and doesn't inflict any additional cost. ts.tgroups = map[string][]*Target{} for name, prov := range providers { wg.Add(1) updates := make(chan []*config.TargetGroup) go func(name string, prov TargetProvider) { select { case <-ctx.Done(): case initial, ok := <-updates: // Handle the case that a target provider exits and closes the channel // before the context is done. if !ok { break } // First set of all targets the provider knows. for _, tgroup := range initial { if tgroup == nil { continue } targets, err := targetsFromGroup(tgroup, ts.config) if err != nil { log.With("target_group", tgroup).Errorf("Target update failed: %s", err) continue } ts.tgroups[name+"/"+tgroup.Source] = targets } case <-time.After(5 * time.Second): // Initial set didn't arrive. Act as if it was empty // and wait for updates later on. } wg.Done() // Start listening for further updates. for { select { case <-ctx.Done(): return case tgs, ok := <-updates: // Handle the case that a target provider exits and closes the channel // before the context is done. if !ok { return } for _, tg := range tgs { if err := ts.update(name, tg); err != nil { log.With("target_group", tg).Errorf("Target update failed: %s", err) } } } } }(name, prov) go prov.Run(ctx, updates) } // We wait for a full initial set of target groups before releasing the mutex // to ensure the initial sync is complete and there are no races with subsequent updates. wg.Wait() // Just signal that there are initial sets to sync now. Actual syncing must only // happen in the runScraping loop. select { case ts.syncCh <- struct{}{}: default: } } // update handles a target group update from a target provider identified by the name. func (ts *targetSet) update(name string, tgroup *config.TargetGroup) error { if tgroup == nil { return nil } targets, err := targetsFromGroup(tgroup, ts.config) if err != nil { return err } ts.mtx.Lock() defer ts.mtx.Unlock() ts.tgroups[name+"/"+tgroup.Source] = targets select { case ts.syncCh <- struct{}{}: default: } return nil } // providersFromConfig returns all TargetProviders configured in cfg. func providersFromConfig(cfg *config.ScrapeConfig) map[string]TargetProvider { providers := map[string]TargetProvider{} app := func(mech string, i int, tp TargetProvider) { providers[fmt.Sprintf("%s/%d", mech, i)] = tp } for i, c := range cfg.DNSSDConfigs { app("dns", i, discovery.NewDNS(c)) } for i, c := range cfg.FileSDConfigs { app("file", i, discovery.NewFileDiscovery(c)) } for i, c := range cfg.ConsulSDConfigs { k, err := discovery.NewConsul(c) if err != nil { log.Errorf("Cannot create Consul discovery: %s", err) continue } app("consul", i, k) } for i, c := range cfg.MarathonSDConfigs { app("marathon", i, discovery.NewMarathon(c)) } for i, c := range cfg.KubernetesSDConfigs { k, err := discovery.NewKubernetesDiscovery(c) if err != nil { log.Errorf("Cannot create Kubernetes discovery: %s", err) continue } app("kubernetes", i, k) } for i, c := range cfg.ServersetSDConfigs { app("serverset", i, discovery.NewServersetDiscovery(c)) } for i, c := range cfg.NerveSDConfigs { app("nerve", i, discovery.NewNerveDiscovery(c)) } for i, c := range cfg.EC2SDConfigs { app("ec2", i, discovery.NewEC2Discovery(c)) } for i, c := range cfg.AzureSDConfigs { app("azure", i, discovery.NewAzureDiscovery(c)) } if len(cfg.StaticConfigs) > 0 { app("static", 0, NewStaticProvider(cfg.StaticConfigs)) } return providers } // populateLabels builds a label set from the given label set and scrape configuration. // It returns a label set before relabeling was applied as the second return value. // Returns a nil label set if the target is dropped during relabeling. func populateLabels(lset model.LabelSet, cfg *config.ScrapeConfig) (res, orig model.LabelSet, err error) { if _, ok := lset[model.AddressLabel]; !ok { return nil, nil, fmt.Errorf("no address") } // Copy labels into the labelset for the target if they are not // set already. Apply the labelsets in order of decreasing precedence. scrapeLabels := model.LabelSet{ model.SchemeLabel: model.LabelValue(cfg.Scheme), model.MetricsPathLabel: model.LabelValue(cfg.MetricsPath), model.JobLabel: model.LabelValue(cfg.JobName), } for ln, lv := range scrapeLabels { if _, ok := lset[ln]; !ok { lset[ln] = lv } } // Encode scrape query parameters as labels. for k, v := range cfg.Params { if len(v) > 0 { lset[model.LabelName(model.ParamLabelPrefix+k)] = model.LabelValue(v[0]) } } preRelabelLabels := lset lset = relabel.Process(lset, cfg.RelabelConfigs...) // Check if the target was dropped. if lset == nil { return nil, nil, nil } // addPort checks whether we should add a default port to the address. // If the address is not valid, we don't append a port either. addPort := func(s string) bool { // If we can split, a port exists and we don't have to add one. if _, _, err := net.SplitHostPort(s); err == nil { return false } // If adding a port makes it valid, the previous error // was not due to an invalid address and we can append a port. _, _, err := net.SplitHostPort(s + ":1234") return err == nil } // If it's an address with no trailing port, infer it based on the used scheme. if addr := string(lset[model.AddressLabel]); addPort(addr) { // Addresses reaching this point are already wrapped in [] if necessary. switch lset[model.SchemeLabel] { case "http", "": addr = addr + ":80" case "https": addr = addr + ":443" default: return nil, nil, fmt.Errorf("invalid scheme: %q", cfg.Scheme) } lset[model.AddressLabel] = model.LabelValue(addr) } if err := config.CheckTargetAddress(lset[model.AddressLabel]); err != nil { return nil, nil, err } // Meta labels are deleted after relabelling. Other internal labels propagate to // the target which decides whether they will be part of their label set. for ln := range lset { if strings.HasPrefix(string(ln), model.MetaLabelPrefix) { delete(lset, ln) } } // Default the instance label to the target address. if _, ok := lset[model.InstanceLabel]; !ok { lset[model.InstanceLabel] = lset[model.AddressLabel] } return lset, preRelabelLabels, nil } // targetsFromGroup builds targets based on the given TargetGroup and config. func targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) ([]*Target, error) { targets := make([]*Target, 0, len(tg.Targets)) for i, lset := range tg.Targets { // Combine target labels with target group labels. for ln, lv := range tg.Labels { if _, ok := lset[ln]; !ok { lset[ln] = lv } } labels, origLabels, err := populateLabels(lset, cfg) if err != nil { return nil, fmt.Errorf("instance %d in group %s: %s", i, tg, err) } if labels != nil { targets = append(targets, NewTarget(labels, origLabels, cfg.Params)) } } return targets, nil } // StaticProvider holds a list of target groups that never change. type StaticProvider struct { TargetGroups []*config.TargetGroup } // NewStaticProvider returns a StaticProvider configured with the given // target groups. func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider { for i, tg := range groups { tg.Source = fmt.Sprintf("%d", i) } return &StaticProvider{groups} } // Run implements the TargetProvider interface. func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { // We still have to consider that the consumer exits right away in which case // the context will be canceled. select { case ch <- sd.TargetGroups: case <-ctx.Done(): } close(ch) }