mirror of
https://github.com/ceph/ceph
synced 2025-03-11 02:39:05 +00:00
ReplicatedPG: pass a PushOp into handle_pull_response
This is the first step toward packaging multiple pushes/pulls into a single message. Signed-off-by: Samuel Just <sam.just@inktank.com>
This commit is contained in:
parent
82cb922e89
commit
a4984328be
@ -5507,37 +5507,38 @@ ObjectRecoveryInfo ReplicatedPG::recalc_subsets(const ObjectRecoveryInfo& recove
|
||||
return new_info;
|
||||
}
|
||||
|
||||
void ReplicatedPG::handle_pull_response(OpRequestRef op)
|
||||
bool ReplicatedPG::handle_pull_response(
|
||||
int from, PushOp &pop, PullOp *response,
|
||||
ObjectStore::Transaction *t)
|
||||
{
|
||||
MOSDSubOp *m = static_cast<MOSDSubOp *>(op->request);
|
||||
interval_set<uint64_t> data_included = pop.data_included;
|
||||
bufferlist data;
|
||||
m->claim_data(data);
|
||||
interval_set<uint64_t> data_included = m->data_included;
|
||||
data.claim(pop.data);
|
||||
dout(10) << "handle_pull_response "
|
||||
<< m->recovery_info
|
||||
<< m->recovery_progress
|
||||
<< pop.recovery_info
|
||||
<< pop.after_progress
|
||||
<< " data.size() is " << data.length()
|
||||
<< " data_included: " << data_included
|
||||
<< dendl;
|
||||
if (m->version == eversion_t()) {
|
||||
if (pop.version == eversion_t()) {
|
||||
// replica doesn't have it!
|
||||
_failed_push(op);
|
||||
return;
|
||||
_failed_push(from, pop.soid);
|
||||
return false;
|
||||
}
|
||||
|
||||
hobject_t &hoid = m->recovery_info.soid;
|
||||
hobject_t &hoid = pop.soid;
|
||||
assert((data_included.empty() && data.length() == 0) ||
|
||||
(!data_included.empty() && data.length() > 0));
|
||||
|
||||
if (!pulling.count(hoid)) {
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
PullInfo &pi = pulling[hoid];
|
||||
if (pi.recovery_info.size == (uint64_t(-1))) {
|
||||
pi.recovery_info.size = m->recovery_info.size;
|
||||
pi.recovery_info.size = pop.recovery_info.size;
|
||||
pi.recovery_info.copy_subset.intersection_of(
|
||||
m->recovery_info.copy_subset);
|
||||
pop.recovery_info.copy_subset);
|
||||
}
|
||||
|
||||
pi.recovery_info = recalc_subsets(pi.recovery_info);
|
||||
@ -5555,7 +5556,7 @@ void ReplicatedPG::handle_pull_response(OpRequestRef op)
|
||||
info.stats.stats.sum.num_bytes_recovered += data.length();
|
||||
|
||||
bool first = pi.recovery_progress.first;
|
||||
pi.recovery_progress = m->recovery_progress;
|
||||
pi.recovery_progress = pop.after_progress;
|
||||
|
||||
dout(10) << "new recovery_info " << pi.recovery_info
|
||||
<< ", new progress " << pi.recovery_progress
|
||||
@ -5563,15 +5564,15 @@ void ReplicatedPG::handle_pull_response(OpRequestRef op)
|
||||
|
||||
if (first) {
|
||||
bufferlist oibl;
|
||||
if (m->attrset.count(OI_ATTR)) {
|
||||
oibl.push_back(m->attrset[OI_ATTR]);
|
||||
if (pop.attrset.count(OI_ATTR)) {
|
||||
oibl.push_back(pop.attrset[OI_ATTR]);
|
||||
::decode(pi.recovery_info.oi, oibl);
|
||||
} else {
|
||||
assert(0);
|
||||
}
|
||||
bufferlist ssbl;
|
||||
if (m->attrset.count(SS_ATTR)) {
|
||||
ssbl.push_back(m->attrset[SS_ATTR]);
|
||||
if (pop.attrset.count(SS_ATTR)) {
|
||||
ssbl.push_back(pop.attrset[SS_ATTR]);
|
||||
::decode(pi.recovery_info.ss, ssbl);
|
||||
} else {
|
||||
assert(pi.recovery_info.soid.snap != CEPH_NOSNAP &&
|
||||
@ -5581,19 +5582,15 @@ void ReplicatedPG::handle_pull_response(OpRequestRef op)
|
||||
|
||||
bool complete = pi.is_complete();
|
||||
|
||||
ObjectStore::Transaction *t = new ObjectStore::Transaction;
|
||||
Context *onreadable = 0;
|
||||
Context *onreadable_sync = 0;
|
||||
Context *oncomplete = 0;
|
||||
submit_push_data(pi.recovery_info, first,
|
||||
complete,
|
||||
data_included, data,
|
||||
m->omap_header,
|
||||
m->attrset,
|
||||
m->omap_entries,
|
||||
pop.omap_header,
|
||||
pop.attrset,
|
||||
pop.omap_entries,
|
||||
t);
|
||||
|
||||
info.stats.stats.sum.num_keys_recovered += m->omap_entries.size();
|
||||
info.stats.stats.sum.num_keys_recovered += pop.omap_entries.size();
|
||||
|
||||
if (complete) {
|
||||
info.stats.stats.sum.num_objects_recovered++;
|
||||
@ -5614,30 +5611,21 @@ void ReplicatedPG::handle_pull_response(OpRequestRef op)
|
||||
// keep track of active pushes for scrub
|
||||
++active_pushes;
|
||||
|
||||
onreadable = new C_OSD_AppliedRecoveredObject(this, t, obc);
|
||||
onreadable_sync = new C_OSD_OndiskWriteUnlock(obc);
|
||||
oncomplete = new C_OSD_CompletedPull(this, hoid, get_osdmap()->get_epoch());
|
||||
} else {
|
||||
onreadable = new ObjectStore::C_DeleteTransaction(t);
|
||||
t->register_on_applied(new C_OSD_AppliedRecoveredObject(this, obc));
|
||||
t->register_on_applied_sync(new C_OSD_OndiskWriteUnlock(obc));
|
||||
t->register_on_complete(
|
||||
new C_OSD_CompletedPull(this, hoid, get_osdmap()->get_epoch()));
|
||||
}
|
||||
|
||||
int r = osd->store->
|
||||
queue_transaction(
|
||||
osr.get(), t,
|
||||
onreadable,
|
||||
new C_OSD_CommittedPushedObject(
|
||||
this, op,
|
||||
get_osdmap()->get_epoch(),
|
||||
info.last_complete),
|
||||
onreadable_sync,
|
||||
oncomplete,
|
||||
TrackedOpRef()
|
||||
);
|
||||
assert(r == 0);
|
||||
t->register_on_commit(
|
||||
new C_OSD_CommittedPushedObject(
|
||||
this,
|
||||
get_osdmap()->get_epoch(),
|
||||
info.last_complete));
|
||||
|
||||
if (complete) {
|
||||
pulling.erase(hoid);
|
||||
pull_from_peer[m->get_source().num()].erase(hoid);
|
||||
pull_from_peer[from].erase(hoid);
|
||||
publish_stats_to_osd();
|
||||
if (waiting_for_missing_object.count(hoid)) {
|
||||
dout(20) << " kicking waiters on " << hoid << dendl;
|
||||
@ -5648,11 +5636,12 @@ void ReplicatedPG::handle_pull_response(OpRequestRef op)
|
||||
waiting_for_all_missing.clear();
|
||||
}
|
||||
}
|
||||
return false;
|
||||
} else {
|
||||
send_pull(pi.priority,
|
||||
m->get_source().num(),
|
||||
pi.recovery_info,
|
||||
pi.recovery_progress);
|
||||
response->soid = pop.soid;
|
||||
response->recovery_info = pi.recovery_info;
|
||||
response->recovery_progress = pi.recovery_progress;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
@ -6045,7 +6034,7 @@ void ReplicatedPG::_committed_pushed_object(
|
||||
unlock();
|
||||
}
|
||||
|
||||
void ReplicatedPG::_applied_recovered_object(ObjectStore::Transaction *t, ObjectContext *obc)
|
||||
void ReplicatedPG::_applied_recovered_object(ObjectContext *obc)
|
||||
{
|
||||
lock();
|
||||
dout(10) << "_applied_recovered_object " << *obc << dendl;
|
||||
@ -6061,7 +6050,6 @@ void ReplicatedPG::_applied_recovered_object(ObjectStore::Transaction *t, Object
|
||||
}
|
||||
|
||||
unlock();
|
||||
delete t;
|
||||
}
|
||||
|
||||
void ReplicatedPG::_applied_recovered_object_replica(ObjectStore::Transaction *t)
|
||||
@ -6155,21 +6143,42 @@ void ReplicatedPG::trim_pushed_data(
|
||||
void ReplicatedPG::sub_op_push(OpRequestRef op)
|
||||
{
|
||||
op->mark_started();
|
||||
MOSDSubOp *m = static_cast<MOSDSubOp *>(op->request);
|
||||
|
||||
if (is_primary()) {
|
||||
handle_pull_response(op);
|
||||
PushOp pop;
|
||||
pop.soid = m->recovery_info.soid;
|
||||
pop.version = m->version;
|
||||
m->claim_data(pop.data);
|
||||
pop.data_included.swap(m->data_included);
|
||||
pop.omap_header.swap(m->omap_header);
|
||||
pop.omap_entries.swap(m->omap_entries);
|
||||
pop.attrset.swap(m->attrset);
|
||||
pop.recovery_info = m->recovery_info;
|
||||
pop.before_progress = m->current_progress;
|
||||
pop.after_progress = m->recovery_progress;
|
||||
|
||||
PullOp resp;
|
||||
ObjectStore::Transaction *t = new ObjectStore::Transaction;
|
||||
t->register_on_applied(new ObjectStore::C_DeleteTransaction(t));
|
||||
bool more = handle_pull_response(m->get_source().num(), pop, &resp, t);
|
||||
t->register_on_commit(new C_OnPushCommit(this, op));
|
||||
osd->store->queue_transaction(osr.get(), t);
|
||||
if (more) {
|
||||
send_pull(
|
||||
m->get_source().num(),
|
||||
m->get_priority(),
|
||||
resp.recovery_info,
|
||||
resp.recovery_progress);
|
||||
}
|
||||
} else {
|
||||
handle_push(op);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
void ReplicatedPG::_failed_push(OpRequestRef op)
|
||||
void ReplicatedPG::_failed_push(int from, const hobject_t &soid)
|
||||
{
|
||||
MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request);
|
||||
assert(m->get_header().type == MSG_OSD_SUBOP);
|
||||
const hobject_t& soid = m->poid;
|
||||
int from = m->get_source().num();
|
||||
map<hobject_t,set<int> >::iterator p = missing_loc.find(soid);
|
||||
if (p != missing_loc.end()) {
|
||||
dout(0) << "_failed_push " << soid << " from osd." << from
|
||||
@ -6842,6 +6851,7 @@ int ReplicatedPG::recover_primary(int max)
|
||||
obc->obs.oi.version = latest->version;
|
||||
|
||||
ObjectStore::Transaction *t = new ObjectStore::Transaction;
|
||||
t->register_on_applied(new ObjectStore::C_DeleteTransaction(t));
|
||||
bufferlist b2;
|
||||
obc->obs.oi.encode(b2);
|
||||
t->setattr(coll, soid, OI_ATTR, b2);
|
||||
@ -6851,9 +6861,9 @@ int ReplicatedPG::recover_primary(int max)
|
||||
++active_pushes;
|
||||
|
||||
osd->store->queue_transaction(osr.get(), t,
|
||||
new C_OSD_AppliedRecoveredObject(this, t, obc),
|
||||
new C_OSD_AppliedRecoveredObject(this, obc),
|
||||
new C_OSD_CommittedPushedObject(
|
||||
this, OpRequestRef(),
|
||||
this,
|
||||
get_osdmap()->get_epoch(),
|
||||
info.last_complete),
|
||||
new C_OSD_OndiskWriteUnlock(obc));
|
||||
|
@ -556,7 +556,9 @@ protected:
|
||||
bufferlist data_received,
|
||||
interval_set<uint64_t> *intervals_usable,
|
||||
bufferlist *data_usable);
|
||||
void handle_pull_response(OpRequestRef op);
|
||||
bool handle_pull_response(
|
||||
int from, PushOp &op, PullOp *response,
|
||||
ObjectStore::Transaction *t);
|
||||
void handle_push(OpRequestRef op);
|
||||
int send_push(int priority, int peer,
|
||||
const ObjectRecoveryInfo& recovery_info,
|
||||
@ -809,12 +811,11 @@ protected:
|
||||
};
|
||||
struct C_OSD_AppliedRecoveredObject : public Context {
|
||||
ReplicatedPGRef pg;
|
||||
ObjectStore::Transaction *t;
|
||||
ObjectContext *obc;
|
||||
C_OSD_AppliedRecoveredObject(ReplicatedPG *p, ObjectStore::Transaction *tt, ObjectContext *o) :
|
||||
pg(p), t(tt), obc(o) {}
|
||||
C_OSD_AppliedRecoveredObject(ReplicatedPG *p, ObjectContext *o) :
|
||||
pg(p), obc(o) {}
|
||||
void finish(int r) {
|
||||
pg->_applied_recovered_object(t, obc);
|
||||
pg->_applied_recovered_object(obc);
|
||||
}
|
||||
};
|
||||
struct C_OSD_CommittedPushedObject : public Context {
|
||||
@ -875,12 +876,12 @@ protected:
|
||||
void sub_op_modify_commit(RepModify *rm);
|
||||
|
||||
void sub_op_modify_reply(OpRequestRef op);
|
||||
void _applied_recovered_object(ObjectStore::Transaction *t, ObjectContext *obc);
|
||||
void _applied_recovered_object(ObjectContext *obc);
|
||||
void _applied_recovered_object_replica(ObjectStore::Transaction *t);
|
||||
void _committed_pushed_object(epoch_t epoch, eversion_t lc);
|
||||
void recover_got(hobject_t oid, eversion_t v);
|
||||
void sub_op_push(OpRequestRef op);
|
||||
void _failed_push(OpRequestRef op);
|
||||
void _failed_push(int from, const hobject_t &soid);
|
||||
void sub_op_push_reply(OpRequestRef op);
|
||||
void sub_op_pull(OpRequestRef op);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user