Merge pull request #49546 from Matan-B/wip-matanb-pull-push-naming

osd/ReplicatedBackend: Rename Push/Pull Info
Laura Flores 2023-03-09 12:55:18 -06:00 committed by GitHub
commit 8a9ee6f6a1
2 changed files with 84 additions and 93 deletions
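The change is mechanical: the recovery bookkeeping structs move from CamelCase (PullInfo, PushInfo) to snake_case type names (pull_info_t, push_info_t), and the terse pi locals become pull_info / push_info. A minimal sketch of the naming pattern, using std::string as a stand-in for pg_shard_t and hobject_t so it compiles on its own (not the real struct members):

#include <map>
#include <string>

// Placeholder for the real struct; the actual members include recovery_info,
// recovery_progress, obc, lock_manager, etc.
struct pull_info_t {                          // was: struct PullInfo
  std::string from;                           // stand-in for pg_shard_t
  std::string soid;                           // stand-in for hobject_t
};

std::map<std::string, pull_info_t> pulling;   // was: std::map<hobject_t, PullInfo>

void prepare_pull_sketch(const std::string &fromshard, const std::string &soid) {
  pull_info_t &pull_info = pulling[soid];     // was: PullInfo &pi = pulling[soid];
  pull_info.from = fromshard;                 // was: pi.from = fromshard;
  pull_info.soid = soid;                      // was: pi.soid = soid;
}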

src/osd/ReplicatedBackend.cc

@@ -1452,14 +1452,14 @@ void ReplicatedBackend::prepare_pull(
ceph_assert(!pulling.count(soid));
pull_from_peer[fromshard].insert(soid);
PullInfo &pi = pulling[soid];
pi.from = fromshard;
pi.soid = soid;
pi.head_ctx = headctx;
pi.recovery_info = op.recovery_info;
pi.recovery_progress = op.recovery_progress;
pi.cache_dont_need = h->cache_dont_need;
pi.lock_manager = std::move(lock_manager);
pull_info_t &pull_info = pulling[soid];
pull_info.from = fromshard;
pull_info.soid = soid;
pull_info.head_ctx = headctx;
pull_info.recovery_info = op.recovery_info;
pull_info.recovery_progress = op.recovery_progress;
pull_info.cache_dont_need = h->cache_dont_need;
pull_info.lock_manager = std::move(lock_manager);
}
/*
@@ -1563,28 +1563,28 @@ int ReplicatedBackend::prep_push(
const auto missing_iter = pmissing_iter->second.get_items().find(soid);
assert(missing_iter != pmissing_iter->second.get_items().end());
// take note.
PushInfo &pi = pushing[soid][peer];
pi.obc = obc;
pi.recovery_info.size = obc->obs.oi.size;
pi.recovery_info.copy_subset = data_subset;
pi.recovery_info.clone_subset = clone_subsets;
pi.recovery_info.soid = soid;
pi.recovery_info.oi = obc->obs.oi;
pi.recovery_info.ss = pop->recovery_info.ss;
pi.recovery_info.version = version;
pi.recovery_info.object_exist = missing_iter->second.clean_regions.object_is_exist();
pi.recovery_progress.omap_complete = !missing_iter->second.clean_regions.omap_is_dirty();
pi.lock_manager = std::move(lock_manager);
push_info_t &push_info = pushing[soid][peer];
push_info.obc = obc;
push_info.recovery_info.size = obc->obs.oi.size;
push_info.recovery_info.copy_subset = data_subset;
push_info.recovery_info.clone_subset = clone_subsets;
push_info.recovery_info.soid = soid;
push_info.recovery_info.oi = obc->obs.oi;
push_info.recovery_info.ss = pop->recovery_info.ss;
push_info.recovery_info.version = version;
push_info.recovery_info.object_exist = missing_iter->second.clean_regions.object_is_exist();
push_info.recovery_progress.omap_complete = !missing_iter->second.clean_regions.omap_is_dirty();
push_info.lock_manager = std::move(lock_manager);
ObjectRecoveryProgress new_progress;
int r = build_push_op(pi.recovery_info,
pi.recovery_progress,
int r = build_push_op(push_info.recovery_info,
push_info.recovery_progress,
&new_progress,
pop,
&(pi.stat), cache_dont_need);
&(push_info.stat), cache_dont_need);
if (r < 0)
return r;
pi.recovery_progress = new_progress;
push_info.recovery_progress = new_progress;
return 0;
}
@@ -1800,18 +1800,18 @@ bool ReplicatedBackend::handle_pull_response(
return false;
}
PullInfo &pi = piter->second;
if (pi.recovery_info.size == (uint64_t(-1))) {
pi.recovery_info.size = pop.recovery_info.size;
pi.recovery_info.copy_subset.intersection_of(
pull_info_t &pull_info = piter->second;
if (pull_info.recovery_info.size == (uint64_t(-1))) {
pull_info.recovery_info.size = pop.recovery_info.size;
pull_info.recovery_info.copy_subset.intersection_of(
pop.recovery_info.copy_subset);
}
// If primary doesn't have object info and didn't know version
if (pi.recovery_info.version == eversion_t()) {
pi.recovery_info.version = pop.version;
if (pull_info.recovery_info.version == eversion_t()) {
pull_info.recovery_info.version = pop.version;
}
bool first = pi.recovery_progress.first;
bool first = pull_info.recovery_progress.first;
if (first) {
// attrs only reference the origin bufferlist (decode from
// MOSDPGPush message) whose size is much greater than attrs in
@@ -1823,23 +1823,23 @@ bool ReplicatedBackend::handle_pull_response(
for (auto& a : attrset) {
a.second.rebuild();
}
pi.obc = get_parent()->get_obc(pi.recovery_info.soid, attrset);
pull_info.obc = get_parent()->get_obc(pull_info.recovery_info.soid, attrset);
if (attrset.find(SS_ATTR) != attrset.end()) {
bufferlist ssbv = attrset.at(SS_ATTR);
SnapSet ss(ssbv);
assert(!pi.obc->ssc->exists || ss.seq == pi.obc->ssc->snapset.seq);
assert(!pull_info.obc->ssc->exists || ss.seq == pull_info.obc->ssc->snapset.seq);
}
pi.recovery_info.oi = pi.obc->obs.oi;
pi.recovery_info = recalc_subsets(
pi.recovery_info,
pi.obc->ssc,
pi.lock_manager);
pull_info.recovery_info.oi = pull_info.obc->obs.oi;
pull_info.recovery_info = recalc_subsets(
pull_info.recovery_info,
pull_info.obc->ssc,
pull_info.lock_manager);
}
interval_set<uint64_t> usable_intervals;
bufferlist usable_data;
trim_pushed_data(pi.recovery_info.copy_subset,
trim_pushed_data(pull_info.recovery_info.copy_subset,
data_included,
data,
&usable_intervals,
@@ -1848,24 +1848,24 @@ bool ReplicatedBackend::handle_pull_response(
data = std::move(usable_data);
pi.recovery_progress = pop.after_progress;
pull_info.recovery_progress = pop.after_progress;
dout(10) << "new recovery_info " << pi.recovery_info
<< ", new progress " << pi.recovery_progress
dout(10) << "new recovery_info " << pull_info.recovery_info
<< ", new progress " << pull_info.recovery_progress
<< dendl;
interval_set<uint64_t> data_zeros;
uint64_t z_offset = pop.before_progress.data_recovered_to;
uint64_t z_length = pop.after_progress.data_recovered_to - pop.before_progress.data_recovered_to;
if (z_length)
data_zeros.insert(z_offset, z_length);
bool complete = pi.is_complete();
bool complete = pull_info.is_complete();
bool clear_omap = !pop.before_progress.omap_complete;
submit_push_data(pi.recovery_info,
submit_push_data(pull_info.recovery_info,
first,
complete,
clear_omap,
pi.cache_dont_need,
pull_info.cache_dont_need,
data_zeros,
data_included,
data,
@@ -1874,26 +1874,26 @@ bool ReplicatedBackend::handle_pull_response(
pop.omap_entries,
t);
pi.stat.num_keys_recovered += pop.omap_entries.size();
pi.stat.num_bytes_recovered += data.length();
pull_info.stat.num_keys_recovered += pop.omap_entries.size();
pull_info.stat.num_bytes_recovered += data.length();
get_parent()->get_logger()->inc(l_osd_rbytes, pop.omap_entries.size() + data.length());
if (complete) {
pi.stat.num_objects_recovered++;
pull_info.stat.num_objects_recovered++;
// XXX: This could overcount if regular recovery is needed right after a repair
if (get_parent()->pg_is_repair()) {
pi.stat.num_objects_repaired++;
pull_info.stat.num_objects_repaired++;
get_parent()->inc_osd_stat_repaired();
}
clear_pull_from(piter);
to_continue->push_back({hoid, pi.stat});
to_continue->push_back({hoid, pull_info.stat});
get_parent()->on_local_recover(
hoid, pi.recovery_info, pi.obc, false, t);
hoid, pull_info.recovery_info, pull_info.obc, false, t);
return false;
} else {
response->soid = pop.soid;
response->recovery_info = pi.recovery_info;
response->recovery_progress = pi.recovery_progress;
response->recovery_info = pull_info.recovery_info;
response->recovery_progress = pull_info.recovery_progress;
return true;
}
}
@@ -2205,18 +2205,18 @@ bool ReplicatedBackend::handle_push_reply(
<< dendl;
return false;
} else {
PushInfo *pi = &pushing[soid][peer];
push_info_t *push_info = &pushing[soid][peer];
bool error = pushing[soid].begin()->second.recovery_progress.error;
if (!pi->recovery_progress.data_complete && !error) {
if (!push_info->recovery_progress.data_complete && !error) {
dout(10) << " pushing more from, "
<< pi->recovery_progress.data_recovered_to
<< " of " << pi->recovery_info.copy_subset << dendl;
<< push_info->recovery_progress.data_recovered_to
<< " of " << push_info->recovery_info.copy_subset << dendl;
ObjectRecoveryProgress new_progress;
int r = build_push_op(
pi->recovery_info,
pi->recovery_progress, &new_progress, reply,
&(pi->stat));
push_info->recovery_info,
push_info->recovery_progress, &new_progress, reply,
&(push_info->stat));
// Handle the case of a read error right after we wrote, which is
// hopefully extremely rare.
if (r < 0) {
@@ -2225,19 +2225,19 @@ bool ReplicatedBackend::handle_push_reply(
error = true;
goto done;
}
pi->recovery_progress = new_progress;
push_info->recovery_progress = new_progress;
return true;
} else {
// done!
done:
if (!error)
get_parent()->on_peer_recover( peer, soid, pi->recovery_info);
get_parent()->on_peer_recover( peer, soid, push_info->recovery_info);
get_parent()->release_locks(pi->lock_manager);
object_stat_sum_t stat = pi->stat;
eversion_t v = pi->recovery_info.version;
get_parent()->release_locks(push_info->lock_manager);
object_stat_sum_t stat = push_info->stat;
eversion_t v = push_info->recovery_info.version;
pushing[soid].erase(peer);
pi = NULL;
push_info = nullptr;
if (pushing[soid].empty()) {
if (!error)
@@ -2351,7 +2351,7 @@ void ReplicatedBackend::_failed_pull(pg_shard_t from, const hobject_t &soid)
}
void ReplicatedBackend::clear_pull_from(
map<hobject_t, PullInfo>::iterator piter)
map<hobject_t, pull_info_t>::iterator piter)
{
auto from = piter->second.from;
pull_from_peer[from].erase(piter->second.soid);
@@ -2360,7 +2360,7 @@ void ReplicatedBackend::clear_pull_from(
}
void ReplicatedBackend::clear_pull(
map<hobject_t, PullInfo>::iterator piter,
map<hobject_t, pull_info_t>::iterator piter,
bool clear_pull_from_peer)
{
if (clear_pull_from_peer) {

src/osd/ReplicatedBackend.h

@@ -91,19 +91,15 @@ public:
void dump_recovery_info(ceph::Formatter *f) const override {
{
f->open_array_section("pull_from_peer");
for (std::map<pg_shard_t, std::set<hobject_t> >::const_iterator i = pull_from_peer.begin();
i != pull_from_peer.end();
++i) {
for (const auto& i : pull_from_peer) {
f->open_object_section("pulling_from");
f->dump_stream("pull_from") << i->first;
f->dump_stream("pull_from") << i.first;
{
f->open_array_section("pulls");
for (std::set<hobject_t>::const_iterator j = i->second.begin();
j != i->second.end();
++j) {
for (const auto& j : i.second) {
f->open_object_section("pull_info");
ceph_assert(pulling.count(*j));
pulling.find(*j)->second.dump(f);
ceph_assert(pulling.count(j));
pulling.find(j)->second.dump(f);
f->close_section();
}
f->close_section();
@@ -114,22 +110,17 @@ public:
}
{
f->open_array_section("pushing");
for (std::map<hobject_t, std::map<pg_shard_t, PushInfo>>::const_iterator i =
pushing.begin();
i != pushing.end();
++i) {
for(const auto& i : pushing) {
f->open_object_section("object");
f->dump_stream("pushing") << i->first;
f->dump_stream("pushing") << i.first;
{
f->open_array_section("pushing_to");
for (std::map<pg_shard_t, PushInfo>::const_iterator j = i->second.begin();
j != i->second.end();
++j) {
for (const auto& j : i.second) {
f->open_object_section("push_progress");
f->dump_stream("pushing_to") << j->first;
f->dump_stream("pushing_to") << j.first;
{
f->open_object_section("push_info");
j->second.dump(f);
j.second.dump(f);
f->close_section();
}
f->close_section();
@@ -164,7 +155,7 @@ public:
private:
// push
struct PushInfo {
struct push_info_t {
ObjectRecoveryProgress recovery_progress;
ObjectRecoveryInfo recovery_info;
ObjectContextRef obc;
@@ -184,10 +175,10 @@ private:
}
}
};
std::map<hobject_t, std::map<pg_shard_t, PushInfo>> pushing;
std::map<hobject_t, std::map<pg_shard_t, push_info_t>> pushing;
// pull
struct PullInfo {
struct pull_info_t {
pg_shard_t from;
hobject_t soid;
ObjectRecoveryProgress recovery_progress;
@@ -216,15 +207,15 @@ private:
}
};
std::map<hobject_t, PullInfo> pulling;
std::map<hobject_t, pull_info_t> pulling;
// Reverse mapping from osd peer to objects being pulled from that peer
std::map<pg_shard_t, std::set<hobject_t> > pull_from_peer;
void clear_pull(
std::map<hobject_t, PullInfo>::iterator piter,
std::map<hobject_t, pull_info_t>::iterator piter,
bool clear_pull_from_peer = true);
void clear_pull_from(
std::map<hobject_t, PullInfo>::iterator piter);
std::map<hobject_t, pull_info_t>::iterator piter);
void _do_push(OpRequestRef op);
void _do_pull_response(OpRequestRef op);
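Besides the renames, the dump_recovery_info() hunks also replace the explicit const_iterator loops with range-based for. A standalone sketch of that same transformation on plain std containers (the real code walks pull_from_peer and pushing with Ceph types and emits output through a ceph::Formatter):

#include <iostream>
#include <map>
#include <set>
#include <string>

int main() {
  std::map<std::string, std::set<std::string>> pull_from_peer = {
    {"osd.1", {"obj_a", "obj_b"}},
  };

  // Before: explicit const_iterator loops, as in the old dump_recovery_info().
  for (std::map<std::string, std::set<std::string>>::const_iterator i =
         pull_from_peer.begin();
       i != pull_from_peer.end(); ++i) {
    for (std::set<std::string>::const_iterator j = i->second.begin();
         j != i->second.end(); ++j) {
      std::cout << i->first << " pulls " << *j << "\n";
    }
  }

  // After: equivalent range-based for loops, as in the patch.
  for (const auto &i : pull_from_peer) {
    for (const auto &j : i.second) {
      std::cout << i.first << " pulls " << j << "\n";
    }
  }
  return 0;
}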