diff --git a/scrape/manager_test.go b/scrape/manager_test.go
index 94d2f41f6..d5595b76f 100644
--- a/scrape/manager_test.go
+++ b/scrape/manager_test.go
@@ -249,7 +249,7 @@ func TestManagerReloadNoChange(t *testing.T) {
 	scrapeManager := NewManager(nil, nil)
 	scrapeManager.scrapeConfigs[tsetName] = reloadCfg.ScrapeConfigs[0]
 	// As reload never happens, new loop should never be called.
-	newLoop := func(_ *Target, s scraper) loop {
+	newLoop := func(_ *Target, s scraper, _ int, _ bool, _ []*config.RelabelConfig) loop {
 		t.Fatal("reload happened")
 		return nil
 	}
diff --git a/scrape/scrape.go b/scrape/scrape.go
index 95bc80881..8a2fa6ebc 100644
--- a/scrape/scrape.go
+++ b/scrape/scrape.go
@@ -130,7 +130,7 @@ type scrapePool struct {
 	cancel context.CancelFunc
 
 	// Constructor for new scrape loops. This is settable for testing convenience.
-	newLoop func(*Target, scraper) loop
+	newLoop func(*Target, scraper, int, bool, []*config.RelabelConfig) loop
 }
 
 const maxAheadTime = 10 * time.Minute
@@ -160,15 +160,21 @@ func newScrapePool(cfg *config.ScrapeConfig, app Appendable, logger log.Logger)
 		loops:      map[uint64]loop{},
 		logger:     logger,
 	}
-	sp.newLoop = func(t *Target, s scraper) loop {
+	sp.newLoop = func(t *Target, s scraper, limit int, honor bool, mrc []*config.RelabelConfig) loop {
 		return newScrapeLoop(
 			ctx,
 			s,
 			log.With(logger, "target", t),
 			buffers,
-			func(l labels.Labels) labels.Labels { return sp.mutateSampleLabels(l, t) },
-			func(l labels.Labels) labels.Labels { return sp.mutateReportSampleLabels(l, t) },
-			sp.appender,
+			func(l labels.Labels) labels.Labels { return mutateSampleLabels(l, t, honor, mrc) },
+			func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, t) },
+			func() storage.Appender {
+				app, err := app.Appender()
+				if err != nil {
+					panic(err)
+				}
+				return appender(app, limit)
+			},
 		)
 	}
 
@@ -218,13 +224,16 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
 		wg       sync.WaitGroup
 		interval = time.Duration(sp.config.ScrapeInterval)
 		timeout  = time.Duration(sp.config.ScrapeTimeout)
