Improve scraper shutdown time.

- Stop target pools in parallel.
- Stop individual scrapers in goroutines, too.
- Timing tweaks.

Change-Id: I9dff1ee18616694f14b04408eaf1625d0f989696
This commit is contained in:
Bjoern Rabenstein 2014-11-12 00:38:28 +01:00
parent 92156ee89d
commit 74c143c4c9
8 changed files with 53 additions and 19 deletions

View File

@ -229,15 +229,12 @@ func (p *prometheus) Serve() {
if err := p.storage.Stop(); err != nil { if err := p.storage.Stop(); err != nil {
glog.Error("Error stopping local storage: ", err) glog.Error("Error stopping local storage: ", err)
} }
glog.Info("Local Storage: Done")
if p.remoteTSDBQueue != nil { if p.remoteTSDBQueue != nil {
p.remoteTSDBQueue.Stop() p.remoteTSDBQueue.Stop()
glog.Info("Remote Storage: Done")
} }
p.notificationHandler.Stop() p.notificationHandler.Stop()
glog.Info("Sundry Queues: Done")
glog.Info("See you next time!") glog.Info("See you next time!")
} }
@ -258,9 +255,7 @@ func (p *prometheus) interruptHandler() {
func (p *prometheus) close() { func (p *prometheus) close() {
glog.Info("Shutdown has been requested; subsytems are closing:") glog.Info("Shutdown has been requested; subsytems are closing:")
p.targetManager.Stop() p.targetManager.Stop()
glog.Info("Remote Target Manager: Done")
p.ruleManager.Stop() p.ruleManager.Stop()
glog.Info("Rule Executor: Done")
close(p.unwrittenSamples) close(p.unwrittenSamples)
// Note: Before closing the remaining subsystems (storage, ...), we have // Note: Before closing the remaining subsystems (storage, ...), we have

View File

@ -199,8 +199,10 @@ func (n *NotificationHandler) SubmitReqs(reqs NotificationReqs) {
// Stop shuts down the notification handler. // Stop shuts down the notification handler.
func (n *NotificationHandler) Stop() { func (n *NotificationHandler) Stop() {
glog.Info("Stopping notification handler...")
close(n.pendingNotifications) close(n.pendingNotifications)
<-n.stopped <-n.stopped
glog.Info("Notification handler stopped.")
} }
// Describe implements prometheus.Collector. // Describe implements prometheus.Collector.

View File

@ -223,6 +223,7 @@ func (t *target) RunScraper(ingester extraction.Ingester, interval time.Duration
select { select {
case <-jitterTimer.C: case <-jitterTimer.C:
case <-t.stopScraper: case <-t.stopScraper:
jitterTimer.Stop()
return return
} }
jitterTimer.Stop() jitterTimer.Stop()
@ -232,16 +233,32 @@ func (t *target) RunScraper(ingester extraction.Ingester, interval time.Duration
t.lastScrape = time.Now() t.lastScrape = time.Now()
t.scrape(ingester) t.scrape(ingester)
// Explanation of the contraption below:
//
// In case t.newBaseLabels or t.stopScraper have something to receive,
// we want to read from those channels rather than starting a new scrape
// (which might take very long). That's why the outer select has no
// ticker.C. Should neither t.newBaseLabels nor t.stopScraper have
// anything to receive, we go into the inner select, where ticker.C is
// in the mix.
for { for {
select { select {
case <-ticker.C:
targetIntervalLength.WithLabelValues(interval.String()).Observe(float64(time.Since(t.lastScrape) / time.Second))
t.lastScrape = time.Now()
t.scrape(ingester)
case newBaseLabels := <-t.newBaseLabels: case newBaseLabels := <-t.newBaseLabels:
t.baseLabels = newBaseLabels t.baseLabels = newBaseLabels
case <-t.stopScraper: case <-t.stopScraper:
return return
default:
select {
case newBaseLabels := <-t.newBaseLabels:
t.baseLabels = newBaseLabels
case <-t.stopScraper:
return
case <-ticker.C:
targetIntervalLength.WithLabelValues(interval.String()).Observe(float64(time.Since(t.lastScrape) / time.Second))
t.lastScrape = time.Now()
t.scrape(ingester)
}
} }
} }
} }

View File

@ -14,6 +14,7 @@
package retrieval package retrieval
import ( import (
"sync"
"github.com/golang/glog" "github.com/golang/glog"
"github.com/prometheus/client_golang/extraction" "github.com/prometheus/client_golang/extraction"
@ -105,13 +106,22 @@ func (m *targetManager) AddTargetsFromConfig(config config.Config) {
} }
func (m *targetManager) Stop() { func (m *targetManager) Stop() {
glog.Info("Target manager exiting...") glog.Info("Stopping target manager...")
for _, p := range m.poolsByJob { var wg sync.WaitGroup
for j, p := range m.poolsByJob {
wg.Add(1)
go func(j string, p *TargetPool) {
defer wg.Done()
glog.Infof("Stopping target pool %q...", j)
p.Stop() p.Stop()
glog.Infof("Target pool %q stopped.", j)
}(j, p)
} }
wg.Wait()
glog.Info("Target manager stopped.")
} }
// TODO: Not really thread-safe. Only used in /status page for now. // TODO: Not goroutine-safe. Only used in /status page for now.
func (m *targetManager) Pools() map[string]*TargetPool { func (m *targetManager) Pools() map[string]*TargetPool {
return m.poolsByJob return m.poolsByJob
} }

View File

@ -71,7 +71,6 @@ func (p *TargetPool) Run() {
p.addTarget(newTarget) p.addTarget(newTarget)
case stopped := <-p.done: case stopped := <-p.done:
p.ReplaceTargets([]Target{}) p.ReplaceTargets([]Target{})
glog.Info("TargetPool exiting...")
close(stopped) close(stopped)
return return
} }
@ -115,13 +114,20 @@ func (p *TargetPool) ReplaceTargets(newTargets []Target) {
} }
} }
// Stop any targets no longer present. // Stop any targets no longer present.
var wg sync.WaitGroup
for k, oldTarget := range p.targetsByAddress { for k, oldTarget := range p.targetsByAddress {
if !newTargetAddresses.Has(k) { if !newTargetAddresses.Has(k) {
glog.V(1).Info("Stopping scraper for target ", k) wg.Add(1)
go func(k string, oldTarget Target) {
defer wg.Done()
glog.V(1).Infof("Stopping scraper for target %s...", k)
oldTarget.StopScraper() oldTarget.StopScraper()
delete(p.targetsByAddress, k) delete(p.targetsByAddress, k)
glog.V(1).Infof("Scraper for target %s stopped.", k)
}(k, oldTarget)
} }
} }
wg.Wait()
} }
func (p *TargetPool) Targets() []Target { func (p *TargetPool) Targets() []Target {

View File

@ -126,13 +126,14 @@ func (m *ruleManager) Run() {
m.runIteration(m.results) m.runIteration(m.results)
iterationDuration.Observe(float64(time.Since(start) / time.Millisecond)) iterationDuration.Observe(float64(time.Since(start) / time.Millisecond))
case <-m.done: case <-m.done:
glog.Info("Rule manager exiting...") glog.Info("Rule manager stopped.")
return return
} }
} }
} }
func (m *ruleManager) Stop() { func (m *ruleManager) Stop() {
glog.Info("Stopping rule manager...")
m.done <- true m.done <- true
} }

