mgr, mon: allow normal ceph services to register with manager

Additionally, introduce a `task status` field in manager report
messages to forward the status of tasks executing in daemons (e.g.,
the status of scrubs executing in ceph metadata servers).

`task status` makes its way up to the service map, which is then used
to display the relevant information in ceph status.

Signed-off-by: Venky Shankar <vshankar@redhat.com>
This commit is contained in:
Venky Shankar 2019-07-02 04:11:35 -04:00
parent 7265b55d09
commit 5c25a01864
8 changed files with 189 additions and 85 deletions

View File

@ -73,7 +73,7 @@ WRITE_CLASS_ENCODER(PerfCounterType)
class MMgrReport : public Message { class MMgrReport : public Message {
private: private:
static constexpr int HEAD_VERSION = 7; static constexpr int HEAD_VERSION = 8;
static constexpr int COMPAT_VERSION = 1; static constexpr int COMPAT_VERSION = 1;
public: public:
@ -98,6 +98,7 @@ public:
// for service registration // for service registration
boost::optional<std::map<std::string,std::string>> daemon_status; boost::optional<std::map<std::string,std::string>> daemon_status;
boost::optional<std::map<std::string,std::string>> task_status;
std::vector<DaemonHealthMetric> daemon_health_metrics; std::vector<DaemonHealthMetric> daemon_health_metrics;
@ -128,6 +129,9 @@ public:
if (header.version >= 7) { if (header.version >= 7) {
decode(osd_perf_metric_reports, p); decode(osd_perf_metric_reports, p);
} }
if (header.version >= 8) {
decode(task_status, p);
}
} }
void encode_payload(uint64_t features) override { void encode_payload(uint64_t features) override {
@ -141,6 +145,7 @@ public:
encode(daemon_health_metrics, payload); encode(daemon_health_metrics, payload);
encode(config_bl, payload); encode(config_bl, payload);
encode(osd_perf_metric_reports, payload); encode(osd_perf_metric_reports, payload);
encode(task_status, payload);
} }
std::string_view get_type_name() const override { return "mgrreport"; } std::string_view get_type_name() const override { return "mgrreport"; }
@ -161,6 +166,9 @@ public:
if (!daemon_health_metrics.empty()) { if (!daemon_health_metrics.empty()) {
out << " daemon_metrics=" << daemon_health_metrics.size(); out << " daemon_metrics=" << daemon_health_metrics.size();
} }
if (task_status) {
out << " task_status=" << task_status->size();
}
out << ")"; out << ")";
} }

View File

