mon/monclient: hunt for multiple monitors in parallel

* add an option "mon_client_hunt_parallel" for the maximum number of parallel
  hunting sessions.

Fixes: http://tracker.ceph.com/issues/16091
Signed-off-by: Steven Dieffenbach <sdieffen@redhat.com>
Signed-off-by: Kefu Chai <kchai@redhat.com>
This commit is contained in:
Kefu Chai 2016-09-14 21:39:04 +08:00
parent 8729e1eb9c
commit a2eb6ae3fb
12 changed files with 456 additions and 291 deletions

View File

@ -12728,7 +12728,7 @@ bool Client::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool
{
if (dest_type == CEPH_ENTITY_TYPE_MON)
return true;
*authorizer = monclient->auth->build_authorizer(dest_type);
*authorizer = monclient->build_authorizer(dest_type);
return true;
}

View File

@ -1,24 +0,0 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#ifndef __CEPH_COMMON_SIMPLERNG_H_
#define __CEPH_COMMON_SIMPLERNG_H_
/*
* rand() is not thread-safe. random_r family segfaults.
* boost::random::* have build issues.
*/
// Small self-contained pseudo-random generator. Each instance carries its own
// state, so separate instances never share data. The two 16-bit
// multiply-and-carry streams appear to follow Marsaglia's MWC scheme
// (NOTE(review): confirm provenance of the 36969 / 18000 constants).
class SimpleRNG {
public:
  // Seeds the first stream with `seed`; the second stream always starts at 123.
  SimpleRNG(int seed) : m_z(seed), m_w(123) {}

  // Advances both streams and returns the next 32-bit value.
  unsigned operator()() {
    const unsigned z_low = m_z & 65535;
    const unsigned z_carry = m_z >> 16;
    const unsigned w_low = m_w & 65535;
    const unsigned w_carry = m_w >> 16;

    m_z = 36969 * z_low + z_carry;
    m_w = 18000 * w_low + w_carry;

    // low half of m_z forms the high word of the result, m_w is added on top
    const unsigned high_word = m_z << 16;
    return high_word + m_w;
  }

private:
  unsigned m_z, m_w;
};
#endif

View File

@ -398,6 +398,7 @@ OPTION(cephx_sign_messages, OPT_BOOL, true) // Default to signing session messa
OPTION(auth_mon_ticket_ttl, OPT_DOUBLE, 60*60*12)
OPTION(auth_service_ticket_ttl, OPT_DOUBLE, 60*60)
OPTION(auth_debug, OPT_BOOL, false) // if true, assert when weird things happen
OPTION(mon_client_hunt_parallel, OPT_U32, 2) // how many mons to try to connect to in parallel during hunt
OPTION(mon_client_hunt_interval, OPT_DOUBLE, 3.0) // try new mon every N seconds until we connect
OPTION(mon_client_ping_interval, OPT_DOUBLE, 10.0) // ping every N seconds
OPTION(mon_client_ping_timeout, OPT_DOUBLE, 30.0) // fail if we don't hear back

View File

@ -60,7 +60,7 @@ bool librados::RadosClient::ms_get_authorizer(int dest_type,
/* monitor authorization is being handled on different layer */
if (dest_type == CEPH_ENTITY_TYPE_MON)
return true;
*authorizer = monclient.auth->build_authorizer(dest_type);
*authorizer = monclient.build_authorizer(dest_type);
return *authorizer != NULL;
}

View File

@ -1185,7 +1185,7 @@ bool MDSDaemon::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bo
return false;
}
*authorizer = monc->auth->build_authorizer(dest_type);
*authorizer = monc->build_authorizer(dest_type);
return *authorizer != NULL;
}

View File

@ -117,7 +117,7 @@ bool DaemonServer::ms_get_authorizer(int dest_type,
return false;
}
*authorizer = monc->auth->build_authorizer(dest_type);
*authorizer = monc->build_authorizer(dest_type);
dout(20) << "got authorizer " << *authorizer << dendl;
return *authorizer != NULL;
}

View File

@ -211,7 +211,7 @@ bool MgrStandby::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer,
return false;
}
*authorizer = monc->auth->build_authorizer(dest_type);
*authorizer = monc->build_authorizer(dest_type);
return *authorizer != NULL;
}

View File

