mirror of
https://github.com/prometheus/prometheus
synced 2025-01-28 10:23:18 +00:00
Merge pull request #9274 from bboreham/improve-rw-sharding
remote_write: shard up more when backlogged
This commit is contained in:
commit
eb7c10dbbd
@ -831,14 +831,12 @@ func (t *QueueManager) calculateDesiredShards() int {
|
|||||||
return t.numShards
|
return t.numShards
|
||||||
}
|
}
|
||||||
|
|
||||||
// When behind we will try to catch up on a proporation of samples per tick.
|
|
||||||
// This works similarly to an integral accumulator in that pending samples
|
|
||||||
// is the result of the error integral.
|
|
||||||
const integralGain = 0.1 / float64(shardUpdateDuration/time.Second)
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
// When behind we will try to catch up on 5% of samples per second.
|
||||||
|
backlogCatchup = 0.05 * dataPending
|
||||||
|
// Calculate Time to send one sample, averaged across all sends done this tick.
|
||||||
timePerSample = dataOutDuration / dataOutRate
|
timePerSample = dataOutDuration / dataOutRate
|
||||||
desiredShards = timePerSample * (dataInRate*dataKeptRatio + integralGain*dataPending)
|
desiredShards = timePerSample * (dataInRate*dataKeptRatio + backlogCatchup)
|
||||||
)
|
)
|
||||||
t.metrics.desiredNumShards.Set(desiredShards)
|
t.metrics.desiredNumShards.Set(desiredShards)
|
||||||
level.Debug(t.logger).Log("msg", "QueueManager.calculateDesiredShards",
|
level.Debug(t.logger).Log("msg", "QueueManager.calculateDesiredShards",
|
||||||
@ -861,11 +859,13 @@ func (t *QueueManager) calculateDesiredShards() int {
|
|||||||
)
|
)
|
||||||
level.Debug(t.logger).Log("msg", "QueueManager.updateShardsLoop",
|
level.Debug(t.logger).Log("msg", "QueueManager.updateShardsLoop",
|
||||||
"lowerBound", lowerBound, "desiredShards", desiredShards, "upperBound", upperBound)
|
"lowerBound", lowerBound, "desiredShards", desiredShards, "upperBound", upperBound)
|
||||||
|
|
||||||
|
desiredShards = math.Ceil(desiredShards) // Round up to be on the safe side.
|
||||||
if lowerBound <= desiredShards && desiredShards <= upperBound {
|
if lowerBound <= desiredShards && desiredShards <= upperBound {
|
||||||
return t.numShards
|
return t.numShards
|
||||||
}
|
}
|
||||||
|
|
||||||
numShards := int(math.Ceil(desiredShards))
|
numShards := int(desiredShards)
|
||||||
// Do not downshard if we are more than ten seconds back.
|
// Do not downshard if we are more than ten seconds back.
|
||||||
if numShards < t.numShards && delay > 10.0 {
|
if numShards < t.numShards && delay > 10.0 {
|
||||||
level.Debug(t.logger).Log("msg", "Not downsharding due to being too far behind")
|
level.Debug(t.logger).Log("msg", "Not downsharding due to being too far behind")
|
||||||
|
@ -908,6 +908,189 @@ func TestCalculateDesiredShards(t *testing.T) {
|
|||||||
require.Equal(t, int64(0), pendingSamples, "Remote write never caught up, there are still %d pending samples.", pendingSamples)
|
require.Equal(t, int64(0), pendingSamples, "Remote write never caught up, there are still %d pending samples.", pendingSamples)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCalculateDesiredShardsDetail(t *testing.T) {
|
||||||
|
c := NewTestWriteClient()
|
||||||
|
cfg := config.DefaultQueueConfig
|
||||||
|
mcfg := config.DefaultMetadataConfig
|
||||||
|
|
||||||
|
dir, err := ioutil.TempDir("", "TestCalculateDesiredShards")
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer func() {
|
||||||
|
require.NoError(t, os.RemoveAll(dir))
|
||||||
|
}()
|
||||||
|
|
||||||
|
metrics := newQueueManagerMetrics(nil, "", "")
|
||||||
|
samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
|
||||||
|
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
|
||||||
|
|
||||||
|
for _, tc := range []struct {
|
||||||
|
name string
|
||||||
|
prevShards int
|
||||||
|
dataIn int64 // Quantities normalised to seconds.
|
||||||
|
dataOut int64
|
||||||
|
dataDropped int64
|
||||||
|
dataOutDuration float64
|
||||||
|
backlog float64
|
||||||
|
expectedShards int
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "nothing in or out 1",
|
||||||
|
prevShards: 1,
|
||||||
|
expectedShards: 1, // Shards stays the same.
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "nothing in or out 10",
|
||||||
|
prevShards: 10,
|
||||||
|
expectedShards: 10, // Shards stays the same.
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "steady throughput",
|
||||||
|
prevShards: 1,
|
||||||
|
dataIn: 10,
|
||||||
|
dataOut: 10,
|
||||||
|
dataOutDuration: 1,
|
||||||
|
expectedShards: 1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "scale down",
|
||||||
|
prevShards: 10,
|
||||||
|
dataIn: 10,
|
||||||
|
dataOut: 10,
|
||||||
|
dataOutDuration: 5,
|
||||||
|
expectedShards: 5,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "scale down constrained",
|
||||||
|
prevShards: 7,
|
||||||
|
dataIn: 10,
|
||||||
|
dataOut: 10,
|
||||||
|
dataOutDuration: 5,
|
||||||
|
expectedShards: 7,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "scale up",
|
||||||
|
prevShards: 1,
|
||||||
|
dataIn: 10,
|
||||||
|
dataOut: 10,
|
||||||
|
dataOutDuration: 10,
|
||||||
|
expectedShards: 10,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "scale up constrained",
|
||||||
|
prevShards: 8,
|
||||||
|
dataIn: 10,
|
||||||
|
dataOut: 10,
|
||||||
|
dataOutDuration: 10,
|
||||||
|
expectedShards: 8,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "backlogged 20s",
|
||||||
|
prevShards: 2,
|
||||||
|
dataIn: 10,
|
||||||
|
dataOut: 10,
|
||||||
|
dataOutDuration: 2,
|
||||||
|
backlog: 20,
|
||||||
|
expectedShards: 4,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "backlogged 90s",
|
||||||
|
prevShards: 4,
|
||||||
|
dataIn: 10,
|
||||||
|
dataOut: 10,
|
||||||
|
dataOutDuration: 4,
|
||||||
|
backlog: 90,
|
||||||
|
expectedShards: 22,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "backlog reduced",
|
||||||
|
prevShards: 22,
|
||||||
|
dataIn: 10,
|
||||||
|
dataOut: 20,
|
||||||
|
dataOutDuration: 4,
|
||||||
|
backlog: 10,
|
||||||
|
expectedShards: 3,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "backlog eliminated",
|
||||||
|
prevShards: 3,
|
||||||
|
dataIn: 10,
|
||||||
|
dataOut: 10,
|
||||||
|
dataOutDuration: 2,
|
||||||
|
backlog: 0,
|
||||||
|
expectedShards: 2, // Shard back down.
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "slight slowdown",
|
||||||
|
prevShards: 1,
|
||||||
|
dataIn: 10,
|
||||||
|
dataOut: 10,
|
||||||
|
dataOutDuration: 1.2,
|
||||||
|
expectedShards: 2, // 1.2 is rounded up to 2.
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "bigger slowdown",
|
||||||
|
prevShards: 1,
|
||||||
|
dataIn: 10,
|
||||||
|
dataOut: 10,
|
||||||
|
dataOutDuration: 1.4,
|
||||||
|
expectedShards: 2,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "speed up",
|
||||||
|
prevShards: 2,
|
||||||
|
dataIn: 10,
|
||||||
|
dataOut: 10,
|
||||||
|
dataOutDuration: 1.2,
|
||||||
|
backlog: 0,
|
||||||
|
expectedShards: 2, // No reaction - 1.2 is rounded up to 2.
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "speed up more",
|
||||||
|
prevShards: 2,
|
||||||
|
dataIn: 10,
|
||||||
|
dataOut: 10,
|
||||||
|
dataOutDuration: 0.9,
|
||||||
|
backlog: 0,
|
||||||
|
expectedShards: 1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "marginal decision A",
|
||||||
|
prevShards: 3,
|
||||||
|
dataIn: 10,
|
||||||
|
dataOut: 10,
|
||||||
|
dataOutDuration: 2.01,
|
||||||
|
backlog: 0,
|
||||||
|
expectedShards: 3, // 2.01 rounds up to 3.
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "marginal decision B",
|
||||||
|
prevShards: 3,
|
||||||
|
dataIn: 10,
|
||||||
|
dataOut: 10,
|
||||||
|
dataOutDuration: 1.99,
|
||||||
|
backlog: 0,
|
||||||
|
expectedShards: 2, // 1.99 rounds up to 2.
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
m.numShards = tc.prevShards
|
||||||
|
forceEMWA(samplesIn, tc.dataIn*int64(shardUpdateDuration/time.Second))
|
||||||
|
samplesIn.tick()
|
||||||
|
forceEMWA(m.dataOut, tc.dataOut*int64(shardUpdateDuration/time.Second))
|
||||||
|
forceEMWA(m.dataDropped, tc.dataDropped*int64(shardUpdateDuration/time.Second))
|
||||||
|
forceEMWA(m.dataOutDuration, int64(tc.dataOutDuration*float64(shardUpdateDuration)))
|
||||||
|
m.highestRecvTimestamp.value = tc.backlog // Not Set() because it can only increase value.
|
||||||
|
|
||||||
|
require.Equal(t, tc.expectedShards, m.calculateDesiredShards())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func forceEMWA(r *ewmaRate, rate int64) {
|
||||||
|
r.init = false
|
||||||
|
r.newEvents.Store(rate)
|
||||||
|
}
|
||||||
|
|
||||||
func TestQueueManagerMetrics(t *testing.T) {
|
func TestQueueManagerMetrics(t *testing.T) {
|
||||||
reg := prometheus.NewPedanticRegistry()
|
reg := prometheus.NewPedanticRegistry()
|
||||||
metrics := newQueueManagerMetrics(reg, "name", "http://localhost:1234")
|
metrics := newQueueManagerMetrics(reg, "name", "http://localhost:1234")
|
||||||
|
Loading…
Reference in New Issue
Block a user