Merge pull request #14883 from tchaikov/wip-mgr-misc

mgr: Misc. bug fixes

Reviewed-by: Sage Weil <sage@redhat.com>
Reviewed-by: Kefu Chai <kchai@redhat.com>
This commit is contained in:
Kefu Chai 2017-05-01 09:15:18 +08:00 committed by GitHub
commit 61a87c2c31
8 changed files with 203 additions and 135 deletions

View File

@@ -142,7 +142,7 @@ class Finisher {
/// Construct a named Finisher that logs its queue length.
Finisher(CephContext *cct_, string name, string tn) :
cct(cct_), finisher_lock("Finisher::finisher_lock"),
cct(cct_), finisher_lock("Finisher::" + name),
finisher_stop(false), finisher_running(false),
thread_name(tn), logger(0),
finisher_thread(this) {

View File

@@ -30,6 +30,7 @@
#define dout_prefix *_dout << "mgr.server " << __func__ << " "
DaemonServer::DaemonServer(MonClient *monc_,
Finisher &finisher_,
DaemonStateIndex &daemon_state_,
ClusterState &cluster_state_,
PyModules &py_modules_,
@@ -54,6 +55,7 @@ DaemonServer::DaemonServer(MonClient *monc_,
g_conf->mgr_mon_messages)),
msgr(nullptr),
monc(monc_),
finisher(finisher_),
daemon_state(daemon_state_),
cluster_state(cluster_state_),
py_modules(py_modules_),
@@ -371,40 +373,82 @@ bool DaemonServer::_allowed_command(
return capable;
}
class ReplyOnFinish : public Context {
DaemonServer* mgr;
MCommand *m;
bufferlist odata;
public:
bufferlist from_mon;
string outs;
ReplyOnFinish(DaemonServer* mgr, MCommand *m, bufferlist&& odata)
: mgr(mgr), m(m), odata(std::move(odata))
{}
void finish(int r) override {
odata.claim_append(from_mon);
mgr->_reply(m, r, outs, odata);
}
};
bool DaemonServer::handle_command(MCommand *m)
{
int r = 0;
std::stringstream ss;
bufferlist odata;
std::string prefix;
assert(lock.is_locked_by_me());
cmdmap_t cmdmap;
/**
* The working data for processing an MCommand. This lives in
* a class to enable passing it into other threads for processing
* outside of the thread/locks that called handle_command.
*/
class CommandContext
{
public:
MCommand *m;
bufferlist odata;
cmdmap_t cmdmap;
// TODO background the call into python land so that we don't
// block a messenger thread on python code.
CommandContext(MCommand *m_)
: m(m_)
{
}
ConnectionRef con = m->get_connection();
MgrSessionRef session(static_cast<MgrSession*>(con->get_priv()));
~CommandContext()
{
m->put();
}
void reply(int r, const std::stringstream &ss)
{
reply(r, ss.str());
}
void reply(int r, const std::string &rs)
{
// Let the connection drop as soon as we've sent our response
ConnectionRef con = m->get_connection();
if (con) {
con->mark_disposable();
}
dout(1) << "do_command r=" << r << " " << rs << dendl;
if (con) {
MCommandReply *reply = new MCommandReply(r, rs);
reply->set_tid(m->get_tid());
reply->set_data(odata);
con->send_message(reply);
}
}
};
/**
* A context for receiving a bufferlist/error string from a background
* function and then calling back to a CommandContext when it's done
*/
class ReplyOnFinish : public Context {
std::shared_ptr<CommandContext> cmdctx;
public:
bufferlist from_mon;
string outs;
ReplyOnFinish(std::shared_ptr<CommandContext> cmdctx_)
: cmdctx(cmdctx_)
{}
void finish(int r) override {
cmdctx->odata.claim_append(from_mon);
cmdctx->reply(r, outs);
}
};
std::shared_ptr<CommandContext> cmdctx = std::make_shared<CommandContext>(m);
MgrSessionRef session(static_cast<MgrSession*>(m->get_connection()->get_priv()));
if (!session) {
return true;
}
@@ -412,23 +456,23 @@ bool DaemonServer::handle_command(MCommand *m)
if (session->inst.name == entity_name_t())
session->inst.name = m->get_source();
string format;
std::string format;
boost::scoped_ptr<Formatter> f;
const MgrCommand *mgr_cmd;
map<string,string> param_str_map;
if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
return _reply(m, -EINVAL, ss.str(), odata);
if (!cmdmap_from_json(m->cmd, &(cmdctx->cmdmap), ss)) {
cmdctx->reply(-EINVAL, ss);
return true;
}
{
cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain"));
cmd_getval(g_ceph_context, cmdctx->cmdmap, "format", format, string("plain"));
f.reset(Formatter::create(format));
}
dout(4) << "decoded " << cmdmap.size() << dendl;
cmd_getval(cct, cmdmap, "prefix", prefix);
cmd_getval(cct, cmdctx->cmdmap, "prefix", prefix);
dout(4) << "decoded " << cmdctx->cmdmap.size() << dendl;
dout(4) << "prefix=" << prefix << dendl;
if (prefix == "get_command_descriptions") {
@@ -460,26 +504,36 @@ bool DaemonServer::handle_command(MCommand *m)
}
#endif
f.close_section(); // command_descriptions
f.flush(odata);
return _reply(m, r, ss.str(), odata);
f.flush(cmdctx->odata);
cmdctx->reply(0, ss);
return true;
}
// lookup command
mgr_cmd = _get_mgrcommand(prefix, mgr_commands,
const MgrCommand *mgr_cmd = _get_mgrcommand(prefix, mgr_commands,
ARRAY_SIZE(mgr_commands));
_generate_command_map(cmdmap, param_str_map);
_generate_command_map(cmdctx->cmdmap, param_str_map);
if (!mgr_cmd) {
return _reply(m, -EINVAL, "command not supported", odata);
}
// validate user's permissions for requested command
if (!_allowed_command(session.get(), mgr_cmd->module, prefix, cmdmap,
param_str_map, mgr_cmd)) {
dout(1) << __func__ << " access denied" << dendl;
audit_clog->info() << "from='" << session->inst << "' "
<< "entity='" << session->entity_name << "' "
<< "cmd=" << m->cmd << ": access denied";
return _reply(m, -EACCES, "access denied", odata);
MgrCommand py_command = {"", "", "py", "rw", "cli"};
if (!_allowed_command(session.get(), py_command.module, prefix, cmdctx->cmdmap,
param_str_map, &py_command)) {
dout(1) << " access denied" << dendl;
ss << "access denied";
cmdctx->reply(-EACCES, ss);
return true;
}
} else {
// validate user's permissions for requested command
if (!_allowed_command(session.get(), mgr_cmd->module, prefix, cmdctx->cmdmap,
param_str_map, mgr_cmd)) {
dout(1) << " access denied" << dendl;
audit_clog->info() << "from='" << session->inst << "' "
<< "entity='" << session->entity_name << "' "
<< "cmd=" << m->cmd << ": access denied";
ss << "access denied";
cmdctx->reply(-EACCES, ss);
return true;
}
}
audit_clog->debug()
@@ -496,10 +550,11 @@ bool DaemonServer::handle_command(MCommand *m)
string scrubop = prefix.substr(3, string::npos);
pg_t pgid;
string pgidstr;
cmd_getval(g_ceph_context, cmdmap, "pgid", pgidstr);
cmd_getval(g_ceph_context, cmdctx->cmdmap, "pgid", pgidstr);
if (!pgid.parse(pgidstr.c_str())) {
ss << "invalid pgid '" << pgidstr << "'";
return _reply(m, -EINVAL, ss.str(), odata);
cmdctx->reply(-EINVAL, ss);
return true;
}
bool pg_exists = false;
cluster_state.with_osdmap([&](const OSDMap& osdmap) {
@@ -507,7 +562,8 @@ bool DaemonServer::handle_command(MCommand *m)
});
if (!pg_exists) {
ss << "pg " << pgid << " dne";
return _reply(m, -ENOENT, ss.str(), odata);
cmdctx->reply(-ENOENT, ss);
return true;
}
int acting_primary = -1;
entity_inst_t inst;
@@ -519,7 +575,8 @@ bool DaemonServer::handle_command(MCommand *m)
});
if (acting_primary == -1) {
ss << "pg " << pgid << " has no primary osd";
return _reply(m, -EAGAIN, ss.str(), odata);
cmdctx->reply(-EAGAIN, ss);
return true;
}
vector<pg_t> pgs = { pgid };
msgr->send_message(new MOSDScrub(monc->get_fsid(),
@@ -529,7 +586,8 @@ bool DaemonServer::handle_command(MCommand *m)
inst);
ss << "instructing pg " << pgid << " on osd." << acting_primary
<< " (" << inst << ") to " << scrubop;
return _reply(m, 0, ss.str(), odata);
cmdctx->reply(0, ss);
return true;
} else if (prefix == "osd reweight-by-pg" ||
prefix == "osd reweight-by-utilization" ||
prefix == "osd test-reweight-by-pg" ||
@@ -540,10 +598,10 @@ bool DaemonServer::handle_command(MCommand *m)
prefix == "osd test-reweight-by-pg" ||
prefix == "osd test-reweight-by-utilization";
int64_t oload;
cmd_getval(g_ceph_context, cmdmap, "oload", oload, int64_t(120));
cmd_getval(g_ceph_context, cmdctx->cmdmap, "oload", oload, int64_t(120));
set<int64_t> pools;
vector<string> poolnames;
cmd_getval(g_ceph_context, cmdmap, "pools", poolnames);
cmd_getval(g_ceph_context, cmdctx->cmdmap, "pools", poolnames);
cluster_state.with_osdmap([&](const OSDMap& osdmap) {
for (const auto& poolname : poolnames) {
int64_t pool = osdmap.lookup_pg_pool_name(poolname);
@@ -555,22 +613,25 @@ bool DaemonServer::handle_command(MCommand *m)
}
});
if (r) {
return _reply(m, r, ss.str(), odata);
cmdctx->reply(r, ss);
return true;
}
double max_change = g_conf->mon_reweight_max_change;
cmd_getval(g_ceph_context, cmdmap, "max_change", max_change);
cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_change", max_change);
if (max_change <= 0.0) {
ss << "max_change " << max_change << " must be positive";
return _reply(m, -EINVAL, ss.str(), odata);
cmdctx->reply(-EINVAL, ss);
return true;
}
int64_t max_osds = g_conf->mon_reweight_max_osds;
cmd_getval(g_ceph_context, cmdmap, "max_osds", max_osds);
cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_osds", max_osds);
if (max_osds <= 0) {
ss << "max_osds " << max_osds << " must be positive";
return _reply(m, -EINVAL, ss.str(), odata);
cmdctx->reply(-EINVAL, ss);
return true;
}
string no_increasing;
cmd_getval(g_ceph_context, cmdmap, "no_increasing", no_increasing);
cmd_getval(g_ceph_context, cmdctx->cmdmap, "no_increasing", no_increasing);
string out_str;
mempool::osdmap::map<int32_t, uint32_t> new_weights;
r = cluster_state.with_pgmap([&](const PGMap& pgmap) {
@@ -589,16 +650,19 @@ bool DaemonServer::handle_command(MCommand *m)
if (r >= 0) {
dout(10) << "reweight::by_utilization: finished with " << out_str << dendl;
}
if (f)
f->flush(odata);
else
odata.append(out_str);
if (f) {
f->flush(cmdctx->odata);
} else {
cmdctx->odata.append(out_str);
}
if (r < 0) {
ss << "FAILED reweight-by-pg";
return _reply(m, r, ss.str(), odata);
cmdctx->reply(r, ss);
return true;
} else if (r == 0 || dry_run) {
ss << "no change";
return _reply(m, r, ss.str(), odata);
cmdctx->reply(r, ss);
return true;
} else {
json_spirit::Object json_object;
for (const auto& osd_weight : new_weights) {
@@ -613,7 +677,7 @@ bool DaemonServer::handle_command(MCommand *m)
"\"prefix\": \"osd reweightn\", "
"\"weights\": \"" + s + "\""
"}";
auto on_finish = new ReplyOnFinish(this, m, std::move(odata));
auto on_finish = new ReplyOnFinish(cmdctx);
monc->start_mon_command({cmd}, {},
&on_finish->from_mon, &on_finish->outs, on_finish);
return true;
@@ -621,63 +685,46 @@ bool DaemonServer::handle_command(MCommand *m)
} else {
r = cluster_state.with_pgmap([&](const PGMap& pg_map) {
return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
return process_pg_map_command(prefix, cmdmap, pg_map, osdmap,
f.get(), &ss, &odata);
return process_pg_map_command(prefix, cmdctx->cmdmap, pg_map, osdmap,
f.get(), &ss, &cmdctx->odata);
});
});
}
if (r != -EOPNOTSUPP)
return _reply(m, r, ss.str(), odata);
// fall back to registered python handlers
else {
// Let's find you a handler!
MgrPyModule *handler = nullptr;
auto py_commands = py_modules.get_commands();
for (const auto &pyc : py_commands) {
auto pyc_prefix = cmddesc_get_prefix(pyc.cmdstring);
dout(1) << "pyc_prefix: '" << pyc_prefix << "'" << dendl;
if (pyc_prefix == prefix) {
handler = pyc.handler;
break;
}
if (r != -EOPNOTSUPP) {
cmdctx->reply(r, ss);
return true;
}
if (handler == nullptr) {
ss << "No handler found for '" << prefix << "'";
dout(4) << "No handler found for '" << prefix << "'" << dendl;
return _reply(m, -EINVAL, ss.str(), odata);
}
// FIXME: go run this python part in another thread, not inline
// with a ms_dispatch, so that the python part can block if it
// wants to.
dout(4) << "passing through " << cmdmap.size() << dendl;
stringstream ds;
r = handler->handle_command(cmdmap, &ds, &ss);
odata.append(ds);
return _reply(m, 0, ss.str(), odata);
}
}
bool DaemonServer::_reply(MCommand* m,
int ret,
const std::string& s,
const bufferlist& payload)
{
dout(1) << __func__ << " r=" << ret << " " << s << dendl;
auto con = m->get_connection();
if (!con) {
dout(10) << __func__ << " connection dropped for command" << dendl;
m->put();
// None of the special native commands,
MgrPyModule *handler = nullptr;
auto py_commands = py_modules.get_commands();
for (const auto &pyc : py_commands) {
auto pyc_prefix = cmddesc_get_prefix(pyc.cmdstring);
dout(1) << "pyc_prefix: '" << pyc_prefix << "'" << dendl;
if (pyc_prefix == prefix) {
handler = pyc.handler;
break;
}
}
if (handler == nullptr) {
ss << "No handler found for '" << prefix << "'";
dout(4) << "No handler found for '" << prefix << "'" << dendl;
cmdctx->reply(-EINVAL, ss);
return true;
} else {
// Okay, now we have a handler to call, but we must not call it
// in this thread, because the python handlers can do anything,
// including blocking, and including calling back into mgr.
dout(4) << "passing through " << cmdctx->cmdmap.size() << dendl;
finisher.queue(new FunctionContext([cmdctx, handler](int r_) {
std::stringstream ds;
std::stringstream ss;
int r = handler->handle_command(cmdctx->cmdmap, &ds, &ss);
cmdctx->odata.append(ds);
cmdctx->reply(r, ss);
}));
return true;
}
// Let the connection drop as soon as we've sent our response
con->mark_disposable();
auto response = new MCommandReply(ret, s);
response->set_tid(m->get_tid());
response->set_data(payload);
con->send_message(response);
m->put();
return true;
}

View File

@@ -54,6 +54,7 @@ protected:
Messenger *msgr;
MonClient *monc;
Finisher &finisher;
DaemonStateIndex &daemon_state;
ClusterState &cluster_state;
PyModules &py_modules;
@@ -85,6 +86,7 @@ public:
entity_addr_t get_myaddr() const;
DaemonServer(MonClient *monc_,
Finisher &finisher_,
DaemonStateIndex &daemon_state_,
ClusterState &cluster_state_,
PyModules &py_modules_,

View File

@@ -48,7 +48,8 @@ Mgr::Mgr(MonClient *monc_, Messenger *clientm_, Objecter *objecter_,
finisher(g_ceph_context, "Mgr", "mgr-fin"),
py_modules(daemon_state, cluster_state, *monc, finisher),
cluster_state(monc, nullptr),
server(monc, daemon_state, cluster_state, py_modules, clog_, audit_clog_),
server(monc, finisher, daemon_state, cluster_state, py_modules,
clog_, audit_clog_),
initialized(false),
initializing(false)
{
@@ -353,17 +354,20 @@ void Mgr::load_config()
void Mgr::shutdown()
{
// FIXME: pre-empt init() if it is currently running, so that it will
// give up the lock for us.
Mutex::Locker l(lock);
finisher.queue(new FunctionContext([&](int) {
// First stop the server so that we're not taking any more incoming
// requests
server.shutdown();
{
Mutex::Locker l(lock);
monc->sub_unwant("log-info");
monc->sub_unwant("mgrdigest");
monc->sub_unwant("fsmap");
// First stop the server so that we're not taking any more incoming
// requests
server.shutdown();
}
// after the messenger is stopped, signal modules to shutdown via finisher
py_modules.shutdown();
}));
// Then stop the finisher to ensure its enqueued contexts aren't going
// to touch references to the things we're about to tear down
finisher.wait_for_empty();
@@ -445,6 +449,8 @@ void Mgr::handle_log(MLog *m)
for (const auto &e : m->entries) {
py_modules.notify_all(e);
}
m->put();
}
bool Mgr::ms_dispatch(Message *m)
@@ -468,11 +474,12 @@ bool Mgr::ms_dispatch(Message *m)
ceph_abort();
py_modules.notify_all("mon_map", "");
m->put();
break;
case CEPH_MSG_FS_MAP:
py_modules.notify_all("fs_map", "");
handle_fs_map((MFSMap*)m);
m->put();
return false; // I shall let this pass through for Client
break;
case CEPH_MSG_OSD_MAP:
handle_osd_map();
@@ -486,7 +493,6 @@ bool Mgr::ms_dispatch(Message *m)
break;
case MSG_LOG:
handle_log(static_cast<MLog *>(m));
m->put();
break;
default:
@@ -570,6 +576,7 @@ void Mgr::handle_mgr_digest(MMgrDigest* m)
// the pgmap might have changed since last time we were here.
py_modules.notify_all("pg_summary", "");
dout(10) << "done." << dendl;
m->put();
}

View File

@@ -242,6 +242,8 @@ void MgrStandby::handle_mgr_map(MMgrMap* mmap)
active_mgr.reset();
}
}
mmap->put();
}
bool MgrStandby::ms_dispatch(Message *m)
@@ -256,13 +258,14 @@ bool MgrStandby::ms_dispatch(Message *m)
default:
if (active_mgr) {
return active_mgr->ms_dispatch(m);
lock.Unlock();
active_mgr->ms_dispatch(m);
lock.Lock();
} else {
return false;
}
}
m->put();
return true;
}

View File

@@ -126,7 +126,12 @@ public:
dump(f);
} else {
if (get_active_gid() != 0) {
*ss << "active: " << get_active_name() << " ";
*ss << "active: " << get_active_name();
if (!available) {
// If the daemon hasn't gone active yet, indicate that.
*ss << "(starting)";
}
*ss << " ";
} else {
*ss << "no daemons active ";
}

View File

@@ -263,7 +263,9 @@ void MgrMonitor::check_sub(Subscription *sub)
}
} else {
assert(sub->type == "mgrdigest");
send_digests();
if (digest_callback == nullptr) {
send_digests();
}
}
}
@@ -536,7 +538,9 @@ bool MgrMonitor::prepare_command(MonOpRequestRef op)
void MgrMonitor::init()
{
send_digests(); // To get it to schedule its own event
if (digest_callback == nullptr) {
send_digests(); // To get it to schedule its own event
}
}
void MgrMonitor::on_shutdown()

View File

@@ -657,7 +657,7 @@ EOF
EOF
fi
prun $SUDO "$CEPH_BIN/ceph-authtool" --create-keyring --gen-key --name="mds.$name" "$key_fn"
ceph_adm -i "$key_fn" auth add "mds.$name" mon 'allow profile mds' osd 'allow *' mds 'allow' mgr 'allow'
ceph_adm -i "$key_fn" auth add "mds.$name" mon 'allow profile mds' osd 'allow *' mds 'allow' mgr 'allow profile mds'
if [ "$standby" -eq 1 ]; then
prun $SUDO "$CEPH_BIN/ceph-authtool" --create-keyring --gen-key --name="mds.${name}s" \
"$CEPH_DEV_DIR/mds.${name}s/keyring"