Merge pull request #14404 from tchaikov/wip-another-mgr-command

mon,mgr: move reweight-by-* to mgr

Reviewed-by: Sage Weil <sage@redhat.com>
This commit is contained in:
Kefu Chai 2017-04-13 13:57:42 +08:00 committed by GitHub
commit 9acd868857
14 changed files with 501 additions and 364 deletions

View File

@ -11,6 +11,13 @@ openstack:
- volumes: # attached to each instance
count: 3
size: 10 # GB
overrides:
ceph:
conf:
mgr:
mon reweight min pgs per osd: 4
mon:
mon reweight min pgs per osd: 4
tasks:
- install:
- ceph:

View File

@ -35,7 +35,7 @@ protected:
MonClient *monc;
Objecter *objecter;
FSMap fsmap;
Mutex lock;
mutable Mutex lock;
PGMap pg_map;
@ -57,29 +57,28 @@ public:
void notify_osdmap(const OSDMap &osd_map);
bool have_fsmap() {
bool have_fsmap() const {
Mutex::Locker l(lock);
return fsmap.get_epoch() > 0;
}
template<typename Callback, typename...Args>
void with_fsmap(Callback&& cb, Args&&...args)
void with_fsmap(Callback&& cb, Args&&...args) const
{
Mutex::Locker l(lock);
std::forward<Callback>(cb)(const_cast<const FSMap&>(fsmap),
std::forward<Args>(args)...);
std::forward<Callback>(cb)(fsmap, std::forward<Args>(args)...);
}
template<typename Callback, typename...Args>
void with_pgmap(Callback&& cb, Args&&...args)
auto with_pgmap(Callback&& cb, Args&&...args) const ->
decltype(cb(pg_map, std::forward<Args>(args)...))
{
Mutex::Locker l(lock);
std::forward<Callback>(cb)(const_cast<const PGMap&>(pg_map),
std::forward<Args>(args)...);
return std::forward<Callback>(cb)(pg_map, std::forward<Args>(args)...);
}
template<typename... Args>
void with_monmap(Args &&... args)
void with_monmap(Args &&... args) const
{
Mutex::Locker l(lock);
assert(monc != nullptr);
@ -87,10 +86,11 @@ public:
}
template<typename... Args>
void with_osdmap(Args &&... args)
auto with_osdmap(Args &&... args) const ->
decltype(objecter->with_osdmap(std::forward<Args>(args)...))
{
assert(objecter != nullptr);
objecter->with_osdmap(std::forward<Args>(args)...);
return objecter->with_osdmap(std::forward<Args>(args)...);
}
};

View File

