mirror of
https://github.com/ceph/ceph
synced 2024-12-15 16:07:00 +00:00
paxos: recover using stashed latest when state histories don't overlap
If we don't have incremental states to catch up, jump to the latest.
This commit is contained in:
parent
51c5823472
commit
357aa03344
@ -91,6 +91,7 @@ void Paxos::collect(version_t oldpn)
|
||||
|
||||
MMonPaxos *collect = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COLLECT, machine_id);
|
||||
collect->last_committed = last_committed;
|
||||
collect->first_committed = first_committed;
|
||||
collect->pn = accepted_pn;
|
||||
mon->messenger->send_message(collect, mon->monmap->get_inst(*p));
|
||||
}
|
||||
@ -114,6 +115,7 @@ void Paxos::handle_collect(MMonPaxos *collect)
|
||||
// reply
|
||||
MMonPaxos *last = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LAST, machine_id);
|
||||
last->last_committed = last_committed;
|
||||
last->first_committed = first_committed;
|
||||
|
||||
// do we have an accepted but uncommitted value?
|
||||
// (it'll be at last_committed+1)
|
||||
@ -144,39 +146,44 @@ void Paxos::handle_collect(MMonPaxos *collect)
|
||||
last->pn_from = accepted_pn_from;
|
||||
|
||||
// and share whatever data we have
|
||||
if (collect->last_committed < last_committed) {
|
||||
bufferlist bl;
|
||||
version_t l = get_latest(bl);
|
||||
assert(l <= last_committed);
|
||||
|
||||
version_t v = collect->last_committed;
|
||||
|
||||
// start with a stashed full copy?
|
||||
/* hmm.
|
||||
if (l > v + 10) {
|
||||
last->latest_value.claim(bl);
|
||||
last->latest_version = l;
|
||||
v = l;
|
||||
}
|
||||
*/
|
||||
|
||||
// include (remaining) incrementals
|
||||
for (v++;
|
||||
v <= last_committed;
|
||||
v++) {
|
||||
if (mon->store->exists_bl_sn(machine_name, v)) {
|
||||
mon->store->get_bl_sn(last->values[v], machine_name, v);
|
||||
dout(10) << " sharing " << v << " ("
|
||||
<< last->values[v].length() << " bytes)" << dendl;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (collect->last_committed < last_committed)
|
||||
share_state(last, collect->first_committed, collect->last_committed);
|
||||
|
||||
// send reply
|
||||
mon->messenger->send_message(last, collect->get_source_inst());
|
||||
collect->put();
|
||||
}
|
||||
|
||||
void Paxos::share_state(MMonPaxos *m, version_t peer_first_committed, version_t peer_last_committed)
|
||||
{
|
||||
assert(peer_last_committed < last_committed);
|
||||
|
||||
dout(10) << "share_state peer has fc " << peer_first_committed << " lc " << peer_last_committed << dendl;
|
||||
version_t v = peer_last_committed;
|
||||
|
||||
// start with a stashed full copy?
|
||||
if (peer_last_committed < first_committed) {
|
||||
bufferlist bl;
|
||||
version_t l = get_latest(bl);
|
||||
assert(l <= last_committed);
|
||||
dout(10) << "share_state starting with latest " << l << " (" << bl.length() << " bytes)" << dendl;
|
||||
m->latest_value.claim(bl);
|
||||
m->latest_version = l;
|
||||
v = l;
|
||||
}
|
||||
|
||||
// include (remaining) incrementals
|
||||
for (v++;
|
||||
v <= last_committed;
|
||||
v++) {
|
||||
if (mon->store->exists_bl_sn(machine_name, v)) {
|
||||
mon->store->get_bl_sn(m->values[v], machine_name, v);
|
||||
dout(10) << " sharing " << v << " ("
|
||||
<< m->values[v].length() << " bytes)" << dendl;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// leader
|
||||
void Paxos::handle_last(MMonPaxos *last)
|
||||
@ -194,13 +201,7 @@ void Paxos::handle_last(MMonPaxos *last)
|
||||
// share committed values
|
||||
dout(10) << "sending commit to " << last->get_source() << dendl;
|
||||
MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT, machine_id);
|
||||
for (version_t v = last->last_committed+1;
|
||||
v <= last_committed;
|
||||
v++) {
|
||||
mon->store->get_bl_sn(commit->values[v], machine_name, v);
|
||||
dout(10) << " sharing " << v << " ("
|
||||
<< commit->values[v].length() << " bytes)" << dendl;
|
||||
}
|
||||
share_state(commit, last->first_committed, last->last_committed);
|
||||
commit->last_committed = last_committed;
|
||||
mon->messenger->send_message(commit, last->get_source_inst());
|
||||
}
|
||||
@ -494,6 +495,22 @@ void Paxos::handle_commit(MMonPaxos *commit)
|
||||
|
||||
// commit locally.
|
||||
bool big_sync = commit->values.size() > 2;
|
||||
|
||||
// stash?
|
||||
if (commit->latest_version) {
|
||||
dout(10) << "got stash version " << commit->latest_version << ", zapping old states" << dendl;
|
||||
stash_latest(commit->latest_version, commit->latest_value);
|
||||
|
||||
while (first_committed <= last_committed) {
|
||||
dout(10) << "trim " << first_committed << dendl;
|
||||
mon->store->erase_sn(machine_name, first_committed);
|
||||
first_committed++;
|
||||
}
|
||||
last_committed = commit->latest_version;
|
||||
first_committed = last_committed;
|
||||
mon->store->put_int(first_committed, machine_name, "first_committed");
|
||||
}
|
||||
|
||||
for (map<version_t,bufferlist>::iterator p = commit->values.begin();
|
||||
p != commit->values.end();
|
||||
++p) {
|
||||
@ -533,8 +550,7 @@ void Paxos::extend_lease()
|
||||
MMonPaxos *lease = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE, machine_id);
|
||||
lease->last_committed = last_committed;
|
||||
lease->lease_expire = lease_expire;
|
||||
if (mon->is_full_quorum())
|
||||
lease->first_committed = first_committed;
|
||||
lease->first_committed = first_committed;
|
||||
mon->messenger->send_message(lease, mon->monmap->get_inst(*p));
|
||||
}
|
||||
|
||||
|
@ -248,6 +248,8 @@ public:
|
||||
void leader_init();
|
||||
void peon_init();
|
||||
|
||||
void share_state(MMonPaxos *m, version_t first_committed, version_t last_committed);
|
||||
|
||||
// -- service interface --
|
||||
void wait_for_active(Context *c) {
|
||||
waiting_for_active.push_back(c);
|
||||
|
Loading…
Reference in New Issue
Block a user