crimson/osd: let build_push_op() return PushOp

for better readability

Signed-off-by: Kefu Chai <kchai@redhat.com>
This commit is contained in:
Kefu Chai 2020-12-15 01:32:08 +08:00
parent 7b2ab64ed0
commit 2f88d7fbd4
2 changed files with 94 additions and 104 deletions

View File

@ -332,11 +332,12 @@ ReplicatedRecoveryBackend::prep_push(
HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS);
return build_push_op(pi.recovery_info, pi.recovery_progress,
&pi.stat, &pops[pg_shard]).then(
[this, soid, pg_shard](auto new_progress) {
&pi.stat).then(
[this, soid, pg_shard, &pops](auto pop) {
auto& recovery_waiter = recovering.at(soid);
auto& pi = recovery_waiter.pushing[pg_shard];
pi.recovery_progress = new_progress;
pi.recovery_progress = pop.after_progress;
pops[pg_shard] = std::move(pop);
return seastar::make_ready_future<>();
});
}).then([&pops]() mutable {
@ -378,12 +379,11 @@ void ReplicatedRecoveryBackend::prepare_pull(PullOp& po, PullInfo& pi,
pi.recovery_progress = po.recovery_progress;
}
seastar::future<ObjectRecoveryProgress> ReplicatedRecoveryBackend::build_push_op(
seastar::future<PushOp> ReplicatedRecoveryBackend::build_push_op(
const ObjectRecoveryInfo& recovery_info,
const ObjectRecoveryProgress& progress,
object_stat_sum_t* stat,
PushOp* pop
) {
object_stat_sum_t* stat)
{
logger().debug("{} {} @{}",
__func__, recovery_info.soid, recovery_info.version);
return seastar::do_with(ObjectRecoveryProgress(progress),
@ -391,9 +391,10 @@ seastar::future<ObjectRecoveryProgress> ReplicatedRecoveryBackend::build_push_op
uint64_t(crimson::common::local_conf()
->osd_recovery_max_chunk),
eversion_t(),
[this, &recovery_info, &progress, stat, pop]
(auto& new_progress, auto& oi, auto& available, auto& v) {
return [this, &recovery_info, &progress, &new_progress, &oi, pop, &v] {
PushOp(),
[this, &recovery_info, &progress, stat]
(auto& new_progress, auto& oi, auto& available, auto& v, auto& pop) {
return [this, &recovery_info, &progress, &new_progress, &oi, &v, pop=&pop] {
v = recovery_info.version;
if (progress.first) {
return backend->omap_get_header(coll, ghobject_t(recovery_info.soid))
@ -419,20 +420,20 @@ seastar::future<ObjectRecoveryProgress> ReplicatedRecoveryBackend::build_push_op
);
}
return seastar::make_ready_future<>();
}().then([this, &recovery_info, &progress, &new_progress, &available, pop] {
}().then([this, &recovery_info, &progress, &new_progress, &available, &pop]() mutable {
return read_omap_for_push_op(recovery_info.soid,
progress,
new_progress,
available, pop);
}).then([this, &recovery_info, &progress, &available, pop] {
available, &pop);
}).then([this, &recovery_info, &progress, &available, &pop]() mutable {
logger().debug("build_push_op: available: {}, copy_subset: {}",
available, recovery_info.copy_subset);
return read_object_for_push_op(recovery_info.soid,
recovery_info.copy_subset,
progress.data_recovered_to,
available, pop);
}).then([&recovery_info, &v, &progress, &new_progress, stat, pop]
(uint64_t recovered_to) {
available, &pop);
}).then([&recovery_info, &v, &progress, &new_progress, stat, &pop]
(uint64_t recovered_to) mutable {
new_progress.data_recovered_to = recovered_to;
if (new_progress.is_complete(recovery_info)) {
new_progress.data_complete = true;
@ -444,18 +445,17 @@ seastar::future<ObjectRecoveryProgress> ReplicatedRecoveryBackend::build_push_op
new_progress.omap_complete = false;
}
if (stat) {
stat->num_keys_recovered += pop->omap_entries.size();
stat->num_bytes_recovered += pop->data.length();
stat->num_keys_recovered += pop.omap_entries.size();
stat->num_bytes_recovered += pop.data.length();
}
pop->version = v;
pop->soid = recovery_info.soid;
pop->recovery_info = recovery_info;
pop->after_progress = new_progress;
pop->before_progress = progress;
pop.version = v;
pop.soid = recovery_info.soid;
pop.recovery_info = recovery_info;
pop.after_progress = new_progress;
pop.before_progress = progress;
logger().debug("build_push_op: pop version: {}, pop data length: {}",
pop->version, pop->data.length());
return seastar::make_ready_future<ObjectRecoveryProgress>
(std::move(new_progress));
pop.version, pop.data.length());
return seastar::make_ready_future<PushOp>(std::move(pop));
});
});
}
@ -586,42 +586,34 @@ seastar::future<> ReplicatedRecoveryBackend::handle_pull(Ref<MOSDPGPull> m)
[this, m, from = m->from](auto& pulls) {
return seastar::parallel_for_each(pulls, [this, m, from](auto& pull_op) {
const hobject_t& soid = pull_op.soid;
return seastar::do_with(PushOp(),
[this, &soid, &pull_op, from](auto& pop) {
logger().debug("handle_pull: {}", soid);
return backend->stat(coll, ghobject_t(soid)).then(
[this, &pull_op, &pop](auto st) {
ObjectRecoveryInfo &recovery_info = pull_op.recovery_info;
ObjectRecoveryProgress &progress = pull_op.recovery_progress;
if (progress.first && recovery_info.size == ((uint64_t) -1)) {
// Adjust size and copy_subset
recovery_info.size = st.st_size;
if (st.st_size) {
interval_set<uint64_t> object_range;
object_range.insert(0, st.st_size);
recovery_info.copy_subset.intersection_of(object_range);
} else {
recovery_info.copy_subset.clear();
}
assert(recovery_info.clone_subset.empty());
}
return build_push_op(recovery_info, progress, 0, &pop);
}).handle_exception([soid, &pop](auto e) {
pop.recovery_info.version = eversion_t();
pop.version = eversion_t();
pop.soid = soid;
return seastar::make_ready_future<ObjectRecoveryProgress>();
}).then([this, &pop, from](auto new_progress) {
auto msg = make_message<MOSDPGPush>();
msg->from = pg.get_pg_whoami();
msg->pgid = pg.get_pgid();
msg->map_epoch = pg.get_osdmap_epoch();
msg->min_epoch = pg.get_last_peering_reset();
msg->set_priority(pg.get_recovery_op_priority());
msg->pushes.push_back(pop);
return shard_services.send_to_osd(from.osd, std::move(msg),
pg.get_osdmap_epoch());
});
logger().debug("handle_pull: {}", soid);
return backend->stat(coll, ghobject_t(soid)).then(
[this, &pull_op](auto st) {
ObjectRecoveryInfo &recovery_info = pull_op.recovery_info;
ObjectRecoveryProgress &progress = pull_op.recovery_progress;
if (progress.first && recovery_info.size == ((uint64_t) -1)) {
// Adjust size and copy_subset
recovery_info.size = st.st_size;
if (st.st_size) {
interval_set<uint64_t> object_range;
object_range.insert(0, st.st_size);
recovery_info.copy_subset.intersection_of(object_range);
} else {
recovery_info.copy_subset.clear();
}
assert(recovery_info.clone_subset.empty());
}
return build_push_op(recovery_info, progress, 0);
}).then([this, from](auto pop) {
auto msg = make_message<MOSDPGPush>();
msg->from = pg.get_pg_whoami();
msg->pgid = pg.get_pgid();
msg->map_epoch = pg.get_osdmap_epoch();
msg->min_epoch = pg.get_last_peering_reset();
msg->set_priority(pg.get_recovery_op_priority());
msg->pushes.push_back(std::move(pop));
return shard_services.send_to_osd(from.osd, std::move(msg),
pg.get_osdmap_epoch());
});
});
});
@ -827,10 +819,10 @@ seastar::future<> ReplicatedRecoveryBackend::handle_push(
});
}
seastar::future<bool> ReplicatedRecoveryBackend::_handle_push_reply(
seastar::future<std::optional<PushOp>>
ReplicatedRecoveryBackend::_handle_push_reply(
pg_shard_t peer,
const PushReplyOp &op,
PushOp *reply)
const PushReplyOp &op)
{
const hobject_t& soid = op.soid;
logger().debug("{}, soid {}, from {}", __func__, soid, peer);
@ -838,27 +830,26 @@ seastar::future<bool> ReplicatedRecoveryBackend::_handle_push_reply(
if (recovering_iter == recovering.end()
|| !recovering_iter->second.pushing.count(peer)) {
logger().debug("huh, i wasn't pushing {} to osd.{}", soid, peer);
return seastar::make_ready_future<bool>(true);
return seastar::make_ready_future<std::optional<PushOp>>();
} else {
auto& pi = recovering_iter->second.pushing[peer];
return [this, &pi, &soid, reply, peer, recovering_iter] {
bool error = pi.recovery_progress.error;
if (!pi.recovery_progress.data_complete && !error) {
return build_push_op(pi.recovery_info, pi.recovery_progress,
&pi.stat, reply).then([&pi] (auto new_progress) {
pi.recovery_progress = new_progress;
return seastar::make_ready_future<bool>(false);
});
}
if (!error)
pg.get_recovery_handler()->on_peer_recover(peer, soid, pi.recovery_info);
recovering_iter->second.set_pushed(peer);
return seastar::make_ready_future<bool>(true);
}().handle_exception([recovering_iter, &pi, peer] (auto e) {
pi.recovery_progress.error = true;
recovering_iter->second.set_push_failed(peer, e);
return seastar::make_ready_future<bool>(true);
});
bool error = pi.recovery_progress.error;
if (!pi.recovery_progress.data_complete && !error) {
return build_push_op(pi.recovery_info, pi.recovery_progress,
&pi.stat).then([&pi] (auto pop) {
pi.recovery_progress = pop.after_progress;
return seastar::make_ready_future<std::optional<PushOp>>(std::move(pop));
}).handle_exception([recovering_iter, &pi, peer] (auto e) {
pi.recovery_progress.error = true;
recovering_iter->second.set_push_failed(peer, e);
return seastar::make_ready_future<std::optional<PushOp>>();
});
}
if (!error) {
pg.get_recovery_handler()->on_peer_recover(peer, soid, pi.recovery_info);
}
recovering_iter->second.set_pushed(peer);
return seastar::make_ready_future<std::optional<PushOp>>();
}
}
@ -869,21 +860,22 @@ seastar::future<> ReplicatedRecoveryBackend::handle_push_reply(
auto from = m->from;
auto& push_reply = m->replies[0]; //TODO: only one reply per message
return seastar::do_with(PushOp(), [this, &push_reply, from](auto& pop) {
return _handle_push_reply(from, push_reply, &pop).then(
[this, &pop, from](bool finished) {
if (!finished) {
auto msg = make_message<MOSDPGPush>();
msg->from = pg.get_pg_whoami();
msg->pgid = pg.get_pgid();
msg->map_epoch = pg.get_osdmap_epoch();
msg->min_epoch = pg.get_last_peering_reset();
msg->set_priority(pg.get_recovery_op_priority());
msg->pushes.push_back(pop);
return shard_services.send_to_osd(from.osd, std::move(msg), pg.get_osdmap_epoch());
}
return _handle_push_reply(from, push_reply).then(
[this, from](std::optional<PushOp> push_op) {
if (push_op) {
auto msg = make_message<MOSDPGPush>();
msg->from = pg.get_pg_whoami();
msg->pgid = pg.get_pgid();
msg->map_epoch = pg.get_osdmap_epoch();
msg->min_epoch = pg.get_last_peering_reset();
msg->set_priority(pg.get_recovery_op_priority());
msg->pushes.push_back(std::move(*push_op));
return shard_services.send_to_osd(from.osd,
std::move(msg),
pg.get_osdmap_epoch());
} else {
return seastar::make_ready_future<>();
});
}
});
}

View File

@ -55,11 +55,10 @@ protected:
eversion_t need);
std::vector<pg_shard_t> get_shards_to_push(
const hobject_t& soid) const;
seastar::future<ObjectRecoveryProgress> build_push_op(
seastar::future<PushOp> build_push_op(
const ObjectRecoveryInfo& recovery_info,
const ObjectRecoveryProgress& progress,
object_stat_sum_t* stat,
PushOp* pop);
object_stat_sum_t* stat);
seastar::future<bool> _handle_pull_response(
pg_shard_t from,
PushOp& pop,
@ -91,10 +90,9 @@ protected:
const PushOp &pop,
PushReplyOp *response,
ceph::os::Transaction *t);
seastar::future<bool> _handle_push_reply(
seastar::future<std::optional<PushOp>> _handle_push_reply(
pg_shard_t peer,
const PushReplyOp &op,
PushOp *reply);
const PushReplyOp &op);
seastar::future<> on_local_recover_persist(
const hobject_t& soid,
const ObjectRecoveryInfo& _recovery_info,