diff --git a/storage/metric/iterator.go b/storage/metric/iterator.go index 81a40d1cb..ce386088d 100644 --- a/storage/metric/iterator.go +++ b/storage/metric/iterator.go @@ -20,7 +20,3 @@ type Iterator interface { Key() interface{} Value() interface{} } - -type IteratorManager interface { - Iterator() Iterator -} diff --git a/storage/raw/index/leveldb/leveldb.go b/storage/raw/index/leveldb/leveldb.go index 1542a22cf..e00152d3d 100644 --- a/storage/raw/index/leveldb/leveldb.go +++ b/storage/raw/index/leveldb/leveldb.go @@ -16,6 +16,7 @@ package leveldb import ( "github.com/prometheus/prometheus/coding" dto "github.com/prometheus/prometheus/model/generated" + "github.com/prometheus/prometheus/storage/raw" "github.com/prometheus/prometheus/storage/raw/leveldb" ) @@ -57,6 +58,6 @@ func NewLevelDBMembershipIndex(storageRoot string, cacheCapacity, bitsPerBloomFi return } -func (l *LevelDBMembershipIndex) Commit(batch leveldb.Batch) error { +func (l *LevelDBMembershipIndex) Commit(batch raw.Batch) error { return l.persistence.Commit(batch) } diff --git a/storage/raw/interface.go b/storage/raw/interface.go index c552710fd..af776d59f 100644 --- a/storage/raw/interface.go +++ b/storage/raw/interface.go @@ -16,6 +16,7 @@ package raw import ( "github.com/prometheus/prometheus/coding" "github.com/prometheus/prometheus/storage" + "io" ) type Pair struct { @@ -25,13 +26,20 @@ type Pair struct { type EachFunc func(pair *Pair) +// Persistence models a key-value store for bytes that supports various +// additional operations. type Persistence interface { - Has(key coding.Encoder) (bool, error) - Get(key coding.Encoder) ([]byte, error) - Drop(key coding.Encoder) error - Put(key, value coding.Encoder) error - Close() error + io.Closer + // Has informs the user whether a given key exists in the database. + Has(key coding.Encoder) (bool, error) + // Get retrieves the key from the database if it exists or returns nil if + // it is absent. + Get(key coding.Encoder) ([]byte, error) + // Drop removes the key from the database. + Drop(key coding.Encoder) error + // Put sets the key to a given value. + Put(key, value coding.Encoder) error // ForEach is responsible for iterating through all records in the database // until one of the following conditions are met: // @@ -41,7 +49,20 @@ type Persistence interface { // // Decoding errors for an entity cause that entity to be skipped. ForEach(decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) (scannedEntireCorpus bool, err error) - + // Commit applies the Batch operations to the database. + Commit(Batch) error // Pending removal. GetAll() ([]Pair, error) } + +// Batch models a pool of mutations for the database that can be committed +// en masse. The interface implies no protocol around the atomicity of +// effectuation. +type Batch interface { + io.Closer + + // Put follows the same protocol as Persistence.Put. + Put(key, value coding.Encoder) + // Drop follows the same protocol as Persistence.Drop. + Drop(key coding.Encoder) +} diff --git a/storage/raw/leveldb/batch.go b/storage/raw/leveldb/batch.go new file mode 100644 index 000000000..4af34d9f5 --- /dev/null +++ b/storage/raw/leveldb/batch.go @@ -0,0 +1,57 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package leveldb + +import ( + "github.com/jmhodges/levigo" + "github.com/prometheus/prometheus/coding" +) + +type batch struct { + batch *levigo.WriteBatch +} + +func NewBatch() batch { + return batch{ + batch: levigo.NewWriteBatch(), + } +} + +func (b batch) Drop(key coding.Encoder) { + keyEncoded, err := key.Encode() + if err != nil { + panic(err) + } + + b.batch.Delete(keyEncoded) +} + +func (b batch) Put(key, value coding.Encoder) { + keyEncoded, err := key.Encode() + if err != nil { + panic(err) + } + valueEncoded, err := value.Encode() + if err != nil { + panic(err) + } + + b.batch.Put(keyEncoded, valueEncoded) +} + +func (b batch) Close() (err error) { + b.batch.Close() + + return +} diff --git a/storage/raw/leveldb/leveldb.go b/storage/raw/leveldb/leveldb.go index acf6f4427..a85b9f17b 100644 --- a/storage/raw/leveldb/leveldb.go +++ b/storage/raw/leveldb/leveldb.go @@ -172,8 +172,17 @@ func (l *LevelDBPersistence) Put(key, value coding.Encoder) (err error) { return } -func (l *LevelDBPersistence) Commit(b Batch) (err error) { - return l.storage.Write(l.writeOptions, b.(batch).batch) +func (l *LevelDBPersistence) Commit(b raw.Batch) (err error) { + // XXX: This is a wart to clean up later. Ideally, after doing extensive + // tests, we could create a Batch struct that journals pending + // operations which the given Persistence implementation could convert + // to its specific commit requirements. + batch, ok := b.(batch) + if !ok { + panic("leveldb.batch expected") + } + + return l.storage.Write(l.writeOptions, batch.batch) } func (l *LevelDBPersistence) GetAll() (pairs []raw.Pair, err error) { @@ -229,6 +238,8 @@ func (l *LevelDBPersistence) GetIterator() (i *levigo.Iterator, c io.Closer, err readOptions.SetSnapshot(snapshot) i = l.storage.NewIterator(readOptions) + // TODO: Kill the return of an additional io.Closer and just use a decorated + // iterator interface. c = &iteratorCloser{ iterator: i, readOptions: readOptions, @@ -279,47 +290,3 @@ func (l *LevelDBPersistence) ForEach(decoder storage.RecordDecoder, filter stora scannedEntireCorpus = true return } - -// Batch encapsulates a list of mutations to occur to the datastore. It must -// be closed once done. -type Batch interface { - Delete(coding.Encoder) - Put(coding.Encoder, coding.Encoder) - Close() -} - -func NewBatch() Batch { - return batch{ - batch: levigo.NewWriteBatch(), - } -} - -type batch struct { - batch *levigo.WriteBatch -} - -func (b batch) Delete(key coding.Encoder) { - keyEncoded, err := key.Encode() - if err != nil { - panic(err) - } - - b.batch.Delete(keyEncoded) -} - -func (b batch) Put(key, value coding.Encoder) { - keyEncoded, err := key.Encode() - if err != nil { - panic(err) - } - valueEncoded, err := value.Encode() - if err != nil { - panic(err) - } - - b.batch.Put(keyEncoded, valueEncoded) -} - -func (b batch) Close() { - b.batch.Close() -}