mirror of
https://github.com/ceph/ceph
synced 2025-03-25 11:48:05 +00:00
mon: route osdmaps sent by mon properly
This commit is contained in:
parent
d1c51b2642
commit
b67b1d92ed
3
src/TODO
3
src/TODO
@ -45,7 +45,8 @@ v0.17
|
||||
|
||||
- kill mon->osd
|
||||
- kill shutdown msg
|
||||
- route send_full/send_latest...
|
||||
- notify random osd when a map commits?
|
||||
- send map on MPGStats, sometimes?
|
||||
- kill mon->mds
|
||||
- beacon reply
|
||||
- kill shutdown msg
|
||||
|
@ -182,7 +182,8 @@ void OSDMonitor::committed()
|
||||
int r = osdmap.get_any_up_osd();
|
||||
if (r >= 0) {
|
||||
dout(10) << "committed, telling random osd" << r << " all about it" << dendl;
|
||||
send_latest(osdmap.get_inst(r), osdmap.get_epoch() - 1); // whatev, they'll request more if they need it
|
||||
//send_latest(osdmap.get_inst(r), osdmap.get_epoch() - 1); // whatev, they'll request more if they need it
|
||||
#warning hmm, fixme?
|
||||
}
|
||||
}
|
||||
|
||||
@ -317,7 +318,7 @@ bool OSDMonitor::preprocess_failure(MOSDFailure *m)
|
||||
osdmap.get_addr(from) != m->get_orig_source_inst().addr ||
|
||||
osdmap.is_down(from)) {
|
||||
dout(5) << "preprocess_failure from dead osd" << from << ", ignoring" << dendl;
|
||||
send_incremental(m->get_orig_source_inst(), m->get_epoch()+1);
|
||||
send_incremental(m, m->get_epoch()+1);
|
||||
goto didit;
|
||||
}
|
||||
}
|
||||
@ -327,21 +328,21 @@ bool OSDMonitor::preprocess_failure(MOSDFailure *m)
|
||||
if (!osdmap.have_inst(badboy)) {
|
||||
dout(5) << "preprocess_failure dne(/dup?): " << m->get_failed() << ", from " << m->get_orig_source_inst() << dendl;
|
||||
if (m->get_epoch() < osdmap.get_epoch())
|
||||
send_incremental(m->get_orig_source_inst(), m->get_epoch()+1);
|
||||
send_incremental(m, m->get_epoch()+1);
|
||||
goto didit;
|
||||
}
|
||||
if (osdmap.get_inst(badboy) != m->get_failed()) {
|
||||
dout(5) << "preprocess_failure wrong osd: report " << m->get_failed() << " != map's " << osdmap.get_inst(badboy)
|
||||
<< ", from " << m->get_orig_source_inst() << dendl;
|
||||
if (m->get_epoch() < osdmap.get_epoch())
|
||||
send_incremental(m->get_orig_source_inst(), m->get_epoch()+1);
|
||||
send_incremental(m, m->get_epoch()+1);
|
||||
goto didit;
|
||||
}
|
||||
// already reported?
|
||||
if (osdmap.is_down(badboy)) {
|
||||
dout(5) << "preprocess_failure dup: " << m->get_failed() << ", from " << m->get_orig_source_inst() << dendl;
|
||||
if (m->get_epoch() < osdmap.get_epoch())
|
||||
send_incremental(m->get_orig_source_inst(), m->get_epoch()+1);
|
||||
send_incremental(m, m->get_epoch()+1);
|
||||
goto didit;
|
||||
}
|
||||
|
||||
@ -377,8 +378,7 @@ bool OSDMonitor::prepare_failure(MOSDFailure *m)
|
||||
void OSDMonitor::_reported_failure(MOSDFailure *m)
|
||||
{
|
||||
dout(7) << "_reported_failure on " << m->get_failed() << ", telling " << m->get_orig_source_inst() << dendl;
|
||||
send_latest(m->get_orig_source_inst(), m->get_epoch());
|
||||
delete m;
|
||||
send_latest(m, m->get_epoch());
|
||||
}
|
||||
|
||||
|
||||
@ -474,13 +474,11 @@ void OSDMonitor::_booted(MOSDBoot *m, bool logit)
|
||||
{
|
||||
dout(7) << "_booted " << m->get_orig_source_inst()
|
||||
<< " w " << m->sb.weight << " from " << m->sb.current_epoch << dendl;
|
||||
send_latest(m->get_orig_source_inst(), m->sb.current_epoch+1);
|
||||
|
||||
stringstream ss;
|
||||
ss << m->get_orig_source_inst() << " boot";
|
||||
mon->get_logclient()->log(LOG_INFO, ss);
|
||||
|
||||
delete m;
|
||||
send_latest(m, m->sb.current_epoch+1);
|
||||
}
|
||||
|
||||
|
||||
@ -520,13 +518,12 @@ bool OSDMonitor::prepare_alive(MOSDAlive *m)
|
||||
return true;
|
||||
}
|
||||
|
||||
void OSDMonitor::_reply_map(Message *m, epoch_t e)
|
||||
void OSDMonitor::_reply_map(PaxosServiceMessage *m, epoch_t e)
|
||||
{
|
||||
dout(7) << "_reply_map " << e
|
||||
<< " from " << m->get_orig_source_inst()
|
||||
<< dendl;
|
||||
send_latest(m->get_orig_source_inst(), e);
|
||||
delete m;
|
||||
send_latest(m, e);
|
||||
}
|
||||
|
||||
// -------------
|
||||
@ -632,51 +629,59 @@ void OSDMonitor::send_to_waiting()
|
||||
{
|
||||
dout(10) << "send_to_waiting " << osdmap.get_epoch() << dendl;
|
||||
|
||||
map<entity_inst_t,epoch_t>::iterator i = waiting_for_map.begin();
|
||||
while (i != waiting_for_map.end()) {
|
||||
if (i->second) {
|
||||
if (i->second <= osdmap.get_epoch())
|
||||
send_incremental(i->first, i->second);
|
||||
else {
|
||||
dout(10) << "send_to_waiting skipping " << i->first
|
||||
<< " wants " << i->second
|
||||
<< dendl;
|
||||
i++;
|
||||
map<epoch_t, list<PaxosServiceMessage*> >::iterator p = waiting_for_map.begin();
|
||||
while (p != waiting_for_map.end()) {
|
||||
epoch_t from = p->first;
|
||||
|
||||
if (from) {
|
||||
if (from <= osdmap.get_epoch()) {
|
||||
while (!p->second.empty()) {
|
||||
send_incremental(p->second.front(), from);
|
||||
p->second.pop_front();
|
||||
}
|
||||
} else {
|
||||
dout(10) << "send_to_waiting from " << from << dendl;
|
||||
p++;
|
||||
continue;
|
||||
}
|
||||
} else
|
||||
send_full(i->first);
|
||||
} else {
|
||||
while (!p->second.empty()) {
|
||||
send_full(p->second.front());
|
||||
p->second.pop_front();
|
||||
}
|
||||
}
|
||||
|
||||
waiting_for_map.erase(i++);
|
||||
waiting_for_map.erase(p++);
|
||||
}
|
||||
}
|
||||
|
||||
void OSDMonitor::send_latest(entity_inst_t who, epoch_t start)
|
||||
void OSDMonitor::send_latest(PaxosServiceMessage *m, epoch_t start)
|
||||
{
|
||||
if (paxos->is_readable()) {
|
||||
dout(5) << "send_latest to " << who << " start " << start << " now" << dendl;
|
||||
dout(5) << "send_latest to " << m->get_orig_source_inst()
|
||||
<< " start " << start << dendl;
|
||||
if (start == 0)
|
||||
send_full(who);
|
||||
send_full(m);
|
||||
else
|
||||
send_incremental(who, start);
|
||||
send_incremental(m, start);
|
||||
} else {
|
||||
dout(5) << "send_latest to " << who << " start " << start << " later" << dendl;
|
||||
waiting_for_map[who] = start;
|
||||
dout(5) << "send_latest to " << m->get_orig_source_inst()
|
||||
<< " start " << start << " later" << dendl;
|
||||
waiting_for_map[start].push_back(m);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void OSDMonitor::send_full(entity_inst_t who)
|
||||
void OSDMonitor::send_full(PaxosServiceMessage *m)
|
||||
{
|
||||
dout(5) << "send_full to " << who << dendl;
|
||||
mon->messenger->send_message(new MOSDMap(mon->monmap->fsid, &osdmap), who);
|
||||
dout(5) << "send_full to " << m->get_orig_source_inst() << dendl;
|
||||
mon->send_reply(m, new MOSDMap(mon->monmap->fsid, &osdmap));
|
||||
delete m;
|
||||
}
|
||||
|
||||
void OSDMonitor::send_incremental(entity_inst_t dest, epoch_t from)
|
||||
MOSDMap *OSDMonitor::build_incremental(epoch_t from)
|
||||
{
|
||||
dout(5) << "send_incremental from " << from << " -> " << osdmap.get_epoch()
|
||||
<< " to " << dest << dendl;
|
||||
|
||||
dout(10) << "build_incremental from " << from << " to " << osdmap.get_epoch() << dendl;
|
||||
MOSDMap *m = new MOSDMap(mon->monmap->fsid);
|
||||
|
||||
for (epoch_t e = osdmap.get_epoch();
|
||||
@ -695,8 +700,16 @@ void OSDMonitor::send_incremental(entity_inst_t dest, epoch_t from)
|
||||
assert(0); // we should have all maps.
|
||||
}
|
||||
}
|
||||
|
||||
mon->messenger->send_message(m, dest);
|
||||
return m;
|
||||
}
|
||||
|
||||
void OSDMonitor::send_incremental(PaxosServiceMessage *req, epoch_t from)
|
||||
{
|
||||
dout(5) << "send_incremental from " << from << " -> " << osdmap.get_epoch()
|
||||
<< " to " << req->get_orig_source_inst() << dendl;
|
||||
MOSDMap *m = build_incremental(from);
|
||||
mon->send_reply(req, m);
|
||||
delete req;
|
||||
}
|
||||
|
||||
|
||||
@ -721,7 +734,12 @@ void OSDMonitor::check_subs()
|
||||
void OSDMonitor::check_sub(Subscription *sub)
|
||||
{
|
||||
if (sub->last < osdmap.get_epoch()) {
|
||||
send_latest(sub->session->inst, sub->last);
|
||||
if (sub->last)
|
||||
mon->messenger->send_message(build_incremental(sub->last),
|
||||
sub->session->inst);
|
||||
else
|
||||
mon->messenger->send_message(new MOSDMap(mon->monmap->fsid, &osdmap),
|
||||
sub->session->inst);
|
||||
if (sub->onetime)
|
||||
mon->session_map.remove_sub(sub);
|
||||
else
|
||||
@ -1011,10 +1029,6 @@ bool OSDMonitor::prepare_command(MMonCommand *m)
|
||||
ss << "marked down osd" << osd;
|
||||
getline(ss, rs);
|
||||
paxos->wait_for_commit(new Monitor::C_Command(mon, m, 0, rs, paxos->get_version()));
|
||||
|
||||
// send them the new map when it updates, so they know it
|
||||
waiting_for_map[osdmap.get_inst(osd)] = osdmap.get_epoch();
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
@ -34,13 +34,14 @@ class Monitor;
|
||||
class MOSDBoot;
|
||||
class MMonCommand;
|
||||
class MPoolSnap;
|
||||
class MOSDMap;
|
||||
|
||||
class OSDMonitor : public PaxosService {
|
||||
public:
|
||||
OSDMap osdmap;
|
||||
|
||||
private:
|
||||
map<entity_inst_t, epoch_t> waiting_for_map; // who -> start epoch
|
||||
map<epoch_t, list<PaxosServiceMessage*> > waiting_for_map;
|
||||
|
||||
// [leader]
|
||||
OSDMap::Incremental pending_inc;
|
||||
@ -65,8 +66,9 @@ private:
|
||||
|
||||
// ...
|
||||
void send_to_waiting(); // send current map to waiters.
|
||||
void send_full(entity_inst_t dest);
|
||||
void send_incremental(entity_inst_t dest, epoch_t since);
|
||||
void send_full(PaxosServiceMessage *m);
|
||||
MOSDMap *build_incremental(epoch_t from);
|
||||
void send_incremental(PaxosServiceMessage *m, epoch_t since);
|
||||
|
||||
bool preprocess_failure(class MOSDFailure *m);
|
||||
bool prepare_failure(class MOSDFailure *m);
|
||||
@ -78,7 +80,7 @@ private:
|
||||
|
||||
bool preprocess_alive(class MOSDAlive *m);
|
||||
bool prepare_alive(class MOSDAlive *m);
|
||||
void _reply_map(Message *m, epoch_t e);
|
||||
void _reply_map(PaxosServiceMessage *m, epoch_t e);
|
||||
|
||||
bool preprocess_pgtemp(class MOSDPGTemp *m);
|
||||
bool prepare_pgtemp(class MOSDPGTemp *m);
|
||||
@ -104,9 +106,9 @@ private:
|
||||
|
||||
struct C_ReplyMap : public Context {
|
||||
OSDMonitor *osdmon;
|
||||
Message *m;
|
||||
PaxosServiceMessage *m;
|
||||
epoch_t e;
|
||||
C_ReplyMap(OSDMonitor *o, Message *mm, epoch_t ee) : osdmon(o), m(mm), e(ee) {}
|
||||
C_ReplyMap(OSDMonitor *o, PaxosServiceMessage *mm, epoch_t ee) : osdmon(o), m(mm), e(ee) {}
|
||||
void finish(int r) {
|
||||
osdmon->_reply_map(m, e);
|
||||
}
|
||||
@ -154,7 +156,7 @@ private:
|
||||
|
||||
void mark_all_down();
|
||||
|
||||
void send_latest(entity_inst_t i, epoch_t start=0);
|
||||
void send_latest(PaxosServiceMessage *m, epoch_t start=0);
|
||||
|
||||
void blacklist(entity_addr_t a, utime_t until);
|
||||
|
||||
|
@ -267,11 +267,13 @@ bool PGMonitor::preprocess_pg_stats(MPGStats *stats)
|
||||
{
|
||||
int from = stats->get_orig_source().num();
|
||||
|
||||
/*
|
||||
// first, just see if they need a new osdmap. but
|
||||
// only if they've had the map for a while.
|
||||
if (stats->had_map_for > 30.0 &&
|
||||
stats->epoch < mon->osdmon()->osdmap.get_epoch())
|
||||
mon->osdmon()->send_latest(stats->get_orig_source_inst(), stats->epoch+1);
|
||||
*/
|
||||
|
||||
// any new osd or pg info?
|
||||
if (pg_map.osd_stat.count(from) ||
|
||||
|
Loading…
Reference in New Issue
Block a user