Merge pull request #4096 from simonpasquier/fix-scrape-races-2.2

Fix scrape races (release-2.2 branch)
This commit is contained in:
Björn Rabenstein 2018-04-25 15:36:29 +02:00 committed by GitHub
commit 91e470d733
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 81 additions and 33 deletions

View File

@ -249,7 +249,7 @@ func TestManagerReloadNoChange(t *testing.T) {
scrapeManager := NewManager(nil, nil)
scrapeManager.scrapeConfigs[tsetName] = reloadCfg.ScrapeConfigs[0]
// As reload never happens, new loop should never be called.
newLoop := func(_ *Target, s scraper) loop {
newLoop := func(_ *Target, s scraper, _ int, _ bool, _ []*config.RelabelConfig) loop {
t.Fatal("reload happened")
return nil
}

View File

@ -130,7 +130,7 @@ type scrapePool struct {
cancel context.CancelFunc
// Constructor for new scrape loops. This is settable for testing convenience.
newLoop func(*Target, scraper) loop
newLoop func(*Target, scraper, int, bool, []*config.RelabelConfig) loop
}
const maxAheadTime = 10 * time.Minute
@ -160,15 +160,21 @@ func newScrapePool(cfg *config.ScrapeConfig, app Appendable, logger log.Logger)
loops: map[uint64]loop{},
logger: logger,
}
sp.newLoop = func(t *Target, s scraper) loop {
sp.newLoop = func(t *Target, s scraper, limit int, honor bool, mrc []*config.RelabelConfig) loop {
return newScrapeLoop(
ctx,
s,
log.With(logger, "target", t),
buffers,
func(l labels.Labels) labels.Labels { return sp.mutateSampleLabels(l, t) },
func(l labels.Labels) labels.Labels { return sp.mutateReportSampleLabels(l, t) },
sp.appender,
func(l labels.Labels) labels.Labels { return mutateSampleLabels(l, t, honor, mrc) },
func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, t) },
func() storage.Appender {
app, err := app.Appender()
if err != nil {
panic(err)
}
return appender(app, limit)
},
)
}
@ -218,13 +224,16 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
wg sync.WaitGroup
interval = time.Duration(sp.config.ScrapeInterval)
timeout = time.Duration(sp.config.ScrapeTimeout)
limit = int(sp.config.SampleLimit)
honor = sp.config.HonorLabels
mrc = sp.config.MetricRelabelConfigs
)
for fp, oldLoop := range sp.loops {
var (
t = sp.targets[fp]
s = &targetScraper{Target: t, client: sp.client, timeout: timeout}
newLoop = sp.newLoop(t, s)
newLoop = sp.newLoop(t, s, limit, honor, mrc)
)
wg.Add(1)
@ -295,6 +304,9 @@ func (sp *scrapePool) sync(targets []*Target) {
uniqueTargets = map[uint64]struct{}{}
interval = time.Duration(sp.config.ScrapeInterval)
timeout = time.Duration(sp.config.ScrapeTimeout)
limit = int(sp.config.SampleLimit)
honor = sp.config.HonorLabels
mrc = sp.config.MetricRelabelConfigs
)
for _, t := range targets {
@ -304,7 +316,7 @@ func (sp *scrapePool) sync(targets []*Target) {
if _, ok := sp.targets[hash]; !ok {
s := &targetScraper{Target: t, client: sp.client, timeout: timeout}
l := sp.newLoop(t, s)
l := sp.newLoop(t, s, limit, honor, mrc)
sp.targets[hash] = t
sp.loops[hash] = l
@ -340,10 +352,10 @@ func (sp *scrapePool) sync(targets []*Target) {
wg.Wait()
}
func (sp *scrapePool) mutateSampleLabels(lset labels.Labels, target *Target) labels.Labels {
func mutateSampleLabels(lset labels.Labels, target *Target, honor bool, rc []*config.RelabelConfig) labels.Labels {
lb := labels.NewBuilder(lset)
if sp.config.HonorLabels {
if honor {
for _, l := range target.Labels() {
if !lset.Has(l.Name) {
lb.Set(l.Name, l.Value)
@ -367,14 +379,14 @@ func (sp *scrapePool) mutateSampleLabels(lset labels.Labels, target *Target) lab
res := lb.Labels()
if mrc := sp.config.MetricRelabelConfigs; len(mrc) > 0 {
res = relabel.Process(res, mrc...)
if len(rc) > 0 {
res = relabel.Process(res, rc...)
}
return res
}
func (sp *scrapePool) mutateReportSampleLabels(lset labels.Labels, target *Target) labels.Labels {
func mutateReportSampleLabels(lset labels.Labels, target *Target) labels.Labels {
lb := labels.NewBuilder(lset)
for _, l := range target.Labels() {
@ -389,22 +401,17 @@ func (sp *scrapePool) mutateReportSampleLabels(lset labels.Labels, target *Targe
}
// appender returns an appender for ingested samples from the target.
func (sp *scrapePool) appender() storage.Appender {
app, err := sp.appendable.Appender()
if err != nil {
panic(err)
}
func appender(app storage.Appender, limit int) storage.Appender {
app = &timeLimitAppender{
Appender: app,
maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)),
}
// The limit is applied after metrics are potentially dropped via relabeling.
if sp.config.SampleLimit > 0 {
if limit > 0 {
app = &limitAppender{
Appender: app,
limit: int(sp.config.SampleLimit),
limit: limit,
}
}
return app

View File

@ -218,7 +218,7 @@ func TestScrapePoolReload(t *testing.T) {
}
// On starting to run, new loops created on reload check whether their preceding
// equivalents have been stopped.
newLoop := func(_ *Target, s scraper) loop {
newLoop := func(_ *Target, s scraper, _ int, _ bool, _ []*config.RelabelConfig) loop {
l := &testLoop{}
l.startFunc = func(interval, timeout time.Duration, errc chan<- error) {
if interval != 3*time.Second {
@ -306,7 +306,12 @@ func TestScrapePoolAppender(t *testing.T) {
app := &nopAppendable{}
sp := newScrapePool(cfg, app, nil)
wrapped := sp.appender()
loop := sp.newLoop(nil, nil, 0, false, nil)
appl, ok := loop.(*scrapeLoop)
if !ok {
t.Fatalf("Expected scrapeLoop but got %T", loop)
}
wrapped := appl.appender()
tl, ok := wrapped.(*timeLimitAppender)
if !ok {
@ -316,9 +321,12 @@ func TestScrapePoolAppender(t *testing.T) {
t.Fatalf("Expected base appender but got %T", tl.Appender)
}
cfg.SampleLimit = 100
wrapped = sp.appender()
loop = sp.newLoop(nil, nil, 100, false, nil)
appl, ok = loop.(*scrapeLoop)
if !ok {
t.Fatalf("Expected scrapeLoop but got %T", loop)
}
wrapped = appl.appender()
sl, ok := wrapped.(*limitAppender)
if !ok {
@ -333,6 +341,44 @@ func TestScrapePoolAppender(t *testing.T) {
}
}
func TestScrapePoolRaces(t *testing.T) {
interval, _ := model.ParseDuration("500ms")
timeout, _ := model.ParseDuration("1s")
newConfig := func() *config.ScrapeConfig {
return &config.ScrapeConfig{ScrapeInterval: interval, ScrapeTimeout: timeout}
}
sp := newScrapePool(newConfig(), &nopAppendable{}, nil)
tgts := []*targetgroup.Group{
&targetgroup.Group{
Targets: []model.LabelSet{
model.LabelSet{model.AddressLabel: "127.0.0.1:9090"},
model.LabelSet{model.AddressLabel: "127.0.0.2:9090"},
model.LabelSet{model.AddressLabel: "127.0.0.3:9090"},
model.LabelSet{model.AddressLabel: "127.0.0.4:9090"},
model.LabelSet{model.AddressLabel: "127.0.0.5:9090"},
model.LabelSet{model.AddressLabel: "127.0.0.6:9090"},
model.LabelSet{model.AddressLabel: "127.0.0.7:9090"},
model.LabelSet{model.AddressLabel: "127.0.0.8:9090"},
},
},
}
active, dropped := sp.Sync(tgts)
expectedActive, expectedDropped := len(tgts[0].Targets), 0
if len(active) != expectedActive {
t.Fatalf("Invalid number of active targets: expected %v, got %v", expectedActive, len(active))
}
if len(dropped) != expectedDropped {
t.Fatalf("Invalid number of dropped targets: expected %v, got %v", expectedDropped, len(dropped))
}
for i := 0; i < 20; i++ {
time.Sleep(time.Duration(10 * time.Millisecond))
sp.reload(newConfig())
}
sp.stop()
}
func TestScrapeLoopStopBeforeRun(t *testing.T) {
scraper := &testScraper{}
@ -706,11 +752,6 @@ func TestScrapeLoopAppend(t *testing.T) {
for _, test := range tests {
app := &collectResultAppender{}
sp := &scrapePool{
config: &config.ScrapeConfig{
HonorLabels: test.honorLabels,
},
}
discoveryLabels := &Target{
labels: labels.FromStrings(test.discoveryLabels...),
@ -719,10 +760,10 @@ func TestScrapeLoopAppend(t *testing.T) {
sl := newScrapeLoop(context.Background(),
nil, nil, nil,
func(l labels.Labels) labels.Labels {
return sp.mutateSampleLabels(l, discoveryLabels)
return mutateSampleLabels(l, discoveryLabels, test.honorLabels, nil)
},
func(l labels.Labels) labels.Labels {
return sp.mutateReportSampleLabels(l, discoveryLabels)
return mutateReportSampleLabels(l, discoveryLabels)
},
func() storage.Appender { return app },
)