From d215e013b78255152598475938302e42444614f6 Mon Sep 17 00:00:00 2001
From: Bjoern Rabenstein
Date: Mon, 27 Oct 2014 20:40:48 +0100
Subject: [PATCH] Fix the weird chunkDesc shuffling bug.

The root cause was that after chunkDesc eviction, the in-memory
representation of the chunk layout (via chunkDescs) was shifted against
the chunks as laid out on disk. Keeping that offset up to date is by no
means trivial, so this commit is pretty involved.

Also, found a race that for some reason didn't bite us so far:
Persisting chunks was completely unlocked, so if chunks were purged on
disk at the same time, disaster would strike. However, locking the
persisting of chunks revealed interesting deadlocks. Basically: never
queue under the fp lock.

Change-Id: I1ea9e4e71024cabbc1f9601b28e74db0c5c55db8
---
 storage/local/persistence.go      | 106 ++++++++++++++++++++----------
 storage/local/persistence_test.go |  10 ++-
 storage/local/series.go           | 105 +++++++++++++++--------------
 storage/local/storage.go          |  93 ++++++++++++++++----------
 storage/local/storage_test.go     |   2 +-
 5 files changed, 193 insertions(+), 123 deletions(-)

diff --git a/storage/local/persistence.go b/storage/local/persistence.go
index 4444840fe..b7a56520d 100644
--- a/storage/local/persistence.go
+++ b/storage/local/persistence.go
@@ -58,8 +58,10 @@ const (
 )
 
 const (
-	flagChunkDescsLoaded byte = 1 << iota
-	flagHeadChunkPersisted
+	flagHeadChunkPersisted byte = 1 << iota
+	// Add more flags here like:
+	// flagFoo
+	// flagBar
 )
 
 type indexingOpType byte
@@ -228,34 +230,53 @@ func (p *persistence) getLabelValuesForLabelName(ln clientmodel.LabelName) (clie
 }
 
 // persistChunk persists a single chunk of a series. It is the caller's
-// responsibility to not modify chunk concurrently and to not persist or drop anything
-// for the same fingerprint concurrently.
-func (p *persistence) persistChunk(fp clientmodel.Fingerprint, c chunk) error {
+// responsibility to not modify chunk concurrently and to not persist or drop
+// anything for the same fingerprint concurrently. It returns the (zero-based)
+// index of the persisted chunk within the series file. In case of an error, the
+// returned index is -1 (to avoid the misconception that the chunk was written
+// at position 0).
+func (p *persistence) persistChunk(fp clientmodel.Fingerprint, c chunk) (int, error) {
 	// 1. Open chunk file.
 	f, err := p.openChunkFileForWriting(fp)
 	if err != nil {
-		return err
+		return -1, err
 	}
 	defer f.Close()
 
 	b := bufio.NewWriterSize(f, chunkHeaderLen+p.chunkLen)
-	defer b.Flush()
 
 	// 2. Write the header (chunk type and first/last times).
 	err = writeChunkHeader(b, c)
 	if err != nil {
-		return err
+		return -1, err
 	}
 
 	// 3. Write chunk into file.
-	return c.marshal(b)
+	err = c.marshal(b)
+	if err != nil {
+		return -1, err
+	}
+
+	// 4. Determine index within the file.
+	b.Flush()
+	offset, err := f.Seek(0, os.SEEK_CUR)
+	if err != nil {
+		return -1, err
+	}
+	index, err := p.chunkIndexForOffset(offset)
+	if err != nil {
+		return -1, err
+	}
+
+	return index - 1, err
 }
 
 // loadChunks loads a group of chunks of a timeseries by their index. The chunk
 // with the earliest time will have index 0, the following ones will have
-// incrementally larger indexes. It is the caller's responsibility to not
-// persist or drop anything for the same fingerprint concurrently.
-func (p *persistence) loadChunks(fp clientmodel.Fingerprint, indexes []int) ([]chunk, error) {
+// incrementally larger indexes. The indexOffset denotes the offset to be added to
+// each index in indexes. It is the caller's responsibility to not persist or
+// drop anything for the same fingerprint concurrently.
+func (p *persistence) loadChunks(fp clientmodel.Fingerprint, indexes []int, indexOffset int) ([]chunk, error) {
 	// TODO: we need to verify at some point that file length is a multiple of
 	// the chunk size. When is the best time to do this, and where to remember
 	// it? Right now, we only do it when loading chunkDescs.
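To make the arithmetic behind the new return value easier to follow, here is a minimal, standalone sketch (not taken from the patch; chunkHeaderLen and chunkLen are stand-in values) of how a chunk index is derived from the file offset after the write, and why persistChunk returns index-1:

    // Offset arithmetic used by persistChunk/chunkIndexForOffset.
    package main

    import "fmt"

    const (
    	chunkHeaderLen = 17   // stand-in for the real header length
    	chunkLen       = 1024 // stand-in for the configured chunk length
    )

    // chunkIndexForOffset mirrors the helper added in this patch: a series file
    // is a sequence of fixed-size (header + data) records, so any valid offset
    // must be a multiple of the record size.
    func chunkIndexForOffset(offset int64) (int, error) {
    	if int(offset)%(chunkHeaderLen+chunkLen) != 0 {
    		return -1, fmt.Errorf("offset %d is not a multiple of %d", offset, chunkHeaderLen+chunkLen)
    	}
    	return int(offset) / (chunkHeaderLen + chunkLen), nil
    }

    func main() {
    	// After appending the chunk with zero-based index n and flushing, the
    	// file position is (n+1) records from the start. That is why
    	// persistChunk returns index-1: the index of the chunk just written.
    	for n := 0; n < 3; n++ {
    		offsetAfterWrite := int64((n + 1) * (chunkHeaderLen + chunkLen))
    		idx, err := chunkIndexForOffset(offsetAfterWrite)
    		if err != nil {
    			panic(err)
    		}
    		fmt.Printf("wrote chunk %d, offset now %d, persistChunk would return %d\n", n, offsetAfterWrite, idx-1)
    	}
    }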
@@ -268,7 +289,7 @@ func (p *persistence) loadChunks(fp clientmodel.Fingerprint, indexes []int) ([]c
 	chunks := make([]chunk, 0, len(indexes))
 	typeBuf := make([]byte, 1)
 	for _, idx := range indexes {
-		_, err := f.Seek(p.offsetForChunkIndex(idx), os.SEEK_SET)
+		_, err := f.Seek(p.offsetForChunkIndex(idx+indexOffset), os.SEEK_SET)
 		if err != nil {
 			return nil, err
 		}
@@ -339,7 +360,7 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie
 			chunkFirstTime: clientmodel.Timestamp(binary.LittleEndian.Uint64(chunkTimesBuf)),
 			chunkLastTime:  clientmodel.Timestamp(binary.LittleEndian.Uint64(chunkTimesBuf[8:])),
 		}
-		if !cd.firstTime().Before(beforeTime) {
+		if cd.chunkLastTime.After(beforeTime) {
 			// From here on, we have chunkDescs in memory already.
 			break
 		}
@@ -412,9 +433,6 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
 			}
 			realNumberOfSeries++
 			var seriesFlags byte
-			if m.series.chunkDescsLoaded {
-				seriesFlags |= flagChunkDescsLoaded
-			}
 			if m.series.headChunkPersisted {
 				seriesFlags |= flagHeadChunkPersisted
 			}
@@ -430,6 +448,9 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
 				return
 			}
 			w.Write(buf)
+			if _, err = codable.EncodeVarint(w, int64(m.series.chunkDescsOffset)); err != nil {
+				return
+			}
 			if _, err = codable.EncodeVarint(w, int64(len(m.series.chunkDescs))); err != nil {
 				return
 			}
@@ -523,6 +544,10 @@ func (p *persistence) loadSeriesMapAndHeads() (*seriesMap, error) {
 		if err := metric.UnmarshalFromReader(r); err != nil {
 			return nil, err
 		}
+		chunkDescsOffset, err := binary.ReadVarint(r)
+		if err != nil {
+			return nil, err
+		}
 		numChunkDescs, err := binary.ReadVarint(r)
 		if err != nil {
 			return nil, err
 		}
@@ -562,7 +587,7 @@ func (p *persistence) loadSeriesMapAndHeads() (*seriesMap, error) {
 		fingerprintToSeries[clientmodel.Fingerprint(fp)] = &memorySeries{
 			metric:             clientmodel.Metric(metric),
 			chunkDescs:         chunkDescs,
-			chunkDescsLoaded:   seriesFlags&flagChunkDescsLoaded != 0,
+			chunkDescsOffset:   int(chunkDescsOffset),
 			headChunkPersisted: headChunkPersisted,
 		}
 	}
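The checkpoint format change boils down to writing chunkDescsOffset as a signed varint (it can legitimately be -1) and reading it back. A self-contained sketch of that round trip, using encoding/binary directly rather than the codable helpers the real code goes through:

    package main

    import (
    	"bufio"
    	"bytes"
    	"encoding/binary"
    	"fmt"
    )

    func main() {
    	// -1 is a legal value ("offset unknown"), which is why a signed varint
    	// is used rather than an unsigned one.
    	for _, chunkDescsOffset := range []int{-1, 0, 42} {
    		var buf bytes.Buffer

    		// Write side, as in checkpointSeriesMapAndHeads.
    		tmp := make([]byte, binary.MaxVarintLen64)
    		n := binary.PutVarint(tmp, int64(chunkDescsOffset))
    		buf.Write(tmp[:n])

    		// Read side, as in loadSeriesMapAndHeads.
    		r := bufio.NewReader(&buf)
    		got, err := binary.ReadVarint(r)
    		if err != nil {
    			panic(err)
    		}
    		fmt.Printf("checkpointed %d, read back %d\n", chunkDescsOffset, got)
    	}
    }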
@@ -572,24 +597,25 @@ func (p *persistence) loadSeriesMapAndHeads() (*seriesMap, error) {
 }
 
 // dropChunks deletes all chunks from a series whose last sample time is before
-// beforeTime. It returns true if all chunks of the series have been deleted.
-// It is the caller's responsibility to make sure nothing is persisted or loaded
-// for the same fingerprint concurrently.
-func (p *persistence) dropChunks(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) (bool, error) {
+// beforeTime. It returns the number of deleted chunks and true if all chunks of
+// the series have been deleted. It is the caller's responsibility to make sure
+// nothing is persisted or loaded for the same fingerprint concurrently.
+func (p *persistence) dropChunks(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) (int, bool, error) {
 	f, err := p.openChunkFileForReading(fp)
 	if os.IsNotExist(err) {
-		return true, nil
+		return 0, true, nil
 	}
 	if err != nil {
-		return false, err
+		return 0, false, err
 	}
 	defer f.Close()
 
 	// Find the first chunk that should be kept.
-	for i := 0; ; i++ {
+	var i int
+	for ; ; i++ {
 		_, err := f.Seek(p.offsetForChunkIndex(i)+chunkHeaderLastTimeOffset, os.SEEK_SET)
 		if err != nil {
-			return false, err
+			return 0, false, err
 		}
 		lastTimeBuf := make([]byte, 8)
 		_, err = io.ReadAtLeast(f, lastTimeBuf, 8)
@@ -598,12 +624,12 @@ func (p *persistence) dropChunks(fp clientmodel.Fingerprint, beforeTime clientmo
 			// be kept. Remove the whole file.
 			chunkOps.WithLabelValues(purge).Add(float64(i))
 			if err := os.Remove(f.Name()); err != nil {
-				return true, err
+				return 0, true, err
 			}
-			return true, nil
+			return i, true, nil
 		}
 		if err != nil {
-			return false, err
+			return 0, false, err
 		}
 		lastTime := clientmodel.Timestamp(binary.LittleEndian.Uint64(lastTimeBuf))
 		if !lastTime.Before(beforeTime) {
@@ -617,21 +643,23 @@ func (p *persistence) dropChunks(fp clientmodel.Fingerprint, beforeTime clientmo
 	// file.
 	_, err = f.Seek(-(chunkHeaderLastTimeOffset + 8), os.SEEK_CUR)
 	if err != nil {
-		return false, err
+		return 0, false, err
 	}
 
 	temp, err := os.OpenFile(p.tempFileNameForFingerprint(fp), os.O_WRONLY|os.O_CREATE, 0640)
 	if err != nil {
-		return false, err
+		return 0, false, err
 	}
 	defer temp.Close()
 
 	if _, err := io.Copy(temp, f); err != nil {
-		return false, err
+		return 0, false, err
 	}
 
-	os.Rename(p.tempFileNameForFingerprint(fp), p.fileNameForFingerprint(fp))
-	return false, nil
+	if err := os.Rename(p.tempFileNameForFingerprint(fp), p.fileNameForFingerprint(fp)); err != nil {
+		return 0, false, err
+	}
+	return i, false, nil
 }
 
 // indexMetric queues the given metric for addition to the indexes needed by
@@ -836,6 +864,16 @@ func (p *persistence) offsetForChunkIndex(i int) int64 {
 	return int64(i * (chunkHeaderLen + p.chunkLen))
 }
 
+func (p *persistence) chunkIndexForOffset(offset int64) (int, error) {
+	if int(offset)%(chunkHeaderLen+p.chunkLen) != 0 {
+		return -1, fmt.Errorf(
+			"offset %d is not a multiple of on-disk chunk length %d",
+			offset, chunkHeaderLen+p.chunkLen,
+		)
+	}
+	return int(offset) / (chunkHeaderLen + p.chunkLen), nil
+}
+
 func (p *persistence) headsFileName() string {
 	return path.Join(p.basePath, headsFileName)
 }
diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go
index 66b09a8a7..52bfb1d86 100644
--- a/storage/local/persistence_test.go
+++ b/storage/local/persistence_test.go
@@ -82,10 +82,14 @@ func TestPersistChunk(t *testing.T) {
 	fpToChunks := buildTestChunks()
 
 	for fp, chunks := range fpToChunks {
-		for _, c := range chunks {
-			if err := p.persistChunk(fp, c); err != nil {
+		for i, c := range chunks {
+			index, err := p.persistChunk(fp, c)
+			if err != nil {
 				t.Fatal(err)
 			}
+			if i != index {
+				t.Errorf("Want chunk index %d, got %d.", i, index)
+			}
 		}
 	}
 
@@ -94,7 +98,7 @@ func TestPersistChunk(t *testing.T) {
 	for fp, expectedChunks := range fpToChunks {
 		indexes := make([]int, 0, len(expectedChunks))
 		for i := range expectedChunks {
 			indexes = append(indexes, i)
 		}
-		actualChunks, err := p.loadChunks(fp, indexes)
+		actualChunks, err := p.loadChunks(fp, indexes, 0)
 		if err != nil {
 			t.Fatal(err)
 		}
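dropChunks now reports how many chunks it removed from disk so that the caller can keep chunkDescsOffset consistent. An illustrative sketch (stand-in function, not from the patch) of the bookkeeping purgeSeries does with that number:

    package main

    import "fmt"

    // adjustOffset keeps chunkDescsOffset pointing at the on-disk chunk that
    // corresponds to the first in-memory chunkDesc: numDropped chunks vanished
    // from disk, numPurged chunkDescs vanished from memory.
    func adjustOffset(chunkDescsOffset, numPurged, numDropped int) int {
    	if chunkDescsOffset == -1 {
    		// Offset unknown; nothing to maintain yet.
    		return chunkDescsOffset
    	}
    	chunkDescsOffset += numPurged - numDropped
    	if chunkDescsOffset < 0 {
    		panic("dropped more chunks from persistence than from memory")
    	}
    	return chunkDescsOffset
    }

    func main() {
    	// Example: the in-memory chunkDescs start at disk chunk 2 (offset 2).
    	// A purge removes 3 chunks from disk but only 1 chunkDesc from memory,
    	// so the offset shrinks from 2 to 0.
    	fmt.Println(adjustOffset(2, 1, 3)) // 0
    }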
diff --git a/storage/local/series.go b/storage/local/series.go
index b77df8c63..a7b47e5ad 100644
--- a/storage/local/series.go
+++ b/storage/local/series.go
@@ -136,10 +136,16 @@ type memorySeries struct {
 	metric clientmodel.Metric
 	// Sorted by start time, overlapping chunk ranges are forbidden.
 	chunkDescs []*chunkDesc
-	// Whether chunkDescs for chunks on disk are all loaded. If false, some
-	// (or all) chunkDescs are only on disk. These chunks are all contiguous
-	// and at the tail end.
-	chunkDescsLoaded bool
+	// The chunkDescs in memory might not have all the chunkDescs for the
+	// chunks that are persisted to disk. The missing chunkDescs are all
+	// contiguous and at the tail end. chunkDescsOffset is the index of the
+	// chunk on disk that corresponds to the first chunkDesc in memory. If
+	// it is 0, the chunkDescs are all loaded. A value of -1 denotes a
+	// special case: There are chunks on disk, but the offset to the
+	// chunkDescs in memory is unknown. Also, there is no overlap between
+	// chunks on disk and chunks in memory (implying that upon first
+	// persisting of a chunk in memory, the offset has to be set).
+	chunkDescsOffset int
 	// Whether the current head chunk has already been scheduled to be
 	// persisted. If true, the current head chunk must not be modified
 	// anymore.
@@ -155,16 +161,20 @@ type memorySeries struct {
 // or (if false) a series for a metric being unarchived, i.e. a series that
 // existed before but has been evicted from memory.
 func newMemorySeries(m clientmodel.Metric, reallyNew bool) *memorySeries {
-	return &memorySeries{
+	s := memorySeries{
 		metric:             m,
-		chunkDescsLoaded:   reallyNew,
 		headChunkPersisted: !reallyNew,
 	}
+	if !reallyNew {
+		s.chunkDescsOffset = -1
+	}
+	return &s
 }
 
 // add adds a sample pair to the series.
+// It returns chunkDescs that must be queued to be persisted.
 // The caller must have locked the fingerprint of the series.
-func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair, persistQueue chan *persistRequest) {
+func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair) []*chunkDesc {
 	if len(s.chunkDescs) == 0 || s.headChunkPersisted {
 		newHead := newChunkDesc(newDeltaEncodedChunk(d1, d0, true))
 		s.chunkDescs = append(s.chunkDescs, newHead)
@@ -187,33 +197,27 @@ func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair, per
 	}
 
 	chunks := s.head().add(v)
-	s.head().chunk = chunks[0]
+
+	var chunkDescsToPersist []*chunkDesc
 	if len(chunks) > 1 {
-		queuePersist := func(cd *chunkDesc) {
-			persistQueue <- &persistRequest{
-				fingerprint: fp,
-				chunkDesc:   cd,
-			}
-		}
-
-		queuePersist(s.head())
-
+		chunkDescsToPersist = append(chunkDescsToPersist, s.head())
 		for i, c := range chunks[1:] {
 			cd := newChunkDesc(c)
 			s.chunkDescs = append(s.chunkDescs, cd)
 			// The last chunk is still growing.
 			if i < len(chunks[1:])-1 {
-				queuePersist(cd)
+				chunkDescsToPersist = append(chunkDescsToPersist, cd)
 			}
 		}
 	}
+	return chunkDescsToPersist
 }
 
 // evictOlderThan marks for eviction all chunks whose latest sample is older
-// than the given timestamp. It returns true if all chunks in the series were
-// immediately evicted (i.e. all chunks are older than the timestamp, and none
-// of the chunks was pinned).
+// than the given timestamp. It returns allEvicted as true if all chunks in the
+// series were immediately evicted (i.e. all chunks are older than the
+// timestamp, and none of the chunks was pinned).
 //
 // The method also evicts chunkDescs if there are chunkDescEvictionFactor times
 // more chunkDescs in the series than unevicted chunks. (The number of unevicted
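A hedged sketch of what chunkDescsOffset means in practice: the chunkDesc at in-memory position i describes the chunk at position i+chunkDescsOffset in the series file, which is exactly the indexOffset that loadChunks now takes. The names below are stand-ins, not the patch's code:

    package main

    import "fmt"

    // diskIndex translates an in-memory chunkDesc index into the index of the
    // corresponding chunk in the series file. Offset 0 means all chunkDescs are
    // loaded; -1 means the offset is not known yet, so nothing may be loaded
    // from disk in that state.
    func diskIndex(memIndex, chunkDescsOffset int) (int, error) {
    	if chunkDescsOffset == -1 {
    		return 0, fmt.Errorf("offset unknown, must not load from disk")
    	}
    	return memIndex + chunkDescsOffset, nil
    }

    func main() {
    	// 7 chunks on disk, the 4 oldest chunkDescs were evicted from memory,
    	// so chunkDescsOffset is 4 and in-memory chunkDesc 0 maps to disk chunk 4.
    	for memIdx := 0; memIdx < 3; memIdx++ {
    		disk, err := diskIndex(memIdx, 4)
    		if err != nil {
    			panic(err)
    		}
    		fmt.Printf("chunkDescs[%d] -> chunk %d on disk\n", memIdx, disk)
    	}
    }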
@@ -222,25 +226,22 @@ func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair, per
 // series, even if chunks in between were evicted.)
 //
 // Special considerations for the head chunk: If it has not been scheduled to be
-// persisted yet but is old enough for eviction, the scheduling happens now. (To
-// do that, the method neets the fingerprint and the persist queue.) It is
-// likely that the actual persisting will not happen soon enough to immediately
-// evict the head chunk, though. Thus, calling evictOlderThan for a series with
-// a non-persisted head chunk will most likely return false, even if no chunk is
-// pinned for other reasons. A series old enough for archiving will usually
+// persisted yet but is old enough for eviction, this method returns
+// persistHeadChunk as true. The caller is then responsible for persisting the
+// head chunk. The internal state of this memorySeries is already set
+// accordingly by this method. Calling evictOlderThan for a series with a
+// non-persisted head chunk that is old enough for eviction will never evict all
+// chunks immediately, even if no chunk is pinned for other reasons, because the
+// head chunk is not persisted yet. A series old enough for archiving will
 // require at least two eviction runs to become ready for archiving: In the
-// first run, its head chunk is scheduled to be persisted. The next call of
+// first run, its head chunk is requested to be persisted. The next call of
 // evictOlderThan will then return true, provided that the series hasn't
 // received new samples in the meantime, the head chunk has now been persisted,
 // and no chunk is pinned for other reasons.
 //
 // The caller must have locked the fingerprint of the series.
-func (s *memorySeries) evictOlderThan(
-	t clientmodel.Timestamp,
-	fp clientmodel.Fingerprint,
-	persistQueue chan *persistRequest,
-) bool {
-	allEvicted := true
+func (s *memorySeries) evictOlderThan(t clientmodel.Timestamp) (allEvicted bool, persistHeadChunk bool) {
+	allEvicted = true
 	iOldestNotEvicted := -1
 
 	defer func() {
@@ -250,8 +251,8 @@ func (s *memorySeries) evictOlderThan(
 		if iOldestNotEvicted != -1 {
 			lenToKeep := chunkDescEvictionFactor * (len(s.chunkDescs) - iOldestNotEvicted)
 			if lenToKeep < len(s.chunkDescs) {
-				s.chunkDescsLoaded = false
 				lenEvicted := len(s.chunkDescs) - lenToKeep
+				s.chunkDescsOffset += lenEvicted
 				chunkDescOps.WithLabelValues(evict).Add(float64(lenEvicted))
 				atomic.AddInt64(&numMemChunkDescs, -int64(lenEvicted))
 				s.chunkDescs = append(
@@ -268,22 +269,19 @@ func (s *memorySeries) evictOlderThan(
 			if iOldestNotEvicted == -1 {
 				iOldestNotEvicted = i
 			}
-			return false
+			return false, false
 		}
 		if cd.isEvicted() {
 			continue
 		}
 		if !s.headChunkPersisted && i == len(s.chunkDescs)-1 {
 			// This is a non-persisted head chunk that is old enough
-			// for eviction. Queue it to be persisted:
+			// for eviction. Request it to be persisted:
+			persistHeadChunk = true
 			s.headChunkPersisted = true
 			// Since we cannot modify the head chunk from now on, we
 			// don't need to bother with cloning anymore.
 			s.headChunkUsedByIterator = false
-			persistQueue <- &persistRequest{
-				fingerprint: fp,
-				chunkDesc:   cd,
-			}
 		}
 		if !cd.evictOnUnpin() {
 			if iOldestNotEvicted == -1 {
@@ -292,13 +290,15 @@ func (s *memorySeries) evictOlderThan(
 			allEvicted = false
 		}
 	}
-	return allEvicted
+	return allEvicted, persistHeadChunk
 }
 
-// purgeOlderThan returns true if all chunks have been purged.
+// purgeOlderThan removes chunkDescs older than t. It also evicts the chunks of
+// those chunkDescs (although that's probably not even necessary). It returns
+// the number of purged chunkDescs and true if all chunkDescs have been purged.
 //
 // The caller must have locked the fingerprint of the series.
-func (s *memorySeries) purgeOlderThan(t clientmodel.Timestamp) bool {
+func (s *memorySeries) purgeOlderThan(t clientmodel.Timestamp) (int, bool) {
 	keepIdx := len(s.chunkDescs)
 	for i, cd := range s.chunkDescs {
 		if !cd.lastTime().Before(t) {
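The reworked evictOlderThan pushes the queueing of the head chunk to the caller. A toy sketch (stand-in types, heavily simplified logic) of the intended calling pattern and of why archiving takes two eviction runs:

    package main

    import (
    	"fmt"
    	"sync"
    )

    type series struct {
    	headPersisted bool
    }

    // evictOlderThan is a toy stand-in: it reports whether everything could be
    // evicted and whether the caller still has to persist the head chunk.
    func (s *series) evictOlderThan() (allEvicted, persistHeadChunk bool) {
    	if !s.headPersisted {
    		s.headPersisted = true // internal state is flipped right away
    		return false, true     // caller must queue the head chunk
    	}
    	return true, false
    }

    func main() {
    	var mtx sync.Mutex // stand-in for the per-fingerprint locker
    	persistQueue := make(chan string, 1)
    	s := &series{}

    	for run := 1; run <= 2; run++ {
    		mtx.Lock()
    		allEvicted, persistHeadChunk := s.evictOlderThan()
    		mtx.Unlock()

    		// Queue only outside of the locked area.
    		if persistHeadChunk {
    			persistQueue <- "head chunk"
    		}
    		fmt.Printf("run %d: allEvicted=%v persistHeadChunk=%v\n", run, allEvicted, persistHeadChunk)
    	}
    }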
@@ -307,9 +307,11 @@ func (s *memorySeries) purgeOlderThan(t clientmodel.Timestamp) bool {
 		}
 		s.chunkDescs[i].evictOnUnpin()
 	}
-	s.chunkDescs = append(make([]*chunkDesc, 0, len(s.chunkDescs)-keepIdx), s.chunkDescs[keepIdx:]...)
-	atomic.AddInt64(&numMemChunkDescs, -int64(keepIdx))
-	return len(s.chunkDescs) == 0
+	if keepIdx > 0 {
+		s.chunkDescs = append(make([]*chunkDesc, 0, len(s.chunkDescs)-keepIdx), s.chunkDescs[keepIdx:]...)
+		atomic.AddInt64(&numMemChunkDescs, -int64(keepIdx))
+	}
+	return keepIdx, len(s.chunkDescs) == 0
 }
 
 // preloadChunks is an internal helper method.
@@ -327,8 +329,11 @@ func (s *memorySeries) preloadChunks(indexes []int, p *persistence) ([]*chunkDes
 	chunkOps.WithLabelValues(pin).Add(float64(len(pinnedChunkDescs)))
 
 	if len(loadIndexes) > 0 {
+		if s.chunkDescsOffset == -1 {
+			panic("requested loading chunks from persistence in a situation where we must not have persisted data for chunk descriptors in memory")
+		}
 		fp := s.metric.Fingerprint()
-		chunks, err := p.loadChunks(fp, loadIndexes)
+		chunks, err := p.loadChunks(fp, loadIndexes, s.chunkDescsOffset)
 		if err != nil {
 			// Unpin the chunks since we won't return them as pinned chunks now.
 			for _, cd := range pinnedChunkDescs {
@@ -388,13 +393,13 @@ func (s *memorySeries) preloadChunksForRange(
 	if len(s.chunkDescs) > 0 {
 		firstChunkDescTime = s.chunkDescs[0].firstTime()
 	}
-	if !s.chunkDescsLoaded && from.Before(firstChunkDescTime) {
+	if s.chunkDescsOffset != 0 && from.Before(firstChunkDescTime) {
 		cds, err := p.loadChunkDescs(fp, firstChunkDescTime)
 		if err != nil {
 			return nil, err
 		}
 		s.chunkDescs = append(cds, s.chunkDescs...)
-		s.chunkDescsLoaded = true
+		s.chunkDescsOffset = 0
 	}
 
 	if len(s.chunkDescs) == 0 {
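preloadChunksForRange can now tell from chunkDescsOffset whether chunkDescs are missing at the front; if so, it loads them and resets the offset to 0. A stand-in sketch of that step (the time-range check of the real code is omitted here):

    package main

    import "fmt"

    type series struct {
    	chunkDescs       []string // stand-in for []*chunkDesc
    	chunkDescsOffset int
    }

    // ensureDescsFrom prepends the chunkDescs that are only on disk and marks
    // the series as fully loaded, mirroring the offset-reset in the patch.
    func (s *series) ensureDescsFrom(loadFromDisk func(n int) []string) {
    	if s.chunkDescsOffset != 0 {
    		missing := loadFromDisk(s.chunkDescsOffset)
    		s.chunkDescs = append(missing, s.chunkDescs...)
    		s.chunkDescsOffset = 0
    	}
    }

    func main() {
    	s := &series{chunkDescs: []string{"chunk2", "chunk3"}, chunkDescsOffset: 2}
    	s.ensureDescsFrom(func(n int) []string {
    		// Pretend to read the first n chunk headers from the series file.
    		return []string{"chunk0", "chunk1"}[:n]
    	})
    	fmt.Println(s.chunkDescs, s.chunkDescsOffset) // [chunk0 chunk1 chunk2 chunk3] 0
    }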
diff --git a/storage/local/storage.go b/storage/local/storage.go
index 5ee336b2a..cd252dc11 100644
--- a/storage/local/storage.go
+++ b/storage/local/storage.go
@@ -43,15 +43,15 @@ type persistRequest struct {
 }
 
 type memorySeriesStorage struct {
-	fpLocker            *fingerprintLocker
-	fingerprintToSeries *seriesMap
+	fpLocker   *fingerprintLocker
+	fpToSeries *seriesMap
 
 	loopStopping, loopStopped chan struct{}
 	evictInterval, evictAfter time.Duration
 	purgeInterval, purgeAfter time.Duration
 	checkpointInterval        time.Duration
 
-	persistQueue   chan *persistRequest
+	persistQueue   chan persistRequest
 	persistStopped chan struct{}
 	persistence    *persistence
 
@@ -84,22 +84,22 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
 		return nil, err
 	}
 	glog.Info("Loading series map and head chunks...")
-	fingerprintToSeries, err := p.loadSeriesMapAndHeads()
+	fpToSeries, err := p.loadSeriesMapAndHeads()
 	if err != nil {
 		return nil, err
 	}
-	glog.Infof("%d series loaded.", fingerprintToSeries.length())
+	glog.Infof("%d series loaded.", fpToSeries.length())
 	numSeries := prometheus.NewGauge(prometheus.GaugeOpts{
 		Namespace: namespace,
 		Subsystem: subsystem,
 		Name:      "memory_series",
 		Help:      "The current number of series in memory.",
 	})
-	numSeries.Set(float64(fingerprintToSeries.length()))
+	numSeries.Set(float64(fpToSeries.length()))
 
 	return &memorySeriesStorage{
-		fpLocker:            newFingerprintLocker(100), // TODO: Tweak value.
-		fingerprintToSeries: fingerprintToSeries,
+		fpLocker:   newFingerprintLocker(100), // TODO: Tweak value.
+		fpToSeries: fpToSeries,
 
 		loopStopping: make(chan struct{}),
 		loopStopped:  make(chan struct{}),
@@ -109,7 +109,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
 		purgeAfter:         o.PersistenceRetentionPeriod,
 		checkpointInterval: o.CheckpointInterval,
 
-		persistQueue:   make(chan *persistRequest, persistQueueCap),
+		persistQueue:   make(chan persistRequest, persistQueueCap),
 		persistStopped: make(chan struct{}),
 		persistence:    p,
 
@@ -182,7 +182,7 @@ func (s *memorySeriesStorage) Stop() error {
 	<-s.persistStopped
 
 	// One final checkpoint of the series map and the head chunks.
-	if err := s.persistence.checkpointSeriesMapAndHeads(s.fingerprintToSeries, s.fpLocker); err != nil {
+	if err := s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker); err != nil {
 		return err
 	}
 
@@ -202,7 +202,7 @@ func (s *memorySeriesStorage) NewIterator(fp clientmodel.Fingerprint) SeriesIter
 	s.fpLocker.Lock(fp)
 	defer s.fpLocker.Unlock(fp)
 
-	series, ok := s.fingerprintToSeries.get(fp)
+	series, ok := s.fpToSeries.get(fp)
 	if !ok {
 		// Oops, no series for fp found. That happens if, after
 		// preloading is done, the whole series is identified as old
@@ -301,7 +301,7 @@ func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint
 	s.fpLocker.Lock(fp)
 	defer s.fpLocker.Unlock(fp)
 
-	series, ok := s.fingerprintToSeries.get(fp)
+	series, ok := s.fpToSeries.get(fp)
 	if ok {
 		// Copy required here because caller might mutate the returned
 		// metric.
@@ -330,17 +330,21 @@ func (s *memorySeriesStorage) AppendSamples(samples clientmodel.Samples) {
 func (s *memorySeriesStorage) appendSample(sample *clientmodel.Sample) {
 	fp := sample.Metric.Fingerprint()
 	s.fpLocker.Lock(fp)
-	defer s.fpLocker.Unlock(fp)
-
 	series := s.getOrCreateSeries(fp, sample.Metric)
-	series.add(fp, &metric.SamplePair{
+	chunkDescsToPersist := series.add(fp, &metric.SamplePair{
 		Value:     sample.Value,
 		Timestamp: sample.Timestamp,
-	}, s.persistQueue)
+	})
+	s.fpLocker.Unlock(fp)
+	// Queue only outside of the locked area, processing the persistQueue
+	// requires the same lock!
+	for _, cd := range chunkDescsToPersist {
+		s.persistQueue <- persistRequest{fp, cd}
+	}
 }
 
 func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m clientmodel.Metric) *memorySeries {
-	series, ok := s.fingerprintToSeries.get(fp)
+	series, ok := s.fpToSeries.get(fp)
 	if !ok {
 		unarchived, err := s.persistence.unarchiveMetric(fp)
 		if err != nil {
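The appendSample change demonstrates the rule from the commit message: never queue under the fp lock, because the queue consumer takes the same lock and a full queue would otherwise deadlock. A minimal runnable sketch of the safe pattern (all names are stand-ins):

    package main

    import (
    	"fmt"
    	"sync"
    )

    func main() {
    	var fpLock sync.Mutex             // stand-in for the fingerprintLocker
    	persistQueue := make(chan int, 1) // deliberately tiny, like a bounded queue
    	var wg sync.WaitGroup

    	// Consumer: needs the fingerprint lock for every request it processes,
    	// just like handlePersistQueue does now.
    	wg.Add(1)
    	go func() {
    		defer wg.Done()
    		for req := range persistQueue {
    			fpLock.Lock()
    			fmt.Println("persisted chunk", req)
    			fpLock.Unlock()
    		}
    	}()

    	// Producer: do the series mutation under the lock, remember what needs
    	// persisting, and send only after unlocking. Sending inside the locked
    	// region could block on the full channel while the consumer waits for
    	// the lock.
    	for i := 0; i < 4; i++ {
    		fpLock.Lock()
    		toPersist := []int{i} // pretend add() returned these chunkDescs
    		fpLock.Unlock()

    		for _, cd := range toPersist {
    			persistQueue <- cd
    		}
    	}
    	close(persistQueue)
    	wg.Wait()
    }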
@@ -354,7 +358,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m cl
 			s.seriesOps.WithLabelValues(create).Inc()
 		}
 		series = newMemorySeries(m, !unarchived)
-		s.fingerprintToSeries.put(fp, series)
+		s.fpToSeries.put(fp, series)
 		s.numSeries.Inc()
 	}
 	return series
@@ -362,7 +366,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m cl
 
 /*
 func (s *memorySeriesStorage) preloadChunksAtTime(fp clientmodel.Fingerprint, ts clientmodel.Timestamp) (chunkDescs, error) {
-	series, ok := s.fingerprintToSeries.get(fp)
+	series, ok := s.fpToSeries.get(fp)
 	if !ok {
 		panic("requested preload for non-existent series")
 	}
@@ -378,7 +382,7 @@ func (s *memorySeriesStorage) preloadChunksForRange(
 	s.fpLocker.Lock(fp)
 	defer s.fpLocker.Unlock(fp)
 
-	series, ok := s.fingerprintToSeries.get(fp)
+	series, ok := s.fpToSeries.get(fp)
 	if !ok {
 		has, first, last, err := s.persistence.hasArchivedMetric(fp)
 		if err != nil {
@@ -404,12 +408,21 @@ func (s *memorySeriesStorage) handlePersistQueue() {
 	for req := range s.persistQueue {
 		s.persistQueueLength.Set(float64(len(s.persistQueue)))
 		start := time.Now()
-		err := s.persistence.persistChunk(req.fingerprint, req.chunkDesc.chunk)
+		s.fpLocker.Lock(req.fingerprint)
+		offset, err := s.persistence.persistChunk(req.fingerprint, req.chunkDesc.chunk)
+		if series, seriesInMemory := s.fpToSeries.get(req.fingerprint); err == nil && seriesInMemory && series.chunkDescsOffset == -1 {
+			// This is the first chunk persisted for a newly created
+			// series that had prior chunks on disk. Finally, we can
+			// set the chunkDescsOffset.
+			series.chunkDescsOffset = offset
+		}
+		s.fpLocker.Unlock(req.fingerprint)
 		s.persistLatency.Observe(float64(time.Since(start)) / float64(time.Microsecond))
 		if err != nil {
 			s.persistErrors.WithLabelValues(err.Error()).Inc()
-			glog.Error("Error persisting chunk, requeuing: ", err)
-			s.persistQueue <- req
+			glog.Error("Error persisting chunk: ", err)
+			glog.Error("The storage is now inconsistent. Prepare for disaster.")
+			// TODO: Remove respective chunkDesc to at least be consistent?
 			continue
 		}
 		req.chunkDesc.unpin()
@@ -436,13 +449,13 @@ func (s *memorySeriesStorage) loop() {
 		case <-s.loopStopping:
 			return
 		case <-checkpointTicker.C:
-			s.persistence.checkpointSeriesMapAndHeads(s.fingerprintToSeries, s.fpLocker)
+			s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker)
 		case <-evictTicker.C:
 			// TODO: Change this to be based on number of chunks in memory.
 			glog.Info("Evicting chunks...")
 			begin := time.Now()
 
-			for m := range s.fingerprintToSeries.iter() {
+			for m := range s.fpToSeries.iter() {
 				select {
 				case <-s.loopStopping:
 					glog.Info("Interrupted evicting chunks.")
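handlePersistQueue now locks the fingerprint and, for a series whose offset is still unknown (-1), uses the index returned by persistChunk to set chunkDescsOffset. A stand-in sketch of that first-persist case:

    package main

    import "fmt"

    type series struct {
    	chunkDescsOffset int // -1: unknown
    }

    // persistChunk is a toy stand-in that returns the on-disk index the chunk
    // was written at (the file already holds chunksOnDisk chunks).
    func persistChunk(chunksOnDisk int) int {
    	return chunksOnDisk
    }

    func main() {
    	s := &series{chunkDescsOffset: -1}
    	chunksOnDisk := 5 // chunks left over from before the series was archived

    	offset := persistChunk(chunksOnDisk)
    	if s.chunkDescsOffset == -1 {
    		// First persisted chunk for this series: now the offset is known.
    		s.chunkDescsOffset = offset
    	}
    	fmt.Println("chunkDescsOffset is now", s.chunkDescsOffset) // 5
    }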
@@ -451,11 +464,11 @@ func (s *memorySeriesStorage) loop() {
 					// Keep going.
 				}
 				s.fpLocker.Lock(m.fp)
-				if m.series.evictOlderThan(
-					clientmodel.TimestampFromTime(time.Now()).Add(-1*s.evictAfter),
-					m.fp, s.persistQueue,
-				) {
-					s.fingerprintToSeries.del(m.fp)
+				allEvicted, persistHeadChunk := m.series.evictOlderThan(
+					clientmodel.TimestampFromTime(time.Now()).Add(-1 * s.evictAfter),
+				)
+				if allEvicted {
+					s.fpToSeries.del(m.fp)
 					s.numSeries.Dec()
 					if err := s.persistence.archiveMetric(
 						m.fp, m.series.metric, m.series.firstTime(), m.series.lastTime(),
@@ -466,6 +479,10 @@ func (s *memorySeriesStorage) loop() {
 					}
 				}
 				s.fpLocker.Unlock(m.fp)
+				// Queue outside of lock!
+				if persistHeadChunk {
+					s.persistQueue <- persistRequest{m.fp, m.series.head()}
+				}
 			}
 
 			duration := time.Since(begin)
@@ -476,7 +493,7 @@ func (s *memorySeriesStorage) loop() {
 			ts := clientmodel.TimestampFromTime(time.Now()).Add(-1 * s.purgeAfter)
 			begin := time.Now()
 
-			for fp := range s.fingerprintToSeries.fpIter() {
+			for fp := range s.fpToSeries.fpIter() {
 				select {
 				case <-s.loopStopping:
 					glog.Info("Interrupted purging series.")
@@ -515,18 +532,24 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime
 	defer s.fpLocker.Unlock(fp)
 
 	// First purge persisted chunks. We need to do that anyway.
-	allDropped, err := s.persistence.dropChunks(fp, beforeTime)
+	numDropped, allDropped, err := s.persistence.dropChunks(fp, beforeTime)
 	if err != nil {
 		glog.Error("Error purging persisted chunks: ", err)
 	}
 
 	// Purge chunks from memory accordingly.
-	if series, ok := s.fingerprintToSeries.get(fp); ok {
-		if series.purgeOlderThan(beforeTime) && allDropped {
-			s.fingerprintToSeries.del(fp)
+	if series, ok := s.fpToSeries.get(fp); ok {
+		numPurged, allPurged := series.purgeOlderThan(beforeTime)
+		if allPurged && allDropped {
+			s.fpToSeries.del(fp)
 			s.numSeries.Dec()
 			s.seriesOps.WithLabelValues(memoryPurge).Inc()
 			s.persistence.unindexMetric(series.metric, fp)
+		} else if series.chunkDescsOffset != -1 {
+			series.chunkDescsOffset += numPurged - numDropped
+			if series.chunkDescsOffset < 0 {
+				panic("dropped more chunks from persistence than from memory")
+			}
 		}
 		return
 	}
diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go
index 8c905b511..ef5136109 100644
--- a/storage/local/storage_test.go
+++ b/storage/local/storage_test.go
@@ -37,7 +37,7 @@ func TestChunk(t *testing.T) {
 
 	s.AppendSamples(samples)
 
-	for m := range s.(*memorySeriesStorage).fingerprintToSeries.iter() {
+	for m := range s.(*memorySeriesStorage).fpToSeries.iter() {
 		for i, v := range m.series.values() {
 			if samples[i].Timestamp != v.Timestamp {
 				t.Fatalf("%d. Got %v; want %v", i, v.Timestamp, samples[i].Timestamp)