diff --git a/main.go b/main.go index 76062ea34..e8024cb43 100644 --- a/main.go +++ b/main.go @@ -229,15 +229,12 @@ func (p *prometheus) Serve() { if err := p.storage.Stop(); err != nil { glog.Error("Error stopping local storage: ", err) } - glog.Info("Local Storage: Done") if p.remoteTSDBQueue != nil { p.remoteTSDBQueue.Stop() - glog.Info("Remote Storage: Done") } p.notificationHandler.Stop() - glog.Info("Sundry Queues: Done") glog.Info("See you next time!") } @@ -258,9 +255,7 @@ func (p *prometheus) interruptHandler() { func (p *prometheus) close() { glog.Info("Shutdown has been requested; subsytems are closing:") p.targetManager.Stop() - glog.Info("Remote Target Manager: Done") p.ruleManager.Stop() - glog.Info("Rule Executor: Done") close(p.unwrittenSamples) // Note: Before closing the remaining subsystems (storage, ...), we have diff --git a/notification/notification.go b/notification/notification.go index 745811e3f..df215137f 100644 --- a/notification/notification.go +++ b/notification/notification.go @@ -199,8 +199,10 @@ func (n *NotificationHandler) SubmitReqs(reqs NotificationReqs) { // Stop shuts down the notification handler. func (n *NotificationHandler) Stop() { + glog.Info("Stopping notification handler...") close(n.pendingNotifications) <-n.stopped + glog.Info("Notification handler stopped.") } // Describe implements prometheus.Collector. diff --git a/retrieval/target.go b/retrieval/target.go index 87e940362..977389b7b 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -223,6 +223,7 @@ func (t *target) RunScraper(ingester extraction.Ingester, interval time.Duration select { case <-jitterTimer.C: case <-t.stopScraper: + jitterTimer.Stop() return } jitterTimer.Stop() @@ -232,16 +233,32 @@ func (t *target) RunScraper(ingester extraction.Ingester, interval time.Duration t.lastScrape = time.Now() t.scrape(ingester) + + // Explanation of the contraption below: + // + // In case t.newBaseLabels or t.stopScraper have something to receive, + // we want to read from those channels rather than starting a new scrape + // (which might take very long). That's why the outer select has no + // ticker.C. Should neither t.newBaseLabels nor t.stopScraper have + // anything to receive, we go into the inner select, where ticker.C is + // in the mix. for { select { - case <-ticker.C: - targetIntervalLength.WithLabelValues(interval.String()).Observe(float64(time.Since(t.lastScrape) / time.Second)) - t.lastScrape = time.Now() - t.scrape(ingester) case newBaseLabels := <-t.newBaseLabels: t.baseLabels = newBaseLabels case <-t.stopScraper: return + default: + select { + case newBaseLabels := <-t.newBaseLabels: + t.baseLabels = newBaseLabels + case <-t.stopScraper: + return + case <-ticker.C: + targetIntervalLength.WithLabelValues(interval.String()).Observe(float64(time.Since(t.lastScrape) / time.Second)) + t.lastScrape = time.Now() + t.scrape(ingester) + } } } } diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index f88e71d34..882b5364b 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -14,6 +14,7 @@ package retrieval import ( + "sync" "github.com/golang/glog" "github.com/prometheus/client_golang/extraction" @@ -105,13 +106,22 @@ func (m *targetManager) AddTargetsFromConfig(config config.Config) { } func (m *targetManager) Stop() { - glog.Info("Target manager exiting...") - for _, p := range m.poolsByJob { - p.Stop() + glog.Info("Stopping target manager...") + var wg sync.WaitGroup + for j, p := range m.poolsByJob { + wg.Add(1) + go func(j string, p *TargetPool) { + defer wg.Done() + glog.Infof("Stopping target pool %q...", j) + p.Stop() + glog.Infof("Target pool %q stopped.", j) + }(j, p) } + wg.Wait() + glog.Info("Target manager stopped.") } -// TODO: Not really thread-safe. Only used in /status page for now. +// TODO: Not goroutine-safe. Only used in /status page for now. func (m *targetManager) Pools() map[string]*TargetPool { return m.poolsByJob } diff --git a/retrieval/targetpool.go b/retrieval/targetpool.go index 6b670acbe..83bf13a33 100644 --- a/retrieval/targetpool.go +++ b/retrieval/targetpool.go @@ -71,7 +71,6 @@ func (p *TargetPool) Run() { p.addTarget(newTarget) case stopped := <-p.done: p.ReplaceTargets([]Target{}) - glog.Info("TargetPool exiting...") close(stopped) return } @@ -115,13 +114,20 @@ func (p *TargetPool) ReplaceTargets(newTargets []Target) { } } // Stop any targets no longer present. + var wg sync.WaitGroup for k, oldTarget := range p.targetsByAddress { if !newTargetAddresses.Has(k) { - glog.V(1).Info("Stopping scraper for target ", k) - oldTarget.StopScraper() - delete(p.targetsByAddress, k) + wg.Add(1) + go func(k string, oldTarget Target) { + defer wg.Done() + glog.V(1).Infof("Stopping scraper for target %s...", k) + oldTarget.StopScraper() + delete(p.targetsByAddress, k) + glog.V(1).Infof("Scraper for target %s stopped.", k) + }(k, oldTarget) } } + wg.Wait() } func (p *TargetPool) Targets() []Target { diff --git a/rules/manager/manager.go b/rules/manager/manager.go index 51e63ae9f..f15e57c90 100644 --- a/rules/manager/manager.go +++ b/rules/manager/manager.go @@ -126,13 +126,14 @@ func (m *ruleManager) Run() { m.runIteration(m.results) iterationDuration.Observe(float64(time.Since(start) / time.Millisecond)) case <-m.done: - glog.Info("Rule manager exiting...") + glog.Info("Rule manager stopped.") return } } } func (m *ruleManager) Stop() { + glog.Info("Stopping rule manager...") m.done <- true } diff --git a/storage/local/storage.go b/storage/local/storage.go index cd628c9a7..c82e56e3b 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -181,6 +181,7 @@ func (s *memorySeriesStorage) Start() { // Stop implements Storage. func (s *memorySeriesStorage) Stop() error { + glog.Info("Stopping local storage...") glog.Info("Stopping maintenance loop...") close(s.loopStopping) <-s.loopStopped @@ -197,6 +198,7 @@ func (s *memorySeriesStorage) Stop() error { if err := s.persistence.close(); err != nil { return err } + glog.Info("Local storage stopped.") return nil } diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 31acfeddf..f1bbdf571 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -122,12 +122,13 @@ func (t *TSDBQueueManager) Queue(s clientmodel.Samples) { // Stop stops sending samples to the TSDB and waits for pending sends to // complete. func (t *TSDBQueueManager) Stop() { - glog.Infof("TSDB queue manager shutting down...") + glog.Infof("Stopping remote storage...") close(t.queue) <-t.drained for i := 0; i < maxConcurrentSends; i++ { t.sendSemaphore <- true } + glog.Info("Remote storage stopped.") } // Describe implements prometheus.Collector.