diff --git a/storage/local/storage.go b/storage/local/storage.go
index 3afc9fb86..3d5aeed56 100644
--- a/storage/local/storage.go
+++ b/storage/local/storage.go
@@ -41,26 +41,21 @@ const (
 
 	// Constants to control the hysteresis of entering and leaving "rushed
 	// mode". In rushed mode, the dirty series count is ignored for
-	// checkpointing, and series files are not synced if the adaptive sync
-	// strategy is used.
-	//
-	// If we reach 80% of -storage.local.max-chunks-to-persist, we enter
-	// "rushed mode".
-	factorChunksToPersistForEnteringRushedMode = 0.8
-	// To leave "rushed mode", we must be below 70% of
-	// -storage.local.max-chunks-to-persist.
-	factorChunksToPersistForLeavingRushedMode = 0.7
-	// To enter "rushed mode" for other reasons (see below), we must have at
-	// least 30% of -storage.local.max-chunks-to-persist.
-	factorMinChunksToPersistToAllowRushedMode = 0.3
-	// If the number of chunks in memory reaches 110% of
-	// -storage.local.memory-chunks, we will enter "rushed mode" (provided
-	// we have enough chunks to persist at all, see
-	// factorMinChunksToPersistToAllowRushedMode.)
-	factorMemChunksForEnteringRushedMode = 1.1
-	// To leave "rushed mode", we must be below 105% of
-	// -storage.local.memory-chunks.
-	factorMemChunksForLeavingRushedMode = 1.05
+	// checkpointing, series are maintained as frequently as possible, and
+	// series files are not synced if the adaptive sync strategy is used.
+	persistenceUrgencyScoreForEnteringRushedMode = 0.8
+	persistenceUrgencyScoreForLeavingRushedMode  = 0.7
+
+	// This factor times -storage.local.memory-chunks is the number of
+	// memory chunks we tolerate before suspending ingestion (TODO!). It is
+	// also a basis for calculating the persistenceUrgencyScore.
+	toleranceFactorForMemChunks = 1.1
+	// This factor times -storage.local.max-chunks-to-persist is the minimum
+	// required number of chunks waiting for persistence before the number
+	// of chunks in memory may influence the persistenceUrgencyScore. (In
+	// other words: if there are no chunks to persist, it doesn't help chunk
+	// eviction if we speed up persistence.)
+	factorMinChunksToPersist = 0.2
 )
 
 var (
@@ -155,6 +150,8 @@ type memorySeriesStorage struct {
 	outOfOrderSamplesCount      prometheus.Counter
 	invalidPreloadRequestsCount prometheus.Counter
 	maintainSeriesDuration      *prometheus.SummaryVec
+	persistenceUrgencyScore     prometheus.Gauge
+	rushedMode                  prometheus.Gauge
 }
 
 // MemorySeriesStorageOptions contains options needed by
@@ -243,6 +240,18 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage {
 			},
 			[]string{seriesLocationLabel},
 		),
+		persistenceUrgencyScore: prometheus.NewGauge(prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "persistence_urgency_score",
+			Help:      "A score of urgency to persist chunks, 0 is least urgent, 1 most.",
+		}),
+		rushedMode: prometheus.NewGauge(prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "rushed_mode",
+			Help:      "1 if the storage is in rushed mode, 0 otherwise. In rushed mode, the system behaves as if the persistence_urgency_score is 1.",
+		}),
 	}
 	return s
 }
@@ -256,7 +265,7 @@ func (s *memorySeriesStorage) Start() (err error) {
 	case Always:
 		syncStrategy = func() bool { return true }
 	case Adaptive:
-		syncStrategy = func() bool { return !s.inRushedMode() }
+		syncStrategy = func() bool { return s.calculatePersistenceUrgencyScore() < 1 }
 	default:
 		panic("unknown sync strategy")
 	}
@@ -823,8 +832,8 @@ func (s *memorySeriesStorage) cycleThroughMemoryFingerprints() chan model.Finger
 			case <-s.loopStopping:
 				return
 			}
-			// Reduce the wait time by the backlog score.
-			s.waitForNextFP(s.fpToSeries.length(), s.persistenceBacklogScore())
+			// Reduce the wait time according to the urgency score.
+			s.waitForNextFP(s.fpToSeries.length(), 1-s.calculatePersistenceUrgencyScore())
 			count++
 		}
 		if count > 0 {
@@ -916,8 +925,9 @@ loop:
 			// would be counterproductive, as it would slow down chunk persisting even more,
 			// while in a situation like that, where we are clearly lacking speed of disk
 			// maintenance, the best we can do for crash recovery is to persist chunks as
-			// quickly as possible. So only checkpoint if the storage is not in "rushed mode".
-			if dirtySeriesCount >= s.checkpointDirtySeriesLimit && !s.inRushedMode() {
+			// quickly as possible. So only checkpoint if the urgency score is < 1.
+			if dirtySeriesCount >= s.checkpointDirtySeriesLimit &&
+				s.calculatePersistenceUrgencyScore() < 1 {
 				checkpointTimer.Reset(0)
 			}
 		}
@@ -1161,78 +1171,83 @@ func (s *memorySeriesStorage) incNumChunksToPersist(by int) {
 	atomic.AddInt64(&s.numChunksToPersist, int64(by))
 }
 
-// inRushedMode returns whether the storage is in "rushed mode", which is the
-// case if there are too many chunks waiting for persistence or there are too
-// many chunks in memory. The method is not goroutine safe (but only ever called
-// from the goroutine dealing with series maintenance). Changes of degradation
-// mode are logged.
-func (s *memorySeriesStorage) inRushedMode() bool {
-	chunksToPersist := float64(s.getNumChunksToPersist())
-	memChunks := float64(atomic.LoadInt64(&numMemChunks))
-
-	if s.rushed {
-		// We are already in rushed mode, so check if we can get out of
-		// it, using the lower hysteresis thresholds.
-		s.rushed = chunksToPersist > float64(s.maxChunksToPersist)*factorChunksToPersistForLeavingRushedMode ||
-			memChunks > float64(s.maxMemoryChunks)*factorMemChunksForLeavingRushedMode
-		if !s.rushed {
-			log.Warn("Storage has left rushed mode. Things are back to normal.")
-		}
-		return s.rushed
-	}
-	// We are not rushed yet, so check the higher hysteresis threshold if we enter it now.
-	// First WRT chunksToPersist...
-	s.rushed = chunksToPersist > float64(s.maxChunksToPersist)*factorChunksToPersistForEnteringRushedMode
-	if s.rushed {
-		log.Warnf(
-			"%.0f chunks waiting for persistence (%.1f%% of the allowed maximum %d). Storage is now in rushed mode. Series files are not synced anymore if following the adaptive strategy. Checkpoints are not performed more often than every %v. Series maintenance happens as frequently as possible.",
-			chunksToPersist,
-			chunksToPersist*100/float64(s.maxChunksToPersist),
-			s.maxChunksToPersist,
-			s.checkpointInterval,
-		)
-		return true
-	}
-	// ...then WRT memChunks.
-	s.rushed = memChunks > float64(s.maxMemoryChunks)*factorMemChunksForEnteringRushedMode &&
-		chunksToPersist > float64(s.maxChunksToPersist)*factorMinChunksToPersistToAllowRushedMode
-	if s.rushed {
-		log.Warnf(
-			"%.0f chunks in memory (%.1f%% of the allowed maximum %d). Storage is now in rushed mode. Series files are not synced anymore if following the adaptive strategy. Checkpoints are not performed more often than every %v. Series maintenance happens as frequently as possible.",
-			memChunks,
-			memChunks*100/float64(s.maxMemoryChunks),
-			s.maxMemoryChunks,
-			s.checkpointInterval,
-		)
-	}
-	return s.rushed
-}
-
-// persistenceBacklogScore works similar to inRushedMode, but returns a score
-// about how close we are to degradation. This score is 1.0 if no chunks are
-// waiting for persistence or we are not over the threshold for memory chunks,
-// and 0.0 if we are at or above the thresholds. However, the score is always 0
-// if the storage is currently in rushed mode. (Getting out of it has a
-// hysteresis, so we might be below thresholds again but still in rushed mode.)
-func (s *memorySeriesStorage) persistenceBacklogScore() float64 {
-	if s.inRushedMode() {
-		return 0
-	}
-
-	chunksToPersist := float64(s.getNumChunksToPersist())
-	score := 1 - chunksToPersist/(float64(s.maxChunksToPersist)*factorChunksToPersistForEnteringRushedMode)
-
-	if chunksToPersist > float64(s.maxChunksToPersist)*factorMinChunksToPersistToAllowRushedMode {
-		memChunks := float64(atomic.LoadInt64(&numMemChunks))
-		score = math.Min(
+// calculatePersistenceUrgencyScore calculates and returns an urgency score for
+// the speed of persisting chunks. The score is between 0 and 1, where 0 means
+// no urgency at all and 1 means highest urgency.
+//
+// The score is the maximum of the two following sub-scores:
+//
+// (1) The first sub-score is the number of chunks waiting for persistence
+// divided by the maximum number of chunks allowed to be waiting for
+// persistence.
+//
+// (2) If there are more chunks in memory than allowed AND there are more chunks
+// waiting for persistence than factorMinChunksToPersist times
+// -storage.local.max-chunks-to-persist, then the second sub-score is the
+// fraction the number of memory chunks has reached between
+// -storage.local.memory-chunks and toleranceFactorForMemChunks times
+// -storage.local.memory-chunks.
+//
+// Should the score ever hit persistenceUrgencyScoreForEnteringRushedMode, the
+// storage locks into "rushed mode", in which the returned score is always
+// bumped up to 1 until the non-bumped score is below
+// persistenceUrgencyScoreForLeavingRushedMode.
+//
+// This method is not goroutine-safe, but it is only ever called by the single
+// goroutine that is in charge of series maintenance. According to the returned
+// score, series maintenance should be sped up. If a score of 1 is returned,
+// checkpointing based on dirty-series count should be disabled, and series
+// files should not be synced anymore provided the user has specified the
+// adaptive sync strategy.
+func (s *memorySeriesStorage) calculatePersistenceUrgencyScore() float64 {
+	var (
+		chunksToPersist    = float64(s.getNumChunksToPersist())
+		maxChunksToPersist = float64(s.maxChunksToPersist)
+		memChunks          = float64(atomic.LoadInt64(&numMemChunks))
+		maxMemChunks       = float64(s.maxMemoryChunks)
+	)
+	score := chunksToPersist / maxChunksToPersist
+	if chunksToPersist > maxChunksToPersist*factorMinChunksToPersist {
+		score = math.Max(
 			score,
-			1-(memChunks/float64(s.maxMemoryChunks)-1)/(factorMemChunksForEnteringRushedMode-1),
+			(memChunks/maxMemChunks-1)/(toleranceFactorForMemChunks-1),
 		)
 	}
-	if score < 0 {
-		return 0
-	}
 	if score > 1 {
+		score = 1
+	}
+	s.persistenceUrgencyScore.Set(score)
+
+	if s.rushed {
+		// We are already in rushed mode. If the score is still above
+		// persistenceUrgencyScoreForLeavingRushedMode, return 1 and
+		// leave things as they are.
+		if score > persistenceUrgencyScoreForLeavingRushedMode {
+			return 1
+		}
+		// We are out of rushed mode!
+		s.rushed = false
+		s.rushedMode.Set(0)
+		log.
+			With("urgencyScore", score).
+			With("chunksToPersist", chunksToPersist).
+			With("maxChunksToPersist", maxChunksToPersist).
+			With("memoryChunks", memChunks).
+			With("maxMemoryChunks", maxMemChunks).
+			Warn("Storage has left rushed mode.")
+		return score
+	}
+	if score > persistenceUrgencyScoreForEnteringRushedMode {
+		// Enter rushed mode.
+		s.rushed = true
+		s.rushedMode.Set(1)
+		log.
+			With("urgencyScore", score).
+			With("chunksToPersist", chunksToPersist).
+			With("maxChunksToPersist", maxChunksToPersist).
+			With("memoryChunks", memChunks).
+			With("maxMemoryChunks", maxMemChunks).
+			Warn("Storage has entered rushed mode.")
 		return 1
 	}
 	return score
@@ -1253,6 +1268,8 @@ func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
 	ch <- s.invalidPreloadRequestsCount.Desc()
 	ch <- numMemChunksDesc
 	s.maintainSeriesDuration.Describe(ch)
+	ch <- s.persistenceUrgencyScore.Desc()
+	ch <- s.rushedMode.Desc()
 }
 
 // Collect implements prometheus.Collector.
@@ -1282,4 +1299,6 @@ func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
 		float64(atomic.LoadInt64(&numMemChunks)),
 	)
 	s.maintainSeriesDuration.Collect(ch)
+	ch <- s.persistenceUrgencyScore
+	ch <- s.rushedMode
 }
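
For readers skimming the patch, here is a minimal, self-contained sketch (not part of the patch itself) of the scoring and hysteresis logic that calculatePersistenceUrgencyScore implements above. The constant values mirror the patch; the standalone helpers urgencyScore and applyRushedMode and the numbers in main are made up purely for illustration.

package main

import (
	"fmt"
	"math"
)

// Constant values mirror the patch above.
const (
	persistenceUrgencyScoreForEnteringRushedMode = 0.8
	persistenceUrgencyScoreForLeavingRushedMode  = 0.7
	toleranceFactorForMemChunks                  = 1.1
	factorMinChunksToPersist                     = 0.2
)

// urgencyScore restates the raw score: sub-score (1) is the fill ratio of the
// persistence backlog; sub-score (2) is only consulted once the backlog exceeds
// factorMinChunksToPersist of its maximum, and measures how far the memory-chunk
// count has moved from its limit towards toleranceFactorForMemChunks times that
// limit (it is negative, and therefore irrelevant, while memory chunks stay
// below their limit). The result is clamped to [0, 1].
func urgencyScore(chunksToPersist, maxChunksToPersist, memChunks, maxMemChunks float64) float64 {
	score := chunksToPersist / maxChunksToPersist
	if chunksToPersist > maxChunksToPersist*factorMinChunksToPersist {
		score = math.Max(score, (memChunks/maxMemChunks-1)/(toleranceFactorForMemChunks-1))
	}
	return math.Min(math.Max(score, 0), 1)
}

// applyRushedMode adds the hysteresis: once the raw score exceeds the entering
// threshold, the effective score is pinned to 1 until the raw score has dropped
// to the leaving threshold or below.
func applyRushedMode(rushed *bool, score float64) float64 {
	if *rushed {
		if score > persistenceUrgencyScoreForLeavingRushedMode {
			return 1
		}
		*rushed = false
		return score
	}
	if score > persistenceUrgencyScoreForEnteringRushedMode {
		*rushed = true
		return 1
	}
	return score
}

func main() {
	rushed := false
	// Backlog at 50% of its limit, memory chunks exactly at their limit:
	// sub-score (1) = 0.5, sub-score (2) = 0, so the raw score is 0.5.
	raw := urgencyScore(250000, 500000, 1e6, 1e6)
	fmt.Println(raw, applyRushedMode(&rushed, raw)) // 0.5 0.5 (not rushed)

	// Backlog at 90% of its limit: the raw score 0.9 exceeds 0.8, so rushed
	// mode is entered and the effective score is bumped to 1.
	raw = urgencyScore(450000, 500000, 9e5, 1e6)
	fmt.Println(raw, applyRushedMode(&rushed, raw)) // 0.9 1
}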
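
For reference, the score has three consumers in this patch, all visible in the hunks above; the added lines are repeated here in condensed form (no additional changes):

// Adaptive sync strategy: stop syncing series files once urgency is maximal.
syncStrategy = func() bool { return s.calculatePersistenceUrgencyScore() < 1 }

// Series maintenance: the higher the urgency, the shorter the wait between
// fingerprints (a score of 1 removes the wait entirely).
s.waitForNextFP(s.fpToSeries.length(), 1-s.calculatePersistenceUrgencyScore())

// Checkpointing: skip dirty-series-triggered checkpoints while urgency is maximal.
if dirtySeriesCount >= s.checkpointDirtySeriesLimit &&
	s.calculatePersistenceUrgencyScore() < 1 {
	checkpointTimer.Reset(0)
}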