diff --git a/storage/local/chunk.go b/storage/local/chunk.go index b60e8767c..4e811bf20 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -68,6 +68,13 @@ func (cd *chunkDesc) unpin() { } } +func (cd *chunkDesc) getRefCount() int { + cd.Lock() + defer cd.Unlock() + + return cd.refCount +} + func (cd *chunkDesc) firstTime() clientmodel.Timestamp { cd.Lock() defer cd.Unlock() diff --git a/storage/local/series.go b/storage/local/series.go index 6bf323b35..b77df8c63 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -144,6 +144,10 @@ type memorySeries struct { // persisted. If true, the current head chunk must not be modified // anymore. headChunkPersisted bool + // Whether the current head chunk is used by an iterator. In that case, + // a non-persisted head chunk has to be cloned before more samples are + // appended. + headChunkUsedByIterator bool } // newMemorySeries returns a pointer to a newly allocated memorySeries for the @@ -165,6 +169,21 @@ func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair, per newHead := newChunkDesc(newDeltaEncodedChunk(d1, d0, true)) s.chunkDescs = append(s.chunkDescs, newHead) s.headChunkPersisted = false + } else if s.headChunkUsedByIterator && s.head().getRefCount() > 1 { + // We only need to clone the head chunk if the current head + // chunk was used in an iterator at all and if the refCount is + // still greater than the 1 we always have because the head + // chunk is not yet persisted. The latter is just an + // approximation. We will still clone unnecessarily if an older + // iterator using a previous version of the head chunk is still + // around and keep the head chunk pinned. We needed to track + // pins by version of the head chunk, which is probably not + // worth the effort. + chunkOps.WithLabelValues(clone).Inc() + // No locking needed here because a non-persisted head chunk can + // not get evicted concurrently. + s.head().chunk = s.head().chunk.clone() + s.headChunkUsedByIterator = false } chunks := s.head().add(v) @@ -258,6 +277,9 @@ func (s *memorySeries) evictOlderThan( // This is a non-persisted head chunk that is old enough // for eviction. Queue it to be persisted: s.headChunkPersisted = true + // Since we cannot modify the head chunk from now on, we + // don't need to bother with cloning anymore. + s.headChunkUsedByIterator = false persistQueue <- &persistRequest{ fingerprint: fp, chunkDesc: cd, @@ -401,16 +423,16 @@ func (s *memorySeries) preloadChunksForRange( return s.preloadChunks(pinIndexes, p) } +// newIterator returns a new SeriesIterator. The caller must have locked the +// fingerprint of the memorySeries. func (s *memorySeries) newIterator(lockFunc, unlockFunc func()) SeriesIterator { chunks := make([]chunk, 0, len(s.chunkDescs)) for i, cd := range s.chunkDescs { if !cd.isEvicted() { - if i == len(s.chunkDescs)-1 { - chunkOps.WithLabelValues(clone).Inc() - chunks = append(chunks, cd.chunk.clone()) - } else { - chunks = append(chunks, cd.chunk) + if i == len(s.chunkDescs)-1 && !s.headChunkPersisted { + s.headChunkUsedByIterator = true } + chunks = append(chunks, cd.chunk) } }