mgr, mon: allow normal ceph services to register with manager

Additionally, introduce `task status` field in manager report
messages to forward status of executing tasks in daemons (e.g.,
status of executing scrubs in ceph metadata servers).

`task status` makes its way upto service map which is then used
to display the relevant information in ceph status.

Signed-off-by: Venky Shankar <vshankar@redhat.com>
This commit is contained in:
Venky Shankar 2019-07-02 04:11:35 -04:00
parent 7265b55d09
commit 5c25a01864
8 changed files with 189 additions and 85 deletions

View File

@ -73,7 +73,7 @@ WRITE_CLASS_ENCODER(PerfCounterType)
class MMgrReport : public Message {
private:
static constexpr int HEAD_VERSION = 7;
static constexpr int HEAD_VERSION = 8;
static constexpr int COMPAT_VERSION = 1;
public:
@ -98,6 +98,7 @@ public:
// for service registration
boost::optional<std::map<std::string,std::string>> daemon_status;
boost::optional<std::map<std::string,std::string>> task_status;
std::vector<DaemonHealthMetric> daemon_health_metrics;
@ -128,6 +129,9 @@ public:
if (header.version >= 7) {
decode(osd_perf_metric_reports, p);
}
if (header.version >= 8) {
decode(task_status, p);
}
}
void encode_payload(uint64_t features) override {
@ -141,6 +145,7 @@ public:
encode(daemon_health_metrics, payload);
encode(config_bl, payload);
encode(osd_perf_metric_reports, payload);
encode(task_status, payload);
}
std::string_view get_type_name() const override { return "mgrreport"; }
@ -161,6 +166,9 @@ public:
if (!daemon_health_metrics.empty()) {
out << " daemon_metrics=" << daemon_health_metrics.size();
}
if (task_status) {
out << " task_status=" << task_status->size();
}
out << ")";
}

View File

