Repointerize TieredStorage method receiver types.
This commit is contained in:
parent
f1a634f580
commit
caab131ada
|
@ -74,7 +74,7 @@ func NewTieredStorage(appendToMemoryQueueDepth, appendToDiskQueueDepth, viewQueu
|
||||||
}
|
}
|
||||||
|
|
||||||
// Enqueues Samples for storage.
|
// Enqueues Samples for storage.
|
||||||
func (t TieredStorage) AppendSamples(s model.Samples) (err error) {
|
func (t *TieredStorage) AppendSamples(s model.Samples) (err error) {
|
||||||
if len(t.draining) > 0 {
|
if len(t.draining) > 0 {
|
||||||
return fmt.Errorf("Storage is in the process of draining.")
|
return fmt.Errorf("Storage is in the process of draining.")
|
||||||
}
|
}
|
||||||
|
@ -85,7 +85,7 @@ func (t TieredStorage) AppendSamples(s model.Samples) (err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stops the storage subsystem, flushing all pending operations.
|
// Stops the storage subsystem, flushing all pending operations.
|
||||||
func (t TieredStorage) Drain() {
|
func (t *TieredStorage) Drain() {
|
||||||
log.Println("Starting drain...")
|
log.Println("Starting drain...")
|
||||||
drainingDone := make(chan bool)
|
drainingDone := make(chan bool)
|
||||||
if len(t.draining) == 0 {
|
if len(t.draining) == 0 {
|
||||||
|
@ -95,8 +95,8 @@ func (t TieredStorage) Drain() {
|
||||||
log.Println("Done.")
|
log.Println("Done.")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Enqueus a ViewRequestBuilder for materialization, subject to a timeout.
|
// Enqueues a ViewRequestBuilder for materialization, subject to a timeout.
|
||||||
func (t TieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Duration) (view View, err error) {
|
func (t *TieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Duration) (view View, err error) {
|
||||||
if len(t.draining) > 0 {
|
if len(t.draining) > 0 {
|
||||||
err = fmt.Errorf("Storage is in the process of draining.")
|
err = fmt.Errorf("Storage is in the process of draining.")
|
||||||
return
|
return
|
||||||
|
@ -146,7 +146,7 @@ func (t *TieredStorage) rebuildDiskFrontier(i leveldb.Iterator) (err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Starts serving requests.
|
// Starts serving requests.
|
||||||
func (t TieredStorage) Serve() {
|
func (t *TieredStorage) Serve() {
|
||||||
flushMemoryTicker := time.NewTicker(t.flushMemoryInterval)
|
flushMemoryTicker := time.NewTicker(t.flushMemoryInterval)
|
||||||
defer flushMemoryTicker.Stop()
|
defer flushMemoryTicker.Stop()
|
||||||
writeMemoryTicker := time.NewTicker(t.writeMemoryInterval)
|
writeMemoryTicker := time.NewTicker(t.writeMemoryInterval)
|
||||||
|
@ -176,7 +176,7 @@ func (t TieredStorage) Serve() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t TieredStorage) reportQueues() {
|
func (t *TieredStorage) reportQueues() {
|
||||||
queueSizes.Set(map[string]string{"queue": "append_to_disk", "facet": "occupancy"}, float64(len(t.appendToDiskQueue)))
|
queueSizes.Set(map[string]string{"queue": "append_to_disk", "facet": "occupancy"}, float64(len(t.appendToDiskQueue)))
|
||||||
queueSizes.Set(map[string]string{"queue": "append_to_disk", "facet": "capacity"}, float64(cap(t.appendToDiskQueue)))
|
queueSizes.Set(map[string]string{"queue": "append_to_disk", "facet": "capacity"}, float64(cap(t.appendToDiskQueue)))
|
||||||
|
|
||||||
|
@ -205,11 +205,11 @@ func (t *TieredStorage) writeMemory() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t TieredStorage) Flush() {
|
func (t *TieredStorage) Flush() {
|
||||||
t.flush()
|
t.flush()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t TieredStorage) Close() {
|
func (t *TieredStorage) Close() {
|
||||||
log.Println("Closing tiered storage...")
|
log.Println("Closing tiered storage...")
|
||||||
t.Drain()
|
t.Drain()
|
||||||
t.DiskStorage.Close()
|
t.DiskStorage.Close()
|
||||||
|
@ -222,7 +222,7 @@ func (t TieredStorage) Close() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write all pending appends.
|
// Write all pending appends.
|
||||||
func (t TieredStorage) flush() (err error) {
|
func (t *TieredStorage) flush() (err error) {
|
||||||
// Trim any old values to reduce iterative write costs.
|
// Trim any old values to reduce iterative write costs.
|
||||||
t.flushMemory()
|
t.flushMemory()
|
||||||
t.writeMemory()
|
t.writeMemory()
|
||||||
|
@ -336,7 +336,7 @@ func (t *TieredStorage) flushMemory() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t TieredStorage) renderView(viewJob viewJob) {
|
func (t *TieredStorage) renderView(viewJob viewJob) {
|
||||||
// Telemetry.
|
// Telemetry.
|
||||||
var err error
|
var err error
|
||||||
begin := time.Now()
|
begin := time.Now()
|
||||||
|
@ -465,7 +465,7 @@ func (t TieredStorage) renderView(viewJob viewJob) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier *seriesFrontier, fingerprint model.Fingerprint, ts time.Time) (chunk model.Values) {
|
func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier *seriesFrontier, fingerprint model.Fingerprint, ts time.Time) (chunk model.Values) {
|
||||||
var (
|
var (
|
||||||
targetKey = &dto.SampleKey{
|
targetKey = &dto.SampleKey{
|
||||||
Fingerprint: fingerprint.ToDTO(),
|
Fingerprint: fingerprint.ToDTO(),
|
||||||
|
@ -542,7 +542,7 @@ func (t TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier *
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get all label values that are associated with the provided label name.
|
// Get all label values that are associated with the provided label name.
|
||||||
func (t TieredStorage) GetAllValuesForLabel(labelName model.LabelName) (values model.LabelValues, err error) {
|
func (t *TieredStorage) GetAllValuesForLabel(labelName model.LabelName) (values model.LabelValues, err error) {
|
||||||
diskValues, err := t.DiskStorage.GetAllValuesForLabel(labelName)
|
diskValues, err := t.DiskStorage.GetAllValuesForLabel(labelName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
|
@ -565,7 +565,7 @@ func (t TieredStorage) GetAllValuesForLabel(labelName model.LabelName) (values m
|
||||||
|
|
||||||
// Get all of the metric fingerprints that are associated with the provided
|
// Get all of the metric fingerprints that are associated with the provided
|
||||||
// label set.
|
// label set.
|
||||||
func (t TieredStorage) GetFingerprintsForLabelSet(labelSet model.LabelSet) (fingerprints model.Fingerprints, err error) {
|
func (t *TieredStorage) GetFingerprintsForLabelSet(labelSet model.LabelSet) (fingerprints model.Fingerprints, err error) {
|
||||||
memFingerprints, err := t.memoryArena.GetFingerprintsForLabelSet(labelSet)
|
memFingerprints, err := t.memoryArena.GetFingerprintsForLabelSet(labelSet)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
|
@ -586,7 +586,7 @@ func (t TieredStorage) GetFingerprintsForLabelSet(labelSet model.LabelSet) (fing
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get the metric associated with the provided fingerprint.
|
// Get the metric associated with the provided fingerprint.
|
||||||
func (t TieredStorage) GetMetricForFingerprint(f model.Fingerprint) (m *model.Metric, err error) {
|
func (t *TieredStorage) GetMetricForFingerprint(f model.Fingerprint) (m *model.Metric, err error) {
|
||||||
m, err = t.memoryArena.GetMetricForFingerprint(f)
|
m, err = t.memoryArena.GetMetricForFingerprint(f)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
|
|
Loading…
Reference in New Issue