mirror of
https://github.com/ceph/ceph
synced 2025-04-01 14:51:13 +00:00
Merge pull request #33943 from xxhdx1985126/wip-crimson-alienstore-bugs
crimson: fix bugs that come up when osds go through down/up Reviewed-by: Samuel Just <sjust@redhat.com> Reviewed-by: Kefu Chai <kchai@redhat.com>
This commit is contained in:
commit
0e40f6fb01
@ -11,15 +11,16 @@
|
||||
|
||||
namespace crimson::os {
|
||||
|
||||
struct AlienCollection final : public FuturizedCollection {
|
||||
|
||||
ObjectStore::CollectionHandle collection;
|
||||
|
||||
class AlienCollection final : public FuturizedCollection {
|
||||
public:
|
||||
AlienCollection(ObjectStore::CollectionHandle ch)
|
||||
: FuturizedCollection(ch->cid),
|
||||
collection(ch) {}
|
||||
|
||||
~AlienCollection() {}
|
||||
};
|
||||
|
||||
private:
|
||||
ObjectStore::CollectionHandle collection;
|
||||
friend AlienStore;
|
||||
};
|
||||
}
|
||||
|
@ -380,4 +380,121 @@ unsigned AlienStore::get_max_attr_name_length() const
|
||||
return 256;
|
||||
}
|
||||
|
||||
seastar::future<struct stat> AlienStore::stat(
|
||||
CollectionRef ch,
|
||||
const ghobject_t& oid)
|
||||
{
|
||||
return seastar::do_with((struct stat){}, [this, ch, oid](auto& st) {
|
||||
return tp->submit([this, ch, oid, &st] {
|
||||
auto c = static_cast<AlienCollection*>(ch.get());
|
||||
store->stat(c->collection, oid, &st);
|
||||
return st;
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
seastar::future<ceph::bufferlist> AlienStore::omap_get_header(
|
||||
CollectionRef ch,
|
||||
const ghobject_t& oid)
|
||||
{
|
||||
return seastar::do_with(ceph::bufferlist(), [=](auto& bl) {
|
||||
return tp->submit([=, &bl] {
|
||||
auto c = static_cast<AlienCollection*>(ch.get());
|
||||
return store->omap_get_header(c->collection, oid, &bl);
|
||||
}).then([&bl] (int i) {
|
||||
return seastar::make_ready_future<ceph::bufferlist>(std::move(bl));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
seastar::future<std::map<uint64_t, uint64_t>> AlienStore::fiemap(
|
||||
CollectionRef ch,
|
||||
const ghobject_t& oid,
|
||||
uint64_t off,
|
||||
uint64_t len)
|
||||
{
|
||||
return seastar::do_with(std::map<uint64_t, uint64_t>(), [=](auto& destmap) {
|
||||
return tp->submit([=, &destmap] {
|
||||
auto c = static_cast<AlienCollection*>(ch.get());
|
||||
return store->fiemap(c->collection, oid, off, len, destmap);
|
||||
}).then([&destmap] (int i) {
|
||||
return seastar::make_ready_future
|
||||
<std::map<uint64_t, uint64_t>>
|
||||
(std::move(destmap));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
seastar::future<FuturizedStore::OmapIteratorRef> AlienStore::get_omap_iterator(
|
||||
CollectionRef ch,
|
||||
const ghobject_t& oid)
|
||||
{
|
||||
return tp->submit([=] {
|
||||
auto c = static_cast<AlienCollection*>(ch.get());
|
||||
auto iter = store->get_omap_iterator(c->collection, oid);
|
||||
return FuturizedStore::OmapIteratorRef(
|
||||
new AlienStore::AlienOmapIterator(iter,
|
||||
this));
|
||||
});
|
||||
}
|
||||
|
||||
//TODO: each iterator op needs one submit, this is not efficient,
|
||||
// needs further optimization.
|
||||
seastar::future<int> AlienStore::AlienOmapIterator::seek_to_first()
|
||||
{
|
||||
return store->tp->submit([=] {
|
||||
return iter->seek_to_first();
|
||||
});
|
||||
}
|
||||
|
||||
seastar::future<int> AlienStore::AlienOmapIterator::upper_bound(
|
||||
const std::string& after)
|
||||
{
|
||||
return store->tp->submit([this, after] {
|
||||
return iter->upper_bound(after);
|
||||
});
|
||||
}
|
||||
|
||||
seastar::future<int> AlienStore::AlienOmapIterator::lower_bound(
|
||||
const std::string& to)
|
||||
{
|
||||
return store->tp->submit([this, to] {
|
||||
return iter->lower_bound(to);
|
||||
});
|
||||
}
|
||||
|
||||
seastar::future<int> AlienStore::AlienOmapIterator::next()
|
||||
{
|
||||
return store->tp->submit([this] {
|
||||
return iter->next();
|
||||
});
|
||||
}
|
||||
|
||||
bool AlienStore::AlienOmapIterator::valid() const
|
||||
{
|
||||
return iter->valid();
|
||||
}
|
||||
|
||||
std::string AlienStore::AlienOmapIterator::key()
|
||||
{
|
||||
return iter->key();
|
||||
}
|
||||
|
||||
seastar::future<std::string> AlienStore::AlienOmapIterator::tail_key()
|
||||
{
|
||||
return store->tp->submit([this] {
|
||||
return iter->tail_key();
|
||||
});
|
||||
}
|
||||
|
||||
ceph::buffer::list AlienStore::AlienOmapIterator::value()
|
||||
{
|
||||
return iter->value();
|
||||
}
|
||||
|
||||
int AlienStore::AlienOmapIterator::status() const
|
||||
{
|
||||
return iter->status();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -21,6 +21,23 @@ class Transaction;
|
||||
namespace crimson::os {
|
||||
class AlienStore final : public FuturizedStore {
|
||||
public:
|
||||
class AlienOmapIterator final : public OmapIterator {
|
||||
public:
|
||||
AlienOmapIterator(ObjectMap::ObjectMapIterator& it,
|
||||
AlienStore* store) : iter(it), store(store) {}
|
||||
seastar::future<int> seek_to_first();
|
||||
seastar::future<int> upper_bound(const std::string& after);
|
||||
seastar::future<int> lower_bound(const std::string& to);
|
||||
bool valid() const;
|
||||
seastar::future<int> next();
|
||||
std::string key();
|
||||
seastar::future<std::string> tail_key();
|
||||
ceph::buffer::list value();
|
||||
int status() const;
|
||||
private:
|
||||
ObjectMap::ObjectMapIterator iter;
|
||||
AlienStore* store;
|
||||
};
|
||||
mutable std::unique_ptr<crimson::thread::ThreadPool> tp;
|
||||
AlienStore(const std::string& path, const ConfigValues& values);
|
||||
~AlienStore() final;
|
||||
@ -74,7 +91,20 @@ public:
|
||||
uuid_d get_fsid() const final;
|
||||
seastar::future<store_statfs_t> stat() const final;
|
||||
unsigned get_max_attr_name_length() const final;
|
||||
|
||||
seastar::future<struct stat> stat(
|
||||
CollectionRef,
|
||||
const ghobject_t&) final;
|
||||
seastar::future<ceph::bufferlist> omap_get_header(
|
||||
CollectionRef,
|
||||
const ghobject_t&) final;
|
||||
seastar::future<std::map<uint64_t, uint64_t>> fiemap(
|
||||
CollectionRef,
|
||||
const ghobject_t&,
|
||||
uint64_t off,
|
||||
uint64_t len) final;
|
||||
seastar::future<FuturizedStore::OmapIteratorRef> get_omap_iterator(
|
||||
CollectionRef ch,
|
||||
const ghobject_t& oid) final;
|
||||
private:
|
||||
constexpr static unsigned MAX_KEYS_PER_OMAP_GET_CALL = 32;
|
||||
const std::string path;
|
||||
|
@ -1,10 +1,13 @@
|
||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
// vim: ts=8 sw=2 smarttab
|
||||
#pragma once
|
||||
|
||||
#include <cstddef>
|
||||
#include <map>
|
||||
#include <string>
|
||||
#include <boost/intrusive_ptr.hpp>
|
||||
#include <boost/smart_ptr/intrusive_ref_counter.hpp>
|
||||
|
||||
#include "include/buffer.h"
|
||||
|
||||
namespace crimson::os {
|
||||
|
@ -276,6 +276,20 @@ CyanStore::omap_get_values(
|
||||
true, values);
|
||||
}
|
||||
|
||||
seastar::future<ceph::bufferlist>
|
||||
CyanStore::omap_get_header(
|
||||
CollectionRef ch,
|
||||
const ghobject_t& oid
|
||||
) {
|
||||
auto c = static_cast<Collection*>(ch.get());
|
||||
auto o = c->get_object(oid);
|
||||
if (!o) {
|
||||
throw std::runtime_error(fmt::format("object does not exist: {}", oid));
|
||||
}
|
||||
|
||||
return seastar::make_ready_future<ceph::bufferlist>(o->omap_header);
|
||||
}
|
||||
|
||||
seastar::future<> CyanStore::do_transaction(CollectionRef ch,
|
||||
ceph::os::Transaction&& t)
|
||||
{
|
||||
@ -647,4 +661,79 @@ unsigned CyanStore::get_max_attr_name_length() const
|
||||
// arbitrary limitation exactly like in the case of MemStore.
|
||||
return 256;
|
||||
}
|
||||
|
||||
seastar::future<FuturizedStore::OmapIteratorRef> CyanStore::get_omap_iterator(
|
||||
CollectionRef ch,
|
||||
const ghobject_t& oid)
|
||||
{
|
||||
auto c = static_cast<Collection*>(ch.get());
|
||||
auto o = c->get_object(oid);
|
||||
if (!o) {
|
||||
throw std::runtime_error(fmt::format("object does not exist: {}", oid));
|
||||
}
|
||||
return seastar::make_ready_future<FuturizedStore::OmapIteratorRef>(
|
||||
new CyanStore::CyanOmapIterator(o));
|
||||
}
|
||||
|
||||
seastar::future<std::map<uint64_t, uint64_t>>
|
||||
CyanStore::fiemap(
|
||||
CollectionRef ch,
|
||||
const ghobject_t& oid,
|
||||
uint64_t off,
|
||||
uint64_t len)
|
||||
{
|
||||
auto c = static_cast<Collection*>(ch.get());
|
||||
|
||||
ObjectRef o = c->get_object(oid);
|
||||
if (!o) {
|
||||
throw std::runtime_error(fmt::format("object does not exist: {}", oid));
|
||||
}
|
||||
std::map<uint64_t, uint64_t> m{{0, o->get_size()}};
|
||||
return seastar::make_ready_future<std::map<uint64_t, uint64_t>>(std::move(m));
|
||||
}
|
||||
|
||||
seastar::future<struct stat>
|
||||
CyanStore::stat(
|
||||
CollectionRef ch,
|
||||
const ghobject_t& oid)
|
||||
{
|
||||
auto c = static_cast<Collection*>(ch.get());
|
||||
auto o = c->get_object(oid);
|
||||
if (!o) {
|
||||
throw std::runtime_error(fmt::format("object does not exist: {}", oid));
|
||||
}
|
||||
struct stat st;
|
||||
st.st_size = o->get_size();
|
||||
return seastar::make_ready_future<struct stat>(std::move(st));
|
||||
}
|
||||
|
||||
seastar::future<int> CyanStore::CyanOmapIterator::seek_to_first()
|
||||
{
|
||||
iter = obj->omap.begin();
|
||||
return seastar::make_ready_future<int>(0);
|
||||
}
|
||||
|
||||
seastar::future<int> CyanStore::CyanOmapIterator::upper_bound(const std::string& after)
|
||||
{
|
||||
iter = obj->omap.upper_bound(after);
|
||||
return seastar::make_ready_future<int>(0);
|
||||
}
|
||||
|
||||
seastar::future<int> CyanStore::CyanOmapIterator::lower_bound(const std::string &to)
|
||||
{
|
||||
iter = obj->omap.lower_bound(to);
|
||||
return seastar::make_ready_future<int>(0);
|
||||
}
|
||||
|
||||
bool CyanStore::CyanOmapIterator::valid() const
|
||||
{
|
||||
return iter != obj->omap.end();
|
||||
}
|
||||
|
||||
seastar::future<int> CyanStore::CyanOmapIterator::next()
|
||||
{
|
||||
++iter;
|
||||
return seastar::make_ready_future<int>(0);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -16,6 +16,7 @@
|
||||
#include "osd/osd_types.h"
|
||||
#include "include/uuid.h"
|
||||
|
||||
#include "crimson/os/cyanstore/cyan_object.h"
|
||||
#include "crimson/os/futurized_store.h"
|
||||
|
||||
namespace ceph::os {
|
||||
@ -35,6 +36,34 @@ class CyanStore final : public FuturizedStore {
|
||||
uuid_d osd_fsid;
|
||||
|
||||
public:
|
||||
class CyanOmapIterator final : public OmapIterator {
|
||||
public:
|
||||
CyanOmapIterator() {}
|
||||
CyanOmapIterator(ObjectRef obj) : obj(obj) {
|
||||
iter = obj->omap.begin();
|
||||
}
|
||||
virtual seastar::future<int> seek_to_first();
|
||||
virtual seastar::future<int> upper_bound(const std::string &after);
|
||||
virtual seastar::future<int> lower_bound(const std::string &to);
|
||||
virtual bool valid() const;
|
||||
virtual seastar::future<int> next();
|
||||
virtual std::string key() {
|
||||
return iter->first;
|
||||
}
|
||||
virtual seastar::future<std::string> tail_key() {
|
||||
return seastar::make_ready_future<std::string>((++obj->omap.end())->first);
|
||||
}
|
||||
virtual ceph::buffer::list value() {
|
||||
return iter->second;
|
||||
}
|
||||
virtual int status() const {
|
||||
return iter != obj->omap.end() ? 0 : -1;
|
||||
}
|
||||
virtual ~CyanOmapIterator() {}
|
||||
private:
|
||||
std::map<std::string, bufferlist>::const_iterator iter;
|
||||
ObjectRef obj;
|
||||
};
|
||||
|
||||
CyanStore(const std::string& path);
|
||||
~CyanStore() final;
|
||||
@ -45,6 +74,9 @@ public:
|
||||
|
||||
seastar::future<> mkfs(uuid_d new_osd_fsid) final;
|
||||
seastar::future<store_statfs_t> stat() const final;
|
||||
seastar::future<struct stat> stat(
|
||||
CollectionRef c,
|
||||
const ghobject_t& oid) final;
|
||||
|
||||
read_errorator::future<ceph::bufferlist> read(
|
||||
CollectionRef c,
|
||||
@ -78,6 +110,10 @@ public:
|
||||
const std::optional<std::string> &start ///< [in] start, empty for begin
|
||||
) final; ///< @return <done, values> values.empty() iff done
|
||||
|
||||
seastar::future<ceph::bufferlist> omap_get_header(
|
||||
CollectionRef c,
|
||||
const ghobject_t& oid) final;
|
||||
|
||||
seastar::future<CollectionRef> create_new_collection(const coll_t& cid) final;
|
||||
seastar::future<CollectionRef> open_collection(const coll_t& cid) final;
|
||||
seastar::future<std::vector<coll_t>> list_collections() final;
|
||||
@ -91,6 +127,15 @@ public:
|
||||
uuid_d get_fsid() const final;
|
||||
unsigned get_max_attr_name_length() const final;
|
||||
|
||||
seastar::future<OmapIteratorRef> get_omap_iterator(
|
||||
CollectionRef c,
|
||||
const ghobject_t& oid);
|
||||
|
||||
seastar::future<std::map<uint64_t, uint64_t>> fiemap(CollectionRef c,
|
||||
const ghobject_t& oid,
|
||||
uint64_t off,
|
||||
uint64_t len);
|
||||
|
||||
private:
|
||||
int _remove(const coll_t& cid, const ghobject_t& oid);
|
||||
int _touch(const coll_t& cid, const ghobject_t& oid);
|
||||
|
@ -26,6 +26,43 @@ class FuturizedCollection;
|
||||
class FuturizedStore {
|
||||
|
||||
public:
|
||||
class OmapIterator {
|
||||
public:
|
||||
virtual seastar::future<int> seek_to_first() {
|
||||
return seastar::make_ready_future<int>(0);
|
||||
}
|
||||
virtual seastar::future<int> upper_bound(const std::string &after) {
|
||||
return seastar::make_ready_future<int>(0);
|
||||
}
|
||||
virtual seastar::future<int> lower_bound(const std::string &to) {
|
||||
return seastar::make_ready_future<int>(0);
|
||||
}
|
||||
virtual bool valid() const {
|
||||
return false;
|
||||
}
|
||||
virtual seastar::future<int> next() {
|
||||
return seastar::make_ready_future<int>(0);
|
||||
}
|
||||
virtual std::string key() {
|
||||
return {};
|
||||
}
|
||||
virtual seastar::future<std::string> tail_key() {
|
||||
return seastar::make_ready_future<std::string>();
|
||||
}
|
||||
virtual ceph::buffer::list value() {
|
||||
return {};
|
||||
}
|
||||
virtual int status() const {
|
||||
return 0;
|
||||
}
|
||||
virtual ~OmapIterator() {}
|
||||
private:
|
||||
unsigned count = 0;
|
||||
friend void intrusive_ptr_add_ref(FuturizedStore::OmapIterator* iter);
|
||||
friend void intrusive_ptr_release(FuturizedStore::OmapIterator* iter);
|
||||
};
|
||||
using OmapIteratorRef = boost::intrusive_ptr<OmapIterator>;
|
||||
|
||||
static std::unique_ptr<FuturizedStore> create(const std::string& type,
|
||||
const std::string& data,
|
||||
const ConfigValues& values);
|
||||
@ -70,6 +107,9 @@ public:
|
||||
virtual get_attrs_ertr::future<attrs_t> get_attrs(
|
||||
CollectionRef c,
|
||||
const ghobject_t& oid) = 0;
|
||||
virtual seastar::future<struct stat> stat(
|
||||
CollectionRef c,
|
||||
const ghobject_t& oid) = 0;
|
||||
|
||||
using omap_values_t = std::map<std::string, bufferlist, std::less<>>;
|
||||
using omap_keys_t = std::set<std::string>;
|
||||
@ -88,12 +128,24 @@ public:
|
||||
const std::optional<std::string> &start ///< [in] start, empty for begin
|
||||
) = 0; ///< @return <done, values> values.empty() iff done
|
||||
|
||||
virtual seastar::future<bufferlist> omap_get_header(
|
||||
CollectionRef c,
|
||||
const ghobject_t& oid) = 0;
|
||||
|
||||
virtual seastar::future<CollectionRef> create_new_collection(const coll_t& cid) = 0;
|
||||
virtual seastar::future<CollectionRef> open_collection(const coll_t& cid) = 0;
|
||||
virtual seastar::future<std::vector<coll_t>> list_collections() = 0;
|
||||
|
||||
virtual seastar::future<> do_transaction(CollectionRef ch,
|
||||
ceph::os::Transaction&& txn) = 0;
|
||||
virtual seastar::future<OmapIteratorRef> get_omap_iterator(
|
||||
CollectionRef ch,
|
||||
const ghobject_t& oid) = 0;
|
||||
virtual seastar::future<std::map<uint64_t, uint64_t>> fiemap(
|
||||
CollectionRef ch,
|
||||
const ghobject_t& oid,
|
||||
uint64_t off,
|
||||
uint64_t len) = 0;
|
||||
|
||||
virtual seastar::future<> write_meta(const std::string& key,
|
||||
const std::string& value) = 0;
|
||||
@ -102,4 +154,17 @@ public:
|
||||
virtual unsigned get_max_attr_name_length() const = 0;
|
||||
};
|
||||
|
||||
inline void intrusive_ptr_add_ref(FuturizedStore::OmapIterator* iter) {
|
||||
assert(iter);
|
||||
iter->count++;
|
||||
}
|
||||
|
||||
inline void intrusive_ptr_release(FuturizedStore::OmapIterator* iter) {
|
||||
assert(iter);
|
||||
assert(iter->count > 0);
|
||||
if ((--iter->count) == 0) {
|
||||
delete iter;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -549,13 +549,15 @@ seastar::future<Ref<PG>> OSD::make_pg(cached_map_t create_map,
|
||||
|
||||
seastar::future<Ref<PG>> OSD::load_pg(spg_t pgid)
|
||||
{
|
||||
return PGMeta{store.get(), pgid}.get_epoch().then([this](epoch_t e) {
|
||||
return seastar::do_with(PGMeta(store.get(), pgid), [this, pgid] (auto& pg_meta) {
|
||||
return pg_meta.get_epoch();
|
||||
}).then([this](epoch_t e) {
|
||||
return get_map(e);
|
||||
}).then([pgid, this] (auto&& create_map) {
|
||||
return make_pg(std::move(create_map), pgid, false);
|
||||
}).then([this, pgid](Ref<PG> pg) {
|
||||
}).then([this](Ref<PG> pg) {
|
||||
return pg->read_state(store.get()).then([pg] {
|
||||
return seastar::make_ready_future<Ref<PG>>(std::move(pg));
|
||||
return seastar::make_ready_future<Ref<PG>>(std::move(pg));
|
||||
});
|
||||
}).handle_exception([pgid](auto ep) {
|
||||
logger().info("pg {} saw exception on load {}", pgid, ep);
|
||||
|
@ -364,8 +364,9 @@ void PG::init(
|
||||
|
||||
seastar::future<> PG::read_state(crimson::os::FuturizedStore* store)
|
||||
{
|
||||
return PGMeta{store, pgid}.load(
|
||||
).then([this, store](pg_info_t pg_info, PastIntervals past_intervals) {
|
||||
return seastar::do_with(PGMeta(store, pgid), [this, store] (auto& pg_meta) {
|
||||
return pg_meta.load();
|
||||
}).then([this, store](pg_info_t pg_info, PastIntervals past_intervals) {
|
||||
return peering_state.init_from_disk_state(
|
||||
std::move(pg_info),
|
||||
std::move(past_intervals),
|
||||
|
@ -1627,23 +1627,24 @@ public:
|
||||
|
||||
std::optional<std::string> next;
|
||||
|
||||
void process_entry(const std::pair<std::string, ceph::bufferlist> &p) {
|
||||
if (p.first[0] == '_')
|
||||
void process_entry(crimson::os::FuturizedStore::OmapIteratorRef &p) {
|
||||
if (p->key()[0] == '_')
|
||||
return;
|
||||
ceph::bufferlist bl = p.second;//Copy bufferlist before creating iterator
|
||||
//Copy bufferlist before creating iterator
|
||||
ceph::bufferlist bl = p->value();
|
||||
auto bp = bl.cbegin();
|
||||
if (p.first == "divergent_priors") {
|
||||
if (p->key() == "divergent_priors") {
|
||||
decode(divergent_priors, bp);
|
||||
ldpp_dout(dpp, 20) << "read_log_and_missing " << divergent_priors.size()
|
||||
<< " divergent_priors" << dendl;
|
||||
ceph_assert("crimson shouldn't have had divergent_priors" == 0);
|
||||
} else if (p.first == "can_rollback_to") {
|
||||
} else if (p->key() == "can_rollback_to") {
|
||||
decode(on_disk_can_rollback_to, bp);
|
||||
} else if (p.first == "rollback_info_trimmed_to") {
|
||||
} else if (p->key() == "rollback_info_trimmed_to") {
|
||||
decode(on_disk_rollback_info_trimmed_to, bp);
|
||||
} else if (p.first == "may_include_deletes_in_missing") {
|
||||
} else if (p->key() == "may_include_deletes_in_missing") {
|
||||
missing.may_include_deletes = true;
|
||||
} else if (p.first.substr(0, 7) == string("missing")) {
|
||||
} else if (p->key().substr(0, 7) == string("missing")) {
|
||||
hobject_t oid;
|
||||
pg_missing_item item;
|
||||
decode(oid, bp);
|
||||
@ -1652,7 +1653,7 @@ public:
|
||||
ceph_assert(missing.may_include_deletes);
|
||||
}
|
||||
missing.add(oid, std::move(item));
|
||||
} else if (p.first.substr(0, 4) == string("dup_")) {
|
||||
} else if (p->key().substr(0, 4) == string("dup_")) {
|
||||
pg_log_dup_t dup;
|
||||
decode(dup, bp);
|
||||
if (!dups.empty()) {
|
||||
@ -1679,18 +1680,18 @@ public:
|
||||
missing.may_include_deletes = false;
|
||||
|
||||
auto reader = std::unique_ptr<FuturizedStoreLogReader>(this);
|
||||
return seastar::repeat(
|
||||
[this]() {
|
||||
return store.omap_get_values(ch, pgmeta_oid, next).then(
|
||||
[this](
|
||||
bool done, crimson::os::FuturizedStore::omap_values_t values) {
|
||||
for (auto &&p : values) {
|
||||
process_entry(p);
|
||||
}
|
||||
return done ? seastar::stop_iteration::yes
|
||||
: seastar::stop_iteration::no;
|
||||
});
|
||||
}).then([this, reader{std::move(reader)}]() {
|
||||
return store.get_omap_iterator(ch, pgmeta_oid).then([this](auto iter) {
|
||||
return seastar::repeat([this, iter]() mutable {
|
||||
if (!iter->valid()) {
|
||||
return seastar::make_ready_future<seastar::stop_iteration>(
|
||||
seastar::stop_iteration::yes);
|
||||
}
|
||||
process_entry(iter);
|
||||
return iter->next().then([](int) {
|
||||
return seastar::stop_iteration::no;
|
||||
});
|
||||
});
|
||||
}).then([this, reader{std::move(reader)}]() {
|
||||
log = IndexedLog(
|
||||
info.last_update,
|
||||
info.log_tail,
|
||||
|
Loading…
Reference in New Issue
Block a user