Merge pull request #1555 from prometheus/beorn7/cd

Checkpoint fingerprint mappings only upon shutdown
This commit is contained in:
Björn Rabenstein 2016-04-15 01:05:32 +02:00
commit 096a2ef200
5 changed files with 87 additions and 216 deletions

View File

@ -72,13 +72,20 @@ func newFPMapper(fpToSeries *seriesMap, p *persistence) (*fpMapper, error) {
return m, nil
}
// checkpoint persists the current mappings. The caller has to ensure that the
// provided mappings are not changed concurrently. This method is only called
// upon shutdown, when no samples are ingested anymore.
func (m *fpMapper) checkpoint() error {
return m.p.checkpointFPMappings(m.mappings)
}
// mapFP takes a raw fingerprint (as returned by Metrics.FastFingerprint) and
// returns a truly unique fingerprint. The caller must have locked the raw
// fingerprint.
//
// If an error is encountered, it is returned together with the unchanged raw
// fingerprint.
func (m *fpMapper) mapFP(fp model.Fingerprint, metric model.Metric) (model.Fingerprint, error) {
func (m *fpMapper) mapFP(fp model.Fingerprint, metric model.Metric) model.Fingerprint {
// First check if we are in the reserved FP space, in which case this is
// automatically a collision that has to be mapped.
if fp <= maxMappedFP {
@ -92,7 +99,7 @@ func (m *fpMapper) mapFP(fp model.Fingerprint, metric model.Metric) (model.Finge
// FP exists in memory, but is it for the same metric?
if metric.Equal(s.metric) {
// Yupp. We are done.
return fp, nil
return fp
}
// Collision detected!
return m.maybeAddMapping(fp, metric)
@ -110,28 +117,30 @@ func (m *fpMapper) mapFP(fp model.Fingerprint, metric model.Metric) (model.Finge
mappedFP, ok := mappedFPs[ms]
if ok {
// Historical mapping found, return the mapped FP.
return mappedFP, nil
return mappedFP
}
}
// If we are here, FP does not exist in memory and is either not mapped
// at all, or existing mappings for FP are not for m. Check if we have
// something for FP in the archive.
archivedMetric, err := m.p.archivedMetric(fp)
if err != nil {
return fp, err
if err != nil || archivedMetric == nil {
// Either the archive lookup has returend an error, or fp does
// not exist in the archive. In the former case, the storage has
// been marked as dirty already. We just carry on for as long as
// it goes, assuming that fp does not exist. In either case,
// since now we know (or assume) now that fp does not exist,
// neither in memory nor in archive, we can safely keep it
// unmapped.
return fp
}
if archivedMetric != nil {
// FP exists in archive, but is it for the same metric?
if metric.Equal(archivedMetric) {
// Yupp. We are done.
return fp, nil
}
// Collision detected!
return m.maybeAddMapping(fp, metric)
// FP exists in archive, but is it for the same metric?
if metric.Equal(archivedMetric) {
// Yupp. We are done.
return fp
}
// As fp does not exist, neither in memory nor in archive, we can safely
// keep it unmapped.
return fp, nil
// Collision detected!
return m.maybeAddMapping(fp, metric)
}
// maybeAddMapping is only used internally. It takes a detected collision and
@ -140,7 +149,7 @@ func (m *fpMapper) mapFP(fp model.Fingerprint, metric model.Metric) (model.Finge
func (m *fpMapper) maybeAddMapping(
fp model.Fingerprint,
collidingMetric model.Metric,
) (model.Fingerprint, error) {
) model.Fingerprint {
ms := metricToUniqueString(collidingMetric)
m.mtx.RLock()
mappedFPs, ok := m.mappings[fp]
@ -149,20 +158,16 @@ func (m *fpMapper) maybeAddMapping(
// fp is locked by the caller, so no further locking required.
mappedFP, ok := mappedFPs[ms]
if ok {
return mappedFP, nil // Existing mapping.
return mappedFP // Existing mapping.
}
// A new mapping has to be created.
mappedFP = m.nextMappedFP()
mappedFPs[ms] = mappedFP
m.mtx.Lock()
// Checkpoint mappings after each change.
err := m.p.checkpointFPMappings(m.mappings)
m.mtx.Unlock()
log.Infof(
"Collision detected for fingerprint %v, metric %v, mapping to new fingerprint %v.",
fp, collidingMetric, mappedFP,
)
return mappedFP, err
return mappedFP
}
// This is the first collision for fp.
mappedFP := m.nextMappedFP()
@ -170,14 +175,12 @@ func (m *fpMapper) maybeAddMapping(
m.mtx.Lock()
m.mappings[fp] = mappedFPs
m.mappingsCounter.Inc()
// Checkpoint mappings after each change.
err := m.p.checkpointFPMappings(m.mappings)
m.mtx.Unlock()
log.Infof(
"Collision detected for fingerprint %v, metric %v, mapping to new fingerprint %v.",
fp, collidingMetric, mappedFP,
)
return mappedFP, err
return mappedFP
}
func (m *fpMapper) nextMappedFP() model.Fingerprint {

View File

@ -63,22 +63,13 @@ func TestFPMapper(t *testing.T) {
defer closer.Close()
mapper, err := newFPMapper(sm, p)
if err != nil {
t.Fatal(err)
}
// Everything is empty, resolving a FP should do nothing.
gotFP, err := mapper.mapFP(fp1, cm11)
if err != nil {
t.Fatal(err)
}
gotFP := mapper.mapFP(fp1, cm11)
if wantFP := fp1; gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp1, cm12)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp1, cm12)
if wantFP := fp1; gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
@ -86,184 +77,115 @@ func TestFPMapper(t *testing.T) {
// cm11 is in sm. Adding cm11 should do nothing. Mapping cm12 should resolve
// the collision.
sm.put(fp1, &memorySeries{metric: cm11})
gotFP, err = mapper.mapFP(fp1, cm11)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp1, cm11)
if wantFP := fp1; gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp1, cm12)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp1, cm12)
if wantFP := model.Fingerprint(1); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
// The mapped cm12 is added to sm, too. That should not change the outcome.
sm.put(model.Fingerprint(1), &memorySeries{metric: cm12})
gotFP, err = mapper.mapFP(fp1, cm11)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp1, cm11)
if wantFP := fp1; gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp1, cm12)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp1, cm12)
if wantFP := model.Fingerprint(1); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
// Now map cm13, should reproducibly result in the next mapped FP.
gotFP, err = mapper.mapFP(fp1, cm13)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp1, cm13)
if wantFP := model.Fingerprint(2); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp1, cm13)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp1, cm13)
if wantFP := model.Fingerprint(2); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
// Add cm13 to sm. Should not change anything.
sm.put(model.Fingerprint(2), &memorySeries{metric: cm13})
gotFP, err = mapper.mapFP(fp1, cm11)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp1, cm11)
if wantFP := fp1; gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp1, cm12)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp1, cm12)
if wantFP := model.Fingerprint(1); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp1, cm13)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp1, cm13)
if wantFP := model.Fingerprint(2); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
// Now add cm21 and cm22 in the same way, checking the mapped FPs.
gotFP, err = mapper.mapFP(fp2, cm21)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp2, cm21)
if wantFP := fp2; gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
sm.put(fp2, &memorySeries{metric: cm21})
gotFP, err = mapper.mapFP(fp2, cm21)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp2, cm21)
if wantFP := fp2; gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp2, cm22)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp2, cm22)
if wantFP := model.Fingerprint(3); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
sm.put(model.Fingerprint(3), &memorySeries{metric: cm22})
gotFP, err = mapper.mapFP(fp2, cm21)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp2, cm21)
if wantFP := fp2; gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp2, cm22)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp2, cm22)
if wantFP := model.Fingerprint(3); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
// Map cm31, resulting in a mapping straight away.
gotFP, err = mapper.mapFP(fp3, cm31)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp3, cm31)
if wantFP := model.Fingerprint(4); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
sm.put(model.Fingerprint(4), &memorySeries{metric: cm31})
// Map cm32, which is now mapped for two reasons...
gotFP, err = mapper.mapFP(fp3, cm32)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp3, cm32)
if wantFP := model.Fingerprint(5); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
sm.put(model.Fingerprint(5), &memorySeries{metric: cm32})
// Now check ALL the mappings, just to be sure.
gotFP, err = mapper.mapFP(fp1, cm11)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp1, cm11)
if wantFP := fp1; gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp1, cm12)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp1, cm12)
if wantFP := model.Fingerprint(1); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp1, cm13)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp1, cm13)
if wantFP := model.Fingerprint(2); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp2, cm21)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp2, cm21)
if wantFP := fp2; gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp2, cm22)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp2, cm22)
if wantFP := model.Fingerprint(3); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp3, cm31)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp3, cm31)
if wantFP := model.Fingerprint(4); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp3, cm32)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp3, cm32)
if wantFP := model.Fingerprint(5); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
@ -273,55 +195,38 @@ func TestFPMapper(t *testing.T) {
sm.del(fp1)
sm.del(fp2)
sm.del(fp3)
gotFP, err = mapper.mapFP(fp1, cm11)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp1, cm11)
if wantFP := fp1; gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp1, cm12)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp1, cm12)
if wantFP := model.Fingerprint(1); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp1, cm13)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp1, cm13)
if wantFP := model.Fingerprint(2); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp2, cm21)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp2, cm21)
if wantFP := fp2; gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp2, cm22)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp2, cm22)
if wantFP := model.Fingerprint(3); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp3, cm31)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp3, cm31)
if wantFP := model.Fingerprint(4); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp3, cm32)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp3, cm32)
if wantFP := model.Fingerprint(5); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
err = mapper.checkpoint()
if err != nil {
t.Fatal(err)
}
// Load the mapper anew from disk and then check all the mappings again
// to make sure all changes have made it to disk.
@ -329,52 +234,32 @@ func TestFPMapper(t *testing.T) {
if err != nil {
t.Fatal(err)
}
gotFP, err = mapper.mapFP(fp1, cm11)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp1, cm11)
if wantFP := fp1; gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp1, cm12)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp1, cm12)
if wantFP := model.Fingerprint(1); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp1, cm13)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp1, cm13)
if wantFP := model.Fingerprint(2); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp2, cm21)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp2, cm21)
if wantFP := fp2; gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp2, cm22)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp2, cm22)
if wantFP := model.Fingerprint(3); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp3, cm31)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp3, cm31)
if wantFP := model.Fingerprint(4); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp3, cm32)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp3, cm32)
if wantFP := model.Fingerprint(5); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
@ -386,17 +271,11 @@ func TestFPMapper(t *testing.T) {
// the archive).
sm.put(fp1, &memorySeries{metric: cm12})
p.archiveMetric(fp2, cm22, 0, 0)
gotFP, err = mapper.mapFP(fp1, cm12)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp1, cm12)
if wantFP := fp1; gotFP != wantFP { // No mapping happened.
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp2, cm22)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp2, cm22)
if wantFP := model.Fingerprint(3); gotFP != wantFP { // Old mapping still applied.
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
@ -405,10 +284,7 @@ func TestFPMapper(t *testing.T) {
// archived metric is detected. Again, this is a pathological situation
// that must never happen in real operations. It's just staged here to
// test the expected behavior.
gotFP, err = mapper.mapFP(fp2, cm21)
if err != nil {
t.Fatal(err)
}
gotFP = mapper.mapFP(fp2, cm21)
if wantFP := model.Fingerprint(6); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}

View File

@ -1327,8 +1327,10 @@ loop:
close(p.indexingStopped)
}
// checkpointFPMappings persists the fingerprint mappings. This method is not
// goroutine-safe.
// checkpointFPMappings persists the fingerprint mappings. The caller has to
// ensure that the provided mappings are not changed concurrently. This method
// is only called upon shutdown or during crash recovery, when no samples are
// ingested.
//
// Description of the file format, v1:
//

