mirror of
https://github.com/ceph/ceph
synced 2025-02-19 08:57:27 +00:00
crimson/osd/*recovery_backend:Rename pi to pull_info
Signed-off-by: Matan Breizman <mbreizma@redhat.com>
This commit is contained in:
parent
22adfcc9b4
commit
9802136108
@ -41,8 +41,9 @@ void RecoveryBackend::clean_up(ceph::os::Transaction& t,
|
||||
temp_contents.clear();
|
||||
|
||||
for (auto& [soid, recovery_waiter] : recovering) {
|
||||
if ((recovery_waiter->pi && recovery_waiter->pi->is_complete())
|
||||
|| (!recovery_waiter->pi
|
||||
if ((recovery_waiter->pull_info
|
||||
&& recovery_waiter->pull_info->is_complete())
|
||||
|| (!recovery_waiter->pull_info
|
||||
&& recovery_waiter->obc && recovery_waiter->obc->obs.exists)) {
|
||||
recovery_waiter->obc->interrupt(
|
||||
::crimson::common::actingset_changed(
|
||||
|
@ -130,7 +130,7 @@ public:
|
||||
static constexpr const char* type_name = "WaitForObjectRecovery";
|
||||
|
||||
crimson::osd::ObjectContextRef obc;
|
||||
std::optional<pull_info_t> pi;
|
||||
std::optional<pull_info_t> pull_info;
|
||||
std::map<pg_shard_t, PushInfo> pushing;
|
||||
|
||||
seastar::future<> wait_for_readable() {
|
||||
|
@ -85,10 +85,10 @@ ReplicatedRecoveryBackend::maybe_push_shards(
|
||||
pg.get_recovery_handler()->on_global_recover(soid,
|
||||
push_info->second.stat,
|
||||
false);
|
||||
} else if (recovery.pi) {
|
||||
} else if (recovery.pull_info) {
|
||||
// no push happened (empty get_shards_to_push()) but pull actually did
|
||||
pg.get_recovery_handler()->on_global_recover(soid,
|
||||
recovery.pi->stat,
|
||||
recovery.pull_info->stat,
|
||||
false);
|
||||
} else {
|
||||
// no pulls, no pushes
|
||||
@ -115,9 +115,10 @@ ReplicatedRecoveryBackend::maybe_pull_missing_obj(
|
||||
}
|
||||
PullOp po;
|
||||
auto& recovery_waiter = get_recovering(soid);
|
||||
recovery_waiter.pi = std::make_optional<RecoveryBackend::pull_info_t>();
|
||||
auto& pi = *recovery_waiter.pi;
|
||||
prepare_pull(po, pi, soid, need);
|
||||
recovery_waiter.pull_info =
|
||||
std::make_optional<RecoveryBackend::pull_info_t>();
|
||||
auto& pull_info = *recovery_waiter.pull_info;
|
||||
prepare_pull(po, pull_info, soid, need);
|
||||
auto msg = crimson::make_message<MOSDPGPull>();
|
||||
msg->from = pg.get_pg_whoami();
|
||||
msg->set_priority(pg.get_recovery_op_priority());
|
||||
@ -127,7 +128,7 @@ ReplicatedRecoveryBackend::maybe_pull_missing_obj(
|
||||
msg->set_pulls({std::move(po)});
|
||||
return interruptor::make_interruptible(
|
||||
shard_services.send_to_osd(
|
||||
pi.from.osd,
|
||||
pull_info.from.osd,
|
||||
std::move(msg),
|
||||
pg.get_osdmap_epoch()
|
||||
)).then_interruptible([&recovery_waiter] {
|
||||
@ -337,7 +338,8 @@ ReplicatedRecoveryBackend::prep_push(
|
||||
});
|
||||
}
|
||||
|
||||
void ReplicatedRecoveryBackend::prepare_pull(PullOp& po, pull_info_t& pi,
|
||||
void ReplicatedRecoveryBackend::prepare_pull(PullOp& po,
|
||||
pull_info_t& pull_info,
|
||||
const hobject_t& soid,
|
||||
eversion_t need) {
|
||||
logger().debug("{}: {}, {}", __func__, soid, need);
|
||||
@ -362,10 +364,10 @@ void ReplicatedRecoveryBackend::prepare_pull(PullOp& po, pull_info_t& pi,
|
||||
po.recovery_progress.data_recovered_to = 0;
|
||||
po.recovery_progress.first = true;
|
||||
|
||||
pi.from = fromshard;
|
||||
pi.soid = soid;
|
||||
pi.recovery_info = po.recovery_info;
|
||||
pi.recovery_progress = po.recovery_progress;
|
||||
pull_info.from = fromshard;
|
||||
pull_info.soid = soid;
|
||||
pull_info.recovery_info = po.recovery_info;
|
||||
pull_info.recovery_progress = po.recovery_progress;
|
||||
}
|
||||
|
||||
RecoveryBackend::interruptible_future<PushOp>
|
||||
@ -664,34 +666,36 @@ ReplicatedRecoveryBackend::_handle_pull_response(
|
||||
|
||||
const hobject_t &hoid = pop.soid;
|
||||
auto& recovery_waiter = get_recovering(hoid);
|
||||
auto& pi = *recovery_waiter.pi;
|
||||
if (pi.recovery_info.size == (uint64_t(-1))) {
|
||||
pi.recovery_info.size = pop.recovery_info.size;
|
||||
pi.recovery_info.copy_subset.intersection_of(
|
||||
auto& pull_info = *recovery_waiter.pull_info;
|
||||
if (pull_info.recovery_info.size == (uint64_t(-1))) {
|
||||
pull_info.recovery_info.size = pop.recovery_info.size;
|
||||
pull_info.recovery_info.copy_subset.intersection_of(
|
||||
pop.recovery_info.copy_subset);
|
||||
}
|
||||
|
||||
// If primary doesn't have object info and didn't know version
|
||||
if (pi.recovery_info.version == eversion_t())
|
||||
pi.recovery_info.version = pop.version;
|
||||
if (pull_info.recovery_info.version == eversion_t())
|
||||
pull_info.recovery_info.version = pop.version;
|
||||
|
||||
auto prepare_waiter = interruptor::make_interruptible(
|
||||
seastar::make_ready_future<>());
|
||||
if (pi.recovery_progress.first) {
|
||||
if (pull_info.recovery_progress.first) {
|
||||
prepare_waiter = pg.obc_loader.with_obc<RWState::RWNONE>(
|
||||
pi.recovery_info.soid, [&pi, &recovery_waiter, &pop](auto obc) {
|
||||
pi.obc = obc;
|
||||
pull_info.recovery_info.soid,
|
||||
[&pull_info, &recovery_waiter, &pop](auto obc) {
|
||||
pull_info.obc = obc;
|
||||
recovery_waiter.obc = obc;
|
||||
obc->obs.oi.decode_no_oid(pop.attrset.at(OI_ATTR), pop.soid);
|
||||
pi.recovery_info.oi = obc->obs.oi;
|
||||
pull_info.recovery_info.oi = obc->obs.oi;
|
||||
return crimson::osd::PG::load_obc_ertr::now();
|
||||
}).handle_error_interruptible(crimson::ct_error::assert_all{});
|
||||
};
|
||||
return prepare_waiter.then_interruptible([this, &pi, &pop, t, response]() mutable {
|
||||
const bool first = pi.recovery_progress.first;
|
||||
pi.recovery_progress = pop.after_progress;
|
||||
return prepare_waiter.then_interruptible(
|
||||
[this, &pull_info, &pop, t, response]() mutable {
|
||||
const bool first = pull_info.recovery_progress.first;
|
||||
pull_info.recovery_progress = pop.after_progress;
|
||||
logger().debug("new recovery_info {}, new progress {}",
|
||||
pi.recovery_info, pi.recovery_progress);
|
||||
pull_info.recovery_info, pull_info.recovery_progress);
|
||||
interval_set<uint64_t> data_zeros;
|
||||
{
|
||||
uint64_t offset = pop.before_progress.data_recovered_to;
|
||||
@ -702,29 +706,31 @@ ReplicatedRecoveryBackend::_handle_pull_response(
|
||||
}
|
||||
}
|
||||
auto [usable_intervals, data] =
|
||||
trim_pushed_data(pi.recovery_info.copy_subset,
|
||||
trim_pushed_data(pull_info.recovery_info.copy_subset,
|
||||
pop.data_included, pop.data);
|
||||
bool complete = pi.is_complete();
|
||||
bool complete = pull_info.is_complete();
|
||||
bool clear_omap = !pop.before_progress.omap_complete;
|
||||
return submit_push_data(pi.recovery_info, first, complete, clear_omap,
|
||||
return submit_push_data(pull_info.recovery_info,
|
||||
first, complete, clear_omap,
|
||||
std::move(data_zeros), std::move(usable_intervals),
|
||||
std::move(data), std::move(pop.omap_header),
|
||||
pop.attrset, std::move(pop.omap_entries), t)
|
||||
.then_interruptible(
|
||||
[this, response, &pi, &pop, complete, t, bytes_recovered=data.length()] {
|
||||
pi.stat.num_keys_recovered += pop.omap_entries.size();
|
||||
pi.stat.num_bytes_recovered += bytes_recovered;
|
||||
[this, response, &pull_info, &pop, complete,
|
||||
t, bytes_recovered=data.length()] {
|
||||
pull_info.stat.num_keys_recovered += pop.omap_entries.size();
|
||||
pull_info.stat.num_bytes_recovered += bytes_recovered;
|
||||
|
||||
if (complete) {
|
||||
pi.stat.num_objects_recovered++;
|
||||
pull_info.stat.num_objects_recovered++;
|
||||
pg.get_recovery_handler()->on_local_recover(
|
||||
pop.soid, get_recovering(pop.soid).pi->recovery_info,
|
||||
pop.soid, get_recovering(pop.soid).pull_info->recovery_info,
|
||||
false, *t);
|
||||
return true;
|
||||
} else {
|
||||
response->soid = pop.soid;
|
||||
response->recovery_info = pi.recovery_info;
|
||||
response->recovery_progress = pi.recovery_progress;
|
||||
response->recovery_info = pull_info.recovery_info;
|
||||
response->recovery_progress = pull_info.recovery_progress;
|
||||
return false;
|
||||
}
|
||||
});
|
||||
@ -743,7 +749,7 @@ ReplicatedRecoveryBackend::handle_pull_response(
|
||||
if (pop.version == eversion_t()) {
|
||||
// replica doesn't have it!
|
||||
pg.get_recovery_handler()->on_failed_recover({ m->from }, pop.soid,
|
||||
get_recovering(pop.soid).pi->recovery_info.version);
|
||||
get_recovering(pop.soid).pull_info->recovery_info.version);
|
||||
return seastar::make_exception_future<>(
|
||||
std::runtime_error(fmt::format(
|
||||
"Error on pushing side {} when pulling obj {}",
|
||||
|
@ -53,7 +53,7 @@ protected:
|
||||
pg_shard_t pg_shard);
|
||||
void prepare_pull(
|
||||
PullOp& po,
|
||||
pull_info_t& pi,
|
||||
pull_info_t& pull_info,
|
||||
const hobject_t& soid,
|
||||
eversion_t need);
|
||||
std::vector<pg_shard_t> get_shards_to_push(
|
||||
|
Loading…
Reference in New Issue
Block a user