diff --git a/src/kv/KeyValueDB.h b/src/kv/KeyValueDB.h
index c7a83a25323..aef93a3eb3c 100644
--- a/src/kv/KeyValueDB.h
+++ b/src/kv/KeyValueDB.h
@@ -93,6 +93,13 @@ public:
       const std::string &prefix ///< [in] Prefix by which to remove keys
       ) = 0;
 
+    /// Merge value into key
+    virtual void merge(
+      const std::string &prefix,   ///< [in] Prefix ==> MUST match some established merge operator
+      const std::string &key,      ///< [in] Key to be merged
+      const bufferlist  &value     ///< [in] value to be merged into key
+      ) { assert(0 == "Not implemented"); }
+
     virtual ~TransactionImpl() {}
   };
   typedef ceph::shared_ptr< TransactionImpl > Transaction;
@@ -277,7 +284,36 @@ public:
   virtual void compact_range_async(const std::string& prefix,
                                    const std::string& start, const std::string& end) {}
 
+  // See the RocksDB merge operator definition; we support only the basic
+  // associative merge right now.
+  class MergeOperator {
+    public:
+    /// Merge into a key that doesn't exist
+    virtual void merge_nonexistant(
+      const char *rdata, size_t rlen,
+      std::string *new_value) = 0;
+    /// Merge into a key that does exist
+    virtual void merge(
+      const char *ldata, size_t llen,
+      const char *rdata, size_t rlen,
+      std::string *new_value) = 0;
+    /// We use each operator name and each prefix to construct the overall RocksDB operator name for consistency check at open time.
+    virtual std::string name() const = 0;
+
+    virtual ~MergeOperator() {}
+  };
+
+  /// Set up one or more operators; this needs to be done BEFORE the DB is opened.
+  virtual int set_merge_operator(const std::string& prefix,
+                                 std::shared_ptr<MergeOperator> mop) {
+    return -EOPNOTSUPP;
+  }
+
 protected:
+  /// List of matching prefixes and merge operators
+  std::vector<std::pair<std::string,
+                        std::shared_ptr<MergeOperator> > > merge_ops;
+
   virtual WholeSpaceIterator _get_iterator() = 0;
   virtual WholeSpaceIterator _get_snapshot_iterator() = 0;
 };
diff --git a/src/kv/RocksDBStore.cc b/src/kv/RocksDBStore.cc
index d8751cc4435..bd0acd6ad40 100644
--- a/src/kv/RocksDBStore.cc
+++ b/src/kv/RocksDBStore.cc
@@ -18,6 +18,7 @@
 #include "rocksdb/cache.h"
 #include "rocksdb/filter_policy.h"
 #include "rocksdb/utilities/convenience.h"
+#include "rocksdb/merge_operator.h"
 using std::string;
 #include "common/perf_counters.h"
 #include "common/debug.h"
@@ -32,6 +33,76 @@ using std::string;
 #undef dout_prefix
 #define dout_prefix *_dout << "rocksdb: "
 
+//
+// One of these per rocksdb instance, implements the merge operator prefix stuff
+//
+class RocksDBStore::MergeOperatorRouter : public rocksdb::AssociativeMergeOperator {
+  RocksDBStore& store;
+  public:
+  const char *Name() const override {
+    // Construct a name that rocksDB will validate against. We want to
+    // do this in a way that doesn't constrain the ordering of calls
+    // to set_merge_operator, so sort the merge operators and then
+    // construct a name from all of those parts.
+    // NOTE(review): this rebuilds store.assoc_name on every call, so a
+    // pointer returned by an earlier call may dangle and concurrent calls
+    // are unsynchronized -- confirm that is acceptable for RocksDB's usage.
+    store.assoc_name.clear();
+    map<std::string,std::string> names;
+    for (auto& p : store.merge_ops) names[p.first] = p.second->name();
+    for (auto& p : names) {
+      store.assoc_name += '.';
+      store.assoc_name += p.first;
+      store.assoc_name += ':';
+      store.assoc_name += p.second;
+    }
+    return store.assoc_name.c_str();
+  }
+
+  MergeOperatorRouter(RocksDBStore &_store) : store(_store) {}
+
+  /// Dispatch an associative merge to the operator registered for the
+  /// key's prefix.  existing_value is NULL when the key has no value yet.
+  virtual bool Merge(const rocksdb::Slice& key,
+                     const rocksdb::Slice* existing_value,
+                     const rocksdb::Slice& value,
+                     std::string* new_value,
+                     rocksdb::Logger* logger) const override {
+    // Check each prefix.  The key must be strictly longer than the prefix
+    // so that neither the comparison nor the separator check reads past
+    // the end of the key.
+    for (auto& p : store.merge_ops) {
+      const size_t plen = p.first.length();
+      if (key.size() > plen &&
+          p.first.compare(0, plen, key.data(), plen) == 0 &&
+          key.data()[plen] == 0) {
+        if (existing_value) {
+          p.second->merge(existing_value->data(), existing_value->size(),
+                          value.data(), value.size(),
+                          new_value);
+        } else {
+          p.second->merge_nonexistant(value.data(), value.size(), new_value);
+        }
+        break;
+      }
+    }
+    // NOTE(review): a key with no matching operator still reports success
+    // and leaves *new_value untouched -- confirm this is intended.
+    return true; // OK :)
+  }
+
+};
+
+int RocksDBStore::set_merge_operator(
+  const string& prefix,
+  std::shared_ptr<KeyValueDB::MergeOperator> mop)
+{
+  // If you fail here, it's because you can't do this on an open database
+  assert(db == nullptr);
+  merge_ops.push_back(std::make_pair(prefix,mop));
+  return 0;
+}
+
 class CephRocksdbLogger : public rocksdb::Logger {
   CephContext *cct;
 public:
@@ -226,6 +297,7 @@ int RocksDBStore::do_open(ostream &out, bool create_if_missing)
   dout(10) << __func__ << " set block size to " << g_conf->rocksdb_block_size
            << " cache size to " << g_conf->rocksdb_cache_size << dendl;
 
+  opt.merge_operator.reset(new MergeOperatorRouter(*this));
   status = rocksdb::DB::Open(opt, path, &db);
   if (!status.ok()) {
     derr << status.ToString() << dendl;
@@ -260,6 +332,7 @@ int RocksDBStore::_test_init(const string& dir)
   rocksdb::DB *db;
   rocksdb::Status status = rocksdb::DB::Open(options, dir, &db);
   delete db;
+  db = nullptr;
   return status.ok() ? 0 : -EIO;
 }
 
@@ -270,6 +343,7 @@ RocksDBStore::~RocksDBStore()
 
   // Ensure db is destroyed before dependent db_cache and filterpolicy
   delete db;
+  db = nullptr;
 
   if (priv) {
     delete static_cast<rocksdb::Env*>(priv);
@@ -383,6 +457,28 @@ void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix
   }
 }
 
