Merge pull request #730 from ceph/wip-monc-ping

mon: MonClient: ping monitors without authenticating

* add support on the monitor to reply to MPing messages with the contents of
  'mon_status' and 'health', regardless of a client having authenticated beforehand.

* add support on the MonClient to send a MPing message to a randomly picked
  monitor (it was easier this way, '-m ip:port' allows for targeted ping) and block
  waiting for a reply.

* add support on librados, pybind/rados.py and the 'ceph' tool to send pings to
  monitors.

Resolves: #5984

Reviewed-by: Greg Farnum <greg@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
Reviewed-by: Dan Mick <dan.mick@inktank.com>
Reviewed-by: Sage Weil <sage@inktank.com>
This commit is contained in:
João Eduardo Luís 2013-10-22 19:18:55 -07:00
commit c2cf8489bc
10 changed files with 307 additions and 15 deletions

View File

@ -475,6 +475,19 @@ def complete(sigdict, args, target):
return 0
###
# ping a monitor
###
def ping_monitor(cluster_handle, name):
    """
    Ping one monitor and print its reply on stdout.

    :param cluster_handle: object exposing ``ping_monitor(mon_id)``
    :param name: monitor name in the form ``mon.<id>``
    :returns: 0 once the reply has been printed, 1 if ``name`` is not
              of the form ``mon.<id>`` (a hint is written to stderr)
    """
    prefix = 'mon.'
    # The id is obtained by cutting the prefix off the *front* of the name,
    # so the prefix must be checked at the front as well; a plain substring
    # test would accept e.g. 'foomon.a' and then ping the bogus id 'on.a'.
    if not name.startswith(prefix):
        sys.stderr.write('"ping" expects a monitor to ping; try "ping mon.<id>"\n')
        return 1

    mon_id = name[len(prefix):]
    s = cluster_handle.ping_monitor(mon_id)
    sys.stdout.write('%s\n' % (s,))
    return 0
###
# main
###
@ -603,7 +616,14 @@ def main():
hdr('Monitor commands:')
print '[Contacting monitor, timeout after %d seconds]' % timeout
if childargs[0] == 'ping':
if len(childargs) < 2:
print >> sys.stderr, '"ping" requires a monitor name as argument: "ping mon.<id>"'
return 1
try:
if childargs[0] == 'ping':
return ping_monitor(cluster_handle, childargs[1])
cluster_handle.connect(timeout=timeout)
except KeyboardInterrupt:
print >> sys.stderr, 'Cluster connection aborted'

View File

@ -223,6 +223,23 @@ int rados_create2(rados_t *pcluster, const char *const clustername,
*/
int rados_create_with_context(rados_t *cluster, rados_config_t cct);
/**
 * Ping the monitor with ID @p mon_id, storing the resulting reply in
 * @p outstr and its length in @p outstrlen.
 *
 * The result buffer is allocated on the heap; the caller is
 * expected to release that memory with rados_buffer_free(). The
 * buffer and length pointers can be NULL, in which case they are
 * not filled in. A reply is only handed back when both @p outstr
 * and @p outstrlen are non-NULL.
 *
 * @param cluster cluster handle
 * @param[in] mon_id ID of the monitor to ping; must not be NULL
 * @param[out] outstr double pointer with the resulting reply
 * @param[out] outstrlen pointer with the size of the reply in @p outstr
 * @returns 0 on success, negative error code on failure
 *          (-EINVAL if @p mon_id is NULL)
 */
int rados_ping_monitor(rados_t cluster, const char *mon_id,
char **outstr, size_t *outstrlen);
/**
* Connect to the cluster.
*

View File

@ -131,6 +131,26 @@ int librados::RadosClient::get_fsid(std::string *s)
return 0;
}
int librados::RadosClient::ping_monitor(const string mon_id, string *result)
{
  /* connect() is what normally builds monc's initial monmap. Outside of
   * the CONNECTED state we cannot tell whether that has happened yet, so
   * build it here before handing the ping over to the MonClient.
   */
  if (state != CONNECTED) {
    ldout(cct, 10) << __func__ << " build monmap" << dendl;
    const int r = monclient.build_initial_monmap();
    if (r < 0)
      return r;
  }

  return monclient.ping_monitor(mon_id, result);
}
int librados::RadosClient::connect()
{
common_init_finish(cct);

View File

@ -77,6 +77,7 @@ public:
RadosClient(CephContext *cct_);
~RadosClient();
// Ping monitor mon.<mon_id> and store its reply in *result. Builds the
// MonClient's initial monmap first when the client is not in the
// CONNECTED state. Returns the MonClient's ping result, or the negative
// error from building the monmap.
int ping_monitor(string mon_id, string *result);
int connect();
void shutdown();

View File

@ -1900,6 +1900,22 @@ static void do_out_buffer(string& outbl, char **outbuf, size_t *outbuflen)
*outbuflen = outbl.length();
}
/**
 * C entry point for pinging a single monitor.
 *
 * @param cluster         cluster handle (a librados::RadosClient)
 * @param mon_id          ID of the monitor to ping; must not be NULL
 * @param[out] outstr     receives the reply buffer; set to a null pointer
 *                        when there is no reply to hand back
 * @param[out] outstrlen  receives the reply length; set to 0 when there
 *                        is no reply to hand back
 * @returns 0 on success, -EINVAL if @p mon_id is NULL, otherwise the
 *          error returned by RadosClient::ping_monitor()
 */
