mirror of
https://github.com/ceph/ceph
synced 2025-04-08 10:42:01 +00:00
mgr, mon: allow normal ceph services to register with manager
Additionally, introduce `task status` field in manager report messages to forward status of executing tasks in daemons (e.g., status of executing scrubs in ceph metadata servers). `task status` makes its way upto service map which is then used to display the relevant information in ceph status. Signed-off-by: Venky Shankar <vshankar@redhat.com>
This commit is contained in:
parent
7265b55d09
commit
5c25a01864
@ -73,7 +73,7 @@ WRITE_CLASS_ENCODER(PerfCounterType)
|
||||
|
||||
class MMgrReport : public Message {
|
||||
private:
|
||||
static constexpr int HEAD_VERSION = 7;
|
||||
static constexpr int HEAD_VERSION = 8;
|
||||
static constexpr int COMPAT_VERSION = 1;
|
||||
|
||||
public:
|
||||
@ -98,6 +98,7 @@ public:
|
||||
|
||||
// for service registration
|
||||
boost::optional<std::map<std::string,std::string>> daemon_status;
|
||||
boost::optional<std::map<std::string,std::string>> task_status;
|
||||
|
||||
std::vector<DaemonHealthMetric> daemon_health_metrics;
|
||||
|
||||
@ -128,6 +129,9 @@ public:
|
||||
if (header.version >= 7) {
|
||||
decode(osd_perf_metric_reports, p);
|
||||
}
|
||||
if (header.version >= 8) {
|
||||
decode(task_status, p);
|
||||
}
|
||||
}
|
||||
|
||||
void encode_payload(uint64_t features) override {
|
||||
@ -141,6 +145,7 @@ public:
|
||||
encode(daemon_health_metrics, payload);
|
||||
encode(config_bl, payload);
|
||||
encode(osd_perf_metric_reports, payload);
|
||||
encode(task_status, payload);
|
||||
}
|
||||
|
||||
std::string_view get_type_name() const override { return "mgrreport"; }
|
||||
@ -161,6 +166,9 @@ public:
|
||||
if (!daemon_health_metrics.empty()) {
|
||||
out << " daemon_metrics=" << daemon_health_metrics.size();
|
||||
}
|
||||
if (task_status) {
|
||||
out << " task_status=" << task_status->size();
|
||||
}
|
||||
out << ")";
|
||||
}
|
||||
|
||||
|
@ -42,7 +42,14 @@
|
||||
#undef dout_prefix
|
||||
#define dout_prefix *_dout << "mgr.server " << __func__ << " "
|
||||
|
||||
|
||||
namespace {
|
||||
template <typename Map>
|
||||
bool map_compare(Map const &lhs, Map const &rhs) {
|
||||
return lhs.size() == rhs.size()
|
||||
&& std::equal(lhs.begin(), lhs.end(), rhs.begin(),
|
||||
[] (auto a, auto b) { return a.first == b.first && a.second == b.second; });
|
||||
}
|
||||
}
|
||||
|
||||
DaemonServer::DaemonServer(MonClient *monc_,
|
||||
Finisher &finisher_,
|
||||
@ -415,6 +422,7 @@ bool DaemonServer::handle_open(const ref_t<MMgrOpen>& m)
|
||||
std::lock_guard l(daemon->lock);
|
||||
daemon->perf_counters.clear();
|
||||
|
||||
daemon->service_daemon = m->service_daemon;
|
||||
if (m->service_daemon) {
|
||||
daemon->set_metadata(m->daemon_metadata);
|
||||
daemon->service_status = m->daemon_status;
|
||||
@ -422,7 +430,7 @@ bool DaemonServer::handle_open(const ref_t<MMgrOpen>& m)
|
||||
utime_t now = ceph_clock_now();
|
||||
auto d = pending_service_map.get_daemon(m->service_name,
|
||||
m->daemon_name);
|
||||
if (d->gid != (uint64_t)m->get_source().num()) {
|
||||
if (!d->gid || d->gid != (uint64_t)m->get_source().num()) {
|
||||
dout(10) << "registering " << key << " in pending_service_map" << dendl;
|
||||
d->gid = m->get_source().num();
|
||||
d->addr = m->get_source_addr();
|
||||
@ -506,98 +514,114 @@ bool DaemonServer::handle_report(const ref_t<MMgrReport>& m)
|
||||
return true;
|
||||
}
|
||||
|
||||
// Look up the DaemonState
|
||||
DaemonStatePtr daemon;
|
||||
if (daemon_state.exists(key)) {
|
||||
dout(20) << "updating existing DaemonState for " << key << dendl;
|
||||
daemon = daemon_state.get(key);
|
||||
} else {
|
||||
// we don't know the hostname at this stage, reject MMgrReport here.
|
||||
dout(5) << "rejecting report from " << key << ", since we do not have its metadata now."
|
||||
<< dendl;
|
||||
|
||||
// issue metadata request in background
|
||||
if (!daemon_state.is_updating(key) &&
|
||||
(key.first == "osd" || key.first == "mds" || key.first == "mon")) {
|
||||
{
|
||||
lock.lock();
|
||||
|
||||
std::ostringstream oss;
|
||||
auto c = new MetadataUpdate(daemon_state, key);
|
||||
if (key.first == "osd") {
|
||||
oss << "{\"prefix\": \"osd metadata\", \"id\": "
|
||||
<< key.second<< "}";
|
||||
DaemonStatePtr daemon;
|
||||
// Look up the DaemonState
|
||||
if (daemon_state.exists(key)) {
|
||||
dout(20) << "updating existing DaemonState for " << key << dendl;
|
||||
daemon = daemon_state.get(key);
|
||||
} else {
|
||||
lock.unlock();
|
||||
|
||||
} else if (key.first == "mds") {
|
||||
c->set_default("addr", stringify(m->get_source_addr()));
|
||||
oss << "{\"prefix\": \"mds metadata\", \"who\": \""
|
||||
<< key.second << "\"}";
|
||||
// we don't know the hostname at this stage, reject MMgrReport here.
|
||||
dout(5) << "rejecting report from " << key << ", since we do not have its metadata now."
|
||||
<< dendl;
|
||||
// issue metadata request in background
|
||||
if (!daemon_state.is_updating(key) &&
|
||||
(key.first == "osd" || key.first == "mds" || key.first == "mon")) {
|
||||
|
||||
std::ostringstream oss;
|
||||
auto c = new MetadataUpdate(daemon_state, key);
|
||||
if (key.first == "osd") {
|
||||
oss << "{\"prefix\": \"osd metadata\", \"id\": "
|
||||
<< key.second<< "}";
|
||||
|
||||
} else if (key.first == "mds") {
|
||||
c->set_default("addr", stringify(m->get_source_addr()));
|
||||
oss << "{\"prefix\": \"mds metadata\", \"who\": \""
|
||||
<< key.second << "\"}";
|
||||
|
||||
} else if (key.first == "mon") {
|
||||
oss << "{\"prefix\": \"mon metadata\", \"id\": \""
|
||||
<< key.second << "\"}";
|
||||
} else {
|
||||
ceph_abort();
|
||||
} else if (key.first == "mon") {
|
||||
oss << "{\"prefix\": \"mon metadata\", \"id\": \""
|
||||
<< key.second << "\"}";
|
||||
} else {
|
||||
ceph_abort();
|
||||
}
|
||||
|
||||
monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c);
|
||||
}
|
||||
|
||||
monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c);
|
||||
}
|
||||
|
||||
{
|
||||
std::lock_guard l(lock);
|
||||
lock.lock();
|
||||
|
||||
// kill session
|
||||
auto priv = m->get_connection()->get_priv();
|
||||
auto session = static_cast<MgrSession*>(priv.get());
|
||||
if (!session) {
|
||||
return false;
|
||||
return false;
|
||||
}
|
||||
m->get_connection()->mark_down();
|
||||
|
||||
dout(10) << "unregistering osd." << session->osd_id
|
||||
<< " session " << session << " con " << m->get_connection() << dendl;
|
||||
<< " session " << session << " con " << m->get_connection() << dendl;
|
||||
|
||||
if (osd_cons.find(session->osd_id) != osd_cons.end()) {
|
||||
osd_cons[session->osd_id].erase(m->get_connection());
|
||||
}
|
||||
osd_cons[session->osd_id].erase(m->get_connection());
|
||||
}
|
||||
|
||||
auto iter = daemon_connections.find(m->get_connection());
|
||||
if (iter != daemon_connections.end()) {
|
||||
daemon_connections.erase(iter);
|
||||
daemon_connections.erase(iter);
|
||||
}
|
||||
|
||||
lock.unlock();
|
||||
return false;
|
||||
}
|
||||
|
||||
// Update the DaemonState
|
||||
ceph_assert(daemon != nullptr);
|
||||
{
|
||||
std::lock_guard l(daemon->lock);
|
||||
auto &daemon_counters = daemon->perf_counters;
|
||||
daemon_counters.update(*m.get());
|
||||
|
||||
auto p = m->config_bl.cbegin();
|
||||
if (p != m->config_bl.end()) {
|
||||
decode(daemon->config, p);
|
||||
decode(daemon->ignored_mon_config, p);
|
||||
dout(20) << " got config " << daemon->config
|
||||
<< " ignored " << daemon->ignored_mon_config << dendl;
|
||||
}
|
||||
|
||||
if (daemon->service_daemon) {
|
||||
utime_t now = ceph_clock_now();
|
||||
if (m->daemon_status) {
|
||||
daemon->service_status_stamp = now;
|
||||
daemon->service_status = *m->daemon_status;
|
||||
}
|
||||
if (m->task_status && !map_compare(daemon->task_status, *m->task_status)) {
|
||||
auto d = pending_service_map.get_daemon(m->service_name, m->daemon_name);
|
||||
if (d->gid) {
|
||||
daemon->task_status = *m->task_status;
|
||||
d->task_status = *m->task_status;
|
||||
pending_service_map_dirty = pending_service_map.epoch;
|
||||
}
|
||||
}
|
||||
daemon->last_service_beacon = now;
|
||||
} else if (m->daemon_status) {
|
||||
derr << "got status from non-daemon " << key << dendl;
|
||||
}
|
||||
if (m->get_connection()->peer_is_osd() || m->get_connection()->peer_is_mon()) {
|
||||
// only OSD and MON send health_checks to me now
|
||||
daemon->daemon_health_metrics = std::move(m->daemon_health_metrics);
|
||||
dout(10) << "daemon_health_metrics " << daemon->daemon_health_metrics
|
||||
<< dendl;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
// Update the DaemonState
|
||||
ceph_assert(daemon != nullptr);
|
||||
{
|
||||
std::lock_guard l(daemon->lock);
|
||||
auto &daemon_counters = daemon->perf_counters;
|
||||
daemon_counters.update(*m.get());
|
||||
|
||||
auto p = m->config_bl.cbegin();
|
||||
if (p != m->config_bl.end()) {
|
||||
decode(daemon->config, p);
|
||||
decode(daemon->ignored_mon_config, p);
|
||||
dout(20) << " got config " << daemon->config
|
||||
<< " ignored " << daemon->ignored_mon_config << dendl;
|
||||
}
|
||||
|
||||
if (daemon->service_daemon) {
|
||||
utime_t now = ceph_clock_now();
|
||||
if (m->daemon_status) {
|
||||
daemon->service_status = *m->daemon_status;
|
||||
daemon->service_status_stamp = now;
|
||||
}
|
||||
daemon->last_service_beacon = now;
|
||||
} else if (m->daemon_status) {
|
||||
derr << "got status from non-daemon " << key << dendl;
|
||||
}
|
||||
if (m->get_connection()->peer_is_osd() || m->get_connection()->peer_is_mon()) {
|
||||
// only OSD and MON send health_checks to me now
|
||||
daemon->daemon_health_metrics = std::move(m->daemon_health_metrics);
|
||||
dout(10) << "daemon_health_metrics " << daemon->daemon_health_metrics
|
||||
<< dendl;
|
||||
}
|
||||
lock.unlock();
|
||||
}
|
||||
|
||||
// if there are any schema updates, notify the python modules
|
||||
|
@ -147,6 +147,7 @@ class DaemonState
|
||||
bool service_daemon = false;
|
||||
utime_t service_status_stamp;
|
||||
std::map<std::string, std::string> service_status;
|
||||
std::map<std::string, std::string> task_status;
|
||||
utime_t last_service_beacon;
|
||||
|
||||
// running config
|
||||
|
@ -349,6 +349,11 @@ void MgrClient::_send_report()
|
||||
daemon_dirty_status = false;
|
||||
}
|
||||
|
||||
if (task_dirty_status) {
|
||||
report->task_status = task_status;
|
||||
task_dirty_status = false;
|
||||
}
|
||||
|
||||
report->daemon_health_metrics = std::move(daemon_health_metrics);
|
||||
|
||||
cct->_conf.get_config_bl(last_config_bl_version, &report->config_bl,
|
||||
@ -478,14 +483,6 @@ int MgrClient::service_daemon_register(
|
||||
const std::map<std::string,std::string>& metadata)
|
||||
{
|
||||
std::lock_guard l(lock);
|
||||
if (service == "osd" ||
|
||||
service == "mds" ||
|
||||
service == "client" ||
|
||||
service == "mon" ||
|
||||
service == "mgr") {
|
||||
// normal ceph entity types are not allowed!
|
||||
return -EINVAL;
|
||||
}
|
||||
if (service_daemon) {
|
||||
return -EEXIST;
|
||||
}
|
||||
@ -514,6 +511,15 @@ int MgrClient::service_daemon_update_status(
|
||||
return 0;
|
||||
}
|
||||
|
||||
int MgrClient::service_daemon_update_task_status(
|
||||
std::map<std::string,std::string> &&status) {
|
||||
std::lock_guard l(lock);
|
||||
ldout(cct,10) << status << dendl;
|
||||
task_status = std::move(status);
|
||||
task_dirty_status = true;
|
||||
return 0;
|
||||
}
|
||||
|
||||
void MgrClient::update_daemon_health(std::vector<DaemonHealthMetric>&& metrics)
|
||||
{
|
||||
std::lock_guard l(lock);
|
||||
|
@ -88,9 +88,11 @@ protected:
|
||||
// for service registration and beacon
|
||||
bool service_daemon = false;
|
||||
bool daemon_dirty_status = false;
|
||||
bool task_dirty_status = false;
|
||||
std::string service_name, daemon_name;
|
||||
std::map<std::string,std::string> daemon_metadata;
|
||||
std::map<std::string,std::string> daemon_status;
|
||||
std::map<std::string,std::string> task_status;
|
||||
std::vector<DaemonHealthMetric> daemon_health_metrics;
|
||||
|
||||
void reconnect();
|
||||
@ -148,6 +150,8 @@ public:
|
||||
const std::map<std::string,std::string>& metadata);
|
||||
int service_daemon_update_status(
|
||||
std::map<std::string,std::string>&& status);
|
||||
int service_daemon_update_task_status(
|
||||
std::map<std::string,std::string> &&task_status);
|
||||
void update_daemon_health(std::vector<DaemonHealthMetric>&& metrics);
|
||||
|
||||
private:
|
||||
|
@ -12,23 +12,27 @@ using ceph::Formatter;
|
||||
|
||||
void ServiceMap::Daemon::encode(bufferlist& bl, uint64_t features) const
|
||||
{
|
||||
ENCODE_START(1, 1, bl);
|
||||
ENCODE_START(2, 1, bl);
|
||||
encode(gid, bl);
|
||||
encode(addr, bl, features);
|
||||
encode(start_epoch, bl);
|
||||
encode(start_stamp, bl);
|
||||
encode(metadata, bl);
|
||||
encode(task_status, bl);
|
||||
ENCODE_FINISH(bl);
|
||||
}
|
||||
|
||||
void ServiceMap::Daemon::decode(bufferlist::const_iterator& p)
|
||||
{
|
||||
DECODE_START(1, p);
|
||||
DECODE_START(2, p);
|
||||
decode(gid, p);
|
||||
decode(addr, p);
|
||||
decode(start_epoch, p);
|
||||
decode(start_stamp, p);
|
||||
decode(metadata, p);
|
||||
if (struct_v >= 2) {
|
||||
decode(task_status, p);
|
||||
}
|
||||
DECODE_FINISH(p);
|
||||
}
|
||||
|
||||
@ -36,13 +40,18 @@ void ServiceMap::Daemon::dump(Formatter *f) const
|
||||
{
|
||||
f->dump_unsigned("start_epoch", start_epoch);
|
||||
f->dump_stream("start_stamp") << start_stamp;
|
||||
f->dump_unsigned("gid", gid);
|
||||
f->dump_unsigned("gid", *gid);
|
||||
f->dump_string("addr", addr.get_legacy_str());
|
||||
f->open_object_section("metadata");
|
||||
for (auto& p : metadata) {
|
||||
f->dump_string(p.first.c_str(), p.second);
|
||||
}
|
||||
f->close_section();
|
||||
f->open_object_section("task_status");
|
||||
for (auto& p : task_status) {
|
||||
f->dump_string(p.first.c_str(), p.second);
|
||||
}
|
||||
f->close_section();
|
||||
}
|
||||
|
||||
void ServiceMap::Daemon::generate_test_instances(std::list<Daemon*>& ls)
|
||||
@ -51,6 +60,7 @@ void ServiceMap::Daemon::generate_test_instances(std::list<Daemon*>& ls)
|
||||
ls.push_back(new Daemon);
|
||||
ls.back()->gid = 222;
|
||||
ls.back()->metadata["this"] = "that";
|
||||
ls.back()->task_status["task1"] = "running";
|
||||
}
|
||||
|
||||
// Service
|
||||
|
@ -12,17 +12,20 @@
|
||||
#include "include/buffer.h"
|
||||
#include "msg/msg_types.h"
|
||||
|
||||
#include <boost/optional.hpp>
|
||||
|
||||
namespace ceph {
|
||||
class Formatter;
|
||||
}
|
||||
|
||||
struct ServiceMap {
|
||||
struct Daemon {
|
||||
uint64_t gid = 0;
|
||||
boost::optional<uint64_t> gid;
|
||||
entity_addr_t addr;
|
||||
epoch_t start_epoch = 0; ///< epoch first registered
|
||||
utime_t start_stamp; ///< timestamp daemon started/registered
|
||||
std::map<std::string,std::string> metadata; ///< static metadata
|
||||
std::map<std::string,std::string> task_status; ///< running task status
|
||||
|
||||
void encode(ceph::buffer::list& bl, uint64_t features) const;
|
||||
void decode(ceph::buffer::list::const_iterator& p);
|
||||
@ -64,6 +67,33 @@ struct ServiceMap {
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
std::string get_task_summary(const std::string_view task_prefix) const {
|
||||
// contruct a map similar to:
|
||||
// {"service1 status" -> {"service1.0" -> "running"}}
|
||||
// {"service2 status" -> {"service2.0" -> "idle"},
|
||||
// {"service2.1" -> "running"}}
|
||||
std::map<std::string, std::map<std::string, std::string>> by_task;
|
||||
for (const auto &p : daemons) {
|
||||
std::stringstream d;
|
||||
d << task_prefix << "." << p.first;
|
||||
for (const auto &q : p.second.task_status) {
|
||||
auto p1 = by_task.emplace(q.first, std::map<std::string, std::string>{}).first;
|
||||
auto p2 = p1->second.emplace(d.str(), std::string()).first;
|
||||
p2->second = q.second;
|
||||
}
|
||||
}
|
||||
|
||||
std::stringstream ss;
|
||||
for (const auto &p : by_task) {
|
||||
ss << "\n " << p.first << ":";
|
||||
for (auto q : p.second) {
|
||||
ss << "\n " << q.first << ": " << q.second;
|
||||
}
|
||||
}
|
||||
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
void count_metadata(const std::string& field,
|
||||
std::map<std::string,int> *out) const {
|
||||
for (auto& p : daemons) {
|
||||
|
@ -3018,11 +3018,32 @@ void Monitor::get_cluster_status(stringstream &ss, Formatter *f)
|
||||
osdmon()->osdmap.print_summary(NULL, ss, string(maxlen + 6, ' '));
|
||||
ss << "\n";
|
||||
for (auto& p : service_map.services) {
|
||||
const std::string &service = p.first;
|
||||
// filter out normal ceph entity types
|
||||
if (service == "osd" ||
|
||||
service == "client" ||
|
||||
service == "mon" ||
|
||||
service == "mds" ||
|
||||
service == "mgr") {
|
||||
continue;
|
||||
}
|
||||
ss << " " << p.first << ": " << string(maxlen - p.first.size(), ' ')
|
||||
<< p.second.get_summary() << "\n";
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
auto& service_map = mgrstatmon()->get_service_map();
|
||||
if (!service_map.services.empty()) {
|
||||
ss << "\n \n task status:\n";
|
||||
{
|
||||
for (auto &p : service_map.services) {
|
||||
ss << p.second.get_task_summary(p.first);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ss << "\n \n data:\n";
|
||||
mgrstatmon()->print_summary(NULL, &ss);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user