From 1f7ed52b465a5ac656d10a821790a33623999bab Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Thu, 14 Mar 2013 19:24:28 -0700 Subject: [PATCH] Start writing high watermarks. --- model/data.proto | 4 ++ storage/metric/instrumentation.go | 1 + storage/metric/leveldb.go | 83 +++++++++++++++++++++++++++++-- 3 files changed, 85 insertions(+), 3 deletions(-) diff --git a/model/data.proto b/model/data.proto index 7b8c97514..4f056c08b 100644 --- a/model/data.proto +++ b/model/data.proto @@ -55,3 +55,7 @@ message SampleValueSeries { message MembershipIndexValue { } + +message MetricHighWatermark { + optional int64 timestamp = 1; +} diff --git a/storage/metric/instrumentation.go b/storage/metric/instrumentation.go index 7df029f4a..42979a6a4 100644 --- a/storage/metric/instrumentation.go +++ b/storage/metric/instrumentation.go @@ -49,6 +49,7 @@ const ( indexMetric = "index_metric" indexMetrics = "index_metrics" rebuildDiskFrontier = "rebuild_disk_frontier" + refreshHighWatermarks = "refresh_high_watermarks" renderView = "render_view" setLabelNameFingerprints = "set_label_name_fingerprints" setLabelPairFingerprints = "set_label_pair_fingerprints" diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index 3a18200f8..b4cff7c28 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -39,16 +39,18 @@ var ( type LevelDBMetricPersistence struct { fingerprintToMetrics *leveldb.LevelDBPersistence - metricSamples *leveldb.LevelDBPersistence labelNameToFingerprints *leveldb.LevelDBPersistence labelSetToFingerprints *leveldb.LevelDBPersistence + metricHighWatermarks *leveldb.LevelDBPersistence metricMembershipIndex *index.LevelDBMembershipIndex + metricSamples *leveldb.LevelDBPersistence } var ( // These flag values are back of the envelope, though they seem sensible. // Please re-evaluate based on your own needs. fingerprintsToLabelPairCacheSize = flag.Int("fingerprintsToLabelPairCacheSizeBytes", 100*1024*1024, "The size for the fingerprint to label pair index (bytes).") + highWatermarkCacheSize = flag.Int("highWatermarksByFingerprintSizeBytes", 50*1024*1024, "The size for the metric high watermarks (bytes).") samplesByFingerprintCacheSize = flag.Int("samplesByFingerprintCacheSizeBytes", 500*1024*1024, "The size for the samples database (bytes).") labelNameToFingerprintsCacheSize = flag.Int("labelNameToFingerprintsCacheSizeBytes", 100*1024*1024, "The size for the label name to metric fingerprint index (bytes).") labelPairToFingerprintsCacheSize = flag.Int("labelPairToFingerprintsCacheSizeBytes", 100*1024*1024, "The size for the label pair to metric fingerprint index (bytes).") @@ -66,6 +68,10 @@ func (l *LevelDBMetricPersistence) Close() error { "Fingerprint to Label Name and Value Pairs", l.fingerprintToMetrics, }, + { + "Fingerprint High Watermarks", + l.metricHighWatermarks, + }, { "Fingerprint Samples", l.metricSamples, @@ -117,7 +123,7 @@ func (l *LevelDBMetricPersistence) Close() error { } func NewLevelDBMetricPersistence(baseDirectory string) (persistence *LevelDBMetricPersistence, err error) { - errorChannel := make(chan error, 5) + errorChannel := make(chan error, 6) emission := &LevelDBMetricPersistence{} @@ -141,6 +147,14 @@ func NewLevelDBMetricPersistence(baseDirectory string) (persistence *LevelDBMetr errorChannel <- err }, }, + { + "High Watermarks by Fingerprint", + func() { + var err error + emission.metricHighWatermarks, err = leveldb.NewLevelDBPersistence(baseDirectory+"/high_watermarks_by_fingerprint", *highWatermarkCacheSize, 10) + errorChannel <- err + }, + }, { "Fingerprints by Label Name", func() { @@ -529,6 +543,60 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri return } +func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Fingerprint]model.Samples) (err error) { + begin := time.Now() + defer func() { + duration := time.Since(begin) + + recordOutcome(duration, err, map[string]string{operation: refreshHighWatermarks, result: success}, map[string]string{operation: refreshHighWatermarks, result: failure}) + }() + + batch := leveldb.NewBatch() + defer batch.Close() + + var ( + mutationCount = 0 + ) + for fingerprint, samples := range groups { + var ( + key = &dto.Fingerprint{} + value = &dto.MetricHighWatermark{} + raw []byte + oldestSampleTimestamp = samples[len(samples)-1].Timestamp + keyEncoded = coding.NewProtocolBufferEncoder(key) + ) + + key.Signature = proto.String(fingerprint.ToRowKey()) + raw, err = l.metricHighWatermarks.Get(keyEncoded) + if err != nil { + panic(err) + return + } + + if raw != nil { + err = proto.Unmarshal(raw, value) + if err != nil { + panic(err) + continue + } + + if oldestSampleTimestamp.Before(time.Unix(*value.Timestamp, 0)) { + continue + } + } + value.Timestamp = proto.Int64(oldestSampleTimestamp.Unix()) + batch.Put(keyEncoded, coding.NewProtocolBufferEncoder(value)) + mutationCount++ + } + + err = l.metricHighWatermarks.Commit(batch) + if err != nil { + panic(err) + } + + return +} + func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err error) { begin := time.Now() defer func() { @@ -540,6 +608,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err var ( fingerprintToSamples = groupByFingerprint(samples) indexErrChan = make(chan error) + watermarkErrChan = make(chan error) ) go func(groups map[model.Fingerprint]model.Samples) { @@ -554,6 +623,10 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err indexErrChan <- l.indexMetrics(metrics) }(fingerprintToSamples) + go func(groups map[model.Fingerprint]model.Samples) { + watermarkErrChan <- l.refreshHighWatermarks(groups) + }(fingerprintToSamples) + samplesBatch := leveldb.NewBatch() defer samplesBatch.Close() @@ -593,7 +666,6 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err } err = l.metricSamples.Commit(samplesBatch) - if err != nil { panic(err) } @@ -603,6 +675,11 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err panic(err) } + err = <-watermarkErrChan + if err != nil { + panic(err) + } + return }