View File

@ -181,6 +181,7 @@ func (s *memorySeriesStorage) Start() {
// Stop implements Storage. // Stop implements Storage.
func (s *memorySeriesStorage) Stop() error { func (s *memorySeriesStorage) Stop() error {
glog.Info("Stopping local storage...")
glog.Info("Stopping maintenance loop...") glog.Info("Stopping maintenance loop...")
close(s.loopStopping) close(s.loopStopping)
<-s.loopStopped <-s.loopStopped
@ -197,6 +198,7 @@ func (s *memorySeriesStorage) Stop() error {
if err := s.persistence.close(); err != nil { if err := s.persistence.close(); err != nil {
return err return err
} }
glog.Info("Local storage stopped.")
return nil return nil
} }

View File

@ -122,12 +122,13 @@ func (t *TSDBQueueManager) Queue(s clientmodel.Samples) {
// Stop stops sending samples to the TSDB and waits for pending sends to // Stop stops sending samples to the TSDB and waits for pending sends to
// complete. // complete.
func (t *TSDBQueueManager) Stop() { func (t *TSDBQueueManager) Stop() {
glog.Infof("TSDB queue manager shutting down...") glog.Infof("Stopping remote storage...")
close(t.queue) close(t.queue)
<-t.drained <-t.drained
for i := 0; i < maxConcurrentSends; i++ { for i := 0; i < maxConcurrentSends; i++ {
t.sendSemaphore <- true t.sendSemaphore <- true
} }
glog.Info("Remote storage stopped.")
} }
// Describe implements prometheus.Collector. // Describe implements prometheus.Collector.