Merge pull request #38630 from tchaikov/wip-crimson-cleanups

crimson/osd: cleanups to drop some do_with() calls

Reviewed-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
This commit is contained in:
Kefu Chai 2020-12-19 13:24:31 +08:00 committed by GitHub
commit 507fdb635a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 133 additions and 163 deletions

View File

@ -27,82 +27,70 @@ seastar::future<> ReplicatedRecoveryBackend::recover_object(
assert(is_recovering(soid));
// start tracking the recovery of soid
return maybe_pull_missing_obj(soid, need).then([this, soid, need] {
return seastar::do_with(get_shards_to_push(soid),
[this, soid, need](auto& shards) {
return maybe_push_shards(soid, need, shards);
});
auto& recovery_waiter = recovering.at(soid);
if (recovery_waiter.obc) {
return maybe_push_shards(soid, need);
} else {
logger().debug("recover_object: loading obc: {}", soid);
return pg.with_head_obc<RWState::RWREAD>(soid,
[this, soid, need, &recovery_waiter](auto obc) {
logger().debug("recover_object: loaded obc: {}", obc->obs.oi.soid);
recovery_waiter.obc = obc;
recovery_waiter.obc->wait_recovery_read();
return maybe_push_shards(soid, need);
}).handle_error(
crimson::osd::PG::load_obc_ertr::all_same_way([soid](auto& code) {
// TODO: may need eio handling?
logger().error("recover_object saw error code {}, ignoring object {}",
code, soid);
}));
}
});
}
seastar::future<>
ReplicatedRecoveryBackend::maybe_push_shards(
const hobject_t& soid,
eversion_t need,
std::vector<pg_shard_t>& shards)
eversion_t need)
{
auto push_func = [this, soid, need, &shards] {
auto prepare_pops = seastar::now();
if (!shards.empty()) {
prepare_pops =
prep_push(soid, need, shards).then([this, &shards, soid](auto pops) {
return seastar::parallel_for_each(shards,
[this, &pops, soid](auto shard) {
auto msg = make_message<MOSDPGPush>();
msg->from = pg.get_pg_whoami();
msg->pgid = pg.get_pgid();
msg->map_epoch = pg.get_osdmap_epoch();
msg->min_epoch = pg.get_last_peering_reset();
msg->pushes.push_back(pops[shard]);
msg->set_priority(pg.get_recovery_op_priority());
return shard_services.send_to_osd(shard.osd,
std::move(msg),
pg.get_osdmap_epoch()).then(
[this, soid, shard] {
return recovering.at(soid).wait_for_pushes(shard);
});
});
});
}
return prepare_pops.then([this, soid] {
auto &recovery = recovering.at(soid);
auto push_info = recovery.pushing.begin();
object_stat_sum_t stat = {};
if (push_info != recovery.pushing.end()) {
stat = push_info->second.stat;
} else {
// no push happened, take pull_info's stat
assert(recovery.pi);
stat = recovery.pi->stat;
}
pg.get_recovery_handler()->on_global_recover(soid, stat, false);
return seastar::make_ready_future<>();
}).handle_exception([this, soid](auto e) {
auto &recovery = recovering.at(soid);
if (recovery.obc) {
recovery.obc->drop_recovery_read();
}
recovering.erase(soid);
return seastar::make_exception_future<>(e);
return seastar::parallel_for_each(get_shards_to_push(soid),
[this, need, soid](auto shard) {
return prep_push(soid, need, shard).then([this, soid, shard](auto push) {
auto msg = make_message<MOSDPGPush>();
msg->from = pg.get_pg_whoami();
msg->pgid = pg.get_pgid();
msg->map_epoch = pg.get_osdmap_epoch();
msg->min_epoch = pg.get_last_peering_reset();
msg->pushes.push_back(std::move(push));
msg->set_priority(pg.get_recovery_op_priority());
return shard_services.send_to_osd(shard.osd,
std::move(msg),
pg.get_osdmap_epoch()).then(
[this, soid, shard] {
return recovering.at(soid).wait_for_pushes(shard);
});
});
}; // push_func
auto& recovery_waiter = recovering.at(soid);
if (recovery_waiter.obc) {
return seastar::futurize_invoke(std::move(push_func));
}
logger().debug("recover_object: loading obc: {}", soid);
return pg.with_head_obc<RWState::RWREAD>(soid,
[&recovery_waiter, push_func=std::move(push_func)](auto obc) {
logger().debug("recover_object: loaded obc: {}", obc->obs.oi.soid);
recovery_waiter.obc = obc;
recovery_waiter.obc->wait_recovery_read();
return seastar::futurize_invoke(std::move(push_func));
}).handle_error(
crimson::osd::PG::load_obc_ertr::all_same_way([soid](auto& code) {
//TODO: may need eio handling?
logger().error("recover_object saw error code {},"
" ignoring object {}", code, soid);
}));
}).then([this, soid] {
auto &recovery = recovering.at(soid);
auto push_info = recovery.pushing.begin();
object_stat_sum_t stat = {};
if (push_info != recovery.pushing.end()) {
stat = push_info->second.stat;
} else {
// no push happened, take pull_info's stat
assert(recovery.pi);
stat = recovery.pi->stat;
}
pg.get_recovery_handler()->on_global_recover(soid, stat, false);
return seastar::make_ready_future<>();
}).handle_exception([this, soid](auto e) {
auto &recovery = recovering.at(soid);
if (recovery.obc) {
recovery.obc->drop_recovery_read();
}
recovering.erase(soid);
return seastar::make_exception_future<>(e);
});
}
seastar::future<>
@ -152,12 +140,11 @@ seastar::future<> ReplicatedRecoveryBackend::push_delete(
if (iter == pg.get_shard_missing().end())
return seastar::make_ready_future<>();
if (iter->second.is_missing(soid)) {
logger().debug("{} will remove {} from {}", __func__, soid, shard);
logger().debug("push_delete: will remove {} from {}", soid, shard);
pg.begin_peer_recover(shard, soid);
spg_t target_pg = spg_t(pg.get_info().pgid.pgid, shard.shard);
spg_t target_pg(pg.get_info().pgid.pgid, shard.shard);
auto msg = make_message<MOSDPGRecoveryDelete>(
pg.get_pg_whoami(), target_pg, pg.get_osdmap_epoch(), min_epoch);
msg->set_priority(pg.get_recovery_op_priority());
msg->objects.push_back(std::make_pair(soid, need));
return shard_services.send_to_osd(shard.osd, std::move(msg),
@ -256,10 +243,8 @@ seastar::future<> ReplicatedRecoveryBackend::recover_delete(
if (shard == pg.get_pg_whoami())
continue;
if (pg.get_shard_missing(shard)->is_missing(soid)) {
logger().debug("{}: soid {} needs to deleted from replca {}",
__func__,
soid,
shard);
logger().debug("recover_delete: soid {} needs to deleted from replca {}",
soid, shard);
object_missing = true;
break;
}
@ -280,67 +265,53 @@ seastar::future<> ReplicatedRecoveryBackend::recover_delete(
});
}
seastar::future<std::map<pg_shard_t, PushOp>>
seastar::future<PushOp>
ReplicatedRecoveryBackend::prep_push(
const hobject_t& soid,
eversion_t need,
const std::vector<pg_shard_t>& shards)
pg_shard_t pg_shard)
{
logger().debug("{}: {}, {}", __func__, soid, need);
return seastar::do_with(std::map<pg_shard_t, PushOp>(),
std::map<pg_shard_t, interval_set<uint64_t>>(),
[this, soid, &shards](auto& pops,
auto& data_subsets) {
return seastar::parallel_for_each(shards,
[this, soid, pops, &data_subsets](auto pg_shard) mutable {
pops.emplace(pg_shard, PushOp());
auto& recovery_waiter = recovering.at(soid);
auto& obc = recovery_waiter.obc;
auto& data_subset = data_subsets[pg_shard];
auto& recovery_waiter = recovering.at(soid);
auto& obc = recovery_waiter.obc;
interval_set<uint64_t> data_subset;
if (obc->obs.oi.size) {
data_subset.insert(0, obc->obs.oi.size);
}
const auto& missing = pg.get_shard_missing().find(pg_shard)->second;
if (HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS)) {
const auto it = missing.get_items().find(soid);
assert(it != missing.get_items().end());
data_subset.intersection_of(it->second.clean_regions.get_dirty_regions());
logger().debug("prep_push: {} data_subset {}", soid, data_subset);
}
if (obc->obs.oi.size) {
data_subset.insert(0, obc->obs.oi.size);
}
const auto& missing = pg.get_shard_missing().find(pg_shard)->second;
if (HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS)) {
const auto it = missing.get_items().find(soid);
assert(it != missing.get_items().end());
data_subset.intersection_of(it->second.clean_regions.get_dirty_regions());
logger().debug("calc_head_subsets {} data_subset {}", soid, data_subset);
}
logger().debug("prep_push: {} to {}", soid, pg_shard);
auto& pi = recovery_waiter.pushing[pg_shard];
pg.begin_peer_recover(pg_shard, soid);
const auto pmissing_iter = pg.get_shard_missing().find(pg_shard);
const auto missing_iter = pmissing_iter->second.get_items().find(soid);
assert(missing_iter != pmissing_iter->second.get_items().end());
logger().debug("prep_push: {} to {}", soid, pg_shard);
auto& pi = recovery_waiter.pushing[pg_shard];
pg.begin_peer_recover(pg_shard, soid);
const auto pmissing_iter = pg.get_shard_missing().find(pg_shard);
const auto missing_iter = pmissing_iter->second.get_items().find(soid);
assert(missing_iter != pmissing_iter->second.get_items().end());
pi.obc = obc;
pi.recovery_info.size = obc->obs.oi.size;
pi.recovery_info.copy_subset = data_subset;
pi.recovery_info.soid = soid;
pi.recovery_info.oi = obc->obs.oi;
pi.recovery_info.version = obc->obs.oi.version;
pi.recovery_info.object_exist =
missing_iter->second.clean_regions.object_is_exist();
pi.recovery_progress.omap_complete =
(!missing_iter->second.clean_regions.omap_is_dirty() &&
HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS));
pi.obc = obc;
pi.recovery_info.size = obc->obs.oi.size;
pi.recovery_info.copy_subset = data_subset;
pi.recovery_info.soid = soid;
pi.recovery_info.oi = obc->obs.oi;
pi.recovery_info.version = obc->obs.oi.version;
pi.recovery_info.object_exist =
missing_iter->second.clean_regions.object_is_exist();
pi.recovery_progress.omap_complete =
!missing_iter->second.clean_regions.omap_is_dirty() &&
HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS);
return build_push_op(pi.recovery_info, pi.recovery_progress,
&pi.stat).then(
[this, soid, pg_shard, &pops](auto pop) {
auto& recovery_waiter = recovering.at(soid);
auto& pi = recovery_waiter.pushing[pg_shard];
pi.recovery_progress = pop.after_progress;
pops[pg_shard] = std::move(pop);
return seastar::make_ready_future<>();
});
}).then([&pops]() mutable {
return seastar::make_ready_future<std::map<pg_shard_t, PushOp>>(std::move(pops));
});
return build_push_op(pi.recovery_info, pi.recovery_progress, &pi.stat).then(
[this, soid, pg_shard](auto pop) {
auto& recovery_waiter = recovering.at(soid);
auto& pi = recovery_waiter.pushing[pg_shard];
pi.recovery_progress = pop.after_progress;
return pop;
});
}
@ -400,9 +371,8 @@ seastar::future<PushOp> ReplicatedRecoveryBackend::build_push_op(
pop->omap_header.claim_append(bl);
return store->get_attrs(coll, ghobject_t(recovery_info.soid));
}).safe_then([&oi, pop, &new_progress, &v](auto attrs) mutable {
//pop->attrset = attrs;
for (auto p : attrs) {
pop->attrset[p.first].push_back(p.second);
for (auto& [key, val] : attrs) {
pop->attrset[std::move(key)].push_back(std::move(val));
}
logger().debug("build_push_op: {}", pop->attrset[OI_ATTR]);
oi.decode(pop->attrset[OI_ATTR]);
@ -666,7 +636,7 @@ seastar::future<bool> ReplicatedRecoveryBackend::_handle_pull_response(
bool complete = pi.is_complete();
bool clear_omap = !pop.before_progress.omap_complete;
return submit_push_data(pi.recovery_info, first, complete, clear_omap,
data_zeros, usable_intervals, data, pop.omap_header,
std::move(data_zeros), usable_intervals, data, pop.omap_header,
pop.attrset, pop.omap_entries, t).then(
[this, response, &pi, &pop, complete, t, bytes_recovered=data.length()] {
pi.stat.num_keys_recovered += pop.omap_entries.size();
@ -677,12 +647,12 @@ seastar::future<bool> ReplicatedRecoveryBackend::_handle_pull_response(
pg.get_recovery_handler()->on_local_recover(
pop.soid, recovering.at(pop.soid).pi->recovery_info,
false, *t);
return seastar::make_ready_future<bool>(true);
return true;
} else {
response->soid = pop.soid;
response->recovery_info = pi.recovery_info;
response->recovery_progress = pi.recovery_progress;
return seastar::make_ready_future<bool>(false);
return false;
}
});
});
@ -745,29 +715,29 @@ seastar::future<> ReplicatedRecoveryBackend::_handle_push(
{
logger().debug("{}", __func__);
return seastar::do_with(interval_set<uint64_t>(),
bufferlist(),
[this, &pop, t, response](auto& data_zeros, auto& data) {
data = pop.data;
bool first = pop.before_progress.first;
bool complete = pop.after_progress.data_complete
&& pop.after_progress.omap_complete;
bool clear_omap = !pop.before_progress.omap_complete;
uint64_t z_offset = pop.before_progress.data_recovered_to;
uint64_t z_length = pop.after_progress.data_recovered_to
- pop.before_progress.data_recovered_to;
if (z_length)
data_zeros.insert(z_offset, z_length);
response->soid = pop.recovery_info.soid;
bool first = pop.before_progress.first;
interval_set<uint64_t> data_zeros;
{
uint64_t offset = pop.before_progress.data_recovered_to;
uint64_t length = (pop.after_progress.data_recovered_to -
pop.before_progress.data_recovered_to);
if (length) {
data_zeros.insert(offset, length);
}
}
bool complete = (pop.after_progress.data_complete &&
pop.after_progress.omap_complete);
bool clear_omap = !pop.before_progress.omap_complete;
response->soid = pop.recovery_info.soid;
return submit_push_data(pop.recovery_info, first, complete, clear_omap,
data_zeros, pop.data_included, data, pop.omap_header, pop.attrset,
pop.omap_entries, t).then([this, complete, &pop, t] {
if (complete) {
pg.get_recovery_handler()->on_local_recover(pop.recovery_info.soid,
pop.recovery_info, false, *t);
}
});
return submit_push_data(pop.recovery_info, first, complete, clear_omap,
std::move(data_zeros), pop.data_included, pop.data, pop.omap_header,
pop.attrset, pop.omap_entries, t).then([this, complete, &pop, t] {
if (complete) {
pg.get_recovery_handler()->on_local_recover(
pop.recovery_info.soid, pop.recovery_info,
false, *t);
}
});
}
@ -904,7 +874,7 @@ seastar::future<> ReplicatedRecoveryBackend::submit_push_data(
bool first,
bool complete,
bool clear_omap,
interval_set<uint64_t> &data_zeros,
interval_set<uint64_t> data_zeros,
const interval_set<uint64_t> &intervals_included,
bufferlist data_included,
bufferlist omap_header,
@ -986,8 +956,9 @@ seastar::future<> ReplicatedRecoveryBackend::submit_push_data(
});
}
return seastar::make_ready_future<>();
}().then([this, &data_zeros, &recovery_info, &intervals_included, t, target_oid,
&omap_entries, &attrs, data_included, complete, first] {
}().then([this, data_zeros=std::move(data_zeros),
&recovery_info, &intervals_included, t, target_oid,
&omap_entries, &attrs, data_included, complete, first]() mutable {
uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL;
// Punch zeros for data, if fiemap indicates nothing but it is marked dirty
if (!data_zeros.empty()) {

View File

@ -44,10 +44,10 @@ protected:
Ref<MOSDPGRecoveryDelete> m);
seastar::future<> handle_recovery_delete_reply(
Ref<MOSDPGRecoveryDeleteReply> m);
seastar::future<std::map<pg_shard_t, PushOp>> prep_push(
seastar::future<PushOp> prep_push(
const hobject_t& soid,
eversion_t need,
const std::vector<pg_shard_t>& shards);
pg_shard_t pg_shard);
void prepare_pull(
PullOp& po,
PullInfo& pi,
@ -73,7 +73,7 @@ protected:
bool first,
bool complete,
bool clear_omap,
interval_set<uint64_t> &data_zeros,
interval_set<uint64_t> data_zeros,
const interval_set<uint64_t> &intervals_included,
ceph::bufferlist data_included,
ceph::bufferlist omap_header,
@ -115,8 +115,7 @@ private:
seastar::future<> maybe_push_shards(
const hobject_t& soid,
eversion_t need,
std::vector<pg_shard_t>& shards);
eversion_t need);
/// read the remaining extents of object to be recovered and fill push_op
/// with them