2013-02-08 17:03:26 +00:00
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
import (
"fmt"
2013-03-01 17:51:36 +00:00
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/coding/indexable"
2013-02-08 17:03:26 +00:00
"github.com/prometheus/prometheus/model"
2013-03-01 17:51:36 +00:00
dto "github.com/prometheus/prometheus/model/generated"
2013-02-08 17:03:26 +00:00
"github.com/prometheus/prometheus/storage"
2013-03-25 09:24:59 +00:00
"github.com/prometheus/prometheus/storage/raw/leveldb"
2013-04-29 09:17:56 +00:00
"log"
2013-03-07 01:16:39 +00:00
"sort"
2013-02-08 17:03:26 +00:00
"sync"
"time"
)
2013-05-08 18:39:59 +00:00
type chunk model . Values
// TruncateBefore returns a subslice of the original such that extraneous
// samples in the collection that occur before the provided time are
// dropped. The original slice is not mutated. It works with the assumption
// that consumers of these values could want preceding values if none would
// exist prior to the defined time.
func ( c chunk ) TruncateBefore ( t time . Time ) chunk {
index := sort . Search ( len ( c ) , func ( i int ) bool {
timestamp := c [ i ] . Timestamp
return ! timestamp . Before ( t )
} )
switch index {
case 0 :
return c
case len ( c ) :
return c [ len ( c ) - 1 : ]
default :
return c [ index - 1 : ]
}
}
2013-05-02 16:27:12 +00:00
// TieredStorage both persists samples and generates materialized views for
2013-02-08 17:03:26 +00:00
// queries.
2013-05-02 16:27:12 +00:00
type TieredStorage struct {
2013-05-07 08:18:19 +00:00
// BUG(matt): This introduces a Law of Demeter violation. Ugh.
DiskStorage * LevelDBMetricPersistence
2013-05-14 09:21:27 +00:00
appendToDiskQueue chan model . Samples
diskFrontier * diskFrontier
2013-05-16 14:02:07 +00:00
memoryArena * memorySeriesStorage
2013-02-08 17:03:26 +00:00
memoryTTL time . Duration
2013-05-14 09:21:27 +00:00
flushMemoryInterval time . Duration
2013-05-08 13:30:27 +00:00
// This mutex manages any concurrent reads/writes of the memoryArena.
memoryMutex sync . RWMutex
// This mutex blocks only deletions from the memoryArena. It is held for a
// potentially long time for an entire renderView() duration, since we depend
// on no samples being removed from memory after grabbing a LevelDB snapshot.
2013-05-14 09:21:27 +00:00
memoryDeleteMutex sync . RWMutex
viewQueue chan viewJob
draining chan chan bool
mutex sync . Mutex
2013-02-08 17:03:26 +00:00
}
// viewJob encapsulates a request to extract sample values from the datastore.
type viewJob struct {
builder ViewRequestBuilder
output chan View
2013-03-26 16:15:04 +00:00
abort chan bool
2013-02-08 17:03:26 +00:00
err chan error
}
2013-05-14 14:17:49 +00:00
func NewTieredStorage ( appendToDiskQueueDepth , viewQueueDepth uint , flushMemoryInterval , memoryTTL time . Duration , root string ) ( storage * TieredStorage , err error ) {
2013-03-07 01:16:39 +00:00
diskStorage , err := NewLevelDBMetricPersistence ( root )
2013-02-08 17:03:26 +00:00
if err != nil {
2013-03-27 10:25:05 +00:00
return
2013-02-08 17:03:26 +00:00
}
2013-05-02 16:27:12 +00:00
storage = & TieredStorage {
2013-04-29 14:55:18 +00:00
appendToDiskQueue : make ( chan model . Samples , appendToDiskQueueDepth ) ,
2013-05-07 08:18:19 +00:00
DiskStorage : diskStorage ,
2013-03-21 16:53:57 +00:00
draining : make ( chan chan bool ) ,
2013-02-08 17:03:26 +00:00
flushMemoryInterval : flushMemoryInterval ,
memoryArena : NewMemorySeriesStorage ( ) ,
memoryTTL : memoryTTL ,
viewQueue : make ( chan viewJob , viewQueueDepth ) ,
}
2013-03-27 10:25:05 +00:00
return
2013-02-08 17:03:26 +00:00
}
2013-05-02 16:27:12 +00:00
// Enqueues Samples for storage.
2013-05-08 13:30:27 +00:00
func ( t * TieredStorage ) AppendSamples ( samples model . Samples ) ( err error ) {
2013-03-01 17:51:36 +00:00
if len ( t . draining ) > 0 {
return fmt . Errorf ( "Storage is in the process of draining." )
}
2013-05-08 13:30:27 +00:00
t . memoryMutex . Lock ( )
t . memoryArena . AppendSamples ( samples )
t . memoryMutex . Unlock ( )
2013-03-01 17:51:36 +00:00
return
}
2013-05-02 16:27:12 +00:00
// Stops the storage subsystem, flushing all pending operations.
2013-05-07 13:12:33 +00:00
func ( t * TieredStorage ) Drain ( ) {
2013-04-29 09:17:56 +00:00
log . Println ( "Starting drain..." )
2013-03-21 16:53:57 +00:00
drainingDone := make ( chan bool )
2013-03-07 01:16:39 +00:00
if len ( t . draining ) == 0 {
2013-03-21 16:53:57 +00:00
t . draining <- drainingDone
2013-03-07 01:16:39 +00:00
}
2013-03-21 16:53:57 +00:00
<- drainingDone
2013-04-29 09:17:56 +00:00
log . Println ( "Done." )
2013-02-08 17:03:26 +00:00
}
2013-05-07 13:12:33 +00:00
// Enqueues a ViewRequestBuilder for materialization, subject to a timeout.
func ( t * TieredStorage ) MakeView ( builder ViewRequestBuilder , deadline time . Duration ) ( view View , err error ) {
2013-03-01 17:51:36 +00:00
if len ( t . draining ) > 0 {
err = fmt . Errorf ( "Storage is in the process of draining." )
return
}
2013-03-26 16:15:04 +00:00
// The result channel needs a one-element buffer in case we have timed out in
// MakeView, but the view rendering still completes afterwards and writes to
// the channel.
result := make ( chan View , 1 )
// The abort channel needs a one-element buffer in case the view rendering
// has already exited and doesn't consume from the channel anymore.
abortChan := make ( chan bool , 1 )
2013-02-08 17:03:26 +00:00
errChan := make ( chan error )
t . viewQueue <- viewJob {
builder : builder ,
output : result ,
2013-03-26 16:15:04 +00:00
abort : abortChan ,
2013-02-08 17:03:26 +00:00
err : errChan ,
}
select {
case value := <- result :
view = value
case err = <- errChan :
return
case <- time . After ( deadline ) :
2013-03-26 16:15:04 +00:00
abortChan <- true
2013-02-08 17:03:26 +00:00
err = fmt . Errorf ( "MakeView timed out after %s." , deadline )
}
return
}
2013-05-02 16:27:12 +00:00
func ( t * TieredStorage ) rebuildDiskFrontier ( i leveldb . Iterator ) ( err error ) {
2013-03-01 17:51:36 +00:00
begin := time . Now ( )
defer func ( ) {
2013-03-11 21:21:25 +00:00
duration := time . Since ( begin )
2013-02-08 17:03:26 +00:00
2013-03-01 17:51:36 +00:00
recordOutcome ( duration , err , map [ string ] string { operation : appendSample , result : success } , map [ string ] string { operation : rebuildDiskFrontier , result : failure } )
} ( )
2013-03-25 09:24:59 +00:00
2013-03-01 17:51:36 +00:00
t . diskFrontier , err = newDiskFrontier ( i )
if err != nil {
2013-04-29 09:17:56 +00:00
return
2013-02-08 17:03:26 +00:00
}
2013-03-01 17:51:36 +00:00
return
2013-02-08 17:03:26 +00:00
}
2013-05-02 16:27:12 +00:00
// Starts serving requests.
2013-05-07 13:12:33 +00:00
func ( t * TieredStorage ) Serve ( ) {
2013-04-29 09:17:56 +00:00
flushMemoryTicker := time . NewTicker ( t . flushMemoryInterval )
defer flushMemoryTicker . Stop ( )
2013-05-14 09:21:27 +00:00
queueReportTicker := time . NewTicker ( time . Second )
defer queueReportTicker . Stop ( )
2013-03-01 17:51:36 +00:00
2013-04-16 15:13:29 +00:00
go func ( ) {
2013-05-14 09:21:27 +00:00
for _ = range queueReportTicker . C {
2013-04-25 11:04:45 +00:00
t . reportQueues ( )
2013-04-16 15:13:29 +00:00
}
} ( )
for {
2013-02-08 17:03:26 +00:00
select {
2013-04-29 09:17:56 +00:00
case <- flushMemoryTicker . C :
2013-02-08 17:03:26 +00:00
t . flushMemory ( )
case viewRequest := <- t . viewQueue :
t . renderView ( viewRequest )
2013-03-21 16:53:57 +00:00
case drainingDone := <- t . draining :
2013-05-08 13:30:27 +00:00
t . Flush ( )
2013-03-21 16:53:57 +00:00
drainingDone <- true
2013-04-15 10:45:45 +00:00
return
2013-02-08 17:03:26 +00:00
}
}
}
2013-05-07 13:12:33 +00:00
func ( t * TieredStorage ) reportQueues ( ) {
2013-03-07 01:16:39 +00:00
queueSizes . Set ( map [ string ] string { "queue" : "append_to_disk" , "facet" : "occupancy" } , float64 ( len ( t . appendToDiskQueue ) ) )
queueSizes . Set ( map [ string ] string { "queue" : "append_to_disk" , "facet" : "capacity" } , float64 ( cap ( t . appendToDiskQueue ) ) )
queueSizes . Set ( map [ string ] string { "queue" : "view_generation" , "facet" : "occupancy" } , float64 ( len ( t . viewQueue ) ) )
queueSizes . Set ( map [ string ] string { "queue" : "view_generation" , "facet" : "capacity" } , float64 ( cap ( t . viewQueue ) ) )
}
2013-05-07 13:12:33 +00:00
func ( t * TieredStorage ) Flush ( ) {
2013-05-08 13:30:27 +00:00
t . flushMemory ( )
2013-03-07 01:16:39 +00:00
}
2013-05-07 13:12:33 +00:00
func ( t * TieredStorage ) Close ( ) {
2013-04-29 09:17:56 +00:00
log . Println ( "Closing tiered storage..." )
2013-03-11 21:21:25 +00:00
t . Drain ( )
2013-05-07 08:18:19 +00:00
t . DiskStorage . Close ( )
2013-04-29 09:17:56 +00:00
t . memoryArena . Close ( )
close ( t . appendToDiskQueue )
close ( t . viewQueue )
log . Println ( "Done." )
2013-03-11 21:21:25 +00:00
}
2013-02-08 17:03:26 +00:00
type memoryToDiskFlusher struct {
2013-05-08 13:30:27 +00:00
toDiskQueue chan model . Samples
disk MetricPersistence
olderThan time . Time
valuesAccepted int
valuesRejected int
memoryDeleteMutex * sync . RWMutex
2013-02-08 17:03:26 +00:00
}
type memoryToDiskFlusherVisitor struct {
2013-05-08 13:30:27 +00:00
stream stream
flusher * memoryToDiskFlusher
memoryDeleteMutex * sync . RWMutex
2013-02-08 17:03:26 +00:00
}
func ( f memoryToDiskFlusherVisitor ) DecodeKey ( in interface { } ) ( out interface { } , err error ) {
out = time . Time ( in . ( skipListTime ) )
return
}
func ( f memoryToDiskFlusherVisitor ) DecodeValue ( in interface { } ) ( out interface { } , err error ) {
out = in . ( value ) . get ( )
return
}
func ( f memoryToDiskFlusherVisitor ) Filter ( key , value interface { } ) ( filterResult storage . FilterResult ) {
var (
recordTime = key . ( time . Time )
)
if recordTime . Before ( f . flusher . olderThan ) {
f . flusher . valuesAccepted ++
return storage . ACCEPT
}
f . flusher . valuesRejected ++
return storage . STOP
}
func ( f memoryToDiskFlusherVisitor ) Operate ( key , value interface { } ) ( err * storage . OperatorError ) {
var (
recordTime = key . ( time . Time )
recordValue = value . ( model . SampleValue )
)
if len ( f . flusher . toDiskQueue ) == cap ( f . flusher . toDiskQueue ) {
f . flusher . Flush ( )
}
2013-04-29 14:55:18 +00:00
f . flusher . toDiskQueue <- model . Samples {
model . Sample {
Metric : f . stream . metric ,
Timestamp : recordTime ,
Value : recordValue ,
} ,
2013-02-08 17:03:26 +00:00
}
2013-05-08 13:30:27 +00:00
f . memoryDeleteMutex . Lock ( )
2013-02-08 17:03:26 +00:00
f . stream . values . Delete ( skipListTime ( recordTime ) )
2013-05-08 13:30:27 +00:00
f . memoryDeleteMutex . Unlock ( )
2013-02-08 17:03:26 +00:00
return
}
func ( f * memoryToDiskFlusher ) ForStream ( stream stream ) ( decoder storage . RecordDecoder , filter storage . RecordFilter , operator storage . RecordOperator ) {
visitor := memoryToDiskFlusherVisitor {
2013-05-08 13:30:27 +00:00
stream : stream ,
flusher : f ,
memoryDeleteMutex : f . memoryDeleteMutex ,
2013-02-08 17:03:26 +00:00
}
return visitor , visitor , visitor
}
func ( f * memoryToDiskFlusher ) Flush ( ) {
length := len ( f . toDiskQueue )
samples := model . Samples { }
for i := 0 ; i < length ; i ++ {
2013-04-29 14:55:18 +00:00
samples = append ( samples , <- f . toDiskQueue ... )
2013-02-08 17:03:26 +00:00
}
f . disk . AppendSamples ( samples )
}
func ( f memoryToDiskFlusher ) Close ( ) {
f . Flush ( )
}
2013-05-08 13:30:27 +00:00
// Persist a whole bunch of samples from memory to the datastore.
2013-05-02 16:27:12 +00:00
func ( t * TieredStorage ) flushMemory ( ) {
2013-03-01 17:51:36 +00:00
begin := time . Now ( )
defer func ( ) {
2013-03-11 21:21:25 +00:00
duration := time . Since ( begin )
2013-03-01 17:51:36 +00:00
recordOutcome ( duration , nil , map [ string ] string { operation : appendSample , result : success } , map [ string ] string { operation : flushMemory , result : failure } )
} ( )
2013-02-08 17:03:26 +00:00
2013-05-08 13:30:27 +00:00
t . memoryMutex . RLock ( )
defer t . memoryMutex . RUnlock ( )
2013-02-08 17:03:26 +00:00
flusher := & memoryToDiskFlusher {
2013-05-08 13:30:27 +00:00
disk : t . DiskStorage ,
olderThan : time . Now ( ) . Add ( - 1 * t . memoryTTL ) ,
toDiskQueue : t . appendToDiskQueue ,
memoryDeleteMutex : & t . memoryDeleteMutex ,
2013-02-08 17:03:26 +00:00
}
defer flusher . Close ( )
t . memoryArena . ForEachSample ( flusher )
return
}
2013-05-07 13:12:33 +00:00
func ( t * TieredStorage ) renderView ( viewJob viewJob ) {
2013-03-16 08:30:31 +00:00
// Telemetry.
var err error
2013-03-01 17:51:36 +00:00
begin := time . Now ( )
defer func ( ) {
2013-03-11 21:21:25 +00:00
duration := time . Since ( begin )
2013-02-08 17:03:26 +00:00
2013-03-16 08:30:31 +00:00
recordOutcome ( duration , err , map [ string ] string { operation : renderView , result : success } , map [ string ] string { operation : renderView , result : failure } )
2013-03-01 17:51:36 +00:00
} ( )
2013-02-08 17:03:26 +00:00
2013-05-08 13:30:27 +00:00
// No samples may be deleted from memory while rendering a view.
t . memoryDeleteMutex . RLock ( )
defer t . memoryDeleteMutex . RUnlock ( )
2013-02-08 17:03:26 +00:00
2013-05-08 13:30:27 +00:00
scans := viewJob . builder . ScanJobs ( )
view := newView ( )
// Get a single iterator that will be used for all data extraction below.
iterator := t . DiskStorage . MetricSamples . NewIterator ( true )
2013-04-01 11:22:38 +00:00
defer iterator . Close ( )
2013-02-08 17:03:26 +00:00
2013-03-01 17:51:36 +00:00
// Rebuilding of the frontier should happen on a conditional basis if a
// (fingerprint, timestamp) tuple is outside of the current frontier.
2013-04-01 11:22:38 +00:00
err = t . rebuildDiskFrontier ( iterator )
2013-03-01 17:51:36 +00:00
if err != nil {
panic ( err )
}
2013-02-08 17:03:26 +00:00
2013-03-01 17:51:36 +00:00
for _ , scanJob := range scans {
2013-04-18 23:00:57 +00:00
var seriesFrontier * seriesFrontier = nil
if t . diskFrontier != nil {
seriesFrontier , err = newSeriesFrontier ( scanJob . fingerprint , * t . diskFrontier , iterator )
if err != nil {
panic ( err )
}
2013-03-16 08:30:31 +00:00
}
standingOps := scanJob . operations
for len ( standingOps ) > 0 {
2013-03-26 16:15:04 +00:00
// Abort the view rendering if the caller (MakeView) has timed out.
if len ( viewJob . abort ) > 0 {
return
}
2013-03-16 08:30:31 +00:00
// Load data value chunk(s) around the first standing op's current time.
2013-04-18 23:00:57 +00:00
targetTime := * standingOps [ 0 ] . CurrentTime ( )
2013-05-08 18:39:59 +00:00
currentChunk := chunk { }
2013-05-08 13:30:27 +00:00
t . memoryMutex . RLock ( )
2013-04-18 23:00:57 +00:00
memValues := t . memoryArena . GetValueAtTime ( scanJob . fingerprint , targetTime )
2013-05-08 13:30:27 +00:00
t . memoryMutex . RUnlock ( )
2013-04-18 23:00:57 +00:00
// If we aimed before the oldest value in memory, load more data from disk.
if ( len ( memValues ) == 0 || memValues . FirstTimeAfter ( targetTime ) ) && seriesFrontier != nil {
// XXX: For earnest performance gains analagous to the benchmarking we
// performed, chunk should only be reloaded if it no longer contains
// the values we're looking for.
//
// To better understand this, look at https://github.com/prometheus/prometheus/blob/benchmark/leveldb/iterator-seek-characteristics/leveldb.go#L239 and note the behavior around retrievedValue.
diskValues := t . loadChunkAroundTime ( iterator , seriesFrontier , scanJob . fingerprint , targetTime )
// If we aimed past the newest value on disk, combine it with the next value from memory.
if len ( memValues ) > 0 && diskValues . LastTimeBefore ( targetTime ) {
2013-05-07 11:22:29 +00:00
latestDiskValue := diskValues [ len ( diskValues ) - 1 : ]
2013-05-08 18:39:59 +00:00
currentChunk = append ( chunk ( latestDiskValue ) , chunk ( memValues ) ... )
2013-04-18 23:00:57 +00:00
} else {
2013-05-08 18:39:59 +00:00
currentChunk = chunk ( diskValues )
2013-04-18 23:00:57 +00:00
}
} else {
2013-05-08 18:39:59 +00:00
currentChunk = chunk ( memValues )
2013-04-18 23:00:57 +00:00
}
// There's no data at all for this fingerprint, so stop processing ops for it.
2013-05-08 18:39:59 +00:00
if len ( currentChunk ) == 0 {
2013-04-18 23:00:57 +00:00
break
}
2013-05-08 18:39:59 +00:00
currentChunk = currentChunk . TruncateBefore ( targetTime )
2013-04-24 10:42:58 +00:00
2013-05-08 18:39:59 +00:00
lastChunkTime := currentChunk [ len ( currentChunk ) - 1 ] . Timestamp
2013-04-18 23:00:57 +00:00
if lastChunkTime . After ( targetTime ) {
targetTime = lastChunkTime
2013-03-01 17:51:36 +00:00
}
2013-02-08 17:03:26 +00:00
2013-03-16 08:30:31 +00:00
// For each op, extract all needed data from the current chunk.
2013-04-18 23:00:57 +00:00
out := model . Values { }
2013-03-16 08:30:31 +00:00
for _ , op := range standingOps {
2013-04-18 23:00:57 +00:00
if op . CurrentTime ( ) . After ( targetTime ) {
2013-03-16 08:30:31 +00:00
break
}
2013-04-24 09:02:51 +00:00
2013-05-08 18:39:59 +00:00
currentChunk = currentChunk . TruncateBefore ( * ( op . CurrentTime ( ) ) )
2013-04-24 09:02:51 +00:00
2013-04-18 23:00:57 +00:00
for op . CurrentTime ( ) != nil && ! op . CurrentTime ( ) . After ( targetTime ) {
2013-05-08 18:39:59 +00:00
out = op . ExtractSamples ( model . Values ( currentChunk ) )
2013-02-08 17:03:26 +00:00
}
}
2013-03-16 08:30:31 +00:00
// Append the extracted samples to the materialized view.
for _ , sample := range out {
view . appendSample ( scanJob . fingerprint , sample . Timestamp , sample . Value )
}
// Throw away standing ops which are finished.
filteredOps := ops { }
for _ , op := range standingOps {
if op . CurrentTime ( ) != nil {
filteredOps = append ( filteredOps , op )
}
}
standingOps = filteredOps
// Sort ops by start time again, since they might be slightly off now.
// For example, consider a current chunk of values and two interval ops
2013-03-19 13:25:38 +00:00
// with different interval lengths. Their states after the cycle above
2013-03-16 08:30:31 +00:00
// could be:
//
// (C = current op time)
//
// Chunk: [ X X X X X ]
// Op 1: [ X X C . . . ]
// Op 2: [ X X C . . .]
//
// Op 2 now has an earlier current time than Op 1.
2013-03-16 08:41:43 +00:00
sort . Sort ( startsAtSort { standingOps } )
2013-02-08 17:03:26 +00:00
}
2013-03-16 08:30:31 +00:00
}
viewJob . output <- view
return
}
2013-02-08 17:03:26 +00:00
2013-05-07 13:12:33 +00:00
func ( t * TieredStorage ) loadChunkAroundTime ( iterator leveldb . Iterator , frontier * seriesFrontier , fingerprint model . Fingerprint , ts time . Time ) ( chunk model . Values ) {
2013-03-16 08:30:31 +00:00
var (
targetKey = & dto . SampleKey {
Fingerprint : fingerprint . ToDTO ( ) ,
}
2013-04-22 11:30:16 +00:00
foundKey model . SampleKey
foundValues model . Values
2013-03-16 08:30:31 +00:00
)
// Limit the target key to be within the series' keyspace.
if ts . After ( frontier . lastSupertime ) {
targetKey . Timestamp = indexable . EncodeTime ( frontier . lastSupertime )
} else {
targetKey . Timestamp = indexable . EncodeTime ( ts )
2013-02-08 17:03:26 +00:00
}
2013-03-01 17:51:36 +00:00
2013-03-16 08:30:31 +00:00
// Try seeking to target key.
2013-04-05 11:07:13 +00:00
rawKey , _ := coding . NewProtocolBuffer ( targetKey ) . Encode ( )
2013-03-16 08:30:31 +00:00
iterator . Seek ( rawKey )
2013-03-01 17:51:36 +00:00
2013-03-16 08:30:31 +00:00
foundKey , err := extractSampleKey ( iterator )
if err != nil {
panic ( err )
}
// Figure out if we need to rewind by one block.
// Imagine the following supertime blocks with time ranges:
//
// Block 1: ft 1000 - lt 1009 <data>
// Block 1: ft 1010 - lt 1019 <data>
//
// If we are aiming to find time 1005, we would first seek to the block with
// supertime 1010, then need to rewind by one block by virtue of LevelDB
// iterator seek behavior.
//
// Only do the rewind if there is another chunk before this one.
rewound := false
2013-04-21 17:16:15 +00:00
firstTime := foundKey . FirstTimestamp
2013-03-16 08:30:31 +00:00
if ts . Before ( firstTime ) && ! frontier . firstSupertime . After ( ts ) {
2013-03-25 09:24:59 +00:00
iterator . Previous ( )
2013-03-16 08:30:31 +00:00
rewound = true
}
2013-04-22 11:30:16 +00:00
foundValues , err = extractSampleValues ( iterator )
2013-03-16 08:30:31 +00:00
if err != nil {
2013-04-22 11:30:16 +00:00
return
2013-03-16 08:30:31 +00:00
}
// If we rewound, but the target time is still past the current block, return
// the last value of the current (rewound) block and the entire next block.
if rewound {
foundKey , err = extractSampleKey ( iterator )
if err != nil {
2013-04-22 11:30:16 +00:00
return
2013-03-16 08:30:31 +00:00
}
2013-04-21 17:16:15 +00:00
currentChunkLastTime := foundKey . LastTimestamp
2013-03-16 08:30:31 +00:00
if ts . After ( currentChunkLastTime ) {
2013-04-22 11:30:16 +00:00
sampleCount := len ( foundValues )
chunk = append ( chunk , foundValues [ sampleCount - 1 ] )
2013-03-16 08:30:31 +00:00
// We know there's a next block since we have rewound from it.
iterator . Next ( )
2013-04-22 11:30:16 +00:00
foundValues , err = extractSampleValues ( iterator )
2013-03-16 08:30:31 +00:00
if err != nil {
2013-04-22 11:30:16 +00:00
return
2013-03-16 08:30:31 +00:00
}
}
}
// Now append all the samples of the currently seeked block to the output.
2013-04-22 11:30:16 +00:00
chunk = append ( chunk , foundValues ... )
2013-03-07 01:16:39 +00:00
2013-03-01 17:51:36 +00:00
return
2013-02-08 17:03:26 +00:00
}
2013-03-21 16:59:42 +00:00
2013-05-02 16:27:12 +00:00
// Get all label values that are associated with the provided label name.
2013-05-07 13:12:33 +00:00
func ( t * TieredStorage ) GetAllValuesForLabel ( labelName model . LabelName ) ( values model . LabelValues , err error ) {
2013-05-07 08:18:19 +00:00
diskValues , err := t . DiskStorage . GetAllValuesForLabel ( labelName )
2013-03-25 12:04:47 +00:00
if err != nil {
return
}
2013-03-26 10:45:56 +00:00
memoryValues , err := t . memoryArena . GetAllValuesForLabel ( labelName )
2013-03-25 12:04:47 +00:00
if err != nil {
return
}
2013-03-26 10:45:56 +00:00
valueSet := map [ model . LabelValue ] bool { }
for _ , value := range append ( diskValues , memoryValues ... ) {
2013-03-26 13:46:02 +00:00
if ! valueSet [ value ] {
values = append ( values , value )
valueSet [ value ] = true
}
2013-03-25 12:04:47 +00:00
}
return
2013-03-21 16:59:42 +00:00
}
2013-05-02 16:27:12 +00:00
// Get all of the metric fingerprints that are associated with the provided
// label set.
2013-05-07 13:12:33 +00:00
func ( t * TieredStorage ) GetFingerprintsForLabelSet ( labelSet model . LabelSet ) ( fingerprints model . Fingerprints , err error ) {
2013-03-25 12:04:47 +00:00
memFingerprints , err := t . memoryArena . GetFingerprintsForLabelSet ( labelSet )
if err != nil {
return
}
2013-05-07 08:18:19 +00:00
diskFingerprints , err := t . DiskStorage . GetFingerprintsForLabelSet ( labelSet )
2013-03-25 12:04:47 +00:00
if err != nil {
return
}
fingerprintSet := map [ model . Fingerprint ] bool { }
for _ , fingerprint := range append ( memFingerprints , diskFingerprints ... ) {
fingerprintSet [ fingerprint ] = true
}
for fingerprint := range fingerprintSet {
fingerprints = append ( fingerprints , fingerprint )
}
return
2013-03-21 16:59:42 +00:00
}
2013-05-02 16:27:12 +00:00
// Get the metric associated with the provided fingerprint.
2013-05-14 14:25:06 +00:00
func ( t * TieredStorage ) GetMetricForFingerprint ( f model . Fingerprint ) ( m model . Metric , err error ) {
2013-03-25 12:04:47 +00:00
m , err = t . memoryArena . GetMetricForFingerprint ( f )
if err != nil {
return
}
if m == nil {
2013-05-07 08:18:19 +00:00
m , err = t . DiskStorage . GetMetricForFingerprint ( f )
2013-03-25 12:04:47 +00:00
}
return
2013-03-21 16:59:42 +00:00
}