Mirror of https://github.com/ceph/ceph (synced 2025-03-05 07:48:55 +00:00)
osd: revamp complete_thru code
Use last_complete_ondisk terminology throughout.
parent 9603162003
commit e635d0eaba
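
What the rename captures, per the new comments in src/osd/PG.h below: last_complete_ondisk is the last_complete version that has actually committed to disk, peer_last_complete_ondisk records the same value as reported by each replica, and min_last_complete_ondisk is the minimum over all of them, which do_op() then uses as the log trim bound. The following is a minimal, self-contained sketch of that minimum computation, not code from this commit; eversion_t is a simplified stand-in for Ceph's (epoch, version) type and min_lcod() is a hypothetical helper that mirrors the shape of PG::calc_min_last_complete_ondisk() in the diff.

// Minimal sketch (not code from this commit) of the bookkeeping being renamed:
// the primary's min_last_complete_ondisk is the lowest version that every OSD
// in the acting set has committed to disk.
#include <cstdint>
#include <iostream>
#include <map>
#include <optional>
#include <vector>

struct eversion_t {
  uint64_t epoch = 0, version = 0;
  bool operator<(const eversion_t &o) const {
    return epoch < o.epoch || (epoch == o.epoch && version < o.version);
  }
};

// Returns nothing if some peer has not reported its last_complete_ondisk yet,
// like calc_min_last_complete_ondisk() returning false in that case.
std::optional<eversion_t> min_lcod(eversion_t my_lcod,
                                   const std::vector<int>& acting,
                                   const std::map<int, eversion_t>& peer_lcod) {
  eversion_t min = my_lcod;
  for (size_t i = 1; i < acting.size(); ++i) {  // acting[0] is the primary itself
    auto it = peer_lcod.find(acting[i]);
    if (it == peer_lcod.end())
      return std::nullopt;                      // incomplete info
    if (it->second < min)
      min = it->second;
  }
  return min;
}

int main() {
  std::vector<int> acting = {0, 1, 2};
  std::map<int, eversion_t> peers = {{1, {4, 10}}, {2, {4, 7}}};
  if (auto m = min_lcod({4, 12}, acting, peers))
    std::cout << "min_last_complete_ondisk = " << m->epoch << "'" << m->version << "\n";
  return 0;
}

Compiled standalone (C++17 for std::optional), this prints min_last_complete_ondisk = 4'7, the smallest on-disk version across the three OSDs.
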
@@ -46,7 +46,7 @@ public:
   int32_t result;
 
   // piggybacked osd state
-  eversion_t pg_complete_thru;
+  eversion_t last_complete_ondisk;
   osd_peer_stat_t peer_stat;
 
   map<string,bufferptr> attrset;
@@ -67,7 +67,7 @@ public:
     }
     ::decode(ack_type, p);
     ::decode(result, p);
-    ::decode(pg_complete_thru, p);
+    ::decode(last_complete_ondisk, p);
     ::decode(peer_stat, p);
     ::decode(attrset, p);
   }
@@ -84,7 +84,7 @@ public:
     }
     ::encode(ack_type, payload);
     ::encode(result, payload);
-    ::encode(pg_complete_thru, payload);
+    ::encode(last_complete_ondisk, payload);
     ::encode(peer_stat, payload);
     ::encode(attrset, payload);
   }
@@ -101,8 +101,8 @@ public:
 
   int get_result() { return result; }
 
-  void set_pg_complete_thru(eversion_t v) { pg_complete_thru = v; }
-  eversion_t get_pg_complete_thru() { return pg_complete_thru; }
+  void set_last_complete_ondisk(eversion_t v) { last_complete_ondisk = v; }
+  eversion_t get_last_complete_ondisk() { return last_complete_ondisk; }
 
   void set_peer_stat(const osd_peer_stat_t& stat) { peer_stat = stat; }
   const osd_peer_stat_t& get_peer_stat() { return peer_stat; }
@@ -974,6 +974,8 @@ void PG::clear_primary_state()
   peer_info.clear();
   peer_missing.clear();
   need_up_thru = false;
+  peer_last_complete_ondisk.clear();
+  min_last_complete_ondisk = eversion_t();
 
   finish_sync_event = 0;  // so that _finish_recvoery doesn't go off in another thread
 
@@ -1024,7 +1026,7 @@ bool PG::recover_master_log(map< int, map<pg_t,Query> >& query_map)
 
   // -- ok, we have all (prior_set) info. (and maybe others.)
 
-  dout(10) << " have prior_set info. peers_complete_thru " << peers_complete_thru << dendl;
+  dout(10) << " have prior_set info. min_last_complete_ondisk " << min_last_complete_ondisk << dendl;
 
 
   /** CREATE THE MASTER PG::Log *********/
@@ -1035,7 +1037,7 @@ bool PG::recover_master_log(map< int, map<pg_t,Query> >& query_map)
 
   oldest_update = info.last_update;  // only of acting (current) osd set.
   int oldest_who = osd->whoami;
-  peers_complete_thru = info.last_complete;
+  min_last_complete_ondisk = info.last_complete;
 
   for (map<int,Info>::iterator it = peer_info.begin();
        it != peer_info.end();
@@ -1054,8 +1056,8 @@ bool PG::recover_master_log(map< int, map<pg_t,Query> >& query_map)
         oldest_update = it->second.last_update;
         oldest_who = it->first;
       }
-      if (it->second.last_complete < peers_complete_thru)
-        peers_complete_thru = it->second.last_complete;
+      if (it->second.last_complete < min_last_complete_ondisk)
+        min_last_complete_ondisk = it->second.last_complete;
     }
   }
   if (newest_update == info.last_update)  // or just me, if nobody better.
@@ -1204,7 +1206,7 @@ void PG::peer(ObjectStore::Transaction& t,
   if (!have_all_missing)
     return;
 
-  dout(10) << " peers_complete_thru " << peers_complete_thru << dendl;
+  dout(10) << " min_last_complete_ondisk " << min_last_complete_ondisk << dendl;
 
 
   // -- ok. and have i located all pg contents?
@@ -1304,7 +1306,7 @@ void PG::activate(ObjectStore::Transaction& t,
   trim_past_intervals();
 
   if (role == 0) {    // primary state
-    peers_complete_thru = eversion_t(0,0);  // we don't know (yet)!
+    min_last_complete_ondisk = eversion_t(0,0);  // we don't know (yet)!
   }
 
   assert(info.last_complete >= log.bottom || log.backlog);
src/osd/PG.h (29 changed lines)
@@ -661,10 +661,11 @@ protected:
   // primary state
  public:
   vector<int> acting;
-  eversion_t last_complete_commit;
+  eversion_t last_complete_ondisk;    // last_complete that has committed.
 
   // [primary only] content recovery state
-  eversion_t peers_complete_thru;
+  map<int,eversion_t> peer_last_complete_ondisk;
+  eversion_t min_last_complete_ondisk;  // min over last_complete_ondisk, peer_last_complete_ondisk
   bool have_master_log;
  protected:
   set<int> prior_set;  // current+prior OSDs, as defined by info.history.last_epoch_started.
@@ -724,16 +725,17 @@ public:
   void clear_prior();
   bool prior_set_affected(OSDMap *map);
 
-  bool adjust_peers_complete_thru() {
-    eversion_t t = info.last_complete;
-    for (unsigned i=1; i<acting.size(); i++)
-      if (peer_info[i].last_complete < t)
-        t = peer_info[i].last_complete;
-    if (t > peers_complete_thru) {
-      peers_complete_thru = t;
-      return true;
+  bool calc_min_last_complete_ondisk() {
+    eversion_t min = last_complete_ondisk;
+    for (unsigned i=1; i<acting.size(); i++) {
+      if (peer_last_complete_ondisk.count(acting[i]) == 0)
+        return false;  // we don't have complete info
+      eversion_t a = peer_last_complete_ondisk[acting[i]];
+      if (a < min)
+        min = a;
     }
-    return false;
+    min_last_complete_ondisk = min;
+    return true;
   }
 
   void proc_replica_log(ObjectStore::Transaction& t, Info &oinfo, Log &olog, Missing& omissing, int from);
@@ -994,8 +996,9 @@ inline ostream& operator<<(ostream& out, const PG& pg)
   }
 
   if (pg.get_role() == 0) {
-    out << " pct " << pg.peers_complete_thru;
-    if (!pg.have_master_log) out << " !hml";
+    out << " mlcod " << pg.min_last_complete_ondisk;
+    if (!pg.have_master_log)
+      out << " !hml";
   }
 
   out << " " << pg_state_string(pg.get_state());
@@ -555,7 +555,7 @@ void ReplicatedPG::do_op(MOSDOp *op)
   // trim log?
   if (is_clean() ||
       log.top.version - log.bottom.version > info.stats.num_objects)
-    ctx->trim_to = peers_complete_thru;
+    ctx->trim_to = min_last_complete_ondisk;
 
   log_op(ctx->log, ctx->trim_to, ctx->local_t);
 }
@@ -1490,7 +1490,7 @@ void ReplicatedPG::op_ondisk(RepGather *repop)
     dout(10) << "op_ondisk " << *repop << dendl;
     repop->waitfor_disk.erase(osd->get_nodeid());
     repop->waitfor_nvram.erase(osd->get_nodeid());
-    repop->pg_complete_thru[osd->get_nodeid()] = repop->pg_local_last_complete;
+    last_complete_ondisk = repop->pg_local_last_complete;
    eval_repop(repop);
   }
 }
@@ -1621,21 +1621,7 @@ void ReplicatedPG::eval_repop(RepGather *repop)
 
   // done.
   if (repop->can_delete()) {
-    // adjust peers_complete_thru
-    if (!repop->pg_complete_thru.empty()) {
-      eversion_t min = info.last_complete;  // hrm....
-      for (unsigned i=0; i<acting.size(); i++) {
-        if (repop->pg_complete_thru[acting[i]] < min)  // note: if we haven't heard, it'll be zero, which is what we want.
-          min = repop->pg_complete_thru[acting[i]];
-      }
-
-      if (min > peers_complete_thru) {
-        dout(10) << " peers_complete_thru "
-                 << peers_complete_thru << " -> " << min
-                 << dendl;
-        peers_complete_thru = min;
-      }
-    }
+    calc_min_last_complete_ondisk();
 
     dout(10) << " removing " << *repop << dendl;
     assert(!repop_queue.empty());
@@ -1715,7 +1701,7 @@ ReplicatedPG::RepGather *ReplicatedPG::new_repop(OpContext *ctx, ObjectContext *
 
 
 void ReplicatedPG::repop_ack(RepGather *repop, int result, int ack_type,
-                             int fromosd, eversion_t pg_complete_thru)
+                             int fromosd, eversion_t peer_lcod)
 {
   MOSDOp *op = (MOSDOp *)repop->ctx->op;
 
@@ -1731,7 +1717,7 @@ void ReplicatedPG::repop_ack(RepGather *repop, int result, int ack_type,
       repop->waitfor_disk.erase(fromosd);
       repop->waitfor_nvram.erase(fromosd);
       repop->waitfor_ack.erase(fromosd);
-      peer_info[fromosd].last_complete;
+      peer_last_complete_ondisk[fromosd] = peer_lcod;
     }
   } else if (ack_type & CEPH_OSD_FLAG_ONNVRAM) {
     // nvram
@@ -2139,8 +2125,9 @@ void ReplicatedPG::sub_op_modify_ondisk(MOSDSubOp *op, int ackerosd, eversion_t
            << ", sending commit to osd" << ackerosd
            << dendl;
   if (osd->osdmap->is_up(ackerosd)) {
+    last_complete_ondisk = last_complete;
     MOSDSubOpReply *commit = new MOSDSubOpReply(op, 0, osd->osdmap->get_epoch(), CEPH_OSD_FLAG_ONDISK);
-    commit->set_pg_complete_thru(last_complete);
+    commit->set_last_complete_ondisk(last_complete);
     commit->set_peer_stat(osd->get_my_stat_for(g_clock.now(), ackerosd));
     osd->messenger->send_message(commit, osd->osdmap->get_inst(ackerosd));
   }
@@ -2161,7 +2148,7 @@ void ReplicatedPG::sub_op_modify_reply(MOSDSubOpReply *r)
     repop_ack(repop_map[rep_tid],
               r->get_result(), r->ack_type,
               fromosd,
-              r->get_pg_complete_thru());
+              r->get_last_complete_ondisk());
   }
 
   delete r;
@@ -287,7 +287,6 @@ public:
     utime_t start;
 
     eversion_t pg_local_last_complete;
-    map<int,eversion_t> pg_complete_thru;
 
     RepGather(OpContext *c, ObjectContext *pi, bool noop_, tid_t rt,
               eversion_t lc) :
@@ -489,7 +488,6 @@ inline ostream& operator<<(ostream& out, ReplicatedPG::RepGather& repop)
       << " wfack=" << repop.waitfor_ack
     //<< " wfnvram=" << repop.waitfor_nvram
       << " wfdisk=" << repop.waitfor_disk;
-  out << " pct=" << repop.pg_complete_thru;
  if (repop.ctx->op)
    out << " op=" << *(repop.ctx->op);
  out << ")";