diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 71d34e0d3..799fdc9b4 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -214,8 +214,6 @@ type QueueManager struct { wg sync.WaitGroup samplesIn, samplesDropped, samplesOut, samplesOutDuration *ewmaRate - integralAccumulator float64 - startedAt time.Time highestSentTimestampMetric *maxGauge pendingSamplesMetric prometheus.Gauge @@ -316,8 +314,6 @@ outer: // Start the queue manager sending samples to the remote storage. // Does not block. func (t *QueueManager) Start() { - t.startedAt = time.Now() - // Setup the QueueManagers metrics. We do this here rather than in the // constructor because of the ordering of creating Queue Managers's, stopping them, // and then starting new ones in storage/remote/storage.go ApplyConfig. @@ -530,26 +526,14 @@ func (t *QueueManager) calculateDesiredShards() int { samplesPendingRate = samplesInRate*samplesKeptRatio - samplesOutRate highestSent = t.highestSentTimestampMetric.Get() highestRecv = highestTimestamp.Get() - samplesPending = (highestRecv - highestSent) * samplesInRate * samplesKeptRatio + delay = highestRecv - highestSent + samplesPending = delay * samplesInRate * samplesKeptRatio ) if samplesOutRate <= 0 { return t.numShards } - // We use an integral accumulator, like in a PID, to help dampen - // oscillation. The accumulator will correct for any errors not accounted - // for in the desired shard calculation by adjusting for pending samples. - const integralGain = 0.2 - // Initialise the integral accumulator as the average rate of samples - // pending. This accounts for pending samples that were created while the - // WALWatcher starts up. 
- if t.integralAccumulator == 0 { - elapsed := time.Since(t.startedAt) / time.Second - t.integralAccumulator = integralGain * samplesPending / float64(elapsed) - } - t.integralAccumulator += samplesPendingRate * integralGain - // We shouldn't reshard if Prometheus hasn't been able to send to the // remote endpoint successfully within some period of time. minSendTimestamp := time.Now().Add(-2 * time.Duration(t.cfg.BatchSendDeadline)).Unix() @@ -559,9 +543,14 @@ func (t *QueueManager) calculateDesiredShards() int { return t.numShards } + // When behind we will try to catch up on a proportion of samples per tick. + // This works similarly to an integral accumulator in that pending samples + // is the result of the error integral. + const integralGain = 0.1 / float64(shardUpdateDuration/time.Second) + var ( timePerSample = samplesOutDuration / samplesOutRate - desiredShards = timePerSample * (samplesInRate + t.integralAccumulator) + desiredShards = timePerSample * (samplesInRate*samplesKeptRatio + integralGain*samplesPending) ) t.desiredNumShards.Set(desiredShards) level.Debug(t.logger).Log("msg", "QueueManager.calculateDesiredShards", @@ -575,7 +564,6 @@ func (t *QueueManager) calculateDesiredShards() int { "desiredShards", desiredShards, "highestSent", highestSent, "highestRecv", highestRecv, - "integralAccumulator", t.integralAccumulator, ) // Changes in the number of shards must be greater than shardToleranceFraction. @@ -590,6 +578,12 @@ func (t *QueueManager) calculateDesiredShards() int { } numShards := int(math.Ceil(desiredShards)) + // Do not downshard if we are more than ten seconds back. 
+ if numShards < t.numShards && delay > 10.0 { + level.Debug(t.logger).Log("msg", "Not downsharding due to being too far behind") + return t.numShards + } + if numShards > t.cfg.MaxShards { numShards = t.cfg.MaxShards } else if numShards < t.cfg.MinShards { diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index e012ee398..82206f374 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -577,3 +577,80 @@ func TestProcessExternalLabels(t *testing.T) { testutil.Equals(t, tc.expected, processExternalLabels(tc.labels, tc.externalLabels)) } } + +func TestCalculateDesiredShards(t *testing.T) { + c := NewTestStorageClient() + cfg := config.DefaultQueueConfig + + dir, err := ioutil.TempDir("", "TestCalculateDesiredShards") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) + m := NewQueueManager(nil, nil, dir, samplesIn, cfg, nil, nil, c, defaultFlushDeadline) + + // Need to start the queue manager so the proper metrics are initialized. + // However we can stop it right away since we don't need to do any actual + // processing. + m.Start() + m.Stop() + + inputRate := int64(50000) + var pendingSamples int64 + + // Two minute startup, no samples are sent. + startedAt := time.Now().Add(-2 * time.Minute) + + // helper function for adding samples. + addSamples := func(s int64, ts time.Duration) { + pendingSamples += s + samplesIn.incr(s) + samplesIn.tick() + + highestTimestamp.Set(float64(startedAt.Add(ts).Unix())) + } + + // helper function for sending samples. + sendSamples := func(s int64, ts time.Duration) { + pendingSamples -= s + m.samplesOut.incr(s) + m.samplesOutDuration.incr(int64(m.numShards) * int64(shardUpdateDuration)) + + // highest sent is how far back pending samples would be at our input rate. 
+ highestSent := startedAt.Add(ts - time.Duration(pendingSamples/inputRate)*time.Second) + m.highestSentTimestampMetric.Set(float64(highestSent.Unix())) + + atomic.StoreInt64(&m.lastSendTimestamp, time.Now().Unix()) + } + + ts := time.Duration(0) + for ; ts < 120*time.Second; ts += shardUpdateDuration { + addSamples(inputRate*int64(shardUpdateDuration/time.Second), ts) + m.numShards = m.calculateDesiredShards() + testutil.Equals(t, 1, m.numShards) + } + + // Assume 100ms per request, or 10 requests per second per shard. + // Shard calculation should never drop below barely keeping up. + minShards := int(inputRate) / cfg.MaxSamplesPerSend / 10 + // This test should never go above 200 shards, that would be more resources than needed. + maxShards := 200 + + for ; ts < 15*time.Minute; ts += shardUpdateDuration { + sin := inputRate * int64(shardUpdateDuration/time.Second) + addSamples(sin, ts) + + sout := int64(m.numShards*cfg.MaxSamplesPerSend) * int64(shardUpdateDuration/(100*time.Millisecond)) + // You can't send samples that don't exist so cap at the number of pending samples. + if sout > pendingSamples { + sout = pendingSamples + } + sendSamples(sout, ts) + + t.Log("desiredShards", m.numShards, "pendingSamples", pendingSamples) + m.numShards = m.calculateDesiredShards() + testutil.Assert(t, m.numShards >= minShards, "Shards are too low. desiredShards=%d, minShards=%d, t_seconds=%d", m.numShards, minShards, ts/time.Second) + testutil.Assert(t, m.numShards <= maxShards, "Shards are too high. desiredShards=%d, maxShards=%d, t_seconds=%d", m.numShards, maxShards, ts/time.Second) + } + testutil.Assert(t, pendingSamples == 0, "Remote write never caught up, there are still %d pending samples.", pendingSamples) +}