Merge pull request #38663 from tchaikov/wip-crimson-recovery-cleanups

crimson/osd: fixes and cleanups

Reviewed-by: Xuehan Xu <xxhdx1985126@gmail.com>
Reviewed-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
This commit is contained in:
Kefu Chai 2021-01-04 22:54:10 +08:00 committed by GitHub
commit d39c1794a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 178 additions and 130 deletions

View File

@ -64,6 +64,11 @@ void excl_lock_from_excl::unlock()
{
}
// Destructor: asserts that the mutex is not acquired when it is destroyed,
// so tearing down a tri_mutex that is still held is caught as a programming
// error rather than passing silently.
tri_mutex::~tri_mutex()
{
assert(!is_acquired());
}
seastar::future<> tri_mutex::lock_for_read()
{
if (try_lock_for_read()) {

View File

@ -67,6 +67,7 @@ class tri_mutex : private read_lock,
{
public:
tri_mutex() = default;
~tri_mutex();
read_lock& for_read() {
return *this;

View File

@ -451,16 +451,23 @@ seastar::future<struct stat> AlienStore::stat(
});
}
// Read the omap header of `oid` in collection `ch` from the wrapped store.
//
// The blocking store->omap_get_header() call is handed to `tp` via submit();
// the bufferlist it fills is owned by seastar::do_with, so it stays valid
// until the continuation has moved it out.
//
// NOTE(review): this span shows two versions of the function together.
// The seastar::future variant (first signature and the `(int i)`
// continuation) ignores the store's return code; the read_errorator variant
// (trailing-return signature and the `(int r)` continuation) maps -ENOENT to
// enoent and any other negative code to input_output_error.
seastar::future<ceph::bufferlist> AlienStore::omap_get_header(
CollectionRef ch,
const ghobject_t& oid)
auto AlienStore::omap_get_header(CollectionRef ch,
const ghobject_t& oid)
-> read_errorator::future<ceph::bufferlist>
{
return seastar::do_with(ceph::bufferlist(), [=](auto& bl) {
return tp->submit([=, &bl] {
auto c = static_cast<AlienCollection*>(ch.get());
return store->omap_get_header(c->collection, oid, &bl);
}).then([&bl] (int i) {
return seastar::make_ready_future<ceph::bufferlist>(std::move(bl));
}).then([&bl] (int r) -> read_errorator::future<ceph::bufferlist> {
// translate the store's integer return code into an errorated result
if (r == -ENOENT) {
return crimson::ct_error::enoent::make();
} else if (r < 0) {
logger().error("omap_get_header: {}", r);
return crimson::ct_error::input_output_error::make();
} else {
return read_errorator::make_ready_future<ceph::bufferlist>(std::move(bl));
}
});
});
}

View File

@ -99,7 +99,7 @@ public:
seastar::future<struct stat> stat(
CollectionRef,
const ghobject_t&) final;
seastar::future<ceph::bufferlist> omap_get_header(
read_errorator::future<ceph::bufferlist> omap_get_header(
CollectionRef,
const ghobject_t&) final;
seastar::future<std::map<uint64_t, uint64_t>> fiemap(

View File

@ -295,18 +295,19 @@ CyanStore::omap_get_values(CollectionRef ch,
std::make_tuple(true, std::move(values)));
}
// Return a copy of the omap header kept on object `oid` of collection `ch`.
//
// NOTE(review): this span shows two versions of the function together.
// For a missing object one version throws std::runtime_error, the other
// returns ct_error::enoent through read_errorator; likewise the success
// value is built with seastar::make_ready_future in one version and with
// read_errorator::make_ready_future in the other.
seastar::future<ceph::bufferlist>
CyanStore::omap_get_header(
CollectionRef ch,
const ghobject_t& oid
) {
auto
CyanStore::omap_get_header(CollectionRef ch,
const ghobject_t& oid)
-> read_errorator::future<ceph::bufferlist>
{
auto c = static_cast<Collection*>(ch.get());
auto o = c->get_object(oid);
if (!o) {
throw std::runtime_error(fmt::format("object does not exist: {}", oid));
return crimson::ct_error::enoent::make();
}
return seastar::make_ready_future<ceph::bufferlist>(o->omap_header);
return read_errorator::make_ready_future<ceph::bufferlist>(
o->omap_header);
}
seastar::future<> CyanStore::do_transaction(CollectionRef ch,

View File

@ -118,7 +118,7 @@ public:
const ghobject_t& end,
uint64_t limit) const final;
seastar::future<ceph::bufferlist> omap_get_header(
read_errorator::future<ceph::bufferlist> omap_get_header(
CollectionRef c,
const ghobject_t& oid) final;

View File

@ -124,7 +124,7 @@ public:
const std::optional<std::string> &start ///< [in] start, empty for begin
) = 0; ///< @return <done, values> values.empty() iff done
virtual seastar::future<bufferlist> omap_get_header(
virtual read_errorator::future<bufferlist> omap_get_header(
CollectionRef c,
const ghobject_t& oid) = 0;

View File

@ -170,9 +170,11 @@ seastar::future<struct stat> SeaStore::stat(
return seastar::make_ready_future<struct stat>(st);
}
// Placeholder implementation: resolves immediately to a default-constructed
// bufferlist; neither `c` nor `oid` is consulted.
//
// NOTE(review): two signatures are shown together here. The first one lacks
// the `SeaStore::` qualifier (so it would declare a free function); the
// second is the qualified member definition returning
// read_errorator::future<bufferlist>.
seastar::future<ceph::bufferlist> omap_get_header(
auto
SeaStore::omap_get_header(
CollectionRef c,
const ghobject_t& oid)
-> read_errorator::future<bufferlist>
{
return seastar::make_ready_future<bufferlist>();
}

View File

@ -82,7 +82,7 @@ public:
const std::optional<std::string> &start ///< [in] start, empty for begin
) final; ///< @return <done, values> values.empty() iff done
seastar::future<bufferlist> omap_get_header(
read_errorator::future<bufferlist> omap_get_header(
CollectionRef c,
const ghobject_t& oid) final;

View File

@ -878,18 +878,20 @@ maybe_get_omap_vals(
}
}
// Thin forwarder: asks the backing `store` for the omap header of `oid` in
// collection `c` and returns the store's future unchanged.
//
// NOTE(review): two return types are shown together here, a plain
// seastar::future and ll_read_errorator::future.
seastar::future<ceph::bufferlist> PGBackend::omap_get_header(
PGBackend::ll_read_errorator::future<ceph::bufferlist>
PGBackend::omap_get_header(
const crimson::os::CollectionRef& c,
const ghobject_t& oid) const
{
return store->omap_get_header(c, oid);
}
seastar::future<> PGBackend::omap_get_header(
PGBackend::ll_read_errorator::future<>
PGBackend::omap_get_header(
const ObjectState& os,
OSDOp& osd_op) const
{
return omap_get_header(coll, ghobject_t{os.oi.soid}).then(
return omap_get_header(coll, ghobject_t{os.oi.soid}).safe_then(
[&osd_op] (ceph::bufferlist&& header) {
osd_op.outdata = std::move(header);
return seastar::now();

View File

@ -173,10 +173,10 @@ public:
const OSDOp& osd_op,
ceph::os::Transaction& trans,
osd_op_params_t& osd_op_params);
seastar::future<ceph::bufferlist> omap_get_header(
ll_read_errorator::future<ceph::bufferlist> omap_get_header(
const crimson::os::CollectionRef& c,
const ghobject_t& oid) const;
seastar::future<> omap_get_header(
ll_read_errorator::future<> omap_get_header(
const ObjectState& os,
OSDOp& osd_op) const;
seastar::future<> omap_set_header(

View File

@ -19,12 +19,14 @@ namespace {
// Build the temporary object used while recovering `target` at `version`.
//
// The temp name has the form
//   temp_recovering_<pgid>_<version>_<same_interval_since>_<snap>
// and is passed to target.make_temp_hobject(); the resulting hobject_t is
// logged at debug level and returned.
//
// NOTE(review): this span shows two spellings of the same name construction
// (an ostringstream and a fmt::format call), and the signature both with and
// without the trailing `const`.
hobject_t RecoveryBackend::get_temp_recovery_object(
const hobject_t& target,
eversion_t version)
eversion_t version) const
{
ostringstream ss;
ss << "temp_recovering_" << pg.get_info().pgid << "_" << version
<< "_" << pg.get_info().history.same_interval_since << "_" << target.snap;
hobject_t hoid = target.make_temp_hobject(ss.str());
hobject_t hoid =
target.make_temp_hobject(fmt::format("temp_recovering_{}_{}_{}_{}",
pg.get_info().pgid,
version,
pg.get_info().history.same_interval_since,
target.snap));
logger().debug("{} {}", __func__, hoid);
return hoid;
}
@ -102,9 +104,7 @@ seastar::future<> RecoveryBackend::handle_backfill_progress(
t);
return shard_services.get_store().do_transaction(
pg.get_collection_ref(), std::move(t)
).handle_exception([] (auto) {
ceph_assert("this transaction shall not fail" == nullptr);
});
).or_terminate();
}
seastar::future<> RecoveryBackend::handle_backfill_finish_ack(
@ -150,9 +150,7 @@ seastar::future<> RecoveryBackend::handle_backfill_remove(
}
return shard_services.get_store().do_transaction(
pg.get_collection_ref(), std::move(t)
).handle_exception([] (auto) {
ceph_abort_msg("this transaction shall not fail");
});
).or_terminate();
}
seastar::future<BackfillInterval> RecoveryBackend::scan_for_backfill(
@ -161,54 +159,49 @@ seastar::future<BackfillInterval> RecoveryBackend::scan_for_backfill(
const std::int64_t max)
{
logger().debug("{} starting from {}", __func__, start);
return seastar::do_with(
std::map<hobject_t, eversion_t>{},
[this, &start, max] (auto& version_map) {
return backend->list_objects(start, max).then(
[this, &start, &version_map] (auto&& ret) {
auto& [objects, next] = ret;
return seastar::do_for_each(
objects,
[this, &version_map] (const hobject_t& object) {
crimson::osd::ObjectContextRef obc;
if (pg.is_primary()) {
obc = shard_services.obc_registry.maybe_get_cached_obc(object);
}
if (obc) {
if (obc->obs.exists) {
logger().debug("scan_for_backfill found (primary): {} {}",
object, obc->obs.oi.version);
version_map[object] = obc->obs.oi.version;
} else {
// if the object does not exist here, it must have been removed
// between the collection_list_partial and here. This can happen
// for the first item in the range, which is usually last_backfill.
}
return seastar::now();
} else {
return backend->load_metadata(object).safe_then(
[&version_map, object] (auto md) {
if (md->os.exists) {
logger().debug("scan_for_backfill found: {} {}",
object, md->os.oi.version);
version_map[object] = md->os.oi.version;
}
return seastar::now();
}, PGBackend::load_metadata_ertr::assert_all{});
}
}).then(
[&version_map, &start, next=std::move(next), this] {
BackfillInterval bi;
bi.begin = start;
bi.end = std::move(next);
bi.version = pg.get_info().last_update;
bi.objects = std::move(version_map);
logger().debug("{} BackfillInterval filled, leaving",
"scan_for_backfill");
return seastar::make_ready_future<BackfillInterval>(std::move(bi));
});
});
auto version_map = seastar::make_lw_shared<std::map<hobject_t, eversion_t>>();
return backend->list_objects(start, max).then(
[this, start, version_map] (auto&& ret) {
auto&& [objects, next] = std::move(ret);
return seastar::parallel_for_each(std::move(objects),
[this, version_map] (const hobject_t& object) {
crimson::osd::ObjectContextRef obc;
if (pg.is_primary()) {
obc = shard_services.obc_registry.maybe_get_cached_obc(object);
}
if (obc) {
if (obc->obs.exists) {
logger().debug("scan_for_backfill found (primary): {} {}",
object, obc->obs.oi.version);
version_map->emplace(object, obc->obs.oi.version);
} else {
// if the object does not exist here, it must have been removed
// between the collection_list_partial and here. This can happen
// for the first item in the range, which is usually last_backfill.
}
return seastar::now();
} else {
return backend->load_metadata(object).safe_then(
[version_map, object] (auto md) {
if (md->os.exists) {
logger().debug("scan_for_backfill found: {} {}",
object, md->os.oi.version);
version_map->emplace(object, md->os.oi.version);
}
return seastar::now();
}, PGBackend::load_metadata_ertr::assert_all{});
}
}).then([version_map, start=std::move(start), next=std::move(next), this] {
BackfillInterval bi;
bi.begin = std::move(start);
bi.end = std::move(next);
bi.version = pg.get_info().last_update;
bi.objects = std::move(*version_map);
logger().debug("{} BackfillInterval filled, leaving",
"scan_for_backfill");
return seastar::make_ready_future<BackfillInterval>(std::move(bi));
});
});
}
seastar::future<> RecoveryBackend::handle_scan_get_digest(

View File

@ -188,7 +188,7 @@ protected:
std::map<hobject_t, WaitForObjectRecovery> recovering;
hobject_t get_temp_recovery_object(
const hobject_t& target,
eversion_t version);
eversion_t version) const;
boost::container::flat_set<hobject_t> temp_contents;

View File

@ -352,43 +352,27 @@ seastar::future<PushOp> ReplicatedRecoveryBackend::build_push_op(
logger().debug("{} {} @{}",
__func__, recovery_info.soid, recovery_info.version);
return seastar::do_with(ObjectRecoveryProgress(progress),
object_info_t(),
uint64_t(crimson::common::local_conf()
->osd_recovery_max_chunk),
eversion_t(),
recovery_info.version,
PushOp(),
[this, &recovery_info, &progress, stat]
(auto& new_progress, auto& oi, auto& available, auto& v, auto& pop) {
return [this, &recovery_info, &progress, &new_progress, &oi, &v, pop=&pop] {
v = recovery_info.version;
if (progress.first) {
return backend->omap_get_header(coll, ghobject_t(recovery_info.soid))
.then([this, &recovery_info, pop](auto bl) {
pop->omap_header.claim_append(bl);
return store->get_attrs(coll, ghobject_t(recovery_info.soid));
}).safe_then([&oi, pop, &new_progress, &v](auto attrs) mutable {
for (auto& [key, val] : attrs) {
pop->attrset[std::move(key)].push_back(std::move(val));
}
logger().debug("build_push_op: {}", pop->attrset[OI_ATTR]);
oi.decode(pop->attrset[OI_ATTR]);
new_progress.first = false;
if (v == eversion_t()) {
v = oi.version;
}
return seastar::make_ready_future<>();
}, crimson::os::FuturizedStore::get_attrs_ertr::all_same_way(
[] (const std::error_code& e) {
return seastar::make_exception_future<>(e);
})
);
// seastar::do_with() owns the four state objects and hands them to this
// callback as lvalues. They must be bound by reference: the continuations
// below capture them by reference (and take &pop / &available), so by-value
// parameters would be copies that die as soon as this callback returns its
// future, leaving every later continuation with dangling references and
// the PushOp being filled in discarded.
(auto& new_progress, auto& available, auto& v, auto& pop) {
return read_metadata_for_push_op(recovery_info.soid,
progress, new_progress,
v, &pop).then([&](eversion_t local_ver) mutable {
// If requestor didn't know the version, use ours
if (v == eversion_t()) {
v = local_ver;
} else if (v != local_ver) {
logger().error("build_push_op: {} push {} v{} failed because local copy is {}",
pg.get_pgid(), recovery_info.soid, recovery_info.version, local_ver);
// TODO: bail out
}
return seastar::make_ready_future<>();
}().then([this, &recovery_info, &progress, &new_progress, &available, &pop]() mutable {
return read_omap_for_push_op(recovery_info.soid,
progress,
new_progress,
available, &pop);
&available, &pop);
}).then([this, &recovery_info, &progress, &available, &pop]() mutable {
logger().debug("build_push_op: available: {}, copy_subset: {}",
available, recovery_info.copy_subset);
@ -424,6 +408,48 @@ seastar::future<PushOp> ReplicatedRecoveryBackend::build_push_op(
});
}
/// Read the small per-object metadata (omap header and xattrs) that travels
/// with the first push op of an object, and store it in @c push_op.
///
/// When @c progress.first is false nothing is read and @c ver is returned
/// unchanged.
///
/// @param oid          object being pushed
/// @param progress     progress reported by the requester
/// @param new_progress progress being built for the reply; @c first is
///                     cleared once the metadata has been read
/// @param ver          version handed back when there is nothing to read
/// @param push_op      receives @c omap_header and @c attrset
/// @return the version decoded from the object's OI_ATTR, or a
///         default-constructed eversion_t when the attrs could not be read
seastar::future<eversion_t>
ReplicatedRecoveryBackend::read_metadata_for_push_op(
    const hobject_t& oid,
    const ObjectRecoveryProgress& progress,
    ObjectRecoveryProgress& new_progress,
    eversion_t ver,
    PushOp* push_op)
{
  if (!progress.first) {
    return seastar::make_ready_future<eversion_t>(ver);
  }
  // Both reads are issued together. A failed read is logged and collapsed
  // into an empty value so that when_all_succeed() always resolves.
  return seastar::when_all_succeed(
    backend->omap_get_header(coll, ghobject_t(oid)).handle_error(
      crimson::os::FuturizedStore::read_errorator::all_same_way(
        [] (const std::error_code& e) {
          logger().error("read_metadata_for_push_op: omap header read failed: {}",
                         e.message());
          return seastar::make_ready_future<bufferlist>();
        })),
    store->get_attrs(coll, ghobject_t(oid)).handle_error(
      crimson::os::FuturizedStore::get_attrs_ertr::all_same_way(
        [] (const std::error_code& e) {
          logger().error("read_metadata_for_push_op: attrs read failed: {}",
                         e.message());
          return seastar::make_ready_future<crimson::os::FuturizedStore::attrs_t>();
        }))
  ).then_unpack([&new_progress, push_op](auto bl, auto attrs) {
    // OI_ATTR is decoded below, so an empty attr set means the read failed.
    if (attrs.empty()) {
      logger().error("read_metadata_for_push_op: fail to read attrs");
      return eversion_t{};
    }
    // A zero-length omap header is NOT a failure: an object that never had a
    // header set reads back as an empty buffer. Treating it as an error
    // would make every such object impossible to push.
    if (bl.length() == 0) {
      logger().debug("read_metadata_for_push_op: empty omap header");
    }
    push_op->omap_header.claim_append(std::move(bl));
    // `attrs` is our own copy, so the values can be moved out of it.
    for (auto& [key, val] : attrs) {
      push_op->attrset[key].push_back(std::move(val));
    }
    logger().debug("read_metadata_for_push_op: {}", push_op->attrset[OI_ATTR]);
    object_info_t oi;
    oi.decode(push_op->attrset[OI_ATTR]);
    new_progress.first = false;
    return oi.version;
  });
}
seastar::future<uint64_t>
ReplicatedRecoveryBackend::read_object_for_push_op(
const hobject_t& oid,
@ -477,16 +503,16 @@ ReplicatedRecoveryBackend::read_omap_for_push_op(
const hobject_t& oid,
const ObjectRecoveryProgress& progress,
ObjectRecoveryProgress& new_progress,
uint64_t max_len,
uint64_t* max_len,
PushOp* push_op)
{
if (progress.omap_complete) {
return seastar::make_ready_future<>();
}
return shard_services.get_store().get_omap_iterator(coll, ghobject_t{oid})
.then([&progress, &new_progress, &max_len, push_op](auto omap_iter) {
if (progress.omap_complete) {
return seastar::make_ready_future<>();
}
.then([&progress, &new_progress, max_len, push_op](auto omap_iter) {
return omap_iter->lower_bound(progress.omap_recovered_to).then(
[omap_iter, &new_progress, &max_len, push_op] {
[omap_iter, &new_progress, max_len, push_op] {
return seastar::do_until([omap_iter, &new_progress, max_len, push_op] {
if (!omap_iter->valid()) {
new_progress.omap_complete = true;
@ -502,20 +528,20 @@ ReplicatedRecoveryBackend::read_omap_for_push_op(
new_progress.omap_recovered_to = omap_iter->key();
return true;
}
if (omap_iter->key().size() + omap_iter->value().length() > max_len) {
if (omap_iter->key().size() + omap_iter->value().length() > *max_len) {
new_progress.omap_recovered_to = omap_iter->key();
return true;
}
return false;
},
[omap_iter, &max_len, push_op] {
[omap_iter, max_len, push_op] {
push_op->omap_entries.emplace(omap_iter->key(), omap_iter->value());
if (const uint64_t entry_size =
omap_iter->key().size() + omap_iter->value().length() > max_len;
entry_size >= max_len) {
max_len -= entry_size;
omap_iter->key().size() + omap_iter->value().length();
entry_size > *max_len) {
*max_len -= entry_size;
} else {
max_len = 0;
*max_len = 0;
}
return omap_iter->next();
});
@ -581,7 +607,7 @@ seastar::future<> ReplicatedRecoveryBackend::handle_pull(Ref<MOSDPGPull> m)
seastar::future<bool> ReplicatedRecoveryBackend::_handle_pull_response(
pg_shard_t from,
PushOp& pop,
const PushOp& pop,
PullOp* response,
ceph::os::Transaction* t)
{
@ -607,7 +633,7 @@ seastar::future<bool> ReplicatedRecoveryBackend::_handle_pull_response(
pi.recovery_info.soid, [&pi, &recovery_waiter, &pop](auto obc) {
pi.obc = obc;
recovery_waiter.obc = obc;
obc->obs.oi.decode(pop.attrset[OI_ATTR]);
obc->obs.oi.decode(pop.attrset.at(OI_ATTR));
pi.recovery_info.oi = obc->obs.oi;
return crimson::osd::PG::load_obc_ertr::now();
}).handle_error(crimson::ct_error::assert_all{});

View File

@ -59,9 +59,11 @@ protected:
const ObjectRecoveryInfo& recovery_info,
const ObjectRecoveryProgress& progress,
object_stat_sum_t* stat);
/// @returns true if this push op is the last push op for
/// recovery @c pop.soid
seastar::future<bool> _handle_pull_response(
pg_shard_t from,
PushOp& pop,
const PushOp& pop,
PullOp* response,
ceph::os::Transaction* t);
std::pair<interval_set<uint64_t>, ceph::bufferlist> trim_pushed_data(
@ -117,6 +119,16 @@ private:
const hobject_t& soid,
eversion_t need);
/// read the data attached to given object. the size of them is supposed to
/// be relatively small.
///
/// @return @c oi.version
seastar::future<eversion_t> read_metadata_for_push_op(
const hobject_t& oid,
const ObjectRecoveryProgress& progress,
ObjectRecoveryProgress& new_progress,
eversion_t ver,
PushOp* push_op);
/// read the remaining extents of object to be recovered and fill push_op
/// with them
///
@ -134,6 +146,6 @@ private:
const hobject_t& oid,
const ObjectRecoveryProgress& progress,
ObjectRecoveryProgress& new_progress,
uint64_t max_len,
uint64_t* max_len,
PushOp* push_op);
};

View File

@ -430,9 +430,8 @@ class interval_set {
}
// Return `first + second` of the last entry of `m`.
// The set must not be empty; this is enforced with ceph_assert.
//
// NOTE(review): two ways of reaching the last entry are shown together
// here: stepping back from m.end(), and m.rbegin().
offset_type range_end() const {
ceph_assert(!empty());
auto p = m.end();
p--;
return p->first+p->second;
auto p = m.rbegin();
return p->first + p->second;
}
// interval start after p (where p not in set)

View File

@ -5881,7 +5881,7 @@ struct object_info_t {
void encode(ceph::buffer::list& bl, uint64_t features) const;
void decode(ceph::buffer::list::const_iterator& bl);
// Convenience overload: takes a const iterator at the beginning of `bl`
// and delegates to the iterator-based decode() overload.
//
// NOTE(review): two signatures are shown together here, taking `bl` by
// non-const and by const reference; the body only uses std::cbegin(bl).
void decode(ceph::buffer::list& bl) {
void decode(const ceph::buffer::list& bl) {
auto p = std::cbegin(bl);
decode(p);
}