mirror of
https://github.com/ceph/ceph
synced 2025-01-03 01:22:53 +00:00
crimson/osd: recovery read lock should be held across the hole object push phase
Signed-off-by: Xuehan Xu <xxhdx1985126@gmail.com>
This commit is contained in:
parent
39c8941c5a
commit
d8293d937f
@ -180,7 +180,6 @@ public:
|
||||
}
|
||||
void drop_recovery_read() {
|
||||
assert(recovery_read_marker);
|
||||
lock.unlock_for_read();
|
||||
recovery_read_marker = false;
|
||||
}
|
||||
bool maybe_get_excl() {
|
||||
|
@ -28,22 +28,25 @@ seastar::future<> ReplicatedRecoveryBackend::recover_object(
|
||||
// start tracking the recovery of soid
|
||||
return seastar::do_with(std::map<pg_shard_t, PushOp>(), get_shards_to_push(soid),
|
||||
[this, soid, need](auto& pops, auto& shards) {
|
||||
return maybe_pull_missing_obj(soid, need).then([this, soid](bool pulled) {
|
||||
return load_obc_for_recovery(soid, pulled);
|
||||
}).safe_then([this, soid, need, &pops, &shards] {
|
||||
if (!shards.empty()) {
|
||||
return prep_push(soid, need, &pops, shards);
|
||||
} else {
|
||||
return seastar::now();
|
||||
}
|
||||
}, crimson::ct_error::all_same_way([this, soid](const std::error_code& e) {
|
||||
auto recovery_waiter = recovering.find(soid);
|
||||
if (auto obc = recovery_waiter->second.obc; obc) {
|
||||
obc->drop_recovery_read();
|
||||
}
|
||||
recovering.erase(recovery_waiter);
|
||||
return seastar::make_exception_future<>(e);
|
||||
})).then([this, &pops, &shards, soid] {
|
||||
return maybe_pull_missing_obj(soid, need).then(
|
||||
[this, soid, need, &pops, &shards](bool pulled) {
|
||||
return maybe_push_shards(soid, need, pops, shards);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
seastar::future<>
|
||||
ReplicatedRecoveryBackend::maybe_push_shards(
|
||||
const hobject_t& soid,
|
||||
eversion_t need,
|
||||
std::map<pg_shard_t, PushOp>& pops,
|
||||
std::list<std::map<pg_shard_t, pg_missing_t>::const_iterator>& shards)
|
||||
{
|
||||
auto push_func = [this, soid, need, &pops, &shards] {
|
||||
auto fut = seastar::now();
|
||||
if (!shards.empty())
|
||||
fut = prep_push(soid, need, &pops, shards);
|
||||
return fut.then([this, &pops, &shards, soid] {
|
||||
return seastar::parallel_for_each(shards,
|
||||
[this, &pops, soid](auto shard) {
|
||||
auto msg = make_message<MOSDPGPush>();
|
||||
@ -54,7 +57,7 @@ seastar::future<> ReplicatedRecoveryBackend::recover_object(
|
||||
msg->set_priority(pg.get_recovery_op_priority());
|
||||
msg->pushes.push_back(pops[shard->first]);
|
||||
return shard_services.send_to_osd(shard->first.osd, std::move(msg),
|
||||
pg.get_osdmap_epoch()).then(
|
||||
pg.get_osdmap_epoch()).then(
|
||||
[this, soid, shard] {
|
||||
return recovering.at(soid).wait_for_pushes(shard->first);
|
||||
});
|
||||
@ -79,7 +82,27 @@ seastar::future<> ReplicatedRecoveryBackend::recover_object(
|
||||
recovering.erase(soid);
|
||||
return seastar::make_exception_future<>(e);
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
auto& recovery_waiter = recovering.at(soid);
|
||||
if (recovery_waiter.obc) {
|
||||
return seastar::futurize_invoke(std::move(push_func));
|
||||
}
|
||||
logger().debug("recover_object: loading obc: {}", soid);
|
||||
return pg.with_head_obc<RWState::RWREAD>(soid,
|
||||
[&recovery_waiter, push_func=std::move(push_func)](auto obc) {
|
||||
logger().debug("recover_object: loaded obc: {}", obc->obs.oi.soid);
|
||||
recovery_waiter.obc = obc;
|
||||
recovery_waiter.obc->wait_recovery_read();
|
||||
return seastar::futurize_invoke(std::move(push_func));
|
||||
}).template safe_then(
|
||||
[] { return seastar::now(); },
|
||||
crimson::osd::PG::load_obc_ertr::all_same_way([soid](auto& code) {
|
||||
//TODO: may need eio handling?
|
||||
logger().error("recover_object saw error code {},"
|
||||
" ignoring object {}", code, soid);
|
||||
return seastar::now();
|
||||
}));
|
||||
}
|
||||
|
||||
seastar::future<bool>
|
||||
@ -114,23 +137,6 @@ ReplicatedRecoveryBackend::maybe_pull_missing_obj(
|
||||
});
|
||||
}
|
||||
|
||||
auto ReplicatedRecoveryBackend::load_obc_for_recovery(
|
||||
const hobject_t& soid,
|
||||
bool pulled) ->
|
||||
load_obc_ertr::future<>
|
||||
{
|
||||
auto& recovery_waiter = recovering.at(soid);
|
||||
if (recovery_waiter.obc) {
|
||||
return load_obc_ertr::now();
|
||||
}
|
||||
return pg.with_head_obc<RWState::RWREAD>(soid, [&recovery_waiter](auto obc) {
|
||||
logger().debug("load_obc_for_recovery: loaded obc: {}", obc->obs.oi.soid);
|
||||
recovery_waiter.obc = obc;
|
||||
recovery_waiter.obc->wait_recovery_read();
|
||||
return seastar::now();
|
||||
});
|
||||
}
|
||||
|
||||
seastar::future<> ReplicatedRecoveryBackend::push_delete(
|
||||
const hobject_t& soid,
|
||||
eversion_t need)
|
||||
|
@ -119,9 +119,12 @@ private:
|
||||
/// load object context for recovery if it is not ready yet
|
||||
using load_obc_ertr = crimson::errorator<
|
||||
crimson::ct_error::object_corrupted>;
|
||||
load_obc_ertr::future<> load_obc_for_recovery(
|
||||
|
||||
seastar::future<> maybe_push_shards(
|
||||
const hobject_t& soid,
|
||||
bool pulled);
|
||||
eversion_t need,
|
||||
std::map<pg_shard_t, PushOp>& pops,
|
||||
std::list<std::map<pg_shard_t, pg_missing_t>::const_iterator>& shards);
|
||||
|
||||
/// read the remaining extents of object to be recovered and fill push_op
|
||||
/// with them
|
||||
|
Loading…
Reference in New Issue
Block a user