Add scraping tests
This commit is contained in:
parent
76a8c6160d
commit
9bea27ae8a
|
@ -74,9 +74,14 @@ type scrapePool struct {
|
|||
|
||||
ctx context.Context
|
||||
|
||||
// Targets and loops must always be synchronized to have the same
|
||||
// set of fingerprints.
|
||||
mtx sync.RWMutex
|
||||
targets map[model.Fingerprint]*Target
|
||||
loops map[model.Fingerprint]loop
|
||||
|
||||
// Constructor for new scrape loops. This is settable for testing convenience.
|
||||
newLoop func(context.Context, scraper, storage.SampleAppender, storage.SampleAppender) loop
|
||||
}
|
||||
|
||||
func newScrapePool(cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool {
|
||||
|
@ -85,25 +90,28 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.SampleAppender) *scrape
|
|||
config: cfg,
|
||||
targets: map[model.Fingerprint]*Target{},
|
||||
loops: map[model.Fingerprint]loop{},
|
||||
newLoop: newScrapeLoop,
|
||||
}
|
||||
}
|
||||
|
||||
// stop terminates all scrape loops and returns after they all terminated.
|
||||
// A stopped scrape pool must not be used again.
|
||||
func (sp *scrapePool) stop() {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
sp.mtx.RLock()
|
||||
sp.mtx.Lock()
|
||||
defer sp.mtx.Unlock()
|
||||
|
||||
for _, l := range sp.loops {
|
||||
for fp, l := range sp.loops {
|
||||
wg.Add(1)
|
||||
|
||||
go func(l loop) {
|
||||
l.stop()
|
||||
wg.Done()
|
||||
}(l)
|
||||
|
||||
delete(sp.loops, fp)
|
||||
delete(sp.targets, fp)
|
||||
}
|
||||
sp.mtx.RUnlock()
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
@ -126,7 +134,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
|
|||
for fp, oldLoop := range sp.loops {
|
||||
var (
|
||||
t = sp.targets[fp]
|
||||
newLoop = newScrapeLoop(sp.ctx, t, sp.sampleAppender(t), sp.reportAppender(t))
|
||||
newLoop = sp.newLoop(sp.ctx, t, sp.sampleAppender(t), sp.reportAppender(t))
|
||||
)
|
||||
wg.Add(1)
|
||||
|
||||
|
@ -143,6 +151,56 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
|
|||
wg.Wait()
|
||||
}
|
||||
|
||||
// sync takes a list of potentially duplicated targets, deduplicates them, starts
|
||||
// scrape loops for new targets, and stops scrape loops for disappeared targets.
|
||||
// It returns after all stopped scrape loops terminated.
|
||||
func (sp *scrapePool) sync(targets []*Target) {
|
||||
sp.mtx.Lock()
|
||||
defer sp.mtx.Unlock()
|
||||
|
||||
var (
|
||||
fingerprints = map[model.Fingerprint]struct{}{}
|
||||
interval = time.Duration(sp.config.ScrapeInterval)
|
||||
timeout = time.Duration(sp.config.ScrapeTimeout)
|
||||
)
|
||||
|
||||
for _, t := range targets {
|
||||
fp := t.fingerprint()
|
||||
fingerprints[fp] = struct{}{}
|
||||
|
||||
if _, ok := sp.targets[fp]; !ok {
|
||||
l := sp.newLoop(sp.ctx, t, sp.sampleAppender(t), sp.reportAppender(t))
|
||||
|
||||
sp.targets[fp] = t
|
||||
sp.loops[fp] = l
|
||||
|
||||
go l.run(interval, timeout, nil)
|
||||
}
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// Stop and remove old targets and scraper loops.
|
||||
for fp := range sp.targets {
|
||||
if _, ok := fingerprints[fp]; !ok {
|
||||
wg.Add(1)
|
||||
go func(l loop) {
|
||||
l.stop()
|
||||
wg.Done()
|
||||
}(sp.loops[fp])
|
||||
|
||||
delete(sp.loops, fp)
|
||||
delete(sp.targets, fp)
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for all potentially stopped scrapers to terminate.
|
||||
// This covers the case of flapping targets. If the server is under high load, a new scraper
|
||||
// may be active and tries to insert. The old scraper that didn't terminate yet could still
|
||||
// be inserting a previous sample set.
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// sampleAppender returns an appender for ingested samples from the target.
|
||||
func (sp *scrapePool) sampleAppender(target *Target) storage.SampleAppender {
|
||||
app := sp.appender
|
||||
|
@ -177,56 +235,6 @@ func (sp *scrapePool) reportAppender(target *Target) storage.SampleAppender {
|
|||
}
|
||||
}
|
||||
|
||||
// sync takes a list of potentially duplicated targets, deduplicates them, starts
|
||||
// scrape loops for new targets, and stops scrape loops for disappeared targets.
|
||||
// It returns after all stopped scrape loops terminated.
|
||||
func (sp *scrapePool) sync(targets []*Target) {
|
||||
sp.mtx.Lock()
|
||||
defer sp.mtx.Unlock()
|
||||
|
||||
var (
|
||||
fingerprints = map[model.Fingerprint]struct{}{}
|
||||
interval = time.Duration(sp.config.ScrapeInterval)
|
||||
timeout = time.Duration(sp.config.ScrapeTimeout)
|
||||
)
|
||||
|
||||
for _, t := range targets {
|
||||
fp := t.fingerprint()
|
||||
fingerprints[fp] = struct{}{}
|
||||
|
||||
if _, ok := sp.targets[fp]; !ok {
|
||||
l := newScrapeLoop(sp.ctx, t, sp.sampleAppender(t), sp.reportAppender(t))
|
||||
|
||||
sp.targets[fp] = t
|
||||
sp.loops[fp] = l
|
||||
|
||||
go l.run(interval, timeout, nil)
|
||||
}
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// Stop and remove old targets and scraper loops.
|
||||
for fp := range sp.targets {
|
||||
if _, ok := fingerprints[fp]; !ok {
|
||||
wg.Add(1)
|
||||
go func(l loop) {
|
||||
l.stop()
|
||||
wg.Done()
|
||||
}(sp.loops[fp])
|
||||
|
||||
delete(sp.loops, fp)
|
||||
delete(sp.targets, fp)
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for all potentially stopped scrapers to terminate.
|
||||
// This covers the case of flapping targets. If the server is under high load, a new scraper
|
||||
// may be active and tries to insert. The old scraper that didn't terminate yet could still
|
||||
// be inserting a previous sample set.
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// A scraper retrieves samples and accepts a status report at the end.
|
||||
type scraper interface {
|
||||
scrape(ctx context.Context, ts time.Time) (model.Samples, error)
|
||||
|
@ -234,7 +242,7 @@ type scraper interface {
|
|||
offset(interval time.Duration) time.Duration
|
||||
}
|
||||
|
||||
// A loop can run and be stopped again. It must be reused after it was stopped.
|
||||
// A loop can run and be stopped again. It must not be reused after it was stopped.
|
||||
type loop interface {
|
||||
run(interval, timeout time.Duration, errc chan<- error)
|
||||
stop()
|
||||
|
@ -247,12 +255,11 @@ type scrapeLoop struct {
|
|||
reportAppender storage.SampleAppender
|
||||
|
||||
done chan struct{}
|
||||
mtx sync.RWMutex
|
||||
ctx context.Context
|
||||
cancel func()
|
||||
}
|
||||
|
||||
func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp storage.SampleAppender) *scrapeLoop {
|
||||
func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp storage.SampleAppender) loop {
|
||||
sl := &scrapeLoop{
|
||||
scraper: sc,
|
||||
appender: app,
|
||||
|
@ -321,10 +328,7 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
|
|||
}
|
||||
|
||||
func (sl *scrapeLoop) stop() {
|
||||
sl.mtx.RLock()
|
||||
sl.cancel()
|
||||
sl.mtx.RUnlock()
|
||||
|
||||
<-sl.done
|
||||
}
|
||||
|
||||
|
|
|
@ -14,6 +14,9 @@
|
|||
package retrieval
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -21,8 +24,199 @@ import (
|
|||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
)
|
||||
|
||||
func TestNewScrapePool(t *testing.T) {
|
||||
var (
|
||||
app = &nopAppender{}
|
||||
cfg = &config.ScrapeConfig{}
|
||||
sp = newScrapePool(cfg, app)
|
||||
)
|
||||
|
||||
if a, ok := sp.appender.(*nopAppender); !ok || a != app {
|
||||
t.Fatalf("Wrong sample appender")
|
||||
}
|
||||
if sp.config != cfg {
|
||||
t.Fatalf("Wrong scrape config")
|
||||
}
|
||||
if sp.newLoop == nil {
|
||||
t.Fatalf("newLoop function not initialized")
|
||||
}
|
||||
}
|
||||
|
||||
type testLoop struct {
|
||||
startFunc func(interval, timeout time.Duration, errc chan<- error)
|
||||
stopFunc func()
|
||||
}
|
||||
|
||||
func (l *testLoop) run(interval, timeout time.Duration, errc chan<- error) {
|
||||
l.startFunc(interval, timeout, errc)
|
||||
}
|
||||
|
||||
func (l *testLoop) stop() {
|
||||
l.stopFunc()
|
||||
}
|
||||
|
||||
func TestScrapePoolStop(t *testing.T) {
|
||||
sp := &scrapePool{
|
||||
targets: map[model.Fingerprint]*Target{},
|
||||
loops: map[model.Fingerprint]loop{},
|
||||
}
|
||||
var mtx sync.Mutex
|
||||
stopped := map[model.Fingerprint]bool{}
|
||||
numTargets := 20
|
||||
|
||||
// Stopping the scrape pool must call stop() on all scrape loops,
|
||||
// clean them and the respective targets up. It must wait until each loop's
|
||||
// stop function returned before returning itself.
|
||||
|
||||
for i := 0; i < numTargets; i++ {
|
||||
t := &Target{
|
||||
labels: model.LabelSet{
|
||||
model.AddressLabel: model.LabelValue(fmt.Sprintf("example.com:%d", i)),
|
||||
},
|
||||
}
|
||||
l := &testLoop{}
|
||||
l.stopFunc = func() {
|
||||
time.Sleep(time.Duration(i*20) * time.Millisecond)
|
||||
|
||||
mtx.Lock()
|
||||
stopped[t.fingerprint()] = true
|
||||
mtx.Unlock()
|
||||
}
|
||||
|
||||
sp.targets[t.fingerprint()] = t
|
||||
sp.loops[t.fingerprint()] = l
|
||||
}
|
||||
|
||||
done := make(chan struct{})
|
||||
stopTime := time.Now()
|
||||
|
||||
go func() {
|
||||
sp.stop()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatalf("scrapeLoop.stop() did not return as expected")
|
||||
case <-done:
|
||||
// This should have taken at least as long as the last target slept.
|
||||
if time.Since(stopTime) < time.Duration(numTargets*20)*time.Millisecond {
|
||||
t.Fatalf("scrapeLoop.stop() exited before all targets stopped")
|
||||
}
|
||||
}
|
||||
|
||||
mtx.Lock()
|
||||
if len(stopped) != numTargets {
|
||||
t.Fatalf("Expected 20 stopped loops, got %d", len(stopped))
|
||||
}
|
||||
mtx.Unlock()
|
||||
|
||||
if len(sp.targets) > 0 {
|
||||
t.Fatalf("Targets were not cleared on stopping: %d left", len(sp.targets))
|
||||
}
|
||||
if len(sp.loops) > 0 {
|
||||
t.Fatalf("Loops were not cleared on stopping: %d left", len(sp.loops))
|
||||
}
|
||||
}
|
||||
|
||||
func TestScrapePoolReload(t *testing.T) {
|
||||
var mtx sync.Mutex
|
||||
numTargets := 20
|
||||
|
||||
stopped := map[model.Fingerprint]bool{}
|
||||
|
||||
reloadCfg := &config.ScrapeConfig{
|
||||
ScrapeInterval: model.Duration(3 * time.Second),
|
||||
ScrapeTimeout: model.Duration(2 * time.Second),
|
||||
}
|
||||
// On starting to run, new loops created on reload check whether their preceeding
|
||||
// equivalents have been stopped.
|
||||
newLoop := func(ctx context.Context, s scraper, app, reportApp storage.SampleAppender) loop {
|
||||
l := &testLoop{}
|
||||
l.startFunc = func(interval, timeout time.Duration, errc chan<- error) {
|
||||
if interval != 3*time.Second {
|
||||
t.Errorf("Expected scrape interval %d but got %d", 3*time.Second, interval)
|
||||
}
|
||||
if timeout != 2*time.Second {
|
||||
t.Errorf("Expected scrape timeout %d but got %d", 2*time.Second, timeout)
|
||||
}
|
||||
mtx.Lock()
|
||||
if !stopped[s.(*Target).fingerprint()] {
|
||||
t.Errorf("Scrape loop for %v not stopped yet", s.(*Target))
|
||||
}
|
||||
mtx.Unlock()
|
||||
}
|
||||
return l
|
||||
}
|
||||
sp := &scrapePool{
|
||||
targets: map[model.Fingerprint]*Target{},
|
||||
loops: map[model.Fingerprint]loop{},
|
||||
newLoop: newLoop,
|
||||
}
|
||||
|
||||
// Reloading a scrape pool with a new scrape configuration must stop all scrape
|
||||
// loops and start new ones. A new loop must not be started before the preceeding
|
||||
// one terminated.
|
||||
|
||||
for i := 0; i < numTargets; i++ {
|
||||
t := &Target{
|
||||
labels: model.LabelSet{
|
||||
model.AddressLabel: model.LabelValue(fmt.Sprintf("example.com:%d", i)),
|
||||
},
|
||||
}
|
||||
l := &testLoop{}
|
||||
l.stopFunc = func() {
|
||||
time.Sleep(time.Duration(i*20) * time.Millisecond)
|
||||
|
||||
mtx.Lock()
|
||||
stopped[t.fingerprint()] = true
|
||||
mtx.Unlock()
|
||||
}
|
||||
|
||||
sp.targets[t.fingerprint()] = t
|
||||
sp.loops[t.fingerprint()] = l
|
||||
}
|
||||
done := make(chan struct{})
|
||||
|
||||
beforeTargets := map[model.Fingerprint]*Target{}
|
||||
for fp, t := range sp.targets {
|
||||
beforeTargets[fp] = t
|
||||
}
|
||||
|
||||
reloadTime := time.Now()
|
||||
|
||||
go func() {
|
||||
sp.reload(reloadCfg)
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatalf("scrapeLoop.reload() did not return as expected")
|
||||
case <-done:
|
||||
// This should have taken at least as long as the last target slept.
|
||||
if time.Since(reloadTime) < time.Duration(numTargets*20)*time.Millisecond {
|
||||
t.Fatalf("scrapeLoop.stop() exited before all targets stopped")
|
||||
}
|
||||
}
|
||||
|
||||
mtx.Lock()
|
||||
if len(stopped) != numTargets {
|
||||
t.Fatalf("Expected 20 stopped loops, got %d", stopped)
|
||||
}
|
||||
mtx.Unlock()
|
||||
|
||||
if !reflect.DeepEqual(sp.targets, beforeTargets) {
|
||||
t.Fatalf("Reloading affected target states unexpectedly")
|
||||
}
|
||||
if len(sp.loops) != numTargets {
|
||||
t.Fatalf("Expected %d loops after reload but got %d", numTargets, len(sp.loops))
|
||||
}
|
||||
}
|
||||
|
||||
func TestScrapePoolReportAppender(t *testing.T) {
|
||||
cfg := &config.ScrapeConfig{
|
||||
MetricRelabelConfigs: []*config.RelabelConfig{
|
||||
|
@ -100,6 +294,53 @@ func TestScrapePoolSampleAppender(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestScrapeLoopStop(t *testing.T) {
|
||||
scraper := &testScraper{}
|
||||
sl := newScrapeLoop(context.Background(), scraper, nil, nil)
|
||||
|
||||
// The scrape pool synchronizes on stopping scrape loops. However, new scrape
|
||||
// loops are syarted asynchronously. Thus it's possible, that a loop is stopped
|
||||
// again before having started properly.
|
||||
// Stopping not-yet-started loops must block until the run method was called and exited.
|
||||
// The run method must exit immediately.
|
||||
|
||||
stopDone := make(chan struct{})
|
||||
go func() {
|
||||
sl.stop()
|
||||
close(stopDone)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-stopDone:
|
||||
t.Fatalf("Stopping terminated before run exited successfully")
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
}
|
||||
|
||||
// Running the scrape loop must exit before calling the scraper even once.
|
||||
scraper.scrapeFunc = func(context.Context, time.Time) (model.Samples, error) {
|
||||
t.Fatalf("scraper was called for terminated scrape loop")
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
runDone := make(chan struct{})
|
||||
go func() {
|
||||
sl.run(0, 0, nil)
|
||||
close(runDone)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-runDone:
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatalf("Running terminated scrape loop did not exit")
|
||||
}
|
||||
|
||||
select {
|
||||
case <-stopDone:
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatalf("Stopping did not terminate after running exited")
|
||||
}
|
||||
}
|
||||
|
||||
func TestScrapeLoopRun(t *testing.T) {
|
||||
var (
|
||||
signal = make(chan struct{})
|
||||
|
|
Loading…
Reference in New Issue