extern "C" int rados_ping_monitor(rados_t cluster, const char *mon_id,
                                  char **outstr, size_t *outstrlen)
{
  // Put the out-parameters into a defined state up front: on error, or
  // on an empty reply, nothing below writes to them, and a caller that
  // passed uninitialized locals would otherwise read (or free) garbage.
  if (outstr)
    *outstr = 0;
  if (outstrlen)
    *outstrlen = 0;

  if (!mon_id)
    return -EINVAL;

  librados::RadosClient *client = (librados::RadosClient *)cluster;
  string str;

  int ret = client->ping_monitor(mon_id, &str);
  // Only copy the reply out when the ping succeeded, there is something
  // to copy, and the caller supplied both out-pointers.
  if (ret == 0 && !str.empty() && outstr && outstrlen) {
    do_out_buffer(str, outstr, outstrlen);
  }
  return ret;
}
extern "C" int rados_mon_command(rados_t cluster, const char **cmd,
size_t cmdlen,
const char *inbuf, size_t inbuflen,

View File

@ -22,6 +22,7 @@
#include "messages/MAuthReply.h"
#include "messages/MMonCommand.h"
#include "messages/MMonCommandAck.h"
#include "messages/MPing.h"
#include "messages/MMonSubscribe.h"
#include "messages/MMonSubscribeAck.h"
@ -107,32 +108,31 @@ int MonClient::get_monmap_privately()
{
ldout(cct, 10) << "get_monmap_privately" << dendl;
Mutex::Locker l(monc_lock);
bool temp_msgr = false;
SimpleMessenger* smessenger = NULL;
if (!messenger) {
messenger = smessenger = new SimpleMessenger(cct,
entity_name_t::CLIENT(-1),
"temp_mon_client",
getpid());
"temp_mon_client", getpid());
messenger->add_dispatcher_head(this);
smessenger->start();
temp_msgr = true;
temp_msgr = true;
}
int attempt = 10;
ldout(cct, 10) << "have " << monmap.epoch << " fsid " << monmap.fsid << dendl;
while (monmap.fsid.is_zero()) {
cur_mon = _pick_random_mon();
cur_con = messenger->get_connection(monmap.get_inst(cur_mon));
ldout(cct, 10) << "querying mon." << cur_mon << " " << cur_con->get_peer_addr() << dendl;
messenger->send_message(new MMonGetMap, cur_con);
if (--attempt == 0)
break;
utime_t interval;
interval.set_from_double(cct->_conf->mon_client_hunt_interval);
map_cond.WaitInterval(cct, monc_lock, interval);
@ -153,7 +153,7 @@ int MonClient::get_monmap_privately()
messenger = 0;
monc_lock.Lock();
}
hunting = true; // reset this to true!
cur_mon.clear();
@ -165,6 +165,78 @@ int MonClient::get_monmap_privately()
}
/**
 * Ping the monitor with id @p mon_id and set the resulting reply in
 * the provided @p result_reply, if this last parameter is not NULL.
 *
 * So that we don't rely on the MonClient's default messenger, set up
 * during connect(), we create our own messenger to communicate with the
 * specified monitor. This is advantageous in the following ways:
 *
 * - Isolate the ping procedure from the rest of the MonClient's operations,
 *   allowing us to not acquire or manage the big monc_lock, thus not
 *   having to block waiting for some other operation to finish before we
 *   can proceed.
 *   * for instance, we can ping mon.FOO even if we are currently hunting
 *     or blocked waiting for auth to complete with mon.BAR.
 *
 * - Ping a monitor prior to establishing a connection (using connect())
 *   and properly establish the MonClient's messenger. This frees us
 *   from dealing with the complex foo that happens in connect().
 *
 * We also don't rely on MonClient as a dispatcher for this messenger,
 * unlike what happens with the MonClient's default messenger. This allows
 * us to sandbox the whole ping, having it much as a separate entity in
 * the MonClient class, considerably simplifying the handling and dispatching
 * of messages without needing to consider monc_lock.
 *
 * Current drawback is that we will establish a messenger for each ping
 * we want to issue, instead of keeping a single messenger instance that
 * would be used for all pings.
 *
 * @param[in]  mon_id        ID of the monitor to ping
 * @param[out] result_reply  receives the decoded reply, if not NULL
 * @returns 0 on success, -EINVAL if @p mon_id is empty, -ENOENT if the
 *          monmap has no such monitor, otherwise the negated result of
 *          MonClientPinger::wait_for_reply()
 */
