mirror of
https://github.com/ceph/ceph
synced 2025-04-11 04:02:04 +00:00
Merge pull request #8707 from liewegas/wip-kv-merge
kv: add merge operator support, fix test Reviewed-by: Sage Weil <sage@redhat.com>
This commit is contained in:
commit
93d7deff53
@ -93,6 +93,13 @@ public:
|
|||||||
const std::string &prefix ///< [in] Prefix by which to remove keys
|
const std::string &prefix ///< [in] Prefix by which to remove keys
|
||||||
) = 0;
|
) = 0;
|
||||||
|
|
||||||
|
/// Merge value into key
|
||||||
|
virtual void merge(
|
||||||
|
const std::string &prefix, ///< [in] Prefix ==> MUST match some established merge operator
|
||||||
|
const std::string &key, ///< [in] Key to be merged
|
||||||
|
const bufferlist &value ///< [in] value to be merged into key
|
||||||
|
) { assert(0 == "Not implemented"); }
|
||||||
|
|
||||||
virtual ~TransactionImpl() {}
|
virtual ~TransactionImpl() {}
|
||||||
};
|
};
|
||||||
typedef ceph::shared_ptr< TransactionImpl > Transaction;
|
typedef ceph::shared_ptr< TransactionImpl > Transaction;
|
||||||
@ -277,7 +284,36 @@ public:
|
|||||||
virtual void compact_range_async(const std::string& prefix,
|
virtual void compact_range_async(const std::string& prefix,
|
||||||
const std::string& start, const std::string& end) {}
|
const std::string& start, const std::string& end) {}
|
||||||
|
|
||||||
|
// See RocksDB merge operator definition, we support the basic
|
||||||
|
// associative merge only right now.
|
||||||
|
class MergeOperator {
|
||||||
|
public:
|
||||||
|
/// Merge into a key that doesn't exist
|
||||||
|
virtual void merge_nonexistant(
|
||||||
|
const char *rdata, size_t rlen,
|
||||||
|
std::string *new_value) = 0;
|
||||||
|
/// Merge into a key that does exist
|
||||||
|
virtual void merge(
|
||||||
|
const char *ldata, size_t llen,
|
||||||
|
const char *rdata, size_t rlen,
|
||||||
|
std::string *new_value) = 0;
|
||||||
|
/// We use each operator name and each prefix to construct the overall RocksDB operator name for consistency check at open time.
|
||||||
|
virtual string name() const = 0;
|
||||||
|
|
||||||
|
virtual ~MergeOperator() {}
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Setup one or more operators, this needs to be done BEFORE the DB is opened.
|
||||||
|
virtual int set_merge_operator(const std::string& prefix,
|
||||||
|
std::shared_ptr<MergeOperator> mop) {
|
||||||
|
return -EOPNOTSUPP;
|
||||||
|
}
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
/// List of matching prefixes and merge operators
|
||||||
|
std::vector<std::pair<std::string,
|
||||||
|
std::shared_ptr<MergeOperator> > > merge_ops;
|
||||||
|
|
||||||
virtual WholeSpaceIterator _get_iterator() = 0;
|
virtual WholeSpaceIterator _get_iterator() = 0;
|
||||||
virtual WholeSpaceIterator _get_snapshot_iterator() = 0;
|
virtual WholeSpaceIterator _get_snapshot_iterator() = 0;
|
||||||
};
|
};
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
#include "rocksdb/cache.h"
|
#include "rocksdb/cache.h"
|
||||||
#include "rocksdb/filter_policy.h"
|
#include "rocksdb/filter_policy.h"
|
||||||
#include "rocksdb/utilities/convenience.h"
|
#include "rocksdb/utilities/convenience.h"
|
||||||
|
#include "rocksdb/merge_operator.h"
|
||||||
using std::string;
|
using std::string;
|
||||||
#include "common/perf_counters.h"
|
#include "common/perf_counters.h"
|
||||||
#include "common/debug.h"
|
#include "common/debug.h"
|
||||||
@ -32,6 +33,66 @@ using std::string;
|
|||||||
#undef dout_prefix
|
#undef dout_prefix
|
||||||
#define dout_prefix *_dout << "rocksdb: "
|
#define dout_prefix *_dout << "rocksdb: "
|
||||||
|
|
||||||
|
//
|
||||||
|
// One of these per rocksdb instance, implements the merge operator prefix stuff
|
||||||
|
//
|
||||||
|
class RocksDBStore::MergeOperatorRouter : public rocksdb::AssociativeMergeOperator {
|
||||||
|
RocksDBStore& store;
|
||||||
|
public:
|
||||||
|
const char *Name() const {
|
||||||
|
// Construct a name that rocksDB will validate against. We want to
|
||||||
|
// do this in a way that doesn't constrain the ordering of calls
|
||||||
|
// to set_merge_operator, so sort the merge operators and then
|
||||||
|
// construct a name from all of those parts.
|
||||||
|
store.assoc_name.clear();
|
||||||
|
map<std::string,std::string> names;
|
||||||
|
for (auto& p : store.merge_ops) names[p.first] = p.second->name();
|
||||||
|
for (auto& p : names) {
|
||||||
|
store.assoc_name += '.';
|
||||||
|
store.assoc_name += p.first;
|
||||||
|
store.assoc_name += ':';
|
||||||
|
store.assoc_name += p.second;
|
||||||
|
}
|
||||||
|
return store.assoc_name.c_str();
|
||||||
|
}
|
||||||
|
|
||||||
|
MergeOperatorRouter(RocksDBStore &_store) : store(_store) {}
|
||||||
|
|
||||||
|
virtual bool Merge(const rocksdb::Slice& key,
|
||||||
|
const rocksdb::Slice* existing_value,
|
||||||
|
const rocksdb::Slice& value,
|
||||||
|
std::string* new_value,
|
||||||
|
rocksdb::Logger* logger) const {
|
||||||
|
// Check each prefix
|
||||||
|
for (auto& p : store.merge_ops) {
|
||||||
|
if (p.first.compare(0, p.first.length(),
|
||||||
|
key.data(), p.first.length()) == 0 &&
|
||||||
|
key.data()[p.first.length()] == 0) {
|
||||||
|
if (existing_value) {
|
||||||
|
p.second->merge(existing_value->data(), existing_value->size(),
|
||||||
|
value.data(), value.size(),
|
||||||
|
new_value);
|
||||||
|
} else {
|
||||||
|
p.second->merge_nonexistant(value.data(), value.size(), new_value);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true; // OK :)
|
||||||
|
}
|
||||||
|
|
||||||
|
};
|
||||||
|
|
||||||
|
int RocksDBStore::set_merge_operator(
|
||||||
|
const string& prefix,
|
||||||
|
std::shared_ptr<KeyValueDB::MergeOperator> mop)
|
||||||
|
{
|
||||||
|
// If you fail here, it's because you can't do this on an open database
|
||||||
|
assert(db == nullptr);
|
||||||
|
merge_ops.push_back(std::make_pair(prefix,mop));
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
class CephRocksdbLogger : public rocksdb::Logger {
|
class CephRocksdbLogger : public rocksdb::Logger {
|
||||||
CephContext *cct;
|
CephContext *cct;
|
||||||
public:
|
public:
|
||||||
@ -226,6 +287,7 @@ int RocksDBStore::do_open(ostream &out, bool create_if_missing)
|
|||||||
dout(10) << __func__ << " set block size to " << g_conf->rocksdb_block_size
|
dout(10) << __func__ << " set block size to " << g_conf->rocksdb_block_size
|
||||||
<< " cache size to " << g_conf->rocksdb_cache_size << dendl;
|
<< " cache size to " << g_conf->rocksdb_cache_size << dendl;
|
||||||
|
|
||||||
|
opt.merge_operator.reset(new MergeOperatorRouter(*this));
|
||||||
status = rocksdb::DB::Open(opt, path, &db);
|
status = rocksdb::DB::Open(opt, path, &db);
|
||||||
if (!status.ok()) {
|
if (!status.ok()) {
|
||||||
derr << status.ToString() << dendl;
|
derr << status.ToString() << dendl;
|
||||||
@ -260,6 +322,7 @@ int RocksDBStore::_test_init(const string& dir)
|
|||||||
rocksdb::DB *db;
|
rocksdb::DB *db;
|
||||||
rocksdb::Status status = rocksdb::DB::Open(options, dir, &db);
|
rocksdb::Status status = rocksdb::DB::Open(options, dir, &db);
|
||||||
delete db;
|
delete db;
|
||||||
|
db = nullptr;
|
||||||
return status.ok() ? 0 : -EIO;
|
return status.ok() ? 0 : -EIO;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -270,6 +333,7 @@ RocksDBStore::~RocksDBStore()
|
|||||||
|
|
||||||
// Ensure db is destroyed before dependent db_cache and filterpolicy
|
// Ensure db is destroyed before dependent db_cache and filterpolicy
|
||||||
delete db;
|
delete db;
|
||||||
|
db = nullptr;
|
||||||
|
|
||||||
if (priv) {
|
if (priv) {
|
||||||
delete static_cast<rocksdb::Env*>(priv);
|
delete static_cast<rocksdb::Env*>(priv);
|
||||||
@ -383,6 +447,26 @@ void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void RocksDBStore::RocksDBTransactionImpl::merge(
|
||||||
|
const string &prefix,
|
||||||
|
const string &k,
|
||||||
|
const bufferlist &to_set_bl)
|
||||||
|
{
|
||||||
|
string key = combine_strings(prefix, k);
|
||||||
|
|
||||||
|
// bufferlist::c_str() is non-constant, so we can't call c_str()
|
||||||
|
if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
|
||||||
|
bat->Merge(rocksdb::Slice(key),
|
||||||
|
rocksdb::Slice(to_set_bl.buffers().front().c_str(),
|
||||||
|
to_set_bl.length()));
|
||||||
|
} else {
|
||||||
|
// make a copy
|
||||||
|
bufferlist val = to_set_bl;
|
||||||
|
bat->Merge(rocksdb::Slice(key),
|
||||||
|
rocksdb::Slice(val.c_str(), val.length()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int RocksDBStore::get(
|
int RocksDBStore::get(
|
||||||
const string &prefix,
|
const string &prefix,
|
||||||
const std::set<string> &keys,
|
const std::set<string> &keys,
|
||||||
@ -534,6 +618,7 @@ bool RocksDBStore::check_omap_dir(string &omap_dir)
|
|||||||
rocksdb::DB *db;
|
rocksdb::DB *db;
|
||||||
rocksdb::Status status = rocksdb::DB::Open(options, omap_dir, &db);
|
rocksdb::Status status = rocksdb::DB::Open(options, omap_dir, &db);
|
||||||
delete db;
|
delete db;
|
||||||
|
db = nullptr;
|
||||||
return status.ok();
|
return status.ok();
|
||||||
}
|
}
|
||||||
void RocksDBStore::compact_range(const string& start, const string& end)
|
void RocksDBStore::compact_range(const string& start, const string& end)
|
||||||
|
@ -154,6 +154,10 @@ public:
|
|||||||
void rmkeys_by_prefix(
|
void rmkeys_by_prefix(
|
||||||
const string &prefix
|
const string &prefix
|
||||||
);
|
);
|
||||||
|
void merge(
|
||||||
|
const string& prefix,
|
||||||
|
const string& k,
|
||||||
|
const bufferlist &bl);
|
||||||
};
|
};
|
||||||
|
|
||||||
KeyValueDB::Transaction get_transaction() {
|
KeyValueDB::Transaction get_transaction() {
|
||||||
@ -217,6 +221,12 @@ public:
|
|||||||
static bufferlist to_bufferlist(rocksdb::Slice in);
|
static bufferlist to_bufferlist(rocksdb::Slice in);
|
||||||
static string past_prefix(const string &prefix);
|
static string past_prefix(const string &prefix);
|
||||||
|
|
||||||
|
class MergeOperatorRouter;
|
||||||
|
friend class MergeOperatorRouter;
|
||||||
|
virtual int set_merge_operator(const std::string& prefix,
|
||||||
|
std::shared_ptr<KeyValueDB::MergeOperator> mop);
|
||||||
|
string assoc_name; ///< Name of associative operator
|
||||||
|
|
||||||
virtual uint64_t get_estimated_size(map<string,uint64_t> &extra) {
|
virtual uint64_t get_estimated_size(map<string,uint64_t> &extra) {
|
||||||
DIR *store_dir = opendir(path.c_str());
|
DIR *store_dir = opendir(path.c_str());
|
||||||
if (!store_dir) {
|
if (!store_dir) {
|
||||||
@ -291,4 +301,6 @@ protected:
|
|||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
@ -35,9 +35,20 @@ public:
|
|||||||
|
|
||||||
KVTest() : db(0) {}
|
KVTest() : db(0) {}
|
||||||
|
|
||||||
|
void rm_r(string path) {
|
||||||
|
string cmd = string("rm -r ") + path;
|
||||||
|
cout << "==> " << cmd << std::endl;
|
||||||
|
int r = ::system(cmd.c_str());
|
||||||
|
if (r) {
|
||||||
|
cerr << "failed with exit code " << r
|
||||||
|
<< ", continuing anyway" << std::endl;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void init() {
|
void init() {
|
||||||
|
cout << "Creating " << string(GetParam()) << "\n";
|
||||||
db.reset(KeyValueDB::create(g_ceph_context, string(GetParam()),
|
db.reset(KeyValueDB::create(g_ceph_context, string(GetParam()),
|
||||||
string("kv_test_temp_dir")));
|
"kv_test_temp_dir"));
|
||||||
}
|
}
|
||||||
void fini() {
|
void fini() {
|
||||||
db.reset(NULL);
|
db.reset(NULL);
|
||||||
@ -47,14 +58,15 @@ public:
|
|||||||
int r = ::mkdir("kv_test_temp_dir", 0777);
|
int r = ::mkdir("kv_test_temp_dir", 0777);
|
||||||
if (r < 0 && errno != EEXIST) {
|
if (r < 0 && errno != EEXIST) {
|
||||||
r = -errno;
|
r = -errno;
|
||||||
cerr << __func__ << ": unable to create kv_test_temp_dir"
|
cerr << __func__ << ": unable to create kv_test_temp_dir: "
|
||||||
<< ": " << cpp_strerror(r) << std::endl;
|
<< cpp_strerror(r) << std::endl;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
init();
|
init();
|
||||||
}
|
}
|
||||||
virtual void TearDown() {
|
virtual void TearDown() {
|
||||||
fini();
|
fini();
|
||||||
|
rm_r("kv_test_temp_dir");
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -87,11 +99,11 @@ TEST_P(KVTest, PutReopen) {
|
|||||||
init();
|
init();
|
||||||
ASSERT_EQ(0, db->open(cout));
|
ASSERT_EQ(0, db->open(cout));
|
||||||
{
|
{
|
||||||
bufferlist v;
|
bufferlist v1, v2;
|
||||||
ASSERT_EQ(0, db->get("prefix", "key", &v));
|
ASSERT_EQ(0, db->get("prefix", "key", &v1));
|
||||||
ASSERT_EQ(v.length(), 5u);
|
ASSERT_EQ(v1.length(), 5u);
|
||||||
ASSERT_EQ(0, db->get("prefix", "key2", &v));
|
ASSERT_EQ(0, db->get("prefix", "key2", &v2));
|
||||||
ASSERT_EQ(v.length(), 5u);
|
ASSERT_EQ(v2.length(), 5u);
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
KeyValueDB::Transaction t = db->get_transaction();
|
KeyValueDB::Transaction t = db->get_transaction();
|
||||||
@ -104,11 +116,11 @@ TEST_P(KVTest, PutReopen) {
|
|||||||
init();
|
init();
|
||||||
ASSERT_EQ(0, db->open(cout));
|
ASSERT_EQ(0, db->open(cout));
|
||||||
{
|
{
|
||||||
bufferlist v;
|
bufferlist v1, v2, v3;
|
||||||
ASSERT_EQ(-ENOENT, db->get("prefix", "key", &v));
|
ASSERT_EQ(-ENOENT, db->get("prefix", "key", &v1));
|
||||||
ASSERT_EQ(0, db->get("prefix", "key2", &v));
|
ASSERT_EQ(0, db->get("prefix", "key2", &v2));
|
||||||
ASSERT_EQ(v.length(), 5u);
|
ASSERT_EQ(v2.length(), 5u);
|
||||||
ASSERT_EQ(-ENOENT, db->get("prefix", "key3", &v));
|
ASSERT_EQ(-ENOENT, db->get("prefix", "key3", &v3));
|
||||||
}
|
}
|
||||||
fini();
|
fini();
|
||||||
}
|
}
|
||||||
@ -144,6 +156,71 @@ TEST_P(KVTest, BenchCommit) {
|
|||||||
utime_t dur = end - start;
|
utime_t dur = end - start;
|
||||||
cout << n << " commits in " << dur << ", avg latency " << (dur / (double)n)
|
cout << n << " commits in " << dur << ", avg latency " << (dur / (double)n)
|
||||||
<< std::endl;
|
<< std::endl;
|
||||||
|
fini();
|
||||||
|
}
|
||||||
|
|
||||||
|
struct AppendMOP : public KeyValueDB::MergeOperator {
|
||||||
|
virtual void merge_nonexistant(
|
||||||
|
const char *rdata, size_t rlen, std::string *new_value) override {
|
||||||
|
*new_value = "?" + std::string(rdata, rlen);
|
||||||
|
}
|
||||||
|
virtual void merge(
|
||||||
|
const char *ldata, size_t llen,
|
||||||
|
const char *rdata, size_t rlen,
|
||||||
|
std::string *new_value) {
|
||||||
|
*new_value = std::string(ldata, llen) + std::string(rdata, rlen);
|
||||||
|
}
|
||||||
|
// We use each operator name and each prefix to construct the
|
||||||
|
// overall RocksDB operator name for consistency check at open time.
|
||||||
|
virtual string name() const {
|
||||||
|
return "Append";
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
string tostr(bufferlist& b) {
|
||||||
|
return string(b.c_str(),b.length());
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_P(KVTest, Merge) {
|
||||||
|
shared_ptr<KeyValueDB::MergeOperator> p(new AppendMOP);
|
||||||
|
int r = db->set_merge_operator("A",p);
|
||||||
|
if (r < 0)
|
||||||
|
return; // No merge operators for this database type
|
||||||
|
ASSERT_EQ(0, db->create_and_open(cout));
|
||||||
|
{
|
||||||
|
KeyValueDB::Transaction t = db->get_transaction();
|
||||||
|
bufferlist v1, v2, v3;
|
||||||
|
v1.append(string("1"));
|
||||||
|
v2.append(string("2"));
|
||||||
|
v3.append(string("3"));
|
||||||
|
t->set("P", "K1", v1);
|
||||||
|
t->set("A", "A1", v2);
|
||||||
|
t->rmkey("A", "A2");
|
||||||
|
t->merge("A", "A2", v3);
|
||||||
|
db->submit_transaction_sync(t);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
bufferlist v1, v2, v3;
|
||||||
|
ASSERT_EQ(0, db->get("P", "K1", &v1));
|
||||||
|
ASSERT_EQ(tostr(v1), "1");
|
||||||
|
ASSERT_EQ(0, db->get("A", "A1", &v2));
|
||||||
|
ASSERT_EQ(tostr(v2), "2");
|
||||||
|
ASSERT_EQ(0, db->get("A", "A2", &v3));
|
||||||
|
ASSERT_EQ(tostr(v3), "?3");
|
||||||
|
}
|
||||||
|
{
|
||||||
|
KeyValueDB::Transaction t = db->get_transaction();
|
||||||
|
bufferlist v1;
|
||||||
|
v1.append(string("1"));
|
||||||
|
t->merge("A", "A2", v1);
|
||||||
|
db->submit_transaction_sync(t);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
bufferlist v;
|
||||||
|
ASSERT_EQ(0, db->get("A", "A2", &v));
|
||||||
|
ASSERT_EQ(tostr(v), "?31");
|
||||||
|
}
|
||||||
|
fini();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user