mirror of
https://github.com/ceph/ceph
synced 2025-04-01 00:26:47 +00:00
kv/KeyValueDB: add merge operator interface and test
Signed-off-by: Allen Samuels <allen.samuels@sandisk.com> Signed-off-by: Sage Weil <sage@redhat.com>
This commit is contained in:
parent
2b075aa52d
commit
abbececc78
@ -93,6 +93,13 @@ public:
|
||||
const std::string &prefix ///< [in] Prefix by which to remove keys
|
||||
) = 0;
|
||||
|
||||
/// Merge value into key
|
||||
virtual void merge(
|
||||
const std::string &prefix, ///< [in] Prefix ==> MUST match some established merge operator
|
||||
const std::string &key, ///< [in] Key to be merged
|
||||
const bufferlist &value ///< [in] value to be merged into key
|
||||
) { assert(0 == "Not implemented"); }
|
||||
|
||||
virtual ~TransactionImpl() {}
|
||||
};
|
||||
typedef ceph::shared_ptr< TransactionImpl > Transaction;
|
||||
@ -277,7 +284,36 @@ public:
|
||||
virtual void compact_range_async(const std::string& prefix,
|
||||
const std::string& start, const std::string& end) {}
|
||||
|
||||
// See RocksDB merge operator definition, we support the basic
|
||||
// associative merge only right now.
|
||||
class MergeOperator {
|
||||
public:
|
||||
/// Merge into a key that doesn't exist
|
||||
virtual void merge_nonexistant(
|
||||
const char *rdata, size_t rlen,
|
||||
std::string *new_value) = 0;
|
||||
/// Merge into a key that does exist
|
||||
virtual void merge(
|
||||
const char *ldata, size_t llen,
|
||||
const char *rdata, size_t rlen,
|
||||
std::string *new_value) = 0;
|
||||
/// We use each operator name and each prefix to construct the overall RocksDB operator name for consistency check at open time.
|
||||
virtual string name() const = 0;
|
||||
|
||||
virtual ~MergeOperator() {}
|
||||
};
|
||||
|
||||
/// Setup one or more operators, this needs to be done BEFORE the DB is opened.
|
||||
virtual int set_merge_operator(const std::string& prefix,
|
||||
std::shared_ptr<MergeOperator> mop) {
|
||||
return -EOPNOTSUPP;
|
||||
}
|
||||
|
||||
protected:
|
||||
/// List of matching prefixes and merge operators
|
||||
std::vector<std::pair<std::string,
|
||||
std::shared_ptr<MergeOperator> > > merge_ops;
|
||||
|
||||
virtual WholeSpaceIterator _get_iterator() = 0;
|
||||
virtual WholeSpaceIterator _get_snapshot_iterator() = 0;
|
||||
};
|
||||
|
@ -159,6 +159,70 @@ TEST_P(KVTest, BenchCommit) {
|
||||
fini();
|
||||
}
|
||||
|
||||
struct AppendMOP : public KeyValueDB::MergeOperator {
|
||||
virtual void merge_nonexistant(
|
||||
const char *rdata, size_t rlen, std::string *new_value) override {
|
||||
*new_value = "?" + std::string(rdata, rlen);
|
||||
}
|
||||
virtual void merge(
|
||||
const char *ldata, size_t llen,
|
||||
const char *rdata, size_t rlen,
|
||||
std::string *new_value) {
|
||||
*new_value = std::string(ldata, llen) + std::string(rdata, rlen);
|
||||
}
|
||||
// We use each operator name and each prefix to construct the
|
||||
// overall RocksDB operator name for consistency check at open time.
|
||||
virtual string name() const {
|
||||
return "Append";
|
||||
}
|
||||
};
|
||||
|
||||
string tostr(bufferlist& b) {
|
||||
return string(b.c_str(),b.length());
|
||||
}
|
||||
|
||||
TEST_P(KVTest, Merge) {
|
||||
shared_ptr<KeyValueDB::MergeOperator> p(new AppendMOP);
|
||||
int r = db->set_merge_operator("A",p);
|
||||
if (r < 0)
|
||||
return; // No merge operators for this database type
|
||||
ASSERT_EQ(0, db->create_and_open(cout));
|
||||
{
|
||||
KeyValueDB::Transaction t = db->get_transaction();
|
||||
bufferlist v1, v2, v3;
|
||||
v1.append(string("1"));
|
||||
v2.append(string("2"));
|
||||
v3.append(string("3"));
|
||||
t->set("P", "K1", v1);
|
||||
t->set("A", "A1", v2);
|
||||
t->rmkey("A", "A2");
|
||||
t->merge("A", "A2", v3);
|
||||
db->submit_transaction_sync(t);
|
||||
}
|
||||
{
|
||||
bufferlist v1, v2, v3;
|
||||
ASSERT_EQ(0, db->get("P", "K1", &v1));
|
||||
ASSERT_EQ(tostr(v1), "1");
|
||||
ASSERT_EQ(0, db->get("A", "A1", &v2));
|
||||
ASSERT_EQ(tostr(v2), "2");
|
||||
ASSERT_EQ(0, db->get("A", "A2", &v3));
|
||||
ASSERT_EQ(tostr(v3), "?3");
|
||||
}
|
||||
{
|
||||
KeyValueDB::Transaction t = db->get_transaction();
|
||||
bufferlist v1;
|
||||
v1.append(string("1"));
|
||||
t->merge("A", "A2", v1);
|
||||
db->submit_transaction_sync(t);
|
||||
}
|
||||
{
|
||||
bufferlist v;
|
||||
ASSERT_EQ(0, db->get("A", "A2", &v));
|
||||
ASSERT_EQ(tostr(v), "?31");
|
||||
}
|
||||
fini();
|
||||
}
|
||||
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(
|
||||
KeyValueDB,
|
||||
|
Loading…
Reference in New Issue
Block a user