Merge pull request #47646 from cyx1231st/wip-seastore-move-cleaner-to-epm

crimson/os/seastore: move AsyncCleaner to EPM

Reviewed-by: Samuel Just <sjust@redhat.com>
Reviewed-by: Xuehan Xu <xxhdx1985126@gmail.com>
Reviewed-by: Zhang Song <zhangsong325@gmail.com>
Commit 03e45eb506, authored by Yingxin on 2022-08-24 07:54:15 +08:00 and committed by GitHub.
15 changed files with 368 additions and 292 deletions
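For readers skimming the diff: the net effect is that TransactionManager no longer owns an AsyncCleanerRef. make_transaction_manager now hands the cleaner to the ExtentPlacementManager via set_async_cleaner(), and TransactionManager (and Cache) reach cleaner functionality only through EPM forwarding methods. Below is a minimal, self-contained C++ sketch of the new ownership; the types are simplified stand-ins, not the real seastore headers, and the forwarded calls are abbreviated.

// Sketch only: stand-ins illustrating who owns the cleaner after this PR.
#include <memory>
#include <utility>

struct AsyncCleaner {                       // stand-in for crimson::os::seastore::AsyncCleaner
  void mark_space_used(/* paddr, len */) {}
  void start_gc() {}
};
using AsyncCleanerRef = std::unique_ptr<AsyncCleaner>;

class ExtentPlacementManager {              // now owns the cleaner
  AsyncCleanerRef cleaner;
public:
  void set_async_cleaner(AsyncCleanerRef &&c) { cleaner = std::move(c); }
  // Forwarding methods; the null check mirrors the interim
  // "TODO: improve tests to drop the cleaner check" in the diff.
  void mark_space_used(/* paddr, len */) { if (cleaner) cleaner->mark_space_used(); }
  void start_gc() { cleaner->start_gc(); }
};
using ExtentPlacementManagerRef = std::unique_ptr<ExtentPlacementManager>;

class TransactionManager {                  // no AsyncCleanerRef member any more
  ExtentPlacementManagerRef epm;
public:
  explicit TransactionManager(ExtentPlacementManagerRef &&e) : epm(std::move(e)) {}
  void on_commit() { epm->mark_space_used(); }  // space accounting is now routed through EPM
};

int main() {
  auto epm = std::make_unique<ExtentPlacementManager>();
  epm->set_async_cleaner(std::make_unique<AsyncCleaner>());  // wiring done in make_transaction_manager
  TransactionManager tm(std::move(epm));
  tm.on_commit();
  return 0;
}

The design choice repeated throughout the files below is the same: every former async_cleaner->... call site becomes an epm->... call, and EPM keeps the cleaner behind a single member marked "TODO: device tiering".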

View File

@ -6,6 +6,7 @@
#include "crimson/os/seastore/logging.h"
#include "crimson/os/seastore/async_cleaner.h"
#include "crimson/os/seastore/backref_manager.h"
#include "crimson/os/seastore/transaction_manager.h"
SET_SUBSYS(seastore_cleaner);
@ -1044,7 +1045,8 @@ AsyncCleaner::mount_ret AsyncCleaner::mount()
LOG_PREFIX(AsyncCleaner::mount);
const auto& sms = sm_group->get_segment_managers();
INFO("{} segment managers", sms.size());
init_complete = false;
ceph_assert(state == cleaner_state_t::STOP);
state = cleaner_state_t::MOUNT;
stats = {};
journal_head = JOURNAL_SEQ_NULL;
journal_alloc_tail = JOURNAL_SEQ_NULL;
@ -1187,14 +1189,14 @@ AsyncCleaner::scan_extents_ret AsyncCleaner::scan_no_tail_segment(
});
}
void AsyncCleaner::complete_init()
void AsyncCleaner::start_gc()
{
LOG_PREFIX(AsyncCleaner::complete_init);
LOG_PREFIX(AsyncCleaner::start_gc);
ceph_assert(state == cleaner_state_t::SCAN_SPACE);
state = cleaner_state_t::READY;
if (disable_trim) {
init_complete = true;
return;
}
init_complete = true;
INFO("done, start GC, {}", gc_stat_printer_t{this, true});
ceph_assert(journal_head != JOURNAL_SEQ_NULL);
ceph_assert(journal_alloc_tail != JOURNAL_SEQ_NULL);
@ -1204,17 +1206,60 @@ void AsyncCleaner::complete_init()
seastar::future<> AsyncCleaner::stop()
{
if (is_ready()) {
state = cleaner_state_t::HALT;
} else {
state = cleaner_state_t::STOP;
}
return gc_process.stop(
).then([this] {
LOG_PREFIX(AsyncCleaner::stop);
INFO("done, {}", gc_stat_printer_t{this, true});
// run_until_halt() can be called at HALT
});
}
bool AsyncCleaner::check_usage()
{
SpaceTrackerIRef tracker(space_tracker->make_empty());
ecb->with_transaction_weak(
"check_usage",
[this, &tracker](auto &t) {
return backref_manager.scan_mapped_space(
t,
[&tracker](
paddr_t paddr,
extent_len_t len,
extent_types_t type,
laddr_t laddr)
{
if (paddr.get_addr_type() == paddr_types_t::SEGMENT) {
if (is_backref_node(type)) {
assert(laddr == L_ADDR_NULL);
tracker->allocate(
paddr.as_seg_paddr().get_segment_id(),
paddr.as_seg_paddr().get_segment_off(),
len);
} else if (laddr == L_ADDR_NULL) {
tracker->release(
paddr.as_seg_paddr().get_segment_id(),
paddr.as_seg_paddr().get_segment_off(),
len);
} else {
tracker->allocate(
paddr.as_seg_paddr().get_segment_id(),
paddr.as_seg_paddr().get_segment_off(),
len);
}
}
});
}).unsafe_get0();
return space_tracker->equals(*tracker);
}
void AsyncCleaner::mark_space_used(
paddr_t addr,
extent_len_t len,
bool init_scan)
extent_len_t len)
{
LOG_PREFIX(AsyncCleaner::mark_space_used);
if (addr.get_addr_type() != paddr_types_t::SEGMENT) {
@ -1222,7 +1267,7 @@ void AsyncCleaner::mark_space_used(
}
auto& seg_addr = addr.as_seg_paddr();
if (!init_scan && !init_complete) {
if (state < cleaner_state_t::SCAN_SPACE) {
return;
}
@ -1246,11 +1291,10 @@ void AsyncCleaner::mark_space_used(
void AsyncCleaner::mark_space_free(
paddr_t addr,
extent_len_t len,
bool init_scan)
extent_len_t len)
{
LOG_PREFIX(AsyncCleaner::mark_space_free);
if (!init_complete && !init_scan) {
if (state < cleaner_state_t::SCAN_SPACE) {
return;
}
if (addr.get_addr_type() != paddr_types_t::SEGMENT) {
@ -1336,7 +1380,7 @@ AsyncCleaner::reserve_projected_usage(std::size_t projected_usage)
if (disable_trim) {
return seastar::now();
}
ceph_assert(init_complete);
ceph_assert(is_ready());
// The pipeline configuration prevents another IO from entering
// prepare until the prior one exits and clears this.
ceph_assert(!blocked_io_wake);
@ -1379,7 +1423,7 @@ AsyncCleaner::reserve_projected_usage(std::size_t projected_usage)
void AsyncCleaner::release_projected_usage(std::size_t projected_usage)
{
if (disable_trim) return;
ceph_assert(init_complete);
ceph_assert(is_ready());
ceph_assert(stats.projected_used_bytes >= projected_usage);
stats.projected_used_bytes -= projected_usage;
return maybe_wake_gc_blocked_io();
@ -1388,14 +1432,14 @@ void AsyncCleaner::release_projected_usage(std::size_t projected_usage)
std::ostream &operator<<(std::ostream &os, AsyncCleaner::gc_stat_printer_t stats)
{
os << "gc_stats(";
if (stats.cleaner->init_complete) {
if (stats.cleaner->is_ready()) {
os << "should_block_on_(trim=" << stats.cleaner->should_block_on_trim()
<< ", reclaim=" << stats.cleaner->should_block_on_reclaim() << ")"
<< ", should_(trim_dirty=" << stats.cleaner->gc_should_trim_dirty()
<< ", trim_alloc=" << stats.cleaner->gc_should_trim_alloc()
<< ", reclaim=" << stats.cleaner->gc_should_reclaim_space() << ")";
} else {
os << "init";
os << "not-ready";
}
os << ", projected_avail_ratio=" << stats.cleaner->get_projected_available_ratio()
<< ", reclaim_ratio=" << stats.cleaner->get_reclaim_ratio()
@ -1404,7 +1448,7 @@ std::ostream &operator<<(std::ostream &os, AsyncCleaner::gc_stat_printer_t stats
os << ", journal_head=" << stats.cleaner->journal_head
<< ", alloc_tail=" << stats.cleaner->journal_alloc_tail
<< ", dirty_tail=" << stats.cleaner->journal_dirty_tail;
if (stats.cleaner->init_complete) {
if (stats.cleaner->is_ready()) {
os << ", alloc_tail_target=" << stats.cleaner->get_alloc_tail_target()
<< ", dirty_tail_target=" << stats.cleaner->get_dirty_tail_target()
<< ", tail_limit=" << stats.cleaner->get_tail_limit();

View File

@ -10,7 +10,6 @@
#include "osd/osd_types.h"
#include "crimson/os/seastore/backref_manager.h"
#include "crimson/os/seastore/cached_extent.h"
#include "crimson/os/seastore/seastore_types.h"
#include "crimson/os/seastore/segment_manager.h"
@ -526,8 +525,17 @@ public:
bool equals(const SpaceTrackerI &other) const;
};
class BackrefManager;
class AsyncCleaner : public SegmentProvider, public JournalTrimmer {
enum class cleaner_state_t {
STOP,
MOUNT,
SCAN_SPACE,
READY,
HALT,
} state = cleaner_state_t::STOP;
public:
/// Config
struct config_t {
@ -729,7 +737,6 @@ private:
SpaceTrackerIRef space_tracker;
segments_info_t segments;
bool init_complete = false;
struct {
/**
@ -792,6 +799,7 @@ private:
* Should be removed once proper support is added. TODO
*/
bool disable_trim = false;
public:
AsyncCleaner(
config_t config,
@ -803,20 +811,33 @@ public:
return *ool_segment_seq_allocator;
}
void set_extent_callback(ExtentCallbackInterface *cb) {
ecb = cb;
}
using mount_ertr = crimson::errorator<
crimson::ct_error::input_output_error>;
using mount_ret = mount_ertr::future<>;
mount_ret mount();
/*
* SegmentProvider interfaces
*/
journal_seq_t get_journal_head() const final {
return journal_head;
void start_scan_space() {
ceph_assert(state == cleaner_state_t::MOUNT);
state = cleaner_state_t::SCAN_SPACE;
assert(space_tracker->equals(*space_tracker->make_empty()));
}
const segment_info_t& get_seg_info(segment_id_t id) const final {
return segments[id];
void start_gc();
bool is_ready() const {
return state >= cleaner_state_t::READY;
}
/*
* JournalTrimmer interfaces
*/
journal_seq_t get_journal_head() const final {
return journal_head;
}
void set_journal_head(journal_seq_t head) final {
@ -838,6 +859,25 @@ public:
gc_process.maybe_wake_on_space_used();
}
journal_seq_t get_dirty_tail() const final {
return journal_dirty_tail;
}
journal_seq_t get_alloc_tail() const final {
return journal_alloc_tail;
}
void update_journal_tails(
journal_seq_t dirty_tail, journal_seq_t alloc_tail) final;
/*
* SegmentProvider interfaces
*/
const segment_info_t& get_seg_info(segment_id_t id) const final {
return segments[id];
}
segment_id_t allocate_segment(
segment_seq_t, segment_type_t, data_category_t, reclaim_gen_t) final;
@ -858,40 +898,17 @@ public:
return sm_group.get();
}
journal_seq_t get_dirty_tail() const final {
return journal_dirty_tail;
}
journal_seq_t get_alloc_tail() const final {
return journal_alloc_tail;
}
void update_journal_tails(
journal_seq_t dirty_tail, journal_seq_t alloc_tail) final;
void adjust_segment_util(double old_usage, double new_usage) {
auto old_index = get_bucket_index(old_usage);
auto new_index = get_bucket_index(new_usage);
assert(stats.segment_util.buckets[old_index].count > 0);
stats.segment_util.buckets[old_index].count--;
stats.segment_util.buckets[new_index].count++;
}
void mark_space_used(
paddr_t addr,
extent_len_t len,
bool init_scan = false);
extent_len_t len);
void mark_space_free(
paddr_t addr,
extent_len_t len,
bool init_scan = false);
extent_len_t len);
SpaceTrackerIRef get_empty_space_tracker() const {
return space_tracker->make_empty();
}
seastar::future<> reserve_projected_usage(std::size_t projected_usage);
void complete_init();
void release_projected_usage(size_t projected_usage);
store_statfs_t stat() const {
store_statfs_t st;
@ -907,17 +924,14 @@ public:
seastar::future<> stop();
// Testing interfaces
seastar::future<> run_until_halt() {
ceph_assert(state == cleaner_state_t::HALT);
return gc_process.run_until_halt();
}
void set_extent_callback(ExtentCallbackInterface *cb) {
ecb = cb;
}
bool debug_check_space(const SpaceTrackerI &tracker) {
return space_tracker->equals(tracker);
}
bool check_usage();
void set_disable_trim(bool val) {
disable_trim = val;
@ -962,7 +976,7 @@ private:
segment_id_t get_next_reclaim_segment() const;
journal_seq_t get_dirty_tail_target() const {
assert(init_complete);
assert(is_ready());
auto ret = journal_head;
ceph_assert(ret != JOURNAL_SEQ_NULL);
if (ret.segment_seq >= config.target_journal_dirty_segments) {
@ -975,7 +989,7 @@ private:
}
journal_seq_t get_tail_limit() const {
assert(init_complete);
assert(is_ready());
auto ret = journal_head;
ceph_assert(ret != JOURNAL_SEQ_NULL);
if (ret.segment_seq >= config.max_journal_segments) {
@ -988,7 +1002,7 @@ private:
}
journal_seq_t get_alloc_tail_target() const {
assert(init_complete);
assert(is_ready());
auto ret = journal_head;
ceph_assert(ret != JOURNAL_SEQ_NULL);
if (ret.segment_seq >= config.target_journal_alloc_segments) {
@ -1262,13 +1276,13 @@ private:
* Encapsulates whether block pending gc.
*/
bool should_block_on_trim() const {
assert(init_complete);
assert(is_ready());
if (disable_trim) return false;
return get_tail_limit() > get_journal_tail();
}
bool should_block_on_reclaim() const {
assert(init_complete);
assert(is_ready());
if (disable_trim) return false;
if (get_segments_reclaimable() == 0) {
return false;
@ -1278,20 +1292,14 @@ private:
}
bool should_block_on_gc() const {
assert(init_complete);
assert(is_ready());
return should_block_on_trim() || should_block_on_reclaim();
}
void log_gc_state(const char *caller) const;
public:
seastar::future<> reserve_projected_usage(std::size_t projected_usage);
void release_projected_usage(size_t projected_usage);
private:
void maybe_wake_gc_blocked_io() {
if (!init_complete) {
if (!is_ready()) {
return;
}
if (!should_block_on_gc() && blocked_io_wake) {
@ -1312,7 +1320,7 @@ private:
* Encapsulates logic for whether gc should be reclaiming segment space.
*/
bool gc_should_reclaim_space() const {
assert(init_complete);
assert(is_ready());
if (disable_trim) return false;
if (get_segments_reclaimable() == 0) {
return false;
@ -1327,12 +1335,12 @@ private:
}
bool gc_should_trim_dirty() const {
assert(init_complete);
assert(is_ready());
return get_dirty_tail_target() > journal_dirty_tail;
}
bool gc_should_trim_alloc() const {
assert(init_complete);
assert(is_ready());
return get_alloc_tail_target() > journal_alloc_tail;
}
/**
@ -1342,19 +1350,27 @@ private:
*/
bool gc_should_run() const {
if (disable_trim) return false;
ceph_assert(init_complete);
ceph_assert(is_ready());
return gc_should_reclaim_space()
|| gc_should_trim_dirty()
|| gc_should_trim_alloc();
}
void adjust_segment_util(double old_usage, double new_usage) {
auto old_index = get_bucket_index(old_usage);
auto new_index = get_bucket_index(new_usage);
assert(stats.segment_util.buckets[old_index].count > 0);
stats.segment_util.buckets[old_index].count--;
stats.segment_util.buckets[new_index].count++;
}
void init_mark_segment_closed(
segment_id_t segment,
segment_seq_t seq,
segment_type_t s_type,
data_category_t category,
reclaim_gen_t generation) {
ceph_assert(!init_complete);
ceph_assert(state == cleaner_state_t::MOUNT);
auto old_usage = calc_utilization(segment);
segments.init_closed(segment, seq, s_type, category, generation);
auto new_usage = calc_utilization(segment);
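The header above replaces the old init_complete flag with an explicit cleaner_state_t lifecycle: STOP -> MOUNT -> SCAN_SPACE -> READY -> HALT. The following is a compilable toy model of just those transitions and guards, assembled from the asserts in the diff; the real class of course also carries the GC process, space tracker and journal bookkeeping.

// Toy lifecycle model of AsyncCleaner's new cleaner_state_t (sketch only).
#include <cassert>

enum class cleaner_state_t { STOP, MOUNT, SCAN_SPACE, READY, HALT };

struct cleaner_lifecycle_t {
  cleaner_state_t state = cleaner_state_t::STOP;

  bool is_ready() const { return state >= cleaner_state_t::READY; }

  void mount()            { assert(state == cleaner_state_t::STOP);       state = cleaner_state_t::MOUNT; }
  void start_scan_space() { assert(state == cleaner_state_t::MOUNT);      state = cleaner_state_t::SCAN_SPACE; }
  void start_gc()         { assert(state == cleaner_state_t::SCAN_SPACE); state = cleaner_state_t::READY; }
  void stop()             { state = is_ready() ? cleaner_state_t::HALT : cleaner_state_t::STOP; }

  // mark_space_used/free become no-ops before SCAN_SPACE, replacing the
  // old init_scan flag: "if (state < cleaner_state_t::SCAN_SPACE) return;"
  bool accepts_space_updates() const { return state >= cleaner_state_t::SCAN_SPACE; }
};

int main() {
  cleaner_lifecycle_t c;
  c.mount();
  c.start_scan_space();
  assert(c.accepts_space_updates() && !c.is_ready());
  c.start_gc();
  assert(c.is_ready());
  c.stop();                 // HALT; run_until_halt() asserts exactly this state
  return 0;
}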

View File

@ -1415,8 +1415,7 @@ void Cache::backref_batch_update(
void Cache::complete_commit(
Transaction &t,
paddr_t final_block_start,
journal_seq_t start_seq,
AsyncCleaner *cleaner)
journal_seq_t start_seq)
{
LOG_PREFIX(Cache::complete_commit);
SUBTRACET(seastore_t, "final_block_start={}, start_seq={}",
@ -1438,11 +1437,7 @@ void Cache::complete_commit(
t, is_inline, *i);
const auto t_src = t.get_src();
add_extent(i, &t_src);
if (cleaner) {
cleaner->mark_space_used(
i->get_paddr(),
i->get_length());
}
epm.mark_space_used(i->get_paddr(), i->get_length());
if (is_backref_mapped_extent_node(i)) {
DEBUGT("backref_list new {} len {}",
t,
@ -1487,18 +1482,12 @@ void Cache::complete_commit(
}
}
if (cleaner) {
for (auto &i: t.retired_set) {
cleaner->mark_space_free(
i->get_paddr(),
i->get_length());
}
for (auto &i: t.existing_block_list) {
if (i->is_valid()) {
cleaner->mark_space_used(
i->get_paddr(),
i->get_length());
}
for (auto &i: t.retired_set) {
epm.mark_space_free(i->get_paddr(), i->get_length());
}
for (auto &i: t.existing_block_list) {
if (i->is_valid()) {
epm.mark_space_used(i->get_paddr(), i->get_length());
}
}

View File

@ -26,7 +26,6 @@ class BtreeBackrefManager;
namespace crimson::os::seastore {
class BackrefManager;
class AsyncCleaner;
class SegmentProvider;
struct backref_entry_t {
@ -712,8 +711,7 @@ public:
void complete_commit(
Transaction &t, ///< [in, out] current transaction
paddr_t final_block_start, ///< [in] offset of initial block
journal_seq_t seq, ///< [in] journal commit seq
AsyncCleaner *cleaner=nullptr ///< [out] optional segment stat listener
journal_seq_t seq ///< [in] journal commit seq
);
/**

View File

@ -173,4 +173,47 @@ SegmentedOolWriter::alloc_write_ool_extents(
});
}
void ExtentPlacementManager::set_async_cleaner(AsyncCleanerRef &&_cleaner)
{
cleaner = std::move(_cleaner);
writer_refs.clear();
ceph_assert(RECLAIM_GENERATIONS > 0);
data_writers_by_gen.resize(RECLAIM_GENERATIONS, {});
for (reclaim_gen_t gen = 0; gen < RECLAIM_GENERATIONS; ++gen) {
writer_refs.emplace_back(std::make_unique<SegmentedOolWriter>(
data_category_t::DATA, gen, *cleaner,
cleaner->get_ool_segment_seq_allocator()));
data_writers_by_gen[gen] = writer_refs.back().get();
}
md_writers_by_gen.resize(RECLAIM_GENERATIONS - 1, {});
for (reclaim_gen_t gen = 1; gen < RECLAIM_GENERATIONS; ++gen) {
writer_refs.emplace_back(std::make_unique<SegmentedOolWriter>(
data_category_t::METADATA, gen, *cleaner,
cleaner->get_ool_segment_seq_allocator()));
md_writers_by_gen[gen - 1] = writer_refs.back().get();
}
for (auto *device : cleaner->get_segment_manager_group()
->get_segment_managers()) {
add_device(device);
}
}
void ExtentPlacementManager::set_primary_device(Device *device)
{
ceph_assert(primary_device == nullptr);
primary_device = device;
if (device->get_device_type() == device_type_t::SEGMENTED) {
prefer_ool = false;
ceph_assert(devices_by_id[device->get_device_id()] == device);
} else {
// RBM device is not in the cleaner.
ceph_assert(device->get_device_type() == device_type_t::RANDOM_BLOCK);
prefer_ool = true;
add_device(primary_device);
}
}
}
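A side note on set_async_cleaner() above: it is also where the out-of-line writers are rebuilt, one SegmentedOolWriter per reclaim generation for DATA and one per generation >= 1 for METADATA; the missing generation-0 metadata writer presumably corresponds to metadata written inline through the journal (an inference from the indexing, not something the diff states). A small standalone sketch of just that indexing, with hypothetical stand-in types:

// Hypothetical stand-ins showing the writer-per-generation layout built in
// ExtentPlacementManager::set_async_cleaner() (indexing only; the real
// writers need a SegmentProvider and a SegmentSeqAllocator).
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

constexpr std::size_t RECLAIM_GENERATIONS = 3;       // value assumed for the sketch
enum class data_category_t { METADATA, DATA };

struct ool_writer_t {
  data_category_t category;
  std::size_t generation;
};

int main() {
  std::vector<std::unique_ptr<ool_writer_t>> writer_refs;
  std::vector<ool_writer_t*> data_writers_by_gen(RECLAIM_GENERATIONS, nullptr);
  std::vector<ool_writer_t*> md_writers_by_gen(RECLAIM_GENERATIONS - 1, nullptr);

  for (std::size_t gen = 0; gen < RECLAIM_GENERATIONS; ++gen) {
    writer_refs.push_back(std::make_unique<ool_writer_t>(ool_writer_t{data_category_t::DATA, gen}));
    data_writers_by_gen[gen] = writer_refs.back().get();
  }
  for (std::size_t gen = 1; gen < RECLAIM_GENERATIONS; ++gen) {
    writer_refs.push_back(std::make_unique<ool_writer_t>(ool_writer_t{data_category_t::METADATA, gen}));
    md_writers_by_gen[gen - 1] = writer_refs.back().get();   // note the gen - 1 offset
  }

  assert(writer_refs.size() == 2 * RECLAIM_GENERATIONS - 1);
  return 0;
}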

View File

@ -5,6 +5,7 @@
#include "seastar/core/gate.hh"
#include "crimson/os/seastore/async_cleaner.h"
#include "crimson/os/seastore/cached_extent.h"
#include "crimson/os/seastore/journal/segment_allocator.h"
#include "crimson/os/seastore/logging.h"
@ -38,8 +39,6 @@ public:
};
using ExtentOolWriterRef = std::unique_ptr<ExtentOolWriter>;
class SegmentProvider;
/**
* SegmentedOolWriter
*
@ -90,41 +89,13 @@ public:
devices_by_id.resize(DEVICE_ID_MAX, nullptr);
}
void init_ool_writers(SegmentProvider &sp, SegmentSeqAllocator &ssa) {
// Currently only one SegmentProvider is supported
writer_refs.clear();
// TODO: device tiering
void set_async_cleaner(AsyncCleanerRef &&_cleaner);
ceph_assert(RECLAIM_GENERATIONS > 0);
data_writers_by_gen.resize(RECLAIM_GENERATIONS, {});
for (reclaim_gen_t gen = 0; gen < RECLAIM_GENERATIONS; ++gen) {
writer_refs.emplace_back(std::make_unique<SegmentedOolWriter>(
data_category_t::DATA, gen, sp, ssa));
data_writers_by_gen[gen] = writer_refs.back().get();
}
void set_primary_device(Device *device);
md_writers_by_gen.resize(RECLAIM_GENERATIONS - 1, {});
for (reclaim_gen_t gen = 1; gen < RECLAIM_GENERATIONS; ++gen) {
writer_refs.emplace_back(std::make_unique<SegmentedOolWriter>(
data_category_t::METADATA, gen, sp, ssa));
md_writers_by_gen[gen - 1] = writer_refs.back().get();
}
}
void add_device(Device* device, bool is_primary) {
auto device_id = device->get_device_id();
ceph_assert(devices_by_id[device_id] == nullptr);
devices_by_id[device_id] = device;
++num_devices;
if (is_primary) {
ceph_assert(primary_device == nullptr);
primary_device = device;
if (device->get_device_type() == device_type_t::SEGMENTED) {
prefer_ool = false;
} else {
ceph_assert(device->get_device_type() == device_type_t::RANDOM_BLOCK);
prefer_ool = true;
}
}
void set_extent_callback(AsyncCleaner::ExtentCallbackInterface *cb) {
cleaner->set_extent_callback(cb);
}
seastore_off_t get_block_size() const {
@ -138,8 +109,17 @@ public:
return *primary_device;
}
store_statfs_t get_stat() const {
return cleaner->stat();
}
using mount_ret = AsyncCleaner::mount_ret;
mount_ret mount() {
return cleaner->mount();
}
using open_ertr = ExtentOolWriter::open_ertr;
open_ertr::future<> open() {
open_ertr::future<> open_for_write() {
LOG_PREFIX(ExtentPlacementManager::open);
SUBINFO(seastore_journal, "started with {} devices", num_devices);
ceph_assert(primary_device != nullptr);
@ -152,6 +132,14 @@ public:
});
}
void start_scan_space() {
return cleaner->start_scan_space();
}
void start_gc() {
return cleaner->start_gc();
}
struct alloc_result_t {
paddr_t paddr;
bufferptr bp;
@ -221,6 +209,7 @@ public:
LOG_PREFIX(ExtentPlacementManager::delayed_alloc_or_ool_write);
SUBDEBUGT(seastore_journal, "start with {} delayed extents",
t, delayed_extents.size());
assert(writer_refs.size());
return seastar::do_with(
std::map<ExtentOolWriter*, std::list<LogicalCachedExtentRef>>(),
[this, &t, &delayed_extents](auto& alloc_map) {
@ -240,6 +229,10 @@ public:
});
}
seastar::future<> stop_gc() {
return cleaner->stop();
}
using close_ertr = ExtentOolWriter::close_ertr;
close_ertr::future<> close() {
LOG_PREFIX(ExtentPlacementManager::close);
@ -263,7 +256,52 @@ public:
return devices_by_id[addr.get_device_id()]->read(addr, len, out);
}
void mark_space_used(paddr_t addr, extent_len_t len) {
// TODO: improve tests to drop the cleaner check
if (cleaner) {
cleaner->mark_space_used(addr, len);
}
}
void mark_space_free(paddr_t addr, extent_len_t len) {
// TODO: improve tests to drop the cleaner check
if (cleaner) {
cleaner->mark_space_free(addr, len);
}
}
seastar::future<> reserve_projected_usage(std::size_t projected_usage) {
return cleaner->reserve_projected_usage(projected_usage);
}
void release_projected_usage(std::size_t projected_usage) {
return cleaner->release_projected_usage(projected_usage);
}
// Testing interfaces
void test_init_no_background(Device *test_device) {
assert(test_device->get_device_type() == device_type_t::SEGMENTED);
add_device(test_device);
set_primary_device(test_device);
}
bool check_usage() {
return cleaner->check_usage();
}
seastar::future<> run_background_work_until_halt() {
return cleaner->run_until_halt();
}
private:
void add_device(Device *device) {
auto device_id = device->get_device_id();
ceph_assert(devices_by_id[device_id] == nullptr);
devices_by_id[device_id] = device;
++num_devices;
}
ExtentOolWriter* get_writer(placement_hint_t hint,
data_category_t category,
reclaim_gen_t gen) {
@ -288,7 +326,11 @@ private:
std::vector<Device*> devices_by_id;
Device* primary_device = nullptr;
std::size_t num_devices = 0;
// TODO: device tiering
AsyncCleanerRef cleaner;
};
using ExtentPlacementManagerRef = std::unique_ptr<ExtentPlacementManager>;
}

View File

@ -331,6 +331,7 @@ SeaStore::mkfs_ertr::future<> SeaStore::mkfs(uuid_d new_osd_fsid)
init_managers();
return transaction_manager->mkfs();
}).safe_then([this] {
init_managers();
return transaction_manager->mount();
}).safe_then([this] {
return repeat_eagain([this] {
@ -1860,9 +1861,10 @@ uuid_d SeaStore::get_fsid() const
void SeaStore::init_managers()
{
ceph_assert(!transaction_manager);
ceph_assert(!collection_manager);
ceph_assert(!onode_manager);
transaction_manager.reset();
collection_manager.reset();
onode_manager.reset();
std::vector<Device*> sec_devices;
for (auto &dev : secondaries) {
sec_devices.emplace_back(dev.get());

View File

@ -24,20 +24,18 @@ SET_SUBSYS(seastore_tm);
namespace crimson::os::seastore {
TransactionManager::TransactionManager(
AsyncCleanerRef _async_cleaner,
JournalRef _journal,
CacheRef _cache,
LBAManagerRef _lba_manager,
ExtentPlacementManagerRef &&epm,
BackrefManagerRef&& backref_manager)
: async_cleaner(std::move(_async_cleaner)),
cache(std::move(_cache)),
ExtentPlacementManagerRef &&_epm,
BackrefManagerRef&& _backref_manager)
: cache(std::move(_cache)),
lba_manager(std::move(_lba_manager)),
journal(std::move(_journal)),
epm(std::move(epm)),
backref_manager(std::move(backref_manager))
epm(std::move(_epm)),
backref_manager(std::move(_backref_manager))
{
async_cleaner->set_extent_callback(this);
epm->set_extent_callback(this);
journal->set_write_pipeline(&write_pipeline);
}
@ -45,13 +43,13 @@ TransactionManager::mkfs_ertr::future<> TransactionManager::mkfs()
{
LOG_PREFIX(TransactionManager::mkfs);
INFO("enter");
return async_cleaner->mount(
return epm->mount(
).safe_then([this] {
return journal->open_for_mkfs();
}).safe_then([this](auto start_seq) {
journal->get_trimmer().update_journal_tails(start_seq, start_seq);
journal->get_trimmer().set_journal_head(start_seq);
return epm->open();
return epm->open_for_write();
}).safe_then([this, FNAME]() {
return with_transaction_intr(
Transaction::src_t::MUTATE,
@ -87,7 +85,7 @@ TransactionManager::mount_ertr::future<> TransactionManager::mount()
LOG_PREFIX(TransactionManager::mount);
INFO("enter");
cache->init();
return async_cleaner->mount(
return epm->mount(
).safe_then([this] {
return journal->replay(
[this](
@ -121,8 +119,7 @@ TransactionManager::mount_ertr::future<> TransactionManager::mount()
return lba_manager->init_cached_extent(t, e);
}
}).si_then([this, &t] {
assert(async_cleaner->debug_check_space(
*async_cleaner->get_empty_space_tracker()));
epm->start_scan_space();
return backref_manager->scan_mapped_space(
t,
[this](
@ -134,21 +131,21 @@ TransactionManager::mount_ertr::future<> TransactionManager::mount()
assert(laddr == L_ADDR_NULL);
backref_manager->cache_new_backref_extent(paddr, type);
cache->update_tree_extents_num(type, 1);
async_cleaner->mark_space_used(paddr, len, true);
epm->mark_space_used(paddr, len);
} else if (laddr == L_ADDR_NULL) {
cache->update_tree_extents_num(type, -1);
async_cleaner->mark_space_free(paddr, len, true);
epm->mark_space_free(paddr, len);
} else {
cache->update_tree_extents_num(type, 1);
async_cleaner->mark_space_used(paddr, len, true);
epm->mark_space_used(paddr, len);
}
});
});
});
}).safe_then([this] {
return epm->open();
return epm->open_for_write();
}).safe_then([FNAME, this] {
async_cleaner->complete_init();
epm->start_gc();
INFO("completed");
}).handle_error(
mount_ertr::pass_further{},
@ -162,7 +159,7 @@ TransactionManager::mount_ertr::future<> TransactionManager::mount()
TransactionManager::close_ertr::future<> TransactionManager::close() {
LOG_PREFIX(TransactionManager::close);
INFO("enter");
return async_cleaner->stop(
return epm->stop_gc(
).then([this] {
return cache->close();
}).safe_then([this] {
@ -283,12 +280,12 @@ TransactionManager::submit_transaction(
size_t projected_usage = t.get_allocation_size();
SUBTRACET(seastore_t, "waiting for projected_usage: {}", t, projected_usage);
return trans_intr::make_interruptible(
async_cleaner->reserve_projected_usage(projected_usage)
epm->reserve_projected_usage(projected_usage)
).then_interruptible([this, &t] {
return submit_transaction_direct(t);
}).finally([this, FNAME, projected_usage, &t] {
SUBTRACET(seastore_t, "releasing projected_usage: {}", t, projected_usage);
async_cleaner->release_projected_usage(projected_usage);
epm->release_projected_usage(projected_usage);
});
});
}
@ -351,8 +348,7 @@ TransactionManager::submit_transaction_direct(
cache->complete_commit(
tref,
submit_result.record_block_base,
start_seq,
async_cleaner.get());
start_seq);
std::vector<CachedExtentRef> lba_to_clear;
std::vector<CachedExtentRef> backref_to_clear;
@ -634,12 +630,10 @@ TransactionManagerRef make_transaction_manager(
auto sms = std::make_unique<SegmentManagerGroup>();
auto backref_manager = create_backref_manager(*cache);
epm->add_device(primary_device, true);
if (primary_device->get_device_type() == device_type_t::SEGMENTED) {
sms->add_segment_manager(static_cast<SegmentManager*>(primary_device));
}
for (auto &p_dev : secondary_devices) {
epm->add_device(p_dev, false);
ceph_assert(p_dev->get_device_type() == device_type_t::SEGMENTED);
sms->add_segment_manager(static_cast<SegmentManager*>(p_dev));
}
@ -677,12 +671,11 @@ TransactionManagerRef make_transaction_manager(
ERROR("disabling journal trimming since support for CircularBoundedJournal "
"hasn't been added yet");
}
epm->init_ool_writers(
*async_cleaner,
async_cleaner->get_ool_segment_seq_allocator());
epm->set_async_cleaner(std::move(async_cleaner));
epm->set_primary_device(primary_device);
return std::make_unique<TransactionManager>(
std::move(async_cleaner),
std::move(journal),
std::move(cache),
std::move(lba_manager),
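Taken together with the EPM changes above, the mount path now drives the cleaner purely through the ExtentPlacementManager. The sketch below pins down just the ordering of EPM calls visible in this file's diff; the real TransactionManager::mount() is an errorator/future chain with journal replay and a backref scan interleaved, reduced here to comments.

// Ordering sketch only: hypothetical stand-in for the EPM calls made by
// TransactionManager::mkfs()/mount()/close() after this PR.
#include <cstdio>

struct epm_stub_t {
  void mount()            { std::puts("epm->mount()             (was async_cleaner->mount())"); }
  void start_scan_space() { std::puts("epm->start_scan_space()  (space tracker must still be empty)"); }
  void mark_space_used()  { std::puts("epm->mark_space_used()   (no more init_scan flag)"); }
  void open_for_write()   { std::puts("epm->open_for_write()    (was epm->open())"); }
  void start_gc()         { std::puts("epm->start_gc()          (was async_cleaner->complete_init())"); }
  void stop_gc()          { std::puts("epm->stop_gc()           (was async_cleaner->stop())"); }
};

int main() {
  epm_stub_t epm;
  epm.mount();            // 1. mount segment managers, cleaner state -> MOUNT
  /* journal->replay(...) and lba_manager->init_cached_extent(...) run here */
  epm.start_scan_space(); // 2. MOUNT -> SCAN_SPACE
  epm.mark_space_used();  // 3. backref_manager->scan_mapped_space() feeds the space tracker
  epm.open_for_write();   // 4. open the out-of-line writers
  epm.start_gc();         // 5. SCAN_SPACE -> READY, background GC may now run
  epm.stop_gc();          // later: TransactionManager::close()
  return 0;
}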

View File

@ -21,7 +21,6 @@
#include "crimson/osd/exceptions.h"
#include "crimson/os/seastore/logging.h"
#include "crimson/os/seastore/async_cleaner.h"
#include "crimson/os/seastore/seastore_types.h"
#include "crimson/os/seastore/cache.h"
#include "crimson/os/seastore/lba_manager.h"
@ -65,7 +64,6 @@ public:
using base_iertr = Cache::base_iertr;
TransactionManager(
AsyncCleanerRef async_cleaner,
JournalRef journal,
CacheRef cache,
LBAManagerRef lba_manager,
@ -209,8 +207,7 @@ public:
*
* Read extent of type T at offset~length
*/
using read_extent_iertr = get_pin_iertr::extend_ertr<
SegmentManager::read_ertr>;
using read_extent_iertr = get_pin_iertr;
template <typename T>
using read_extent_ret = read_extent_iertr::future<
TCachedExtentRef<T>>;
@ -621,7 +618,7 @@ public:
}
store_statfs_t store_stat() const {
return async_cleaner->stat();
return epm->get_stat();
}
~TransactionManager();
@ -629,7 +626,6 @@ public:
private:
friend class Transaction;
AsyncCleanerRef async_cleaner;
CacheRef cache;
LBAManagerRef lba_manager;
JournalRef journal;
@ -644,8 +640,8 @@ private:
public:
// Testing interfaces
auto get_async_cleaner() {
return async_cleaner.get();
auto get_epm() {
return epm.get();
}
auto get_lba_manager() {

View File

@ -84,26 +84,24 @@ struct fltree_onode_manager_test_t
virtual FuturizedStore::mkfs_ertr::future<> _mkfs() final {
return TMTestState::_mkfs(
).safe_then([this] {
return tm->mount(
).safe_then([this] {
return repeat_eagain([this] {
return seastar::do_with(
create_mutate_transaction(),
[this](auto &ref_t) {
return with_trans_intr(*ref_t, [&](auto &t) {
return manager->mkfs(t
).si_then([this, &t] {
return submit_transaction_fut2(t);
});
});
});
});
}).safe_then([this] {
return tm->close();
}).handle_error(
crimson::ct_error::assert_all{"Invalid error in _mkfs"}
);
});
return restart_fut();
}).safe_then([this] {
return repeat_eagain([this] {
return seastar::do_with(
create_mutate_transaction(),
[this](auto &ref_t)
{
return with_trans_intr(*ref_t, [&](auto &t) {
return manager->mkfs(t
).si_then([this, &t] {
return submit_transaction_fut2(t);
});
});
});
});
}).handle_error(
crimson::ct_error::assert_all{"Invalid error in _mkfs"}
);
}
template <typename F>
@ -111,7 +109,6 @@ struct fltree_onode_manager_test_t
auto t = create_mutate_transaction();
std::invoke(f, *t);
submit_transaction(std::move(t));
async_cleaner->run_until_halt().get0();
}
template <typename F>

View File

@ -1591,7 +1591,6 @@ TEST_F(d_seastore_tm_test_t, 6_random_tree_insert_erase)
auto t = create_mutate_transaction();
INTR(tree->bootstrap, *t).unsafe_get();
submit_transaction(std::move(t));
async_cleaner->run_until_halt().get0();
}
// test insert
@ -1599,7 +1598,6 @@ TEST_F(d_seastore_tm_test_t, 6_random_tree_insert_erase)
auto t = create_mutate_transaction();
INTR(tree->insert, *t).unsafe_get();
submit_transaction(std::move(t));
async_cleaner->run_until_halt().get0();
}
{
auto t = create_read_transaction();
@ -1621,7 +1619,6 @@ TEST_F(d_seastore_tm_test_t, 6_random_tree_insert_erase)
auto size = kvs.size() / 4 * 3;
INTR_R(tree->erase, *t, size).unsafe_get();
submit_transaction(std::move(t));
async_cleaner->run_until_halt().get0();
}
{
auto t = create_read_transaction();
@ -1642,7 +1639,6 @@ TEST_F(d_seastore_tm_test_t, 6_random_tree_insert_erase)
auto size = kvs.size();
INTR_R(tree->erase, *t, size).unsafe_get();
submit_transaction(std::move(t));
async_cleaner->run_until_halt().get0();
}
{
auto t = create_read_transaction();
@ -1697,7 +1693,7 @@ TEST_F(d_seastore_tm_test_t, 7_tree_insert_erase_eagain)
});
});
}).unsafe_get0();
async_cleaner->run_until_halt().get0();
epm->run_background_work_until_halt().get0();
// insert
logger().warn("start inserting {} kvs ...", kvs.size());
@ -1717,7 +1713,7 @@ TEST_F(d_seastore_tm_test_t, 7_tree_insert_erase_eagain)
});
});
}).unsafe_get0();
async_cleaner->run_until_halt().get0();
epm->run_background_work_until_halt().get0();
++iter;
}
}
@ -1763,7 +1759,7 @@ TEST_F(d_seastore_tm_test_t, 7_tree_insert_erase_eagain)
});
});
}).unsafe_get0();
async_cleaner->run_until_halt().get0();
epm->run_background_work_until_halt().get0();
++iter;
}
kvs.erase_from_random(kvs.random_begin(), kvs.random_end());

View File

@ -124,14 +124,14 @@ struct btree_test_base :
block_size = segment_manager->get_block_size();
next = segment_id_t{segment_manager->get_device_id(), 0};
sms->add_segment_manager(segment_manager.get());
epm->add_device(segment_manager.get(), true);
epm->test_init_no_background(segment_manager.get());
journal->set_write_pipeline(&pipeline);
return journal->open_for_mkfs().discard_result();
}).safe_then([this] {
dummy_tail = journal_seq_t{0,
paddr_t::make_seg_paddr(segment_id_t(segment_manager->get_device_id(), 0), 0)};
return epm->open();
return epm->open_for_write();
}).safe_then([this] {
return seastar::do_with(
cache->create_transaction(

View File

@ -91,7 +91,7 @@ struct cache_test_t : public seastar_test_suite_t {
epm.reset(new ExtentPlacementManager());
cache.reset(new Cache(*epm));
current = paddr_t::make_seg_paddr(segment_id_t(segment_manager->get_device_id(), 0), 0);
epm->add_device(segment_manager.get(), true);
epm->test_init_no_background(segment_manager.get());
return seastar::do_with(
get_transaction(),
[this](auto &ref_t) {

View File

@ -8,7 +8,6 @@
#include "test/crimson/gtest_seastar.h"
#include "test/crimson/seastore/transaction_manager_test_state.h"
#include "crimson/os/seastore/async_cleaner.h"
#include "crimson/os/seastore/cache.h"
#include "crimson/os/seastore/transaction_manager.h"
#include "crimson/os/seastore/segment_manager/ephemeral.h"
@ -400,40 +399,7 @@ struct transaction_manager_test_t :
}
bool check_usage() {
auto t = create_weak_test_transaction();
SpaceTrackerIRef tracker(async_cleaner->get_empty_space_tracker());
with_trans_intr(
*t.t,
[this, &tracker](auto &t) {
return backref_manager->scan_mapped_space(
t,
[&tracker](
paddr_t paddr,
extent_len_t len,
extent_types_t type,
laddr_t laddr) {
if (paddr.get_addr_type() == paddr_types_t::SEGMENT) {
if (is_backref_node(type)) {
assert(laddr == L_ADDR_NULL);
tracker->allocate(
paddr.as_seg_paddr().get_segment_id(),
paddr.as_seg_paddr().get_segment_off(),
len);
} else if (laddr == L_ADDR_NULL) {
tracker->release(
paddr.as_seg_paddr().get_segment_id(),
paddr.as_seg_paddr().get_segment_off(),
len);
} else {
tracker->allocate(
paddr.as_seg_paddr().get_segment_id(),
paddr.as_seg_paddr().get_segment_off(),
len);
}
}
});
}).unsafe_get0();
return async_cleaner->debug_check_space(*tracker);
return epm->check_usage();
}
void replay() {
@ -609,7 +575,8 @@ struct transaction_manager_test_t :
"try_submit_transaction hit invalid error"
}
).then([this](auto ret) {
return async_cleaner->run_until_halt().then([ret] { return ret; });
return epm->run_background_work_until_halt(
).then([ret] { return ret; });
}).get0();
if (success) {
@ -659,7 +626,7 @@ struct transaction_manager_test_t :
});
});
}).safe_then([this]() {
return async_cleaner->run_until_halt();
return epm->run_background_work_until_halt();
}).handle_error(
crimson::ct_error::assert_all{
"Invalid error in SeaStore::list_collections"

View File

@ -6,8 +6,8 @@
#include <random>
#include <boost/iterator/counting_iterator.hpp>
#include "crimson/os/seastore/async_cleaner.h"
#include "crimson/os/seastore/cache.h"
#include "crimson/os/seastore/extent_placement_manager.h"
#include "crimson/os/seastore/logging.h"
#include "crimson/os/seastore/transaction_manager.h"
#include "crimson/os/seastore/segment_manager/ephemeral.h"
@ -39,32 +39,38 @@ protected:
}
virtual void _init() = 0;
void init() {
_init();
}
virtual void _destroy() = 0;
void destroy() {
_destroy();
virtual seastar::future<> _teardown() = 0;
seastar::future<> teardown() {
return _teardown().then([this] {
_destroy();
});
}
virtual seastar::future<> _teardown() = 0;
virtual FuturizedStore::mkfs_ertr::future<> _mkfs() = 0;
virtual FuturizedStore::mount_ertr::future<> _mount() = 0;
void restart() {
LOG_PREFIX(EphemeralTestState::restart);
seastar::future<> restart_fut() {
LOG_PREFIX(EphemeralTestState::restart_fut);
SUBINFO(test, "begin ...");
_teardown().get0();
destroy();
segment_manager->remount();
for (auto &sec_sm : secondary_segment_managers) {
sec_sm->remount();
}
init();
_mount().handle_error(crimson::ct_error::assert_all{}).get0();
SUBINFO(test, "finish");
return teardown().then([this] {
segment_manager->remount();
for (auto &sec_sm : secondary_segment_managers) {
sec_sm->remount();
}
_init();
return _mount().handle_error(crimson::ct_error::assert_all{});
}).then([FNAME] {
SUBINFO(test, "finish");
});
}
void restart() {
restart_fut().get0();
}
seastar::future<> segment_setup()
{
LOG_PREFIX(EphemeralTestState::segment_setup);
@ -96,28 +102,18 @@ protected:
segment_manager::get_ephemeral_device_config(cnt, get_num_devices()));
});
});
}).safe_then([this, FNAME] {
init();
return _mkfs(
).safe_then([this] {
return _teardown();
}).safe_then([this] {
destroy();
segment_manager->remount();
for (auto &sec_sm : secondary_segment_managers) {
sec_sm->remount();
}
init();
return _mount();
}).handle_error(
crimson::ct_error::assert_all{}
).then([FNAME] {
SUBINFO(test, "finish");
});
}).safe_then([this] {
_init();
return _mkfs();
}).safe_then([this] {
return restart_fut();
}).handle_error(
crimson::ct_error::assert_all{}
);
).then([FNAME] {
SUBINFO(test, "finish");
});
}
seastar::future<> randomblock_setup()
{
auto config =
@ -148,7 +144,7 @@ protected:
seastar::future<> tm_teardown() {
LOG_PREFIX(EphemeralTestState::tm_teardown);
SUBINFO(test, "begin");
return _teardown().then([this, FNAME] {
return teardown().then([this, FNAME] {
segment_manager.reset();
for (auto &sec_sm : secondary_segment_managers) {
sec_sm.reset();
@ -163,9 +159,8 @@ class TMTestState : public EphemeralTestState {
protected:
TransactionManagerRef tm;
LBAManager *lba_manager;
BackrefManager *backref_manager;
Cache* cache;
AsyncCleaner *async_cleaner;
ExtentPlacementManager *epm;
uint64_t seq = 0;
TMTestState() : EphemeralTestState(1) {}
@ -185,15 +180,15 @@ protected:
} else {
tm = make_transaction_manager(segment_manager.get(), sec_devices, true);
}
async_cleaner = tm->get_async_cleaner();
epm = tm->get_epm();
lba_manager = tm->get_lba_manager();
backref_manager = tm->get_backref_manager();
cache = tm->get_cache();
}
virtual void _destroy() override {
async_cleaner = nullptr;
epm = nullptr;
lba_manager = nullptr;
cache = nullptr;
tm.reset();
}
@ -211,9 +206,9 @@ protected:
).handle_error(
crimson::ct_error::assert_all{"Error in mount"}
).then([this] {
return async_cleaner->stop();
return epm->stop_gc();
}).then([this] {
return async_cleaner->run_until_halt();
return epm->run_background_work_until_halt();
});
}
@ -278,7 +273,7 @@ protected:
void submit_transaction(TransactionRef t) {
submit_transaction_fut(*t).unsafe_get0();
async_cleaner->run_until_halt().get0();
epm->run_background_work_until_halt().get0();
}
};
@ -389,9 +384,7 @@ protected:
}
virtual seastar::future<> _teardown() final {
return seastore->umount().then([this] {
seastore.reset();
});
return seastore->umount();
}
virtual FuturizedStore::mount_ertr::future<> _mount() final {