Add crash recovery of fingerprint mappings.

This commit is contained in:
beorn7 2015-05-07 18:58:14 +02:00
parent 2235cec175
commit c36e0e05f1
2 changed files with 48 additions and 11 deletions

View File

@ -42,6 +42,12 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[clientmodel.Finge
count := 0
seriesDirNameFmt := fmt.Sprintf("%%0%dx", seriesDirNameLen)
// Delete the fingerprint mapping file as it might be stale or
// corrupt. We'll rebuild the mappings as we go.
os.Remove(p.mappingsFileName())
// The mappings to rebuild.
fpm := fpMappings{}
glog.Info("Scanning files.")
for i := 0; i < 1<<(seriesDirNameLen*4); i++ {
dirname := path.Join(p.basePath, fmt.Sprintf(seriesDirNameFmt, i))
@ -58,7 +64,7 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[clientmodel.Finge
return err
}
for _, fi := range fis {
fp, ok := p.sanitizeSeries(dirname, fi, fingerprintToSeries)
fp, ok := p.sanitizeSeries(dirname, fi, fingerprintToSeries, fpm)
if ok {
fpsSeen[fp] = struct{}{}
}
@ -75,7 +81,7 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[clientmodel.Finge
for fp, s := range fingerprintToSeries {
if _, seen := fpsSeen[fp]; !seen {
// fp exists in fingerprintToSeries, but has no representation on disk.
if s.headChunkClosed {
if s.persistWatermark == len(s.chunkDescs) {
// Oops, everything including the head chunk was
// already persisted, but nothing on disk.
// Thus, we lost that series completely. Clean
@ -112,17 +118,24 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[clientmodel.Finge
s.persistWatermark = 0
s.chunkDescsOffset = 0
}
maybeAddMapping(fp, s.metric, fpm)
fpsSeen[fp] = struct{}{} // Add so that fpsSeen is complete.
}
}
glog.Info("Check for series without series file complete.")
if err := p.cleanUpArchiveIndexes(fingerprintToSeries, fpsSeen); err != nil {
if err := p.cleanUpArchiveIndexes(fingerprintToSeries, fpsSeen, fpm); err != nil {
return err
}
if err := p.rebuildLabelIndexes(fingerprintToSeries); err != nil {
return err
}
// Finally rewrite the mappings file if there are any mappings.
if len(fpm) > 0 {
if err := p.checkpointFPMappings(fpm); err != nil {
return err
}
}
p.setDirty(false)
glog.Warning("Crash recovery complete.")
@ -156,7 +169,9 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[clientmodel.Finge
// is checked for its presence in the index of archived series. If it cannot
// be found there, it is moved into the orphaned directory.
func (p *persistence) sanitizeSeries(
dirname string, fi os.FileInfo, fingerprintToSeries map[clientmodel.Fingerprint]*memorySeries,
dirname string, fi os.FileInfo,
fingerprintToSeries map[clientmodel.Fingerprint]*memorySeries,
fpm fpMappings,
) (clientmodel.Fingerprint, bool) {
filename := path.Join(dirname, fi.Name())
purge := func() {
@ -222,6 +237,7 @@ func (p *persistence) sanitizeSeries(
if s == nil {
panic("fingerprint mapped to nil pointer")
}
maybeAddMapping(fp, s.metric, fpm)
if !p.pedanticChecks &&
bytesToTrim == 0 &&
s.chunkDescsOffset != -1 &&
@ -320,12 +336,14 @@ func (p *persistence) sanitizeSeries(
return fp, false
}
// This series looks like a properly archived one.
maybeAddMapping(fp, metric, fpm)
return fp, true
}
func (p *persistence) cleanUpArchiveIndexes(
fpToSeries map[clientmodel.Fingerprint]*memorySeries,
fpsSeen map[clientmodel.Fingerprint]struct{},
fpm fpMappings,
) error {
glog.Info("Cleaning up archive indexes.")
var fp codable.Fingerprint
@ -359,7 +377,12 @@ func (p *persistence) cleanUpArchiveIndexes(
_, err := p.archivedFingerprintToTimeRange.Delete(fp)
return err
}
// fp is legitimately archived. Make sure it is in timerange index, too.
// fp is legitimately archived. Now we need the metric to check for a mapped fingerprint.
if err := kv.Value(&m); err != nil {
return err
}
maybeAddMapping(clientmodel.Fingerprint(fp), clientmodel.Metric(m), fpm)
// Make sure it is in timerange index, too.
has, err := p.archivedFingerprintToTimeRange.Has(fp)
if err != nil {
return err
@ -372,9 +395,6 @@ func (p *persistence) cleanUpArchiveIndexes(
if _, err := p.archivedFingerprintToMetrics.Delete(fp); err != nil {
return err
}
if err := kv.Value(&m); err != nil {
return err
}
series := newMemorySeries(clientmodel.Metric(m), false, clientmodel.Earliest)
cds, err := p.loadChunkDescs(clientmodel.Fingerprint(fp), clientmodel.Now())
if err != nil {
@ -455,3 +475,20 @@ func (p *persistence) rebuildLabelIndexes(
glog.Info("All requests for rebuilding the label indexes queued. (Actual processing may lag behind.)")
return nil
}
// maybeAddMapping adds a fingerprint mapping to fpm if the FastFingerprint of m is different from fp.
func maybeAddMapping(fp clientmodel.Fingerprint, m clientmodel.Metric, fpm fpMappings) {
if rawFP := m.FastFingerprint(); rawFP != fp {
glog.Warningf(
"Metric %v with fingerprint %v is mapped from raw fingerprint %v.",
m, fp, rawFP,
)
if mappedFPs, ok := fpm[rawFP]; ok {
mappedFPs[metricToUniqueString(m)] = fp
} else {
fpm[rawFP] = map[string]clientmodel.Fingerprint{
metricToUniqueString(m): fp,
}
}
}
}

View File

@ -1418,7 +1418,7 @@ loop:
// (4.3.1) The uvarint-encoded length of the unique metric string.
// (4.3.2) The unique metric string.
// (4.3.3) The mapped fingerprint as big-endian uint64.
func (p *persistence) checkpointFPMappings(c fpMappings) (err error) {
func (p *persistence) checkpointFPMappings(fpm fpMappings) (err error) {
glog.Info("Checkpointing fingerprint mappings...")
begin := time.Now()
f, err := os.OpenFile(p.mappingsTempFileName(), os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
@ -1449,11 +1449,11 @@ func (p *persistence) checkpointFPMappings(c fpMappings) (err error) {
if _, err = codable.EncodeUvarint(w, mappingsFormatVersion); err != nil {
return
}
if _, err = codable.EncodeUvarint(w, uint64(len(c))); err != nil {
if _, err = codable.EncodeUvarint(w, uint64(len(fpm))); err != nil {
return
}
for fp, mappings := range c {
for fp, mappings := range fpm {
if err = codable.EncodeUint64(w, uint64(fp)); err != nil {
return
}