Make the flush deadline configurable.

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
This commit is contained in:
Tom Wilkie 2018-02-01 13:25:15 +00:00
parent aa17263edd
commit a6c353613a
2 changed files with 10 additions and 7 deletions

View File

@ -122,6 +122,8 @@ var (
MaxRetries: 10, MaxRetries: 10,
MinBackoff: 30 * time.Millisecond, MinBackoff: 30 * time.Millisecond,
MaxBackoff: 100 * time.Millisecond, MaxBackoff: 100 * time.Millisecond,
FlushDeadline: 1 * time.Minute,
} }
// DefaultRemoteReadConfig is the default remote read configuration. // DefaultRemoteReadConfig is the default remote read configuration.
@ -646,6 +648,10 @@ type QueueConfig struct {
// On recoverable errors, backoff exponentially. // On recoverable errors, backoff exponentially.
MinBackoff time.Duration `yaml:"min_backoff,omitempty"` MinBackoff time.Duration `yaml:"min_backoff,omitempty"`
MaxBackoff time.Duration `yaml:"max_backoff,omitempty"` MaxBackoff time.Duration `yaml:"max_backoff,omitempty"`
// On shutdown or config reload allow the following duration for flushing
// pending samples, otherwise continue without waiting.
FlushDeadline time.Duration `yaml:"flush_deadline"`
} }
// RemoteReadConfig is the configuration for reading from remote storage. // RemoteReadConfig is the configuration for reading from remote storage.

View File

@ -47,9 +47,6 @@ const (
// Limit to 1 log event every 10s // Limit to 1 log event every 10s
logRateLimit = 0.1 logRateLimit = 0.1
logBurst = 10 logBurst = 10
// Allow 1 minute to flush samples`, then exit anyway.
stopFlushDeadline = 1 * time.Minute
) )
var ( var (
@ -259,7 +256,7 @@ func (t *QueueManager) Stop() {
t.shardsMtx.Lock() t.shardsMtx.Lock()
defer t.shardsMtx.Unlock() defer t.shardsMtx.Unlock()
t.shards.stop() t.shards.stop(t.cfg.FlushDeadline)
level.Info(t.logger).Log("msg", "Remote storage stopped.") level.Info(t.logger).Log("msg", "Remote storage stopped.")
} }
@ -364,7 +361,7 @@ func (t *QueueManager) reshard(n int) {
t.shards = newShards t.shards = newShards
t.shardsMtx.Unlock() t.shardsMtx.Unlock()
oldShards.stop() oldShards.stop(t.cfg.FlushDeadline)
// We start the newShards after we have stopped (the therefore completely // We start the newShards after we have stopped (the therefore completely
// flushed) the oldShards, to guarantee we only every deliver samples in // flushed) the oldShards, to guarantee we only every deliver samples in
@ -403,14 +400,14 @@ func (s *shards) start() {
} }
} }
func (s *shards) stop() { func (s *shards) stop(deadline time.Duration) {
for _, shard := range s.queues { for _, shard := range s.queues {
close(shard) close(shard)
} }
select { select {
case <-s.done: case <-s.done:
case <-time.After(stopFlushDeadline): case <-time.After(deadline):
level.Error(s.qm.logger).Log("msg", "Failed to flush all samples on shutdown") level.Error(s.qm.logger).Log("msg", "Failed to flush all samples on shutdown")
} }
} }