Mirror of https://github.com/prometheus/prometheus, synced 2024-12-26 00:23:18 +00:00
Make queue manager configurable.
This commit is contained in:
parent 4b868113bb
commit 454b661145
@@ -172,7 +172,26 @@ var (
 
     // DefaultRemoteWriteConfig is the default remote write configuration.
     DefaultRemoteWriteConfig = RemoteWriteConfig{
-        RemoteTimeout: model.Duration(30 * time.Second),
+        RemoteTimeout:      model.Duration(30 * time.Second),
+        QueueManagerConfig: DefaultQueueManagerConfig,
     }
 
+    // DefaultQueueManagerConfig is the default remote queue configuration.
+    DefaultQueueManagerConfig = QueueManagerConfig{
+        // With a maximum of 1000 shards, assuming an average of 100ms remote write
+        // time and 100 samples per batch, we will be able to push 1M samples/s.
+        MaxShards:         1000,
+        MaxSamplesPerSend: 100,
+
+        // By default, buffer 1000 batches, which at 100ms per batch is 1:40mins. At
+        // 1000 shards, this will buffer 100M samples total.
+        QueueCapacity:     100 * 1000,
+        BatchSendDeadline: 5 * time.Second,
+
+        // Max number of times to retry a batch on recoverable errors.
+        MaxRetries: 10,
+        MinBackoff: 30 * time.Millisecond,
+        MaxBackoff: 100 * time.Millisecond,
+    }
+
     // DefaultRemoteReadConfig is the default remote read configuration.
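The throughput and buffering figures quoted in the comments follow directly from these defaults. A minimal arithmetic sketch (the 100ms average remote-write time is the assumption stated in the comment, not a measurement):

```go
package main

import "fmt"

func main() {
	// Figures taken from the defaults above.
	const (
		maxShards         = 1000
		maxSamplesPerSend = 100
		queueCapacity     = 100 * 1000 // samples buffered per shard
		batchSeconds      = 0.1        // assumed average time per remote write
	)

	// 1000 shards * (1/0.1s) batches/s * 100 samples/batch = 1,000,000 samples/s.
	fmt.Printf("max throughput: %.0f samples/s\n", maxShards*(1/batchSeconds)*maxSamplesPerSend)

	// 100,000 samples / 100 per batch = 1000 batches; at 0.1s each that is
	// 100s, i.e. 1:40, to drain one full shard queue.
	fmt.Printf("drain time per shard: %.0fs\n", float64(queueCapacity)/maxSamplesPerSend*batchSeconds)

	// 1000 shards * 100,000 samples = 100M samples buffered in total.
	fmt.Printf("total buffered: %d samples\n", maxShards*queueCapacity)
}
```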
@@ -1391,7 +1410,8 @@ type RemoteWriteConfig struct {
 
     // We cannot do proper Go type embedding below as the parser will then parse
    // values arbitrarily into the overflow maps of further-down types.
-    HTTPClientConfig HTTPClientConfig `yaml:",inline"`
+    HTTPClientConfig   HTTPClientConfig   `yaml:",inline"`
+    QueueManagerConfig QueueManagerConfig `yaml:",inline"`
 
     // Catches all undefined fields and must be empty after parsing.
     XXX map[string]interface{} `yaml:",inline"`
@@ -1404,12 +1424,43 @@ func (c *RemoteWriteConfig) UnmarshalYAML(unmarshal func(interface{}) error) err
     if err := unmarshal((*plain)(c)); err != nil {
         return err
     }
+
+    // The UnmarshalYAML method of HTTPClientConfig is not being called because it's not a pointer.
+    // We cannot make it a pointer as the parser panics for inlined pointer structs.
+    // Thus we just do its validation here.
+    if err := c.HTTPClientConfig.validate(); err != nil {
+        return err
+    }
+
     if err := checkOverflow(c.XXX, "remote_write"); err != nil {
         return err
     }
     return nil
 }
 
+// QueueManagerConfig is the configuration for the queue used to write to remote
+// storage.
+type QueueManagerConfig struct {
+    // Number of samples to buffer per shard before we start dropping them.
+    QueueCapacity int
+
+    // Max number of shards, i.e. amount of concurrency.
+    MaxShards int
+
+    // Maximum number of samples per send.
+    MaxSamplesPerSend int
+
+    // Maximum time sample will wait in buffer.
+    BatchSendDeadline time.Duration
+
+    // Max number of times to retry a batch on recoverable errors.
+    MaxRetries int
+
+    // On recoverable errors, backoff exponentially.
+    MinBackoff time.Duration
+    MaxBackoff time.Duration
+}
+
 // RemoteReadConfig is the configuration for reading from remote storage.
 type RemoteReadConfig struct {
     URL *URL `yaml:"url,omitempty"`
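With QueueManagerConfig now exported from the config package, callers can copy DefaultQueueManagerConfig and adjust individual knobs; the struct is a plain value, so mutating the copy leaves the package default untouched. A minimal sketch (the override values below are illustrative, not defaults from this commit):

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/prometheus/config"
)

func main() {
	// Copy the exported default and tweak a few fields.
	cfg := config.DefaultQueueManagerConfig
	cfg.MaxShards = 200                 // cap concurrency (illustrative value)
	cfg.QueueCapacity = 10 * 1000       // smaller per-shard buffer
	cfg.BatchSendDeadline = time.Second // flush partial batches sooner

	fmt.Printf("custom:  %+v\n", cfg)
	fmt.Printf("default: %+v\n", config.DefaultQueueManagerConfig) // unchanged
}
```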
@@ -1430,6 +1481,14 @@ func (c *RemoteReadConfig) UnmarshalYAML(unmarshal func(interface{}) error) erro
     if err := unmarshal((*plain)(c)); err != nil {
         return err
     }
+
+    // The UnmarshalYAML method of HTTPClientConfig is not being called because it's not a pointer.
+    // We cannot make it a pointer as the parser panics for inlined pointer structs.
+    // Thus we just do its validation here.
+    if err := c.HTTPClientConfig.validate(); err != nil {
+        return err
+    }
+
     if err := checkOverflow(c.XXX, "remote_read"); err != nil {
         return err
     }
@@ -66,10 +66,12 @@ var expectedConf = &Config{
                     Action:       RelabelDrop,
                 },
             },
+            QueueManagerConfig: DefaultQueueManagerConfig,
         },
         {
-            URL:           mustParseURL("http://remote2/push"),
-            RemoteTimeout: model.Duration(30 * time.Second),
+            URL:                mustParseURL("http://remote2/push"),
+            RemoteTimeout:      model.Duration(30 * time.Second),
+            QueueManagerConfig: DefaultQueueManagerConfig,
         },
     },
 
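The expected test config above relies on remote_write entries that set no queue options picking up DefaultQueueManagerConfig. A rough sketch of that behaviour, assuming the usual Prometheus pattern of seeding the value with DefaultRemoteWriteConfig before unmarshalling (the URL is simply the one from the test fixture above):

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/config"
	yaml "gopkg.in/yaml.v2"
)

func main() {
	// A remote_write entry that only sets a URL; the queue settings are
	// expected to come back as the package defaults.
	var rw config.RemoteWriteConfig
	if err := yaml.Unmarshal([]byte("url: http://remote2/push\n"), &rw); err != nil {
		panic(err)
	}
	fmt.Printf("queue config: %+v\n", rw.QueueManagerConfig)
	fmt.Println("is default:", rw.QueueManagerConfig == config.DefaultQueueManagerConfig)
}
```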
@@ -123,42 +123,6 @@ func init() {
     prometheus.MustRegister(numShards)
 }
 
-// QueueManagerConfig is the configuration for the queue used to write to remote
-// storage.
-type QueueManagerConfig struct {
-    // Number of samples to buffer per shard before we start dropping them.
-    QueueCapacity int
-    // Max number of shards, i.e. amount of concurrency.
-    MaxShards int
-    // Maximum number of samples per send.
-    MaxSamplesPerSend int
-    // Maximum time sample will wait in buffer.
-    BatchSendDeadline time.Duration
-    // Max number of times to retry a batch on recoverable errors.
-    MaxRetries int
-    // On recoverable errors, backoff exponentially.
-    MinBackoff time.Duration
-    MaxBackoff time.Duration
-}
-
-// defaultQueueManagerConfig is the default remote queue configuration.
-var defaultQueueManagerConfig = QueueManagerConfig{
-    // With a maximum of 1000 shards, assuming an average of 100ms remote write
-    // time and 100 samples per batch, we will be able to push 1M samples/s.
-    MaxShards:         1000,
-    MaxSamplesPerSend: 100,
-
-    // By default, buffer 1000 batches, which at 100ms per batch is 1:40mins. At
-    // 1000 shards, this will buffer 100M samples total.
-    QueueCapacity:     100 * 1000,
-    BatchSendDeadline: 5 * time.Second,
-
-    // Max number of times to retry a batch on recoverable errors.
-    MaxRetries: 10,
-    MinBackoff: 30 * time.Millisecond,
-    MaxBackoff: 100 * time.Millisecond,
-}
-
 // StorageClient defines an interface for sending a batch of samples to an
 // external timeseries database.
 type StorageClient interface {
@@ -171,7 +135,7 @@ type StorageClient interface {
 // QueueManager manages a queue of samples to be sent to the Storage
 // indicated by the provided StorageClient.
 type QueueManager struct {
-    cfg            QueueManagerConfig
+    cfg            config.QueueManagerConfig
     externalLabels model.LabelSet
     relabelConfigs []*config.RelabelConfig
     client         StorageClient
@@ -190,7 +154,7 @@ type QueueManager struct {
 }
 
 // NewQueueManager builds a new QueueManager.
-func NewQueueManager(cfg QueueManagerConfig, externalLabels model.LabelSet, relabelConfigs []*config.RelabelConfig, client StorageClient) *QueueManager {
+func NewQueueManager(cfg config.QueueManagerConfig, externalLabels model.LabelSet, relabelConfigs []*config.RelabelConfig, client StorageClient) *QueueManager {
     t := &QueueManager{
         cfg:            cfg,
         externalLabels: externalLabels,
@@ -21,6 +21,7 @@ import (
     "time"
 
     "github.com/prometheus/common/model"
+    "github.com/prometheus/prometheus/config"
 )
 
 type TestStorageClient struct {
@@ -81,7 +82,7 @@ func (c *TestStorageClient) Name() string {
 func TestSampleDelivery(t *testing.T) {
     // Let's create an even number of send batches so we don't run into the
     // batch timeout case.
-    n := defaultQueueManagerConfig.QueueCapacity * 2
+    n := config.DefaultQueueManagerConfig.QueueCapacity * 2
 
     samples := make(model.Samples, 0, n)
     for i := 0; i < n; i++ {
@@ -97,7 +98,7 @@ func TestSampleDelivery(t *testing.T) {
     c := NewTestStorageClient()
     c.expectSamples(samples[:len(samples)/2])
 
-    cfg := defaultQueueManagerConfig
+    cfg := config.DefaultQueueManagerConfig
     cfg.MaxShards = 1
     m := NewQueueManager(cfg, nil, nil, c)
 
@@ -117,7 +118,7 @@ func TestSampleDelivery(t *testing.T) {
 
 func TestSampleDeliveryOrder(t *testing.T) {
     ts := 10
-    n := defaultQueueManagerConfig.MaxSamplesPerSend * ts
+    n := config.DefaultQueueManagerConfig.MaxSamplesPerSend * ts
 
     samples := make(model.Samples, 0, n)
     for i := 0; i < n; i++ {
@@ -133,7 +134,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
 
     c := NewTestStorageClient()
     c.expectSamples(samples)
-    m := NewQueueManager(defaultQueueManagerConfig, nil, nil, c)
+    m := NewQueueManager(config.DefaultQueueManagerConfig, nil, nil, c)
 
     // These should be received by the client.
     for _, s := range samples {
@@ -194,7 +195,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
     // `MaxSamplesPerSend*Shards` samples should be consumed by the
     // per-shard goroutines, and then another `MaxSamplesPerSend`
     // should be left on the queue.
-    n := defaultQueueManagerConfig.MaxSamplesPerSend * 2
+    n := config.DefaultQueueManagerConfig.MaxSamplesPerSend * 2
 
     samples := make(model.Samples, 0, n)
     for i := 0; i < n; i++ {
@@ -208,7 +209,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
     }
 
     c := NewTestBlockedStorageClient()
-    cfg := defaultQueueManagerConfig
+    cfg := config.DefaultQueueManagerConfig
     cfg.MaxShards = 1
     cfg.QueueCapacity = n
     m := NewQueueManager(cfg, nil, nil, c)
@@ -240,7 +241,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
         time.Sleep(10 * time.Millisecond)
     }
 
-    if m.queueLen() != defaultQueueManagerConfig.MaxSamplesPerSend {
+    if m.queueLen() != config.DefaultQueueManagerConfig.MaxSamplesPerSend {
         t.Fatalf("Failed to drain QueueManager queue, %d elements left",
             m.queueLen(),
         )
@@ -45,7 +45,7 @@ func (w *Writer) ApplyConfig(conf *config.Config) error {
         return err
     }
     newQueues = append(newQueues, NewQueueManager(
-        defaultQueueManagerConfig,
+        rwConf.QueueManagerConfig,
         conf.GlobalConfig.ExternalLabels,
         rwConf.WriteRelabelConfigs,
         c,
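The MaxRetries, MinBackoff, and MaxBackoff fields that now travel with each remote_write entry describe a retry policy for recoverable send errors. The sketch below is an illustrative stand-alone loop showing the intended semantics (doubling the delay from MinBackoff up to MaxBackoff for at most MaxRetries attempts); it is not the queue manager's send loop from this commit.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// sendWithBackoff retries fn on error, doubling the sleep from minBackoff up
// to maxBackoff, for at most maxRetries attempts. It returns the last error
// if every attempt fails. This mirrors the meaning of the config fields; the
// real queue manager's retry loop may differ in detail.
func sendWithBackoff(fn func() error, maxRetries int, minBackoff, maxBackoff time.Duration) error {
	backoff := minBackoff
	var err error
	for try := 0; try < maxRetries; try++ {
		if err = fn(); err == nil {
			return nil
		}
		time.Sleep(backoff)
		backoff *= 2
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
	return err
}

func main() {
	attempts := 0
	err := sendWithBackoff(func() error {
		attempts++
		if attempts < 3 {
			return errors.New("recoverable send error")
		}
		return nil
	}, 10, 30*time.Millisecond, 100*time.Millisecond)
	fmt.Println("attempts:", attempts, "err:", err)
}
```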