+// Queue a merge of to_set_bl into prefix/k on this transaction's batch;
+// the prefix is expected to have a merge operator registered.
+void RocksDBStore::RocksDBTransactionImpl::merge(
+  const string &prefix,
+  const string &k,
+  const bufferlist &to_set_bl)
+{
+  string key = combine_strings(prefix, k);
+
+  // bufferlist::c_str() is non-constant, so we can't call c_str()
+  if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
+    bat->Merge(rocksdb::Slice(key),
+               rocksdb::Slice(to_set_bl.buffers().front().c_str(),
+                              to_set_bl.length()));
+  } else {
+    // make a copy
+    bufferlist val = to_set_bl;
+    bat->Merge(rocksdb::Slice(key),
+               rocksdb::Slice(val.c_str(), val.length()));
+  }
+}
+
 int RocksDBStore::get(
     const string &prefix,
     const std::set<string> &keys,
@@ -534,6 +630,7 @@ bool RocksDBStore::check_omap_dir(string &omap_dir)
   rocksdb::DB *db;
   rocksdb::Status status = rocksdb::DB::Open(options, omap_dir, &db);
   delete db;
+  db = nullptr;
   return status.ok();
 }
 void RocksDBStore::compact_range(const string& start, const string& end)
diff --git a/src/kv/RocksDBStore.h b/src/kv/RocksDBStore.h
index 41508ba6cd1..79897248c4d 100644
--- a/src/kv/RocksDBStore.h
+++ b/src/kv/RocksDBStore.h
@@ -154,6 +154,10 @@ public:
     void rmkeys_by_prefix(
       const string &prefix
       );
+    void merge(
+      const string& prefix,
+      const string& k,
+      const bufferlist &bl);
   };
 
   KeyValueDB::Transaction get_transaction() {
@@ -217,6 +221,12 @@ public:
   static bufferlist to_bufferlist(rocksdb::Slice in);
   static string past_prefix(const string &prefix);
 
+  class MergeOperatorRouter;
+  friend class MergeOperatorRouter;
+  virtual int set_merge_operator(const std::string& prefix,
+                                 std::shared_ptr<KeyValueDB::MergeOperator> mop);
+  string assoc_name; ///< Name of associative operator
+
   virtual uint64_t get_estimated_size(map<string,uint64_t> &extra) {
     DIR *store_dir = opendir(path.c_str());
     if (!store_dir) {
@@ -291,4 +301,6 @@ protected:
 
 };
+
+
 
 #endif
diff --git a/src/test/objectstore/test_kv.cc b/src/test/objectstore/test_kv.cc
index b561650b309..828411d2a35 100644
--- a/src/test/objectstore/test_kv.cc
+++ b/src/test/objectstore/test_kv.cc
@@ -35,9 +35,20 @@ public:
 
   KVTest() : db(0) {}
 
+  void rm_r(string path) {
+    string cmd = string("rm -r ") + path;
+    cout << "==> " << cmd << std::endl;
+    int r = ::system(cmd.c_str());
+    if (r) {
+      cerr << "failed with exit code " << r
+           << ", continuing anyway" << std::endl;
+    }
+  }
+
   void init() {
+    cout << "Creating " << string(GetParam()) << "\n";
     db.reset(KeyValueDB::create(g_ceph_context, string(GetParam()),
-                                string("kv_test_temp_dir")));
+                                "kv_test_temp_dir"));
   }
   void fini() {
     db.reset(NULL);
@@ -47,14 +58,15 @@ public:
     int r = ::mkdir("kv_test_temp_dir", 0777);
     if (r < 0 && errno != EEXIST) {
       r = -errno;
-      cerr << __func__ << ": unable to create kv_test_temp_dir"
-           << ": " << cpp_strerror(r) << std::endl;
+      cerr << __func__ << ": unable to create kv_test_temp_dir: "
+           << cpp_strerror(r) << std::endl;
       return;
     }
     init();
   }
   virtual void TearDown() {
     fini();
+    rm_r("kv_test_temp_dir");
   }
 };
 
