diff --git a/model/data.proto b/model/data.proto index 6be584836..8bad15d27 100644 --- a/model/data.proto +++ b/model/data.proto @@ -50,14 +50,6 @@ message SampleKey { optional fixed32 sample_count = 4; } -message SampleValueSeries { - message Value { - optional int64 timestamp = 1; - optional double value = 2; - } - repeated Value value = 1; -} - message MembershipIndexValue { } @@ -69,7 +61,7 @@ message MetricHighWatermark { // corpus that ensures that sparse samples. message CompactionProcessorDefinition { // minimum_group_size identifies how minimally samples should be grouped - // together to write a new SampleValueSeries chunk. + // together to write a new samples chunk. optional uint32 minimum_group_size = 1; } diff --git a/model/generated/data.pb.go b/model/generated/data.pb.go index a2ae871bd..7171aced8 100644 --- a/model/generated/data.pb.go +++ b/model/generated/data.pb.go @@ -2,6 +2,27 @@ // source: data.proto // DO NOT EDIT! +/* +Package io_prometheus is a generated protocol buffer package. + +It is generated from these files: + data.proto + +It has these top-level messages: + LabelPair + LabelName + Metric + Fingerprint + FingerprintCollection + LabelSet + SampleKey + MembershipIndexValue + MetricHighWatermark + CompactionProcessorDefinition + CurationKey + CurationValue + DeletionProcessorDefinition +*/ package io_prometheus import proto "code.google.com/p/goprotobuf/proto" @@ -119,6 +140,9 @@ func (m *LabelSet) GetMember() []*LabelPair { return nil } +// The default LevelDB comparator sorts not only lexicographically, but also by +// key length (which takes precedence). Thus, no variable-length fields may be +// introduced into the key definition below. type SampleKey struct { Fingerprint *Fingerprint `protobuf:"bytes,1,opt,name=fingerprint" json:"fingerprint,omitempty"` Timestamp []byte `protobuf:"bytes,2,opt,name=timestamp" json:"timestamp,omitempty"` @@ -159,46 +183,6 @@ func (m *SampleKey) GetSampleCount() uint32 { return 0 } -type SampleValueSeries struct { - Value []*SampleValueSeries_Value `protobuf:"bytes,1,rep,name=value" json:"value,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *SampleValueSeries) Reset() { *m = SampleValueSeries{} } -func (m *SampleValueSeries) String() string { return proto.CompactTextString(m) } -func (*SampleValueSeries) ProtoMessage() {} - -func (m *SampleValueSeries) GetValue() []*SampleValueSeries_Value { - if m != nil { - return m.Value - } - return nil -} - -type SampleValueSeries_Value struct { - Timestamp *int64 `protobuf:"varint,1,opt,name=timestamp" json:"timestamp,omitempty"` - Value *float64 `protobuf:"fixed64,2,opt,name=value" json:"value,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *SampleValueSeries_Value) Reset() { *m = SampleValueSeries_Value{} } -func (m *SampleValueSeries_Value) String() string { return proto.CompactTextString(m) } -func (*SampleValueSeries_Value) ProtoMessage() {} - -func (m *SampleValueSeries_Value) GetTimestamp() int64 { - if m != nil && m.Timestamp != nil { - return *m.Timestamp - } - return 0 -} - -func (m *SampleValueSeries_Value) GetValue() float64 { - if m != nil && m.Value != nil { - return *m.Value - } - return 0 -} - type MembershipIndexValue struct { XXX_unrecognized []byte `json:"-"` } @@ -223,7 +207,11 @@ func (m *MetricHighWatermark) GetTimestamp() int64 { return 0 } +// CompactionProcessorDefinition models a curation process across the sample +// corpus that ensures that sparse samples. type CompactionProcessorDefinition struct { + // minimum_group_size identifies how minimally samples should be grouped + // together to write a new samples chunk. MinimumGroupSize *uint32 `protobuf:"varint,1,opt,name=minimum_group_size" json:"minimum_group_size,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -239,12 +227,38 @@ func (m *CompactionProcessorDefinition) GetMinimumGroupSize() uint32 { return 0 } +// CurationKey models the state of curation for a given metric fingerprint and +// its associated samples. The time series database only knows about compaction +// and resampling behaviors that are explicitly defined to it in its runtime +// configuration, meaning it never scans on-disk tables for CurationKey +// policies; rather, it looks up via the CurationKey tuple to find out what the +// effectuation state for a given metric fingerprint is. +// +// For instance, how far along as a rule for (Fingerprint A, Samples Older Than +// B, and Curation Processor) has been effectuated on-disk. type CurationKey struct { - Fingerprint *Fingerprint `protobuf:"bytes,1,opt,name=fingerprint" json:"fingerprint,omitempty"` - ProcessorMessageTypeName *string `protobuf:"bytes,2,opt,name=processor_message_type_name" json:"processor_message_type_name,omitempty"` - ProcessorMessageRaw []byte `protobuf:"bytes,3,opt,name=processor_message_raw" json:"processor_message_raw,omitempty"` - IgnoreYoungerThan *int64 `protobuf:"varint,4,opt,name=ignore_younger_than" json:"ignore_younger_than,omitempty"` - XXX_unrecognized []byte `json:"-"` + // fingerprint identifies the fingerprint for the given policy. + Fingerprint *Fingerprint `protobuf:"bytes,1,opt,name=fingerprint" json:"fingerprint,omitempty"` + // processor_message_type_name identifies the underlying message type that + // was used to encode processor_message_raw. + ProcessorMessageTypeName *string `protobuf:"bytes,2,opt,name=processor_message_type_name" json:"processor_message_type_name,omitempty"` + // processor_message_raw identifies the serialized ProcessorSignature for this + // operation. + ProcessorMessageRaw []byte `protobuf:"bytes,3,opt,name=processor_message_raw" json:"processor_message_raw,omitempty"` + // ignore_younger_than represents in seconds relative to when the curation + // cycle start when the curator should stop operating. For instance, if + // the curation cycle starts at time T and the curation remark dictates that + // the curation should starts processing samples at time S, the curator should + // work from S until ignore_younger_than seconds before T: + // + // PAST NOW FUTURE + // + // S--------------->|----------T + // |---IYT----| + // + // [Curation Resumption Time (S), T - IYT) + IgnoreYoungerThan *int64 `protobuf:"varint,4,opt,name=ignore_younger_than" json:"ignore_younger_than,omitempty"` + XXX_unrecognized []byte `json:"-"` } func (m *CurationKey) Reset() { *m = CurationKey{} } @@ -279,7 +293,11 @@ func (m *CurationKey) GetIgnoreYoungerThan() int64 { return 0 } +// CurationValue models the progress for a given CurationKey. type CurationValue struct { + // last_completion_timestamp represents the seconds since the epoch UTC at + // which the curator last completed its duty cycle for a given metric + // fingerprint. LastCompletionTimestamp *int64 `protobuf:"varint,1,opt,name=last_completion_timestamp" json:"last_completion_timestamp,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -295,6 +313,8 @@ func (m *CurationValue) GetLastCompletionTimestamp() int64 { return 0 } +// DeletionProcessorDefinition models a curation process across the sample +// corpus that deletes old values. type DeletionProcessorDefinition struct { XXX_unrecognized []byte `json:"-"` } diff --git a/model/generated/descriptor.blob b/model/generated/descriptor.blob index 044e1c72b..ec90e3898 100644 Binary files a/model/generated/descriptor.blob and b/model/generated/descriptor.blob differ diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index 422e97b86..55cc7accc 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -323,12 +323,11 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples clientmodel.Samples) (e key := &SampleKey{} keyDto := &dto.SampleKey{} - value := &dto.SampleValueSeries{} + values := make(Values, 0, *leveldbChunkSize) for fingerprint, group := range fingerprintToSamples { for { - value.Reset() - + values := values[:0] lengthOfGroup := len(group) if lengthOfGroup == 0 { @@ -348,16 +347,16 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples clientmodel.Samples) (e key.LastTimestamp = chunk[take-1].Timestamp key.SampleCount = uint32(take) + key.Dump(keyDto) + for _, sample := range chunk { - // XXX: Candidate for allocation reduction. - value.Value = append(value.Value, &dto.SampleValueSeries_Value{ - Timestamp: proto.Int64(sample.Timestamp.Unix()), - Value: proto.Float64(float64(sample.Value)), + values = append(values, &SamplePair{ + Timestamp: sample.Timestamp, + Value: sample.Value, }) } - - key.Dump(keyDto) - samplesBatch.Put(keyDto, value) + val := values.marshal() + samplesBatch.PutRaw(keyDto, val) } } @@ -391,15 +390,6 @@ func extractSampleKey(i leveldb.Iterator) (*SampleKey, error) { return key, nil } -func extractSampleValues(i leveldb.Iterator) (Values, error) { - v := &dto.SampleValueSeries{} - if err := i.Value(v); err != nil { - return nil, err - } - - return NewValuesFromDTO(v), nil -} - func (l *LevelDBMetricPersistence) hasIndexMetric(m clientmodel.Metric) (value bool, err error) { defer func(begin time.Time) { duration := time.Since(begin) @@ -625,13 +615,7 @@ func (d *MetricSamplesDecoder) DecodeKey(in interface{}) (interface{}, error) { // DecodeValue implements storage.RecordDecoder. It requires 'in' to be a // SampleValueSeries protobuf. 'out' is of type metric.Values. func (d *MetricSamplesDecoder) DecodeValue(in interface{}) (interface{}, error) { - values := &dto.SampleValueSeries{} - err := proto.Unmarshal(in.([]byte), values) - if err != nil { - return nil, err - } - - return NewValuesFromDTO(values), nil + return unmarshalValues(in.([]byte)), nil } // AcceptAllFilter implements storage.RecordFilter and accepts all records. diff --git a/storage/metric/processor.go b/storage/metric/processor.go index c18f7e30a..5b483a1d9 100644 --- a/storage/metric/processor.go +++ b/storage/metric/processor.go @@ -118,10 +118,7 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers sampleKey.Load(sampleKeyDto) - unactedSamples, err = extractSampleValues(sampleIterator) - if err != nil { - return - } + unactedSamples = unmarshalValues(sampleIterator.RawValue()) for lastCurated.Before(stopAt) && lastTouchedTime.Before(stopAt) && sampleKey.Fingerprint.Equal(fingerprint) { switch { @@ -147,10 +144,7 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers break } - unactedSamples, err = extractSampleValues(sampleIterator) - if err != nil { - return - } + unactedSamples = unmarshalValues(sampleIterator.RawValue()) // If the number of pending mutations exceeds the allowed batch amount, // commit to disk and delete the batch. A new one will be recreated if @@ -188,9 +182,8 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers k := &dto.SampleKey{} newSampleKey := pendingSamples.ToSampleKey(fingerprint) newSampleKey.Dump(k) - b := &dto.SampleValueSeries{} - pendingSamples.dump(b) - pendingBatch.Put(k, b) + b := pendingSamples.marshal() + pendingBatch.PutRaw(k, b) pendingMutations++ lastCurated = newSampleKey.FirstTimestamp @@ -238,9 +231,8 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers k := &dto.SampleKey{} newSampleKey := pendingSamples.ToSampleKey(fingerprint) newSampleKey.Dump(k) - b := &dto.SampleValueSeries{} - pendingSamples.dump(b) - pendingBatch.Put(k, b) + b := pendingSamples.marshal() + pendingBatch.PutRaw(k, b) pendingSamples = Values{} pendingMutations++ lastCurated = newSampleKey.FirstTimestamp @@ -347,10 +339,7 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis } sampleKey.Load(sampleKeyDto) - sampleValues, err := extractSampleValues(sampleIterator) - if err != nil { - return - } + sampleValues := unmarshalValues(sampleIterator.RawValue()) pendingMutations := 0 @@ -374,10 +363,7 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis } sampleKey.Load(sampleKeyDto) - sampleValues, err = extractSampleValues(sampleIterator) - if err != nil { - return - } + sampleValues = unmarshalValues(sampleIterator.RawValue()) // If the number of pending mutations exceeds the allowed batch // amount, commit to disk and delete the batch. A new one will @@ -412,10 +398,9 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis k := &dto.SampleKey{} sampleKey = sampleValues.ToSampleKey(fingerprint) sampleKey.Dump(k) - v := &dto.SampleValueSeries{} - sampleValues.dump(v) lastCurated = sampleKey.FirstTimestamp - pendingBatch.Put(k, v) + v := sampleValues.marshal() + pendingBatch.PutRaw(k, v) pendingMutations++ } else { lastCurated = sampleKey.LastTimestamp diff --git a/storage/metric/processor_test.go b/storage/metric/processor_test.go index 1dbbeae5a..b7639fea1 100644 --- a/storage/metric/processor_test.go +++ b/storage/metric/processor_test.go @@ -59,7 +59,7 @@ type out struct { sampleGroups []sampleGroup } -func (c curationState) Get() (key, value proto.Message) { +func (c curationState) Get() (key proto.Message, value interface{}) { signature := c.processor.Signature() fingerprint := &clientmodel.Fingerprint{} fingerprint.LoadFromString(c.fingerprint) @@ -80,7 +80,7 @@ func (c curationState) Get() (key, value proto.Message) { return k, v } -func (w watermarkState) Get() (key, value proto.Message) { +func (w watermarkState) Get() (key proto.Message, value interface{}) { fingerprint := &clientmodel.Fingerprint{} fingerprint.LoadFromString(w.fingerprint) k := &dto.Fingerprint{} @@ -94,7 +94,7 @@ func (w watermarkState) Get() (key, value proto.Message) { return k, v } -func (s sampleGroup) Get() (key, value proto.Message) { +func (s sampleGroup) Get() (key proto.Message, value interface{}) { fingerprint := &clientmodel.Fingerprint{} fingerprint.LoadFromString(s.fingerprint) keyRaw := SampleKey{ @@ -106,10 +106,7 @@ func (s sampleGroup) Get() (key, value proto.Message) { k := &dto.SampleKey{} keyRaw.Dump(k) - v := &dto.SampleValueSeries{} - s.values.dump(v) - - return k, v + return k, s.values.marshal() } type noopUpdater struct{} @@ -963,10 +960,7 @@ func TestCuratorCompactionProcessor(t *testing.T) { if err != nil { t.Fatalf("%d.%d. error %s", i, j, err) } - sampleValues, err := extractSampleValues(iterator) - if err != nil { - t.Fatalf("%d.%d. error %s", i, j, err) - } + sampleValues := unmarshalValues(iterator.RawValue()) expectedFingerprint := &clientmodel.Fingerprint{} expectedFingerprint.LoadFromString(expected.fingerprint) @@ -1493,10 +1487,7 @@ func TestCuratorDeletionProcessor(t *testing.T) { if err != nil { t.Fatalf("%d.%d. error %s", i, j, err) } - sampleValues, err := extractSampleValues(iterator) - if err != nil { - t.Fatalf("%d.%d. error %s", i, j, err) - } + sampleValues := unmarshalValues(iterator.RawValue()) expectedFingerprint := &clientmodel.Fingerprint{} expectedFingerprint.LoadFromString(expected.fingerprint) diff --git a/storage/metric/sample.go b/storage/metric/sample.go index d0afd9e0e..c36cbaaac 100644 --- a/storage/metric/sample.go +++ b/storage/metric/sample.go @@ -15,16 +15,17 @@ package metric import ( "bytes" + "encoding/binary" "fmt" + "math" "sort" - "code.google.com/p/goprotobuf/proto" - clientmodel "github.com/prometheus/client_golang/model" - - dto "github.com/prometheus/prometheus/model/generated" ) +// bytesPerSample is the number of bytes per sample in marshalled format. +const bytesPerSample = 16 + // MarshalJSON implements json.Marshaler. func (s SamplePair) MarshalJSON() ([]byte, error) { return []byte(fmt.Sprintf("{\"Value\": \"%f\", \"Timestamp\": %d}", s.Value, s.Timestamp)), nil @@ -32,8 +33,8 @@ func (s SamplePair) MarshalJSON() ([]byte, error) { // SamplePair pairs a SampleValue with a Timestamp. type SamplePair struct { - Value clientmodel.SampleValue Timestamp clientmodel.Timestamp + Value clientmodel.SampleValue } // Equal returns true if this SamplePair and o have equal Values and equal @@ -46,14 +47,6 @@ func (s *SamplePair) Equal(o *SamplePair) bool { return s.Value.Equal(o.Value) && s.Timestamp.Equal(o.Timestamp) } -func (s *SamplePair) dump(d *dto.SampleValueSeries_Value) { - d.Reset() - - d.Timestamp = proto.Int64(s.Timestamp.Unix()) - d.Value = proto.Float64(float64(s.Value)) - -} - func (s *SamplePair) String() string { return fmt.Sprintf("SamplePair at %s of %s", s.Timestamp, s.Value) } @@ -133,16 +126,6 @@ func (v Values) TruncateBefore(t clientmodel.Timestamp) Values { return v[index:] } -func (v Values) dump(d *dto.SampleValueSeries) { - d.Reset() - - for _, value := range v { - element := &dto.SampleValueSeries_Value{} - value.dump(element) - d.Value = append(d.Value, element) - } -} - // ToSampleKey returns the SampleKey for these Values. func (v Values) ToSampleKey(f *clientmodel.Fingerprint) *SampleKey { return &SampleKey{ @@ -168,19 +151,32 @@ func (v Values) String() string { return buffer.String() } -// NewValuesFromDTO deserializes Values from a DTO. -func NewValuesFromDTO(d *dto.SampleValueSeries) Values { - // BUG(matt): Incogruent from the other load/dump API types, but much - // more performant. - v := make(Values, 0, len(d.Value)) - - for _, value := range d.Value { - v = append(v, &SamplePair{ - Timestamp: clientmodel.TimestampFromUnix(value.GetTimestamp()), - Value: clientmodel.SampleValue(value.GetValue()), - }) +// marshal marshals a group of samples for being written to disk. +func (v Values) marshal() []byte { + buf := make([]byte, len(v)*bytesPerSample) + for i, val := range v { + offset := i * 16 + binary.LittleEndian.PutUint64(buf[offset:], uint64(val.Timestamp.Unix())) + binary.LittleEndian.PutUint64(buf[offset+8:], math.Float64bits(float64(val.Value))) } + return buf +} +// unmarshalValues decodes marshalled samples and returns them as Values. +func unmarshalValues(buf []byte) Values { + n := len(buf) / bytesPerSample + // Setting the value of a given slice index is around 15% faster than doing + // an append, even if the slice already has the required capacity. For this + // reason, we already set the full target length here. + v := make(Values, n) + + for i := 0; i < n; i++ { + offset := i * 16 + v[i] = &SamplePair{ + Timestamp: clientmodel.TimestampFromUnix(int64(binary.LittleEndian.Uint64(buf[offset:]))), + Value: clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(buf[offset+8:]))), + } + } return v } diff --git a/storage/metric/sample_test.go b/storage/metric/sample_test.go new file mode 100644 index 000000000..833cbd463 --- /dev/null +++ b/storage/metric/sample_test.go @@ -0,0 +1,53 @@ +package metric + +import ( + "math/rand" + "testing" + + clientmodel "github.com/prometheus/client_golang/model" +) + +const numTestValues = 5000 + +func TestValuesMarshalAndUnmarshal(t *testing.T) { + values := randomValues(numTestValues) + + marshalled := values.marshal() + unmarshalled := unmarshalValues(marshalled) + + for i, expected := range values { + actual := unmarshalled[i] + if !actual.Equal(expected) { + t.Fatalf("%d. got: %v, expected: %v", i, actual, expected) + } + } +} + +func randomValues(numSamples int) Values { + v := make(Values, 0, numSamples) + for i := 0; i < numSamples; i++ { + v = append(v, &SamplePair{ + Timestamp: clientmodel.Timestamp(rand.Int63()), + Value: clientmodel.SampleValue(rand.NormFloat64()), + }) + } + + return v +} + +func BenchmarkMarshal(b *testing.B) { + v := randomValues(numTestValues) + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.marshal() + } +} + +func BenchmarkUnmarshal(b *testing.B) { + v := randomValues(numTestValues) + marshalled := v.marshal() + b.ResetTimer() + for i := 0; i < b.N; i++ { + unmarshalValues(marshalled) + } +} diff --git a/storage/metric/stochastic_test.go b/storage/metric/stochastic_test.go index 75ed8350c..65db74b03 100644 --- a/storage/metric/stochastic_test.go +++ b/storage/metric/stochastic_test.go @@ -183,10 +183,7 @@ func levelDBGetRangeValues(l *LevelDBMetricPersistence, fp *clientmodel.Fingerpr break } - retrievedValues, err := extractSampleValues(iterator) - if err != nil { - return nil, err - } + retrievedValues := unmarshalValues(iterator.RawValue()) samples = append(samples, retrievedValues...) } diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index 79f47ef85..7e5d85d58 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -587,7 +587,7 @@ func (t *TieredStorage) loadChunkAroundTime( // // Only do the rewind if there is another chunk before this one. if !seekingKey.MayContain(ts) { - postValues, _ := extractSampleValues(iterator) + postValues := unmarshalValues(iterator.RawValue()) if !seekingKey.Equal(firstBlock) { if !iterator.Previous() { panic("This should never return false.") @@ -602,13 +602,13 @@ func (t *TieredStorage) loadChunkAroundTime( return postValues, false } - foundValues, _ = extractSampleValues(iterator) + foundValues = unmarshalValues(iterator.RawValue()) foundValues = append(foundValues, postValues...) return foundValues, false } } - foundValues, _ = extractSampleValues(iterator) + foundValues = unmarshalValues(iterator.RawValue()) return foundValues, false } @@ -627,7 +627,7 @@ func (t *TieredStorage) loadChunkAroundTime( return nil, false } - foundValues, _ = extractSampleValues(iterator) + foundValues = unmarshalValues(iterator.RawValue()) return foundValues, false } } diff --git a/storage/raw/interface.go b/storage/raw/interface.go index 30655d2cf..5c1328cf5 100644 --- a/storage/raw/interface.go +++ b/storage/raw/interface.go @@ -67,6 +67,8 @@ type Persistence interface { Drop(key proto.Message) error // Put sets the key to a given value. Put(key, value proto.Message) error + // PutRaw sets the key to a given raw bytes value. + PutRaw(key proto.Message, value []byte) error // Commit applies the Batch operations to the database. Commit(Batch) error } @@ -80,6 +82,8 @@ type Batch interface { Close() // Put follows the same protocol as Persistence.Put. Put(key, value proto.Message) + // PutRaw follows the same protocol as Persistence.PutRaw. + PutRaw(key proto.Message, value []byte) // Drop follows the same protocol as Persistence.Drop. Drop(key proto.Message) } diff --git a/storage/raw/leveldb/batch.go b/storage/raw/leveldb/batch.go index d5dafa961..c934cc427 100644 --- a/storage/raw/leveldb/batch.go +++ b/storage/raw/leveldb/batch.go @@ -64,7 +64,19 @@ func (b *batch) Put(key, value proto.Message) { b.batch.Put(keyBuf.Bytes(), valBuf.Bytes()) b.puts++ +} +func (b *batch) PutRaw(key proto.Message, value []byte) { + keyBuf, _ := buffers.Get() + defer buffers.Give(keyBuf) + + if err := keyBuf.Marshal(key); err != nil { + panic(err) + } + + b.batch.Put(keyBuf.Bytes(), value) + + b.puts++ } func (b *batch) Close() { diff --git a/storage/raw/leveldb/iterator.go b/storage/raw/leveldb/iterator.go index 0992536c1..1ed91568c 100644 --- a/storage/raw/leveldb/iterator.go +++ b/storage/raw/leveldb/iterator.go @@ -33,10 +33,9 @@ type Iterator interface { Previous() bool Key(proto.Message) error - Value(proto.Message) error + RawValue() []byte Close() error rawKey() []byte - rawValue() []byte } diff --git a/storage/raw/leveldb/leveldb.go b/storage/raw/leveldb/leveldb.go index 95db1f9a4..6b049537d 100644 --- a/storage/raw/leveldb/leveldb.go +++ b/storage/raw/leveldb/leveldb.go @@ -163,10 +163,6 @@ func (i *levigoIterator) rawKey() (key []byte) { return i.iterator.Key() } -func (i *levigoIterator) rawValue() (value []byte) { - return i.iterator.Value() -} - func (i *levigoIterator) Error() (err error) { return i.iterator.GetError() } @@ -180,13 +176,8 @@ func (i *levigoIterator) Key(m proto.Message) error { return buf.Unmarshal(m) } -func (i *levigoIterator) Value(m proto.Message) error { - buf, _ := buffers.Get() - defer buffers.Give(buf) - - buf.SetBuf(i.iterator.Value()) - - return buf.Unmarshal(m) +func (i *levigoIterator) RawValue() []byte { + return i.iterator.Value() } func (i *levigoIterator) Valid() bool { @@ -373,6 +364,18 @@ func (l *LevelDBPersistence) Put(k, v proto.Message) error { return l.storage.Put(l.writeOptions, keyBuf.Bytes(), valBuf.Bytes()) } +// PutRaw implements raw.Persistence. +func (l *LevelDBPersistence) PutRaw(key proto.Message, value []byte) error { + keyBuf, _ := buffers.Get() + defer buffers.Give(keyBuf) + + if err := keyBuf.Marshal(key); err != nil { + panic(err) + } + + return l.storage.Put(l.writeOptions, keyBuf.Bytes(), value) +} + // Commit implements raw.Persistence. func (l *LevelDBPersistence) Commit(b raw.Batch) (err error) { // XXX: This is a wart to clean up later. Ideally, after doing @@ -492,7 +495,7 @@ func (l *LevelDBPersistence) ForEach(decoder storage.RecordDecoder, filter stora if decodeErr != nil { continue } - decodedValue, decodeErr := decoder.DecodeValue(iterator.rawValue()) + decodedValue, decodeErr := decoder.DecodeValue(iterator.RawValue()) if decodeErr != nil { continue } diff --git a/storage/raw/leveldb/test/fixtures.go b/storage/raw/leveldb/test/fixtures.go index e680050e3..b9121ba8c 100644 --- a/storage/raw/leveldb/test/fixtures.go +++ b/storage/raw/leveldb/test/fixtures.go @@ -26,7 +26,7 @@ type ( // Pair models a prospective (key, value) double that will be committed // to a database. Pair interface { - Get() (key, value proto.Message) + Get() (key proto.Message, value interface{}) } // Pairs models a list of Pair for disk committing. @@ -46,7 +46,7 @@ type ( // fixture data to build. HasNext() (has bool) // Next emits the next (key, value) double for storage. - Next() (key, value proto.Message) + Next() (key proto.Message, value interface{}) } preparer struct { @@ -76,7 +76,14 @@ func (p preparer) Prepare(n string, f FixtureFactory) (t test.TemporaryDirectory for f.HasNext() { key, value := f.Next() - err = persistence.Put(key, value) + switch v := value.(type) { + case proto.Message: + err = persistence.Put(key, v) + case []byte: + err = persistence.PutRaw(key, v) + default: + panic("illegal value type") + } if err != nil { defer t.Close() p.tester.Fatal(err) @@ -92,7 +99,7 @@ func (f cassetteFactory) HasNext() bool { } // Next implements FixtureFactory. -func (f *cassetteFactory) Next() (key, value proto.Message) { +func (f *cassetteFactory) Next() (key proto.Message, value interface{}) { key, value = f.pairs[f.index].Get() f.index++