diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 41e001c06..fb931c2f2 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -46,6 +46,9 @@ const ( // Limit to 1 log event every 10s logRateLimit = 0.1 logBurst = 10 + + // Allow 1 minute to flush samples`, then exit anyway. + stopFlushDeadline = 1 * time.Minute ) var ( @@ -403,7 +406,18 @@ func (s *shards) stop() { for _, shard := range s.queues { close(shard) } - s.wg.Wait() + + done := make(chan struct{}) + go func() { + s.wg.Wait() + close(done) + }() + + select { + case <-done: + case <-time.After(stopFlushDeadline): + level.Error(s.qm.logger).Log("msg", "Failed to flush all samples on shutdown") + } } func (s *shards) enqueue(sample *model.Sample) bool {