diff --git a/cmd/prometheus/config.go b/cmd/prometheus/config.go
index 19d51c989..3ca28985e 100644
--- a/cmd/prometheus/config.go
+++ b/cmd/prometheus/config.go
@@ -107,7 +107,7 @@ func init() {
 	)
 	cfg.fs.IntVar(
 		&cfg.storage.MemoryChunks, "storage.local.memory-chunks", 1024*1024,
-		"How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.",
+		"How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily. Sample ingestion will be throttled if the configured value is exceeded by more than 10%.",
 	)
 	cfg.fs.DurationVar(
 		&cfg.storage.PersistenceRetentionPeriod, "storage.local.retention", 15*24*time.Hour,
@@ -115,7 +115,7 @@ func init() {
 	)
 	cfg.fs.IntVar(
 		&cfg.storage.MaxChunksToPersist, "storage.local.max-chunks-to-persist", 512*1024,
-		"How many chunks can be waiting for persistence before sample ingestion will stop. Many chunks waiting to be persisted will increase the checkpoint size.",
+		"How many chunks can be waiting for persistence before sample ingestion will be throttled. Many chunks waiting to be persisted will increase the checkpoint size.",
 	)
 	cfg.fs.DurationVar(
 		&cfg.storage.CheckpointInterval, "storage.local.checkpoint-interval", 5*time.Minute,
diff --git a/retrieval/helpers_test.go b/retrieval/helpers_test.go
index 880e6230c..1b74b2ea4 100644
--- a/retrieval/helpers_test.go
+++ b/retrieval/helpers_test.go
@@ -14,8 +14,6 @@
 package retrieval
 
 import (
-	"time"
-
 	"github.com/prometheus/common/model"
 
 	"github.com/prometheus/prometheus/config"
@@ -26,14 +24,13 @@ type nopAppender struct{}
 func (a nopAppender) Append(*model.Sample) {
 }
 
-type slowAppender struct{}
-
-func (a slowAppender) Append(*model.Sample) {
-	time.Sleep(time.Millisecond)
+func (a nopAppender) NeedsThrottling() bool {
+	return false
 }
 
 type collectResultAppender struct {
-	result model.Samples
+	result    model.Samples
+	throttled bool
 }
 
 func (a *collectResultAppender) Append(s *model.Sample) {
@@ -45,6 +42,10 @@ func (a *collectResultAppender) Append(s *model.Sample) {
 	a.result = append(a.result, s)
 }
 
+func (a *collectResultAppender) NeedsThrottling() bool {
+	return a.throttled
+}
+
 // fakeTargetProvider implements a TargetProvider and allows manual injection
 // of TargetGroups through the update channel.
 type fakeTargetProvider struct {
diff --git a/retrieval/target.go b/retrieval/target.go
index 6139c3932..133009f4c 100644
--- a/retrieval/target.go
+++ b/retrieval/target.go
@@ -48,7 +48,7 @@ const (
 )
 
 var (
-	errIngestChannelFull = errors.New("ingestion channel full")
+	errSkippedScrape = errors.New("scrape skipped due to throttled ingestion")
 
 	targetIntervalLength = prometheus.NewSummaryVec(
 		prometheus.SummaryOpts{
@@ -59,10 +59,19 @@ var (
 		},
 		[]string{interval},
 	)
+	targetSkippedScrapes = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: namespace,
+			Name:      "target_skipped_scrapes_total",
+			Help:      "Total number of scrapes that were skipped because the metric storage was throttled.",
+		},
+		[]string{interval},
+	)
 )
 
 func init() {
 	prometheus.MustRegister(targetIntervalLength)
+	prometheus.MustRegister(targetSkippedScrapes)
 }
 
 // TargetHealth describes the health state of a target.
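Aside: the `targetSkippedScrapes` vector above follows the standard client_golang pattern of a CounterVec partitioned by a single label, mirroring the existing `targetIntervalLength` so the two can be correlated per scrape interval. A minimal, self-contained sketch of that pattern (the package layout and the "15s" label value are illustrative, not taken from this patch):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// A counter vector partitioned by the scrape interval, as in the patch.
var skippedScrapes = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "prometheus",
		Name:      "target_skipped_scrapes_total",
		Help:      "Total number of scrapes that were skipped because the metric storage was throttled.",
	},
	[]string{"interval"},
)

func main() {
	prometheus.MustRegister(skippedScrapes)
	// Each scrape loop increments the counter for its own interval label,
	// so skipped scrapes can be broken down per configured interval.
	skippedScrapes.WithLabelValues("15s").Inc()
	fmt.Println(`incremented prometheus_target_skipped_scrapes_total{interval="15s"}`)
}
```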
@@ -151,8 +160,6 @@ type Target struct {
 	scraperStopping chan struct{}
 	// Closing scraperStopped signals that scraping has been stopped.
 	scraperStopped chan struct{}
-	// Channel to buffer ingested samples.
-	ingestedSamples chan model.Vector
 
 	// Mutex protects the members below.
 	sync.RWMutex
@@ -166,8 +173,6 @@ type Target struct {
 	baseLabels model.LabelSet
 	// Internal labels, such as scheme.
 	internalLabels model.LabelSet
-	// What is the deadline for the HTTP or HTTPS against this endpoint.
-	deadline time.Duration
 	// The time between two scrapes.
 	scrapeInterval time.Duration
 	// Whether the target's labels have precedence over the base labels
@@ -237,7 +242,6 @@ func (t *Target) Update(cfg *config.ScrapeConfig, baseLabels, metaLabels model.L
 	t.url.RawQuery = params.Encode()
 
 	t.scrapeInterval = time.Duration(cfg.ScrapeInterval)
-	t.deadline = time.Duration(cfg.ScrapeTimeout)
 
 	t.honorLabels = cfg.HonorLabels
 	t.metaLabels = metaLabels
@@ -361,6 +365,11 @@ func (t *Target) RunScraper(sampleAppender storage.SampleAppender) {
 				targetIntervalLength.WithLabelValues(intervalStr).Observe(
 					float64(took) / float64(time.Second), // Sub-second precision.
 				)
+				if sampleAppender.NeedsThrottling() {
+					targetSkippedScrapes.WithLabelValues(intervalStr).Inc()
+					t.status.setLastError(errSkippedScrape)
+					continue
+				}
 				t.scrape(sampleAppender)
 			}
 		}
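The five lines added to RunScraper above are the consumer side of the new contract: probe the appender once per scrape and skip the whole batch when it asks for throttling, rather than blocking mid-ingestion. A toy sketch of that all-or-nothing gating, assuming only the interface shape introduced in this patch (`throttledAppender` and `appendBatch` are hypothetical names, not code from the patch):

```go
package main

import (
	"fmt"

	"github.com/prometheus/common/model"
)

// SampleAppender mirrors the interface shape introduced in this patch.
type SampleAppender interface {
	Append(*model.Sample)
	NeedsThrottling() bool
}

// throttledAppender is a hypothetical test double.
type throttledAppender struct{ throttled bool }

func (a *throttledAppender) Append(*model.Sample)  {}
func (a *throttledAppender) NeedsThrottling() bool { return a.throttled }

// appendBatch probes once up front, so a batch is appended completely or
// not at all and no scraping work is performed in vain.
func appendBatch(app SampleAppender, batch []*model.Sample) bool {
	if app.NeedsThrottling() {
		return false // Skip the whole batch; do no partial work.
	}
	for _, s := range batch {
		app.Append(s)
	}
	return true
}

func main() {
	batch := []*model.Sample{{Value: 1}, {Value: 2}}
	fmt.Println(appendBatch(&throttledAppender{throttled: true}, batch))  // false: skipped
	fmt.Println(appendBatch(&throttledAppender{throttled: false}, batch)) // true: appended
}
```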
@@ -377,26 +386,6 @@ func (t *Target) StopScraper() {
 	log.Debugf("Scraper for target %v stopped.", t)
 }
 
-func (t *Target) ingest(s model.Vector) error {
-	t.RLock()
-	deadline := t.deadline
-	t.RUnlock()
-	// Since the regular case is that ingestedSamples is ready to receive,
-	// first try without setting a timeout so that we don't need to allocate
-	// a timer most of the time.
-	select {
-	case t.ingestedSamples <- s:
-		return nil
-	default:
-		select {
-		case t.ingestedSamples <- s:
-			return nil
-		case <-time.After(deadline / 10):
-			return errIngestChannelFull
-		}
-	}
-}
-
 const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1`
 
 func (t *Target) scrape(appender storage.SampleAppender) (err error) {
@@ -414,20 +403,20 @@ func (t *Target) scrape(appender storage.SampleAppender) (err error) {
 	// so the relabeling rules are applied to the correct label set.
 	if len(t.metricRelabelConfigs) > 0 {
 		appender = relabelAppender{
-			app:         appender,
-			relabelings: t.metricRelabelConfigs,
+			SampleAppender: appender,
+			relabelings:    t.metricRelabelConfigs,
 		}
 	}
 
 	if t.honorLabels {
 		appender = honorLabelsAppender{
-			app:    appender,
-			labels: baseLabels,
+			SampleAppender: appender,
+			labels:         baseLabels,
 		}
 	} else {
 		appender = ruleLabelsAppender{
-			app:    appender,
-			labels: baseLabels,
+			SampleAppender: appender,
+			labels:         baseLabels,
 		}
 	}
 
@@ -460,27 +449,11 @@ func (t *Target) scrape(appender storage.SampleAppender) (err error) {
 		},
 	}
 
-	t.ingestedSamples = make(chan model.Vector, ingestedSamplesCap)
-
-	go func() {
-		for {
-			// TODO(fabxc): Change the SampleAppender interface to return an error
-			// so we can proceed based on the status and don't leak goroutines trying
-			// to append a single sample after dropping all the other ones.
-			//
-			// This will also allow us to reuse this vector and save allocations.
-			var samples model.Vector
-			if err = sdec.Decode(&samples); err != nil {
-				break
-			}
-			if err = t.ingest(samples); err != nil {
-				break
+	var samples model.Vector
+	for {
+		if err = sdec.Decode(&samples); err != nil {
+			break
 		}
-		close(t.ingestedSamples)
-	}()
-
-	for samples := range t.ingestedSamples {
 		for _, s := range samples {
 			appender.Append(s)
 		}
@@ -495,7 +468,7 @@ func (t *Target) scrape(appender storage.SampleAppender) (err error) {
 // Merges the ingested sample's metric with the label set. On a collision the
 // value of the ingested label is stored in a label prefixed with 'exported_'.
 type ruleLabelsAppender struct {
-	app    storage.SampleAppender
+	storage.SampleAppender
 	labels model.LabelSet
 }
 
@@ -507,11 +480,11 @@ func (app ruleLabelsAppender) Append(s *model.Sample) {
 		s.Metric[ln] = lv
 	}
 
-	app.app.Append(s)
+	app.SampleAppender.Append(s)
 }
 
 type honorLabelsAppender struct {
-	app    storage.SampleAppender
+	storage.SampleAppender
 	labels model.LabelSet
 }
 
@@ -525,13 +498,13 @@ func (app honorLabelsAppender) Append(s *model.Sample) {
 		}
 	}
 
-	app.app.Append(s)
+	app.SampleAppender.Append(s)
 }
 
 // Applies a set of relabel configurations to the sample's metric
 // before actually appending it.
 type relabelAppender struct {
-	app         storage.SampleAppender
+	storage.SampleAppender
 	relabelings []*config.RelabelConfig
 }
 
@@ -547,7 +520,7 @@ func (app relabelAppender) Append(s *model.Sample) {
 	}
 	s.Metric = model.Metric(labels)
 
-	app.app.Append(s)
+	app.SampleAppender.Append(s)
 }
 
 // URL returns a copy of the target's URL.
diff --git a/retrieval/target_test.go b/retrieval/target_test.go
index acacc86aa..7fdc56efc 100644
--- a/retrieval/target_test.go
+++ b/retrieval/target_test.go
@@ -139,12 +139,12 @@ func TestTargetScrapeUpdatesState(t *testing.T) {
 	}
 }
 
-func TestTargetScrapeWithFullChannel(t *testing.T) {
+func TestTargetScrapeWithThrottledStorage(t *testing.T) {
 	server := httptest.NewServer(
 		http.HandlerFunc(
 			func(w http.ResponseWriter, r *http.Request) {
 				w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
-				for i := 0; i < 2*ingestedSamplesCap; i++ {
+				for i := 0; i < 10; i++ {
 					w.Write([]byte(
 						fmt.Sprintf("test_metric_%d{foo=\"bar\"} 123.456\n", i),
 					))
@@ -155,15 +155,21 @@
 	defer server.Close()
 
 	testTarget := newTestTarget(server.URL, time.Second, model.LabelSet{"dings": "bums"})
-	// Affects full channel but not HTTP fetch
-	testTarget.deadline = 0
 
-	testTarget.scrape(slowAppender{})
+	go testTarget.RunScraper(&collectResultAppender{throttled: true})
+
+	// Enough time for a scrape to happen.
+	time.Sleep(20 * time.Millisecond)
+
+	testTarget.StopScraper()
+	// Wait for it to take effect.
+	time.Sleep(20 * time.Millisecond)
+
 	if testTarget.status.Health() != HealthBad {
 		t.Errorf("Expected target state %v, actual: %v", HealthBad, testTarget.status.Health())
 	}
-	if testTarget.status.LastError() != errIngestChannelFull {
-		t.Errorf("Expected target error %q, actual: %q", errIngestChannelFull, testTarget.status.LastError())
+	if testTarget.status.LastError() != errSkippedScrape {
+		t.Errorf("Expected target error %q, actual: %q", errSkippedScrape, testTarget.status.LastError())
 	}
 }
 
@@ -450,7 +456,6 @@ func newTestTarget(targetURL string, deadline time.Duration, baseLabels model.La
 			Host:   strings.TrimLeft(targetURL, "http://"),
 			Path:   "/metrics",
 		},
-		deadline:       deadline,
 		status:         &TargetStatus{},
 		scrapeInterval: 1 * time.Millisecond,
 		httpClient:     c,
diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go
index 8a453e70a..0da126f8e 100644
--- a/retrieval/targetmanager.go
+++ b/retrieval/targetmanager.go
@@ -165,6 +165,7 @@ func (tm *TargetManager) Run() {
 	})
 
 	tm.running = true
+	log.Info("Target manager started.")
 }
 
 // handleUpdates receives target group updates and handles them in the
diff --git a/rules/manager.go b/rules/manager.go
index 2fea605a9..e9e7096e7 100644
--- a/rules/manager.go
+++ b/rules/manager.go
@@ -66,9 +66,19 @@ var (
 	iterationDuration = prometheus.NewSummary(prometheus.SummaryOpts{
 		Namespace:  namespace,
 		Name:       "evaluator_duration_seconds",
-		Help:       "The duration for all evaluations to execute.",
+		Help:       "The duration of rule group evaluations.",
 		Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
 	})
+	iterationsSkipped = prometheus.NewCounter(prometheus.CounterOpts{
+		Namespace: namespace,
+		Name:      "evaluator_iterations_skipped_total",
+		Help:      "The total number of rule group evaluations skipped due to throttled metric storage.",
+	})
+	iterationsScheduled = prometheus.NewCounter(prometheus.CounterOpts{
+		Namespace: namespace,
+		Name:      "evaluator_iterations_total",
+		Help:      "The total number of scheduled rule group evaluations, whether skipped or executed.",
+	})
 )
 
 func init() {
@@ -78,6 +88,8 @@ func init() {
 	evalFailures.WithLabelValues(string(ruleTypeRecording))
 
 	prometheus.MustRegister(iterationDuration)
+	prometheus.MustRegister(iterationsScheduled)
+	prometheus.MustRegister(iterationsSkipped)
 	prometheus.MustRegister(evalFailures)
 	prometheus.MustRegister(evalDuration)
 }
@@ -133,6 +144,11 @@ func (g *Group) run() {
 	}
 
 	iter := func() {
+		iterationsScheduled.Inc()
+		if g.opts.SampleAppender.NeedsThrottling() {
+			iterationsSkipped.Inc()
+			return
+		}
 		start := time.Now()
 		g.eval()
 
diff --git a/storage/local/interface.go b/storage/local/interface.go
index 6c7df5ca2..e260cc762 100644
--- a/storage/local/interface.go
+++ b/storage/local/interface.go
@@ -34,6 +34,9 @@ type Storage interface {
 	// from the provided Sample as those labels are considered equivalent to
 	// a label not present at all.
 	Append(*model.Sample)
+	// NeedsThrottling returns true if the Storage has too many chunks in memory
+	// already or has too many chunks waiting for persistence.
+	NeedsThrottling() bool
 	// NewPreloader returns a new Preloader which allows preloading and pinning
 	// series data into memory for use within a query.
 	NewPreloader() Preloader
diff --git a/storage/local/storage.go b/storage/local/storage.go
index 3d5aeed56..29700163a 100644
--- a/storage/local/storage.go
+++ b/storage/local/storage.go
@@ -47,9 +47,9 @@ const (
 	persintenceUrgencyScoreForLeavingRushedMode = 0.7
 
 	// This factor times -storage.local.memory-chunks is the number of
-	// memory chunks we tolerate before suspending ingestion (TODO!). It is
-	// also a basis for calculating the persistenceUrgencyScore.
-	toleranceFactorForMemChunks = 1.1
+	// memory chunks we tolerate before throttling the storage. It is also a
+	// basis for calculating the persistenceUrgencyScore.
+	toleranceFactorMemChunks = 1.1
 	// This factor times -storage.local.max-chunks-to-persist is the minimum
 	// required number of chunks waiting for persistence before the number
 	// of chunks in memory may influence the persistenceUrgencyScore. (In
@@ -121,9 +121,10 @@ type syncStrategy func() bool
 
 type memorySeriesStorage struct {
 	// numChunksToPersist has to be aligned for atomic operations.
-	numChunksToPersist int64 // The number of chunks waiting for persistence.
-	maxChunksToPersist int   // If numChunksToPersist reaches this threshold, ingestion will stall.
-	rushed             bool  // Whether the storage is in rushed mode.
+	numChunksToPersist int64         // The number of chunks waiting for persistence.
+	maxChunksToPersist int           // If numChunksToPersist reaches this threshold, ingestion will be throttled.
+	rushed             bool          // Whether the storage is in rushed mode.
+	throttled          chan struct{} // This chan is sent to whenever NeedsThrottling() returns true (for logging).
 
 	fpLocker   *fingerprintLocker
 	fpToSeries *seriesMap
@@ -180,6 +181,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage {
 		loopStopping: make(chan struct{}),
 		loopStopped:  make(chan struct{}),
 
+		throttled:          make(chan struct{}, 1),
 		maxMemoryChunks:    o.MemoryChunks,
 		dropAfter:          o.PersistenceRetentionPeriod,
 		checkpointInterval: o.CheckpointInterval,
@@ -306,6 +308,7 @@ func (s *memorySeriesStorage) Start() (err error) {
 	}
 
 	go s.handleEvictList()
+	go s.logThrottling()
 	go s.loop()
 
 	return nil
@@ -571,16 +574,6 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) {
 			delete(sample.Metric, ln)
 		}
 	}
-	if s.getNumChunksToPersist() >= s.maxChunksToPersist {
-		log.Warnf(
-			"%d chunks waiting for persistence, sample ingestion suspended.",
-			s.getNumChunksToPersist(),
-		)
-		for s.getNumChunksToPersist() >= s.maxChunksToPersist {
-			time.Sleep(time.Second)
-		}
-		log.Warn("Sample ingestion resumed.")
-	}
 	rawFP := sample.Metric.FastFingerprint()
 	s.fpLocker.Lock(rawFP)
 	fp, err := s.mapper.mapFP(rawFP, sample.Metric)
@@ -616,6 +609,57 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) {
 	s.incNumChunksToPersist(completedChunksCount)
 }
 
+// NeedsThrottling implements Storage.
+func (s *memorySeriesStorage) NeedsThrottling() bool {
+	if s.getNumChunksToPersist() > s.maxChunksToPersist ||
+		float64(atomic.LoadInt64(&numMemChunks)) > float64(s.maxMemoryChunks)*toleranceFactorMemChunks {
+		select {
+		case s.throttled <- struct{}{}:
+		default: // Do nothing, signal already pending.
+		}
+		return true
+	}
+	return false
+}
+
+// logThrottling handles logging of throttled events and has to be started as a
+// goroutine. It stops once s.loopStopping is closed.
+//
+// Logging strategy: Whenever NeedsThrottling() is called and returns true, a
+// signal is sent to s.throttled. If that happens for the first time, an Error
+// is logged that the storage is now throttled. As long as signals continue to
+// be sent via s.throttled at least once per minute, nothing else is logged.
+// Once no signal has arrived for a minute, an Info is logged that the storage
+// is not throttled anymore. This resets things to the initial state, i.e. once
+// a signal arrives again, the Error will be logged again.
+func (s *memorySeriesStorage) logThrottling() {
+	timer := time.NewTimer(time.Minute)
+	timer.Stop()
+
+	for {
+		select {
+		case <-s.throttled:
+			if !timer.Reset(time.Minute) {
+				log.
+					With("chunksToPersist", s.getNumChunksToPersist()).
+					With("maxChunksToPersist", s.maxChunksToPersist).
+					With("memoryChunks", atomic.LoadInt64(&numMemChunks)).
+					With("maxToleratedMemChunks", int(float64(s.maxMemoryChunks)*toleranceFactorMemChunks)).
+					Error("Storage needs throttling. Scrapes and rule evaluations will be skipped.")
+			}
+		case <-timer.C:
+			log.
+				With("chunksToPersist", s.getNumChunksToPersist()).
+				With("maxChunksToPersist", s.maxChunksToPersist).
+				With("memoryChunks", atomic.LoadInt64(&numMemChunks)).
+				With("maxToleratedMemChunks", int(float64(s.maxMemoryChunks)*toleranceFactorMemChunks)).
+				Info("Storage does not need throttling anymore.")
+		case <-s.loopStopping:
+			return
+		}
+	}
+}
+
 func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Metric) *memorySeries {
 	series, ok := s.fpToSeries.get(fp)
 	if !ok {
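The pair of functions above combines two small concurrency patterns: a non-blocking send on a channel of capacity 1 to coalesce bursts of signals, and a timer that is armed only while signals keep arriving, turning a steady stream of throttle checks into one edge-triggered log line on entry and one on exit. A standalone sketch of the same idea, with durations shortened for demonstration and an explicit `active` flag in place of the `Reset` return value used above (relying on `Reset`'s return value as a first-signal test is a known subtlety of `time.Timer`):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	throttled := make(chan struct{}, 1)
	signal := func() {
		// Non-blocking send: a burst of signals collapses into one.
		select {
		case throttled <- struct{}{}:
		default: // Signal already pending; drop it.
		}
	}

	go func() {
		timer := time.NewTimer(time.Second)
		timer.Stop() // Armed only after the first signal arrives.
		active := false
		for {
			select {
			case <-throttled:
				timer.Reset(time.Second)
				if !active {
					active = true
					fmt.Println("throttling started") // Logged once per episode.
				}
			case <-timer.C:
				// A full quiet period has passed; reset to the initial state.
				active = false
				fmt.Println("throttling ended")
			}
		}
	}()

	for i := 0; i < 3; i++ {
		signal() // Three rapid signals produce a single "started" line.
	}
	time.Sleep(2 * time.Second) // Silence lets the timer fire exactly once.
}
```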
@@ -1210,7 +1254,7 @@ func (s *memorySeriesStorage) calculatePersistenceUrgencyScore() float64 {
 	if chunksToPersist > maxChunksToPersist*factorMinChunksToPersist {
 		score = math.Max(
 			score,
-			(memChunks/maxMemChunks-1)/(toleranceFactorForMemChunks-1),
+			(memChunks/maxMemChunks-1)/(toleranceFactorMemChunks-1),
 		)
 	}
 	if score > 1 {
@@ -1230,11 +1274,11 @@ func (s *memorySeriesStorage) calculatePersistenceUrgencyScore() float64 {
 			s.rushedMode.Set(0)
 			log.
 				With("urgencyScore", score).
-				With("chunksToPersist", chunksToPersist).
-				With("maxChunksToPersist", maxChunksToPersist).
-				With("memoryChunks", memChunks).
-				With("maxMemoryChunks", maxMemChunks).
-				Warn("Storage has left rushed mode.")
+				With("chunksToPersist", int(chunksToPersist)).
+				With("maxChunksToPersist", int(maxChunksToPersist)).
+				With("memoryChunks", int(memChunks)).
+				With("maxMemoryChunks", int(maxMemChunks)).
+				Info("Storage has left rushed mode.")
 			return score
 		}
 		if score > persintenceUrgencyScoreForEnteringRushedMode {
@@ -1243,10 +1287,10 @@ func (s *memorySeriesStorage) calculatePersistenceUrgencyScore() float64 {
 			s.rushedMode.Set(1)
 			log.
 				With("urgencyScore", score).
-				With("chunksToPersist", chunksToPersist).
-				With("maxChunksToPersist", maxChunksToPersist).
-				With("memoryChunks", memChunks).
-				With("maxMemoryChunks", maxMemChunks).
+				With("chunksToPersist", int(chunksToPersist)).
+				With("maxChunksToPersist", int(maxChunksToPersist)).
+				With("memoryChunks", int(memChunks)).
+				With("maxMemoryChunks", int(maxMemChunks)).
 				Warn("Storage has entered rushed mode.")
 			return 1
 		}
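For a sense of scale, a back-of-the-envelope sketch using the default flag values from cmd/prometheus/config.go above: with -storage.local.memory-chunks=1048576 and a tolerance factor of 1.1, NeedsThrottling trips once roughly 1.15 million chunks (about 1.1 GiB at 1 KiB per chunk) are in memory, which matches the "exceeded by more than 10%" wording of the new flag help text.

```go
package main

import "fmt"

func main() {
	const (
		memoryChunks             = 1024 * 1024 // -storage.local.memory-chunks default
		maxChunksToPersist       = 512 * 1024  // -storage.local.max-chunks-to-persist default
		toleranceFactorMemChunks = 1.1
	)
	memLimit := float64(memoryChunks) * toleranceFactorMemChunks
	// Chunks are 1 KiB each, so chunks / 2^20 approximates GiB.
	fmt.Printf("throttle above %.0f memory chunks (~%.1f GiB)\n",
		memLimit, memLimit/(1024*1024))
	fmt.Printf("or above %d chunks waiting for persistence\n", maxChunksToPersist)
}
```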
diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index 768f2f3d5..ae3528c7d 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -132,8 +132,7 @@ func NewStorageQueueManager(tsdb StorageClient, queueCapacity int) *StorageQueue
 }
 
 // Append queues a sample to be sent to the remote storage. It drops the
-// sample on the floor if the queue is full. It implements
-// storage.SampleAppender.
+// sample on the floor if the queue is full.
 func (t *StorageQueueManager) Append(s *model.Sample) {
 	select {
 	case t.queue <- s:
diff --git a/storage/remote/remote.go b/storage/remote/remote.go
index 91d057a63..d295f8f37 100644
--- a/storage/remote/remote.go
+++ b/storage/remote/remote.go
@@ -124,6 +124,13 @@ func (s *Storage) Append(smpl *model.Sample) {
 	}
 }
 
+// NeedsThrottling implements storage.SampleAppender. It always returns false,
+// as a remote storage drops samples on the floor when backlogged instead of
+// asking for throttling.
+func (s *Storage) NeedsThrottling() bool {
+	return false
+}
+
 // Describe implements prometheus.Collector.
 func (s *Storage) Describe(ch chan<- *prometheus.Desc) {
 	for _, q := range s.queues {
diff --git a/storage/storage.go b/storage/storage.go
index 9f509e2dc..71b6bcfa8 100644
--- a/storage/storage.go
+++ b/storage/storage.go
@@ -18,9 +18,32 @@ import (
 )
 
-// SampleAppender is the interface to append samples to both, local and remote
-// storage.
+// SampleAppender is the interface to append samples to both local and remote
+// storage. All methods are goroutine-safe.
 type SampleAppender interface {
+	// Append appends a sample to the underlying storage. Depending on the
+	// storage implementation, there are different guarantees for the fate
+	// of the sample after Append has returned. Remote storage
+	// implementations will simply drop samples if they cannot keep up with
+	// sending samples. Local storage implementations will only drop samples
+	// upon unrecoverable errors. Reporting any errors is done via metrics
+	// and logs and not the concern of the caller.
 	Append(*model.Sample)
+	// NeedsThrottling returns true if the underlying storage wishes to not
+	// receive any more samples. Append will still work but might lead to
+	// undue resource usage. It is recommended to call NeedsThrottling once
+	// before an upcoming batch of Append calls (e.g. a full scrape of a
+	// target or the evaluation of a rule group) and only proceed with the
+	// batch if NeedsThrottling returns false. In that way, the result of a
+	// scrape or of an evaluation of a rule group will always be appended
+	// completely or not at all, and the work of scraping or evaluation will
+	// not be performed in vain. Also, a call of NeedsThrottling is
+	// potentially expensive, so limiting the number of calls is reasonable.
+	//
+	// Only SampleAppenders for which it is considered critical to receive
+	// each and every sample should ever return true. SampleAppenders that
+	// tolerate not receiving all samples should always return false and
+	// instead drop samples as they see fit to avoid overload.
+	NeedsThrottling() bool
 }
 
 // Fanout is a SampleAppender that appends every sample to each SampleAppender
@@ -35,3 +58,14 @@ func (f Fanout) Append(s *model.Sample) {
 		a.Append(s)
 	}
 }
+
+// NeedsThrottling returns true if at least one of the SampleAppenders in the
+// Fanout slice is throttled.
+func (f Fanout) NeedsThrottling() bool {
+	for _, a := range f {
+		if a.NeedsThrottling() {
+			return true
+		}
+	}
+	return false
+}
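Finally, the Fanout semantics: Append fans out to every member, while NeedsThrottling ORs the members together, so a single overloaded local storage throttles the whole fanout even when a remote member would happily keep dropping samples. A self-contained sketch of how that plays out (`localLike` and `remoteLike` are illustrative stand-ins, not types from the patch; Fanout is reproduced from it):

```go
package main

import (
	"fmt"

	"github.com/prometheus/common/model"
)

type SampleAppender interface {
	Append(*model.Sample)
	NeedsThrottling() bool
}

// Fanout appends every sample to each member, as in the patch.
type Fanout []SampleAppender

func (f Fanout) Append(s *model.Sample) {
	for _, a := range f {
		a.Append(s)
	}
}

// NeedsThrottling returns true if at least one member is throttled.
func (f Fanout) NeedsThrottling() bool {
	for _, a := range f {
		if a.NeedsThrottling() {
			return true
		}
	}
	return false
}

// localLike stands in for a storage that must receive every sample.
type localLike struct{ overloaded bool }

func (l *localLike) Append(*model.Sample)  {}
func (l *localLike) NeedsThrottling() bool { return l.overloaded }

// remoteLike stands in for a storage that drops on overload instead.
type remoteLike struct{}

func (remoteLike) Append(*model.Sample)  {}
func (remoteLike) NeedsThrottling() bool { return false }

func main() {
	f := Fanout{&localLike{overloaded: true}, remoteLike{}}
	// One overloaded member is enough to throttle the whole fanout.
	fmt.Println(f.NeedsThrottling()) // true
}
```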