diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index fa20f8b11..af6a0719e 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -142,9 +142,14 @@ type testLoop struct { stopFunc func() forcedErr error forcedErrMtx sync.Mutex + runOnce bool } func (l *testLoop) run(interval, timeout time.Duration, errc chan<- error) { + if l.runOnce { + panic("loop must be started only once") + } + l.runOnce = true l.startFunc(interval, timeout, errc) } @@ -316,12 +321,16 @@ func TestScrapePoolReload(t *testing.T) { } func TestScrapePoolTargetLimit(t *testing.T) { + var wg sync.WaitGroup // On starting to run, new loops created on reload check whether their preceding // equivalents have been stopped. newLoop := func(opts scrapeLoopOptions) loop { + wg.Add(1) l := &testLoop{ - startFunc: func(interval, timeout time.Duration, errc chan<- error) {}, - stopFunc: func() {}, + startFunc: func(interval, timeout time.Duration, errc chan<- error) { + wg.Done() + }, + stopFunc: func() {}, } return l } @@ -361,6 +370,13 @@ func TestScrapePoolTargetLimit(t *testing.T) { sp.Sync(tgs[:n]) } + validateIsRunning := func() { + wg.Wait() + for _, l := range sp.loops { + testutil.Assert(t, l.(*testLoop).runOnce, "loop should be running") + } + } + validateErrorMessage := func(shouldErr bool) { for _, l := range sp.loops { lerr := l.(*testLoop).getForcedError() @@ -375,29 +391,54 @@ func TestScrapePoolTargetLimit(t *testing.T) { reloadWithLimit(0) loadTargets(50) + validateIsRunning() // Simulate an initial config with a limit. sp.config.TargetLimit = 30 limit = 30 loadTargets(50) + validateIsRunning() validateErrorMessage(true) reloadWithLimit(50) + validateIsRunning() validateErrorMessage(false) reloadWithLimit(40) + validateIsRunning() validateErrorMessage(true) loadTargets(30) + validateIsRunning() validateErrorMessage(false) loadTargets(40) + validateIsRunning() validateErrorMessage(false) loadTargets(41) + validateIsRunning() validateErrorMessage(true) reloadWithLimit(51) + validateIsRunning() + validateErrorMessage(false) + + tgs = append(tgs, + &targetgroup.Group{ + Targets: []model.LabelSet{ + {model.AddressLabel: model.LabelValue("127.0.0.1:1090")}, + }, + }, + &targetgroup.Group{ + Targets: []model.LabelSet{ + {model.AddressLabel: model.LabelValue("127.0.0.1:1090")}, + }, + }, + ) + + sp.Sync(tgs) + validateIsRunning() validateErrorMessage(false) } @@ -476,6 +517,54 @@ func TestScrapePoolRaces(t *testing.T) { sp.stop() } +func TestScrapePoolScrapeLoopsStarted(t *testing.T) { + var wg sync.WaitGroup + newLoop := func(opts scrapeLoopOptions) loop { + wg.Add(1) + l := &testLoop{ + startFunc: func(interval, timeout time.Duration, errc chan<- error) { + wg.Done() + }, + stopFunc: func() {}, + } + return l + } + sp := &scrapePool{ + appendable: &nopAppendable{}, + activeTargets: map[uint64]*Target{}, + loops: map[uint64]loop{}, + newLoop: newLoop, + logger: nil, + client: http.DefaultClient, + } + + tgs := []*targetgroup.Group{ + { + Targets: []model.LabelSet{ + {model.AddressLabel: model.LabelValue("127.0.0.1:9090")}, + }, + }, + { + Targets: []model.LabelSet{ + {model.AddressLabel: model.LabelValue("127.0.0.1:9090")}, + }, + }, + } + + testutil.Ok(t, sp.reload(&config.ScrapeConfig{ + ScrapeInterval: model.Duration(3 * time.Second), + ScrapeTimeout: model.Duration(2 * time.Second), + })) + sp.Sync(tgs) + + testutil.Equals(t, 1, len(sp.loops)) + + wg.Wait() + for _, l := range sp.loops { + testutil.Assert(t, l.(*testLoop).runOnce, "loop should be running") + } +} + func TestScrapeLoopStopBeforeRun(t *testing.T) { scraper := &testScraper{}