2012-11-26 19:11:34 +00:00
// Copyright 2012 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2012-11-26 18:56:51 +00:00
package leveldb
2012-11-24 11:33:34 +00:00
import (
2013-01-27 19:28:37 +00:00
"flag"
2012-11-24 11:33:34 +00:00
"github.com/jmhodges/levigo"
2013-01-27 17:49:45 +00:00
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/storage/raw"
2012-11-24 11:33:34 +00:00
"io"
)
2013-01-27 19:28:37 +00:00
var (
2013-02-01 12:35:07 +00:00
leveldbFlushOnMutate = flag . Bool ( "leveldbFlushOnMutate" , true , "Whether LevelDB should flush every operation to disk upon mutation before returning (bool)." )
leveldbUseSnappy = flag . Bool ( "leveldbUseSnappy" , true , "Whether LevelDB attempts to use Snappy for compressing elements (bool)." )
leveldbUseParanoidChecks = flag . Bool ( "leveldbUseParanoidChecks" , true , "Whether LevelDB uses expensive checks (bool)." )
2013-01-27 19:28:37 +00:00
)
2012-11-28 19:22:49 +00:00
type LevelDBPersistence struct {
2012-11-24 11:33:34 +00:00
cache * levigo . Cache
filterPolicy * levigo . FilterPolicy
options * levigo . Options
storage * levigo . DB
readOptions * levigo . ReadOptions
writeOptions * levigo . WriteOptions
}
2012-11-26 18:56:51 +00:00
type iteratorCloser struct {
iterator * levigo . Iterator
readOptions * levigo . ReadOptions
snapshot * levigo . Snapshot
storage * levigo . DB
}
2012-12-25 12:50:36 +00:00
func NewLevelDBPersistence ( storageRoot string , cacheCapacity , bitsPerBloomFilterEncoded int ) ( p * LevelDBPersistence , err error ) {
2012-11-24 11:33:34 +00:00
options := levigo . NewOptions ( )
options . SetCreateIfMissing ( true )
2013-02-01 12:35:07 +00:00
options . SetParanoidChecks ( * leveldbUseParanoidChecks )
compression := levigo . NoCompression
if * leveldbUseSnappy {
compression = levigo . SnappyCompression
}
options . SetCompression ( compression )
2012-11-24 11:33:34 +00:00
cache := levigo . NewLRUCache ( cacheCapacity )
options . SetCache ( cache )
filterPolicy := levigo . NewBloomFilter ( bitsPerBloomFilterEncoded )
options . SetFilterPolicy ( filterPolicy )
2012-12-25 12:50:36 +00:00
storage , err := levigo . Open ( storageRoot , options )
if err != nil {
return
}
2012-11-24 11:33:34 +00:00
readOptions := levigo . NewReadOptions ( )
writeOptions := levigo . NewWriteOptions ( )
2013-01-27 19:28:37 +00:00
writeOptions . SetSync ( * leveldbFlushOnMutate )
2012-12-25 12:50:36 +00:00
p = & LevelDBPersistence {
2012-11-24 11:33:34 +00:00
cache : cache ,
filterPolicy : filterPolicy ,
options : options ,
readOptions : readOptions ,
storage : storage ,
writeOptions : writeOptions ,
}
2012-12-25 12:50:36 +00:00
return
2012-11-24 11:33:34 +00:00
}
2012-12-25 12:50:36 +00:00
func ( l * LevelDBPersistence ) Close ( ) ( err error ) {
// These are deferred to take advantage of forced closing in case of stack
// unwinding due to anomalies.
defer func ( ) {
if l . storage != nil {
l . storage . Close ( )
}
} ( )
2012-11-24 11:33:34 +00:00
defer func ( ) {
if l . filterPolicy != nil {
l . filterPolicy . Close ( )
}
} ( )
defer func ( ) {
if l . cache != nil {
l . cache . Close ( )
}
} ( )
defer func ( ) {
if l . options != nil {
l . options . Close ( )
}
} ( )
defer func ( ) {
if l . readOptions != nil {
l . readOptions . Close ( )
}
} ( )
defer func ( ) {
if l . writeOptions != nil {
l . writeOptions . Close ( )
}
} ( )
2012-12-25 12:50:36 +00:00
return
2012-11-24 11:33:34 +00:00
}
2012-12-25 12:50:36 +00:00
func ( l * LevelDBPersistence ) Get ( value coding . Encoder ) ( b [ ] byte , err error ) {
key , err := value . Encode ( )
if err != nil {
return
2012-11-24 11:33:34 +00:00
}
2012-12-25 12:50:36 +00:00
return l . storage . Get ( l . readOptions , key )
2012-11-24 11:33:34 +00:00
}
2012-12-25 12:50:36 +00:00
func ( l * LevelDBPersistence ) Has ( value coding . Encoder ) ( h bool , err error ) {
raw , err := l . Get ( value )
if err != nil {
return
2012-11-24 11:33:34 +00:00
}
2012-12-25 12:50:36 +00:00
h = raw != nil
2012-11-24 11:33:34 +00:00
2012-12-25 12:50:36 +00:00
return
}
2012-11-24 11:33:34 +00:00
2012-12-25 12:50:36 +00:00
func ( l * LevelDBPersistence ) Drop ( value coding . Encoder ) ( err error ) {
key , err := value . Encode ( )
if err != nil {
return
2012-11-24 11:33:34 +00:00
}
2012-12-25 12:50:36 +00:00
err = l . storage . Delete ( l . writeOptions , key )
return
2012-11-24 11:33:34 +00:00
}
2012-12-25 12:50:36 +00:00
func ( l * LevelDBPersistence ) Put ( key , value coding . Encoder ) ( err error ) {
keyEncoded , err := key . Encode ( )
if err != nil {
return
}
2012-11-24 11:33:34 +00:00
2012-12-25 12:50:36 +00:00
valueEncoded , err := value . Encode ( )
if err != nil {
return
2012-11-24 11:33:34 +00:00
}
2012-12-25 12:50:36 +00:00
err = l . storage . Put ( l . writeOptions , keyEncoded , valueEncoded )
return
2012-11-24 11:33:34 +00:00
}
2012-12-25 12:50:36 +00:00
func ( l * LevelDBPersistence ) GetAll ( ) ( pairs [ ] raw . Pair , err error ) {
2012-11-24 11:33:34 +00:00
snapshot := l . storage . NewSnapshot ( )
defer l . storage . ReleaseSnapshot ( snapshot )
readOptions := levigo . NewReadOptions ( )
defer readOptions . Close ( )
readOptions . SetSnapshot ( snapshot )
iterator := l . storage . NewIterator ( readOptions )
defer iterator . Close ( )
iterator . SeekToFirst ( )
for iterator := iterator ; iterator . Valid ( ) ; iterator . Next ( ) {
2012-12-25 12:50:36 +00:00
pairs = append ( pairs , raw . Pair { Left : iterator . Key ( ) , Right : iterator . Value ( ) } )
2012-11-24 11:33:34 +00:00
2012-12-25 12:50:36 +00:00
err = iterator . GetError ( )
if err != nil {
return
}
2012-11-24 11:33:34 +00:00
}
2012-12-25 12:50:36 +00:00
return
2012-11-24 11:33:34 +00:00
}
2012-12-25 12:50:36 +00:00
func ( i * iteratorCloser ) Close ( ) ( err error ) {
2012-11-24 11:33:34 +00:00
defer func ( ) {
if i . storage != nil {
if i . snapshot != nil {
i . storage . ReleaseSnapshot ( i . snapshot )
}
}
} ( )
defer func ( ) {
if i . iterator != nil {
i . iterator . Close ( )
}
} ( )
defer func ( ) {
if i . readOptions != nil {
i . readOptions . Close ( )
}
} ( )
2012-12-25 12:50:36 +00:00
return
2012-11-24 11:33:34 +00:00
}
2012-12-25 12:50:36 +00:00
func ( l * LevelDBPersistence ) GetIterator ( ) ( i * levigo . Iterator , c io . Closer , err error ) {
2012-11-24 11:33:34 +00:00
snapshot := l . storage . NewSnapshot ( )
readOptions := levigo . NewReadOptions ( )
readOptions . SetSnapshot ( snapshot )
2012-12-25 12:50:36 +00:00
i = l . storage . NewIterator ( readOptions )
2012-11-24 11:33:34 +00:00
2012-12-25 12:50:36 +00:00
c = & iteratorCloser {
iterator : i ,
2012-11-24 11:33:34 +00:00
readOptions : readOptions ,
snapshot : snapshot ,
storage : l . storage ,
}
2012-12-25 12:50:36 +00:00
return
2012-11-24 11:33:34 +00:00
}