View File

@ -356,6 +356,9 @@ func (s *memorySeriesStorage) Stop() error {
if err := s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker); err != nil {
return err
}
if err := s.mapper.checkpoint(); err != nil {
return err
}
if err := s.persistence.close(); err != nil {
return err
@ -587,14 +590,10 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) error {
}
rawFP := sample.Metric.FastFingerprint()
s.fpLocker.Lock(rawFP)
fp, err := s.mapper.mapFP(rawFP, sample.Metric)
fp := s.mapper.mapFP(rawFP, sample.Metric)
defer func() {
s.fpLocker.Unlock(fp)
}() // Func wrapper because fp might change below.
if err != nil {
s.persistence.setDirty(fmt.Errorf("error while mapping fingerprint %v: %s", rawFP, err))
return err
}
if fp != rawFP {
// Switch locks.
s.fpLocker.Unlock(rawFP)

View File

@ -1683,10 +1683,7 @@ func verifyStorageRandom(t testing.TB, s *memorySeriesStorage, samples model.Sam
result := true
for _, i := range rand.Perm(len(samples)) {
sample := samples[i]
fp, err := s.mapper.mapFP(sample.Metric.FastFingerprint(), sample.Metric)
if err != nil {
t.Fatal(err)
}
fp := s.mapper.mapFP(sample.Metric.FastFingerprint(), sample.Metric)
p := s.NewPreloader()
it := p.PreloadInstant(fp, sample.Timestamp, 0)
found := it.ValueAtOrBeforeTime(sample.Timestamp)
@ -1726,10 +1723,7 @@ func verifyStorageSequential(t testing.TB, s *memorySeriesStorage, samples model
p.Close()
}()
for i, sample := range samples {
newFP, err := s.mapper.mapFP(sample.Metric.FastFingerprint(), sample.Metric)
if err != nil {
t.Fatal(err)
}
newFP := s.mapper.mapFP(sample.Metric.FastFingerprint(), sample.Metric)
if it == nil || newFP != fp {
fp = newFP
p.Close()
@ -1782,10 +1776,7 @@ func TestAppendOutOfOrder(t *testing.T) {
})
}
fp, err := s.mapper.mapFP(m.FastFingerprint(), m)
if err != nil {
t.Fatal(err)
}
fp := s.mapper.mapFP(m.FastFingerprint(), m)
pl := s.NewPreloader()
defer pl.Close()