diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index a1c6020a4..3718561aa 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -33,24 +33,27 @@ const ( subsystem = "remote_storage" queue = "queue" - // With a maximum of 500 shards, assuming an average of 100ms remote write - // time and 100 samples per batch, we will be able to push 500k samples/s. - defaultMaxShards = 500 + // With a maximum of 1000 shards, assuming an average of 100ms remote write + // time and 100 samples per batch, we will be able to push 1M samples/s. + defaultMaxShards = 1000 defaultMaxSamplesPerSend = 100 - // defaultQueueCapacity is per shard - at 500 shards, this will buffer - // 50m samples. It is configured to buffer 1024 batches, which at 100ms + // defaultQueueCapacity is per shard - at 1000 shards, this will buffer + // 100M samples. It is configured to buffer 1000 batches, which at 100ms // per batch is 1:40mins. - defaultQueueCapacity = defaultMaxSamplesPerSend * 1024 + defaultQueueCapacity = defaultMaxSamplesPerSend * 1000 defaultBatchSendDeadline = 5 * time.Second // We track samples in/out and how long pushes take using an Exponentially // Weighted Moving Average. - ewmaWeight = 0.2 - shardUpdateDuration = 10 * time.Second - shardToleranceFraction = 0.3 // allow 30% too many shards before scaling down + ewmaWeight = 0.2 + shardUpdateDuration = 10 * time.Second - logRateLimit = 0.1 // Limit to 1 log event every 10s + // Allow 30% too many shards before scaling down. 
+ shardToleranceFraction = 0.3 + + // Limit to 1 log event every 10s + logRateLimit = 0.1 logBurst = 10 ) @@ -114,7 +117,7 @@ var ( prometheus.GaugeOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "shards_total", + Name: "shards", Help: "The number of shards used for parallel sending to the remote storage.", }, []string{queue}, @@ -196,14 +199,10 @@ func NewQueueManager(cfg QueueManagerConfig) *QueueManager { samplesOut: newEWMARate(ewmaWeight, shardUpdateDuration), samplesOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration), } - t.shards = t.newShards(1) - numShards.WithLabelValues(t.queueName).Set(float64(1)) + t.shards = t.newShards(t.numShards) + numShards.WithLabelValues(t.queueName).Set(float64(t.numShards)) queueCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.QueueCapacity)) - t.wg.Add(2) - go t.updateShardsLoop() - go t.reshardLoop() - return t } @@ -253,6 +252,10 @@ func (*QueueManager) NeedsThrottling() bool { // Start the queue manager sending samples to the remote storage. // Does not block. func (t *QueueManager) Start() { + t.wg.Add(2) + go t.updateShardsLoop() + go t.reshardLoop() + t.shardsMtx.Lock() defer t.shardsMtx.Unlock() t.shards.start() @@ -264,6 +267,7 @@ func (t *QueueManager) Stop() { log.Infof("Stopping remote storage...") close(t.quit) t.wg.Wait() + t.shardsMtx.Lock() defer t.shardsMtx.Unlock() t.shards.stop() @@ -277,14 +281,14 @@ func (t *QueueManager) updateShardsLoop() { for { select { case <-ticker: - t.caclulateDesiredShards() + t.calculateDesiredShards() case <-t.quit: return } } } -func (t *QueueManager) caclulateDesiredShards() { +func (t *QueueManager) calculateDesiredShards() { t.samplesIn.tick() t.samplesOut.tick() t.samplesOutDuration.tick() @@ -292,7 +296,7 @@ func (t *QueueManager) caclulateDesiredShards() { // We use the number of incoming samples as a prediction of how much work we // will need to do next iteration. 
We add to this any pending samples // (received - send) so we can catch up with any backlog. We use the average - // outgoing batch latency to work out how how many shards we need. + // outgoing batch latency to work out how many shards we need. var ( samplesIn = t.samplesIn.rate() samplesOut = t.samplesOut.rate() @@ -314,7 +318,7 @@ func (t *QueueManager) caclulateDesiredShards() { - log.Debugf("QueueManager.caclulateDesiredShards samplesIn=%f, samplesOut=%f, samplesPending=%f, desiredShards=%f", + log.Debugf("QueueManager.calculateDesiredShards samplesIn=%f, samplesOut=%f, samplesPending=%f, desiredShards=%f", samplesIn, samplesOut, samplesPending, desiredShards) - // Changes in the number of shards must be greated than shardToleranceFraction. + // Changes in the number of shards must be greater than shardToleranceFraction. var ( lowerBound = float64(t.numShards) * (1. - shardToleranceFraction) upperBound = float64(t.numShards) * (1. + shardToleranceFraction)