int MonClient::ping_monitor(const string &mon_id, string *result_reply)
{
  ldout(cct, 10) << __func__ << dendl;

  if (mon_id.empty()) {
    ldout(cct, 10) << __func__ << " specified mon id is empty!" << dendl;
    return -EINVAL;
  } else if (!monmap.contains(mon_id)) {
    ldout(cct, 10) << __func__ << " no such monitor 'mon." << mon_id << "'"
                   << dendl;
    return -ENOENT;
  }

  MonClientPinger *pinger = new MonClientPinger(cct, result_reply);

  Messenger *smsgr = new SimpleMessenger(cct,
                                         entity_name_t::CLIENT(-1),
                                         "temp_ping_client", getpid());
  smsgr->add_dispatcher_head(pinger);
  smsgr->start();

  ConnectionRef con = smsgr->get_connection(monmap.get_inst(mon_id));
  ldout(cct, 10) << __func__ << " ping mon." << mon_id
                 << " " << con->get_peer_addr() << dendl;

  // Take the pinger's lock *before* the ping leaves. The pinger's
  // ms_dispatch() needs this lock to record a reply, and wait_for_reply()
  // resets the 'done' flag when it starts; were the message sent first, a
  // reply dispatched before we got the lock would be forgotten and we
  // would wait for a wakeup that has already happened.
  pinger->lock.Lock();
  smsgr->send_message(new MPing, con);
  int ret = pinger->wait_for_reply(cct->_conf->client_mount_timeout);
  if (ret == 0) {
    ldout(cct,10) << __func__ << " got ping reply" << dendl;
  } else {
    // wait_for_reply() hands back a positive errno; we return negative ones
    ret = -ret;
  }
  pinger->lock.Unlock();

  // tear down the one-shot messenger before releasing its dispatcher
  smsgr->mark_down(con);
  smsgr->shutdown();
  smsgr->wait();
  delete smsgr;
  delete pinger;
  return ret;
}
bool MonClient::ms_dispatch(Message *m)
{
if (my_addr == entity_addr_t())

View File

@ -42,6 +42,7 @@ class MMonCommandAck;
class MCommandReply;
struct MAuthReply;
class MAuthRotating;
class MPing;
class LogClient;
class AuthSupported;
class AuthAuthorizeHandlerRegistry;
@ -54,6 +55,58 @@ enum MonClientState {
MC_STATE_HAVE_SESSION,
};
/**
 * One-shot dispatcher used by MonClient::ping_monitor(): it records the
 * payload of an incoming MPing reply and wakes up whoever is blocked in
 * wait_for_reply().
 */
struct MonClientPinger : public Dispatcher {

  Mutex lock;            // taken by both handlers before touching done/result
  Cond ping_recvd_cond;  // signalled whenever 'done' is set
  string *result;        // destination for the decoded reply; may be NULL
  bool done;             // set on reply or connection reset

  MonClientPinger(CephContext *cct_, string *res_) :
    Dispatcher(cct_),
    lock("MonClientPinger::lock"),
    result(res_),
    done(false)
  { }

  /**
   * Block until a reply (or a connection reset) is signalled, or until
   * the timeout expires. MonClient::ping_monitor() takes 'lock' before
   * calling this.
   *
   * Note that 'done' is reset on entry, so a reply dispatched before
   * this call is not seen.
   *
   * @param timeout seconds to wait; values <= 0 fall back to
   *                conf->client_mount_timeout
   * @returns the last value returned by Cond::WaitUntil(): 0 after a
   *          wakeup, a positive errno (ETIMEDOUT) on timeout
   */
  int wait_for_reply(double timeout = 0.0) {
    utime_t until = ceph_clock_now(cct);
    until += (timeout > 0 ? timeout : cct->_conf->client_mount_timeout);
    done = false;

    int ret = 0;
    while (!done) {
      ret = ping_recvd_cond.WaitUntil(lock, until);
      // The wait reports a timeout as a *positive* errno (the caller
      // negates our result accordingly). Testing for -ETIMEDOUT never
      // matched, which left this loop spinning once the deadline passed.
      if (ret == ETIMEDOUT)
        break;
    }
    return ret;
  }

  /**
   * Consume an MPing reply: decode its payload into *result (when a
   * destination was given and the payload is non-empty), then wake up
   * the waiter. Any other message type is left for other dispatchers.
   */
  bool ms_dispatch(Message *m) {
    Mutex::Locker l(lock);
    if (m->get_type() != CEPH_MSG_PING)
      return false;

    bufferlist &payload = m->get_payload();
    if (result && payload.length() > 0) {
      bufferlist::iterator p = payload.begin();
      ::decode(*result, p);
    }
    done = true;
    ping_recvd_cond.SignalAll();
    m->put();
    return true;
  }

  /**
   * A connection reset also ends the wait; *result is left untouched.
   */
  bool ms_handle_reset(Connection *con) {
    Mutex::Locker l(lock);
    done = true;
    ping_recvd_cond.SignalAll();
    return true;
  }

  void ms_handle_remote_reset(Connection *con) {}
};
class MonClient : public Dispatcher {
public:
MonMap monmap;
@ -211,6 +264,17 @@ public:
int build_initial_monmap();
int get_monmap();
int get_monmap_privately();
/**
 * Ping monitor with ID @p mon_id and record the resulting
 * reply in @p result_reply.
 *
 * @param[in] mon_id Target monitor's ID
 * @param[out] result_reply Resulting reply from mon.ID, if param != NULL
 * @returns 0 in case of success; < 0 in case of error:
 *          -EINVAL if @p mon_id is empty,
 *          -ENOENT if the monmap contains no such monitor,
 *          -ETIMEDOUT if monitor didn't reply before timeout
 *          expired (default: conf->client_mount_timeout).
 */
int ping_monitor(const string &mon_id, string *result_reply);
void send_mon_message(Message *m) {
Mutex::Locker l(monc_lock);

View File

@ -52,6 +52,7 @@
#include "messages/MTimeCheck.h"
#include "messages/MMonHealth.h"
#include "messages/MPing.h"
#include "common/strtol.h"
#include "common/ceph_argparse.h"
@ -241,9 +242,11 @@ void Monitor::do_admin_command(string command, cmdmap_t& cmdmap, string format,
boost::scoped_ptr<Formatter> f(new_formatter(format));
if (command == "mon_status")
if (command == "mon_status") {
_mon_status(f.get(), ss);
else if (command == "quorum_status")
if (f)
f->flush(ss);
} else if (command == "quorum_status")
_quorum_status(f.get(), ss);
else if (command == "sync_force") {
string validate;
@ -1673,9 +1676,11 @@ void Monitor::_mon_status(Formatter *f, ostream& ss)
f->close_section(); // mon_status
f->flush(ss);
if (free_formatter)
if (free_formatter) {
// flush formatter to ss and delete it iff we created the formatter
f->flush(ss);
delete f;
}
}
void Monitor::get_health(string& status, bufferlist *detailbl, Formatter *f)
@ -2176,6 +2181,8 @@ void Monitor::handle_command(MMonCommand *m)
r = 0;
} else if (prefix == "mon_status") {
_mon_status(f.get(), ds);
if (f)
f->flush(ds);
rdata.append(ds);
rs = "";
r = 0;
@ -2585,6 +2592,10 @@ bool Monitor::_ms_dispatch(Message *m)
// of assessing whether we should handle it or not.
if (!src_is_mon && (m->get_type() != CEPH_MSG_AUTH &&
m->get_type() != CEPH_MSG_MON_GET_MAP)) {
if (m->get_type() == CEPH_MSG_PING) {
// let it go through and be dispatched immediately!
return dispatch(s, m, false);
}
dout(1) << __func__ << " dropping stray message " << *m
<< " from " << m->get_source_inst() << dendl;
return false;
@ -2797,6 +2808,10 @@ bool Monitor::dispatch(MonSession *s, Message *m, const bool src_is_mon)
health_monitor->dispatch(static_cast<MMonHealth *>(m));
break;
case CEPH_MSG_PING:
handle_ping(static_cast<MPing*>(m));
break;
default:
ret = false;
}
@ -2804,6 +2819,32 @@ bool Monitor::dispatch(MonSession *s, Message *m, const bool src_is_mon)
return ret;
}
/**
 * Answer a ping with an MPing whose payload is an encoded JSON string:
 * a "pong" object holding this monitor's health and mon_status output.
 *
 * @param m the received ping; its reference is dropped before returning
 */
void Monitor::handle_ping(MPing *m)
{
  dout(10) << __func__ << " " << *m << dendl;
  MPing *reply = new MPing;
  entity_inst_t inst = m->get_source_inst();
  bufferlist payload;
  Formatter *f = new JSONFormatter(true);
  f->open_object_section("pong");

  string health_str;
  get_health(health_str, NULL, f);
  {
    // _mon_status() dumps into the formatter we pass in; this stream is
    // only a throwaway sink for whatever it writes directly.
    stringstream ss;
    _mon_status(f, ss);
  }

  f->close_section();
  stringstream ss;
  f->flush(ss);
  // We own the formatter and are done with it once it has been flushed;
  // without this delete every ping leaked one JSONFormatter.
  delete f;

  ::encode(ss.str(), payload);
  reply->set_payload(payload);
  dout(10) << __func__ << " reply payload len " << reply->get_payload().length() << dendl;
  messenger->send_message(reply, inst);
  m->put();
}
void Monitor::timecheck_start()
{
dout(10) << __func__ << dendl;

View File

@ -46,6 +46,7 @@
#include "perfglue/heap_profiler.h"
#include "messages/MMonCommand.h"
#include "messages/MPing.h"
#include "mon/MonitorStore.h"
#include "mon/MonitorDBStore.h"
@ -485,6 +486,10 @@ private:
/**
* @}
*/
/**
 * Handle ping messages from others.
 *
 * Replies to the sender with an MPing carrying an encoded JSON string
 * (a "pong" object with this monitor's health and mon_status output).
 *
 * @param m the received ping; the handler drops its reference
 */
void handle_ping(MPing *m);
Context *probe_timeout_event; // for probing

View File

@ -67,6 +67,10 @@ class LogicError(Error):
""" `` class, derived from `Error` """
pass
class TimedOut(Error):
    """ `TimedOut` class, derived from `Error`; produced by make_ex() for errno.ETIMEDOUT """
    pass
def make_ex(ret, msg):
"""
Translate a librados return code into an exception.
@ -85,7 +89,8 @@ def make_ex(ret, msg):
errno.ENOSPC : NoSpace,
errno.EEXIST : ObjectExists,
errno.ENODATA : NoData,
errno.EINTR : InterruptedOrTimeoutError
errno.EINTR : InterruptedOrTimeoutError,
errno.ETIMEDOUT : TimedOut
}
ret = abs(ret)
if ret in errors:
@ -361,6 +366,37 @@ Rados object in state %s." % (self.state))
if (ret != 0):
raise make_ex(ret, "error calling conf_set")
def ping_monitor(self, mon_id):
    """
    Ping a single monitor and return whatever it answers.

    Useful as a simple liveness check, or as a simple way of obtaining
    information about the monitor even in the absence of quorum.

    :param mon_id: the ID portion of the monitor's name (i.e., mon.<ID>)
    :type mon_id: str
    :returns: the string reply from the monitor
    :raises: the exception built by make_ex() when librados returns
             a non-zero code
    """
    self.require_state("configuring", "connected")

    reply_buf = pointer(pointer(c_char()))
    reply_len = c_long()
    ping_args = (self.cluster, c_char_p(mon_id),
                 reply_buf, byref(reply_len))
    ret = run_in_thread(self.librados.rados_ping_monitor, ping_args)

    # Copy the reply into a Python string before the C buffer is released.
    reply = reply_buf.contents[:reply_len.value]
    if reply_len.value:
        run_in_thread(self.librados.rados_buffer_free, (reply_buf.contents,))

    if ret != 0:
        raise make_ex(ret, "error calling ping_monitor")
    return reply
def connect(self, timeout=0):
"""
Connect to the cluster.