From 4502b4952429d72ca9841acfa1a2b743ca4124ef Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Fri, 8 Feb 2013 14:13:30 +0100 Subject: [PATCH] Swap out fingerprinting infrastructure. All old database entries should be deleted. :-( --- model/dto.go | 27 +++------- model/metric.go | 4 +- model/metric_test.go | 6 ++- storage/metric/leveldb/diagnostics.go | 74 ++++++++++++--------------- storage/metric/leveldb/mutable.go | 23 ++++----- storage/metric/leveldb/reading.go | 15 +----- 6 files changed, 58 insertions(+), 91 deletions(-) diff --git a/model/dto.go b/model/dto.go index 9860eadd9..2e5a889dd 100644 --- a/model/dto.go +++ b/model/dto.go @@ -17,9 +17,7 @@ import ( "code.google.com/p/goprotobuf/proto" "crypto/md5" "encoding/hex" - "errors" dto "github.com/prometheus/prometheus/model/generated" - "io" "sort" "time" ) @@ -79,12 +77,6 @@ func MetricToDTO(m *Metric) *dto.Metric { } } -func StringToFingerprint(v string) Fingerprint { - hash := md5.New() - io.WriteString(hash, v) - return Fingerprint(hex.EncodeToString(hash.Sum([]byte{}))) -} - func BytesToFingerprint(v []byte) Fingerprint { hash := md5.New() hash.Write(v) @@ -135,19 +127,6 @@ func FingerprintToDTO(f *Fingerprint) *dto.Fingerprint { } } -func MessageToFingerprintDTO(message proto.Message) (*dto.Fingerprint, error) { - if messageByteArray, marshalError := proto.Marshal(message); marshalError == nil { - fingerprint := BytesToFingerprint(messageByteArray) - return &dto.Fingerprint{ - Signature: proto.String(string(fingerprint)), - }, nil - } else { - return nil, marshalError - } - - return nil, errors.New("Unknown error in generating FingerprintDTO from message.") -} - func SampleFromDTO(m *Metric, t *time.Time, v *dto.SampleValue) *Sample { s := &Sample{ Value: SampleValue(*v.Value), @@ -158,3 +137,9 @@ func SampleFromDTO(m *Metric, t *time.Time, v *dto.SampleValue) *Sample { return s } + +func (f Fingerprint) ToDTO() *dto.Fingerprint { + return &dto.Fingerprint{ + Signature: proto.String(string(f)), + } +} diff --git a/model/metric.go b/model/metric.go index b18a265b0..c53eccc6d 100644 --- a/model/metric.go +++ b/model/metric.go @@ -50,7 +50,7 @@ type LabelSet map[LabelName]LabelValue type Metric map[LabelName]LabelValue // Fingerprint generates a fingerprint for this given Metric. -func (m Metric) Fingerprint() string { +func (m Metric) Fingerprint() Fingerprint { labelLength := len(m) labelNames := make([]string, 0, labelLength) @@ -70,7 +70,7 @@ func (m Metric) Fingerprint() string { } summer.Write(buffer.Bytes()) - return hex.EncodeToString(summer.Sum(nil)) + return Fingerprint(hex.EncodeToString(summer.Sum(nil))) } // A SampleValue is a representation of a value for a given sample at a given diff --git a/model/metric_test.go b/model/metric_test.go index 9addf7858..34951584d 100644 --- a/model/metric_test.go +++ b/model/metric_test.go @@ -21,7 +21,7 @@ import ( func testMetric(t test.Tester) { var scenarios = []struct { input map[string]string - output string + output Fingerprint }{ { input: map[string]string{}, @@ -57,5 +57,7 @@ func TestMetric(t *testing.T) { } func BenchmarkMetric(b *testing.B) { - testMetric(b) + for i := 0; i < b.N; i++ { + testMetric(b) + } } diff --git a/storage/metric/leveldb/diagnostics.go b/storage/metric/leveldb/diagnostics.go index 1c8fa98d5..1476d7597 100644 --- a/storage/metric/leveldb/diagnostics.go +++ b/storage/metric/leveldb/diagnostics.go @@ -119,63 +119,57 @@ func (l *LevelDBMetricPersistence) GetAllMetrics() ([]model.LabelSet, error) { } func (l *LevelDBMetricPersistence) GetSamplesForMetric(metric model.Metric, interval model.Interval) ([]model.Samples, error) { - metricDTO := model.MetricToDTO(&metric) + if iterator, closer, iteratorErr := l.metricSamples.GetIterator(); iteratorErr == nil { + defer closer.Close() - if fingerprintDTO, fingerprintDTOErr := model.MessageToFingerprintDTO(metricDTO); fingerprintDTOErr == nil { - if iterator, closer, iteratorErr := l.metricSamples.GetIterator(); iteratorErr == nil { - defer closer.Close() + fingerprintDTO := metric.Fingerprint().ToDTO() + start := &dto.SampleKey{ + Fingerprint: fingerprintDTO, + Timestamp: indexable.EncodeTime(interval.OldestInclusive), + } - start := &dto.SampleKey{ - Fingerprint: fingerprintDTO, - Timestamp: indexable.EncodeTime(interval.OldestInclusive), - } + emission := make([]model.Samples, 0) - emission := make([]model.Samples, 0) + if encode, encodeErr := coding.NewProtocolBufferEncoder(start).Encode(); encodeErr == nil { + iterator.Seek(encode) - if encode, encodeErr := coding.NewProtocolBufferEncoder(start).Encode(); encodeErr == nil { - iterator.Seek(encode) + predicate := keyIsAtMostOld(interval.NewestInclusive) - predicate := keyIsAtMostOld(interval.NewestInclusive) - - for iterator = iterator; iterator.Valid(); iterator.Next() { - key := &dto.SampleKey{} - value := &dto.SampleValue{} - if keyUnmarshalErr := proto.Unmarshal(iterator.Key(), key); keyUnmarshalErr == nil { - if valueUnmarshalErr := proto.Unmarshal(iterator.Value(), value); valueUnmarshalErr == nil { - if fingerprintsEqual(fingerprintDTO, key.Fingerprint) { - // Wart - if predicate(key) { - emission = append(emission, model.Samples{ - Value: model.SampleValue(*value.Value), - Timestamp: indexable.DecodeTime(key.Timestamp), - }) - } else { - break - } + for iterator = iterator; iterator.Valid(); iterator.Next() { + key := &dto.SampleKey{} + value := &dto.SampleValue{} + if keyUnmarshalErr := proto.Unmarshal(iterator.Key(), key); keyUnmarshalErr == nil { + if valueUnmarshalErr := proto.Unmarshal(iterator.Value(), value); valueUnmarshalErr == nil { + if fingerprintsEqual(fingerprintDTO, key.Fingerprint) { + // Wart + if predicate(key) { + emission = append(emission, model.Samples{ + Value: model.SampleValue(*value.Value), + Timestamp: indexable.DecodeTime(key.Timestamp), + }) } else { break } } else { - return nil, valueUnmarshalErr + break } } else { - return nil, keyUnmarshalErr + return nil, valueUnmarshalErr } + } else { + return nil, keyUnmarshalErr } - - return emission, nil - - } else { - log.Printf("Could not encode the start key: %q\n", encodeErr) - return nil, encodeErr } + + return emission, nil + } else { - log.Printf("Could not acquire iterator: %q\n", iteratorErr) - return nil, iteratorErr + log.Printf("Could not encode the start key: %q\n", encodeErr) + return nil, encodeErr } } else { - log.Printf("Could not create fingerprint for the metric: %q\n", fingerprintDTOErr) - return nil, fingerprintDTOErr + log.Printf("Could not acquire iterator: %q\n", iteratorErr) + return nil, iteratorErr } panic("unreachable") diff --git a/storage/metric/leveldb/mutable.go b/storage/metric/leveldb/mutable.go index 68409ba23..ccd32d782 100644 --- a/storage/metric/leveldb/mutable.go +++ b/storage/metric/leveldb/mutable.go @@ -121,7 +121,7 @@ func (l *LevelDBMetricPersistence) appendLabelNameFingerprint(labelPair *dto.Lab return } -func (l *LevelDBMetricPersistence) appendFingerprints(m *dto.Metric) (err error) { +func (l *LevelDBMetricPersistence) appendFingerprints(sample model.Sample) (err error) { begin := time.Now() defer func() { @@ -130,24 +130,22 @@ func (l *LevelDBMetricPersistence) appendFingerprints(m *dto.Metric) (err error) recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: appendFingerprints, result: success}, map[string]string{operation: appendFingerprints, result: failure}) }() - fingerprintDTO, err := model.MessageToFingerprintDTO(m) - if err != nil { - return - } + fingerprintDTO := sample.Metric.Fingerprint().ToDTO() fingerprintKey := coding.NewProtocolBufferEncoder(fingerprintDTO) - metricDTOEncoder := coding.NewProtocolBufferEncoder(m) + metricDTO := model.SampleToMetricDTO(&sample) + metricDTOEncoder := coding.NewProtocolBufferEncoder(metricDTO) err = l.fingerprintToMetrics.Put(fingerprintKey, metricDTOEncoder) if err != nil { return } - labelCount := len(m.LabelPair) + labelCount := len(metricDTO.LabelPair) labelPairErrors := make(chan error, labelCount) labelNameErrors := make(chan error, labelCount) - for _, labelPair := range m.LabelPair { + for _, labelPair := range metricDTO.LabelPair { go func(labelPair *dto.LabelPair) { labelNameErrors <- l.appendLabelNameFingerprint(labelPair, fingerprintDTO) }(labelPair) @@ -191,22 +189,21 @@ func (l *LevelDBMetricPersistence) AppendSample(sample *model.Sample) (err error return } + fingerprint := sample.Metric.Fingerprint() + if !indexHas { err = l.indexMetric(metricDTO) if err != nil { return } - err = l.appendFingerprints(metricDTO) + err = l.appendFingerprints(*sample) if err != nil { return } } - fingerprintDTO, err := model.MessageToFingerprintDTO(metricDTO) - if err != nil { - return - } + fingerprintDTO := fingerprint.ToDTO() sampleKeyDTO := &dto.SampleKey{ Fingerprint: fingerprintDTO, diff --git a/storage/metric/leveldb/reading.go b/storage/metric/leveldb/reading.go index 98cffe1bb..91c1c470e 100644 --- a/storage/metric/leveldb/reading.go +++ b/storage/metric/leveldb/reading.go @@ -348,12 +348,7 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(m *model.Metric, t *time.Time, recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: getValueAtTime, result: success}, map[string]string{operation: getValueAtTime, result: failure}) }() - d := model.MetricToDTO(m) - - f, err := model.MessageToFingerprintDTO(d) - if err != nil { - return - } + f := m.Fingerprint().ToDTO() // Candidate for Refactoring k := &dto.SampleKey{ @@ -566,13 +561,7 @@ func (l *LevelDBMetricPersistence) GetRangeValues(m *model.Metric, i *model.Inte recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: getRangeValues, result: success}, map[string]string{operation: getRangeValues, result: failure}) }() - - d := model.MetricToDTO(m) - - f, err := model.MessageToFingerprintDTO(d) - if err != nil { - return - } + f := m.Fingerprint().ToDTO() k := &dto.SampleKey{ Fingerprint: f,