crimson/osd: Introduce with_head_and_clone_obc()

In continuation to 7ca2690be956a36f61c7729946b94ccd970dd9c7:
Now that the head ref is no longer a member of obc, we need a new
substitute way to get the head when needed.

When loading a clone object, the head object is loaded
first (See with_clone_obc). Therefore we can make use of this design
to move the loaded head forward to the relevant func (See with_head_and_clone_obc).
Usually, we wouldn't need to make use of both the head and the clone obc in the
same function. However, SnapTrimObjSubEvent::remove_or_update is an abnormal usage.

Note: We want to avoid holding any unneeded references to obcs
to allow the obc_registery to evict no longer valid obc.
Therefore, with_obc() which references only a single obc is the
preferred entry point for loading obcs.

Signed-off-by: Matan Breizman <mbreizma@redhat.com>
This commit is contained in:
Matan Breizman 2023-03-05 09:31:03 +00:00
parent 4bf99c69f2
commit 3ea1d8e09d
3 changed files with 62 additions and 4 deletions

View File

@ -80,6 +80,45 @@ using crimson::common::local_conf;
});
}
template<RWState::State State>
ObjectContextLoader::load_obc_iertr::future<>
ObjectContextLoader::with_head_and_clone_obc(
hobject_t oid,
with_both_obc_func_t&& func)
{
LOG_PREFIX(ObjectContextLoader::with_head_and_clone_obc);
assert(!oid.is_head());
return with_obc<RWState::RWREAD>(
oid.get_head(),
[FNAME, oid, func=std::move(func), this](auto head) mutable
-> load_obc_iertr::future<> {
if (!head->obs.exists) {
ERRORDPP("head doesn't exist for object {}", dpp, head->obs.oi.soid);
return load_obc_iertr::future<>{
crimson::ct_error::enoent::make()
};
}
auto coid = resolve_oid(head->get_head_ss(), oid);
if (!coid) {
ERRORDPP("clone {} not found", dpp, oid);
return load_obc_iertr::future<>{
crimson::ct_error::enoent::make()
};
}
auto [clone, existed] = obc_registry.get_cached_obc(*coid);
return clone->template with_lock<State, IOInterruptCondition>(
[existed=existed, clone=std::move(clone),
func=std::move(func), head=std::move(head), this]()
-> load_obc_iertr::future<> {
auto loaded = get_or_load_obc<State>(clone, existed);
return loaded.safe_then_interruptible(
[func = std::move(func), head=std::move(head)](auto clone) {
return std::move(func)(std::move(head), std::move(clone));
});
});
});
}
template<RWState::State State>
ObjectContextLoader::load_obc_iertr::future<>
ObjectContextLoader::with_obc(hobject_t oid,
@ -186,4 +225,9 @@ using crimson::common::local_conf;
template ObjectContextLoader::load_obc_iertr::future<>
ObjectContextLoader::with_obc<RWState::RWEXCL>(hobject_t,
with_obc_func_t&&);
template ObjectContextLoader::load_obc_iertr::future<>
ObjectContextLoader::with_head_and_clone_obc<RWState::RWWRITE>(
hobject_t,
with_both_obc_func_t&&);
}

View File

@ -32,6 +32,9 @@ public:
using with_obc_func_t =
std::function<load_obc_iertr::future<> (ObjectContextRef)>;
using with_both_obc_func_t =
std::function<load_obc_iertr::future<> (ObjectContextRef, ObjectContextRef)>;
template<RWState::State State>
load_obc_iertr::future<> with_obc(hobject_t oid,
with_obc_func_t&& func);
@ -48,6 +51,14 @@ public:
hobject_t oid,
with_obc_func_t&& func);
// Use this variant in the case where both the head
// object *and* the matching clone object are being used
// in func.
template<RWState::State State>
load_obc_iertr::future<> with_head_and_clone_obc(
hobject_t oid,
with_both_obc_func_t&& func);
template<RWState::State State>
load_obc_iertr::future<> with_head_obc(ObjectContextRef obc,
bool existed,

View File

@ -494,15 +494,18 @@ SnapTrimObjSubEvent::with_pg(
}).then_interruptible([this] {
logger().debug("{}: getting obc for {}", *this, coid);
// end of commonality
// with_cone_obc lock both clone's and head's obcs
return pg->obc_loader.with_clone_obc<RWState::RWWRITE>(coid, [this](auto clone_obc) {
// with_head_and_clone_obc lock both clone's and head's obcs
return pg->obc_loader.with_head_and_clone_obc<RWState::RWWRITE>(
coid,
[this](auto head_obc, auto clone_obc) {
logger().debug("{}: got clone_obc={}", *this, fmt::ptr(clone_obc.get()));
return enter_stage<interruptor>(
pp().process
).then_interruptible([this, clone_obc=std::move(clone_obc)]() mutable {
).then_interruptible(
[this,clone_obc=std::move(clone_obc), head_obc=std::move(head_obc)]() mutable {
logger().debug("{}: processing clone_obc={}", *this, fmt::ptr(clone_obc.get()));
return remove_or_update(
clone_obc, clone_obc->head
clone_obc, head_obc
).safe_then_unpack_interruptible([clone_obc, this]
(auto&& txn, auto&& log_entries) mutable {
auto [submitted, all_completed] = pg->submit_transaction(