Discovery: abstain from restarting providers if possible (#9321) (#9349)

* Abstain from restarting discovery providers if possible (#9321)

Signed-off-by: Vladimir Kononov <krya-kryak@users.noreply.github.com>
This commit is contained in:
Vladimir Kononov 2021-10-20 12:16:20 +04:00 committed by GitHub
parent 432005826d
commit 1043d2b594
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 419 additions and 68 deletions

View File

@ -74,12 +74,26 @@ type poolKey struct {
provider string provider string
} }
// provider holds a Discoverer instance, its configuration and its subscribers. // provider holds a Discoverer instance, its configuration, cancel func and its subscribers.
type provider struct { type provider struct {
name string name string
d Discoverer d Discoverer
subs []string
config interface{} config interface{}
cancel context.CancelFunc
// done should be called after cleaning up resources associated with cancelled provider.
done func()
mu sync.RWMutex
subs map[string]struct{}
// newSubs is used to temporary store subs to be used upon config reload completion.
newSubs map[string]struct{}
}
// IsStarted return true if Discoverer is started.
func (p *provider) IsStarted() bool {
return p.cancel != nil
} }
// NewManager is the Discovery Manager constructor. // NewManager is the Discovery Manager constructor.
@ -88,13 +102,12 @@ func NewManager(ctx context.Context, logger log.Logger, options ...func(*Manager
logger = log.NewNopLogger() logger = log.NewNopLogger()
} }
mgr := &Manager{ mgr := &Manager{
logger: logger, logger: logger,
syncCh: make(chan map[string][]*targetgroup.Group), syncCh: make(chan map[string][]*targetgroup.Group),
targets: make(map[poolKey]map[string]*targetgroup.Group), targets: make(map[poolKey]map[string]*targetgroup.Group),
discoverCancel: []context.CancelFunc{}, ctx: ctx,
ctx: ctx, updatert: 5 * time.Second,
updatert: 5 * time.Second, triggerSend: make(chan struct{}, 1),
triggerSend: make(chan struct{}, 1),
} }
for _, option := range options { for _, option := range options {
option(mgr) option(mgr)
@ -114,15 +127,16 @@ func Name(n string) func(*Manager) {
// Manager maintains a set of discovery providers and sends each update to a map channel. // Manager maintains a set of discovery providers and sends each update to a map channel.
// Targets are grouped by the target set name. // Targets are grouped by the target set name.
type Manager struct { type Manager struct {
logger log.Logger logger log.Logger
name string name string
mtx sync.RWMutex mtx sync.RWMutex
ctx context.Context ctx context.Context
discoverCancel []context.CancelFunc
// Some Discoverers(eg. k8s) send only the updates for a given target group // Some Discoverers(e.g. k8s) send only the updates for a given target group,
// so we use map[tg.Source]*targetgroup.Group to know which group to update. // so we use map[tg.Source]*targetgroup.Group to know which group to update.
targets map[poolKey]map[string]*targetgroup.Group targets map[poolKey]map[string]*targetgroup.Group
targetsMtx sync.Mutex
// providers keeps track of SD providers. // providers keeps track of SD providers.
providers []*provider providers []*provider
// The sync channel sends the updates as a map where the key is the job value from the scrape config. // The sync channel sends the updates as a map where the key is the job value from the scrape config.
@ -132,11 +146,14 @@ type Manager struct {
// should only be modified in unit tests. // should only be modified in unit tests.
updatert time.Duration updatert time.Duration
// The triggerSend channel signals to the manager that new updates have been received from providers. // The triggerSend channel signals to the Manager that new updates have been received from providers.
triggerSend chan struct{} triggerSend chan struct{}
// lastProvider counts providers registered during Manager's lifetime.
lastProvider uint
} }
// Run starts the background processing // Run starts the background processing.
func (m *Manager) Run() error { func (m *Manager) Run() error {
go m.sender() go m.sender()
for range m.ctx.Done() { for range m.ctx.Done() {
@ -151,31 +168,80 @@ func (m *Manager) SyncCh() <-chan map[string][]*targetgroup.Group {
return m.syncCh return m.syncCh
} }
// ApplyConfig removes all running discovery providers and starts new ones using the provided config. // ApplyConfig checks if discovery provider with supplied config is already running and keeps them as is.
// Remaining providers are then stopped and new required providers are started using the provided config.
func (m *Manager) ApplyConfig(cfg map[string]Configs) error { func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
m.mtx.Lock() m.mtx.Lock()
defer m.mtx.Unlock() defer m.mtx.Unlock()
for pk := range m.targets { var failedCount int
if _, ok := cfg[pk.setName]; !ok {
discoveredTargets.DeleteLabelValues(m.name, pk.setName)
}
}
m.cancelDiscoverers()
m.targets = make(map[poolKey]map[string]*targetgroup.Group)
m.providers = nil
m.discoverCancel = nil
failedCount := 0
for name, scfg := range cfg { for name, scfg := range cfg {
failedCount += m.registerProviders(scfg, name) failedCount += m.registerProviders(scfg, name)
discoveredTargets.WithLabelValues(m.name, name).Set(0)
} }
failedConfigs.WithLabelValues(m.name).Set(float64(failedCount)) failedConfigs.WithLabelValues(m.name).Set(float64(failedCount))
var (
wg sync.WaitGroup
// keep shows if we keep any providers after reload.
keep bool
newProviders []*provider
)
for _, prov := range m.providers { for _, prov := range m.providers {
m.startProvider(m.ctx, prov) // Cancel obsolete providers.
if len(prov.newSubs) == 0 {
wg.Add(1)
prov.done = func() {
wg.Done()
}
prov.cancel()
continue
}
newProviders = append(newProviders, prov)
// refTargets keeps reference targets used to populate new subs' targets
var refTargets map[string]*targetgroup.Group
prov.mu.Lock()
for s := range prov.subs {
keep = true
refTargets = m.targets[poolKey{s, prov.name}]
// Remove obsolete subs' targets.
if _, ok := prov.newSubs[s]; !ok {
m.targetsMtx.Lock()
delete(m.targets, poolKey{s, prov.name})
m.targetsMtx.Unlock()
discoveredTargets.DeleteLabelValues(m.name, s)
}
}
// Set metrics and targets for new subs.
for s := range prov.newSubs {
if _, ok := prov.subs[s]; !ok {
discoveredTargets.WithLabelValues(m.name, s).Set(0)
}
if l := len(refTargets); l > 0 {
m.targets[poolKey{s, prov.name}] = make(map[string]*targetgroup.Group, l)
for k, v := range refTargets {
m.targets[poolKey{s, prov.name}][k] = v
}
}
}
prov.subs = prov.newSubs
prov.newSubs = map[string]struct{}{}
prov.mu.Unlock()
if !prov.IsStarted() {
m.startProvider(m.ctx, prov)
}
} }
// Currently downstream managers expect full target state upon config reload, so we must oblige.
// While startProvider does pull the trigger, it may take some time to do so, therefore
// we pull the trigger as soon as possible so that downstream managers can populate their state.
// See https://github.com/prometheus/prometheus/pull/8639 for details.
if keep {
select {
case m.triggerSend <- struct{}{}:
default:
}
}
m.providers = newProviders
wg.Wait()
return nil return nil
} }
@ -185,7 +251,9 @@ func (m *Manager) StartCustomProvider(ctx context.Context, name string, worker D
p := &provider{ p := &provider{
name: name, name: name,
d: worker, d: worker,
subs: []string{name}, subs: map[string]struct{}{
name: {},
},
} }
m.providers = append(m.providers, p) m.providers = append(m.providers, p)
m.startProvider(ctx, p) m.startProvider(ctx, p)
@ -196,13 +264,29 @@ func (m *Manager) startProvider(ctx context.Context, p *provider) {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
updates := make(chan []*targetgroup.Group) updates := make(chan []*targetgroup.Group)
m.discoverCancel = append(m.discoverCancel, cancel) p.cancel = cancel
go p.d.Run(ctx, updates) go p.d.Run(ctx, updates)
go m.updater(ctx, p, updates) go m.updater(ctx, p, updates)
} }
// cleaner cleans resources associated with provider.
func (m *Manager) cleaner(p *provider) {
m.targetsMtx.Lock()
p.mu.RLock()
for s := range p.subs {
delete(m.targets, poolKey{s, p.name})
}
p.mu.RUnlock()
m.targetsMtx.Unlock()
if p.done != nil {
p.done()
}
}
func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targetgroup.Group) { func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targetgroup.Group) {
// Ensure targets from this provider are cleaned up.
defer m.cleaner(p)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -211,12 +295,16 @@ func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targ
receivedUpdates.WithLabelValues(m.name).Inc() receivedUpdates.WithLabelValues(m.name).Inc()
if !ok { if !ok {
level.Debug(m.logger).Log("msg", "Discoverer channel closed", "provider", p.name) level.Debug(m.logger).Log("msg", "Discoverer channel closed", "provider", p.name)
// Wait for provider cancellation to ensure targets are cleaned up when expected.
<-ctx.Done()
return return
} }
for _, s := range p.subs { p.mu.RLock()
for s := range p.subs {
m.updateGroup(poolKey{setName: s, provider: p.name}, tgs) m.updateGroup(poolKey{setName: s, provider: p.name}, tgs)
} }
p.mu.RUnlock()
select { select {
case m.triggerSend <- struct{}{}: case m.triggerSend <- struct{}{}:
@ -234,7 +322,7 @@ func (m *Manager) sender() {
select { select {
case <-m.ctx.Done(): case <-m.ctx.Done():
return return
case <-ticker.C: // Some discoverers send updates too often so we throttle these with the ticker. case <-ticker.C: // Some discoverers send updates too often, so we throttle these with the ticker.
select { select {
case <-m.triggerSend: case <-m.triggerSend:
sentUpdates.WithLabelValues(m.name).Inc() sentUpdates.WithLabelValues(m.name).Inc()
@ -255,14 +343,18 @@ func (m *Manager) sender() {
} }
func (m *Manager) cancelDiscoverers() { func (m *Manager) cancelDiscoverers() {
for _, c := range m.discoverCancel { m.mtx.RLock()
c() defer m.mtx.RUnlock()
for _, p := range m.providers {
if p.cancel != nil {
p.cancel()
}
} }
} }
func (m *Manager) updateGroup(poolKey poolKey, tgs []*targetgroup.Group) { func (m *Manager) updateGroup(poolKey poolKey, tgs []*targetgroup.Group) {
m.mtx.Lock() m.targetsMtx.Lock()
defer m.mtx.Unlock() defer m.targetsMtx.Unlock()
if _, ok := m.targets[poolKey]; !ok { if _, ok := m.targets[poolKey]; !ok {
m.targets[poolKey] = make(map[string]*targetgroup.Group) m.targets[poolKey] = make(map[string]*targetgroup.Group)
@ -275,11 +367,11 @@ func (m *Manager) updateGroup(poolKey poolKey, tgs []*targetgroup.Group) {
} }
func (m *Manager) allGroups() map[string][]*targetgroup.Group { func (m *Manager) allGroups() map[string][]*targetgroup.Group {
m.mtx.RLock()
defer m.mtx.RUnlock()
tSets := map[string][]*targetgroup.Group{} tSets := map[string][]*targetgroup.Group{}
n := map[string]int{} n := map[string]int{}
m.targetsMtx.Lock()
defer m.targetsMtx.Unlock()
for pkey, tsets := range m.targets { for pkey, tsets := range m.targets {
for _, tg := range tsets { for _, tg := range tsets {
// Even if the target group 'tg' is empty we still need to send it to the 'Scrape manager' // Even if the target group 'tg' is empty we still need to send it to the 'Scrape manager'
@ -303,7 +395,7 @@ func (m *Manager) registerProviders(cfgs Configs, setName string) int {
add := func(cfg Config) { add := func(cfg Config) {
for _, p := range m.providers { for _, p := range m.providers {
if reflect.DeepEqual(cfg, p.config) { if reflect.DeepEqual(cfg, p.config) {
p.subs = append(p.subs, setName) p.newSubs[setName] = struct{}{}
added = true added = true
return return
} }
@ -318,11 +410,14 @@ func (m *Manager) registerProviders(cfgs Configs, setName string) int {
return return
} }
m.providers = append(m.providers, &provider{ m.providers = append(m.providers, &provider{
name: fmt.Sprintf("%s/%d", typ, len(m.providers)), name: fmt.Sprintf("%s/%d", typ, m.lastProvider),
d: d, d: d,
config: cfg, config: cfg,
subs: []string{setName}, newSubs: map[string]struct{}{
setName: {},
},
}) })
m.lastProvider++
added = true added = true
} }
for _, cfg := range cfgs { for _, cfg := range cfgs {

View File

@ -18,6 +18,7 @@ import (
"fmt" "fmt"
"sort" "sort"
"strconv" "strconv"
"sync"
"testing" "testing"
"time" "time"
@ -718,6 +719,32 @@ func staticConfig(addrs ...string) StaticConfig {
return cfg return cfg
} }
func verifySyncedPresence(t *testing.T, tGroups map[string][]*targetgroup.Group, key string, label string, present bool) {
t.Helper()
if _, ok := tGroups[key]; !ok {
t.Fatalf("'%s' should be present in Group map keys: %v", key, tGroups)
return
}
match := false
var mergedTargets string
for _, targetGroups := range tGroups[key] {
for _, l := range targetGroups.Targets {
mergedTargets = mergedTargets + " " + l.String()
if l.String() == label {
match = true
}
}
}
if match != present {
msg := ""
if !present {
msg = "not"
}
t.Fatalf("%q should %s be present in Group labels: %q", label, msg, mergedTargets)
}
}
func verifyPresence(t *testing.T, tSets map[poolKey]map[string]*targetgroup.Group, poolKey poolKey, label string, present bool) { func verifyPresence(t *testing.T, tSets map[poolKey]map[string]*targetgroup.Group, poolKey poolKey, label string, present bool) {
t.Helper() t.Helper()
if _, ok := tSets[poolKey]; !ok { if _, ok := tSets[poolKey]; !ok {
@ -746,7 +773,180 @@ func verifyPresence(t *testing.T, tSets map[poolKey]map[string]*targetgroup.Grou
} }
} }
func TestTargetSetRecreatesTargetGroupsEveryRun(t *testing.T) { func pk(provider, setName string, n int) poolKey {
return poolKey{
setName: setName,
provider: fmt.Sprintf("%s/%d", provider, n),
}
}
func TestTargetSetTargetGroupsPresentOnConfigReload(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
discoveryManager := NewManager(ctx, log.NewNopLogger())
discoveryManager.updatert = 100 * time.Millisecond
go discoveryManager.Run()
c := map[string]Configs{
"prometheus": {
staticConfig("foo:9090"),
},
}
discoveryManager.ApplyConfig(c)
syncedTargets := <-discoveryManager.SyncCh()
require.Equal(t, 1, len(syncedTargets))
verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
require.Equal(t, 1, len(syncedTargets["prometheus"]))
p := pk("static", "prometheus", 0)
verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true)
require.Equal(t, 1, len(discoveryManager.targets))
discoveryManager.ApplyConfig(c)
syncedTargets = <-discoveryManager.SyncCh()
verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true)
require.Equal(t, 1, len(discoveryManager.targets))
require.Equal(t, 1, len(syncedTargets))
verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
require.Equal(t, 1, len(syncedTargets["prometheus"]))
}
func TestTargetSetTargetGroupsPresentOnConfigRename(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
discoveryManager := NewManager(ctx, log.NewNopLogger())
discoveryManager.updatert = 100 * time.Millisecond
go discoveryManager.Run()
c := map[string]Configs{
"prometheus": {
staticConfig("foo:9090"),
},
}
discoveryManager.ApplyConfig(c)
syncedTargets := <-discoveryManager.SyncCh()
require.Equal(t, 1, len(syncedTargets))
verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
require.Equal(t, 1, len(syncedTargets["prometheus"]))
p := pk("static", "prometheus", 0)
verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true)
require.Equal(t, 1, len(discoveryManager.targets))
c["prometheus2"] = c["prometheus"]
delete(c, "prometheus")
discoveryManager.ApplyConfig(c)
syncedTargets = <-discoveryManager.SyncCh()
p = pk("static", "prometheus2", 0)
verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true)
require.Equal(t, 1, len(discoveryManager.targets))
require.Equal(t, 1, len(syncedTargets))
verifySyncedPresence(t, syncedTargets, "prometheus2", "{__address__=\"foo:9090\"}", true)
require.Equal(t, 1, len(syncedTargets["prometheus2"]))
}
func TestTargetSetTargetGroupsPresentOnConfigDuplicateAndDeleteOriginal(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
discoveryManager := NewManager(ctx, log.NewNopLogger())
discoveryManager.updatert = 100 * time.Millisecond
go discoveryManager.Run()
c := map[string]Configs{
"prometheus": {
staticConfig("foo:9090"),
},
}
discoveryManager.ApplyConfig(c)
<-discoveryManager.SyncCh()
c["prometheus2"] = c["prometheus"]
discoveryManager.ApplyConfig(c)
syncedTargets := <-discoveryManager.SyncCh()
require.Equal(t, 2, len(syncedTargets))
verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
require.Equal(t, 1, len(syncedTargets["prometheus"]))
verifySyncedPresence(t, syncedTargets, "prometheus2", "{__address__=\"foo:9090\"}", true)
require.Equal(t, 1, len(syncedTargets["prometheus2"]))
p := pk("static", "prometheus", 0)
verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true)
require.Equal(t, 2, len(discoveryManager.targets))
delete(c, "prometheus")
discoveryManager.ApplyConfig(c)
syncedTargets = <-discoveryManager.SyncCh()
p = pk("static", "prometheus2", 0)
verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true)
require.Equal(t, 1, len(discoveryManager.targets))
require.Equal(t, 1, len(syncedTargets))
verifySyncedPresence(t, syncedTargets, "prometheus2", "{__address__=\"foo:9090\"}", true)
require.Equal(t, 1, len(syncedTargets["prometheus2"]))
}
func TestTargetSetTargetGroupsPresentOnConfigChange(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
discoveryManager := NewManager(ctx, log.NewNopLogger())
discoveryManager.updatert = 100 * time.Millisecond
go discoveryManager.Run()
c := map[string]Configs{
"prometheus": {
staticConfig("foo:9090"),
},
}
discoveryManager.ApplyConfig(c)
syncedTargets := <-discoveryManager.SyncCh()
require.Equal(t, 1, len(syncedTargets))
verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
require.Equal(t, 1, len(syncedTargets["prometheus"]))
var mu sync.Mutex
c["prometheus2"] = Configs{
lockStaticConfig{
mu: &mu,
config: staticConfig("bar:9090"),
},
}
mu.Lock()
discoveryManager.ApplyConfig(c)
// Original targets should be present as soon as possible.
syncedTargets = <-discoveryManager.SyncCh()
mu.Unlock()
require.Equal(t, 1, len(syncedTargets))
verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
require.Equal(t, 1, len(syncedTargets["prometheus"]))
// prometheus2 configs should be ready on second sync.
syncedTargets = <-discoveryManager.SyncCh()
require.Equal(t, 2, len(syncedTargets))
verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
require.Equal(t, 1, len(syncedTargets["prometheus"]))
verifySyncedPresence(t, syncedTargets, "prometheus2", "{__address__=\"bar:9090\"}", true)
require.Equal(t, 1, len(syncedTargets["prometheus2"]))
p := pk("static", "prometheus", 0)
verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true)
p = pk("lockstatic", "prometheus2", 1)
verifyPresence(t, discoveryManager.targets, p, "{__address__=\"bar:9090\"}", true)
require.Equal(t, 2, len(discoveryManager.targets))
// Delete part of config and ensure only original targets exist.
delete(c, "prometheus2")
discoveryManager.ApplyConfig(c)
syncedTargets = <-discoveryManager.SyncCh()
require.Equal(t, 1, len(discoveryManager.targets))
verifyPresence(t, discoveryManager.targets, pk("static", "prometheus", 0), "{__address__=\"foo:9090\"}", true)
require.Equal(t, 1, len(syncedTargets))
verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
require.Equal(t, 1, len(syncedTargets["prometheus"]))
}
func TestTargetSetRecreatesTargetGroupsOnConfigChange(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
discoveryManager := NewManager(ctx, log.NewNopLogger()) discoveryManager := NewManager(ctx, log.NewNopLogger())
@ -760,18 +960,29 @@ func TestTargetSetRecreatesTargetGroupsEveryRun(t *testing.T) {
} }
discoveryManager.ApplyConfig(c) discoveryManager.ApplyConfig(c)
<-discoveryManager.SyncCh() syncedTargets := <-discoveryManager.SyncCh()
verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true) p := pk("static", "prometheus", 0)
verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"bar:9090\"}", true) verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true)
verifyPresence(t, discoveryManager.targets, p, "{__address__=\"bar:9090\"}", true)
require.Equal(t, 1, len(discoveryManager.targets))
require.Equal(t, 1, len(syncedTargets))
verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"bar:9090\"}", true)
require.Equal(t, 2, len(syncedTargets["prometheus"]))
c["prometheus"] = Configs{ c["prometheus"] = Configs{
staticConfig("foo:9090"), staticConfig("foo:9090"),
} }
discoveryManager.ApplyConfig(c) discoveryManager.ApplyConfig(c)
syncedTargets = <-discoveryManager.SyncCh()
<-discoveryManager.SyncCh() require.Equal(t, 1, len(discoveryManager.targets))
verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true) p = pk("static", "prometheus", 1)
verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"bar:9090\"}", false) verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true)
verifyPresence(t, discoveryManager.targets, p, "{__address__=\"bar:9090\"}", false)
require.Equal(t, 1, len(discoveryManager.targets))
require.Equal(t, 1, len(syncedTargets))
verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
require.Equal(t, 1, len(syncedTargets["prometheus"]))
} }
func TestDiscovererConfigs(t *testing.T) { func TestDiscovererConfigs(t *testing.T) {
@ -789,10 +1000,18 @@ func TestDiscovererConfigs(t *testing.T) {
} }
discoveryManager.ApplyConfig(c) discoveryManager.ApplyConfig(c)
<-discoveryManager.SyncCh() syncedTargets := <-discoveryManager.SyncCh()
verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true) p := pk("static", "prometheus", 0)
verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"bar:9090\"}", true) verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true)
verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/1"}, "{__address__=\"baz:9090\"}", true) verifyPresence(t, discoveryManager.targets, p, "{__address__=\"bar:9090\"}", true)
p = pk("static", "prometheus", 1)
verifyPresence(t, discoveryManager.targets, p, "{__address__=\"baz:9090\"}", true)
require.Equal(t, 2, len(discoveryManager.targets))
require.Equal(t, 1, len(syncedTargets))
verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"bar:9090\"}", true)
verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"baz:9090\"}", true)
require.Equal(t, 3, len(syncedTargets["prometheus"]))
} }
// TestTargetSetRecreatesEmptyStaticConfigs ensures that reloading a config file after // TestTargetSetRecreatesEmptyStaticConfigs ensures that reloading a config file after
@ -812,20 +1031,23 @@ func TestTargetSetRecreatesEmptyStaticConfigs(t *testing.T) {
} }
discoveryManager.ApplyConfig(c) discoveryManager.ApplyConfig(c)
<-discoveryManager.SyncCh() syncedTargets := <-discoveryManager.SyncCh()
verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true) p := pk("static", "prometheus", 0)
verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true)
require.Equal(t, 1, len(syncedTargets))
verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
require.Equal(t, 1, len(syncedTargets["prometheus"]))
c["prometheus"] = Configs{ c["prometheus"] = Configs{
StaticConfig{{}}, StaticConfig{{}},
} }
discoveryManager.ApplyConfig(c) discoveryManager.ApplyConfig(c)
<-discoveryManager.SyncCh() syncedTargets = <-discoveryManager.SyncCh()
p = pk("static", "prometheus", 1)
pkey := poolKey{setName: "prometheus", provider: "static/0"} targetGroups, ok := discoveryManager.targets[p]
targetGroups, ok := discoveryManager.targets[pkey]
if !ok { if !ok {
t.Fatalf("'%v' should be present in target groups", pkey) t.Fatalf("'%v' should be present in target groups", p)
} }
group, ok := targetGroups[""] group, ok := targetGroups[""]
if !ok { if !ok {
@ -835,6 +1057,12 @@ func TestTargetSetRecreatesEmptyStaticConfigs(t *testing.T) {
if len(group.Targets) != 0 { if len(group.Targets) != 0 {
t.Fatalf("Invalid number of targets: expected 0, got %d", len(group.Targets)) t.Fatalf("Invalid number of targets: expected 0, got %d", len(group.Targets))
} }
require.Equal(t, 1, len(syncedTargets))
require.Equal(t, 1, len(syncedTargets["prometheus"]))
if lbls := syncedTargets["prometheus"][0].Labels; lbls != nil {
t.Fatalf("Unexpected Group: expected nil Labels, got %v", lbls)
}
} }
func TestIdenticalConfigurationsAreCoalesced(t *testing.T) { func TestIdenticalConfigurationsAreCoalesced(t *testing.T) {
@ -854,12 +1082,17 @@ func TestIdenticalConfigurationsAreCoalesced(t *testing.T) {
} }
discoveryManager.ApplyConfig(c) discoveryManager.ApplyConfig(c)
<-discoveryManager.SyncCh() syncedTargets := <-discoveryManager.SyncCh()
verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true) verifyPresence(t, discoveryManager.targets, pk("static", "prometheus", 0), "{__address__=\"foo:9090\"}", true)
verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus2", provider: "static/0"}, "{__address__=\"foo:9090\"}", true) verifyPresence(t, discoveryManager.targets, pk("static", "prometheus2", 0), "{__address__=\"foo:9090\"}", true)
if len(discoveryManager.providers) != 1 { if len(discoveryManager.providers) != 1 {
t.Fatalf("Invalid number of providers: expected 1, got %d", len(discoveryManager.providers)) t.Fatalf("Invalid number of providers: expected 1, got %d", len(discoveryManager.providers))
} }
require.Equal(t, 2, len(syncedTargets))
verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
require.Equal(t, 1, len(syncedTargets["prometheus"]))
verifySyncedPresence(t, syncedTargets, "prometheus2", "{__address__=\"foo:9090\"}", true)
require.Equal(t, 1, len(syncedTargets["prometheus2"]))
} }
func TestApplyConfigDoesNotModifyStaticTargets(t *testing.T) { func TestApplyConfigDoesNotModifyStaticTargets(t *testing.T) {
@ -891,6 +1124,29 @@ type errorConfig struct{ err error }
func (e errorConfig) Name() string { return "error" } func (e errorConfig) Name() string { return "error" }
func (e errorConfig) NewDiscoverer(DiscovererOptions) (Discoverer, error) { return nil, e.err } func (e errorConfig) NewDiscoverer(DiscovererOptions) (Discoverer, error) { return nil, e.err }
type lockStaticConfig struct {
mu *sync.Mutex
config StaticConfig
}
func (s lockStaticConfig) Name() string { return "lockstatic" }
func (s lockStaticConfig) NewDiscoverer(options DiscovererOptions) (Discoverer, error) {
return (lockStaticDiscoverer)(s), nil
}
type lockStaticDiscoverer lockStaticConfig
func (s lockStaticDiscoverer) Run(ctx context.Context, up chan<- []*targetgroup.Group) {
// TODO: existing implementation closes up chan, but documentation explicitly forbids it...?
defer close(up)
s.mu.Lock()
defer s.mu.Unlock()
select {
case <-ctx.Done():
case up <- s.config:
}
}
func TestGaugeFailedConfigs(t *testing.T) { func TestGaugeFailedConfigs(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()