Merge pull request #34667 from aclamk/wip-rocksdb-reshard

Resharding tool for sharded rocksdb

Reviewed-by: Josh Durgin <jdurgin@redhat.com>
commit e13193fcaa
Kefu Chai, 2020-06-09 20:11:32 +08:00, committed by GitHub
7 changed files with 897 additions and 15 deletions

doc/man/8/ceph-bluestore-tool.rst

@@ -23,6 +23,7 @@ Synopsis
| **ceph-bluestore-tool** bluefs-bdev-new-db --path *osd path* --dev-target *new-device*
| **ceph-bluestore-tool** bluefs-bdev-migrate --path *osd path* --dev-target *new-device* --devs-source *device1* [--devs-source *device2*]
| **ceph-bluestore-tool** free-dump|free-score --path *osd path* [ --allocator block/bluefs-wal/bluefs-db/bluefs-slow ]
| **ceph-bluestore-tool** reshard --path *osd path* --sharding *new sharding* [ --resharding-ctrl *control string* ]
Description
@@ -97,6 +98,17 @@ Commands
Gives a [0-1] number that represents the quality of fragmentation in the allocator.
0 represents the case when all free space is in one chunk; 1 represents the worst possible fragmentation.
:command:`reshard` --path *osd path* --sharding *new sharding* [ --resharding-ctrl *control string* ]
Changes the sharding of BlueStore's RocksDB. Sharding is built on top of RocksDB column families.
This option makes it possible to test the performance of a *new sharding* without redeploying the OSD.
Resharding is usually a long process that involves walking through the entire RocksDB key space
and moving some keys to different column families.
The --resharding-ctrl option provides performance control over the resharding process.
An interrupted resharding will prevent the OSD from running.
Interrupted resharding does not corrupt data. It is always possible to continue the previous resharding,
or to select any other sharding scheme, including reverting to the original one.
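An illustrative invocation (the OSD path and the sharding specification below are examples only, not recommendations)::

   ceph-bluestore-tool --path /var/lib/ceph/osd/ceph-0 --sharding="m(3) p(3,0-12) O(3,0-13) L P" reshard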
Options
=======
@@ -137,6 +149,13 @@ Options
Useful for *free-dump* and *free-score* actions. Selects allocator(s).
.. option:: --resharding-ctrl *control string*
Provides control over the resharding process. Specifies how often to refresh the RocksDB iterator,
and how large the commit batch should be before committing to RocksDB. The option format is:
<iterator_refresh_bytes>/<iterator_refresh_keys>/<batch_commit_bytes>/<batch_commit_keys>
Default: 10000000/10000/1000000/1000
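For example, ``--resharding-ctrl 20000000/20000/2000000/2000`` doubles each default, committing larger batches less often at the cost of more memory held in the write batch.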
Device labels
=============

src/kv/RocksDBStore.cc

@@ -92,9 +92,6 @@ public:
for (auto& p : store.merge_ops) {
names[p.first] = p.second->name();
}
for (auto& p : store.cf_handles) {
names.erase(p.first);
}
for (auto& p : names) {
store.assoc_name += '.';
store.assoc_name += p.first;
@@ -296,7 +293,7 @@ int RocksDBStore::ParseOptionsFromStringStatic(
return -EINVAL;
}
}
lgeneric_dout(cct, 0) << " set rocksdb option " << it->first
lgeneric_dout(cct, 1) << " set rocksdb option " << it->first
<< " = " << it->second << dendl;
}
return 0;
@@ -414,6 +411,8 @@ int RocksDBStore::load_rocksdb_options(bool create_if_missing, rocksdb::Options&
if (priv) {
dout(10) << __func__ << " using custom Env " << priv << dendl;
opt.env = static_cast<rocksdb::Env*>(priv);
} else {
env = opt.env;
}
opt.env->SetAllowNonOwnerAccess(false);
@@ -985,7 +984,6 @@ int RocksDBStore::_test_init(const string& dir)
RocksDBStore::~RocksDBStore()
{
close();
if (priv) {
delete static_cast<rocksdb::Env*>(priv);
}
@@ -2635,3 +2633,426 @@ RocksDBStore::WholeSpaceIterator RocksDBStore::get_default_cf_iterator()
return std::make_shared<RocksDBWholeSpaceIteratorImpl>(
db->NewIterator(rocksdb::ReadOptions(), default_cf));
}
int RocksDBStore::prepare_for_reshard(const std::string& new_sharding,
std::vector<std::string>& to_process_columns,
std::vector<rocksdb::ColumnFamilyHandle*>& to_process_handles)
{
//0. lock db from opening
//1. list existing columns
//2. apply merge operator to (main + columns) opts
//3. prepare std::vector<rocksdb::ColumnFamilyDescriptor> cfs_to_open
//4. open db, acquire existing column handles
//5. calculate missing columns
//6. create missing columns
//7. construct cf_handles according to new sharding
//8. check if all cf_handles are filled
bool b;
std::vector<ColumnFamily> new_sharding_def;
char const* error_position;
std::string error_msg;
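// example sharding definitions (taken from the RocksDBResharding tests below):
// "Evade(4)", "Evade(4,0-3)", "Ad(1) Betelgeuse(1) C(1) D(1) Evade(1)"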
b = parse_sharding_def(new_sharding, new_sharding_def, &error_position, &error_msg);
if (!b) {
dout(1) << __func__ << " bad sharding: " << dendl;
dout(1) << __func__ << new_sharding << dendl;
dout(1) << __func__ << std::string(error_position - &new_sharding[0], ' ') << "^" << error_msg << dendl;
return -EINVAL;
}
//0. lock db from opening
std::string stored_sharding_text;
rocksdb::ReadFileToString(env,
sharding_def_file,
&stored_sharding_text);
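// appending the lock token below makes the stored sharding definition
// unparsable, so a regular open of this DB fails until resharding
// completes and the file is rewritten with the new sharding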
if (stored_sharding_text.find("reshardingXcommencingXlocked") == string::npos) {
rocksdb::Status status;
if (stored_sharding_text.size() != 0)
stored_sharding_text += " ";
stored_sharding_text += "reshardingXcommencingXlocked";
env->CreateDir(sharding_def_dir);
status = rocksdb::WriteStringToFile(env, stored_sharding_text,
sharding_def_file, true);
if (!status.ok()) {
derr << __func__ << " cannot write to " << sharding_def_file << dendl;
return -EIO;
}
}
//1. list existing columns
rocksdb::Status status;
std::vector<std::string> existing_columns;
rocksdb::Options opt;
int r = load_rocksdb_options(false, opt);
if (r) {
dout(1) << __func__ << " load rocksdb options failed" << dendl;
return r;
}
status = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(opt), path, &existing_columns);
if (!status.ok()) {
derr << "Unable to list column families: " << status.ToString() << dendl;
return -EINVAL;
}
dout(5) << "existing columns = " << existing_columns << dendl;
//2. apply merge operator to (main + columns) opts
//3. prepare std::vector<rocksdb::ColumnFamilyDescriptor> cfs_to_open
std::vector<rocksdb::ColumnFamilyDescriptor> cfs_to_open;
for (const auto& full_name : existing_columns) {
//split full_name into <base_name>-<number>
std::string base_name;
size_t pos = full_name.find('-');
if (std::string::npos == pos)
base_name = full_name;
else
base_name = full_name.substr(0,pos);
rocksdb::ColumnFamilyOptions cf_opt(opt);
// search if we have options for this column
std::string options;
for (const auto& nsd : new_sharding_def) {
if (nsd.name == base_name) {
options = nsd.options;
break;
}
}
status = rocksdb::GetColumnFamilyOptionsFromString(cf_opt, options, &cf_opt);
if (!status.ok()) {
derr << __func__ << " failure parsing column options: " << options << dendl;
return -EINVAL;
}
if (base_name != rocksdb::kDefaultColumnFamilyName)
install_cf_mergeop(base_name, &cf_opt);
cfs_to_open.emplace_back(full_name, cf_opt);
}
//4. open db, acquire existing column handles
std::vector<rocksdb::ColumnFamilyHandle*> handles;
status = rocksdb::DB::Open(rocksdb::DBOptions(opt),
path, cfs_to_open, &handles, &db);
if (!status.ok()) {
derr << status.ToString() << dendl;
return -EINVAL;
}
for (size_t i = 0; i < cfs_to_open.size(); i++) {
dout(10) << "column " << cfs_to_open[i].name << " handle " << (void*)handles[i] << dendl;
}
//5. calculate missing columns
std::vector<std::string> new_sharding_columns;
std::vector<std::string> missing_columns;
sharding_def_to_columns(new_sharding_def,
new_sharding_columns);
dout(5) << "target columns = " << new_sharding_columns << dendl;
for (const auto& n : new_sharding_columns) {
bool found = false;
for (const auto& e : existing_columns) {
if (n == e) {
found = true;
break;
}
}
if (!found) {
missing_columns.push_back(n);
}
}
dout(5) << "missing columns = " << missing_columns << dendl;
//6. create missing columns
for (const auto& full_name : missing_columns) {
std::string base_name;
size_t pos = full_name.find('-');
if (std::string::npos == pos)
base_name = full_name;
else
base_name = full_name.substr(0,pos);
rocksdb::ColumnFamilyOptions cf_opt(opt);
// search if we have options for this column
std::string options;
for (const auto& nsd : new_sharding_def) {
if (nsd.name == base_name) {
options = nsd.options;
break;
}
}
status = rocksdb::GetColumnFamilyOptionsFromString(cf_opt, options, &cf_opt);
if (!status.ok()) {
derr << __func__ << " failure parsing column options: " << options << dendl;
return -EINVAL;
}
install_cf_mergeop(base_name, &cf_opt);
rocksdb::ColumnFamilyHandle *cf;
status = db->CreateColumnFamily(cf_opt, full_name, &cf);
if (!status.ok()) {
derr << __func__ << " Failed to create rocksdb column family: "
<< full_name << dendl;
return -EINVAL;
}
dout(10) << "created column " << full_name << " handle = " << (void*)cf << dendl;
existing_columns.push_back(full_name);
handles.push_back(cf);
}
//7. construct cf_handles according to new sharding
for (size_t i = 0; i < existing_columns.size(); i++) {
std::string full_name = existing_columns[i];
rocksdb::ColumnFamilyHandle *cf = handles[i];
std::string base_name;
size_t shard_idx = 0;
size_t pos = full_name.find('-');
dout(10) << "processing column " << full_name << dendl;
if (std::string::npos == pos) {
base_name = full_name;
} else {
base_name = full_name.substr(0,pos);
shard_idx = atoi(full_name.substr(pos+1).c_str());
}
if (rocksdb::kDefaultColumnFamilyName == base_name) {
default_cf = handles[i];
must_close_default_cf = true;
} else {
for (const auto& nsd : new_sharding_def) {
if (nsd.name == base_name) {
if (shard_idx < nsd.shard_cnt) {
add_column_family(base_name, nsd.hash_l, nsd.hash_h, shard_idx, cf);
} else {
//ignore columns with index larger than shard count
}
break;
}
}
}
}
//8. check if all cf_handles are filled
for (const auto& col : cf_handles) {
for (size_t i = 0; i < col.second.handles.size(); i++) {
if (col.second.handles[i] == nullptr) {
derr << "missing handle for column " << col.first << " shard " << i << dendl;
return -EIO;
}
}
}
to_process_columns = existing_columns;
to_process_handles = handles;
return 0;
}
int RocksDBStore::reshard_cleanup(const std::vector<std::string>& current_columns,
const std::vector<rocksdb::ColumnFamilyHandle*>& current_handles)
{
std::vector<std::string> new_sharding_columns;
for (const auto& col: cf_handles) {
if (col.second.handles.size() == 1) {
new_sharding_columns.push_back(col.first);
} else {
for (size_t i = 0; i < col.second.handles.size(); i++) {
new_sharding_columns.push_back(col.first + "-" + to_string(i));
}
}
}
for (size_t i = 0; i < current_columns.size(); i++) {
bool found = false;
for (size_t j = 0; j < new_sharding_columns.size(); j++) {
if (current_columns[i] == new_sharding_columns[j]) {
found = true;
break;
}
}
if (found || current_columns[i] == rocksdb::kDefaultColumnFamilyName) {
dout(5) << "Column " << current_columns[i] << " is part of new sharding." << dendl;
continue;
}
dout(5) << "Column " << current_columns[i] << " not part of new sharding. Deleting." << dendl;
// verify that column is empty
rocksdb::Iterator* it;
it = db->NewIterator(rocksdb::ReadOptions(), current_handles[i]);
ceph_assert(it);
it->SeekToFirst();
ceph_assert(!it->Valid());
delete it;
rocksdb::Status status;
status = db->DropColumnFamily(current_handles[i]);
if (!status.ok()) {
derr << __func__ << " Failed to delete column: "
<< current_columns[i] << dendl;
return -EINVAL;
}
}
return 0;
}
int RocksDBStore::reshard(const std::string& new_sharding, const RocksDBStore::resharding_ctrl* ctrl_in)
{
rocksdb::Status status;
int r;
std::vector<std::string> to_process_columns;
std::vector<rocksdb::ColumnFamilyHandle*> to_process_handles;
resharding_ctrl ctrl = ctrl_in ? *ctrl_in : resharding_ctrl();
size_t bytes_in_batch = 0;
size_t keys_in_batch = 0;
size_t bytes_per_iterator = 0;
size_t keys_per_iterator = 0;
size_t keys_processed = 0;
size_t keys_moved = 0;
rocksdb::WriteBatch* bat = nullptr;
auto flush_batch = [&]() {
dout(10) << "flushing batch, " << keys_in_batch << " keys, for "
<< bytes_in_batch << " bytes" << dendl;
rocksdb::WriteOptions woptions;
woptions.sync = true;
rocksdb::Status s = db->Write(woptions, bat);
ceph_assert(s.ok());
bytes_in_batch = 0;
keys_in_batch = 0;
delete bat;
bat = new rocksdb::WriteBatch();
ceph_assert(bat);
};
auto process_column = [&](rocksdb::ColumnFamilyHandle* handle,
const std::string& fixed_prefix)
{
int r = 0;
dout(5) << " column=" << (void*)handle << " prefix=" << fixed_prefix << dendl;
rocksdb::Iterator* it;
it = db->NewIterator(rocksdb::ReadOptions(), handle);
ceph_assert(it);
bat = new rocksdb::WriteBatch();
ceph_assert(bat);
for (it->SeekToFirst(); it->Valid(); it->Next()) {
rocksdb::Slice raw_key = it->key();
dout(30) << "key=" << pretty_binary_string(raw_key.ToString()) << dendl;
//check if the iterator needs to be refreshed
if (bytes_per_iterator >= ctrl.bytes_per_iterator ||
keys_per_iterator >= ctrl.keys_per_iterator) {
dout(8) << "refreshing iterator" << dendl;
bytes_per_iterator = 0;
keys_per_iterator = 0;
std::string raw_key_str = raw_key.ToString();
delete it;
it = db->NewIterator(rocksdb::ReadOptions(), handle);
ceph_assert(it);
it->Seek(raw_key_str);
ceph_assert(it->Valid());
raw_key = it->key();
}
rocksdb::Slice value = it->value();
std::string prefix, key;
if (fixed_prefix.size() == 0) {
split_key(raw_key, &prefix, &key);
} else {
prefix = fixed_prefix;
key = raw_key.ToString();
}
keys_processed++;
if ((keys_processed % 10000) == 0) {
dout(10) << "processed " << keys_processed << " keys, moved " << keys_moved << dendl;
}
std::string new_raw_key;
rocksdb::ColumnFamilyHandle* new_handle = get_cf_handle(prefix, key);
if (new_handle == nullptr) {
new_handle = default_cf;
}
if (handle == new_handle) {
continue;
}
if (new_handle == default_cf) {
new_raw_key = combine_strings(prefix, key);
} else {
new_raw_key = key;
}
bat->Delete(handle, raw_key);
bat->Put(new_handle, new_raw_key, value);
dout(25) << "moving " << (void*)handle << "/" << pretty_binary_string(raw_key.ToString()) <<
" to " << (void*)new_handle << "/" << pretty_binary_string(new_raw_key) <<
" size " << value.size() << dendl;
keys_moved++;
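// approximate accounting: the key size is counted twice (once for the
// Delete of the old key, once for the Put of the new one) plus the value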
bytes_in_batch += new_raw_key.size() * 2 + value.size();
keys_in_batch++;
bytes_per_iterator += new_raw_key.size() * 2 + value.size();
keys_per_iterator++;
//check if the batch needs to be committed
if (bytes_in_batch >= ctrl.bytes_per_batch ||
keys_in_batch >= ctrl.keys_per_batch) {
flush_batch();
if (ctrl.unittest_fail_after_first_batch) {
r = -1000;
goto out;
}
}
}
flush_batch();
out:
delete it;
delete bat;
return r;
};
r = prepare_for_reshard(new_sharding, to_process_columns, to_process_handles);
if (r != 0) {
dout(1) << "failed to prepare db for reshard" << dendl;
goto cleanup;
}
ceph_assert(to_process_columns.size() == to_process_handles.size());
for (size_t idx = 0; idx < to_process_columns.size(); idx++) {
dout(5) << "Processing column=" << to_process_columns[idx] <<
" handle=" << to_process_handles[idx] << dendl;
if (to_process_columns[idx] == rocksdb::kDefaultColumnFamilyName) {
ceph_assert(to_process_handles[idx] == default_cf);
r = process_column(default_cf, std::string());
} else {
std::string fixed_prefix = to_process_columns[idx].substr(0, to_process_columns[idx].find('-'));
dout(10) << "Prefix: " << fixed_prefix << dendl;
r = process_column(to_process_handles[idx], fixed_prefix);
}
if (r != 0) {
derr << "Error processing column " << to_process_columns[idx] << dendl;
goto cleanup;
}
if (ctrl.unittest_fail_after_processing_column) {
r = -1001;
goto cleanup;
}
}
r = reshard_cleanup(to_process_columns, to_process_handles);
if (r != 0) {
dout(5) << "failed to cleanup after reshard" << dendl;
goto cleanup;
}
if (ctrl.unittest_fail_after_successful_processing) {
r = -1002;
goto cleanup;
}
env->CreateDir(sharding_def_dir);
status = rocksdb::WriteStringToFile(env, new_sharding,
sharding_def_file, true);
if (!status.ok()) {
derr << __func__ << " cannot write to " << sharding_def_file << dendl;
r = -EIO;
}
cleanup:
//close column handles
for (const auto& col: cf_handles) {
for (size_t i = 0; i < col.second.handles.size(); i++) {
db->DestroyColumnFamilyHandle(col.second.handles[i]);
}
}
cf_handles.clear();
close();
return r;
}

src/kv/RocksDBStore.h

@@ -519,6 +519,23 @@ err:
WholeSpaceIterator get_wholespace_iterator(IteratorOpts opts = 0) override;
private:
WholeSpaceIterator get_default_cf_iterator();
int prepare_for_reshard(const std::string& new_sharding,
std::vector<std::string>& to_process_columns,
std::vector<rocksdb::ColumnFamilyHandle*>& to_process_handles);
int reshard_cleanup(const std::vector<std::string>& current_columns,
const std::vector<rocksdb::ColumnFamilyHandle*>& current_handles);
public:
struct resharding_ctrl {
size_t bytes_per_iterator = 10000000; ///< bytes to process before refreshing the iterator
size_t keys_per_iterator = 10000; ///< keys to process before refreshing the iterator
size_t bytes_per_batch = 1000000; ///< bytes to accumulate before submitting the batch
size_t keys_per_batch = 1000; ///< keys to accumulate before submitting the batch
bool unittest_fail_after_first_batch = false;
bool unittest_fail_after_processing_column = false;
bool unittest_fail_after_successful_processing = false;
};
int reshard(const std::string& new_sharding, const resharding_ctrl* ctrl = nullptr);
};
#endif

src/os/bluestore/BlueStore.cc

@@ -5664,19 +5664,80 @@ void BlueStore::_sync_bluefs_and_fm()
}
}
int BlueStore::_open_db(bool create, bool to_repair_db, bool read_only)
int BlueStore::open_db_environment(KeyValueDB **pdb)
{
string kv_dir_fn;
string kv_backend;
_kv_only = true;
{
string type;
int r = read_meta("type", &type);
if (r < 0) {
derr << __func__ << " failed to load os-type: " << cpp_strerror(r)
<< dendl;
return r;
}
if (type != "bluestore") {
derr << __func__ << " expected bluestore, but type is " << type << dendl;
return -EIO;
}
}
int r = _open_path();
if (r < 0)
return r;
r = _open_fsid(false);
if (r < 0)
goto out_path;
r = _read_fsid(&fsid);
if (r < 0)
goto out_fsid;
r = _lock_fsid();
if (r < 0)
goto out_fsid;
r = _open_bdev(false);
if (r < 0)
goto out_fsid;
r = _prepare_db_environment(false, false, &kv_dir_fn, &kv_backend);
if (r < 0)
goto out_bdev;
*pdb = db;
return 0;
out_bdev:
_close_bdev();
out_fsid:
_close_fsid();
out_path:
_close_path();
return r;
}
int BlueStore::close_db_environment()
{
_close_db_and_around(false);
_close_bdev();
_close_fsid();
_close_path();
return 0;
}
int BlueStore::_prepare_db_environment(bool create, bool read_only,
std::string* _fn, std::string* _kv_backend)
{
int r;
ceph_assert(!db);
ceph_assert(!(create && read_only));
string fn = path + "/db";
string options;
stringstream err;
std::string& fn=*_fn;
std::string& kv_backend=*_kv_backend;
fn = path + "/db";
std::shared_ptr<Int64ArrayMergeOperator> merge_op(new Int64ArrayMergeOperator);
string kv_backend;
std::string sharding_def;
if (create) {
kv_backend = cct->_conf->bluestore_kvbackend;
} else {
@@ -5816,7 +5877,23 @@ int BlueStore::_open_db(bool create, bool to_repair_db, bool read_only)
FreelistManager::setup_merge_operators(db);
db->set_merge_operator(PREFIX_STAT, merge_op);
db->set_cache_size(cache_kv_ratio * cache_size);
return 0;
}
int BlueStore::_open_db(bool create, bool to_repair_db, bool read_only)
{
int r;
ceph_assert(!(create && read_only));
string options;
stringstream err;
string kv_dir_fn;
string kv_backend;
std::string sharding_def;
r = _prepare_db_environment(create, read_only, &kv_dir_fn, &kv_backend);
if (r < 0) {
derr << __func__ << " failed to prepare db environment: " << err.str() << dendl;
return -EIO;
}
if (kv_backend == "rocksdb") {
options = cct->_conf->bluestore_rocksdb_options;
if (cct->_conf.get_val<bool>("bluestore_rocksdb_cf")) {
@@ -5842,7 +5919,7 @@ int BlueStore::_open_db(bool create, bool to_repair_db, bool read_only)
return -EIO;
}
dout(1) << __func__ << " opened " << kv_backend
<< " path " << fn << " options " << options << dendl;
<< " path " << kv_dir_fn << " options " << options << dendl;
return 0;
}

src/os/bluestore/BlueStore.h

@@ -2282,6 +2282,9 @@ private:
*/
int _open_db_and_around(bool read_only);
void _close_db_and_around(bool read_only);
int _prepare_db_environment(bool create, bool read_only,
std::string* kv_dir, std::string* kv_backend);
int _close_db_environment();
// updates legacy bluefs related recs in DB to a state valid for
// downgrades from nautilus.
@@ -2528,6 +2531,9 @@ public:
return 0;
}
int open_db_environment(KeyValueDB **pdb);
int close_db_environment();
int write_meta(const std::string& key, const std::string& value) override;
int read_meta(const std::string& key, std::string *value) override;

src/os/bluestore/bluestore_tool.cc

@@ -19,6 +19,7 @@
#include "os/bluestore/BlueFS.h"
#include "os/bluestore/BlueStore.h"
#include "common/admin_socket.h"
#include "kv/RocksDBStore.h"
namespace po = boost::program_options;
@@ -226,6 +227,9 @@ int main(int argc, char **argv)
string log_file;
string key, value;
vector<string> allocs_name;
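// a single NUL byte marks "--sharding not provided"; the empty string cannot
// serve as the sentinel because it is itself a valid sharding specification
// (resharding to "" reverts the DB to the legacy, unsharded layout)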
string empty_sharding(1, '\0');
string new_sharding = empty_sharding;
string resharding_ctrl;
int log_level = 30;
bool fsck_deep = false;
po::options_description po_options("Options");
@@ -242,6 +246,8 @@ int main(int argc, char **argv)
("key,k", po::value<string>(&key), "label metadata key name")
("value,v", po::value<string>(&value), "label metadata value")
("allocator", po::value<vector<string>>(&allocs_name), "allocator to inspect: 'block'/'bluefs-wal'/'bluefs-db'/'bluefs-slow'")
("sharding", po::value<string>(&new_sharding), "new sharding to apply")
("resharding-ctrl", po::value<string>(&resharding_ctrl), "gives control over resharding procedure details")
;
po::options_description po_positional("Positional options");
po_positional.add_options()
@@ -262,7 +268,8 @@ int main(int argc, char **argv)
"bluefs-log-dump, "
"free-dump, "
"free-score, "
"bluefs-stats")
"bluefs-stats, "
"reshard")
;
po::options_description po_all("All options");
po_all.add(po_options).add(po_positional);
@@ -395,6 +402,16 @@ int main(int argc, char **argv)
if (allocs_name.empty())
allocs_name = vector<string>{"block", "bluefs-db", "bluefs-wal", "bluefs-slow"};
}
if (action == "reshard") {
if (path.empty()) {
cerr << "must specify bluestore path" << std::endl;
exit(EXIT_FAILURE);
}
if (new_sharding == empty_sharding) {
cerr << "must provide reshard specification" << std::endl;
exit(EXIT_FAILURE);
}
}
vector<const char*> args;
if (log_file.size()) {
args.push_back("--log-file");
@@ -880,6 +897,57 @@ int main(int argc, char **argv)
}
cout << std::string(out.c_str(), out.length()) << std::endl;
bluestore.cold_close();
} else if (action == "reshard") {
auto get_ctrl = [&](size_t& val) {
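// pops one '/'-separated token off resharding_ctrl and parses it into val;
// if the string is already exhausted, val keeps its default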
if (!resharding_ctrl.empty()) {
size_t pos;
std::string token;
pos = resharding_ctrl.find('/');
token = resharding_ctrl.substr(0, pos);
if (pos != std::string::npos)
resharding_ctrl.erase(0, pos + 1);
else
resharding_ctrl.erase();
char* endptr;
val = strtoll(token.c_str(), &endptr, 0);
if (*endptr != '\0') {
cerr << "invalid --resharding-ctrl. '" << token << "' is not a number" << std::endl;
exit(EXIT_FAILURE);
}
}
};
BlueStore bluestore(cct.get(), path);
KeyValueDB *db_ptr;
RocksDBStore::resharding_ctrl ctrl;
if (!resharding_ctrl.empty()) {
get_ctrl(ctrl.bytes_per_iterator);
get_ctrl(ctrl.keys_per_iterator);
get_ctrl(ctrl.bytes_per_batch);
get_ctrl(ctrl.keys_per_batch);
if (!resharding_ctrl.empty()) {
cerr << "extra chars in --resharding-ctrl" << std::endl;
exit(EXIT_FAILURE);
}
}
int r = bluestore.open_db_environment(&db_ptr);
if (r < 0) {
cerr << "error preparing db environment: " << cpp_strerror(r) << std::endl;
exit(EXIT_FAILURE);
}
RocksDBStore* rocks_db = dynamic_cast<RocksDBStore*>(db_ptr);
ceph_assert(db_ptr);
ceph_assert(rocks_db);
r = rocks_db->reshard(new_sharding, &ctrl);
if (r < 0) {
cerr << "error resharding: " << cpp_strerror(r) << std::endl;
} else {
cout << "reshard success" << std::endl;
}
bluestore.close_db_environment();
} else {
cerr << "unrecognized action " << action << std::endl;
return 1;

src/test/objectstore/test_kv.cc

@@ -987,6 +987,280 @@ TEST_P(RocksDBShardingTest, wholespace_lookup_limits) {
class RocksDBResharding : public ::testing::Test {
public:
boost::scoped_ptr<RocksDBStore> db;
RocksDBResharding() : db(0) {}
string _bl_to_str(bufferlist val) {
string str(val.c_str(), val.length());
return str;
}
void rm_r(string path) {
string cmd = string("rm -r ") + path;
if (verbose)
cout << "==> " << cmd << std::endl;
int r = ::system(cmd.c_str());
if (r) {
cerr << "failed with exit code " << r
<< ", continuing anyway" << std::endl;
}
}
void SetUp() override {
verbose = getenv("VERBOSE") && strcmp(getenv("VERBOSE"), "1") == 0;
int r = ::mkdir("kv_test_temp_dir", 0777);
if (r < 0 && errno != EEXIST) {
r = -errno;
cerr << __func__ << ": unable to create kv_test_temp_dir: "
<< cpp_strerror(r) << std::endl;
return;
}
KeyValueDB* db_kv = KeyValueDB::create(g_ceph_context, "rocksdb",
"kv_test_temp_dir");
RocksDBStore* db_rocks = dynamic_cast<RocksDBStore*>(db_kv);
ceph_assert(db_rocks);
db.reset(db_rocks);
ASSERT_EQ(0, db->init(g_conf()->bluestore_rocksdb_options));
}
void TearDown() override {
db.reset(nullptr);
rm_r("kv_test_temp_dir");
}
bool verbose;
std::vector<std::string> prefixes = {"Ad", "Betelgeuse", "C", "D", "Evade"};
std::vector<std::string> randoms = {"0", "1", "2", "3", "4", "5",
"found", "brain", "fully", "pen", "worth", "race",
"stand", "nodded", "whenever", "surrounded", "industrial", "skin",
"this", "direction", "family", "beginning", "whenever", "held",
"metal", "year", "like", "valuable", "softly", "whistle",
"perfectly", "broken", "idea", "also", "coffee", "branch",
"tongue", "immediately", "bent", "partly", "burn", "include",
"certain", "burst", "final", "smoke", "positive", "perfectly"
};
int R = randoms.size();
int k = 0;
std::map<std::string, std::string> data;
void generate_data() {
data.clear();
for (size_t p = 0; p < prefixes.size(); p++) {
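// 1 << (3p + 3) keys per prefix: 8, 64, 512, 4096, 32768 for the five
// prefixes (slightly fewer if randomly generated keys collide)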
size_t elem_count = 1 << (( p * 3 ) + 3);
for (size_t i = 0; i < elem_count; i++) {
std::string key;
for (int x = 0; x < 5; x++) {
key = key + randoms[rand() % R];
}
std::string value;
for (int x = 0; x < 3; x++) {
value = value + randoms[rand() % R];
}
data[RocksDBStore::combine_strings(prefixes[p], key)] = value;
}
}
}
void data_to_db() {
KeyValueDB::Transaction t = db->get_transaction();
size_t i = 0;
for (auto& d: data) {
bufferlist v1;
v1.append(d.second);
string prefix;
string key;
RocksDBStore::split_key(d.first, &prefix, &key);
t->set(prefix, key, v1);
if (verbose)
std::cout << "SET " << prefix << " " << key << std::endl;
i++;
if ((i % 1000) == 0) {
ASSERT_EQ(db->submit_transaction_sync(t), 0);
t.reset();
if (verbose)
std::cout << "writing key to DB" << std::endl;
t = db->get_transaction();
}
}
if (verbose)
std::cout << "writing keys to DB" << std::endl;
ASSERT_EQ(db->submit_transaction_sync(t), 0);
}
void clear_db() {
KeyValueDB::Transaction t = db->get_transaction();
for (auto &d : data) {
string prefix;
string key;
RocksDBStore::split_key(d.first, &prefix, &key);
t->rmkey(prefix, key);
}
ASSERT_EQ(db->submit_transaction_sync(t), 0);
//paranoid, check if db empty
KeyValueDB::WholeSpaceIterator it = db->get_wholespace_iterator();
ASSERT_EQ(it->seek_to_first(), 0);
ASSERT_EQ(it->valid(), false);
}
void check_db() {
KeyValueDB::WholeSpaceIterator it = db->get_wholespace_iterator();
//move forward
auto dit = data.begin();
int r = it->seek_to_first();
ASSERT_EQ(r, 0);
ASSERT_EQ(it->valid(), (dit != data.end()));
while (dit != data.end()) {
ASSERT_EQ(it->valid(), true);
string prefix;
string key;
RocksDBStore::split_key(dit->first, &prefix, &key);
auto raw_key = it->raw_key();
ASSERT_EQ(raw_key.first, prefix);
ASSERT_EQ(raw_key.second, key);
ASSERT_EQ(it->value().to_str(), dit->second);
if (verbose)
std::cout << "next " << prefix << " " << key << std::endl;
ASSERT_EQ(it->next(), 0);
++dit;
}
ASSERT_EQ(it->valid(), false);
}
};
TEST_F(RocksDBResharding, basic) {
ASSERT_EQ(0, db->create_and_open(cout, ""));
generate_data();
data_to_db();
check_db();
db->close();
ASSERT_EQ(db->reshard("Evade(4)"), 0);
ASSERT_EQ(db->open(cout), 0);
check_db();
db->close();
}
TEST_F(RocksDBResharding, all_to_shards) {
ASSERT_EQ(0, db->create_and_open(cout, ""));
generate_data();
data_to_db();
check_db();
db->close();
ASSERT_EQ(db->reshard("Ad(1) Betelgeuse(1) C(1) D(1) Evade(1)"), 0);
ASSERT_EQ(db->open(cout), 0);
check_db();
db->close();
}
TEST_F(RocksDBResharding, all_to_shards_and_back_again) {
ASSERT_EQ(0, db->create_and_open(cout, ""));
generate_data();
data_to_db();
check_db();
db->close();
ASSERT_EQ(db->reshard("Ad(1) Betelgeuse(1) C(1) D(1) Evade(1)"), 0);
ASSERT_EQ(db->open(cout), 0);
check_db();
db->close();
ASSERT_EQ(db->reshard(""), 0);
ASSERT_EQ(db->open(cout), 0);
check_db();
db->close();
}
TEST_F(RocksDBResharding, resume_interrupted_at_batch) {
ASSERT_EQ(0, db->create_and_open(cout, ""));
generate_data();
data_to_db();
check_db();
db->close();
RocksDBStore::resharding_ctrl ctrl;
ctrl.unittest_fail_after_first_batch = true;
ASSERT_EQ(db->reshard("Evade(4)", &ctrl), -1000);
ASSERT_NE(db->open(cout), 0);
ASSERT_EQ(db->reshard("Evade(4)"), 0);
ASSERT_EQ(db->open(cout), 0);
check_db();
db->close();
}
TEST_F(RocksDBResharding, resume_interrupted_at_column) {
ASSERT_EQ(0, db->create_and_open(cout, ""));
generate_data();
data_to_db();
check_db();
db->close();
RocksDBStore::resharding_ctrl ctrl;
ctrl.unittest_fail_after_processing_column = true;
ASSERT_EQ(db->reshard("Evade(4)", &ctrl), -1001);
ASSERT_NE(db->open(cout), 0);
ASSERT_EQ(db->reshard("Evade(4)"), 0);
ASSERT_EQ(db->open(cout), 0);
check_db();
db->close();
}
TEST_F(RocksDBResharding, resume_interrupted_before_commit) {
ASSERT_EQ(0, db->create_and_open(cout, ""));
generate_data();
data_to_db();
check_db();
db->close();
RocksDBStore::resharding_ctrl ctrl;
ctrl.unittest_fail_after_successful_processing = true;
ASSERT_EQ(db->reshard("Evade(4)", &ctrl), -1002);
ASSERT_NE(db->open(cout), 0);
ASSERT_EQ(db->reshard("Evade(4)"), 0);
ASSERT_EQ(db->open(cout), 0);
check_db();
db->close();
}
TEST_F(RocksDBResharding, prevent_incomplete_hash_change) {
ASSERT_EQ(0, db->create_and_open(cout, "Evade(4,0-3)"));
generate_data();
data_to_db();
check_db();
db->close();
RocksDBStore::resharding_ctrl ctrl;
ctrl.unittest_fail_after_successful_processing = true;
ASSERT_EQ(db->reshard("Evade(4,0-8)", &ctrl), -1002);
ASSERT_NE(db->open(cout), 0);
ASSERT_EQ(db->reshard("Evade(4,0-8)"), 0);
ASSERT_EQ(db->open(cout), 0);
check_db();
db->close();
}
TEST_F(RocksDBResharding, change_reshard) {
ASSERT_EQ(0, db->create_and_open(cout, "Ad(4)"));
generate_data();
data_to_db();
check_db();
db->close();
RocksDBStore::resharding_ctrl ctrl;
ctrl.unittest_fail_after_first_batch = true;
ASSERT_EQ(db->reshard("C(5) D(3)", &ctrl), -1000);
ASSERT_NE(db->open(cout), 0);
ctrl.unittest_fail_after_first_batch = false;
ctrl.unittest_fail_after_processing_column = true;
ASSERT_EQ(db->reshard("C(5) Evade(2)", &ctrl), -1001);
ASSERT_NE(db->open(cout), 0);
ctrl.unittest_fail_after_processing_column = false;
ctrl.unittest_fail_after_successful_processing = true;
ASSERT_EQ(db->reshard("Evade(2) D(3)", &ctrl), -1002);
ASSERT_NE(db->open(cout), 0);
ASSERT_EQ(db->reshard("Ad(1) Evade(5)"), 0);
ASSERT_EQ(db->open(cout), 0);
check_db();
db->close();
}
INSTANTIATE_TEST_SUITE_P(
KeyValueDB,
KVTest,