diff --git a/doc/man/8/ceph-bluestore-tool.rst b/doc/man/8/ceph-bluestore-tool.rst index 2bc951ebd13..b18f7bc51a9 100644 --- a/doc/man/8/ceph-bluestore-tool.rst +++ b/doc/man/8/ceph-bluestore-tool.rst @@ -23,6 +23,7 @@ Synopsis | **ceph-bluestore-tool** bluefs-bdev-new-db --path *osd path* --dev-target *new-device* | **ceph-bluestore-tool** bluefs-bdev-migrate --path *osd path* --dev-target *new-device* --devs-source *device1* [--devs-source *device2*] | **ceph-bluestore-tool** free-dump|free-score --path *osd path* [ --allocator block/bluefs-wal/bluefs-db/bluefs-slow ] +| **ceph-bluestore-tool** reshard --path *osd path* --sharding *new sharding* [ --resharding-ctrl *control string* ] Description @@ -97,6 +98,17 @@ Commands Give a [0-1] number that represents quality of fragmentation in allocator. 0 represents case when all free space is in one chunk. 1 represents worst possible fragmentation. +:command:`reshard` --path *osd path* --sharding *new sharding* [ --resharding-ctrl *control string* ] + + Changes sharding of BlueStore's RocksDB. Sharding is built on top of RocksDB column families. + This option allows testing the performance of *new sharding* without the need to redeploy the OSD. + Resharding is usually a long process, which involves walking through the entire RocksDB key space + and moving some of the keys to different column families. + The --resharding-ctrl option provides performance control over the resharding process. + Interrupted resharding will prevent the OSD from running. + Interrupted resharding does not corrupt data. It is always possible to continue the previous resharding, + or select any other sharding scheme, including reverting to the original one. + Options ======= @@ -137,6 +149,13 @@ Options Useful for *free-dump* and *free-score* actions. Selects allocator(s). +.. option:: --resharding-ctrl *control string* + + Provides control over the resharding process. Specifies how often to refresh the RocksDB iterator, + and how large the commit batch should be before committing to RocksDB. 
Option format is: + /// + Default: 10000000/10000/1000000/1000 + Device labels ============= diff --git a/src/kv/RocksDBStore.cc b/src/kv/RocksDBStore.cc index af42aae5ef7..31ce5d729c9 100644 --- a/src/kv/RocksDBStore.cc +++ b/src/kv/RocksDBStore.cc @@ -92,9 +92,6 @@ public: for (auto& p : store.merge_ops) { names[p.first] = p.second->name(); } - for (auto& p : store.cf_handles) { - names.erase(p.first); - } for (auto& p : names) { store.assoc_name += '.'; store.assoc_name += p.first; @@ -296,7 +293,7 @@ int RocksDBStore::ParseOptionsFromStringStatic( return -EINVAL; } } - lgeneric_dout(cct, 0) << " set rocksdb option " << it->first + lgeneric_dout(cct, 1) << " set rocksdb option " << it->first << " = " << it->second << dendl; } return 0; @@ -414,6 +411,8 @@ int RocksDBStore::load_rocksdb_options(bool create_if_missing, rocksdb::Options& if (priv) { dout(10) << __func__ << " using custom Env " << priv << dendl; opt.env = static_cast(priv); + } else { + env = opt.env; } opt.env->SetAllowNonOwnerAccess(false); @@ -985,7 +984,6 @@ int RocksDBStore::_test_init(const string& dir) RocksDBStore::~RocksDBStore() { close(); - if (priv) { delete static_cast(priv); } @@ -2635,3 +2633,426 @@ RocksDBStore::WholeSpaceIterator RocksDBStore::get_default_cf_iterator() return std::make_shared( db->NewIterator(rocksdb::ReadOptions(), default_cf)); } + +int RocksDBStore::prepare_for_reshard(const std::string& new_sharding, + std::vector& to_process_columns, + std::vector& to_process_handles) +{ + //0. lock db from opening + //1. list existing columns + //2. apply merge operator to (main + columns) opts + //3. prepare std::vector existing_cfs + //4. open db, acquire existing column handles + //5. calculate missing columns + //6. create missing columns + //7. construct cf_handles according to new sharding + //8. 
check is all cf_handles are filled + + bool b; + std::vector new_sharding_def; + char const* error_position; + std::string error_msg; + b = parse_sharding_def(new_sharding, new_sharding_def, &error_position, &error_msg); + if (!b) { + dout(1) << __func__ << " bad sharding: " << dendl; + dout(1) << __func__ << new_sharding << dendl; + dout(1) << __func__ << std::string(error_position - &new_sharding[0], ' ') << "^" << error_msg << dendl; + return -EINVAL; + } + + //0. lock db from opening + std::string stored_sharding_text; + rocksdb::ReadFileToString(env, + sharding_def_file, + &stored_sharding_text); + if (stored_sharding_text.find("reshardingXcommencingXlocked") == string::npos) { + rocksdb::Status status; + if (stored_sharding_text.size() != 0) + stored_sharding_text += " "; + stored_sharding_text += "reshardingXcommencingXlocked"; + env->CreateDir(sharding_def_dir); + status = rocksdb::WriteStringToFile(env, stored_sharding_text, + sharding_def_file, true); + if (!status.ok()) { + derr << __func__ << " cannot write to " << sharding_def_file << dendl; + return -EIO; + } + } + + //1. list existing columns + + rocksdb::Status status; + std::vector existing_columns; + rocksdb::Options opt; + int r = load_rocksdb_options(false, opt); + if (r) { + dout(1) << __func__ << " load rocksdb options failed" << dendl; + return r; + } + status = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(opt), path, &existing_columns); + if (!status.ok()) { + derr << "Unable to list column families: " << status.ToString() << dendl; + return -EINVAL; + } + dout(5) << "existing columns = " << existing_columns << dendl; + + //2. apply merge operator to (main + columns) opts + //3. 
prepare std::vector cfs_to_open + + std::vector cfs_to_open; + for (const auto& full_name : existing_columns) { + //split col_name to - + std::string base_name; + size_t pos = full_name.find('-'); + if (std::string::npos == pos) + base_name = full_name; + else + base_name = full_name.substr(0,pos); + + rocksdb::ColumnFamilyOptions cf_opt(opt); + // search if we have options for this column + std::string options; + for (const auto& nsd : new_sharding_def) { + if (nsd.name == base_name) { + options = nsd.options; + break; + } + } + status = rocksdb::GetColumnFamilyOptionsFromString(cf_opt, options, &cf_opt); + if (!status.ok()) { + derr << __func__ << " failure parsing column options: " << options << dendl; + return -EINVAL; + } + if (base_name != rocksdb::kDefaultColumnFamilyName) + install_cf_mergeop(base_name, &cf_opt); + cfs_to_open.emplace_back(full_name, cf_opt); + } + + //4. open db, acquire existing column handles + std::vector handles; + status = rocksdb::DB::Open(rocksdb::DBOptions(opt), + path, cfs_to_open, &handles, &db); + if (!status.ok()) { + derr << status.ToString() << dendl; + return -EINVAL; + } + for (size_t i = 0; i < cfs_to_open.size(); i++) { + dout(10) << "column " << cfs_to_open[i].name << " handle " << (void*)handles[i] << dendl; + } + + //5. calculate missing columns + std::vector new_sharding_columns; + std::vector missing_columns; + sharding_def_to_columns(new_sharding_def, + new_sharding_columns); + dout(5) << "target columns = " << new_sharding_columns << dendl; + for (const auto& n : new_sharding_columns) { + bool found = false; + for (const auto& e : existing_columns) { + if (n == e) { + found = true; + break; + } + } + if (!found) { + missing_columns.push_back(n); + } + } + dout(5) << "missing columns = " << missing_columns << dendl; + + //6. 
create missing columns + for (const auto& full_name : missing_columns) { + std::string base_name; + size_t pos = full_name.find('-'); + if (std::string::npos == pos) + base_name = full_name; + else + base_name = full_name.substr(0,pos); + + rocksdb::ColumnFamilyOptions cf_opt(opt); + // search if we have options for this column + std::string options; + for (const auto& nsd : new_sharding_def) { + if (nsd.name == base_name) { + options = nsd.options; + break; + } + } + status = rocksdb::GetColumnFamilyOptionsFromString(cf_opt, options, &cf_opt); + if (!status.ok()) { + derr << __func__ << " failure parsing column options: " << options << dendl; + return -EINVAL; + } + install_cf_mergeop(base_name, &cf_opt); + rocksdb::ColumnFamilyHandle *cf; + status = db->CreateColumnFamily(cf_opt, full_name, &cf); + if (!status.ok()) { + derr << __func__ << " Failed to create rocksdb column family: " + << full_name << dendl; + return -EINVAL; + } + dout(10) << "created column " << full_name << " handle = " << (void*)cf << dendl; + existing_columns.push_back(full_name); + handles.push_back(cf); + } + + //7. 
construct cf_handles according to new sharding + for (size_t i = 0; i < existing_columns.size(); i++) { + std::string full_name = existing_columns[i]; + rocksdb::ColumnFamilyHandle *cf = handles[i]; + std::string base_name; + size_t shard_idx = 0; + size_t pos = full_name.find('-'); + dout(10) << "processing column " << full_name << dendl; + if (std::string::npos == pos) { + base_name = full_name; + } else { + base_name = full_name.substr(0,pos); + shard_idx = atoi(full_name.substr(pos+1).c_str()); + } + if (rocksdb::kDefaultColumnFamilyName == base_name) { + default_cf = handles[i]; + must_close_default_cf = true; + } else { + for (const auto& nsd : new_sharding_def) { + if (nsd.name == base_name) { + if (shard_idx < nsd.shard_cnt) { + add_column_family(base_name, nsd.hash_l, nsd.hash_h, shard_idx, cf); + } else { + //ignore columns with index larger then shard count + } + break; + } + } + } + } + + //8. check if all cf_handles are filled + for (const auto& col : cf_handles) { + for (size_t i = 0; i < col.second.handles.size(); i++) { + if (col.second.handles[i] == nullptr) { + derr << "missing handle for column " << col.first << " shard " << i << dendl; + return -EIO; + } + } + } + to_process_columns = existing_columns; + to_process_handles = handles; + return 0; +} + +int RocksDBStore::reshard_cleanup(const std::vector& current_columns, + const std::vector& current_handles) +{ + std::vector new_sharding_columns; + for (const auto& col: cf_handles) { + if (col.second.handles.size() == 1) { + new_sharding_columns.push_back(col.first); + } else { + for (size_t i = 0; i < col.second.handles.size(); i++) { + new_sharding_columns.push_back(col.first + "-" + to_string(i)); + } + } + } + + for (size_t i = 0; i < current_columns.size(); i++) { + bool found = false; + for (size_t j = 0; j < new_sharding_columns.size(); j++) { + if (current_columns[i] == new_sharding_columns[j]) { + found = true; + break; + } + } + if (found || current_columns[i] == 
rocksdb::kDefaultColumnFamilyName) { + dout(5) << "Column " << current_columns[i] << " is part of new sharding." << dendl; + continue; + } + dout(5) << "Column " << current_columns[i] << " not part of new sharding. Deleting." << dendl; + + // verify that column is empty + rocksdb::Iterator* it; + it = db->NewIterator(rocksdb::ReadOptions(), current_handles[i]); + ceph_assert(it); + it->SeekToFirst(); + ceph_assert(!it->Valid()); + delete it; + rocksdb::Status status; + status = db->DropColumnFamily(current_handles[i]); + if (!status.ok()) { + derr << __func__ << " Failed to delete column: " + << current_columns[i] << dendl; + return -EINVAL; + } + } + return 0; +} + +int RocksDBStore::reshard(const std::string& new_sharding, const RocksDBStore::resharding_ctrl* ctrl_in) +{ + rocksdb::Status status; + int r; + std::vector to_process_columns; + std::vector to_process_handles; + + resharding_ctrl ctrl = ctrl_in ? *ctrl_in : resharding_ctrl(); + size_t bytes_in_batch = 0; + size_t keys_in_batch = 0; + size_t bytes_per_iterator = 0; + size_t keys_per_iterator = 0; + size_t keys_processed = 0; + size_t keys_moved = 0; + + rocksdb::WriteBatch* bat = nullptr; + + auto flush_batch = [&]() { + dout(10) << "flushing batch, " << keys_in_batch << " keys, for " + << bytes_in_batch << " bytes" << dendl; + rocksdb::WriteOptions woptions; + woptions.sync = true; + rocksdb::Status s = db->Write(woptions, bat); + ceph_assert(s.ok()); + bytes_in_batch = 0; + keys_in_batch = 0; + delete bat; + bat = new rocksdb::WriteBatch(); + ceph_assert(bat); + }; + + auto process_column = [&](rocksdb::ColumnFamilyHandle* handle, + const std::string& fixed_prefix) + { + int r = 0; + dout(5) << " column=" << (void*)handle << " prefix=" << fixed_prefix << dendl; + rocksdb::Iterator* it; + it = db->NewIterator(rocksdb::ReadOptions(), handle); + ceph_assert(it); + bat = new rocksdb::WriteBatch(); + ceph_assert(bat); + + for (it->SeekToFirst(); it->Valid(); it->Next()) { + rocksdb::Slice raw_key = 
it->key(); + dout(30) << "key=" << pretty_binary_string(raw_key.ToString()) << dendl; + //check if need to refresh iterator + if (bytes_per_iterator >= ctrl.bytes_per_iterator || + keys_per_iterator >= ctrl.keys_per_iterator) { + dout(8) << "refreshing iterator" << dendl; + bytes_per_iterator = 0; + keys_per_iterator = 0; + std::string raw_key_str = raw_key.ToString(); + delete it; + it = db->NewIterator(rocksdb::ReadOptions(), handle); + ceph_assert(it); + it->Seek(raw_key_str); + ceph_assert(it->Valid()); + raw_key = it->key(); + } + rocksdb::Slice value = it->value(); + std::string prefix, key; + if (fixed_prefix.size() == 0) { + split_key(raw_key, &prefix, &key); + } else { + prefix = fixed_prefix; + key = raw_key.ToString(); + } + keys_processed++; + if ((keys_processed % 10000) == 0) { + dout(10) << "processed " << keys_processed << " keys, moved " << keys_moved << dendl; + } + std::string new_raw_key; + rocksdb::ColumnFamilyHandle* new_handle = get_cf_handle(prefix, key); + if (new_handle == nullptr) { + new_handle = default_cf; + } + if (handle == new_handle) { + continue; + } + if (new_handle == default_cf) { + new_raw_key = combine_strings(prefix, key); + } else { + new_raw_key = key; + } + bat->Delete(handle, raw_key); + bat->Put(new_handle, new_raw_key, value); + dout(25) << "moving " << (void*)handle << "/" << pretty_binary_string(raw_key.ToString()) << + " to " << (void*)new_handle << "/" << pretty_binary_string(new_raw_key) << + " size " << value.size() << dendl; + keys_moved++; + bytes_in_batch += new_raw_key.size() * 2 + value.size(); + keys_in_batch++; + bytes_per_iterator += new_raw_key.size() * 2 + value.size(); + keys_per_iterator++; + + //check if need to write batch + if (bytes_in_batch >= ctrl.bytes_per_batch || + keys_in_batch >= ctrl.keys_per_batch) { + flush_batch(); + if (ctrl.unittest_fail_after_first_batch) { + r = -1000; + goto out; + } + } + } + flush_batch(); + out: + delete it; + delete bat; + return r; + }; + + r = 
prepare_for_reshard(new_sharding, to_process_columns, to_process_handles); + if (r != 0) { + dout(1) << "failed to prepare db for reshard" << dendl; + goto cleanup; + } + + ceph_assert(to_process_columns.size() == to_process_handles.size()); + for (size_t idx = 0; idx < to_process_columns.size(); idx++) { + dout(5) << "Processing column=" << to_process_columns[idx] << + " handle=" << to_process_handles[idx] << dendl; + if (to_process_columns[idx] == rocksdb::kDefaultColumnFamilyName) { + ceph_assert(to_process_handles[idx] == default_cf); + r = process_column(default_cf, std::string()); + } else { + std::string fixed_prefix = to_process_columns[idx].substr(0, to_process_columns[idx].find('-')); + dout(10) << "Prefix: " << fixed_prefix << dendl; + r = process_column(to_process_handles[idx], fixed_prefix); + } + if (r != 0) { + derr << "Error processing column " << to_process_columns[idx] << dendl; + goto cleanup; + } + if (ctrl.unittest_fail_after_processing_column) { + r = -1001; + goto cleanup; + } + } + + r = reshard_cleanup(to_process_columns, to_process_handles); + if (r != 0) { + dout(5) << "failed to cleanup after reshard" << dendl; + goto cleanup; + } + + if (ctrl.unittest_fail_after_successful_processing) { + r = -1002; + goto cleanup; + } + env->CreateDir(sharding_def_dir); + status = rocksdb::WriteStringToFile(env, new_sharding, + sharding_def_file, true); + if (!status.ok()) { + derr << __func__ << " cannot write to " << sharding_def_file << dendl; + r = -EIO; + } + + cleanup: + //close column handles + for (const auto& col: cf_handles) { + for (size_t i = 0; i < col.second.handles.size(); i++) { + db->DestroyColumnFamilyHandle(col.second.handles[i]); + } + } + cf_handles.clear(); + close(); + return r; +} diff --git a/src/kv/RocksDBStore.h b/src/kv/RocksDBStore.h index f98c30a3d3f..bdedda083d0 100644 --- a/src/kv/RocksDBStore.h +++ b/src/kv/RocksDBStore.h @@ -519,6 +519,23 @@ err: WholeSpaceIterator get_wholespace_iterator(IteratorOpts opts = 0) 
override; private: WholeSpaceIterator get_default_cf_iterator(); + + int prepare_for_reshard(const std::string& new_sharding, + std::vector& to_process_columns, + std::vector& to_process_handles); + int reshard_cleanup(const std::vector& current_columns, + const std::vector& current_handles); +public: + struct resharding_ctrl { + size_t bytes_per_iterator = 10000000; /// amount of data to process before refreshing iterator + size_t keys_per_iterator = 10000; + size_t bytes_per_batch = 1000000; /// amount of data before submitting batch + size_t keys_per_batch = 1000; + bool unittest_fail_after_first_batch = false; + bool unittest_fail_after_processing_column = false; + bool unittest_fail_after_successful_processing = false; + }; + int reshard(const std::string& new_sharding, const resharding_ctrl* ctrl = nullptr); }; #endif diff --git a/src/os/bluestore/BlueStore.cc b/src/os/bluestore/BlueStore.cc index 397cb3c0409..7e7488f4793 100644 --- a/src/os/bluestore/BlueStore.cc +++ b/src/os/bluestore/BlueStore.cc @@ -5664,19 +5664,80 @@ void BlueStore::_sync_bluefs_and_fm() } } -int BlueStore::_open_db(bool create, bool to_repair_db, bool read_only) +int BlueStore::open_db_environment(KeyValueDB **pdb) +{ + string kv_dir_fn; + string kv_backend; + _kv_only = true; + { + string type; + int r = read_meta("type", &type); + if (r < 0) { + derr << __func__ << " failed to load os-type: " << cpp_strerror(r) + << dendl; + return r; + } + + if (type != "bluestore") { + derr << __func__ << " expected bluestore, but type is " << type << dendl; + return -EIO; + } + } + int r = _open_path(); + if (r < 0) + return r; + r = _open_fsid(false); + if (r < 0) + goto out_path; + + r = _read_fsid(&fsid); + if (r < 0) + goto out_fsid; + + r = _lock_fsid(); + if (r < 0) + goto out_fsid; + + r = _open_bdev(false); + if (r < 0) + goto out_fsid; + + r = _prepare_db_environment(false, false, &kv_dir_fn, &kv_backend); + if (r < 0) + goto out_bdev; + + *pdb = db; + return 0; + + out_bdev: + 
_close_bdev(); + out_fsid: + _close_fsid(); + out_path: + _close_path(); + + return r; +} + +int BlueStore::close_db_environment() +{ + _close_db_and_around(false); + _close_bdev(); + _close_fsid(); + _close_path(); + return 0; +} + +int BlueStore::_prepare_db_environment(bool create, bool read_only, + std::string* _fn, std::string* _kv_backend) { int r; ceph_assert(!db); - ceph_assert(!(create && read_only)); - string fn = path + "/db"; - string options; - stringstream err; + std::string& fn=*_fn; + std::string& kv_backend=*_kv_backend; + fn = path + "/db"; std::shared_ptr merge_op(new Int64ArrayMergeOperator); - string kv_backend; - std::string sharding_def; - if (create) { kv_backend = cct->_conf->bluestore_kvbackend; } else { @@ -5816,7 +5877,23 @@ int BlueStore::_open_db(bool create, bool to_repair_db, bool read_only) FreelistManager::setup_merge_operators(db); db->set_merge_operator(PREFIX_STAT, merge_op); db->set_cache_size(cache_kv_ratio * cache_size); + return 0; +} +int BlueStore::_open_db(bool create, bool to_repair_db, bool read_only) +{ + int r; + ceph_assert(!(create && read_only)); + string options; + stringstream err; + string kv_dir_fn; + string kv_backend; + std::string sharding_def; + r = _prepare_db_environment(create, read_only, &kv_dir_fn, &kv_backend); + if (r < 0) { + derr << __func__ << " failed to prepare db environment: " << err.str() << dendl; + return -EIO; + } if (kv_backend == "rocksdb") { options = cct->_conf->bluestore_rocksdb_options; if (cct->_conf.get_val("bluestore_rocksdb_cf")) { @@ -5842,7 +5919,7 @@ int BlueStore::_open_db(bool create, bool to_repair_db, bool read_only) return -EIO; } dout(1) << __func__ << " opened " << kv_backend - << " path " << fn << " options " << options << dendl; + << " path " << kv_dir_fn << " options " << options << dendl; return 0; } diff --git a/src/os/bluestore/BlueStore.h b/src/os/bluestore/BlueStore.h index 95c55a04ff1..759b6b6126d 100644 --- a/src/os/bluestore/BlueStore.h +++ 
b/src/os/bluestore/BlueStore.h @@ -2282,6 +2282,9 @@ private: */ int _open_db_and_around(bool read_only); void _close_db_and_around(bool read_only); + int _prepare_db_environment(bool create, bool read_only, + std::string* kv_dir, std::string* kv_backend); + int _close_db_environment(); // updates legacy bluefs related recs in DB to a state valid for // downgrades from nautilus. @@ -2528,6 +2531,9 @@ public: return 0; } + int open_db_environment(KeyValueDB **pdb); + int close_db_environment(); + int write_meta(const std::string& key, const std::string& value) override; int read_meta(const std::string& key, std::string *value) override; diff --git a/src/os/bluestore/bluestore_tool.cc b/src/os/bluestore/bluestore_tool.cc index 092a05acb7f..58a75bc0b49 100644 --- a/src/os/bluestore/bluestore_tool.cc +++ b/src/os/bluestore/bluestore_tool.cc @@ -19,6 +19,7 @@ #include "os/bluestore/BlueFS.h" #include "os/bluestore/BlueStore.h" #include "common/admin_socket.h" +#include "kv/RocksDBStore.h" namespace po = boost::program_options; @@ -226,6 +227,9 @@ int main(int argc, char **argv) string log_file; string key, value; vector allocs_name; + string empty_sharding(1, '\0'); + string new_sharding = empty_sharding; + string resharding_ctrl; int log_level = 30; bool fsck_deep = false; po::options_description po_options("Options"); @@ -242,6 +246,8 @@ int main(int argc, char **argv) ("key,k", po::value(&key), "label metadata key name") ("value,v", po::value(&value), "label metadata value") ("allocator", po::value>(&allocs_name), "allocator to inspect: 'block'/'bluefs-wal'/'bluefs-db'/'bluefs-slow'") + ("sharding", po::value(&new_sharding), "new sharding to apply") + ("resharding-ctrl", po::value(&resharding_ctrl), "gives control over resharding procedure details") ; po::options_description po_positional("Positional options"); po_positional.add_options() @@ -262,7 +268,8 @@ int main(int argc, char **argv) "bluefs-log-dump, " "free-dump, " "free-score, " - "bluefs-stats") + 
"bluefs-stats, " + "reshard") ; po::options_description po_all("All options"); po_all.add(po_options).add(po_positional); @@ -395,6 +402,16 @@ int main(int argc, char **argv) if (allocs_name.empty()) allocs_name = vector{"block", "bluefs-db", "bluefs-wal", "bluefs-slow"}; } + if (action == "reshard") { + if (path.empty()) { + cerr << "must specify bluestore path" << std::endl; + exit(EXIT_FAILURE); + } + if (new_sharding == empty_sharding) { + cerr << "must provide reshard specification" << std::endl; + exit(EXIT_FAILURE); + } + } vector args; if (log_file.size()) { args.push_back("--log-file"); @@ -880,6 +897,57 @@ int main(int argc, char **argv) } cout << std::string(out.c_str(), out.length()) << std::endl; bluestore.cold_close(); + } else if (action == "reshard") { + auto get_ctrl = [&](size_t& val) { + if (!resharding_ctrl.empty()) { + size_t pos; + std::string token; + pos = resharding_ctrl.find('/'); + token = resharding_ctrl.substr(0, pos); + if (pos != std::string::npos) + resharding_ctrl.erase(0, pos + 1); + else + resharding_ctrl.erase(); + char* endptr; + val = strtoll(token.c_str(), &endptr, 0); + if (*endptr != '\0') { + cerr << "invalid --resharding-ctrl. 
'" << token << "' is not a number" << std::endl; + exit(EXIT_FAILURE); + } + } + }; + BlueStore bluestore(cct.get(), path); + KeyValueDB *db_ptr; + RocksDBStore::resharding_ctrl ctrl; + if (!resharding_ctrl.empty()) { + get_ctrl(ctrl.bytes_per_iterator); + get_ctrl(ctrl.keys_per_iterator); + get_ctrl(ctrl.bytes_per_batch); + get_ctrl(ctrl.keys_per_batch); + if (!resharding_ctrl.empty()) { + cerr << "extra chars in --resharding-ctrl" << std::endl; + exit(EXIT_FAILURE); + } + } + int r = bluestore.open_db_environment(&db_ptr); + if (r < 0) { + cerr << "error preparing db environment: " << cpp_strerror(r) << std::endl; + exit(EXIT_FAILURE); + } + if (r < 0) { + cerr << "error starting k-v inside bluestore: " << cpp_strerror(r) << std::endl; + exit(EXIT_FAILURE); + } + RocksDBStore* rocks_db = dynamic_cast(db_ptr); + ceph_assert(db_ptr); + ceph_assert(rocks_db); + r = rocks_db->reshard(new_sharding, &ctrl); + if (r < 0) { + cerr << "error resharding: " << cpp_strerror(r) << std::endl; + } else { + cout << "reshard success" << std::endl; + } + bluestore.close_db_environment(); } else { cerr << "unrecognized action " << action << std::endl; return 1; diff --git a/src/test/objectstore/test_kv.cc b/src/test/objectstore/test_kv.cc index 24053c7fbf8..7ea1e38d4c9 100644 --- a/src/test/objectstore/test_kv.cc +++ b/src/test/objectstore/test_kv.cc @@ -987,6 +987,280 @@ TEST_P(RocksDBShardingTest, wholespace_lookup_limits) { +class RocksDBResharding : public ::testing::Test { +public: + boost::scoped_ptr db; + + RocksDBResharding() : db(0) {} + + string _bl_to_str(bufferlist val) { + string str(val.c_str(), val.length()); + return str; + } + + void rm_r(string path) { + string cmd = string("rm -r ") + path; + if (verbose) + cout << "==> " << cmd << std::endl; + int r = ::system(cmd.c_str()); + if (r) { + cerr << "failed with exit code " << r + << ", continuing anyway" << std::endl; + } + } + + void SetUp() override { + verbose = getenv("VERBOSE") && strcmp(getenv("VERBOSE"), "1") 
== 0; + + int r = ::mkdir("kv_test_temp_dir", 0777); + if (r < 0 && errno != EEXIST) { + r = -errno; + cerr << __func__ << ": unable to create kv_test_temp_dir: " + << cpp_strerror(r) << std::endl; + return; + } + + KeyValueDB* db_kv = KeyValueDB::create(g_ceph_context, "rocksdb", + "kv_test_temp_dir"); + RocksDBStore* db_rocks = dynamic_cast(db_kv); + ceph_assert(db_rocks); + db.reset(db_rocks); + ASSERT_EQ(0, db->init(g_conf()->bluestore_rocksdb_options)); + } + void TearDown() override { + db.reset(nullptr); + rm_r("kv_test_temp_dir"); + } + + bool verbose; + std::vector prefixes = {"Ad", "Betelgeuse", "C", "D", "Evade"}; + std::vector randoms = {"0", "1", "2", "3", "4", "5", + "found", "brain", "fully", "pen", "worth", "race", + "stand", "nodded", "whenever", "surrounded", "industrial", "skin", + "this", "direction", "family", "beginning", "whenever", "held", + "metal", "year", "like", "valuable", "softly", "whistle", + "perfectly", "broken", "idea", "also", "coffee", "branch", + "tongue", "immediately", "bent", "partly", "burn", "include", + "certain", "burst", "final", "smoke", "positive", "perfectly" + }; + int R = randoms.size(); + int k = 0; + std::map data; + + void generate_data() { + data.clear(); + for (size_t p = 0; p < prefixes.size(); p++) { + size_t elem_count = 1 << (( p * 3 ) + 3); + for (size_t i = 0; i < elem_count; i++) { + std::string key; + for (int x = 0; x < 5; x++) { + key = key + randoms[rand() % R]; + } + std::string value; + for (int x = 0; x < 3; x++) { + value = value + randoms[rand() % R]; + } + data[RocksDBStore::combine_strings(prefixes[p], key)] = value; + } + } + } + + void data_to_db() { + KeyValueDB::Transaction t = db->get_transaction(); + size_t i = 0; + for (auto& d: data) { + bufferlist v1; + v1.append(d.second); + string prefix; + string key; + RocksDBStore::split_key(d.first, &prefix, &key); + t->set(prefix, key, v1); + if (verbose) + std::cout << "SET " << prefix << " " << key << std::endl; + i++; + if ((i % 1000) == 0) 
{ + ASSERT_EQ(db->submit_transaction_sync(t), 0); + t.reset(); + if (verbose) + std::cout << "writing key to DB" << std::endl; + t = db->get_transaction(); + } + } + if (verbose) + std::cout << "writing keys to DB" << std::endl; + ASSERT_EQ(db->submit_transaction_sync(t), 0); + } + + void clear_db() { + KeyValueDB::Transaction t = db->get_transaction(); + for (auto &d : data) { + string prefix; + string key; + RocksDBStore::split_key(d.first, &prefix, &key); + t->rmkey(prefix, key); + } + ASSERT_EQ(db->submit_transaction_sync(t), 0); + //paranoid, check if db empty + KeyValueDB::WholeSpaceIterator it = db->get_wholespace_iterator(); + ASSERT_EQ(it->seek_to_first(), 0); + ASSERT_EQ(it->valid(), false); + } + + void check_db() { + KeyValueDB::WholeSpaceIterator it = db->get_wholespace_iterator(); + //move forward + auto dit = data.begin(); + int r = it->seek_to_first(); + ASSERT_EQ(r, 0); + ASSERT_EQ(it->valid(), (dit != data.end())); + + while (dit != data.end()) { + ASSERT_EQ(it->valid(), true); + string prefix; + string key; + RocksDBStore::split_key(dit->first, &prefix, &key); + auto raw_key = it->raw_key(); + ASSERT_EQ(raw_key.first, prefix); + ASSERT_EQ(raw_key.second, key); + ASSERT_EQ(it->value().to_str(), dit->second); + if (verbose) + std::cout << "next " << prefix << " " << key << std::endl; + ASSERT_EQ(it->next(), 0); + ++dit; + } + ASSERT_EQ(it->valid(), false); + } +}; + +TEST_F(RocksDBResharding, basic) { + ASSERT_EQ(0, db->create_and_open(cout, "")); + generate_data(); + data_to_db(); + check_db(); + db->close(); + ASSERT_EQ(db->reshard("Evade(4)"), 0); + ASSERT_EQ(db->open(cout), 0); + check_db(); + db->close(); +} + +TEST_F(RocksDBResharding, all_to_shards) { + ASSERT_EQ(0, db->create_and_open(cout, "")); + generate_data(); + data_to_db(); + check_db(); + db->close(); + ASSERT_EQ(db->reshard("Ad(1) Betelgeuse(1) C(1) D(1) Evade(1)"), 0); + ASSERT_EQ(db->open(cout), 0); + check_db(); + db->close(); +} + +TEST_F(RocksDBResharding, 
all_to_shards_and_back_again) { + ASSERT_EQ(0, db->create_and_open(cout, "")); + generate_data(); + data_to_db(); + check_db(); + db->close(); + ASSERT_EQ(db->reshard("Ad(1) Betelgeuse(1) C(1) D(1) Evade(1)"), 0); + ASSERT_EQ(db->open(cout), 0); + check_db(); + db->close(); + ASSERT_EQ(db->reshard(""), 0); + ASSERT_EQ(db->open(cout), 0); + check_db(); + db->close(); +} + +TEST_F(RocksDBResharding, resume_interrupted_at_batch) { + ASSERT_EQ(0, db->create_and_open(cout, "")); + generate_data(); + data_to_db(); + check_db(); + db->close(); + RocksDBStore::resharding_ctrl ctrl; + ctrl.unittest_fail_after_first_batch = true; + ASSERT_EQ(db->reshard("Evade(4)", &ctrl), -1000); + ASSERT_NE(db->open(cout), 0); + ASSERT_EQ(db->reshard("Evade(4)"), 0); + ASSERT_EQ(db->open(cout), 0); + check_db(); + db->close(); +} + +TEST_F(RocksDBResharding, resume_interrupted_at_column) { + ASSERT_EQ(0, db->create_and_open(cout, "")); + generate_data(); + data_to_db(); + check_db(); + db->close(); + RocksDBStore::resharding_ctrl ctrl; + ctrl.unittest_fail_after_processing_column = true; + ASSERT_EQ(db->reshard("Evade(4)", &ctrl), -1001); + ASSERT_NE(db->open(cout), 0); + ASSERT_EQ(db->reshard("Evade(4)"), 0); + ASSERT_EQ(db->open(cout), 0); + check_db(); + db->close(); +} + +TEST_F(RocksDBResharding, resume_interrupted_before_commit) { + ASSERT_EQ(0, db->create_and_open(cout, "")); + generate_data(); + data_to_db(); + check_db(); + db->close(); + RocksDBStore::resharding_ctrl ctrl; + ctrl.unittest_fail_after_successful_processing = true; + ASSERT_EQ(db->reshard("Evade(4)", &ctrl), -1002); + ASSERT_NE(db->open(cout), 0); + ASSERT_EQ(db->reshard("Evade(4)"), 0); + ASSERT_EQ(db->open(cout), 0); + check_db(); + db->close(); +} + +TEST_F(RocksDBResharding, prevent_incomplete_hash_change) { + ASSERT_EQ(0, db->create_and_open(cout, "Evade(4,0-3)")); + generate_data(); + data_to_db(); + check_db(); + db->close(); + RocksDBStore::resharding_ctrl ctrl; + 
ctrl.unittest_fail_after_successful_processing = true; + ASSERT_EQ(db->reshard("Evade(4,0-8)", &ctrl), -1002); + ASSERT_NE(db->open(cout), 0); + ASSERT_EQ(db->reshard("Evade(4,0-8)"), 0); + ASSERT_EQ(db->open(cout), 0); + check_db(); + db->close(); +} + +TEST_F(RocksDBResharding, change_reshard) { + ASSERT_EQ(0, db->create_and_open(cout, "Ad(4)")); + generate_data(); + data_to_db(); + check_db(); + db->close(); + RocksDBStore::resharding_ctrl ctrl; + ctrl.unittest_fail_after_first_batch = true; + ASSERT_EQ(db->reshard("C(5) D(3)", &ctrl), -1000); + ASSERT_NE(db->open(cout), 0); + ctrl.unittest_fail_after_first_batch = false; + ctrl.unittest_fail_after_processing_column = true; + ASSERT_EQ(db->reshard("C(5) Evade(2)", &ctrl), -1001); + ASSERT_NE(db->open(cout), 0); + ctrl.unittest_fail_after_processing_column = false; + ctrl.unittest_fail_after_successful_processing = true; + ASSERT_EQ(db->reshard("Evade(2) D(3)", &ctrl), -1002); + ASSERT_NE(db->open(cout), 0); + ASSERT_EQ(db->reshard("Ad(1) Evade(5)"), 0); + ASSERT_EQ(db->open(cout), 0); + check_db(); + db->close(); +} + + INSTANTIATE_TEST_SUITE_P( KeyValueDB, KVTest,