444 lines
10 KiB
Go
444 lines
10 KiB
Go
// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
|
|
// All rights reserved.
|
|
//
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file.
|
|
|
|
package leveldb
|
|
|
|
import (
|
|
"time"
|
|
|
|
"github.com/syndtr/goleveldb/leveldb/memdb"
|
|
"github.com/syndtr/goleveldb/leveldb/opt"
|
|
"github.com/syndtr/goleveldb/leveldb/util"
|
|
)
|
|
|
|
func (db *DB) writeJournal(batches []*Batch, seq uint64, sync bool) error {
|
|
wr, err := db.journal.Next()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := writeBatchesWithHeader(wr, batches, seq); err != nil {
|
|
return err
|
|
}
|
|
if err := db.journal.Flush(); err != nil {
|
|
return err
|
|
}
|
|
if sync {
|
|
return db.journalWriter.Sync()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) {
|
|
// Wait for pending memdb compaction.
|
|
err = db.compTriggerWait(db.mcompCmdC)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
// Create new memdb and journal.
|
|
mem, err = db.newMem(n)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
// Schedule memdb compaction.
|
|
if wait {
|
|
err = db.compTriggerWait(db.mcompCmdC)
|
|
} else {
|
|
db.compTrigger(db.mcompCmdC)
|
|
}
|
|
return
|
|
}
|
|
|
|
func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
|
|
delayed := false
|
|
slowdownTrigger := db.s.o.GetWriteL0SlowdownTrigger()
|
|
pauseTrigger := db.s.o.GetWriteL0PauseTrigger()
|
|
flush := func() (retry bool) {
|
|
mdb = db.getEffectiveMem()
|
|
if mdb == nil {
|
|
err = ErrClosed
|
|
return false
|
|
}
|
|
defer func() {
|
|
if retry {
|
|
mdb.decref()
|
|
mdb = nil
|
|
}
|
|
}()
|
|
tLen := db.s.tLen(0)
|
|
mdbFree = mdb.Free()
|
|
switch {
|
|
case tLen >= slowdownTrigger && !delayed:
|
|
delayed = true
|
|
time.Sleep(time.Millisecond)
|
|
case mdbFree >= n:
|
|
return false
|
|
case tLen >= pauseTrigger:
|
|
delayed = true
|
|
err = db.compTriggerWait(db.tcompCmdC)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
default:
|
|
// Allow memdb to grow if it has no entry.
|
|
if mdb.Len() == 0 {
|
|
mdbFree = n
|
|
} else {
|
|
mdb.decref()
|
|
mdb, err = db.rotateMem(n, false)
|
|
if err == nil {
|
|
mdbFree = mdb.Free()
|
|
} else {
|
|
mdbFree = 0
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
start := time.Now()
|
|
for flush() {
|
|
}
|
|
if delayed {
|
|
db.writeDelay += time.Since(start)
|
|
db.writeDelayN++
|
|
} else if db.writeDelayN > 0 {
|
|
db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
|
|
db.writeDelay = 0
|
|
db.writeDelayN = 0
|
|
}
|
|
return
|
|
}
|
|
|
|
type writeMerge struct {
|
|
sync bool
|
|
batch *Batch
|
|
keyType keyType
|
|
key, value []byte
|
|
}
|
|
|
|
func (db *DB) unlockWrite(overflow bool, merged int, err error) {
|
|
for i := 0; i < merged; i++ {
|
|
db.writeAckC <- err
|
|
}
|
|
if overflow {
|
|
// Pass lock to the next write (that failed to merge).
|
|
db.writeMergedC <- false
|
|
} else {
|
|
// Release lock.
|
|
<-db.writeLockC
|
|
}
|
|
}
|
|
|
|
// ourBatch if defined should equal with batch.
|
|
func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error {
|
|
// Try to flush memdb. This method would also trying to throttle writes
|
|
// if it is too fast and compaction cannot catch-up.
|
|
mdb, mdbFree, err := db.flush(batch.internalLen)
|
|
if err != nil {
|
|
db.unlockWrite(false, 0, err)
|
|
return err
|
|
}
|
|
defer mdb.decref()
|
|
|
|
var (
|
|
overflow bool
|
|
merged int
|
|
batches = []*Batch{batch}
|
|
)
|
|
|
|
if merge {
|
|
// Merge limit.
|
|
var mergeLimit int
|
|
if batch.internalLen > 128<<10 {
|
|
mergeLimit = (1 << 20) - batch.internalLen
|
|
} else {
|
|
mergeLimit = 128 << 10
|
|
}
|
|
mergeCap := mdbFree - batch.internalLen
|
|
if mergeLimit > mergeCap {
|
|
mergeLimit = mergeCap
|
|
}
|
|
|
|
merge:
|
|
for mergeLimit > 0 {
|
|
select {
|
|
case incoming := <-db.writeMergeC:
|
|
if incoming.batch != nil {
|
|
// Merge batch.
|
|
if incoming.batch.internalLen > mergeLimit {
|
|
overflow = true
|
|
break merge
|
|
}
|
|
batches = append(batches, incoming.batch)
|
|
mergeLimit -= incoming.batch.internalLen
|
|
} else {
|
|
// Merge put.
|
|
internalLen := len(incoming.key) + len(incoming.value) + 8
|
|
if internalLen > mergeLimit {
|
|
overflow = true
|
|
break merge
|
|
}
|
|
if ourBatch == nil {
|
|
ourBatch = db.batchPool.Get().(*Batch)
|
|
ourBatch.Reset()
|
|
batches = append(batches, ourBatch)
|
|
}
|
|
// We can use same batch since concurrent write doesn't
|
|
// guarantee write order.
|
|
ourBatch.appendRec(incoming.keyType, incoming.key, incoming.value)
|
|
mergeLimit -= internalLen
|
|
}
|
|
sync = sync || incoming.sync
|
|
merged++
|
|
db.writeMergedC <- true
|
|
|
|
default:
|
|
break merge
|
|
}
|
|
}
|
|
}
|
|
|
|
// Seq number.
|
|
seq := db.seq + 1
|
|
|
|
// Write journal.
|
|
if err := db.writeJournal(batches, seq, sync); err != nil {
|
|
db.unlockWrite(overflow, merged, err)
|
|
return err
|
|
}
|
|
|
|
// Put batches.
|
|
for _, batch := range batches {
|
|
if err := batch.putMem(seq, mdb.DB); err != nil {
|
|
panic(err)
|
|
}
|
|
seq += uint64(batch.Len())
|
|
}
|
|
|
|
// Incr seq number.
|
|
db.addSeq(uint64(batchesLen(batches)))
|
|
|
|
// Rotate memdb if it's reach the threshold.
|
|
if batch.internalLen >= mdbFree {
|
|
db.rotateMem(0, false)
|
|
}
|
|
|
|
db.unlockWrite(overflow, merged, nil)
|
|
return nil
|
|
}
|
|
|
|
// Write apply the given batch to the DB. The batch records will be applied
|
|
// sequentially. Write might be used concurrently, when used concurrently and
|
|
// batch is small enough, write will try to merge the batches. Set NoWriteMerge
|
|
// option to true to disable write merge.
|
|
//
|
|
// It is safe to modify the contents of the arguments after Write returns but
|
|
// not before. Write will not modify content of the batch.
|
|
func (db *DB) Write(batch *Batch, wo *opt.WriteOptions) error {
|
|
if err := db.ok(); err != nil || batch == nil || batch.Len() == 0 {
|
|
return err
|
|
}
|
|
|
|
// If the batch size is larger than write buffer, it may justified to write
|
|
// using transaction instead. Using transaction the batch will be written
|
|
// into tables directly, skipping the journaling.
|
|
if batch.internalLen > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() {
|
|
tr, err := db.OpenTransaction()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := tr.Write(batch, wo); err != nil {
|
|
tr.Discard()
|
|
return err
|
|
}
|
|
return tr.Commit()
|
|
}
|
|
|
|
merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
|
|
sync := wo.GetSync() && !db.s.o.GetNoSync()
|
|
|
|
// Acquire write lock.
|
|
if merge {
|
|
select {
|
|
case db.writeMergeC <- writeMerge{sync: sync, batch: batch}:
|
|
if <-db.writeMergedC {
|
|
// Write is merged.
|
|
return <-db.writeAckC
|
|
}
|
|
// Write is not merged, the write lock is handed to us. Continue.
|
|
case db.writeLockC <- struct{}{}:
|
|
// Write lock acquired.
|
|
case err := <-db.compPerErrC:
|
|
// Compaction error.
|
|
return err
|
|
case <-db.closeC:
|
|
// Closed
|
|
return ErrClosed
|
|
}
|
|
} else {
|
|
select {
|
|
case db.writeLockC <- struct{}{}:
|
|
// Write lock acquired.
|
|
case err := <-db.compPerErrC:
|
|
// Compaction error.
|
|
return err
|
|
case <-db.closeC:
|
|
// Closed
|
|
return ErrClosed
|
|
}
|
|
}
|
|
|
|
return db.writeLocked(batch, nil, merge, sync)
|
|
}
|
|
|
|
func (db *DB) putRec(kt keyType, key, value []byte, wo *opt.WriteOptions) error {
|
|
if err := db.ok(); err != nil {
|
|
return err
|
|
}
|
|
|
|
merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
|
|
sync := wo.GetSync() && !db.s.o.GetNoSync()
|
|
|
|
// Acquire write lock.
|
|
if merge {
|
|
select {
|
|
case db.writeMergeC <- writeMerge{sync: sync, keyType: kt, key: key, value: value}:
|
|
if <-db.writeMergedC {
|
|
// Write is merged.
|
|
return <-db.writeAckC
|
|
}
|
|
// Write is not merged, the write lock is handed to us. Continue.
|
|
case db.writeLockC <- struct{}{}:
|
|
// Write lock acquired.
|
|
case err := <-db.compPerErrC:
|
|
// Compaction error.
|
|
return err
|
|
case <-db.closeC:
|
|
// Closed
|
|
return ErrClosed
|
|
}
|
|
} else {
|
|
select {
|
|
case db.writeLockC <- struct{}{}:
|
|
// Write lock acquired.
|
|
case err := <-db.compPerErrC:
|
|
// Compaction error.
|
|
return err
|
|
case <-db.closeC:
|
|
// Closed
|
|
return ErrClosed
|
|
}
|
|
}
|
|
|
|
batch := db.batchPool.Get().(*Batch)
|
|
batch.Reset()
|
|
batch.appendRec(kt, key, value)
|
|
return db.writeLocked(batch, batch, merge, sync)
|
|
}
|
|
|
|
// Put sets the value for the given key. It overwrites any previous value
|
|
// for that key; a DB is not a multi-map. Write merge also applies for Put, see
|
|
// Write.
|
|
//
|
|
// It is safe to modify the contents of the arguments after Put returns but not
|
|
// before.
|
|
func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
|
|
return db.putRec(keyTypeVal, key, value, wo)
|
|
}
|
|
|
|
// Delete deletes the value for the given key. Delete will not returns error if
|
|
// key doesn't exist. Write merge also applies for Delete, see Write.
|
|
//
|
|
// It is safe to modify the contents of the arguments after Delete returns but
|
|
// not before.
|
|
func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error {
|
|
return db.putRec(keyTypeDel, key, nil, wo)
|
|
}
|
|
|
|
func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool {
|
|
iter := mem.NewIterator(nil)
|
|
defer iter.Release()
|
|
return (max == nil || (iter.First() && icmp.uCompare(max, internalKey(iter.Key()).ukey()) >= 0)) &&
|
|
(min == nil || (iter.Last() && icmp.uCompare(min, internalKey(iter.Key()).ukey()) <= 0))
|
|
}
|
|
|
|
// CompactRange compacts the underlying DB for the given key range.
|
|
// In particular, deleted and overwritten versions are discarded,
|
|
// and the data is rearranged to reduce the cost of operations
|
|
// needed to access the data. This operation should typically only
|
|
// be invoked by users who understand the underlying implementation.
|
|
//
|
|
// A nil Range.Start is treated as a key before all keys in the DB.
|
|
// And a nil Range.Limit is treated as a key after all keys in the DB.
|
|
// Therefore if both is nil then it will compact entire DB.
|
|
func (db *DB) CompactRange(r util.Range) error {
|
|
if err := db.ok(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Lock writer.
|
|
select {
|
|
case db.writeLockC <- struct{}{}:
|
|
case err := <-db.compPerErrC:
|
|
return err
|
|
case <-db.closeC:
|
|
return ErrClosed
|
|
}
|
|
|
|
// Check for overlaps in memdb.
|
|
mdb := db.getEffectiveMem()
|
|
if mdb == nil {
|
|
return ErrClosed
|
|
}
|
|
defer mdb.decref()
|
|
if isMemOverlaps(db.s.icmp, mdb.DB, r.Start, r.Limit) {
|
|
// Memdb compaction.
|
|
if _, err := db.rotateMem(0, false); err != nil {
|
|
<-db.writeLockC
|
|
return err
|
|
}
|
|
<-db.writeLockC
|
|
if err := db.compTriggerWait(db.mcompCmdC); err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
<-db.writeLockC
|
|
}
|
|
|
|
// Table compaction.
|
|
return db.compTriggerRange(db.tcompCmdC, -1, r.Start, r.Limit)
|
|
}
|
|
|
|
// SetReadOnly makes DB read-only. It will stay read-only until reopened.
|
|
func (db *DB) SetReadOnly() error {
|
|
if err := db.ok(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Lock writer.
|
|
select {
|
|
case db.writeLockC <- struct{}{}:
|
|
db.compWriteLocking = true
|
|
case err := <-db.compPerErrC:
|
|
return err
|
|
case <-db.closeC:
|
|
return ErrClosed
|
|
}
|
|
|
|
// Set compaction read-only.
|
|
select {
|
|
case db.compErrSetC <- ErrReadOnly:
|
|
case perr := <-db.compPerErrC:
|
|
return perr
|
|
case <-db.closeC:
|
|
return ErrClosed
|
|
}
|
|
|
|
return nil
|
|
}
|