diff --git a/storage/local/crashrecovery.go b/storage/local/crashrecovery.go
index c3206fa53..0d2305a63 100644
--- a/storage/local/crashrecovery.go
+++ b/storage/local/crashrecovery.go
@@ -78,8 +78,8 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[clientmodel.Finge
 			// Oops, head chunk was persisted, but nothing on disk.
 			// Thus, we lost that series completely. Clean up the remnants.
 			delete(fingerprintToSeries, fp)
-			if err := p.dropArchivedMetric(fp); err != nil {
-				// Dropping the archived metric didn't work, so try
+			if err := p.purgeArchivedMetric(fp); err != nil {
+				// Purging the archived metric didn't work, so try
 				// to unindex it, just in case it's in the indexes.
 				p.unindexMetric(fp, s.metric)
 			}
diff --git a/storage/local/instrumentation.go b/storage/local/instrumentation.go
index 48af8df80..5da933e36 100644
--- a/storage/local/instrumentation.go
+++ b/storage/local/instrumentation.go
@@ -67,7 +67,7 @@ const (
 	unpin     = "unpin" // Excluding the unpin on persisting.
 	clone     = "clone"
 	transcode = "transcode"
-	purge     = "purge"
+	drop      = "drop"
 
 	// Op-types for chunkOps and chunkDescOps.
 	evict = "evict"
diff --git a/storage/local/persistence.go b/storage/local/persistence.go
index 5ce731be7..b152b4e41 100644
--- a/storage/local/persistence.go
+++ b/storage/local/persistence.go
@@ -771,7 +771,7 @@ func (p *persistence) dropChunks(fp clientmodel.Fingerprint, beforeTime clientmo
 		if err == io.EOF {
 			// We ran into the end of the file without finding any chunks that should
 			// be kept. Remove the whole file.
-			chunkOps.WithLabelValues(purge).Add(float64(i))
+			chunkOps.WithLabelValues(drop).Add(float64(i))
 			if err := os.Remove(f.Name()); err != nil {
 				return 0, 0, true, err
 			}
@@ -783,7 +783,7 @@ func (p *persistence) dropChunks(fp clientmodel.Fingerprint, beforeTime clientmo
 		lastTime := clientmodel.Timestamp(binary.LittleEndian.Uint64(timeBuf[8:]))
 		if !lastTime.Before(beforeTime) {
 			firstTime = clientmodel.Timestamp(binary.LittleEndian.Uint64(timeBuf))
-			chunkOps.WithLabelValues(purge).Add(float64(i))
+			chunkOps.WithLabelValues(drop).Add(float64(i))
 			break
 		}
 	}
@@ -824,7 +824,7 @@ func (p *persistence) indexMetric(fp clientmodel.Fingerprint, m clientmodel.Metr
 // indexes used for getFingerprintsForLabelPair, getLabelValuesForLabelName, and
 // getFingerprintsModifiedBefore. The index of fingerprints to archived metrics
 // is not affected by this removal. (In fact, never call this method for an
-// archived metric. To drop an archived metric, call dropArchivedFingerprint.)
+// archived metric. To purge an archived metric, call purgeArchivedFingerprint.)
 // If the queue is full, this method blocks until the metric can be queued. This
 // method is goroutine-safe.
 func (p *persistence) unindexMetric(fp clientmodel.Fingerprint, m clientmodel.Metric) {
@@ -910,11 +910,11 @@ func (p *persistence) getArchivedMetric(fp clientmodel.Fingerprint) (clientmodel
 	return metric, err
 }
 
-// dropArchivedMetric deletes an archived fingerprint and its corresponding
+// purgeArchivedMetric deletes an archived fingerprint and its corresponding
 // metric entirely. It also queues the metric for un-indexing (no need to call
-// unindexMetric for the deleted metric.) The caller must have locked the
-// fingerprint.
-func (p *persistence) dropArchivedMetric(fp clientmodel.Fingerprint) (err error) {
+// unindexMetric for the deleted metric.) It does not touch the series file,
+// though. The caller must have locked the fingerprint.
+func (p *persistence) purgeArchivedMetric(fp clientmodel.Fingerprint) (err error) {
 	defer func() {
 		if err != nil {
 			p.setDirty(true)
@@ -944,7 +944,7 @@ func (p *persistence) dropArchivedMetric(fp clientmodel.Fingerprint) (err error)
 }
 
 // unarchiveMetric deletes an archived fingerprint and its metric, but (in
-// contrast to dropArchivedMetric) does not un-index the metric. If a metric
+// contrast to purgeArchivedMetric) does not un-index the metric. If a metric
 // was actually deleted, the method returns true and the first time of the
 // deleted metric. The caller must have locked the fingerprint.
 func (p *persistence) unarchiveMetric(fp clientmodel.Fingerprint) (
diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go
index c7564caa3..666b9e4b3 100644
--- a/storage/local/persistence_test.go
+++ b/storage/local/persistence_test.go
@@ -349,11 +349,11 @@ func TestDropArchivedMetric(t *testing.T) {
 		t.Error("want FP 2 archived")
 	}
 
-	if err != p.dropArchivedMetric(1) {
+	if err != p.purgeArchivedMetric(1) {
 		t.Fatal(err)
 	}
-	if err != p.dropArchivedMetric(3) {
-		// Dropping something that has not beet archived is not an error.
+	if err != p.purgeArchivedMetric(3) {
+		// Purging something that has not been archived is not an error.
 		t.Fatal(err)
 	}
 	p.waitForIndexing()
diff --git a/storage/local/series.go b/storage/local/series.go
index 0c1065a9d..696a6a9fa 100644
--- a/storage/local/series.go
+++ b/storage/local/series.go
@@ -242,11 +242,11 @@ func (s *memorySeries) evictChunkDescs(iOldestNotEvicted int) {
 	}
 }
 
-// purgeOlderThan removes chunkDescs older than t. It returns the number of
-// purged chunkDescs and true if all chunkDescs have been purged.
+// dropChunks removes chunkDescs older than t. It returns the number of dropped
+// chunkDescs and true if all chunkDescs have been dropped.
 //
 // The caller must have locked the fingerprint of the series.
-func (s *memorySeries) purgeOlderThan(t clientmodel.Timestamp) (int, bool) {
+func (s *memorySeries) dropChunks(t clientmodel.Timestamp) (int, bool) {
 	keepIdx := len(s.chunkDescs)
 	for i, cd := range s.chunkDescs {
 		if !cd.lastTime().Before(t) {
diff --git a/storage/local/storage.go b/storage/local/storage.go
index ff215680b..916da1139 100644
--- a/storage/local/storage.go
+++ b/storage/local/storage.go
@@ -68,7 +68,7 @@ type memorySeriesStorage struct {
 	loopStopping, loopStopped  chan struct{}
 	maxMemoryChunks            int
-	purgeAfter                 time.Duration
+	dropAfter                  time.Duration
 	checkpointInterval         time.Duration
 	checkpointDirtySeriesLimit int
@@ -96,7 +96,6 @@ type memorySeriesStorage struct {
 	seriesOps                   *prometheus.CounterVec
 	ingestedSamplesCount        prometheus.Counter
 	invalidPreloadRequestsCount prometheus.Counter
-	purgeDuration               prometheus.Gauge
 }
 
 // MemorySeriesStorageOptions contains options needed by
@@ -105,7 +104,7 @@ type memorySeriesStorage struct {
 type MemorySeriesStorageOptions struct {
 	MemoryChunks               int           // How many chunks to keep in memory.
 	PersistenceStoragePath     string        // Location of persistence files.
-	PersistenceRetentionPeriod time.Duration // Chunks at least that old are purged.
+	PersistenceRetentionPeriod time.Duration // Chunks at least that old are dropped.
 	PersistenceQueueCapacity   int           // Capacity of queue for chunks to be persisted.
 	CheckpointInterval         time.Duration // How often to checkpoint the series map and head chunks.
 	CheckpointDirtySeriesLimit int           // How many dirty series will trigger an early checkpoint.
@@ -140,7 +139,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
 		loopStopping:               make(chan struct{}),
 		loopStopped:                make(chan struct{}),
 		maxMemoryChunks:            o.MemoryChunks,
-		purgeAfter:                 o.PersistenceRetentionPeriod,
+		dropAfter:                  o.PersistenceRetentionPeriod,
 		checkpointInterval:         o.CheckpointInterval,
 		checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit,
@@ -670,8 +669,8 @@ func (s *memorySeriesStorage) persistChunks(fp clientmodel.Fingerprint, cds []*c
 
 // waitForNextFP waits an estimated duration, after which we want to process
 // another fingerprint so that we will process all fingerprints in a tenth of
-// s.purgeAfter assuming that the system is doing nothing else, e.g. if we want
-// to purge after 40h, we want to cycle through all fingerprints within
+// s.dropAfter assuming that the system is doing nothing else, e.g. if we want
+// to drop chunks after 40h, we want to cycle through all fingerprints within
 // 4h. However, the maximum sweep time is capped at fpMaxSweepTime. Furthermore,
 // this method will always wait for at least fpMinWaitDuration and never longer
 // than fpMaxWaitDuration. If s.loopStopped is closed, it will return false
@@ -680,7 +679,7 @@ func (s *memorySeriesStorage) persistChunks(fp clientmodel.Fingerprint, cds []*c
 func (s *memorySeriesStorage) waitForNextFP(numberOfFPs int) bool {
 	d := fpMaxWaitDuration
 	if numberOfFPs != 0 {
-		sweepTime := s.purgeAfter / 10
+		sweepTime := s.dropAfter / 10
 		if sweepTime > fpMaxSweepTime {
 			sweepTime = fpMaxSweepTime
 		}
@@ -725,6 +724,7 @@ func (s *memorySeriesStorage) cycleThroughMemoryFingerprints() chan clientmodel.
 			}
 			begin := time.Now()
 			fpIter = s.fpToSeries.fpIter()
+			count := 0
 			for fp := range fpIter {
 				select {
 				case memoryFingerprints <- fp:
@@ -732,8 +732,14 @@
 				case <-s.loopStopping:
 					return
 				}
 				s.waitForNextFP(s.fpToSeries.length())
+				count++
+			}
+			if count > 0 {
+				glog.Infof(
+					"Completed maintenance sweep through %d in-memory fingerprints in %v.",
+					count, time.Since(begin),
+				)
 			}
-			glog.Infof("Completed maintenance sweep through in-memory fingerprints in %v.", time.Since(begin))
 		}
 	}()
@@ -750,7 +756,7 @@
 
 		for {
 			archivedFPs, err := s.persistence.getFingerprintsModifiedBefore(
-				clientmodel.TimestampFromTime(time.Now()).Add(-1 * s.purgeAfter),
+				clientmodel.TimestampFromTime(time.Now()).Add(-s.dropAfter),
 			)
 			if err != nil {
 				glog.Error("Failed to lookup archived fingerprint ranges: ", err)
@@ -770,7 +776,12 @@
 				}
 				s.waitForNextFP(len(archivedFPs))
 			}
-			glog.Infof("Completed maintenance sweep through archived fingerprints in %v.", time.Since(begin))
+			if len(archivedFPs) > 0 {
+				glog.Infof(
+					"Completed maintenance sweep through %d archived fingerprints in %v.",
+					len(archivedFPs), time.Since(begin),
+				)
+			}
 		}
 	}()
 	return archivedFingerprints
@@ -807,11 +818,9 @@ loop:
 			headChunksPersistedSinceLastCheckpoint = 0
 			checkpointTimer.Reset(s.checkpointInterval)
 		case fp := <-memoryFingerprints:
-			s.purgeSeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-1*s.purgeAfter))
-			s.seriesOps.WithLabelValues(memoryMaintenance).Inc()
+			s.maintainMemorySeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-s.dropAfter))
 		case fp := <-archivedFingerprints:
-			s.purgeSeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-1*s.purgeAfter))
-			s.seriesOps.WithLabelValues(archiveMaintenance).Inc()
+			s.maintainArchivedSeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-s.dropAfter))
 		case <-s.countPersistedHeadChunks:
 			headChunksPersistedSinceLastCheckpoint++
 			// Check if we have enough "dirty" series so that we need an early checkpoint.
@@ -835,10 +844,11 @@
 	}
 }
 
-// maintainSeries closes the head chunk if not touched in a while. It archives a
-// series if all chunks are evicted. It evicts chunkDescs if there are too
-// many.
-func (s *memorySeriesStorage) maintainSeries(fp clientmodel.Fingerprint) {
+// maintainMemorySeries first purges old chunks from the series. If the series
+// still exists after that, it proceeds with the following steps: It closes the
+// head chunk if it was not touched in a while. It archives a series if all
+// chunks are evicted. It evicts chunkDescs if there are too many.
+func (s *memorySeriesStorage) maintainMemorySeries(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) {
 	var headChunkToPersist *chunkDesc
 	s.fpLocker.Lock(fp)
 	defer func() {
@@ -846,19 +856,28 @@
 		// Queue outside of lock!
 		if headChunkToPersist != nil {
 			s.persistQueue <- persistRequest{fp, headChunkToPersist}
-		}
-		// Count that a head chunk was persisted, but only best effort, i.e. we
-		// don't want to block here.
-		select {
-		case s.countPersistedHeadChunks <- struct{}{}: // Counted.
-		default: // Meh...
+			// Count that a head chunk was persisted, but only best effort, i.e. we
+			// don't want to block here.
+			select {
+			case s.countPersistedHeadChunks <- struct{}{}: // Counted.
+			default: // Meh...
+			}
 		}
 	}()
 
 	series, ok := s.fpToSeries.get(fp)
 	if !ok {
+		// Series is actually not in memory, perhaps archived or dropped in the meantime.
 		return
 	}
+
+	defer s.seriesOps.WithLabelValues(memoryMaintenance).Inc()
+
+	if s.purgeMemorySeries(fp, series, beforeTime) {
+		// Series is gone now, we are done.
+		return
+	}
+
 	iOldestNotEvicted := -1
 	for i, cd := range series.chunkDescs {
 		if !cd.isEvicted() {
@@ -876,7 +895,10 @@ func (s *memorySeriesStorage) maintainSeries(fp clientmodel.Fingerprint) {
 	if len(series.chunkDescs) == 0 {
 		cds, err := s.loadChunkDescs(fp, clientmodel.Latest)
 		if err != nil {
-			glog.Errorf("Could not load chunk descriptors prior to archiving metric %v, metric will not be archived: %v", series.metric, err)
+			glog.Errorf(
+				"Could not load chunk descriptors prior to archiving metric %v, metric will not be archived: %v",
+				series.metric, err,
+			)
 			return
 		}
 		series.chunkDescs = cds
@@ -902,38 +924,43 @@ func (s *memorySeriesStorage) maintainSeries(fp clientmodel.Fingerprint) {
 	}
 }
 
-// purgeSeries purges chunks older than beforeTime from a series. If the series
-// contains no chunks after the purge, it is dropped entirely.
-func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) {
+// purgeMemorySeries drops chunks older than beforeTime from the provided memory
+// series. The caller must have locked fp. If the series contains no chunks
+// after dropping old chunks, it is purged entirely. In that case, the method
+// returns true.
+func (s *memorySeriesStorage) purgeMemorySeries(fp clientmodel.Fingerprint, series *memorySeries, beforeTime clientmodel.Timestamp) bool {
+	if !series.firstTime().Before(beforeTime) {
+		// Oldest sample not old enough.
+		return false
+	}
+	newFirstTime, numDroppedFromPersistence, allDroppedFromPersistence, err := s.persistence.dropChunks(fp, beforeTime)
+	if err != nil {
+		glog.Error("Error dropping persisted chunks: ", err)
+	}
+	numDroppedFromMemory, allDroppedFromMemory := series.dropChunks(beforeTime)
+	if allDroppedFromPersistence && allDroppedFromMemory {
+		s.fpToSeries.del(fp)
+		s.numSeries.Dec()
+		s.seriesOps.WithLabelValues(memoryPurge).Inc()
+		s.persistence.unindexMetric(fp, series.metric)
+		return true
+	}
+	if series.chunkDescsOffset != -1 {
+		series.savedFirstTime = newFirstTime
+		series.chunkDescsOffset += numDroppedFromMemory - numDroppedFromPersistence
+		if series.chunkDescsOffset < 0 {
+			panic("dropped more chunks from persistence than from memory")
+		}
+	}
+	return false
+}
+
+// maintainArchivedSeries drops chunks older than beforeTime from an archived
+// series. If the series contains no chunks after that, it is purged entirely.
+func (s *memorySeriesStorage) maintainArchivedSeries(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) {
 	s.fpLocker.Lock(fp)
 	defer s.fpLocker.Unlock(fp)
 
-	if series, ok := s.fpToSeries.get(fp); ok {
-		// Deal with series in memory.
-		if !series.firstTime().Before(beforeTime) {
-			// Oldest sample not old enough.
-			return
-		}
-		newFirstTime, numDropped, allDropped, err := s.persistence.dropChunks(fp, beforeTime)
-		if err != nil {
-			glog.Error("Error purging persisted chunks: ", err)
-		}
-		numPurged, allPurged := series.purgeOlderThan(beforeTime)
-		if allPurged && allDropped {
-			s.fpToSeries.del(fp)
-			s.numSeries.Dec()
-			s.seriesOps.WithLabelValues(memoryPurge).Inc()
-			s.persistence.unindexMetric(fp, series.metric)
-		} else if series.chunkDescsOffset != -1 {
-			series.savedFirstTime = newFirstTime
-			series.chunkDescsOffset += numPurged - numDropped
-			if series.chunkDescsOffset < 0 {
-				panic("dropped more chunks from persistence than from memory")
-			}
-		}
-		return
-	}
-	// Deal with archived series.
 	has, firstTime, lastTime, err := s.persistence.hasArchivedMetric(fp)
 	if err != nil {
 		glog.Error("Error looking up archived time range: ", err)
@@ -944,13 +971,15 @@
 		return
 	}
 
+	defer s.seriesOps.WithLabelValues(archiveMaintenance).Inc()
+
 	newFirstTime, _, allDropped, err := s.persistence.dropChunks(fp, beforeTime)
 	if err != nil {
-		glog.Error("Error purging persisted chunks: ", err)
+		glog.Error("Error dropping persisted chunks: ", err)
 	}
 	if allDropped {
-		if err := s.persistence.dropArchivedMetric(fp); err != nil {
-			glog.Errorf("Error dropping archived metric for fingerprint %v: %v", fp, err)
+		if err := s.persistence.purgeArchivedMetric(fp); err != nil {
+			glog.Errorf("Error purging archived metric for fingerprint %v: %v", fp, err)
 			return
 		}
 		s.seriesOps.WithLabelValues(archivePurge).Inc()
diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go
index a30a0a7fe..91f9b6bc3 100644
--- a/storage/local/storage_test.go
+++ b/storage/local/storage_test.go
@@ -36,6 +36,9 @@ func TestGetFingerprintsForLabelMatchers(t *testing.T) {
 // TestLoop is just a smoke test for the loop method, if we can switch it on and
 // off without disaster.
 func TestLoop(t *testing.T) {
+	if testing.Short() {
+		t.Skip("Skipping test in short mode.")
+	}
 	samples := make(clientmodel.Samples, 1000)
 	for i := range samples {
 		samples[i] = &clientmodel.Sample{
@@ -57,8 +60,18 @@ func TestLoop(t *testing.T) {
 	}
 	storage.Start()
 	storage.AppendSamples(samples)
-	time.Sleep(time.Second)
+	storage.WaitForIndexing()
+	series, _ := storage.(*memorySeriesStorage).fpToSeries.get(clientmodel.Metric{}.Fingerprint())
+	cdsBefore := len(series.chunkDescs)
+	time.Sleep(fpMaxWaitDuration + time.Second) // TODO(beorn7): Ugh, need to wait for maintenance to kick in.
+	cdsAfter := len(series.chunkDescs)
 	storage.Stop()
+	if cdsBefore <= cdsAfter {
+		t.Errorf(
+			"Number of chunk descriptors should have gone down by now. Got before %d, after %d.",
+			cdsBefore, cdsAfter,
+		)
+	}
 }
 
 func TestChunk(t *testing.T) {
@@ -337,15 +350,15 @@ func TestEvictAndPurgeSeries(t *testing.T) {
 	s, closer := NewTestStorage(t)
 	defer closer.Close()
 
-	ms := s.(*memorySeriesStorage) // Going to test the internal purgeSeries method.
+	ms := s.(*memorySeriesStorage) // Going to test the internal maintain.*Series methods.
 
 	s.AppendSamples(samples)
 	s.WaitForIndexing()
 
 	fp := clientmodel.Metric{}.Fingerprint()
 
-	// Purge ~half of the chunks.
-	ms.purgeSeries(fp, 1000)
+	// Drop ~half of the chunks.
+	ms.maintainMemorySeries(fp, 1000)
 	it := s.NewIterator(fp)
 	actual := it.GetBoundaryValues(metric.Interval{
 		OldestInclusive: 0,
@@ -362,8 +375,8 @@ func TestEvictAndPurgeSeries(t *testing.T) {
 		t.Errorf("2nd timestamp: want %v, got %v", want, actual[1].Timestamp)
 	}
 
-	// Purge everything.
-	ms.purgeSeries(fp, 10000)
+	// Drop everything.
+	ms.maintainMemorySeries(fp, 10000)
 	it = s.NewIterator(fp)
 	actual = it.GetBoundaryValues(metric.Interval{
 		OldestInclusive: 0,
@@ -403,18 +416,18 @@ func TestEvictAndPurgeSeries(t *testing.T) {
 		t.Fatal("not archived")
 	}
 
-	// Purge ~half of the chunks of an archived series.
-	ms.purgeSeries(fp, 1000)
+	// Drop ~half of the chunks of an archived series.
+	ms.maintainArchivedSeries(fp, 1000)
 	archived, _, _, err = ms.persistence.hasArchivedMetric(fp)
 	if err != nil {
 		t.Fatal(err)
 	}
 	if !archived {
-		t.Fatal("archived series dropped although only half of the chunks purged")
+		t.Fatal("archived series purged although only half of the chunks dropped")
 	}
 
-	// Purge everything.
-	ms.purgeSeries(fp, 10000)
+	// Drop everything.
+	ms.maintainArchivedSeries(fp, 10000)
 	archived, _, _, err = ms.persistence.hasArchivedMetric(fp)
 	if err != nil {
 		t.Fatal(err)
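
The change settles on a consistent vocabulary: chunks older than the retention cutoff are "dropped" (dropChunks in series.go and persistence.go), removing a whole series or archived metric is a "purge" (purgeMemorySeries, purgeArchivedMetric), and the periodic per-fingerprint pass is "maintenance" (maintainMemorySeries, maintainArchivedSeries). The following is a minimal, self-contained Go sketch of that distinction using toy types; chunk, series, and this dropChunks are simplified stand-ins, not the actual storage code. The point it illustrates: a series only becomes a candidate for purging once dropping old chunks has left it empty.

package main

import (
	"fmt"
	"time"
)

// chunk is a toy stand-in for a stored chunk; only its last sample time matters here.
type chunk struct{ lastTime time.Time }

// series is a toy stand-in for an in-memory series.
type series struct {
	name   string
	chunks []chunk
}

// dropChunks removes all chunks whose last sample lies before beforeTime. It
// returns the number of dropped chunks and whether the series is now empty,
// mirroring the (numDropped, allDropped) result shape used in the diff.
func (s *series) dropChunks(beforeTime time.Time) (int, bool) {
	kept := s.chunks[:0]
	dropped := 0
	for _, c := range s.chunks {
		if c.lastTime.Before(beforeTime) {
			dropped++
			continue
		}
		kept = append(kept, c)
	}
	s.chunks = kept
	return dropped, len(s.chunks) == 0
}

func main() {
	now := time.Now()
	s := &series{
		name: "toy_series",
		chunks: []chunk{
			{lastTime: now.Add(-72 * time.Hour)},
			{lastTime: now.Add(-48 * time.Hour)},
			{lastTime: now.Add(-1 * time.Hour)},
		},
	}
	// dropAfter plays the role of the retention period: chunks at least that
	// old are dropped.
	dropAfter := 40 * time.Hour
	dropped, allDropped := s.dropChunks(now.Add(-dropAfter))
	fmt.Printf("dropped %d chunks, series empty: %v\n", dropped, allDropped)
	if allDropped {
		// Only in this case would the real storage purge the series entirely
		// (remove it from the series map and unindex its metric).
		fmt.Println("series would be purged")
	}
}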
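
The waitForNextFP comment also carries a small piece of arithmetic: one full sweep should take about a tenth of s.dropAfter (drop after 40h, cycle through all fingerprints within 4h), capped at fpMaxSweepTime, with the per-fingerprint wait clamped between fpMinWaitDuration and fpMaxWaitDuration. Below is a rough sketch of that calculation; the constant values are illustrative assumptions (the real constants are defined in storage.go and may differ), and the real method additionally returns early when the storage is stopping.

package main

import (
	"fmt"
	"time"
)

// Illustrative stand-ins for the pacing constants in storage.go; assumed values.
const (
	fpMinWaitDuration = 50 * time.Millisecond
	fpMaxWaitDuration = 10 * time.Second
	fpMaxSweepTime    = 6 * time.Hour
)

// nextFPWait computes the per-fingerprint wait described in the waitForNextFP
// comment: spread one sweep over a tenth of dropAfter (capped at fpMaxSweepTime)
// and clamp the result to [fpMinWaitDuration, fpMaxWaitDuration].
func nextFPWait(dropAfter time.Duration, numberOfFPs int) time.Duration {
	d := fpMaxWaitDuration
	if numberOfFPs != 0 {
		sweepTime := dropAfter / 10
		if sweepTime > fpMaxSweepTime {
			sweepTime = fpMaxSweepTime
		}
		d = sweepTime / time.Duration(numberOfFPs)
		if d < fpMinWaitDuration {
			d = fpMinWaitDuration
		}
		if d > fpMaxWaitDuration {
			d = fpMaxWaitDuration
		}
	}
	return d
}

func main() {
	// 40h retention and 100,000 fingerprints: one sweep spread over 4h,
	// i.e. 144ms between fingerprints.
	fmt.Println(nextFPWait(40*time.Hour, 100000))
}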