prometheus/retrieval/targetmanager.go

// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package retrieval
import (
	"fmt"
	"strings"
	"sync"

	"github.com/golang/glog"

	clientmodel "github.com/prometheus/client_golang/model"

	"github.com/prometheus/prometheus/config"
	pb "github.com/prometheus/prometheus/config/generated"
	"github.com/prometheus/prometheus/retrieval/discovery"
	"github.com/prometheus/prometheus/storage"
)
// A TargetProvider provides information about target groups. It maintains a set
// of sources from which TargetGroups can originate. Whenever a target provider
// detects a potential change, it sends the TargetGroup through its provided channel.
//
// The TargetProvider does not have to guarantee that an actual change happened.
// It does guarantee that it sends the new TargetGroup whenever a change happens.
// On startup it sends all TargetGroups it can see.
type TargetProvider interface {
// Sources returns the source identifiers the provider is currently aware of.
Sources() []string
// Run hands a channel to the target provider through which it can send
// updated target groups. The channel must be closed by the target provider
// if no more updates will be sent.
Run(chan<- *config.TargetGroup)
// Stop terminates any potential computation of the target provider. The
// channel received on Run must be closed afterwards.
Stop()
}
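// exampleConsumeProvider is an illustrative sketch, not part of the original
// file: it shows how the TargetProvider contract is consumed. Sources is read
// first, then Run streams groups until the provider closes the channel.
func exampleConsumeProvider(p TargetProvider) {
	for _, src := range p.Sources() {
		glog.V(1).Infof("provider knows source %q", src)
	}
	ch := make(chan *config.TargetGroup)
	go p.Run(ch)
	// The range loop ends once the provider closes the channel.
	for tg := range ch {
		glog.V(1).Infof("received group %q with %d targets", tg.Source, len(tg.Targets))
	}
}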
// TargetManager maintains a set of targets, starts and stops their scraping and
// creates new targets based on the target groups it receives from various
// target providers.
type TargetManager struct {
m sync.RWMutex
globalLabels clientmodel.LabelSet
sampleAppender storage.SampleAppender
running bool
// Targets by their source ID.
targets map[string][]Target
// Providers and configs by their job name.
// TODO(fabxc): turn this into map[*ScrapeConfig][]TargetProvider eventually.
providers map[string][]TargetProvider
configs map[string]config.JobConfig
}
// NewTargetManager creates a new TargetManager based on the given config.
func NewTargetManager(cfg config.Config, sampleAppender storage.SampleAppender) (*TargetManager, error) {
tm := &TargetManager{
sampleAppender: sampleAppender,
targets: make(map[string][]Target),
}
if err := tm.applyConfig(cfg); err != nil {
return nil, err
}
return tm, nil
}
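// exampleStartup is a hedged sketch, not part of the original file, showing
// how a caller might wire the manager up. Where cfg and app come from is an
// assumption outside this file.
func exampleStartup(cfg config.Config, app storage.SampleAppender) {
	tm, err := NewTargetManager(cfg, app)
	if err != nil {
		glog.Fatalf("Error creating target manager: %s", err)
	}
	tm.Run()        // Starts providers and begins scraping.
	defer tm.Stop() // Stops providers and all running scrapers.
}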
// Run starts background processing to handle target updates.
func (tm *TargetManager) Run() {
glog.Info("Starting target manager...")
sources := map[string]struct{}{}
for name, provs := range tm.providers {
for _, p := range provs {
jcfg := tm.configs[name]
ch := make(chan *config.TargetGroup)
go tm.handleTargetUpdates(jcfg, ch)
for _, src := range p.Sources() {
src = fullSource(jcfg, src)
sources[src] = struct{}{}
}
// Run the target provider after cleanup of the stale targets is done.
defer func(c chan *config.TargetGroup) {
go p.Run(c)
}(ch)
}
}
tm.removeTargets(func(src string) bool {
if _, ok := sources[src]; ok {
return false
}
return true
})
tm.running = true
}
// handleTargetUpdates receives target group updates and handles them in the
// context of the given job config.
func (tm *TargetManager) handleTargetUpdates(cfg config.JobConfig, ch <-chan *config.TargetGroup) {
for tg := range ch {
glog.V(1).Infof("Received potential update for target group %q", tg.Source)
if err := tm.updateTargetGroup(tg, cfg); err != nil {
glog.Errorf("Error updating targets: %s", err)
}
}
}
// fullSource prepends the unique job name to the source.
//
// Thus, oscillating label sets for targets with the same source,
// but providers from different configs, are prevented.
func fullSource(cfg config.JobConfig, src string) string {
return cfg.GetName() + ":" + src
}
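// For example, a static group with source "static:0" (see StaticProvider
// below) in a job named "web" gets the full source "web:static:0", while the
// same group under a job named "api" gets "api:static:0", so the two never
// collide. The job names here are made-up example values.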
// Stop stops all background processing.
func (tm *TargetManager) Stop() {
tm.stop(true)
}
// stop background processing of the target manager. If removeTargets is true,
// existing targets will be stopped and removed.
func (tm *TargetManager) stop(removeTargets bool) {
tm.m.Lock()
defer tm.m.Unlock()
if !tm.running {
return
}
glog.Info("Stopping target manager...")
defer glog.Info("Target manager stopped.")
for _, provs := range tm.providers {
for _, p := range provs {
p.Stop()
}
}
if removeTargets {
tm.removeTargets(nil)
}
tm.running = false
}
// removeTargets stops and removes targets for sources where f(source) is true
// or if f is nil. This method is not thread-safe.
func (tm *TargetManager) removeTargets(f func(string) bool) {
if f == nil {
f = func(string) bool { return true }
}
var wg sync.WaitGroup
for src, targets := range tm.targets {
if !f(src) {
continue
}
wg.Add(len(targets))
for _, target := range targets {
go func(t Target) {
t.StopScraper()
wg.Done()
}(target)
}
delete(tm.targets, src)
}
wg.Wait()
}
// updateTargetGroup creates new targets for the group and replaces the old targets
// for the source ID.
func (tm *TargetManager) updateTargetGroup(tgroup *config.TargetGroup, cfg config.JobConfig) error {
newTargets, err := tm.targetsFromGroup(tgroup, cfg)
if err != nil {
return err
}
src := fullSource(cfg, tgroup.Source)
tm.m.Lock()
defer tm.m.Unlock()
oldTargets, ok := tm.targets[src]
if ok {
var wg sync.WaitGroup
// Replace the old targets with the new ones while keeping the state
// of intersecting targets.
for i, tnew := range newTargets {
var match Target
for j, told := range oldTargets {
if told == nil {
continue
}
if tnew.InstanceIdentifier() == told.InstanceIdentifier() {
match = told
oldTargets[j] = nil
break
}
}
// Update the existing target and discard the new equivalent.
// Otherwise start scraping the new target.
if match != nil {
// Updating is blocked during a scrape. We don't want those wait times
// to build up.
wg.Add(1)
go func(t Target) {
match.Update(cfg, t.BaseLabels())
wg.Done()
}(tnew)
newTargets[i] = match
} else {
go tnew.RunScraper(tm.sampleAppender)
}
}
// Remove all old targets that disappeared.
for _, told := range oldTargets {
if told != nil {
wg.Add(1)
go func(t Target) {
t.StopScraper()
wg.Done()
}(told)
}
}
wg.Wait()
} else {
// The source ID is new, start all target scrapers.
for _, tnew := range newTargets {
go tnew.RunScraper(tm.sampleAppender)
}
}
if len(newTargets) > 0 {
tm.targets[src] = newTargets
} else {
delete(tm.targets, src)
}
return nil
}
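// Illustration of the replacement logic above (addresses are made up): if the
// old targets for source "web:static:0" were 10.0.0.1:9090 and 10.0.0.2:9090
// and the new group lists 10.0.0.2:9090 and 10.0.0.3:9090, then the scraper
// for .2 keeps running and only has its labels updated, the scraper for .1 is
// stopped, and a new scraper for .3 is started.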
// Pools returns the targets currently being scraped, bucketed by their job name.
func (tm *TargetManager) Pools() map[string][]Target {
tm.m.RLock()
defer tm.m.RUnlock()
pools := map[string][]Target{}
for _, ts := range tm.targets {
for _, t := range ts {
job := string(t.BaseLabels()[clientmodel.JobLabel])
pools[job] = append(pools[job], t)
}
}
return pools
}
// ApplyConfig resets the manager's target providers and job configurations as defined
// by the new cfg. The state of targets that are valid in the new configuration remains unchanged.
func (tm *TargetManager) ApplyConfig(cfg config.Config) error {
tm.stop(false)
// Even if updating the config failed, we want to continue rather than stop scraping anything.
defer tm.Run()
if err := tm.applyConfig(cfg); err != nil {
glog.Warningf("Error updating config, changes not applied: %s", err)
return err
}
return nil
}
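// A reload therefore never interrupts scraping of targets that remain valid
// under the new configuration. A caller handling, say, a reload signal (an
// assumption about the caller, not part of this file) would simply do:
//
//	if err := tm.ApplyConfig(newCfg); err != nil {
//		glog.Errorf("Config reload failed: %s", err)
//	}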
func (tm *TargetManager) applyConfig(cfg config.Config) error {
// Only apply changes if everything was successful.
providers := map[string][]TargetProvider{}
configs := map[string]config.JobConfig{}
for _, jcfg := range cfg.Jobs() {
provs, err := ProvidersFromConfig(jcfg)
if err != nil {
return err
}
configs[jcfg.GetName()] = jcfg
providers[jcfg.GetName()] = provs
}
tm.m.Lock()
defer tm.m.Unlock()
tm.globalLabels = cfg.GlobalLabels()
tm.providers = providers
tm.configs = configs
return nil
}
// targetsFromGroup builds targets based on the given TargetGroup and config.
func (tm *TargetManager) targetsFromGroup(tg *config.TargetGroup, cfg config.JobConfig) ([]Target, error) {
tm.m.RLock()
defer tm.m.RUnlock()
targets := make([]Target, 0, len(tg.Targets))
for i, labels := range tg.Targets {
for ln, lv := range tg.Labels {
if _, ok := labels[ln]; !ok {
labels[ln] = lv
}
}
for ln, lv := range tm.globalLabels {
if _, ok := labels[ln]; !ok {
labels[ln] = lv
}
}
address, ok := labels[clientmodel.AddressLabel]
if !ok {
return nil, fmt.Errorf("instance %d in target group %s has no address", i, tg)
}
if _, ok := labels[clientmodel.JobLabel]; !ok {
labels[clientmodel.JobLabel] = clientmodel.LabelValue(cfg.GetName())
}
for ln := range labels {
// There are currently no internal labels that we want to carry over to the time series.
if strings.HasPrefix(string(ln), clientmodel.ReservedLabelPrefix) {
delete(labels, ln)
}
}
targets = append(targets, NewTarget(string(address), cfg, labels))
}
return targets, nil
}
// ProvidersFromConfig returns all TargetProviders configured in cfg.
func ProvidersFromConfig(cfg config.JobConfig) ([]TargetProvider, error) {
var providers []TargetProvider
if name := cfg.GetSdName(); name != "" {
dnsSD := discovery.NewDNSDiscovery(name, cfg.SDRefreshInterval())
providers = append(providers, dnsSD)
}
if tgs := cfg.GetTargetGroup(); tgs != nil {
static := NewStaticProvider(tgs)
providers = append(providers, static)
}
return providers, nil
}
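// Note that a single job may yield multiple providers: one configured with
// both an SD name and a static target group gets a DNS discovery provider
// and a StaticProvider, whose target groups are kept apart by their distinct
// source identifiers.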
// StaticProvider holds a list of target groups that never change.
type StaticProvider struct {
TargetGroups []*config.TargetGroup
}
// NewStaticProvider returns a StaticProvider configured with the given
// target groups.
func NewStaticProvider(groups []*pb.TargetGroup) *StaticProvider {
prov := &StaticProvider{}
for i, tg := range groups {
g := &config.TargetGroup{
Source: fmt.Sprintf("static:%d", i),
Labels: clientmodel.LabelSet{},
}
for _, pair := range tg.GetLabels().GetLabel() {
g.Labels[clientmodel.LabelName(pair.GetName())] = clientmodel.LabelValue(pair.GetValue())
}
for _, t := range tg.GetTarget() {
g.Targets = append(g.Targets, clientmodel.LabelSet{
clientmodel.AddressLabel: clientmodel.LabelValue(t),
})
}
prov.TargetGroups = append(prov.TargetGroups, g)
}
return prov
}
// Run implements the TargetProvider interface.
func (sd *StaticProvider) Run(ch chan<- *config.TargetGroup) {
for _, tg := range sd.TargetGroups {
ch <- tg
}
close(ch) // This provider never sends any updates.
}
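// exampleDrainStatic is an illustrative sketch, not part of the original
// file: draining a StaticProvider yields each group exactly once before the
// channel is closed and the loop terminates.
func exampleDrainStatic(sd *StaticProvider) {
	ch := make(chan *config.TargetGroup)
	go sd.Run(ch)
	for tg := range ch {
		glog.V(1).Infof("static group %q with %d targets", tg.Source, len(tg.Targets))
	}
}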
// Stop implements the TargetProvider interface.
func (sd *StaticProvider) Stop() {}
// Sources returns the source identifiers of the provider's target groups.
func (sd *StaticProvider) Sources() (srcs []string) {
for _, tg := range sd.TargetGroups {
srcs = append(srcs, tg.Source)
}
return srcs
}