Only reduce the number of shards when caught up.

Signed-off-by: Chris Marchbanks <csmarchbanks@gmail.com>
This commit is contained in:
Chris Marchbanks 2020-01-02 10:30:56 -07:00
parent 9e24e1f9e8
commit 7f3aca62c4
1 changed files with 8 additions and 1 deletions

View File

@ -526,7 +526,8 @@ func (t *QueueManager) calculateDesiredShards() int {
samplesPendingRate = samplesInRate*samplesKeptRatio - samplesOutRate samplesPendingRate = samplesInRate*samplesKeptRatio - samplesOutRate
highestSent = t.highestSentTimestampMetric.Get() highestSent = t.highestSentTimestampMetric.Get()
highestRecv = highestTimestamp.Get() highestRecv = highestTimestamp.Get()
samplesPending = (highestRecv - highestSent) * samplesInRate * samplesKeptRatio delay = highestRecv - highestSent
samplesPending = delay * samplesInRate * samplesKeptRatio
) )
if samplesOutRate <= 0 { if samplesOutRate <= 0 {
@ -577,6 +578,12 @@ func (t *QueueManager) calculateDesiredShards() int {
} }
numShards := int(math.Ceil(desiredShards)) numShards := int(math.Ceil(desiredShards))
// Do not downshard if we are more than ten seconds back.
if numShards < t.numShards && delay > 10.0 {
level.Debug(t.logger).Log("msg", "Not downsharding due to being too far behind")
return t.numShards
}
if numShards > t.cfg.MaxShards { if numShards > t.cfg.MaxShards {
numShards = t.cfg.MaxShards numShards = t.cfg.MaxShards
} else if numShards < t.cfg.MinShards { } else if numShards < t.cfg.MinShards {