From d5c3c567d3e2badbdcc844edeaeff4e3b36c71dc Mon Sep 17 00:00:00 2001 From: Levi Harrison Date: Thu, 24 Jun 2021 18:39:50 -0400 Subject: [PATCH] Remote Write: Add max samples per metadata send (#8959) * Added MaxSamplesPerSend Signed-off-by: Levi Harrison * Added tests Signed-off-by: Levi Harrison * Fixed order of require Signed-off-by: Levi Harrison * Added docs Signed-off-by: Levi Harrison * writes -> writesReceived Signed-off-by: Levi Harrison * Improved send loop Signed-off-by: Levi Harrison --- config/config.go | 7 +++++-- docs/configuration/configuration.md | 2 ++ storage/remote/queue_manager.go | 16 +++++++++++----- storage/remote/queue_manager_test.go | 24 ++++++++++++++++++------ 4 files changed, 36 insertions(+), 13 deletions(-) diff --git a/config/config.go b/config/config.go index 1648653c2..af716d1f0 100644 --- a/config/config.go +++ b/config/config.go @@ -173,8 +173,9 @@ var ( // DefaultMetadataConfig is the default metadata configuration for a remote write endpoint. DefaultMetadataConfig = MetadataConfig{ - Send: true, - SendInterval: model.Duration(1 * time.Minute), + Send: true, + SendInterval: model.Duration(1 * time.Minute), + MaxSamplesPerSend: 500, } // DefaultRemoteReadConfig is the default remote read configuration. @@ -731,6 +732,8 @@ type MetadataConfig struct { Send bool `yaml:"send"` // SendInterval controls how frequently we send metric metadata. SendInterval model.Duration `yaml:"send_interval"` + // Maximum number of samples per send. + MaxSamplesPerSend int `yaml:"max_samples_per_send,omitempty"` } // SigV4Config is the configuration for signing remote write requests with diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index 939d66e8f..ac1368b0c 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -2443,6 +2443,8 @@ metadata_config: [ send: | default = true ] # How frequently metric metadata is sent to remote storage. [ send_interval: | default = 1m ] + # Maximum number of samples per send. + [ max_samples_per_send: | default = 500] ``` There is a list of diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index b0fde112e..655886b46 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -443,11 +443,17 @@ func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.Met }) } - err := t.sendMetadataWithBackoff(ctx, mm) - - if err != nil { - t.metrics.failedMetadataTotal.Add(float64(len(metadata))) - level.Error(t.logger).Log("msg", "non-recoverable error while sending metadata", "count", len(metadata), "err", err) + numSends := int(math.Ceil(float64(len(metadata)) / float64(t.mcfg.MaxSamplesPerSend))) + for i := 0; i < numSends; i++ { + last := (i + 1) * t.mcfg.MaxSamplesPerSend + if last > len(metadata) { + last = len(metadata) + } + err := t.sendMetadataWithBackoff(ctx, mm[i*t.mcfg.MaxSamplesPerSend:last]) + if err != nil { + t.metrics.failedMetadataTotal.Add(float64(last - (i * t.mcfg.MaxSamplesPerSend))) + level.Error(t.logger).Log("msg", "non-recoverable error while sending metadata", "count", last-(i*t.mcfg.MaxSamplesPerSend), "err", err) + } } } diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 5c3493d48..94546fb1d 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -167,16 +167,25 @@ func TestMetadataDelivery(t *testing.T) { m.Start() defer m.Stop() - m.AppendMetadata(context.Background(), []scrape.MetricMetadata{ - { - Metric: "prometheus_remote_storage_sent_metadata_bytes_total", + metadata := []scrape.MetricMetadata{} + numMetadata := 1532 + for i := 0; i < numMetadata; i++ { + metadata = append(metadata, scrape.MetricMetadata{ + Metric: "prometheus_remote_storage_sent_metadata_bytes_total_" + strconv.Itoa(i), Type: textparse.MetricTypeCounter, Help: "a nice help text", Unit: "", - }, - }) + }) + } - require.Equal(t, len(c.receivedMetadata), 1) + m.AppendMetadata(context.Background(), metadata) + + require.Equal(t, numMetadata, len(c.receivedMetadata)) + // One more write than the rounded qoutient should be performed in order to get samples that didn't + // fit into MaxSamplesPerSend. + require.Equal(t, numMetadata/mcfg.MaxSamplesPerSend+1, c.writesReceived) + // Make sure the last samples were sent. + require.Equal(t, c.receivedMetadata[metadata[len(metadata)-1].Metric][0].MetricFamilyName, metadata[len(metadata)-1].Metric) } func TestSampleDeliveryTimeout(t *testing.T) { @@ -522,6 +531,7 @@ type TestWriteClient struct { receivedExemplars map[string][]prompb.Exemplar expectedExemplars map[string][]prompb.Exemplar receivedMetadata map[string][]prompb.MetricMetadata + writesReceived int withWaitGroup bool wg sync.WaitGroup mtx sync.Mutex @@ -655,6 +665,8 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte) error { c.receivedMetadata[m.MetricFamilyName] = append(c.receivedMetadata[m.MetricFamilyName], m) } + c.writesReceived++ + return nil }