From a6c353613a90230a5f7ae8ebc358373e3f09ef02 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 1 Feb 2018 13:25:15 +0000 Subject: [PATCH] Make the flush deadline configurable. Signed-off-by: Tom Wilkie --- config/config.go | 6 ++++++ storage/remote/queue_manager.go | 11 ++++------- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/config/config.go b/config/config.go index 746aae3bf..05554d2e2 100644 --- a/config/config.go +++ b/config/config.go @@ -122,6 +122,8 @@ var ( MaxRetries: 10, MinBackoff: 30 * time.Millisecond, MaxBackoff: 100 * time.Millisecond, + + FlushDeadline: 1 * time.Minute, } // DefaultRemoteReadConfig is the default remote read configuration. @@ -646,6 +648,10 @@ type QueueConfig struct { // On recoverable errors, backoff exponentially. MinBackoff time.Duration `yaml:"min_backoff,omitempty"` MaxBackoff time.Duration `yaml:"max_backoff,omitempty"` + + // On shutdown or config reload allow the following duration for flushing + // pending samples, otherwise continue without waiting. + FlushDeadline time.Duration `yaml:"flush_deadline"` } // RemoteReadConfig is the configuration for reading from remote storage. diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index abe0caf7c..c0e3df6d2 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -47,9 +47,6 @@ const ( // Limit to 1 log event every 10s logRateLimit = 0.1 logBurst = 10 - - // Allow 1 minute to flush samples`, then exit anyway. - stopFlushDeadline = 1 * time.Minute ) var ( @@ -259,7 +256,7 @@ func (t *QueueManager) Stop() { t.shardsMtx.Lock() defer t.shardsMtx.Unlock() - t.shards.stop() + t.shards.stop(t.cfg.FlushDeadline) level.Info(t.logger).Log("msg", "Remote storage stopped.") } @@ -364,7 +361,7 @@ func (t *QueueManager) reshard(n int) { t.shards = newShards t.shardsMtx.Unlock() - oldShards.stop() + oldShards.stop(t.cfg.FlushDeadline) // We start the newShards after we have stopped (the therefore completely // flushed) the oldShards, to guarantee we only every deliver samples in @@ -403,14 +400,14 @@ func (s *shards) start() { } } -func (s *shards) stop() { +func (s *shards) stop(deadline time.Duration) { for _, shard := range s.queues { close(shard) } select { case <-s.done: - case <-time.After(stopFlushDeadline): + case <-time.After(deadline): level.Error(s.qm.logger).Log("msg", "Failed to flush all samples on shutdown") } }