@@ -87,11 +99,11 @@ TEST_P(KVTest, PutReopen) {
   init();
   ASSERT_EQ(0, db->open(cout));
   {
-    bufferlist v;
-    ASSERT_EQ(0, db->get("prefix", "key", &v));
-    ASSERT_EQ(v.length(), 5u);
-    ASSERT_EQ(0, db->get("prefix", "key2", &v));
-    ASSERT_EQ(v.length(), 5u);
+    bufferlist v1, v2;
+    ASSERT_EQ(0, db->get("prefix", "key", &v1));
+    ASSERT_EQ(v1.length(), 5u);
+    ASSERT_EQ(0, db->get("prefix", "key2", &v2));
+    ASSERT_EQ(v2.length(), 5u);
   }
   {
     KeyValueDB::Transaction t = db->get_transaction();
@@ -104,11 +116,11 @@ TEST_P(KVTest, PutReopen) {
   init();
   ASSERT_EQ(0, db->open(cout));
   {
-    bufferlist v;
-    ASSERT_EQ(-ENOENT, db->get("prefix", "key", &v));
-    ASSERT_EQ(0, db->get("prefix", "key2", &v));
-    ASSERT_EQ(v.length(), 5u);
-    ASSERT_EQ(-ENOENT, db->get("prefix", "key3", &v));
+    bufferlist v1, v2, v3;
+    ASSERT_EQ(-ENOENT, db->get("prefix", "key", &v1));
+    ASSERT_EQ(0, db->get("prefix", "key2", &v2));
+    ASSERT_EQ(v2.length(), 5u);
+    ASSERT_EQ(-ENOENT, db->get("prefix", "key3", &v3));
   }
   fini();
 }
@@ -144,6 +156,71 @@ TEST_P(KVTest, BenchCommit) {
   utime_t dur = end - start;
   cout << n << " commits in " << dur << ", avg latency " << (dur / (double)n)
        << std::endl;
+  fini();
+}
+
+struct AppendMOP : public KeyValueDB::MergeOperator {
+  virtual void merge_nonexistant(
+    const char *rdata, size_t rlen, std::string *new_value) override {
+    *new_value = "?" + std::string(rdata, rlen);
+  }
+  virtual void merge(
+    const char *ldata, size_t llen,
+    const char *rdata, size_t rlen,
+    std::string *new_value) override {
+    *new_value = std::string(ldata, llen) + std::string(rdata, rlen);
+  }
+  // We use each operator name and each prefix to construct the
+  // overall RocksDB operator name for consistency check at open time.
+  virtual string name() const override {
+    return "Append";
+  }
+};
+
+string tostr(bufferlist& b) {
+  return string(b.c_str(),b.length());
+}
+
+TEST_P(KVTest, Merge) {
+  shared_ptr<KeyValueDB::MergeOperator> p(new AppendMOP);
+  int r = db->set_merge_operator("A",p);
+  if (r < 0)
+    return; // No merge operators for this database type
+  ASSERT_EQ(0, db->create_and_open(cout));
+  {
+    KeyValueDB::Transaction t = db->get_transaction();
+    bufferlist v1, v2, v3;
+    v1.append(string("1"));
+    v2.append(string("2"));
+    v3.append(string("3"));
+    t->set("P", "K1", v1);
+    t->set("A", "A1", v2);
+    t->rmkey("A", "A2");
+    t->merge("A", "A2", v3);
+    db->submit_transaction_sync(t);
+  }
+  {
+    bufferlist v1, v2, v3;
+    ASSERT_EQ(0, db->get("P", "K1", &v1));
+    ASSERT_EQ(tostr(v1), "1");
+    ASSERT_EQ(0, db->get("A", "A1", &v2));
+    ASSERT_EQ(tostr(v2), "2");
+    ASSERT_EQ(0, db->get("A", "A2", &v3));
+    ASSERT_EQ(tostr(v3), "?3");
+  }
+  {
+    KeyValueDB::Transaction t = db->get_transaction();
+    bufferlist v1;
+    v1.append(string("1"));
+    t->merge("A", "A2", v1);
+    db->submit_transaction_sync(t);
+  }
+  {
+    bufferlist v;
+    ASSERT_EQ(0, db->get("A", "A2", &v));
+    ASSERT_EQ(tostr(v), "?31");
+  }
+  fini();
 }
 
 