Pass over scrape cache to the next scrape (#6670)

* Pass over scrape cache to the next scrape

Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu>
Authored by Julien Pivotto on 2020-01-22 13:13:47 +01:00; committed by Brian Brazil
parent 3fbeee07fa
commit fafb7940b1
2 changed files with 232 additions and 1 deletion

scrape/scrape.go

@@ -23,6 +23,7 @@ import (
 	"io/ioutil"
 	"math"
 	"net/http"
+	"reflect"
 	"sync"
 	"time"
 	"unsafe"
@@ -176,6 +177,7 @@ type scrapeLoopOptions struct {
 	honorLabels     bool
 	honorTimestamps bool
 	mrc             []*relabel.Config
+	cache           *scrapeCache
 }
 
 const maxAheadTime = 10 * time.Minute
@@ -208,7 +210,10 @@ func newScrapePool(cfg *config.ScrapeConfig, app Appendable, jitterSeed uint64,
 	}
 	sp.newLoop = func(opts scrapeLoopOptions) loop {
 		// Update the targets retrieval function for metadata to a new scrape cache.
-		cache := newScrapeCache()
+		cache := opts.cache
+		if cache == nil {
+			cache = newScrapeCache()
+		}
 		opts.target.SetMetadataStore(cache)
 
 		return newScrapeLoop(
@@ -291,6 +296,8 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
 		targetScrapePoolReloadsFailed.Inc()
 		return errors.Wrap(err, "error creating HTTP client")
 	}
+
+	reuseCache := reusableCache(sp.config, cfg)
 	sp.config = cfg
 	oldClient := sp.client
 	sp.client = client
@@ -306,6 +313,12 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
 	)
 
 	for fp, oldLoop := range sp.loops {
+		var cache *scrapeCache
+		if oc := oldLoop.getCache(); reuseCache && oc != nil {
+			cache = oc
+		} else {
+			cache = newScrapeCache()
+		}
 		var (
 			t = sp.activeTargets[fp]
 			s = &targetScraper{Target: t, client: sp.client, timeout: timeout}
@@ -316,6 +329,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
 				honorLabels:     honorLabels,
 				honorTimestamps: honorTimestamps,
 				mrc:             mrc,
+				cache:           cache,
 			})
 		)
 		wg.Add(1)
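
Taken together, the reload hunks above implement a simple hand-off: when the new config would still scrape the same data, the old loop's cache is passed into the options for its replacement loop; otherwise a fresh cache is created. The following standalone Go sketch is not part of the diff and uses made-up stand-in types, but it illustrates the same pattern:

package main

import "fmt"

// scrapeCache stands in for the real cache of parsed series.
type scrapeCache struct{ series map[string]uint64 }

type loop struct{ cache *scrapeCache }

// newLoop mirrors the fallback in sp.newLoop: reuse a provided cache,
// otherwise start with an empty one.
func newLoop(c *scrapeCache) *loop {
	if c == nil {
		c = &scrapeCache{series: map[string]uint64{}}
	}
	return &loop{cache: c}
}

// reload mirrors the hand-off in scrapePool.reload.
func reload(old *loop, reuse bool) *loop {
	var c *scrapeCache
	if reuse {
		c = old.cache // keep the warm cache for the next scrape loop
	}
	return newLoop(c)
}

func main() {
	l1 := newLoop(nil)
	l1.cache.series["up"] = 1

	l2 := reload(l1, true)
	fmt.Println(len(l2.cache.series)) // 1: the cache survived the reload

	l3 := reload(l2, false)
	fmt.Println(len(l3.cache.series)) // 0: incompatible config, fresh cache
}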
@@ -579,6 +593,7 @@ func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error)
 type loop interface {
 	run(interval, timeout time.Duration, errc chan<- error)
 	stop()
+	getCache() *scrapeCache
 }
 
 type cacheEntry struct {
@@ -1016,6 +1031,10 @@ func (sl *scrapeLoop) stop() {
 	<-sl.stopped
 }
 
+func (sl *scrapeLoop) getCache() *scrapeCache {
+	return sl.cache
+}
+
 func (sl *scrapeLoop) append(b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) {
 	var (
 		app = sl.appender()
@@ -1312,3 +1331,24 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v
 		return err
 	}
 }
+
+// zeroConfig returns a new scrape config that only contains configuration items
+// that alter metrics.
+func zeroConfig(c *config.ScrapeConfig) *config.ScrapeConfig {
+	z := *c
+	// We zero out the fields that for sure don't affect scrape.
+	z.ScrapeInterval = 0
+	z.ScrapeTimeout = 0
+	z.SampleLimit = 0
+	z.HTTPClientConfig = config_util.HTTPClientConfig{}
+	return &z
+}
+
+// reusableCache compares two scrape configs and tells whether the cache is still
+// valid.
+func reusableCache(r, l *config.ScrapeConfig) bool {
+	if r == nil || l == nil {
+		return false
+	}
+	return reflect.DeepEqual(zeroConfig(r), zeroConfig(l))
+}
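
To make the behaviour of reusableCache concrete, here is a small runnable sketch. It is not part of the diff: it copies the two helpers above into a standalone program (they are unexported in the scrape package), and the variable names base, slower and otherPath are made up for illustration.

package main

import (
	"fmt"
	"reflect"
	"time"

	config_util "github.com/prometheus/common/config"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/config"
)

// zeroConfig and reusableCache are copied from the diff above so the example
// can run outside the scrape package.
func zeroConfig(c *config.ScrapeConfig) *config.ScrapeConfig {
	z := *c
	z.ScrapeInterval = 0
	z.ScrapeTimeout = 0
	z.SampleLimit = 0
	z.HTTPClientConfig = config_util.HTTPClientConfig{}
	return &z
}

func reusableCache(r, l *config.ScrapeConfig) bool {
	if r == nil || l == nil {
		return false
	}
	return reflect.DeepEqual(zeroConfig(r), zeroConfig(l))
}

func main() {
	base := &config.ScrapeConfig{JobName: "prometheus", MetricsPath: "/metrics", ScrapeInterval: model.Duration(15 * time.Second)}
	slower := &config.ScrapeConfig{JobName: "prometheus", MetricsPath: "/metrics", ScrapeInterval: model.Duration(60 * time.Second)}
	otherPath := &config.ScrapeConfig{JobName: "prometheus", MetricsPath: "/metrics2", ScrapeInterval: model.Duration(15 * time.Second)}

	fmt.Println(reusableCache(base, slower))    // true: only the scrape interval differs
	fmt.Println(reusableCache(base, otherPath)) // false: a different metrics path changes what is scraped
}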

scrape/scrape_test.go

@@ -30,6 +30,7 @@ import (
 	"github.com/pkg/errors"
 	dto "github.com/prometheus/client_model/go"
+	config_util "github.com/prometheus/common/config"
 	"github.com/prometheus/common/model"
 
 	"github.com/prometheus/prometheus/config"
@@ -144,6 +145,10 @@ func (l *testLoop) stop() {
 	l.stopFunc()
 }
 
+func (l *testLoop) getCache() *scrapeCache {
+	return nil
+}
+
 func TestScrapePoolStop(t *testing.T) {
 	sp := &scrapePool{
 		activeTargets: map[uint64]*Target{},
@@ -1576,3 +1581,189 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
 	testutil.Equals(t, true, series.Next(), "series not found in tsdb")
 	testutil.Equals(t, false, series.Next(), "more than one series found in tsdb")
 }
+
+func TestReusableConfig(t *testing.T) {
+	variants := []*config.ScrapeConfig{
+		&config.ScrapeConfig{
+			JobName:       "prometheus",
+			ScrapeTimeout: model.Duration(15 * time.Second),
+		},
+		&config.ScrapeConfig{
+			JobName:       "httpd",
+			ScrapeTimeout: model.Duration(15 * time.Second),
+		},
+		&config.ScrapeConfig{
+			JobName:       "prometheus",
+			ScrapeTimeout: model.Duration(5 * time.Second),
+		},
+		&config.ScrapeConfig{
+			JobName:     "prometheus",
+			MetricsPath: "/metrics",
+		},
+		&config.ScrapeConfig{
+			JobName:     "prometheus",
+			MetricsPath: "/metrics2",
+		},
+		&config.ScrapeConfig{
+			JobName:       "prometheus",
+			ScrapeTimeout: model.Duration(5 * time.Second),
+			MetricsPath:   "/metrics2",
+		},
+		&config.ScrapeConfig{
+			JobName:        "prometheus",
+			ScrapeInterval: model.Duration(5 * time.Second),
+			MetricsPath:    "/metrics2",
+		},
+		&config.ScrapeConfig{
+			JobName:        "prometheus",
+			ScrapeInterval: model.Duration(5 * time.Second),
+			SampleLimit:    1000,
+			MetricsPath:    "/metrics2",
+		},
+	}
+	match := [][]int{
+		[]int{0, 2},
+		[]int{4, 5},
+		[]int{4, 6},
+		[]int{4, 7},
+		[]int{5, 6},
+		[]int{5, 7},
+		[]int{6, 7},
+	}
+	noMatch := [][]int{
+		[]int{1, 2},
+		[]int{0, 4},
+		[]int{3, 4},
+	}
+
+	for i, m := range match {
+		testutil.Equals(t, true, reusableCache(variants[m[0]], variants[m[1]]), "match test %d", i)
+		testutil.Equals(t, true, reusableCache(variants[m[1]], variants[m[0]]), "match test %d", i)
+		testutil.Equals(t, true, reusableCache(variants[m[1]], variants[m[1]]), "match test %d", i)
+		testutil.Equals(t, true, reusableCache(variants[m[0]], variants[m[0]]), "match test %d", i)
+	}
+	for i, m := range noMatch {
+		testutil.Equals(t, false, reusableCache(variants[m[0]], variants[m[1]]), "not match test %d", i)
+		testutil.Equals(t, false, reusableCache(variants[m[1]], variants[m[0]]), "not match test %d", i)
+	}
+}
+
+func TestReuseScrapeCache(t *testing.T) {
+	var (
+		app = &nopAppendable{}
+		cfg = &config.ScrapeConfig{
+			JobName:        "Prometheus",
+			ScrapeTimeout:  model.Duration(5 * time.Second),
+			ScrapeInterval: model.Duration(5 * time.Second),
+			MetricsPath:    "/metrics",
+		}
+		sp, _ = newScrapePool(cfg, app, 0, nil)
+		t1    = &Target{
+			discoveredLabels: labels.Labels{
+				labels.Label{
+					Name:  "labelNew",
+					Value: "nameNew",
+				},
+			},
+		}
+		proxyURL, _ = url.Parse("http://localhost:2128")
+	)
+	sp.sync([]*Target{t1})
+
+	steps := []struct {
+		keep      bool
+		newConfig *config.ScrapeConfig
+	}{
+		{
+			keep: true,
+			newConfig: &config.ScrapeConfig{
+				JobName:        "Prometheus",
+				ScrapeInterval: model.Duration(5 * time.Second),
+				ScrapeTimeout:  model.Duration(5 * time.Second),
+				MetricsPath:    "/metrics",
+			},
+		},
+		{
+			keep: false,
+			newConfig: &config.ScrapeConfig{
+				JobName:        "Prometheus",
+				ScrapeInterval: model.Duration(5 * time.Second),
+				ScrapeTimeout:  model.Duration(15 * time.Second),
+				MetricsPath:    "/metrics2",
+			},
+		},
+		{
+			keep: true,
+			newConfig: &config.ScrapeConfig{
+				JobName:        "Prometheus",
+				SampleLimit:    400,
+				ScrapeInterval: model.Duration(5 * time.Second),
+				ScrapeTimeout:  model.Duration(15 * time.Second),
+				MetricsPath:    "/metrics2",
+			},
+		},
+		{
+			keep: false,
+			newConfig: &config.ScrapeConfig{
+				JobName:         "Prometheus",
+				HonorTimestamps: true,
+				SampleLimit:     400,
+				ScrapeInterval:  model.Duration(5 * time.Second),
+				ScrapeTimeout:   model.Duration(15 * time.Second),
+				MetricsPath:     "/metrics2",
+			},
+		},
+		{
+			keep: true,
+			newConfig: &config.ScrapeConfig{
+				JobName:         "Prometheus",
+				HonorTimestamps: true,
+				SampleLimit:     400,
+				HTTPClientConfig: config_util.HTTPClientConfig{
+					ProxyURL: config_util.URL{URL: proxyURL},
+				},
+				ScrapeInterval: model.Duration(5 * time.Second),
+				ScrapeTimeout:  model.Duration(15 * time.Second),
+				MetricsPath:    "/metrics2",
+			},
+		},
+		{
+			keep: false,
+			newConfig: &config.ScrapeConfig{
+				JobName:         "Prometheus",
+				HonorTimestamps: true,
+				HonorLabels:     true,
+				SampleLimit:     400,
+				ScrapeInterval:  model.Duration(5 * time.Second),
+				ScrapeTimeout:   model.Duration(15 * time.Second),
+				MetricsPath:     "/metrics2",
+			},
+		},
+	}
+
+	cacheAddr := func(sp *scrapePool) map[uint64]string {
+		r := make(map[uint64]string)
+		for fp, l := range sp.loops {
+			r[fp] = fmt.Sprintf("%p", l.getCache())
+		}
+		return r
+	}
+
+	for i, s := range steps {
+		initCacheAddr := cacheAddr(sp)
+		sp.reload(s.newConfig)
+		for fp, newCacheAddr := range cacheAddr(sp) {
+			if s.keep {
+				testutil.Assert(t, initCacheAddr[fp] == newCacheAddr, "step %d: old cache and new cache are not the same", i)
+			} else {
+				testutil.Assert(t, initCacheAddr[fp] != newCacheAddr, "step %d: old cache and new cache are the same", i)
+			}
+		}
+		initCacheAddr = cacheAddr(sp)
+		sp.reload(s.newConfig)
+		for fp, newCacheAddr := range cacheAddr(sp) {
+			testutil.Assert(t, initCacheAddr[fp] == newCacheAddr, "step %d: reloading the exact config invalidates the cache", i)
+		}
+	}
+}
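
The test above distinguishes "same cache" from "new cache" purely by comparing pointer addresses rendered with %p. A tiny standalone illustration of that trick (not part of the diff, with a stand-in type):

package main

import "fmt"

type scrapeCache struct{ iter uint64 }

func main() {
	a := &scrapeCache{}
	b := a              // same underlying cache
	c := &scrapeCache{} // a different cache

	fmt.Println(fmt.Sprintf("%p", a) == fmt.Sprintf("%p", b)) // true: same address
	fmt.Println(fmt.Sprintf("%p", a) == fmt.Sprintf("%p", c)) // false: distinct allocation
}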
}