From 847c66a843453474910fb65b563cde278cc39a7b Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Tue, 24 Dec 2019 08:59:50 -0700 Subject: [PATCH 1/3] Add sharding test Signed-off-by: Chris Marchbanks --- storage/remote/queue_manager_test.go | 78 ++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index e012ee398..68a32ff04 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -577,3 +577,81 @@ func TestProcessExternalLabels(t *testing.T) { testutil.Equals(t, tc.expected, processExternalLabels(tc.labels, tc.externalLabels)) } } + +func TestCalculateDesiredShards(t *testing.T) { + c := NewTestStorageClient() + cfg := config.DefaultQueueConfig + + dir, err := ioutil.TempDir("", "TestCalculateDesiredShards") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) + m := NewQueueManager(nil, nil, dir, samplesIn, cfg, nil, nil, c, defaultFlushDeadline) + + // Need to start the queue manager so the proper metrics are initialized. + // However we can stop it right away since we don't need to do any actual + // processing. + m.Start() + m.Stop() + + inputRate := int64(50000) + var pendingSamples int64 + + // Two minute startup, no samples are sent. + startedAt := time.Now().Add(-2 * time.Minute) + + // helper function for adding samples. + addSamples := func(s int64, ts time.Duration) { + pendingSamples += s + samplesIn.incr(s) + samplesIn.tick() + + highestTimestamp.Set(float64(startedAt.Add(ts).Unix())) + } + + // helper function for sending samples. + sendSamples := func(s int64, ts time.Duration) { + pendingSamples -= s + m.samplesOut.incr(s) + m.samplesOutDuration.incr(int64(m.numShards) * int64(shardUpdateDuration)) + + // highest sent is how far back pending samples would be at our input rate. + highestSent := startedAt.Add(ts - time.Duration(pendingSamples/inputRate)*time.Second) + m.highestSentTimestampMetric.Set(float64(highestSent.Unix())) + + atomic.StoreInt64(&m.lastSendTimestamp, time.Now().Unix()) + } + + ts := time.Duration(0) + m.startedAt = startedAt + for ; ts < 120*time.Second; ts += shardUpdateDuration { + addSamples(inputRate*int64(shardUpdateDuration/time.Second), ts) + m.numShards = m.calculateDesiredShards() + testutil.Equals(t, 1, m.numShards) + } + + // Assume 100ms per request, or 10 requests per second per shard. + // Shard calculation should never drop below barely keeping up. + minShards := int(inputRate) / cfg.MaxSamplesPerSend / 10 + // This test should never go above 200 shards, that would be more resources than needed. + maxShards := 200 + + for ; ts < 15*time.Minute; ts += shardUpdateDuration { + sin := inputRate * int64(shardUpdateDuration/time.Second) + addSamples(sin, ts) + + sout := int64(m.numShards*cfg.MaxSamplesPerSend) * int64(shardUpdateDuration/(100*time.Millisecond)) + // You can't send samples that don't exist so cap at the number of pending samples. + if sout > pendingSamples { + sout = pendingSamples + } + sendSamples(sout, ts) + + t.Log("desiredShards", m.numShards, "pendingSamples", pendingSamples) + m.numShards = m.calculateDesiredShards() + testutil.Assert(t, m.numShards >= minShards, "Shards are too low. desiredShards=%d, minShards=%d, t_seconds=%d", m.numShards, minShards, ts/time.Second) + testutil.Assert(t, m.numShards <= maxShards, "Shards are too high. desiredShards=%d, maxShards=%d, t_seconds=%d", m.numShards, maxShards, ts/time.Second) + } + testutil.Assert(t, pendingSamples == 0, "Remote write never caught up, there are still %d pending samples.", pendingSamples) +} From 9e24e1f9e8ea069de30cd576416d95e3e0b9727a Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Mon, 23 Dec 2019 11:03:54 -0700 Subject: [PATCH 2/3] Use samplesPending rather than integral The integral accumulator in the remote write sharding code is just a second way of keeping track of the number of samples pending. Remove integralAccumulator and use the samplesPending value we already calculate to calculate the number of shards. This has the added benefit of fixing a bug where the integralAccumulator was not being initialized correctly due to not taking into account the number of ticks being counted, causing the integralAccumulator initial value to be off by an order of magnitude in some cases. Signed-off-by: Chris Marchbanks --- storage/remote/queue_manager.go | 25 ++++++------------------- storage/remote/queue_manager_test.go | 1 - 2 files changed, 6 insertions(+), 20 deletions(-) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 71d34e0d3..c3d97430d 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -214,8 +214,6 @@ type QueueManager struct { wg sync.WaitGroup samplesIn, samplesDropped, samplesOut, samplesOutDuration *ewmaRate - integralAccumulator float64 - startedAt time.Time highestSentTimestampMetric *maxGauge pendingSamplesMetric prometheus.Gauge @@ -316,8 +314,6 @@ outer: // Start the queue manager sending samples to the remote storage. // Does not block. func (t *QueueManager) Start() { - t.startedAt = time.Now() - // Setup the QueueManagers metrics. We do this here rather than in the // constructor because of the ordering of creating Queue Managers's, stopping them, // and then starting new ones in storage/remote/storage.go ApplyConfig. @@ -537,19 +533,6 @@ func (t *QueueManager) calculateDesiredShards() int { return t.numShards } - // We use an integral accumulator, like in a PID, to help dampen - // oscillation. The accumulator will correct for any errors not accounted - // for in the desired shard calculation by adjusting for pending samples. - const integralGain = 0.2 - // Initialise the integral accumulator as the average rate of samples - // pending. This accounts for pending samples that were created while the - // WALWatcher starts up. - if t.integralAccumulator == 0 { - elapsed := time.Since(t.startedAt) / time.Second - t.integralAccumulator = integralGain * samplesPending / float64(elapsed) - } - t.integralAccumulator += samplesPendingRate * integralGain - // We shouldn't reshard if Prometheus hasn't been able to send to the // remote endpoint successfully within some period of time. minSendTimestamp := time.Now().Add(-2 * time.Duration(t.cfg.BatchSendDeadline)).Unix() @@ -559,9 +542,14 @@ func (t *QueueManager) calculateDesiredShards() int { return t.numShards } + // When behind we will try to catch up on a proporation of samples per tick. + // This works similarly to an integral accumulator in that pending samples + // is the result of the error integral. + const integralGain = 0.1 / float64(shardUpdateDuration/time.Second) + var ( timePerSample = samplesOutDuration / samplesOutRate - desiredShards = timePerSample * (samplesInRate + t.integralAccumulator) + desiredShards = timePerSample * (samplesInRate*samplesKeptRatio + integralGain*samplesPending) ) t.desiredNumShards.Set(desiredShards) level.Debug(t.logger).Log("msg", "QueueManager.calculateDesiredShards", @@ -575,7 +563,6 @@ func (t *QueueManager) calculateDesiredShards() int { "desiredShards", desiredShards, "highestSent", highestSent, "highestRecv", highestRecv, - "integralAccumulator", t.integralAccumulator, ) // Changes in the number of shards must be greater than shardToleranceFraction. diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 68a32ff04..82206f374 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -624,7 +624,6 @@ func TestCalculateDesiredShards(t *testing.T) { } ts := time.Duration(0) - m.startedAt = startedAt for ; ts < 120*time.Second; ts += shardUpdateDuration { addSamples(inputRate*int64(shardUpdateDuration/time.Second), ts) m.numShards = m.calculateDesiredShards() From 7f3aca62c4178649fe271cf31fc4652a3ef7e4b4 Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Thu, 2 Jan 2020 10:30:56 -0700 Subject: [PATCH 3/3] Only reduce the number of shards when caught up. Signed-off-by: Chris Marchbanks --- storage/remote/queue_manager.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index c3d97430d..799fdc9b4 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -526,7 +526,8 @@ func (t *QueueManager) calculateDesiredShards() int { samplesPendingRate = samplesInRate*samplesKeptRatio - samplesOutRate highestSent = t.highestSentTimestampMetric.Get() highestRecv = highestTimestamp.Get() - samplesPending = (highestRecv - highestSent) * samplesInRate * samplesKeptRatio + delay = highestRecv - highestSent + samplesPending = delay * samplesInRate * samplesKeptRatio ) if samplesOutRate <= 0 { @@ -577,6 +578,12 @@ func (t *QueueManager) calculateDesiredShards() int { } numShards := int(math.Ceil(desiredShards)) + // Do not downshard if we are more than ten seconds back. + if numShards < t.numShards && delay > 10.0 { + level.Debug(t.logger).Log("msg", "Not downsharding due to being too far behind") + return t.numShards + } + if numShards > t.cfg.MaxShards { numShards = t.cfg.MaxShards } else if numShards < t.cfg.MinShards {