mirror of
https://github.com/ceph/ceph
synced 2025-01-20 18:21:57 +00:00
Merge pull request #246 from ceph/wip-4793
#4793 Reviewed-by: Sage Weil <sage@inktank.com>
This commit is contained in:
commit
42ab1f4561
@ -21,7 +21,7 @@
|
||||
|
||||
class MMonElection : public Message {
|
||||
|
||||
static const int HEAD_VERSION = 3;
|
||||
static const int HEAD_VERSION = 4;
|
||||
static const int COMPAT_VERSION = 2;
|
||||
|
||||
public:
|
||||
@ -45,11 +45,20 @@ public:
|
||||
bufferlist monmap_bl;
|
||||
set<int> quorum;
|
||||
uint64_t quorum_features;
|
||||
version_t paxos_first_version;
|
||||
version_t paxos_last_version;
|
||||
|
||||
MMonElection() : Message(MSG_MON_ELECTION, HEAD_VERSION, COMPAT_VERSION) { }
|
||||
MMonElection(int o, epoch_t e, MonMap *m)
|
||||
MMonElection() : Message(MSG_MON_ELECTION, HEAD_VERSION, COMPAT_VERSION),
|
||||
op(0), epoch(0), quorum_features(0), paxos_first_version(0),
|
||||
paxos_last_version(0)
|
||||
{ }
|
||||
|
||||
MMonElection(int o, epoch_t e, MonMap *m,
|
||||
version_t paxos_first, version_t paxos_last)
|
||||
: Message(MSG_MON_ELECTION, HEAD_VERSION, COMPAT_VERSION),
|
||||
fsid(m->fsid), op(o), epoch(e), quorum_features(0) {
|
||||
fsid(m->fsid), op(o), epoch(e), quorum_features(0),
|
||||
paxos_first_version(paxos_first), paxos_last_version(paxos_last)
|
||||
{
|
||||
// encode using full feature set; we will reencode for dest later,
|
||||
// if necessary
|
||||
m->encode(monmap_bl, CEPH_FEATURES_ALL);
|
||||
@ -78,6 +87,8 @@ public:
|
||||
::encode(monmap_bl, payload);
|
||||
::encode(quorum, payload);
|
||||
::encode(quorum_features, payload);
|
||||
::encode(paxos_first_version, payload);
|
||||
::encode(paxos_last_version, payload);
|
||||
}
|
||||
void decode_payload() {
|
||||
bufferlist::iterator p = payload.begin();
|
||||
@ -93,6 +104,10 @@ public:
|
||||
::decode(quorum_features, p);
|
||||
else
|
||||
quorum_features = 0;
|
||||
if (header.version >= 4) {
|
||||
::decode(paxos_first_version, p);
|
||||
::decode(paxos_last_version, p);
|
||||
}
|
||||
}
|
||||
|
||||
};
|
||||
|
@ -80,18 +80,21 @@ void Elector::start()
|
||||
electing_me = true;
|
||||
acked_me[mon->rank] = CEPH_FEATURES_ALL;
|
||||
leader_acked = -1;
|
||||
acked_first_paxos_version = mon->paxos->get_first_committed();
|
||||
|
||||
// bcast to everyone else
|
||||
for (unsigned i=0; i<mon->monmap->size(); ++i) {
|
||||
if ((int)i == mon->rank) continue;
|
||||
Message *m = new MMonElection(MMonElection::OP_PROPOSE, epoch, mon->monmap);
|
||||
Message *m = new MMonElection(MMonElection::OP_PROPOSE, epoch, mon->monmap,
|
||||
mon->paxos->get_first_committed(),
|
||||
mon->paxos->get_version());
|
||||
mon->messenger->send_message(m, mon->monmap->get_inst(i));
|
||||
}
|
||||
|
||||
reset_timer();
|
||||
}
|
||||
|
||||
void Elector::defer(int who)
|
||||
void Elector::defer(int who, version_t paxos_first)
|
||||
{
|
||||
dout(5) << "defer to " << who << dendl;
|
||||
|
||||
@ -103,8 +106,11 @@ void Elector::defer(int who)
|
||||
|
||||
// ack them
|
||||
leader_acked = who;
|
||||
acked_first_paxos_version = paxos_first;
|
||||
ack_stamp = ceph_clock_now(g_ceph_context);
|
||||
mon->messenger->send_message(new MMonElection(MMonElection::OP_ACK, epoch, mon->monmap),
|
||||
mon->messenger->send_message(new MMonElection(MMonElection::OP_ACK, epoch, mon->monmap,
|
||||
mon->paxos->get_first_committed(),
|
||||
mon->paxos->get_version()),
|
||||
mon->monmap->get_inst(who));
|
||||
|
||||
// set a timer
|
||||
@ -168,7 +174,10 @@ void Elector::victory()
|
||||
p != quorum.end();
|
||||
++p) {
|
||||
if (*p == mon->rank) continue;
|
||||
MMonElection *m = new MMonElection(MMonElection::OP_VICTORY, epoch, mon->monmap);
|
||||
MMonElection *m = new MMonElection(MMonElection::OP_VICTORY, epoch,
|
||||
mon->monmap,
|
||||
mon->paxos->get_first_committed(),
|
||||
mon->paxos->get_version());
|
||||
m->quorum = quorum;
|
||||
mon->messenger->send_message(m, mon->monmap->get_inst(*p));
|
||||
}
|
||||
@ -204,10 +213,13 @@ void Elector::handle_propose(MMonElection *m)
|
||||
}
|
||||
}
|
||||
|
||||
if (mon->rank < from) {
|
||||
if ((mon->rank < from) &&
|
||||
// be careful that we have new enough data to be leader!
|
||||
(m->paxos_first_version <= mon->paxos->get_version())) {
|
||||
// i would win over them.
|
||||
if (leader_acked >= 0) { // we already acked someone
|
||||
assert(leader_acked < from); // and they still win, of course
|
||||
assert((leader_acked < from) || // and they still win, of course
|
||||
(acked_first_paxos_version > mon->paxos->get_version()));
|
||||
dout(5) << "no, we already acked " << leader_acked << dendl;
|
||||
} else {
|
||||
// wait, i should win!
|
||||
@ -216,16 +228,20 @@ void Elector::handle_propose(MMonElection *m)
|
||||
mon->start_election();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
} else if (m->paxos_last_version >= mon->paxos->get_first_committed()) {
|
||||
// they would win over me
|
||||
if (leader_acked < 0 || // haven't acked anyone yet, or
|
||||
leader_acked > from || // they would win over who you did ack, or
|
||||
leader_acked == from) { // this is the guy we're already deferring to
|
||||
defer(from);
|
||||
leader_acked == from) { // this is the guy we're already deferring to
|
||||
defer(from, m->paxos_first_version);
|
||||
} else {
|
||||
// ignore them!
|
||||
dout(5) << "no, we already acked " << leader_acked << dendl;
|
||||
}
|
||||
} else { // they are too out-of-date
|
||||
dout(5) << "no, they are too far behind; paxos version: "
|
||||
<< m->paxos_last_version << " versus my first "
|
||||
<< mon->paxos->get_first_committed() << dendl;
|
||||
}
|
||||
|
||||
m->put();
|
||||
|
@ -125,6 +125,10 @@ class Elector {
|
||||
* Indicates who we have acked
|
||||
*/
|
||||
int leader_acked;
|
||||
/**
|
||||
* Indicates the first committed paxos version of the monitor we have acked
|
||||
*/
|
||||
version_t acked_first_paxos_version;
|
||||
/**
|
||||
* Indicates when we have acked him
|
||||
*/
|
||||
@ -197,16 +201,17 @@ class Elector {
|
||||
* to become the Leader. We will only defer an election if the monitor we
|
||||
* are deferring to outranks us.
|
||||
*
|
||||
* @pre @p who outranks us (i.e., who < our rank)
|
||||
* @pre @p who outranks us (who < our rank, or we're behind their store)
|
||||
* @pre @p who outranks any other monitor we have deferred to in the past
|
||||
* @post electing_me is false
|
||||
* @post leader_acked equals @p who
|
||||
* @post we sent an ack message to @p who
|
||||
* @post we reset the expire_event timer
|
||||
*
|
||||
* @param who Some other monitor's numeric identifier.
|
||||
* @param who Some other monitor's numeric identifier.
|
||||
* @param paxos_first The other monitor's first committed paxos version
|
||||
*/
|
||||
void defer(int who);
|
||||
void defer(int who, version_t paxos_first);
|
||||
/**
|
||||
* The election has taken too long and has expired.
|
||||
*
|
||||
@ -326,7 +331,8 @@ class Elector {
|
||||
epoch(0),
|
||||
participating(true),
|
||||
electing_me(false),
|
||||
leader_acked(-1) { }
|
||||
leader_acked(-1),
|
||||
acked_first_paxos_version(0) { }
|
||||
|
||||
/**
|
||||
* Initiate the Elector class.
|
||||
|
@ -1902,7 +1902,9 @@ void Monitor::handle_probe_reply(MMonProbe *m)
|
||||
if (m->quorum.size()) {
|
||||
dout(10) << " existing quorum " << m->quorum << dendl;
|
||||
|
||||
if (paxos->get_version() + g_conf->paxos_max_join_drift < m->paxos_last_version) {
|
||||
if ((paxos->get_version() + g_conf->paxos_max_join_drift <
|
||||
m->paxos_last_version) ||
|
||||
(paxos->get_version() < m->paxos_first_version)){
|
||||
dout(10) << " peer paxos version " << m->paxos_last_version
|
||||
<< " vs my version " << paxos->get_version()
|
||||
<< " (too far ahead)"
|
||||
|
Loading…
Reference in New Issue
Block a user