diff --git a/storage/metric/instrumentation.go b/storage/metric/instrumentation.go index de4d224e2..7df029f4a 100644 --- a/storage/metric/instrumentation.go +++ b/storage/metric/instrumentation.go @@ -43,6 +43,7 @@ const ( hasIndexMetric = "has_index_metric" hasLabelName = "has_label_name" hasLabelPair = "has_label_pair" + indexFingerprints = "index_fingerprints" indexLabelNames = "index_label_names" indexLabelPairs = "index_label_pairs" indexMetric = "index_metric" diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index 95e078a7c..029bd493c 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -272,6 +272,8 @@ func (l *LevelDBMetricPersistence) findUnindexedMetrics(candidates map[model.Fin // indexLabelNames accumulates all label name to fingerprint index entries for // the dirty metrics, appends the new dirtied metrics, sorts, and bulk updates // the index to reflect the new state. +// +// This operation is idempotent. func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint]model.Metric) (err error) { begin := time.Now() defer func() { @@ -338,6 +340,8 @@ func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint // indexLabelPairs accumulates all label pair to fingerprint index entries for // the dirty metrics, appends the new dirtied metrics, sorts, and bulk updates // the index to reflect the new state. +// +// This operation is idempotent. func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint]model.Metric) (err error) { begin := time.Now() defer func() { @@ -408,6 +412,35 @@ func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint return } +// indexFingerprints updates all of the Fingerprint to Metric reverse lookups +// in the index and then bulk updates. +// +// This operation is idempotent. +func (l *LevelDBMetricPersistence) indexFingerprints(metrics map[model.Fingerprint]model.Metric) (err error) { + begin := time.Now() + defer func() { + duration := time.Since(begin) + + recordOutcome(duration, err, map[string]string{operation: indexFingerprints, result: success}, map[string]string{operation: indexFingerprints, result: failure}) + }() + + batch := leveldb.NewBatch() + defer batch.Close() + + for fingerprint, metric := range metrics { + key := coding.NewProtocolBufferEncoder(fingerprint.ToDTO()) + value := coding.NewProtocolBufferEncoder(model.MetricToDTO(metric)) + batch.Put(key, value) + } + + err = l.fingerprintToMetrics.Commit(batch) + if err != nil { + panic(err) + } + + return +} + // indexMetrics takes groups of samples, determines which ones contain metrics // that are unknown to the storage stack, and then proceeds to update all // affected indices. @@ -435,19 +468,23 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri // TODO: For the missing fingerprints, determine what label names and pairs // are absent and act accordingly and append fingerprints. var ( - doneBuildingLabelNameIndex = make(chan error) - doneBuildingLabelPairIndex = make(chan error) + doneBuildingLabelNameIndex = make(chan error) + doneBuildingLabelPairIndex = make(chan error) + doneBuildingFingerprintIndex = make(chan error) ) go func() { doneBuildingLabelNameIndex <- l.indexLabelNames(absentMetrics) }() - // Update LabelPair -> Fingerprint index. go func() { doneBuildingLabelPairIndex <- l.indexLabelPairs(absentMetrics) }() + go func() { + doneBuildingFingerprintIndex <- l.indexFingerprints(absentMetrics) + }() + makeTopLevelIndex := true err = <-doneBuildingLabelNameIndex @@ -460,27 +497,17 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri panic(err) makeTopLevelIndex = false } - - // Update the Metric existence index. - - if len(absentMetrics) > 0 { - batch := leveldb.NewBatch() - defer batch.Close() - - for fingerprint, metric := range absentMetrics { - key := coding.NewProtocolBufferEncoder(fingerprint.ToDTO()) - value := coding.NewProtocolBufferEncoder(model.MetricToDTO(metric)) - batch.Put(key, value) - } - - err = l.fingerprintToMetrics.Commit(batch) - if err != nil { - panic(err) - // Critical - log.Println(err) - } + err = <-doneBuildingFingerprintIndex + if err != nil { + panic(err) + makeTopLevelIndex = false } + // If any of the preceding operations failed, we will have inconsistent + // indices. Thusly, the Metric membership index should NOT be updated, as + // its state is used to determine whether to bulk update the other indices. + // Given that those operations are idempotent, it is OK to repeat them; + // however, it will consume considerable amounts of time. if makeTopLevelIndex { batch := leveldb.NewBatch() defer batch.Close()