diff --git a/storage/remote/client.go b/storage/remote/client.go index 77694180f..98acf9633 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -60,6 +60,10 @@ func NewClient(index int, conf *clientConfig) (*Client, error) { }, nil } +type recoverableError struct { + error +} + // Store sends a batch of samples to the HTTP endpoint. func (c *Client) Store(samples model.Samples) error { req := &WriteRequest{ @@ -97,6 +101,8 @@ func (c *Client) Store(samples model.Samples) error { httpReq, err := http.NewRequest("POST", c.url.String(), &buf) if err != nil { + // Errors from NewRequest are from unparseable URLs, so are not + // recoverable. return err } httpReq.Header.Add("Content-Encoding", "snappy") @@ -108,11 +114,17 @@ func (c *Client) Store(samples model.Samples) error { httpResp, err := ctxhttp.Do(ctx, c.client, httpReq) if err != nil { - return err + // Errors from client.Do are from (for example) network errors, so are + // recoverable. + return recoverableError{err} } defer httpResp.Body.Close() + if httpResp.StatusCode/100 != 2 { - return fmt.Errorf("server returned HTTP status %s", httpResp.Status) + err = fmt.Errorf("server returned HTTP status %s", httpResp.Status) + } + if httpResp.StatusCode/100 == 5 { + return recoverableError{err} } return nil } diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 3718561aa..ad462d176 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -33,17 +33,6 @@ const ( subsystem = "remote_storage" queue = "queue" - // With a maximum of 1000 shards, assuming an average of 100ms remote write - // time and 100 samples per batch, we will be able to push 1M samples/s. - defaultMaxShards = 1000 - defaultMaxSamplesPerSend = 100 - - // defaultQueueCapacity is per shard - at 1000 shards, this will buffer - // 100M samples. It is configured to buffer 1000 batches, which at 100ms - // per batch is 1:40mins. - defaultQueueCapacity = defaultMaxSamplesPerSend * 1000 - defaultBatchSendDeadline = 5 * time.Second - // We track samples in/out and how long pushes take using an Exponentially // Weighted Moving Average. ewmaWeight = 0.2 @@ -58,12 +47,12 @@ const ( ) var ( - sentSamplesTotal = prometheus.NewCounterVec( + succeededSamplesTotal = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "sent_samples_total", - Help: "Total number of processed samples sent to remote storage.", + Name: "succeeded_samples_total", + Help: "Total number of samples successfully sent to remote storage.", }, []string{queue}, ) @@ -72,7 +61,7 @@ var ( Namespace: namespace, Subsystem: subsystem, Name: "failed_samples_total", - Help: "Total number of processed samples which failed on send to remote storage.", + Help: "Total number of samples which failed on send to remote storage.", }, []string{queue}, ) @@ -125,7 +114,7 @@ var ( ) func init() { - prometheus.MustRegister(sentSamplesTotal) + prometheus.MustRegister(succeededSamplesTotal) prometheus.MustRegister(failedSamplesTotal) prometheus.MustRegister(droppedSamplesTotal) prometheus.MustRegister(sentBatchDuration) @@ -134,6 +123,42 @@ func init() { prometheus.MustRegister(numShards) } +// QueueManagerConfig is the configuration for the queue used to write to remote +// storage. +type QueueManagerConfig struct { + // Number of samples to buffer per shard before we start dropping them. + QueueCapacity int + // Max number of shards, i.e. amount of concurrency. + MaxShards int + // Maximum number of samples per send. + MaxSamplesPerSend int + // Maximum time sample will wait in buffer. + BatchSendDeadline time.Duration + // Max number of times to retry a batch on recoverable errors. + MaxRetries int + // On recoverable errors, backoff exponentially. + MinBackoff time.Duration + MaxBackoff time.Duration +} + +// defaultQueueManagerConfig is the default remote queue configuration. +var defaultQueueManagerConfig = QueueManagerConfig{ + // With a maximum of 1000 shards, assuming an average of 100ms remote write + // time and 100 samples per batch, we will be able to push 1M samples/s. + MaxShards: 1000, + MaxSamplesPerSend: 100, + + // By default, buffer 1000 batches, which at 100ms per batch is 1:40mins. At + // 1000 shards, this will buffer 100M samples total. + QueueCapacity: 100 * 1000, + BatchSendDeadline: 5 * time.Second, + + // Max number of times to retry a batch on recoverable errors. + MaxRetries: 10, + MinBackoff: 30 * time.Millisecond, + MaxBackoff: 100 * time.Millisecond, +} + // StorageClient defines an interface for sending a batch of samples to an // external timeseries database. type StorageClient interface { @@ -143,23 +168,15 @@ type StorageClient interface { Name() string } -// QueueManagerConfig configures a storage queue. -type QueueManagerConfig struct { - QueueCapacity int // Number of samples to buffer per shard before we start dropping them. - MaxShards int // Max number of shards, i.e. amount of concurrency. - MaxSamplesPerSend int // Maximum number of samples per send. - BatchSendDeadline time.Duration // Maximum time sample will wait in buffer. - ExternalLabels model.LabelSet - RelabelConfigs []*config.RelabelConfig - Client StorageClient -} - // QueueManager manages a queue of samples to be sent to the Storage // indicated by the provided StorageClient. type QueueManager struct { - cfg QueueManagerConfig - queueName string - logLimiter *rate.Limiter + cfg QueueManagerConfig + externalLabels model.LabelSet + relabelConfigs []*config.RelabelConfig + client StorageClient + queueName string + logLimiter *rate.Limiter shardsMtx sync.Mutex shards *shards @@ -173,23 +190,14 @@ type QueueManager struct { } // NewQueueManager builds a new QueueManager. -func NewQueueManager(cfg QueueManagerConfig) *QueueManager { - if cfg.QueueCapacity == 0 { - cfg.QueueCapacity = defaultQueueCapacity - } - if cfg.MaxShards == 0 { - cfg.MaxShards = defaultMaxShards - } - if cfg.MaxSamplesPerSend == 0 { - cfg.MaxSamplesPerSend = defaultMaxSamplesPerSend - } - if cfg.BatchSendDeadline == 0 { - cfg.BatchSendDeadline = defaultBatchSendDeadline - } - +func NewQueueManager(cfg QueueManagerConfig, externalLabels model.LabelSet, relabelConfigs []*config.RelabelConfig, client StorageClient) *QueueManager { t := &QueueManager{ - cfg: cfg, - queueName: cfg.Client.Name(), + cfg: cfg, + externalLabels: externalLabels, + relabelConfigs: relabelConfigs, + client: client, + queueName: client.Name(), + logLimiter: rate.NewLimiter(logRateLimit, logBurst), numShards: 1, reshardChan: make(chan int), @@ -214,14 +222,14 @@ func (t *QueueManager) Append(s *model.Sample) error { snew = *s snew.Metric = s.Metric.Clone() - for ln, lv := range t.cfg.ExternalLabels { + for ln, lv := range t.externalLabels { if _, ok := s.Metric[ln]; !ok { snew.Metric[ln] = lv } } snew.Metric = model.Metric( - relabel.Process(model.LabelSet(snew.Metric), t.cfg.RelabelConfigs...)) + relabel.Process(model.LabelSet(snew.Metric), t.relabelConfigs...)) if snew.Metric == nil { return nil @@ -467,21 +475,38 @@ func (s *shards) runShard(i int) { } func (s *shards) sendSamples(samples model.Samples) { - // Samples are sent to the remote storage on a best-effort basis. If a - // sample isn't sent correctly the first time, it's simply dropped on the - // floor. begin := time.Now() - err := s.qm.cfg.Client.Store(samples) - duration := time.Since(begin) - - if err != nil { - log.Warnf("error sending %d samples to remote storage: %s", len(samples), err) - failedSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples))) - } else { - sentSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples))) - } - sentBatchDuration.WithLabelValues(s.qm.queueName).Observe(duration.Seconds()) + s.sendSamplesWithBackoff(samples) + // These counters are used to caclulate the dynamic sharding, and as such + // should be maintained irrespective of success or failure. s.qm.samplesOut.incr(int64(len(samples))) - s.qm.samplesOutDuration.incr(int64(duration)) + s.qm.samplesOutDuration.incr(int64(time.Since(begin))) +} + +// sendSamples to the remote storage with backoff for recoverable errors. +func (s *shards) sendSamplesWithBackoff(samples model.Samples) { + backoff := s.qm.cfg.MinBackoff + for retries := s.qm.cfg.MaxRetries; retries > 0; retries-- { + begin := time.Now() + err := s.qm.client.Store(samples) + + sentBatchDuration.WithLabelValues(s.qm.queueName).Observe(time.Since(begin).Seconds()) + if err == nil { + succeededSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples))) + return + } + + log.Warnf("Error sending %d samples to remote storage: %s", len(samples), err) + if _, ok := err.(recoverableError); !ok { + break + } + time.Sleep(backoff) + backoff = backoff * 2 + if backoff > s.qm.cfg.MaxBackoff { + backoff = s.qm.cfg.MaxBackoff + } + } + + failedSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples))) } diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index bfe4b69e1..c97c00714 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -81,7 +81,7 @@ func (c *TestStorageClient) Name() string { func TestSampleDelivery(t *testing.T) { // Let's create an even number of send batches so we don't run into the // batch timeout case. - n := defaultQueueCapacity * 2 + n := defaultQueueManagerConfig.QueueCapacity * 2 samples := make(model.Samples, 0, n) for i := 0; i < n; i++ { @@ -97,10 +97,9 @@ func TestSampleDelivery(t *testing.T) { c := NewTestStorageClient() c.expectSamples(samples[:len(samples)/2]) - m := NewQueueManager(QueueManagerConfig{ - Client: c, - MaxShards: 1, - }) + cfg := defaultQueueManagerConfig + cfg.MaxShards = 1 + m := NewQueueManager(cfg, nil, nil, c) // These should be received by the client. for _, s := range samples[:len(samples)/2] { @@ -118,7 +117,7 @@ func TestSampleDelivery(t *testing.T) { func TestSampleDeliveryOrder(t *testing.T) { ts := 10 - n := defaultMaxSamplesPerSend * ts + n := defaultQueueManagerConfig.MaxSamplesPerSend * ts samples := make(model.Samples, 0, n) for i := 0; i < n; i++ { @@ -134,11 +133,7 @@ func TestSampleDeliveryOrder(t *testing.T) { c := NewTestStorageClient() c.expectSamples(samples) - m := NewQueueManager(QueueManagerConfig{ - Client: c, - // Ensure we don't drop samples in this test. - QueueCapacity: n, - }) + m := NewQueueManager(defaultQueueManagerConfig, nil, nil, c) // These should be received by the client. for _, s := range samples { @@ -199,7 +194,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) { // `MaxSamplesPerSend*Shards` samples should be consumed by the // per-shard goroutines, and then another `MaxSamplesPerSend` // should be left on the queue. - n := defaultMaxSamplesPerSend*1 + defaultMaxSamplesPerSend + n := defaultQueueManagerConfig.MaxSamplesPerSend * 2 samples := make(model.Samples, 0, n) for i := 0; i < n; i++ { @@ -213,11 +208,10 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) { } c := NewTestBlockedStorageClient() - m := NewQueueManager(QueueManagerConfig{ - Client: c, - QueueCapacity: n, - MaxShards: 1, - }) + cfg := defaultQueueManagerConfig + cfg.MaxShards = 1 + cfg.QueueCapacity = n + m := NewQueueManager(cfg, nil, nil, c) m.Start() @@ -246,7 +240,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) { time.Sleep(10 * time.Millisecond) } - if m.queueLen() != defaultMaxSamplesPerSend { + if m.queueLen() != defaultQueueManagerConfig.MaxSamplesPerSend { t.Fatalf("Failed to drain QueueManager queue, %d elements left", m.queueLen(), ) diff --git a/storage/remote/write.go b/storage/remote/write.go index 825bf3fab..02affecad 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -44,11 +44,12 @@ func (w *Writer) ApplyConfig(conf *config.Config) error { if err != nil { return err } - newQueues = append(newQueues, NewQueueManager(QueueManagerConfig{ - Client: c, - ExternalLabels: conf.GlobalConfig.ExternalLabels, - RelabelConfigs: rwConf.WriteRelabelConfigs, - })) + newQueues = append(newQueues, NewQueueManager( + defaultQueueManagerConfig, + conf.GlobalConfig.ExternalLabels, + rwConf.WriteRelabelConfigs, + c, + )) } for _, q := range w.queues {