retrieval: extract scrape cache

Fabian Reinartz 2017-05-26 10:44:48 +02:00
parent a83014f53c
commit bc7aff8cef
3 changed files with 151 additions and 114 deletions

@@ -99,6 +99,7 @@ func (p *Parser) Err() error {
}
// Metric writes the labels of the current sample into the passed labels.
// It returns the string from which the metric was parsed.
func (p *Parser) Metric(l *labels.Labels) string {
// Allocate the full immutable string immediately, so we just
// have to create references on it below.

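For orientation before the scrape-loop changes: a hedged sketch of how this parser API is consumed, mirroring the append loop later in this commit (imports from pkg/textparse and pkg/labels assumed; the signatures follow the usage below, but treat the sketch as illustrative rather than authoritative).

// Illustrative only: walk a scrape body the way the append loop does.
func parseScrape(b []byte) error {
	p := textparse.New(b)
	for p.Next() {
		met, tp, v := p.At() // raw metric bytes, optional timestamp, value
		var lset labels.Labels
		mets := p.Metric(&lset) // fills lset, returns the string met was parsed from
		_, _, _, _ = met, tp, v, mets
	}
	return p.Err()
}
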
@@ -120,6 +120,16 @@ func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable
// Any errors that could occur here should be caught during config validation.
log.Errorf("Error creating HTTP client for job %q: %s", cfg.JobName, err)
}
newLoop := func(
ctx context.Context,
s scraper,
app, reportApp func() storage.Appender,
l log.Logger,
) loop {
return newScrapeLoop(ctx, s, app, reportApp, l)
}
return &scrapePool{
appendable: app,
config: cfg,
@@ -127,7 +137,7 @@ func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable
client: client,
targets: map[uint64]*Target{},
loops: map[uint64]loop{},
newLoop: newScrapeLoop,
newLoop: newLoop,
}
}
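
The indirection through the newLoop field exists so tests can swap in a stub loop constructor. A minimal sketch of that pattern; newTestScrapePool and stubLoop are hypothetical and not part of this commit:

func newTestScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable) *scrapePool {
	sp := newScrapePool(ctx, cfg, app)
	// Replace the real constructor with one that returns a test double.
	sp.newLoop = func(_ context.Context, _ scraper, _, _ func() storage.Appender, _ log.Logger) loop {
		return &stubLoop{} // stubLoop: hypothetical test implementation of loop
	}
	return sp
}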
@@ -427,39 +437,110 @@ type refEntry struct {
type scrapeLoop struct {
scraper scraper
l log.Logger
iter uint64 // scrape iteration
cache *scrapeCache
appender func() storage.Appender
reportAppender func() storage.Appender
refCache map[string]*refEntry // Parsed string to ref.
lsetCache map[string]*lsetCacheEntry // Ref to labelset and string
// seriesCur and seriesPrev store the labels of series that were seen
// in the current and previous scrape.
// We hold two maps and swap them out to save allocations.
seriesCur map[uint64]labels.Labels
seriesPrev map[uint64]labels.Labels
ctx context.Context
scrapeCtx context.Context
cancel func()
stopped chan struct{}
}
func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp func() storage.Appender, l log.Logger) loop {
// scrapeCache tracks mappings of exposed metric strings to label sets and
// storage references. Additionally, it tracks staleness of series between
// scrapes.
type scrapeCache struct {
iter uint64 // Current scrape iteration.
refs map[string]*refEntry // Parsed string to ref.
lsets map[string]*lsetCacheEntry // Ref to labelset and string.
// seriesCur and seriesPrev store the labels of series that were seen
// in the current and previous scrape.
// We hold two maps and swap them out to save allocations.
seriesCur map[uint64]labels.Labels
seriesPrev map[uint64]labels.Labels
}
func newScrapeCache() *scrapeCache {
return &scrapeCache{
refs: map[string]*refEntry{},
lsets: map[string]*lsetCacheEntry{},
seriesCur: map[uint64]labels.Labels{},
seriesPrev: map[uint64]labels.Labels{},
}
}
func (c *scrapeCache) iterDone() {
// c.refs and c.lsets may grow over time through series churn
// or multiple string representations of the same metric. Clean up entries
// that haven't appeared in the last scrape.
for s, e := range c.refs {
if e.lastIter < c.iter {
delete(c.refs, s)
delete(c.lsets, e.ref)
}
}
// Swap current and previous series.
c.seriesPrev, c.seriesCur = c.seriesCur, c.seriesPrev
// We have to delete every single key in the map.
for k := range c.seriesCur {
delete(c.seriesCur, k)
}
c.iter++
}
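
The swap followed by the delete loop reuses both maps' underlying storage across scrapes instead of allocating fresh maps each iteration. A standalone sketch of the idiom (pkg/labels assumed for the value type):

// rotate makes this scrape's series the "previous" ones and empties the
// reused map in place, keeping its allocated buckets for the next scrape.
func rotate(prev, cur map[uint64]labels.Labels) (map[uint64]labels.Labels, map[uint64]labels.Labels) {
	prev, cur = cur, prev
	for k := range cur {
		delete(cur, k)
	}
	return prev, cur
}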
func (c *scrapeCache) getRef(met string) (string, bool) {
e, ok := c.refs[met]
if !ok {
return "", false
}
e.lastIter = c.iter
return e.ref, true
}
func (c *scrapeCache) addRef(met, ref string, lset labels.Labels) {
c.refs[met] = &refEntry{ref: ref, lastIter: c.iter}
// met is the raw string the metric was ingested as. Its label order is
// not canonical, so the string cannot uniquely identify the series.
// We store a hash over the label set instead.
c.lsets[ref] = &lsetCacheEntry{lset: lset, hash: lset.Hash()}
}
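
The hash serves as a canonical identity because labels.Labels are kept sorted on construction, so exposition order does not affect it. A quick hedged check (fmt and pkg/labels assumed):

func main() {
	l1 := labels.FromStrings("a", "1", "b", "2")
	l2 := labels.FromStrings("b", "2", "a", "1")
	fmt.Println(l1.Hash() == l2.Hash()) // true: one series, either exposition order
}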
func (c *scrapeCache) trackStaleness(ref string) {
e := c.lsets[ref]
c.seriesCur[e.hash] = e.lset
}
func (c *scrapeCache) forEachStale(f func(labels.Labels) bool) {
for h, lset := range c.seriesPrev {
if _, ok := c.seriesCur[h]; !ok {
if !f(lset) {
break
}
}
}
}
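
Taken together, a hedged sketch of the cache's lifecycle across two scrapes; the metric string, ref value, and lset argument are made up for illustration:

func cacheLifecycle(lset labels.Labels) {
	c := newScrapeCache()

	// Scrape 1: the series is exposed.
	c.addRef(`metric_a{x="1"}`, "ref-1", lset)
	c.trackStaleness("ref-1") // only called when no explicit timestamp is set
	c.iterDone()              // scrape 1's series become "previous"

	// Scrape 2: the series is gone, so nothing was tracked this iteration.
	c.forEachStale(func(l labels.Labels) bool {
		// l is lset here: present in the previous scrape, absent in this one.
		return true // keep iterating
	})
	c.iterDone()
}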
func newScrapeLoop(
ctx context.Context,
sc scraper,
app, reportApp func() storage.Appender,
l log.Logger,
) *scrapeLoop {
if l == nil {
l = log.Base()
}
sl := &scrapeLoop{
scraper: sc,
appender: app,
cache: newScrapeCache(),
reportAppender: reportApp,
refCache: map[string]*refEntry{},
lsetCache: map[string]*lsetCacheEntry{},
seriesCur: map[uint64]labels.Labels{},
seriesPrev: map[uint64]labels.Labels{},
stopped: make(chan struct{}),
ctx: ctx,
l: l,
@@ -487,8 +568,6 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
mainLoop:
for {
sl.iter++
buf.Reset()
select {
case <-sl.ctx.Done():
@@ -534,16 +613,6 @@ mainLoop:
sl.report(start, time.Since(start), total, added, err)
last = start
// refCache and lsetCache may grow over time through series churn
// or multiple string representations of the same metric. Clean up entries
// that haven't appeared in the last scrape.
for s, e := range sl.refCache {
if e.lastIter < sl.iter {
delete(sl.refCache, s)
delete(sl.lsetCache, e.ref)
}
}
select {
case <-sl.ctx.Done():
close(sl.stopped)
@@ -654,18 +723,12 @@ loop:
t = *tp
}
mets := yoloString(met)
re, ok := sl.refCache[mets]
ref, ok := sl.cache.getRef(yoloString(met))
if ok {
re.lastIter = sl.iter
switch err = app.AddFast(re.ref, t, v); err {
switch err = app.AddFast(ref, t, v); err {
case nil:
if tp == nil {
e := sl.lsetCache[re.ref]
// Bypass staleness logic if there is an explicit timestamp.
sl.seriesCur[e.hash] = e.lset
sl.cache.trackStaleness(ref)
}
case storage.ErrNotFound:
ok = false
@@ -686,7 +749,7 @@ loop:
}
if !ok {
var lset labels.Labels
mets = p.Metric(&lset)
mets := p.Metric(&lset)
var ref string
ref, err = app.Add(lset, t, v)
@@ -710,16 +773,11 @@ loop:
break loop
}
sl.refCache[mets] = &refEntry{ref: ref, lastIter: sl.iter}
sl.cache.addRef(mets, ref, lset)
// mets is the raw string the metric was ingested as and ambiguous as it might
// not be sorted. Construct the authoritative string for the label set.
h := lset.Hash()
sl.lsetCache[ref] = &lsetCacheEntry{lset: lset, hash: h}
if tp == nil {
// Bypass staleness logic if there is an explicit timestamp.
sl.seriesCur[h] = lset
sl.cache.trackStaleness(ref)
}
}
added++
@@ -734,25 +792,19 @@ loop:
sl.l.With("numDropped", numDuplicates).Warn("Error on ingesting samples with different value but same timestamp")
}
if err == nil {
for metric, lset := range sl.seriesPrev {
if _, ok := sl.seriesCur[metric]; !ok {
// Series no longer exposed, mark it stale.
_, err = app.Add(lset, defTime, math.Float64frombits(value.StaleNaN))
switch err {
case nil:
case errSeriesDropped:
err = nil
continue
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
// Do not count these in logging, as this is expected if a target
// goes away and comes back again with a new scrape loop.
err = nil
continue
default:
break
}
sl.cache.forEachStale(func(lset labels.Labels) bool {
// Series no longer exposed, mark it stale.
_, err = app.Add(lset, defTime, math.Float64frombits(value.StaleNaN))
switch err {
case errSeriesDropped:
err = nil
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
// Do not count these in logging, as this is expected if a target
// goes away and comes back again with a new scrape loop.
err = nil
}
}
return err == nil
})
}
if err != nil {
app.Rollback()
@@ -762,13 +814,7 @@ loop:
return total, 0, err
}
// Swap current and previous series.
sl.seriesPrev, sl.seriesCur = sl.seriesCur, sl.seriesPrev
// We have to delete every single key in the map.
for k := range sl.seriesCur {
delete(sl.seriesCur, k)
}
sl.cache.iterDone()
return total, added, nil
}
@@ -833,12 +879,13 @@ func (sl *scrapeLoop) reportStale(start time.Time) error {
}
func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error {
re, ok := sl.refCache[s]
// Suffix s with \xff, a byte that is invalid in UTF-8, to avoid
// collisions with scraped metrics.
s2 := s + "\xff"
ref, ok := sl.cache.getRef(s2)
if ok {
re.lastIter = sl.iter
err := app.AddFast(re.ref, t, v)
err := app.AddFast(ref, t, v)
switch err {
case nil:
return nil
@@ -858,7 +905,7 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v
ref, err := app.Add(met, t, v)
switch err {
case nil:
sl.refCache[s] = &refEntry{ref: ref, lastIter: sl.iter}
sl.cache.addRef(s2, ref, met)
return nil
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
return nil

@@ -373,11 +373,9 @@ func TestScrapeLoopStop(t *testing.T) {
// Succeed once, several failures, then stop.
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
numScrapes += 1
numScrapes++
if numScrapes == 2 {
go func() {
sl.stop()
}()
go sl.stop()
}
w.Write([]byte("metric_a 42\n"))
return nil
@@ -398,7 +396,7 @@
t.Fatalf("Appended samples not as expected. Wanted: at least %d samples Got: %d", 2, len(appender.result))
}
if !value.IsStaleNaN(appender.result[len(appender.result)-1].v) {
t.Fatalf("Appended last sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[len(appender.result)].v))
t.Fatalf("Appended last sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[len(appender.result)-1].v))
}
if len(reportAppender.result) < 8 {
@@ -515,7 +513,8 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
// Succeed once, several failures, then stop.
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
numScrapes += 1
numScrapes++
if numScrapes == 1 {
w.Write([]byte("metric_a 42\n"))
return nil
@@ -564,7 +563,8 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
// Succeed once, several failures, then stop.
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
numScrapes += 1
numScrapes++
if numScrapes == 1 {
w.Write([]byte("metric_a 42\n"))
return nil
@@ -601,15 +601,12 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
func TestScrapeLoopAppend(t *testing.T) {
app := &collectResultAppender{}
sl := &scrapeLoop{
appender: func() storage.Appender { return app },
reportAppender: func() storage.Appender { return nopAppender{} },
refCache: map[string]*refEntry{},
lsetCache: map[string]*lsetCacheEntry{},
seriesCur: map[uint64]labels.Labels{},
seriesPrev: map[uint64]labels.Labels{},
}
sl := newScrapeLoop(context.Background(), nil,
func() storage.Appender { return app },
func() storage.Appender { return nopAppender{} },
nil,
)
now := time.Now()
_, _, err := sl.append([]byte("metric_a 1\nmetric_b NaN\n"), now)
if err != nil {
@@ -642,14 +639,11 @@ func TestScrapeLoopAppend(t *testing.T) {
func TestScrapeLoopAppendStaleness(t *testing.T) {
app := &collectResultAppender{}
sl := &scrapeLoop{
appender: func() storage.Appender { return app },
reportAppender: func() storage.Appender { return nopAppender{} },
refCache: map[string]*refEntry{},
lsetCache: map[string]*lsetCacheEntry{},
seriesCur: map[uint64]labels.Labels{},
seriesPrev: map[uint64]labels.Labels{},
}
sl := newScrapeLoop(context.Background(), nil,
func() storage.Appender { return app },
func() storage.Appender { return nopAppender{} },
nil,
)
now := time.Now()
_, _, err := sl.append([]byte("metric_a 1\n"), now)
@@ -688,12 +682,11 @@ func TestScrapeLoopAppendStaleness(t *testing.T) {
func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
app := &collectResultAppender{}
sl := &scrapeLoop{
appender: func() storage.Appender { return app },
reportAppender: func() storage.Appender { return nopAppender{} },
refCache: map[string]*refEntry{},
lsetCache: map[string]*lsetCacheEntry{},
}
sl := newScrapeLoop(context.Background(), nil,
func() storage.Appender { return app },
func() storage.Appender { return nopAppender{} },
nil,
)
now := time.Now()
_, _, err := sl.append([]byte("metric_a 1 1000\n"), now)
@@ -737,15 +730,11 @@ func (app *errorAppender) AddFast(ref string, t int64, v float64) error {
func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrder(t *testing.T) {
app := &errorAppender{}
sl := &scrapeLoop{
appender: func() storage.Appender { return app },
reportAppender: func() storage.Appender { return nopAppender{} },
refCache: map[string]*refEntry{},
lsetCache: map[string]*lsetCacheEntry{},
seriesCur: map[uint64]labels.Labels{},
seriesPrev: map[uint64]labels.Labels{},
l: log.Base(),
}
sl := newScrapeLoop(context.Background(), nil,
func() storage.Appender { return app },
func() storage.Appender { return nopAppender{} },
nil,
)
now := time.Unix(1, 0)
_, _, err := sl.append([]byte("out_of_order 1\namend 1\nnormal 1\n"), now)