@ -15,6 +15,8 @@
#include "auth/RotatingKeyRing.h"
#include "json_spirit/json_spirit_writer.h"
#include "messages/MMgrOpen.h"
#include "messages/MMgrConfigure.h"
#include "messages/MCommand.h"
@ -369,6 +371,24 @@ bool DaemonServer::_allowed_command(
return capable;
}
class ReplyOnFinish : public Context {
DaemonServer* mgr;
MCommand *m;
bufferlist odata;
public:
bufferlist from_mon;
string outs;
ReplyOnFinish(DaemonServer* mgr, MCommand *m, bufferlist&& odata)
: mgr(mgr), m(m), odata(std::move(odata))
{}
void finish(int r) override {
odata.claim_append(from_mon);
mgr->_reply(m, r, outs, odata);
}
};
bool DaemonServer::handle_command(MCommand *m)
{
int r = 0;
@ -398,8 +418,7 @@ bool DaemonServer::handle_command(MCommand *m)
map<string,string> param_str_map;
if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
r = -EINVAL;
goto out;
return _reply(m, -EINVAL, ss.str(), odata);
}
{
@ -441,7 +460,8 @@ bool DaemonServer::handle_command(MCommand *m)
}
#endif
f.close_section(); // command_descriptions
goto out;
f.flush(odata);
return _reply(m, r, ss.str(), odata);
}
// lookup command
@ -449,9 +469,7 @@ bool DaemonServer::handle_command(MCommand *m)
ARRAY_SIZE(mgr_commands));
_generate_command_map(cmdmap, param_str_map);
if (!mgr_cmd) {
ss << "command not supported";
r = -EINVAL;
goto out;
return _reply(m, -EINVAL, "command not supported", odata);
}
// validate user's permissions for requested command
@ -461,9 +479,7 @@ bool DaemonServer::handle_command(MCommand *m)
audit_clog->info() << "from='" << session->inst << "' "
<< "entity='" << session->entity_name << "' "
<< "cmd=" << m->cmd << ": access denied";
ss << "access denied";
r = -EACCES;
goto out;
return _reply(m, -EACCES, "access denied", odata);
}
audit_clog->debug()
@ -483,8 +499,7 @@ bool DaemonServer::handle_command(MCommand *m)
cmd_getval(g_ceph_context, cmdmap, "pgid", pgidstr);
if (!pgid.parse(pgidstr.c_str())) {
ss << "invalid pgid '" << pgidstr << "'";
r = -EINVAL;
goto out;
return _reply(m, -EINVAL, ss.str(), odata);
}
bool pg_exists = false;
cluster_state.with_osdmap([&](const OSDMap& osdmap) {
@ -492,8 +507,7 @@ bool DaemonServer::handle_command(MCommand *m)
});
if (!pg_exists) {
ss << "pg " << pgid << " dne";
r = -ENOENT;
goto out;
return _reply(m, -ENOENT, ss.str(), odata);
}
int acting_primary = -1;
entity_inst_t inst;
@ -505,8 +519,7 @@ bool DaemonServer::handle_command(MCommand *m)
});
if (acting_primary == -1) {
ss << "pg " << pgid << " has no primary osd";
r = -EAGAIN;
goto out;
return _reply(m, -EAGAIN, ss.str(), odata);
}
vector<pg_t> pgs = { pgid };
msgr->send_message(new MOSDScrub(monc->get_fsid(),
@ -516,21 +529,107 @@ bool DaemonServer::handle_command(MCommand *m)
inst);
ss << "instructing pg " << pgid << " on osd." << acting_primary
<< " (" << inst << ") to " << scrubop;
r = 0;
}
else {
cluster_state.with_pgmap(
[&](const PGMap& pg_map) {
cluster_state.with_osdmap([&](const OSDMap& osdmap) {
r = process_pg_map_command(prefix, cmdmap, pg_map, osdmap,
f.get(), &ss, &odata);
return _reply(m, 0, ss.str(), odata);
} else if (prefix == "osd reweight-by-pg" ||
prefix == "osd reweight-by-utilization" ||
prefix == "osd test-reweight-by-pg" ||
prefix == "osd test-reweight-by-utilization") {
bool by_pg =
prefix == "osd reweight-by-pg" || prefix == "osd test-reweight-by-pg";
bool dry_run =
prefix == "osd test-reweight-by-pg" ||
prefix == "osd test-reweight-by-utilization";
int64_t oload;
cmd_getval(g_ceph_context, cmdmap, "oload", oload, int64_t(120));
set<int64_t> pools;
vector<string> poolnames;
cmd_getval(g_ceph_context, cmdmap, "pools", poolnames);
cluster_state.with_osdmap([&](const OSDMap& osdmap) {
for (const auto& poolname : poolnames) {
int64_t pool = osdmap.lookup_pg_pool_name(poolname);
if (pool < 0) {
ss << "pool '" << poolname << "' does not exist";
r = -ENOENT;
}
pools.insert(pool);
}
});
if (r) {
return _reply(m, r, ss.str(), odata);
}
double max_change = g_conf->mon_reweight_max_change;
cmd_getval(g_ceph_context, cmdmap, "max_change", max_change);
if (max_change <= 0.0) {
ss << "max_change " << max_change << " must be positive";
return _reply(m, -EINVAL, ss.str(), odata);
}
int64_t max_osds = g_conf->mon_reweight_max_osds;
cmd_getval(g_ceph_context, cmdmap, "max_osds", max_osds);
if (max_osds <= 0) {
ss << "max_osds " << max_osds << " must be positive";
return _reply(m, -EINVAL, ss.str(), odata);
}
string no_increasing;
cmd_getval(g_ceph_context, cmdmap, "no_increasing", no_increasing);
string out_str;
map<int32_t, uint32_t> new_weights;
r = cluster_state.with_pgmap([&](const PGMap& pgmap) {
return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
return reweight::by_utilization(osdmap, pgmap,
oload,
max_change,
max_osds,
by_pg,
pools.empty() ? NULL : &pools,
no_increasing == "--no-increasing",
&new_weights,
&ss, &out_str, f.get());
});
});
if (r >= 0) {
dout(10) << "reweight::by_utilization: finished with " << out_str << dendl;
}
if (f)
f->flush(odata);
else
odata.append(out_str);
if (r < 0) {
ss << "FAILED reweight-by-pg";
return _reply(m, r, ss.str(), odata);
} else if (r == 0 || dry_run) {
ss << "no change";
return _reply(m, r, ss.str(), odata);
} else {
json_spirit::Object json_object;
for (const auto& osd_weight : new_weights) {
json_spirit::Config::add(json_object,
std::to_string(osd_weight.first),
std::to_string(osd_weight.second));
}
string s = json_spirit::write(json_object);
std::replace(begin(s), end(s), '\"', '\'');
const string cmd =
"{"
"\"prefix\": \"osd reweightn\", "
"\"weights\": \"" + s + "\""
"}";
auto on_finish = new ReplyOnFinish(this, m, std::move(odata));
monc->start_mon_command({cmd}, {},
&on_finish->from_mon, &on_finish->outs, on_finish);
return true;
}
} else {
r = cluster_state.with_pgmap([&](const PGMap& pg_map) {
return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
return process_pg_map_command(prefix, cmdmap, pg_map, osdmap,
f.get(), &ss, &odata);
});
});
}
if (r != -EOPNOTSUPP)
return _reply(m, r, ss.str(), odata);
// fall back to registered python handlers
if (r == -EOPNOTSUPP) {
else {
// Let's find you a handler!
MgrPyModule *handler = nullptr;
auto py_commands = py_modules.get_commands();
@ -546,8 +645,7 @@ bool DaemonServer::handle_command(MCommand *m)
if (handler == nullptr) {
ss << "No handler found for '" << prefix << "'";
dout(4) << "No handler found for '" << prefix << "'" << dendl;
r = -EINVAL;
goto out;
return _reply(m, -EINVAL, ss.str(), odata);
}
// FIXME: go run this python part in another thread, not inline
@ -557,27 +655,29 @@ bool DaemonServer::handle_command(MCommand *m)
stringstream ds;
r = handler->handle_command(cmdmap, &ds, &ss);
odata.append(ds);
goto out;
return _reply(m, 0, ss.str(), odata);
}
}
out:
bool DaemonServer::_reply(MCommand* m,
int ret,
const std::string& s,
const bufferlist& payload)
{
dout(1) << __func__ << " r=" << ret << " " << s << dendl;
auto con = m->get_connection();
if (!con) {
dout(10) << __func__ << " connection dropped for command" << dendl;
m->put();
return true;
}
// Let the connection drop as soon as we've sent our response
if (m->get_connection()) {
m->get_connection()->mark_disposable();
}
std::string rs;
rs = ss.str();
dout(1) << "do_command r=" << r << " " << rs << dendl;
if (con) {
MCommandReply *reply = new MCommandReply(r, rs);
reply->set_tid(m->get_tid());
reply->set_data(odata);
con->send_message(reply);
}
con->mark_disposable();
auto response = new MCommandReply(ret, s);
response->set_tid(m->get_tid());
response->set_data(payload);
con->send_message(response);
m->put();
return true;
}

