Make scraping offset consistent.

To evenly distribute scraping load we currently rely on random
jittering. This commit hashes over the target's identity and calculates
a consistent offset. This also ensures that scrape intervals
are constantly spaced between config/target changes.
This commit is contained in:
Fabian Reinartz 2016-02-15 15:22:57 +01:00
parent 938ebe78c2
commit fe7e91e2eb
3 changed files with 123 additions and 56 deletions

View File

@ -18,7 +18,6 @@ import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"net/http"
"net/url"
"strings"
@ -267,6 +266,30 @@ func (t *Target) String() string {
return t.host()
}
// fingerprint returns an identifying hash for the target.
func (t *Target) fingerprint() model.Fingerprint {
t.RLock()
defer t.RUnlock()
return t.labels.Fingerprint()
}
// offset returns the time until the next scrape cycle for the target.
func (t *Target) offset(interval time.Duration) time.Duration {
now := time.Now().UnixNano()
var (
base = now % int64(interval)
offset = uint64(t.fingerprint()) % uint64(interval)
next = base + int64(offset)
)
if next > int64(interval) {
next -= int64(interval)
}
return time.Duration(next)
}
func (t *Target) client() (*http.Client, error) {
t.RLock()
defer t.RUnlock()
@ -366,14 +389,12 @@ func (t *Target) RunScraper(sampleAppender storage.SampleAppender) {
log.Debugf("Starting scraper for target %v...", t)
jitterTimer := time.NewTimer(time.Duration(float64(lastScrapeInterval) * rand.Float64()))
select {
case <-jitterTimer.C:
case <-time.After(t.offset(lastScrapeInterval)):
// Continue after scraping offset.
case <-t.scraperStopping:
jitterTimer.Stop()
return
}
jitterTimer.Stop()
ticker := time.NewTicker(lastScrapeInterval)
defer ticker.Stop()

View File

@ -45,6 +45,52 @@ func TestTargetLabels(t *testing.T) {
}
}
func TestTargetOffset(t *testing.T) {
interval := 10 * time.Second
offsets := make([]time.Duration, 10000)
// Calculate offsets for 10000 different targets.
for i := range offsets {
target := newTestTarget("example.com:80", 0, model.LabelSet{
"label": model.LabelValue(fmt.Sprintf("%d", i)),
})
offsets[i] = target.offset(interval)
}
// Put the offsets into buckets and validate that they are all
// within bounds.
bucketSize := 1 * time.Second
buckets := make([]int, interval/bucketSize)
for _, offset := range offsets {
if offset < 0 || offset >= interval {
t.Fatalf("Offset %v out of bounds", offset)
}
bucket := offset / bucketSize
buckets[bucket]++
}
t.Log(buckets)
// Calculate whether the the number of targets per bucket
// does not differ more than a given tolerance.
avg := len(offsets) / len(buckets)
tolerance := 0.15
for _, bucket := range buckets {
diff := bucket - avg
if diff < 0 {
diff = -diff
}
if float64(diff)/float64(avg) > tolerance {
t.Fatalf("Bucket out of tolerance bounds")
}
}
}
func TestOverwriteLabels(t *testing.T) {
type test struct {
metric string

View File

@ -864,6 +864,7 @@ type bintree struct {
Func func() (*asset, error)
Children map[string]*bintree
}
var _bintree = &bintree{nil, map[string]*bintree{
"web": &bintree{nil, map[string]*bintree{
"ui": &bintree{nil, map[string]*bintree{
@ -979,4 +980,3 @@ func _filePath(dir, name string) string {
cannonicalName := strings.Replace(name, "\\", "/", -1)
return filepath.Join(append([]string{dir}, strings.Split(cannonicalName, "/")...)...)
}