Merge pull request #38569 from tchaikov/wip-crimson-cleanups

crimson/osd: cleanups

Reviewed-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
Reviewed-by: Xuehan Xu <xxhdx1985126@gmail.com>
Kefu Chai 2020-12-15 14:08:04 +08:00 committed by GitHub
commit fe126d34b5
4 changed files with 174 additions and 194 deletions
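
The recurring change across these hunks is moving results out of raw out-parameters (PushOp*, std::map<pg_shard_t, PushOp>*) and into the value carried by the returned future (seastar::future<PushOp>, seastar::future<std::map<pg_shard_t, PushOp>>), so nothing outside the continuation chain has to stay valid while it runs; the same shape appears below in build_push_op, prep_push and _handle_push_reply. A minimal sketch of the two shapes, assuming Seastar is available; push_op_t, build_into and build are illustrative names only, not the real crimson types or functions.

#include <seastar/core/app-template.hh>
#include <seastar/core/future.hh>
#include <iostream>
#include <string>
#include <utility>

// Illustrative stand-in for PushOp; not the actual crimson type.
struct push_op_t {
  std::string payload;
};

// Out-parameter shape (roughly what the old code did): the caller owns the
// object and must keep the pointer valid across the whole chain.
seastar::future<> build_into(push_op_t* pop) {
  pop->payload = "extents+omap";
  return seastar::make_ready_future<>();
}

// Value-returning shape (what the cleanup moves to): the object is created
// inside the chain and handed back through the future.
seastar::future<push_op_t> build() {
  push_op_t pop;
  pop.payload = "extents+omap";
  return seastar::make_ready_future<push_op_t>(std::move(pop));
}

int main(int argc, char** argv) {
  seastar::app_template app;
  return app.run(argc, argv, [] {
    return build().then([](push_op_t pop) {
      std::cout << pop.payload << "\n";
    });
  });
}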


@@ -26,12 +26,11 @@ seastar::future<> ReplicatedRecoveryBackend::recover_object(
// always add_recovering(soid) before recover_object(soid)
assert(is_recovering(soid));
// start tracking the recovery of soid
return seastar::do_with(std::map<pg_shard_t, PushOp>(), get_shards_to_push(soid),
[this, soid, need](auto& pops, auto& shards) {
return maybe_pull_missing_obj(soid, need).then(
[this, soid, need, &pops, &shards](bool pulled) {
return maybe_push_shards(soid, need, pops, shards);
});
return maybe_pull_missing_obj(soid, need).then([this, soid, need] {
return seastar::do_with(get_shards_to_push(soid),
[this, soid, need](auto& shards) {
return maybe_push_shards(soid, need, shards);
});
});
}
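
The rewritten recover_object builds the shard list once and parks it in seastar::do_with so it outlives the parallel_for_each that pushes to every shard. A small self-contained sketch of that lifetime pattern, assuming a recent Seastar (header locations differ between versions); push_to_all and the integer shard ids are purely illustrative.

#include <seastar/core/app-template.hh>
#include <seastar/core/do_with.hh>
#include <seastar/core/loop.hh>   // parallel_for_each; location varies by Seastar version
#include <seastar/core/sleep.hh>
#include <chrono>
#include <iostream>
#include <utility>
#include <vector>

// do_with keeps the vector alive until the future chain referencing it
// resolves, which is exactly what parallel_for_each needs.
seastar::future<> push_to_all(std::vector<int> shards) {
  return seastar::do_with(std::move(shards), [](std::vector<int>& shards) {
    return seastar::parallel_for_each(shards, [](int shard) {
      // stand-in for the asynchronous send to one peer OSD
      return seastar::sleep(std::chrono::milliseconds(shard));
    });
  });
}

int main(int argc, char** argv) {
  seastar::app_template app;
  return app.run(argc, argv, [] {
    return push_to_all({1, 2, 3}).then([] {
      std::cout << "all pushes acknowledged\n";
    });
  });
}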
@@ -39,50 +38,53 @@ seastar::future<>
ReplicatedRecoveryBackend::maybe_push_shards(
const hobject_t& soid,
eversion_t need,
std::map<pg_shard_t, PushOp>& pops,
std::list<std::map<pg_shard_t, pg_missing_t>::const_iterator>& shards)
std::vector<pg_shard_t>& shards)
{
auto push_func = [this, soid, need, &pops, &shards] {
auto fut = seastar::now();
if (!shards.empty())
fut = prep_push(soid, need, &pops, shards);
return fut.then([this, &pops, &shards, soid] {
return seastar::parallel_for_each(shards,
[this, &pops, soid](auto shard) {
auto msg = make_message<MOSDPGPush>();
msg->from = pg.get_pg_whoami();
msg->pgid = pg.get_pgid();
msg->map_epoch = pg.get_osdmap_epoch();
msg->min_epoch = pg.get_last_peering_reset();
msg->set_priority(pg.get_recovery_op_priority());
msg->pushes.push_back(pops[shard->first]);
return shard_services.send_to_osd(shard->first.osd, std::move(msg),
pg.get_osdmap_epoch()).then(
[this, soid, shard] {
return recovering.at(soid).wait_for_pushes(shard->first);
});
});
}).then([this, soid] {
auto& recovery = recovering.at(soid);
auto push_func = [this, soid, need, &shards] {
auto prepare_pops = seastar::now();
if (!shards.empty()) {
prepare_pops =
prep_push(soid, need, shards).then([this, &shards, soid](auto pops) {
return seastar::parallel_for_each(shards,
[this, &pops, soid](auto shard) {
auto msg = make_message<MOSDPGPush>();
msg->from = pg.get_pg_whoami();
msg->pgid = pg.get_pgid();
msg->map_epoch = pg.get_osdmap_epoch();
msg->min_epoch = pg.get_last_peering_reset();
msg->pushes.push_back(pops[shard]);
msg->set_priority(pg.get_recovery_op_priority());
return shard_services.send_to_osd(shard.osd,
std::move(msg),
pg.get_osdmap_epoch()).then(
[this, soid, shard] {
return recovering.at(soid).wait_for_pushes(shard);
});
});
});
}
return prepare_pops.then([this, soid] {
auto &recovery = recovering.at(soid);
auto push_info = recovery.pushing.begin();
object_stat_sum_t stat = {};
if (push_info != recovery.pushing.end()) {
stat = push_info->second.stat;
stat = push_info->second.stat;
} else {
// no push happened, take pull_info's stat
assert(recovery.pi);
stat = recovery.pi->stat;
// no push happened, take pull_info's stat
assert(recovery.pi);
stat = recovery.pi->stat;
}
pg.get_recovery_handler()->on_global_recover(soid, stat, false);
return seastar::make_ready_future<>();
}).handle_exception([this, soid](auto e) {
auto& recovery = recovering.at(soid);
if (recovery.obc)
recovery.obc->drop_recovery_read();
auto &recovery = recovering.at(soid);
if (recovery.obc) {
recovery.obc->drop_recovery_read();
}
recovering.erase(soid);
return seastar::make_exception_future<>(e);
});
};
}; // push_func
auto& recovery_waiter = recovering.at(soid);
if (recovery_waiter.obc) {
@@ -105,14 +107,14 @@ ReplicatedRecoveryBackend::maybe_push_shards(
}));
}
seastar::future<bool>
seastar::future<>
ReplicatedRecoveryBackend::maybe_pull_missing_obj(
const hobject_t& soid,
eversion_t need)
{
pg_missing_tracker_t local_missing = pg.get_local_missing();
if (!local_missing.is_missing(soid)) {
return seastar::make_ready_future<bool>(false);
return seastar::make_ready_future<>();
}
PullOp po;
auto& recovery_waiter = recovering.at(soid);
@@ -132,8 +134,6 @@ ReplicatedRecoveryBackend::maybe_pull_missing_obj(
pg.get_osdmap_epoch()
).then([&recovery_waiter] {
return recovery_waiter.wait_for_pull();
}).then([] {
return seastar::make_ready_future<bool>(true);
});
}
@@ -282,27 +282,29 @@ seastar::future<> ReplicatedRecoveryBackend::recover_delete(
});
}
seastar::future<> ReplicatedRecoveryBackend::prep_push(
seastar::future<std::map<pg_shard_t, PushOp>>
ReplicatedRecoveryBackend::prep_push(
const hobject_t& soid,
eversion_t need,
std::map<pg_shard_t, PushOp>* pops,
const std::list<std::map<pg_shard_t, pg_missing_t>::const_iterator>& shards)
const std::vector<pg_shard_t>& shards)
{
logger().debug("{}: {}, {}", __func__, soid, need);
return seastar::do_with(std::map<pg_shard_t, interval_set<uint64_t>>(),
[this, soid, pops, &shards](auto& data_subsets) {
return seastar::do_with(std::map<pg_shard_t, PushOp>(),
std::map<pg_shard_t, interval_set<uint64_t>>(),
[this, soid, &shards](auto& pops,
auto& data_subsets) {
return seastar::parallel_for_each(shards,
[this, soid, pops, &data_subsets](auto pg_shard) mutable {
pops->emplace(pg_shard->first, PushOp());
pops.emplace(pg_shard, PushOp());
auto& recovery_waiter = recovering.at(soid);
auto& obc = recovery_waiter.obc;
auto& data_subset = data_subsets[pg_shard->first];
auto& data_subset = data_subsets[pg_shard];
if (obc->obs.oi.size) {
data_subset.insert(0, obc->obs.oi.size);
}
const auto& missing = pg.get_shard_missing().find(pg_shard->first)->second;
const auto& missing = pg.get_shard_missing().find(pg_shard)->second;
if (HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS)) {
const auto it = missing.get_items().find(soid);
assert(it != missing.get_items().end());
@@ -310,10 +312,10 @@ seastar::future<> ReplicatedRecoveryBackend::prep_push(
logger().debug("calc_head_subsets {} data_subset {}", soid, data_subset);
}
logger().debug("prep_push: {} to {}", soid, pg_shard->first);
auto& pi = recovery_waiter.pushing[pg_shard->first];
pg.begin_peer_recover(pg_shard->first, soid);
const auto pmissing_iter = pg.get_shard_missing().find(pg_shard->first);
logger().debug("prep_push: {} to {}", soid, pg_shard);
auto& pi = recovery_waiter.pushing[pg_shard];
pg.begin_peer_recover(pg_shard, soid);
const auto pmissing_iter = pg.get_shard_missing().find(pg_shard);
const auto missing_iter = pmissing_iter->second.get_items().find(soid);
assert(missing_iter != pmissing_iter->second.get_items().end());
@@ -330,13 +332,16 @@ seastar::future<> ReplicatedRecoveryBackend::prep_push(
HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS);
return build_push_op(pi.recovery_info, pi.recovery_progress,
&pi.stat, &(*pops)[pg_shard->first]).then(
[this, soid, pg_shard](auto new_progress) {
&pi.stat).then(
[this, soid, pg_shard, &pops](auto pop) {
auto& recovery_waiter = recovering.at(soid);
auto& pi = recovery_waiter.pushing[pg_shard->first];
pi.recovery_progress = new_progress;
auto& pi = recovery_waiter.pushing[pg_shard];
pi.recovery_progress = pop.after_progress;
pops[pg_shard] = std::move(pop);
return seastar::make_ready_future<>();
});
}).then([&pops]() mutable {
return seastar::make_ready_future<std::map<pg_shard_t, PushOp>>(std::move(pops));
});
});
}
@@ -374,12 +379,11 @@ void ReplicatedRecoveryBackend::prepare_pull(PullOp& po, PullInfo& pi,
pi.recovery_progress = po.recovery_progress;
}
seastar::future<ObjectRecoveryProgress> ReplicatedRecoveryBackend::build_push_op(
seastar::future<PushOp> ReplicatedRecoveryBackend::build_push_op(
const ObjectRecoveryInfo& recovery_info,
const ObjectRecoveryProgress& progress,
object_stat_sum_t* stat,
PushOp* pop
) {
object_stat_sum_t* stat)
{
logger().debug("{} {} @{}",
__func__, recovery_info.soid, recovery_info.version);
return seastar::do_with(ObjectRecoveryProgress(progress),
@@ -387,9 +391,10 @@ seastar::future<ObjectRecoveryProgress> ReplicatedRecoveryBackend::build_push_op
uint64_t(crimson::common::local_conf()
->osd_recovery_max_chunk),
eversion_t(),
[this, &recovery_info, &progress, stat, pop]
(auto& new_progress, auto& oi, auto& available, auto& v) {
return [this, &recovery_info, &progress, &new_progress, &oi, pop, &v] {
PushOp(),
[this, &recovery_info, &progress, stat]
(auto& new_progress, auto& oi, auto& available, auto& v, auto& pop) {
return [this, &recovery_info, &progress, &new_progress, &oi, &v, pop=&pop] {
v = recovery_info.version;
if (progress.first) {
return backend->omap_get_header(coll, ghobject_t(recovery_info.soid))
@@ -415,20 +420,20 @@ seastar::future<ObjectRecoveryProgress> ReplicatedRecoveryBackend::build_push_op
);
}
return seastar::make_ready_future<>();
}().then([this, &recovery_info, &progress, &new_progress, &available, pop] {
}().then([this, &recovery_info, &progress, &new_progress, &available, &pop]() mutable {
return read_omap_for_push_op(recovery_info.soid,
progress,
new_progress,
available, pop);
}).then([this, &recovery_info, &progress, &available, pop] {
available, &pop);
}).then([this, &recovery_info, &progress, &available, &pop]() mutable {
logger().debug("build_push_op: available: {}, copy_subset: {}",
available, recovery_info.copy_subset);
return read_object_for_push_op(recovery_info.soid,
recovery_info.copy_subset,
progress.data_recovered_to,
available, pop);
}).then([&recovery_info, &v, &progress, &new_progress, stat, pop]
(uint64_t recovered_to) {
available, &pop);
}).then([&recovery_info, &v, &progress, &new_progress, stat, &pop]
(uint64_t recovered_to) mutable {
new_progress.data_recovered_to = recovered_to;
if (new_progress.is_complete(recovery_info)) {
new_progress.data_complete = true;
@@ -440,18 +445,17 @@ seastar::future<ObjectRecoveryProgress> ReplicatedRecoveryBackend::build_push_op
new_progress.omap_complete = false;
}
if (stat) {
stat->num_keys_recovered += pop->omap_entries.size();
stat->num_bytes_recovered += pop->data.length();
stat->num_keys_recovered += pop.omap_entries.size();
stat->num_bytes_recovered += pop.data.length();
}
pop->version = v;
pop->soid = recovery_info.soid;
pop->recovery_info = recovery_info;
pop->after_progress = new_progress;
pop->before_progress = progress;
pop.version = v;
pop.soid = recovery_info.soid;
pop.recovery_info = recovery_info;
pop.after_progress = new_progress;
pop.before_progress = progress;
logger().debug("build_push_op: pop version: {}, pop data length: {}",
pop->version, pop->data.length());
return seastar::make_ready_future<ObjectRecoveryProgress>
(std::move(new_progress));
pop.version, pop.data.length());
return seastar::make_ready_future<PushOp>(std::move(pop));
});
});
}
@@ -555,23 +559,19 @@ ReplicatedRecoveryBackend::read_omap_for_push_op(
});
}
std::list<std::map<pg_shard_t, pg_missing_t>::const_iterator>
ReplicatedRecoveryBackend::get_shards_to_push(const hobject_t& soid)
std::vector<pg_shard_t>
ReplicatedRecoveryBackend::get_shards_to_push(const hobject_t& soid) const
{
std::list<std::map<pg_shard_t, pg_missing_t>::const_iterator> shards;
std::vector<pg_shard_t> shards;
assert(pg.get_acting_recovery_backfill().size() > 0);
for (set<pg_shard_t>::iterator i =
pg.get_acting_recovery_backfill().begin();
i != pg.get_acting_recovery_backfill().end();
++i) {
if (*i == pg.get_pg_whoami())
for (const auto& peer : pg.get_acting_recovery_backfill()) {
if (peer == pg.get_pg_whoami())
continue;
pg_shard_t peer = *i;
map<pg_shard_t, pg_missing_t>::const_iterator j =
auto shard_missing =
pg.get_shard_missing().find(peer);
assert(j != pg.get_shard_missing().end());
if (j->second.is_missing(soid)) {
shards.push_back(j);
assert(shard_missing != pg.get_shard_missing().end());
if (shard_missing->second.is_missing(soid)) {
shards.push_back(shard_missing->first);
}
}
return shards;
@@ -580,49 +580,37 @@ ReplicatedRecoveryBackend::get_shards_to_push(const hobject_t& soid)
seastar::future<> ReplicatedRecoveryBackend::handle_pull(Ref<MOSDPGPull> m)
{
logger().debug("{}: {}", __func__, *m);
vector<PullOp> pulls;
m->take_pulls(&pulls);
return seastar::do_with(std::move(pulls),
[this, m, from = m->from](auto& pulls) {
return seastar::parallel_for_each(pulls, [this, m, from](auto& pull_op) {
const hobject_t& soid = pull_op.soid;
return seastar::do_with(PushOp(),
[this, &soid, &pull_op, from](auto& pop) {
logger().debug("handle_pull: {}", soid);
return backend->stat(coll, ghobject_t(soid)).then(
[this, &pull_op, &pop](auto st) {
ObjectRecoveryInfo &recovery_info = pull_op.recovery_info;
ObjectRecoveryProgress &progress = pull_op.recovery_progress;
if (progress.first && recovery_info.size == ((uint64_t) -1)) {
// Adjust size and copy_subset
recovery_info.size = st.st_size;
if (st.st_size) {
interval_set<uint64_t> object_range;
object_range.insert(0, st.st_size);
recovery_info.copy_subset.intersection_of(object_range);
} else {
recovery_info.copy_subset.clear();
}
assert(recovery_info.clone_subset.empty());
}
return build_push_op(recovery_info, progress, 0, &pop);
}).handle_exception([soid, &pop](auto e) {
pop.recovery_info.version = eversion_t();
pop.version = eversion_t();
pop.soid = soid;
return seastar::make_ready_future<ObjectRecoveryProgress>();
}).then([this, &pop, from](auto new_progress) {
auto msg = make_message<MOSDPGPush>();
msg->from = pg.get_pg_whoami();
msg->pgid = pg.get_pgid();
msg->map_epoch = pg.get_osdmap_epoch();
msg->min_epoch = pg.get_last_peering_reset();
msg->set_priority(pg.get_recovery_op_priority());
msg->pushes.push_back(pop);
return shard_services.send_to_osd(from.osd, std::move(msg),
pg.get_osdmap_epoch());
});
});
return seastar::parallel_for_each(m->take_pulls(),
[this, from=m->from](auto& pull_op) {
const hobject_t& soid = pull_op.soid;
logger().debug("handle_pull: {}", soid);
return backend->stat(coll, ghobject_t(soid)).then(
[this, &pull_op](auto st) {
ObjectRecoveryInfo &recovery_info = pull_op.recovery_info;
ObjectRecoveryProgress &progress = pull_op.recovery_progress;
if (progress.first && recovery_info.size == ((uint64_t) -1)) {
// Adjust size and copy_subset
recovery_info.size = st.st_size;
if (st.st_size) {
interval_set<uint64_t> object_range;
object_range.insert(0, st.st_size);
recovery_info.copy_subset.intersection_of(object_range);
} else {
recovery_info.copy_subset.clear();
}
assert(recovery_info.clone_subset.empty());
}
return build_push_op(recovery_info, progress, 0);
}).then([this, from](auto pop) {
auto msg = make_message<MOSDPGPush>();
msg->from = pg.get_pg_whoami();
msg->pgid = pg.get_pgid();
msg->map_epoch = pg.get_osdmap_epoch();
msg->min_epoch = pg.get_last_peering_reset();
msg->set_priority(pg.get_recovery_op_priority());
msg->pushes.push_back(std::move(pop));
return shard_services.send_to_osd(from.osd, std::move(msg),
pg.get_osdmap_epoch());
});
});
}
@@ -827,10 +815,10 @@ seastar::future<> ReplicatedRecoveryBackend::handle_push(
});
}
seastar::future<bool> ReplicatedRecoveryBackend::_handle_push_reply(
seastar::future<std::optional<PushOp>>
ReplicatedRecoveryBackend::_handle_push_reply(
pg_shard_t peer,
const PushReplyOp &op,
PushOp *reply)
const PushReplyOp &op)
{
const hobject_t& soid = op.soid;
logger().debug("{}, soid {}, from {}", __func__, soid, peer);
@@ -838,27 +826,26 @@ seastar::future<bool> ReplicatedRecoveryBackend::_handle_push_reply(
if (recovering_iter == recovering.end()
|| !recovering_iter->second.pushing.count(peer)) {
logger().debug("huh, i wasn't pushing {} to osd.{}", soid, peer);
return seastar::make_ready_future<bool>(true);
return seastar::make_ready_future<std::optional<PushOp>>();
} else {
auto& pi = recovering_iter->second.pushing[peer];
return [this, &pi, &soid, reply, peer, recovering_iter] {
bool error = pi.recovery_progress.error;
if (!pi.recovery_progress.data_complete && !error) {
return build_push_op(pi.recovery_info, pi.recovery_progress,
&pi.stat, reply).then([&pi] (auto new_progress) {
pi.recovery_progress = new_progress;
return seastar::make_ready_future<bool>(false);
});
}
if (!error)
pg.get_recovery_handler()->on_peer_recover(peer, soid, pi.recovery_info);
recovering_iter->second.set_pushed(peer);
return seastar::make_ready_future<bool>(true);
}().handle_exception([recovering_iter, &pi, peer] (auto e) {
pi.recovery_progress.error = true;
recovering_iter->second.set_push_failed(peer, e);
return seastar::make_ready_future<bool>(true);
});
bool error = pi.recovery_progress.error;
if (!pi.recovery_progress.data_complete && !error) {
return build_push_op(pi.recovery_info, pi.recovery_progress,
&pi.stat).then([&pi] (auto pop) {
pi.recovery_progress = pop.after_progress;
return seastar::make_ready_future<std::optional<PushOp>>(std::move(pop));
}).handle_exception([recovering_iter, &pi, peer] (auto e) {
pi.recovery_progress.error = true;
recovering_iter->second.set_push_failed(peer, e);
return seastar::make_ready_future<std::optional<PushOp>>();
});
}
if (!error) {
pg.get_recovery_handler()->on_peer_recover(peer, soid, pi.recovery_info);
}
recovering_iter->second.set_pushed(peer);
return seastar::make_ready_future<std::optional<PushOp>>();
}
}
@@ -869,21 +856,22 @@ seastar::future<> ReplicatedRecoveryBackend::handle_push_reply(
auto from = m->from;
auto& push_reply = m->replies[0]; //TODO: only one reply per message
return seastar::do_with(PushOp(), [this, &push_reply, from](auto& pop) {
return _handle_push_reply(from, push_reply, &pop).then(
[this, &pop, from](bool finished) {
if (!finished) {
auto msg = make_message<MOSDPGPush>();
msg->from = pg.get_pg_whoami();
msg->pgid = pg.get_pgid();
msg->map_epoch = pg.get_osdmap_epoch();
msg->min_epoch = pg.get_last_peering_reset();
msg->set_priority(pg.get_recovery_op_priority());
msg->pushes.push_back(pop);
return shard_services.send_to_osd(from.osd, std::move(msg), pg.get_osdmap_epoch());
}
return _handle_push_reply(from, push_reply).then(
[this, from](std::optional<PushOp> push_op) {
if (push_op) {
auto msg = make_message<MOSDPGPush>();
msg->from = pg.get_pg_whoami();
msg->pgid = pg.get_pgid();
msg->map_epoch = pg.get_osdmap_epoch();
msg->min_epoch = pg.get_last_peering_reset();
msg->set_priority(pg.get_recovery_op_priority());
msg->pushes.push_back(std::move(*push_op));
return shard_services.send_to_osd(from.osd,
std::move(msg),
pg.get_osdmap_epoch());
} else {
return seastar::make_ready_future<>();
});
}
});
}
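
_handle_push_reply now reports its outcome as seastar::future<std::optional<PushOp>>: an engaged optional tells handle_push_reply to send another MOSDPGPush, an empty one means the peer is done, replacing the old bool-plus-out-parameter contract. A hedged sketch of that convention; push_op_t and next_push are invented names, not the crimson API.

#include <seastar/core/app-template.hh>
#include <seastar/core/future.hh>
#include <iostream>
#include <optional>
#include <string>

// Illustrative stand-in for PushOp.
struct push_op_t {
  std::string payload;
};

// Engaged optional: another push needs to go out.  Empty optional: done.
seastar::future<std::optional<push_op_t>> next_push(bool done) {
  if (done) {
    return seastar::make_ready_future<std::optional<push_op_t>>(std::nullopt);
  }
  return seastar::make_ready_future<std::optional<push_op_t>>(
    push_op_t{"more data"});
}

int main(int argc, char** argv) {
  seastar::app_template app;
  return app.run(argc, argv, [] {
    return next_push(false).then([](std::optional<push_op_t> pop) {
      if (pop) {
        std::cout << "send another push: " << pop->payload << "\n";
      } else {
        std::cout << "peer fully recovered\n";
      }
    });
  });
}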


@@ -44,23 +44,21 @@ protected:
Ref<MOSDPGRecoveryDelete> m);
seastar::future<> handle_recovery_delete_reply(
Ref<MOSDPGRecoveryDeleteReply> m);
seastar::future<> prep_push(
seastar::future<std::map<pg_shard_t, PushOp>> prep_push(
const hobject_t& soid,
eversion_t need,
std::map<pg_shard_t, PushOp>* pops,
const std::list<std::map<pg_shard_t, pg_missing_t>::const_iterator>& shards);
const std::vector<pg_shard_t>& shards);
void prepare_pull(
PullOp& po,
PullInfo& pi,
const hobject_t& soid,
eversion_t need);
std::list<std::map<pg_shard_t, pg_missing_t>::const_iterator> get_shards_to_push(
const hobject_t& soid);
seastar::future<ObjectRecoveryProgress> build_push_op(
std::vector<pg_shard_t> get_shards_to_push(
const hobject_t& soid) const;
seastar::future<PushOp> build_push_op(
const ObjectRecoveryInfo& recovery_info,
const ObjectRecoveryProgress& progress,
object_stat_sum_t* stat,
PushOp* pop);
object_stat_sum_t* stat);
seastar::future<bool> _handle_pull_response(
pg_shard_t from,
PushOp& pop,
@@ -92,10 +90,9 @@ protected:
const PushOp &pop,
PushReplyOp *response,
ceph::os::Transaction *t);
seastar::future<bool> _handle_push_reply(
seastar::future<std::optional<PushOp>> _handle_push_reply(
pg_shard_t peer,
const PushReplyOp &op,
PushOp *reply);
const PushReplyOp &op);
seastar::future<> on_local_recover_persist(
const hobject_t& soid,
const ObjectRecoveryInfo& _recovery_info,
@@ -110,9 +107,7 @@ protected:
}
private:
/// pull missing object from peer
///
/// @return true if the object is pulled, false otherwise
seastar::future<bool> maybe_pull_missing_obj(
seastar::future<> maybe_pull_missing_obj(
const hobject_t& soid,
eversion_t need);
@@ -123,8 +118,7 @@ private:
seastar::future<> maybe_push_shards(
const hobject_t& soid,
eversion_t need,
std::map<pg_shard_t, PushOp>& pops,
std::list<std::map<pg_shard_t, pg_missing_t>::const_iterator>& shards);
std::vector<pg_shard_t>& shards);
/// read the remaining extents of object to be recovered and fill push_op
/// with them


@@ -40,8 +40,8 @@ public:
return pgid;
}
void take_pulls(std::vector<PullOp> *outpulls) {
outpulls->swap(pulls);
std::vector<PullOp> take_pulls() {
return std::move(pulls);
}
void set_pulls(std::vector<PullOp>&& pull_ops) {
pulls = std::move(pull_ops);
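
take_pulls now moves the stored vector out instead of swapping it into a caller-supplied one, which is what lets do_pull in the next file iterate the returned value directly. A tiny illustrative sketch of the two shapes, independent of Seastar; pull_op_t and pull_message are made-up names. The one caveat is that the member is left moved-from, so the new form is meant to be called once per message.

#include <iostream>
#include <string>
#include <utility>
#include <vector>

// Illustrative stand-ins for PullOp and the message class.
struct pull_op_t {
  std::string soid;
};

class pull_message {
  std::vector<pull_op_t> pulls;
public:
  explicit pull_message(std::vector<pull_op_t> p) : pulls(std::move(p)) {}

  // Old shape: swap the member into a vector owned by the caller.
  void take_pulls_into(std::vector<pull_op_t>* outpulls) {
    outpulls->swap(pulls);
  }

  // New shape: return the member by value; pulls is left in a valid but
  // unspecified (moved-from) state afterwards.
  std::vector<pull_op_t> take_pulls() {
    return std::move(pulls);
  }
};

int main() {
  pull_message m({{"obj1"}, {"obj2"}});
  // the returned temporary lives for the whole range-for, as in do_pull
  for (auto& pull : m.take_pulls()) {
    std::cout << pull.soid << "\n";
  }
}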


@@ -899,9 +899,7 @@ void ReplicatedBackend::do_pull(OpRequestRef op)
pg_shard_t from = m->from;
map<pg_shard_t, vector<PushOp> > replies;
vector<PullOp> pulls;
m->take_pulls(&pulls);
for (auto& i : pulls) {
for (auto& i : m->take_pulls()) {
replies[from].push_back(PushOp());
handle_pull(from, i, &(replies[from].back()));
}