Abstract high watermark cache into standard LRU.
Conflicts:
    storage/metric/memory.go
    storage/metric/tiered.go
    storage/metric/watermark.go

Change-Id: Iab2aedbd8f83dc4ce633421bd4a55990fa026b85
parent 1e37b23a17
commit 7db518d3a0
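For orientation: the diff below deletes the hand-rolled, byte-accounted WatermarkCache in storage/metric/watermark.go and replaces it with a thin typed wrapper (watermarkCache) around the shared LRU from the utility package, built via utility.NewSynchronizedCache(utility.NewLRUCache(...)); Set becomes Put, and Get and Put gain an error return. The following self-contained Go sketch illustrates the same pattern in miniature; lruCache, fingerprintCache, and watermark are hypothetical stand-ins for utility's cache and the new wrapper, not code from this commit.

package main

import (
    "container/list"
    "fmt"
    "time"
)

// lruCache is a hypothetical stand-in for the generic, count-bounded LRU that
// utility.NewLRUCache provides in this diff.
type lruCache struct {
    limit int
    ll    *list.List
    table map[string]*list.Element
}

type lruEntry struct {
    key   string
    value interface{}
}

func newLRUCache(limit int) *lruCache {
    return &lruCache{limit: limit, ll: list.New(), table: map[string]*list.Element{}}
}

func (c *lruCache) Put(key string, value interface{}) {
    if e, ok := c.table[key]; ok {
        e.Value.(*lruEntry).value = value
        c.ll.MoveToFront(e)
        return
    }
    c.table[key] = c.ll.PushFront(&lruEntry{key: key, value: value})
    if c.ll.Len() > c.limit {
        oldest := c.ll.Back()
        c.ll.Remove(oldest)
        delete(c.table, oldest.Value.(*lruEntry).key)
    }
}

func (c *lruCache) Get(key string) (interface{}, bool) {
    e, ok := c.table[key]
    if !ok {
        return nil, false
    }
    c.ll.MoveToFront(e)
    return e.Value.(*lruEntry).value, true
}

type watermark struct{ High time.Time }

// fingerprintCache plays the role of the new watermarkCache wrapper: it owns no
// eviction or locking logic of its own and only converts between the generic
// cache's interface{} values and the concrete watermark type.
type fingerprintCache struct {
    c *lruCache
}

func (f *fingerprintCache) Put(fp string, w *watermark) { f.c.Put(fp, w) }

func (f *fingerprintCache) Get(fp string) (*watermark, bool) {
    v, ok := f.c.Get(fp)
    if !ok {
        return nil, false
    }
    return v.(*watermark), true
}

func main() {
    cache := &fingerprintCache{c: newLRUCache(2)}
    cache.Put("fp-1", &watermark{High: time.Now()})
    if w, ok := cache.Get("fp-1"); ok {
        fmt.Println("high watermark for fp-1:", w.High)
    }
}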
@@ -116,6 +116,7 @@ func viewAdapterForInstantQuery(node Node, timestamp time.Time, storage *metric.
    requestBuildTimer.Stop()

    buildTimer := queryStats.GetTimer(stats.InnerViewBuildingTime).Start()
    // BUG(julius): Clear Law of Demeter violation.
    view, err := analyzer.storage.MakeView(viewBuilder, time.Duration(60)*time.Second, queryStats)
    buildTimer.Stop()
    if err != nil {
@@ -189,7 +189,7 @@ func newArrayStream(metric clientmodel.Metric) *arrayStream {
type memorySeriesStorage struct {
    sync.RWMutex

-   wmCache *WatermarkCache
+   wmCache *watermarkCache
    fingerprintToSeries map[clientmodel.Fingerprint]stream
    labelPairToFingerprints map[LabelPair]clientmodel.Fingerprints
    labelNameToFingerprints map[clientmodel.LabelName]clientmodel.Fingerprints
@@ -198,7 +198,7 @@ type memorySeriesStorage struct {
type MemorySeriesOptions struct {
    // If provided, this WatermarkCache will be updated for any samples that are
    // appended to the memorySeriesStorage.
-   WatermarkCache *WatermarkCache
+   WatermarkCache *watermarkCache
}

func (s *memorySeriesStorage) AppendSamples(samples clientmodel.Samples) error {
@@ -222,7 +222,7 @@ func (s *memorySeriesStorage) AppendSample(sample *clientmodel.Sample) error {
    })

    if s.wmCache != nil {
-       s.wmCache.Set(fingerprint, &watermarks{High: sample.Timestamp})
+       s.wmCache.Put(fingerprint, &watermarks{High: sample.Timestamp})
    }

    return nil
@@ -27,6 +27,7 @@ import (
    "github.com/prometheus/prometheus/coding/indexable"
    "github.com/prometheus/prometheus/stats"
    "github.com/prometheus/prometheus/storage/raw/leveldb"
+   "github.com/prometheus/prometheus/utility"

    dto "github.com/prometheus/prometheus/model/generated"
)
@@ -64,12 +65,8 @@ const (
    tieredStorageStopping
)

-const (
-   // Ignore timeseries in queries that are more stale than this limit.
-   stalenessLimit = time.Minute * 5
-   // Size of the watermarks cache (used in determining timeseries freshness).
-   wmCacheSizeBytes = 5 * 1024 * 1024
-)
+// Ignore timeseries in queries that are more stale than this limit.
+const stalenessLimit = time.Minute * 5

// TieredStorage both persists samples and generates materialized views for
// queries.
@@ -95,7 +92,7 @@ type TieredStorage struct {
    memorySemaphore chan bool
    diskSemaphore chan bool

-   wmCache *WatermarkCache
+   wmCache *watermarkCache

    Indexer MetricIndexer
}
@@ -114,14 +111,21 @@ const (
    tieredMemorySemaphores = 5
)

+const watermarkCacheLimit = 1024 * 1024
+
func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval, memoryTTL time.Duration, root string) (*TieredStorage, error) {
    diskStorage, err := NewLevelDBMetricPersistence(root)
    if err != nil {
        return nil, err
    }

-   wmCache := NewWatermarkCache(wmCacheSizeBytes)
-   memOptions := MemorySeriesOptions{WatermarkCache: wmCache}
+   wmCache := &watermarkCache{
+       C: utility.NewSynchronizedCache(utility.NewLRUCache(watermarkCacheLimit)),
+   }
+
+   memOptions := MemorySeriesOptions{
+       WatermarkCache: wmCache,
+   }

    s := &TieredStorage{
        appendToDiskQueue: make(chan clientmodel.Samples, appendToDiskQueueDepth),
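A side effect visible in the constants above: the cache bound changes from a byte allowance (wmCacheSizeBytes) to an entry count (watermarkCacheLimit). A rough back-of-the-envelope comparison, using the flat 24-byte-per-entry figure the old code assumed (elementSize in watermark.go further down), so the numbers are indicative only:

package main

import "fmt"

func main() {
    const (
        oldAllowanceBytes = 5 * 1024 * 1024 // wmCacheSizeBytes, removed above
        entrySizeBytes    = 24              // elementSize, i.e. unsafe.Sizeof(watermarks{}), see watermark.go below
        newEntryLimit     = 1024 * 1024     // watermarkCacheLimit, added above
    )
    fmt.Println("old capacity ≈", oldAllowanceBytes/entrySizeBytes, "entries") // 218453
    fmt.Println("new capacity =", newEntryLimit, "entries")                    // 1048576
}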
@@ -319,13 +323,13 @@ func (t *TieredStorage) seriesTooOld(f *clientmodel.Fingerprint, i time.Time) (b
    // BUG(julius): Make this configurable by query layer.
    i = i.Add(-stalenessLimit)

-   wm, cacheHit := t.wmCache.Get(f)
+   wm, cacheHit, _ := t.wmCache.Get(f)
    if !cacheHit {
        if t.memoryArena.HasFingerprint(f) {
            samples := t.memoryArena.CloneSamples(f)
            if len(samples) > 0 {
                newest := samples[len(samples)-1].Timestamp
-               t.wmCache.Set(f, &watermarks{High: newest})
+               t.wmCache.Put(f, &watermarks{High: newest})

                return newest.Before(i), nil
            }
@@ -337,12 +341,12 @@ func (t *TieredStorage) seriesTooOld(f *clientmodel.Fingerprint, i time.Time) (b
        }

        if diskHit {
-           t.wmCache.Set(f, &watermarks{High: highTime})
+           t.wmCache.Put(f, &watermarks{High: highTime})

            return highTime.Before(i), nil
        }

-       t.wmCache.Set(f, &watermarks{})
+       t.wmCache.Put(f, &watermarks{})
        return true, nil
    }

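Note that the rewritten call sites drop the error now returned by the cache: the blank identifier in the Get assignment and the discarded return values of Put. A lookup that fails would then be indistinguishable from a cache miss, so seriesTooOld falls back to the in-memory arena and then to disk rather than reporting a cache error.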
@@ -14,8 +14,6 @@
package metric

import (
-   "container/list"
-   "sync"
    "time"

    "code.google.com/p/goprotobuf/proto"
@@ -25,28 +23,11 @@ import (
    "github.com/prometheus/prometheus/storage"
    "github.com/prometheus/prometheus/storage/raw"
    "github.com/prometheus/prometheus/storage/raw/leveldb"
    "github.com/prometheus/prometheus/utility"

    dto "github.com/prometheus/prometheus/model/generated"
)

-// unsafe.Sizeof(watermarks{})
-const elementSize = 24
-
-type Bytes uint64
-
-// WatermarkCache is a thread-safe LRU cache for fingerprint watermark
-// state.
-type WatermarkCache struct {
-   mu sync.Mutex
-
-   list *list.List
-   table map[clientmodel.Fingerprint]*list.Element
-
-   size Bytes
-
-   allowance Bytes
-}
-
type watermarks struct {
    High time.Time
}
@@ -61,112 +42,6 @@ func (w *watermarks) dump(d *dto.MetricHighWatermark) {
    d.Timestamp = proto.Int64(w.High.Unix())
}

-type entry struct {
-   fingerprint *clientmodel.Fingerprint
-   watermarks *watermarks
-   accessed time.Time
-}
-
-func NewWatermarkCache(allowance Bytes) *WatermarkCache {
-   return &WatermarkCache{
-       list: list.New(),
-       table: map[clientmodel.Fingerprint]*list.Element{},
-       allowance: allowance,
-   }
-}
-
-func (lru *WatermarkCache) Get(f *clientmodel.Fingerprint) (v *watermarks, ok bool) {
-   lru.mu.Lock()
-   defer lru.mu.Unlock()
-
-   element, ok := lru.table[*f]
-   if !ok {
-       return nil, false
-   }
-
-   lru.moveToFront(element)
-
-   return element.Value.(*entry).watermarks, true
-}
-
-func (lru *WatermarkCache) Set(f *clientmodel.Fingerprint, w *watermarks) {
-   lru.mu.Lock()
-   defer lru.mu.Unlock()
-
-   if element, ok := lru.table[*f]; ok {
-       lru.updateInplace(element, w)
-   } else {
-       lru.addNew(f, w)
-   }
-}
-
-func (lru *WatermarkCache) SetIfAbsent(f *clientmodel.Fingerprint, w *watermarks) {
-   lru.mu.Lock()
-   defer lru.mu.Unlock()
-
-   if element, ok := lru.table[*f]; ok {
-       lru.moveToFront(element)
-   } else {
-       lru.addNew(f, w)
-   }
-}
-
-func (lru *WatermarkCache) Delete(f *clientmodel.Fingerprint) bool {
-   lru.mu.Lock()
-   defer lru.mu.Unlock()
-
-   element, ok := lru.table[*f]
-   if !ok {
-       return false
-   }
-
-   lru.list.Remove(element)
-   delete(lru.table, *f)
-   lru.size -= elementSize
-
-   return true
-}
-
-func (lru *WatermarkCache) Clear() {
-   lru.mu.Lock()
-   defer lru.mu.Unlock()
-
-   lru.list.Init()
-   lru.table = map[clientmodel.Fingerprint]*list.Element{}
-   lru.size = 0
-}
-
-func (lru *WatermarkCache) updateInplace(e *list.Element, w *watermarks) {
-   e.Value.(*entry).watermarks = w
-   lru.moveToFront(e)
-   lru.checkCapacity()
-}
-
-func (lru *WatermarkCache) moveToFront(e *list.Element) {
-   lru.list.MoveToFront(e)
-   e.Value.(*entry).accessed = time.Now()
-}
-
-func (lru *WatermarkCache) addNew(f *clientmodel.Fingerprint, w *watermarks) {
-   lru.table[*f] = lru.list.PushFront(&entry{
-       fingerprint: f,
-       watermarks: w,
-       accessed: time.Now(),
-   })
-   lru.size += elementSize
-   lru.checkCapacity()
-}
-
-func (lru *WatermarkCache) checkCapacity() {
-   for lru.size > lru.allowance {
-       delElem := lru.list.Back()
-       delWatermarks := delElem.Value.(*entry)
-       lru.list.Remove(delElem)
-       delete(lru.table, *delWatermarks.fingerprint)
-       lru.size -= elementSize
-   }
-}
-
type FingerprintHighWatermarkMapping map[clientmodel.Fingerprint]time.Time

type HighWatermarker interface {
@@ -333,3 +208,24 @@ func NewLevelDBCurationRemarker(o LevelDBCurationRemarkerOptions) (*LevelDBCurat
        p: s,
    }, nil
}
+
+type watermarkCache struct {
+   C utility.Cache
+}
+
+func (c *watermarkCache) Get(f *clientmodel.Fingerprint) (*watermarks, bool, error) {
+   v, ok, err := c.C.Get(*f)
+   if ok {
+       return v.(*watermarks), ok, err
+   }
+
+   return nil, ok, err
+}
+
+func (c *watermarkCache) Put(f *clientmodel.Fingerprint, v *watermarks) (bool, error) {
+   return c.C.Put(*f, v)
+}
+
+func (c *watermarkCache) Clear() (bool, error) {
+   return c.C.Clear()
+}
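Taken together, the new wrapper is used roughly as follows. This is a minimal caller-side sketch, assuming the package metric context of this diff (including the watermarkCacheLimit and stalenessLimit constants from tiered.go); the call sites above discard the returned errors, but the sketch checks them for completeness.

// Hypothetical usage sketch, not part of this commit.
func exampleWatermarkLookup(fp *clientmodel.Fingerprint, now time.Time) (tooOld bool, err error) {
    cache := &watermarkCache{
        C: utility.NewSynchronizedCache(utility.NewLRUCache(watermarkCacheLimit)),
    }

    // Record the latest sample time as the series' high watermark.
    if _, err := cache.Put(fp, &watermarks{High: now}); err != nil {
        return false, err
    }

    // Read it back and apply the same staleness rule seriesTooOld uses.
    wm, ok, err := cache.Get(fp)
    if err != nil || !ok {
        return true, err
    }
    return wm.High.Before(now.Add(-stalenessLimit)), nil
}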