@ -12,6 +12,8 @@
*
*/
#include <random>
#include "messages/MMonGetMap.h"
#include "messages/MMonGetVersion.h"
#include "messages/MMonGetVersionReply.h"
@ -39,24 +41,18 @@
#define dout_subsys ceph_subsys_monc
#undef dout_prefix
#define dout_prefix *_dout << "monclient" << (hunting ? "(hunting)":"") << ": "
#define dout_prefix *_dout << "monclient" << (_hunting() ? "(hunting)":"") << ": "
MonClient::MonClient(CephContext *cct_) :
Dispatcher(cct_),
state(MC_STATE_NONE),
messenger(NULL),
cur_con(NULL),
rng(getpid()),
monc_lock("MonClient::monc_lock"),
timer(cct_, monc_lock), finisher(cct_),
initialized(false),
no_keyring_disabled_cephx(false),
log_client(NULL),
more_log_pending(false),
hunting(true),
want_monmap(true),
want_keys(0), global_id(0),
authenticate_err(0),
had_a_connection(false),
reopen_interval_multiplier(1.0),
last_mon_command_tid(0),
@ -80,7 +76,7 @@ int MonClient::get_monmap()
Mutex::Locker l(monc_lock);
_sub_want("monmap", 0, 0);
if (cur_mon.empty())
if (!_opened())
_reopen_session();
while (want_monmap)
@ -111,14 +107,17 @@ int MonClient::get_monmap_privately()
ldout(cct, 10) << "have " << monmap.epoch << " fsid " << monmap.fsid << dendl;
std::random_device rd;
std::mt19937 rng(rd());
assert(monmap.size() > 0);
std::uniform_int_distribution<unsigned> ranks(0, monmap.size() - 1);
while (monmap.fsid.is_zero()) {
cur_mon = _pick_random_mon();
cur_con = messenger->get_connection(monmap.get_inst(cur_mon));
if (cur_con) {
ldout(cct, 10) << "querying mon." << cur_mon << " "
<< cur_con->get_peer_addr() << dendl;
cur_con->send_message(new MMonGetMap);
}
auto rank = ranks(rng);
auto& pending_con = _add_conn(rank);
auto con = pending_con.get_con();
ldout(cct, 10) << "querying mon." << monmap.get_name(rank) << " "
<< con->get_peer_addr() << dendl;
con->send_message(new MMonGetMap);
if (--attempt == 0)
break;
@ -127,17 +126,13 @@ int MonClient::get_monmap_privately()
interval.set_from_double(cct->_conf->mon_client_hunt_interval);
map_cond.WaitInterval(monc_lock, interval);
if (monmap.fsid.is_zero() && cur_con) {
cur_con->mark_down(); // nope, clean that connection up
if (monmap.fsid.is_zero() && con) {
con->mark_down(); // nope, clean that connection up
}
}
if (temp_msgr) {
if (cur_con) {
cur_con->mark_down();
cur_con.reset(NULL);
cur_mon.clear();
}
pending_cons.clear();
monc_lock.Unlock();
messenger->shutdown();
if (smessenger)
@ -147,9 +142,7 @@ int MonClient::get_monmap_privately()
monc_lock.Lock();
}
hunting = true; // reset this to true!
cur_mon.clear();
cur_con.reset(NULL);
pending_cons.clear();
if (!monmap.fsid.is_zero())
return 0;
@ -254,8 +247,15 @@ bool MonClient::ms_dispatch(Message *m)
Mutex::Locker lock(monc_lock);
// ignore any messages outside our current session
if (m->get_connection() != cur_con) {
if (_hunting()) {
if (!pending_cons.count(m->get_source_addr())) {
// ignore any messages outside hunting sessions
ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
m->put();
return true;
}
} else if (!active_con || active_con->get_con() != m->get_connection()) {
// ignore any messages outside our session(s)
ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
m->put();
return true;
@ -311,10 +311,12 @@ void MonClient::flush_log()
void MonClient::handle_monmap(MMonMap *m)
{
ldout(cct, 10) << __func__ << " " << *m << dendl;
auto peer = m->get_source_addr();
string cur_mon = monmap.get_name(peer);
bufferlist::iterator p = m->monmapbl.begin();
::decode(monmap, p);
assert(!cur_mon.empty());
ldout(cct, 10) << " got monmap " << monmap.epoch
<< ", mon." << cur_mon << " is now rank " << monmap.get_rank(cur_mon)
<< dendl;
@ -324,9 +326,10 @@ void MonClient::handle_monmap(MMonMap *m)
_sub_got("monmap", monmap.get_epoch());
if (!monmap.get_addr_name(cur_con->get_peer_addr(), cur_mon)) {
if (!monmap.get_addr_name(peer, cur_mon)) {
ldout(cct, 10) << "mon." << cur_mon << " went away" << dendl;
_reopen_session(); // can't find the mon we were talking to (above)
// can't find the mon we were talking to (above)
_reopen_session();
}
map_cond.Signal();
@ -409,10 +412,9 @@ void MonClient::shutdown()
waiting_for_session.pop_front();
}
if (cur_con)
cur_con->mark_down();
cur_con.reset(NULL);
cur_mon.clear();
active_con.reset();
pending_cons.clear();
auth.reset();
monc_lock.Unlock();
@ -430,20 +432,20 @@ int MonClient::authenticate(double timeout)
{
Mutex::Locker lock(monc_lock);
if (state == MC_STATE_HAVE_SESSION) {
if (active_con) {
ldout(cct, 5) << "already authenticated" << dendl;
return 0;
}
_sub_want("monmap", monmap.get_epoch() ? monmap.get_epoch() + 1 : 0, 0);
if (cur_mon.empty())
if (!_opened())
_reopen_session();
utime_t until = ceph_clock_now();
until += timeout;
if (timeout > 0.0)
ldout(cct, 10) << "authenticate will time out at " << until << dendl;
while (state != MC_STATE_HAVE_SESSION && !authenticate_err) {
while (!active_con && !authenticate_err) {
if (timeout > 0.0) {
int r = auth_cond.WaitUntil(monc_lock, until);
if (r == ETIMEDOUT) {
@ -455,8 +457,9 @@ int MonClient::authenticate(double timeout)
}
}
if (state == MC_STATE_HAVE_SESSION) {
ldout(cct, 5) << __func__ << " success, global_id " << global_id << dendl;
if (active_con) {
ldout(cct, 5) << __func__ << " success, global_id "
<< active_con->get_global_id() << dendl;
}
if (authenticate_err < 0 && no_keyring_disabled_cephx) {
@ -468,104 +471,94 @@ int MonClient::authenticate(double timeout)
void MonClient::handle_auth(MAuthReply *m)
{
Context *cb = NULL;
bufferlist::iterator p = m->result_bl.begin();
if (state == MC_STATE_NEGOTIATING) {
if (!auth || (int)m->protocol != auth->get_protocol()) {
auth.reset(get_auth_client_handler(cct, m->protocol,
rotating_secrets.get()));
if (!auth) {
ldout(cct, 10) << "no handler for protocol " << m->protocol << dendl;
if (m->result == -ENOTSUP) {
ldout(cct, 10) << "none of our auth protocols are supported by the server"
<< dendl;
authenticate_err = m->result;
auth_cond.SignalAll();
}
m->put();
return;
}
// do not request MGR key unless the mon has the SERVER_KRAKEN
// feature. otherwise it will give us an auth error. note that
// we have to use the FEATUREMASK because pre-jewel the kraken
// feature bit was used for something else.
if ((want_keys & CEPH_ENTITY_TYPE_MGR) &&
!(m->get_connection()->has_features(CEPH_FEATUREMASK_SERVER_KRAKEN))) {
ldout(cct, 1) << __func__
<< " not requesting MGR keys from pre-kraken monitor"
<< dendl;
want_keys &= ~CEPH_ENTITY_TYPE_MGR;
}
auth->set_want_keys(want_keys);
auth->init(entity_name);
auth->set_global_id(global_id);
} else {
auth->reset();
assert(monc_lock.is_locked());
if (!_hunting()) {
std::swap(active_con->get_auth(), auth);
int ret = active_con->authenticate(m);
m->put();
std::swap(auth, active_con->get_auth());
if (ret != -EAGAIN) {
_finish_auth(ret);
}
state = MC_STATE_AUTHENTICATING;
}
assert(auth);
if (m->global_id && m->global_id != global_id) {
global_id = m->global_id;
auth->set_global_id(global_id);
ldout(cct, 10) << "my global_id is " << m->global_id << dendl;
}
int ret = auth->handle_response(m->result, p);
m->put();
if (ret == -EAGAIN) {
MAuth *ma = new MAuth;
ma->protocol = auth->get_protocol();
auth->prepare_build_request();
ret = auth->build_request(ma->auth_payload);
_send_mon_message(ma, true);
return;
}
// hunting
auto found = pending_cons.find(m->get_source_addr());
assert(found != pending_cons.end());
int auth_err = found->second.handle_auth(m, entity_name, want_keys,
rotating_secrets.get());
m->put();
if (auth_err == -EAGAIN) {
return;
}
if (auth_err) {
pending_cons.erase(found);
if (!pending_cons.empty()) {
// keep trying with pending connections
return;
}
// the last try just failed, give up.
} else {
auto& mc = found->second;
assert(mc.have_session());
active_con.reset(new MonConnection(std::move(mc)));
pending_cons.clear();
}
_finish_hunting();
authenticate_err = ret;
if (ret == 0) {
if (state != MC_STATE_HAVE_SESSION) {
state = MC_STATE_HAVE_SESSION;
last_rotating_renew_sent = utime_t();
while (!waiting_for_session.empty()) {
_send_mon_message(waiting_for_session.front());
waiting_for_session.pop_front();
}
_resend_mon_commands();
if (log_client) {
log_client->reset_session();
send_log();
}
if (session_established_context) {
cb = session_established_context.release();
}
if (!auth_err) {
last_rotating_renew_sent = utime_t();
while (!waiting_for_session.empty()) {
_send_mon_message(waiting_for_session.front());
waiting_for_session.pop_front();
}
_check_auth_tickets();
_resend_mon_commands();
if (log_client) {
log_client->reset_session();
send_log();
}
if (active_con)
std::swap(auth, active_con->get_auth());
}
auth_cond.SignalAll();
if (cb) {
monc_lock.Unlock();
cb->complete(0);
monc_lock.Lock();
_finish_auth(auth_err);
if (!auth_err) {
Context *cb = nullptr;
if (session_established_context) {
cb = session_established_context.release();
}
if (cb) {
monc_lock.Unlock();
cb->complete(0);
monc_lock.Lock();
}
}
}
// Record the outcome of an authentication attempt and wake up every thread
// blocked on auth_cond.
void MonClient::_finish_auth(int auth_err)
{
  authenticate_err = auth_err;

  // _resend_mon_commands() could _reopen_session() if the connected mon is not
  // the one the MonCommand is targeting.
  const bool session_alive = !auth_err && active_con;
  if (session_alive) {
    assert(auth);
    _check_auth_tickets();
  }

  auth_cond.SignalAll();
}
// ---------
void MonClient::_send_mon_message(Message *m, bool force)
void MonClient::_send_mon_message(Message *m)
{
assert(monc_lock.is_locked());
assert(!cur_mon.empty());
if (force || state == MC_STATE_HAVE_SESSION) {
assert(cur_con);
ldout(cct, 10) << "_send_mon_message to mon." << cur_mon
if (active_con) {
auto cur_con = active_con->get_con();
ldout(cct, 10) << "_send_mon_message to mon."
<< monmap.get_name(cur_con->get_peer_addr())
<< " at " << cur_con->get_peer_addr() << dendl;
cur_con->send_message(m);
} else {
@ -573,48 +566,23 @@ void MonClient::_send_mon_message(Message *m, bool force)
}
}
string MonClient::_pick_random_mon()
{
assert(monmap.size() > 0);
if (monmap.size() == 1) {
return monmap.get_name(0);
} else {
int max = monmap.size();
int o = -1;
if (!cur_mon.empty()) {
o = monmap.get_rank(cur_mon);
if (o >= 0)
max--;
}
int32_t n = rng() % max;
if (o >= 0 && n >= o)
n++;
return monmap.get_name(n);
}
}
void MonClient::_reopen_session(int rank, string name)
{
assert(monc_lock.is_locked());
ldout(cct, 10) << __func__ << " rank " << rank << " name " << name << dendl;
if (rank < 0 && name.length() == 0) {
cur_mon = _pick_random_mon();
} else if (name.length()) {
cur_mon = name;
} else {
cur_mon = monmap.get_name(rank);
}
active_con.reset();
pending_cons.clear();
if (cur_con) {
cur_con->mark_down();
_start_hunting();
if (name.length()) {
_add_conn(monmap.get_rank(name));
} else if (rank >= 0) {
_add_conn(rank);
} else {
_add_conns();
}
cur_con = messenger->get_connection(monmap.get_inst(cur_mon));
ldout(cct, 10) << "picked mon." << cur_mon << " con " << cur_con
<< " addr " << cur_con->get_peer_addr()
<< dendl;
// throw out old queued messages
while (!waiting_for_session.empty()) {
@ -629,34 +597,10 @@ void MonClient::_reopen_session(int rank, string name)
version_requests.erase(version_requests.begin());
}
// adjust timeouts if necessary
if (had_a_connection) {
reopen_interval_multiplier *= cct->_conf->mon_client_hunt_interval_backoff;
if (reopen_interval_multiplier >
cct->_conf->mon_client_hunt_interval_max_multiple)
reopen_interval_multiplier =
cct->_conf->mon_client_hunt_interval_max_multiple;
for (auto& c : pending_cons) {
c.second.start(monmap.get_epoch(), entity_name, *auth_supported);
}
// restart authentication handshake
state = MC_STATE_NEGOTIATING;
hunting = true;
// send an initial keepalive to ensure our timestamp is valid by the
// time we are in an OPENED state (by sequencing this before
// authentication).
cur_con->send_keepalive();
MAuth *m = new MAuth;
m->protocol = 0;
m->monmap_epoch = monmap.get_epoch();
__u8 struct_v = 1;
::encode(struct_v, m->auth_payload);
::encode(auth_supported->get_supported_set(), m->auth_payload);
::encode(entity_name, m->auth_payload);
::encode(global_id, m->auth_payload);
_send_mon_message(m, true);
for (map<string,ceph_mon_subscribe_item>::iterator p = sub_sent.begin();
p != sub_sent.end();
++p) {
@ -667,38 +611,106 @@ void MonClient::_reopen_session(int rank, string name)
_renew_subs();
}
// Open a connection to the monitor at `rank` and register it in pending_cons,
// keyed by the monitor's address. Returns the entry stored in the map.
// NOTE(review): if an entry for that address already exists, the insert is a
// no-op and the existing entry is returned -- confirm that is what callers want.
MonConnection& MonClient::_add_conn(unsigned rank)
{
  const auto mon_addr = monmap.get_addr(rank);
  auto mon_con = messenger->get_connection(monmap.get_inst(rank));

  auto result = pending_cons.insert(
    std::make_pair(mon_addr, MonConnection(cct, mon_con)));

  ldout(cct, 10) << "picked mon." << monmap.get_name(rank)
                 << " con " << mon_con
                 << " addr " << mon_con->get_peer_addr()
                 << dendl;

  auto& entry = *result.first;
  return entry.second;
}
// Start hunting sessions with up to "mon_client_hunt_parallel" monitors
// chosen at random from the whole monmap. A setting of 0, or one larger than
// the number of monitors, means "try every monitor".
void MonClient::_add_conns()
{
  // Build the candidate list from *all* ranks. Sizing this list by the
  // parallelism limit would restrict the hunt to the lowest-ranked monitors
  // and defeat the random selection.
  vector<unsigned> ranks(monmap.size());
  for (unsigned i = 0; i < ranks.size(); i++) {
    ranks[i] = i;
  }

  std::random_device rd;
  std::mt19937 rng(rd());
  std::shuffle(ranks.begin(), ranks.end(), rng);

  // take the first n entries of the shuffled list
  unsigned n = cct->_conf->mon_client_hunt_parallel;
  if (n == 0 || n > ranks.size()) {
    n = ranks.size();
  }
  for (unsigned i = 0; i < n; i++) {
    _add_conn(ranks[i]);
  }
}
bool MonClient::ms_handle_reset(Connection *con)
{
Mutex::Locker lock(monc_lock);
if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
if (cur_mon.empty() || con != cur_con) {
ldout(cct, 10) << __func__ << " stray mon " << con->get_peer_addr() << dendl;
return true;
if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON)
return false;
if (_hunting()) {
if (pending_cons.count(con->get_peer_addr())) {
ldout(cct, 10) << __func__ << " hunted mon " << con->get_peer_addr() << dendl;
} else {
ldout(cct, 10) << __func__ << " stray mon " << con->get_peer_addr() << dendl;
}
return true;
} else {
if (active_con && con == active_con->get_con()) {
ldout(cct, 10) << __func__ << " current mon " << con->get_peer_addr() << dendl;
if (hunting)
return true;
ldout(cct, 0) << "hunting for new mon" << dendl;
_reopen_session();
return false;
} else {
ldout(cct, 10) << "ms_handle_reset stray mon " << con->get_peer_addr() << dendl;
return true;
}
}
return false;
}
// True when there is either an established monitor session or a hunt in
// progress. Caller must hold monc_lock.
bool MonClient::_opened() const
{
  assert(monc_lock.is_locked());
  if (active_con) {
    return true;
  }
  return _hunting();
}
// We are hunting for a monitor as long as at least one pending connection
// is registered.
bool MonClient::_hunting() const
{
  const bool none_pending = pending_cons.empty();
  return !none_pending;
}
// Called when a new hunt begins; must not already be hunting. If we have had
// a connection before, back off the reopen interval, capped at the
// configured maximum multiple.
void MonClient::_start_hunting()
{
  assert(!_hunting());

  // adjust timeouts if necessary
  if (had_a_connection) {
    reopen_interval_multiplier *= cct->_conf->mon_client_hunt_interval_backoff;
    const auto max_multiple =
      cct->_conf->mon_client_hunt_interval_max_multiple;
    if (reopen_interval_multiplier > max_multiple) {
      reopen_interval_multiplier = max_multiple;
    }
  }
}
void MonClient::_finish_hunting()
{
assert(monc_lock.is_locked());
if (hunting) {
ldout(cct, 1) << "found mon." << cur_mon << dendl;
hunting = false;
had_a_connection = true;
reopen_interval_multiplier /= 2.0;
if (reopen_interval_multiplier < 1.0)
reopen_interval_multiplier = 1.0;
// the pending conns have been cleaned.
assert(!_hunting());
if (active_con) {
auto con = active_con->get_con();
ldout(cct, 1) << "found mon."
<< monmap.get_name(con->get_peer_addr())
<< dendl;
} else {
ldout(cct, 1) << "no mon sessions established" << dendl;
}
had_a_connection = true;
reopen_interval_multiplier /= 2.0;
if (reopen_interval_multiplier < 1.0)
reopen_interval_multiplier = 1.0;
}
void MonClient::tick()
@ -707,12 +719,13 @@ void MonClient::tick()
_check_auth_tickets();
if (hunting) {
if (_hunting()) {
ldout(cct, 1) << "continuing hunt" << dendl;
_reopen_session();
} else if (!cur_mon.empty()) {
} else if (active_con) {
// just renew as needed
utime_t now = ceph_clock_now();
auto cur_con = active_con->get_con();
if (!cur_con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB)) {
ldout(cct, 10) << "renew subs? (now: " << now
<< "; renew after: " << sub_renew_after << ") -- "
@ -724,16 +737,14 @@ void MonClient::tick()
cur_con->send_keepalive();
if (state == MC_STATE_HAVE_SESSION) {
if (cct->_conf->mon_client_ping_timeout > 0 &&
cur_con->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
utime_t lk = cur_con->get_last_keepalive_ack();
utime_t interval = now - lk;
if (interval > cct->_conf->mon_client_ping_timeout) {
ldout(cct, 1) << "no keepalive since " << lk << " (" << interval
<< " seconds), reconnecting" << dendl;
_reopen_session();
}
if (cct->_conf->mon_client_ping_timeout > 0 &&
cur_con->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
utime_t lk = cur_con->get_last_keepalive_ack();
utime_t interval = now - lk;
if (interval > cct->_conf->mon_client_ping_timeout) {
ldout(cct, 1) << "no keepalive since " << lk << " (" << interval
<< " seconds), reconnecting" << dendl;
_reopen_session();
}
send_log();
@ -753,10 +764,11 @@ void MonClient::schedule_tick()
}
};
if (hunting)
if (_hunting()) {
timer.add_event_after(cct->_conf->mon_client_hunt_interval
* reopen_interval_multiplier, new C_Tick(this));
else
* reopen_interval_multiplier,
new C_Tick(this));
} else
timer.add_event_after(cct->_conf->mon_client_ping_interval, new C_Tick(this));
}
@ -771,7 +783,7 @@ void MonClient::_renew_subs()
}
ldout(cct, 10) << __func__ << dendl;
if (cur_mon.empty())
if (!_opened())
_reopen_session();
else {
if (sub_renew_sent == utime_t())
@ -807,7 +819,7 @@ void MonClient::handle_subscribe_ack(MMonSubscribeAck *m)
int MonClient::_check_auth_tickets()
{
assert(monc_lock.is_locked());
if (state == MC_STATE_HAVE_SESSION && auth) {
if (active_con && auth) {
if (auth->need_tickets()) {
ldout(cct, 10) << __func__ << " getting new tickets!" << dendl;
MAuth *m = new MAuth;
@ -831,7 +843,7 @@ int MonClient::_check_auth_rotating()
return 0;
}
if (!auth || state != MC_STATE_HAVE_SESSION) {
if (!active_con || !auth) {
ldout(cct, 10) << "_check_auth_rotating waiting for auth session" << dendl;
return 0;
}
@ -902,8 +914,13 @@ int MonClient::wait_auth_rotating(double timeout)
void MonClient::_send_command(MonCommand *r)
{
entity_addr_t peer;
if (active_con) {
peer = active_con->get_con()->get_peer_addr();
}
if (r->target_rank >= 0 &&
r->target_rank != monmap.get_rank(cur_mon)) {
r->target_rank != monmap.get_rank(peer)) {
ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
<< " wants rank " << r->target_rank
<< ", reopening session"
@ -918,7 +935,7 @@ void MonClient::_send_command(MonCommand *r)
}
if (r->target_name.length() &&
r->target_name != cur_mon) {
r->target_name != monmap.get_name(peer)) {
ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
<< " wants mon " << r->target_name
<< ", reopening session"
@ -1111,3 +1128,138 @@ void MonClient::handle_get_version_reply(MMonGetVersionReply* m)
}
m->put();
}
// Build an authorizer for the given service using whichever auth handler is
// currently available: the client-level handler if set, otherwise the one
// owned by the active monitor connection.
//
// Returns nullptr when no handler is available (for example before any
// session has been established) instead of asserting or dereferencing a null
// active_con; callers already treat a null authorizer as failure.
AuthAuthorizer* MonClient::build_authorizer(int service_id) const {
  Mutex::Locker l(monc_lock);
  if (auth) {
    return auth->build_authorizer(service_id);
  }
  if (active_con && active_con->get_auth()) {
    return active_con->get_auth()->build_authorizer(service_id);
  }
  ldout(cct, 0) << __func__ << " for service " << service_id
                << ", but no auth is available now" << dendl;
  return nullptr;
}
#define dout_subsys ceph_subsys_monc
#undef dout_prefix
#define dout_prefix *_dout << "monclient" << (have_session() ? ": " : "(hunting): ")
// Wrap an already-opened messenger connection to a monitor.
MonConnection::MonConnection(CephContext *cct_, ConnectionRef con_)
  : cct(cct_),
    con(con_)
{
}
// Tear down the underlying connection, if this object still holds one.
MonConnection::~MonConnection()
{
  if (!con) {
    return;
  }
  con->mark_down();
  con.reset();
}
// True once the authentication handshake on this connection has completed.
bool MonConnection::have_session() const
{
  switch (state) {
  case State::HAVE_SESSION:
    return true;
  default:
    return false;
  }
}
// Kick off the authentication handshake on this connection: send a keepalive
// followed by the initial MAuth carrying the supported auth methods, our
// entity name and the currently known global_id.
void MonConnection::start(epoch_t epoch,
                          const EntityName& entity_name,
                          const AuthMethodList& auth_supported)
{
  // restart authentication handshake
  state = State::NEGOTIATING;

  // send an initial keepalive to ensure our timestamp is valid by the
  // time we are in an OPENED state (by sequencing this before
  // authentication).
  con->send_keepalive();

  auto hello = new MAuth;
  hello->protocol = 0;
  hello->monmap_epoch = epoch;

  auto& payload = hello->auth_payload;
  __u8 struct_v = 1;
  ::encode(struct_v, payload);
  ::encode(auth_supported.get_supported_set(), payload);
  ::encode(entity_name, payload);
  ::encode(global_id, payload);

  con->send_message(hello);
}
// Process an auth reply for this connection. While still negotiating, pick
// the auth handler first; then run the authentication step. Returns 0 once a
// session is established, otherwise the error from negotiation or
// authentication (including -EAGAIN from authenticate()).
int MonConnection::handle_auth(MAuthReply* m,
                               const EntityName& entity_name,
                               uint32_t want_keys,
                               RotatingKeyRing* keyring)
{
  if (State::NEGOTIATING == state) {
    const int negotiated = _negotiate(m, entity_name, want_keys, keyring);
    if (negotiated != 0) {
      return negotiated;
    }
    state = State::AUTHENTICATING;
  }

  const int result = authenticate(m);
  if (result == 0) {
    state = State::HAVE_SESSION;
  }
  return result;
}
// Select the auth handler matching the protocol named in the monitor's reply.
//
// Returns 0 when a handler is ready (either the existing one, which is reset,
// or a freshly created and initialised one). Returns a non-zero error when no
// handler exists for the protocol: the reply's own result if it carries one,
// otherwise -ENOTSUP. Returning 0 without a handler would let the caller
// proceed to authenticate(), which asserts that a handler is present.
int MonConnection::_negotiate(MAuthReply *m,
                              const EntityName& entity_name,
                              uint32_t want_keys,
                              RotatingKeyRing* keyring)
{
  if (auth && (int)m->protocol == auth->get_protocol()) {
    // good, negotiation completed
    auth->reset();
    return 0;
  }

  auth.reset(get_auth_client_handler(cct, m->protocol, keyring));
  if (!auth) {
    ldout(cct, 10) << "no handler for protocol " << m->protocol << dendl;
    if (m->result == -ENOTSUP) {
      ldout(cct, 10) << "none of our auth protocols are supported by the server"
                     << dendl;
    }
    // never report success without a handler
    return m->result ? m->result : -ENOTSUP;
  }

  // do not request MGR key unless the mon has the SERVER_KRAKEN
  // feature. otherwise it will give us an auth error. note that
  // we have to use the FEATUREMASK because pre-jewel the kraken
  // feature bit was used for something else.
  if ((want_keys & CEPH_ENTITY_TYPE_MGR) &&
      !(m->get_connection()->has_features(CEPH_FEATUREMASK_SERVER_KRAKEN))) {
    ldout(cct, 1) << __func__
                  << " not requesting MGR keys from pre-kraken monitor"
                  << dendl;
    want_keys &= ~CEPH_ENTITY_TYPE_MGR;
  }
  auth->set_want_keys(want_keys);
  auth->init(entity_name);
  auth->set_global_id(global_id);
  return 0;
}
// Feed an auth reply to the auth handler. If the reply carries a global_id
// different from ours, the handler is reset and the new id adopted. When the
// handler answers -EAGAIN, a follow-up MAuth request is sent on this
// connection. Returns the handler's result either way.
int MonConnection::authenticate(MAuthReply *m)
{
  assert(auth);

  if (m->global_id == 0) {
    ldout(cct, 1) << "peer sent an invalid global_id" << dendl;
  }
  if (global_id != m->global_id) {
    // it's a new session
    auth->reset();
    global_id = m->global_id;
    auth->set_global_id(global_id);
    ldout(cct, 10) << "my global_id is " << m->global_id << dendl;
  }

  auto reply_it = m->result_bl.begin();
  const int ret = auth->handle_response(m->result, reply_it);
  if (ret != -EAGAIN) {
    return ret;
  }

  // the handshake needs another round trip
  auto follow_up = new MAuth;
  follow_up->protocol = auth->get_protocol();
  auth->prepare_build_request();
  auth->build_request(follow_up->auth_payload);
  con->send_message(follow_up);
  return ret;
}

View File

@ -15,6 +15,8 @@
#ifndef CEPH_MONCLIENT_H
#define CEPH_MONCLIENT_H
#include <memory>
#include "msg/Messenger.h"
#include "MonMap.h"
@ -24,28 +26,20 @@
#include "common/config.h"
#include "auth/AuthClientHandler.h"
#include "auth/RotatingKeyRing.h"
#include "common/SimpleRNG.h"
class MMonMap;
class MMonGetVersion;
class MMonGetVersionReply;
struct MMonSubscribeAck;
class MMonCommandAck;
class MCommandReply;
struct MAuthReply;
class MPing;
class MAuthRotating;
class LogClient;
struct AuthAuthorizer;
class AuthMethodList;
class Messenger;
class AuthClientHandler;
// class RotatingKeyRing;
class KeyRing;
enum MonClientState {
MC_STATE_NONE,
MC_STATE_NEGOTIATING,
MC_STATE_AUTHENTICATING,
MC_STATE_HAVE_SESSION,
};
struct MonClientPinger : public Dispatcher {
@ -102,18 +96,63 @@ struct MonClientPinger : public Dispatcher {
}
};
// One connection to a single monitor together with the state of its
// authentication handshake (auth handler, handshake state and global_id).
// Instances are move-only.
class MonConnection {
public:
MonConnection(CephContext *cct,
ConnectionRef conn);
// Marks the underlying connection down if this object still holds one.
~MonConnection();
MonConnection(MonConnection&& rhs) = default;
MonConnection& operator=(MonConnection&&) = default;
MonConnection(const MonConnection& rhs) = delete;
MonConnection& operator=(const MonConnection&) = delete;
// Handle an auth reply: negotiate the auth handler if still negotiating,
// then authenticate. Returns 0 once the session is established, otherwise
// the error from negotiation/authentication.
int handle_auth(MAuthReply *m,
const EntityName& entity_name,
uint32_t want_keys,
RotatingKeyRing* keyring);
// Pass an auth reply to the auth handler; sends a follow-up request on this
// connection when the handler returns -EAGAIN. Requires a handler to be set.
int authenticate(MAuthReply *m);
// Begin the handshake: sends a keepalive and the initial auth request.
void start(epoch_t epoch,
const EntityName& entity_name,
const AuthMethodList& auth_supported);
// True when the handshake state is HAVE_SESSION.
bool have_session() const;
uint64_t get_global_id() const {
return global_id;
}
ConnectionRef get_con() {
return con;
}
// Mutable access to the owned auth handler (may be empty).
std::unique_ptr<AuthClientHandler>& get_auth() {
return auth;
}
private:
// Choose/initialise the auth handler for the protocol named in the reply.
int _negotiate(MAuthReply *m,
const EntityName& entity_name,
uint32_t want_keys,
RotatingKeyRing* keyring);
private:
CephContext *cct;
// Progress of the authentication handshake on this connection.
enum class State {
NONE,
NEGOTIATING,
AUTHENTICATING,
HAVE_SESSION,
};
State state = State::NONE;
ConnectionRef con;
std::unique_ptr<AuthClientHandler> auth;
// Id taken from the monitor's auth replies; 0 until one is received.
uint64_t global_id = 0;
};
class MonClient : public Dispatcher {
public:
MonMap monmap;
private:
MonClientState state;
Messenger *messenger;
string cur_mon;
ConnectionRef cur_con;
SimpleRNG rng;
std::unique_ptr<MonConnection> active_con;
std::map<entity_addr_t, MonConnection> pending_cons;
EntityName entity_name;
@ -143,24 +182,18 @@ private:
void handle_auth(MAuthReply *m);
// monitor session
bool hunting;
void tick();
void schedule_tick();
Cond auth_cond;
// monclient
bool want_monmap;
uint32_t want_keys;
uint64_t global_id;
// authenticate
private:
Cond map_cond;
int authenticate_err;
private:
// authenticate
std::unique_ptr<AuthClientHandler> auth;
uint32_t want_keys = 0;
Cond auth_cond;
int authenticate_err = 0;
list<Message*> waiting_for_session;
utime_t last_rotating_renew_sent;
@ -168,13 +201,18 @@ private:
bool had_a_connection;
double reopen_interval_multiplier;
string _pick_random_mon();
bool _opened() const;
bool _hunting() const;
void _start_hunting();
void _finish_hunting();
void _finish_auth(int auth_err);
void _reopen_session(int rank, string name);
void _reopen_session() {
_reopen_session(-1, string());
}
void _send_mon_message(Message *m, bool force=false);
MonConnection& _add_conn(unsigned rank);
void _add_conns();
void _send_mon_message(Message *m);
public:
void set_entity_name(EntityName name) { entity_name = name; }
@ -241,9 +279,6 @@ private:
sub_new.erase(what);
}
// auth tickets
public:
std::unique_ptr<AuthClientHandler> auth;
public:
void renew_subs() {
Mutex::Locker l(monc_lock);
@ -362,20 +397,20 @@ public:
}
uint64_t get_global_id() const {
return global_id;
Mutex::Locker l(monc_lock);
if (active_con) {
return active_con->get_global_id();
} else {
return 0;
}
}
void set_messenger(Messenger *m) { messenger = m; }
entity_addr_t get_myaddr() const { return messenger->get_myaddr(); }
void send_auth_message(Message *m) {
_send_mon_message(m, true);
}
AuthAuthorizer* build_authorizer(int service_id) const;
void set_want_keys(uint32_t want) {
want_keys = want;
if (auth)
auth->set_want_keys(want | CEPH_ENTITY_TYPE_MON);
}
// admin commands
@ -404,6 +439,7 @@ private:
void _resend_mon_commands();
int _cancel_mon_command(uint64_t tid, int r);
void _finish_command(MonCommand *r, int ret, string rs);
void _finish_auth();
void handle_mon_command_ack(MMonCommandAck *ack);
public:

View File

@ -6206,7 +6206,7 @@ bool OSD::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool for
}
}
*authorizer = monc->auth->build_authorizer(dest_type);
*authorizer = monc->build_authorizer(dest_type);
return *authorizer != NULL;
}

View File

@ -4420,7 +4420,7 @@ bool Objecter::ms_get_authorizer(int dest_type,
return false;
if (dest_type == CEPH_ENTITY_TYPE_MON)
return true;
*authorizer = monc->auth->build_authorizer(dest_type);
*authorizer = monc->build_authorizer(dest_type);
return *authorizer != NULL;
}

View File

@ -165,6 +165,6 @@ bool MDSUtility::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer,
return false;
}
*authorizer = monc->auth->build_authorizer(dest_type);
*authorizer = monc->build_authorizer(dest_type);
return *authorizer != NULL;
}