+		limit    = int(sp.config.SampleLimit)
+		honor    = sp.config.HonorLabels
+		mrc      = sp.config.MetricRelabelConfigs
 	)
 
 	for fp, oldLoop := range sp.loops {
 		var (
 			t       = sp.targets[fp]
 			s       = &targetScraper{Target: t, client: sp.client, timeout: timeout}
-			newLoop = sp.newLoop(t, s)
+			newLoop = sp.newLoop(t, s, limit, honor, mrc)
 		)
 		wg.Add(1)
 
@@ -295,6 +304,9 @@ func (sp *scrapePool) sync(targets []*Target) {
 		uniqueTargets = map[uint64]struct{}{}
 		interval      = time.Duration(sp.config.ScrapeInterval)
 		timeout       = time.Duration(sp.config.ScrapeTimeout)
+		limit         = int(sp.config.SampleLimit)
+		honor         = sp.config.HonorLabels
+		mrc           = sp.config.MetricRelabelConfigs
 	)
 
 	for _, t := range targets {
@@ -304,7 +316,7 @@ func (sp *scrapePool) sync(targets []*Target) {
 
 		if _, ok := sp.targets[hash]; !ok {
 			s := &targetScraper{Target: t, client: sp.client, timeout: timeout}
-			l := sp.newLoop(t, s)
+			l := sp.newLoop(t, s, limit, honor, mrc)
 
 			sp.targets[hash] = t
 			sp.loops[hash] = l
@@ -340,10 +352,10 @@ func (sp *scrapePool) sync(targets []*Target) {
 	wg.Wait()
 }
 
-func (sp *scrapePool) mutateSampleLabels(lset labels.Labels, target *Target) labels.Labels {
+func mutateSampleLabels(lset labels.Labels, target *Target, honor bool, rc []*config.RelabelConfig) labels.Labels {
 	lb := labels.NewBuilder(lset)
 
-	if sp.config.HonorLabels {
+	if honor {
 		for _, l := range target.Labels() {
 			if !lset.Has(l.Name) {
 				lb.Set(l.Name, l.Value)
@@ -367,14 +379,14 @@ func (sp *scrapePool) mutateSampleLabels(lset labels.Labels, target *Target) lab
 
 	res := lb.Labels()
 
-	if mrc := sp.config.MetricRelabelConfigs; len(mrc) > 0 {
-		res = relabel.Process(res, mrc...)
+	if len(rc) > 0 {
+		res = relabel.Process(res, rc...)
 	}
 
 	return res
 }
 
-func (sp *scrapePool) mutateReportSampleLabels(lset labels.Labels, target *Target) labels.Labels {
+func mutateReportSampleLabels(lset labels.Labels, target *Target) labels.Labels {
 	lb := labels.NewBuilder(lset)
 
 	for _, l := range target.Labels() {
@@ -389,22 +401,17 @@ func (sp *scrapePool) mutateReportSampleLabels(lset labels.Labels, target *Targe
 }
 
 // appender returns an appender for ingested samples from the target.
-func (sp *scrapePool) appender() storage.Appender {
-	app, err := sp.appendable.Appender()
-	if err != nil {
-		panic(err)
-	}
-
+func appender(app storage.Appender, limit int) storage.Appender {
 	app = &timeLimitAppender{
 		Appender: app,
 		maxTime:  timestamp.FromTime(time.Now().Add(maxAheadTime)),
 	}
 
 	// The limit is applied after metrics are potentially dropped via relabeling.
-	if sp.config.SampleLimit > 0 {
+	if limit > 0 {
 		app = &limitAppender{
 			Appender: app,
-			limit:    int(sp.config.SampleLimit),
+			limit:    limit,
 		}
 	}
 	return app
diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go
index f9aae4dc9..3fbaed286 100644
--- a/scrape/scrape_test.go
+++ b/scrape/scrape_test.go
@@ -218,7 +218,7 @@ func TestScrapePoolReload(t *testing.T) {
 	}
 	// On starting to run, new loops created on reload check whether their preceding
 	// equivalents have been stopped.
-	newLoop := func(_ *Target, s scraper) loop {
+	newLoop := func(_ *Target, s scraper, _ int, _ bool, _ []*config.RelabelConfig) loop {
 		l := &testLoop{}
 		l.startFunc = func(interval, timeout time.Duration, errc chan<- error) {
 			if interval != 3*time.Second {
@@ -306,7 +306,12 @@ func TestScrapePoolAppender(t *testing.T) {
 	app := &nopAppendable{}
 	sp := newScrapePool(cfg, app, nil)
 
-	wrapped := sp.appender()
+	loop := sp.newLoop(nil, nil, 0, false, nil)
+	appl, ok := loop.(*scrapeLoop)
+	if !ok {
+		t.Fatalf("Expected scrapeLoop but got %T", loop)
+	}
+	wrapped := appl.appender()
 
 	tl, ok := wrapped.(*timeLimitAppender)
 	if !ok {
@@ -316,9 +321,12 @@ func TestScrapePoolAppender(t *testing.T) {
 		t.Fatalf("Expected base appender but got %T", tl.Appender)
 	}
 
-	cfg.SampleLimit = 100
-
-	wrapped = sp.appender()
+	loop = sp.newLoop(nil, nil, 100, false, nil)
+	appl, ok = loop.(*scrapeLoop)
+	if !ok {
+		t.Fatalf("Expected scrapeLoop but got %T", loop)
+	}
+	wrapped = appl.appender()
 
 	sl, ok := wrapped.(*limitAppender)
 	if !ok {
@@ -333,6 +341,44 @@ func TestScrapePoolAppender(t *testing.T) {
 	}
 }
 
+func TestScrapePoolRaces(t *testing.T) {
+	interval, _ := model.ParseDuration("500ms")
+	timeout, _ := model.ParseDuration("1s")
+	newConfig := func() *config.ScrapeConfig {
+		return &config.ScrapeConfig{ScrapeInterval: interval, ScrapeTimeout: timeout}
+	}
+	sp := newScrapePool(newConfig(), &nopAppendable{}, nil)
+	tgts := []*targetgroup.Group{
+		&targetgroup.Group{
+			Targets: []model.LabelSet{
+				model.LabelSet{model.AddressLabel: "127.0.0.1:9090"},
+				model.LabelSet{model.AddressLabel: "127.0.0.2:9090"},
+				model.LabelSet{model.AddressLabel: "127.0.0.3:9090"},
+				model.LabelSet{model.AddressLabel: "127.0.0.4:9090"},
+				model.LabelSet{model.AddressLabel: "127.0.0.5:9090"},
+				model.LabelSet{model.AddressLabel: "127.0.0.6:9090"},
+				model.LabelSet{model.AddressLabel: "127.0.0.7:9090"},
+				model.LabelSet{model.AddressLabel: "127.0.0.8:9090"},
+			},
+		},
+	}
+
+	active, dropped := sp.Sync(tgts)
+	expectedActive, expectedDropped := len(tgts[0].Targets), 0
+	if len(active) != expectedActive {
+		t.Fatalf("Invalid number of active targets: expected %v, got %v", expectedActive, len(active))
+	}
+	if len(dropped) != expectedDropped {
+		t.Fatalf("Invalid number of dropped targets: expected %v, got %v", expectedDropped, len(dropped))
+	}
+
+	for i := 0; i < 20; i++ {
+		time.Sleep(time.Duration(10 * time.Millisecond))
+		sp.reload(newConfig())
+	}
+	sp.stop()
+}
+
 func TestScrapeLoopStopBeforeRun(t *testing.T) {
 	scraper := &testScraper{}
 
@@ -706,11 +752,6 @@ func TestScrapeLoopAppend(t *testing.T) {
 
 	for _, test := range tests {
 		app := &collectResultAppender{}
-		sp := &scrapePool{
-			config: &config.ScrapeConfig{
-				HonorLabels: test.honorLabels,
-			},
-		}
 
 		discoveryLabels := &Target{
 			labels: labels.FromStrings(test.discoveryLabels...),
@@ -719,10 +760,10 @@ func TestScrapeLoopAppend(t *testing.T) {
 		sl := newScrapeLoop(context.Background(),
 			nil, nil, nil,
 			func(l labels.Labels) labels.Labels {
-				return sp.mutateSampleLabels(l, discoveryLabels)
+				return mutateSampleLabels(l, discoveryLabels, test.honorLabels, nil)
 			},
 			func(l labels.Labels) labels.Labels {
-				return sp.mutateReportSampleLabels(l, discoveryLabels)
+				return mutateReportSampleLabels(l, discoveryLabels)
 			},
 			func() storage.Appender { return app },
 		)