mirror of
https://github.com/prometheus/prometheus
synced 2025-03-11 07:59:57 +00:00
Merge pull request #311 from prometheus/fix/watermarking/on-first-write
Ensure new metrics are watermarked early.
This commit is contained in:
commit
42198c1f1c
@ -506,7 +506,7 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger
|
||||
}
|
||||
|
||||
// BUG(matt): Repace this with watermark management.
|
||||
if !newestSampleTimestamp.Before(time.Unix(value.GetTimestamp(), 0)) {
|
||||
if newestSampleTimestamp.After(time.Unix(value.GetTimestamp(), 0)) {
|
||||
value.Timestamp = proto.Int64(newestSampleTimestamp.Unix())
|
||||
batch.Put(fingerprint.ToDTO(), value)
|
||||
}
|
||||
|
@ -14,11 +14,12 @@
|
||||
package metric
|
||||
|
||||
import (
|
||||
"github.com/prometheus/prometheus/model"
|
||||
"github.com/prometheus/prometheus/utility"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/prometheus/model"
|
||||
"github.com/prometheus/prometheus/utility"
|
||||
)
|
||||
|
||||
// Assuming sample rate of 1 / 15Hz, this allows for one hour's worth of
|
||||
@ -370,6 +371,15 @@ func (s *memorySeriesStorage) GetMetricForFingerprint(f *model.Fingerprint) (mod
|
||||
return metric, nil
|
||||
}
|
||||
|
||||
func (s *memorySeriesStorage) HasFingerprint(f *model.Fingerprint) bool {
|
||||
s.RLock()
|
||||
defer s.RUnlock()
|
||||
|
||||
_, has := s.fingerprintToSeries[*f]
|
||||
|
||||
return has
|
||||
}
|
||||
|
||||
func (s *memorySeriesStorage) CloneSamples(f *model.Fingerprint) model.Values {
|
||||
s.RLock()
|
||||
defer s.RUnlock()
|
||||
|
@ -309,20 +309,34 @@ func (t *TieredStorage) seriesTooOld(f *model.Fingerprint, i time.Time) (bool, e
|
||||
// BUG(julius): Make this configurable by query layer.
|
||||
i = i.Add(-stalenessLimit)
|
||||
|
||||
wm, ok := t.wmCache.Get(f)
|
||||
if !ok {
|
||||
wm, cacheHit := t.wmCache.Get(f)
|
||||
if !cacheHit {
|
||||
if t.memoryArena.HasFingerprint(f) {
|
||||
samples := t.memoryArena.CloneSamples(f)
|
||||
if len(samples) > 0 {
|
||||
newest := samples[len(samples)-1].Timestamp
|
||||
t.wmCache.Set(f, &Watermarks{High: newest})
|
||||
|
||||
return newest.Before(i), nil
|
||||
}
|
||||
}
|
||||
|
||||
value := &dto.MetricHighWatermark{}
|
||||
present, err := t.DiskStorage.MetricHighWatermarks.Get(f.ToDTO(), value)
|
||||
diskHit, err := t.DiskStorage.MetricHighWatermarks.Get(f.ToDTO(), value)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if present {
|
||||
|
||||
if diskHit {
|
||||
wmTime := time.Unix(*value.Timestamp, 0).UTC()
|
||||
t.wmCache.Set(f, &Watermarks{High: wmTime})
|
||||
|
||||
return wmTime.Before(i), nil
|
||||
}
|
||||
return true, nil
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return wm.High.Before(i), nil
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user