View File

@ -73,6 +73,11 @@ protected:
const map<string,string>& param_str_map,
const MgrCommand *this_cmd);
private:
friend class ReplyOnFinish;
bool _reply(MCommand* m,
int ret, const std::string& s, const bufferlist& payload);
public:
int init(uint64_t gid, entity_addr_t client_addr);
void shutdown();

View File

@ -68,3 +68,31 @@ COMMAND("osd pool stats " \
"name=name,type=CephString,req=false",
"obtain stats from all pools, or from specified pool",
"osd", "r", "cli,rest")
COMMAND("osd reweight-by-utilization " \
"name=oload,type=CephInt,req=false " \
"name=max_change,type=CephFloat,req=false " \
"name=max_osds,type=CephInt,req=false " \
"name=no_increasing,type=CephChoices,strings=--no-increasing,req=false",\
"reweight OSDs by utilization [overload-percentage-for-consideration, default 120]", \
"osd", "rw", "cli,rest")
COMMAND("osd test-reweight-by-utilization " \
"name=oload,type=CephInt,req=false " \
"name=max_change,type=CephFloat,req=false " \
"name=max_osds,type=CephInt,req=false " \
"name=no_increasing,type=CephChoices,strings=--no-increasing,req=false",\
"dry run of reweight OSDs by utilization [overload-percentage-for-consideration, default 120]", \
"osd", "rw", "cli,rest")
COMMAND("osd reweight-by-pg " \
"name=oload,type=CephInt,req=false " \
"name=max_change,type=CephFloat,req=false " \
"name=max_osds,type=CephInt,req=false " \
"name=pools,type=CephPoolname,n=N,req=false", \
"reweight OSDs by PG distribution [overload-percentage-for-consideration, default 120]", \
"osd", "rw", "cli,rest")
COMMAND("osd test-reweight-by-pg " \
"name=oload,type=CephInt,req=false " \
"name=max_change,type=CephFloat,req=false " \
"name=max_osds,type=CephInt,req=false " \
"name=pools,type=CephPoolname,n=N,req=false", \
"dry run of reweight OSDs by PG distribution [overload-percentage-for-consideration, default 120]", \
"osd", "rw", "cli,rest")

View File

