From 7db518d3a01c022764b066ebdc21129e4c06f7ac Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Wed, 7 Aug 2013 23:28:11 +0200 Subject: [PATCH] Abstract high watermark cache into standard LRU. Conflicts: storage/metric/memory.go storage/metric/tiered.go storage/metric/watermark.go Change-Id: Iab2aedbd8f83dc4ce633421bd4a55990fa026b85 --- rules/ast/query_analyzer.go | 1 + storage/metric/memory.go | 6 +- storage/metric/tiered.go | 30 ++++---- storage/metric/watermark.go | 148 ++++++------------------------------ 4 files changed, 43 insertions(+), 142 deletions(-) diff --git a/rules/ast/query_analyzer.go b/rules/ast/query_analyzer.go index bdbed7710..d8c79065e 100644 --- a/rules/ast/query_analyzer.go +++ b/rules/ast/query_analyzer.go @@ -116,6 +116,7 @@ func viewAdapterForInstantQuery(node Node, timestamp time.Time, storage *metric. requestBuildTimer.Stop() buildTimer := queryStats.GetTimer(stats.InnerViewBuildingTime).Start() + // BUG(julius): Clear Law of Demeter violation. view, err := analyzer.storage.MakeView(viewBuilder, time.Duration(60)*time.Second, queryStats) buildTimer.Stop() if err != nil { diff --git a/storage/metric/memory.go b/storage/metric/memory.go index 625cf67d1..f4af20960 100644 --- a/storage/metric/memory.go +++ b/storage/metric/memory.go @@ -189,7 +189,7 @@ func newArrayStream(metric clientmodel.Metric) *arrayStream { type memorySeriesStorage struct { sync.RWMutex - wmCache *WatermarkCache + wmCache *watermarkCache fingerprintToSeries map[clientmodel.Fingerprint]stream labelPairToFingerprints map[LabelPair]clientmodel.Fingerprints labelNameToFingerprints map[clientmodel.LabelName]clientmodel.Fingerprints @@ -198,7 +198,7 @@ type memorySeriesStorage struct { type MemorySeriesOptions struct { // If provided, this WatermarkCache will be updated for any samples that are // appended to the memorySeriesStorage. - WatermarkCache *WatermarkCache + WatermarkCache *watermarkCache } func (s *memorySeriesStorage) AppendSamples(samples clientmodel.Samples) error { @@ -222,7 +222,7 @@ func (s *memorySeriesStorage) AppendSample(sample *clientmodel.Sample) error { }) if s.wmCache != nil { - s.wmCache.Set(fingerprint, &watermarks{High: sample.Timestamp}) + s.wmCache.Put(fingerprint, &watermarks{High: sample.Timestamp}) } return nil diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index 6608f58e2..73f67d86e 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -27,6 +27,7 @@ import ( "github.com/prometheus/prometheus/coding/indexable" "github.com/prometheus/prometheus/stats" "github.com/prometheus/prometheus/storage/raw/leveldb" + "github.com/prometheus/prometheus/utility" dto "github.com/prometheus/prometheus/model/generated" ) @@ -64,12 +65,8 @@ const ( tieredStorageStopping ) -const ( - // Ignore timeseries in queries that are more stale than this limit. - stalenessLimit = time.Minute * 5 - // Size of the watermarks cache (used in determining timeseries freshness). - wmCacheSizeBytes = 5 * 1024 * 1024 -) +// Ignore timeseries in queries that are more stale than this limit. +const stalenessLimit = time.Minute * 5 // TieredStorage both persists samples and generates materialized views for // queries. @@ -95,7 +92,7 @@ type TieredStorage struct { memorySemaphore chan bool diskSemaphore chan bool - wmCache *WatermarkCache + wmCache *watermarkCache Indexer MetricIndexer } @@ -114,14 +111,21 @@ const ( tieredMemorySemaphores = 5 ) +const watermarkCacheLimit = 1024 * 1024 + func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval, memoryTTL time.Duration, root string) (*TieredStorage, error) { diskStorage, err := NewLevelDBMetricPersistence(root) if err != nil { return nil, err } - wmCache := NewWatermarkCache(wmCacheSizeBytes) - memOptions := MemorySeriesOptions{WatermarkCache: wmCache} + wmCache := &watermarkCache{ + C: utility.NewSynchronizedCache(utility.NewLRUCache(watermarkCacheLimit)), + } + + memOptions := MemorySeriesOptions{ + WatermarkCache: wmCache, + } s := &TieredStorage{ appendToDiskQueue: make(chan clientmodel.Samples, appendToDiskQueueDepth), @@ -319,13 +323,13 @@ func (t *TieredStorage) seriesTooOld(f *clientmodel.Fingerprint, i time.Time) (b // BUG(julius): Make this configurable by query layer. i = i.Add(-stalenessLimit) - wm, cacheHit := t.wmCache.Get(f) + wm, cacheHit, _ := t.wmCache.Get(f) if !cacheHit { if t.memoryArena.HasFingerprint(f) { samples := t.memoryArena.CloneSamples(f) if len(samples) > 0 { newest := samples[len(samples)-1].Timestamp - t.wmCache.Set(f, &watermarks{High: newest}) + t.wmCache.Put(f, &watermarks{High: newest}) return newest.Before(i), nil } @@ -337,12 +341,12 @@ func (t *TieredStorage) seriesTooOld(f *clientmodel.Fingerprint, i time.Time) (b } if diskHit { - t.wmCache.Set(f, &watermarks{High: highTime}) + t.wmCache.Put(f, &watermarks{High: highTime}) return highTime.Before(i), nil } - t.wmCache.Set(f, &watermarks{}) + t.wmCache.Put(f, &watermarks{}) return true, nil } diff --git a/storage/metric/watermark.go b/storage/metric/watermark.go index 8023acff1..cf1fc69f3 100644 --- a/storage/metric/watermark.go +++ b/storage/metric/watermark.go @@ -14,8 +14,6 @@ package metric import ( - "container/list" - "sync" "time" "code.google.com/p/goprotobuf/proto" @@ -25,28 +23,11 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/raw" "github.com/prometheus/prometheus/storage/raw/leveldb" + "github.com/prometheus/prometheus/utility" dto "github.com/prometheus/prometheus/model/generated" ) -// unsafe.Sizeof(watermarks{}) -const elementSize = 24 - -type Bytes uint64 - -// WatermarkCache is a thread-safe LRU cache for fingerprint watermark -// state. -type WatermarkCache struct { - mu sync.Mutex - - list *list.List - table map[clientmodel.Fingerprint]*list.Element - - size Bytes - - allowance Bytes -} - type watermarks struct { High time.Time } @@ -61,112 +42,6 @@ func (w *watermarks) dump(d *dto.MetricHighWatermark) { d.Timestamp = proto.Int64(w.High.Unix()) } -type entry struct { - fingerprint *clientmodel.Fingerprint - watermarks *watermarks - accessed time.Time -} - -func NewWatermarkCache(allowance Bytes) *WatermarkCache { - return &WatermarkCache{ - list: list.New(), - table: map[clientmodel.Fingerprint]*list.Element{}, - allowance: allowance, - } -} - -func (lru *WatermarkCache) Get(f *clientmodel.Fingerprint) (v *watermarks, ok bool) { - lru.mu.Lock() - defer lru.mu.Unlock() - - element, ok := lru.table[*f] - if !ok { - return nil, false - } - - lru.moveToFront(element) - - return element.Value.(*entry).watermarks, true -} - -func (lru *WatermarkCache) Set(f *clientmodel.Fingerprint, w *watermarks) { - lru.mu.Lock() - defer lru.mu.Unlock() - - if element, ok := lru.table[*f]; ok { - lru.updateInplace(element, w) - } else { - lru.addNew(f, w) - } -} - -func (lru *WatermarkCache) SetIfAbsent(f *clientmodel.Fingerprint, w *watermarks) { - lru.mu.Lock() - defer lru.mu.Unlock() - - if element, ok := lru.table[*f]; ok { - lru.moveToFront(element) - } else { - lru.addNew(f, w) - } -} - -func (lru *WatermarkCache) Delete(f *clientmodel.Fingerprint) bool { - lru.mu.Lock() - defer lru.mu.Unlock() - - element, ok := lru.table[*f] - if !ok { - return false - } - - lru.list.Remove(element) - delete(lru.table, *f) - lru.size -= elementSize - - return true -} - -func (lru *WatermarkCache) Clear() { - lru.mu.Lock() - defer lru.mu.Unlock() - - lru.list.Init() - lru.table = map[clientmodel.Fingerprint]*list.Element{} - lru.size = 0 -} - -func (lru *WatermarkCache) updateInplace(e *list.Element, w *watermarks) { - e.Value.(*entry).watermarks = w - lru.moveToFront(e) - lru.checkCapacity() -} - -func (lru *WatermarkCache) moveToFront(e *list.Element) { - lru.list.MoveToFront(e) - e.Value.(*entry).accessed = time.Now() -} - -func (lru *WatermarkCache) addNew(f *clientmodel.Fingerprint, w *watermarks) { - lru.table[*f] = lru.list.PushFront(&entry{ - fingerprint: f, - watermarks: w, - accessed: time.Now(), - }) - lru.size += elementSize - lru.checkCapacity() -} - -func (lru *WatermarkCache) checkCapacity() { - for lru.size > lru.allowance { - delElem := lru.list.Back() - delWatermarks := delElem.Value.(*entry) - lru.list.Remove(delElem) - delete(lru.table, *delWatermarks.fingerprint) - lru.size -= elementSize - } -} - type FingerprintHighWatermarkMapping map[clientmodel.Fingerprint]time.Time type HighWatermarker interface { @@ -333,3 +208,24 @@ func NewLevelDBCurationRemarker(o LevelDBCurationRemarkerOptions) (*LevelDBCurat p: s, }, nil } + +type watermarkCache struct { + C utility.Cache +} + +func (c *watermarkCache) Get(f *clientmodel.Fingerprint) (*watermarks, bool, error) { + v, ok, err := c.C.Get(*f) + if ok { + return v.(*watermarks), ok, err + } + + return nil, ok, err +} + +func (c *watermarkCache) Put(f *clientmodel.Fingerprint, v *watermarks) (bool, error) { + return c.C.Put(*f, v) +} + +func (c *watermarkCache) Clear() (bool, error) { + return c.C.Clear() +}