diff --git a/storage/local/mapper.go b/storage/local/mapper.go index e31df2b9b..cbd58d690 100644 --- a/storage/local/mapper.go +++ b/storage/local/mapper.go @@ -72,13 +72,20 @@ func newFPMapper(fpToSeries *seriesMap, p *persistence) (*fpMapper, error) { return m, nil } +// checkpoint persists the current mappings. The caller has to ensure that the +// provided mappings are not changed concurrently. This method is only called +// upon shutdown, when no samples are ingested anymore. +func (m *fpMapper) checkpoint() error { + return m.p.checkpointFPMappings(m.mappings) +} + // mapFP takes a raw fingerprint (as returned by Metrics.FastFingerprint) and // returns a truly unique fingerprint. The caller must have locked the raw // fingerprint. // // If an error is encountered, it is returned together with the unchanged raw // fingerprint. -func (m *fpMapper) mapFP(fp model.Fingerprint, metric model.Metric) (model.Fingerprint, error) { +func (m *fpMapper) mapFP(fp model.Fingerprint, metric model.Metric) model.Fingerprint { // First check if we are in the reserved FP space, in which case this is // automatically a collision that has to be mapped. if fp <= maxMappedFP { @@ -92,7 +99,7 @@ func (m *fpMapper) mapFP(fp model.Fingerprint, metric model.Metric) (model.Finge // FP exists in memory, but is it for the same metric? if metric.Equal(s.metric) { // Yupp. We are done. - return fp, nil + return fp } // Collision detected! return m.maybeAddMapping(fp, metric) @@ -110,28 +117,30 @@ func (m *fpMapper) mapFP(fp model.Fingerprint, metric model.Metric) (model.Finge mappedFP, ok := mappedFPs[ms] if ok { // Historical mapping found, return the mapped FP. - return mappedFP, nil + return mappedFP } } // If we are here, FP does not exist in memory and is either not mapped // at all, or existing mappings for FP are not for m. Check if we have // something for FP in the archive. archivedMetric, err := m.p.archivedMetric(fp) - if err != nil { - return fp, err + if err != nil || archivedMetric == nil { + // Either the archive lookup has returend an error, or fp does + // not exist in the archive. In the former case, the storage has + // been marked as dirty already. We just carry on for as long as + // it goes, assuming that fp does not exist. In either case, + // since now we know (or assume) now that fp does not exist, + // neither in memory nor in archive, we can safely keep it + // unmapped. + return fp } - if archivedMetric != nil { - // FP exists in archive, but is it for the same metric? - if metric.Equal(archivedMetric) { - // Yupp. We are done. - return fp, nil - } - // Collision detected! - return m.maybeAddMapping(fp, metric) + // FP exists in archive, but is it for the same metric? + if metric.Equal(archivedMetric) { + // Yupp. We are done. + return fp } - // As fp does not exist, neither in memory nor in archive, we can safely - // keep it unmapped. - return fp, nil + // Collision detected! + return m.maybeAddMapping(fp, metric) } // maybeAddMapping is only used internally. It takes a detected collision and @@ -140,7 +149,7 @@ func (m *fpMapper) mapFP(fp model.Fingerprint, metric model.Metric) (model.Finge func (m *fpMapper) maybeAddMapping( fp model.Fingerprint, collidingMetric model.Metric, -) (model.Fingerprint, error) { +) model.Fingerprint { ms := metricToUniqueString(collidingMetric) m.mtx.RLock() mappedFPs, ok := m.mappings[fp] @@ -149,20 +158,16 @@ func (m *fpMapper) maybeAddMapping( // fp is locked by the caller, so no further locking required. mappedFP, ok := mappedFPs[ms] if ok { - return mappedFP, nil // Existing mapping. + return mappedFP // Existing mapping. } // A new mapping has to be created. mappedFP = m.nextMappedFP() mappedFPs[ms] = mappedFP - m.mtx.Lock() - // Checkpoint mappings after each change. - err := m.p.checkpointFPMappings(m.mappings) - m.mtx.Unlock() log.Infof( "Collision detected for fingerprint %v, metric %v, mapping to new fingerprint %v.", fp, collidingMetric, mappedFP, ) - return mappedFP, err + return mappedFP } // This is the first collision for fp. mappedFP := m.nextMappedFP() @@ -170,14 +175,12 @@ func (m *fpMapper) maybeAddMapping( m.mtx.Lock() m.mappings[fp] = mappedFPs m.mappingsCounter.Inc() - // Checkpoint mappings after each change. - err := m.p.checkpointFPMappings(m.mappings) m.mtx.Unlock() log.Infof( "Collision detected for fingerprint %v, metric %v, mapping to new fingerprint %v.", fp, collidingMetric, mappedFP, ) - return mappedFP, err + return mappedFP } func (m *fpMapper) nextMappedFP() model.Fingerprint { diff --git a/storage/local/mapper_test.go b/storage/local/mapper_test.go index f37a01657..4d036ebb2 100644 --- a/storage/local/mapper_test.go +++ b/storage/local/mapper_test.go @@ -63,22 +63,13 @@ func TestFPMapper(t *testing.T) { defer closer.Close() mapper, err := newFPMapper(sm, p) - if err != nil { - t.Fatal(err) - } // Everything is empty, resolving a FP should do nothing. - gotFP, err := mapper.mapFP(fp1, cm11) - if err != nil { - t.Fatal(err) - } + gotFP := mapper.mapFP(fp1, cm11) if wantFP := fp1; gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } - gotFP, err = mapper.mapFP(fp1, cm12) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp1, cm12) if wantFP := fp1; gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } @@ -86,184 +77,115 @@ func TestFPMapper(t *testing.T) { // cm11 is in sm. Adding cm11 should do nothing. Mapping cm12 should resolve // the collision. sm.put(fp1, &memorySeries{metric: cm11}) - gotFP, err = mapper.mapFP(fp1, cm11) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp1, cm11) if wantFP := fp1; gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } - gotFP, err = mapper.mapFP(fp1, cm12) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp1, cm12) if wantFP := model.Fingerprint(1); gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } // The mapped cm12 is added to sm, too. That should not change the outcome. sm.put(model.Fingerprint(1), &memorySeries{metric: cm12}) - gotFP, err = mapper.mapFP(fp1, cm11) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp1, cm11) if wantFP := fp1; gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } - gotFP, err = mapper.mapFP(fp1, cm12) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp1, cm12) if wantFP := model.Fingerprint(1); gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } // Now map cm13, should reproducibly result in the next mapped FP. - gotFP, err = mapper.mapFP(fp1, cm13) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp1, cm13) if wantFP := model.Fingerprint(2); gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } - gotFP, err = mapper.mapFP(fp1, cm13) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp1, cm13) if wantFP := model.Fingerprint(2); gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } // Add cm13 to sm. Should not change anything. sm.put(model.Fingerprint(2), &memorySeries{metric: cm13}) - gotFP, err = mapper.mapFP(fp1, cm11) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp1, cm11) if wantFP := fp1; gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } - gotFP, err = mapper.mapFP(fp1, cm12) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp1, cm12) if wantFP := model.Fingerprint(1); gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } - gotFP, err = mapper.mapFP(fp1, cm13) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp1, cm13) if wantFP := model.Fingerprint(2); gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } // Now add cm21 and cm22 in the same way, checking the mapped FPs. - gotFP, err = mapper.mapFP(fp2, cm21) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp2, cm21) if wantFP := fp2; gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } sm.put(fp2, &memorySeries{metric: cm21}) - gotFP, err = mapper.mapFP(fp2, cm21) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp2, cm21) if wantFP := fp2; gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } - gotFP, err = mapper.mapFP(fp2, cm22) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp2, cm22) if wantFP := model.Fingerprint(3); gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } sm.put(model.Fingerprint(3), &memorySeries{metric: cm22}) - gotFP, err = mapper.mapFP(fp2, cm21) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp2, cm21) if wantFP := fp2; gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } - gotFP, err = mapper.mapFP(fp2, cm22) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp2, cm22) if wantFP := model.Fingerprint(3); gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } // Map cm31, resulting in a mapping straight away. - gotFP, err = mapper.mapFP(fp3, cm31) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp3, cm31) if wantFP := model.Fingerprint(4); gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } sm.put(model.Fingerprint(4), &memorySeries{metric: cm31}) // Map cm32, which is now mapped for two reasons... - gotFP, err = mapper.mapFP(fp3, cm32) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp3, cm32) if wantFP := model.Fingerprint(5); gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } sm.put(model.Fingerprint(5), &memorySeries{metric: cm32}) // Now check ALL the mappings, just to be sure. - gotFP, err = mapper.mapFP(fp1, cm11) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp1, cm11) if wantFP := fp1; gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } - gotFP, err = mapper.mapFP(fp1, cm12) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp1, cm12) if wantFP := model.Fingerprint(1); gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } - gotFP, err = mapper.mapFP(fp1, cm13) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp1, cm13) if wantFP := model.Fingerprint(2); gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } - gotFP, err = mapper.mapFP(fp2, cm21) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp2, cm21) if wantFP := fp2; gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } - gotFP, err = mapper.mapFP(fp2, cm22) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp2, cm22) if wantFP := model.Fingerprint(3); gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } - gotFP, err = mapper.mapFP(fp3, cm31) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp3, cm31) if wantFP := model.Fingerprint(4); gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } - gotFP, err = mapper.mapFP(fp3, cm32) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp3, cm32) if wantFP := model.Fingerprint(5); gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } @@ -273,55 +195,38 @@ func TestFPMapper(t *testing.T) { sm.del(fp1) sm.del(fp2) sm.del(fp3) - gotFP, err = mapper.mapFP(fp1, cm11) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp1, cm11) if wantFP := fp1; gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } - gotFP, err = mapper.mapFP(fp1, cm12) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp1, cm12) if wantFP := model.Fingerprint(1); gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } - gotFP, err = mapper.mapFP(fp1, cm13) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp1, cm13) if wantFP := model.Fingerprint(2); gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } - gotFP, err = mapper.mapFP(fp2, cm21) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp2, cm21) if wantFP := fp2; gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } - gotFP, err = mapper.mapFP(fp2, cm22) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp2, cm22) if wantFP := model.Fingerprint(3); gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } - gotFP, err = mapper.mapFP(fp3, cm31) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp3, cm31) if wantFP := model.Fingerprint(4); gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } - gotFP, err = mapper.mapFP(fp3, cm32) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp3, cm32) if wantFP := model.Fingerprint(5); gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } + err = mapper.checkpoint() + if err != nil { + t.Fatal(err) + } // Load the mapper anew from disk and then check all the mappings again // to make sure all changes have made it to disk. @@ -329,52 +234,32 @@ func TestFPMapper(t *testing.T) { if err != nil { t.Fatal(err) } - gotFP, err = mapper.mapFP(fp1, cm11) - if err != nil { - t.Fatal(err) - } + + gotFP = mapper.mapFP(fp1, cm11) if wantFP := fp1; gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } - gotFP, err = mapper.mapFP(fp1, cm12) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp1, cm12) if wantFP := model.Fingerprint(1); gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } - gotFP, err = mapper.mapFP(fp1, cm13) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp1, cm13) if wantFP := model.Fingerprint(2); gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } - gotFP, err = mapper.mapFP(fp2, cm21) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp2, cm21) if wantFP := fp2; gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } - gotFP, err = mapper.mapFP(fp2, cm22) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp2, cm22) if wantFP := model.Fingerprint(3); gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } - gotFP, err = mapper.mapFP(fp3, cm31) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp3, cm31) if wantFP := model.Fingerprint(4); gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } - gotFP, err = mapper.mapFP(fp3, cm32) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp3, cm32) if wantFP := model.Fingerprint(5); gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } @@ -386,17 +271,11 @@ func TestFPMapper(t *testing.T) { // the archive). sm.put(fp1, &memorySeries{metric: cm12}) p.archiveMetric(fp2, cm22, 0, 0) - gotFP, err = mapper.mapFP(fp1, cm12) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp1, cm12) if wantFP := fp1; gotFP != wantFP { // No mapping happened. t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } - gotFP, err = mapper.mapFP(fp2, cm22) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp2, cm22) if wantFP := model.Fingerprint(3); gotFP != wantFP { // Old mapping still applied. t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } @@ -405,10 +284,7 @@ func TestFPMapper(t *testing.T) { // archived metric is detected. Again, this is a pathological situation // that must never happen in real operations. It's just staged here to // test the expected behavior. - gotFP, err = mapper.mapFP(fp2, cm21) - if err != nil { - t.Fatal(err) - } + gotFP = mapper.mapFP(fp2, cm21) if wantFP := model.Fingerprint(6); gotFP != wantFP { t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) } diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 68bc612f0..9ade64806 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -1327,8 +1327,10 @@ loop: close(p.indexingStopped) } -// checkpointFPMappings persists the fingerprint mappings. This method is not -// goroutine-safe. +// checkpointFPMappings persists the fingerprint mappings. The caller has to +// ensure that the provided mappings are not changed concurrently. This method +// is only called upon shutdown or during crash recovery, when no samples are +// ingested. // // Description of the file format, v1: // diff --git a/storage/local/storage.go b/storage/local/storage.go index 79ece03d3..a1b915d8e 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -356,6 +356,9 @@ func (s *memorySeriesStorage) Stop() error { if err := s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker); err != nil { return err } + if err := s.mapper.checkpoint(); err != nil { + return err + } if err := s.persistence.close(); err != nil { return err @@ -587,14 +590,10 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) error { } rawFP := sample.Metric.FastFingerprint() s.fpLocker.Lock(rawFP) - fp, err := s.mapper.mapFP(rawFP, sample.Metric) + fp := s.mapper.mapFP(rawFP, sample.Metric) defer func() { s.fpLocker.Unlock(fp) }() // Func wrapper because fp might change below. - if err != nil { - s.persistence.setDirty(fmt.Errorf("error while mapping fingerprint %v: %s", rawFP, err)) - return err - } if fp != rawFP { // Switch locks. s.fpLocker.Unlock(rawFP) diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 255878f05..c3c0948a2 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -1683,10 +1683,7 @@ func verifyStorageRandom(t testing.TB, s *memorySeriesStorage, samples model.Sam result := true for _, i := range rand.Perm(len(samples)) { sample := samples[i] - fp, err := s.mapper.mapFP(sample.Metric.FastFingerprint(), sample.Metric) - if err != nil { - t.Fatal(err) - } + fp := s.mapper.mapFP(sample.Metric.FastFingerprint(), sample.Metric) p := s.NewPreloader() it := p.PreloadInstant(fp, sample.Timestamp, 0) found := it.ValueAtOrBeforeTime(sample.Timestamp) @@ -1726,10 +1723,7 @@ func verifyStorageSequential(t testing.TB, s *memorySeriesStorage, samples model p.Close() }() for i, sample := range samples { - newFP, err := s.mapper.mapFP(sample.Metric.FastFingerprint(), sample.Metric) - if err != nil { - t.Fatal(err) - } + newFP := s.mapper.mapFP(sample.Metric.FastFingerprint(), sample.Metric) if it == nil || newFP != fp { fp = newFP p.Close() @@ -1782,10 +1776,7 @@ func TestAppendOutOfOrder(t *testing.T) { }) } - fp, err := s.mapper.mapFP(m.FastFingerprint(), m) - if err != nil { - t.Fatal(err) - } + fp := s.mapper.mapFP(m.FastFingerprint(), m) pl := s.NewPreloader() defer pl.Close()