diff --git a/storage/local/persistence.go b/storage/local/persistence.go
index fba1fdc36..646075dbc 100644
--- a/storage/local/persistence.go
+++ b/storage/local/persistence.go
@@ -259,7 +259,8 @@ func (p *persistence) setDirty(dirty bool) {
 
 // crashRecovery is called by loadSeriesMapAndHeads if the persistence appears
 // to be dirty after the loading (either because the loading resulted in an
-// error or because the persistence was dirty from the start).
+// error or because the persistence was dirty from the start). Not goroutine
+// safe. Only call before anything else is running.
 func (p *persistence) crashRecovery(fingerprintToSeries map[clientmodel.Fingerprint]*memorySeries) error {
 	glog.Warning("Starting crash recovery. Prometheus is inoperative until complete.")
 
@@ -1040,42 +1041,56 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, err error) {
 }
 
 // dropChunks deletes all chunks from a series whose last sample time is before
-// beforeTime. It returns the number of deleted chunks and true if all chunks of
-// the series have been deleted. It is the caller's responsibility to make sure
-// nothing is persisted or loaded for the same fingerprint concurrently.
-func (p *persistence) dropChunks(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) (int, bool, error) {
+// beforeTime. It returns the timestamp of the first sample in the oldest chunk
+// _not_ dropped, the number of deleted chunks, and true if all chunks of the
+// series have been deleted (in which case the returned timestamp will be 0 and
+// must be ignored). It is the caller's responsibility to make sure nothing is
+// persisted or loaded for the same fingerprint concurrently.
+func (p *persistence) dropChunks(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) (
+	firstTimeNotDropped clientmodel.Timestamp,
+	numDropped int,
+	allDropped bool,
+	err error,
+) {
+	defer func() {
+		if err != nil {
+			p.setDirty(true)
+		}
+	}()
 	f, err := p.openChunkFileForReading(fp)
 	if os.IsNotExist(err) {
-		return 0, true, nil
+		return 0, 0, true, nil
 	}
 	if err != nil {
-		return 0, false, err
+		return 0, 0, false, err
 	}
 	defer f.Close()
 	// Find the first chunk that should be kept.
 	var i int
+	var firstTime clientmodel.Timestamp
 	for ; ; i++ {
-		_, err := f.Seek(p.offsetForChunkIndex(i)+chunkHeaderLastTimeOffset, os.SEEK_SET)
+		_, err := f.Seek(p.offsetForChunkIndex(i)+chunkHeaderFirstTimeOffset, os.SEEK_SET)
 		if err != nil {
-			return 0, false, err
+			return 0, 0, false, err
 		}
-		lastTimeBuf := make([]byte, 8)
-		_, err = io.ReadAtLeast(f, lastTimeBuf, 8)
+		timeBuf := make([]byte, 16)
+		_, err = io.ReadAtLeast(f, timeBuf, 16)
 		if err == io.EOF {
 			// We ran into the end of the file without finding any chunks that should
 			// be kept. Remove the whole file.
 			chunkOps.WithLabelValues(purge).Add(float64(i))
 			if err := os.Remove(f.Name()); err != nil {
-				return 0, true, err
+				return 0, 0, true, err
 			}
-			return i, true, nil
+			return 0, i, true, nil
 		}
 		if err != nil {
-			return 0, false, err
+			return 0, 0, false, err
 		}
-		lastTime := clientmodel.Timestamp(binary.LittleEndian.Uint64(lastTimeBuf))
+		lastTime := clientmodel.Timestamp(binary.LittleEndian.Uint64(timeBuf[8:]))
 		if !lastTime.Before(beforeTime) {
+			firstTime = clientmodel.Timestamp(binary.LittleEndian.Uint64(timeBuf))
 			chunkOps.WithLabelValues(purge).Add(float64(i))
 			break
 		}
@@ -1084,25 +1099,25 @@ func (p *persistence) dropChunks(fp clientmodel.Fingerprint, beforeTime clientmo
 	// We've found the first chunk that should be kept. Seek backwards to the
 	// beginning of its header and start copying everything from there into a new
 	// file.
-	_, err = f.Seek(-(chunkHeaderLastTimeOffset + 8), os.SEEK_CUR)
+	_, err = f.Seek(-(chunkHeaderFirstTimeOffset + 16), os.SEEK_CUR)
 	if err != nil {
-		return 0, false, err
+		return 0, 0, false, err
 	}
 	temp, err := os.OpenFile(p.tempFileNameForFingerprint(fp), os.O_WRONLY|os.O_CREATE, 0640)
 	if err != nil {
-		return 0, false, err
+		return 0, 0, false, err
 	}
 	defer temp.Close()
 	if _, err := io.Copy(temp, f); err != nil {
-		return 0, false, err
+		return 0, 0, false, err
 	}
 
 	if err := os.Rename(p.tempFileNameForFingerprint(fp), p.fileNameForFingerprint(fp)); err != nil {
-		return 0, false, err
+		return 0, 0, false, err
 	}
-	return i, false, nil
+	return firstTime, i, false, nil
 }
 
 // indexMetric queues the given metric for addition to the indexes needed by
@@ -1148,9 +1163,11 @@ func (p *persistence) archiveMetric(
 	defer p.archiveMtx.Unlock()
 
 	if err := p.archivedFingerprintToMetrics.Put(codable.Fingerprint(fp), codable.Metric(m)); err != nil {
+		p.setDirty(true)
 		return err
 	}
 	if err := p.archivedFingerprintToTimeRange.Put(codable.Fingerprint(fp), codable.TimeRange{First: first, Last: last}); err != nil {
+		p.setDirty(true)
 		return err
 	}
 	return nil
@@ -1166,6 +1183,15 @@ func (p *persistence) hasArchivedMetric(fp clientmodel.Fingerprint) (
 	return
 }
 
+// updateArchivedTimeRange updates an archived time range. The caller must make
+// sure that the fingerprint is currently archived (the time range will
+// otherwise be added without the corresponding metric in the archive).
+func (p *persistence) updateArchivedTimeRange(
+	fp clientmodel.Fingerprint, first, last clientmodel.Timestamp,
+) error {
+	return p.archivedFingerprintToTimeRange.Put(codable.Fingerprint(fp), codable.TimeRange{First: first, Last: last})
+}
+
 // getFingerprintsModifiedBefore returns the fingerprints of archived timeseries
 // that have live samples before the provided timestamp. This method is
 // goroutine-safe.
@@ -1204,7 +1230,13 @@ func (p *persistence) getArchivedMetric(fp clientmodel.Fingerprint) (clientmodel
 // dropArchivedMetric deletes an archived fingerprint and its corresponding
 // metric entirely. It also queues the metric for un-indexing (no need to call
 // unindexMetric for the deleted metric.) This method is goroutine-safe.
-func (p *persistence) dropArchivedMetric(fp clientmodel.Fingerprint) error {
+func (p *persistence) dropArchivedMetric(fp clientmodel.Fingerprint) (err error) {
+	defer func() {
+		if err != nil {
+			p.setDirty(true)
+		}
+	}()
+
 	p.archiveMtx.Lock()
 	defer p.archiveMtx.Unlock()
 
diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go
index 0f9e24548..a395a6360 100644
--- a/storage/local/persistence_test.go
+++ b/storage/local/persistence_test.go
@@ -138,10 +138,13 @@ func TestPersistLoadDropChunks(t *testing.T) {
 	}
 	// Drop half of the chunks.
 	for fp, expectedChunks := range fpToChunks {
-		numDropped, allDropped, err := p.dropChunks(fp, 5)
+		firstTime, numDropped, allDropped, err := p.dropChunks(fp, 5)
 		if err != nil {
 			t.Fatal(err)
 		}
+		if firstTime != 5 {
+			t.Errorf("want first time 5, got %d", firstTime)
+		}
 		if numDropped != 5 {
 			t.Errorf("want 5 dropped chunks, got %v", numDropped)
 		}
@@ -164,7 +167,10 @@ func TestPersistLoadDropChunks(t *testing.T) {
 	}
 	// Drop all the chunks.
 	for fp := range fpToChunks {
-		numDropped, allDropped, err := p.dropChunks(fp, 100)
+		firstTime, numDropped, allDropped, err := p.dropChunks(fp, 100)
+		if firstTime != 0 {
+			t.Errorf("want first time 0, got %d", firstTime)
+		}
 		if err != nil {
 			t.Fatal(err)
 		}
diff --git a/storage/local/storage.go b/storage/local/storage.go
index fcb5a4632..a3db42841 100644
--- a/storage/local/storage.go
+++ b/storage/local/storage.go
@@ -485,7 +485,6 @@ func (s *memorySeriesStorage) loop() {
 					m.fp, m.series.metric, m.series.firstTime(), m.series.lastTime(),
 				); err != nil {
 					glog.Errorf("Error archiving metric %v: %v", m.series.metric, err)
-					s.persistence.setDirty(true)
 				} else {
 					s.seriesOps.WithLabelValues(archive).Inc()
 				}
@@ -523,7 +522,7 @@ func (s *memorySeriesStorage) loop() {
 			for _, fp := range persistedFPs {
 				select {
 				case <-s.loopStopping:
-					glog.Info("Interrupted purnging series.")
+					glog.Info("Interrupted purging series.")
 					return
 				default:
 					s.purgeSeries(fp, ts)
@@ -536,22 +535,22 @@ func (s *memorySeriesStorage) loop() {
 	}
 }
 
-// purgeSeries purges chunks older than persistenceRetentionPeriod from a
-// series. If the series contains no chunks after the purge, it is dropped
-// entirely.
+// purgeSeries purges chunks older than beforeTime from a series. If the series
+// contains no chunks after the purge, it is dropped entirely.
 func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) {
 	s.fpLocker.Lock(fp)
 	defer s.fpLocker.Unlock(fp)
 
-	// First purge persisted chunks. We need to do that anyway.
-	numDropped, allDropped, err := s.persistence.dropChunks(fp, beforeTime)
-	if err != nil {
-		glog.Error("Error purging persisted chunks: ", err)
-		s.persistence.setDirty(true)
-	}
-
-	// Purge chunks from memory accordingly.
 	if series, ok := s.fpToSeries.get(fp); ok {
+		// Deal with series in memory.
+		if !series.firstTime().Before(beforeTime) {
+			// Oldest sample not old enough.
+			return
+		}
+		newFirstTime, numDropped, allDropped, err := s.persistence.dropChunks(fp, beforeTime)
+		if err != nil {
+			glog.Error("Error purging persisted chunks: ", err)
+		}
 		numPurged, allPurged := series.purgeOlderThan(beforeTime)
 		if allPurged && allDropped {
 			s.fpToSeries.del(fp)
@@ -559,6 +558,7 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime
 			s.seriesOps.WithLabelValues(memoryPurge).Inc()
 			s.persistence.unindexMetric(fp, series.metric)
 		} else if series.chunkDescsOffset != -1 {
+			series.savedFirstTime = newFirstTime
 			series.chunkDescsOffset += numPurged - numDropped
 			if series.chunkDescsOffset < 0 {
 				panic("dropped more chunks from persistence than from memory")
@@ -567,20 +567,30 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime
 		return
 	}
 
-	// If we arrive here, nothing was in memory, so the metric must have
-	// been archived. Drop the archived metric if there are no persisted
-	// chunks left. If we don't drop the archived metric, we should update
-	// the archivedFingerprintToTimeRange index according to the remaining
-	// chunks, but it's probably not worth the effort. Queries going beyond
-	// the purge cut-off can be truncated in a more direct fashion.
+	// Deal with archived series.
+	has, firstTime, lastTime, err := s.persistence.hasArchivedMetric(fp)
+	if err != nil {
+		glog.Error("Error looking up archived time range: ", err)
+		return
+	}
+	if !has || !firstTime.Before(beforeTime) {
+		// Oldest sample not old enough, or metric purged or unarchived in the meantime.
+		return
+	}
+
+	newFirstTime, _, allDropped, err := s.persistence.dropChunks(fp, beforeTime)
+	if err != nil {
+		glog.Error("Error purging persisted chunks: ", err)
+	}
 	if allDropped {
 		if err := s.persistence.dropArchivedMetric(fp); err != nil {
 			glog.Errorf("Error dropping archived metric for fingerprint %v: %v", fp, err)
-			s.persistence.setDirty(true)
-		} else {
-			s.seriesOps.WithLabelValues(archivePurge).Inc()
+			return
 		}
+		s.seriesOps.WithLabelValues(archivePurge).Inc()
+		return
 	}
+	s.persistence.updateArchivedTimeRange(fp, newFirstTime, lastTime)
 }
 
 // To expose persistQueueCap as metric:
diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go
index 51d4b7944..20b48a773 100644
--- a/storage/local/storage_test.go
+++ b/storage/local/storage_test.go
@@ -471,8 +471,8 @@ func TestFuzz(t *testing.T) {
 	}
 }
 
-// BenchmarkFuzz is the benchmark version TestFuzz. However, it will run several
-// append and verify operations in parallel, if GOMAXPROC is set
+// BenchmarkFuzz is the benchmark version of TestFuzz. However, it will run
+// several append and verify operations in parallel, if GOMAXPROCS is set
 // accordingly. Also, the storage options are set such that evictions,
 // checkpoints, and purging will happen concurrently, too. This benchmark will
 // have a very long runtime (up to minutes). You can use it as an actual
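
Note on the new dropChunks contract (a minimal sketch, not part of the patch): the method now returns four values, and the interplay between the first-kept timestamp and allDropped is easy to get wrong. The stub below models chunks as in-memory (firstTime, lastTime) pairs instead of reading chunk headers from disk; the Timestamp type and all names here are illustrative stand-ins for clientmodel.Timestamp and the real persistence API, not actual Prometheus code.

package main

import "fmt"

// Timestamp is a hypothetical stand-in for clientmodel.Timestamp.
type Timestamp int64

// Before reports whether t is before o, as clientmodel.Timestamp.Before does.
func (t Timestamp) Before(o Timestamp) bool { return t < o }

// chunk models only the two header fields dropChunks inspects.
type chunk struct {
	firstTime, lastTime Timestamp
}

// dropChunksStub mimics the return contract introduced by this patch: the
// first sample time of the oldest chunk NOT dropped, the number of dropped
// chunks, and whether all chunks were dropped (in which case the returned
// timestamp is 0 and must be ignored).
func dropChunksStub(chunks []chunk, beforeTime Timestamp) (firstTimeNotDropped Timestamp, numDropped int, allDropped bool) {
	for i, c := range chunks {
		// Keep the first chunk whose last sample is not before beforeTime.
		if !c.lastTime.Before(beforeTime) {
			return c.firstTime, i, false
		}
	}
	// Every chunk ended before beforeTime: the whole file goes away.
	return 0, len(chunks), true
}

func main() {
	chunks := []chunk{{0, 4}, {5, 9}, {10, 14}}

	first, n, all := dropChunksStub(chunks, 5)
	fmt.Println(first, n, all) // 5 1 false: oldest kept chunk starts at 5

	first, n, all = dropChunksStub(chunks, 100)
	fmt.Println(first, n, all) // 0 3 true: timestamp 0 must be ignored
}

This mirrors what TestPersistLoadDropChunks asserts above: after dropping at time 5, the first kept sample is at 5; after dropping everything, the returned first time is the zero value.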
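Also worth calling out (again as an illustrative sketch, not patch code): dropChunks and dropArchivedMetric switch to a named err return precisely so that the deferred closure added by this patch can observe the error of whichever return path is taken and flag the persistence dirty in one place. A self-contained demonstration of that pattern:

package main

import (
	"errors"
	"fmt"
)

// setDirty stands in for persistence.setDirty.
func setDirty(dirty bool) { fmt.Println("dirty:", dirty) }

// doWork uses a named return value so the deferred function can inspect the
// final value of err, no matter which return statement produced it.
func doWork(fail bool) (err error) {
	defer func() {
		if err != nil {
			setDirty(true) // covers every error return path in one place
		}
	}()
	if fail {
		return errors.New("simulated failure")
	}
	return nil
}

func main() {
	_ = doWork(true)  // prints "dirty: true"
	_ = doWork(false) // prints nothing
}

With an unnamed error return, the deferred closure could only see some local variable, which early returns would bypass, so the dirty flag could be missed.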