diff --git a/main.go b/main.go index 0f63f8da5..300bb79b5 100644 --- a/main.go +++ b/main.go @@ -74,7 +74,7 @@ type prometheus struct { compactionTimer *time.Ticker deletionTimer *time.Ticker - curationSema chan bool + curationSema chan struct{} stopBackgroundOperations chan bool unwrittenSamples chan *extraction.Result @@ -101,17 +101,22 @@ func (p *prometheus) interruptHandler() { func (p *prometheus) compact(olderThan time.Duration, groupSize int) error { select { - case p.curationSema <- true: + case s, ok := <-p.curationSema: + if !ok { + glog.Warning("Prometheus is shutting down; no more curation runs are allowed.") + return nil + } + + defer func() { + p.curationSema <- s + }() + default: glog.Warningf("Deferred compaction for %s and %s due to existing operation.", olderThan, groupSize) return nil } - defer func() { - <-p.curationSema - }() - processor := metric.NewCompactionProcessor(&metric.CompactionProcessorOptions{ MaximumMutationPoolBatch: groupSize * 3, MinimumGroupSize: groupSize, @@ -130,7 +135,16 @@ func (p *prometheus) compact(olderThan time.Duration, groupSize int) error { func (p *prometheus) delete(olderThan time.Duration, batchSize int) error { select { - case p.curationSema <- true: + case s, ok := <-p.curationSema: + if !ok { + glog.Warning("Prometheus is shutting down; no more curation runs are allowed.") + return nil + } + + defer func() { + p.curationSema <- s + }() + default: glog.Warningf("Deferred deletion for %s due to existing operation.", olderThan) @@ -153,10 +167,7 @@ func (p *prometheus) delete(olderThan time.Duration, batchSize int) error { } func (p *prometheus) close() { - select { - case p.curationSema <- true: - default: - } + close(p.curationSema) if p.compactionTimer != nil { p.compactionTimer.Stop() @@ -291,7 +302,7 @@ func main() { deletionTimer: deletionTimer, curationState: prometheusStatus, - curationSema: make(chan bool, 1), + curationSema: make(chan struct{}, 1), unwrittenSamples: unwrittenSamples, @@ -305,6 +316,8 @@ func main() { } defer prometheus.close() + prometheus.curationSema <- struct{}{} + storageStarted := make(chan bool) go ts.Serve(storageStarted) <-storageStarted