diff --git a/main.go b/main.go index 47d94b46c..99b005547 100644 --- a/main.go +++ b/main.go @@ -118,11 +118,7 @@ func NewPrometheus() *prometheus { PedanticChecks: *storagePedanticChecks, SyncStrategy: syncStrategy, } - memStorage, err := local.NewMemorySeriesStorage(o) - if err != nil { - glog.Error("Error opening memory series storage: ", err) - os.Exit(1) - } + memStorage := local.NewMemorySeriesStorage(o) var sampleAppender storage.SampleAppender var remoteStorageQueues []*remote.StorageQueueManager @@ -213,38 +209,47 @@ func NewPrometheus() *prometheus { } webService.QuitChan = make(chan struct{}) - p.reloadConfig() + if !p.reloadConfig() { + os.Exit(1) + } return p } -func (p *prometheus) reloadConfig() { +func (p *prometheus) reloadConfig() bool { glog.Infof("Loading configuration file %s", *configFile) conf, err := config.LoadFromFile(*configFile) if err != nil { glog.Errorf("Couldn't load configuration (-config.file=%s): %v", *configFile, err) glog.Errorf("Note: The configuration format has changed with version 0.14, please check the documentation.") - return + return false } p.webService.StatusHandler.ApplyConfig(conf) p.targetManager.ApplyConfig(conf) p.ruleManager.ApplyConfig(conf) + + return true } // Serve starts the Prometheus server. It returns after the server has been shut // down. The method installs an interrupt handler, allowing to trigger a // shutdown by sending SIGTERM to the process. func (p *prometheus) Serve() { + if err := p.storage.Start(); err != nil { + glog.Error("Error opening memory series storage: ", err) + os.Exit(1) + } for _, q := range p.remoteStorageQueues { go q.Run() } + go p.ruleManager.Run() go p.notificationHandler.Run() go p.targetManager.Run() - p.storage.Start() + registry.MustRegister(p) go func() { err := p.webService.ServeForever(*pathPrefix) @@ -387,6 +392,5 @@ func main() { } p := NewPrometheus() - registry.MustRegister(p) p.Serve() } diff --git a/storage/local/interface.go b/storage/local/interface.go index 34ed30c7b..60df2d2f7 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -48,7 +48,7 @@ type Storage interface { // Run the various maintenance loops in goroutines. Returns when the // storage is ready to use. Keeps everything running in the background // until Stop is called. - Start() + Start() error // Stop shuts down the Storage gracefully, flushes all pending // operations, stops all maintenance loops,and frees all resources. Stop() error diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 7b2a34cec..96d0adf08 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -268,10 +268,13 @@ func newPersistence(basePath string, dirty, pedanticChecks bool, shouldSync sync p.labelPairToFingerprints = labelPairToFingerprints p.labelNameToLabelValues = labelNameToLabelValues - go p.processIndexingQueue() return p, nil } +func (p *persistence) run() { + p.processIndexingQueue() +} + // Describe implements prometheus.Collector. func (p *persistence) Describe(ch chan<- *prometheus.Desc) { ch <- p.indexingQueueLength.Desc() diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index 20bfe6216..00e36b792 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -42,6 +42,7 @@ func newTestPersistence(t *testing.T, encoding chunkEncoding) (*persistence, tes dir.Close() t.Fatal(err) } + go p.run() return p, test.NewCallbackCloser(func() { p.close() dir.Close() diff --git a/storage/local/storage.go b/storage/local/storage.go index c6db45b2b..56371ca77 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -82,6 +82,8 @@ type memorySeriesStorage struct { fpLocker *fingerprintLocker fpToSeries *seriesMap + options *MemorySeriesStorageOptions + loopStopping, loopStopped chan struct{} maxMemoryChunks int dropAfter time.Duration @@ -124,10 +126,12 @@ type MemorySeriesStorageOptions struct { // NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still // has to be called to start the storage. -func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { +func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage { s := &memorySeriesStorage{ fpLocker: newFingerprintLocker(1024), + options: o, + loopStopping: make(chan struct{}), loopStopped: make(chan struct{}), maxMemoryChunks: o.MemoryChunks, @@ -185,9 +189,13 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { []string{seriesLocationLabel}, ), } + return s +} +// Start implements Storage. +func (s *memorySeriesStorage) Start() error { var syncStrategy syncStrategy - switch o.SyncStrategy { + switch s.options.SyncStrategy { case Never: syncStrategy = func() bool { return false } case Always: @@ -198,33 +206,32 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { panic("unknown sync strategy") } - p, err := newPersistence(o.PersistenceStoragePath, o.Dirty, o.PedanticChecks, syncStrategy) + p, err := newPersistence(s.options.PersistenceStoragePath, s.options.Dirty, s.options.PedanticChecks, syncStrategy) if err != nil { - return nil, err + return err } s.persistence = p glog.Info("Loading series map and head chunks...") s.fpToSeries, s.numChunksToPersist, err = p.loadSeriesMapAndHeads() if err != nil { - return nil, err + return err } glog.Infof("%d series loaded.", s.fpToSeries.length()) s.numSeries.Set(float64(s.fpToSeries.length())) mapper, err := newFPMapper(s.fpToSeries, p) if err != nil { - return nil, err + return err } s.mapper = mapper - return s, nil -} + go s.persistence.run() -// Start implements Storage. -func (s *memorySeriesStorage) Start() { go s.handleEvictList() go s.loop() + + return nil } // Stop implements Storage. diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 8ae4d3507..16ec17b01 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -163,9 +163,9 @@ func TestLoop(t *testing.T) { CheckpointInterval: 250 * time.Millisecond, SyncStrategy: Adaptive, } - storage, err := NewMemorySeriesStorage(o) - if err != nil { - t.Fatalf("Error creating storage: %s", err) + storage := NewMemorySeriesStorage(o) + if err := storage.Start; err != nil { + t.Fatalf("Error starting storage: %s", err) } storage.Start() for _, s := range samples { @@ -731,9 +731,9 @@ func benchmarkFuzz(b *testing.B, encoding chunkEncoding) { CheckpointInterval: time.Second, SyncStrategy: Adaptive, } - s, err := NewMemorySeriesStorage(o) - if err != nil { - b.Fatalf("Error creating storage: %s", err) + s := NewMemorySeriesStorage(o) + if err := s.Start(); err != nil { + b.Fatalf("Error starting storage: %s", err) } s.Start() defer s.Stop() diff --git a/storage/local/test_helpers.go b/storage/local/test_helpers.go index a645bd1f5..21f6aa008 100644 --- a/storage/local/test_helpers.go +++ b/storage/local/test_helpers.go @@ -48,14 +48,12 @@ func NewTestStorage(t test.T, encoding chunkEncoding) (*memorySeriesStorage, tes CheckpointInterval: time.Hour, SyncStrategy: Adaptive, } - storage, err := NewMemorySeriesStorage(o) - if err != nil { + storage := NewMemorySeriesStorage(o) + if err := storage.Start(); err != nil { directory.Close() t.Fatalf("Error creating storage: %s", err) } - storage.Start() - closer := &testStorageCloser{ storage: storage, directory: directory,