From 9da538210330bd30572a40b1dc263c7c5cebbfae Mon Sep 17 00:00:00 2001 From: lzhfromustc <43191155+lzhfromustc@users.noreply.github.com> Date: Fri, 29 Oct 2021 16:39:02 -0700 Subject: [PATCH] storage/remote: Prevent two goroutines from endless loop (#8967) Signed-off-by: lzhfromustc --- storage/remote/storage.go | 6 ++++-- storage/remote/write.go | 12 ++++++++++-- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/storage/remote/storage.go b/storage/remote/storage.go index df20af713..cebbffdd1 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -51,7 +51,7 @@ type startTimeCallback func() (int64, error) // Storage represents all the remote read and write endpoints. It implements // storage.Storage. type Storage struct { - logger log.Logger + logger *logging.Deduper mtx sync.Mutex rws *WriteStorage @@ -66,9 +66,10 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal if l == nil { l = log.NewNopLogger() } + logger := logging.Dedupe(l, 1*time.Minute) s := &Storage{ - logger: logging.Dedupe(l, 1*time.Minute), + logger: logger, localStartTimeCallback: stCallback, } s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm) @@ -186,6 +187,7 @@ func (s *Storage) LowestSentTimestamp() int64 { // Close the background processing of the storage queues. func (s *Storage) Close() error { + s.logger.Stop() s.mtx.Lock() defer s.mtx.Unlock() return s.rws.Close() diff --git a/storage/remote/write.go b/storage/remote/write.go index b56d258d7..cbbfaf52e 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -61,6 +61,7 @@ type WriteStorage struct { flushDeadline time.Duration interner *pool scraper ReadyScrapeManager + quit chan struct{} // For timestampTracker. highestTimestamp *maxTimestamp @@ -82,6 +83,7 @@ func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, walDir string walDir: walDir, interner: newPool(), scraper: sm, + quit: make(chan struct{}), highestTimestamp: &maxTimestamp{ Gauge: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, @@ -101,8 +103,13 @@ func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, walDir string func (rws *WriteStorage) run() { ticker := time.NewTicker(shardUpdateDuration) defer ticker.Stop() - for range ticker.C { - rws.samplesIn.tick() + for { + select { + case <-ticker.C: + rws.samplesIn.tick() + case <-rws.quit: + return + } } } @@ -235,6 +242,7 @@ func (rws *WriteStorage) Close() error { for _, q := range rws.queues { q.Stop() } + close(rws.quit) return nil }