Swap out fingerprinting infrastructure.

All old database entries should be deleted.  :-(
This commit is contained in:
Matt T. Proud 2013-02-08 14:13:30 +01:00
parent 1c74eedf76
commit 4502b49524
6 changed files with 58 additions and 91 deletions

View File

@ -17,9 +17,7 @@ import (
"code.google.com/p/goprotobuf/proto"
"crypto/md5"
"encoding/hex"
"errors"
dto "github.com/prometheus/prometheus/model/generated"
"io"
"sort"
"time"
)
@ -79,12 +77,6 @@ func MetricToDTO(m *Metric) *dto.Metric {
}
}
func StringToFingerprint(v string) Fingerprint {
hash := md5.New()
io.WriteString(hash, v)
return Fingerprint(hex.EncodeToString(hash.Sum([]byte{})))
}
func BytesToFingerprint(v []byte) Fingerprint {
hash := md5.New()
hash.Write(v)
@ -135,19 +127,6 @@ func FingerprintToDTO(f *Fingerprint) *dto.Fingerprint {
}
}
func MessageToFingerprintDTO(message proto.Message) (*dto.Fingerprint, error) {
if messageByteArray, marshalError := proto.Marshal(message); marshalError == nil {
fingerprint := BytesToFingerprint(messageByteArray)
return &dto.Fingerprint{
Signature: proto.String(string(fingerprint)),
}, nil
} else {
return nil, marshalError
}
return nil, errors.New("Unknown error in generating FingerprintDTO from message.")
}
func SampleFromDTO(m *Metric, t *time.Time, v *dto.SampleValue) *Sample {
s := &Sample{
Value: SampleValue(*v.Value),
@ -158,3 +137,9 @@ func SampleFromDTO(m *Metric, t *time.Time, v *dto.SampleValue) *Sample {
return s
}
func (f Fingerprint) ToDTO() *dto.Fingerprint {
return &dto.Fingerprint{
Signature: proto.String(string(f)),
}
}

View File

@ -50,7 +50,7 @@ type LabelSet map[LabelName]LabelValue
type Metric map[LabelName]LabelValue
// Fingerprint generates a fingerprint for this given Metric.
func (m Metric) Fingerprint() string {
func (m Metric) Fingerprint() Fingerprint {
labelLength := len(m)
labelNames := make([]string, 0, labelLength)
@ -70,7 +70,7 @@ func (m Metric) Fingerprint() string {
}
summer.Write(buffer.Bytes())
return hex.EncodeToString(summer.Sum(nil))
return Fingerprint(hex.EncodeToString(summer.Sum(nil)))
}
// A SampleValue is a representation of a value for a given sample at a given

View File

@ -21,7 +21,7 @@ import (
func testMetric(t test.Tester) {
var scenarios = []struct {
input map[string]string
output string
output Fingerprint
}{
{
input: map[string]string{},
@ -57,5 +57,7 @@ func TestMetric(t *testing.T) {
}
func BenchmarkMetric(b *testing.B) {
testMetric(b)
for i := 0; i < b.N; i++ {
testMetric(b)
}
}

View File

@ -119,63 +119,57 @@ func (l *LevelDBMetricPersistence) GetAllMetrics() ([]model.LabelSet, error) {
}
func (l *LevelDBMetricPersistence) GetSamplesForMetric(metric model.Metric, interval model.Interval) ([]model.Samples, error) {
metricDTO := model.MetricToDTO(&metric)
if iterator, closer, iteratorErr := l.metricSamples.GetIterator(); iteratorErr == nil {
defer closer.Close()
if fingerprintDTO, fingerprintDTOErr := model.MessageToFingerprintDTO(metricDTO); fingerprintDTOErr == nil {
if iterator, closer, iteratorErr := l.metricSamples.GetIterator(); iteratorErr == nil {
defer closer.Close()
fingerprintDTO := metric.Fingerprint().ToDTO()
start := &dto.SampleKey{
Fingerprint: fingerprintDTO,
Timestamp: indexable.EncodeTime(interval.OldestInclusive),
}
start := &dto.SampleKey{
Fingerprint: fingerprintDTO,
Timestamp: indexable.EncodeTime(interval.OldestInclusive),
}
emission := make([]model.Samples, 0)
emission := make([]model.Samples, 0)
if encode, encodeErr := coding.NewProtocolBufferEncoder(start).Encode(); encodeErr == nil {
iterator.Seek(encode)
if encode, encodeErr := coding.NewProtocolBufferEncoder(start).Encode(); encodeErr == nil {
iterator.Seek(encode)
predicate := keyIsAtMostOld(interval.NewestInclusive)
predicate := keyIsAtMostOld(interval.NewestInclusive)
for iterator = iterator; iterator.Valid(); iterator.Next() {
key := &dto.SampleKey{}
value := &dto.SampleValue{}
if keyUnmarshalErr := proto.Unmarshal(iterator.Key(), key); keyUnmarshalErr == nil {
if valueUnmarshalErr := proto.Unmarshal(iterator.Value(), value); valueUnmarshalErr == nil {
if fingerprintsEqual(fingerprintDTO, key.Fingerprint) {
// Wart
if predicate(key) {
emission = append(emission, model.Samples{
Value: model.SampleValue(*value.Value),
Timestamp: indexable.DecodeTime(key.Timestamp),
})
} else {
break
}
for iterator = iterator; iterator.Valid(); iterator.Next() {
key := &dto.SampleKey{}
value := &dto.SampleValue{}
if keyUnmarshalErr := proto.Unmarshal(iterator.Key(), key); keyUnmarshalErr == nil {
if valueUnmarshalErr := proto.Unmarshal(iterator.Value(), value); valueUnmarshalErr == nil {
if fingerprintsEqual(fingerprintDTO, key.Fingerprint) {
// Wart
if predicate(key) {
emission = append(emission, model.Samples{
Value: model.SampleValue(*value.Value),
Timestamp: indexable.DecodeTime(key.Timestamp),
})
} else {
break
}
} else {
return nil, valueUnmarshalErr
break
}
} else {
return nil, keyUnmarshalErr
return nil, valueUnmarshalErr
}
} else {
return nil, keyUnmarshalErr
}
return emission, nil
} else {
log.Printf("Could not encode the start key: %q\n", encodeErr)
return nil, encodeErr
}
return emission, nil
} else {
log.Printf("Could not acquire iterator: %q\n", iteratorErr)
return nil, iteratorErr
log.Printf("Could not encode the start key: %q\n", encodeErr)
return nil, encodeErr
}
} else {
log.Printf("Could not create fingerprint for the metric: %q\n", fingerprintDTOErr)
return nil, fingerprintDTOErr
log.Printf("Could not acquire iterator: %q\n", iteratorErr)
return nil, iteratorErr
}
panic("unreachable")

View File

@ -121,7 +121,7 @@ func (l *LevelDBMetricPersistence) appendLabelNameFingerprint(labelPair *dto.Lab
return
}
func (l *LevelDBMetricPersistence) appendFingerprints(m *dto.Metric) (err error) {
func (l *LevelDBMetricPersistence) appendFingerprints(sample model.Sample) (err error) {
begin := time.Now()
defer func() {
@ -130,24 +130,22 @@ func (l *LevelDBMetricPersistence) appendFingerprints(m *dto.Metric) (err error)
recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: appendFingerprints, result: success}, map[string]string{operation: appendFingerprints, result: failure})
}()
fingerprintDTO, err := model.MessageToFingerprintDTO(m)
if err != nil {
return
}
fingerprintDTO := sample.Metric.Fingerprint().ToDTO()
fingerprintKey := coding.NewProtocolBufferEncoder(fingerprintDTO)
metricDTOEncoder := coding.NewProtocolBufferEncoder(m)
metricDTO := model.SampleToMetricDTO(&sample)
metricDTOEncoder := coding.NewProtocolBufferEncoder(metricDTO)
err = l.fingerprintToMetrics.Put(fingerprintKey, metricDTOEncoder)
if err != nil {
return
}
labelCount := len(m.LabelPair)
labelCount := len(metricDTO.LabelPair)
labelPairErrors := make(chan error, labelCount)
labelNameErrors := make(chan error, labelCount)
for _, labelPair := range m.LabelPair {
for _, labelPair := range metricDTO.LabelPair {
go func(labelPair *dto.LabelPair) {
labelNameErrors <- l.appendLabelNameFingerprint(labelPair, fingerprintDTO)
}(labelPair)
@ -191,22 +189,21 @@ func (l *LevelDBMetricPersistence) AppendSample(sample *model.Sample) (err error
return
}
fingerprint := sample.Metric.Fingerprint()
if !indexHas {
err = l.indexMetric(metricDTO)
if err != nil {
return
}
err = l.appendFingerprints(metricDTO)
err = l.appendFingerprints(*sample)
if err != nil {
return
}
}
fingerprintDTO, err := model.MessageToFingerprintDTO(metricDTO)
if err != nil {
return
}
fingerprintDTO := fingerprint.ToDTO()
sampleKeyDTO := &dto.SampleKey{
Fingerprint: fingerprintDTO,

View File

@ -348,12 +348,7 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(m *model.Metric, t *time.Time,
recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: getValueAtTime, result: success}, map[string]string{operation: getValueAtTime, result: failure})
}()
d := model.MetricToDTO(m)
f, err := model.MessageToFingerprintDTO(d)
if err != nil {
return
}
f := m.Fingerprint().ToDTO()
// Candidate for Refactoring
k := &dto.SampleKey{
@ -566,13 +561,7 @@ func (l *LevelDBMetricPersistence) GetRangeValues(m *model.Metric, i *model.Inte
recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: getRangeValues, result: success}, map[string]string{operation: getRangeValues, result: failure})
}()
d := model.MetricToDTO(m)
f, err := model.MessageToFingerprintDTO(d)
if err != nil {
return
}
f := m.Fingerprint().ToDTO()
k := &dto.SampleKey{
Fingerprint: f,