diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index f1ee14500..7fe42651f 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -188,7 +188,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (persistence *LevelDBMetr func (l *LevelDBMetricPersistence) AppendSample(sample model.Sample) (err error) { begin := time.Now() defer func() { - duration := time.Now().Sub(begin) + duration := time.Since(begin) recordOutcome(duration, err, map[string]string{operation: appendSample, result: success}, map[string]string{operation: appendSample, result: failure}) }() @@ -201,7 +201,7 @@ func (l *LevelDBMetricPersistence) AppendSample(sample model.Sample) (err error) func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err error) { begin := time.Now() defer func() { - duration := time.Now().Sub(begin) + duration := time.Since(begin) recordOutcome(duration, err, map[string]string{operation: appendSamples, result: success}, map[string]string{operation: appendSamples, result: failure}) }() @@ -303,7 +303,6 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err indexHas, err := l.hasIndexMetric(metricDTO) if err != nil { panic(err) - continue } if !indexHas { absentFingerprints[fingerprint] = samples @@ -570,7 +569,7 @@ func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (value bool, begin := time.Now() defer func() { - duration := time.Now().Sub(begin) + duration := time.Since(begin) recordOutcome(duration, err, map[string]string{operation: hasIndexMetric, result: success}, map[string]string{operation: hasIndexMetric, result: failure}) }() @@ -585,7 +584,7 @@ func (l *LevelDBMetricPersistence) HasLabelPair(dto *dto.LabelPair) (value bool, begin := time.Now() defer func() { - duration := time.Now().Sub(begin) + duration := time.Since(begin) recordOutcome(duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure}) }() @@ -600,7 +599,7 @@ func (l *LevelDBMetricPersistence) HasLabelName(dto *dto.LabelName) (value bool, begin := time.Now() defer func() { - duration := time.Now().Sub(begin) + duration := time.Since(begin) recordOutcome(duration, err, map[string]string{operation: hasLabelName, result: success}, map[string]string{operation: hasLabelName, result: failure}) }() @@ -615,7 +614,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet model.Lab begin := time.Now() defer func() { - duration := time.Now().Sub(begin) + duration := time.Since(begin) recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelSet, result: success}, map[string]string{operation: getFingerprintsForLabelSet, result: failure}) }() @@ -665,7 +664,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName model.L begin := time.Now() defer func() { - duration := time.Now().Sub(begin) + duration := time.Since(begin) recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelName, result: success}, map[string]string{operation: getFingerprintsForLabelName, result: failure}) }() @@ -694,7 +693,7 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f model.Fingerprint) begin := time.Now() defer func() { - duration := time.Now().Sub(begin) + duration := time.Since(begin) recordOutcome(duration, err, map[string]string{operation: getMetricForFingerprint, result: success}, map[string]string{operation: getMetricForFingerprint, result: failure}) }() @@ -727,7 +726,7 @@ func (l *LevelDBMetricPersistence) GetBoundaryValues(m model.Metric, i model.Int begin := time.Now() defer func() { - duration := time.Now().Sub(begin) + duration := time.Since(begin) recordOutcome(duration, err, map[string]string{operation: getBoundaryValues, result: success}, map[string]string{operation: getBoundaryValues, result: failure}) }() @@ -776,7 +775,7 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(m model.Metric, t time.Time, s begin := time.Now() defer func() { - duration := time.Now().Sub(begin) + duration := time.Since(begin) recordOutcome(duration, err, map[string]string{operation: getValueAtTime, result: success}, map[string]string{operation: getValueAtTime, result: failure}) }() @@ -992,7 +991,7 @@ func (l *LevelDBMetricPersistence) GetRangeValues(m model.Metric, i model.Interv begin := time.Now() defer func() { - duration := time.Now().Sub(begin) + duration := time.Since(begin) recordOutcome(duration, err, map[string]string{operation: getRangeValues, result: success}, map[string]string{operation: getRangeValues, result: failure}) }() diff --git a/storage/metric/memory.go b/storage/metric/memory.go index e04f7e9e2..051a3b873 100644 --- a/storage/metric/memory.go +++ b/storage/metric/memory.go @@ -69,11 +69,11 @@ func (s stream) forEach(decoder storage.RecordDecoder, filter storage.RecordFilt for !(iterator.Key() == nil || iterator.Value() == nil) { decodedKey, decodeErr := decoder.DecodeKey(iterator.Key()) if decodeErr != nil { - continue + panic(decodeErr) } decodedValue, decodeErr := decoder.DecodeValue(iterator.Value()) if decodeErr != nil { - continue + panic(decodeErr) } switch filter.Filter(decodedKey, decodedValue) { diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index 0f1c394c8..92f99a775 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -60,6 +60,7 @@ type Storage interface { // Stops the storage subsystem, flushing all pending operations. Drain() Flush() + Close() } func NewTieredStorage(appendToMemoryQueueDepth, appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval, writeMemoryInterval, memoryTTL time.Duration, root string) Storage { @@ -126,7 +127,7 @@ func (t *tieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Durat func (t *tieredStorage) rebuildDiskFrontier() (err error) { begin := time.Now() defer func() { - duration := time.Now().Sub(begin) + duration := time.Since(begin) recordOutcome(duration, err, map[string]string{operation: appendSample, result: success}, map[string]string{operation: rebuildDiskFrontier, result: failure}) }() @@ -181,7 +182,7 @@ func (t *tieredStorage) reportQueues() { func (t *tieredStorage) writeMemory() { begin := time.Now() defer func() { - duration := time.Now().Sub(begin) + duration := time.Since(begin) recordOutcome(duration, nil, map[string]string{operation: appendSample, result: success}, map[string]string{operation: writeMemory, result: failure}) }() @@ -200,9 +201,14 @@ func (t *tieredStorage) Flush() { t.flush() } +func (t *tieredStorage) Close() { + t.Drain() + t.diskStorage.Close() +} + // Write all pending appends. func (t *tieredStorage) flush() (err error) { - // Trim and old values to reduce iterative write costs. + // Trim any old values to reduce iterative write costs. t.flushMemory() t.writeMemory() t.flushMemory() @@ -299,7 +305,7 @@ func (f memoryToDiskFlusher) Close() { func (t *tieredStorage) flushMemory() { begin := time.Now() defer func() { - duration := time.Now().Sub(begin) + duration := time.Since(begin) recordOutcome(duration, nil, map[string]string{operation: appendSample, result: success}, map[string]string{operation: flushMemory, result: failure}) }() @@ -322,7 +328,7 @@ func (t *tieredStorage) flushMemory() { func (t *tieredStorage) renderView(viewJob viewJob) (err error) { begin := time.Now() defer func() { - duration := time.Now().Sub(begin) + duration := time.Since(begin) recordOutcome(duration, err, map[string]string{operation: appendSample, result: success}, map[string]string{operation: renderView, result: failure}) }()