@ -172,7 +172,7 @@ void MonCapGrant::expand_profile_mon(const EntityName& name) const
profile_grants.push_back(MonCapGrant("log", MON_CAP_W));
profile_grants.push_back(MonCapGrant("mon", MON_CAP_R));
profile_grants.push_back(MonCapGrant("mds", MON_CAP_R));
profile_grants.push_back(MonCapGrant("osd", MON_CAP_R));
profile_grants.push_back(MonCapGrant("osd", MON_CAP_R | MON_CAP_W));
profile_grants.push_back(MonCapGrant("config-key", MON_CAP_R));
string prefix = string("daemon-private/mgr/");
profile_grants.push_back(MonCapGrant("config-key get", "key",

View File

@ -466,15 +466,10 @@ public:
* to the MonMap
*/
template<typename Callback, typename...Args>
auto with_monmap(Callback&& cb, Args&&...args) ->
typename std::enable_if<
std::is_void<
decltype(cb(const_cast<const MonMap&>(monmap),
std::forward<Args>(args)...))>::value,
void>::type {
auto with_monmap(Callback&& cb, Args&&...args) const ->
decltype(cb(monmap, std::forward<Args>(args)...)) {
Mutex::Locker l(monc_lock);
std::forward<Callback>(cb)(const_cast<const MonMap&>(monmap),
std::forward<Args>(args)...);
return std::forward<Callback>(cb)(monmap, std::forward<Args>(args)...);
}
private:

View File

@ -638,6 +638,10 @@ COMMAND("osd reweight " \
"name=id,type=CephOsdName " \
"type=CephFloat,name=weight,range=0.0|1.0", \
"reweight osd to 0.0 < <weight> < 1.0", "osd", "rw", "cli,rest")
COMMAND("osd reweightn " \
"name=weights,type=CephString",
"reweight osds with {<id>: <weight>,...})",
"osd", "rw", "cli,rest")
COMMAND("osd pg-temp " \
"name=pgid,type=CephPgid " \
"name=id,type=CephOsdName,n=N,req=false", \
@ -748,34 +752,6 @@ COMMAND("osd pool get-quota " \
COMMAND("osd utilization",
"get basic pg distribution stats",
"osd", "r", "cli,rest")
COMMAND("osd reweight-by-utilization " \
"name=oload,type=CephInt,req=false " \
"name=max_change,type=CephFloat,req=false " \
"name=max_osds,type=CephInt,req=false " \
"name=no_increasing,type=CephChoices,strings=--no-increasing,req=false",\
"reweight OSDs by utilization [overload-percentage-for-consideration, default 120]", \
"osd", "rw", "cli,rest")
COMMAND("osd test-reweight-by-utilization " \
"name=oload,type=CephInt,req=false " \
"name=max_change,type=CephFloat,req=false " \
"name=max_osds,type=CephInt,req=false " \
"name=no_increasing,type=CephChoices,strings=--no-increasing,req=false",\
"dry run of reweight OSDs by utilization [overload-percentage-for-consideration, default 120]", \
"osd", "rw", "cli,rest")
COMMAND("osd reweight-by-pg " \
"name=oload,type=CephInt,req=false " \
"name=max_change,type=CephFloat,req=false " \
"name=max_osds,type=CephInt,req=false " \
"name=pools,type=CephPoolname,n=N,req=false", \
"reweight OSDs by PG distribution [overload-percentage-for-consideration, default 120]", \
"osd", "rw", "cli,rest")
COMMAND("osd test-reweight-by-pg " \
"name=oload,type=CephInt,req=false " \
"name=max_change,type=CephFloat,req=false " \
"name=max_osds,type=CephInt,req=false " \
"name=pools,type=CephPoolname,n=N,req=false", \
"dry run of reweight OSDs by PG distribution [overload-percentage-for-consideration, default 120]", \
"osd", "rw", "cli,rest")
COMMAND("osd df " \
"name=output_method,type=CephChoices,strings=plain|tree,req=false", \
"show OSD utilization", "osd", "r", "cli,rest")

View File

@ -70,6 +70,8 @@
#include "include/str_list.h"
#include "include/str_map.h"
#include "json_spirit/json_spirit_reader.h"
#define dout_subsys ceph_subsys_mon
#define OSD_PG_CREATING_PREFIX "osd_pg_creating"
@ -448,237 +450,6 @@ void OSDMonitor::update_logger()
mon->cluster_logger->set(l_cluster_osd_epoch, osdmap.get_epoch());
}
/* Assign a lower weight to overloaded OSDs.
*
* The osds that will get a lower weight are those with with a utilization
* percentage 'oload' percent greater than the average utilization.
*/
int OSDMonitor::reweight_by_utilization(int oload,
double max_changef,
int max_osds,
bool by_pg, const set<int64_t> *pools,
bool no_increasing,
bool dry_run,
std::stringstream *ss,
std::string *out_str,
Formatter *f)
{
if (oload <= 100) {
*ss << "You must give a percentage higher than 100. "
"The reweighting threshold will be calculated as <average-utilization> "
"times <input-percentage>. For example, an argument of 200 would "
"reweight OSDs which are twice as utilized as the average OSD.\n";
return -EINVAL;
}
const PGMap &pgm = mon->pgmon()->pg_map;
vector<int> pgs_by_osd(osdmap.get_max_osd());
// Avoid putting a small number (or 0) in the denominator when calculating
// average_util
double average_util;
if (by_pg) {
// by pg mapping
double weight_sum = 0.0; // sum up the crush weights
unsigned num_pg_copies = 0;
int num_osds = 0;
for (ceph::unordered_map<pg_t,pg_stat_t>::const_iterator p =
pgm.pg_stat.begin();
p != pgm.pg_stat.end();
++p) {
if (pools && pools->count(p->first.pool()) == 0)
continue;
for (vector<int>::const_iterator q = p->second.acting.begin();
q != p->second.acting.end();
++q) {
if (*q >= (int)pgs_by_osd.size())
pgs_by_osd.resize(*q);
if (pgs_by_osd[*q] == 0) {
if (osdmap.crush->get_item_weightf(*q) <= 0) {
//skip if we currently can not identify item
continue;
}
weight_sum += osdmap.crush->get_item_weightf(*q);
++num_osds;
}
++pgs_by_osd[*q];
++num_pg_copies;
}
}
if (!num_osds || (num_pg_copies / num_osds < g_conf->mon_reweight_min_pgs_per_osd)) {
*ss << "Refusing to reweight: we only have " << num_pg_copies
<< " PGs across " << num_osds << " osds!\n";
return -EDOM;
}
average_util = (double)num_pg_copies / weight_sum;
} else {
// by osd utilization
int num_osd = MAX(1, pgm.osd_stat.size());
if ((uint64_t)pgm.osd_sum.kb * 1024 / num_osd
< g_conf->mon_reweight_min_bytes_per_osd) {
*ss << "Refusing to reweight: we only have " << pgm.osd_sum.kb
<< " kb across all osds!\n";
return -EDOM;
}
if ((uint64_t)pgm.osd_sum.kb_used * 1024 / num_osd
< g_conf->mon_reweight_min_bytes_per_osd) {
*ss << "Refusing to reweight: we only have " << pgm.osd_sum.kb_used
<< " kb used across all osds!\n";
return -EDOM;
}
average_util = (double)pgm.osd_sum.kb_used / (double)pgm.osd_sum.kb;
}
// adjust down only if we are above the threshold
double overload_util = average_util * (double)oload / 100.0;
// but aggressively adjust weights up whenever possible.
double underload_util = average_util;
unsigned max_change = (unsigned)(max_changef * (double)0x10000);
ostringstream oss;
if (f) {
f->open_object_section("reweight_by_utilization");
f->dump_int("overload_min", oload);
f->dump_float("max_change", max_changef);
f->dump_int("max_change_osds", max_osds);
f->dump_float("average_utilization", average_util);
f->dump_float("overload_utilization", overload_util);
} else {
oss << "oload " << oload << "\n";
oss << "max_change " << max_changef << "\n";
oss << "max_change_osds " << max_osds << "\n";
oss.precision(4);
oss << "average_utilization " << std::fixed << average_util << "\n";
oss << "overload_utilization " << overload_util << "\n";
}
bool changed = false;
int num_changed = 0;
// precompute util for each OSD
std::vector<std::pair<int, float> > util_by_osd;
for (ceph::unordered_map<int,osd_stat_t>::const_iterator p =
pgm.osd_stat.begin();
p != pgm.osd_stat.end();
++p) {
std::pair<int, float> osd_util;
osd_util.first = p->first;
if (by_pg) {
if (p->first >= (int)pgs_by_osd.size() ||
pgs_by_osd[p->first] == 0) {
// skip if this OSD does not contain any pg
// belonging to the specified pool(s).
continue;
}
if (osdmap.crush->get_item_weightf(p->first) <= 0) {
// skip if we are unable to locate item.
continue;
}
osd_util.second = pgs_by_osd[p->first] / osdmap.crush->get_item_weightf(p->first);
} else {
osd_util.second = (double)p->second.kb_used / (double)p->second.kb;
}
util_by_osd.push_back(osd_util);
}
// sort by absolute deviation from the mean utilization,
// in descending order.
std::sort(util_by_osd.begin(), util_by_osd.end(),
[average_util](std::pair<int, float> l, std::pair<int, float> r) {
return abs(l.second - average_util) > abs(r.second - average_util);
}
);
OSDMap::Incremental newinc;
if (f)
f->open_array_section("reweights");
for (std::vector<std::pair<int, float> >::const_iterator p =
util_by_osd.begin();
p != util_by_osd.end();
++p) {
unsigned weight = osdmap.get_weight(p->first);
if (weight == 0) {
// skip if OSD is currently out
continue;
}
float util = p->second;
if (util >= overload_util) {
// Assign a lower weight to overloaded OSDs. The current weight
// is a factor to take into account the original weights,
// to represent e.g. differing storage capacities
unsigned new_weight = (unsigned)((average_util / util) * (float)weight);
if (weight > max_change)
new_weight = MAX(new_weight, weight - max_change);
newinc.new_weight[p->first] = new_weight;
if (!dry_run) {
pending_inc.new_weight[p->first] = new_weight;
changed = true;
}
if (f) {
f->open_object_section("osd");
f->dump_int("osd", p->first);
f->dump_float("weight", (float)weight / (float)0x10000);
f->dump_float("new_weight", (float)new_weight / (float)0x10000);
f->close_section();
} else {
oss << "osd." << p->first << " weight "
<< (float)weight / (float)0x10000 << " -> "
<< (float)new_weight / (float)0x10000 << "\n";
}
if (++num_changed >= max_osds)
break;
}
if (!no_increasing && util <= underload_util) {
// assign a higher weight.. if we can.
unsigned new_weight = (unsigned)((average_util / util) * (float)weight);
new_weight = MIN(new_weight, weight + max_change);
if (new_weight > 0x10000)
new_weight = 0x10000;
if (new_weight > weight) {
newinc.new_weight[p->first] = new_weight;
if (!dry_run) {
pending_inc.new_weight[p->first] = new_weight;
changed = true;
}
oss << "osd." << p->first << " weight "
<< (float)weight / (float)0x10000 << " -> "
<< (float)new_weight / (float)0x10000 << "\n";
if (++num_changed >= max_osds)
break;
}
}
}
if (f) {
f->close_section();
}
OSDMap newmap;
newmap.deepish_copy_from(osdmap);
newinc.fsid = newmap.fsid;
newinc.epoch = newmap.get_epoch() + 1;
newmap.apply_incremental(newinc);
osdmap.summarize_mapping_stats(&newmap, pools, out_str, f);
if (f) {
f->close_section();
} else {
*out_str += "\n";
*out_str += oss.str();
}
dout(10) << "reweight_by_utilization: finished with " << out_str << dendl;
return changed;
}
template <typename F>
class OSDUtilizationDumper : public CrushTreeDumper::Dumper<F> {
public:
@ -6075,6 +5846,42 @@ bool OSDMonitor::prepare_command(MonOpRequestRef op)
return prepare_command_impl(op, cmdmap);
}
static int parse_reweights(CephContext *cct,
const map<string,cmd_vartype> &cmdmap,
const OSDMap& osdmap,
map<int32_t, uint32_t>* weights)
{
string weights_str;
if (!cmd_getval(g_ceph_context, cmdmap, "weights", weights_str)) {
return -EINVAL;
}
std::replace(begin(weights_str), end(weights_str), '\'', '"');
json_spirit::mValue json_value;
if (!json_spirit::read(weights_str, json_value)) {
return -EINVAL;
}
if (json_value.type() != json_spirit::obj_type) {
return -EINVAL;
}
const auto obj = json_value.get_obj();
try {
for (auto& osd_weight : obj) {
auto osd_id = std::stoi(osd_weight.first);
if (!osdmap.exists(osd_id)) {
return -ENOENT;
}
if (osd_weight.second.type() != json_spirit::str_type) {
return -EINVAL;
}
auto weight = std::stoul(osd_weight.second.get_str());
weights->insert({osd_id, weight});
}
} catch (const std::logic_error& e) {
return -EINVAL;
}
return 0;
}
bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
map<string,cmd_vartype> &cmdmap)
{
@ -7688,6 +7495,19 @@ bool OSDMonitor::prepare_command_impl(MonOpRequestRef op,
err = -ENOENT;
goto reply;
}
} else if (prefix == "osd reweightn") {
map<int32_t, uint32_t> weights;
err = parse_reweights(g_ceph_context, cmdmap, osdmap, &weights);
if (err) {
ss << "unable to parse 'weights' value '"
<< cmd_vartype_stringify(cmdmap["weights"]) << "'";
goto reply;
}
pending_inc.new_weight = std::move(weights);
wait_for_finished_proposal(
op,
new Monitor::C_Command(mon, op, 0, rs, rdata, get_last_committed() + 1));
return true;
} else if (prefix == "osd lost") {
int64_t id;
if (!cmd_getval(g_ceph_context, cmdmap, "id", id)) {
@ -8747,24 +8567,31 @@ done:
string no_increasing;
cmd_getval(g_ceph_context, cmdmap, "no_increasing", no_increasing);
string out_str;
err = reweight_by_utilization(oload,
max_change,
max_osds,
by_pg,
pools.empty() ? NULL : &pools,
no_increasing == "--no-increasing",
dry_run,
&ss, &out_str, f.get());
map<int32_t, uint32_t> new_weights;
err = reweight::by_utilization(osdmap,
mon->pgmon()->pg_map,
oload,
max_change,
max_osds,
by_pg,
pools.empty() ? NULL : &pools,
no_increasing == "--no-increasing",
&new_weights,
&ss, &out_str, f.get());
if (err >= 0) {
dout(10) << "reweight::by_utilization: finished with " << out_str << dendl;
}
if (f)
f->flush(rdata);
else
rdata.append(out_str);
if (err < 0) {
ss << "FAILED reweight-by-pg";
} else if (err == 0) {
} else if (err == 0 || dry_run) {
ss << "no change";
} else {
ss << "SUCCESSFUL reweight-by-pg";
pending_inc.new_weight = std::move(new_weights);
wait_for_finished_proposal(
op,
new Monitor::C_Command(mon, op, 0, rs, rdata, get_last_committed() + 1));

View File

@ -240,16 +240,6 @@ public:
MonOpRequestRef req = MonOpRequestRef());
private:
int reweight_by_utilization(int oload,
double max_change,
int max_osds,
bool by_pg,
const set<int64_t> *pools,
bool no_increasing,
bool dry_run,
std::stringstream *ss,
std::string *out_str,
Formatter *f);
void print_utilization(ostream &out, Formatter *f, bool tree) const;
bool check_source(PaxosServiceMessage *m, uuid_d fsid);

View File

@ -2884,3 +2884,210 @@ void PGMapUpdater::check_down_pgs(
}
}
}
int reweight::by_utilization(
const OSDMap &osdmap,
const PGMap &pgm,
int oload,
double max_changef,
int max_osds,
bool by_pg, const set<int64_t> *pools,
bool no_increasing,
map<int32_t, uint32_t>* new_weights,
std::stringstream *ss,
std::string *out_str,
Formatter *f)
{
if (oload <= 100) {
*ss << "You must give a percentage higher than 100. "
"The reweighting threshold will be calculated as <average-utilization> "
"times <input-percentage>. For example, an argument of 200 would "
"reweight OSDs which are twice as utilized as the average OSD.\n";
return -EINVAL;
}
vector<int> pgs_by_osd(osdmap.get_max_osd());
// Avoid putting a small number (or 0) in the denominator when calculating
// average_util
double average_util;
if (by_pg) {
// by pg mapping
double weight_sum = 0.0; // sum up the crush weights
unsigned num_pg_copies = 0;
int num_osds = 0;
for (const auto& pg : pgm.pg_stat) {
if (pools && pools->count(pg.first.pool()) == 0)
continue;
for (const auto acting : pg.second.acting) {
if (acting >= (int)pgs_by_osd.size())
pgs_by_osd.resize(acting);
if (pgs_by_osd[acting] == 0) {
if (osdmap.crush->get_item_weightf(acting) <= 0) {
//skip if we currently can not identify item
continue;
}
weight_sum += osdmap.crush->get_item_weightf(acting);
++num_osds;
}
++pgs_by_osd[acting];
++num_pg_copies;
}
}
if (!num_osds || (num_pg_copies / num_osds < g_conf->mon_reweight_min_pgs_per_osd)) {
*ss << "Refusing to reweight: we only have " << num_pg_copies
<< " PGs across " << num_osds << " osds!\n";
return -EDOM;
}
average_util = (double)num_pg_copies / weight_sum;
} else {
// by osd utilization
int num_osd = MAX(1, pgm.osd_stat.size());
if ((uint64_t)pgm.osd_sum.kb * 1024 / num_osd
< g_conf->mon_reweight_min_bytes_per_osd) {
*ss << "Refusing to reweight: we only have " << pgm.osd_sum.kb
<< " kb across all osds!\n";
return -EDOM;
}
if ((uint64_t)pgm.osd_sum.kb_used * 1024 / num_osd
< g_conf->mon_reweight_min_bytes_per_osd) {
*ss << "Refusing to reweight: we only have " << pgm.osd_sum.kb_used
<< " kb used across all osds!\n";
return -EDOM;
}
average_util = (double)pgm.osd_sum.kb_used / (double)pgm.osd_sum.kb;
}
// adjust down only if we are above the threshold
const double overload_util = average_util * (double)oload / 100.0;
// but aggressively adjust weights up whenever possible.
const double underload_util = average_util;
const unsigned max_change = (unsigned)(max_changef * (double)0x10000);
ostringstream oss;
if (f) {
f->open_object_section("reweight_by_utilization");
f->dump_int("overload_min", oload);
f->dump_float("max_change", max_changef);
f->dump_int("max_change_osds", max_osds);
f->dump_float("average_utilization", average_util);
f->dump_float("overload_utilization", overload_util);
} else {
oss << "oload " << oload << "\n";
oss << "max_change " << max_changef << "\n";
oss << "max_change_osds " << max_osds << "\n";
oss.precision(4);
oss << "average_utilization " << std::fixed << average_util << "\n";
oss << "overload_utilization " << overload_util << "\n";
}
int num_changed = 0;
// precompute util for each OSD
std::vector<std::pair<int, float> > util_by_osd;
for (const auto& p : pgm.osd_stat) {
std::pair<int, float> osd_util;
osd_util.first = p.first;
if (by_pg) {
if (p.first >= (int)pgs_by_osd.size() ||
pgs_by_osd[p.first] == 0) {
// skip if this OSD does not contain any pg
// belonging to the specified pool(s).
continue;
}
if (osdmap.crush->get_item_weightf(p.first) <= 0) {
// skip if we are unable to locate item.
continue;
}
osd_util.second = pgs_by_osd[p.first] / osdmap.crush->get_item_weightf(p.first);
} else {
osd_util.second = (double)p.second.kb_used / (double)p.second.kb;
}
util_by_osd.push_back(osd_util);
}
// sort by absolute deviation from the mean utilization,
// in descending order.
std::sort(util_by_osd.begin(), util_by_osd.end(),
[average_util](std::pair<int, float> l, std::pair<int, float> r) {
return abs(l.second - average_util) > abs(r.second - average_util);
}
);
if (f)
f->open_array_section("reweights");
for (const auto& p : util_by_osd) {
unsigned weight = osdmap.get_weight(p.first);
if (weight == 0) {
// skip if OSD is currently out
continue;
}
float util = p.second;
if (util >= overload_util) {
// Assign a lower weight to overloaded OSDs. The current weight
// is a factor to take into account the original weights,
// to represent e.g. differing storage capacities
unsigned new_weight = (unsigned)((average_util / util) * (float)weight);
if (weight > max_change)
new_weight = MAX(new_weight, weight - max_change);
new_weights->insert({p.first, new_weight});
if (f) {
f->open_object_section("osd");
f->dump_int("osd", p.first);
f->dump_float("weight", (float)weight / (float)0x10000);
f->dump_float("new_weight", (float)new_weight / (float)0x10000);
f->close_section();
} else {
oss << "osd." << p.first << " weight "
<< (float)weight / (float)0x10000 << " -> "
<< (float)new_weight / (float)0x10000 << "\n";
}
if (++num_changed >= max_osds)
break;
}
if (!no_increasing && util <= underload_util) {
// assign a higher weight.. if we can.
unsigned new_weight = (unsigned)((average_util / util) * (float)weight);
new_weight = MIN(new_weight, weight + max_change);
if (new_weight > 0x10000)
new_weight = 0x10000;
if (new_weight > weight) {
new_weights->insert({p.first, new_weight});
oss << "osd." << p.first << " weight "
<< (float)weight / (float)0x10000 << " -> "
<< (float)new_weight / (float)0x10000 << "\n";
if (++num_changed >= max_osds)
break;
}
}
}
if (f) {
f->close_section();
}
OSDMap newmap;
newmap.deepish_copy_from(osdmap);
OSDMap::Incremental newinc;
newinc.fsid = newmap.get_fsid();
newinc.epoch = newmap.get_epoch() + 1;
newinc.new_weight = *new_weights;
newmap.apply_incremental(newinc);
osdmap.summarize_mapping_stats(&newmap, pools, out_str, f);
if (f) {
f->close_section();
} else {
*out_str += "\n";
*out_str += oss.str();
}
return num_changed;
}

View File

@ -440,4 +440,23 @@ public:
PGMap::Incremental *pending_inc);
};
namespace reweight {
/* Assign a lower weight to overloaded OSDs.
*
* The osds that will get a lower weight are those with with a utilization
* percentage 'oload' percent greater than the average utilization.
*/
int by_utilization(const OSDMap &osd_map,
const PGMap &pg_map,
int oload,
double max_changef,
int max_osds,
bool by_pg, const set<int64_t> *pools,
bool no_increasing,
map<int32_t, uint32_t>* new_weights,
std::stringstream *ss,
std::string *out_str,
Formatter *f);
}
#endif

View File

@ -1952,28 +1952,10 @@ private:
// here or you will have great woe and misery.
template<typename Callback, typename...Args>
auto with_osdmap(Callback&& cb, Args&&...args) ->
typename std::enable_if<
std::is_void<
decltype(cb(const_cast<const OSDMap&>(*osdmap),
std::forward<Args>(args)...))>::value,
void>::type {
auto with_osdmap(Callback&& cb, Args&&... args) const ->
decltype(cb(*osdmap, std::forward<Args>(args)...)) {
shared_lock l(rwlock);
std::forward<Callback>(cb)(const_cast<const OSDMap&>(*osdmap),
std::forward<Args>(args)...);
}
template<typename Callback, typename...Args>
auto with_osdmap(Callback&& cb, Args&&... args) ->
typename std::enable_if<
!std::is_void<
decltype(cb(const_cast<const OSDMap&>(*osdmap),
std::forward<Args>(args)...))>::value,
decltype(cb(const_cast<const OSDMap&>(*osdmap),
std::forward<Args>(args)...))>::type {
shared_lock l(rwlock);
return std::forward<Callback>(cb)(const_cast<const OSDMap&>(*osdmap),
std::forward<Args>(args)...);
return std::forward<Callback>(cb)(*osdmap, std::forward<Args>(args)...);
}

View File

@ -436,6 +436,7 @@ $extra_conf
mgr modules = rest fsstatus
mgr data = $CEPH_DEV_DIR/mgr.\$id
mgr module path = $MGR_PYTHON_PATH
mon reweight min pgs per osd = 4
$DAEMONOPTS
$CMGRDEBUG
$extra_conf