diff --git a/config/config.go b/config/config.go index ca1a1d57d..f757c5c24 100644 --- a/config/config.go +++ b/config/config.go @@ -173,6 +173,25 @@ var ( // DefaultRemoteWriteConfig is the default remote write configuration. DefaultRemoteWriteConfig = RemoteWriteConfig{ RemoteTimeout: model.Duration(30 * time.Second), + QueueConfig: DefaultQueueConfig, + } + + // DefaultQueueConfig is the default remote queue configuration. + DefaultQueueConfig = QueueConfig{ + // With a maximum of 1000 shards, assuming an average of 100ms remote write + // time and 100 samples per batch, we will be able to push 1M samples/s. + MaxShards: 1000, + MaxSamplesPerSend: 100, + + // By default, buffer 1000 batches per shard, which at 100ms per batch is + // 1m40s. At 1000 shards, this will buffer 100M samples total. + Capacity: 100 * 1000, + BatchSendDeadline: 5 * time.Second, + + // Max number of times to retry a batch on recoverable errors. + MaxRetries: 10, + MinBackoff: 30 * time.Millisecond, + MaxBackoff: 100 * time.Millisecond, } // DefaultRemoteReadConfig is the default remote read configuration. @@ -1392,6 +1411,7 @@ type RemoteWriteConfig struct { // We cannot do proper Go type embedding below as the parser will then parse // values arbitrarily into the overflow maps of further-down types. HTTPClientConfig HTTPClientConfig `yaml:",inline"` + QueueConfig QueueConfig `yaml:"queue_config,omitempty"` // Catches all undefined fields and must be empty after parsing. XXX map[string]interface{} `yaml:",inline"` @@ -1404,12 +1424,43 @@ func (c *RemoteWriteConfig) UnmarshalYAML(unmarshal func(interface{}) error) err if err := unmarshal((*plain)(c)); err != nil { return err } + + // The UnmarshalYAML method of HTTPClientConfig is not being called because it's not a pointer. + // We cannot make it a pointer as the parser panics for inlined pointer structs. + // Thus we just do its validation here. 
+ if err := c.HTTPClientConfig.validate(); err != nil { + return err + } + if err := checkOverflow(c.XXX, "remote_write"); err != nil { return err } return nil } +// QueueConfig is the configuration for the queue used to write to remote +// storage. +type QueueConfig struct { + // Number of samples to buffer per shard before we start dropping them. + Capacity int `yaml:"capacity,omitempty"` + + // Max number of shards, i.e. amount of concurrency. + MaxShards int `yaml:"max_shards,omitempty"` + + // Maximum number of samples per send. + MaxSamplesPerSend int `yaml:"max_samples_per_send,omitempty"` + + // Maximum time a sample will wait in the buffer. + BatchSendDeadline time.Duration `yaml:"batch_send_deadline,omitempty"` + + // Max number of times to retry a batch on recoverable errors. + MaxRetries int `yaml:"max_retries,omitempty"` + + // On recoverable errors, back off exponentially. + MinBackoff time.Duration `yaml:"min_backoff,omitempty"` + MaxBackoff time.Duration `yaml:"max_backoff,omitempty"` +} + // RemoteReadConfig is the configuration for reading from remote storage. type RemoteReadConfig struct { URL *URL `yaml:"url,omitempty"` @@ -1430,6 +1481,14 @@ func (c *RemoteReadConfig) UnmarshalYAML(unmarshal func(interface{}) error) erro if err := unmarshal((*plain)(c)); err != nil { return err } + + // The UnmarshalYAML method of HTTPClientConfig is not being called because it's not a pointer. + // We cannot make it a pointer as the parser panics for inlined pointer structs. + // Thus we just do its validation here. 
+ if err := c.HTTPClientConfig.validate(); err != nil { + return err + } + if err := checkOverflow(c.XXX, "remote_read"); err != nil { return err } diff --git a/config/config_test.go b/config/config_test.go index 8071d7258..8e4590b3f 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -66,10 +66,12 @@ var expectedConf = &Config{ Action: RelabelDrop, }, }, + QueueConfig: DefaultQueueConfig, }, { URL: mustParseURL("http://remote2/push"), RemoteTimeout: model.Duration(30 * time.Second), + QueueConfig: DefaultQueueConfig, }, }, diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 4bebe2b76..5d426c3b7 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -123,42 +123,6 @@ func init() { prometheus.MustRegister(numShards) } -// QueueManagerConfig is the configuration for the queue used to write to remote -// storage. -type QueueManagerConfig struct { - // Number of samples to buffer per shard before we start dropping them. - QueueCapacity int - // Max number of shards, i.e. amount of concurrency. - MaxShards int - // Maximum number of samples per send. - MaxSamplesPerSend int - // Maximum time sample will wait in buffer. - BatchSendDeadline time.Duration - // Max number of times to retry a batch on recoverable errors. - MaxRetries int - // On recoverable errors, backoff exponentially. - MinBackoff time.Duration - MaxBackoff time.Duration -} - -// defaultQueueManagerConfig is the default remote queue configuration. -var defaultQueueManagerConfig = QueueManagerConfig{ - // With a maximum of 1000 shards, assuming an average of 100ms remote write - // time and 100 samples per batch, we will be able to push 1M samples/s. - MaxShards: 1000, - MaxSamplesPerSend: 100, - - // By default, buffer 1000 batches, which at 100ms per batch is 1:40mins. At - // 1000 shards, this will buffer 100M samples total. 
- QueueCapacity: 100 * 1000, - BatchSendDeadline: 5 * time.Second, - - // Max number of times to retry a batch on recoverable errors. - MaxRetries: 10, - MinBackoff: 30 * time.Millisecond, - MaxBackoff: 100 * time.Millisecond, -} - // StorageClient defines an interface for sending a batch of samples to an // external timeseries database. type StorageClient interface { @@ -171,7 +135,7 @@ type StorageClient interface { // QueueManager manages a queue of samples to be sent to the Storage // indicated by the provided StorageClient. type QueueManager struct { - cfg QueueManagerConfig + cfg config.QueueConfig externalLabels model.LabelSet relabelConfigs []*config.RelabelConfig client StorageClient @@ -190,7 +154,7 @@ type QueueManager struct { } // NewQueueManager builds a new QueueManager. -func NewQueueManager(cfg QueueManagerConfig, externalLabels model.LabelSet, relabelConfigs []*config.RelabelConfig, client StorageClient) *QueueManager { +func NewQueueManager(cfg config.QueueConfig, externalLabels model.LabelSet, relabelConfigs []*config.RelabelConfig, client StorageClient) *QueueManager { t := &QueueManager{ cfg: cfg, externalLabels: externalLabels, @@ -209,7 +173,7 @@ func NewQueueManager(cfg QueueManagerConfig, externalLabels model.LabelSet, rela } t.shards = t.newShards(t.numShards) numShards.WithLabelValues(t.queueName).Set(float64(t.numShards)) - queueCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.QueueCapacity)) + queueCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.Capacity)) return t } @@ -397,7 +361,7 @@ type shards struct { func (t *QueueManager) newShards(numShards int) *shards { queues := make([]chan *model.Sample, numShards) for i := 0; i < numShards; i++ { - queues[i] = make(chan *model.Sample, t.cfg.QueueCapacity) + queues[i] = make(chan *model.Sample, t.cfg.Capacity) } s := &shards{ qm: t, diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index c97c00714..4c169cbb9 100644 --- 
a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -21,6 +21,7 @@ import ( "time" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" ) type TestStorageClient struct { @@ -81,7 +82,7 @@ func (c *TestStorageClient) Name() string { func TestSampleDelivery(t *testing.T) { // Let's create an even number of send batches so we don't run into the // batch timeout case. - n := defaultQueueManagerConfig.QueueCapacity * 2 + n := config.DefaultQueueConfig.Capacity * 2 samples := make(model.Samples, 0, n) for i := 0; i < n; i++ { @@ -97,7 +98,7 @@ func TestSampleDelivery(t *testing.T) { c := NewTestStorageClient() c.expectSamples(samples[:len(samples)/2]) - cfg := defaultQueueManagerConfig + cfg := config.DefaultQueueConfig cfg.MaxShards = 1 m := NewQueueManager(cfg, nil, nil, c) @@ -117,7 +118,7 @@ func TestSampleDelivery(t *testing.T) { func TestSampleDeliveryOrder(t *testing.T) { ts := 10 - n := defaultQueueManagerConfig.MaxSamplesPerSend * ts + n := config.DefaultQueueConfig.MaxSamplesPerSend * ts samples := make(model.Samples, 0, n) for i := 0; i < n; i++ { @@ -133,7 +134,7 @@ func TestSampleDeliveryOrder(t *testing.T) { c := NewTestStorageClient() c.expectSamples(samples) - m := NewQueueManager(defaultQueueManagerConfig, nil, nil, c) + m := NewQueueManager(config.DefaultQueueConfig, nil, nil, c) // These should be received by the client. for _, s := range samples { @@ -194,7 +195,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) { // `MaxSamplesPerSend*Shards` samples should be consumed by the // per-shard goroutines, and then another `MaxSamplesPerSend` // should be left on the queue. 
- n := defaultQueueManagerConfig.MaxSamplesPerSend * 2 + n := config.DefaultQueueConfig.MaxSamplesPerSend * 2 samples := make(model.Samples, 0, n) for i := 0; i < n; i++ { @@ -208,9 +209,9 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) { } c := NewTestBlockedStorageClient() - cfg := defaultQueueManagerConfig + cfg := config.DefaultQueueConfig cfg.MaxShards = 1 - cfg.QueueCapacity = n + cfg.Capacity = n m := NewQueueManager(cfg, nil, nil, c) m.Start() @@ -240,7 +241,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) { time.Sleep(10 * time.Millisecond) } - if m.queueLen() != defaultQueueManagerConfig.MaxSamplesPerSend { + if m.queueLen() != config.DefaultQueueConfig.MaxSamplesPerSend { t.Fatalf("Failed to drain QueueManager queue, %d elements left", m.queueLen(), ) diff --git a/storage/remote/write.go b/storage/remote/write.go index 02affecad..ddfa9489a 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -45,7 +45,7 @@ func (w *Writer) ApplyConfig(conf *config.Config) error { return err } newQueues = append(newQueues, NewQueueManager( - defaultQueueManagerConfig, + rwConf.QueueConfig, conf.GlobalConfig.ExternalLabels, rwConf.WriteRelabelConfigs, c,