Merge pull request #17029 from xiexingguo/wip-crush-rule-rename

mon: "ceph osd crush rule rename" support

Reviewed-by: Sage Weil <sage@redhat.com>
This commit is contained in:
Sage Weil 2017-08-25 10:21:38 -05:00 committed by GitHub
commit d930a4e062
5 changed files with 143 additions and 0 deletions

View File

@ -26,6 +26,10 @@ ceph osd crush set-device-class ssd osd.0
ceph osd crush set-device-class hdd osd.1
ceph osd crush rule create-replicated foo-ssd default host ssd
ceph osd crush rule create-replicated foo-hdd default host hdd
ceph osd crush rule ls-by-class ssd | grep 'foo-ssd'
ceph osd crush rule ls-by-class ssd | expect_false grep 'foo-hdd'
ceph osd crush rule ls-by-class hdd | grep 'foo-hdd'
ceph osd crush rule ls-by-class hdd | expect_false grep 'foo-ssd'
ceph osd erasure-code-profile set ec-foo-ssd crush-device-class=ssd m=2 k=2
ceph osd pool create ec-foo 2 erasure ec-foo-ssd
@ -33,6 +37,16 @@ ceph osd pool rm ec-foo ec-foo --yes-i-really-really-mean-it
ceph osd crush rule ls | grep foo
ceph osd crush rule rename foo foo-asdf
ceph osd crush rule rename bar bar-asdf
ceph osd crush rule ls | grep 'foo-asdf'
ceph osd crush rule ls | grep 'bar-asdf'
ceph osd crush rule rm foo 2>&1 | grep 'does not exist'
ceph osd crush rule rm bar 2>&1 | grep 'does not exist'
ceph osd crush rule rename foo-asdf foo
ceph osd crush rule rename bar-asdf bar
ceph osd crush rule ls | expect_false grep 'foo-asdf'
ceph osd crush rule ls | expect_false grep 'bar-asdf'
ceph osd crush rule rm foo
ceph osd crush rule rm foo # idempotent
ceph osd crush rule rm bar

View File

