mirror of
https://github.com/ceph/ceph
synced 2025-01-31 15:32:38 +00:00
Merge pull request #53078 from xxhdx1985126/wip-crimson-pg-recovery
crimson/osd/pg_recovery: avoiding duplicated object recovering Reviewed-by: Samuel Just <sjust@redhat.com> Reviewed-by: Matan Breizman <mbreizma@redhat.com>
This commit is contained in:
commit
a538294a55
@ -275,20 +275,27 @@ PGRecovery::recover_missing(
|
||||
RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
|
||||
const hobject_t &soid, eversion_t need)
|
||||
{
|
||||
if (pg->get_peering_state().get_missing_loc().is_deleted(soid)) {
|
||||
return pg->get_recovery_backend()->add_recovering(soid).wait_track_blocking(
|
||||
trigger,
|
||||
pg->get_recovery_backend()->recover_delete(soid, need));
|
||||
logger().info("{} {} v {}", __func__, soid, need);
|
||||
auto [recovering, added] = pg->get_recovery_backend()->add_recovering(soid);
|
||||
if (added) {
|
||||
logger().info("{} {} v {}, new recovery", __func__, soid, need);
|
||||
if (pg->get_peering_state().get_missing_loc().is_deleted(soid)) {
|
||||
return recovering.wait_track_blocking(
|
||||
trigger,
|
||||
pg->get_recovery_backend()->recover_delete(soid, need));
|
||||
} else {
|
||||
return recovering.wait_track_blocking(
|
||||
trigger,
|
||||
pg->get_recovery_backend()->recover_object(soid, need)
|
||||
.handle_exception_interruptible(
|
||||
[=, this, soid = std::move(soid)] (auto e) {
|
||||
on_failed_recover({ pg->get_pg_whoami() }, soid, need);
|
||||
return seastar::make_ready_future<>();
|
||||
})
|
||||
);
|
||||
}
|
||||
} else {
|
||||
return pg->get_recovery_backend()->add_recovering(soid).wait_track_blocking(
|
||||
trigger,
|
||||
pg->get_recovery_backend()->recover_object(soid, need)
|
||||
.handle_exception_interruptible(
|
||||
[=, this, soid = std::move(soid)] (auto e) {
|
||||
on_failed_recover({ pg->get_pg_whoami() }, soid, need);
|
||||
return seastar::make_ready_future<>();
|
||||
})
|
||||
);
|
||||
return recovering.wait_for_recovered();
|
||||
}
|
||||
}
|
||||
|
||||
@ -297,16 +304,23 @@ RecoveryBackend::interruptible_future<> PGRecovery::prep_object_replica_deletes(
|
||||
const hobject_t& soid,
|
||||
eversion_t need)
|
||||
{
|
||||
return pg->get_recovery_backend()->add_recovering(soid).wait_track_blocking(
|
||||
trigger,
|
||||
pg->get_recovery_backend()->push_delete(soid, need).then_interruptible(
|
||||
[=, this] {
|
||||
object_stat_sum_t stat_diff;
|
||||
stat_diff.num_objects_recovered = 1;
|
||||
on_global_recover(soid, stat_diff, true);
|
||||
return seastar::make_ready_future<>();
|
||||
})
|
||||
);
|
||||
logger().info("{} {} v {}", __func__, soid, need);
|
||||
auto [recovering, added] = pg->get_recovery_backend()->add_recovering(soid);
|
||||
if (added) {
|
||||
logger().info("{} {} v {}, new recovery", __func__, soid, need);
|
||||
return recovering.wait_track_blocking(
|
||||
trigger,
|
||||
pg->get_recovery_backend()->push_delete(soid, need).then_interruptible(
|
||||
[=, this] {
|
||||
object_stat_sum_t stat_diff;
|
||||
stat_diff.num_objects_recovered = 1;
|
||||
on_global_recover(soid, stat_diff, true);
|
||||
return seastar::make_ready_future<>();
|
||||
})
|
||||
);
|
||||
} else {
|
||||
return recovering.wait_for_recovered();
|
||||
}
|
||||
}
|
||||
|
||||
RecoveryBackend::interruptible_future<> PGRecovery::prep_object_replica_pushes(
|
||||
@ -314,15 +328,22 @@ RecoveryBackend::interruptible_future<> PGRecovery::prep_object_replica_pushes(
|
||||
const hobject_t& soid,
|
||||
eversion_t need)
|
||||
{
|
||||
return pg->get_recovery_backend()->add_recovering(soid).wait_track_blocking(
|
||||
trigger,
|
||||
pg->get_recovery_backend()->recover_object(soid, need)
|
||||
.handle_exception_interruptible(
|
||||
[=, this, soid = std::move(soid)] (auto e) {
|
||||
on_failed_recover({ pg->get_pg_whoami() }, soid, need);
|
||||
return seastar::make_ready_future<>();
|
||||
})
|
||||
);
|
||||
logger().info("{} {} v {}", __func__, soid, need);
|
||||
auto [recovering, added] = pg->get_recovery_backend()->add_recovering(soid);
|
||||
if (added) {
|
||||
logger().info("{} {} v {}, new recovery", __func__, soid, need);
|
||||
return recovering.wait_track_blocking(
|
||||
trigger,
|
||||
pg->get_recovery_backend()->recover_object(soid, need)
|
||||
.handle_exception_interruptible(
|
||||
[=, this, soid = std::move(soid)] (auto e) {
|
||||
on_failed_recover({ pg->get_pg_whoami() }, soid, need);
|
||||
return seastar::make_ready_future<>();
|
||||
})
|
||||
);
|
||||
} else {
|
||||
return recovering.wait_for_recovered();
|
||||
}
|
||||
}
|
||||
|
||||
RecoveryBackend::interruptible_future<>
|
||||
@ -477,9 +498,11 @@ void PGRecovery::enqueue_push(
|
||||
const hobject_t& obj,
|
||||
const eversion_t& v)
|
||||
{
|
||||
logger().debug("{}: obj={} v={}",
|
||||
logger().info("{}: obj={} v={}",
|
||||
__func__, obj, v);
|
||||
pg->get_recovery_backend()->add_recovering(obj);
|
||||
auto [recovering, added] = pg->get_recovery_backend()->add_recovering(obj);
|
||||
if (!added)
|
||||
return;
|
||||
std::ignore = pg->get_recovery_backend()->recover_object(obj, v).\
|
||||
handle_exception_interruptible([] (auto) {
|
||||
ceph_abort_msg("got exception on backfill's push");
|
||||
|
@ -45,10 +45,10 @@ public:
|
||||
coll{coll},
|
||||
backend{backend} {}
|
||||
virtual ~RecoveryBackend() {}
|
||||
WaitForObjectRecovery& add_recovering(const hobject_t& soid) {
|
||||
std::pair<WaitForObjectRecovery&, bool> add_recovering(const hobject_t& soid) {
|
||||
auto [it, added] = recovering.emplace(soid, new WaitForObjectRecovery{});
|
||||
assert(added);
|
||||
return *(it->second);
|
||||
assert(it->second);
|
||||
return {*(it->second), added};
|
||||
}
|
||||
WaitForObjectRecovery& get_recovering(const hobject_t& soid) {
|
||||
assert(is_recovering(soid));
|
||||
|
Loading…
Reference in New Issue
Block a user