Merge pull request #241 from ceph/wip-4798

#4798

Reviewed-by: Greg Farnum <greg@inktank.com>
This commit is contained in:
Sage Weil 2013-04-23 17:17:02 -07:00
commit e09efda7c8
4 changed files with 53 additions and 11 deletions

View File

@ -9,13 +9,19 @@ class PaxosServiceMessage : public Message {
version_t version;
__s16 session_mon;
uint64_t session_mon_tid;
// track which epoch the leader received a forwarded request in, so we can
// discard forwarded requests appropriately on election boundaries.
epoch_t rx_election_epoch;
PaxosServiceMessage()
: Message(MSG_PAXOS),
version(0), session_mon(-1), session_mon_tid(0) { }
version(0), session_mon(-1), session_mon_tid(0),
rx_election_epoch(0) { }
PaxosServiceMessage(int type, version_t v, int enc_version=1, int compat_enc_version=0)
: Message(type, enc_version, compat_enc_version),
version(v), session_mon(-1), session_mon_tid(0) { }
version(v), session_mon(-1), session_mon_tid(0),
rx_election_epoch(0) { }
protected:
~PaxosServiceMessage() {}

View File

@ -2765,7 +2765,9 @@ void Monitor::forward_request_leader(PaxosServiceMessage *req)
} else if (session && !session->closed) {
RoutedRequest *rr = new RoutedRequest;
rr->tid = ++routed_request_tid;
rr->client = req->get_source_inst();
rr->client_inst = req->get_source_inst();
rr->con = req->get_connection();
rr->con->get();
encode_message(req, CEPH_FEATURES_ALL, rr->request_bl); // for my use only; use all features
rr->session = static_cast<MonSession *>(session->get());
routed_requests[rr->tid] = rr;
@ -2809,6 +2811,14 @@ void Monitor::handle_forward(MForward *m)
PaxosServiceMessage *req = m->msg;
m->msg = NULL; // so ~MForward doesn't delete it
req->set_connection(c);
/*
* note which election epoch this is; we will drop the message if
* there is a future election since our peers will resend routed
* requests in that case.
*/
req->rx_election_epoch = get_epoch();
/* Because this is a special fake connection, we need to break
the ref loop between Connection and MonSession differently
than we normally do. Here, the Message refers to the Connection
@ -2910,7 +2920,7 @@ void Monitor::handle_route(MRoute *m)
// reset payload, in case encoding is dependent on target features
if (m->msg) {
m->msg->clear_payload();
messenger->send_message(m->msg, rr->session->inst);
messenger->send_message(m->msg, rr->con);
m->msg = NULL;
}
routed_requests.erase(m->session_mon_tid);
@ -2935,6 +2945,7 @@ void Monitor::resend_routed_requests()
{
dout(10) << "resend_routed_requests" << dendl;
int mon = get_leader();
list<Context*> retry;
for (map<uint64_t, RoutedRequest*>::iterator p = routed_requests.begin();
p != routed_requests.end();
++p) {
@ -2943,12 +2954,24 @@ void Monitor::resend_routed_requests()
bufferlist::iterator q = rr->request_bl.begin();
PaxosServiceMessage *req = (PaxosServiceMessage *)decode_message(cct, q);
dout(10) << " resend to mon." << mon << " tid " << rr->tid << " " << *req << dendl;
MForward *forward = new MForward(rr->tid, req, rr->session->caps);
forward->client = rr->client;
forward->set_priority(req->get_priority());
messenger->send_message(forward, monmap->get_inst(mon));
}
if (mon == rank) {
dout(10) << " requeue for self tid " << rr->tid << " " << *req << dendl;
req->set_connection(rr->con);
rr->con->get();
retry.push_back(new C_RetryMessage(this, req));
delete rr;
} else {
dout(10) << " resend to mon." << mon << " tid " << rr->tid << " " << *req << dendl;
MForward *forward = new MForward(rr->tid, req, rr->session->caps);
forward->client = rr->client_inst;
forward->set_priority(req->get_priority());
messenger->send_message(forward, monmap->get_inst(mon));
}
}
if (mon == rank) {
routed_requests.clear();
finish_contexts(g_ceph_context, retry);
}
}
void Monitor::remove_session(MonSession *s)

View File

@ -1309,13 +1309,16 @@ public:
// request routing
// Bookkeeping for one client request that this monitor has forwarded to the
// leader. Entries are stored in routed_requests keyed by tid, and the request
// is kept in encoded form so it can be decoded and sent again later.
struct RoutedRequest {
// id assigned from ++routed_request_tid; also the key in routed_requests
uint64_t tid;
// NOTE(review): this listing shows both `client` and `client_inst` holding the
// requester's source inst (set from req->get_source_inst()); it looks like
// `client` is the pre-rename member -- confirm which one survives.
entity_inst_t client;
// the original request, encoded with CEPH_FEATURES_ALL for local use only
bufferlist request_bl;
// session of the requester; a reference is taken via session->get() when the
// request is recorded and dropped in the destructor
MonSession *session;
// connection the request arrived on; a reference is taken via con->get() when
// the request is recorded and dropped in the destructor
Connection *con;
// source inst of the requester, copied into MForward::client on (re)send
entity_inst_t client_inst;
// Drop the references held on session and con, if any.
// NOTE(review): no constructor is visible here that nulls session/con, so the
// null checks below appear to rely on the creator assigning both right after
// `new RoutedRequest` -- confirm.
~RoutedRequest() {
if (session)
session->put();
if (con)
con->put();
}
};
uint64_t routed_request_tid;

View File

@ -34,6 +34,16 @@ static ostream& _prefix(std::ostream *_dout, Monitor *mon, Paxos *paxos, string
bool PaxosService::dispatch(PaxosServiceMessage *m)
{
dout(10) << "dispatch " << *m << " from " << m->get_orig_source_inst() << dendl;
// make sure this message isn't forwarded from a previous election epoch
if (m->rx_election_epoch &&
m->rx_election_epoch < mon->get_epoch()) {
dout(10) << " discarding forwarded message from previous election epoch "
<< m->rx_election_epoch << " < " << mon->get_epoch() << dendl;
m->put();
return true;
}
// make sure our map is readable and up to date
if (!is_readable(m->version)) {
dout(10) << " waiting for paxos -> readable (v" << m->version << ")" << dendl;