mirror of
https://github.com/prometheus/prometheus
synced 2024-12-27 00:53:12 +00:00
storage/remote: disable resharding during active retry backoffs (#13562)
* storage/remote: disable resharding during active retry backoffs Today, remote_write reshards based on pure throughput. This is problematic if throughput has been diminished because of HTTP 429s; increasing the number of shards due to backpressure will only exacerbate the problem. This commit disables resharding for twice the retry backoff, ensuring that resharding will never occur during an active backoff, and that resharding does not become enabled again until enough time has elapsed to allow any pending requests to be retried. Signed-off-by: Robert Fratto <robertfratto@gmail.com> * storage/remote: test that resharding is disabled on retry Signed-off-by: Robert Fratto <robertfratto@gmail.com> * storage/remote: address review feedback Signed-off-by: Robert Fratto <robertfratto@gmail.com> * storage/remote: track time where resharding initially got disabled This change introduces a second atomic int64 to roughly track when resharding got disabled. This int64 is only updated after updating the disabled timestamp if resharding was previously enabled. Signed-off-by: Robert Fratto <robertfratto@gmail.com> --------- Signed-off-by: Robert Fratto <robertfratto@gmail.com>
This commit is contained in:
parent
e79b9ed2ab
commit
a09465baee
@ -396,8 +396,10 @@ type WriteClient interface {
|
||||
// indicated by the provided WriteClient. Implements writeTo interface
|
||||
// used by WAL Watcher.
|
||||
type QueueManager struct {
|
||||
lastSendTimestamp atomic.Int64
|
||||
buildRequestLimitTimestamp atomic.Int64
|
||||
lastSendTimestamp atomic.Int64
|
||||
buildRequestLimitTimestamp atomic.Int64
|
||||
reshardDisableStartTimestamp atomic.Int64 // Time that reshard was disabled.
|
||||
reshardDisableEndTimestamp atomic.Int64 // Time that reshard is disabled until.
|
||||
|
||||
logger log.Logger
|
||||
flushDeadline time.Duration
|
||||
@ -574,7 +576,7 @@ func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []p
|
||||
retry := func() {
|
||||
t.metrics.retriedMetadataTotal.Add(float64(len(metadata)))
|
||||
}
|
||||
err = sendWriteRequestWithBackoff(ctx, t.cfg, t.logger, attemptStore, retry)
|
||||
err = t.sendWriteRequestWithBackoff(ctx, attemptStore, retry)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -1021,6 +1023,13 @@ func (t *QueueManager) shouldReshard(desiredShards int) bool {
|
||||
level.Warn(t.logger).Log("msg", "Skipping resharding, last successful send was beyond threshold", "lastSendTimestamp", lsts, "minSendTimestamp", minSendTimestamp)
|
||||
return false
|
||||
}
|
||||
if disableTimestamp := t.reshardDisableEndTimestamp.Load(); time.Now().Unix() < disableTimestamp {
|
||||
disabledAt := time.Unix(t.reshardDisableStartTimestamp.Load(), 0)
|
||||
disabledFor := time.Until(time.Unix(disableTimestamp, 0))
|
||||
|
||||
level.Warn(t.logger).Log("msg", "Skipping resharding, resharding is disabled while waiting for recoverable errors", "disabled_at", disabledAt, "disabled_for", disabledFor)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
@ -1622,7 +1631,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
|
||||
s.qm.metrics.retriedHistogramsTotal.Add(float64(histogramCount))
|
||||
}
|
||||
|
||||
err = sendWriteRequestWithBackoff(ctx, s.qm.cfg, s.qm.logger, attemptStore, onRetry)
|
||||
err = s.qm.sendWriteRequestWithBackoff(ctx, attemptStore, onRetry)
|
||||
if errors.Is(err, context.Canceled) {
|
||||
// When there is resharding, we cancel the context for this queue, which means the data is not sent.
|
||||
// So we exit early to not update the metrics.
|
||||
@ -1635,8 +1644,8 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
|
||||
return err
|
||||
}
|
||||
|
||||
func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l log.Logger, attempt func(int) error, onRetry func()) error {
|
||||
backoff := cfg.MinBackoff
|
||||
func (t *QueueManager) sendWriteRequestWithBackoff(ctx context.Context, attempt func(int) error, onRetry func()) error {
|
||||
backoff := t.cfg.MinBackoff
|
||||
sleepDuration := model.Duration(0)
|
||||
try := 0
|
||||
|
||||
@ -1663,9 +1672,26 @@ func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l
|
||||
switch {
|
||||
case backoffErr.retryAfter > 0:
|
||||
sleepDuration = backoffErr.retryAfter
|
||||
level.Info(l).Log("msg", "Retrying after duration specified by Retry-After header", "duration", sleepDuration)
|
||||
level.Info(t.logger).Log("msg", "Retrying after duration specified by Retry-After header", "duration", sleepDuration)
|
||||
case backoffErr.retryAfter < 0:
|
||||
level.Debug(l).Log("msg", "retry-after cannot be in past, retrying using default backoff mechanism")
|
||||
level.Debug(t.logger).Log("msg", "retry-after cannot be in past, retrying using default backoff mechanism")
|
||||
}
|
||||
|
||||
// We should never reshard for a recoverable error; increasing shards could
|
||||
// make the problem worse, particularly if we're getting rate limited.
|
||||
//
|
||||
// reshardDisableTimestamp holds the unix timestamp until which resharding
|
||||
// is diableld. We'll update that timestamp if the period we were just told
|
||||
// to sleep for is newer than the existing disabled timestamp.
|
||||
reshardWaitPeriod := time.Now().Add(time.Duration(sleepDuration) * 2)
|
||||
if oldTS, updated := setAtomicToNewer(&t.reshardDisableEndTimestamp, reshardWaitPeriod.Unix()); updated {
|
||||
// If the old timestamp was in the past, then resharding was previously
|
||||
// enabled. We want to track the time where it initially got disabled for
|
||||
// logging purposes.
|
||||
disableTime := time.Now().Unix()
|
||||
if oldTS < disableTime {
|
||||
t.reshardDisableStartTimestamp.Store(disableTime)
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
@ -1675,18 +1701,38 @@ func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l
|
||||
|
||||
// If we make it this far, we've encountered a recoverable error and will retry.
|
||||
onRetry()
|
||||
level.Warn(l).Log("msg", "Failed to send batch, retrying", "err", err)
|
||||
level.Warn(t.logger).Log("msg", "Failed to send batch, retrying", "err", err)
|
||||
|
||||
backoff = sleepDuration * 2
|
||||
|
||||
if backoff > cfg.MaxBackoff {
|
||||
backoff = cfg.MaxBackoff
|
||||
if backoff > t.cfg.MaxBackoff {
|
||||
backoff = t.cfg.MaxBackoff
|
||||
}
|
||||
|
||||
try++
|
||||
}
|
||||
}
|
||||
|
||||
// setAtomicToNewer atomically sets a value to the newer int64 between itself
|
||||
// and the provided newValue argument. setAtomicToNewer returns whether the
|
||||
// atomic value was updated and what the previous value was.
|
||||
func setAtomicToNewer(value *atomic.Int64, newValue int64) (previous int64, updated bool) {
|
||||
for {
|
||||
current := value.Load()
|
||||
if current >= newValue {
|
||||
// If the current stored value is newer than newValue; abort.
|
||||
return current, false
|
||||
}
|
||||
|
||||
// Try to swap the value. If the atomic value has changed, we loop back to
|
||||
// the beginning until we've successfully swapped out the value or the
|
||||
// value stored in it is newer than newValue.
|
||||
if value.CompareAndSwap(current, newValue) {
|
||||
return current, true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeries) bool) (int64, int64, []prompb.TimeSeries, int, int, int) {
|
||||
var highest int64
|
||||
var lowest int64
|
||||
|
@ -520,6 +520,69 @@ func TestShouldReshard(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestDisableReshardOnRetry asserts that resharding should be disabled when a
|
||||
// recoverable error is returned from remote_write.
|
||||
func TestDisableReshardOnRetry(t *testing.T) {
|
||||
onStoredContext, onStoreCalled := context.WithCancel(context.Background())
|
||||
defer onStoreCalled()
|
||||
|
||||
var (
|
||||
fakeSamples, fakeSeries = createTimeseries(100, 100)
|
||||
|
||||
cfg = config.DefaultQueueConfig
|
||||
mcfg = config.DefaultMetadataConfig
|
||||
retryAfter = time.Second
|
||||
|
||||
metrics = newQueueManagerMetrics(nil, "", "")
|
||||
|
||||
client = &MockWriteClient{
|
||||
StoreFunc: func(ctx context.Context, b []byte, i int) error {
|
||||
onStoreCalled()
|
||||
|
||||
return RecoverableError{
|
||||
error: fmt.Errorf("fake error"),
|
||||
retryAfter: model.Duration(retryAfter),
|
||||
}
|
||||
},
|
||||
NameFunc: func() string { return "mock" },
|
||||
EndpointFunc: func() string { return "http://fake:9090/api/v1/write" },
|
||||
}
|
||||
)
|
||||
|
||||
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), newHighestTimestampMetric(), nil, false, false)
|
||||
m.StoreSeries(fakeSeries, 0)
|
||||
|
||||
// Attempt to samples while the manager is running. We immediately stop the
|
||||
// manager after the recoverable error is generated to prevent the manager
|
||||
// from resharding itself.
|
||||
m.Start()
|
||||
{
|
||||
m.Append(fakeSamples)
|
||||
|
||||
select {
|
||||
case <-onStoredContext.Done():
|
||||
case <-time.After(time.Minute):
|
||||
require.FailNow(t, "timed out waiting for client to be sent metrics")
|
||||
}
|
||||
}
|
||||
m.Stop()
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
// Force m.lastSendTimestamp to be current so the last send timestamp isn't
|
||||
// the reason resharding is disabled.
|
||||
m.lastSendTimestamp.Store(time.Now().Unix())
|
||||
return m.shouldReshard(m.numShards+1) == false
|
||||
}, time.Minute, 10*time.Millisecond, "shouldReshard was never disabled")
|
||||
|
||||
// After 2x retryAfter, resharding should be enabled again.
|
||||
require.Eventually(t, func() bool {
|
||||
// Force m.lastSendTimestamp to be current so the last send timestamp isn't
|
||||
// the reason resharding is disabled.
|
||||
m.lastSendTimestamp.Store(time.Now().Unix())
|
||||
return m.shouldReshard(m.numShards+1) == true
|
||||
}, time.Minute, retryAfter, "shouldReshard should have been re-enabled")
|
||||
}
|
||||
|
||||
func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([]record.RefSample, []record.RefSeries) {
|
||||
samples := make([]record.RefSample, 0, numSamples)
|
||||
series := make([]record.RefSeries, 0, numSeries)
|
||||
@ -844,6 +907,18 @@ func (c *NopWriteClient) Store(context.Context, []byte, int) error { return nil
|
||||
func (c *NopWriteClient) Name() string { return "nopwriteclient" }
|
||||
func (c *NopWriteClient) Endpoint() string { return "http://test-remote.com/1234" }
|
||||
|
||||
type MockWriteClient struct {
|
||||
StoreFunc func(context.Context, []byte, int) error
|
||||
NameFunc func() string
|
||||
EndpointFunc func() string
|
||||
}
|
||||
|
||||
func (c *MockWriteClient) Store(ctx context.Context, bb []byte, n int) error {
|
||||
return c.StoreFunc(ctx, bb, n)
|
||||
}
|
||||
func (c *MockWriteClient) Name() string { return c.NameFunc() }
|
||||
func (c *MockWriteClient) Endpoint() string { return c.EndpointFunc() }
|
||||
|
||||
// Extra labels to make a more realistic workload - taken from Kubernetes' embedded cAdvisor metrics.
|
||||
var extraLabels []labels.Label = []labels.Label{
|
||||
{Name: "kubernetes_io_arch", Value: "amd64"},
|
||||
|
Loading…
Reference in New Issue
Block a user