diff --git a/storage/metric/instrumentation.go b/storage/metric/instrumentation.go index 701354e5d..de4d224e2 100644 --- a/storage/metric/instrumentation.go +++ b/storage/metric/instrumentation.go @@ -43,6 +43,8 @@ const ( hasIndexMetric = "has_index_metric" hasLabelName = "has_label_name" hasLabelPair = "has_label_pair" + indexLabelNames = "index_label_names" + indexLabelPairs = "index_label_pairs" indexMetric = "index_metric" indexMetrics = "index_metrics" rebuildDiskFrontier = "rebuild_disk_frontier" diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index 5772f636d..95e078a7c 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -269,6 +269,145 @@ func (l *LevelDBMetricPersistence) findUnindexedMetrics(candidates map[model.Fin return } +// indexLabelNames accumulates all label name to fingerprint index entries for +// the dirty metrics, appends the new dirtied metrics, sorts, and bulk updates +// the index to reflect the new state. +func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint]model.Metric) (err error) { + begin := time.Now() + defer func() { + duration := time.Since(begin) + + recordOutcome(duration, err, map[string]string{operation: indexLabelNames, result: success}, map[string]string{operation: indexLabelNames, result: failure}) + }() + + labelNameFingerprints := map[model.LabelName]utility.Set{} + + for fingerprint, metric := range metrics { + for labelName := range metric { + fingerprintSet, ok := labelNameFingerprints[labelName] + if !ok { + fingerprintSet = utility.Set{} + + fingerprints, err := l.GetFingerprintsForLabelName(labelName) + if err != nil { + panic(err) + return err + } + + for _, fingerprint := range fingerprints { + fingerprintSet.Add(fingerprint) + } + } + + fingerprintSet.Add(fingerprint) + labelNameFingerprints[labelName] = fingerprintSet + } + } + + batch := leveldb.NewBatch() + defer batch.Close() + + for labelName, fingerprintSet := range labelNameFingerprints { + fingerprints := model.Fingerprints{} + for fingerprint := range fingerprintSet { + fingerprints = append(fingerprints, fingerprint.(model.Fingerprint)) + } + + sort.Sort(fingerprints) + + key := &dto.LabelName{ + Name: proto.String(string(labelName)), + } + value := &dto.FingerprintCollection{} + for _, fingerprint := range fingerprints { + value.Member = append(value.Member, fingerprint.ToDTO()) + } + + batch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value)) + } + + err = l.labelNameToFingerprints.Commit(batch) + if err != nil { + panic(err) + return + } + + return +} + +// indexLabelPairs accumulates all label pair to fingerprint index entries for +// the dirty metrics, appends the new dirtied metrics, sorts, and bulk updates +// the index to reflect the new state. +func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint]model.Metric) (err error) { + begin := time.Now() + defer func() { + duration := time.Since(begin) + + recordOutcome(duration, err, map[string]string{operation: indexLabelPairs, result: success}, map[string]string{operation: indexLabelPairs, result: failure}) + }() + + labelPairFingerprints := map[model.LabelPair]utility.Set{} + + for fingerprint, metric := range metrics { + for labelName, labelValue := range metric { + labelPair := model.LabelPair{ + Name: labelName, + Value: labelValue, + } + fingerprintSet, ok := labelPairFingerprints[labelPair] + if !ok { + fingerprintSet = utility.Set{} + + fingerprints, err := l.GetFingerprintsForLabelSet(model.LabelSet{ + labelName: labelValue, + }) + if err != nil { + panic(err) + return err + } + + for _, fingerprint := range fingerprints { + fingerprintSet.Add(fingerprint) + } + } + + fingerprintSet.Add(fingerprint) + labelPairFingerprints[labelPair] = fingerprintSet + } + } + + batch := leveldb.NewBatch() + defer batch.Close() + + for labelPair, fingerprintSet := range labelPairFingerprints { + fingerprints := model.Fingerprints{} + for fingerprint := range fingerprintSet { + fingerprints = append(fingerprints, fingerprint.(model.Fingerprint)) + } + + sort.Sort(fingerprints) + + key := &dto.LabelPair{ + Name: proto.String(string(labelPair.Name)), + Value: proto.String(string(labelPair.Value)), + } + value := &dto.FingerprintCollection{} + for _, fingerprint := range fingerprints { + value.Member = append(value.Member, fingerprint.ToDTO()) + } + + batch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value)) + } + + err = l.labelSetToFingerprints.Commit(batch) + if err != nil { + panic(err) + return + } + + return +} + // indexMetrics takes groups of samples, determines which ones contain metrics // that are unknown to the storage stack, and then proceeds to update all // affected indices. @@ -289,149 +428,35 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri panic(err) } + if len(absentMetrics) == 0 { + return + } + // TODO: For the missing fingerprints, determine what label names and pairs // are absent and act accordingly and append fingerprints. var ( - doneBuildingLabelNameIndex = make(chan interface{}) - doneBuildingLabelPairIndex = make(chan interface{}) + doneBuildingLabelNameIndex = make(chan error) + doneBuildingLabelPairIndex = make(chan error) ) - // Update LabelName -> Fingerprint index. go func() { - labelNameFingerprints := map[model.LabelName]utility.Set{} - - for fingerprint, metric := range absentMetrics { - for labelName := range metric { - fingerprintSet, ok := labelNameFingerprints[labelName] - if !ok { - fingerprintSet = utility.Set{} - - fingerprints, err := l.GetFingerprintsForLabelName(labelName) - if err != nil { - panic(err) - doneBuildingLabelNameIndex <- err - return - } - - for _, fingerprint := range fingerprints { - fingerprintSet.Add(fingerprint) - } - } - - fingerprintSet.Add(fingerprint) - labelNameFingerprints[labelName] = fingerprintSet - } - } - - batch := leveldb.NewBatch() - defer batch.Close() - - for labelName, fingerprintSet := range labelNameFingerprints { - fingerprints := model.Fingerprints{} - for fingerprint := range fingerprintSet { - fingerprints = append(fingerprints, fingerprint.(model.Fingerprint)) - } - - sort.Sort(fingerprints) - - key := &dto.LabelName{ - Name: proto.String(string(labelName)), - } - value := &dto.FingerprintCollection{} - for _, fingerprint := range fingerprints { - value.Member = append(value.Member, fingerprint.ToDTO()) - } - - batch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value)) - } - - err := l.labelNameToFingerprints.Commit(batch) - if err != nil { - panic(err) - doneBuildingLabelNameIndex <- err - return - } - - doneBuildingLabelNameIndex <- true + doneBuildingLabelNameIndex <- l.indexLabelNames(absentMetrics) }() // Update LabelPair -> Fingerprint index. go func() { - labelPairFingerprints := map[model.LabelPair]utility.Set{} - - for fingerprint, metric := range absentMetrics { - for labelName, labelValue := range metric { - labelPair := model.LabelPair{ - Name: labelName, - Value: labelValue, - } - fingerprintSet, ok := labelPairFingerprints[labelPair] - if !ok { - fingerprintSet = utility.Set{} - - fingerprints, err := l.GetFingerprintsForLabelSet(model.LabelSet{ - labelName: labelValue, - }) - if err != nil { - panic(err) - doneBuildingLabelPairIndex <- err - return - } - - for _, fingerprint := range fingerprints { - fingerprintSet.Add(fingerprint) - } - } - - fingerprintSet.Add(fingerprint) - labelPairFingerprints[labelPair] = fingerprintSet - } - } - - batch := leveldb.NewBatch() - defer batch.Close() - - for labelPair, fingerprintSet := range labelPairFingerprints { - fingerprints := model.Fingerprints{} - for fingerprint := range fingerprintSet { - fingerprints = append(fingerprints, fingerprint.(model.Fingerprint)) - } - - sort.Sort(fingerprints) - - key := &dto.LabelPair{ - Name: proto.String(string(labelPair.Name)), - Value: proto.String(string(labelPair.Value)), - } - value := &dto.FingerprintCollection{} - for _, fingerprint := range fingerprints { - value.Member = append(value.Member, fingerprint.ToDTO()) - } - - batch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value)) - } - - err := l.labelSetToFingerprints.Commit(batch) - if err != nil { - panic(err) - doneBuildingLabelPairIndex <- true - return - } - - doneBuildingLabelPairIndex <- true + doneBuildingLabelPairIndex <- l.indexLabelPairs(absentMetrics) }() makeTopLevelIndex := true - v := <-doneBuildingLabelNameIndex - _, ok := v.(error) - if ok { + err = <-doneBuildingLabelNameIndex + if err != nil { panic(err) makeTopLevelIndex = false } - v = <-doneBuildingLabelPairIndex - _, ok = v.(error) - if ok { + err = <-doneBuildingLabelPairIndex + if err != nil { panic(err) makeTopLevelIndex = false }