@ -291,6 +291,33 @@ int CrushWrapper::rename_bucket(const string& srcname,
return set_item_name(oldid, dstname);
}
int CrushWrapper::rename_rule(const string& srcname,
const string& dstname,
ostream *ss)
{
if (!rule_exists(srcname)) {
if (ss) {
*ss << "source rule name '" << srcname << "' does not exist";
}
return -ENOENT;
}
if (rule_exists(dstname)) {
if (ss) {
*ss << "destination rule name '" << dstname << "' already exists";
}
return -EEXIST;
}
int rule_id = get_rule_id(srcname);
auto it = rule_name_map.find(rule_id);
assert(it != rule_name_map.end());
it->second = dstname;
if (have_rmaps) {
rule_name_rmap.erase(srcname);
rule_name_rmap[dstname] = rule_id;
}
return 0;
}
void CrushWrapper::find_takes(set<int>& roots) const
{
for (unsigned i=0; i<crush->max_rules; i++) {
@ -1996,6 +2023,37 @@ int CrushWrapper::device_class_clone(
return 0;
}
int CrushWrapper::get_rules_by_class(const string &class_name, set<int> *rules)
{
assert(rules);
rules->clear();
if (!class_exists(class_name)) {
return -ENOENT;
}
int class_id = get_class_id(class_name);
for (unsigned i = 0; i < crush->max_rules; ++i) {
crush_rule *r = crush->rules[i];
if (!r)
continue;
for (unsigned j = 0; j < r->len; ++j) {
if (r->steps[j].op == CRUSH_RULE_TAKE) {
int step_item = r->steps[j].arg1;
int original_item;
int c;
int res = split_id_class(step_item, &original_item, &c);
if (res < 0) {
return res;
}
if (c != -1 && c == class_id) {
rules->insert(i);
break;
}
}
}
}
return 0;
}
bool CrushWrapper::_class_is_dead(int class_id)
{
for (auto &p: class_map) {

View File

@ -539,6 +539,9 @@ public:
ostream *ss);
// rule names
int rename_rule(const string& srcname,
const string& dstname,
ostream *ss);
bool rule_exists(string name) const {
build_rmaps();
return rule_name_rmap.count(name);
@ -1217,6 +1220,7 @@ public:
int rename_class(const string& srcname, const string& dstname);
int populate_classes(
const std::map<int32_t, map<int32_t, int32_t>>& old_class_bucket);
int get_rules_by_class(const string &class_name, set<int> *rules);
bool _class_is_dead(int class_id);
void cleanup_dead_classes();
int rebuild_roots_with_classes();

View File

@ -509,6 +509,10 @@ COMMAND("osd lspools " \
COMMAND_WITH_FLAG("osd crush rule list", "list crush rules", "osd", "r", "cli,rest",
FLAG(DEPRECATED))
COMMAND("osd crush rule ls", "list crush rules", "osd", "r", "cli,rest")
COMMAND("osd crush rule ls-by-class " \
"name=class,type=CephString,goodchars=[A-Za-z0-9-_.],req=false", \
"list all crush rules that reference the same <class>", \
"osd", "r", "cli,rest")
COMMAND("osd crush rule dump " \
"name=name,type=CephString,goodchars=[A-Za-z0-9-_.],req=false", \
"dump crush rule <name> (default all)", \
@ -646,6 +650,11 @@ COMMAND("osd crush rule create-erasure " \
COMMAND("osd crush rule rm " \
"name=name,type=CephString,goodchars=[A-Za-z0-9-_.] ", \
"remove crush rule <name>", "osd", "rw", "cli,rest")
COMMAND("osd crush rule rename " \
"name=srcname,type=CephString,goodchars=[A-Za-z0-9-_.] " \
"name=dstname,type=CephString,goodchars=[A-Za-z0-9-_.]", \
"rename crush rule <srcname> to <dstname>",
"osd", "rw", "cli,rest")
COMMAND("osd crush tree "
"name=shadow,type=CephChoices,strings=--show-shadow,req=false", \
"dump crush buckets and items in a tree view",

View File

@ -4930,6 +4930,34 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op)
osdmap.crush->list_rules(&ss);
rdata.append(ss.str());
}
} else if (prefix == "osd crush rule ls-by-class") {
string class_name;
cmd_getval(g_ceph_context, cmdmap, "class", class_name);
if (class_name.empty()) {
ss << "no class specified";
r = -EINVAL;
goto reply;
}
set<int> rules;
r = osdmap.crush->get_rules_by_class(class_name, &rules);
if (r < 0) {
ss << "failed to get rules by class '" << class_name << "'";
goto reply;
}
if (f) {
f->open_array_section("rules");
for (auto &rule: rules) {
f->dump_string("name", osdmap.crush->get_rule_name(rule));
}
f->close_section();
f->flush(rdata);
} else {
ostringstream rs;
for (auto &rule: rules) {
rs << osdmap.crush->get_rule_name(rule) << "\n";
}
rdata.append(rs.str());
}
} else if (prefix == "osd crush rule dump") {
string name;
cmd_getval(g_ceph_context, cmdmap, "name", name);
@ -8577,6 +8605,36 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
get_last_committed() + 1));
return true;
} else if (prefix == "osd crush rule rename") {
string srcname;
string dstname;
cmd_getval(g_ceph_context, cmdmap, "srcname", srcname);
cmd_getval(g_ceph_context, cmdmap, "dstname", dstname);
if (srcname.empty() || dstname.empty()) {
ss << "must specify both source rule name and destination rule name";
err = -EINVAL;
goto reply;
}
if (srcname == dstname) {
ss << "destination rule name is equal to source rule name";
err = 0;
goto reply;
}
CrushWrapper newcrush;
_get_pending_crush(newcrush);
err = newcrush.rename_rule(srcname, dstname, &ss);
if (err < 0) {
// ss has reason for failure
goto reply;
}
pending_inc.crush.clear();
newcrush.encode(pending_inc.crush, mon->get_quorum_con_features());
getline(ss, rs);
wait_for_finished_proposal(op, new Monitor::C_Command(mon, op, 0, rs,
get_last_committed() + 1));
return true;
} else if (prefix == "osd setmaxosd") {
int64_t newmax;
if (!cmd_getval(g_ceph_context, cmdmap, "newmax", newmax)) {