storage/remote: Prevent two goroutines from endless loop (#8967)

Signed-off-by: lzhfromustc <lzhfromustc@gmail.com>
This commit is contained in:
lzhfromustc 2021-10-29 16:39:02 -07:00 committed by GitHub
parent d42be7be76
commit 9da5382103
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 14 additions and 4 deletions

View File

@ -51,7 +51,7 @@ type startTimeCallback func() (int64, error)
// Storage represents all the remote read and write endpoints. It implements // Storage represents all the remote read and write endpoints. It implements
// storage.Storage. // storage.Storage.
type Storage struct { type Storage struct {
logger log.Logger logger *logging.Deduper
mtx sync.Mutex mtx sync.Mutex
rws *WriteStorage rws *WriteStorage
@ -66,9 +66,10 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal
if l == nil { if l == nil {
l = log.NewNopLogger() l = log.NewNopLogger()
} }
logger := logging.Dedupe(l, 1*time.Minute)
s := &Storage{ s := &Storage{
logger: logging.Dedupe(l, 1*time.Minute), logger: logger,
localStartTimeCallback: stCallback, localStartTimeCallback: stCallback,
} }
s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm) s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm)
@ -186,6 +187,7 @@ func (s *Storage) LowestSentTimestamp() int64 {
// Close the background processing of the storage queues. // Close the background processing of the storage queues.
func (s *Storage) Close() error { func (s *Storage) Close() error {
s.logger.Stop()
s.mtx.Lock() s.mtx.Lock()
defer s.mtx.Unlock() defer s.mtx.Unlock()
return s.rws.Close() return s.rws.Close()

View File

@ -61,6 +61,7 @@ type WriteStorage struct {
flushDeadline time.Duration flushDeadline time.Duration
interner *pool interner *pool
scraper ReadyScrapeManager scraper ReadyScrapeManager
quit chan struct{}
// For timestampTracker. // For timestampTracker.
highestTimestamp *maxTimestamp highestTimestamp *maxTimestamp
@ -82,6 +83,7 @@ func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, walDir string
walDir: walDir, walDir: walDir,
interner: newPool(), interner: newPool(),
scraper: sm, scraper: sm,
quit: make(chan struct{}),
highestTimestamp: &maxTimestamp{ highestTimestamp: &maxTimestamp{
Gauge: prometheus.NewGauge(prometheus.GaugeOpts{ Gauge: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace, Namespace: namespace,
@ -101,8 +103,13 @@ func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, walDir string
func (rws *WriteStorage) run() { func (rws *WriteStorage) run() {
ticker := time.NewTicker(shardUpdateDuration) ticker := time.NewTicker(shardUpdateDuration)
defer ticker.Stop() defer ticker.Stop()
for range ticker.C { for {
rws.samplesIn.tick() select {
case <-ticker.C:
rws.samplesIn.tick()
case <-rws.quit:
return
}
} }
} }
@ -235,6 +242,7 @@ func (rws *WriteStorage) Close() error {
for _, q := range rws.queues { for _, q := range rws.queues {
q.Stop() q.Stop()
} }
close(rws.quit)
return nil return nil
} }