Send target and metadata cache in context (again) (#10636)

* Send target and metadata cache in context (again)

The previous attempt was rolled back in #10590 due to memory issues.

In the previous attempt, `sl.parentCtx` and `sl.ctx` both carried a copy of the
cache and target info, and it was hard to pinpoint where the context was being
retained and causing the memory increase.

I've experimented extensively in #10627 and found that this approach doesn't
cause a memory increase, whereas attaching this info to _any_ other context
does.

This change fixes a number of long-standing issues in the OTel Collector that the
community has been waiting on, and releases of a few downstream distributions
of the OTel Collector are blocked on a fix. I propose merging this change while
I investigate what is happening.
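
For downstream consumers, the metadata and target surface on the context handed to
`Appendable.Appender`. Below is a minimal sketch of that consumption path, assuming
an OTel-Collector-style wrapper; the package name and `metadataCapturingAppendable`
are hypothetical, and only `MetricMetadataStoreFromContext` and `TargetFromContext`
come from this change:

package example

import (
    "context"

    "github.com/prometheus/prometheus/scrape"
    "github.com/prometheus/prometheus/storage"
)

// metadataCapturingAppendable wraps another Appendable and inspects the
// scrape context before delegating.
type metadataCapturingAppendable struct {
    next storage.Appendable
}

func (a metadataCapturingAppendable) Appender(ctx context.Context) storage.Appender {
    // Both lookups succeed only when the scrape manager was created with
    // PassMetadataInContext: true.
    if store, ok := scrape.MetricMetadataStoreFromContext(ctx); ok {
        _ = store // e.g. look up type/help/unit for incoming metric names
    }
    if target, ok := scrape.TargetFromContext(ctx); ok {
        _ = target // e.g. derive resource attributes from the target's labels
    }
    return a.next.Appender(ctx)
}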

Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>

* Gate the change behind a manager option

Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
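
Because the behaviour is gated, an embedder has to opt in when constructing the
scrape manager. A minimal sketch, assuming the go-kit logger Prometheus uses at
this commit; `newPassThroughManager` is a hypothetical helper:

package example

import (
    "github.com/go-kit/log"
    "github.com/prometheus/prometheus/scrape"
    "github.com/prometheus/prometheus/storage"
)

// newPassThroughManager builds a scrape manager with the new gate enabled.
// Prometheus itself leaves PassMetadataInContext at its false default.
func newPassThroughManager(app storage.Appendable) *scrape.Manager {
    opts := &scrape.Options{
        PassMetadataInContext: true, // opt in to metadata/target in context
    }
    return scrape.NewManager(opts, log.NewNopLogger(), app)
}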
Goutham Veeramachaneni 2022-05-03 11:45:52 -07:00 committed by GitHub
parent dd5f9daab9
commit 2381d7be57
3 changed files with 164 additions and 23 deletions

scrape/manager.go

@@ -124,6 +124,9 @@ func NewManager(o *Options, logger log.Logger, app storage.Appendable) *Manager
// Options are the configuration parameters to the scrape manager.
type Options struct {
ExtraMetrics bool
// Option used by downstream scraper users like OpenTelemetry Collector
// to help lookup metric metadata. Should be false for Prometheus.
PassMetadataInContext bool
// Optional HTTP client options to use when scraping.
HTTPClientOptions []config_util.HTTPClientOption
@@ -195,7 +198,7 @@ func (m *Manager) reload() {
level.Error(m.logger).Log("msg", "error reloading target set", "err", "invalid config id:"+setName)
continue
}
sp, err := newScrapePool(scrapeConfig, m.append, m.jitterSeed, log.With(m.logger, "scrape_pool", setName), m.opts.ExtraMetrics, m.opts.HTTPClientOptions)
sp, err := newScrapePool(scrapeConfig, m.append, m.jitterSeed, log.With(m.logger, "scrape_pool", setName), m.opts.ExtraMetrics, m.opts.PassMetadataInContext, m.opts.HTTPClientOptions)
if err != nil {
level.Error(m.logger).Log("msg", "error creating new scrape pool", "err", err, "scrape_pool", setName)
continue

scrape/scrape.go

@@ -264,7 +264,7 @@ const maxAheadTime = 10 * time.Minute
type labelsMutator func(labels.Labels) labels.Labels
func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed uint64, logger log.Logger, reportExtraMetrics bool, httpOpts []config_util.HTTPClientOption) (*scrapePool, error) {
func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed uint64, logger log.Logger, reportExtraMetrics, passMetadataInContext bool, httpOpts []config_util.HTTPClientOption) (*scrapePool, error) {
targetScrapePools.Inc()
if logger == nil {
logger = log.NewNopLogger()
@@ -315,6 +315,9 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed
opts.interval,
opts.timeout,
reportExtraMetrics,
opts.target,
cache,
passMetadataInContext,
)
}
@@ -855,10 +858,11 @@ type scrapeLoop struct {
sampleMutator labelsMutator
reportSampleMutator labelsMutator
parentCtx context.Context
ctx context.Context
cancel func()
stopped chan struct{}
parentCtx context.Context
appenderCtx context.Context
ctx context.Context
cancel func()
stopped chan struct{}
disabledEndOfRunStalenessMarkers bool
@@ -1124,6 +1128,9 @@ func newScrapeLoop(ctx context.Context,
interval time.Duration,
timeout time.Duration,
reportExtraMetrics bool,
target *Target,
metricMetadataStore MetricMetadataStore,
passMetadataInContext bool,
) *scrapeLoop {
if l == nil {
l = log.NewNopLogger()
@@ -1134,6 +1141,18 @@ func newScrapeLoop(ctx context.Context,
if cache == nil {
cache = newScrapeCache()
}
appenderCtx := ctx
if passMetadataInContext {
// Store the cache and target in the context. This is then used by downstream OTel Collector
// to lookup the metadata required to process the samples. Not used by Prometheus itself.
// TODO(gouthamve) We're using a dedicated context because using the parentCtx caused a memory
// leak. We should ideally fix the main leak. See: https://github.com/prometheus/prometheus/pull/10590
appenderCtx = ContextWithMetricMetadataStore(appenderCtx, cache)
appenderCtx = ContextWithTarget(appenderCtx, target)
}
sl := &scrapeLoop{
scraper: sc,
buffers: buffers,
@@ -1145,6 +1164,7 @@ func newScrapeLoop(ctx context.Context,
jitterSeed: jitterSeed,
l: l,
parentCtx: ctx,
appenderCtx: appenderCtx,
honorTimestamps: honorTimestamps,
sampleLimit: sampleLimit,
labelLimits: labelLimits,
@@ -1223,7 +1243,7 @@ mainLoop:
// scrapeAndReport performs a scrape and then appends the result to the storage
// together with reporting metrics, by using as few appenders as possible.
// In the happy scenario, a single appender is used.
// This function uses sl.parentCtx instead of sl.ctx on purpose. A scrape should
// This function uses sl.appenderCtx instead of sl.ctx on purpose. A scrape should
// only be cancelled on shutdown, not on reloads.
func (sl *scrapeLoop) scrapeAndReport(last, appendTime time.Time, errc chan<- error) time.Time {
start := time.Now()
@@ -1242,7 +1262,7 @@ func (sl *scrapeLoop) scrapeAndReport(last, appendTime time.Time, errc chan<- er
var total, added, seriesAdded, bytes int
var err, appErr, scrapeErr error
app := sl.appender(sl.parentCtx)
app := sl.appender(sl.appenderCtx)
defer func() {
if err != nil {
app.Rollback()
@@ -1265,7 +1285,7 @@ func (sl *scrapeLoop) scrapeAndReport(last, appendTime time.Time, errc chan<- er
// Add stale markers.
if _, _, _, err := sl.append(app, []byte{}, "", appendTime); err != nil {
app.Rollback()
app = sl.appender(sl.parentCtx)
app = sl.appender(sl.appenderCtx)
level.Warn(sl.l).Log("msg", "Append failed", "err", err)
}
if errc != nil {
@@ -1304,13 +1324,13 @@ func (sl *scrapeLoop) scrapeAndReport(last, appendTime time.Time, errc chan<- er
total, added, seriesAdded, appErr = sl.append(app, b, contentType, appendTime)
if appErr != nil {
app.Rollback()
app = sl.appender(sl.parentCtx)
app = sl.appender(sl.appenderCtx)
level.Debug(sl.l).Log("msg", "Append failed", "err", appErr)
// The append failed, probably due to a parse error or sample limit.
// Call sl.append again with an empty scrape to trigger stale markers.
if _, _, _, err := sl.append(app, []byte{}, "", appendTime); err != nil {
app.Rollback()
app = sl.appender(sl.parentCtx)
app = sl.appender(sl.appenderCtx)
level.Warn(sl.l).Log("msg", "Append failed", "err", err)
}
}
@@ -1374,8 +1394,8 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int
// Call sl.append again with an empty scrape to trigger stale markers.
// If the target has since been recreated and scraped, the
// stale markers will be out of order and ignored.
// sl.context would have been cancelled, hence using sl.parentCtx.
app := sl.appender(sl.parentCtx)
// sl.context would have been cancelled, hence using sl.appenderCtx.
app := sl.appender(sl.appenderCtx)
var err error
defer func() {
if err != nil {
@@ -1389,7 +1409,7 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int
}()
if _, _, _, err = sl.append(app, []byte{}, "", staleTime); err != nil {
app.Rollback()
app = sl.appender(sl.parentCtx)
app = sl.appender(sl.appenderCtx)
level.Warn(sl.l).Log("msg", "Stale append failed", "err", err)
}
if err = sl.reportStale(app, staleTime); err != nil {
@@ -1791,3 +1811,31 @@ func reusableCache(r, l *config.ScrapeConfig) bool {
}
return reflect.DeepEqual(zeroConfig(r), zeroConfig(l))
}
// ctxKey is a dedicated type for keys of context-embedded values propagated
// with the scrape context.
type ctxKey int
// Valid CtxKey values.
const (
ctxKeyMetadata ctxKey = iota + 1
ctxKeyTarget
)
func ContextWithMetricMetadataStore(ctx context.Context, s MetricMetadataStore) context.Context {
return context.WithValue(ctx, ctxKeyMetadata, s)
}
func MetricMetadataStoreFromContext(ctx context.Context) (MetricMetadataStore, bool) {
s, ok := ctx.Value(ctxKeyMetadata).(MetricMetadataStore)
return s, ok
}
func ContextWithTarget(ctx context.Context, t *Target) context.Context {
return context.WithValue(ctx, ctxKeyTarget, t)
}
func TargetFromContext(ctx context.Context) (*Target, bool) {
t, ok := ctx.Value(ctxKeyTarget).(*Target)
return t, ok
}
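
A short round-trip sketch of the helpers above, written as a package-internal test
(not part of the diff; the test name is hypothetical, and the usual `context` and
`testing` imports are assumed). Because `ctxKey` is unexported, keys set by other
packages cannot collide with these entries:

func TestMetadataContextRoundTrip(t *testing.T) {
    ctx := ContextWithMetricMetadataStore(context.Background(), newScrapeCache())
    ctx = ContextWithTarget(ctx, &Target{})

    if _, ok := MetricMetadataStoreFromContext(ctx); !ok {
        t.Fatal("metadata store missing from context")
    }
    if _, ok := TargetFromContext(ctx); !ok {
        t.Fatal("target missing from context")
    }

    // Context keys compare by type as well as value, so a plain string key
    // named "target" cannot shadow ctxKeyTarget.
    ctx = context.WithValue(ctx, "target", "unrelated") //nolint:staticcheck
    if _, ok := TargetFromContext(ctx); !ok {
        t.Fatal("dedicated key was shadowed")
    }
}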

scrape/scrape_test.go

@@ -56,7 +56,7 @@ func TestNewScrapePool(t *testing.T) {
var (
app = &nopAppendable{}
cfg = &config.ScrapeConfig{}
sp, _ = newScrapePool(cfg, app, 0, nil, false, nil)
sp, _ = newScrapePool(cfg, app, 0, nil, false, false, nil)
)
if a, ok := sp.appendable.(*nopAppendable); !ok || a != app {
@@ -91,7 +91,7 @@ func TestDroppedTargetsList(t *testing.T) {
},
},
}
sp, _ = newScrapePool(cfg, app, 0, nil, false, nil)
sp, _ = newScrapePool(cfg, app, 0, nil, false, false, nil)
expectedLabelSetString = "{__address__=\"127.0.0.1:9090\", __scrape_interval__=\"0s\", __scrape_timeout__=\"0s\", job=\"dropMe\"}"
expectedLength = 1
)
@@ -454,7 +454,7 @@ func TestScrapePoolTargetLimit(t *testing.T) {
func TestScrapePoolAppender(t *testing.T) {
cfg := &config.ScrapeConfig{}
app := &nopAppendable{}
sp, _ := newScrapePool(cfg, app, 0, nil, false, nil)
sp, _ := newScrapePool(cfg, app, 0, nil, false, false, nil)
loop := sp.newLoop(scrapeLoopOptions{
target: &Target{},
@@ -496,7 +496,7 @@ func TestScrapePoolRaces(t *testing.T) {
newConfig := func() *config.ScrapeConfig {
return &config.ScrapeConfig{ScrapeInterval: interval, ScrapeTimeout: timeout}
}
sp, _ := newScrapePool(newConfig(), &nopAppendable{}, 0, nil, false, nil)
sp, _ := newScrapePool(newConfig(), &nopAppendable{}, 0, nil, false, false, nil)
tgts := []*targetgroup.Group{
{
Targets: []model.LabelSet{
@@ -590,6 +590,9 @@ func TestScrapeLoopStopBeforeRun(t *testing.T) {
1,
0,
false,
nil,
nil,
false,
)
// The scrape pool synchronizes on stopping scrape loops. However, new scrape
@@ -659,6 +662,9 @@ func TestScrapeLoopStop(t *testing.T) {
10*time.Millisecond,
time.Hour,
false,
nil,
nil,
false,
)
// Terminate loop after 2 scrapes.
@@ -731,6 +737,9 @@ func TestScrapeLoopRun(t *testing.T) {
time.Second,
time.Hour,
false,
nil,
nil,
false,
)
// The loop must terminate during the initial offset if the context
@@ -783,6 +792,9 @@ func TestScrapeLoopRun(t *testing.T) {
time.Second,
100*time.Millisecond,
false,
nil,
nil,
false,
)
go func() {
@@ -839,6 +851,9 @@ func TestScrapeLoopForcedErr(t *testing.T) {
time.Second,
time.Hour,
false,
nil,
nil,
false,
)
forcedErr := fmt.Errorf("forced err")
@@ -894,6 +909,9 @@ func TestScrapeLoopMetadata(t *testing.T) {
0,
0,
false,
nil,
nil,
false,
)
defer cancel()
@@ -948,6 +966,9 @@ func simpleTestScrapeLoop(t testing.TB) (context.Context, *scrapeLoop) {
0,
0,
false,
nil,
nil,
false,
)
t.Cleanup(func() { cancel() })
@@ -1038,6 +1059,9 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
10*time.Millisecond,
time.Hour,
false,
nil,
nil,
false,
)
// Succeed once, several failures, then stop.
numScrapes := 0
@@ -1097,6 +1121,9 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
10*time.Millisecond,
time.Hour,
false,
nil,
nil,
false,
)
// Succeed once, several failures, then stop.
@@ -1160,6 +1187,9 @@ func TestScrapeLoopCache(t *testing.T) {
10*time.Millisecond,
time.Hour,
false,
nil,
nil,
false,
)
numScrapes := 0
@@ -1239,6 +1269,9 @@ func TestScrapeLoopCacheMemoryExhaustionProtection(t *testing.T) {
10*time.Millisecond,
time.Hour,
false,
nil,
nil,
false,
)
numScrapes := 0
@@ -1350,6 +1383,9 @@ func TestScrapeLoopAppend(t *testing.T) {
0,
0,
false,
nil,
nil,
false,
)
now := time.Now()
@@ -1437,7 +1473,7 @@ func TestScrapeLoopAppendForConflictingPrefixedLabels(t *testing.T) {
return mutateSampleLabels(l, &Target{labels: labels.FromStrings(tc.targetLabels...)}, false, nil)
},
nil,
func(ctx context.Context) storage.Appender { return app }, nil, 0, true, 0, nil, 0, 0, false,
func(ctx context.Context) storage.Appender { return app }, nil, 0, true, 0, nil, 0, 0, false, nil, nil, false,
)
slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, []byte(tc.exposedLabels), "", time.Date(2000, 1, 1, 1, 0, 0, 0, time.UTC))
@@ -1473,6 +1509,9 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) {
0,
0,
false,
nil,
nil,
false,
)
fakeRef := storage.SeriesRef(1)
@@ -1528,6 +1567,9 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) {
0,
0,
false,
nil,
nil,
false,
)
// Get the value of the Counter before performing the append.
@@ -1602,6 +1644,9 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) {
0,
0,
false,
nil,
nil,
false,
)
now := time.Now()
@@ -1647,6 +1692,9 @@ func TestScrapeLoopAppendStaleness(t *testing.T) {
0,
0,
false,
nil,
nil,
false,
)
now := time.Now()
@@ -1695,6 +1743,9 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
0,
0,
false,
nil,
nil,
false,
)
now := time.Now()
@@ -1803,6 +1854,9 @@ metric_total{n="2"} 2 # {t="2"} 2.0 20000
0,
0,
false,
nil,
nil,
false,
)
now := time.Now()
@@ -1865,6 +1919,9 @@ func TestScrapeLoopAppendExemplarSeries(t *testing.T) {
0,
0,
false,
nil,
nil,
false,
)
now := time.Now()
@@ -1914,6 +1971,9 @@ func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) {
10*time.Millisecond,
time.Hour,
false,
nil,
nil,
false,
)
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
@@ -1947,6 +2007,9 @@ func TestScrapeLoopRunReportsTargetDownOnInvalidUTF8(t *testing.T) {
10*time.Millisecond,
time.Hour,
false,
nil,
nil,
false,
)
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
@@ -1993,6 +2056,9 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T
0,
0,
false,
nil,
nil,
false,
)
now := time.Unix(1, 0)
@@ -2035,6 +2101,9 @@ func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) {
0,
0,
false,
nil,
nil,
false,
)
now := time.Now().Add(20 * time.Minute)
@@ -2289,6 +2358,9 @@ func TestScrapeLoop_RespectTimestamps(t *testing.T) {
0,
0,
false,
nil,
nil,
false,
)
now := time.Now()
@@ -2327,6 +2399,9 @@ func TestScrapeLoop_DiscardTimestamps(t *testing.T) {
0,
0,
false,
nil,
nil,
false,
)
now := time.Now()
@@ -2364,6 +2439,9 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
0,
0,
false,
nil,
nil,
false,
)
defer cancel()
@@ -2419,6 +2497,9 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) {
0,
0,
false,
nil,
nil,
false,
)
defer cancel()
@@ -2511,7 +2592,7 @@ func TestReuseScrapeCache(t *testing.T) {
ScrapeInterval: model.Duration(5 * time.Second),
MetricsPath: "/metrics",
}
sp, _ = newScrapePool(cfg, app, 0, nil, false, nil)
sp, _ = newScrapePool(cfg, app, 0, nil, false, false, nil)
t1 = &Target{
discoveredLabels: labels.Labels{
labels.Label{
@@ -2692,6 +2773,9 @@ func TestScrapeAddFast(t *testing.T) {
0,
0,
false,
nil,
nil,
false,
)
defer cancel()
@@ -2721,7 +2805,7 @@ func TestReuseCacheRace(t *testing.T) {
ScrapeInterval: model.Duration(5 * time.Second),
MetricsPath: "/metrics",
}
sp, _ = newScrapePool(cfg, app, 0, nil, false, nil)
sp, _ = newScrapePool(cfg, app, 0, nil, false, false, nil)
t1 = &Target{
discoveredLabels: labels.Labels{
labels.Label{
@@ -2780,6 +2864,9 @@ func TestScrapeReportSingleAppender(t *testing.T) {
10*time.Millisecond,
time.Hour,
false,
nil,
nil,
false,
)
numScrapes := 0
@@ -2850,7 +2937,7 @@ func TestScrapeReportLimit(t *testing.T) {
}))
defer ts.Close()
sp, err := newScrapePool(cfg, s, 0, nil, false, nil)
sp, err := newScrapePool(cfg, s, 0, nil, false, false, nil)
require.NoError(t, err)
defer sp.stop()
@@ -2979,6 +3066,9 @@ func TestScrapeLoopLabelLimit(t *testing.T) {
0,
0,
false,
nil,
nil,
false,
)
slApp := sl.appender(context.Background())
@@ -3017,7 +3107,7 @@ func TestTargetScrapeIntervalAndTimeoutRelabel(t *testing.T) {
},
},
}
sp, _ := newScrapePool(config, &nopAppendable{}, 0, nil, false, nil)
sp, _ := newScrapePool(config, &nopAppendable{}, 0, nil, false, false, nil)
tgts := []*targetgroup.Group{
{
Targets: []model.LabelSet{{model.AddressLabel: "127.0.0.1:9090"}},