diff --git a/src/ceph.in b/src/ceph.in index 56422f6ed82..72c58541b0f 100755 --- a/src/ceph.in +++ b/src/ceph.in @@ -475,6 +475,19 @@ def complete(sigdict, args, target): return 0 +### +# ping a monitor +### +def ping_monitor(cluster_handle, name): + if 'mon.' not in name: + print >> sys.stderr, '"ping" expects a monitor to ping; try "ping mon."' + return 1 + + mon_id = name[len('mon.'):] + s = cluster_handle.ping_monitor(mon_id) + print s + return 0 + ### # main ### @@ -603,7 +616,14 @@ def main(): hdr('Monitor commands:') print '[Contacting monitor, timeout after %d seconds]' % timeout + if childargs[0] == 'ping': + if len(childargs) < 2: + print >> sys.stderr, '"ping" requires a monitor name as argument: "ping mon."' + return 1 + try: + if childargs[0] == 'ping': + return ping_monitor(cluster_handle, childargs[1]) cluster_handle.connect(timeout=timeout) except KeyboardInterrupt: print >> sys.stderr, 'Cluster connection aborted' diff --git a/src/include/rados/librados.h b/src/include/rados/librados.h index 515663c2335..a67b85ec3d3 100644 --- a/src/include/rados/librados.h +++ b/src/include/rados/librados.h @@ -223,6 +223,23 @@ int rados_create2(rados_t *pcluster, const char *const clustername, */ int rados_create_with_context(rados_t *cluster, rados_config_t cct); +/** + * Ping the monitor with ID @p mon_id, storing the resulting reply in + * @p buf (if specified) with a maximum size of @p len. + * + * The result buffer is allocated on the heap; the caller is + * expected to release that memory with rados_buffer_free(). The + * buffer and length pointers can be NULL, in which case they are + * not filled in. + * + * @param cluster cluster handle + * @param[in] mon_id ID of the monitor to ping + * @param[out] outstr double pointer with the resulting reply + * @param[out] outstrlen pointer with the size of the reply in @p outstr + */ +int rados_ping_monitor(rados_t cluster, const char *mon_id, + char **outstr, size_t *outstrlen); + /** * Connect to the cluster. * diff --git a/src/librados/RadosClient.cc b/src/librados/RadosClient.cc index 1be3ebd10f9..d6700c83d7c 100644 --- a/src/librados/RadosClient.cc +++ b/src/librados/RadosClient.cc @@ -131,6 +131,26 @@ int librados::RadosClient::get_fsid(std::string *s) return 0; } +int librados::RadosClient::ping_monitor(const string mon_id, string *result) +{ + int err = 0; + /* If we haven't yet connected, we have no way of telling whether we + * already built monc's initial monmap. IF we are in CONNECTED state, + * then it is safe to assume that we went through connect(), which does + * build a monmap. + */ + if (state != CONNECTED) { + ldout(cct, 10) << __func__ << " build monmap" << dendl; + err = monclient.build_initial_monmap(); + } + if (err < 0) { + return err; + } + + err = monclient.ping_monitor(mon_id, result); + return err; +} + int librados::RadosClient::connect() { common_init_finish(cct); diff --git a/src/librados/RadosClient.h b/src/librados/RadosClient.h index 2244788d876..7c5c8af1ca0 100644 --- a/src/librados/RadosClient.h +++ b/src/librados/RadosClient.h @@ -77,6 +77,7 @@ public: RadosClient(CephContext *cct_); ~RadosClient(); + int ping_monitor(string mon_id, string *result); int connect(); void shutdown(); diff --git a/src/librados/librados.cc b/src/librados/librados.cc index 217a0a7bfb2..95abbc2f260 100644 --- a/src/librados/librados.cc +++ b/src/librados/librados.cc @@ -1900,6 +1900,22 @@ static void do_out_buffer(string& outbl, char **outbuf, size_t *outbuflen) *outbuflen = outbl.length(); } +extern "C" int rados_ping_monitor(rados_t cluster, const char *mon_id, + char **outstr, size_t *outstrlen) +{ + librados::RadosClient *client = (librados::RadosClient *)cluster; + string str; + + if (!mon_id) + return -EINVAL; + + int ret = client->ping_monitor(mon_id, &str); + if (ret == 0 && !str.empty() && outstr && outstrlen) { + do_out_buffer(str, outstr, outstrlen); + } + return ret; +} + extern "C" int rados_mon_command(rados_t cluster, const char **cmd, size_t cmdlen, const char *inbuf, size_t inbuflen, diff --git a/src/mon/MonClient.cc b/src/mon/MonClient.cc index 38bead8f29f..f35a0dafd44 100644 --- a/src/mon/MonClient.cc +++ b/src/mon/MonClient.cc @@ -22,6 +22,7 @@ #include "messages/MAuthReply.h" #include "messages/MMonCommand.h" #include "messages/MMonCommandAck.h" +#include "messages/MPing.h" #include "messages/MMonSubscribe.h" #include "messages/MMonSubscribeAck.h" @@ -107,32 +108,31 @@ int MonClient::get_monmap_privately() { ldout(cct, 10) << "get_monmap_privately" << dendl; Mutex::Locker l(monc_lock); - + bool temp_msgr = false; SimpleMessenger* smessenger = NULL; if (!messenger) { messenger = smessenger = new SimpleMessenger(cct, entity_name_t::CLIENT(-1), - "temp_mon_client", - getpid()); + "temp_mon_client", getpid()); messenger->add_dispatcher_head(this); smessenger->start(); - temp_msgr = true; + temp_msgr = true; } - + int attempt = 10; - + ldout(cct, 10) << "have " << monmap.epoch << " fsid " << monmap.fsid << dendl; - + while (monmap.fsid.is_zero()) { cur_mon = _pick_random_mon(); cur_con = messenger->get_connection(monmap.get_inst(cur_mon)); ldout(cct, 10) << "querying mon." << cur_mon << " " << cur_con->get_peer_addr() << dendl; messenger->send_message(new MMonGetMap, cur_con); - + if (--attempt == 0) break; - + utime_t interval; interval.set_from_double(cct->_conf->mon_client_hunt_interval); map_cond.WaitInterval(cct, monc_lock, interval); @@ -153,7 +153,7 @@ int MonClient::get_monmap_privately() messenger = 0; monc_lock.Lock(); } - + hunting = true; // reset this to true! cur_mon.clear(); @@ -165,6 +165,78 @@ int MonClient::get_monmap_privately() } +/** + * Ping the monitor with id @p mon_id and set the resulting reply in + * the provided @p result_reply, if this last parameter is not NULL. + * + * So that we don't rely on the MonClient's default messenger, set up + * during connect(), we create our own messenger to comunicate with the + * specified monitor. This is advantageous in the following ways: + * + * - Isolate the ping procedure from the rest of the MonClient's operations, + * allowing us to not acquire or manage the big monc_lock, thus not + * having to block waiting for some other operation to finish before we + * can proceed. + * * for instance, we can ping mon.FOO even if we are currently hunting + * or blocked waiting for auth to complete with mon.BAR. + * + * - Ping a monitor prior to establishing a connection (using connect()) + * and properly establish the MonClient's messenger. This frees us + * from dealing with the complex foo that happens in connect(). + * + * We also don't rely on MonClient as a dispatcher for this messenger, + * unlike what happens with the MonClient's default messenger. This allows + * us to sandbox the whole ping, having it much as a separate entity in + * the MonClient class, considerably simplifying the handling and dispatching + * of messages without needing to consider monc_lock. + * + * Current drawback is that we will establish a messenger for each ping + * we want to issue, instead of keeping a single messenger instance that + * would be used for all pings. + */ +int MonClient::ping_monitor(const string &mon_id, string *result_reply) +{ + ldout(cct, 10) << __func__ << dendl; + + if (mon_id.empty()) { + ldout(cct, 10) << __func__ << " specified mon id is empty!" << dendl; + return -EINVAL; + } else if (!monmap.contains(mon_id)) { + ldout(cct, 10) << __func__ << " no such monitor 'mon." << mon_id << "'" + << dendl; + return -ENOENT; + } + + MonClientPinger *pinger = new MonClientPinger(cct, result_reply); + + Messenger *smsgr = new SimpleMessenger(cct, + entity_name_t::CLIENT(-1), + "temp_ping_client", getpid()); + smsgr->add_dispatcher_head(pinger); + smsgr->start(); + + ConnectionRef con = smsgr->get_connection(monmap.get_inst(mon_id)); + ldout(cct, 10) << __func__ << " ping mon." << mon_id + << " " << con->get_peer_addr() << dendl; + smsgr->send_message(new MPing, con); + + pinger->lock.Lock(); + int ret = pinger->wait_for_reply(cct->_conf->client_mount_timeout); + if (ret == 0) { + ldout(cct,10) << __func__ << " got ping reply" << dendl; + } else { + ret = -ret; + } + pinger->lock.Unlock(); + + smsgr->mark_down(con); + smsgr->shutdown(); + smsgr->wait(); + delete smsgr; + delete pinger; + return ret; +} + bool MonClient::ms_dispatch(Message *m) { if (my_addr == entity_addr_t()) diff --git a/src/mon/MonClient.h b/src/mon/MonClient.h index 64a399a197a..0246050059b 100644 --- a/src/mon/MonClient.h +++ b/src/mon/MonClient.h @@ -42,6 +42,7 @@ class MMonCommandAck; class MCommandReply; struct MAuthReply; class MAuthRotating; +class MPing; class LogClient; class AuthSupported; class AuthAuthorizeHandlerRegistry; @@ -54,6 +55,58 @@ enum MonClientState { MC_STATE_HAVE_SESSION, }; +struct MonClientPinger : public Dispatcher { + + Mutex lock; + Cond ping_recvd_cond; + string *result; + bool done; + + MonClientPinger(CephContext *cct_, string *res_) : + Dispatcher(cct_), + lock("MonClientPinger::lock"), + result(res_), + done(false) + { } + + int wait_for_reply(double timeout = 0.0) { + utime_t until = ceph_clock_now(cct); + until += (timeout > 0 ? timeout : cct->_conf->client_mount_timeout); + done = false; + + int ret = 0; + while (!done) { + ret = ping_recvd_cond.WaitUntil(lock, until); + if (ret == -ETIMEDOUT) + break; + } + return ret; + } + + bool ms_dispatch(Message *m) { + Mutex::Locker l(lock); + if (m->get_type() != CEPH_MSG_PING) + return false; + + bufferlist &payload = m->get_payload(); + if (result && payload.length() > 0) { + bufferlist::iterator p = payload.begin(); + ::decode(*result, p); + } + done = true; + ping_recvd_cond.SignalAll(); + m->put(); + return true; + } + bool ms_handle_reset(Connection *con) { + Mutex::Locker l(lock); + done = true; + ping_recvd_cond.SignalAll(); + return true; + } + void ms_handle_remote_reset(Connection *con) {} +}; + class MonClient : public Dispatcher { public: MonMap monmap; @@ -211,6 +264,17 @@ public: int build_initial_monmap(); int get_monmap(); int get_monmap_privately(); + /** + * Ping monitor with ID @p mon_id and record the resulting + * reply in @p result_reply. + * + * @param[in] mon_id Target monitor's ID + * @param[out] Resulting reply from mon.ID, if param != NULL + * @returns 0 in case of success; < 0 in case of error, + * -ETIMEDOUT if monitor didn't reply before timeout + * expired (default: conf->client_mount_timeout). + */ + int ping_monitor(const string &mon_id, string *result_reply); void send_mon_message(Message *m) { Mutex::Locker l(monc_lock); diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index d8c90bc3d76..cd541f6bf83 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -52,6 +52,7 @@ #include "messages/MTimeCheck.h" #include "messages/MMonHealth.h" +#include "messages/MPing.h" #include "common/strtol.h" #include "common/ceph_argparse.h" @@ -241,9 +242,11 @@ void Monitor::do_admin_command(string command, cmdmap_t& cmdmap, string format, boost::scoped_ptr f(new_formatter(format)); - if (command == "mon_status") + if (command == "mon_status") { _mon_status(f.get(), ss); - else if (command == "quorum_status") + if (f) + f->flush(ss); + } else if (command == "quorum_status") _quorum_status(f.get(), ss); else if (command == "sync_force") { string validate; @@ -1673,9 +1676,11 @@ void Monitor::_mon_status(Formatter *f, ostream& ss) f->close_section(); // mon_status - f->flush(ss); - if (free_formatter) + if (free_formatter) { + // flush formatter to ss and delete it iff we created the formatter + f->flush(ss); delete f; + } } void Monitor::get_health(string& status, bufferlist *detailbl, Formatter *f) @@ -2176,6 +2181,8 @@ void Monitor::handle_command(MMonCommand *m) r = 0; } else if (prefix == "mon_status") { _mon_status(f.get(), ds); + if (f) + f->flush(ds); rdata.append(ds); rs = ""; r = 0; @@ -2585,6 +2592,10 @@ bool Monitor::_ms_dispatch(Message *m) // of assessing whether we should handle it or not. if (!src_is_mon && (m->get_type() != CEPH_MSG_AUTH && m->get_type() != CEPH_MSG_MON_GET_MAP)) { + if (m->get_type() == CEPH_MSG_PING) { + // let it go through and be dispatched immediately! + return dispatch(s, m, false); + } dout(1) << __func__ << " dropping stray message " << *m << " from " << m->get_source_inst() << dendl; return false; @@ -2797,6 +2808,10 @@ bool Monitor::dispatch(MonSession *s, Message *m, const bool src_is_mon) health_monitor->dispatch(static_cast(m)); break; + case CEPH_MSG_PING: + handle_ping(static_cast(m)); + break; + default: ret = false; } @@ -2804,6 +2819,32 @@ bool Monitor::dispatch(MonSession *s, Message *m, const bool src_is_mon) return ret; } +void Monitor::handle_ping(MPing *m) +{ + dout(10) << __func__ << " " << *m << dendl; + MPing *reply = new MPing; + entity_inst_t inst = m->get_source_inst(); + bufferlist payload; + Formatter *f = new JSONFormatter(true); + f->open_object_section("pong"); + + string health_str; + get_health(health_str, NULL, f); + { + stringstream ss; + _mon_status(f, ss); + } + + f->close_section(); + stringstream ss; + f->flush(ss); + ::encode(ss.str(), payload); + reply->set_payload(payload); + dout(10) << __func__ << " reply payload len " << reply->get_payload().length() << dendl; + messenger->send_message(reply, inst); + m->put(); +} + void Monitor::timecheck_start() { dout(10) << __func__ << dendl; diff --git a/src/mon/Monitor.h b/src/mon/Monitor.h index 2c1c2cdeb19..2c066f6e263 100644 --- a/src/mon/Monitor.h +++ b/src/mon/Monitor.h @@ -46,6 +46,7 @@ #include "perfglue/heap_profiler.h" #include "messages/MMonCommand.h" +#include "messages/MPing.h" #include "mon/MonitorStore.h" #include "mon/MonitorDBStore.h" @@ -485,6 +486,10 @@ private: /** * @} */ + /** + * Handle ping messages from others. + */ + void handle_ping(MPing *m); Context *probe_timeout_event; // for probing diff --git a/src/pybind/rados.py b/src/pybind/rados.py index d977c1a298a..fecf4bb16d0 100644 --- a/src/pybind/rados.py +++ b/src/pybind/rados.py @@ -67,6 +67,10 @@ class LogicError(Error): """ `` class, derived from `Error` """ pass +class TimedOut(Error): + """ `TimedOut` class, derived from `Error` """ + pass + def make_ex(ret, msg): """ Translate a librados return code into an exception. @@ -85,7 +89,8 @@ def make_ex(ret, msg): errno.ENOSPC : NoSpace, errno.EEXIST : ObjectExists, errno.ENODATA : NoData, - errno.EINTR : InterruptedOrTimeoutError + errno.EINTR : InterruptedOrTimeoutError, + errno.ETIMEDOUT : TimedOut } ret = abs(ret) if ret in errors: @@ -361,6 +366,37 @@ Rados object in state %s." % (self.state)) if (ret != 0): raise make_ex(ret, "error calling conf_set") + + def ping_monitor(self, mon_id): + """ + Ping a monitor to assess liveness + + May be used as a simply way to assess liveness, or to obtain + informations about the monitor in a simple way even in the + absence of quorum. + + :param mon_id: the ID portion of the monitor's name (i.e., mon.) + :type mon_id: str + :returns: the string reply from the monitor + """ + + self.require_state("configuring", "connected") + + outstrp = pointer(pointer(c_char())) + outstrlen = c_long() + + ret = run_in_thread(self.librados.rados_ping_monitor, + (self.cluster, c_char_p(mon_id), + outstrp, byref(outstrlen))) + + my_outstr = outstrp.contents[:(outstrlen.value)] + if outstrlen.value: + run_in_thread(self.librados.rados_buffer_free, (outstrp.contents,)) + + if ret != 0: + raise make_ex(ret, "error calling ping_monitor") + return my_outstr + def connect(self, timeout=0): """ Connect to the cluster.