prometheus/retrieval/targetmanager.go

178 lines
4.5 KiB
Go

// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package retrieval
import (
"context"
"sync"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/storage"
)
// TargetManager maintains a set of targets, starts and stops their scraping and
// creates the new targets based on the target groups it receives from various
// target providers.
type TargetManager struct {
// append is handed to every scrape pool that reload creates.
append Appendable
// scrapeConfigs holds the scrape configurations most recently stored by ApplyConfig.
scrapeConfigs []*config.ScrapeConfig
// mtx is write-locked by Run, Stop and ApplyConfig and read-locked by Targets.
mtx sync.RWMutex
// ctx is the base context created in Run; it stays nil until Run is called.
ctx context.Context
// cancel cancels ctx; it is invoked by Stop.
cancel func()
// wg counts the per-target-set goroutines started by reload; Run and Stop wait on it.
wg sync.WaitGroup
// Set of unique targets by scrape configuration, keyed by job name.
targetSets map[string]*targetSet
logger log.Logger
// starting is closed by Run after the initial reload; Stop blocks on it.
starting chan struct{}
}
// targetSet groups the per-job state that reload creates for each scrape
// configuration: a cancellable context, a discovery target set and a scrape pool.
type targetSet struct {
// ctx is derived from the manager's base context and is passed to both the
// scrape pool constructor and the discovery target set's Run.
ctx context.Context
// cancel cancels ctx; reload calls it when the job is no longer configured.
cancel func()
// ts is constructed from sp and receives updated providers on every reload.
ts *discovery.TargetSet
// sp is the job's scrape pool; it is stopped after ts.Run returns.
sp *scrapePool
}
// Appendable is implemented by anything that can hand out storage appenders.
type Appendable interface {
// Appender returns a storage.Appender or an error.
Appender() (storage.Appender, error)
}
// NewTargetManager returns a TargetManager that passes app to its scrape
// pools and logs through logger. The manager starts with no target sets and
// does nothing until Run is called.
func NewTargetManager(app Appendable, logger log.Logger) *TargetManager {
	tm := &TargetManager{
		starting:   make(chan struct{}),
		targetSets: make(map[string]*targetSet),
	}
	tm.append = app
	tm.logger = logger
	return tm
}
// Run creates the manager's base context, performs the initial reload of all
// configured target sets, signals that startup has finished and then blocks
// until every target set goroutine has exited.
func (tm *TargetManager) Run() {
	level.Info(tm.logger).Log("msg", "Starting target manager...")

	tm.mtx.Lock()
	baseCtx, cancelAll := context.WithCancel(context.Background())
	tm.ctx = baseCtx
	tm.cancel = cancelAll
	tm.reload()
	tm.mtx.Unlock()

	// Unblock any Stop call that is waiting for startup to complete.
	close(tm.starting)

	tm.wg.Wait()
}
// Stop shuts down all background processing. It first waits for Run to finish
// its startup phase, then cancels the base context and blocks until every
// target set goroutine has completed.
func (tm *TargetManager) Stop() {
	// Run closes this channel once the base context and cancel function exist.
	<-tm.starting

	level.Info(tm.logger).Log("msg", "Stopping target manager...")

	// Cancelling the base context also cancels every target set context
	// derived from it; each target set goroutine then stops its scrape pool
	// before signalling completion.
	func() {
		tm.mtx.Lock()
		defer tm.mtx.Unlock()
		tm.cancel()
	}()

	// Block until all target set goroutines have finished shutting down.
	tm.wg.Wait()

	level.Info(tm.logger).Log("msg", "Target manager stopped")
}
// reload reconciles the running target sets with tm.scrapeConfigs: it starts
// a target set for every job that does not have one yet, reloads the scrape
// pool of every job that does, refreshes each job's discovery providers and
// cancels target sets whose job is no longer configured.
// Every visible caller holds tm.mtx for writing while calling it.
func (tm *TargetManager) reload() {
	// Names of all jobs present in the current configuration.
	active := make(map[string]struct{}, len(tm.scrapeConfigs))

	for _, sc := range tm.scrapeConfigs {
		name := sc.JobName
		active[name] = struct{}{}

		set, known := tm.targetSets[name]
		if known {
			set.sp.reload(sc)
		} else {
			setCtx, setCancel := context.WithCancel(tm.ctx)
			pool := newScrapePool(setCtx, sc, tm.append, log.With(tm.logger, "scrape_pool", name))
			set = &targetSet{ctx: setCtx, cancel: setCancel, sp: pool}
			set.ts = discovery.NewTargetSet(pool)
			tm.targetSets[name] = set

			tm.wg.Add(1)
			go func(s *targetSet, runCtx context.Context) {
				// Run is expected to block until runCtx is cancelled; only
				// then is the scrape pool stopped and the goroutine counted
				// as finished.
				s.ts.Run(runCtx)
				s.sp.stop()
				tm.wg.Done()
			}(set, setCtx)
		}

		providers := discovery.ProvidersFromConfig(sc.ServiceDiscoveryConfig, tm.logger)
		set.ts.UpdateProviders(providers)
	}

	// Drop target sets for jobs that disappeared from the configuration.
	// Cancelling is enough here: the goroutine started for the set stops the
	// scrape pool once the set's Run returns.
	for name, set := range tm.targetSets {
		if _, keep := active[name]; keep {
			continue
		}
		set.cancel()
		delete(tm.targetSets, name)
	}
}
// Targets collects the targets of every scrape pool currently managed. The
// result is a freshly built, non-nil slice; its order is unspecified because
// the target sets are visited in map iteration order.
func (tm *TargetManager) Targets() []*Target {
	tm.mtx.RLock()
	defer tm.mtx.RUnlock()

	all := make([]*Target, 0)
	for _, set := range tm.targetSets {
		pool := set.sp

		// Hold the pool's own read lock only while copying out its targets.
		pool.mtx.RLock()
		for _, target := range pool.targets {
			all = append(all, target)
		}
		pool.mtx.RUnlock()
	}
	return all
}
// ApplyConfig replaces the manager's scrape configurations with those in cfg.
// If Run has already created the base context, the target sets are reloaded
// right away; otherwise the configurations are only stored and take effect
// during Run's initial reload. It always returns nil.
func (tm *TargetManager) ApplyConfig(cfg *config.Config) error {
	tm.mtx.Lock()
	defer tm.mtx.Unlock()

	tm.scrapeConfigs = cfg.ScrapeConfigs

	// A nil base context means Run has not been called yet.
	if started := tm.ctx != nil; !started {
		return nil
	}
	tm.reload()
	return nil
}