mirror of
https://github.com/prometheus/prometheus
synced 2024-12-27 17:13:22 +00:00
Merge pull request #1387 from prometheus/beorn7/storage
Populate first and last time in the chunk descriptor earlier
This commit is contained in:
commit
7e41f45fe7
@ -56,11 +56,11 @@ const (
|
||||
// chunkDesc contains meta-data for a chunk. Many of its methods are
|
||||
// goroutine-safe proxies for chunk methods.
|
||||
type chunkDesc struct {
|
||||
sync.Mutex
|
||||
sync.Mutex // TODO(beorn7): Try out if an RWMutex would help here.
|
||||
c chunk // nil if chunk is evicted.
|
||||
rCnt int
|
||||
chunkFirstTime model.Time // Used if chunk is evicted.
|
||||
chunkLastTime model.Time // Used if chunk is evicted.
|
||||
chunkFirstTime model.Time // Populated at creation.
|
||||
chunkLastTime model.Time // Populated on closing of the chunk, model.Earliest if unset.
|
||||
|
||||
// evictListElement is nil if the chunk is not in the evict list.
|
||||
// evictListElement is _not_ protected by the chunkDesc mutex.
|
||||
@ -71,11 +71,16 @@ type chunkDesc struct {
|
||||
// newChunkDesc creates a new chunkDesc pointing to the provided chunk. The
|
||||
// provided chunk is assumed to be not persisted yet. Therefore, the refCount of
|
||||
// the new chunkDesc is 1 (preventing eviction prior to persisting).
|
||||
func newChunkDesc(c chunk) *chunkDesc {
|
||||
func newChunkDesc(c chunk, firstTime model.Time) *chunkDesc {
|
||||
chunkOps.WithLabelValues(createAndPin).Inc()
|
||||
atomic.AddInt64(&numMemChunks, 1)
|
||||
numMemChunkDescs.Inc()
|
||||
return &chunkDesc{c: c, rCnt: 1}
|
||||
return &chunkDesc{
|
||||
c: c,
|
||||
rCnt: 1,
|
||||
chunkFirstTime: firstTime,
|
||||
chunkLastTime: model.Earliest,
|
||||
}
|
||||
}
|
||||
|
||||
func (cd *chunkDesc) add(s *model.SamplePair) []chunk {
|
||||
@ -124,25 +129,29 @@ func (cd *chunkDesc) refCount() int {
|
||||
}
|
||||
|
||||
func (cd *chunkDesc) firstTime() model.Time {
|
||||
cd.Lock()
|
||||
defer cd.Unlock()
|
||||
|
||||
if cd.c == nil {
|
||||
return cd.chunkFirstTime
|
||||
}
|
||||
return cd.c.firstTime()
|
||||
// No lock required, will never be modified.
|
||||
return cd.chunkFirstTime
|
||||
}
|
||||
|
||||
func (cd *chunkDesc) lastTime() model.Time {
|
||||
cd.Lock()
|
||||
defer cd.Unlock()
|
||||
|
||||
if cd.c == nil {
|
||||
if cd.chunkLastTime != model.Earliest || cd.c == nil {
|
||||
return cd.chunkLastTime
|
||||
}
|
||||
return cd.c.newIterator().lastTimestamp()
|
||||
}
|
||||
|
||||
func (cd *chunkDesc) maybePopulateLastTime() {
|
||||
cd.Lock()
|
||||
defer cd.Unlock()
|
||||
|
||||
if cd.chunkLastTime == model.Earliest && cd.c != nil {
|
||||
cd.chunkLastTime = cd.c.newIterator().lastTimestamp()
|
||||
}
|
||||
}
|
||||
|
||||
func (cd *chunkDesc) lastSamplePair() *model.SamplePair {
|
||||
cd.Lock()
|
||||
defer cd.Unlock()
|
||||
@ -198,8 +207,10 @@ func (cd *chunkDesc) maybeEvict() bool {
|
||||
if cd.rCnt != 0 {
|
||||
return false
|
||||
}
|
||||
cd.chunkFirstTime = cd.c.firstTime()
|
||||
cd.chunkLastTime = cd.c.newIterator().lastTimestamp()
|
||||
// Last opportunity to populate chunkLastTime.
|
||||
if cd.chunkLastTime == model.Earliest {
|
||||
cd.chunkLastTime = cd.c.newIterator().lastTimestamp()
|
||||
}
|
||||
cd.c = nil
|
||||
chunkOps.WithLabelValues(evict).Inc()
|
||||
atomic.AddInt64(&numMemChunks, -1)
|
||||
|
@ -856,7 +856,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in
|
||||
p.dirty = true
|
||||
return sm, chunksToPersist, nil
|
||||
}
|
||||
chunkDescs[i] = newChunkDesc(chunk)
|
||||
chunkDescs[i] = newChunkDesc(chunk, chunk.firstTime())
|
||||
chunksToPersist++
|
||||
}
|
||||
}
|
||||
|
@ -209,7 +209,7 @@ func newMemorySeries(m model.Metric, chunkDescs []*chunkDesc, modTime time.Time)
|
||||
// The caller must have locked the fingerprint of the series.
|
||||
func (s *memorySeries) add(v *model.SamplePair) int {
|
||||
if len(s.chunkDescs) == 0 || s.headChunkClosed {
|
||||
newHead := newChunkDesc(newChunk())
|
||||
newHead := newChunkDesc(newChunk(), v.Timestamp)
|
||||
s.chunkDescs = append(s.chunkDescs, newHead)
|
||||
s.headChunkClosed = false
|
||||
} else if s.headChunkUsedByIterator && s.head().refCount() > 1 {
|
||||
@ -233,7 +233,12 @@ func (s *memorySeries) add(v *model.SamplePair) int {
|
||||
s.head().c = chunks[0]
|
||||
|
||||
for _, c := range chunks[1:] {
|
||||
s.chunkDescs = append(s.chunkDescs, newChunkDesc(c))
|
||||
s.chunkDescs = append(s.chunkDescs, newChunkDesc(c, c.firstTime()))
|
||||
}
|
||||
|
||||
// Populate lastTime of now-closed chunks.
|
||||
for _, cd := range s.chunkDescs[len(s.chunkDescs)-len(chunks) : len(s.chunkDescs)-1] {
|
||||
cd.maybePopulateLastTime()
|
||||
}
|
||||
|
||||
s.lastTime = v.Timestamp
|
||||
@ -254,6 +259,7 @@ func (s *memorySeries) maybeCloseHeadChunk() bool {
|
||||
// Since we cannot modify the head chunk from now on, we
|
||||
// don't need to bother with cloning anymore.
|
||||
s.headChunkUsedByIterator = false
|
||||
s.head().maybePopulateLastTime()
|
||||
return true
|
||||
}
|
||||
return false
|
||||
|
Loading…
Reference in New Issue
Block a user