crimson/osd: use ObjectContext and take obc locks

Signed-off-by: Samuel Just <sjust@redhat.com>
This commit is contained in:
Samuel Just 2019-10-09 15:07:55 -07:00
parent fd803498df
commit ffb66d4ad0
9 changed files with 426 additions and 197 deletions

View File

@ -23,6 +23,7 @@ add_executable(crimson-osd
objclass.cc
${PROJECT_SOURCE_DIR}/src/objclass/class_api.cc
${PROJECT_SOURCE_DIR}/src/osd/ClassHandler.cc
${PROJECT_SOURCE_DIR}/src/osd/osd_op_util.cc
${PROJECT_SOURCE_DIR}/src/osd/PeeringState.cc
${PROJECT_SOURCE_DIR}/src/osd/PGPeeringEvent.cc
${PROJECT_SOURCE_DIR}/src/osd/PGStateUtils.cc

View File

@ -63,7 +63,7 @@ OpsExecuter::call_errorator::future<> OpsExecuter::do_op_call(OSDOp& osd_op)
}
const auto flags = method->get_flags();
if (!os->exists && (flags & CLS_METHOD_WR) == 0) {
if (!obc->obs.exists && (flags & CLS_METHOD_WR) == 0) {
return crimson::ct_error::enoent::make();
}
@ -357,7 +357,10 @@ OpsExecuter::execute_osd_op(OSDOp& osd_op)
// TODO: dispatch via call table?
// TODO: we might want to find a way to unify both input and output
// of each op.
logger().debug("handling op {}", ceph_osd_op_name(osd_op.op.op));
logger().debug(
"handling op {} on object {}",
ceph_osd_op_name(osd_op.op.op),
get_target());
switch (const ceph_osd_op& op = osd_op.op; op.op) {
case CEPH_OSD_OP_SYNC_READ:
[[fallthrough]];

View File

@ -72,7 +72,7 @@ private:
virtual ~effect_t() = default;
};
PGBackend::cached_os_t os;
ObjectContextRef obc;
PG& pg;
PGBackend& backend;
Ref<MOSDOp> msg;
@ -91,10 +91,14 @@ private:
call_errorator::future<> do_op_call(class OSDOp& osd_op);
hobject_t &get_target() const {
return obc->obs.oi.soid;
}
template <class Func>
auto do_const_op(Func&& f) {
// TODO: pass backend as read-only
return std::forward<Func>(f)(backend, std::as_const(*os));
return std::forward<Func>(f)(backend, std::as_const(obc->obs));
}
template <class Func>
@ -107,7 +111,7 @@ private:
template <class Func>
auto do_write_op(Func&& f) {
++num_write;
return std::forward<Func>(f)(backend, *os, txn);
return std::forward<Func>(f)(backend, obc->obs, txn);
}
// PG operations are being provided with pg instead of os.
@ -122,14 +126,14 @@ private:
}
public:
OpsExecuter(PGBackend::cached_os_t os, PG& pg, Ref<MOSDOp> msg)
: os(std::move(os)),
OpsExecuter(ObjectContextRef obc, PG& pg, Ref<MOSDOp> msg)
: obc(std::move(obc)),
pg(pg),
backend(pg.get_backend()),
msg(std::move(msg)) {
}
OpsExecuter(PG& pg, Ref<MOSDOp> msg)
: OpsExecuter{PGBackend::cached_os_t{}, pg, std::move(msg)}
: OpsExecuter{ObjectContextRef(), pg, std::move(msg)}
{}
osd_op_errorator::future<> execute_osd_op(class OSDOp& osd_op);
@ -175,9 +179,9 @@ auto OpsExecuter::with_effect(
template <typename Func>
OpsExecuter::osd_op_errorator::future<> OpsExecuter::submit_changes(Func&& f) && {
if (__builtin_expect(op_effects.empty(), true)) {
return std::forward<Func>(f)(std::move(txn), std::move(os));
return std::forward<Func>(f)(std::move(txn), std::move(obc));
}
return std::forward<Func>(f)(std::move(txn), std::move(os)).safe_then([this] {
return std::forward<Func>(f)(std::move(txn), std::move(obc)).safe_then([this] {
// let's do the cleaning of `op_effects` in destructor
return crimson::do_for_each(op_effects, [] (auto& op_effect) {
return op_effect->execute();

View File

@ -4,6 +4,7 @@
#include <seastar/core/future.hh>
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
#include "crimson/osd/pg.h"
#include "crimson/osd/osd.h"
@ -43,11 +44,18 @@ ClientRequest::PGPipeline &ClientRequest::pp(PG &pg)
return pg.client_request_pg_pipeline;
}
bool ClientRequest::is_pg_op() const
{
return std::any_of(
begin(m->ops), end(m->ops),
[](auto& op) { return ceph_osd_op_type_pg(op.op.op); });
}
seastar::future<> ClientRequest::start()
{
logger().debug("{}: start", *this);
IRef ref = this;
IRef opref = this;
return with_blocking_future(handle.enter(cp().await_map))
.then([this]() {
return with_blocking_future(osd.osdmap_gate.wait_for_map(m->get_min_epoch()));
@ -55,21 +63,67 @@ seastar::future<> ClientRequest::start()
return with_blocking_future(handle.enter(cp().get_pg));
}).then([this] {
return with_blocking_future(osd.wait_for_pg(m->get_spg()));
}).then([this, ref=std::move(ref)](Ref<PG> pg) {
}).then([this, opref=std::move(opref)](Ref<PG> pgref) {
return seastar::do_with(
std::move(pg), std::move(ref), [this](auto pg, auto op) {
return with_blocking_future(
handle.enter(pp(*pg).await_map)
).then([this, pg] {
std::move(pgref), std::move(opref), [this](auto pgref, auto opref) {
PG &pg = *pgref;
return with_blocking_future(
pg->osdmap_gate.wait_for_map(m->get_map_epoch()));
}).then([this, pg] (auto) {
return with_blocking_future(handle.enter(pp(*pg).process));
}).then([this, pg] {
return pg->handle_op(conn.get(), std::move(m));
handle.enter(pp(pg).await_map)
).then([this, &pg]() mutable {
return with_blocking_future(
pg.osdmap_gate.wait_for_map(m->get_map_epoch()));
}).then([this, &pg](auto map) mutable {
return with_blocking_future(
handle.enter(pp(pg).wait_for_active));
}).then([this, &pg]() mutable {
return pg.wait_for_active();
}).then([this, &pg]() mutable {
if (m->finish_decode()) {
m->clear_payload();
}
if (is_pg_op()) {
return process_pg_op(pg);
} else {
return process_op(pg);
}
});
});
});
});
}
seastar::future<> ClientRequest::process_pg_op(
PG &pg)
{
return pg.do_pg_ops(m)
.then([this](Ref<MOSDOpReply> reply) {
return conn->send(reply);
});
}
seastar::future<> ClientRequest::process_op(
PG &pg)
{
return with_blocking_future(
handle.enter(pp(pg).get_obc)
).then([this, &pg]() {
op_info.set_from_op(&*m, *pg.get_osdmap());
return pg.with_locked_obc(
m,
op_info,
this,
[this, &pg](auto obc) {
return with_blocking_future(handle.enter(pp(pg).process)
).then([this, &pg, obc]() {
return pg.do_osd_ops(m, obc);
}).then([this](Ref<MOSDOpReply> reply) {
return conn->send(reply);
});
});
}).safe_then([] {
return seastar::now();
}, PG::load_obc_ertr::all_same_way([](auto &code) {
logger().error("ClientRequest saw error code {}", code);
return seastar::now();
}));
}
}

View File

@ -3,6 +3,7 @@
#pragma once
#include "osd/osd_op_util.h"
#include "crimson/net/Connection.h"
#include "crimson/osd/osd_operation.h"
#include "crimson/common/type_helpers.h"
@ -17,6 +18,7 @@ class ClientRequest final : public OperationT<ClientRequest> {
OSD &osd;
crimson::net::ConnectionRef conn;
Ref<MOSDOp> m;
OpInfo op_info;
OrderedPipelinePhase::Handle handle;
public:
@ -33,6 +35,12 @@ public:
OrderedPipelinePhase await_map = {
"ClientRequest::PGPipeline::await_map"
};
OrderedPipelinePhase wait_for_active = {
"ClientRequest::PGPipeline::wait_for_active"
};
OrderedPipelinePhase get_obc = {
"ClientRequest::PGPipeline::get_obc"
};
OrderedPipelinePhase process = {
"ClientRequest::PGPipeline::process"
};
@ -45,9 +53,17 @@ public:
void print(std::ostream &) const final;
void dump_detail(Formatter *f) const final;
public:
seastar::future<> start();
private:
seastar::future<> process_pg_op(
PG &pg);
seastar::future<> process_op(
PG &pg);
bool is_pg_op() const;
ConnectionPipeline &cp();
PGPipeline &pp(PG &pg);
};

View File

@ -412,14 +412,14 @@ seastar::future<> PG::wait_for_active()
}
}
seastar::future<> PG::submit_transaction(boost::local_shared_ptr<ObjectState>&& os,
seastar::future<> PG::submit_transaction(ObjectContextRef&& obc,
ceph::os::Transaction&& txn,
const MOSDOp& req)
{
epoch_t map_epoch = get_osdmap_epoch();
eversion_t at_version{map_epoch, projected_last_update.version + 1};
return backend->mutate_object(peering_state.get_acting_recovery_backfill(),
std::move(os),
std::move(obc),
std::move(txn),
req,
peering_state.get_last_peering_reset(),
@ -433,56 +433,80 @@ seastar::future<> PG::submit_transaction(boost::local_shared_ptr<ObjectState>&&
});
}
seastar::future<Ref<MOSDOpReply>> PG::do_osd_ops(Ref<MOSDOp> m)
seastar::future<Ref<MOSDOpReply>> PG::do_osd_ops(
Ref<MOSDOp> m,
ObjectContextRef obc)
{
using osd_op_errorator = OpsExecuter::osd_op_errorator;
const auto oid = m->get_snapid() == CEPH_SNAPDIR ? m->get_hobj().get_head()
: m->get_hobj();
return backend->get_object_state(oid).safe_then([this, m](auto os) mutable {
auto ox =
std::make_unique<OpsExecuter>(std::move(os), *this/* as const& */, m);
return crimson::do_for_each(m->ops, [this, ox = ox.get()](OSDOp& osd_op) {
logger().debug("will be handling op {}", ceph_osd_op_name(osd_op.op.op));
return ox->execute_osd_op(osd_op);
}).safe_then([this, m, ox = std::move(ox)] {
logger().debug("all operations have been executed successfully");
return std::move(*ox).submit_changes([this, m] (auto&& txn, auto&& os) -> osd_op_errorator::future<> {
auto ox =
std::make_unique<OpsExecuter>(obc, *this/* as const& */, m);
return crimson::do_for_each(
m->ops, [this, obc, m, ox = ox.get()](OSDOp& osd_op) {
logger().debug(
"do_osd_ops: {} - object {} - handling op {}",
*m,
obc->obs.oi.soid,
ceph_osd_op_name(osd_op.op.op));
return ox->execute_osd_op(osd_op);
}).safe_then([this, obc, m, ox = std::move(ox)] {
logger().debug(
"do_osd_ops: {} - object {} all operations successful",
*m,
obc->obs.oi.soid);
return std::move(*ox).submit_changes(
[this, m] (auto&& txn, auto&& obc) -> osd_op_errorator::future<> {
// XXX: the entire lambda could be scheduled conditionally. ::if_then()?
if (txn.empty()) {
logger().debug("txn is empty, bypassing mutate");
logger().debug(
"do_osd_ops: {} - object {} txn is empty, bypassing mutate",
*m,
obc->obs.oi.soid);
return osd_op_errorator::now();
} else {
return submit_transaction(std::move(os), std::move(txn), *m);
logger().debug(
"do_osd_ops: {} - object {} submitting txn",
*m,
obc->obs.oi.soid);
return submit_transaction(std::move(obc), std::move(txn), *m);
}
});
});
}).safe_then([m,this] {
}).safe_then([m, obc, this] {
auto reply = make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
0, false);
reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
logger().debug(
"do_osd_ops: {} - object {} sending reply",
*m,
obc->obs.oi.soid);
return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
}, OpsExecuter::osd_op_errorator::all_same_way([=,&oid] (const std::error_code& e) {
assert(e.value() > 0);
logger().debug("got statical error code while handling object {}: {} ({})",
oid, e.value(), e.message());
return backend->evict_object_state(oid).then([=] {
auto reply = make_message<MOSDOpReply>(
m.get(), -e.value(), get_osdmap_epoch(), 0, false);
reply->set_enoent_reply_versions(peering_state.get_info().last_update,
peering_state.get_info().last_user_version);
return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
});
logger().debug(
"do_osd_ops: {} - object {} got error code {}, {}",
*m,
obc->obs.oi.soid,
e.value(),
e.message());
auto reply = make_message<MOSDOpReply>(
m.get(), -e.value(), get_osdmap_epoch(), 0, false);
reply->set_enoent_reply_versions(peering_state.get_info().last_update,
peering_state.get_info().last_user_version);
return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
})).handle_exception_type([=,&oid](const crimson::osd::error& e) {
// we need this handler because throwing path which aren't errorated yet.
logger().debug("got ceph::osd::error while handling object {}: {} ({})",
oid, e.code(), e.what());
return backend->evict_object_state(oid).then([=] {
auto reply = make_message<MOSDOpReply>(
m.get(), -e.code().value(), get_osdmap_epoch(), 0, false);
reply->set_enoent_reply_versions(peering_state.get_info().last_update,
peering_state.get_info().last_user_version);
return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
});
logger().debug(
"do_osd_ops: {} - object {} got unhandled exception {} ({})",
*m,
obc->obs.oi.soid,
e.code(),
e.what());
auto reply = make_message<MOSDOpReply>(
m.get(), -e.code().value(), get_osdmap_epoch(), 0, false);
reply->set_enoent_reply_versions(peering_state.get_info().last_update,
peering_state.get_info().last_user_version);
return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
});
}
@ -506,22 +530,183 @@ seastar::future<Ref<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
});
}
seastar::future<> PG::handle_op(crimson::net::Connection* conn,
Ref<MOSDOp> m)
std::pair<hobject_t, RWState::State> PG::get_oid_and_lock(
const MOSDOp &m,
const OpInfo &op_info)
{
return wait_for_active().then([conn, m, this] {
if (m->finish_decode()) {
m->clear_payload();
auto oid = m.get_snapid() == CEPH_SNAPDIR ?
m.get_hobj().get_head() : m.get_hobj();
RWState::State lock_type = RWState::RWNONE;
if (op_info.rwordered() && op_info.may_read()) {
lock_type = RWState::RWState::RWEXCL;
} else if (op_info.rwordered()) {
lock_type = RWState::RWState::RWWRITE;
} else {
ceph_assert(op_info.may_read());
lock_type = RWState::RWState::RWREAD;
}
return std::make_pair(oid, lock_type);
}
std::optional<hobject_t> PG::resolve_oid(
const SnapSet &ss,
const hobject_t &oid)
{
if (oid.snap > ss.seq) {
return oid.get_head();
} else {
// which clone would it be?
auto clone = std::upper_bound(
begin(ss.clones), end(ss.clones),
oid.snap);
if (clone == end(ss.clones)) {
// Doesn't exist, > last clone, < ss.seq
return std::nullopt;
}
if (std::any_of(begin(m->ops), end(m->ops),
[](auto& op) { return ceph_osd_op_type_pg(op.op.op); })) {
return do_pg_ops(m);
auto citer = ss.clone_snaps.find(*clone);
// TODO: how do we want to handle this kind of logic error?
ceph_assert(citer != ss.clone_snaps.end());
if (std::find(
citer->second.begin(),
citer->second.end(),
*clone) == citer->second.end()) {
return std::nullopt;
} else {
return do_osd_ops(m);
auto soid = oid;
soid.snap = *clone;
return std::optional<hobject_t>(soid);
}
}).then([conn](Ref<MOSDOpReply> reply) {
return conn->send(reply);
});
}
}
PG::load_obc_ertr::future<
std::pair<crimson::osd::ObjectContextRef, bool>>
PG::get_or_load_clone_obc(hobject_t oid, ObjectContextRef head)
{
ceph_assert(!oid.is_head());
using ObjectContextRef = crimson::osd::ObjectContextRef;
auto coid = resolve_oid(head->get_ro_ss(), oid);
if (!coid) {
return load_obc_ertr::make_ready_future<
std::pair<crimson::osd::ObjectContextRef, bool>>(
std::make_pair(ObjectContextRef(), true)
);
}
auto [obc, existed] = shard_services.obc_registry.get_cached_obc(*coid);
if (existed) {
return load_obc_ertr::make_ready_future<
std::pair<crimson::osd::ObjectContextRef, bool>>(
std::make_pair(obc, true)
);
} else {
bool got = obc->maybe_get_excl();
ceph_assert(got);
return backend->load_metadata(*coid).safe_then(
[oid, obc=std::move(obc), head, this](auto &&md) mutable {
obc->set_clone_state(std::move(md->os), std::move(head));
return load_obc_ertr::make_ready_future<
std::pair<crimson::osd::ObjectContextRef, bool>>(
std::make_pair(obc, false)
);
});
}
}
PG::load_obc_ertr::future<
std::pair<crimson::osd::ObjectContextRef, bool>>
PG::get_or_load_head_obc(hobject_t oid)
{
ceph_assert(oid.is_head());
auto [obc, existed] = shard_services.obc_registry.get_cached_obc(oid);
if (existed) {
logger().debug(
"{}: found {} in cache",
__func__,
oid);
return load_obc_ertr::make_ready_future<
std::pair<crimson::osd::ObjectContextRef, bool>>(
std::make_pair(std::move(obc), true)
);
} else {
logger().debug(
"{}: cache miss on {}",
__func__,
oid);
bool got = obc->maybe_get_excl();
ceph_assert(got);
return backend->load_metadata(oid).safe_then(
[oid, obc=std::move(obc), this](auto md) ->
load_obc_ertr::future<
std::pair<crimson::osd::ObjectContextRef, bool>>
{
logger().debug(
"{}: loaded obs {} for {}",
__func__,
md->os.oi,
oid);
if (!md->ss) {
logger().error(
"{}: oid {} missing snapset",
__func__,
oid);
return crimson::ct_error::object_corrupted::make();
}
obc->set_head_state(std::move(md->os), std::move(*(md->ss)));
logger().debug(
"{}: returning obc {} for {}",
__func__,
obc->obs.oi,
obc->obs.oi.soid);
return load_obc_ertr::make_ready_future<
std::pair<crimson::osd::ObjectContextRef, bool>>(
std::make_pair(obc, false)
);
});
}
}
PG::load_obc_ertr::future<crimson::osd::ObjectContextRef>
PG::get_locked_obc(
Operation *op, const hobject_t &oid, RWState::State type)
{
return get_or_load_head_obc(oid.get_head()).safe_then(
[this, op, oid, type](auto p) -> load_obc_ertr::future<ObjectContextRef>{
auto &[head_obc, head_existed] = p;
if (oid.is_head()) {
if (head_existed) {
return head_obc->get_lock_type(op, type).then([head_obc] {
ceph_assert(head_obc->loaded);
return load_obc_ertr::make_ready_future<ObjectContextRef>(head_obc);
});
} else {
head_obc->degrade_excl_to(type);
return load_obc_ertr::make_ready_future<ObjectContextRef>(head_obc);
}
} else {
return head_obc->get_lock_type(op, RWState::RWREAD).then(
[this, head_obc, op, oid, type] {
ceph_assert(head_obc->loaded);
return get_or_load_clone_obc(oid, head_obc);
}).safe_then([this, head_obc, op, oid, type](auto p) {
auto &[obc, existed] = p;
if (existed) {
return load_obc_ertr::future<>(
obc->get_lock_type(op, type)).safe_then([obc] {
ceph_assert(obc->loaded);
return load_obc_ertr::make_ready_future<ObjectContextRef>(obc);
});
} else {
obc->degrade_excl_to(type);
return load_obc_ertr::make_ready_future<ObjectContextRef>(obc);
}
}).safe_then([head_obc](auto obc) {
head_obc->put_lock_type(RWState::RWREAD);
return load_obc_ertr::make_ready_future<ObjectContextRef>(obc);
});
}
});
}
seastar::future<> PG::handle_rep_op(Ref<MOSDRepOp> req)

View File

@ -15,7 +15,7 @@
#include "crimson/net/Fwd.h"
#include "os/Transaction.h"
#include "osd/osd_types.h"
#include "osd/osd_internal_types.h"
#include "crimson/osd/object_context.h"
#include "osd/PeeringState.h"
#include "crimson/common/type_helpers.h"
@ -435,8 +435,46 @@ public:
void handle_advance_map(cached_map_t next_map, PeeringCtx &rctx);
void handle_activate_map(PeeringCtx &rctx);
void handle_initialize(PeeringCtx &rctx);
seastar::future<> handle_op(crimson::net::Connection* conn,
Ref<MOSDOp> m);
static std::pair<hobject_t, RWState::State> get_oid_and_lock(
const MOSDOp &m,
const OpInfo &op_info);
static std::optional<hobject_t> resolve_oid(
const SnapSet &snapset,
const hobject_t &oid);
using load_obc_ertr = crimson::errorator<
crimson::ct_error::object_corrupted>;
load_obc_ertr::future<
std::pair<crimson::osd::ObjectContextRef, bool>>
get_or_load_clone_obc(
hobject_t oid, crimson::osd::ObjectContextRef head_obc);
load_obc_ertr::future<
std::pair<crimson::osd::ObjectContextRef, bool>>
get_or_load_head_obc(hobject_t oid);
load_obc_ertr::future<ObjectContextRef> get_locked_obc(
Operation *op,
const hobject_t &oid,
RWState::State type);
public:
template <typename F>
auto with_locked_obc(
Ref<MOSDOp> &m,
const OpInfo &op_info,
Operation *op,
F &&f) {
auto [oid, type] = get_oid_and_lock(*m, op_info);
return get_locked_obc(op, oid, type)
.safe_then([this, f=std::forward<F>(f), type](auto obc) {
return f(obc).finally([this, obc, type] {
obc->put_lock_type(type);
return load_obc_ertr::now();
});
});
}
seastar::future<> handle_rep_op(Ref<MOSDRepOp> m);
void handle_rep_op_reply(crimson::net::Connection* conn,
const MOSDRepOpReply& m);
@ -447,7 +485,9 @@ private:
void do_peering_event(
const boost::statechart::event_base &evt,
PeeringCtx &rctx);
seastar::future<Ref<MOSDOpReply>> do_osd_ops(Ref<MOSDOp> m);
seastar::future<Ref<MOSDOpReply>> do_osd_ops(
Ref<MOSDOp> m,
ObjectContextRef obc);
seastar::future<Ref<MOSDOpReply>> do_pg_ops(Ref<MOSDOp> m);
seastar::future<> do_osd_op(
ObjectState& os,
@ -456,7 +496,7 @@ private:
seastar::future<ceph::bufferlist> do_pgnls(ceph::bufferlist& indata,
const std::string& nspace,
uint64_t limit);
seastar::future<> submit_transaction(boost::local_shared_ptr<ObjectState>&& os,
seastar::future<> submit_transaction(ObjectContextRef&& obc,
ceph::os::Transaction&& txn,
const MOSDOp& req);
@ -465,6 +505,11 @@ private:
ShardServices &shard_services;
cached_map_t osdmap;
public:
cached_map_t get_osdmap() { return osdmap; }
private:
std::unique_ptr<PGBackend> backend;
PeeringState peering_state;

View File

@ -13,6 +13,7 @@
#include "messages/MOSDOp.h"
#include "os/Transaction.h"
#include "common/Clock.h"
#include "crimson/os/cyanstore/cyan_object.h"
#include "crimson/os/futurized_collection.h"
@ -60,128 +61,63 @@ PGBackend::PGBackend(shard_id_t shard,
store{store}
{}
PGBackend::get_os_errorator::future<PGBackend::cached_os_t>
PGBackend::get_object_state(const hobject_t& oid)
{
// want the head?
if (oid.snap == CEPH_NOSNAP) {
logger().trace("find_object: {}@HEAD", oid);
return _load_os(oid);
} else {
// we want a snap
return _load_ss(oid).safe_then(
[oid,this](cached_ss_t ss) -> get_os_errorator::future<cached_os_t> {
// head?
if (oid.snap > ss->seq) {
return _load_os(oid.get_head());
} else {
// which clone would it be?
auto clone = std::upper_bound(begin(ss->clones), end(ss->clones),
oid.snap);
if (clone == end(ss->clones)) {
return crimson::ct_error::enoent::make();
}
// clone
auto soid = oid;
soid.snap = *clone;
return _load_ss(soid).safe_then(
[soid,this](cached_ss_t ss) -> get_os_errorator::future<cached_os_t> {
auto clone_snap = ss->clone_snaps.find(soid.snap);
assert(clone_snap != end(ss->clone_snaps));
if (clone_snap->second.empty()) {
logger().trace("find_object: {}@[] -- DNE", soid);
return crimson::ct_error::enoent::make();
}
auto first = clone_snap->second.back();
auto last = clone_snap->second.front();
if (first > soid.snap) {
logger().trace("find_object: {}@[{},{}] -- DNE",
soid, first, last);
return crimson::ct_error::enoent::make();
}
logger().trace("find_object: {}@[{},{}] -- HIT",
soid, first, last);
return _load_os(soid);
});
}
});
}
}
PGBackend::load_metadata_ertr::future<PGBackend::loaded_object_md_t>
PGBackend::load_metadata_ertr::future<PGBackend::loaded_object_md_t::ref>
PGBackend::load_metadata(const hobject_t& oid)
{
return store->get_attrs(
coll,
ghobject_t{oid, ghobject_t::NO_GEN, shard}).safe_then(
[oid, this](auto &&attrs) -> load_metadata_ertr::future<loaded_object_md_t>{
loaded_object_md_t ret;
[oid, this](auto &&attrs) -> load_metadata_ertr::future<loaded_object_md_t::ref>{
loaded_object_md_t::ref ret(new loaded_object_md_t());
if (auto oiiter = attrs.find(OI_ATTR); oiiter != attrs.end()) {
bufferlist bl;
bl.push_back(oiiter->second);
ret.os = ObjectState(
bl.push_back(std::move(oiiter->second));
ret->os = ObjectState(
object_info_t(bl),
true);
} else {
logger().error(
"load_metadata: object {} present but missing object info",
oid);
return crimson::ct_error::object_corrupted::make();
}
if (oid.is_head()) {
if (auto ssiter = attrs.find(SS_ATTR); ssiter != attrs.end()) {
bufferlist bl;
bl.push_back(ssiter->second);
ret.ss = SnapSet(bl);
bl.push_back(std::move(ssiter->second));
ret->ss = SnapSet(bl);
} else {
return crimson::ct_error::object_corrupted::make();
/* TODO: add support for writing out snapsets
logger().error(
"load_metadata: object {} present but missing snapset",
oid);
//return crimson::ct_error::object_corrupted::make();
*/
ret->ss = SnapSet();
}
}
return load_metadata_ertr::make_ready_future<loaded_object_md_t>(
return load_metadata_ertr::make_ready_future<loaded_object_md_t::ref>(
std::move(ret));
}, crimson::ct_error::enoent::handle([oid, this] {
return load_metadata_ertr::make_ready_future<loaded_object_md_t>(
loaded_object_md_t{
ObjectState(),
std::nullopt
logger().debug(
"load_metadata: object {} doesn't exist, returning empty metadata",
oid);
return load_metadata_ertr::make_ready_future<loaded_object_md_t::ref>(
new loaded_object_md_t{
ObjectState(
object_info_t(oid),
false),
oid.is_head() ? std::optional<SnapSet>(SnapSet()) : std::nullopt
});
}));
}
PGBackend::get_os_errorator::future<PGBackend::cached_os_t>
PGBackend::_load_os(const hobject_t& oid)
{
if (auto found = os_cache.find(oid); found) {
return get_os_errorator::make_ready_future<cached_os_t>(std::move(found));
}
return load_metadata(oid).safe_then([oid, this](auto &&md) {
return get_os_errorator::make_ready_future<cached_os_t>(
os_cache.insert(
oid,
std::make_unique<ObjectState>(std::move(md.os))));
});
}
PGBackend::get_os_errorator::future<PGBackend::cached_ss_t>
PGBackend::_load_ss(const hobject_t& oid)
{
if (auto found = ss_cache.find(oid); found) {
return get_os_errorator::make_ready_future<cached_ss_t>(std::move(found));
}
return load_metadata(oid).safe_then([oid, this](auto &&md) {
if (!md.ss) {
return get_os_errorator::make_ready_future<cached_ss_t>(
std::make_unique<SnapSet>());
} else {
return get_os_errorator::make_ready_future<cached_ss_t>(
ss_cache.insert(oid, std::make_unique<SnapSet>(std::move(*(md.ss)))));
}
});
}
seastar::future<crimson::osd::acked_peers_t>
PGBackend::mutate_object(
std::set<pg_shard_t> pg_shards,
cached_os_t&& os,
crimson::osd::ObjectContextRef &&obc,
ceph::os::Transaction&& txn,
const MOSDOp& m,
epoch_t min_epoch,
@ -189,36 +125,30 @@ PGBackend::mutate_object(
eversion_t ver)
{
logger().trace("mutate_object: num_ops={}", txn.get_num_ops());
if (os->exists) {
if (obc->obs.exists) {
#if 0
os.oi.version = ctx->at_version;
os.oi.prior_version = ctx->obs->oi.version;
obc->obs.oi.version = ctx->at_version;
obc->obs.oi.prior_version = ctx->obs->oi.version;
#endif
os->oi.last_reqid = m.get_reqid();
os->oi.mtime = m.get_mtime();
os->oi.local_mtime = ceph_clock_now();
obc->obs.oi.last_reqid = m.get_reqid();
obc->obs.oi.mtime = m.get_mtime();
obc->obs.oi.local_mtime = ceph_clock_now();
// object_info_t
{
ceph::bufferlist osv;
encode(os->oi, osv, 0);
encode(obc->obs.oi, osv, 0);
// TODO: get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));
txn.setattr(coll->get_cid(), ghobject_t{os->oi.soid}, OI_ATTR, osv);
txn.setattr(coll->get_cid(), ghobject_t{obc->obs.oi.soid}, OI_ATTR, osv);
}
} else {
// reset cached ObjectState without enforcing eviction
os->oi = object_info_t(os->oi.soid);
obc->obs.oi = object_info_t(obc->obs.oi.soid);
}
return _submit_transaction(std::move(pg_shards), os->oi.soid, std::move(txn),
m.get_reqid(), min_epoch, map_epoch, ver);
}
seastar::future<>
PGBackend::evict_object_state(const hobject_t& oid)
{
os_cache.erase(oid);
return seastar::now();
return _submit_transaction(
std::move(pg_shards), obc->obs.oi.soid, std::move(txn),
m.get_reqid(), min_epoch, map_epoch, ver);
}
static inline bool _read_verify_data(

View File

@ -43,10 +43,6 @@ public:
crimson::os::CollectionRef coll,
crimson::osd::ShardServices& shard_services,
const ec_profile_t& ec_profile);
using cached_os_t = boost::local_shared_ptr<ObjectState>;
using get_os_errorator = crimson::errorator<crimson::ct_error::enoent>;
get_os_errorator::future<cached_os_t> get_object_state(const hobject_t& oid);
seastar::future<> evict_object_state(const hobject_t& oid);
using read_errorator = ll_read_errorator::extend<
crimson::ct_error::input_output_error,
@ -58,6 +54,7 @@ public:
size_t truncate_size,
uint32_t truncate_seq,
uint32_t flags);
using stat_errorator = crimson::errorator<crimson::ct_error::enoent>;
stat_errorator::future<> stat(
const ObjectState& os,
@ -80,7 +77,7 @@ public:
ceph::os::Transaction& trans);
seastar::future<crimson::osd::acked_peers_t> mutate_object(
std::set<pg_shard_t> pg_shards,
cached_os_t&& os,
crimson::osd::ObjectContextRef &&obc,
ceph::os::Transaction&& txn,
const MOSDOp& m,
epoch_t min_epoch,
@ -129,18 +126,12 @@ public:
struct loaded_object_md_t {
ObjectState os;
std::optional<SnapSet> ss;
using ref = std::unique_ptr<loaded_object_md_t>;
};
load_metadata_ertr::future<loaded_object_md_t> load_metadata(
load_metadata_ertr::future<loaded_object_md_t::ref> load_metadata(
const hobject_t &oid);
private:
using cached_ss_t = boost::local_shared_ptr<SnapSet>;
SharedLRU<hobject_t, SnapSet> ss_cache;
get_os_errorator::future<cached_ss_t> _load_ss(const hobject_t& oid);
SharedLRU<hobject_t, ObjectState> os_cache;
get_os_errorator::future<cached_os_t> _load_os(const hobject_t& oid);
virtual ll_read_errorator::future<ceph::bufferlist> _read(
const hobject_t& hoid,
size_t offset,