mirror of
https://github.com/prometheus/prometheus
synced 2024-12-27 00:53:12 +00:00
Increase resilience of the storage against data corruption - step 3.
Step 3: Remember the mtime of series files and make use of it to detect series files that are not the one the checkpoint thinks they are.
This commit is contained in:
parent
e25cca823c
commit
11bd9ce1bd
3
main.go
3
main.go
@ -95,7 +95,8 @@ func NewPrometheus() *prometheus {
|
||||
PersistenceRetentionPeriod: *persistenceRetentionPeriod,
|
||||
CheckpointInterval: *checkpointInterval,
|
||||
CheckpointDirtySeriesLimit: *checkpointDirtySeriesLimit,
|
||||
Dirty: *storageDirty,
|
||||
Dirty: *storageDirty,
|
||||
PedanticChecks: *storagePedanticChecks,
|
||||
}
|
||||
memStorage, err := local.NewMemorySeriesStorage(o)
|
||||
if err != nil {
|
||||
|
@ -193,6 +193,7 @@ func (p *persistence) sanitizeSeries(
|
||||
|
||||
bytesToTrim := fi.Size() % int64(chunkLen+chunkHeaderLen)
|
||||
chunksInFile := int(fi.Size()) / (chunkLen + chunkHeaderLen)
|
||||
modTime := fi.ModTime()
|
||||
if bytesToTrim != 0 {
|
||||
glog.Warningf(
|
||||
"Truncating file %s to exactly %d chunks, trimming %d extraneous bytes.",
|
||||
@ -224,7 +225,8 @@ func (p *persistence) sanitizeSeries(
|
||||
if !p.pedanticChecks &&
|
||||
bytesToTrim == 0 &&
|
||||
s.chunkDescsOffset != -1 &&
|
||||
chunksInFile == s.chunkDescsOffset+s.persistWatermark {
|
||||
chunksInFile == s.chunkDescsOffset+s.persistWatermark &&
|
||||
modTime.Equal(s.modTime) {
|
||||
// Everything is consistent. We are good.
|
||||
return fp, true
|
||||
}
|
||||
@ -241,8 +243,9 @@ func (p *persistence) sanitizeSeries(
|
||||
s.metric, fp, chunksInFile,
|
||||
)
|
||||
s.chunkDescs = nil
|
||||
s.chunkDescsOffset = -1
|
||||
s.chunkDescsOffset = chunksInFile
|
||||
s.persistWatermark = 0
|
||||
s.modTime = modTime
|
||||
return fp, true
|
||||
}
|
||||
// This is the tricky one: We have chunks from heads.db, but
|
||||
@ -268,6 +271,7 @@ func (p *persistence) sanitizeSeries(
|
||||
}
|
||||
s.persistWatermark = len(cds)
|
||||
s.chunkDescsOffset = 0
|
||||
s.modTime = modTime
|
||||
|
||||
lastTime := cds[len(cds)-1].lastTime()
|
||||
keepIdx := -1
|
||||
|
@ -479,7 +479,11 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie
|
||||
//
|
||||
// (4.4) The varint-encoded persistWatermark. (Missing in v1.)
|
||||
//
|
||||
// (4.5) The varint-encoded chunkDescsOffset.
|
||||
// (4.5) The modification time of the series file as nanoceconds elapsed since
|
||||
// January 1, 1970 UTC. -1 if the modification time is unknown or no series file
|
||||
// exists yet. (Missing in v1.)
|
||||
//
|
||||
// (4.6) The varint-encoded chunkDescsOffset.
|
||||
//
|
||||
// (4.6) The varint-encoded savedFirstTime.
|
||||
//
|
||||
@ -571,6 +575,15 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
|
||||
if _, err = codable.EncodeVarint(w, int64(m.series.persistWatermark)); err != nil {
|
||||
return
|
||||
}
|
||||
if m.series.modTime.IsZero() {
|
||||
if _, err = codable.EncodeVarint(w, -1); err != nil {
|
||||
return
|
||||
}
|
||||
} else {
|
||||
if _, err = codable.EncodeVarint(w, m.series.modTime.UnixNano()); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
if _, err = codable.EncodeVarint(w, int64(m.series.chunkDescsOffset)); err != nil {
|
||||
return
|
||||
}
|
||||
@ -708,6 +721,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in
|
||||
return sm, chunksToPersist, nil
|
||||
}
|
||||
var persistWatermark int64
|
||||
var modTime time.Time
|
||||
if version != headsFormatLegacyVersion {
|
||||
// persistWatermark only present in v2.
|
||||
persistWatermark, err = binary.ReadVarint(r)
|
||||
@ -716,6 +730,15 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in
|
||||
p.dirty = true
|
||||
return sm, chunksToPersist, nil
|
||||
}
|
||||
modTimeNano, err := binary.ReadVarint(r)
|
||||
if err != nil {
|
||||
glog.Warning("Could not decode modification time:", err)
|
||||
p.dirty = true
|
||||
return sm, chunksToPersist, nil
|
||||
}
|
||||
if modTimeNano != -1 {
|
||||
modTime = time.Unix(0, modTimeNano)
|
||||
}
|
||||
}
|
||||
chunkDescsOffset, err := binary.ReadVarint(r)
|
||||
if err != nil {
|
||||
@ -786,6 +809,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in
|
||||
metric: clientmodel.Metric(metric),
|
||||
chunkDescs: chunkDescs,
|
||||
persistWatermark: int(persistWatermark),
|
||||
modTime: modTime,
|
||||
chunkDescsOffset: int(chunkDescsOffset),
|
||||
savedFirstTime: clientmodel.Timestamp(savedFirstTime),
|
||||
headChunkClosed: persistWatermark >= numChunkDescs,
|
||||
@ -964,6 +988,17 @@ func (p *persistence) deleteSeriesFile(fp clientmodel.Fingerprint) (int, error)
|
||||
return numChunks, nil
|
||||
}
|
||||
|
||||
// getSeriesFileModTime returns the modification time of the series file
|
||||
// belonging to the provided fingerprint. In case of an error, the zero value of
|
||||
// time.Time is returned.
|
||||
func (p *persistence) getSeriesFileModTime(fp clientmodel.Fingerprint) time.Time {
|
||||
var modTime time.Time
|
||||
if fi, err := os.Stat(p.fileNameForFingerprint(fp)); err == nil {
|
||||
return fi.ModTime()
|
||||
}
|
||||
return modTime
|
||||
}
|
||||
|
||||
// indexMetric queues the given metric for addition to the indexes needed by
|
||||
// getFingerprintsForLabelPair, getLabelValuesForLabelName, and
|
||||
// getFingerprintsModifiedBefore. If the queue is full, this method blocks
|
||||
|
@ -143,6 +143,9 @@ type memorySeries struct {
|
||||
// points to a non-persisted chunk. If all chunks are persisted, then
|
||||
// persistWatermark == len(chunkDescs).
|
||||
persistWatermark int
|
||||
// The modification time of the series file. The zero value of time.Time
|
||||
// is used to mark an unknown modification time.
|
||||
modTime time.Time
|
||||
// The chunkDescs in memory might not have all the chunkDescs for the
|
||||
// chunks that are persisted to disk. The missing chunkDescs are all
|
||||
// contiguous and at the tail end. chunkDescsOffset is the index of the
|
||||
|
@ -770,6 +770,7 @@ func (s *memorySeriesStorage) writeMemorySeries(
|
||||
}
|
||||
s.incNumChunksToPersist(-len(cds))
|
||||
chunkOps.WithLabelValues(persistAndUnpin).Add(float64(len(cds)))
|
||||
series.modTime = s.persistence.getSeriesFileModTime(fp)
|
||||
}()
|
||||
|
||||
// Get the actual chunks from underneath the chunkDescs.
|
||||
|
Loading…
Reference in New Issue
Block a user