crimson/osd: extract methods out of ClientRequest::process_op()

* do_recover_missing()
* do_process()

for better readability

Signed-off-by: Kefu Chai <kchai@redhat.com>
This commit is contained in:
Kefu Chai 2021-02-08 18:19:25 +08:00
parent c6dd356e4c
commit e730737e8b
2 changed files with 73 additions and 59 deletions

View File

@ -116,63 +116,21 @@ seastar::future<> ClientRequest::process_pg_op(
});
}
seastar::future<> ClientRequest::process_op(
Ref<PG> &pgref)
seastar::future<> ClientRequest::process_op(Ref<PG> &pg)
{
PG& pg = *pgref;
return with_blocking_future(
handle.enter(pp(pg).recover_missing)
).then([this, &pg, pgref] {
eversion_t ver;
const hobject_t& soid = m->get_hobj();
logger().debug("{} check for recovery, {}", *this, soid);
if (pg.is_unreadable_object(soid, &ver) ||
pg.is_degraded_or_backfilling_object(soid)) {
logger().debug("{} need to wait for recovery, {}", *this, soid);
if (pg.get_recovery_backend()->is_recovering(soid)) {
return pg.get_recovery_backend()->get_recovering(soid).wait_for_recovered();
} else {
auto [op, fut] = osd.get_shard_services().start_operation<UrgentRecovery>(
soid, ver, pgref, osd.get_shard_services(), pg.get_osdmap_epoch());
return std::move(fut);
}
}
return seastar::now();
}).then([this, &pg] {
return with_blocking_future(handle.enter(pp(pg).get_obc));
}).then([this, &pg, &pgref]() -> PG::load_obc_ertr::future<> {
op_info.set_from_op(&*m, *pg.get_osdmap());
return pg.with_locked_obc(m, op_info, this, [this, &pg, &pgref](auto obc) {
return with_blocking_future(
handle.enter(pp(pg).process)
).then([this, &pg, obc]()
-> crimson::errorator<crimson::ct_error::eagain>::future<Ref<MOSDOpReply>> {
if (!pg.is_primary()) {
// primary can handle both normal ops and balanced reads
if (is_misdirected(pg)) {
logger().trace("process_op: dropping misdirected op");
return seastar::make_ready_future<Ref<MOSDOpReply>>();
} else if (const hobject_t& hoid = m->get_hobj();
!pg.get_peering_state().can_serve_replica_read(hoid)) {
auto reply = make_message<MOSDOpReply>(
m.get(), -EAGAIN, pg.get_osdmap_epoch(),
m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK),
!m->has_flag(CEPH_OSD_FLAG_RETURNVEC));
return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
}
}
return pg.do_osd_ops(m, obc, op_info);
}).safe_then([this](Ref<MOSDOpReply> reply) {
if (reply) {
return conn->send(std::move(reply));
} else {
return seastar::now();
}
}, crimson::ct_error::eagain::handle([this, &pgref] {
return process_op(pgref);
}));
return with_blocking_future(handle.enter(pp(*pg).recover_missing)).then([&] {
return do_recover_missing(pg);
}).then([&] {
return with_blocking_future(handle.enter(pp(*pg).get_obc));
}).then([this, &pg]() -> PG::load_obc_ertr::future<> {
op_info.set_from_op(&*m, *pg->get_osdmap());
return pg->with_locked_obc(m, op_info, this, [this, &pg](auto obc) {
return with_blocking_future(handle.enter(pp(*pg).process)).then(
[this, &pg, obc] {
return do_process(pg, obc);
});
});
}).safe_then([pgref=std::move(pgref)] {
}).safe_then([pg=std::move(pg)] {
return seastar::now();
}, PG::load_obc_ertr::all_same_way([](auto &code) {
logger().error("ClientRequest saw error code {}", code);
@ -180,6 +138,58 @@ seastar::future<> ClientRequest::process_op(
}));
}
seastar::future<> ClientRequest::do_recover_missing(Ref<PG>& pg)
{
eversion_t ver;
const hobject_t& soid = m->get_hobj();
logger().debug("{} check for recovery, {}", *this, soid);
if (!pg->is_unreadable_object(soid, &ver) &&
!pg->is_degraded_or_backfilling_object(soid)) {
return seastar::now();
}
logger().debug("{} need to wait for recovery, {}", *this, soid);
if (pg->get_recovery_backend()->is_recovering(soid)) {
return pg->get_recovery_backend()->get_recovering(soid).wait_for_recovered();
} else {
auto [op, fut] =
osd.get_shard_services().start_operation<UrgentRecovery>(
soid, ver, pg, osd.get_shard_services(), pg->get_osdmap_epoch());
return std::move(fut);
}
}
seastar::future<>
ClientRequest::do_process(Ref<PG>& pg, crimson::osd::ObjectContextRef obc)
{
using do_ops_return_t =
crimson::errorator<crimson::ct_error::eagain>::future<Ref<MOSDOpReply>>;
return [&pg, obc]() -> do_ops_return_t {
if (!pg->is_primary()) {
// primary can handle both normal ops and balanced reads
if (is_misdirected(*pg)) {
logger().trace("process_op: dropping misdirected op");
return seastar::make_ready_future<Ref<MOSDOpReply>>();
} else if (const hobject_t& hoid = m->get_hobj();
!pg->get_peering_state().can_serve_replica_read(hoid)) {
auto reply = make_message<MOSDOpReply>(
m.get(), -EAGAIN, pg->get_osdmap_epoch(),
m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK),
!m->has_flag(CEPH_OSD_FLAG_RETURNVEC));
return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
}
}
return pg->do_osd_ops(m, obc, op_info);
}().safe_then([this](Ref<MOSDOpReply> reply) {
if (reply) {
return conn->send(std::move(reply));
} else {
return seastar::now();
}
}, crimson::ct_error::eagain::handle([this, &pg] {
return process_op(pg);
}));
}
bool ClientRequest::is_misdirected(const PG& pg) const
{
// otherwise take a closer look

View File

@ -5,6 +5,7 @@
#include "osd/osd_op_util.h"
#include "crimson/net/Connection.h"
#include "crimson/osd/object_context.h"
#include "crimson/osd/osd_operation.h"
#include "crimson/common/type_helpers.h"
#include "messages/MOSDOp.h"
@ -60,10 +61,13 @@ public:
seastar::future<> start();
private:
seastar::future<> process_pg_op(
Ref<PG> &pg);
seastar::future<> process_op(
Ref<PG> &pg);
seastar::future<> process_pg_op(Ref<PG>& pg);
seastar::future<> process_op(Ref<PG>& pg);
seastar::future<> do_recover_missing(Ref<PG>& pgref);
seastar::future<> do_process(
Ref<PG>& pg,
crimson::osd::ObjectContextRef obc);
bool is_pg_op() const;
ConnectionPipeline &cp();