From dbc22b972ce8d2d6c12a9c8e6a43004bdeefc943 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Thu, 26 Feb 2015 23:40:35 +0100 Subject: [PATCH 1/4] Check last time in head chunk for head chunk timeout, not first. --- storage/local/storage.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/local/storage.go b/storage/local/storage.go index 916da1139..b9955f008 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -915,7 +915,7 @@ func (s *memorySeriesStorage) maintainMemorySeries(fp clientmodel.Fingerprint, b // If we are here, the series is not archived, so check for chunkDesc // eviction next and then if the head chunk needs to be persisted. series.evictChunkDescs(iOldestNotEvicted) - if !series.headChunkPersisted && time.Now().Sub(series.head().firstTime().Time()) > headChunkTimeout { + if !series.headChunkPersisted && time.Now().Sub(series.head().lastTime().Time()) > headChunkTimeout { series.headChunkPersisted = true // Since we cannot modify the head chunk from now on, we // don't need to bother with cloning anymore. From 9406afad723a966c58bc1bb993e80e591324ebc4 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Fri, 27 Feb 2015 00:06:16 +0100 Subject: [PATCH 2/4] Do not double-count non-persisted head chunks on loading. --- storage/local/persistence.go | 6 ++---- storage/local/storage.go | 6 ++++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/storage/local/persistence.go b/storage/local/persistence.go index b152b4e41..15b318228 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -22,7 +22,6 @@ import ( "path" "path/filepath" "sync" - "sync/atomic" "time" "github.com/golang/glog" @@ -586,7 +585,7 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap // this method during start-up while nothing else is running in storage // land. This method is utterly goroutine-unsafe. func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, err error) { - var chunksTotal, chunkDescsTotal int64 + var chunkDescsTotal int64 fingerprintToSeries := make(map[clientmodel.Fingerprint]*memorySeries) sm = &seriesMap{m: fingerprintToSeries} @@ -599,7 +598,6 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, err error) { } } if err == nil { - atomic.AddInt64(&numMemChunks, chunksTotal) numMemChunkDescs.Add(float64(chunkDescsTotal)) } }() @@ -704,7 +702,6 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, err error) { } } else { // Non-persisted head chunk. - chunksTotal++ chunkType, err := r.ReadByte() if err != nil { glog.Warning("Could not decode chunk type:", err) @@ -718,6 +715,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, err error) { return sm, nil } chunkDescs[i] = newChunkDesc(chunk) + chunkDescsTotal-- // Avoid double-counting by newChunkDesc. } } diff --git a/storage/local/storage.go b/storage/local/storage.go index b9955f008..1ff49bb12 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -1027,8 +1027,10 @@ func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) { ch <- s.ingestedSamplesCount ch <- s.invalidPreloadRequestsCount - count := atomic.LoadInt64(&numMemChunks) - ch <- prometheus.MustNewConstMetric(numMemChunksDesc, prometheus.GaugeValue, float64(count)) + ch <- prometheus.MustNewConstMetric( + numMemChunksDesc, + prometheus.GaugeValue, + float64(atomic.LoadInt64(&numMemChunks))) } // chunkMaps is a slice of maps with chunkDescs to be persisted. From 1db7589081f7428c4f6df29810486b57befddf61 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Fri, 27 Feb 2015 00:53:52 +0100 Subject: [PATCH 3/4] Reduce the capacity of countPersistedHeadChunks. The capacity is basically how many persisted head chunks we will count at most while doing other things, in particular checkpointing. To limit the amount of already counted head chunks, keep this number low, otherwise we will easily checkpoint too often if checkpoints take long anyway. --- storage/local/storage.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/local/storage.go b/storage/local/storage.go index 1ff49bb12..a18f92aca 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -154,7 +154,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { persistStopped: make(chan struct{}), persistence: p, - countPersistedHeadChunks: make(chan struct{}, 1024), + countPersistedHeadChunks: make(chan struct{}, 100), evictList: list.New(), evictRequests: make(chan evictRequest, evictRequestsCap), From 92991026bb83020f09bc1eee212bfc6daaabd41f Mon Sep 17 00:00:00 2001 From: beorn7 Date: Fri, 27 Feb 2015 02:21:12 +0100 Subject: [PATCH 4/4] Fix chunkDescsTotal count in case of errors. Only increment the counter if we actually add the memory series to the fingerprintToSeries map. --- storage/local/persistence.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 15b318228..ca6f822b4 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -680,7 +680,6 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, err error) { return sm, nil } chunkDescs := make([]*chunkDesc, numChunkDescs) - chunkDescsTotal += numChunkDescs for i := int64(0); i < numChunkDescs; i++ { if headChunkPersisted || i < numChunkDescs-1 { @@ -715,10 +714,16 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, err error) { return sm, nil } chunkDescs[i] = newChunkDesc(chunk) - chunkDescsTotal-- // Avoid double-counting by newChunkDesc. } } + chunkDescsTotal += numChunkDescs + if !headChunkPersisted { + // In this case, we have created a chunkDesc with + // newChunkDesc, which will count itself automatically. + // Correct for that by decrementing the count. + chunkDescsTotal-- + } fingerprintToSeries[clientmodel.Fingerprint(fp)] = &memorySeries{ metric: clientmodel.Metric(metric), chunkDescs: chunkDescs,