@ -42,7 +42,14 @@
#undef dout_prefix
#define dout_prefix *_dout << "mgr.server " << __func__ << " "
namespace {
template <typename Map>
bool map_compare(Map const &lhs, Map const &rhs) {
return lhs.size() == rhs.size()
&& std::equal(lhs.begin(), lhs.end(), rhs.begin(),
[] (auto a, auto b) { return a.first == b.first && a.second == b.second; });
}
}
DaemonServer::DaemonServer(MonClient *monc_,
Finisher &finisher_,
@ -415,6 +422,7 @@ bool DaemonServer::handle_open(const ref_t<MMgrOpen>& m)
std::lock_guard l(daemon->lock);
daemon->perf_counters.clear();
daemon->service_daemon = m->service_daemon;
if (m->service_daemon) {
daemon->set_metadata(m->daemon_metadata);
daemon->service_status = m->daemon_status;
@ -422,7 +430,7 @@ bool DaemonServer::handle_open(const ref_t<MMgrOpen>& m)
utime_t now = ceph_clock_now();
auto d = pending_service_map.get_daemon(m->service_name,
m->daemon_name);
if (d->gid != (uint64_t)m->get_source().num()) {
if (!d->gid || d->gid != (uint64_t)m->get_source().num()) {
dout(10) << "registering " << key << " in pending_service_map" << dendl;
d->gid = m->get_source().num();
d->addr = m->get_source_addr();
@ -506,98 +514,114 @@ bool DaemonServer::handle_report(const ref_t<MMgrReport>& m)
return true;
}
// Look up the DaemonState
DaemonStatePtr daemon;
if (daemon_state.exists(key)) {
dout(20) << "updating existing DaemonState for " << key << dendl;
daemon = daemon_state.get(key);
} else {
// we don't know the hostname at this stage, reject MMgrReport here.
dout(5) << "rejecting report from " << key << ", since we do not have its metadata now."
<< dendl;
// issue metadata request in background
if (!daemon_state.is_updating(key) &&
(key.first == "osd" || key.first == "mds" || key.first == "mon")) {
{
lock.lock();
std::ostringstream oss;
auto c = new MetadataUpdate(daemon_state, key);
if (key.first == "osd") {
oss << "{\"prefix\": \"osd metadata\", \"id\": "
<< key.second<< "}";
DaemonStatePtr daemon;
// Look up the DaemonState
if (daemon_state.exists(key)) {
dout(20) << "updating existing DaemonState for " << key << dendl;
daemon = daemon_state.get(key);
} else {
lock.unlock();
} else if (key.first == "mds") {
c->set_default("addr", stringify(m->get_source_addr()));
oss << "{\"prefix\": \"mds metadata\", \"who\": \""
<< key.second << "\"}";
// we don't know the hostname at this stage, reject MMgrReport here.
dout(5) << "rejecting report from " << key << ", since we do not have its metadata now."
<< dendl;
// issue metadata request in background
if (!daemon_state.is_updating(key) &&
(key.first == "osd" || key.first == "mds" || key.first == "mon")) {
std::ostringstream oss;
auto c = new MetadataUpdate(daemon_state, key);
if (key.first == "osd") {
oss << "{\"prefix\": \"osd metadata\", \"id\": "
<< key.second<< "}";
} else if (key.first == "mds") {
c->set_default("addr", stringify(m->get_source_addr()));
oss << "{\"prefix\": \"mds metadata\", \"who\": \""
<< key.second << "\"}";
} else if (key.first == "mon") {
oss << "{\"prefix\": \"mon metadata\", \"id\": \""
<< key.second << "\"}";
} else {
ceph_abort();
} else if (key.first == "mon") {
oss << "{\"prefix\": \"mon metadata\", \"id\": \""
<< key.second << "\"}";
} else {
ceph_abort();
}
monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c);
}
monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c);
}
{
std::lock_guard l(lock);
lock.lock();
// kill session
auto priv = m->get_connection()->get_priv();
auto session = static_cast<MgrSession*>(priv.get());
if (!session) {
return false;
return false;
}
m->get_connection()->mark_down();
dout(10) << "unregistering osd." << session->osd_id
<< " session " << session << " con " << m->get_connection() << dendl;
<< " session " << session << " con " << m->get_connection() << dendl;
if (osd_cons.find(session->osd_id) != osd_cons.end()) {
osd_cons[session->osd_id].erase(m->get_connection());
}
osd_cons[session->osd_id].erase(m->get_connection());
}
auto iter = daemon_connections.find(m->get_connection());
if (iter != daemon_connections.end()) {
daemon_connections.erase(iter);
daemon_connections.erase(iter);
}
lock.unlock();
return false;
}
// Update the DaemonState
ceph_assert(daemon != nullptr);
{
std::lock_guard l(daemon->lock);
auto &daemon_counters = daemon->perf_counters;
daemon_counters.update(*m.get());
auto p = m->config_bl.cbegin();
if (p != m->config_bl.end()) {
decode(daemon->config, p);
decode(daemon->ignored_mon_config, p);
dout(20) << " got config " << daemon->config
<< " ignored " << daemon->ignored_mon_config << dendl;
}
if (daemon->service_daemon) {
utime_t now = ceph_clock_now();
if (m->daemon_status) {
daemon->service_status_stamp = now;
daemon->service_status = *m->daemon_status;
}
if (m->task_status && !map_compare(daemon->task_status, *m->task_status)) {
auto d = pending_service_map.get_daemon(m->service_name, m->daemon_name);
if (d->gid) {
daemon->task_status = *m->task_status;
d->task_status = *m->task_status;
pending_service_map_dirty = pending_service_map.epoch;
}
}
daemon->last_service_beacon = now;
} else if (m->daemon_status) {
derr << "got status from non-daemon " << key << dendl;
}
if (m->get_connection()->peer_is_osd() || m->get_connection()->peer_is_mon()) {
// only OSD and MON send health_checks to me now
daemon->daemon_health_metrics = std::move(m->daemon_health_metrics);
dout(10) << "daemon_health_metrics " << daemon->daemon_health_metrics
<< dendl;
}
}
return false;
}
// Update the DaemonState
ceph_assert(daemon != nullptr);
{
std::lock_guard l(daemon->lock);
auto &daemon_counters = daemon->perf_counters;
daemon_counters.update(*m.get());
auto p = m->config_bl.cbegin();
if (p != m->config_bl.end()) {
decode(daemon->config, p);
decode(daemon->ignored_mon_config, p);
dout(20) << " got config " << daemon->config
<< " ignored " << daemon->ignored_mon_config << dendl;
}
if (daemon->service_daemon) {
utime_t now = ceph_clock_now();
if (m->daemon_status) {
daemon->service_status = *m->daemon_status;
daemon->service_status_stamp = now;
}
daemon->last_service_beacon = now;
} else if (m->daemon_status) {
derr << "got status from non-daemon " << key << dendl;
}
if (m->get_connection()->peer_is_osd() || m->get_connection()->peer_is_mon()) {
// only OSD and MON send health_checks to me now
daemon->daemon_health_metrics = std::move(m->daemon_health_metrics);
dout(10) << "daemon_health_metrics " << daemon->daemon_health_metrics
<< dendl;
}
lock.unlock();
}
// if there are any schema updates, notify the python modules

View File

@ -147,6 +147,7 @@ class DaemonState
bool service_daemon = false;
utime_t service_status_stamp;
std::map<std::string, std::string> service_status;
std::map<std::string, std::string> task_status;
utime_t last_service_beacon;
// running config

View File

@ -349,6 +349,11 @@ void MgrClient::_send_report()
daemon_dirty_status = false;
}
if (task_dirty_status) {
report->task_status = task_status;
task_dirty_status = false;
}
report->daemon_health_metrics = std::move(daemon_health_metrics);
cct->_conf.get_config_bl(last_config_bl_version, &report->config_bl,
@ -478,14 +483,6 @@ int MgrClient::service_daemon_register(
const std::map<std::string,std::string>& metadata)
{
std::lock_guard l(lock);
if (service == "osd" ||
service == "mds" ||
service == "client" ||
service == "mon" ||
service == "mgr") {
// normal ceph entity types are not allowed!
return -EINVAL;
}
if (service_daemon) {
return -EEXIST;
}
@ -514,6 +511,15 @@ int MgrClient::service_daemon_update_status(
return 0;
}
int MgrClient::service_daemon_update_task_status(
std::map<std::string,std::string> &&status) {
std::lock_guard l(lock);
ldout(cct,10) << status << dendl;
task_status = std::move(status);
task_dirty_status = true;
return 0;
}
void MgrClient::update_daemon_health(std::vector<DaemonHealthMetric>&& metrics)
{
std::lock_guard l(lock);

View File

@ -88,9 +88,11 @@ protected:
// for service registration and beacon
bool service_daemon = false;
bool daemon_dirty_status = false;
bool task_dirty_status = false;
std::string service_name, daemon_name;
std::map<std::string,std::string> daemon_metadata;
std::map<std::string,std::string> daemon_status;
std::map<std::string,std::string> task_status;
std::vector<DaemonHealthMetric> daemon_health_metrics;
void reconnect();
@ -148,6 +150,8 @@ public:
const std::map<std::string,std::string>& metadata);
int service_daemon_update_status(
std::map<std::string,std::string>&& status);
int service_daemon_update_task_status(
std::map<std::string,std::string> &&task_status);
void update_daemon_health(std::vector<DaemonHealthMetric>&& metrics);
private:

View File

@ -12,23 +12,27 @@ using ceph::Formatter;
void ServiceMap::Daemon::encode(bufferlist& bl, uint64_t features) const
{
ENCODE_START(1, 1, bl);
ENCODE_START(2, 1, bl);
encode(gid, bl);
encode(addr, bl, features);
encode(start_epoch, bl);
encode(start_stamp, bl);
encode(metadata, bl);
encode(task_status, bl);
ENCODE_FINISH(bl);
}
void ServiceMap::Daemon::decode(bufferlist::const_iterator& p)
{
DECODE_START(1, p);
DECODE_START(2, p);
decode(gid, p);
decode(addr, p);
decode(start_epoch, p);
decode(start_stamp, p);
decode(metadata, p);
if (struct_v >= 2) {
decode(task_status, p);
}
DECODE_FINISH(p);
}
@ -36,13 +40,18 @@ void ServiceMap::Daemon::dump(Formatter *f) const
{
f->dump_unsigned("start_epoch", start_epoch);
f->dump_stream("start_stamp") << start_stamp;
f->dump_unsigned("gid", gid);
f->dump_unsigned("gid", *gid);
f->dump_string("addr", addr.get_legacy_str());
f->open_object_section("metadata");
for (auto& p : metadata) {
f->dump_string(p.first.c_str(), p.second);
}
f->close_section();
f->open_object_section("task_status");
for (auto& p : task_status) {
f->dump_string(p.first.c_str(), p.second);
}
f->close_section();
}
void ServiceMap::Daemon::generate_test_instances(std::list<Daemon*>& ls)
@ -51,6 +60,7 @@ void ServiceMap::Daemon::generate_test_instances(std::list<Daemon*>& ls)
ls.push_back(new Daemon);
ls.back()->gid = 222;
ls.back()->metadata["this"] = "that";
ls.back()->task_status["task1"] = "running";
}
// Service

View File

@ -12,17 +12,20 @@
#include "include/buffer.h"
#include "msg/msg_types.h"
#include <boost/optional.hpp>
namespace ceph {
class Formatter;
}
struct ServiceMap {
struct Daemon {
uint64_t gid = 0;
boost::optional<uint64_t> gid;
entity_addr_t addr;
epoch_t start_epoch = 0; ///< epoch first registered
utime_t start_stamp; ///< timestamp daemon started/registered
std::map<std::string,std::string> metadata; ///< static metadata
std::map<std::string,std::string> task_status; ///< running task status
void encode(ceph::buffer::list& bl, uint64_t features) const;
void decode(ceph::buffer::list::const_iterator& p);
@ -64,6 +67,33 @@ struct ServiceMap {
return ss.str();
}
std::string get_task_summary(const std::string_view task_prefix) const {
// contruct a map similar to:
// {"service1 status" -> {"service1.0" -> "running"}}
// {"service2 status" -> {"service2.0" -> "idle"},
// {"service2.1" -> "running"}}
std::map<std::string, std::map<std::string, std::string>> by_task;
for (const auto &p : daemons) {
std::stringstream d;
d << task_prefix << "." << p.first;
for (const auto &q : p.second.task_status) {
auto p1 = by_task.emplace(q.first, std::map<std::string, std::string>{}).first;
auto p2 = p1->second.emplace(d.str(), std::string()).first;
p2->second = q.second;
}
}
std::stringstream ss;
for (const auto &p : by_task) {
ss << "\n " << p.first << ":";
for (auto q : p.second) {
ss << "\n " << q.first << ": " << q.second;
}
}
return ss.str();
}
void count_metadata(const std::string& field,
std::map<std::string,int> *out) const {
for (auto& p : daemons) {

View File

@ -3018,11 +3018,32 @@ void Monitor::get_cluster_status(stringstream &ss, Formatter *f)
osdmon()->osdmap.print_summary(NULL, ss, string(maxlen + 6, ' '));
ss << "\n";
for (auto& p : service_map.services) {
const std::string &service = p.first;
// filter out normal ceph entity types
if (service == "osd" ||
service == "client" ||
service == "mon" ||
service == "mds" ||
service == "mgr") {
continue;
}
ss << " " << p.first << ": " << string(maxlen - p.first.size(), ' ')
<< p.second.get_summary() << "\n";
}
}
{
auto& service_map = mgrstatmon()->get_service_map();
if (!service_map.services.empty()) {
ss << "\n \n task status:\n";
{
for (auto &p : service_map.services) {
ss << p.second.get_task_summary(p.first);
}
}
}
}
ss << "\n \n data:\n";
mgrstatmon()->print_summary(NULL, &ss);