osdmap: allow incremental to represent osd deletion

Convert new_down to new_state, whose values are XORed onto the old state.  We
preserve compatibility with old incrementals because their new_down values
were (virtually) always 0, and we can special-case 0 to mean "toggle
CEPH_OSD_UP".  We don't really care whether clients get the new values right:
if they don't clear the EXISTS flag, that doesn't really hurt them.  It's only
important that the monitor gets it right.

To ensure that, we bump the monitor's internal protocol version
(CEPH_MON_PROTOCOL 6 -> 7).

Signed-off-by: Sage Weil <sage@newdream.net>
This commit is contained in:
Sage Weil 2011-05-03 12:34:54 -07:00
parent 890dc2aed2
commit f680eca790
3 changed files with 27 additions and 20 deletions

View File

@ -23,7 +23,7 @@
*/
#define CEPH_OSD_PROTOCOL 8 /* cluster internal */
#define CEPH_MDS_PROTOCOL 13 /* cluster internal */
#define CEPH_MON_PROTOCOL 6 /* cluster internal */
#define CEPH_MON_PROTOCOL 7 /* cluster internal */
#define CEPH_OSDC_PROTOCOL 24 /* server/client */
#define CEPH_MDSC_PROTOCOL 32 /* server/client */
#define CEPH_MONC_PROTOCOL 15 /* server/client */

View File

@ -243,10 +243,14 @@ void OSDMonitor::encode_pending(bufferlist &bl)
pending_inc.modified = g_clock.now();
// tell me about it
for (map<int32_t,uint8_t>::iterator i = pending_inc.new_down.begin();
i != pending_inc.new_down.end();
for (map<int32_t,uint8_t>::iterator i = pending_inc.new_state.begin();
i != pending_inc.new_state.end();
i++) {
dout(2) << " osd" << i->first << " DOWN clean=" << (int)i->second << dendl;
int s = i->second ? i->second : CEPH_OSD_UP;
if (s & CEPH_OSD_UP)
dout(2) << " osd" << i->first << " DOWN" << dendl;
if (s & CEPH_OSD_EXISTS)
dout(2) << " osd" << i->first << " DNE" << dendl;
}
for (map<int32_t,entity_addr_t>::iterator i = pending_inc.new_up_client.begin();
i != pending_inc.new_up_client.end();
@ -489,7 +493,7 @@ bool OSDMonitor::prepare_failure(MOSDFailure *m)
(reports >= g_conf.osd_min_down_reports)) {
dout(1) << "have enough reports/reporters to mark osd" << target_osd
<< " as down" << dendl;
pending_inc.new_down[target_osd] = false;
pending_inc.new_state[target_osd] = CEPH_OSD_UP;
paxos->wait_for_commit(new C_Reported(this, m));
//clear out failure reports
failed_notes.erase(failed_notes.lower_bound(target_osd),
@ -584,9 +588,10 @@ bool OSDMonitor::prepare_boot(MOSDBoot *m)
dout(7) << "prepare_boot was up, first marking down " << osdmap.get_inst(from) << dendl;
assert(osdmap.get_inst(from) != m->get_orig_source_inst()); // preproces should have caught it
if (pending_inc.new_down.count(from) == 0) {
if (pending_inc.new_state.count(from) == 0 ||
(pending_inc.new_state[from] & CEPH_OSD_UP) == 0) {
// mark previous guy down
pending_inc.new_down[from] = false;
pending_inc.new_state[from] = CEPH_OSD_UP;
}
paxos->wait_for_commit(new C_RetryMessage(this, m));
} else if (pending_inc.new_up_client.count(from)) { //FIXME: should this be using new_up_client?
@ -1082,7 +1087,7 @@ void OSDMonitor::handle_osd_timeouts(const utime_t &now,
if (t == last_osd_report.end()) {
derr << "OSDMonitor::handle_osd_timeouts: never got MOSDPGStat "
<< "info from osd " << i << ". Marking down!" << dendl;
pending_inc.new_down[i] = true;
pending_inc.new_state[i] = CEPH_OSD_UP;
new_down = true;
}
else {
@ -1092,7 +1097,7 @@ void OSDMonitor::handle_osd_timeouts(const utime_t &now,
derr << "OSDMonitor::handle_osd_timeouts: last got MOSDPGStat "
<< "info from osd " << i << " at " << t->second << ". It has "
<< "been " << diff << ", so we're marking it down!" << dendl;
pending_inc.new_down[i] = true;
pending_inc.new_state[i] = CEPH_OSD_UP;
new_down = true;
}
}
@ -1114,7 +1119,7 @@ void OSDMonitor::mark_all_down()
it != ls.end();
it++) {
if (osdmap.is_down(*it)) continue;
pending_inc.new_down[*it] = true; // FIXME: am i sure it's clean? we need a proper osd shutdown sequence!
pending_inc.new_state[*it] = CEPH_OSD_UP;
}
propose_pending();
@ -1430,7 +1435,7 @@ bool OSDMonitor::prepare_command(MMonCommand *m)
} else if (osdmap.is_down(osd)) {
ss << "osd" << osd << " is already down";
} else {
pending_inc.new_down[osd] = false;
pending_inc.new_state[osd] = CEPH_OSD_UP;
ss << "marked down osd" << osd;
getline(ss, rs);
paxos->wait_for_commit(new Monitor::C_Command(mon, m, 0, rs, paxos->get_version()));

View File

@ -155,7 +155,7 @@ public:
set<int32_t> old_pools;
map<int32_t,entity_addr_t> new_up_client;
map<int32_t,entity_addr_t> new_up_internal;
map<int32_t,uint8_t> new_down;
map<int32_t,uint8_t> new_state; // XORed onto previous state.
map<int32_t,uint32_t> new_weight;
map<pg_t,vector<int32_t> > new_pg_temp; // [] to remove
map<int32_t,epoch_t> new_up_thru;
@ -183,7 +183,7 @@ public:
::encode(new_pool_names, bl);
::encode(old_pools, bl);
::encode(new_up_client, bl);
::encode(new_down, bl);
::encode(new_state, bl);
::encode(new_weight, bl);
::encode(new_pg_temp, bl);
@ -217,7 +217,7 @@ public:
::decode(new_pool_names, p);
::decode(old_pools, p);
::decode(new_up_client, p);
::decode(new_down, p);
::decode(new_state, p);
::decode(new_weight, p);
::decode(new_pg_temp, p);
@ -570,13 +570,15 @@ private:
set_weight(i->first, i->second);
// up/down
for (map<int32_t,uint8_t>::iterator i = inc.new_down.begin();
i != inc.new_down.end();
for (map<int32_t,uint8_t>::iterator i = inc.new_state.begin();
i != inc.new_state.end();
i++) {
assert(osd_state[i->first] & CEPH_OSD_UP);
osd_state[i->first] &= ~CEPH_OSD_UP;
osd_info[i->first].down_at = epoch;
//cout << "epoch " << epoch << " down osd" << i->first << endl;
int s = i->second ? i->second : CEPH_OSD_UP;
if ((osd_state[i->first] & CEPH_OSD_UP) &&
(s & CEPH_OSD_UP)) {
osd_info[i->first].down_at = epoch;
}
osd_state[i->first] ^= s;
}
for (map<int32_t,entity_addr_t>::iterator i = inc.new_up_client.begin();
i != inc.new_up_client.end();