diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go
index a856e721c..2dcba2284 100644
--- a/storage/metric/tiered.go
+++ b/storage/metric/tiered.go
@@ -53,9 +53,20 @@ func (c chunk) TruncateBefore(t time.Time) chunk {
 	}
 }
 
+type tieredStorageState uint // lifecycle phase of a TieredStorage
+
+const (
+	tieredStorageStarting tieredStorageState = iota // initial (zero) state; the only one Serve accepts
+	tieredStorageServing                            // set by Serve; the only state in which requests are accepted
+	tieredStorageDraining                           // drain treats this state and later ones as already drained
+	tieredStorageStopping                           // set at the end of Close; Close panics if called again
+)
+
 // TieredStorage both persists samples and generates materialized views for
 // queries.
 type TieredStorage struct {
+	mu sync.RWMutex // guards state: read-locked by request paths, write-locked by Serve, Drain and Close
+
 	// BUG(matt): This introduces a Law of Demeter violation. Ugh.
 	DiskStorage *LevelDBMetricPersistence
 
@@ -67,9 +78,9 @@ type TieredStorage struct {
 
 	viewQueue chan viewJob
 
-	draining chan chan bool
+	draining chan chan<- bool // carries a caller's completion channel; drain sends on it
 
-	mutex sync.Mutex
+	state tieredStorageState // current lifecycle phase; access only while holding mu
 }
 
 // viewJob encapsulates a request to extract sample values from the datastore.
@@ -90,7 +101,7 @@ func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryIn
 	storage = &TieredStorage{
 		appendToDiskQueue:   make(chan model.Samples, appendToDiskQueueDepth),
 		DiskStorage:         diskStorage,
-		draining:            make(chan chan bool),
+		draining:            make(chan chan<- bool),
 		flushMemoryInterval: flushMemoryInterval,
 		memoryArena:         NewMemorySeriesStorage(),
 		memoryTTL:           memoryTTL,
@@ -101,8 +112,10 @@ func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryIn
 
 // Enqueues Samples for storage.
 func (t *TieredStorage) AppendSamples(samples model.Samples) (err error) {
-	if len(t.draining) > 0 {
-		return fmt.Errorf("Storage is in the process of draining.")
+	t.mu.RLock()
+	defer t.mu.RUnlock()
+	if t.state != tieredStorageServing {
+		return fmt.Errorf("Storage is not serving.")
 	}
 
 	t.memoryArena.AppendSamples(samples)
@@ -111,20 +124,32 @@ func (t *TieredStorage) AppendSamples(samples model.Samples) (err error) {
 }
 
 // Stops the storage subsystem, flushing all pending operations.
-func (t *TieredStorage) Drain() {
-	log.Println("Starting drain...")
-	drainingDone := make(chan bool)
-	if len(t.draining) == 0 {
-		t.draining <- drainingDone
-	}
-	<-drainingDone
-	log.Println("Done.")
+func (t *TieredStorage) Drain(drained chan<- bool) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	t.drain(drained)
+}
+
+// drain hands drained to whoever receives on t.draining (presumably the Serve
+// loop) and marks the storage draining. Callers must hold t.mu.
+func (t *TieredStorage) drain(drained chan<- bool) {
+	if t.state != tieredStorageServing {
+		// Not serving, so nothing to hand off; signal without blocking under t.mu.
+		go func() { drained <- true }()
+		return
+	}
+
+	t.state = tieredStorageDraining
+	log.Println("Triggering drain...")
+	t.draining <- drained
 }
 
 // Enqueues a ViewRequestBuilder for materialization, subject to a timeout.
 func (t *TieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Duration, queryStats *stats.TimerGroup) (View, error) {
-	if len(t.draining) > 0 {
-		return nil, fmt.Errorf("Storage is in the process of draining.")
+	t.mu.RLock()
+	defer t.mu.RUnlock()
+	if t.state != tieredStorageServing {
+		return nil, fmt.Errorf("Storage is not serving")
 	}
 
 	// The result channel needs a one-element buffer in case we have timed out in
@@ -157,6 +182,16 @@ func (t *TieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Durat
 
 // Starts serving requests.
 func (t *TieredStorage) Serve(started chan<- bool) {
+	t.mu.Lock()
+	if t.state != tieredStorageStarting {
+		// Unlock first so a recovered panic does not leave t.mu held forever.
+		t.mu.Unlock()
+		panic("Illegal State: Attempted to restart TieredStorage.")
+	}
+
+	t.state = tieredStorageServing
+	t.mu.Unlock()
+
 	flushMemoryTicker := time.NewTicker(t.flushMemoryInterval)
 	defer flushMemoryTicker.Stop()
 	queueReportTicker := time.NewTicker(time.Second)
@@ -247,14 +282,29 @@ func (t *TieredStorage) flushMemory(ttl time.Duration) {
 }
 
+// Close drains pending work, closes both storage tiers and the request queues,
+// and marks the storage stopped. It panics if the storage is already stopped.
 func (t *TieredStorage) Close() {
-	log.Println("Closing tiered storage...")
-	t.Drain()
-	t.DiskStorage.Close()
-	t.memoryArena.Close()
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	if t.state == tieredStorageStopping {
+		panic("Illegal State: Attempted to restop TieredStorage.")
+	}
+
+	// Buffered: drain may signal before this goroutine reaches the receive
+	// below, and an unbuffered send would then block forever under t.mu.
+	drained := make(chan bool, 1)
+	t.drain(drained)
+	<-drained
+
+	t.memoryArena.Close()
+	t.DiskStorage.Close()
+	// BUG(matt): There is a probability that pending items may hang here and not
+	// get flushed.
 
 	close(t.appendToDiskQueue)
 	close(t.viewQueue)
-	log.Println("Done.")
+
+	t.state = tieredStorageStopping
 }
 
 func (t *TieredStorage) renderView(viewJob viewJob) {
@@ -478,17 +528,25 @@ func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier
 }
 
 // Get all label values that are associated with the provided label name.
-func (t *TieredStorage) GetAllValuesForLabel(labelName model.LabelName) (values model.LabelValues, err error) {
+func (t *TieredStorage) GetAllValuesForLabel(labelName model.LabelName) (model.LabelValues, error) {
+	t.mu.RLock()
+	defer t.mu.RUnlock()
+
+	// Report a non-serving storage as an error, like MakeView, rather than panic.
+	if t.state != tieredStorageServing {
+		return nil, fmt.Errorf("Storage is not serving")
+	}
 	diskValues, err := t.DiskStorage.GetAllValuesForLabel(labelName)
 	if err != nil {
-		return
+		return nil, err
 	}
 	memoryValues, err := t.memoryArena.GetAllValuesForLabel(labelName)
 	if err != nil {
-		return
+		return nil, err
 	}
 
 	valueSet := map[model.LabelValue]bool{}
+	values := model.LabelValues{}
 	for _, value := range append(diskValues, memoryValues...) {
 		if !valueSet[value] {
 			values = append(values, value)
@@ -496,40 +549,57 @@ func (t *TieredStorage) GetAllValuesForLabel(labelName model.LabelName) (values
 		}
 	}
 
-	return
+	return values, nil
 }
 
 // Get all of the metric fingerprints that are associated with the provided
 // label set.
-func (t *TieredStorage) GetFingerprintsForLabelSet(labelSet model.LabelSet) (fingerprints model.Fingerprints, err error) {
+func (t *TieredStorage) GetFingerprintsForLabelSet(labelSet model.LabelSet) (model.Fingerprints, error) {
+	t.mu.RLock()
+	defer t.mu.RUnlock()
+
+	// Report a non-serving storage as an error, like MakeView, rather than panic.
+	if t.state != tieredStorageServing {
+		return nil, fmt.Errorf("Storage is not serving")
+	}
 	memFingerprints, err := t.memoryArena.GetFingerprintsForLabelSet(labelSet)
 	if err != nil {
-		return
+		return nil, err
 	}
 	diskFingerprints, err := t.DiskStorage.GetFingerprintsForLabelSet(labelSet)
 	if err != nil {
-		return
+		return nil, err
 	}
+	// De-duplicate fingerprints that are present in both the memory and disk tiers.
 	fingerprintSet := map[model.Fingerprint]bool{}
 	for _, fingerprint := range append(memFingerprints, diskFingerprints...) {
 		fingerprintSet[*fingerprint] = true
 	}
+	fingerprints := model.Fingerprints{}
 	for fingerprint := range fingerprintSet {
 		fpCopy := fingerprint
 		fingerprints = append(fingerprints, &fpCopy)
 	}
 
-	return
+	return fingerprints, nil
 }
 
 // Get the metric associated with the provided fingerprint.
-func (t *TieredStorage) GetMetricForFingerprint(f *model.Fingerprint) (m model.Metric, err error) {
-	m, err = t.memoryArena.GetMetricForFingerprint(f)
+func (t *TieredStorage) GetMetricForFingerprint(f *model.Fingerprint) (model.Metric, error) {
+	t.mu.RLock()
+	defer t.mu.RUnlock()
+
+	// Report a non-serving storage as an error, like MakeView, rather than panic.
+	if t.state != tieredStorageServing {
+		return nil, fmt.Errorf("Storage is not serving")
+	}
+	m, err := t.memoryArena.GetMetricForFingerprint(f)
 	if err != nil {
-		return
+		return nil, err
 	}
+	// Fall back to disk only when the memory tier does not know the fingerprint.
 	if m == nil {
 		m, err = t.DiskStorage.GetMetricForFingerprint(f)
 	}
-	return
+	return m, err
 }