mirror of
https://github.com/ceph/ceph
synced 2024-12-17 08:57:28 +00:00
Merge pull request #9933 from chhabaramesh/master
kv: In memory keyvalue db implementation Reviewed-by: Sage Weil <sage@redhat.com> Reviewed-by: Mark Nelson <mnelson@redhat.com>
This commit is contained in:
commit
c5e5ef7179
@ -155,6 +155,7 @@ SUBSYS(bdev, 1, 3)
|
||||
SUBSYS(kstore, 1, 5)
|
||||
SUBSYS(rocksdb, 4, 5)
|
||||
SUBSYS(leveldb, 4, 5)
|
||||
SUBSYS(memdb, 4, 5)
|
||||
SUBSYS(kinetic, 1, 5)
|
||||
SUBSYS(fuse, 1, 5)
|
||||
|
||||
|
@ -1018,4 +1018,32 @@ inline void decode(std::deque<T>& ls, bufferlist::iterator& p)
|
||||
bl.advance(struct_end - bl.get_off()); \
|
||||
}
|
||||
|
||||
/*
|
||||
* Encoders/decoders to read from current offset in a file handle and
|
||||
* encode/decode the data according to argument types.
|
||||
*/
|
||||
inline ssize_t decode_file(int fd, std::string &str)
|
||||
{
|
||||
bufferlist bl;
|
||||
__u32 len = 0;
|
||||
bl.read_fd(fd, sizeof(len));
|
||||
decode(len, bl);
|
||||
bl.read_fd(fd, len);
|
||||
decode(str, bl);
|
||||
return bl.length();
|
||||
}
|
||||
|
||||
inline ssize_t decode_file(int fd, bufferptr &bp)
|
||||
{
|
||||
bufferlist bl;
|
||||
__u32 len = 0;
|
||||
bl.read_fd(fd, sizeof(len));
|
||||
decode(len, bl);
|
||||
bl.read_fd(fd, len);
|
||||
bufferlist::iterator bli = bl.begin();
|
||||
|
||||
decode(bp, bli);
|
||||
return bl.length();
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -1,6 +1,7 @@
|
||||
set(kv_srcs
|
||||
KeyValueDB.cc
|
||||
LevelDBStore.cc
|
||||
MemDB.cc
|
||||
RocksDBStore.cc)
|
||||
add_library(kv_objs OBJECT ${kv_srcs})
|
||||
add_library(kv STATIC $<TARGET_OBJECTS:kv_objs>)
|
||||
|
@ -3,6 +3,7 @@
|
||||
|
||||
#include "KeyValueDB.h"
|
||||
#include "LevelDBStore.h"
|
||||
#include "MemDB.h"
|
||||
#ifdef HAVE_LIBROCKSDB
|
||||
#include "RocksDBStore.h"
|
||||
#endif
|
||||
@ -29,6 +30,11 @@ KeyValueDB *KeyValueDB::create(CephContext *cct, const string& type,
|
||||
return new RocksDBStore(cct, dir, p);
|
||||
}
|
||||
#endif
|
||||
|
||||
if ((type == "memdb") &&
|
||||
cct->check_experimental_feature_enabled("memdb")) {
|
||||
return new MemDB(cct, dir, p);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@ -47,5 +53,9 @@ int KeyValueDB::test_init(const string& type, const string& dir)
|
||||
return RocksDBStore::_test_init(dir);
|
||||
}
|
||||
#endif
|
||||
|
||||
if (type == "memdb") {
|
||||
return MemDB::_test_init(dir);
|
||||
}
|
||||
return -EINVAL;
|
||||
}
|
||||
|
@ -124,6 +124,7 @@ public:
|
||||
virtual int init(string option_str="") = 0;
|
||||
virtual int open(std::ostream &out) = 0;
|
||||
virtual int create_and_open(std::ostream &out) = 0;
|
||||
virtual void close() { }
|
||||
|
||||
virtual Transaction get_transaction() = 0;
|
||||
virtual int submit_transaction(Transaction) = 0;
|
||||
|
@ -51,4 +51,8 @@ libkv_a_LIBADD += -lkinetic_client -lprotobuf -lglog -lgflags libcrypto.a
|
||||
noinst_HEADERS += kv/KineticStore.h
|
||||
endif
|
||||
|
||||
libkv_a_SOURCES += kv/MemDB.cc
|
||||
noinst_HEADERS += kv/MemDB.h
|
||||
|
||||
|
||||
endif # ENABLE_SERVER
|
||||
|
528
src/kv/MemDB.cc
Normal file
528
src/kv/MemDB.cc
Normal file
@ -0,0 +1,528 @@
|
||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
// vim: ts=8 sw=2 smarttab
|
||||
/*
|
||||
* In-memory crash non-safe keyvalue db
|
||||
* Author: Ramesh Chander, Ramesh.Chander@sandisk.com
|
||||
*/
|
||||
|
||||
#include <set>
|
||||
#include <map>
|
||||
#include <string>
|
||||
#include <memory>
|
||||
#include <errno.h>
|
||||
#include <unistd.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/stat.h>
|
||||
|
||||
using std::string;
|
||||
#include "common/perf_counters.h"
|
||||
#include "common/debug.h"
|
||||
#include "include/str_list.h"
|
||||
#include "include/str_map.h"
|
||||
#include "KeyValueDB.h"
|
||||
#include "MemDB.h"
|
||||
|
||||
#include "include/assert.h"
|
||||
#include "common/debug.h"
|
||||
#include "common/errno.h"
|
||||
|
||||
#define dout_subsys ceph_subsys_memdb
|
||||
#undef dout_prefix
|
||||
#define dout_prefix *_dout << "memdb: "
|
||||
#define dtrace dout(30)
|
||||
#define dwarn dout(0)
|
||||
#define dinfo dout(0)
|
||||
|
||||
static void split_key(const string& raw_key, string *prefix, string *key)
|
||||
{
|
||||
size_t pos = raw_key.find(KEY_DELIM, 0);
|
||||
*prefix = raw_key.substr(0, pos);
|
||||
*key = raw_key.substr(pos + 1, raw_key.length());
|
||||
}
|
||||
|
||||
static string make_key(const string &prefix, const string &value)
|
||||
{
|
||||
string out = prefix;
|
||||
out.push_back(KEY_DELIM);
|
||||
out.append(value);
|
||||
return out;
|
||||
}
|
||||
|
||||
void MemDB::_encode(btree::btree_map<string,
|
||||
bufferptr>:: iterator iter, bufferlist &bl)
|
||||
{
|
||||
::encode(iter->first, bl);
|
||||
::encode(iter->second, bl);
|
||||
}
|
||||
|
||||
std::string MemDB::_get_data_fn()
|
||||
{
|
||||
string fn = m_db_path + "/" + "MemDB.db";
|
||||
return fn;
|
||||
}
|
||||
|
||||
void MemDB::_save()
|
||||
{
|
||||
std::lock_guard<std::mutex> l(m_lock);
|
||||
dout(10) << __func__ << " Saving MemDB to file: "<< _get_data_fn().c_str() << dendl;
|
||||
int mode = 0644;
|
||||
int fd = TEMP_FAILURE_RETRY(::open(_get_data_fn().c_str(),
|
||||
O_WRONLY|O_CREAT|O_TRUNC, mode));
|
||||
if (fd < 0) {
|
||||
int err = errno;
|
||||
cerr << "write_file(" << _get_data_fn().c_str() << "): failed to open file: "
|
||||
<< cpp_strerror(err) << std::endl;
|
||||
return;
|
||||
}
|
||||
btree::btree_map<string, bufferptr>::iterator iter = m_btree.begin();
|
||||
while (iter != m_btree.end()) {
|
||||
bufferlist bl;
|
||||
dout(10) << __func__ << " Key:"<< iter->first << dendl;
|
||||
_encode(iter, bl);
|
||||
bl.write_fd(fd);
|
||||
iter++;
|
||||
}
|
||||
|
||||
::close(fd);
|
||||
}
|
||||
|
||||
void MemDB::_load()
|
||||
{
|
||||
std::lock_guard<std::mutex> l(m_lock);
|
||||
dout(10) << __func__ << " Reading MemDB from file: "<< _get_data_fn().c_str() << dendl;
|
||||
/*
|
||||
* Open file and read it in single shot.
|
||||
*/
|
||||
int fd = TEMP_FAILURE_RETRY(::open(_get_data_fn().c_str(), O_RDONLY));
|
||||
if (fd < 0) {
|
||||
std::ostringstream oss;
|
||||
oss << "can't open " << _get_data_fn().c_str() << ": " << std::endl;
|
||||
return;
|
||||
}
|
||||
|
||||
struct stat st;
|
||||
memset(&st, 0, sizeof(st));
|
||||
if (::fstat(fd, &st) < 0) {
|
||||
std::ostringstream oss;
|
||||
oss << "can't stat file " << _get_data_fn().c_str() << ": " << std::endl;
|
||||
return;
|
||||
}
|
||||
|
||||
ssize_t file_size = st.st_size;
|
||||
ssize_t bytes_done = 0;
|
||||
while (bytes_done < file_size) {
|
||||
string key;
|
||||
bufferptr datap;
|
||||
|
||||
bytes_done += ::decode_file(fd, key);
|
||||
bytes_done += ::decode_file(fd, datap);
|
||||
|
||||
dout(10) << __func__ << " Key:"<< key << dendl;
|
||||
m_btree[key] = datap;
|
||||
}
|
||||
::close(fd);
|
||||
}
|
||||
|
||||
int MemDB::_init(bool create)
|
||||
{
|
||||
dout(1) << __func__ << dendl;
|
||||
if (create) {
|
||||
int r = ::mkdir(m_db_path.c_str(), 0700);
|
||||
if (r < 0) {
|
||||
if (r != EEXIST) {
|
||||
derr << __func__ << " mkdir failed: " << cpp_strerror(r) << dendl;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
_load();
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int MemDB::set_merge_operator(
|
||||
const string& prefix,
|
||||
std::shared_ptr<KeyValueDB::MergeOperator> mop)
|
||||
{
|
||||
merge_ops.push_back(std::make_pair(prefix, mop));
|
||||
return 0;
|
||||
}
|
||||
|
||||
int MemDB::do_open(ostream &out, bool create)
|
||||
{
|
||||
m_total_bytes = 0;
|
||||
m_allocated_bytes = 1;
|
||||
|
||||
return _init(create);
|
||||
}
|
||||
|
||||
MemDB::~MemDB()
|
||||
{
|
||||
close();
|
||||
dout(10) << __func__ << " Destroying MemDB instance: "<< dendl;
|
||||
}
|
||||
|
||||
void MemDB::close()
|
||||
{
|
||||
/*
|
||||
* Save whatever in memory btree.
|
||||
*/
|
||||
_save();
|
||||
}
|
||||
|
||||
int MemDB::submit_transaction(KeyValueDB::Transaction t)
|
||||
{
|
||||
MDBTransactionImpl* mt = static_cast<MDBTransactionImpl*>(t.get());
|
||||
|
||||
dtrace << __func__ << " " << mt->get_ops().size() << dendl;
|
||||
for(auto& op : mt->get_ops()) {
|
||||
if(op.first == MDBTransactionImpl::WRITE) {
|
||||
ms_op_t set_op = op.second;
|
||||
_setkey(set_op);
|
||||
} else if (op.first == MDBTransactionImpl::MERGE) {
|
||||
ms_op_t merge_op = op.second;
|
||||
_merge(merge_op);
|
||||
} else {
|
||||
ms_op_t rm_op = op.second;
|
||||
assert(op.first == MDBTransactionImpl::DELETE);
|
||||
_rmkey(rm_op);
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int MemDB::submit_transaction_sync(KeyValueDB::Transaction tsync)
|
||||
{
|
||||
dtrace << __func__ << " " << dendl;
|
||||
submit_transaction(tsync);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int MemDB::transaction_rollback(KeyValueDB::Transaction t)
|
||||
{
|
||||
MDBTransactionImpl* mt = static_cast<MDBTransactionImpl*>(t.get());
|
||||
mt->clear();
|
||||
return 0;
|
||||
}
|
||||
|
||||
void MemDB::MDBTransactionImpl::set(
|
||||
const string &prefix, const string &k, const bufferlist &to_set_bl)
|
||||
{
|
||||
dtrace << __func__ << " " << prefix << " " << k << dendl;
|
||||
ops.push_back(make_pair(WRITE, std::make_pair(std::make_pair(prefix, k),
|
||||
to_set_bl)));
|
||||
}
|
||||
|
||||
void MemDB::MDBTransactionImpl::rmkey(const string &prefix,
|
||||
const string &k)
|
||||
{
|
||||
dtrace << __func__ << " " << prefix << " " << k << dendl;
|
||||
ops.push_back(make_pair(DELETE,
|
||||
std::make_pair(std::make_pair(prefix, k),
|
||||
NULL)));
|
||||
}
|
||||
|
||||
void MemDB::MDBTransactionImpl::rmkeys_by_prefix(const string &prefix)
|
||||
{
|
||||
KeyValueDB::Iterator it = m_db->get_iterator(prefix);
|
||||
for (it->seek_to_first(); it->valid(); it->next()) {
|
||||
rmkey(prefix, it->key());
|
||||
}
|
||||
}
|
||||
|
||||
void MemDB::MDBTransactionImpl::merge(
|
||||
const std::string &prefix, const std::string &key, const bufferlist &value)
|
||||
{
|
||||
|
||||
dtrace << __func__ << " " << prefix << " " << key << dendl;
|
||||
ops.push_back(make_pair(MERGE, make_pair(std::make_pair(prefix, key), value)));
|
||||
return;
|
||||
}
|
||||
|
||||
int MemDB::_setkey(ms_op_t &op)
|
||||
{
|
||||
std::lock_guard<std::mutex> l(m_lock);
|
||||
std::string key = make_key(op.first.first, op.first.second);
|
||||
bufferlist bl = op.second;
|
||||
|
||||
m_total_bytes += bl.length();
|
||||
|
||||
bufferlist bl_old;
|
||||
if (_get(op.first.first, op.first.second, &bl_old)) {
|
||||
/*
|
||||
* delete and free existing key.
|
||||
*/
|
||||
m_total_bytes -= bl_old.length();
|
||||
m_btree.erase(key);
|
||||
}
|
||||
|
||||
m_btree[key] = bufferptr((char *) bl.c_str(), bl.length());
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int MemDB::_rmkey(ms_op_t &op)
|
||||
{
|
||||
std::lock_guard<std::mutex> l(m_lock);
|
||||
std::string key = make_key(op.first.first, op.first.second);
|
||||
|
||||
bufferlist bl_old;
|
||||
if (_get(op.first.first, op.first.second, &bl_old)) {
|
||||
m_total_bytes -= bl_old.length();
|
||||
}
|
||||
/*
|
||||
* Erase will call the destructor for bufferptr.
|
||||
*/
|
||||
return m_btree.erase(key);
|
||||
}
|
||||
|
||||
std::shared_ptr<KeyValueDB::MergeOperator> MemDB::_find_merge_op(std::string prefix)
|
||||
{
|
||||
for (const auto& i : merge_ops) {
|
||||
if (i.first == prefix) {
|
||||
return i.second;
|
||||
}
|
||||
}
|
||||
|
||||
dtrace << __func__ << " No merge op for " << prefix << dendl;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
int MemDB::_merge(ms_op_t &op)
|
||||
{
|
||||
std::lock_guard<std::mutex> l(m_lock);
|
||||
std::string prefix = op.first.first;
|
||||
std::string key = make_key(op.first.first, op.first.second);
|
||||
bufferlist bl = op.second;
|
||||
int64_t bytes_adjusted = bl.length();
|
||||
|
||||
/*
|
||||
* find the operator for this prefix
|
||||
*/
|
||||
std::shared_ptr<MergeOperator> mop = _find_merge_op(prefix);
|
||||
assert(mop);
|
||||
|
||||
/*
|
||||
* call the merge operator with value and non value
|
||||
*/
|
||||
bufferlist bl_old;
|
||||
if (_get(op.first.first, op.first.second, &bl_old) == false) {
|
||||
std::string new_val;
|
||||
/*
|
||||
* Merge non existent.
|
||||
*/
|
||||
mop->merge_nonexistent(bl.c_str(), bl.length(), &new_val);
|
||||
m_btree[key] = bufferptr(new_val.c_str(), new_val.length());
|
||||
} else {
|
||||
/*
|
||||
* Merge existing.
|
||||
*/
|
||||
std::string new_val;
|
||||
mop->merge(bl_old.c_str(), bl_old.length(), bl.c_str(), bl.length(), &new_val);
|
||||
m_btree[key] = bufferptr(new_val.c_str(), new_val.length());
|
||||
bytes_adjusted -= bl_old.length();
|
||||
bl_old.clear();
|
||||
}
|
||||
|
||||
m_total_bytes += bytes_adjusted;
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* Caller take btree lock.
|
||||
*/
|
||||
bool MemDB::_get(const string &prefix, const string &k, bufferlist *out)
|
||||
{
|
||||
string key = make_key(prefix, k);
|
||||
|
||||
btree::btree_map<string, bufferptr>::iterator iter = m_btree.find(key);
|
||||
if (iter == m_btree.end()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
out->push_back((m_btree[key].clone()));
|
||||
return true;
|
||||
}
|
||||
|
||||
bool MemDB::_get_locked(const string &prefix, const string &k, bufferlist *out)
|
||||
{
|
||||
std::lock_guard<std::mutex> l(m_lock);
|
||||
return _get(prefix, k, out);
|
||||
}
|
||||
|
||||
|
||||
int MemDB::get(const string &prefix, const std::string& key,
|
||||
bufferlist *out)
|
||||
{
|
||||
if (_get_locked(prefix, key, out)) {
|
||||
return 0;
|
||||
}
|
||||
return -ENOENT;
|
||||
}
|
||||
|
||||
int MemDB::get(const string &prefix, const std::set<string> &keys,
|
||||
std::map<string, bufferlist> *out)
|
||||
{
|
||||
for (const auto& i : keys) {
|
||||
bufferlist bl;
|
||||
if (_get_locked(prefix, i, &bl))
|
||||
out->insert(make_pair(i, bl));
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void MemDB::MDBWholeSpaceIteratorImpl::fill_current()
|
||||
{
|
||||
bufferlist bl;
|
||||
bl.append(m_iter->second.clone());
|
||||
m_key_value = std::make_pair(m_iter->first, bl);
|
||||
}
|
||||
|
||||
bool MemDB::MDBWholeSpaceIteratorImpl::valid()
|
||||
{
|
||||
if (m_key_value.first.empty()) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void
|
||||
MemDB::MDBWholeSpaceIteratorImpl::free_last()
|
||||
{
|
||||
m_key_value.first.clear();
|
||||
assert(m_key_value.first.empty());
|
||||
m_key_value.second.clear();
|
||||
}
|
||||
|
||||
string MemDB::MDBWholeSpaceIteratorImpl::key()
|
||||
{
|
||||
dtrace << __func__ << " " << m_key_value.first << dendl;
|
||||
string prefix, key;
|
||||
split_key(m_key_value.first, &prefix, &key);
|
||||
return key;
|
||||
}
|
||||
|
||||
pair<string,string> MemDB::MDBWholeSpaceIteratorImpl::raw_key()
|
||||
{
|
||||
string prefix, key;
|
||||
split_key(m_key_value.first, &prefix, &key);
|
||||
return make_pair(prefix, key);
|
||||
}
|
||||
|
||||
bool MemDB::MDBWholeSpaceIteratorImpl::raw_key_is_prefixed(
|
||||
const string &prefix)
|
||||
{
|
||||
string p, k;
|
||||
split_key(m_key_value.first, &p, &k);
|
||||
return (p == prefix);
|
||||
}
|
||||
|
||||
bufferlist MemDB::MDBWholeSpaceIteratorImpl::value()
|
||||
{
|
||||
dtrace << __func__ << " " << m_key_value << dendl;
|
||||
return m_key_value.second;
|
||||
}
|
||||
|
||||
int MemDB::MDBWholeSpaceIteratorImpl::next()
|
||||
{
|
||||
std::lock_guard<std::mutex> l(*m_btree_lock_p);
|
||||
free_last();
|
||||
m_iter++;
|
||||
if (m_iter != m_btree_p->end()) {
|
||||
fill_current();
|
||||
return 0;
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
int MemDB::MDBWholeSpaceIteratorImpl:: prev()
|
||||
{
|
||||
std::lock_guard<std::mutex> l(*m_btree_lock_p);
|
||||
free_last();
|
||||
m_iter--;
|
||||
if (m_iter != m_btree_p->end()) {
|
||||
fill_current();
|
||||
return 0;
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* First key >= to given key, if key is null then first key in btree.
|
||||
*/
|
||||
int MemDB::MDBWholeSpaceIteratorImpl::seek_to_first(const std::string &k)
|
||||
{
|
||||
std::lock_guard<std::mutex> l(*m_btree_lock_p);
|
||||
free_last();
|
||||
if (k.empty()) {
|
||||
m_iter = m_btree_p->begin();
|
||||
} else {
|
||||
m_iter = m_btree_p->lower_bound(k);
|
||||
}
|
||||
|
||||
if (m_iter == m_btree_p->end()) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int MemDB::MDBWholeSpaceIteratorImpl::seek_to_last(const std::string &k)
|
||||
{
|
||||
std::lock_guard<std::mutex> l(*m_btree_lock_p);
|
||||
|
||||
free_last();
|
||||
if (k.empty()) {
|
||||
m_iter = m_btree_p->end();
|
||||
m_iter--;
|
||||
} else {
|
||||
m_iter = m_btree_p->lower_bound(k);
|
||||
}
|
||||
|
||||
if (m_iter == m_btree_p->end()) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
MemDB::MDBWholeSpaceIteratorImpl::~MDBWholeSpaceIteratorImpl()
|
||||
{
|
||||
free_last();
|
||||
}
|
||||
|
||||
KeyValueDB::WholeSpaceIterator MemDB::_get_snapshot_iterator()
|
||||
{
|
||||
assert(0);
|
||||
}
|
||||
|
||||
int MemDB::MDBWholeSpaceIteratorImpl::upper_bound(const std::string &prefix,
|
||||
const std::string &after) {
|
||||
|
||||
std::lock_guard<std::mutex> l(*m_btree_lock_p);
|
||||
|
||||
dtrace << "upper_bound " << prefix.c_str() << after.c_str() << dendl;
|
||||
string k = make_key(prefix, after);
|
||||
m_iter = m_btree_p->upper_bound(k);
|
||||
if (m_iter != m_btree_p->end()) {
|
||||
fill_current();
|
||||
return 0;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
int MemDB::MDBWholeSpaceIteratorImpl::lower_bound(const std::string &prefix,
|
||||
const std::string &to) {
|
||||
std::lock_guard<std::mutex> l(*m_btree_lock_p);
|
||||
dtrace << "lower_bound " << prefix.c_str() << to.c_str() << dendl;
|
||||
string k = make_key(prefix, to);
|
||||
m_iter = m_btree_p->lower_bound(k);
|
||||
if (m_iter != m_btree_p->end()) {
|
||||
fill_current();
|
||||
return 0;
|
||||
}
|
||||
return -1;
|
||||
}
|
197
src/kv/MemDB.h
Normal file
197
src/kv/MemDB.h
Normal file
@ -0,0 +1,197 @@
|
||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
// vim: ts=8 sw=2 smarttab
|
||||
/*
|
||||
* In-memory crash non-safe keyvalue db
|
||||
* Author: Ramesh Chander, Ramesh.Chander@sandisk.com
|
||||
*/
|
||||
|
||||
#ifndef CEPH_OS_BLUESTORE_MEMDB_H
|
||||
#define CEPH_OS_BLUESTORE_MEMDB_H
|
||||
|
||||
#include "include/buffer.h"
|
||||
#include <ostream>
|
||||
#include <set>
|
||||
#include <map>
|
||||
#include <string>
|
||||
#include <memory>
|
||||
#include "include/memory.h"
|
||||
#include <boost/scoped_ptr.hpp>
|
||||
#include "include/encoding.h"
|
||||
#include "include/cpp-btree/btree.h"
|
||||
#include "include/cpp-btree/btree_map.h"
|
||||
#include "include/encoding_btree.h"
|
||||
#include "KeyValueDB.h"
|
||||
#include "osd/osd_types.h"
|
||||
|
||||
using std::string;
|
||||
#define KEY_DELIM '\0'
|
||||
|
||||
class MemDB : public KeyValueDB
|
||||
{
|
||||
typedef std::pair<std::pair<std::string, std::string>, bufferlist> ms_op_t;
|
||||
std::mutex m_lock;
|
||||
uint64_t m_total_bytes;
|
||||
uint64_t m_allocated_bytes;
|
||||
|
||||
btree::btree_map<std::string, bufferptr> m_btree;
|
||||
CephContext *m_cct;
|
||||
void* m_priv;
|
||||
string m_options;
|
||||
string m_db_path;
|
||||
|
||||
int transaction_rollback(KeyValueDB::Transaction t);
|
||||
int _open(ostream &out);
|
||||
void close();
|
||||
bool _get(const string &prefix, const string &k, bufferlist *out);
|
||||
bool _get_locked(const string &prefix, const string &k, bufferlist *out);
|
||||
std::string _get_data_fn();
|
||||
void _encode(btree::btree_map<string, bufferptr>:: iterator iter, bufferlist &bl);
|
||||
void _save();
|
||||
void _load();
|
||||
|
||||
public:
|
||||
MemDB(CephContext *c, const string &path, void *p) :
|
||||
m_cct(c), m_priv(p), m_db_path(path)
|
||||
{
|
||||
//Nothing as of now
|
||||
}
|
||||
|
||||
~MemDB();
|
||||
virtual int set_merge_operator(const std::string& prefix,
|
||||
std::shared_ptr<MergeOperator> mop);
|
||||
|
||||
std::shared_ptr<MergeOperator> _find_merge_op(std::string prefix);
|
||||
|
||||
static
|
||||
int _test_init(const string& dir) { return 0; };
|
||||
|
||||
class MDBTransactionImpl : public KeyValueDB::TransactionImpl {
|
||||
public:
|
||||
enum op_type { WRITE = 1, MERGE = 2, DELETE = 3};
|
||||
private:
|
||||
|
||||
std::vector<std::pair<op_type, ms_op_t>> ops;
|
||||
MemDB *m_db;
|
||||
|
||||
bool key_is_prefixed(const string &prefix, const string& full_key);
|
||||
public:
|
||||
const std::vector<std::pair<op_type, ms_op_t>>&
|
||||
get_ops() { return ops; };
|
||||
|
||||
void set(const std::string &prefix, const std::string &key,
|
||||
const bufferlist &val);
|
||||
void rmkey(const std::string &prefix, const std::string &k);
|
||||
void rmkeys_by_prefix(const std::string &prefix);
|
||||
|
||||
void merge(const std::string &prefix, const std::string &key, const bufferlist &value);
|
||||
void clear() {
|
||||
ops.clear();
|
||||
}
|
||||
MDBTransactionImpl(MemDB* _db) :m_db(_db)
|
||||
{
|
||||
ops.clear();
|
||||
}
|
||||
~MDBTransactionImpl() {};
|
||||
};
|
||||
|
||||
private:
|
||||
|
||||
/*
|
||||
* Transaction states.
|
||||
*/
|
||||
int _merge(const std::string &k, bufferptr &bl);
|
||||
int _merge(ms_op_t &op);
|
||||
int _setkey(ms_op_t &op);
|
||||
int _rmkey(ms_op_t &op);
|
||||
|
||||
public:
|
||||
|
||||
int init(string option_str="") { m_options = option_str; return 0; }
|
||||
int _init(bool format);
|
||||
|
||||
int do_open(ostream &out, bool create);
|
||||
int open(ostream &out) { return do_open(out, false); }
|
||||
int create_and_open(ostream &out) { return do_open(out, true); }
|
||||
|
||||
KeyValueDB::Transaction get_transaction() {
|
||||
return std::shared_ptr<MDBTransactionImpl>(new MDBTransactionImpl(this));
|
||||
}
|
||||
|
||||
int submit_transaction(Transaction);
|
||||
int submit_transaction_sync(Transaction);
|
||||
|
||||
int get(const std::string &prefix, const std::set<std::string> &key,
|
||||
std::map<std::string, bufferlist> *out);
|
||||
|
||||
int get(const std::string &prefix, const std::string &key,
|
||||
bufferlist *out) override;
|
||||
|
||||
class MDBWholeSpaceIteratorImpl : public KeyValueDB::WholeSpaceIteratorImpl {
|
||||
|
||||
btree::btree_map<string, bufferptr>::iterator m_iter;
|
||||
std::pair<string, bufferlist> m_key_value;
|
||||
btree::btree_map<std::string, bufferptr> *m_btree_p;
|
||||
std::mutex *m_btree_lock_p;
|
||||
|
||||
public:
|
||||
MDBWholeSpaceIteratorImpl(btree::btree_map<std::string, bufferptr> *btree_p,
|
||||
std::mutex *btree_lock_p) {
|
||||
m_btree_p = btree_p;
|
||||
m_btree_lock_p = btree_lock_p;
|
||||
}
|
||||
|
||||
void fill_current();
|
||||
void free_last();
|
||||
|
||||
|
||||
int seek_to_first(const std::string &k);
|
||||
int seek_to_last(const std::string &k);
|
||||
|
||||
int seek_to_first() { return seek_to_first(NULL); };
|
||||
int seek_to_last() { return seek_to_last(NULL); };
|
||||
|
||||
int upper_bound(const std::string &prefix, const std::string &after);
|
||||
int lower_bound(const std::string &prefix, const std::string &to);
|
||||
bool valid();
|
||||
|
||||
int next();
|
||||
int prev();
|
||||
int status() { return 0; };
|
||||
|
||||
std::string key();
|
||||
std::pair<std::string,std::string> raw_key();
|
||||
bool raw_key_is_prefixed(const std::string &prefix);
|
||||
bufferlist value();
|
||||
~MDBWholeSpaceIteratorImpl();
|
||||
};
|
||||
|
||||
uint64_t get_estimated_size(std::map<std::string,uint64_t> &extra) {
|
||||
std::lock_guard<std::mutex> l(m_lock);
|
||||
return m_allocated_bytes;
|
||||
};
|
||||
|
||||
int get_statfs(struct store_statfs_t *buf) {
|
||||
std::lock_guard<std::mutex> l(m_lock);
|
||||
store_statfs_t s;
|
||||
s.total = m_total_bytes;
|
||||
s.allocated = m_allocated_bytes;
|
||||
s.stored = m_total_bytes;
|
||||
s.compressed = 0;
|
||||
s.compressed_allocated = 0;
|
||||
s.compressed_original = 0;
|
||||
*buf = s;
|
||||
return 0;
|
||||
}
|
||||
|
||||
protected:
|
||||
|
||||
WholeSpaceIterator _get_iterator() {
|
||||
return std::shared_ptr<KeyValueDB::WholeSpaceIteratorImpl>(
|
||||
new MDBWholeSpaceIteratorImpl(&m_btree, &m_lock));
|
||||
}
|
||||
|
||||
WholeSpaceIterator _get_snapshot_iterator();
|
||||
};
|
||||
|
||||
#endif
|
||||
|
@ -83,6 +83,37 @@ TEST_P(KVTest, OpenCloseReopenClose) {
|
||||
fini();
|
||||
}
|
||||
|
||||
/*
|
||||
* Basic write and read test case in same database session.
|
||||
*/
|
||||
TEST_P(KVTest, OpenWriteRead) {
|
||||
ASSERT_EQ(0, db->create_and_open(cout));
|
||||
{
|
||||
KeyValueDB::Transaction t = db->get_transaction();
|
||||
bufferlist value;
|
||||
value.append("value");
|
||||
t->set("prefix", "key", value);
|
||||
value.clear();
|
||||
value.append("value2");
|
||||
t->set("prefix", "key2", value);
|
||||
value.clear();
|
||||
value.append("value3");
|
||||
t->set("prefix", "key3", value);
|
||||
db->submit_transaction_sync(t);
|
||||
|
||||
bufferlist v1, v2;
|
||||
ASSERT_EQ(0, db->get("prefix", "key", &v1));
|
||||
ASSERT_EQ(v1.length(), 5u);
|
||||
(v1.c_str())[v1.length()] = 0x0;
|
||||
ASSERT_EQ(std::string(v1.c_str()), std::string("value"));
|
||||
ASSERT_EQ(0, db->get("prefix", "key2", &v2));
|
||||
ASSERT_EQ(v2.length(), 6u);
|
||||
(v2.c_str())[v2.length()] = 0x0;
|
||||
ASSERT_EQ(std::string(v2.c_str()), std::string("value2"));
|
||||
}
|
||||
fini();
|
||||
}
|
||||
|
||||
TEST_P(KVTest, PutReopen) {
|
||||
ASSERT_EQ(0, db->create_and_open(cout));
|
||||
{
|
||||
@ -168,6 +199,7 @@ struct AppendMOP : public KeyValueDB::MergeOperator {
|
||||
const char *ldata, size_t llen,
|
||||
const char *rdata, size_t rlen,
|
||||
std::string *new_value) {
|
||||
|
||||
*new_value = std::string(ldata, llen) + std::string(rdata, rlen);
|
||||
}
|
||||
// We use each operator name and each prefix to construct the
|
||||
@ -227,7 +259,7 @@ TEST_P(KVTest, Merge) {
|
||||
INSTANTIATE_TEST_CASE_P(
|
||||
KeyValueDB,
|
||||
KVTest,
|
||||
::testing::Values("leveldb", "rocksdb"));
|
||||
::testing::Values("leveldb", "rocksdb", "memdb"));
|
||||
|
||||
#else
|
||||
|
||||
@ -250,7 +282,7 @@ int main(int argc, char **argv) {
|
||||
common_init_finish(g_ceph_context);
|
||||
g_ceph_context->_conf->set_val(
|
||||
"enable_experimental_unrecoverable_data_corrupting_features",
|
||||
"rocksdb");
|
||||
"rocksdb, memdb");
|
||||
g_ceph_context->_conf->apply_changes(NULL);
|
||||
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
|
Loading…
Reference in New Issue
Block a user