@ -42,7 +42,14 @@
#undef dout_prefix #undef dout_prefix
#define dout_prefix *_dout << "mgr.server " << __func__ << " " #define dout_prefix *_dout << "mgr.server " << __func__ << " "
namespace {
// Return true when both maps hold the same number of entries and, walked
// in iteration order, every entry of `lhs` has the same key and the same
// value as the entry at the same position in `rhs`.
//
// The size check comes first so that std::equal never reads past the end
// of `rhs`.
template <typename Map>
bool map_compare(Map const &lhs, Map const &rhs) {
  return lhs.size() == rhs.size()
    && std::equal(lhs.begin(), lhs.end(), rhs.begin(),
                  // bind by const reference: taking the entries by value
                  // would copy both key and value for every comparison
                  [] (const auto &a, const auto &b) {
                    return a.first == b.first && a.second == b.second;
                  });
}
}
DaemonServer::DaemonServer(MonClient *monc_, DaemonServer::DaemonServer(MonClient *monc_,
Finisher &finisher_, Finisher &finisher_,
@ -415,6 +422,7 @@ bool DaemonServer::handle_open(const ref_t<MMgrOpen>& m)
std::lock_guard l(daemon->lock); std::lock_guard l(daemon->lock);
daemon->perf_counters.clear(); daemon->perf_counters.clear();
daemon->service_daemon = m->service_daemon;
if (m->service_daemon) { if (m->service_daemon) {
daemon->set_metadata(m->daemon_metadata); daemon->set_metadata(m->daemon_metadata);
daemon->service_status = m->daemon_status; daemon->service_status = m->daemon_status;
@ -422,7 +430,7 @@ bool DaemonServer::handle_open(const ref_t<MMgrOpen>& m)
utime_t now = ceph_clock_now(); utime_t now = ceph_clock_now();
auto d = pending_service_map.get_daemon(m->service_name, auto d = pending_service_map.get_daemon(m->service_name,
m->daemon_name); m->daemon_name);
if (d->gid != (uint64_t)m->get_source().num()) { if (!d->gid || d->gid != (uint64_t)m->get_source().num()) {
dout(10) << "registering " << key << " in pending_service_map" << dendl; dout(10) << "registering " << key << " in pending_service_map" << dendl;
d->gid = m->get_source().num(); d->gid = m->get_source().num();
d->addr = m->get_source_addr(); d->addr = m->get_source_addr();
@ -506,98 +514,114 @@ bool DaemonServer::handle_report(const ref_t<MMgrReport>& m)
return true; return true;
} }
// Look up the DaemonState
DaemonStatePtr daemon;
if (daemon_state.exists(key)) {
dout(20) << "updating existing DaemonState for " << key << dendl;
daemon = daemon_state.get(key);
} else {
// we don't know the hostname at this stage, reject MMgrReport here.
dout(5) << "rejecting report from " << key << ", since we do not have its metadata now."
<< dendl;
// issue metadata request in background {
if (!daemon_state.is_updating(key) && lock.lock();
(key.first == "osd" || key.first == "mds" || key.first == "mon")) {
std::ostringstream oss; DaemonStatePtr daemon;
auto c = new MetadataUpdate(daemon_state, key); // Look up the DaemonState
if (key.first == "osd") { if (daemon_state.exists(key)) {
oss << "{\"prefix\": \"osd metadata\", \"id\": " dout(20) << "updating existing DaemonState for " << key << dendl;
<< key.second<< "}"; daemon = daemon_state.get(key);
} else {
lock.unlock();
} else if (key.first == "mds") { // we don't know the hostname at this stage, reject MMgrReport here.
c->set_default("addr", stringify(m->get_source_addr())); dout(5) << "rejecting report from " << key << ", since we do not have its metadata now."
oss << "{\"prefix\": \"mds metadata\", \"who\": \"" << dendl;
<< key.second << "\"}"; // issue metadata request in background
if (!daemon_state.is_updating(key) &&
(key.first == "osd" || key.first == "mds" || key.first == "mon")) {
} else if (key.first == "mon") { std::ostringstream oss;
oss << "{\"prefix\": \"mon metadata\", \"id\": \"" auto c = new MetadataUpdate(daemon_state, key);
<< key.second << "\"}"; if (key.first == "osd") {
} else { oss << "{\"prefix\": \"osd metadata\", \"id\": "
ceph_abort(); << key.second<< "}";
} else if (key.first == "mds") {
c->set_default("addr", stringify(m->get_source_addr()));
oss << "{\"prefix\": \"mds metadata\", \"who\": \""
<< key.second << "\"}";
} else if (key.first == "mon") {
oss << "{\"prefix\": \"mon metadata\", \"id\": \""
<< key.second << "\"}";
} else {
ceph_abort();
}
monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c);
} }
monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c); lock.lock();
}
{
std::lock_guard l(lock);
// kill session // kill session
auto priv = m->get_connection()->get_priv(); auto priv = m->get_connection()->get_priv();
auto session = static_cast<MgrSession*>(priv.get()); auto session = static_cast<MgrSession*>(priv.get());
if (!session) { if (!session) {
return false; return false;
} }
m->get_connection()->mark_down(); m->get_connection()->mark_down();
dout(10) << "unregistering osd." << session->osd_id dout(10) << "unregistering osd." << session->osd_id
<< " session " << session << " con " << m->get_connection() << dendl; << " session " << session << " con " << m->get_connection() << dendl;
if (osd_cons.find(session->osd_id) != osd_cons.end()) { if (osd_cons.find(session->osd_id) != osd_cons.end()) {
osd_cons[session->osd_id].erase(m->get_connection()); osd_cons[session->osd_id].erase(m->get_connection());
} }
auto iter = daemon_connections.find(m->get_connection()); auto iter = daemon_connections.find(m->get_connection());
if (iter != daemon_connections.end()) { if (iter != daemon_connections.end()) {
daemon_connections.erase(iter); daemon_connections.erase(iter);
}
lock.unlock();
return false;
}
// Update the DaemonState
ceph_assert(daemon != nullptr);
{
std::lock_guard l(daemon->lock);
auto &daemon_counters = daemon->perf_counters;
daemon_counters.update(*m.get());
auto p = m->config_bl.cbegin();
if (p != m->config_bl.end()) {
decode(daemon->config, p);
decode(daemon->ignored_mon_config, p);
dout(20) << " got config " << daemon->config
<< " ignored " << daemon->ignored_mon_config << dendl;
}
if (daemon->service_daemon) {
utime_t now = ceph_clock_now();
if (m->daemon_status) {
daemon->service_status_stamp = now;
daemon->service_status = *m->daemon_status;
}
if (m->task_status && !map_compare(daemon->task_status, *m->task_status)) {
auto d = pending_service_map.get_daemon(m->service_name, m->daemon_name);
if (d->gid) {
daemon->task_status = *m->task_status;
d->task_status = *m->task_status;
pending_service_map_dirty = pending_service_map.epoch;
}
}
daemon->last_service_beacon = now;
} else if (m->daemon_status) {
derr << "got status from non-daemon " << key << dendl;
}
if (m->get_connection()->peer_is_osd() || m->get_connection()->peer_is_mon()) {
// only OSD and MON send health_checks to me now
daemon->daemon_health_metrics = std::move(m->daemon_health_metrics);
dout(10) << "daemon_health_metrics " << daemon->daemon_health_metrics
<< dendl;
} }
} }
return false; lock.unlock();
}
// Update the DaemonState
ceph_assert(daemon != nullptr);
{
std::lock_guard l(daemon->lock);
auto &daemon_counters = daemon->perf_counters;
daemon_counters.update(*m.get());
auto p = m->config_bl.cbegin();
if (p != m->config_bl.end()) {
decode(daemon->config, p);
decode(daemon->ignored_mon_config, p);
dout(20) << " got config " << daemon->config
<< " ignored " << daemon->ignored_mon_config << dendl;
}
if (daemon->service_daemon) {
utime_t now = ceph_clock_now();
if (m->daemon_status) {
daemon->service_status = *m->daemon_status;
daemon->service_status_stamp = now;
}
daemon->last_service_beacon = now;
} else if (m->daemon_status) {
derr << "got status from non-daemon " << key << dendl;
}
if (m->get_connection()->peer_is_osd() || m->get_connection()->peer_is_mon()) {
// only OSD and MON send health_checks to me now
daemon->daemon_health_metrics = std::move(m->daemon_health_metrics);
dout(10) << "daemon_health_metrics " << daemon->daemon_health_metrics
<< dendl;
}
} }
// if there are any schema updates, notify the python modules // if there are any schema updates, notify the python modules

View File

@ -147,6 +147,7 @@ class DaemonState
bool service_daemon = false; bool service_daemon = false;
utime_t service_status_stamp; utime_t service_status_stamp;
std::map<std::string, std::string> service_status; std::map<std::string, std::string> service_status;
std::map<std::string, std::string> task_status;
utime_t last_service_beacon; utime_t last_service_beacon;
// running config // running config

View File

@ -349,6 +349,11 @@ void MgrClient::_send_report()
daemon_dirty_status = false; daemon_dirty_status = false;
} }
if (task_dirty_status) {
report->task_status = task_status;
task_dirty_status = false;
}
report->daemon_health_metrics = std::move(daemon_health_metrics); report->daemon_health_metrics = std::move(daemon_health_metrics);
cct->_conf.get_config_bl(last_config_bl_version, &report->config_bl, cct->_conf.get_config_bl(last_config_bl_version, &report->config_bl,
@ -478,14 +483,6 @@ int MgrClient::service_daemon_register(
const std::map<std::string,std::string>& metadata) const std::map<std::string,std::string>& metadata)
{ {
std::lock_guard l(lock); std::lock_guard l(lock);
if (service == "osd" ||
service == "mds" ||
service == "client" ||
service == "mon" ||
service == "mgr") {
// normal ceph entity types are not allowed!
return -EINVAL;
}
if (service_daemon) { if (service_daemon) {
return -EEXIST; return -EEXIST;
} }
@ -514,6 +511,15 @@ int MgrClient::service_daemon_update_status(
return 0; return 0;
} }
// Replace the stored task status with `status` and flag it as dirty.
// _send_report() copies task_status into the outgoing report and clears
// the flag when task_dirty_status is set.
//
// Always returns 0.
int MgrClient::service_daemon_update_task_status(
  std::map<std::string,std::string> &&status)
{
  std::lock_guard locker(lock);

  // log the incoming map before it is moved from
  ldout(cct,10) << status << dendl;

  task_status = std::move(status);
  task_dirty_status = true;

  return 0;
}
void MgrClient::update_daemon_health(std::vector<DaemonHealthMetric>&& metrics) void MgrClient::update_daemon_health(std::vector<DaemonHealthMetric>&& metrics)
{ {
std::lock_guard l(lock); std::lock_guard l(lock);

View File

@ -88,9 +88,11 @@ protected:
// for service registration and beacon // for service registration and beacon
bool service_daemon = false; bool service_daemon = false;
bool daemon_dirty_status = false; bool daemon_dirty_status = false;
bool task_dirty_status = false;
std::string service_name, daemon_name; std::string service_name, daemon_name;
std::map<std::string,std::string> daemon_metadata; std::map<std::string,std::string> daemon_metadata;
std::map<std::string,std::string> daemon_status; std::map<std::string,std::string> daemon_status;
std::map<std::string,std::string> task_status;
std::vector<DaemonHealthMetric> daemon_health_metrics; std::vector<DaemonHealthMetric> daemon_health_metrics;
void reconnect(); void reconnect();
@ -148,6 +150,8 @@ public:
const std::map<std::string,std::string>& metadata); const std::map<std::string,std::string>& metadata);
int service_daemon_update_status( int service_daemon_update_status(
std::map<std::string,std::string>&& status); std::map<std::string,std::string>&& status);
int service_daemon_update_task_status(
std::map<std::string,std::string> &&task_status);
void update_daemon_health(std::vector<DaemonHealthMetric>&& metrics); void update_daemon_health(std::vector<DaemonHealthMetric>&& metrics);
private: private:

View File

@ -12,23 +12,27 @@ using ceph::Formatter;
void ServiceMap::Daemon::encode(bufferlist& bl, uint64_t features) const void ServiceMap::Daemon::encode(bufferlist& bl, uint64_t features) const
{ {
ENCODE_START(1, 1, bl); ENCODE_START(2, 1, bl);
encode(gid, bl); encode(gid, bl);
encode(addr, bl, features); encode(addr, bl, features);
encode(start_epoch, bl); encode(start_epoch, bl);
encode(start_stamp, bl); encode(start_stamp, bl);
encode(metadata, bl); encode(metadata, bl);
encode(task_status, bl);
ENCODE_FINISH(bl); ENCODE_FINISH(bl);
} }
void ServiceMap::Daemon::decode(bufferlist::const_iterator& p) void ServiceMap::Daemon::decode(bufferlist::const_iterator& p)
{ {
DECODE_START(1, p); DECODE_START(2, p);
decode(gid, p); decode(gid, p);
decode(addr, p); decode(addr, p);
decode(start_epoch, p); decode(start_epoch, p);
decode(start_stamp, p); decode(start_stamp, p);
decode(metadata, p); decode(metadata, p);
if (struct_v >= 2) {
decode(task_status, p);
}
DECODE_FINISH(p); DECODE_FINISH(p);
} }
@ -36,13 +40,18 @@ void ServiceMap::Daemon::dump(Formatter *f) const
{ {
f->dump_unsigned("start_epoch", start_epoch); f->dump_unsigned("start_epoch", start_epoch);
f->dump_stream("start_stamp") << start_stamp; f->dump_stream("start_stamp") << start_stamp;
f->dump_unsigned("gid", gid); f->dump_unsigned("gid", *gid);
f->dump_string("addr", addr.get_legacy_str()); f->dump_string("addr", addr.get_legacy_str());
f->open_object_section("metadata"); f->open_object_section("metadata");
for (auto& p : metadata) { for (auto& p : metadata) {
f->dump_string(p.first.c_str(), p.second); f->dump_string(p.first.c_str(), p.second);
} }
f->close_section(); f->close_section();
f->open_object_section("task_status");
for (auto& p : task_status) {
f->dump_string(p.first.c_str(), p.second);
}
f->close_section();
} }
void ServiceMap::Daemon::generate_test_instances(std::list<Daemon*>& ls) void ServiceMap::Daemon::generate_test_instances(std::list<Daemon*>& ls)
@ -51,6 +60,7 @@ void ServiceMap::Daemon::generate_test_instances(std::list<Daemon*>& ls)
ls.push_back(new Daemon); ls.push_back(new Daemon);
ls.back()->gid = 222; ls.back()->gid = 222;
ls.back()->metadata["this"] = "that"; ls.back()->metadata["this"] = "that";
ls.back()->task_status["task1"] = "running";
} }
// Service // Service

