Extract finding unindexed metrics.
This commit is contained in:
parent
67300af137
commit
84acfed061
|
@ -47,11 +47,11 @@ func SampleToMetricDTO(s *Sample) *dto.Metric {
|
|||
}
|
||||
}
|
||||
|
||||
func MetricToDTO(m *Metric) *dto.Metric {
|
||||
metricLength := len(*m)
|
||||
func MetricToDTO(m Metric) *dto.Metric {
|
||||
metricLength := len(m)
|
||||
labelNames := make([]string, 0, metricLength)
|
||||
|
||||
for labelName := range *m {
|
||||
for labelName := range m {
|
||||
labelNames = append(labelNames, string(labelName))
|
||||
}
|
||||
|
||||
|
@ -61,7 +61,7 @@ func MetricToDTO(m *Metric) *dto.Metric {
|
|||
|
||||
for _, labelName := range labelNames {
|
||||
l := LabelName(labelName)
|
||||
labelValue := (*m)[l]
|
||||
labelValue := m[l]
|
||||
labelPair := &dto.LabelPair{
|
||||
Name: proto.String(string(labelName)),
|
||||
Value: proto.String(string(labelValue)),
|
||||
|
|
|
@ -31,6 +31,7 @@ const (
|
|||
appendLabelPairFingerprint = "append_label_pair_fingerprint"
|
||||
appendSample = "append_sample"
|
||||
appendSamples = "append_samples"
|
||||
findUnindexedMetrics = "find_unindexed_metrics"
|
||||
flushMemory = "flush_memory"
|
||||
getBoundaryValues = "get_boundary_values"
|
||||
getFingerprintsForLabelName = "get_fingerprints_for_label_name"
|
||||
|
@ -43,7 +44,7 @@ const (
|
|||
hasLabelName = "has_label_name"
|
||||
hasLabelPair = "has_label_pair"
|
||||
indexMetric = "index_metric"
|
||||
indexSamples = "index_samples"
|
||||
indexMetrics = "index_metrics"
|
||||
rebuildDiskFrontier = "rebuild_disk_frontier"
|
||||
renderView = "render_view"
|
||||
setLabelNameFingerprints = "set_label_name_fingerprints"
|
||||
|
|
|
@ -239,38 +239,58 @@ func groupByFingerprint(samples model.Samples) map[model.Fingerprint]model.Sampl
|
|||
return fingerprintToSamples
|
||||
}
|
||||
|
||||
// indexSamples takes groups of samples, determines which ones contain metrics
|
||||
// that are unknown to the storage stack, and then proceeds to update all
|
||||
// affected indices.
|
||||
func (l *LevelDBMetricPersistence) indexSamples(groups map[model.Fingerprint]model.Samples) (err error) {
|
||||
// findUnindexedMetrics scours the metric membership index for each given Metric
|
||||
// in the keyspace and returns a map of Fingerprint-Metric pairs that are
|
||||
// absent.
|
||||
func (l *LevelDBMetricPersistence) findUnindexedMetrics(candidates map[model.Fingerprint]model.Metric) (unindexed map[model.Fingerprint]model.Metric, err error) {
|
||||
begin := time.Now()
|
||||
defer func() {
|
||||
duration := time.Since(begin)
|
||||
|
||||
recordOutcome(duration, err, map[string]string{operation: indexSamples, result: success}, map[string]string{operation: indexSamples, result: failure})
|
||||
recordOutcome(duration, err, map[string]string{operation: findUnindexedMetrics, result: success}, map[string]string{operation: findUnindexedMetrics, result: failure})
|
||||
}()
|
||||
|
||||
var (
|
||||
absentFingerprints = map[model.Fingerprint]model.Samples{}
|
||||
)
|
||||
unindexed = make(map[model.Fingerprint]model.Metric)
|
||||
|
||||
// Determine which metrics are unknown in the database.
|
||||
|
||||
for fingerprint, samples := range groups {
|
||||
sample := samples[0]
|
||||
metricDTO := model.SampleToMetricDTO(&sample)
|
||||
indexHas, err := l.hasIndexMetric(metricDTO)
|
||||
for fingerprint, metric := range candidates {
|
||||
var (
|
||||
dto = model.MetricToDTO(metric)
|
||||
indexHas, err = l.hasIndexMetric(dto)
|
||||
)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if !indexHas {
|
||||
absentFingerprints[fingerprint] = samples
|
||||
unindexed[fingerprint] = metric
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// indexMetrics takes groups of samples, determines which ones contain metrics
|
||||
// that are unknown to the storage stack, and then proceeds to update all
|
||||
// affected indices.
|
||||
func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerprint]model.Metric) (err error) {
|
||||
begin := time.Now()
|
||||
defer func() {
|
||||
duration := time.Since(begin)
|
||||
|
||||
recordOutcome(duration, err, map[string]string{operation: indexMetrics, result: success}, map[string]string{operation: indexMetrics, result: failure})
|
||||
}()
|
||||
|
||||
var (
|
||||
absentMetrics map[model.Fingerprint]model.Metric
|
||||
)
|
||||
|
||||
absentMetrics, err = l.findUnindexedMetrics(fingerprints)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// TODO: For the missing fingerprints, determine what label names and pairs
|
||||
// are absent and act accordingly and append fingerprints.
|
||||
|
||||
var (
|
||||
doneBuildingLabelNameIndex = make(chan interface{})
|
||||
doneBuildingLabelPairIndex = make(chan interface{})
|
||||
|
@ -280,8 +300,7 @@ func (l *LevelDBMetricPersistence) indexSamples(groups map[model.Fingerprint]mod
|
|||
go func() {
|
||||
labelNameFingerprints := map[model.LabelName]utility.Set{}
|
||||
|
||||
for fingerprint, samples := range absentFingerprints {
|
||||
metric := samples[0].Metric
|
||||
for fingerprint, metric := range absentMetrics {
|
||||
for labelName := range metric {
|
||||
fingerprintSet, ok := labelNameFingerprints[labelName]
|
||||
if !ok {
|
||||
|
@ -340,8 +359,7 @@ func (l *LevelDBMetricPersistence) indexSamples(groups map[model.Fingerprint]mod
|
|||
go func() {
|
||||
labelPairFingerprints := map[model.LabelPair]utility.Set{}
|
||||
|
||||
for fingerprint, samples := range absentFingerprints {
|
||||
metric := samples[0].Metric
|
||||
for fingerprint, metric := range absentMetrics {
|
||||
for labelName, labelValue := range metric {
|
||||
labelPair := model.LabelPair{
|
||||
Name: labelName,
|
||||
|
@ -420,16 +438,14 @@ func (l *LevelDBMetricPersistence) indexSamples(groups map[model.Fingerprint]mod
|
|||
|
||||
// Update the Metric existence index.
|
||||
|
||||
if len(absentFingerprints) > 0 {
|
||||
if len(absentMetrics) > 0 {
|
||||
batch := leveldb.NewBatch()
|
||||
defer batch.Close()
|
||||
|
||||
for fingerprint, samples := range absentFingerprints {
|
||||
for _, sample := range samples {
|
||||
key := coding.NewProtocolBufferEncoder(fingerprint.ToDTO())
|
||||
value := coding.NewProtocolBufferEncoder(model.SampleToMetricDTO(&sample))
|
||||
batch.Put(key, value)
|
||||
}
|
||||
for fingerprint, metric := range absentMetrics {
|
||||
key := coding.NewProtocolBufferEncoder(fingerprint.ToDTO())
|
||||
value := coding.NewProtocolBufferEncoder(model.MetricToDTO(metric))
|
||||
batch.Put(key, value)
|
||||
}
|
||||
|
||||
err = l.fingerprintToMetrics.Commit(batch)
|
||||
|
@ -445,9 +461,8 @@ func (l *LevelDBMetricPersistence) indexSamples(groups map[model.Fingerprint]mod
|
|||
defer batch.Close()
|
||||
|
||||
// WART: We should probably encode simple fingerprints.
|
||||
for _, samples := range absentFingerprints {
|
||||
sample := samples[0]
|
||||
key := coding.NewProtocolBufferEncoder(model.SampleToMetricDTO(&sample))
|
||||
for _, metric := range absentMetrics {
|
||||
key := coding.NewProtocolBufferEncoder(model.MetricToDTO(metric))
|
||||
batch.Put(key, key)
|
||||
}
|
||||
|
||||
|
@ -477,7 +492,15 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err
|
|||
)
|
||||
|
||||
go func(groups map[model.Fingerprint]model.Samples) {
|
||||
indexErrChan <- l.indexSamples(groups)
|
||||
var (
|
||||
metrics = map[model.Fingerprint]model.Metric{}
|
||||
)
|
||||
|
||||
for fingerprint, samples := range groups {
|
||||
metrics[fingerprint] = samples[0].Metric
|
||||
}
|
||||
|
||||
indexErrChan <- l.indexMetrics(metrics)
|
||||
}(fingerprintToSamples)
|
||||
|
||||
go func() {
|
||||
|
|
Loading…
Reference in New Issue