prometheus/storage/metric/tiered.go

714 lines
18 KiB
Go

// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
import (
"fmt"
"os"
"sort"
"sync"
"time"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/stats"
"github.com/prometheus/prometheus/storage/raw/leveldb"
"github.com/prometheus/prometheus/utility"
)
type chunk Values
// TruncateBefore returns a subslice of the original such that extraneous
// samples in the collection that occur before the provided time are
// dropped. The original slice is not mutated. It works with the assumption
// that consumers of these values could want preceding values if none would
// exist prior to the defined time.
func (c chunk) TruncateBefore(t clientmodel.Timestamp) chunk {
index := sort.Search(len(c), func(i int) bool {
timestamp := c[i].Timestamp
return !timestamp.Before(t)
})
switch index {
case 0:
return c
case len(c):
return c[len(c)-1:]
default:
return c[index-1:]
}
}
type tieredStorageState uint
const (
tieredStorageStarting tieredStorageState = iota
tieredStorageServing
tieredStorageDraining
tieredStorageStopping
)
// Ignore timeseries in queries that are more stale than this limit.
const stalenessLimit = time.Minute * 5
// TieredStorage both persists samples and generates materialized views for
// queries.
type TieredStorage struct {
// mu is purely used for state transitions.
mu sync.RWMutex
// BUG(matt): This introduces a Law of Demeter violation. Ugh.
DiskStorage *LevelDBMetricPersistence
appendToDiskQueue chan clientmodel.Samples
memoryArena *memorySeriesStorage
memoryTTL time.Duration
flushMemoryInterval time.Duration
ViewQueue chan viewJob
draining chan chan<- bool
state tieredStorageState
memorySemaphore chan bool
wmCache *watermarkCache
Indexer MetricIndexer
flushSema chan bool
dtoSampleKeys *dtoSampleKeyList
sampleKeys *sampleKeyList
}
// viewJob encapsulates a request to extract sample values from the datastore.
type viewJob struct {
builder ViewRequestBuilder
output chan View
abort chan bool
err chan error
stats *stats.TimerGroup
}
const (
tieredMemorySemaphores = 5
)
const watermarkCacheLimit = 1024 * 1024
// NewTieredStorage returns a TieredStorage object ready to use.
func NewTieredStorage(
appendToDiskQueueDepth,
viewQueueDepth uint,
flushMemoryInterval time.Duration,
memoryTTL time.Duration,
rootDirectory string,
) (*TieredStorage, error) {
if isDir, _ := utility.IsDir(rootDirectory); !isDir {
if err := os.MkdirAll(rootDirectory, 0755); err != nil {
return nil, fmt.Errorf("could not find or create metrics directory %s: %s", rootDirectory, err)
}
}
diskStorage, err := NewLevelDBMetricPersistence(rootDirectory)
if err != nil {
return nil, err
}
wmCache := &watermarkCache{
C: utility.NewSynchronizedCache(utility.NewLRUCache(watermarkCacheLimit)),
}
memOptions := MemorySeriesOptions{
WatermarkCache: wmCache,
}
s := &TieredStorage{
appendToDiskQueue: make(chan clientmodel.Samples, appendToDiskQueueDepth),
DiskStorage: diskStorage,
draining: make(chan chan<- bool),
flushMemoryInterval: flushMemoryInterval,
memoryArena: NewMemorySeriesStorage(memOptions),
memoryTTL: memoryTTL,
ViewQueue: make(chan viewJob, viewQueueDepth),
memorySemaphore: make(chan bool, tieredMemorySemaphores),
wmCache: wmCache,
flushSema: make(chan bool, 1),
dtoSampleKeys: newDtoSampleKeyList(10),
sampleKeys: newSampleKeyList(10),
}
for i := 0; i < tieredMemorySemaphores; i++ {
s.memorySemaphore <- true
}
return s, nil
}
// AppendSamples enqueues Samples for storage.
func (t *TieredStorage) AppendSamples(samples clientmodel.Samples) (err error) {
t.mu.RLock()
defer t.mu.RUnlock()
if t.state != tieredStorageServing {
return fmt.Errorf("storage is not serving")
}
t.memoryArena.AppendSamples(samples)
storedSamplesCount.IncrementBy(prometheus.NilLabels, float64(len(samples)))
return
}
// Drain stops the storage subsystem, flushing all pending operations.
func (t *TieredStorage) Drain(drained chan<- bool) {
t.mu.Lock()
defer t.mu.Unlock()
t.drain(drained)
}
func (t *TieredStorage) drain(drained chan<- bool) {
if t.state >= tieredStorageDraining {
panic("Illegal State: Supplemental drain requested.")
}
t.state = tieredStorageDraining
glog.Info("Triggering drain...")
t.draining <- (drained)
}
// MakeView materializes a View according to a ViewRequestBuilder, subject to a
// timeout.
func (t *TieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Duration, queryStats *stats.TimerGroup) (View, error) {
t.mu.RLock()
defer t.mu.RUnlock()
if t.state != tieredStorageServing {
return nil, fmt.Errorf("storage is not serving")
}
// The result channel needs a one-element buffer in case we have timed
// out in MakeView, but the view rendering still completes afterwards
// and writes to the channel.
result := make(chan View, 1)
// The abort channel needs a one-element buffer in case the view
// rendering has already exited and doesn't consume from the channel
// anymore.
abortChan := make(chan bool, 1)
errChan := make(chan error)
queryStats.GetTimer(stats.ViewQueueTime).Start()
t.ViewQueue <- viewJob{
builder: builder,
output: result,
abort: abortChan,
err: errChan,
stats: queryStats,
}
select {
case view := <-result:
return view, nil
case err := <-errChan:
return nil, err
case <-time.After(deadline):
abortChan <- true
return nil, fmt.Errorf("fetching query data timed out after %s", deadline)
}
}
// Serve starts serving requests.
func (t *TieredStorage) Serve(started chan<- bool) {
t.mu.Lock()
if t.state != tieredStorageStarting {
panic("Illegal State: Attempted to restart TieredStorage.")
}
t.state = tieredStorageServing
t.mu.Unlock()
flushMemoryTicker := time.NewTicker(t.flushMemoryInterval)
defer flushMemoryTicker.Stop()
queueReportTicker := time.NewTicker(time.Second)
defer queueReportTicker.Stop()
go func() {
for _ = range queueReportTicker.C {
t.reportQueues()
}
}()
started <- true
for {
select {
case <-flushMemoryTicker.C:
select {
case t.flushSema <- true:
go func() {
t.flushMemory(t.memoryTTL)
<-t.flushSema
}()
default:
glog.Warning("Backlogging on flush...")
}
case viewRequest := <-t.ViewQueue:
<-t.memorySemaphore
viewRequest.stats.GetTimer(stats.ViewQueueTime).Stop()
go t.renderView(viewRequest)
case drainingDone := <-t.draining:
t.Flush()
drainingDone <- true
return
}
}
}
func (t *TieredStorage) reportQueues() {
queueSizes.Set(map[string]string{"queue": "append_to_disk", "facet": "occupancy"}, float64(len(t.appendToDiskQueue)))
queueSizes.Set(map[string]string{"queue": "append_to_disk", "facet": "capacity"}, float64(cap(t.appendToDiskQueue)))
queueSizes.Set(map[string]string{"queue": "view_generation", "facet": "occupancy"}, float64(len(t.ViewQueue)))
queueSizes.Set(map[string]string{"queue": "view_generation", "facet": "capacity"}, float64(cap(t.ViewQueue)))
}
// Flush flushes all samples to disk.
func (t *TieredStorage) Flush() {
t.flushSema <- true
t.flushMemory(0)
<-t.flushSema
}
func (t *TieredStorage) flushMemory(ttl time.Duration) {
flushOlderThan := clientmodel.Now().Add(-1 * ttl)
glog.Info("Flushing samples to disk...")
t.memoryArena.Flush(flushOlderThan, t.appendToDiskQueue)
queueLength := len(t.appendToDiskQueue)
if queueLength > 0 {
samples := clientmodel.Samples{}
for i := 0; i < queueLength; i++ {
chunk := <-t.appendToDiskQueue
samples = append(samples, chunk...)
}
glog.Infof("Writing %d samples...", len(samples))
t.DiskStorage.AppendSamples(samples)
}
glog.Info("Done flushing.")
}
// Close stops serving, flushes all pending operations, and frees all resources.
func (t *TieredStorage) Close() {
t.mu.Lock()
defer t.mu.Unlock()
t.close()
}
func (t *TieredStorage) close() {
if t.state == tieredStorageStopping {
panic("Illegal State: Attempted to restop TieredStorage.")
}
drained := make(chan bool)
t.drain(drained)
<-drained
t.memoryArena.Close()
t.DiskStorage.Close()
// BUG(matt): There is a probability that pending items may hang here
// and not get flushed.
close(t.appendToDiskQueue)
close(t.ViewQueue)
t.wmCache.Clear()
t.dtoSampleKeys.Close()
t.sampleKeys.Close()
t.state = tieredStorageStopping
}
func (t *TieredStorage) seriesTooOld(f *clientmodel.Fingerprint, i clientmodel.Timestamp) (bool, error) {
// BUG(julius): Make this configurable by query layer.
i = i.Add(-stalenessLimit)
wm, cacheHit, _ := t.wmCache.Get(f)
if !cacheHit {
if t.memoryArena.HasFingerprint(f) {
samples := t.memoryArena.CloneSamples(f)
if len(samples) > 0 {
newest := samples[len(samples)-1].Timestamp
t.wmCache.Put(f, &watermarks{High: newest})
return newest.Before(i), nil
}
}
highTime, diskHit, err := t.DiskStorage.MetricHighWatermarks.Get(f)
if err != nil {
return false, err
}
if diskHit {
t.wmCache.Put(f, &watermarks{High: highTime})
return highTime.Before(i), nil
}
t.wmCache.Put(f, &watermarks{})
return true, nil
}
return wm.High.Before(i), nil
}
func (t *TieredStorage) renderView(viewJob viewJob) {
// Telemetry.
var err error
begin := time.Now()
defer func() {
t.memorySemaphore <- true
duration := time.Since(begin)
recordOutcome(
duration,
err,
map[string]string{operation: renderView, result: success},
map[string]string{operation: renderView, result: failure},
)
}()
scanJobsTimer := viewJob.stats.GetTimer(stats.ViewScanJobsTime).Start()
scanJobsTimer.Stop()
view := newView()
var iterator leveldb.Iterator
diskPresent := true
firstBlock, _ := t.sampleKeys.Get()
defer t.sampleKeys.Give(firstBlock)
lastBlock, _ := t.sampleKeys.Get()
defer t.sampleKeys.Give(lastBlock)
sampleKeyDto, _ := t.dtoSampleKeys.Get()
defer t.dtoSampleKeys.Give(sampleKeyDto)
extractionTimer := viewJob.stats.GetTimer(stats.ViewDataExtractionTime).Start()
for viewJob.builder.HasOp() {
op := viewJob.builder.PopOp()
fp := op.Fingerprint()
old, err := t.seriesTooOld(fp, op.CurrentTime())
if err != nil {
glog.Errorf("Error getting watermark from cache for %s: %s", fp, err)
continue
}
if old {
continue
}
memValues := t.memoryArena.CloneSamples(fp)
// Abort the view rendering if the caller (MakeView) has timed out.
if len(viewJob.abort) > 0 {
return
}
// Load data value chunk(s) around the current time.
targetTime := op.CurrentTime()
currentChunk := chunk{}
// If we aimed before the oldest value in memory, load more data from disk.
if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && diskPresent {
if iterator == nil {
// Get a single iterator that will be used for all data extraction
// below.
iterator, _ = t.DiskStorage.MetricSamples.NewIterator(true)
defer iterator.Close()
if diskPresent = iterator.SeekToLast(); diskPresent {
if err := iterator.Key(sampleKeyDto); err != nil {
panic(err)
}
lastBlock.Load(sampleKeyDto)
if !iterator.SeekToFirst() {
diskPresent = false
} else {
if err := iterator.Key(sampleKeyDto); err != nil {
panic(err)
}
firstBlock.Load(sampleKeyDto)
}
}
}
if diskPresent {
diskTimer := viewJob.stats.GetTimer(stats.ViewDiskExtractionTime).Start()
diskValues, expired := t.loadChunkAroundTime(
iterator,
fp,
targetTime,
firstBlock,
lastBlock,
)
if expired {
diskPresent = false
}
diskTimer.Stop()
// If we aimed past the newest value on disk,
// combine it with the next value from memory.
if len(diskValues) == 0 {
currentChunk = chunk(memValues)
} else {
if len(memValues) > 0 && diskValues.LastTimeBefore(targetTime) {
latestDiskValue := diskValues[len(diskValues)-1:]
currentChunk = append(chunk(latestDiskValue), chunk(memValues)...)
} else {
currentChunk = chunk(diskValues)
}
}
} else {
currentChunk = chunk(memValues)
}
} else {
currentChunk = chunk(memValues)
}
// There's no data at all for this fingerprint, so stop processing.
if len(currentChunk) == 0 {
continue
}
currentChunk = currentChunk.TruncateBefore(targetTime)
lastChunkTime := currentChunk[len(currentChunk)-1].Timestamp
if lastChunkTime.After(targetTime) {
targetTime = lastChunkTime
}
// Extract all needed data from the current chunk and append the
// extracted samples to the materialized view.
for !op.Consumed() && !op.CurrentTime().After(targetTime) {
view.appendSamples(fp, op.ExtractSamples(Values(currentChunk)))
}
}
extractionTimer.Stop()
viewJob.output <- view
return
}
func (t *TieredStorage) loadChunkAroundTime(
iterator leveldb.Iterator,
fingerprint *clientmodel.Fingerprint,
ts clientmodel.Timestamp,
firstBlock,
lastBlock *SampleKey,
) (chunk Values, expired bool) {
if fingerprint.Less(firstBlock.Fingerprint) {
return nil, false
}
if lastBlock.Fingerprint.Less(fingerprint) {
return nil, true
}
seekingKey, _ := t.sampleKeys.Get()
defer t.sampleKeys.Give(seekingKey)
seekingKey.Fingerprint = fingerprint
if fingerprint.Equal(firstBlock.Fingerprint) && ts.Before(firstBlock.FirstTimestamp) {
seekingKey.FirstTimestamp = firstBlock.FirstTimestamp
} else if fingerprint.Equal(lastBlock.Fingerprint) && ts.After(lastBlock.FirstTimestamp) {
seekingKey.FirstTimestamp = lastBlock.FirstTimestamp
} else {
seekingKey.FirstTimestamp = ts
}
dto, _ := t.dtoSampleKeys.Get()
defer t.dtoSampleKeys.Give(dto)
seekingKey.Dump(dto)
if !iterator.Seek(dto) {
return chunk, true
}
var foundValues Values
if err := iterator.Key(dto); err != nil {
panic(err)
}
seekingKey.Load(dto)
if seekingKey.Fingerprint.Equal(fingerprint) {
// Figure out if we need to rewind by one block.
// Imagine the following supertime blocks with time ranges:
//
// Block 1: ft 1000 - lt 1009 <data>
// Block 1: ft 1010 - lt 1019 <data>
//
// If we are aiming to find time 1005, we would first seek to the block with
// supertime 1010, then need to rewind by one block by virtue of LevelDB
// iterator seek behavior.
//
// Only do the rewind if there is another chunk before this one.
if !seekingKey.MayContain(ts) {
postValues, _ := extractSampleValues(iterator)
if !seekingKey.Equal(firstBlock) {
if !iterator.Previous() {
panic("This should never return false.")
}
if err := iterator.Key(dto); err != nil {
panic(err)
}
seekingKey.Load(dto)
if !seekingKey.Fingerprint.Equal(fingerprint) {
return postValues, false
}
foundValues, _ = extractSampleValues(iterator)
foundValues = append(foundValues, postValues...)
return foundValues, false
}
}
foundValues, _ = extractSampleValues(iterator)
return foundValues, false
}
if fingerprint.Less(seekingKey.Fingerprint) {
if !seekingKey.Equal(firstBlock) {
if !iterator.Previous() {
panic("This should never return false.")
}
if err := iterator.Key(dto); err != nil {
panic(err)
}
seekingKey.Load(dto)
if !seekingKey.Fingerprint.Equal(fingerprint) {
return nil, false
}
foundValues, _ = extractSampleValues(iterator)
return foundValues, false
}
}
panic("illegal state: violated sort invariant")
}
// GetAllValuesForLabel gets all label values that are associated with the
// provided label name.
func (t *TieredStorage) GetAllValuesForLabel(labelName clientmodel.LabelName) (clientmodel.LabelValues, error) {
t.mu.RLock()
defer t.mu.RUnlock()
if t.state != tieredStorageServing {
panic("Illegal State: Attempted to query non-running TieredStorage.")
}
diskValues, err := t.DiskStorage.GetAllValuesForLabel(labelName)
if err != nil {
return nil, err
}
memoryValues, err := t.memoryArena.GetAllValuesForLabel(labelName)
if err != nil {
return nil, err
}
valueSet := map[clientmodel.LabelValue]bool{}
values := clientmodel.LabelValues{}
for _, value := range append(diskValues, memoryValues...) {
if !valueSet[value] {
values = append(values, value)
valueSet[value] = true
}
}
return values, nil
}
// GetFingerprintsForLabelSet gets all of the metric fingerprints that are
// associated with the provided label set.
func (t *TieredStorage) GetFingerprintsForLabelSet(labelSet clientmodel.LabelSet) (clientmodel.Fingerprints, error) {
t.mu.RLock()
defer t.mu.RUnlock()
if t.state != tieredStorageServing {
panic("Illegal State: Attempted to query non-running TieredStorage.")
}
memFingerprints, err := t.memoryArena.GetFingerprintsForLabelSet(labelSet)
if err != nil {
return nil, err
}
diskFingerprints, err := t.DiskStorage.GetFingerprintsForLabelSet(labelSet)
if err != nil {
return nil, err
}
fingerprintSet := map[clientmodel.Fingerprint]bool{}
for _, fingerprint := range append(memFingerprints, diskFingerprints...) {
fingerprintSet[*fingerprint] = true
}
fingerprints := clientmodel.Fingerprints{}
for fingerprint := range fingerprintSet {
fpCopy := fingerprint
fingerprints = append(fingerprints, &fpCopy)
}
return fingerprints, nil
}
// GetMetricForFingerprint gets the metric associated with the provided
// fingerprint.
func (t *TieredStorage) GetMetricForFingerprint(f *clientmodel.Fingerprint) (clientmodel.Metric, error) {
t.mu.RLock()
defer t.mu.RUnlock()
if t.state != tieredStorageServing {
panic("Illegal State: Attempted to query non-running TieredStorage.")
}
m, err := t.memoryArena.GetMetricForFingerprint(f)
if err != nil {
return nil, err
}
if m == nil {
m, err = t.DiskStorage.GetMetricForFingerprint(f)
t.memoryArena.CreateEmptySeries(m)
}
return m, err
}