View File

@ -12,17 +12,20 @@
#include "include/buffer.h" #include "include/buffer.h"
#include "msg/msg_types.h" #include "msg/msg_types.h"
#include <boost/optional.hpp>
namespace ceph { namespace ceph {
class Formatter; class Formatter;
} }
struct ServiceMap { struct ServiceMap {
struct Daemon { struct Daemon {
uint64_t gid = 0; boost::optional<uint64_t> gid;
entity_addr_t addr; entity_addr_t addr;
epoch_t start_epoch = 0; ///< epoch first registered epoch_t start_epoch = 0; ///< epoch first registered
utime_t start_stamp; ///< timestamp daemon started/registered utime_t start_stamp; ///< timestamp daemon started/registered
std::map<std::string,std::string> metadata; ///< static metadata std::map<std::string,std::string> metadata; ///< static metadata
std::map<std::string,std::string> task_status; ///< running task status
void encode(ceph::buffer::list& bl, uint64_t features) const; void encode(ceph::buffer::list& bl, uint64_t features) const;
void decode(ceph::buffer::list::const_iterator& p); void decode(ceph::buffer::list::const_iterator& p);
@ -64,6 +67,33 @@ struct ServiceMap {
return ss.str(); return ss.str();
} }
// Build a multi-line summary of the task status entries of every daemon
// in `daemons`, grouped by task name.
//
// Each daemon is labelled "<task_prefix>.<daemon key>". For every task
// name one header line is emitted, followed by one line per daemon that
// reports that task. Returns an empty string when no daemon carries any
// task status.
std::string get_task_summary(const std::string_view task_prefix) const {
  // construct a map similar to:
  // {"service1 status" -> {"service1.0" -> "running"}}
  // {"service2 status" -> {"service2.0" -> "idle"},
  // {"service2.1" -> "running"}}
  std::map<std::string, std::map<std::string, std::string>> by_task;
  for (const auto &p : daemons) {
    std::stringstream d;
    d << task_prefix << "." << p.first;
    // the label is the same for every task of this daemon: build it once
    const std::string label = d.str();
    for (const auto &q : p.second.task_status) {
      // creates the task and daemon slots on demand, then stores the status
      by_task[q.first][label] = q.second;
    }
  }

  std::stringstream ss;
  for (const auto &p : by_task) {
    ss << "\n " << p.first << ":";
    // iterate by reference so the entries are not copied
    for (const auto &q : p.second) {
      ss << "\n " << q.first << ": " << q.second;
    }
  }

  return ss.str();
}
void count_metadata(const std::string& field, void count_metadata(const std::string& field,
std::map<std::string,int> *out) const { std::map<std::string,int> *out) const {
for (auto& p : daemons) { for (auto& p : daemons) {

View File

@ -3018,11 +3018,32 @@ void Monitor::get_cluster_status(stringstream &ss, Formatter *f)
osdmon()->osdmap.print_summary(NULL, ss, string(maxlen + 6, ' ')); osdmon()->osdmap.print_summary(NULL, ss, string(maxlen + 6, ' '));
ss << "\n"; ss << "\n";
for (auto& p : service_map.services) { for (auto& p : service_map.services) {
const std::string &service = p.first;
// filter out normal ceph entity types
if (service == "osd" ||
service == "client" ||
service == "mon" ||
service == "mds" ||
service == "mgr") {
continue;
}
ss << " " << p.first << ": " << string(maxlen - p.first.size(), ' ') ss << " " << p.first << ": " << string(maxlen - p.first.size(), ' ')
<< p.second.get_summary() << "\n"; << p.second.get_summary() << "\n";
} }
} }
{
auto& service_map = mgrstatmon()->get_service_map();
if (!service_map.services.empty()) {
ss << "\n \n task status:\n";
{
for (auto &p : service_map.services) {
ss << p.second.get_task_summary(p.first);
}
}
}
}
ss << "\n \n data:\n"; ss << "\n \n data:\n";
mgrstatmon()->print_summary(NULL, &ss); mgrstatmon()->print_summary(NULL, &ss);