mgr/DaemonServer: handle MMgrReports in parallel

The DaemonStateIndex locking is sufficient to make all
the report processing safe: holding DaemonServer::lock
through all ms_dispatch was unnecessarily serializing
dispatch.

Signed-off-by: John Spray <john.spray@redhat.com>
This commit is contained in:
John Spray 2017-08-28 07:29:36 -04:00
parent 806f10847c
commit 64af9d3da0

View File

@ -253,8 +253,9 @@ bool DaemonServer::ms_handle_refused(Connection *con)
bool DaemonServer::ms_dispatch(Message *m)
{
Mutex::Locker l(lock);
// Note that we do *not* take ::lock here, in order to avoid
// serializing all message handling. It's up to each handler
// to take whatever locks it needs.
switch (m->get_type()) {
case MSG_PGSTATS:
cluster_state.ingest_pgstats(static_cast<MPGStats*>(m));
@ -275,6 +276,8 @@ bool DaemonServer::ms_dispatch(Message *m)
void DaemonServer::maybe_ready(int32_t osd_id)
{
Mutex::Locker l(lock);
if (!pgmap_ready && reported_osds.find(osd_id) == reported_osds.end()) {
dout(4) << "initial report from osd " << osd_id << dendl;
reported_osds.insert(osd_id);
@ -314,6 +317,8 @@ void DaemonServer::shutdown()
bool DaemonServer::handle_open(MMgrOpen *m)
{
Mutex::Locker l(lock);
DaemonKey key;
if (!m->service_name.empty()) {
key.first = m->service_name;
@ -400,6 +405,7 @@ bool DaemonServer::handle_report(MMgrReport *m)
return true;
}
// Look up the DaemonState
DaemonStatePtr daemon;
if (daemon_state.exists(key)) {
dout(20) << "updating existing DaemonState for " << key << dendl;
@ -414,12 +420,26 @@ bool DaemonServer::handle_report(MMgrReport *m)
// daemons without sessions, and ensuring that session open
// always contains metadata.
}
// Update the DaemonState
assert(daemon != nullptr);
auto &daemon_counters = daemon->perf_counters;
{
Mutex::Locker l(daemon->lock);
auto &daemon_counters = daemon->perf_counters;
daemon_counters.update(m);
if (daemon->service_daemon) {
utime_t now = ceph_clock_now();
if (m->daemon_status) {
daemon->service_status = *m->daemon_status;
daemon->service_status_stamp = now;
}
daemon->last_service_beacon = now;
} else if (m->daemon_status) {
derr << "got status from non-daemon " << key << dendl;
}
}
// if there are any schema updates, notify the python modules
if (!m->declare_types.empty() || !m->undeclare_types.empty()) {
ostringstream oss;
@ -427,17 +447,6 @@ bool DaemonServer::handle_report(MMgrReport *m)
py_modules.notify_all("perf_schema_update", oss.str());
}
if (daemon->service_daemon) {
utime_t now = ceph_clock_now();
if (m->daemon_status) {
daemon->service_status = *m->daemon_status;
daemon->service_status_stamp = now;
}
daemon->last_service_beacon = now;
} else if (m->daemon_status) {
derr << "got status from non-daemon " << key << dendl;
}
m->put();
return true;
}
@ -512,6 +521,7 @@ bool DaemonServer::_allowed_command(
bool DaemonServer::handle_command(MCommand *m)
{
Mutex::Locker l(lock);
int r = 0;
std::stringstream ss;
std::string prefix;