scrape: fix data races

This commit avoids passing the full scrape configuration down to the
scrape loop to fix data races when the scrape configuration is being
reloaded.

Signed-off-by: Simon Pasquier <spasquie@redhat.com>
Simon Pasquier 2018-04-12 16:54:53 +02:00
parent 8b89ab0173
commit 2cbba4e948
3 changed files with 43 additions and 33 deletions
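
The race being fixed follows the classic shared-config pattern: the closures created for each scrape loop kept dereferencing sp.config (HonorLabels, SampleLimit, MetricRelabelConfigs) at scrape time, while reload() replaces sp.config concurrently. After this change the pool reads those values once, while it holds its own lock, and hands plain copies to the loop. A minimal, self-contained sketch of that pattern, using illustrative names rather than the real Prometheus types:

package main

import (
	"fmt"
	"sync"
)

// Illustrative stand-ins, not the Prometheus types.
type scrapeConfig struct {
	SampleLimit int
	HonorLabels bool
}

type pool struct {
	mtx    sync.Mutex
	config *scrapeConfig // swapped wholesale on reload
}

// newLoop snapshots the config-derived values under the lock and
// returns a loop that only ever touches its own copies, so a later
// reload() cannot race with it.
func (p *pool) newLoop() func() {
	p.mtx.Lock()
	limit, honor := p.config.SampleLimit, p.config.HonorLabels
	p.mtx.Unlock()

	return func() {
		fmt.Println("scraping with limit", limit, "honor_labels", honor)
	}
}

func (p *pool) reload(cfg *scrapeConfig) {
	p.mtx.Lock()
	p.config = cfg
	p.mtx.Unlock()
}

func main() {
	p := &pool{config: &scrapeConfig{SampleLimit: 100}}
	loop := p.newLoop()
	p.reload(&scrapeConfig{SampleLimit: 200, HonorLabels: true})
	loop() // still uses the values captured at creation time
}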


@@ -249,7 +249,7 @@ func TestManagerReloadNoChange(t *testing.T) {
 	scrapeManager := NewManager(nil, nil)
 	scrapeManager.scrapeConfigs[tsetName] = reloadCfg.ScrapeConfigs[0]
 	// As reload never happens, new loop should never be called.
-	newLoop := func(_ *Target, s scraper) loop {
+	newLoop := func(_ *Target, s scraper, _ int, _ bool, _ []*config.RelabelConfig) loop {
 		t.Fatal("reload happened")
 		return nil
 	}


@@ -130,7 +130,7 @@ type scrapePool struct {
 	cancel context.CancelFunc

 	// Constructor for new scrape loops. This is settable for testing convenience.
-	newLoop func(*Target, scraper) loop
+	newLoop func(*Target, scraper, int, bool, []*config.RelabelConfig) loop
 }

 const maxAheadTime = 10 * time.Minute
@@ -160,15 +160,21 @@ func newScrapePool(cfg *config.ScrapeConfig, app Appendable, logger log.Logger)
 		loops:      map[uint64]loop{},
 		logger:     logger,
 	}

-	sp.newLoop = func(t *Target, s scraper) loop {
+	sp.newLoop = func(t *Target, s scraper, limit int, honor bool, mrc []*config.RelabelConfig) loop {
 		return newScrapeLoop(
 			ctx,
 			s,
 			log.With(logger, "target", t),
 			buffers,
-			func(l labels.Labels) labels.Labels { return sp.mutateSampleLabels(l, t) },
-			func(l labels.Labels) labels.Labels { return sp.mutateReportSampleLabels(l, t) },
-			sp.appender,
+			func(l labels.Labels) labels.Labels { return mutateSampleLabels(l, t, honor, mrc) },
+			func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, t) },
+			func() storage.Appender {
+				app, err := app.Appender()
+				if err != nil {
+					panic(err)
+				}
+				return appender(app, limit)
+			},
 		)
 	}
@@ -218,13 +224,16 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
 		wg       sync.WaitGroup
 		interval = time.Duration(sp.config.ScrapeInterval)
 		timeout  = time.Duration(sp.config.ScrapeTimeout)
+		limit    = int(sp.config.SampleLimit)
+		honor    = sp.config.HonorLabels
+		mrc      = sp.config.MetricRelabelConfigs
 	)

 	for fp, oldLoop := range sp.loops {
 		var (
 			t       = sp.targets[fp]
 			s       = &targetScraper{Target: t, client: sp.client, timeout: timeout}
-			newLoop = sp.newLoop(t, s)
+			newLoop = sp.newLoop(t, s, limit, honor, mrc)
 		)
 		wg.Add(1)
@@ -295,6 +304,9 @@ func (sp *scrapePool) sync(targets []*Target) {
 		uniqueTargets = map[uint64]struct{}{}
 		interval      = time.Duration(sp.config.ScrapeInterval)
 		timeout       = time.Duration(sp.config.ScrapeTimeout)
+		limit         = int(sp.config.SampleLimit)
+		honor         = sp.config.HonorLabels
+		mrc           = sp.config.MetricRelabelConfigs
 	)

 	for _, t := range targets {
@@ -304,7 +316,7 @@ func (sp *scrapePool) sync(targets []*Target) {
 		if _, ok := sp.targets[hash]; !ok {
 			s := &targetScraper{Target: t, client: sp.client, timeout: timeout}
-			l := sp.newLoop(t, s)
+			l := sp.newLoop(t, s, limit, honor, mrc)

 			sp.targets[hash] = t
 			sp.loops[hash] = l
@@ -340,10 +352,10 @@ func (sp *scrapePool) sync(targets []*Target) {
 	wg.Wait()
 }

-func (sp *scrapePool) mutateSampleLabels(lset labels.Labels, target *Target) labels.Labels {
+func mutateSampleLabels(lset labels.Labels, target *Target, honor bool, rc []*config.RelabelConfig) labels.Labels {
 	lb := labels.NewBuilder(lset)

-	if sp.config.HonorLabels {
+	if honor {
 		for _, l := range target.Labels() {
 			if !lset.Has(l.Name) {
 				lb.Set(l.Name, l.Value)
@@ -367,14 +379,14 @@ func (sp *scrapePool) mutateSampleLabels(lset labels.Labels, target *Target) labels.Labels {
 	res := lb.Labels()

-	if mrc := sp.config.MetricRelabelConfigs; len(mrc) > 0 {
-		res = relabel.Process(res, mrc...)
+	if len(rc) > 0 {
+		res = relabel.Process(res, rc...)
 	}

 	return res
 }

-func (sp *scrapePool) mutateReportSampleLabels(lset labels.Labels, target *Target) labels.Labels {
+func mutateReportSampleLabels(lset labels.Labels, target *Target) labels.Labels {
 	lb := labels.NewBuilder(lset)

 	for _, l := range target.Labels() {
@@ -389,22 +401,17 @@ func (sp *scrapePool) mutateReportSampleLabels(lset labels.Labels, target *Target) labels.Labels {
 }

 // appender returns an appender for ingested samples from the target.
-func (sp *scrapePool) appender() storage.Appender {
-	app, err := sp.appendable.Appender()
-	if err != nil {
-		panic(err)
-	}
-
+func appender(app storage.Appender, limit int) storage.Appender {
 	app = &timeLimitAppender{
 		Appender: app,
 		maxTime:  timestamp.FromTime(time.Now().Add(maxAheadTime)),
 	}

 	// The limit is applied after metrics are potentially dropped via relabeling.
-	if sp.config.SampleLimit > 0 {
+	if limit > 0 {
 		app = &limitAppender{
 			Appender: app,
-			limit:    int(sp.config.SampleLimit),
+			limit:    limit,
 		}
 	}
 	return app
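
With appender() now a free function over an explicit storage.Appender and limit, the wrapping order (time-limit always, sample-limit only when limit > 0, applied after relabelling has already dropped series) can be exercised without building a scrapePool, which is what the updated TestScrapePoolAppender below does. A rough stand-alone sketch of that decorator shape, using simplified stand-in types rather than the real storage.Appender interface:

package main

import "fmt"

// appender is a simplified stand-in for storage.Appender.
type appender interface {
	Add(v float64) error
}

type countingAppender struct{ n int }

func (c *countingAppender) Add(float64) error { c.n++; return nil }

// capAppender plays the role of limitAppender here: it rejects
// samples once the configured limit is exceeded.
type capAppender struct {
	appender
	limit, added int
}

func (c *capAppender) Add(v float64) error {
	c.added++
	if c.added > c.limit {
		return fmt.Errorf("sample limit of %d exceeded", c.limit)
	}
	return c.appender.Add(v)
}

// wrap has the same shape as the new appender(app, limit) helper:
// the caller supplies the underlying appender and the limit value,
// so no shared configuration is consulted later.
func wrap(app appender, limit int) appender {
	if limit > 0 {
		app = &capAppender{appender: app, limit: limit}
	}
	return app
}

func main() {
	app := wrap(&countingAppender{}, 2)
	fmt.Println(app.Add(1), app.Add(1), app.Add(1)) // third Add exceeds the limit
}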


@@ -218,7 +218,7 @@ func TestScrapePoolReload(t *testing.T) {
 	}
 	// On starting to run, new loops created on reload check whether their preceding
 	// equivalents have been stopped.
-	newLoop := func(_ *Target, s scraper) loop {
+	newLoop := func(_ *Target, s scraper, _ int, _ bool, _ []*config.RelabelConfig) loop {
 		l := &testLoop{}
 		l.startFunc = func(interval, timeout time.Duration, errc chan<- error) {
 			if interval != 3*time.Second {
@@ -306,7 +306,12 @@ func TestScrapePoolAppender(t *testing.T) {
 	app := &nopAppendable{}
 	sp := newScrapePool(cfg, app, nil)

-	wrapped := sp.appender()
+	loop := sp.newLoop(nil, nil, 0, false, nil)
+	appl, ok := loop.(*scrapeLoop)
+	if !ok {
+		t.Fatalf("Expected scrapeLoop but got %T", loop)
+	}
+	wrapped := appl.appender()

 	tl, ok := wrapped.(*timeLimitAppender)
 	if !ok {
@@ -316,9 +321,12 @@ func TestScrapePoolAppender(t *testing.T) {
 		t.Fatalf("Expected base appender but got %T", tl.Appender)
 	}

-	cfg.SampleLimit = 100
-
-	wrapped = sp.appender()
+	loop = sp.newLoop(nil, nil, 100, false, nil)
+	appl, ok = loop.(*scrapeLoop)
+	if !ok {
+		t.Fatalf("Expected scrapeLoop but got %T", loop)
+	}
+	wrapped = appl.appender()

 	sl, ok := wrapped.(*limitAppender)
 	if !ok {
@@ -744,11 +752,6 @@ func TestScrapeLoopAppend(t *testing.T) {
 	for _, test := range tests {
 		app := &collectResultAppender{}

-		sp := &scrapePool{
-			config: &config.ScrapeConfig{
-				HonorLabels: test.honorLabels,
-			},
-		}
 		discoveryLabels := &Target{
 			labels: labels.FromStrings(test.discoveryLabels...),
@@ -757,10 +760,10 @@ func TestScrapeLoopAppend(t *testing.T) {
 		sl := newScrapeLoop(context.Background(),
 			nil, nil, nil,
 			func(l labels.Labels) labels.Labels {
-				return sp.mutateSampleLabels(l, discoveryLabels)
+				return mutateSampleLabels(l, discoveryLabels, test.honorLabels, nil)
 			},
 			func(l labels.Labels) labels.Labels {
-				return sp.mutateReportSampleLabels(l, discoveryLabels)
+				return mutateReportSampleLabels(l, discoveryLabels)
 			},
 			func() storage.Appender { return app },
 		)