From a14dbd5bd0f8fee10802df7d543bf4be84d14935 Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Wed, 19 Dec 2012 20:34:54 +0100 Subject: [PATCH 1/2] Interim commit for Julius. --- model/dto.go | 12 + model/metric.go | 5 + storage/metric/interface.go | 4 +- storage/metric/leveldb/reading.go | 221 +++++++++++++++++- .../metric/leveldb/rule_integration_test.go | 136 +++++++++++ 5 files changed, 374 insertions(+), 4 deletions(-) create mode 100644 storage/metric/leveldb/rule_integration_test.go diff --git a/model/dto.go b/model/dto.go index 80db6806b..113ba0676 100644 --- a/model/dto.go +++ b/model/dto.go @@ -21,6 +21,7 @@ import ( dto "github.com/matttproud/prometheus/model/generated" "io" "sort" + "time" ) func SampleToMetricDTO(s *Sample) *dto.Metric { @@ -146,3 +147,14 @@ func MessageToFingerprintDTO(message proto.Message) (*dto.Fingerprint, error) { return nil, errors.New("Unknown error in generating FingerprintDTO from message.") } + +func SampleFromDTO(m *Metric, t *time.Time, v *dto.SampleValue) *Sample { + s := &Sample{ + Value: SampleValue(*v.Value), + Timestamp: *t, + } + + s.Metric = *m + + return s +} diff --git a/model/metric.go b/model/metric.go index fdb2802b3..e73e2597d 100644 --- a/model/metric.go +++ b/model/metric.go @@ -14,6 +14,7 @@ package model import ( + "fmt" "time" ) @@ -45,6 +46,10 @@ type Metric map[LabelName]LabelValue // remedied down the road. type SampleValue float32 +func (s SampleValue) String() string { + return fmt.Sprintf("%f", s) +} + type Sample struct { Metric Metric Value SampleValue diff --git a/storage/metric/interface.go b/storage/metric/interface.go index 05c1865c2..55685c686 100644 --- a/storage/metric/interface.go +++ b/storage/metric/interface.go @@ -19,8 +19,8 @@ import ( ) type StalenessPolicy struct { - AllowStale bool - InterpolationInterval time.Duration + AllowStale bool + MaximumStaleness time.Duration } // MetricPersistence is a system for storing metric samples in a persistence diff --git a/storage/metric/leveldb/reading.go b/storage/metric/leveldb/reading.go index 62cc306c2..eeb8eee60 100644 --- a/storage/metric/leveldb/reading.go +++ b/storage/metric/leveldb/reading.go @@ -14,6 +14,7 @@ package leveldb import ( + "bytes" "code.google.com/p/goprotobuf/proto" "errors" "github.com/matttproud/prometheus/coding" @@ -209,8 +210,224 @@ func (l *LevelDBMetricPersistence) GetFirstValue(m *model.Metric) (*model.Sample panic("not implemented") } -func (l *LevelDBMetricPersistence) GetValueAtTime(m *model.Metric, t *time.Time, s *metric.StalenessPolicy) (*model.Sample, error) { - panic("not implemented") +func interpolate(x1, x2 time.Time, y1, y2 float32, e time.Time) float32 { + yDelta := y2 - y1 + xDelta := x2.Sub(x1) + + dDt := yDelta / float32(xDelta) + offset := float32(e.Sub(x1)) + + return y1 + (offset * dDt) +} + +type iterator interface { + Close() + Key() []byte + Next() + Prev() + Seek([]byte) + SeekToFirst() + SeekToLast() + Valid() bool + Value() []byte +} + +func isKeyInsideRecordedInterval(k *dto.SampleKey, i iterator) (b bool, err error) { + byteKey, err := coding.NewProtocolBufferEncoder(k).Encode() + if err != nil { + return + } + + i.Seek(byteKey) + if !i.Valid() { + return + } + + var ( + retrievedKey *dto.SampleKey = &dto.SampleKey{} + ) + + err = proto.Unmarshal(i.Key(), retrievedKey) + if err != nil { + return + } + + if *retrievedKey.Fingerprint.Signature != *k.Fingerprint.Signature { + return + } + + if bytes.Equal(retrievedKey.Timestamp, k.Timestamp) { + return true, nil + } + + i.Prev() + if !i.Valid() { + return + } + + err = proto.Unmarshal(i.Key(), retrievedKey) + if err != nil { + return + } + + b = *retrievedKey.Fingerprint.Signature == *k.Fingerprint.Signature + + return +} + +func doesKeyHavePrecursor(k *dto.SampleKey, i iterator) (b bool, err error) { + byteKey, err := coding.NewProtocolBufferEncoder(k).Encode() + if err != nil { + return + } + + i.Seek(byteKey) + + if !i.Valid() { + i.SeekToFirst() + } + + var ( + retrievedKey *dto.SampleKey = &dto.SampleKey{} + ) + + err = proto.Unmarshal(i.Key(), retrievedKey) + if err != nil { + return + } + + signaturesEqual := *retrievedKey.Fingerprint.Signature == *k.Fingerprint.Signature + if !signaturesEqual { + return + } + + keyTime := indexable.DecodeTime(k.Timestamp) + retrievedTime := indexable.DecodeTime(retrievedKey.Timestamp) + + return retrievedTime.Before(keyTime), nil +} + +func doesKeyHaveSuccessor(k *dto.SampleKey, i iterator) (b bool, err error) { + byteKey, err := coding.NewProtocolBufferEncoder(k).Encode() + if err != nil { + return + } + + i.Seek(byteKey) + + if !i.Valid() { + i.SeekToLast() + } + + var ( + retrievedKey *dto.SampleKey = &dto.SampleKey{} + ) + + err = proto.Unmarshal(i.Key(), retrievedKey) + if err != nil { + return + } + + signaturesEqual := *retrievedKey.Fingerprint.Signature == *k.Fingerprint.Signature + if !signaturesEqual { + return + } + + keyTime := indexable.DecodeTime(k.Timestamp) + retrievedTime := indexable.DecodeTime(retrievedKey.Timestamp) + + return retrievedTime.After(keyTime), nil +} + +func (l *LevelDBMetricPersistence) GetValueAtTime(m *model.Metric, t *time.Time, s *metric.StalenessPolicy) (sample *model.Sample, err error) { + d := model.MetricToDTO(m) + + f, err := model.MessageToFingerprintDTO(d) + if err != nil { + return + } + + // Candidate for Refactoring + k := &dto.SampleKey{ + Fingerprint: f, + Timestamp: indexable.EncodeTime(*t), + } + + e, err := coding.NewProtocolBufferEncoder(k).Encode() + if err != nil { + return + } + + iterator, closer, err := l.metricSamples.GetIterator() + if err != nil { + return + } + defer closer.Close() + + iterator.Seek(e) + + var ( + firstKey *dto.SampleKey = &dto.SampleKey{} + firstValue *dto.SampleValue = nil + ) + + within, err := isKeyInsideRecordedInterval(k, iterator) + if err != nil || !within { + return + } + + for iterator = iterator; iterator.Valid(); iterator.Prev() { + err := proto.Unmarshal(iterator.Key(), firstKey) + if err != nil { + return nil, err + } + + if *firstKey.Fingerprint.Signature == *k.Fingerprint.Signature { + firstValue = &dto.SampleValue{} + err := proto.Unmarshal(iterator.Value(), firstValue) + if err != nil { + return nil, err + } + + if indexable.DecodeTime(firstKey.Timestamp).Equal(indexable.DecodeTime(k.Timestamp)) { + return model.SampleFromDTO(m, t, firstValue), nil + } + break + } + } + + var ( + secondKey *dto.SampleKey = &dto.SampleKey{} + secondValue *dto.SampleValue = nil + ) + + iterator.Next() + if !iterator.Valid() { + return + } + + err = proto.Unmarshal(iterator.Key(), secondKey) + if err != nil { + + return + } + + if *secondKey.Fingerprint.Signature == *k.Fingerprint.Signature { + secondValue = &dto.SampleValue{} + err = proto.Unmarshal(iterator.Value(), secondValue) + if err != nil { + return + } + } + + firstTime := indexable.DecodeTime(firstKey.Timestamp) + secondTime := indexable.DecodeTime(secondKey.Timestamp) + interpolated := interpolate(firstTime, secondTime, *firstValue.Value, *secondValue.Value, *t) + emission := &dto.SampleValue{ + Value: &interpolated, + } + + return model.SampleFromDTO(m, t, emission), nil } func (l *LevelDBMetricPersistence) GetRangeValues(m *model.Metric, i *model.Interval, s *metric.StalenessPolicy) (*model.SampleSet, error) { diff --git a/storage/metric/leveldb/rule_integration_test.go b/storage/metric/leveldb/rule_integration_test.go new file mode 100644 index 000000000..077374c36 --- /dev/null +++ b/storage/metric/leveldb/rule_integration_test.go @@ -0,0 +1,136 @@ +package leveldb + +import ( + "fmt" + "github.com/matttproud/prometheus/model" + "github.com/matttproud/prometheus/storage/metric" + "io/ioutil" + "os" + "testing" + "time" +) + +func TestGetValueAtTime(t *testing.T) { + temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test") + + defer func() { + if removeAllErr := os.RemoveAll(temporaryDirectory); removeAllErr != nil { + t.Errorf("Could not remove temporary directory: %q\n", removeAllErr) + } + }() + + persistence, _ := NewLevelDBMetricPersistence(temporaryDirectory) + + defer func() { + persistence.Close() + }() + + m := model.Metric{ + "name": "age_in_years", + } + + appendErr := persistence.AppendSample(&model.Sample{ + Value: model.SampleValue(0), + Timestamp: time.Date(1984, 3, 30, 0, 0, 0, 0, time.UTC), + Metric: m, + }) + + if appendErr != nil { + t.Error(appendErr) + } + + p := &metric.StalenessPolicy{ + AllowStale: false, + } + + d := time.Date(1984, 3, 30, 0, 0, 0, 0, time.UTC) + s, sErr := persistence.GetValueAtTime(&m, &d, p) + + if sErr != nil { + t.Error(sErr) + } + + if s == nil { + t.Error("a sample should be returned") + } + + if s.Value != model.SampleValue(0) { + t.Error("an incorrect sample value was returned") + } + + if s.Timestamp != d { + t.Error("an incorrect timestamp for the sample was returned") + } + + d = time.Date(1985, 3, 30, 0, 0, 0, 0, time.UTC) + + s, sErr = persistence.GetValueAtTime(&m, &d, p) + + if sErr != nil { + t.Error(sErr) + } + + if s != nil { + t.Error("no sample should be returned") + } + + d = time.Date(1983, 3, 30, 0, 0, 0, 0, time.UTC) + + s, sErr = persistence.GetValueAtTime(&m, &d, p) + + if sErr != nil { + t.Error(sErr) + } + + if s != nil { + t.Error("no sample should be returned") + } + + appendErr = persistence.AppendSample(&model.Sample{ + Value: model.SampleValue(1), + Timestamp: time.Date(1985, 3, 30, 0, 0, 0, 0, time.UTC), + Metric: m, + }) + + if appendErr != nil { + t.Error(appendErr) + } + + d = time.Date(1985, 3, 30, 0, 0, 0, 0, time.UTC) + s, sErr = persistence.GetValueAtTime(&m, &d, p) + + if sErr != nil { + t.Error(sErr) + } + + if s == nil { + t.Error("a sample should be returned") + } + + if s.Value != model.SampleValue(1) { + t.Error("an incorrect sample value was returned") + } + + if s.Timestamp != d { + t.Error("an incorrect timestamp for the sample was returned") + } + + d = time.Date(1984, 9, 24, 0, 0, 0, 0, time.UTC) + s, sErr = persistence.GetValueAtTime(&m, &d, p) + + if sErr != nil { + t.Error(sErr) + } + + if s == nil { + t.Error("a sample should be returned") + } + + if fmt.Sprintf("%f", s.Value) != model.SampleValue(0.487671).String() { + t.Errorf("an incorrect sample value was returned: %s\n", s.Value) + } + + if s.Timestamp != d { + t.Error("an incorrect timestamp for the sample was returned") + } +} From 3ac5d48b1a373faa201f3fcd3143e9b436e97d78 Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Tue, 25 Dec 2012 13:50:36 +0100 Subject: [PATCH 2/2] Impl' storage i'faces and fix non-idiomatic warts. This change includes implementation of most major storage layer features, albeit some imperfect. It also includes nascent telemetry bindings, too. --- Makefile.TRAVIS | 10 +- main.go | 19 +- retrieval/type.go | 119 ++ storage/metric/interface.go | 4 +- storage/metric/leveldb/diagnostics.go | 65 + storage/metric/leveldb/leveldb_test.go | 14 +- storage/metric/leveldb/mutable.go | 250 +-- storage/metric/leveldb/reading.go | 491 +++-- .../metric/leveldb/rule_integration_test.go | 1654 ++++++++++++++++- storage/raw/index/leveldb/leveldb.go | 18 +- storage/raw/leveldb/leveldb.go | 123 +- utility/test/interface.go | 21 + 12 files changed, 2304 insertions(+), 484 deletions(-) create mode 100644 retrieval/type.go create mode 100644 utility/test/interface.go diff --git a/Makefile.TRAVIS b/Makefile.TRAVIS index 5fc39c207..eec04c567 100644 --- a/Makefile.TRAVIS +++ b/Makefile.TRAVIS @@ -40,7 +40,7 @@ preparation-stamp: build-dependencies build-dependencies: build-dependencies-stamp -build-dependencies-stamp: bison cc mercurial protoc goprotobuf go leveldb levigo +build-dependencies-stamp: bison cc mercurial protoc goprotobuf go instrumentation leveldb levigo touch $@ overlay: overlay-stamp @@ -100,6 +100,12 @@ goprotobuf-stamp: go protoc source $(GO_GET) code.google.com/p/goprotobuf/protoc-gen-go touch $@ +instrumentation: instrumentation-stamp + +instrumentation-stamp: go source + $(GO_GET) github.com/matttproud/golang_instrumentation + touch $@ + leveldb: leveldb-stamp leveldb-stamp: cc rsync leveldb-$(LEVELDB_VERSION).tar.gz overlay @@ -148,4 +154,4 @@ clean: -rm protobuf-$(PROTOCOL_BUFFERS_VERSION).tar.bz2 -.PHONY: all bison build-dependencies cc clean go goprotobuf leveldb levigo mercurial overlay preparation protoc rsync source test wget +.PHONY: all bison build-dependencies cc clean go goprotobuf instrumentation leveldb levigo mercurial overlay preparation protoc rsync source test wget diff --git a/main.go b/main.go index cf7c36fc1..17440933c 100644 --- a/main.go +++ b/main.go @@ -14,6 +14,8 @@ package main import ( + "fmt" + "github.com/matttproud/prometheus/retrieval" "github.com/matttproud/prometheus/storage/metric/leveldb" "log" "os" @@ -30,6 +32,21 @@ func main() { m.Close() }() - for { + t := &retrieval.Target{ + Address: "http://localhost:8080/metrics.json", + } + + for i := 0; i < 100000; i++ { + c, err := t.Scrape() + if err != nil { + fmt.Println(err) + continue + } + + for _, s := range c { + m.AppendSample(&s) + } + + fmt.Printf("Finished %d\n", i) } } diff --git a/retrieval/type.go b/retrieval/type.go new file mode 100644 index 000000000..7dd516556 --- /dev/null +++ b/retrieval/type.go @@ -0,0 +1,119 @@ +package retrieval + +import ( + "encoding/json" + "github.com/matttproud/prometheus/model" + "io/ioutil" + "net/http" + "strconv" + "time" +) + +type TargetState int + +const ( + UNKNOWN TargetState = iota + ALIVE + UNREACHABLE +) + +type Target struct { + State TargetState + Address string + Staleness time.Duration + Frequency time.Duration +} + +func (t *Target) Scrape() (samples []model.Sample, err error) { + defer func() { + if err != nil { + t.State = ALIVE + } + }() + + ti := time.Now() + resp, err := http.Get(t.Address) + if err != nil { + return + } + + defer resp.Body.Close() + + raw, err := ioutil.ReadAll(resp.Body) + if err != nil { + return + } + + intermediate := make(map[string]interface{}) + err = json.Unmarshal(raw, &intermediate) + if err != nil { + return + } + + baseLabels := map[string]string{"instance": t.Address} + + for name, v := range intermediate { + asMap, ok := v.(map[string]interface{}) + + if !ok { + continue + } + + switch asMap["type"] { + case "counter": + m := model.Metric{} + m["name"] = model.LabelValue(name) + asFloat, ok := asMap["value"].(float64) + if !ok { + continue + } + + s := model.Sample{ + Metric: m, + Value: model.SampleValue(asFloat), + Timestamp: ti, + } + + for baseK, baseV := range baseLabels { + m[model.LabelName(baseK)] = model.LabelValue(baseV) + } + + samples = append(samples, s) + case "histogram": + values, ok := asMap["value"].(map[string]interface{}) + if !ok { + continue + } + + for p, pValue := range values { + asString, ok := pValue.(string) + if !ok { + continue + } + + float, err := strconv.ParseFloat(asString, 64) + if err != nil { + continue + } + + m := model.Metric{} + m["name"] = model.LabelValue(name) + m["percentile"] = model.LabelValue(p) + + s := model.Sample{ + Metric: m, + Value: model.SampleValue(float), + Timestamp: ti, + } + + for baseK, baseV := range baseLabels { + m[model.LabelName(baseK)] = model.LabelValue(baseV) + } + + samples = append(samples, s) + } + } + } + + return +} diff --git a/storage/metric/interface.go b/storage/metric/interface.go index 55685c686..c4a230e7c 100644 --- a/storage/metric/interface.go +++ b/storage/metric/interface.go @@ -19,8 +19,7 @@ import ( ) type StalenessPolicy struct { - AllowStale bool - MaximumStaleness time.Duration + DeltaAllowance time.Duration } // MetricPersistence is a system for storing metric samples in a persistence @@ -43,7 +42,6 @@ type MetricPersistence interface { GetMetricForFingerprint(f *model.Fingerprint) (*model.Metric, error) - GetFirstValue(*model.Metric) (*model.Sample, error) GetValueAtTime(*model.Metric, *time.Time, *StalenessPolicy) (*model.Sample, error) GetBoundaryValues(*model.Metric, *model.Interval, *StalenessPolicy) (*model.Sample, *model.Sample, error) GetRangeValues(*model.Metric, *model.Interval, *StalenessPolicy) (*model.SampleSet, error) diff --git a/storage/metric/leveldb/diagnostics.go b/storage/metric/leveldb/diagnostics.go index bf8112d1c..3c38f1810 100644 --- a/storage/metric/leveldb/diagnostics.go +++ b/storage/metric/leveldb/diagnostics.go @@ -17,9 +17,11 @@ import ( "code.google.com/p/goprotobuf/proto" "errors" "github.com/matttproud/prometheus/coding" + "github.com/matttproud/prometheus/coding/indexable" "github.com/matttproud/prometheus/model" dto "github.com/matttproud/prometheus/model/generated" "github.com/matttproud/prometheus/utility" + "log" ) func (l *LevelDBMetricPersistence) GetAllLabelNames() ([]string, error) { @@ -115,3 +117,66 @@ func (l *LevelDBMetricPersistence) GetAllMetrics() ([]model.LabelSet, error) { return nil, errors.New("Unknown error encountered when querying metrics.") } + +func (l *LevelDBMetricPersistence) GetSamplesForMetric(metric model.Metric, interval model.Interval) ([]model.Samples, error) { + metricDTO := model.MetricToDTO(&metric) + + if fingerprintDTO, fingerprintDTOErr := model.MessageToFingerprintDTO(metricDTO); fingerprintDTOErr == nil { + if iterator, closer, iteratorErr := l.metricSamples.GetIterator(); iteratorErr == nil { + defer closer.Close() + + start := &dto.SampleKey{ + Fingerprint: fingerprintDTO, + Timestamp: indexable.EncodeTime(interval.OldestInclusive), + } + + emission := make([]model.Samples, 0) + + if encode, encodeErr := coding.NewProtocolBufferEncoder(start).Encode(); encodeErr == nil { + iterator.Seek(encode) + + predicate := keyIsAtMostOld(interval.NewestInclusive) + + for iterator = iterator; iterator.Valid(); iterator.Next() { + key := &dto.SampleKey{} + value := &dto.SampleValue{} + if keyUnmarshalErr := proto.Unmarshal(iterator.Key(), key); keyUnmarshalErr == nil { + if valueUnmarshalErr := proto.Unmarshal(iterator.Value(), value); valueUnmarshalErr == nil { + if fingerprintsEqual(fingerprintDTO, key.Fingerprint) { + // Wart + if predicate(key) { + emission = append(emission, model.Samples{ + Value: model.SampleValue(*value.Value), + Timestamp: indexable.DecodeTime(key.Timestamp), + }) + } else { + break + } + } else { + break + } + } else { + return nil, valueUnmarshalErr + } + } else { + return nil, keyUnmarshalErr + } + } + + return emission, nil + + } else { + log.Printf("Could not encode the start key: %q\n", encodeErr) + return nil, encodeErr + } + } else { + log.Printf("Could not acquire iterator: %q\n", iteratorErr) + return nil, iteratorErr + } + } else { + log.Printf("Could not create fingerprint for the metric: %q\n", fingerprintDTOErr) + return nil, fingerprintDTOErr + } + + panic("unreachable") +} diff --git a/storage/metric/leveldb/leveldb_test.go b/storage/metric/leveldb/leveldb_test.go index f8c4a10d8..f7804cf69 100644 --- a/storage/metric/leveldb/leveldb_test.go +++ b/storage/metric/leveldb/leveldb_test.go @@ -18,6 +18,7 @@ import ( "fmt" "github.com/matttproud/prometheus/model" dto "github.com/matttproud/prometheus/model/generated" + "github.com/matttproud/prometheus/utility/test" "io/ioutil" "math" "math/rand" @@ -31,12 +32,7 @@ const ( stochasticMaximumVariance = 64 ) -type tester interface { - Errorf(format string, args ...interface{}) - Error(args ...interface{}) -} - -var testBasicLifecycle func(t tester) = func(t tester) { +var testBasicLifecycle func(t test.Tester) = func(t test.Tester) { temporaryDirectory, temporaryDirectoryErr := ioutil.TempDir("", "leveldb_metric_persistence_test") if temporaryDirectoryErr != nil { @@ -78,7 +74,7 @@ func BenchmarkBasicLifecycle(b *testing.B) { } } -var testReadEmpty func(t tester) = func(t tester) { +var testReadEmpty func(t test.Tester) = func(t test.Tester) { temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test") defer func() { @@ -195,7 +191,7 @@ func BenchmarkReadEmpty(b *testing.B) { } } -var testAppendSampleAsPureSparseAppend = func(t tester) { +var testAppendSampleAsPureSparseAppend = func(t test.Tester) { temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test") defer func() { @@ -241,7 +237,7 @@ func BenchmarkAppendSampleAsPureSparseAppend(b *testing.B) { } } -var testAppendSampleAsSparseAppendWithReads func(t tester) = func(t tester) { +var testAppendSampleAsSparseAppendWithReads func(t test.Tester) = func(t test.Tester) { temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test") defer func() { diff --git a/storage/metric/leveldb/mutable.go b/storage/metric/leveldb/mutable.go index bcff51efa..62e75f981 100644 --- a/storage/metric/leveldb/mutable.go +++ b/storage/metric/leveldb/mutable.go @@ -15,14 +15,24 @@ package leveldb import ( "code.google.com/p/goprotobuf/proto" - "errors" + registry "github.com/matttproud/golang_instrumentation" + "github.com/matttproud/golang_instrumentation/metrics" "github.com/matttproud/prometheus/coding" "github.com/matttproud/prometheus/coding/indexable" "github.com/matttproud/prometheus/model" dto "github.com/matttproud/prometheus/model/generated" - "log" ) +var ( + appendSuccessCount = &metrics.CounterMetric{} + appendFailureCount = &metrics.CounterMetric{} +) + +func init() { + registry.Register("sample_append_success_count_total", appendSuccessCount) + registry.Register("sample_append_failure_count_total", appendFailureCount) +} + func (l *LevelDBMetricPersistence) setLabelPairFingerprints(labelPair *dto.LabelPair, fingerprints *dto.FingerprintCollection) error { labelPairEncoded := coding.NewProtocolBufferEncoder(labelPair) fingerprintsEncoded := coding.NewProtocolBufferEncoder(fingerprints) @@ -35,146 +45,148 @@ func (l *LevelDBMetricPersistence) setLabelNameFingerprints(labelName *dto.Label return l.labelNameToFingerprints.Put(labelNameEncoded, fingerprintsEncoded) } -func (l *LevelDBMetricPersistence) appendLabelPairFingerprint(labelPair *dto.LabelPair, fingerprint *dto.Fingerprint) error { - if has, hasError := l.HasLabelPair(labelPair); hasError == nil { - var fingerprints *dto.FingerprintCollection - if has { - if existing, existingError := l.getFingerprintsForLabelSet(labelPair); existingError == nil { - fingerprints = existing - } else { - return existingError - } - } else { - fingerprints = &dto.FingerprintCollection{} - } - - fingerprints.Member = append(fingerprints.Member, fingerprint) - - return l.setLabelPairFingerprints(labelPair, fingerprints) - } else { - return hasError +func (l *LevelDBMetricPersistence) appendLabelPairFingerprint(labelPair *dto.LabelPair, fingerprint *dto.Fingerprint) (err error) { + has, err := l.HasLabelPair(labelPair) + if err != nil { + return } - return errors.New("Unknown error when appending fingerprint to label name and value pair.") + var fingerprints *dto.FingerprintCollection + if has { + fingerprints, err = l.getFingerprintsForLabelSet(labelPair) + if err != nil { + return + } + } else { + fingerprints = &dto.FingerprintCollection{} + } + + fingerprints.Member = append(fingerprints.Member, fingerprint) + + return l.setLabelPairFingerprints(labelPair, fingerprints) } -func (l *LevelDBMetricPersistence) appendLabelNameFingerprint(labelPair *dto.LabelPair, fingerprint *dto.Fingerprint) error { +func (l *LevelDBMetricPersistence) appendLabelNameFingerprint(labelPair *dto.LabelPair, fingerprint *dto.Fingerprint) (err error) { labelName := &dto.LabelName{ Name: labelPair.Name, } - if has, hasError := l.HasLabelName(labelName); hasError == nil { - var fingerprints *dto.FingerprintCollection - if has { - if existing, existingError := l.GetLabelNameFingerprints(labelName); existingError == nil { - fingerprints = existing - } else { - return existingError - } - } else { - fingerprints = &dto.FingerprintCollection{} - } - - fingerprints.Member = append(fingerprints.Member, fingerprint) - - return l.setLabelNameFingerprints(labelName, fingerprints) - } else { - return hasError + has, err := l.HasLabelName(labelName) + if err != nil { + return } - return errors.New("Unknown error when appending fingerprint to label name and value pair.") -} - -func (l *LevelDBMetricPersistence) appendFingerprints(m *dto.Metric) error { - if fingerprintDTO, fingerprintDTOError := model.MessageToFingerprintDTO(m); fingerprintDTOError == nil { - fingerprintKey := coding.NewProtocolBufferEncoder(fingerprintDTO) - metricDTOEncoder := coding.NewProtocolBufferEncoder(m) - - if putError := l.fingerprintToMetrics.Put(fingerprintKey, metricDTOEncoder); putError == nil { - labelCount := len(m.LabelPair) - labelPairErrors := make(chan error, labelCount) - labelNameErrors := make(chan error, labelCount) - - for _, labelPair := range m.LabelPair { - go func(labelPair *dto.LabelPair) { - labelNameErrors <- l.appendLabelNameFingerprint(labelPair, fingerprintDTO) - }(labelPair) - - go func(labelPair *dto.LabelPair) { - labelPairErrors <- l.appendLabelPairFingerprint(labelPair, fingerprintDTO) - }(labelPair) - } - - for i := 0; i < cap(labelPairErrors); i++ { - appendError := <-labelPairErrors - - if appendError != nil { - return appendError - } - } - - for i := 0; i < cap(labelNameErrors); i++ { - appendError := <-labelNameErrors - - if appendError != nil { - return appendError - } - } - - return nil - - } else { - return putError + var fingerprints *dto.FingerprintCollection + if has { + fingerprints, err = l.GetLabelNameFingerprints(labelName) + if err != nil { + return } } else { - return fingerprintDTOError + fingerprints = &dto.FingerprintCollection{} } - return errors.New("Unknown error in appending label pairs to fingerprint.") + fingerprints.Member = append(fingerprints.Member, fingerprint) + + return l.setLabelNameFingerprints(labelName, fingerprints) } -func (l *LevelDBMetricPersistence) AppendSample(sample *model.Sample) error { +func (l *LevelDBMetricPersistence) appendFingerprints(m *dto.Metric) (err error) { + fingerprintDTO, err := model.MessageToFingerprintDTO(m) + if err != nil { + return + } + + fingerprintKey := coding.NewProtocolBufferEncoder(fingerprintDTO) + metricDTOEncoder := coding.NewProtocolBufferEncoder(m) + + err = l.fingerprintToMetrics.Put(fingerprintKey, metricDTOEncoder) + if err != nil { + return + } + + labelCount := len(m.LabelPair) + labelPairErrors := make(chan error, labelCount) + labelNameErrors := make(chan error, labelCount) + + for _, labelPair := range m.LabelPair { + go func(labelPair *dto.LabelPair) { + labelNameErrors <- l.appendLabelNameFingerprint(labelPair, fingerprintDTO) + }(labelPair) + + go func(labelPair *dto.LabelPair) { + labelPairErrors <- l.appendLabelPairFingerprint(labelPair, fingerprintDTO) + }(labelPair) + } + + for i := 0; i < cap(labelPairErrors); i++ { + err = <-labelPairErrors + + if err != nil { + return + } + } + + for i := 0; i < cap(labelNameErrors); i++ { + err = <-labelNameErrors + + if err != nil { + return + } + } + + return +} + +func (l *LevelDBMetricPersistence) AppendSample(sample *model.Sample) (err error) { + defer func() { + var m *metrics.CounterMetric = appendSuccessCount + + if err != nil { + m = appendFailureCount + } + + m.Increment() + }() + metricDTO := model.SampleToMetricDTO(sample) - if indexHas, indexHasError := l.hasIndexMetric(metricDTO); indexHasError == nil { - if !indexHas { - if indexPutError := l.indexMetric(metricDTO); indexPutError == nil { - if appendError := l.appendFingerprints(metricDTO); appendError != nil { - log.Printf("Could not set metric fingerprint to label pairs mapping: %q\n", appendError) - return appendError - } - } else { - log.Printf("Could not add metric to membership index: %q\n", indexPutError) - return indexPutError - } - } - } else { - log.Printf("Could not query membership index for metric: %q\n", indexHasError) - return indexHasError + indexHas, err := l.hasIndexMetric(metricDTO) + if err != nil { + return } - if fingerprintDTO, fingerprintDTOErr := model.MessageToFingerprintDTO(metricDTO); fingerprintDTOErr == nil { - - sampleKeyDTO := &dto.SampleKey{ - Fingerprint: fingerprintDTO, - Timestamp: indexable.EncodeTime(sample.Timestamp), + if !indexHas { + err = l.indexMetric(metricDTO) + if err != nil { + return } - sampleValueDTO := &dto.SampleValue{ - Value: proto.Float32(float32(sample.Value)), + err = l.appendFingerprints(metricDTO) + if err != nil { + return } - - sampleKeyEncoded := coding.NewProtocolBufferEncoder(sampleKeyDTO) - sampleValueEncoded := coding.NewProtocolBufferEncoder(sampleValueDTO) - - if putError := l.metricSamples.Put(sampleKeyEncoded, sampleValueEncoded); putError != nil { - log.Printf("Could not append metric sample: %q\n", putError) - return putError - } - } else { - log.Printf("Could not encode metric fingerprint: %q\n", fingerprintDTOErr) - return fingerprintDTOErr } - return nil + fingerprintDTO, err := model.MessageToFingerprintDTO(metricDTO) + if err != nil { + return + } + + sampleKeyDTO := &dto.SampleKey{ + Fingerprint: fingerprintDTO, + Timestamp: indexable.EncodeTime(sample.Timestamp), + } + sampleValueDTO := &dto.SampleValue{ + Value: proto.Float32(float32(sample.Value)), + } + sampleKeyEncoded := coding.NewProtocolBufferEncoder(sampleKeyDTO) + sampleValueEncoded := coding.NewProtocolBufferEncoder(sampleValueDTO) + + err = l.metricSamples.Put(sampleKeyEncoded, sampleValueEncoded) + if err != nil { + return + } + + return } diff --git a/storage/metric/leveldb/reading.go b/storage/metric/leveldb/reading.go index eeb8eee60..08aecb0f3 100644 --- a/storage/metric/leveldb/reading.go +++ b/storage/metric/leveldb/reading.go @@ -16,16 +16,95 @@ package leveldb import ( "bytes" "code.google.com/p/goprotobuf/proto" - "errors" + registry "github.com/matttproud/golang_instrumentation" + "github.com/matttproud/golang_instrumentation/metrics" "github.com/matttproud/prometheus/coding" "github.com/matttproud/prometheus/coding/indexable" "github.com/matttproud/prometheus/model" dto "github.com/matttproud/prometheus/model/generated" "github.com/matttproud/prometheus/storage/metric" - "log" "time" ) +var ( + getLabelNameFingerprintsSuccessCount = &metrics.CounterMetric{} + getLabelNameFingerprintsFailureCount = &metrics.CounterMetric{} + getFingerprintsForLabelSetSuccessCount = &metrics.CounterMetric{} + getFingerprintsForLabelSetFailureCount = &metrics.CounterMetric{} + getFingerprintsForLabelNameSuccessCount = &metrics.CounterMetric{} + getFingerprintsForLabelNameFailureCount = &metrics.CounterMetric{} + getMetricForFingerprintSuccessCount = &metrics.CounterMetric{} + getMetricForFingerprintFailureCount = &metrics.CounterMetric{} + getBoundaryValuesSuccessCount = &metrics.CounterMetric{} + getBoundaryValuesFailureCount = &metrics.CounterMetric{} +) + +func init() { + registry.Register("get_label_name_fingerprints_success_count_total", getLabelNameFingerprintsSuccessCount) + registry.Register("get_label_name_fingerprints_failure_count_total", getLabelNameFingerprintsFailureCount) + + registry.Register("get_fingerprints_for_label_set_success_count_total", getFingerprintsForLabelSetSuccessCount) + registry.Register("get_fingerprints_for_label_set_failure_count_total", getFingerprintsForLabelSetFailureCount) + registry.Register("get_fingerprints_for_label_name_success_count_total", getFingerprintsForLabelNameSuccessCount) + registry.Register("get_fingerprints_for_label_name_failure_count_total", getFingerprintsForLabelNameFailureCount) + registry.Register("get_metric_for_fingerprint_success_count_total", getMetricForFingerprintSuccessCount) + registry.Register("get_metric_for_fingerprint_failure_count_total", getMetricForFingerprintFailureCount) + registry.Register("get_boundary_values_success_count_total", getBoundaryValuesSuccessCount) + registry.Register("get_boundary_values_failure_count_total", getBoundaryValuesFailureCount) +} + +func extractSampleKey(i iterator) (k *dto.SampleKey, err error) { + k = &dto.SampleKey{} + err = proto.Unmarshal(i.Key(), k) + + return +} + +func extractSampleValue(i iterator) (v *dto.SampleValue, err error) { + v = &dto.SampleValue{} + err = proto.Unmarshal(i.Value(), v) + + return +} + +func fingerprintsEqual(l *dto.Fingerprint, r *dto.Fingerprint) bool { + if l == r { + return true + } + + if l == nil && r == nil { + return true + } + + if r.Signature == l.Signature { + return true + } + + if *r.Signature == *l.Signature { + return true + } + + return false +} + +type sampleKeyPredicate func(k *dto.SampleKey) bool + +func keyIsOlderThan(t time.Time) sampleKeyPredicate { + unix := t.Unix() + + return func(k *dto.SampleKey) bool { + return indexable.DecodeTime(k.Timestamp).Unix() > unix + } +} + +func keyIsAtMostOld(t time.Time) sampleKeyPredicate { + unix := t.Unix() + + return func(k *dto.SampleKey) bool { + return indexable.DecodeTime(k.Timestamp).Unix() <= unix + } +} + func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (bool, error) { dtoKey := coding.NewProtocolBufferEncoder(dto) return l.metricMembershipIndex.Has(dtoKey) @@ -46,168 +125,162 @@ func (l *LevelDBMetricPersistence) HasLabelName(dto *dto.LabelName) (bool, error return l.labelNameToFingerprints.Has(dtoKey) } -func (l *LevelDBMetricPersistence) getFingerprintsForLabelSet(p *dto.LabelPair) (*dto.FingerprintCollection, error) { +func (l *LevelDBMetricPersistence) getFingerprintsForLabelSet(p *dto.LabelPair) (c *dto.FingerprintCollection, err error) { dtoKey := coding.NewProtocolBufferEncoder(p) - if get, getError := l.labelSetToFingerprints.Get(dtoKey); getError == nil { - value := &dto.FingerprintCollection{} - if unmarshalError := proto.Unmarshal(get, value); unmarshalError == nil { - return value, nil - } else { - return nil, unmarshalError - } - } else { - return nil, getError + get, err := l.labelSetToFingerprints.Get(dtoKey) + if err != nil { + return } - panic("unreachable") + c = &dto.FingerprintCollection{} + err = proto.Unmarshal(get, c) + + return } -func (l *LevelDBMetricPersistence) GetLabelNameFingerprints(n *dto.LabelName) (*dto.FingerprintCollection, error) { +// XXX: Delete me and replace with GetFingerprintsForLabelName. +func (l *LevelDBMetricPersistence) GetLabelNameFingerprints(n *dto.LabelName) (c *dto.FingerprintCollection, err error) { + defer func() { + m := getLabelNameFingerprintsSuccessCount + if err != nil { + m = getLabelNameFingerprintsFailureCount + } + + m.Increment() + }() + dtoKey := coding.NewProtocolBufferEncoder(n) - if get, getError := l.labelNameToFingerprints.Get(dtoKey); getError == nil { - value := &dto.FingerprintCollection{} - if unmarshalError := proto.Unmarshal(get, value); unmarshalError == nil { - return value, nil - } else { - return nil, unmarshalError - } - } else { - return nil, getError + get, err := l.labelNameToFingerprints.Get(dtoKey) + if err != nil { + return } - return nil, errors.New("Unknown error while getting label name fingerprints.") + c = &dto.FingerprintCollection{} + err = proto.Unmarshal(get, c) + + return } -func (l *LevelDBMetricPersistence) GetSamplesForMetric(metric model.Metric, interval model.Interval) ([]model.Samples, error) { - metricDTO := model.MetricToDTO(&metric) - - if fingerprintDTO, fingerprintDTOErr := model.MessageToFingerprintDTO(metricDTO); fingerprintDTOErr == nil { - if iterator, closer, iteratorErr := l.metricSamples.GetIterator(); iteratorErr == nil { - defer closer.Close() - - start := &dto.SampleKey{ - Fingerprint: fingerprintDTO, - Timestamp: indexable.EncodeTime(interval.OldestInclusive), - } - - emission := make([]model.Samples, 0) - - if encode, encodeErr := coding.NewProtocolBufferEncoder(start).Encode(); encodeErr == nil { - iterator.Seek(encode) - - for iterator = iterator; iterator.Valid(); iterator.Next() { - key := &dto.SampleKey{} - value := &dto.SampleValue{} - if keyUnmarshalErr := proto.Unmarshal(iterator.Key(), key); keyUnmarshalErr == nil { - if valueUnmarshalErr := proto.Unmarshal(iterator.Value(), value); valueUnmarshalErr == nil { - if *fingerprintDTO.Signature == *key.Fingerprint.Signature { - // Wart - if indexable.DecodeTime(key.Timestamp).Unix() <= interval.NewestInclusive.Unix() { - emission = append(emission, model.Samples{ - Value: model.SampleValue(*value.Value), - Timestamp: indexable.DecodeTime(key.Timestamp), - }) - } else { - break - } - } else { - break - } - } else { - return nil, valueUnmarshalErr - } - } else { - return nil, keyUnmarshalErr - } - } - - return emission, nil - - } else { - log.Printf("Could not encode the start key: %q\n", encodeErr) - return nil, encodeErr - } - } else { - log.Printf("Could not acquire iterator: %q\n", iteratorErr) - return nil, iteratorErr +func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet *model.LabelSet) (fps []*model.Fingerprint, err error) { + defer func() { + m := getFingerprintsForLabelSetSuccessCount + if err != nil { + m = getFingerprintsForLabelSetFailureCount } - } else { - log.Printf("Could not create fingerprint for the metric: %q\n", fingerprintDTOErr) - return nil, fingerprintDTOErr - } - panic("unreachable") -} + m.Increment() + }() -func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet *model.LabelSet) ([]*model.Fingerprint, error) { - emission := make([]*model.Fingerprint, 0, 0) + fps = make([]*model.Fingerprint, 0, 0) for _, labelSetDTO := range model.LabelSetToDTOs(labelSet) { - if f, err := l.labelSetToFingerprints.Get(coding.NewProtocolBufferEncoder(labelSetDTO)); err == nil { - unmarshaled := &dto.FingerprintCollection{} - if unmarshalErr := proto.Unmarshal(f, unmarshaled); unmarshalErr == nil { - for _, m := range unmarshaled.Member { - fp := model.Fingerprint(*m.Signature) - emission = append(emission, &fp) - } - } else { - return nil, err - } - } else { - return nil, err + f, err := l.labelSetToFingerprints.Get(coding.NewProtocolBufferEncoder(labelSetDTO)) + if err != nil { + return fps, err } - } - - return emission, nil -} - -func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName *model.LabelName) ([]*model.Fingerprint, error) { - emission := make([]*model.Fingerprint, 0, 0) - - if raw, err := l.labelNameToFingerprints.Get(coding.NewProtocolBufferEncoder(model.LabelNameToDTO(labelName))); err == nil { unmarshaled := &dto.FingerprintCollection{} - - if err = proto.Unmarshal(raw, unmarshaled); err == nil { - for _, m := range unmarshaled.Member { - fp := model.Fingerprint(*m.Signature) - emission = append(emission, &fp) - } - } else { - return nil, err + err = proto.Unmarshal(f, unmarshaled) + if err != nil { + return fps, err + } + + for _, m := range unmarshaled.Member { + fp := model.Fingerprint(*m.Signature) + fps = append(fps, &fp) } - } else { - return nil, err } - return emission, nil + return } -func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *model.Fingerprint) (*model.Metric, error) { - if raw, err := l.fingerprintToMetrics.Get(coding.NewProtocolBufferEncoder(model.FingerprintToDTO(f))); err == nil { - unmarshaled := &dto.Metric{} - if unmarshalErr := proto.Unmarshal(raw, unmarshaled); unmarshalErr == nil { - m := model.Metric{} - for _, v := range unmarshaled.LabelPair { - m[model.LabelName(*v.Name)] = model.LabelValue(*v.Value) - } - return &m, nil - } else { - return nil, unmarshalErr +func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName *model.LabelName) (fps []*model.Fingerprint, err error) { + defer func() { + m := getFingerprintsForLabelNameSuccessCount + if err != nil { + m = getFingerprintsForLabelNameFailureCount } - } else { - return nil, err + + m.Increment() + }() + + fps = make([]*model.Fingerprint, 0, 0) + + raw, err := l.labelNameToFingerprints.Get(coding.NewProtocolBufferEncoder(model.LabelNameToDTO(labelName))) + if err != nil { + return } - panic("unreachable") + unmarshaled := &dto.FingerprintCollection{} + + err = proto.Unmarshal(raw, unmarshaled) + if err != nil { + return + } + + for _, m := range unmarshaled.Member { + fp := model.Fingerprint(*m.Signature) + fps = append(fps, &fp) + } + + return } -func (l *LevelDBMetricPersistence) GetBoundaryValues(m *model.Metric, i *model.Interval, s *metric.StalenessPolicy) (*model.Sample, *model.Sample, error) { - panic("not implemented") +func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *model.Fingerprint) (m *model.Metric, err error) { + defer func() { + m := getMetricForFingerprintSuccessCount + if err != nil { + m = getMetricForFingerprintFailureCount + } + + m.Increment() + }() + + raw, err := l.fingerprintToMetrics.Get(coding.NewProtocolBufferEncoder(model.FingerprintToDTO(f))) + if err != nil { + return + } + + unmarshaled := &dto.Metric{} + err = proto.Unmarshal(raw, unmarshaled) + if err != nil { + return + } + + m = &model.Metric{} + for _, v := range unmarshaled.LabelPair { + (*m)[model.LabelName(*v.Name)] = model.LabelValue(*v.Value) + } + + return } -func (l *LevelDBMetricPersistence) GetFirstValue(m *model.Metric) (*model.Sample, error) { - panic("not implemented") +func (l *LevelDBMetricPersistence) GetBoundaryValues(m *model.Metric, i *model.Interval, s *metric.StalenessPolicy) (open *model.Sample, end *model.Sample, err error) { + defer func() { + m := getBoundaryValuesSuccessCount + if err != nil { + m = getBoundaryValuesFailureCount + } + + m.Increment() + }() + + // XXX: Maybe we will want to emit incomplete sets? + open, err = l.GetValueAtTime(m, &i.OldestInclusive, s) + if err != nil { + return + } else if open == nil { + return + } + + end, err = l.GetValueAtTime(m, &i.NewestInclusive, s) + if err != nil { + return + } else if end == nil { + open = nil + } + + return } func interpolate(x1, x2 time.Time, y1, y2 float32, e time.Time) float32 { @@ -244,15 +317,15 @@ func isKeyInsideRecordedInterval(k *dto.SampleKey, i iterator) (b bool, err erro } var ( - retrievedKey *dto.SampleKey = &dto.SampleKey{} + retrievedKey *dto.SampleKey ) - err = proto.Unmarshal(i.Key(), retrievedKey) + retrievedKey, err = extractSampleKey(i) if err != nil { return } - if *retrievedKey.Fingerprint.Signature != *k.Fingerprint.Signature { + if !fingerprintsEqual(retrievedKey.Fingerprint, k.Fingerprint) { return } @@ -265,12 +338,12 @@ func isKeyInsideRecordedInterval(k *dto.SampleKey, i iterator) (b bool, err erro return } - err = proto.Unmarshal(i.Key(), retrievedKey) + retrievedKey, err = extractSampleKey(i) if err != nil { return } - b = *retrievedKey.Fingerprint.Signature == *k.Fingerprint.Signature + b = fingerprintsEqual(retrievedKey.Fingerprint, k.Fingerprint) return } @@ -288,16 +361,15 @@ func doesKeyHavePrecursor(k *dto.SampleKey, i iterator) (b bool, err error) { } var ( - retrievedKey *dto.SampleKey = &dto.SampleKey{} + retrievedKey *dto.SampleKey ) - err = proto.Unmarshal(i.Key(), retrievedKey) + retrievedKey, err = extractSampleKey(i) if err != nil { return } - signaturesEqual := *retrievedKey.Fingerprint.Signature == *k.Fingerprint.Signature - if !signaturesEqual { + if !fingerprintsEqual(retrievedKey.Fingerprint, k.Fingerprint) { return } @@ -320,16 +392,15 @@ func doesKeyHaveSuccessor(k *dto.SampleKey, i iterator) (b bool, err error) { } var ( - retrievedKey *dto.SampleKey = &dto.SampleKey{} + retrievedKey *dto.SampleKey ) - err = proto.Unmarshal(i.Key(), retrievedKey) + retrievedKey, err = extractSampleKey(i) if err != nil { return } - signaturesEqual := *retrievedKey.Fingerprint.Signature == *k.Fingerprint.Signature - if !signaturesEqual { + if !fingerprintsEqual(retrievedKey.Fingerprint, k.Fingerprint) { return } @@ -366,39 +437,41 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(m *model.Metric, t *time.Time, iterator.Seek(e) - var ( - firstKey *dto.SampleKey = &dto.SampleKey{} - firstValue *dto.SampleValue = nil - ) - within, err := isKeyInsideRecordedInterval(k, iterator) if err != nil || !within { return } - for iterator = iterator; iterator.Valid(); iterator.Prev() { - err := proto.Unmarshal(iterator.Key(), firstKey) + var ( + firstKey *dto.SampleKey + firstValue *dto.SampleValue + ) + + firstKey, err = extractSampleKey(iterator) + if err != nil { + return + } + + if fingerprintsEqual(firstKey.Fingerprint, k.Fingerprint) { + firstValue, err = extractSampleValue(iterator) + if err != nil { return nil, err } - if *firstKey.Fingerprint.Signature == *k.Fingerprint.Signature { - firstValue = &dto.SampleValue{} - err := proto.Unmarshal(iterator.Value(), firstValue) - if err != nil { - return nil, err - } + foundTimestamp := indexable.DecodeTime(firstKey.Timestamp) + targetTimestamp := indexable.DecodeTime(k.Timestamp) - if indexable.DecodeTime(firstKey.Timestamp).Equal(indexable.DecodeTime(k.Timestamp)) { - return model.SampleFromDTO(m, t, firstValue), nil - } - break + if foundTimestamp.Equal(targetTimestamp) { + return model.SampleFromDTO(m, t, firstValue), nil } + } else { + return } var ( - secondKey *dto.SampleKey = &dto.SampleKey{} - secondValue *dto.SampleValue = nil + secondKey *dto.SampleKey + secondValue *dto.SampleValue ) iterator.Next() @@ -406,30 +479,94 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(m *model.Metric, t *time.Time, return } - err = proto.Unmarshal(iterator.Key(), secondKey) + secondKey, err = extractSampleKey(iterator) if err != nil { - return } - if *secondKey.Fingerprint.Signature == *k.Fingerprint.Signature { - secondValue = &dto.SampleValue{} - err = proto.Unmarshal(iterator.Value(), secondValue) + if fingerprintsEqual(secondKey.Fingerprint, k.Fingerprint) { + secondValue, err = extractSampleValue(iterator) if err != nil { return } + } else { + return } firstTime := indexable.DecodeTime(firstKey.Timestamp) secondTime := indexable.DecodeTime(secondKey.Timestamp) - interpolated := interpolate(firstTime, secondTime, *firstValue.Value, *secondValue.Value, *t) - emission := &dto.SampleValue{ - Value: &interpolated, + currentDelta := secondTime.Sub(firstTime) + + if currentDelta <= s.DeltaAllowance { + interpolated := interpolate(firstTime, secondTime, *firstValue.Value, *secondValue.Value, *t) + emission := &dto.SampleValue{ + Value: &interpolated, + } + + return model.SampleFromDTO(m, t, emission), nil } - return model.SampleFromDTO(m, t, emission), nil + return } -func (l *LevelDBMetricPersistence) GetRangeValues(m *model.Metric, i *model.Interval, s *metric.StalenessPolicy) (*model.SampleSet, error) { - panic("not implemented") +func (l *LevelDBMetricPersistence) GetRangeValues(m *model.Metric, i *model.Interval, s *metric.StalenessPolicy) (v *model.SampleSet, err error) { + d := model.MetricToDTO(m) + + f, err := model.MessageToFingerprintDTO(d) + if err != nil { + return + } + + k := &dto.SampleKey{ + Fingerprint: f, + Timestamp: indexable.EncodeTime(i.OldestInclusive), + } + + e, err := coding.NewProtocolBufferEncoder(k).Encode() + if err != nil { + return + } + + iterator, closer, err := l.metricSamples.GetIterator() + if err != nil { + return + } + defer closer.Close() + + iterator.Seek(e) + + predicate := keyIsOlderThan(i.NewestInclusive) + + for ; iterator.Valid(); iterator.Next() { + retrievedKey := &dto.SampleKey{} + + retrievedKey, err = extractSampleKey(iterator) + if err != nil { + return + } + + if predicate(retrievedKey) { + break + } + + if !fingerprintsEqual(retrievedKey.Fingerprint, k.Fingerprint) { + break + } + + retrievedValue, err := extractSampleValue(iterator) + if err != nil { + return nil, err + } + + if v == nil { + v = &model.SampleSet{} + } + + v.Values = append(v.Values, model.SamplePair{ + Value: model.SampleValue(*retrievedValue.Value), + Timestamp: indexable.DecodeTime(retrievedKey.Timestamp), + }) + } + + return } diff --git a/storage/metric/leveldb/rule_integration_test.go b/storage/metric/leveldb/rule_integration_test.go index 077374c36..9e6faed6c 100644 --- a/storage/metric/leveldb/rule_integration_test.go +++ b/storage/metric/leveldb/rule_integration_test.go @@ -1,136 +1,1584 @@ +// Copyright 2012 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package leveldb import ( - "fmt" "github.com/matttproud/prometheus/model" "github.com/matttproud/prometheus/storage/metric" + "github.com/matttproud/prometheus/utility/test" "io/ioutil" "os" "testing" "time" ) -func TestGetValueAtTime(t *testing.T) { - temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test") - - defer func() { - if removeAllErr := os.RemoveAll(temporaryDirectory); removeAllErr != nil { - t.Errorf("Could not remove temporary directory: %q\n", removeAllErr) - } - }() - - persistence, _ := NewLevelDBMetricPersistence(temporaryDirectory) - - defer func() { - persistence.Close() - }() - - m := model.Metric{ - "name": "age_in_years", +var testGetValueAtTime = func(t test.Tester) { + type value struct { + year int + month time.Month + day int + hour int + value float32 } - appendErr := persistence.AppendSample(&model.Sample{ - Value: model.SampleValue(0), - Timestamp: time.Date(1984, 3, 30, 0, 0, 0, 0, time.UTC), - Metric: m, - }) - - if appendErr != nil { - t.Error(appendErr) + type input struct { + year int + month time.Month + day int + hour int + staleness time.Duration } - p := &metric.StalenessPolicy{ - AllowStale: false, + type output struct { + value model.SampleValue } - d := time.Date(1984, 3, 30, 0, 0, 0, 0, time.UTC) - s, sErr := persistence.GetValueAtTime(&m, &d, p) - - if sErr != nil { - t.Error(sErr) + type behavior struct { + name string + input input + output *output } - if s == nil { - t.Error("a sample should be returned") + var contexts = []struct { + name string + values []value + behaviors []behavior + }{ + { + name: "no values", + values: []value{}, + behaviors: []behavior{ + { + name: "random target", + input: input{ + year: 1984, + month: 3, + day: 30, + hour: 0, + staleness: time.Duration(0), + }, + }, + }, + }, + { + name: "singleton", + values: []value{ + { + year: 1984, + month: 3, + day: 30, + hour: 0, + value: 0, + }, + }, + behaviors: []behavior{ + { + name: "exact without staleness policy", + input: input{ + year: 1984, + month: 3, + day: 30, + hour: 0, + staleness: time.Duration(0), + }, + output: &output{ + value: 0, + }, + }, + { + name: "exact with staleness policy", + input: input{ + year: 1984, + month: 3, + day: 30, + hour: 0, + staleness: time.Duration(365*24) * time.Hour, + }, + output: &output{ + value: 0, + }, + }, + { + name: "before without staleness policy", + input: input{ + year: 1984, + month: 3, + day: 29, + hour: 0, + staleness: time.Duration(0), + }, + }, + { + name: "before with staleness policy", + input: input{ + year: 1984, + month: 3, + day: 29, + hour: 0, + staleness: time.Duration(365*24) * time.Hour, + }, + }, + { + name: "after without staleness policy", + input: input{ + year: 1984, + month: 3, + day: 31, + hour: 0, + staleness: time.Duration(0), + }, + }, + { + name: "after with staleness policy", + input: input{ + year: 1984, + month: 3, + day: 31, + hour: 0, + staleness: time.Duration(365*24) * time.Hour, + }, + }, + }, + }, + { + name: "double", + values: []value{ + { + year: 1984, + month: 3, + day: 30, + hour: 0, + value: 0, + }, + { + year: 1985, + month: 3, + day: 30, + hour: 0, + value: 1, + }, + }, + behaviors: []behavior{ + { + name: "exact first without staleness policy", + input: input{ + year: 1984, + month: 3, + day: 30, + hour: 0, + staleness: time.Duration(0), + }, + output: &output{ + value: 0, + }, + }, + { + name: "exact first with staleness policy", + input: input{ + year: 1984, + month: 3, + day: 30, + hour: 0, + staleness: time.Duration(365*24) * time.Hour, + }, + output: &output{ + value: 0, + }, + }, + { + name: "exact second without staleness policy", + input: input{ + year: 1985, + month: 3, + day: 30, + hour: 0, + staleness: time.Duration(0), + }, + output: &output{ + value: 1, + }, + }, + { + name: "exact second with staleness policy", + input: input{ + year: 1985, + month: 3, + day: 30, + hour: 0, + staleness: time.Duration(365*24) * time.Hour, + }, + output: &output{ + value: 1, + }, + }, + { + name: "before first without staleness policy", + input: input{ + year: 1983, + month: 9, + day: 29, + hour: 12, + staleness: time.Duration(0), + }, + }, + { + name: "before first with staleness policy", + input: input{ + year: 1983, + month: 9, + day: 29, + hour: 12, + staleness: time.Duration(365*24) * time.Hour, + }, + }, + { + name: "after second with staleness policy", + input: input{ + year: 1985, + month: 9, + day: 28, + hour: 12, + staleness: time.Duration(365*24) * time.Hour, + }, + }, + { + name: "middle without staleness policy", + input: input{ + year: 1984, + month: 9, + day: 28, + hour: 12, + staleness: time.Duration(0), + }, + }, + { + name: "middle with insufficient staleness policy", + input: input{ + year: 1984, + month: 9, + day: 28, + hour: 12, + staleness: time.Duration(364*24) * time.Hour, + }, + }, + { + name: "middle with sufficient staleness policy", + input: input{ + year: 1984, + month: 9, + day: 28, + hour: 12, + staleness: time.Duration(365*24) * time.Hour, + }, + output: &output{ + value: 0.5, + }, + }, + }, + }, + { + name: "triple", + values: []value{ + { + year: 1984, + month: 3, + day: 30, + hour: 0, + value: 0, + }, + { + year: 1985, + month: 3, + day: 30, + hour: 0, + value: 1, + }, + { + year: 1986, + month: 3, + day: 30, + hour: 0, + value: 2, + }, + }, + behaviors: []behavior{ + { + name: "exact first without staleness policy", + input: input{ + year: 1984, + month: 3, + day: 30, + hour: 0, + staleness: time.Duration(0), + }, + output: &output{ + value: 0, + }, + }, + { + name: "exact first with staleness policy", + input: input{ + year: 1984, + month: 3, + day: 30, + hour: 0, + staleness: time.Duration(365*24) * time.Hour, + }, + output: &output{ + value: 0, + }, + }, + { + name: "exact second without staleness policy", + input: input{ + year: 1985, + month: 3, + day: 30, + hour: 0, + staleness: time.Duration(0), + }, + output: &output{ + value: 1, + }, + }, + { + name: "exact second with staleness policy", + input: input{ + year: 1985, + month: 3, + day: 30, + hour: 0, + staleness: time.Duration(365*24) * time.Hour, + }, + output: &output{ + value: 1, + }, + }, + { + name: "exact third without staleness policy", + input: input{ + year: 1986, + month: 3, + day: 30, + hour: 0, + staleness: time.Duration(0), + }, + output: &output{ + value: 2, + }, + }, + { + name: "exact third with staleness policy", + input: input{ + year: 1986, + month: 3, + day: 30, + hour: 0, + staleness: time.Duration(365*24) * time.Hour, + }, + output: &output{ + value: 2, + }, + }, + { + name: "before first without staleness policy", + input: input{ + year: 1983, + month: 9, + day: 29, + hour: 12, + staleness: time.Duration(0), + }, + }, + { + name: "before first with staleness policy", + input: input{ + year: 1983, + month: 9, + day: 29, + hour: 12, + staleness: time.Duration(365*24) * time.Hour, + }, + }, + { + name: "after third with staleness policy", + input: input{ + year: 1986, + month: 9, + day: 28, + hour: 12, + staleness: time.Duration(365*24) * time.Hour, + }, + }, + { + name: "first middle without staleness policy", + input: input{ + year: 1984, + month: 9, + day: 28, + hour: 12, + staleness: time.Duration(0), + }, + }, + { + name: "first middle with insufficient staleness policy", + input: input{ + year: 1984, + month: 9, + day: 28, + hour: 12, + staleness: time.Duration(364*24) * time.Hour, + }, + }, + { + name: "first middle with sufficient staleness policy", + input: input{ + year: 1984, + month: 9, + day: 28, + hour: 12, + staleness: time.Duration(365*24) * time.Hour, + }, + output: &output{ + value: 0.5, + }, + }, + { + name: "second middle without staleness policy", + input: input{ + year: 1985, + month: 9, + day: 28, + hour: 12, + staleness: time.Duration(0), + }, + }, + { + name: "second middle with insufficient staleness policy", + input: input{ + year: 1985, + month: 9, + day: 28, + hour: 12, + staleness: time.Duration(364*24) * time.Hour, + }, + }, + { + name: "second middle with sufficient staleness policy", + input: input{ + year: 1985, + month: 9, + day: 28, + hour: 12, + staleness: time.Duration(365*24) * time.Hour, + }, + output: &output{ + value: 1.5, + }, + }, + }, + }, } - if s.Value != model.SampleValue(0) { - t.Error("an incorrect sample value was returned") - } + for i, context := range contexts { + // Wrapping in function to enable garbage collection of resources. + func() { + temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test") - if s.Timestamp != d { - t.Error("an incorrect timestamp for the sample was returned") - } + defer func() { + if err := os.RemoveAll(temporaryDirectory); err != nil { + t.Errorf("%d(%s). Could not remove temporary directory: %q\n", i, context.name, err) + } + }() - d = time.Date(1985, 3, 30, 0, 0, 0, 0, time.UTC) + persistence, _ := NewLevelDBMetricPersistence(temporaryDirectory) - s, sErr = persistence.GetValueAtTime(&m, &d, p) + defer func() { + persistence.Close() + }() - if sErr != nil { - t.Error(sErr) - } + m := model.Metric{ + "name": "age_in_years", + } - if s != nil { - t.Error("no sample should be returned") - } + for j, value := range context.values { + err := persistence.AppendSample(&model.Sample{ + Value: model.SampleValue(value.value), + Timestamp: time.Date(value.year, value.month, value.day, value.hour, 0, 0, 0, time.UTC), + Metric: m, + }) - d = time.Date(1983, 3, 30, 0, 0, 0, 0, time.UTC) + if err != nil { + t.Errorf("%d.%d(%s). Could not create sample: %q\n", i, j, context.name, err) + } + } - s, sErr = persistence.GetValueAtTime(&m, &d, p) + for j, behavior := range context.behaviors { + input := behavior.input + time := time.Date(input.year, input.month, input.day, input.hour, 0, 0, 0, time.UTC) + p := metric.StalenessPolicy{ + DeltaAllowance: input.staleness, + } - if sErr != nil { - t.Error(sErr) - } + actual, err := persistence.GetValueAtTime(&m, &time, &p) + if err != nil { + t.Errorf("%d.%d(%s). Could not query for value: %q\n", i, j, behavior.name, err) + } - if s != nil { - t.Error("no sample should be returned") - } + if behavior.output == nil { + if actual != nil { + t.Errorf("%d.%d(%s). Expected nil but got: %q\n", i, j, behavior.name, actual) + } + } else { + if actual == nil { + t.Errorf("%d.%d(%s). Expected %s but got nil\n", i, j, behavior.name, behavior.output) + } else { + if actual.Value != behavior.output.value { + t.Errorf("%d.%d(%s). Expected %s but got %s\n", i, j, behavior.name, behavior.output, actual) - appendErr = persistence.AppendSample(&model.Sample{ - Value: model.SampleValue(1), - Timestamp: time.Date(1985, 3, 30, 0, 0, 0, 0, time.UTC), - Metric: m, - }) - - if appendErr != nil { - t.Error(appendErr) - } - - d = time.Date(1985, 3, 30, 0, 0, 0, 0, time.UTC) - s, sErr = persistence.GetValueAtTime(&m, &d, p) - - if sErr != nil { - t.Error(sErr) - } - - if s == nil { - t.Error("a sample should be returned") - } - - if s.Value != model.SampleValue(1) { - t.Error("an incorrect sample value was returned") - } - - if s.Timestamp != d { - t.Error("an incorrect timestamp for the sample was returned") - } - - d = time.Date(1984, 9, 24, 0, 0, 0, 0, time.UTC) - s, sErr = persistence.GetValueAtTime(&m, &d, p) - - if sErr != nil { - t.Error(sErr) - } - - if s == nil { - t.Error("a sample should be returned") - } - - if fmt.Sprintf("%f", s.Value) != model.SampleValue(0.487671).String() { - t.Errorf("an incorrect sample value was returned: %s\n", s.Value) - } - - if s.Timestamp != d { - t.Error("an incorrect timestamp for the sample was returned") + } + } + } + } + }() + } +} + +func TestGetValueAtTime(t *testing.T) { + testGetValueAtTime(t) +} + +func BenchmarkGetValueAtTime(b *testing.B) { + for i := 0; i < b.N; i++ { + testGetValueAtTime(b) + } +} + +var testGetBoundaryValues = func(t test.Tester) { + type value struct { + year int + month time.Month + day int + hour int + value float32 + } + + type input struct { + openYear int + openMonth time.Month + openDay int + openHour int + endYear int + endMonth time.Month + endDay int + endHour int + staleness time.Duration + } + + type output struct { + open model.SampleValue + end model.SampleValue + } + + type behavior struct { + name string + input input + output *output + } + + var contexts = []struct { + name string + values []value + behaviors []behavior + }{ + { + name: "no values", + values: []value{}, + behaviors: []behavior{ + { + name: "non-existent interval without staleness policy", + input: input{ + openYear: 1984, + openMonth: 3, + openDay: 30, + openHour: 0, + endYear: 1985, + endMonth: 3, + endDay: 30, + endHour: 0, + staleness: time.Duration(0), + }, + }, + { + name: "non-existent interval with staleness policy", + input: input{ + openYear: 1984, + openMonth: 3, + openDay: 30, + openHour: 0, + endYear: 1985, + endMonth: 3, + endDay: 30, + endHour: 0, + staleness: time.Duration(365*24) * time.Hour, + }, + }, + }, + }, + { + name: "single value", + values: []value{ + { + year: 1984, + month: 3, + day: 30, + hour: 0, + value: 0, + }, + }, + behaviors: []behavior{ + { + name: "on start but missing end without staleness policy", + input: input{ + openYear: 1984, + openMonth: 3, + openDay: 30, + openHour: 0, + endYear: 1985, + endMonth: 3, + endDay: 30, + endHour: 0, + staleness: time.Duration(0), + }, + }, + { + name: "non-existent interval with staleness policy", + input: input{ + openYear: 1984, + openMonth: 3, + openDay: 30, + openHour: 0, + endYear: 1985, + endMonth: 3, + endDay: 30, + endHour: 0, + staleness: time.Duration(365*24) * time.Hour, + }, + }, + { + name: "on end but not start without staleness policy", + input: input{ + openYear: 1983, + openMonth: 3, + openDay: 30, + openHour: 0, + endYear: 1984, + endMonth: 3, + endDay: 30, + endHour: 0, + staleness: time.Duration(0), + }, + }, + { + name: "on end but not start without staleness policy", + input: input{ + openYear: 1983, + openMonth: 3, + openDay: 30, + openHour: 0, + endYear: 1984, + endMonth: 3, + endDay: 30, + endHour: 0, + staleness: time.Duration(365*24) * time.Hour, + }, + }, + { + name: "before point without staleness policy", + input: input{ + openYear: 1982, + openMonth: 3, + openDay: 30, + openHour: 0, + endYear: 1983, + endMonth: 3, + endDay: 30, + endHour: 0, + staleness: time.Duration(0), + }, + }, + { + name: "before point with staleness policy", + input: input{ + openYear: 1982, + openMonth: 3, + openDay: 30, + openHour: 0, + endYear: 1983, + endMonth: 3, + endDay: 30, + endHour: 0, + staleness: time.Duration(365*24) * time.Hour, + }, + }, + { + name: "after point without staleness policy", + input: input{ + openYear: 1985, + openMonth: 3, + openDay: 30, + openHour: 0, + endYear: 1986, + endMonth: 3, + endDay: 30, + endHour: 0, + staleness: time.Duration(0), + }, + }, + { + name: "after point with staleness policy", + input: input{ + openYear: 1985, + openMonth: 3, + openDay: 30, + openHour: 0, + endYear: 1986, + endMonth: 3, + endDay: 30, + endHour: 0, + staleness: time.Duration(365*24) * time.Hour, + }, + }, + { + name: "spanning point without staleness policy", + input: input{ + openYear: 1983, + openMonth: 9, + openDay: 29, + openHour: 12, + endYear: 1984, + endMonth: 9, + endDay: 28, + endHour: 12, + staleness: time.Duration(0), + }, + }, + { + name: "spanning point with staleness policy", + input: input{ + openYear: 1983, + openMonth: 9, + openDay: 29, + openHour: 12, + endYear: 1984, + endMonth: 9, + endDay: 28, + endHour: 12, + staleness: time.Duration(365*24) * time.Hour, + }, + }, + }, + }, + { + name: "double values", + values: []value{ + { + year: 1984, + month: 3, + day: 30, + hour: 0, + value: 0, + }, + { + year: 1985, + month: 3, + day: 30, + hour: 0, + value: 1, + }, + }, + behaviors: []behavior{ + { + name: "on points without staleness policy", + input: input{ + openYear: 1984, + openMonth: 3, + openDay: 30, + openHour: 0, + endYear: 1985, + endMonth: 3, + endDay: 30, + endHour: 0, + staleness: time.Duration(0), + }, + output: &output{ + open: 0, + end: 1, + }, + }, + { + name: "on points with staleness policy", + input: input{ + openYear: 1984, + openMonth: 3, + openDay: 30, + openHour: 0, + endYear: 1985, + endMonth: 3, + endDay: 30, + endHour: 0, + staleness: time.Duration(365*24) * time.Hour, + }, + output: &output{ + open: 0, + end: 1, + }, + }, + { + name: "on first before second outside of staleness", + input: input{ + openYear: 1984, + openMonth: 3, + openDay: 30, + openHour: 0, + endYear: 1984, + endMonth: 6, + endDay: 29, + endHour: 6, + staleness: time.Duration(178*24) * time.Hour, + }, + }, + { + name: "on first before second within staleness", + input: input{ + openYear: 1984, + openMonth: 3, + openDay: 30, + openHour: 0, + endYear: 1984, + endMonth: 6, + endDay: 29, + endHour: 6, + staleness: time.Duration(356*24) * time.Hour, + }, + }, + { + name: "on first after second outside of staleness", + input: input{ + openYear: 1984, + openMonth: 3, + openDay: 30, + openHour: 0, + endYear: 1985, + endMonth: 6, + endDay: 29, + endHour: 6, + staleness: time.Duration(178*24) * time.Hour, + }, + }, + { + name: "on first after second within staleness", + input: input{ + openYear: 1984, + openMonth: 3, + openDay: 30, + openHour: 0, + endYear: 1985, + endMonth: 6, + endDay: 29, + endHour: 6, + staleness: time.Duration(356*24) * time.Hour, + }, + }, + }, + }, + } + + for i, context := range contexts { + // Wrapping in function to enable garbage collection of resources. + func() { + temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test") + + defer func() { + if err := os.RemoveAll(temporaryDirectory); err != nil { + t.Errorf("%d(%s). Could not remove temporary directory: %q\n", i, context.name, err) + } + }() + + persistence, _ := NewLevelDBMetricPersistence(temporaryDirectory) + + defer func() { + persistence.Close() + }() + + m := model.Metric{ + "name": "age_in_years", + } + + for j, value := range context.values { + err := persistence.AppendSample(&model.Sample{ + Value: model.SampleValue(value.value), + Timestamp: time.Date(value.year, value.month, value.day, value.hour, 0, 0, 0, time.UTC), + Metric: m, + }) + + if err != nil { + t.Errorf("%d.%d(%s). Could not create sample: %q\n", i, j, context.name, err) + } + } + + for j, behavior := range context.behaviors { + input := behavior.input + open := time.Date(input.openYear, input.openMonth, input.openDay, input.openHour, 0, 0, 0, time.UTC) + end := time.Date(input.endYear, input.endMonth, input.endDay, input.endHour, 0, 0, 0, time.UTC) + i := model.Interval{ + OldestInclusive: open, + NewestInclusive: end, + } + p := metric.StalenessPolicy{ + DeltaAllowance: input.staleness, + } + + openValue, endValue, err := persistence.GetBoundaryValues(&m, &i, &p) + if err != nil { + t.Errorf("%d.%d(%s). Could not query for value: %q\n", i, j, behavior.name, err) + } + + if behavior.output == nil { + if openValue != nil { + t.Errorf("%d.%d(%s). Expected nil but got: %q\n", i, j, behavior.name, openValue) + } + if endValue != nil { + t.Errorf("%d.%d(%s). Expected nil but got: %q\n", i, j, behavior.name, endValue) + } + } else { + if openValue == nil { + t.Errorf("%d.%d(%s). Expected %s but got nil\n", i, j, behavior.name, behavior.output) + } + if endValue == nil { + t.Errorf("%d.%d(%s). Expected %s but got nil\n", i, j, behavior.name, behavior.output) + } + if openValue.Value != behavior.output.open { + t.Errorf("%d.%d(%s). Expected %s but got %s\n", i, j, behavior.name, behavior.output.open, openValue.Value) + } + if endValue.Value != behavior.output.end { + t.Errorf("%d.%d(%s). Expected %s but got %s\n", i, j, behavior.name, behavior.output.end, endValue.Value) + } + } + } + }() + } +} + +func TestGetBoundaryValues(t *testing.T) { + testGetBoundaryValues(t) +} + +func BenchmarkGetBoundaryValues(b *testing.B) { + for i := 0; i < b.N; i++ { + testGetBoundaryValues(b) + } +} + +var testGetRangeValues = func(t test.Tester) { + type value struct { + year int + month time.Month + day int + hour int + value float32 + } + + type input struct { + openYear int + openMonth time.Month + openDay int + openHour int + endYear int + endMonth time.Month + endDay int + endHour int + staleness time.Duration + } + + type output struct { + year int + month time.Month + day int + hour int + value float32 + } + + type behavior struct { + name string + input input + output []output + } + + var contexts = []struct { + name string + values []value + behaviors []behavior + }{ + { + name: "no values", + values: []value{}, + behaviors: []behavior{ + { + name: "non-existent interval without staleness policy", + input: input{ + openYear: 1984, + openMonth: 3, + openDay: 30, + openHour: 0, + endYear: 1985, + endMonth: 3, + endDay: 30, + endHour: 0, + staleness: time.Duration(0), + }, + }, + { + name: "non-existent interval with staleness policy", + input: input{ + openYear: 1984, + openMonth: 3, + openDay: 30, + openHour: 0, + endYear: 1985, + endMonth: 3, + endDay: 30, + endHour: 0, + staleness: time.Duration(365*24) * time.Hour, + }, + }, + }, + }, + { + name: "singleton value", + values: []value{ + { + year: 1984, + month: 3, + day: 30, + hour: 0, + value: 0, + }, + }, + behaviors: []behavior{ + { + name: "start on first value without staleness policy", + input: input{ + openYear: 1984, + openMonth: 3, + openDay: 30, + openHour: 0, + endYear: 1985, + endMonth: 3, + endDay: 30, + endHour: 0, + staleness: time.Duration(0), + }, + output: []output{ + { + year: 1984, + month: 3, + day: 30, + hour: 0, + value: 0, + }, + }, + }, + { + name: "start on first value with staleness policy", + input: input{ + openYear: 1984, + openMonth: 3, + openDay: 30, + openHour: 0, + endYear: 1985, + endMonth: 3, + endDay: 30, + endHour: 0, + staleness: time.Duration(365*24) * time.Hour, + }, + output: []output{ + { + year: 1984, + month: 3, + day: 30, + hour: 0, + value: 0, + }, + }, + }, + { + name: "end on first value without staleness policy", + input: input{ + openYear: 1983, + openMonth: 3, + openDay: 30, + openHour: 0, + endYear: 1984, + endMonth: 3, + endDay: 30, + endHour: 0, + staleness: time.Duration(0), + }, + output: []output{ + { + year: 1984, + month: 3, + day: 30, + hour: 0, + value: 0, + }, + }, + }, + { + name: "end on first value with staleness policy", + input: input{ + openYear: 1983, + openMonth: 3, + openDay: 30, + openHour: 0, + endYear: 1984, + endMonth: 3, + endDay: 30, + endHour: 0, + staleness: time.Duration(365*24) * time.Hour, + }, + output: []output{ + { + year: 1984, + month: 3, + day: 30, + hour: 0, + value: 0, + }, + }, + }, + { + name: "overlap on first value without staleness policy", + input: input{ + openYear: 1983, + openMonth: 3, + openDay: 30, + openHour: 0, + endYear: 1985, + endMonth: 3, + endDay: 30, + endHour: 0, + staleness: time.Duration(0), + }, + output: []output{ + { + year: 1984, + month: 3, + day: 30, + hour: 0, + value: 0, + }, + }, + }, + { + name: "overlap on first value with staleness policy", + input: input{ + openYear: 1983, + openMonth: 3, + openDay: 30, + openHour: 0, + endYear: 1985, + endMonth: 3, + endDay: 30, + endHour: 0, + staleness: time.Duration(365*24) * time.Hour, + }, + output: []output{ + { + year: 1984, + month: 3, + day: 30, + hour: 0, + value: 0, + }, + }, + }, + }, + }, + { + name: "two values", + values: []value{ + { + year: 1984, + month: 3, + day: 30, + hour: 0, + value: 0, + }, + { + year: 1985, + month: 3, + day: 30, + hour: 0, + value: 1, + }, + }, + behaviors: []behavior{ + { + name: "start on first value without staleness policy", + input: input{ + openYear: 1984, + openMonth: 3, + openDay: 30, + openHour: 0, + endYear: 1985, + endMonth: 3, + endDay: 30, + endHour: 0, + staleness: time.Duration(0), + }, + output: []output{ + { + year: 1984, + month: 3, + day: 30, + hour: 0, + value: 0, + }, + { + year: 1985, + month: 3, + day: 30, + hour: 0, + value: 1, + }, + }, + }, + { + name: "start on first value with staleness policy", + input: input{ + openYear: 1984, + openMonth: 3, + openDay: 30, + openHour: 0, + endYear: 1985, + endMonth: 3, + endDay: 30, + endHour: 0, + staleness: time.Duration(365*24) * time.Hour, + }, + output: []output{ + { + year: 1984, + month: 3, + day: 30, + hour: 0, + value: 0, + }, + { + year: 1985, + month: 3, + day: 30, + hour: 0, + value: 1, + }, + }, + }, + { + name: "start on second value without staleness policy", + input: input{ + openYear: 1985, + openMonth: 3, + openDay: 30, + openHour: 0, + endYear: 1986, + endMonth: 3, + endDay: 30, + endHour: 0, + staleness: time.Duration(0), + }, + output: []output{ + { + year: 1985, + month: 3, + day: 30, + hour: 0, + value: 1, + }, + }, + }, + { + name: "start on second value with staleness policy", + input: input{ + openYear: 1985, + openMonth: 3, + openDay: 30, + openHour: 0, + endYear: 1986, + endMonth: 3, + endDay: 30, + endHour: 0, + staleness: time.Duration(365*24) * time.Hour, + }, + output: []output{ + { + year: 1985, + month: 3, + day: 30, + hour: 0, + value: 1, + }, + }, + }, + { + name: "end on first value without staleness policy", + input: input{ + openYear: 1983, + openMonth: 3, + openDay: 30, + openHour: 0, + endYear: 1984, + endMonth: 3, + endDay: 30, + endHour: 0, + staleness: time.Duration(0), + }, + output: []output{ + { + year: 1984, + month: 3, + day: 30, + hour: 0, + value: 0, + }, + }, + }, + { + name: "end on first value with staleness policy", + input: input{ + openYear: 1983, + openMonth: 3, + openDay: 30, + openHour: 0, + endYear: 1984, + endMonth: 3, + endDay: 30, + endHour: 0, + staleness: time.Duration(365*24) * time.Hour, + }, + output: []output{ + { + year: 1984, + month: 3, + day: 30, + hour: 0, + value: 0, + }, + }, + }, + { + name: "end on second value without staleness policy", + input: input{ + openYear: 1985, + openMonth: 1, + openDay: 1, + openHour: 0, + endYear: 1985, + endMonth: 3, + endDay: 30, + endHour: 0, + staleness: time.Duration(0), + }, + output: []output{ + { + year: 1985, + month: 3, + day: 30, + hour: 0, + value: 1, + }, + }, + }, + { + name: "end on second value with staleness policy", + input: input{ + openYear: 1985, + openMonth: 1, + openDay: 1, + openHour: 0, + endYear: 1985, + endMonth: 3, + endDay: 30, + endHour: 0, + staleness: time.Duration(365*24) * time.Hour, + }, + output: []output{ + { + year: 1985, + month: 3, + day: 30, + hour: 0, + value: 1, + }, + }, + }, + { + name: "overlap on values without staleness policy", + input: input{ + openYear: 1983, + openMonth: 3, + openDay: 30, + openHour: 0, + endYear: 1986, + endMonth: 3, + endDay: 30, + endHour: 0, + staleness: time.Duration(0), + }, + output: []output{ + { + year: 1984, + month: 3, + day: 30, + hour: 0, + value: 0, + }, + { + year: 1985, + month: 3, + day: 30, + hour: 0, + value: 1, + }, + }, + }, + { + name: "overlap on values with staleness policy", + input: input{ + openYear: 1983, + openMonth: 3, + openDay: 30, + openHour: 0, + endYear: 1986, + endMonth: 3, + endDay: 30, + endHour: 0, + staleness: time.Duration(365*24) * time.Hour, + }, + output: []output{ + { + year: 1984, + month: 3, + day: 30, + hour: 0, + value: 0, + }, + { + year: 1985, + month: 3, + day: 30, + hour: 0, + value: 1, + }, + }, + }, + }, + }, + } + + for i, context := range contexts { + // Wrapping in function to enable garbage collection of resources. + func() { + temporaryDirectory, _ := ioutil.TempDir("", "leveldb_metric_persistence_test") + + defer func() { + if err := os.RemoveAll(temporaryDirectory); err != nil { + t.Errorf("%d(%s). Could not remove temporary directory: %q\n", i, context.name, err) + } + }() + + persistence, _ := NewLevelDBMetricPersistence(temporaryDirectory) + + defer func() { + persistence.Close() + }() + + m := model.Metric{ + "name": "age_in_years", + } + + for j, value := range context.values { + err := persistence.AppendSample(&model.Sample{ + Value: model.SampleValue(value.value), + Timestamp: time.Date(value.year, value.month, value.day, value.hour, 0, 0, 0, time.UTC), + Metric: m, + }) + + if err != nil { + t.Errorf("%d.%d(%s). Could not create sample: %q\n", i, j, context.name, err) + } + } + + for j, behavior := range context.behaviors { + input := behavior.input + open := time.Date(input.openYear, input.openMonth, input.openDay, input.openHour, 0, 0, 0, time.UTC) + end := time.Date(input.endYear, input.endMonth, input.endDay, input.endHour, 0, 0, 0, time.UTC) + i := model.Interval{ + OldestInclusive: open, + NewestInclusive: end, + } + p := metric.StalenessPolicy{ + DeltaAllowance: input.staleness, + } + + values, err := persistence.GetRangeValues(&m, &i, &p) + if err != nil { + t.Errorf("%d.%d(%s). Could not query for value: %q\n", i, j, behavior.name, err) + } + + if values == nil && len(behavior.output) != 0 { + t.Fatalf("%d.%d(%s). Expected %s but got: %s\n", i, j, behavior.name, behavior.output, values) + } + + if behavior.output == nil { + if values != nil { + t.Fatalf("%d.%d(%s). Expected nil values but got: %s\n", i, j, behavior.name, values) + } + } else { + if len(behavior.output) != len(values.Values) { + t.Errorf("%d.%d(%s). Expected length %d but got: %d\n", i, j, len(behavior.output), len(values.Values)) + } + + for k, actual := range values.Values { + expected := behavior.output[k] + if actual.Value != model.SampleValue(expected.value) { + t.Errorf("%d.%d.%d(%s). Expected %d but got: %d\n", i, j, k, len(behavior.output), actual) + } + } + } + } + }() + } +} + +func TestGetRangeValues(t *testing.T) { + testGetRangeValues(t) +} + +func BenchmarkGetRangeValues(b *testing.B) { + for i := 0; i < b.N; i++ { + testGetRangeValues(b) } } diff --git a/storage/raw/index/leveldb/leveldb.go b/storage/raw/index/leveldb/leveldb.go index bc3b1791e..0d5a73d6d 100644 --- a/storage/raw/index/leveldb/leveldb.go +++ b/storage/raw/index/leveldb/leveldb.go @@ -43,16 +43,16 @@ func (l *LevelDBMembershipIndex) Put(key coding.Encoder) error { return l.persistence.Put(key, existenceValue) } -func NewLevelDBMembershipIndex(storageRoot string, cacheCapacity, bitsPerBloomFilterEncoded int) (*LevelDBMembershipIndex, error) { - var leveldbPersistence *leveldb.LevelDBPersistence - var persistenceError error +func NewLevelDBMembershipIndex(storageRoot string, cacheCapacity, bitsPerBloomFilterEncoded int) (i *LevelDBMembershipIndex, err error) { - if leveldbPersistence, persistenceError = leveldb.NewLevelDBPersistence(storageRoot, cacheCapacity, bitsPerBloomFilterEncoded); persistenceError == nil { - leveldbMembershipIndex := &LevelDBMembershipIndex{ - persistence: leveldbPersistence, - } - return leveldbMembershipIndex, nil + leveldbPersistence, err := leveldb.NewLevelDBPersistence(storageRoot, cacheCapacity, bitsPerBloomFilterEncoded) + if err != nil { + return } - return nil, persistenceError + i = &LevelDBMembershipIndex{ + persistence: leveldbPersistence, + } + + return } diff --git a/storage/raw/leveldb/leveldb.go b/storage/raw/leveldb/leveldb.go index 927afbc25..52afd506e 100644 --- a/storage/raw/leveldb/leveldb.go +++ b/storage/raw/leveldb/leveldb.go @@ -36,7 +36,7 @@ type iteratorCloser struct { storage *levigo.DB } -func NewLevelDBPersistence(storageRoot string, cacheCapacity, bitsPerBloomFilterEncoded int) (*LevelDBPersistence, error) { +func NewLevelDBPersistence(storageRoot string, cacheCapacity, bitsPerBloomFilterEncoded int) (p *LevelDBPersistence, err error) { options := levigo.NewOptions() options.SetCreateIfMissing(true) options.SetParanoidChecks(true) @@ -47,13 +47,16 @@ func NewLevelDBPersistence(storageRoot string, cacheCapacity, bitsPerBloomFilter filterPolicy := levigo.NewBloomFilter(bitsPerBloomFilterEncoded) options.SetFilterPolicy(filterPolicy) - storage, openErr := levigo.Open(storageRoot, options) + storage, err := levigo.Open(storageRoot, options) + if err != nil { + return + } readOptions := levigo.NewReadOptions() writeOptions := levigo.NewWriteOptions() writeOptions.SetSync(true) - emission := &LevelDBPersistence{ + p = &LevelDBPersistence{ cache: cache, filterPolicy: filterPolicy, options: options, @@ -62,13 +65,17 @@ func NewLevelDBPersistence(storageRoot string, cacheCapacity, bitsPerBloomFilter writeOptions: writeOptions, } - return emission, openErr + return } -func (l *LevelDBPersistence) Close() error { - if l.storage != nil { - l.storage.Close() - } +func (l *LevelDBPersistence) Close() (err error) { + // These are deferred to take advantage of forced closing in case of stack + // unwinding due to anomalies. + defer func() { + if l.storage != nil { + l.storage.Close() + } + }() defer func() { if l.filterPolicy != nil { @@ -100,60 +107,57 @@ func (l *LevelDBPersistence) Close() error { } }() - return nil + return } -func (l *LevelDBPersistence) Get(value coding.Encoder) ([]byte, error) { - if key, keyError := value.Encode(); keyError == nil { - return l.storage.Get(l.readOptions, key) - } else { - return nil, keyError +func (l *LevelDBPersistence) Get(value coding.Encoder) (b []byte, err error) { + key, err := value.Encode() + if err != nil { + return } - panic("unreachable") + return l.storage.Get(l.readOptions, key) } -func (l *LevelDBPersistence) Has(value coding.Encoder) (bool, error) { - if value, getError := l.Get(value); getError != nil { - return false, getError - } else if value == nil { - return false, nil +func (l *LevelDBPersistence) Has(value coding.Encoder) (h bool, err error) { + raw, err := l.Get(value) + if err != nil { + return } - return true, nil + h = raw != nil + + return } -func (l *LevelDBPersistence) Drop(value coding.Encoder) error { - if key, keyError := value.Encode(); keyError == nil { - - if deleteError := l.storage.Delete(l.writeOptions, key); deleteError != nil { - return deleteError - } - } else { - return keyError +func (l *LevelDBPersistence) Drop(value coding.Encoder) (err error) { + key, err := value.Encode() + if err != nil { + return } - return nil + err = l.storage.Delete(l.writeOptions, key) + + return } -func (l *LevelDBPersistence) Put(key, value coding.Encoder) error { - if keyEncoded, keyError := key.Encode(); keyError == nil { - if valueEncoded, valueError := value.Encode(); valueError == nil { - - if putError := l.storage.Put(l.writeOptions, keyEncoded, valueEncoded); putError != nil { - return putError - } - } else { - return valueError - } - } else { - return keyError +func (l *LevelDBPersistence) Put(key, value coding.Encoder) (err error) { + keyEncoded, err := key.Encode() + if err != nil { + return } - return nil + valueEncoded, err := value.Encode() + if err != nil { + return + } + + err = l.storage.Put(l.writeOptions, keyEncoded, valueEncoded) + + return } -func (l *LevelDBPersistence) GetAll() ([]raw.Pair, error) { +func (l *LevelDBPersistence) GetAll() (pairs []raw.Pair, err error) { snapshot := l.storage.NewSnapshot() defer l.storage.ReleaseSnapshot(snapshot) readOptions := levigo.NewReadOptions() @@ -164,22 +168,19 @@ func (l *LevelDBPersistence) GetAll() ([]raw.Pair, error) { defer iterator.Close() iterator.SeekToFirst() - result := make([]raw.Pair, 0) - for iterator := iterator; iterator.Valid(); iterator.Next() { - result = append(result, raw.Pair{Left: iterator.Key(), Right: iterator.Value()}) + pairs = append(pairs, raw.Pair{Left: iterator.Key(), Right: iterator.Value()}) + + err = iterator.GetError() + if err != nil { + return + } } - iteratorError := iterator.GetError() - - if iteratorError != nil { - return nil, iteratorError - } - - return result, nil + return } -func (i *iteratorCloser) Close() error { +func (i *iteratorCloser) Close() (err error) { defer func() { if i.storage != nil { if i.snapshot != nil { @@ -200,21 +201,21 @@ func (i *iteratorCloser) Close() error { } }() - return nil + return } -func (l *LevelDBPersistence) GetIterator() (*levigo.Iterator, io.Closer, error) { +func (l *LevelDBPersistence) GetIterator() (i *levigo.Iterator, c io.Closer, err error) { snapshot := l.storage.NewSnapshot() readOptions := levigo.NewReadOptions() readOptions.SetSnapshot(snapshot) - iterator := l.storage.NewIterator(readOptions) + i = l.storage.NewIterator(readOptions) - closer := &iteratorCloser{ - iterator: iterator, + c = &iteratorCloser{ + iterator: i, readOptions: readOptions, snapshot: snapshot, storage: l.storage, } - return iterator, closer, nil + return } diff --git a/utility/test/interface.go b/utility/test/interface.go new file mode 100644 index 000000000..e0750af3d --- /dev/null +++ b/utility/test/interface.go @@ -0,0 +1,21 @@ +// Copyright 2012 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package test + +type Tester interface { + Error(args ...interface{}) + Errorf(format string, args ...interface{}) + Fatal(args ...interface{}) + Fatalf(format string, args ...interface{}) +}