Add storage state guards and transition callbacks.
To ensure that we access tiered storage in the proper way, we have guards now.
This commit is contained in:
parent
abb5353ade
commit
beaaf386e7
|
@ -53,9 +53,20 @@ func (c chunk) TruncateBefore(t time.Time) chunk {
|
|||
}
|
||||
}
|
||||
|
||||
type tieredStorageState uint
|
||||
|
||||
const (
|
||||
tieredStorageStarting tieredStorageState = iota
|
||||
tieredStorageServing
|
||||
tieredStorageDraining
|
||||
tieredStorageStopping
|
||||
)
|
||||
|
||||
// TieredStorage both persists samples and generates materialized views for
|
||||
// queries.
|
||||
type TieredStorage struct {
|
||||
mu sync.RWMutex
|
||||
|
||||
// BUG(matt): This introduces a Law of Demeter violation. Ugh.
|
||||
DiskStorage *LevelDBMetricPersistence
|
||||
|
||||
|
@ -67,9 +78,9 @@ type TieredStorage struct {
|
|||
|
||||
viewQueue chan viewJob
|
||||
|
||||
draining chan chan bool
|
||||
draining chan chan<- bool
|
||||
|
||||
mutex sync.Mutex
|
||||
state tieredStorageState
|
||||
}
|
||||
|
||||
// viewJob encapsulates a request to extract sample values from the datastore.
|
||||
|
@ -90,7 +101,7 @@ func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryIn
|
|||
storage = &TieredStorage{
|
||||
appendToDiskQueue: make(chan model.Samples, appendToDiskQueueDepth),
|
||||
DiskStorage: diskStorage,
|
||||
draining: make(chan chan bool),
|
||||
draining: make(chan chan<- bool),
|
||||
flushMemoryInterval: flushMemoryInterval,
|
||||
memoryArena: NewMemorySeriesStorage(),
|
||||
memoryTTL: memoryTTL,
|
||||
|
@ -101,8 +112,10 @@ func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryIn
|
|||
|
||||
// Enqueues Samples for storage.
|
||||
func (t *TieredStorage) AppendSamples(samples model.Samples) (err error) {
|
||||
if len(t.draining) > 0 {
|
||||
return fmt.Errorf("Storage is in the process of draining.")
|
||||
t.mu.RLock()
|
||||
defer t.mu.RUnlock()
|
||||
if t.state != tieredStorageServing {
|
||||
return fmt.Errorf("Storage is not serving.")
|
||||
}
|
||||
|
||||
t.memoryArena.AppendSamples(samples)
|
||||
|
@ -111,20 +124,32 @@ func (t *TieredStorage) AppendSamples(samples model.Samples) (err error) {
|
|||
}
|
||||
|
||||
// Stops the storage subsystem, flushing all pending operations.
|
||||
func (t *TieredStorage) Drain() {
|
||||
log.Println("Starting drain...")
|
||||
drainingDone := make(chan bool)
|
||||
if len(t.draining) == 0 {
|
||||
t.draining <- drainingDone
|
||||
func (t *TieredStorage) Drain(drained chan<- bool) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
|
||||
t.drain(drained)
|
||||
}
|
||||
|
||||
func (t *TieredStorage) drain(drained chan<- bool) {
|
||||
if t.state >= tieredStorageDraining {
|
||||
drained <- true
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case t.draining <- (drained):
|
||||
log.Println("Triggering drain...")
|
||||
default:
|
||||
}
|
||||
<-drainingDone
|
||||
log.Println("Done.")
|
||||
}
|
||||
|
||||
// Enqueues a ViewRequestBuilder for materialization, subject to a timeout.
|
||||
func (t *TieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Duration, queryStats *stats.TimerGroup) (View, error) {
|
||||
if len(t.draining) > 0 {
|
||||
return nil, fmt.Errorf("Storage is in the process of draining.")
|
||||
t.mu.RLock()
|
||||
defer t.mu.RUnlock()
|
||||
if t.state != tieredStorageServing {
|
||||
return nil, fmt.Errorf("Storage is not serving")
|
||||
}
|
||||
|
||||
// The result channel needs a one-element buffer in case we have timed out in
|
||||
|
@ -157,6 +182,15 @@ func (t *TieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Durat
|
|||
|
||||
// Starts serving requests.
|
||||
func (t *TieredStorage) Serve(started chan<- bool) {
|
||||
t.mu.Lock()
|
||||
if t.state != tieredStorageStarting {
|
||||
panic("Illegal State: Attempted to restart TieredStorage.")
|
||||
return
|
||||
}
|
||||
|
||||
t.state = tieredStorageServing
|
||||
t.mu.Unlock()
|
||||
|
||||
flushMemoryTicker := time.NewTicker(t.flushMemoryInterval)
|
||||
defer flushMemoryTicker.Stop()
|
||||
queueReportTicker := time.NewTicker(time.Second)
|
||||
|
@ -247,14 +281,25 @@ func (t *TieredStorage) flushMemory(ttl time.Duration) {
|
|||
}
|
||||
|
||||
func (t *TieredStorage) Close() {
|
||||
log.Println("Closing tiered storage...")
|
||||
t.Drain()
|
||||
t.DiskStorage.Close()
|
||||
t.memoryArena.Close()
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
|
||||
if t.state == tieredStorageStopping {
|
||||
panic("Illegal State: Attempted to restop TieredStorage.")
|
||||
}
|
||||
|
||||
drained := make(chan bool)
|
||||
t.drain(drained)
|
||||
<-drained
|
||||
|
||||
t.memoryArena.Close()
|
||||
t.DiskStorage.Close()
|
||||
// BUG(matt): There is a probability that pending items may hang here and not
|
||||
// get flushed.
|
||||
close(t.appendToDiskQueue)
|
||||
close(t.viewQueue)
|
||||
log.Println("Done.")
|
||||
|
||||
t.state = tieredStorageStopping
|
||||
}
|
||||
|
||||
func (t *TieredStorage) renderView(viewJob viewJob) {
|
||||
|
@ -478,17 +523,25 @@ func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier
|
|||
}
|
||||
|
||||
// Get all label values that are associated with the provided label name.
|
||||
func (t *TieredStorage) GetAllValuesForLabel(labelName model.LabelName) (values model.LabelValues, err error) {
|
||||
func (t *TieredStorage) GetAllValuesForLabel(labelName model.LabelName) (model.LabelValues, error) {
|
||||
t.mu.RLock()
|
||||
defer t.mu.RUnlock()
|
||||
|
||||
if t.state != tieredStorageServing {
|
||||
panic("Illegal State: Attempted to query non-running TieredStorage.")
|
||||
}
|
||||
|
||||
diskValues, err := t.DiskStorage.GetAllValuesForLabel(labelName)
|
||||
if err != nil {
|
||||
return
|
||||
return nil, err
|
||||
}
|
||||
memoryValues, err := t.memoryArena.GetAllValuesForLabel(labelName)
|
||||
if err != nil {
|
||||
return
|
||||
return nil, err
|
||||
}
|
||||
|
||||
valueSet := map[model.LabelValue]bool{}
|
||||
values := model.LabelValues{}
|
||||
for _, value := range append(diskValues, memoryValues...) {
|
||||
if !valueSet[value] {
|
||||
values = append(values, value)
|
||||
|
@ -496,40 +549,55 @@ func (t *TieredStorage) GetAllValuesForLabel(labelName model.LabelName) (values
|
|||
}
|
||||
}
|
||||
|
||||
return
|
||||
return values, nil
|
||||
}
|
||||
|
||||
// Get all of the metric fingerprints that are associated with the provided
|
||||
// label set.
|
||||
func (t *TieredStorage) GetFingerprintsForLabelSet(labelSet model.LabelSet) (fingerprints model.Fingerprints, err error) {
|
||||
func (t *TieredStorage) GetFingerprintsForLabelSet(labelSet model.LabelSet) (model.Fingerprints, error) {
|
||||
t.mu.RLock()
|
||||
defer t.mu.RUnlock()
|
||||
|
||||
if t.state != tieredStorageServing {
|
||||
panic("Illegal State: Attempted to query non-running TieredStorage.")
|
||||
}
|
||||
|
||||
memFingerprints, err := t.memoryArena.GetFingerprintsForLabelSet(labelSet)
|
||||
if err != nil {
|
||||
return
|
||||
return nil, err
|
||||
}
|
||||
diskFingerprints, err := t.DiskStorage.GetFingerprintsForLabelSet(labelSet)
|
||||
if err != nil {
|
||||
return
|
||||
return nil, err
|
||||
}
|
||||
fingerprintSet := map[model.Fingerprint]bool{}
|
||||
for _, fingerprint := range append(memFingerprints, diskFingerprints...) {
|
||||
fingerprintSet[*fingerprint] = true
|
||||
}
|
||||
fingerprints := model.Fingerprints{}
|
||||
for fingerprint := range fingerprintSet {
|
||||
fpCopy := fingerprint
|
||||
fingerprints = append(fingerprints, &fpCopy)
|
||||
}
|
||||
|
||||
return
|
||||
return fingerprints, nil
|
||||
}
|
||||
|
||||
// Get the metric associated with the provided fingerprint.
|
||||
func (t *TieredStorage) GetMetricForFingerprint(f *model.Fingerprint) (m model.Metric, err error) {
|
||||
m, err = t.memoryArena.GetMetricForFingerprint(f)
|
||||
func (t *TieredStorage) GetMetricForFingerprint(f *model.Fingerprint) (model.Metric, error) {
|
||||
t.mu.RLock()
|
||||
defer t.mu.RUnlock()
|
||||
|
||||
if t.state != tieredStorageServing {
|
||||
panic("Illegal State: Attempted to query non-running TieredStorage.")
|
||||
}
|
||||
|
||||
m, err := t.memoryArena.GetMetricForFingerprint(f)
|
||||
if err != nil {
|
||||
return
|
||||
return nil, err
|
||||
}
|
||||
if m == nil {
|
||||
m, err = t.DiskStorage.GetMetricForFingerprint(f)
|
||||
}
|
||||
return
|
||||
return m, err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue