diff --git a/storage/local/mapper.go b/storage/local/mapper.go
index 0dca894fa..97b58c923 100644
--- a/storage/local/mapper.go
+++ b/storage/local/mapper.go
@@ -7,6 +7,7 @@ import (
 	"sync"
 	"sync/atomic"
 
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/log"
 
 	clientmodel "github.com/prometheus/client_golang/model"
@@ -31,22 +32,31 @@ type fpMapper struct {
 
 	fpToSeries *seriesMap
 	p          *persistence
+
+	mappingsCounter prometheus.Counter
 }
 
 // newFPMapper loads the collision map from the persistence and
 // returns an fpMapper ready to use.
 func newFPMapper(fpToSeries *seriesMap, p *persistence) (*fpMapper, error) {
-	r := &fpMapper{
+	m := &fpMapper{
 		fpToSeries: fpToSeries,
 		p:          p,
+		mappingsCounter: prometheus.NewCounter(prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "fingerprint_mappings_total",
+			Help:      "The total number of fingerprints being mapped to avoid collisions.",
+		}),
 	}
 	mappings, nextFP, err := p.loadFPMappings()
 	if err != nil {
 		return nil, err
 	}
-	r.mappings = mappings
-	r.highestMappedFP = nextFP
-	return r, nil
+	m.mappings = mappings
+	m.mappingsCounter.Set(float64(len(m.mappings)))
+	m.highestMappedFP = nextFP
+	return m, nil
 }
 
 // mapFP takes a raw fingerprint (as returned by Metrics.FastFingerprint) and
@@ -55,33 +65,33 @@ func newFPMapper(fpToSeries *seriesMap, p *persistence) (*fpMapper, error) {
 //
 // If an error is encountered, it is returned together with the unchanged raw
 // fingerprint.
-func (r *fpMapper) mapFP(fp clientmodel.Fingerprint, m clientmodel.Metric) (clientmodel.Fingerprint, error) {
+func (m *fpMapper) mapFP(fp clientmodel.Fingerprint, metric clientmodel.Metric) (clientmodel.Fingerprint, error) {
 	// First check if we are in the reserved FP space, in which case this is
 	// automatically a collision that has to be mapped.
 	if fp <= maxMappedFP {
-		return r.maybeAddMapping(fp, m)
+		return m.maybeAddMapping(fp, metric)
 	}
 
 	// Then check the most likely case: This fp belongs to a series that is
 	// already in memory.
-	s, ok := r.fpToSeries.get(fp)
+	s, ok := m.fpToSeries.get(fp)
 	if ok {
 		// FP exists in memory, but is it for the same metric?
-		if m.Equal(s.metric) {
+		if metric.Equal(s.metric) {
 			// Yupp. We are done.
 			return fp, nil
 		}
 		// Collision detected!
-		return r.maybeAddMapping(fp, m)
+		return m.maybeAddMapping(fp, metric)
 	}
 	// Metric is not in memory. Before doing the expensive archive lookup,
 	// check if we have a mapping for this metric in place already.
-	r.mtx.RLock()
-	mappedFPs, fpAlreadyMapped := r.mappings[fp]
-	r.mtx.RUnlock()
+	m.mtx.RLock()
+	mappedFPs, fpAlreadyMapped := m.mappings[fp]
+	m.mtx.RUnlock()
 	if fpAlreadyMapped {
 		// We indeed have mapped fp historically.
-		ms := metricToUniqueString(m)
+		ms := metricToUniqueString(metric)
 		// fp is locked by the caller, so no further locking of
 		// 'collisions' required (it is specific to fp).
 		mappedFP, ok := mappedFPs[ms]
@@ -93,18 +103,18 @@ func (r *fpMapper) mapFP(fp clientmodel.Fingerprint, m clientmodel.Metric) (clie
 	// If we are here, FP does not exist in memory and is either not mapped
 	// at all, or existing mappings for FP are not for m. Check if we have
 	// something for FP in the archive.
-	archivedMetric, err := r.p.archivedMetric(fp)
+	archivedMetric, err := m.p.archivedMetric(fp)
 	if err != nil {
 		return fp, err
 	}
 	if archivedMetric != nil {
 		// FP exists in archive, but is it for the same metric?
-		if m.Equal(archivedMetric) {
+		if metric.Equal(archivedMetric) {
 			// Yupp. We are done.
 			return fp, nil
 		}
 		// Collision detected!
-		return r.maybeAddMapping(fp, m)
+		return m.maybeAddMapping(fp, metric)
 	}
 	// As fp does not exist, neither in memory nor in archive, we can safely
 	// keep it unmapped.
@@ -114,14 +124,14 @@ func (r *fpMapper) mapFP(fp clientmodel.Fingerprint, m clientmodel.Metric) (clie
 // maybeAddMapping is only used internally. It takes a detected collision and
 // adds it to the collisions map if not yet there. In any case, it returns the
 // truly unique fingerprint for the colliding metric.
-func (r *fpMapper) maybeAddMapping(
+func (m *fpMapper) maybeAddMapping(
 	fp clientmodel.Fingerprint,
 	collidingMetric clientmodel.Metric,
 ) (clientmodel.Fingerprint, error) {
 	ms := metricToUniqueString(collidingMetric)
-	r.mtx.RLock()
-	mappedFPs, ok := r.mappings[fp]
-	r.mtx.RUnlock()
+	m.mtx.RLock()
+	mappedFPs, ok := m.mappings[fp]
+	m.mtx.RUnlock()
 	if ok {
 		// fp is locked by the caller, so no further locking required.
 		mappedFP, ok := mappedFPs[ms]
@@ -129,12 +139,12 @@ func (r *fpMapper) maybeAddMapping(
 			return mappedFP, nil // Existing mapping.
 		}
 		// A new mapping has to be created.
-		mappedFP = r.nextMappedFP()
+		mappedFP = m.nextMappedFP()
 		mappedFPs[ms] = mappedFP
-		r.mtx.RLock()
+		m.mtx.RLock()
 		// Checkpoint mappings after each change.
-		err := r.p.checkpointFPMappings(r.mappings)
-		r.mtx.RUnlock()
+		err := m.p.checkpointFPMappings(m.mappings)
+		m.mtx.RUnlock()
 		log.Infof(
 			"Collision detected for fingerprint %v, metric %v, mapping to new fingerprint %v.",
 			fp, collidingMetric, mappedFP,
@@ -142,13 +152,14 @@ func (r *fpMapper) maybeAddMapping(
 		return mappedFP, err
 	}
 	// This is the first collision for fp.
-	mappedFP := r.nextMappedFP()
+	mappedFP := m.nextMappedFP()
 	mappedFPs = map[string]clientmodel.Fingerprint{ms: mappedFP}
-	r.mtx.Lock()
-	r.mappings[fp] = mappedFPs
+	m.mtx.Lock()
+	m.mappings[fp] = mappedFPs
+	m.mappingsCounter.Inc()
 	// Checkpoint mappings after each change.
-	err := r.p.checkpointFPMappings(r.mappings)
-	r.mtx.Unlock()
+	err := m.p.checkpointFPMappings(m.mappings)
+	m.mtx.Unlock()
 	log.Infof(
 		"Collision detected for fingerprint %v, metric %v, mapping to new fingerprint %v.",
 		fp, collidingMetric, mappedFP,
@@ -156,14 +167,24 @@ func (r *fpMapper) maybeAddMapping(
 	return mappedFP, err
 }
 
-func (r *fpMapper) nextMappedFP() clientmodel.Fingerprint {
-	mappedFP := clientmodel.Fingerprint(atomic.AddUint64((*uint64)(&r.highestMappedFP), 1))
+func (m *fpMapper) nextMappedFP() clientmodel.Fingerprint {
+	mappedFP := clientmodel.Fingerprint(atomic.AddUint64((*uint64)(&m.highestMappedFP), 1))
 	if mappedFP > maxMappedFP {
 		panic(fmt.Errorf("more than %v fingerprints mapped in collision detection", maxMappedFP))
 	}
 	return mappedFP
 }
 
+// Describe implements prometheus.Collector.
+func (m *fpMapper) Describe(ch chan<- *prometheus.Desc) {
+	ch <- m.mappingsCounter.Desc()
+}
+
+// Collect implements prometheus.Collector.
+func (m *fpMapper) Collect(ch chan<- prometheus.Metric) {
+	ch <- m.mappingsCounter
+}
+
 // metricToUniqueString turns a metric into a string in a reproducible and
 // unique way, i.e. the same metric will always create the same string, and
 // different metrics will always create different strings. In a way, it is the
diff --git a/storage/local/persistence.go b/storage/local/persistence.go
index c2fc3d034..e7f089a3d 100644
--- a/storage/local/persistence.go
+++ b/storage/local/persistence.go
@@ -120,6 +120,7 @@ type persistence struct {
 	indexingBatchSizes    prometheus.Summary
 	indexingBatchDuration prometheus.Summary
 	checkpointDuration    prometheus.Gauge
+	dirtyCounter          prometheus.Counter
 
 	dirtyMtx       sync.Mutex // Protects dirty and becameDirty.
 	dirty          bool       // true if persistence was started in dirty state.
@@ -237,6 +238,12 @@ func newPersistence(basePath string, dirty, pedanticChecks bool, shouldSync sync
 			Name:      "checkpoint_duration_milliseconds",
 			Help:      "The duration (in milliseconds) it took to checkpoint in-memory metrics and head chunks.",
 		}),
+		dirtyCounter: prometheus.NewCounter(prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "inconsistencies_total",
+			Help:      "A counter incremented each time an inconsistency in the local storage is detected. If this is greater zero, restart the server as soon as possible.",
+		}),
 		dirty:          dirty,
 		pedanticChecks: pedanticChecks,
 		dirtyFileName:  dirtyPath,
@@ -282,6 +289,7 @@ func (p *persistence) Describe(ch chan<- *prometheus.Desc) {
 	p.indexingBatchSizes.Describe(ch)
 	p.indexingBatchDuration.Describe(ch)
 	ch <- p.checkpointDuration.Desc()
+	ch <- p.dirtyCounter.Desc()
 }
 
 // Collect implements prometheus.Collector.
@@ -293,6 +301,7 @@ func (p *persistence) Collect(ch chan<- prometheus.Metric) {
 	p.indexingBatchSizes.Collect(ch)
 	p.indexingBatchDuration.Collect(ch)
 	ch <- p.checkpointDuration
+	ch <- p.dirtyCounter
 }
 
 // isDirty returns the dirty flag in a goroutine-safe way.
@@ -307,6 +316,7 @@ func (p *persistence) isDirty() bool {
 // dirty during our runtime, there is no way back. If we were dirty from the
 // start, a clean-up might make us clean again.)
 func (p *persistence) setDirty(dirty bool) {
+	p.dirtyCounter.Inc()
 	p.dirtyMtx.Lock()
 	defer p.dirtyMtx.Unlock()
 	if p.becameDirty {
diff --git a/storage/local/storage.go b/storage/local/storage.go
index e4fda7789..c12a06536 100644
--- a/storage/local/storage.go
+++ b/storage/local/storage.go
@@ -1003,6 +1003,7 @@ func (s *memorySeriesStorage) persistenceBacklogScore() float64 {
 // Describe implements prometheus.Collector.
 func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
 	s.persistence.Describe(ch)
+	s.mapper.Describe(ch)
 
 	ch <- s.persistErrors.Desc()
 	ch <- maxChunksToPersistDesc
@@ -1018,6 +1019,7 @@ func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
 // Collect implements prometheus.Collector.
 func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
 	s.persistence.Collect(ch)
+	s.mapper.Collect(ch)
 
 	ch <- s.persistErrors
 	ch <- prometheus.MustNewConstMetric(
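Note on the wiring above: instead of registering the new counters directly, the patch has fpMapper (and persistence) expose them by implementing prometheus.Collector, and memorySeriesStorage simply forwards its own Describe/Collect calls to them. The standalone sketch below illustrates that same pattern in isolation. It is only an illustration, not part of the patch: the names collisionMapper and newCollisionMapper are invented, it targets a current client_golang release (whose Counter no longer offers the Set method used in the patch), and it hard-codes "prometheus"/"local_storage" where the patch relies on the package-level namespace and subsystem constants.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// collisionMapper is a hypothetical stand-in for fpMapper: it owns a counter
// and exposes it by implementing prometheus.Collector (Describe and Collect).
type collisionMapper struct {
	mappingsCounter prometheus.Counter
}

func newCollisionMapper() *collisionMapper {
	return &collisionMapper{
		mappingsCounter: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: "prometheus",     // the patch uses the package constant `namespace`
			Subsystem: "local_storage",  // the patch uses the package constant `subsystem`
			Name:      "fingerprint_mappings_total",
			Help:      "The total number of fingerprints being mapped to avoid collisions.",
		}),
	}
}

// Describe implements prometheus.Collector.
func (m *collisionMapper) Describe(ch chan<- *prometheus.Desc) {
	ch <- m.mappingsCounter.Desc()
}

// Collect implements prometheus.Collector.
func (m *collisionMapper) Collect(ch chan<- prometheus.Metric) {
	ch <- m.mappingsCounter
}

func main() {
	m := newCollisionMapper()
	// Registering the collector (not the bare counter) is what makes the
	// Describe/Collect implementations above take effect, mirroring how
	// memorySeriesStorage forwards to s.mapper in storage.go.
	prometheus.MustRegister(m)
	m.mappingsCounter.Inc() // simulate one detected collision

	// Gather once from the default registry to show the metric is exported.
	mfs, err := prometheus.DefaultGatherer.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		if mf.GetName() == "prometheus_local_storage_fingerprint_mappings_total" {
			fmt.Println(mf)
		}
	}
}

A side effect of this pattern, visible in the patch, is that each counter's lifecycle stays tied to the component that owns it, rather than being registered as a package-global metric.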