diff --git a/main.go b/main.go index 95ab3ef54..3a56e040d 100644 --- a/main.go +++ b/main.go @@ -44,7 +44,34 @@ var ( memoryAppendQueueCapacity = flag.Int("queue.memoryAppendCapacity", 1000000, "The size of the queue for items that are pending writing to memory.") ) +type prometheus struct { + storage metric.Storage + // TODO: Refactor channels to work with arrays of results for better chunking. + scrapeResults chan format.Result + ruleResults chan *rules.Result +} + +func (p prometheus) interruptHandler() { + notifier := make(chan os.Signal) + signal.Notify(notifier, os.Interrupt) + + <-notifier + + log.Println("Received SIGINT; Exiting Gracefully...") + p.close() + os.Exit(0) +} + +func (p prometheus) close() { + p.storage.Close() + close(p.scrapeResults) + close(p.ruleResults) +} + func main() { + // TODO(all): Future additions to main should be, where applicable, glumped + // into the prometheus struct above---at least where the scoping of the entire + // server is concerned. flag.Parse() versionInfoTmpl.Execute(os.Stdout, BuildInfo) @@ -62,23 +89,25 @@ func main() { if err != nil { log.Fatalf("Error opening storage: %s", err) } + + scrapeResults := make(chan format.Result, *scrapeResultsQueueCapacity) + ruleResults := make(chan *rules.Result, *ruleResultsQueueCapacity) + + prometheus := prometheus{ + storage: ts, + scrapeResults: scrapeResults, + ruleResults: ruleResults, + } + defer prometheus.close() + go ts.Serve() - go func() { - notifier := make(chan os.Signal) - signal.Notify(notifier, os.Interrupt) - <-notifier - ts.Close() - os.Exit(0) - }() + go prometheus.interruptHandler() // Queue depth will need to be exposed - scrapeResults := make(chan format.Result, *scrapeResultsQueueCapacity) targetManager := retrieval.NewTargetManager(scrapeResults, *concurrentRetrievalAllowance) targetManager.AddTargetsFromConfig(conf) - ruleResults := make(chan *rules.Result, *ruleResultsQueueCapacity) - ast.SetStorage(ts) ruleManager := rules.NewRuleManager(ruleResults, conf.Global.EvaluationInterval) @@ -97,6 +126,7 @@ func main() { web.StartServing(appState) + // TODO(all): Migrate this into prometheus.serve(). for { select { case scrapeResult := <-scrapeResults: diff --git a/model/metric.go b/model/metric.go index fe7f16e42..9105f05a4 100644 --- a/model/metric.go +++ b/model/metric.go @@ -49,7 +49,6 @@ func (l LabelSet) Merge(other LabelSet) LabelSet { return result } - func (l LabelSet) String() string { var ( buffer bytes.Buffer diff --git a/retrieval/target_test.go b/retrieval/target_test.go index 3630a6892..9fabfac56 100644 --- a/retrieval/target_test.go +++ b/retrieval/target_test.go @@ -86,7 +86,7 @@ func TestTargetScrapeTimeout(t *testing.T) { } // let the deadline lapse - time.Sleep(15*time.Millisecond) + time.Sleep(15 * time.Millisecond) // now scrape again signal <- true diff --git a/storage/metric/test_helper.go b/storage/metric/test_helper.go index 26d9bd8d8..c3428425b 100644 --- a/storage/metric/test_helper.go +++ b/storage/metric/test_helper.go @@ -26,6 +26,10 @@ var ( testInstant = time.Date(1972, 7, 18, 19, 5, 45, 0, usEastern).In(time.UTC) ) +const ( + appendQueueSize = 1000 +) + func testAppendSample(p MetricPersistence, s model.Sample, t test.Tester) { err := p.AppendSample(s) if err != nil { @@ -86,7 +90,7 @@ func (t testTieredStorageCloser) Close() { func NewTestTieredStorage(t test.Tester) (storage Storage, closer test.Closer) { var directory test.TemporaryDirectory directory = test.NewTemporaryDirectory("test_tiered_storage", t) - storage, err := NewTieredStorage(5000000, 2500, 1000, 5*time.Second, 15*time.Second, 0*time.Second, directory.Path()) + storage, err := NewTieredStorage(appendQueueSize, 2500, 1000, 5*time.Second, 15*time.Second, 0*time.Second, directory.Path()) if err != nil { if storage != nil { diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index 69fecb945..aa45a2eaa 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -21,6 +21,7 @@ import ( dto "github.com/prometheus/prometheus/model/generated" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/raw/leveldb" + "log" "sort" "sync" "time" @@ -93,7 +94,7 @@ func NewTieredStorage(appendToMemoryQueueDepth, appendToDiskQueueDepth, viewQueu return } -func (t *tieredStorage) AppendSample(s model.Sample) (err error) { +func (t tieredStorage) AppendSample(s model.Sample) (err error) { if len(t.draining) > 0 { return fmt.Errorf("Storage is in the process of draining.") } @@ -103,12 +104,14 @@ func (t *tieredStorage) AppendSample(s model.Sample) (err error) { return } -func (t *tieredStorage) Drain() { +func (t tieredStorage) Drain() { + log.Println("Starting drain...") drainingDone := make(chan bool) if len(t.draining) == 0 { t.draining <- drainingDone } <-drainingDone + log.Println("Done.") } func (t *tieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Duration) (view View, err error) { @@ -155,17 +158,17 @@ func (t *tieredStorage) rebuildDiskFrontier(i leveldb.Iterator) (err error) { t.diskFrontier, err = newDiskFrontier(i) if err != nil { - panic(err) + return } return } func (t *tieredStorage) Serve() { - var ( - flushMemoryTicker = time.Tick(t.flushMemoryInterval) - writeMemoryTicker = time.Tick(t.writeMemoryInterval) - reportTicker = time.NewTicker(time.Second) - ) + flushMemoryTicker := time.NewTicker(t.flushMemoryInterval) + defer flushMemoryTicker.Stop() + writeMemoryTicker := time.NewTicker(t.writeMemoryInterval) + defer writeMemoryTicker.Stop() + reportTicker := time.NewTicker(time.Second) defer reportTicker.Stop() go func() { @@ -176,9 +179,9 @@ func (t *tieredStorage) Serve() { for { select { - case <-writeMemoryTicker: + case <-writeMemoryTicker.C: t.writeMemory() - case <-flushMemoryTicker: + case <-flushMemoryTicker.C: t.flushMemory() case viewRequest := <-t.viewQueue: t.renderView(viewRequest) @@ -219,17 +222,24 @@ func (t *tieredStorage) writeMemory() { } } -func (t *tieredStorage) Flush() { +func (t tieredStorage) Flush() { t.flush() } -func (t *tieredStorage) Close() { +func (t tieredStorage) Close() { + log.Println("Closing tiered storage...") t.Drain() t.diskStorage.Close() + t.memoryArena.Close() + + close(t.appendToDiskQueue) + close(t.appendToMemoryQueue) + close(t.viewQueue) + log.Println("Done.") } // Write all pending appends. -func (t *tieredStorage) flush() (err error) { +func (t tieredStorage) flush() (err error) { // Trim any old values to reduce iterative write costs. t.flushMemory() t.writeMemory()