From e51d6c4b6ca5aa45e61b03941a727ef1e1919146 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 23 May 2018 15:03:54 +0100 Subject: [PATCH] Make remote flush deadline a command line param. Signed-off-by: Tom Wilkie --- cmd/prometheus/main.go | 24 ++++++++++++++---------- config/config.go | 6 ------ storage/remote/queue_manager.go | 8 +++++--- storage/remote/queue_manager_test.go | 10 ++++++---- storage/remote/storage.go | 11 +++++++++-- 5 files changed, 34 insertions(+), 25 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 93898863a..b411a7342 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -83,15 +83,16 @@ func main() { cfg := struct { configFile string - localStoragePath string - notifier notifier.Options - notifierTimeout model.Duration - web web.Options - tsdb tsdb.Options - lookbackDelta model.Duration - webTimeout model.Duration - queryTimeout model.Duration - queryConcurrency int + localStoragePath string + notifier notifier.Options + notifierTimeout model.Duration + web web.Options + tsdb tsdb.Options + lookbackDelta model.Duration + webTimeout model.Duration + queryTimeout model.Duration + queryConcurrency int + RemoteFlushDeadline model.Duration prometheusURL string @@ -160,6 +161,9 @@ func main() { a.Flag("storage.tsdb.no-lockfile", "Do not create lockfile in data directory."). Default("false").BoolVar(&cfg.tsdb.NoLockfile) + a.Flag("storage.remote.flush-deadline", "How long to wait flushing sample on shutdown or config reload."). + Default("1m").PlaceHolder("<duration>").SetValue(&cfg.RemoteFlushDeadline) + a.Flag("alertmanager.notification-queue-capacity", "The capacity of the queue for pending Alertmanager notifications."). 
Default("10000").IntVar(&cfg.notifier.QueueCapacity) @@ -222,7 +226,7 @@ func main() { var ( localStorage = &tsdb.ReadyStorage{} - remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), localStorage.StartTime) + remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), localStorage.StartTime, cfg.RemoteFlushDeadline) fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage) ) diff --git a/config/config.go b/config/config.go index 05554d2e2..746aae3bf 100644 --- a/config/config.go +++ b/config/config.go @@ -122,8 +122,6 @@ var ( MaxRetries: 10, MinBackoff: 30 * time.Millisecond, MaxBackoff: 100 * time.Millisecond, - - FlushDeadline: 1 * time.Minute, } // DefaultRemoteReadConfig is the default remote read configuration. @@ -648,10 +646,6 @@ type QueueConfig struct { // On recoverable errors, backoff exponentially. MinBackoff time.Duration `yaml:"min_backoff,omitempty"` MaxBackoff time.Duration `yaml:"max_backoff,omitempty"` - - // On shutdown or config reload allow the following duration for flushing - // pending samples, otherwise continue without waiting. - FlushDeadline time.Duration `yaml:"flush_deadline"` } // RemoteReadConfig is the configuration for reading from remote storage. diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index c0e3df6d2..19ee53afc 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -140,6 +140,7 @@ type StorageClient interface { type QueueManager struct { logger log.Logger + flushDeadline time.Duration cfg config.QueueConfig externalLabels model.LabelSet relabelConfigs []*config.RelabelConfig @@ -159,12 +160,13 @@ type QueueManager struct { } // NewQueueManager builds a new QueueManager. 
-func NewQueueManager(logger log.Logger, cfg config.QueueConfig, externalLabels model.LabelSet, relabelConfigs []*config.RelabelConfig, client StorageClient) *QueueManager { +func NewQueueManager(logger log.Logger, cfg config.QueueConfig, externalLabels model.LabelSet, relabelConfigs []*config.RelabelConfig, client StorageClient, flushDeadline time.Duration) *QueueManager { if logger == nil { logger = log.NewNopLogger() } t := &QueueManager{ logger: logger, + flushDeadline: flushDeadline, cfg: cfg, externalLabels: externalLabels, relabelConfigs: relabelConfigs, @@ -256,7 +258,7 @@ func (t *QueueManager) Stop() { t.shardsMtx.Lock() defer t.shardsMtx.Unlock() - t.shards.stop(t.cfg.FlushDeadline) + t.shards.stop(t.flushDeadline) level.Info(t.logger).Log("msg", "Remote storage stopped.") } @@ -361,7 +363,7 @@ func (t *QueueManager) reshard(n int) { t.shards = newShards t.shardsMtx.Unlock() - oldShards.stop(t.cfg.FlushDeadline) + oldShards.stop(t.flushDeadline) // We start the newShards after we have stopped (the therefore completely // flushed) the oldShards, to guarantee we only every deliver samples in diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index e2a5b19cb..b2b2804d3 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -26,6 +26,8 @@ import ( "github.com/prometheus/prometheus/prompb" ) +const defaultFlushDeadline = 1 * time.Minute + type TestStorageClient struct { receivedSamples map[string][]*prompb.Sample expectedSamples map[string][]*prompb.Sample @@ -109,7 +111,7 @@ func TestSampleDelivery(t *testing.T) { cfg := config.DefaultQueueConfig cfg.MaxShards = 1 - m := NewQueueManager(nil, cfg, nil, nil, c) + m := NewQueueManager(nil, cfg, nil, nil, c, defaultFlushDeadline) // These should be received by the client. 
for _, s := range samples[:len(samples)/2] { @@ -145,7 +147,7 @@ func TestSampleDeliveryTimeout(t *testing.T) { cfg := config.DefaultQueueConfig cfg.MaxShards = 1 cfg.BatchSendDeadline = 100 * time.Millisecond - m := NewQueueManager(nil, cfg, nil, nil, c) + m := NewQueueManager(nil, cfg, nil, nil, c, defaultFlushDeadline) m.Start() defer m.Stop() @@ -181,7 +183,7 @@ func TestSampleDeliveryOrder(t *testing.T) { c := NewTestStorageClient() c.expectSamples(samples) - m := NewQueueManager(nil, config.DefaultQueueConfig, nil, nil, c) + m := NewQueueManager(nil, config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) // These should be received by the client. for _, s := range samples { @@ -259,7 +261,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) { cfg := config.DefaultQueueConfig cfg.MaxShards = 1 cfg.Capacity = n - m := NewQueueManager(nil, cfg, nil, nil, c) + m := NewQueueManager(nil, cfg, nil, nil, c, defaultFlushDeadline) m.Start() diff --git a/storage/remote/storage.go b/storage/remote/storage.go index 5192126ea..820d0ffa4 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -16,6 +16,7 @@ package remote import ( "context" "sync" + "time" "github.com/go-kit/kit/log" "github.com/prometheus/common/model" @@ -39,14 +40,19 @@ type Storage struct { // For reads queryables []storage.Queryable localStartTimeCallback startTimeCallback + flushDeadline time.Duration } // NewStorage returns a remote.Storage. -func NewStorage(l log.Logger, stCallback startTimeCallback) *Storage { +func NewStorage(l log.Logger, stCallback startTimeCallback, flushDeadline time.Duration) *Storage { if l == nil { l = log.NewNopLogger() } - return &Storage{logger: l, localStartTimeCallback: stCallback} + return &Storage{ + logger: l, + localStartTimeCallback: stCallback, + flushDeadline: flushDeadline, + } } // ApplyConfig updates the state as the new config requires. 
@@ -74,6 +80,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { conf.GlobalConfig.ExternalLabels, rwConf.WriteRelabelConfigs, c, + s.flushDeadline, )) }