diff --git a/storage/local/storage.go b/storage/local/storage.go index 4e75f5f8d7..3afc9fb86d 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -17,6 +17,7 @@ package local import ( "container/list" "fmt" + "math" "sync/atomic" "time" @@ -38,11 +39,28 @@ const ( // See waitForNextFP. maxEvictInterval = time.Minute - // If numChunskToPersist is this percentage of maxChunksToPersist, we - // consider the storage in "graceful degradation mode", i.e. we do not - // checkpoint anymore based on the dirty series count, and we do not - // sync series files anymore if using the adaptive sync strategy. - percentChunksToPersistForDegradation = 80 + // Constants to control the hysteresis of entering and leaving "rushed + // mode". In rushed mode, the dirty series count is ignored for + // checkpointing, and series files are not synced if the adaptive sync + // strategy is used. + // + // If we reach 80% of -storage.local.max-chunks-to-persist, we enter + // "rushed mode". + factorChunksToPersistForEnteringRushedMode = 0.8 + // To leave "rushed mode", we must be below 70% of + // -storage.local.max-chunks-to-persist. + factorChunksToPersistForLeavingRushedMode = 0.7 + // To enter "rushed mode" for other reasons (see below), we must have at + // least 30% of -storage.local.max-chunks-to-persist. + factorMinChunksToPersistToAllowRushedMode = 0.3 + // If the number of chunks in memory reaches 110% of + // -storage.local.memory-chunks, we will enter "rushed mode" (provided + // we have enough chunks to persist at all, see + // factorMinChunksToPersistToAllowRushedMode.) + factorMemChunksForEnteringRushedMode = 1.1 + // To leave "rushed mode", we must be below 105% of + // -storage.local.memory-chunks. + factorMemChunksForLeavingRushedMode = 1.05 ) var ( @@ -110,7 +128,7 @@ type memorySeriesStorage struct { // numChunksToPersist has to be aligned for atomic operations. numChunksToPersist int64 // The number of chunks waiting for persistence. maxChunksToPersist int // If numChunksToPersist reaches this threshold, ingestion will stall. - degraded bool + rushed bool // Whether the storage is in rushed mode. fpLocker *fingerprintLocker fpToSeries *seriesMap @@ -238,7 +256,7 @@ func (s *memorySeriesStorage) Start() (err error) { case Always: syncStrategy = func() bool { return true } case Adaptive: - syncStrategy = func() bool { return !s.isDegraded() } + syncStrategy = func() bool { return !s.inRushedMode() } default: panic("unknown sync strategy") } @@ -898,9 +916,8 @@ loop: // would be counterproductive, as it would slow down chunk persisting even more, // while in a situation like that, where we are clearly lacking speed of disk // maintenance, the best we can do for crash recovery is to persist chunks as - // quickly as possible. So only checkpoint if the storage is not in "graceful - // degradation mode". - if dirtySeriesCount >= s.checkpointDirtySeriesLimit && !s.isDegraded() { + // quickly as possible. So only checkpoint if the storage is not in "rushed mode". + if dirtySeriesCount >= s.checkpointDirtySeriesLimit && !s.inRushedMode() { checkpointTimer.Reset(0) } } @@ -1144,37 +1161,80 @@ func (s *memorySeriesStorage) incNumChunksToPersist(by int) { atomic.AddInt64(&s.numChunksToPersist, int64(by)) } -// isDegraded returns whether the storage is in "graceful degradation mode", -// which is the case if the number of chunks waiting for persistence has reached -// a percentage of maxChunksToPersist that exceeds -// percentChunksToPersistForDegradation. The method is not goroutine safe (but -// only ever called from the goroutine dealing with series maintenance). -// Changes of degradation mode are logged. -func (s *memorySeriesStorage) isDegraded() bool { - nowDegraded := s.getNumChunksToPersist() > s.maxChunksToPersist*percentChunksToPersistForDegradation/100 - if s.degraded && !nowDegraded { - log.Warn("Storage has left graceful degradation mode. Things are back to normal.") - } else if !s.degraded && nowDegraded { - log.Warnf( - "%d chunks waiting for persistence (%d%% of the allowed maximum %d). Storage is now in graceful degradation mode. Series files are not synced anymore if following the adaptive strategy. Checkpoints are not performed more often than every %v. Series maintenance happens as frequently as possible.", - s.getNumChunksToPersist(), - s.getNumChunksToPersist()*100/s.maxChunksToPersist, - s.maxChunksToPersist, - s.checkpointInterval) +// inRushedMode returns whether the storage is in "rushed mode", which is the +// case if there are too many chunks waiting for persistence or there are too +// many chunks in memory. The method is not goroutine safe (but only ever called +// from the goroutine dealing with series maintenance). Changes of degradation +// mode are logged. +func (s *memorySeriesStorage) inRushedMode() bool { + chunksToPersist := float64(s.getNumChunksToPersist()) + memChunks := float64(atomic.LoadInt64(&numMemChunks)) + + if s.rushed { + // We are already in rushed mode, so check if we can get out of + // it, using the lower hysteresis thresholds. + s.rushed = chunksToPersist > float64(s.maxChunksToPersist)*factorChunksToPersistForLeavingRushedMode || + memChunks > float64(s.maxMemoryChunks)*factorMemChunksForLeavingRushedMode + if !s.rushed { + log.Warn("Storage has left rushed mode. Things are back to normal.") + } + return s.rushed } - s.degraded = nowDegraded - return s.degraded + // We are not rushed yet, so check the higher hysteresis threshold if we enter it now. + // First WRT chunksToPersist... + s.rushed = chunksToPersist > float64(s.maxChunksToPersist)*factorChunksToPersistForEnteringRushedMode + if s.rushed { + log.Warnf( + "%.0f chunks waiting for persistence (%.1f%% of the allowed maximum %d). Storage is now in rushed mode. Series files are not synced anymore if following the adaptive strategy. Checkpoints are not performed more often than every %v. Series maintenance happens as frequently as possible.", + chunksToPersist, + chunksToPersist*100/float64(s.maxChunksToPersist), + s.maxChunksToPersist, + s.checkpointInterval, + ) + return true + } + // ...then WRT memChunks. + s.rushed = memChunks > float64(s.maxMemoryChunks)*factorMemChunksForEnteringRushedMode && + chunksToPersist > float64(s.maxChunksToPersist)*factorMinChunksToPersistToAllowRushedMode + if s.rushed { + log.Warnf( + "%.0f chunks in memory (%.1f%% of the allowed maximum %d). Storage is now in rushed mode. Series files are not synced anymore if following the adaptive strategy. Checkpoints are not performed more often than every %v. Series maintenance happens as frequently as possible.", + memChunks, + memChunks*100/float64(s.maxMemoryChunks), + s.maxMemoryChunks, + s.checkpointInterval, + ) + } + return s.rushed } -// persistenceBacklogScore works similar to isDegraded, but returns a score +// persistenceBacklogScore works similar to inRushedMode, but returns a score // about how close we are to degradation. This score is 1.0 if no chunks are -// waiting for persistence and 0.0 if we are at or above the degradation -// threshold. +// waiting for persistence or we are not over the threshold for memory chunks, +// and 0.0 if we are at or above the thresholds. However, the score is always 0 +// if the storage is currently in rushed mode. (Getting out of it has a +// hysteresis, so we might be below thresholds again but still in rushed mode.) func (s *memorySeriesStorage) persistenceBacklogScore() float64 { - score := 1 - float64(s.getNumChunksToPersist())/float64(s.maxChunksToPersist*percentChunksToPersistForDegradation/100) + if s.inRushedMode() { + return 0 + } + + chunksToPersist := float64(s.getNumChunksToPersist()) + score := 1 - chunksToPersist/(float64(s.maxChunksToPersist)*factorChunksToPersistForEnteringRushedMode) + + if chunksToPersist > float64(s.maxChunksToPersist)*factorMinChunksToPersistToAllowRushedMode { + memChunks := float64(atomic.LoadInt64(&numMemChunks)) + score = math.Min( + score, + 1-(memChunks/float64(s.maxMemoryChunks)-1)/(factorMemChunksForEnteringRushedMode-1), + ) + } if score < 0 { return 0 } + if score > 1 { + return 1 + } return score }