diff --git a/main.go b/main.go index c881b85bd..0dc29da85 100644 --- a/main.go +++ b/main.go @@ -95,7 +95,8 @@ func NewPrometheus() *prometheus { PersistenceRetentionPeriod: *persistenceRetentionPeriod, CheckpointInterval: *checkpointInterval, CheckpointDirtySeriesLimit: *checkpointDirtySeriesLimit, - Dirty: *storageDirty, + Dirty: *storageDirty, + PedanticChecks: *storagePedanticChecks, } memStorage, err := local.NewMemorySeriesStorage(o) if err != nil { diff --git a/storage/local/crashrecovery.go b/storage/local/crashrecovery.go index 384f79869..aaa22802f 100644 --- a/storage/local/crashrecovery.go +++ b/storage/local/crashrecovery.go @@ -193,6 +193,7 @@ func (p *persistence) sanitizeSeries( bytesToTrim := fi.Size() % int64(chunkLen+chunkHeaderLen) chunksInFile := int(fi.Size()) / (chunkLen + chunkHeaderLen) + modTime := fi.ModTime() if bytesToTrim != 0 { glog.Warningf( "Truncating file %s to exactly %d chunks, trimming %d extraneous bytes.", @@ -224,7 +225,8 @@ func (p *persistence) sanitizeSeries( if !p.pedanticChecks && bytesToTrim == 0 && s.chunkDescsOffset != -1 && - chunksInFile == s.chunkDescsOffset+s.persistWatermark { + chunksInFile == s.chunkDescsOffset+s.persistWatermark && + modTime.Equal(s.modTime) { // Everything is consistent. We are good. return fp, true } @@ -241,8 +243,9 @@ func (p *persistence) sanitizeSeries( s.metric, fp, chunksInFile, ) s.chunkDescs = nil - s.chunkDescsOffset = -1 + s.chunkDescsOffset = chunksInFile s.persistWatermark = 0 + s.modTime = modTime return fp, true } // This is the tricky one: We have chunks from heads.db, but @@ -268,6 +271,7 @@ func (p *persistence) sanitizeSeries( } s.persistWatermark = len(cds) s.chunkDescsOffset = 0 + s.modTime = modTime lastTime := cds[len(cds)-1].lastTime() keepIdx := -1 diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 8870bac4b..5c6c579e7 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -479,7 +479,11 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie // // (4.4) The varint-encoded persistWatermark. (Missing in v1.) // -// (4.5) The varint-encoded chunkDescsOffset. +// (4.5) The modification time of the series file as nanoceconds elapsed since +// January 1, 1970 UTC. -1 if the modification time is unknown or no series file +// exists yet. (Missing in v1.) +// +// (4.6) The varint-encoded chunkDescsOffset. // // (4.6) The varint-encoded savedFirstTime. // @@ -571,6 +575,15 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap if _, err = codable.EncodeVarint(w, int64(m.series.persistWatermark)); err != nil { return } + if m.series.modTime.IsZero() { + if _, err = codable.EncodeVarint(w, -1); err != nil { + return + } + } else { + if _, err = codable.EncodeVarint(w, m.series.modTime.UnixNano()); err != nil { + return + } + } if _, err = codable.EncodeVarint(w, int64(m.series.chunkDescsOffset)); err != nil { return } @@ -708,6 +721,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in return sm, chunksToPersist, nil } var persistWatermark int64 + var modTime time.Time if version != headsFormatLegacyVersion { // persistWatermark only present in v2. persistWatermark, err = binary.ReadVarint(r) @@ -716,6 +730,15 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in p.dirty = true return sm, chunksToPersist, nil } + modTimeNano, err := binary.ReadVarint(r) + if err != nil { + glog.Warning("Could not decode modification time:", err) + p.dirty = true + return sm, chunksToPersist, nil + } + if modTimeNano != -1 { + modTime = time.Unix(0, modTimeNano) + } } chunkDescsOffset, err := binary.ReadVarint(r) if err != nil { @@ -786,6 +809,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in metric: clientmodel.Metric(metric), chunkDescs: chunkDescs, persistWatermark: int(persistWatermark), + modTime: modTime, chunkDescsOffset: int(chunkDescsOffset), savedFirstTime: clientmodel.Timestamp(savedFirstTime), headChunkClosed: persistWatermark >= numChunkDescs, @@ -964,6 +988,17 @@ func (p *persistence) deleteSeriesFile(fp clientmodel.Fingerprint) (int, error) return numChunks, nil } +// getSeriesFileModTime returns the modification time of the series file +// belonging to the provided fingerprint. In case of an error, the zero value of +// time.Time is returned. +func (p *persistence) getSeriesFileModTime(fp clientmodel.Fingerprint) time.Time { + var modTime time.Time + if fi, err := os.Stat(p.fileNameForFingerprint(fp)); err == nil { + return fi.ModTime() + } + return modTime +} + // indexMetric queues the given metric for addition to the indexes needed by // getFingerprintsForLabelPair, getLabelValuesForLabelName, and // getFingerprintsModifiedBefore. If the queue is full, this method blocks diff --git a/storage/local/series.go b/storage/local/series.go index 3b67e424b..df76c9712 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -143,6 +143,9 @@ type memorySeries struct { // points to a non-persisted chunk. If all chunks are persisted, then // persistWatermark == len(chunkDescs). persistWatermark int + // The modification time of the series file. The zero value of time.Time + // is used to mark an unknown modification time. + modTime time.Time // The chunkDescs in memory might not have all the chunkDescs for the // chunks that are persisted to disk. The missing chunkDescs are all // contiguous and at the tail end. chunkDescsOffset is the index of the diff --git a/storage/local/storage.go b/storage/local/storage.go index 9ffd928b0..c781461e9 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -770,6 +770,7 @@ func (s *memorySeriesStorage) writeMemorySeries( } s.incNumChunksToPersist(-len(cds)) chunkOps.WithLabelValues(persistAndUnpin).Add(float64(len(cds))) + series.modTime = s.persistence.getSeriesFileModTime(fp) }() // Get the actual chunks from underneath the chunkDescs.