
Merge pull request from xxhdx1985126/wip-extent-placement-manager-2

crimson/os/seastore: add extent placement manager

Reviewed-by: Chunmei Liu <chunmei.liu@intel.com>
Reviewed-by: Yingxin Cheng <yingxin.cheng@intel.com>
Reviewed-by: Samuel Just <sjust@redhat.com>
Committed by Samuel Just on 2021-09-08 19:08:26 -07:00 via GitHub, in commit 83a991fffa.
36 changed files with 2100 additions and 925 deletions

View File

@@ -40,3 +40,8 @@ options:
default: true
see_also:
- seastore_device_size
- name: seastore_init_rewrite_segments_num_per_device
type: uint
level: dev
desc: Initial number of segments for rewriting extents per device
default: 6
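
For example, the new option can be overridden per daemon in ceph.conf with the usual option syntax (the value 8 here is purely illustrative):

[osd]
seastore_init_rewrite_segments_num_per_device = 8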

View File

@@ -0,0 +1,43 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
// vim: ts=8 sw=2 smarttab
#pragma once
#include <seastar/core/future.hh>
#include <seastar/core/condition-variable.hh>
#include <seastar/core/loop.hh>
#include "crimson/common/interruptible_future.h"
namespace crimson {
class condition_variable : public seastar::condition_variable {
public:
template <typename Pred, typename Func>
auto wait(
Pred&& pred,
Func&& action) noexcept {
using func_result_t = std::invoke_result_t<Func>;
using intr_errorator_t = typename func_result_t::interrupt_errorator_type;
using intr_cond_t = typename func_result_t::interrupt_cond_type;
using interruptor = crimson::interruptible::interruptor<intr_cond_t>;
return interruptor::repeat(
[this, pred=std::forward<Pred>(pred),
action=std::forward<Func>(action)]()
-> typename intr_errorator_t::template future<seastar::stop_iteration> {
if (!pred()) {
return seastar::condition_variable::wait().then([] {
return seastar::make_ready_future<
seastar::stop_iteration>(seastar::stop_iteration::no);
});
} else {
return action().si_then([] {
return seastar::make_ready_future<
seastar::stop_iteration>(seastar::stop_iteration::yes);
});
}
});
}
};
} // namespace crimson
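
For readers unfamiliar with the helper above: wait(pred, action) loops, blocking on the condition variable while pred() is false and running action() exactly once when it becomes true. A minimal standard-C++ analog of that control flow (plain std::condition_variable, not crimson's interruptible futures; purely illustrative):

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

std::mutex m;
std::condition_variable cv;
bool ready = false;  // stands in for pred()

int main() {
  std::thread producer([] {
    {
      std::lock_guard<std::mutex> lk(m);
      ready = true;               // make the predicate true...
    }
    cv.notify_all();              // ...then wake waiters, like broadcast()
  });
  std::unique_lock<std::mutex> lk(m);
  cv.wait(lk, [] { return ready; });  // loop: keep waiting while pred() is false
  std::cout << "action() runs exactly once\n";  // stands in for action()
  producer.join();
  return 0;
}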

View File

@@ -782,6 +782,11 @@ public:
}
};
template <typename T>
static future<T> make_errorator_future(seastar::future<T>&& fut) {
return std::move(fut);
}
// assert_all{ "TODO" };
class assert_all {
const char* const msg = nullptr;

View File

@@ -612,6 +612,7 @@ private:
template <typename InterruptCond, typename Errorator>
struct interruptible_errorator {
using base_ertr = Errorator;
using intr_cond_t = InterruptCond;
template <typename ValueT = void>
using future = interruptible_future_detail<InterruptCond,
@@ -659,6 +660,9 @@ class [[nodiscard]] interruptible_future_detail<
public:
using core_type = ErroratedFuture<crimson::errorated_future_marker<T>>;
using errorator_type = typename core_type::errorator_type;
using interrupt_errorator_type =
interruptible_errorator<InterruptCond, errorator_type>;
using interrupt_cond_type = InterruptCond;
template <typename U>
using interrupt_futurize_t =

View File

@@ -7,6 +7,7 @@ add_library(crimson-seastore STATIC
transaction.cc
journal.cc
cache.cc
scanner.cc
lba_manager.cc
segment_cleaner.cc
lba_manager/btree/btree_lba_manager.cc
@@ -32,6 +33,7 @@ add_library(crimson-seastore STATIC
collection_manager.cc
collection_manager/flat_collection_manager.cc
collection_manager/collection_flat_node.cc
extent_placement_manager.cc
object_data_handler.cc
seastore.cc
random_block_manager/nvme_manager.cc

View File

@@ -593,11 +593,13 @@ void Cache::replace_extent(CachedExtentRef next, CachedExtentRef prev)
extents.replace(*next, *prev);
if (prev->get_type() == extent_types_t::ROOT) {
assert(prev->primary_ref_list_hook.is_linked());
assert(prev->is_dirty());
stats.dirty_bytes -= prev->get_length();
dirty.erase(dirty.s_iterator_to(*prev));
intrusive_ptr_release(&*prev);
assert(prev->is_clean()
|| prev->primary_ref_list_hook.is_linked());
if (prev->is_dirty()) {
stats.dirty_bytes -= prev->get_length();
dirty.erase(dirty.s_iterator_to(*prev));
intrusive_ptr_release(&*prev);
}
add_to_dirty(next);
} else if (prev->is_dirty()) {
assert(prev->get_dirty_from() == next->get_dirty_from());
@@ -660,6 +662,7 @@ void Cache::invalidate(Transaction& t, CachedExtent& conflicting_extent)
if (!i->is_valid()) {
continue;
}
DEBUGT("was mutating extent: {}", t, *i);
efforts.mutate.increment(i->get_length());
efforts.mutate_delta_bytes += i->get_delta().length();
}
@ -710,7 +713,8 @@ void Cache::on_transaction_destruct(Transaction& t)
CachedExtentRef Cache::alloc_new_extent_by_type(
Transaction &t, ///< [in, out] current transaction
extent_types_t type, ///< [in] type tag
segment_off_t length ///< [in] length
segment_off_t length, ///< [in] length
bool delay ///< [in] whether to delay paddr alloc
)
{
switch (type) {
@@ -718,26 +722,26 @@ CachedExtentRef Cache::alloc_new_extent_by_type(
assert(0 == "ROOT is never directly alloc'd");
return CachedExtentRef();
case extent_types_t::LADDR_INTERNAL:
return alloc_new_extent<lba_manager::btree::LBAInternalNode>(t, length);
return alloc_new_extent<lba_manager::btree::LBAInternalNode>(t, length, delay);
case extent_types_t::LADDR_LEAF:
return alloc_new_extent<lba_manager::btree::LBALeafNode>(t, length);
return alloc_new_extent<lba_manager::btree::LBALeafNode>(t, length, delay);
case extent_types_t::ONODE_BLOCK_STAGED:
return alloc_new_extent<onode::SeastoreNodeExtent>(t, length);
return alloc_new_extent<onode::SeastoreNodeExtent>(t, length, delay);
case extent_types_t::OMAP_INNER:
return alloc_new_extent<omap_manager::OMapInnerNode>(t, length);
return alloc_new_extent<omap_manager::OMapInnerNode>(t, length, delay);
case extent_types_t::OMAP_LEAF:
return alloc_new_extent<omap_manager::OMapLeafNode>(t, length);
return alloc_new_extent<omap_manager::OMapLeafNode>(t, length, delay);
case extent_types_t::COLL_BLOCK:
return alloc_new_extent<collection_manager::CollectionNode>(t, length);
return alloc_new_extent<collection_manager::CollectionNode>(t, length, delay);
case extent_types_t::OBJECT_DATA_BLOCK:
return alloc_new_extent<ObjectDataBlock>(t, length);
return alloc_new_extent<ObjectDataBlock>(t, length, delay);
case extent_types_t::RETIRED_PLACEHOLDER:
ceph_assert(0 == "impossible");
return CachedExtentRef();
case extent_types_t::TEST_BLOCK:
return alloc_new_extent<TestBlock>(t, length);
return alloc_new_extent<TestBlock>(t, length, delay);
case extent_types_t::TEST_BLOCK_PHYSICAL:
return alloc_new_extent<TestBlockPhysical>(t, length);
return alloc_new_extent<TestBlockPhysical>(t, length, delay);
case extent_types_t::NONE: {
ceph_assert(0 == "NONE is an invalid extent type");
return CachedExtentRef();
@@ -801,7 +805,10 @@ record_t Cache::prepare_record(Transaction &t)
// Should be valid due to interruptible future
for (auto &i: t.read_set) {
assert(i.ref->is_valid());
if (!i.ref->is_valid()) {
DEBUGT("read_set invalid extent: {}, aborting", t, *i.ref);
ceph_abort("no invalid extent allowed in transactions' read_set");
}
get_by_ext(efforts.read_by_ext,
i.ref->get_type()).increment(i.ref->get_length());
}
@@ -879,6 +886,9 @@ record_t Cache::prepare_record(Transaction &t)
record.extents.reserve(t.fresh_block_list.size());
for (auto &i: t.fresh_block_list) {
DEBUGT("fresh block {}", t, *i);
if (!i->is_inline()) {
continue;
}
get_by_ext(efforts.fresh_by_ext,
i->get_type()).increment(i->get_length());
bufferlist bl;
@@ -911,7 +921,9 @@ void Cache::complete_commit(
DEBUGT("enter", t);
for (auto &i: t.fresh_block_list) {
i->set_paddr(final_block_start.add_relative(i->get_paddr()));
if (i->is_inline()) {
i->set_paddr(final_block_start.add_relative(i->get_paddr()));
}
i->last_committed_crc = i->get_crc32c();
i->on_initial_write();
@@ -974,7 +986,7 @@ void Cache::init() {
root = nullptr;
}
root = new RootBlock();
root->state = CachedExtent::extent_state_t::DIRTY;
root->state = CachedExtent::extent_state_t::CLEAN;
add_extent(root);
}
@@ -1016,6 +1028,7 @@ Cache::replay_delta(
remove_extent(root);
root->apply_delta_and_adjust_crc(record_base, delta.bl);
root->dirty_from_or_retired_at = journal_seq;
root->state = CachedExtent::extent_state_t::DIRTY;
add_extent(root);
return replay_delta_ertr::now();
} else {

View File

@@ -370,20 +370,33 @@ public:
/**
* alloc_new_extent
*
* Allocates a fresh extent. addr will be relative until commit.
* Allocates a fresh extent. If delayed is true, the addr will be allocated later
*/
template <typename T>
TCachedExtentRef<T> alloc_new_extent(
Transaction &t, ///< [in, out] current transaction
segment_off_t length ///< [in] length
Transaction &t, ///< [in, out] current transaction
segment_off_t length, ///< [in] length
bool delayed = false ///< [in] whether the paddr allocation of extent is delayed
) {
auto ret = CachedExtent::make_cached_extent_ref<T>(
alloc_cache_buf(length));
t.add_fresh_extent(ret);
t.add_fresh_extent(ret, delayed);
ret->state = CachedExtent::extent_state_t::INITIAL_WRITE_PENDING;
return ret;
}
void mark_delayed_extent_inline(
Transaction& t,
LogicalCachedExtentRef& ref) {
t.mark_delayed_extent_inline(ref);
}
void mark_delayed_extent_ool(
Transaction& t,
LogicalCachedExtentRef& ref) {
t.mark_delayed_extent_ool(ref);
}
/**
* alloc_new_extent
*
@@ -392,7 +405,8 @@ public:
CachedExtentRef alloc_new_extent_by_type(
Transaction &t, ///< [in, out] current transaction
extent_types_t type, ///< [in] type tag
segment_off_t length ///< [in] length
segment_off_t length, ///< [in] length
bool delayed = false ///< [in] whether to delay addr allocation
);
/**
@@ -487,6 +501,9 @@ public:
Transaction &t,
F &&f)
{
// journal replay should have finished at this point,
// and Cache::root should have been inserted into the dirty list
assert(root->is_dirty());
std::vector<CachedExtentRef> dirty;
for (auto &e : extents) {
dirty.push_back(CachedExtentRef(&e));

View File

@@ -61,6 +61,7 @@ std::ostream &operator<<(std::ostream &out, const CachedExtent &ext)
CachedExtent::~CachedExtent()
{
if (parent_index) {
assert(is_linked());
parent_index->erase(*this);
}
}

View File

@@ -17,9 +17,13 @@
namespace crimson::os::seastore {
class ool_record_t;
class Transaction;
class CachedExtent;
using CachedExtentRef = boost::intrusive_ptr<CachedExtent>;
class SegmentedAllocator;
class TransactionManager;
class ExtentPlacementManager;
// #define DEBUG_CACHED_EXTENT_REF
#ifdef DEBUG_CACHED_EXTENT_REF
@@ -320,6 +324,15 @@ public:
virtual ~CachedExtent();
/// type of the backend device that will hold this extent
device_type_t backend_type = device_type_t::NONE;
/// hint for allocators
ool_placement_hint_t hint;
bool is_inline() const {
return poffset.is_relative();
}
private:
template <typename T>
friend class read_set_item_t;
@@ -341,6 +354,10 @@ private:
friend class ExtentIndex;
friend class Transaction;
bool is_linked() {
return extent_index_hook.is_linked();
}
/// hook for intrusive ref list (mainly dirty or lru list)
boost::intrusive::list_member_hook<> primary_ref_list_hook;
using primary_ref_list_member_options = boost::intrusive::member_hook<
@@ -456,6 +473,10 @@ protected:
}
}
friend class crimson::os::seastore::ool_record_t;
friend class crimson::os::seastore::SegmentedAllocator;
friend class crimson::os::seastore::TransactionManager;
friend class crimson::os::seastore::ExtentPlacementManager;
};
std::ostream &operator<<(std::ostream &, CachedExtent::extent_state_t);
@@ -529,6 +550,12 @@ public:
);
}
template <typename Disposer>
void clear_and_dispose(Disposer disposer) {
extent_index.clear_and_dispose(disposer);
bytes = 0;
}
void clear() {
extent_index.clear();
bytes = 0;
@@ -550,12 +577,13 @@ public:
void erase(CachedExtent &extent) {
assert(extent.parent_index);
auto erased = extent_index.erase(extent);
assert(extent.is_linked());
auto erased = extent_index.erase(
extent_index.s_iterator_to(extent));
extent.parent_index = nullptr;
if (erased) {
bytes -= extent.get_length();
}
assert(erased);
bytes -= extent.get_length();
}
void replace(CachedExtent &to, CachedExtent &from) {

View File

@@ -0,0 +1,271 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
// vim: ts=8 sw=2 smarttab expandtab
#include "crimson/os/seastore/journal.h"
#include "crimson/os/seastore/extent_placement_manager.h"
namespace {
seastar::logger& logger() {
return crimson::get_logger(ceph_subsys_seastore);
}
}
namespace crimson::os::seastore {
SegmentedAllocator::SegmentedAllocator(
SegmentProvider& sp,
SegmentManager& sm,
LBAManager& lba_manager,
Journal& journal,
Cache& cache)
: segment_provider(sp),
segment_manager(sm),
lba_manager(lba_manager),
journal(journal),
cache(cache)
{
std::generate_n(
std::back_inserter(writers),
crimson::common::get_conf<uint64_t>(
"seastore_init_rewrite_segments_num_per_device"),
[&] {
return Writer{
segment_provider,
segment_manager,
lba_manager,
journal,
cache};
});
}
SegmentedAllocator::Writer::finish_record_ret
SegmentedAllocator::Writer::finish_write(
Transaction& t,
ool_record_t& record) {
return trans_intr::do_for_each(record.get_extents(),
[this, &t](auto& ool_extent) {
auto& lextent = ool_extent.get_lextent();
logger().debug("SegmentedAllocator::Writer::finish_write: "
"extent: {}, ool_paddr: {}",
*lextent,
ool_extent.get_ool_paddr());
return lba_manager.update_mapping(
t,
lextent->get_laddr(),
lextent->get_paddr(),
ool_extent.get_ool_paddr()
).si_then([&ool_extent, &t, &lextent, this] {
ool_extent.persist_paddr();
lextent->backend_type = device_type_t::NONE;
lextent->hint = {};
cache.mark_delayed_extent_ool(t, lextent);
return finish_record_iertr::now();
});
}).si_then([&record] {
record.clear();
});
}
SegmentedAllocator::Writer::write_iertr::future<>
SegmentedAllocator::Writer::_write(
Transaction& t,
ool_record_t& record)
{
bufferlist bl = record.encode(current_segment->segment->get_segment_id(), 0);
seastar::promise<> pr;
current_segment->inflight_writes.emplace_back(pr.get_future());
logger().debug(
"SegmentedAllocator::Writer::write: written {} extents,"
" {} bytes to segment {} at {}",
record.get_num_extents(),
bl.length(),
current_segment->segment->get_segment_id(),
record.get_base());
return trans_intr::make_interruptible(
current_segment->segment->write(record.get_base(), bl).safe_then(
[this, pr=std::move(pr),
it=(--current_segment->inflight_writes.end()),
cs=current_segment]() mutable {
if (cs->outdated) {
pr.set_value();
} else {
current_segment->inflight_writes.erase(it);
}
return seastar::now();
})
).si_then([this, &record, &t]() mutable {
return finish_write(t, record);
});
}
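
The inflight-write bookkeeping in _write() above leans on std::list iterator stability: the completion callback erases its own entry by iterator, unless the segment was already marked outdated, in which case the segment roller cleans up the whole list. A toy, standalone illustration of that pattern (plain C++, no seastar; not seastore code):

#include <cstdio>
#include <list>

int main() {
  std::list<int> inflight;
  bool outdated = false;

  inflight.push_back(1);
  auto it = --inflight.end();  // iterator to our own entry; stays valid

  // completion handler, as a plain callable instead of a future continuation
  auto on_complete = [&] {
    if (outdated) {
      // segment rolled away; the roller now awaits and drops the whole list
      std::puts("leave entry for when_all_succeed()");
    } else {
      inflight.erase(it);  // safe: list iterators survive other insertions
      std::puts("erased own entry");
    }
  };
  on_complete();
  std::printf("inflight entries left: %zu\n", inflight.size());
  return 0;
}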
void SegmentedAllocator::Writer::add_extent_to_write(
ool_record_t& record,
LogicalCachedExtentRef& extent) {
logger().debug(
"SegmentedAllocator::Writer::add_extent_to_write: "
"add extent {} to record",
extent);
extent->prepare_write();
record.add_extent(extent);
}
SegmentedAllocator::Writer::write_iertr::future<>
SegmentedAllocator::Writer::write(
Transaction& t,
std::list<LogicalCachedExtentRef>& extents)
{
auto write_func = [this, &extents, &t] {
return seastar::do_with(ool_record_t(segment_manager.get_block_size()),
[this, &extents, &t](auto& record) {
return trans_intr::repeat([this, &record, &t, &extents]()
-> write_iertr::future<seastar::stop_iteration> {
if (extents.empty()) {
return seastar::make_ready_future<
seastar::stop_iteration>(seastar::stop_iteration::yes);
}
return segment_rotation_guard.wait(
[this] {
return !rolling_segment;
},
[this, &record, &extents, &t]() -> write_iertr::future<> {
record.set_base(allocated_to);
for (auto it = extents.begin();
it != extents.end();) {
auto& extent = *it;
auto wouldbe_length =
record.get_wouldbe_encoded_record_length(extent);
if (_needs_roll(wouldbe_length)) {
// reached the segment end, write and roll
assert(!rolling_segment);
rolling_segment = true;
auto num_extents = record.get_num_extents();
logger().debug(
"SegmentedAllocator::Writer::write: end of segment, writing {} extents to segment {} at {}",
num_extents,
current_segment->segment->get_segment_id(),
allocated_to);
return (num_extents ?
_write(t, record) :
write_iertr::now()
).si_then([this]() mutable {
return roll_segment(false);
});
}
add_extent_to_write(record, extent);
it = extents.erase(it);
}
record_size_t rsize = record.get_encoded_record_length();
logger().debug(
"SegmentedAllocator::Writer::write: writing {} extents to segment {} at {}",
record.get_num_extents(),
current_segment->segment->get_segment_id(),
allocated_to);
allocated_to += rsize.mdlength + rsize.dlength;
return _write(t, record);
}
).si_then([]()
-> write_iertr::future<seastar::stop_iteration> {
return seastar::make_ready_future<
seastar::stop_iteration>(seastar::stop_iteration::no);
});
});
});
};
if (rolling_segment) {
return segment_rotation_guard.wait([this] {
return !rolling_segment;
}, std::move(write_func));
} else if (!current_segment) {
return trans_intr::make_interruptible(roll_segment(true)).si_then(
[write_func=std::move(write_func)] {
return write_func();
});
}
return write_func();
}
bool SegmentedAllocator::Writer::_needs_roll(segment_off_t length) const {
return allocated_to + length > current_segment->segment->get_write_capacity();
}
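
The roll decision above is plain arithmetic: the record does not fit if the bytes already allocated in the segment plus the record's would-be encoded length exceed the segment's write capacity. A standalone sketch of that computation with invented sizes (the per-extent metadata size is an assumption, not the real encoded_sizeof_bounded<extent_info_t>() value):

#include <cstdint>
#include <cstdio>

// round x up to the next multiple of align
static uint64_t roundup(uint64_t x, uint64_t align) {
  return (x + align - 1) / align * align;
}

int main() {
  const uint64_t block_size = 4096;
  const uint64_t write_capacity = 32ull << 20;         // pretend 32 MiB segment
  uint64_t allocated_to = (32ull << 20) - (24 << 10);  // 24 KiB left

  uint64_t raw_mdlength = 512;   // record header + extent infos so far
  uint64_t per_extent_md = 32;   // assumed size of one encoded extent_info_t
  uint64_t payload = 16 << 10;   // one more 16 KiB extent

  // metadata is padded to a block boundary, then the payload follows
  uint64_t wouldbe = roundup(raw_mdlength + per_extent_md, block_size) + payload;
  bool roll = allocated_to + wouldbe > write_capacity;
  std::printf("wouldbe=%llu bytes -> %s\n", (unsigned long long)wouldbe,
              roll ? "roll to a fresh segment" : "append in place");
  return 0;
}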
SegmentedAllocator::Writer::init_segment_ertr::future<>
SegmentedAllocator::Writer::init_segment(Segment& segment) {
bufferptr bp(
ceph::buffer::create_page_aligned(
segment_manager.get_block_size()));
bp.zero();
auto header = segment_header_t{
journal.next_journal_segment_seq - 1, // current seg seq = next seg seq - 1
segment.get_segment_id(),
NO_DELTAS, 0, true};
logger().debug("SegmentedAllocator::Writer::init_segment: initting {}, {}",
segment.get_segment_id(),
header);
ceph::bufferlist bl;
encode(header, bl);
bl.cbegin().copy(bl.length(), bp.c_str());
bl.clear();
bl.append(bp);
allocated_to = segment_manager.get_block_size();
return segment.write(0, bl).handle_error(
crimson::ct_error::input_output_error::pass_further{},
crimson::ct_error::assert_all{
"Invalid error when initing segment"}
);
}
SegmentedAllocator::Writer::roll_segment_ertr::future<>
SegmentedAllocator::Writer::roll_segment(bool set_rolling) {
if (set_rolling) {
rolling_segment = true;
}
assert(rolling_segment);
if (current_segment) {
(void) seastar::with_gate(writer_guard, [this] {
auto fut = seastar::now();
if (!current_segment->inflight_writes.empty()) {
fut = seastar::when_all_succeed(
current_segment->inflight_writes.begin(),
current_segment->inflight_writes.end());
}
current_segment->outdated = true;
return fut.then(
[cs=std::move(current_segment), this, it=(--open_segments.end())] {
return cs->segment->close().safe_then([this, cs, it] {
assert((*it).get() == cs.get());
segment_provider.close_segment(cs->segment->get_segment_id());
open_segments.erase(it);
});
});
}).handle_exception_type([](seastar::gate_closed_exception e) {
logger().debug(
"SegmentedAllocator::Writer::roll_segment:"
" writer_guard closed, should be stopping");
return seastar::now();
});
}
return segment_provider.get_segment().safe_then([this](auto segment) {
return segment_manager.open(segment);
}).safe_then([this](auto segref) {
return init_segment(*segref).safe_then([segref=std::move(segref), this] {
assert(!current_segment.get());
current_segment.reset(new open_segment_wrapper_t());
current_segment->segment = segref;
open_segments.emplace_back(current_segment);
rolling_segment = false;
segment_rotation_guard.broadcast();
});
}).handle_error(
roll_segment_ertr::pass_further{},
crimson::ct_error::all_same_way([] { ceph_assert(0 == "TODO"); })
);
}
}

View File

@@ -0,0 +1,421 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
// vim: ts=8 sw=2 smarttab expandtab
#pragma once
#include "seastar/core/gate.hh"
#include "crimson/common/condition_variable.h"
#include "crimson/common/log.h"
#include "crimson/os/seastore/cache.h"
#include "crimson/os/seastore/cached_extent.h"
#include "crimson/os/seastore/lba_manager.h"
namespace crimson::os::seastore {
/**
* ool_record_t
*
* Encapsulates logic for building and encoding an ool record destined for
* an ool segment.
*
* Uses a metadata header to enable scanning the ool segment for gc purposes.
* Introducing a separate physical->logical mapping would enable removing the
* metadata block overhead.
*/
class ool_record_t {
class OolExtent {
public:
OolExtent(LogicalCachedExtentRef& lextent)
: lextent(lextent) {}
void set_ool_paddr(paddr_t addr) {
ool_offset = addr;
}
paddr_t get_ool_paddr() const {
return ool_offset;
}
void persist_paddr() {
lextent->set_paddr(ool_offset);
ool_offset = P_ADDR_NULL;
}
bufferptr& get_bptr() {
return lextent->get_bptr();
}
LogicalCachedExtentRef& get_lextent() {
return lextent;
}
private:
paddr_t ool_offset;
LogicalCachedExtentRef lextent;
};
public:
ool_record_t(size_t block_size) : block_size(block_size) {}
record_size_t get_encoded_record_length() {
return crimson::os::seastore::get_encoded_record_length(record, block_size);
}
size_t get_wouldbe_encoded_record_length(LogicalCachedExtentRef& extent) {
auto raw_mdlength = get_encoded_record_raw_mdlength(record, block_size);
auto wouldbe_mdlength = p2roundup(
raw_mdlength + ceph::encoded_sizeof_bounded<extent_info_t>(),
block_size);
return wouldbe_mdlength + extent_buf_len + extent->get_bptr().length();
}
ceph::bufferlist encode(segment_id_t segment, segment_nonce_t nonce) {
assert(extents.size() == record.extents.size());
auto rsize = get_encoded_record_length();
segment_off_t extent_offset = base + rsize.mdlength;
for (auto& extent : extents) {
extent.set_ool_paddr(
{segment, extent_offset});
extent_offset += extent.get_bptr().length();
}
assert(extent_offset == (segment_off_t)(base + rsize.mdlength + rsize.dlength));
return encode_record(rsize, std::move(record), block_size, base, nonce);
}
void add_extent(LogicalCachedExtentRef& extent) {
extents.emplace_back(extent);
ceph::bufferlist bl;
bl.append(extent->get_bptr());
record.extents.emplace_back(extent_t{
extent->get_type(),
extent->get_laddr(),
std::move(bl)});
extent_buf_len += extent->get_bptr().length();
}
std::vector<OolExtent>& get_extents() {
return extents;
}
void set_base(segment_off_t b) {
base = b;
}
segment_off_t get_base() {
return base;
}
void clear() {
record.extents.clear();
extents.clear();
assert(!record.deltas.size());
extent_buf_len = 0;
base = MAX_SEG_OFF;
}
uint64_t get_num_extents() {
return extents.size();
}
private:
std::vector<OolExtent> extents;
record_t record;
size_t block_size;
segment_off_t extent_buf_len = 0;
segment_off_t base = MAX_SEG_OFF;
};
/**
* ExtentOolWriter
*
* Interface through which final write to ool segment is performed.
*/
class ExtentOolWriter {
public:
using write_iertr = trans_iertr<crimson::errorator<
crimson::ct_error::input_output_error, // media error or corruption
crimson::ct_error::invarg, // if offset is < write pointer or misaligned
crimson::ct_error::ebadf, // segment closed
crimson::ct_error::enospc // write exceeds segment size
>>;
using stop_ertr = Segment::close_ertr;
virtual stop_ertr::future<> stop() = 0;
virtual write_iertr::future<> write(
Transaction& t,
std::list<LogicalCachedExtentRef>& extent) = 0;
virtual ~ExtentOolWriter() {}
};
/**
* ExtentAllocator
*
* Handles allocating ool extents from a specific family of targets.
*/
class ExtentAllocator {
public:
using alloc_paddr_iertr = trans_iertr<crimson::errorator<
crimson::ct_error::input_output_error, // media error or corruption
crimson::ct_error::invarg, // if offset is < write pointer or misaligned
crimson::ct_error::ebadf, // segment closed
crimson::ct_error::enospc // write exceeds segment size
>>;
virtual alloc_paddr_iertr::future<> alloc_ool_extents_paddr(
Transaction& t,
std::list<LogicalCachedExtentRef>&) = 0;
using stop_ertr = ExtentOolWriter::stop_ertr;
virtual stop_ertr::future<> stop() = 0;
virtual ~ExtentAllocator() {};
};
using ExtentAllocatorRef = std::unique_ptr<ExtentAllocator>;
struct open_segment_wrapper_t : public boost::intrusive_ref_counter<
open_segment_wrapper_t,
boost::thread_unsafe_counter> {
SegmentRef segment;
std::list<seastar::future<>> inflight_writes;
bool outdated = false;
};
using open_segment_wrapper_ref =
boost::intrusive_ptr<open_segment_wrapper_t>;
/**
* SegmentedAllocator
*
* Handles out-of-line writes to a SegmentManager device (such as a ZNS device
* or conventional flash device where sequential writes are heavily preferred).
*
* Creates <seastore_init_rewrite_segments_num_per_device> Writer instances
* internally to round-robin writes. Later work will partition allocations
* based on hint (age, presumably) among the created Writers.
* Each Writer makes use of SegmentProvider to obtain a new segment for writes
* as needed.
*/
class SegmentedAllocator : public ExtentAllocator {
class Writer : public ExtentOolWriter {
public:
Writer(
SegmentProvider& sp,
SegmentManager& sm,
LBAManager& lba_manager,
Journal& journal,
Cache& cache)
: segment_provider(sp),
segment_manager(sm),
lba_manager(lba_manager),
journal(journal),
cache(cache)
{}
Writer(Writer &&) = default;
write_iertr::future<> write(
Transaction& t,
std::list<LogicalCachedExtentRef>& extent) final;
stop_ertr::future<> stop() final {
return writer_guard.close().then([this] {
return crimson::do_for_each(open_segments, [](auto& seg_wrapper) {
return seg_wrapper->segment->close();
});
});
}
private:
using update_lba_mapping_iertr = LBAManager::update_le_mapping_iertr;
using finish_record_iertr = update_lba_mapping_iertr;
using finish_record_ret = finish_record_iertr::future<>;
finish_record_ret finish_write(
Transaction& t,
ool_record_t& record);
bool _needs_roll(segment_off_t length) const;
write_iertr::future<> _write(
Transaction& t,
ool_record_t& record);
using roll_segment_ertr = crimson::errorator<
crimson::ct_error::input_output_error>;
roll_segment_ertr::future<> roll_segment(bool);
using init_segment_ertr = crimson::errorator<
crimson::ct_error::input_output_error>;
init_segment_ertr::future<> init_segment(Segment& segment);
void add_extent_to_write(
ool_record_t&,
LogicalCachedExtentRef& extent);
SegmentProvider& segment_provider;
SegmentManager& segment_manager;
open_segment_wrapper_ref current_segment;
std::list<open_segment_wrapper_ref> open_segments;
segment_off_t allocated_to = 0;
LBAManager& lba_manager;
Journal& journal;
crimson::condition_variable segment_rotation_guard;
seastar::gate writer_guard;
bool rolling_segment = false;
Cache& cache;
};
public:
SegmentedAllocator(
SegmentProvider& sp,
SegmentManager& sm,
LBAManager& lba_manager,
Journal& journal,
Cache& cache);
Writer &get_writer(ool_placement_hint_t hint) {
return writers[std::rand() % writers.size()];
}
alloc_paddr_iertr::future<> alloc_ool_extents_paddr(
Transaction& t,
std::list<LogicalCachedExtentRef>& extents) final {
return seastar::do_with(
std::map<Writer*, std::list<LogicalCachedExtentRef>>(),
[this, extents=std::move(extents), &t](auto& alloc_map) {
for (auto& extent : extents) {
auto writer = &(get_writer(extent->hint));
alloc_map[writer].emplace_back(extent);
}
return trans_intr::do_for_each(alloc_map, [&t](auto& p) {
auto writer = p.first;
auto& extents_to_persist = p.second;
return writer->write(t, extents_to_persist);
});
});
}
stop_ertr::future<> stop() {
return crimson::do_for_each(writers, [](auto& writer) {
return writer.stop();
});
}
private:
SegmentProvider& segment_provider;
SegmentManager& segment_manager;
std::vector<Writer> writers;
LBAManager& lba_manager;
Journal& journal;
Cache& cache;
};
class ExtentPlacementManager {
public:
ExtentPlacementManager(
Cache& cache,
LBAManager& lba_manager
) : cache(cache), lba_manager(lba_manager) {}
/**
* alloc_new_extent_by_type
*
* Create a new extent; CachedExtent::poffset may not be set
* if a delayed allocation is needed.
*/
CachedExtentRef alloc_new_extent_by_type(
Transaction& t,
extent_types_t type,
segment_off_t length,
ool_placement_hint_t hint = ool_placement_hint_t::NONE) {
// only logical extents should fall in this path
assert(is_logical_type(type));
auto dtype = get_allocator_type(hint);
CachedExtentRef extent;
// for extents that would be stored in NVDIMM/PMEM, no delayed
// allocation is needed
if (need_delayed_allocation(dtype)) {
// set a unique temporary paddr; this is necessary because
// transaction's write_set is indexed by paddr
extent = cache.alloc_new_extent_by_type(t, type, length, true);
} else {
extent = cache.alloc_new_extent_by_type(t, type, length);
}
extent->backend_type = dtype;
extent->hint = hint;
return extent;
}
template<
typename T,
std::enable_if_t<std::is_base_of_v<LogicalCachedExtent, T>, int> = 0>
TCachedExtentRef<T> alloc_new_extent(
Transaction& t,
segment_off_t length,
ool_placement_hint_t hint = ool_placement_hint_t::NONE)
{
auto dtype = get_allocator_type(hint);
TCachedExtentRef<T> extent;
if (need_delayed_allocation(dtype)) {
// set a unique temporary paddr; this is necessary because
// transaction's write_set is indexed by paddr
extent = cache.alloc_new_extent<T>(t, length, true);
} else {
extent = cache.alloc_new_extent<T>(t, length);
}
extent->backend_type = dtype;
extent->hint = hint;
return extent;
}
/**
* delayed_alloc_or_ool_write
*
* Performs any outstanding ool writes and updates pending lba mappings
* accordingly
*/
using alloc_paddr_iertr = ExtentOolWriter::write_iertr;
alloc_paddr_iertr::future<> delayed_alloc_or_ool_write(
Transaction& t) {
return seastar::do_with(
std::map<ExtentAllocator*, std::list<LogicalCachedExtentRef>>(),
std::list<std::pair<paddr_t, LogicalCachedExtentRef>>(),
[this, &t](auto& alloc_map, auto& inline_list) mutable {
auto& alloc_list = t.get_delayed_alloc_list();
for (auto& extent : alloc_list) {
// extents may be invalidated
if (!extent->is_valid()) {
continue;
}
if (should_be_inline(extent)) {
auto old_addr = extent->get_paddr();
cache.mark_delayed_extent_inline(t, extent);
inline_list.emplace_back(old_addr, extent);
continue;
}
auto& allocator_ptr = get_allocator(extent->backend_type, extent->hint);
alloc_map[allocator_ptr.get()].emplace_back(extent);
}
return trans_intr::do_for_each(alloc_map, [&t](auto& p) {
auto allocator = p.first;
auto& extents = p.second;
return allocator->alloc_ool_extents_paddr(t, extents);
}).si_then([&inline_list, this, &t] {
return trans_intr::do_for_each(inline_list, [this, &t](auto& p) {
auto old_addr = p.first;
auto& extent = p.second;
return lba_manager.update_mapping(
t,
extent->get_laddr(),
old_addr,
extent->get_paddr());
});
});
});
}
void add_allocator(device_type_t type, ExtentAllocatorRef&& allocator) {
allocators[type].emplace_back(std::move(allocator));
}
private:
device_type_t get_allocator_type(ool_placement_hint_t hint) {
return device_type_t::SEGMENTED;
}
bool should_be_inline(LogicalCachedExtentRef& extent) {
return (std::rand() % 2) == 0;
}
ExtentAllocatorRef& get_allocator(
device_type_t type,
ool_placement_hint_t hint) {
auto& devices = allocators[type];
return devices[std::rand() % devices.size()];
}
Cache& cache;
LBAManager& lba_manager;
std::map<device_type_t, std::vector<ExtentAllocatorRef>> allocators;
};
using ExtentPlacementManagerRef = std::unique_ptr<ExtentPlacementManager>;
}
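
Putting the pieces together, the commit-time flow of delayed_alloc_or_ool_write() can be modeled in isolation: skip invalidated extents, let the (placeholder) policy send each survivor inline or out of line, and batch ool extents per allocator. A toy, self-contained model with invented types (not seastore code):

#include <cstdio>
#include <cstdlib>
#include <list>
#include <map>
#include <vector>

struct Extent { bool valid; int id; };
struct Allocator { int id; };

int main() {
  std::vector<Allocator> allocators{{0}, {1}};
  std::list<Extent> delayed{{true, 1}, {false, 2}, {true, 3}, {true, 4}};

  std::list<Extent*> inline_list;                    // to journal with the record
  std::map<Allocator*, std::list<Extent*>> ool_map;  // batched per ool writer

  for (auto& e : delayed) {
    if (!e.valid) continue;                 // invalidated by a conflicting txn
    if (std::rand() % 2 == 0) {             // placeholder should_be_inline()
      inline_list.push_back(&e);
    } else {
      auto* a = &allocators[std::rand() % allocators.size()];
      ool_map[a].push_back(&e);             // one write() call per group later
    }
  }
  std::printf("inline=%zu, ool groups=%zu\n", inline_list.size(), ool_map.size());
  return 0;
}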

View File

@@ -8,6 +8,7 @@
#include "crimson/os/seastore/journal.h"
#include "include/intarith.h"
#include "crimson/os/seastore/segment_cleaner.h"
#include "crimson/os/seastore/segment_manager.h"
namespace {
@@ -25,6 +26,7 @@ std::ostream &operator<<(std::ostream &out, const segment_header_t &header)
<< ", physical_segment_id=" << header.physical_segment_id
<< ", journal_tail=" << header.journal_tail
<< ", segment_nonce=" << header.segment_nonce
<< ", out-of-line=" << header.out_of_line
<< ")";
}
@@ -48,18 +50,14 @@ segment_nonce_t generate_nonce(
sizeof(meta.seastore_id.uuid));
}
Journal::Journal(SegmentManager &segment_manager)
: segment_manager(segment_manager) {}
Journal::Journal(SegmentManager &segment_manager, Scanner& scanner)
: segment_manager(segment_manager), scanner(scanner) {}
Journal::initialize_segment_ertr::future<segment_seq_t>
Journal::initialize_segment(Segment &segment)
{
auto new_tail = segment_provider->get_journal_tail_target();
logger().debug(
"initialize_segment {} journal_tail_target {}",
segment.get_segment_id(),
new_tail);
// write out header
ceph_assert(segment.get_write_ptr() == 0);
bufferlist bl;
@@ -71,7 +69,13 @@ Journal::initialize_segment(Segment &segment)
seq,
segment.get_segment_id(),
segment_provider->get_journal_tail_target(),
current_segment_nonce};
current_segment_nonce,
false};
logger().debug(
"initialize_segment {} journal_tail_target {}, header {}",
segment.get_segment_id(),
new_tail,
header);
encode(header, bl);
bufferptr bp(
@@ -96,99 +100,14 @@ Journal::initialize_segment(Segment &segment)
});
}
ceph::bufferlist Journal::encode_record(
record_size_t rsize,
record_t &&record)
{
bufferlist data_bl;
for (auto &i: record.extents) {
data_bl.append(i.bl);
}
bufferlist bl;
record_header_t header{
rsize.mdlength,
rsize.dlength,
(uint32_t)record.deltas.size(),
(uint32_t)record.extents.size(),
current_segment_nonce,
committed_to,
data_bl.crc32c(-1)
};
encode(header, bl);
auto metadata_crc_filler = bl.append_hole(sizeof(uint32_t));
for (const auto &i: record.extents) {
encode(extent_info_t(i), bl);
}
for (const auto &i: record.deltas) {
encode(i, bl);
}
auto block_size = segment_manager.get_block_size();
if (bl.length() % block_size != 0) {
bl.append_zero(
block_size - (bl.length() % block_size));
}
ceph_assert(bl.length() == rsize.mdlength);
auto bliter = bl.cbegin();
auto metadata_crc = bliter.crc32c(
ceph::encoded_sizeof_bounded<record_header_t>(),
-1);
bliter += sizeof(checksum_t); /* crc hole again */
metadata_crc = bliter.crc32c(
bliter.get_remaining(),
metadata_crc);
ceph_le32 metadata_crc_le;
metadata_crc_le = metadata_crc;
metadata_crc_filler.copy_in(
sizeof(checksum_t),
reinterpret_cast<const char *>(&metadata_crc_le));
bl.claim_append(data_bl);
ceph_assert(bl.length() == (rsize.dlength + rsize.mdlength));
return bl;
}
bool Journal::validate_metadata(const bufferlist &bl)
{
auto bliter = bl.cbegin();
auto test_crc = bliter.crc32c(
ceph::encoded_sizeof_bounded<record_header_t>(),
-1);
ceph_le32 recorded_crc_le;
decode(recorded_crc_le, bliter);
uint32_t recorded_crc = recorded_crc_le;
test_crc = bliter.crc32c(
bliter.get_remaining(),
test_crc);
return test_crc == recorded_crc;
}
Journal::read_validate_data_ret Journal::read_validate_data(
paddr_t record_base,
const record_header_t &header)
{
return segment_manager.read(
record_base.add_offset(header.mdlength),
header.dlength
).safe_then([=, &header](auto bptr) {
bufferlist bl;
bl.append(bptr);
return bl.crc32c(-1) == header.data_crc;
});
}
Journal::write_record_ret Journal::write_record(
record_size_t rsize,
record_t &&record,
OrderingHandle &handle)
{
ceph::bufferlist to_write = encode_record(
rsize, std::move(record));
rsize, std::move(record), segment_manager.get_block_size(),
committed_to, current_segment_nonce);
auto target = written_to;
assert((to_write.length() % segment_manager.get_block_size()) == 0);
written_to += to_write.length();
@@ -231,24 +150,6 @@ Journal::write_record_ret Journal::write_record(
});
}
Journal::record_size_t Journal::get_encoded_record_length(
const record_t &record) const {
extent_len_t metadata =
(extent_len_t)ceph::encoded_sizeof_bounded<record_header_t>();
metadata += sizeof(checksum_t) /* crc */;
metadata += record.extents.size() *
ceph::encoded_sizeof_bounded<extent_info_t>();
extent_len_t data = 0;
for (const auto &i: record.deltas) {
metadata += ceph::encoded_sizeof(i);
}
for (const auto &i: record.extents) {
data += i.bl.length();
}
metadata = p2roundup(metadata, (extent_len_t)segment_manager.get_block_size());
return record_size_t{metadata, data};
}
bool Journal::needs_roll(segment_off_t length) const
{
return length + written_to >
@@ -286,49 +187,6 @@
);
}
Journal::read_segment_header_ret
Journal::read_segment_header(segment_id_t segment)
{
return segment_manager.read(
paddr_t{segment, 0},
segment_manager.get_block_size()
).handle_error(
read_segment_header_ertr::pass_further{},
crimson::ct_error::assert_all{
"Invalid error in Journal::read_segment_header"
}
).safe_then([=](bufferptr bptr) -> read_segment_header_ret {
logger().debug("segment {} bptr size {}", segment, bptr.length());
segment_header_t header;
bufferlist bl;
bl.push_back(bptr);
logger().debug(
"Journal::read_segment_header: segment {} block crc {}",
segment,
bl.begin().crc32c(segment_manager.get_block_size(), 0));
auto bp = bl.cbegin();
try {
decode(header, bp);
} catch (ceph::buffer::error &e) {
logger().debug(
"Journal::read_segment_header: segment {} unable to decode "
"header, skipping",
segment);
return crimson::ct_error::enodata::make();
}
logger().debug(
"Journal::read_segment_header: segment {} header {}",
segment,
header);
return read_segment_header_ret(
read_segment_header_ertr::ready_future_marker{},
header);
});
}
Journal::open_for_write_ret Journal::open_for_write()
{
return roll_journal_segment().safe_then([this](auto seq) {
@@ -343,187 +201,80 @@ Journal::open_for_write_ret Journal::open_for_write()
});
}
Journal::find_replay_segments_fut Journal::find_replay_segments()
Journal::prep_replay_segments_fut
Journal::prep_replay_segments(
std::vector<std::pair<segment_id_t, segment_header_t>> segments)
{
return seastar::do_with(
std::vector<std::pair<segment_id_t, segment_header_t>>(),
[this](auto &&segments) mutable {
return crimson::do_for_each(
boost::make_counting_iterator(segment_id_t{0}),
boost::make_counting_iterator(segment_manager.get_num_segments()),
[this, &segments](auto i) {
return read_segment_header(i
).safe_then([this, &segments, i](auto header) mutable {
if (generate_nonce(
header.journal_segment_seq,
segment_manager.get_meta()) != header.segment_nonce) {
logger().debug(
"find_replay_segments: nonce mismatch segment {} header {}",
i,
header);
assert(0 == "impossible");
return find_replay_segments_ertr::now();
}
segments.emplace_back(i, std::move(header));
return find_replay_segments_ertr::now();
}).handle_error(
crimson::ct_error::enoent::handle([i](auto) {
logger().debug(
"find_replay_segments: segment {} not available for read",
i);
return find_replay_segments_ertr::now();
}),
crimson::ct_error::enodata::handle([i](auto) {
logger().debug(
"find_replay_segments: segment {} header undecodable",
i);
return find_replay_segments_ertr::now();
}),
find_replay_segments_ertr::pass_further{},
crimson::ct_error::assert_all{
"Invalid error in Journal::find_replay_segments"
}
);
}).safe_then([this, &segments]() mutable -> find_replay_segments_fut {
logger().debug(
"find_replay_segments: have {} segments",
segments.size());
if (segments.empty()) {
return crimson::ct_error::input_output_error::make();
}
std::sort(
segments.begin(),
segments.end(),
[](const auto &lt, const auto &rt) {
return lt.second.journal_segment_seq <
rt.second.journal_segment_seq;
});
next_journal_segment_seq =
segments.rbegin()->second.journal_segment_seq + 1;
std::for_each(
segments.begin(),
segments.end(),
[this](auto &seg) {
segment_provider->init_mark_segment_closed(
seg.first,
seg.second.journal_segment_seq);
});
auto journal_tail = segments.rbegin()->second.journal_tail;
segment_provider->update_journal_tail_committed(journal_tail);
auto replay_from = journal_tail.offset;
logger().debug(
"Journal::find_replay_segments: journal_tail={}",
journal_tail);
auto from = segments.begin();
if (replay_from != P_ADDR_NULL) {
from = std::find_if(
segments.begin(),
segments.end(),
[&replay_from](const auto &seg) -> bool {
return seg.first == replay_from.segment;
});
if (from->second.journal_segment_seq != journal_tail.segment_seq) {
logger().error(
"find_replay_segments: journal_tail {} does not match {}",
journal_tail,
from->second);
assert(0 == "invalid");
}
} else {
replay_from = paddr_t{
from->first,
(segment_off_t)segment_manager.get_block_size()};
}
auto ret = replay_segments_t(segments.end() - from);
std::transform(
from, segments.end(), ret.begin(),
[this](const auto &p) {
auto ret = journal_seq_t{
p.second.journal_segment_seq,
paddr_t{
p.first,
(segment_off_t)segment_manager.get_block_size()}};
logger().debug(
"Journal::find_replay_segments: replaying from {}",
ret);
return std::make_pair(ret, p.second);
});
ret[0].first.offset = replay_from;
return find_replay_segments_fut(
find_replay_segments_ertr::ready_future_marker{},
std::move(ret));
});
});
}
Journal::read_validate_record_metadata_ret Journal::read_validate_record_metadata(
paddr_t start,
segment_nonce_t nonce)
{
auto block_size = segment_manager.get_block_size();
if (start.offset + block_size > (int64_t)segment_manager.get_segment_size()) {
return read_validate_record_metadata_ret(
read_validate_record_metadata_ertr::ready_future_marker{},
std::nullopt);
logger().debug(
"prep_replay_segments: have {} segments",
segments.size());
if (segments.empty()) {
return crimson::ct_error::input_output_error::make();
}
return segment_manager.read(start, block_size
).safe_then(
[=](bufferptr bptr) mutable
-> read_validate_record_metadata_ret {
logger().debug("read_validate_record_metadata: reading {}", start);
auto block_size = segment_manager.get_block_size();
bufferlist bl;
bl.append(bptr);
auto bp = bl.cbegin();
record_header_t header;
try {
decode(header, bp);
} catch (ceph::buffer::error &e) {
return read_validate_record_metadata_ret(
read_validate_record_metadata_ertr::ready_future_marker{},
std::nullopt);
}
if (header.segment_nonce != nonce) {
return read_validate_record_metadata_ret(
read_validate_record_metadata_ertr::ready_future_marker{},
std::nullopt);
}
if (header.mdlength > (extent_len_t)block_size) {
if (start.offset + header.mdlength >
(int64_t)segment_manager.get_segment_size()) {
return crimson::ct_error::input_output_error::make();
}
return segment_manager.read(
{start.segment, start.offset + (segment_off_t)block_size},
header.mdlength - block_size).safe_then(
[header=std::move(header), bl=std::move(bl)](
auto &&bptail) mutable {
bl.push_back(bptail);
return read_validate_record_metadata_ret(
read_validate_record_metadata_ertr::ready_future_marker{},
std::make_pair(std::move(header), std::move(bl)));
});
} else {
return read_validate_record_metadata_ret(
read_validate_record_metadata_ertr::ready_future_marker{},
std::make_pair(std::move(header), std::move(bl))
);
}
}).safe_then([=](auto p) {
if (p && validate_metadata(p->second)) {
return read_validate_record_metadata_ret(
read_validate_record_metadata_ertr::ready_future_marker{},
std::move(*p)
);
} else {
return read_validate_record_metadata_ret(
read_validate_record_metadata_ertr::ready_future_marker{},
std::nullopt);
}
std::sort(
segments.begin(),
segments.end(),
[](const auto &lt, const auto &rt) {
return lt.second.journal_segment_seq <
rt.second.journal_segment_seq;
});
next_journal_segment_seq =
segments.rbegin()->second.journal_segment_seq + 1;
std::for_each(
segments.begin(),
segments.end(),
[this](auto &seg) {
segment_provider->init_mark_segment_closed(
seg.first,
seg.second.journal_segment_seq,
false);
});
auto journal_tail = segments.rbegin()->second.journal_tail;
segment_provider->update_journal_tail_committed(journal_tail);
auto replay_from = journal_tail.offset;
logger().debug(
"Journal::prep_replay_segments: journal_tail={}",
journal_tail);
auto from = segments.begin();
if (replay_from != P_ADDR_NULL) {
from = std::find_if(
segments.begin(),
segments.end(),
[&replay_from](const auto &seg) -> bool {
return seg.first == replay_from.segment;
});
if (from->second.journal_segment_seq != journal_tail.segment_seq) {
logger().error(
"prep_replay_segments: journal_tail {} does not match {}",
journal_tail,
from->second);
assert(0 == "invalid");
}
} else {
replay_from = paddr_t{
from->first,
(segment_off_t)segment_manager.get_block_size()};
}
auto ret = replay_segments_t(segments.end() - from);
std::transform(
from, segments.end(), ret.begin(),
[this](const auto &p) {
auto ret = journal_seq_t{
p.second.journal_segment_seq,
paddr_t{
p.first,
(segment_off_t)segment_manager.get_block_size()}};
logger().debug(
"Journal::prep_replay_segments: replaying from {}",
ret);
return std::make_pair(ret, p.second);
});
ret[0].first.offset = replay_from;
return prep_replay_segments_fut(
prep_replay_segments_ertr::ready_future_marker{},
std::move(ret));
}
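
prep_replay_segments() is mostly selection logic: order the headers by journal_segment_seq, take the newest segment's journal_tail as authoritative, and replay from the segment that tail points into. A toy standalone version of just that selection (invented types and values, not seastore code):

#include <algorithm>
#include <cstdio>
#include <vector>

struct Header { unsigned seq; unsigned tail_segment; };

int main() {
  // (segment_id, header) pairs as discovered on disk, in no particular order
  std::vector<std::pair<unsigned, Header>> segs{
    {0, {7, 2}}, {1, {5, 1}}, {2, {6, 2}}};

  std::sort(segs.begin(), segs.end(), [](auto& l, auto& r) {
    return l.second.seq < r.second.seq;     // oldest journal segment first
  });
  unsigned next_seq = segs.back().second.seq + 1;  // next_journal_segment_seq
  unsigned tail = segs.back().second.tail_segment; // newest tail wins

  auto from = std::find_if(segs.begin(), segs.end(),
      [&](auto& s) { return s.first == tail; });   // replay starts here
  std::printf("next_seq=%u, replay %zu segment(s) starting at segment %u\n",
              next_seq, (size_t)(segs.end() - from), from->first);
  return 0;
}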
std::optional<std::vector<delta_info_t>> Journal::try_decode_deltas(
@@ -546,25 +297,6 @@ std::optional<std::vector<delta_info_t>> Journal::try_decode_deltas(
return deltas;
}
std::optional<std::vector<extent_info_t>> Journal::try_decode_extent_infos(
record_header_t header,
const bufferlist &bl)
{
auto bliter = bl.cbegin();
bliter += ceph::encoded_sizeof_bounded<record_header_t>();
bliter += sizeof(checksum_t) /* crc */;
logger().debug("{}: decoding {} extents", __func__, header.extents);
std::vector<extent_info_t> extent_infos(header.extents);
for (auto &&i : extent_infos) {
try {
decode(i, bliter);
} catch (ceph::buffer::error &e) {
return std::nullopt;
}
}
return extent_infos;
}
Journal::replay_ertr::future<>
Journal::replay_segment(
journal_seq_t seq,
@@ -574,7 +306,7 @@ Journal::replay_segment(
logger().debug("replay_segment: starting at {}", seq);
return seastar::do_with(
scan_valid_records_cursor(seq.offset),
found_record_handler_t(
Scanner::found_record_handler_t(
[=, &handler](paddr_t base,
const record_header_t &header,
const bufferlist &mdbuf) {
@@ -619,20 +351,29 @@ Journal::replay_segment(
});
}),
[=](auto &cursor, auto &dhandler) {
return scan_valid_records(
return scanner.scan_valid_records(
cursor,
header.segment_nonce,
std::numeric_limits<size_t>::max(),
dhandler).safe_then([](auto){});
dhandler).safe_then([](auto){}
).handle_error(
replay_ertr::pass_further{},
crimson::ct_error::assert_all{
"shouldn't meet with any other error other replay_ertr"
}
);;
});
}
Journal::replay_ret Journal::replay(delta_handler_t &&delta_handler)
Journal::replay_ret Journal::replay(
std::vector<std::pair<segment_id_t, segment_header_t>>&& segment_headers,
delta_handler_t &&delta_handler)
{
return seastar::do_with(
std::move(delta_handler), replay_segments_t(),
[this](auto &handler, auto &segments) mutable -> replay_ret {
return find_replay_segments().safe_then(
[this, segment_headers=std::move(segment_headers)]
(auto &handler, auto &segments) mutable -> replay_ret {
return prep_replay_segments(std::move(segment_headers)).safe_then(
[this, &handler, &segments](auto replay_segs) mutable {
logger().debug("replay: found {} segments", replay_segs.size());
segments = std::move(replay_segs);
@@ -643,165 +384,4 @@ Journal::replay_ret Journal::replay(delta_handler_t &&delta_handler)
});
}
Journal::scan_extents_ret Journal::scan_extents(
scan_extents_cursor &cursor,
extent_len_t bytes_to_read)
{
auto ret = std::make_unique<scan_extents_ret_bare>();
auto* extents = ret.get();
return read_segment_header(cursor.get_offset().segment
).handle_error(
scan_extents_ertr::pass_further{},
crimson::ct_error::assert_all{
"Invalid error in Journal::scan_extents"
}
).safe_then([bytes_to_read, extents, &cursor, this](auto segment_header) {
auto segment_nonce = segment_header.segment_nonce;
return seastar::do_with(
found_record_handler_t(
[extents, this](
paddr_t base,
const record_header_t &header,
const bufferlist &mdbuf) mutable {
auto infos = try_decode_extent_infos(
header,
mdbuf);
if (!infos) {
// This should be impossible, we did check the crc on the mdbuf
logger().error(
"Journal::scan_extents unable to decode extents for record {}",
base);
assert(infos);
}
paddr_t extent_offset = base.add_offset(header.mdlength);
for (const auto &i : *infos) {
extents->emplace_back(extent_offset, i);
extent_offset.offset += i.len;
}
return scan_extents_ertr::now();
}),
[=, &cursor](auto &dhandler) {
return scan_valid_records(
cursor,
segment_nonce,
bytes_to_read,
dhandler).discard_result();
});
}).safe_then([ret=std::move(ret)] {
return std::move(*ret);
});
}
Journal::scan_valid_records_ret Journal::scan_valid_records(
scan_valid_records_cursor &cursor,
segment_nonce_t nonce,
size_t budget,
found_record_handler_t &handler)
{
if (cursor.offset.offset == 0) {
cursor.offset.offset = segment_manager.get_block_size();
}
auto retref = std::make_unique<size_t>(0);
auto budget_used = *retref;
return crimson::repeat(
[=, &cursor, &budget_used, &handler]() mutable
-> scan_valid_records_ertr::future<seastar::stop_iteration> {
return [=, &handler, &cursor, &budget_used] {
if (!cursor.last_valid_header_found) {
return read_validate_record_metadata(cursor.offset, nonce
).safe_then([=, &cursor](auto md) {
logger().debug(
"Journal::scan_valid_records: read complete {}",
cursor.offset);
if (!md) {
logger().debug(
"Journal::scan_valid_records: found invalid header at {}, presumably at end",
cursor.offset);
cursor.last_valid_header_found = true;
return scan_valid_records_ertr::now();
} else {
logger().debug(
"Journal::scan_valid_records: valid record read at {}",
cursor.offset);
cursor.last_committed = paddr_t{
cursor.offset.segment,
md->first.committed_to};
cursor.pending_records.emplace_back(
cursor.offset,
md->first,
md->second);
cursor.offset.offset +=
md->first.dlength + md->first.mdlength;
return scan_valid_records_ertr::now();
}
}).safe_then([=, &cursor, &budget_used, &handler] {
return crimson::repeat(
[=, &budget_used, &cursor, &handler] {
logger().debug(
"Journal::scan_valid_records: valid record read, processing queue");
if (cursor.pending_records.empty()) {
/* This is only possible if the segment is empty.
* A record's last_committed must be prior to its own
* location since it itself cannot yet have been committed
* at its own time of submission. Thus, the most recently
* read record must always fall after cursor.last_committed */
return scan_valid_records_ertr::make_ready_future<
seastar::stop_iteration>(seastar::stop_iteration::yes);
}
auto &next = cursor.pending_records.front();
if (next.offset > cursor.last_committed) {
return scan_valid_records_ertr::make_ready_future<
seastar::stop_iteration>(seastar::stop_iteration::yes);
}
budget_used +=
next.header.dlength + next.header.mdlength;
return handler(
next.offset,
next.header,
next.mdbuffer
).safe_then([&cursor] {
cursor.pending_records.pop_front();
return scan_valid_records_ertr::make_ready_future<
seastar::stop_iteration>(seastar::stop_iteration::no);
});
});
});
} else {
assert(!cursor.pending_records.empty());
auto &next = cursor.pending_records.front();
return read_validate_data(next.offset, next.header
).safe_then([=, &budget_used, &next, &cursor, &handler](auto valid) {
if (!valid) {
cursor.pending_records.clear();
return scan_valid_records_ertr::now();
}
budget_used +=
next.header.dlength + next.header.mdlength;
return handler(
next.offset,
next.header,
next.mdbuffer
).safe_then([&cursor] {
cursor.pending_records.pop_front();
return scan_valid_records_ertr::now();
});
});
}
}().safe_then([=, &budget_used, &cursor] {
if (cursor.is_complete() || budget_used >= budget) {
return seastar::stop_iteration::yes;
} else {
return seastar::stop_iteration::no;
}
});
}).safe_then([retref=std::move(retref)]() mutable -> scan_valid_records_ret {
return scan_valid_records_ret(
scan_valid_records_ertr::ready_future_marker{},
std::move(*retref));
});
}
}

View File

@@ -14,6 +14,7 @@
#include "include/denc.h"
#include "crimson/common/log.h"
#include "crimson/os/seastore/scanner.h"
#include "crimson/os/seastore/segment_manager.h"
#include "crimson/os/seastore/ordering_handle.h"
#include "crimson/os/seastore/seastore_types.h"
@@ -21,115 +22,18 @@
namespace crimson::os::seastore {
using segment_nonce_t = uint32_t;
/**
* Segment header
*
* Every segment contains an encoded segment_header_t in the first block.
* Our strategy for finding the journal replay point is:
* 1) Find the segment with the highest journal_segment_seq
* 2) Replay starting at record located at that segment's journal_tail
*/
struct segment_header_t {
segment_seq_t journal_segment_seq;
segment_id_t physical_segment_id; // debugging
journal_seq_t journal_tail;
segment_nonce_t segment_nonce;
DENC(segment_header_t, v, p) {
DENC_START(1, 1, p);
denc(v.journal_segment_seq, p);
denc(v.physical_segment_id, p);
denc(v.journal_tail, p);
denc(v.segment_nonce, p);
DENC_FINISH(p);
}
};
std::ostream &operator<<(std::ostream &out, const segment_header_t &header);
struct record_header_t {
// Fixed portion
extent_len_t mdlength; // block aligned, length of metadata
extent_len_t dlength; // block aligned, length of data
uint32_t deltas; // number of deltas
uint32_t extents; // number of extents
segment_nonce_t segment_nonce;// nonce of containing segment
segment_off_t committed_to; // records in this segment prior to committed_to
// have been fully written
checksum_t data_crc; // crc of data payload
DENC(record_header_t, v, p) {
DENC_START(1, 1, p);
denc(v.mdlength, p);
denc(v.dlength, p);
denc(v.deltas, p);
denc(v.extents, p);
denc(v.segment_nonce, p);
denc(v.committed_to, p);
denc(v.data_crc, p);
DENC_FINISH(p);
}
};
struct extent_info_t {
extent_types_t type = extent_types_t::NONE;
laddr_t addr = L_ADDR_NULL;
extent_len_t len = 0;
extent_info_t() = default;
extent_info_t(const extent_t &et)
: type(et.type), addr(et.addr), len(et.bl.length()) {}
DENC(extent_info_t, v, p) {
DENC_START(1, 1, p);
denc(v.type, p);
denc(v.addr, p);
denc(v.len, p);
DENC_FINISH(p);
}
};
std::ostream &operator<<(std::ostream &out, const extent_info_t &header);
/**
* Callback interface for managing available segments
*/
class JournalSegmentProvider {
public:
using get_segment_ertr = crimson::errorator<
crimson::ct_error::input_output_error>;
using get_segment_ret = get_segment_ertr::future<segment_id_t>;
virtual get_segment_ret get_segment() = 0;
virtual void close_segment(segment_id_t) {}
virtual void set_journal_segment(
segment_id_t segment,
segment_seq_t seq) {}
virtual journal_seq_t get_journal_tail_target() const = 0;
virtual void update_journal_tail_committed(journal_seq_t tail_committed) = 0;
virtual void init_mark_segment_closed(
segment_id_t segment, segment_seq_t seq) {}
virtual segment_seq_t get_seq(segment_id_t id) { return 0; }
virtual ~JournalSegmentProvider() {}
};
class SegmentProvider;
class SegmentedAllocator;
/**
* Manages stream of atomically written records to a SegmentManager.
*/
class Journal {
public:
Journal(SegmentManager &segment_manager);
Journal(SegmentManager &segment_manager, Scanner& scanner);
/**
* Sets the JournalSegmentProvider.
* Sets the SegmentProvider.
*
* Not provided in constructor to allow the provider to not own
* or construct the Journal (TransactionManager).
@@ -137,7 +41,7 @@ public:
* Note, Journal does not own this ptr, user must ensure that
* *provider outlives Journal.
*/
void set_segment_provider(JournalSegmentProvider *provider) {
void set_segment_provider(SegmentProvider *provider) {
segment_provider = provider;
}
@@ -192,7 +96,8 @@ public:
OrderingHandle &handle
) {
assert(write_pipeline);
auto rsize = get_encoded_record_length(record);
auto rsize = get_encoded_record_length(
record, segment_manager.get_block_size());
auto total = rsize.mdlength + rsize.dlength;
if (total > max_record_length()) {
auto &logger = crimson::get_logger(ceph_subsys_seastore);
@@ -232,33 +137,16 @@ public:
replay_ret(journal_seq_t seq,
paddr_t record_block_base,
const delta_info_t&)>;
replay_ret replay(delta_handler_t &&delta_handler);
/**
* scan_extents
*
* Scans records beginning at addr until the first record boundary after
* addr + bytes_to_read.
*
* Returns list<extent, extent_info>
* cursor.is_complete() will be true when no further extents exist in segment.
*/
class scan_valid_records_cursor;
using scan_extents_cursor = scan_valid_records_cursor;
using scan_extents_ertr = SegmentManager::read_ertr;
using scan_extents_ret_bare = std::list<std::pair<paddr_t, extent_info_t>>;
using scan_extents_ret = scan_extents_ertr::future<scan_extents_ret_bare>;
scan_extents_ret scan_extents(
scan_extents_cursor &cursor,
extent_len_t bytes_to_read
);
replay_ret replay(
std::vector<std::pair<segment_id_t, segment_header_t>>&& segment_headers,
delta_handler_t &&delta_handler);
void set_write_pipeline(WritePipeline *_write_pipeline) {
write_pipeline = _write_pipeline;
}
private:
JournalSegmentProvider *segment_provider = nullptr;
SegmentProvider *segment_provider = nullptr;
SegmentManager &segment_manager;
segment_seq_t next_journal_segment_seq = 0;
@@ -268,6 +156,7 @@ private:
segment_off_t written_to = 0;
segment_off_t committed_to = 0;
Scanner& scanner;
WritePipeline *write_pipeline = nullptr;
void reset_soft_state() {
@@ -283,40 +172,6 @@ private:
initialize_segment_ertr::future<segment_seq_t> initialize_segment(
Segment &segment);
struct record_size_t {
extent_len_t mdlength = 0;
extent_len_t dlength = 0;
record_size_t(
extent_len_t mdlength,
extent_len_t dlength)
: mdlength(mdlength), dlength(dlength) {}
};
/**
* Return <mdlength, dlength> pair denoting length of
* metadata and blocks respectively.
*/
record_size_t get_encoded_record_length(
const record_t &record) const;
/// create encoded record bl
ceph::bufferlist encode_record(
record_size_t rsize,
record_t &&record);
/// validate embedded metadata checksum
static bool validate_metadata(const bufferlist &bl);
/// read and validate data
using read_validate_data_ertr = SegmentManager::read_ertr;
using read_validate_data_ret = read_validate_data_ertr::future<bool>;
read_validate_data_ret read_validate_data(
paddr_t record_base,
const record_header_t &header ///< caller must ensure lifetime through
/// future resolution
);
/// do record write
using write_record_ertr = crimson::errorator<
@ -335,96 +190,24 @@ private:
/// returns true iff current segment has insufficient space
bool needs_roll(segment_off_t length) const;
using read_segment_header_ertr = crimson::errorator<
crimson::ct_error::enoent,
crimson::ct_error::enodata,
crimson::ct_error::input_output_error
>;
using read_segment_header_ret = read_segment_header_ertr::future<
segment_header_t>;
read_segment_header_ret read_segment_header(segment_id_t segment);
/// return ordered vector of segments to replay
using replay_segments_t = std::vector<
std::pair<journal_seq_t, segment_header_t>>;
using find_replay_segments_ertr = crimson::errorator<
using prep_replay_segments_ertr = crimson::errorator<
crimson::ct_error::input_output_error
>;
using find_replay_segments_fut = find_replay_segments_ertr::future<
using prep_replay_segments_fut = prep_replay_segments_ertr::future<
replay_segments_t>;
find_replay_segments_fut find_replay_segments();
prep_replay_segments_fut prep_replay_segments(
std::vector<std::pair<segment_id_t, segment_header_t>> segments);
/// attempts to decode deltas from bl, return nullopt if unsuccessful
std::optional<std::vector<delta_info_t>> try_decode_deltas(
record_header_t header,
const bufferlist &bl);
/// attempts to decode extent infos from bl, return nullopt if unsuccessful
std::optional<std::vector<extent_info_t>> try_decode_extent_infos(
record_header_t header,
const bufferlist &bl);
/// read record metadata for record starting at start
using read_validate_record_metadata_ertr = replay_ertr;
using read_validate_record_metadata_ret =
read_validate_record_metadata_ertr::future<
std::optional<std::pair<record_header_t, bufferlist>>
>;
read_validate_record_metadata_ret read_validate_record_metadata(
paddr_t start,
segment_nonce_t nonce);
public:
/// scan segment for end incrementally
struct scan_valid_records_cursor {
bool last_valid_header_found = false;
paddr_t offset;
paddr_t last_committed;
struct found_record_t {
paddr_t offset;
record_header_t header;
bufferlist mdbuffer;
found_record_t(
paddr_t offset,
const record_header_t &header,
const bufferlist &mdbuffer)
: offset(offset), header(header), mdbuffer(mdbuffer) {}
};
std::deque<found_record_t> pending_records;
bool is_complete() const {
return last_valid_header_found && pending_records.empty();
}
paddr_t get_offset() const {
return offset;
}
scan_valid_records_cursor(
paddr_t offset)
: offset(offset) {}
};
private:
using scan_valid_records_ertr = SegmentManager::read_ertr;
using scan_valid_records_ret = scan_valid_records_ertr::future<
size_t>;
using found_record_handler_t = std::function<
scan_valid_records_ertr::future<>(
paddr_t record_block_base,
// callee may assume header and bl will remain valid until
// returned future resolves
const record_header_t &header,
const bufferlist &bl)>;
scan_valid_records_ret scan_valid_records(
scan_valid_records_cursor &cursor, ///< [in, out] cursor, updated during call
segment_nonce_t nonce, ///< [in] nonce for segment
size_t budget, ///< [in] max budget to use
found_record_handler_t &handler ///< [in] handler for records
); ///< @return used budget
/// replays records starting at start through end of segment
replay_ertr::future<>
replay_segment(
@ -434,13 +217,11 @@ private:
);
extent_len_t max_record_length() const;
friend class crimson::os::seastore::SegmentedAllocator;
};
using JournalRef = std::unique_ptr<Journal>;
}
WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::segment_header_t)
WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::record_header_t)
WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::extent_info_t)
namespace crimson::os::seastore {

View File

@ -161,6 +161,18 @@ public:
Transaction &t,
CachedExtentRef extent) = 0;
/**
* update_mapping
*
* update the lba mapping for delayed-allocated extents
*/
using update_le_mapping_iertr = base_iertr;
using update_le_mapping_ret = base_iertr::future<>;
virtual update_le_mapping_ret update_mapping(
Transaction& t,
laddr_t laddr,
paddr_t prev_addr,
paddr_t paddr) = 0;
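// One caller is TransactionManager::rewrite_logical_extent (further down
// in this change), which uses it to repoint a rewritten extent from
// prev_addr to the new paddr.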
/**
* get_physical_extent_if_live
*

View File

@ -338,50 +338,14 @@ BtreeLBAManager::rewrite_extent_ret BtreeLBAManager::rewrite_extent(
ERRORT("{} has been invalidated", t, *extent);
}
assert(!extent->has_been_invalidated());
assert(!extent->is_logical());
logger().debug(
"{}: rewriting {}",
__func__,
*extent);
if (extent->is_logical()) {
auto lextent = extent->cast<LogicalCachedExtent>();
cache.retire_extent(t, extent);
auto nlextent = cache.alloc_new_extent_by_type(
t,
lextent->get_type(),
lextent->get_length())->cast<LogicalCachedExtent>();
lextent->get_bptr().copy_out(
0,
lextent->get_length(),
nlextent->get_bptr().c_str());
nlextent->set_laddr(lextent->get_laddr());
nlextent->set_pin(lextent->get_pin().duplicate());
logger().debug(
"{}: rewriting {} into {}",
__func__,
*lextent,
*nlextent);
return update_mapping(
t,
lextent->get_laddr(),
[prev_addr = lextent->get_paddr(), addr = nlextent->get_paddr()](
const lba_map_val_t &in) {
lba_map_val_t ret = in;
ceph_assert(in.paddr == prev_addr);
ret.paddr = addr;
return ret;
}).si_then(
[nlextent](auto) {},
rewrite_extent_iertr::pass_further{},
/* ENOENT in particular should be impossible */
crimson::ct_error::assert_all{
"Invalid error in BtreeLBAManager::rewrite_extent after update_mapping"
}
);
} else if (is_lba_node(*extent)) {
if (is_lba_node(*extent)) {
auto c = get_context(t);
return with_btree(
c,
@ -393,6 +357,33 @@ BtreeLBAManager::rewrite_extent_ret BtreeLBAManager::rewrite_extent(
}
}
BtreeLBAManager::update_le_mapping_ret
BtreeLBAManager::update_mapping(
Transaction& t,
laddr_t laddr,
paddr_t prev_addr,
paddr_t addr)
{
return update_mapping(
t,
laddr,
[prev_addr, addr](
const lba_map_val_t &in) {
assert(!addr.is_null());
lba_map_val_t ret = in;
ceph_assert(in.paddr == prev_addr);
ret.paddr = addr;
return ret;
}).si_then(
[](auto) {},
update_le_mapping_iertr::pass_further{},
/* ENOENT in particular should be impossible */
crimson::ct_error::assert_all{
"Invalid error in BtreeLBAManager::rewrite_extent after update_mapping"
}
);
}
BtreeLBAManager::get_physical_extent_if_live_ret
BtreeLBAManager::get_physical_extent_if_live(
Transaction &t,

View File

@ -103,6 +103,12 @@ public:
Transaction &t,
CachedExtentRef extent) final;
update_le_mapping_ret update_mapping(
Transaction& t,
laddr_t laddr,
paddr_t prev_addr,
paddr_t paddr) final;
get_physical_extent_if_live_ret get_physical_extent_if_live(
Transaction &t,
extent_types_t type,

View File

@ -549,7 +549,12 @@ LBABtree::handle_split_ret LBABtree::handle_split(
if (split_from > 1) {
auto &pos = iter.get_internal(split_from);
DEBUGT("splitting internal {} at depth {}", c.trans, *pos.node, split_from);
DEBUGT("splitting internal {} at depth {}, parent: {} at pos: {}",
c.trans,
*pos.node,
split_from,
*parent_pos.node,
parent_pos.pos);
auto [left, right] = split_level(parent_pos, pos);
if (pos.pos < left->get_size()) {
@ -562,7 +567,11 @@ LBABtree::handle_split_ret LBABtree::handle_split(
}
} else {
auto &pos = iter.leaf;
DEBUGT("splitting leaf {}", c.trans, *pos.node);
DEBUGT("splitting leaf {}, parent: {} at pos: {}",
c.trans,
*pos.node,
*parent_pos.node,
parent_pos.pos);
auto [left, right] = split_level(parent_pos, pos);
/* right->get_node_meta().begin == pivot == right->begin()->get_key()

View File

@ -323,7 +323,7 @@ OMapInnerNode::merge_entry(
auto is_left = (iter + 1) == iter_end();
auto donor_iter = is_left ? iter - 1 : iter + 1;
return omap_load_extent(oc, donor_iter->get_val(), get_meta().depth - 1)
.si_then([=] (auto &&donor) mutable {
.si_then([=, px=this] (auto &&donor) mutable {
auto [l, r] = is_left ?
std::make_pair(donor, entry) : std::make_pair(entry, donor);
auto [liter, riter] = is_left ?
@ -332,7 +332,8 @@ OMapInnerNode::merge_entry(
logger().debug("{}::merge_entry make_full_merge l {} r {}", __func__, *l, *r);
assert(entry->extent_is_below_min());
return l->make_full_merge(oc, r).si_then([liter=liter, riter=riter,
l=l, r=r, oc, this] (auto &&replacement){
l=l, r=r, oc, this, px] (auto &&replacement){
logger().debug("OMapInnerNode::merge_entry to update parent: {}", *px);
journal_inner_update(liter, replacement->get_laddr(), maybe_get_delta_buffer());
journal_inner_remove(riter, maybe_get_delta_buffer());
//retire extent
@ -353,7 +354,8 @@ OMapInnerNode::merge_entry(
} else {
logger().debug("{}::merge_entry balanced l {} r {}", __func__, *l, *r);
return l->make_balanced(oc, r).si_then([liter=liter, riter=riter,
l=l, r=r, oc, this] (auto tuple) {
l=l, r=r, oc, this, px] (auto tuple) {
logger().debug("OMapInnerNode::merge_entry to update parent: {}", *px);
auto [replacement_l, replacement_r, replacement_pivot] = tuple;
//update operation will not cause node overflow, so we can do it first
journal_inner_update(liter, replacement_l->get_laddr(), maybe_get_delta_buffer());

View File

@ -108,10 +108,11 @@ struct OMapInnerNode
}
ceph::bufferlist get_delta() final {
assert(!delta_buffer.empty());
ceph::bufferlist bl;
encode(delta_buffer, bl);
delta_buffer.clear();
if (!delta_buffer.empty()) {
encode(delta_buffer, bl);
delta_buffer.clear();
}
return bl;
}
@ -205,10 +206,11 @@ struct OMapLeafNode
}
ceph::bufferlist get_delta() final {
assert(!delta_buffer.empty());
ceph::bufferlist bl;
encode(delta_buffer, bl);
delta_buffer.clear();
if (!delta_buffer.empty()) {
encode(delta_buffer, bl);
delta_buffer.clear();
}
return bl;
}

View File

@ -0,0 +1,336 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
// vim: ts=8 sw=2 smarttab expandtab
#include "crimson/os/seastore/segment_manager.h"
#include "crimson/os/seastore/scanner.h"
#include "crimson/common/log.h"
namespace {
seastar::logger& logger() {
return crimson::get_logger(ceph_subsys_seastore);
}
}
namespace crimson::os::seastore {
Scanner::read_segment_header_ret
Scanner::read_segment_header(segment_id_t segment)
{
return segment_manager.read(
paddr_t{segment, 0},
segment_manager.get_block_size()
).handle_error(
read_segment_header_ertr::pass_further{},
crimson::ct_error::assert_all{
"Invalid error in Scanner::read_segment_header"
}
).safe_then([=](bufferptr bptr) -> read_segment_header_ret {
logger().debug("segment {} bptr size {}", segment, bptr.length());
segment_header_t header;
bufferlist bl;
bl.push_back(bptr);
logger().debug(
"Scanner::read_segment_header: segment {} block crc {}",
segment,
bl.begin().crc32c(segment_manager.get_block_size(), 0));
auto bp = bl.cbegin();
try {
decode(header, bp);
} catch (ceph::buffer::error &e) {
logger().debug(
"Scanner::read_segment_header: segment {} unable to decode "
"header, skipping",
segment);
return crimson::ct_error::enodata::make();
}
logger().debug(
"Scanner::read_segment_header: segment {} header {}",
segment,
header);
return read_segment_header_ret(
read_segment_header_ertr::ready_future_marker{},
header);
});
}
Scanner::scan_extents_ret Scanner::scan_extents(
scan_extents_cursor &cursor,
extent_len_t bytes_to_read)
{
auto ret = std::make_unique<scan_extents_ret_bare>();
auto* extents = ret.get();
return read_segment_header(cursor.get_offset().segment
).handle_error(
scan_extents_ertr::pass_further{},
crimson::ct_error::assert_all{
"Invalid error in Scanner::scan_extents"
}
).safe_then([bytes_to_read, extents, &cursor, this](auto segment_header) {
auto segment_nonce = segment_header.segment_nonce;
return seastar::do_with(
found_record_handler_t(
[extents, this](
paddr_t base,
const record_header_t &header,
const bufferlist &mdbuf) mutable {
auto infos = try_decode_extent_infos(
header,
mdbuf);
if (!infos) {
// This should be impossible, we did check the crc on the mdbuf
logger().error(
"Scanner::scan_extents unable to decode extents for record {}",
base);
assert(infos);
}
paddr_t extent_offset = base.add_offset(header.mdlength);
for (const auto &i : *infos) {
extents->emplace_back(extent_offset, i);
extent_offset.offset += i.len;
}
return scan_extents_ertr::now();
}),
[=, &cursor](auto &dhandler) {
return scan_valid_records(
cursor,
segment_nonce,
bytes_to_read,
dhandler).discard_result();
});
}).safe_then([ret=std::move(ret)] {
return std::move(*ret);
});
}
Scanner::scan_valid_records_ret Scanner::scan_valid_records(
scan_valid_records_cursor &cursor,
segment_nonce_t nonce,
size_t budget,
found_record_handler_t &handler)
{
if (cursor.offset.offset == 0) {
cursor.offset.offset = segment_manager.get_block_size();
}
auto retref = std::make_unique<size_t>(0);
auto &budget_used = *retref; // must alias *retref so the returned value reflects usage
return crimson::repeat(
[=, &cursor, &budget_used, &handler]() mutable
-> scan_valid_records_ertr::future<seastar::stop_iteration> {
return [=, &handler, &cursor, &budget_used] {
if (!cursor.last_valid_header_found) {
return read_validate_record_metadata(cursor.offset, nonce
).safe_then([=, &cursor](auto md) {
logger().debug(
"Scanner::scan_valid_records: read complete {}",
cursor.offset);
if (!md) {
logger().debug(
"Scanner::scan_valid_records: found invalid header at {}, presumably at end",
cursor.offset);
cursor.last_valid_header_found = true;
return scan_valid_records_ertr::now();
} else {
logger().debug(
"Scanner::scan_valid_records: valid record read at {}",
cursor.offset);
cursor.last_committed = paddr_t{
cursor.offset.segment,
md->first.committed_to};
cursor.pending_records.emplace_back(
cursor.offset,
md->first,
md->second);
cursor.offset.offset +=
md->first.dlength + md->first.mdlength;
return scan_valid_records_ertr::now();
}
}).safe_then([=, &cursor, &budget_used, &handler] {
return crimson::repeat(
[=, &budget_used, &cursor, &handler] {
logger().debug(
"Scanner::scan_valid_records: valid record read, processing queue");
if (cursor.pending_records.empty()) {
/* This is only possible if the segment is empty.
* A record's last_commited must be prior to its own
* location since it itself cannot yet have been committed
* at its own time of submission. Thus, the most recently
* read record must always fall after cursor.last_committed */
return scan_valid_records_ertr::make_ready_future<
seastar::stop_iteration>(seastar::stop_iteration::yes);
}
auto &next = cursor.pending_records.front();
if (next.offset > cursor.last_committed) {
return scan_valid_records_ertr::make_ready_future<
seastar::stop_iteration>(seastar::stop_iteration::yes);
}
budget_used +=
next.header.dlength + next.header.mdlength;
return handler(
next.offset,
next.header,
next.mdbuffer
).safe_then([&cursor] {
cursor.pending_records.pop_front();
return scan_valid_records_ertr::make_ready_future<
seastar::stop_iteration>(seastar::stop_iteration::no);
});
});
});
} else {
assert(!cursor.pending_records.empty());
auto &next = cursor.pending_records.front();
return read_validate_data(next.offset, next.header
).safe_then([=, &budget_used, &next, &cursor, &handler](auto valid) {
if (!valid) {
cursor.pending_records.clear();
return scan_valid_records_ertr::now();
}
budget_used +=
next.header.dlength + next.header.mdlength;
return handler(
next.offset,
next.header,
next.mdbuffer
).safe_then([&cursor] {
cursor.pending_records.pop_front();
return scan_valid_records_ertr::now();
});
});
}
}().safe_then([=, &budget_used, &cursor] {
if (cursor.is_complete() || budget_used >= budget) {
return seastar::stop_iteration::yes;
} else {
return seastar::stop_iteration::no;
}
});
}).safe_then([retref=std::move(retref)]() mutable -> scan_valid_records_ret {
return scan_valid_records_ret(
scan_valid_records_ertr::ready_future_marker{},
std::move(*retref));
});
}
Scanner::read_validate_record_metadata_ret
Scanner::read_validate_record_metadata(
paddr_t start,
segment_nonce_t nonce)
{
auto block_size = segment_manager.get_block_size();
if (start.offset + block_size > (int64_t)segment_manager.get_segment_size()) {
return read_validate_record_metadata_ret(
read_validate_record_metadata_ertr::ready_future_marker{},
std::nullopt);
}
return segment_manager.read(start, block_size
).safe_then(
[=](bufferptr bptr) mutable
-> read_validate_record_metadata_ret {
logger().debug("read_validate_record_metadata: reading {}", start);
auto block_size = segment_manager.get_block_size();
bufferlist bl;
bl.append(bptr);
auto bp = bl.cbegin();
record_header_t header;
try {
decode(header, bp);
} catch (ceph::buffer::error &e) {
return read_validate_record_metadata_ret(
read_validate_record_metadata_ertr::ready_future_marker{},
std::nullopt);
}
if (header.segment_nonce != nonce) {
return read_validate_record_metadata_ret(
read_validate_record_metadata_ertr::ready_future_marker{},
std::nullopt);
}
if (header.mdlength > (extent_len_t)block_size) {
if (start.offset + header.mdlength >
(int64_t)segment_manager.get_segment_size()) {
return crimson::ct_error::input_output_error::make();
}
return segment_manager.read(
{start.segment, start.offset + (segment_off_t)block_size},
header.mdlength - block_size).safe_then(
[header=std::move(header), bl=std::move(bl)](
auto &&bptail) mutable {
bl.push_back(bptail);
return read_validate_record_metadata_ret(
read_validate_record_metadata_ertr::ready_future_marker{},
std::make_pair(std::move(header), std::move(bl)));
});
} else {
return read_validate_record_metadata_ret(
read_validate_record_metadata_ertr::ready_future_marker{},
std::make_pair(std::move(header), std::move(bl))
);
}
}).safe_then([=](auto p) {
if (p && validate_metadata(p->second)) {
return read_validate_record_metadata_ret(
read_validate_record_metadata_ertr::ready_future_marker{},
std::move(*p)
);
} else {
return read_validate_record_metadata_ret(
read_validate_record_metadata_ertr::ready_future_marker{},
std::nullopt);
}
});
}
std::optional<std::vector<extent_info_t>>
Scanner::try_decode_extent_infos(
record_header_t header,
const bufferlist &bl)
{
auto bliter = bl.cbegin();
bliter += ceph::encoded_sizeof_bounded<record_header_t>();
bliter += sizeof(checksum_t) /* crc */;
logger().debug("{}: decoding {} extents", __func__, header.extents);
std::vector<extent_info_t> extent_infos(header.extents);
for (auto &&i : extent_infos) {
try {
decode(i, bliter);
} catch (ceph::buffer::error &e) {
return std::nullopt;
}
}
return extent_infos;
}
Scanner::read_validate_data_ret
Scanner::read_validate_data(
paddr_t record_base,
const record_header_t &header)
{
return segment_manager.read(
record_base.add_offset(header.mdlength),
header.dlength
).safe_then([=, &header](auto bptr) {
bufferlist bl;
bl.append(bptr);
return bl.crc32c(-1) == header.data_crc;
});
}
bool Scanner::validate_metadata(const bufferlist &bl)
{
auto bliter = bl.cbegin();
auto test_crc = bliter.crc32c(
ceph::encoded_sizeof_bounded<record_header_t>(),
-1);
ceph_le32 recorded_crc_le;
decode(recorded_crc_le, bliter);
uint32_t recorded_crc = recorded_crc_le;
test_crc = bliter.crc32c(
bliter.get_remaining(),
test_crc);
return test_crc == recorded_crc;
}
} // namespace crimson::os::seastore

View File

@ -0,0 +1,102 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
// vim: ts=8 sw=2 smarttab expandtab
#pragma once
#include "crimson/common/errorator.h"
#include "crimson/os/seastore/seastore_types.h"
#include "crimson/os/seastore/segment_manager.h"
namespace crimson::os::seastore {
class SegmentCleaner;
class Scanner {
public:
using read_ertr = crimson::errorator<
crimson::ct_error::input_output_error,
crimson::ct_error::invarg,
crimson::ct_error::enoent,
crimson::ct_error::erange>;
Scanner(SegmentManager& segment_manager)
: segment_manager(segment_manager) {}
using read_segment_header_ertr = crimson::errorator<
crimson::ct_error::enoent,
crimson::ct_error::enodata,
crimson::ct_error::input_output_error
>;
using read_segment_header_ret = read_segment_header_ertr::future<
segment_header_t>;
read_segment_header_ret read_segment_header(segment_id_t segment);
/**
* scan_extents
*
* Scans records beginning at addr until the first record boundary after
* addr + bytes_to_read.
*
* Returns list<extent, extent_info>
* cursor.is_complete() will be true when no further extents exist in segment.
*/
using scan_extents_cursor = scan_valid_records_cursor;
using scan_extents_ertr = read_ertr::extend<crimson::ct_error::enodata>;
using scan_extents_ret_bare = std::list<std::pair<paddr_t, extent_info_t>>;
using scan_extents_ret = scan_extents_ertr::future<scan_extents_ret_bare>;
scan_extents_ret scan_extents(
scan_extents_cursor &cursor,
extent_len_t bytes_to_read
);
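/* Illustrative only (not part of this change): a minimal sketch of
 * driving scan_extents with a cursor until the segment is exhausted,
 * mirroring SegmentCleaner::gc_reclaim_space below; `stride` and
 * `on_extent` are hypothetical caller-supplied names.
 *
 *   auto cursor = std::make_unique<Scanner::scan_extents_cursor>(
 *     paddr_t{segment, 0});
 *   return crimson::repeat([&scanner, &cursor, stride, on_extent] {
 *     return scanner.scan_extents(*cursor, stride
 *     ).safe_then([&cursor, on_extent](auto &&extents) {
 *       for (auto &[addr, info] : extents) {
 *         on_extent(addr, info);  // extent_info_t: type, laddr, length
 *       }
 *       return seastar::make_ready_future<seastar::stop_iteration>(
 *         cursor->is_complete() ? seastar::stop_iteration::yes
 *                               : seastar::stop_iteration::no);
 *     });
 *   });
 */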
using scan_valid_records_ertr = read_ertr::extend<crimson::ct_error::enodata>;
using scan_valid_records_ret = scan_valid_records_ertr::future<
size_t>;
using found_record_handler_t = std::function<
scan_valid_records_ertr::future<>(
paddr_t record_block_base,
// callee may assume header and bl will remain valid until
// returned future resolves
const record_header_t &header,
const bufferlist &bl)>;
scan_valid_records_ret scan_valid_records(
scan_valid_records_cursor &cursor, ///< [in, out] cursor, updated during call
segment_nonce_t nonce, ///< [in] nonce for segment
size_t budget, ///< [in] max budget to use
found_record_handler_t &handler ///< [in] handler for records
); ///< @return used budget
private:
SegmentManager& segment_manager;
/// read record metadata for record starting at start
using read_validate_record_metadata_ertr = read_ertr;
using read_validate_record_metadata_ret =
read_validate_record_metadata_ertr::future<
std::optional<std::pair<record_header_t, bufferlist>>
>;
read_validate_record_metadata_ret read_validate_record_metadata(
paddr_t start,
segment_nonce_t nonce);
/// attempts to decode extent infos from bl, return nullopt if unsuccessful
std::optional<std::vector<extent_info_t>> try_decode_extent_infos(
record_header_t header,
const bufferlist &bl);
/// read and validate data
using read_validate_data_ertr = read_ertr;
using read_validate_data_ret = read_validate_data_ertr::future<bool>;
read_validate_data_ret read_validate_data(
paddr_t record_base,
const record_header_t &header ///< caller must ensure lifetime through
/// future resolution
);
/// validate embedded metadata checksum
static bool validate_metadata(const bufferlist &bl);
};
using ScannerRef = std::unique_ptr<Scanner>;
} // namespace crimson::os::seastore

View File

@ -119,15 +119,17 @@ seastar::future<> SeaStore::mkfs(uuid_d new_osd_fsid)
}).safe_then([this] {
return transaction_manager->mount();
}).safe_then([this] {
return transaction_manager->with_transaction_intr(
Transaction::src_t::MUTATE, [this](auto& t) {
return onode_manager->mkfs(t
).si_then([this, &t] {
return collection_manager->mkfs(t);
}).si_then([this, &t](auto coll_root) {
transaction_manager->write_collection_root(
t, coll_root);
return transaction_manager->submit_transaction(t);
return repeat_eagain([this] {
return transaction_manager->with_transaction_intr(
Transaction::src_t::MUTATE, [this](auto& t) {
return onode_manager->mkfs(t
).si_then([this, &t] {
return collection_manager->mkfs(t);
}).si_then([this, &t](auto coll_root) {
transaction_manager->write_collection_root(
t, coll_root);
return transaction_manager->submit_transaction(t);
});
});
});
}).safe_then([this] {
@ -1173,14 +1175,28 @@ std::unique_ptr<SeaStore> make_seastore(
segment_manager::block::BlockSegmentManager
>(device + "/block");
auto scanner = std::make_unique<Scanner>(*sm);
auto& scanner_ref = *scanner.get();
auto segment_cleaner = std::make_unique<SegmentCleaner>(
SegmentCleaner::config_t::get_default(),
std::move(scanner),
false /* detailed */);
auto journal = std::make_unique<Journal>(*sm);
auto journal = std::make_unique<Journal>(*sm, scanner_ref);
auto cache = std::make_unique<Cache>(*sm);
auto lba_manager = lba_manager::create_lba_manager(*sm, *cache);
auto epm = std::make_unique<ExtentPlacementManager>(*cache, *lba_manager);
epm->add_allocator(
device_type_t::SEGMENTED,
std::make_unique<SegmentedAllocator>(
*segment_cleaner,
*sm,
*lba_manager,
*journal,
*cache));
journal->set_segment_provider(&*segment_cleaner);
auto tm = std::make_unique<TransactionManager>(
@ -1188,7 +1204,8 @@ std::unique_ptr<SeaStore> make_seastore(
std::move(segment_cleaner),
std::move(journal),
std::move(cache),
std::move(lba_manager));
std::move(lba_manager),
std::move(epm));
auto cm = std::make_unique<collection_manager::FlatCollectionManager>(*tm);
return std::make_unique<SeaStore>(

View File

@ -17,6 +17,8 @@ std::ostream &segment_to_stream(std::ostream &out, const segment_id_t &t)
return out << "ZERO_SEG";
else if (t == FAKE_SEG_ID)
return out << "FAKE_SEG";
else if (t == DELAYED_TEMP_SEG_ID)
return out << "DELAYED_TEMP_SEG";
else
return out << t;
}
@ -110,4 +112,94 @@ std::ostream &operator<<(std::ostream &lhs, const delta_info_t &rhs)
<< ")";
}
extent_len_t get_encoded_record_raw_mdlength(
const record_t &record,
size_t block_size) {
extent_len_t metadata =
(extent_len_t)ceph::encoded_sizeof_bounded<record_header_t>();
metadata += sizeof(checksum_t) /* crc */;
metadata += record.extents.size() *
ceph::encoded_sizeof_bounded<extent_info_t>();
for (const auto &i: record.deltas) {
metadata += ceph::encoded_sizeof(i);
}
return metadata;
}
record_size_t get_encoded_record_length(
const record_t &record,
size_t block_size) {
extent_len_t metadata =
get_encoded_record_raw_mdlength(record, block_size);
extent_len_t data = 0;
for (const auto &i: record.extents) {
data += i.bl.length();
}
metadata = p2roundup(metadata, (extent_len_t)block_size);
return record_size_t{metadata, data};
}
ceph::bufferlist encode_record(
record_size_t rsize,
record_t &&record,
size_t block_size,
segment_off_t committed_to,
segment_nonce_t current_segment_nonce)
{
bufferlist data_bl;
for (auto &i: record.extents) {
data_bl.append(i.bl);
}
bufferlist bl;
record_header_t header{
rsize.mdlength,
rsize.dlength,
(uint32_t)record.deltas.size(),
(uint32_t)record.extents.size(),
current_segment_nonce,
committed_to,
data_bl.crc32c(-1)
};
encode(header, bl);
auto metadata_crc_filler = bl.append_hole(sizeof(uint32_t));
for (const auto &i: record.extents) {
encode(extent_info_t(i), bl);
}
for (const auto &i: record.deltas) {
encode(i, bl);
}
if (bl.length() % block_size != 0) {
bl.append_zero(
block_size - (bl.length() % block_size));
}
ceph_assert(bl.length() == rsize.mdlength);
auto bliter = bl.cbegin();
auto metadata_crc = bliter.crc32c(
ceph::encoded_sizeof_bounded<record_header_t>(),
-1);
bliter += sizeof(checksum_t); /* crc hole again */
metadata_crc = bliter.crc32c(
bliter.get_remaining(),
metadata_crc);
ceph_le32 metadata_crc_le;
metadata_crc_le = metadata_crc;
metadata_crc_filler.copy_in(
sizeof(checksum_t),
reinterpret_cast<const char *>(&metadata_crc_le));
bl.claim_append(data_bl);
ceph_assert(bl.length() == (rsize.dlength + rsize.mdlength));
return bl;
}
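// need_delayed_allocation: true when the final paddr of a fresh extent
// for this device type is not known until transaction submission; such
// extents carry DELAYED_TEMP placeholder paddrs until then.  With the
// current device_type_t ordering this covers NONE, SEGMENTED and
// RANDOM_BLOCK, but not PMEM.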
bool need_delayed_allocation(device_type_t type) {
return type <= RANDOM_BLOCK;
}
}

View File

@ -55,6 +55,8 @@ constexpr segment_id_t FAKE_SEG_ID =
*/
constexpr segment_id_t ZERO_SEG_ID =
std::numeric_limits<segment_id_t>::max() - 5;
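/**
 * Temporary segment id used to stamp placeholder paddrs onto fresh
 * extents whose final location is not decided until transaction
 * submission (see delayed_temp_paddr() and Transaction::add_fresh_extent).
 */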
constexpr segment_id_t DELAYED_TEMP_SEG_ID =
std::numeric_limits<segment_id_t>::max() - 6;
std::ostream &segment_to_stream(std::ostream &, const segment_id_t &t);
@ -215,6 +217,9 @@ constexpr paddr_t make_fake_paddr(segment_off_t off) {
constexpr paddr_t zero_paddr() {
return paddr_t{ZERO_SEG_ID, 0};
}
constexpr paddr_t delayed_temp_paddr(segment_off_t off) {
return paddr_t{DELAYED_TEMP_SEG_ID, off};
}
struct __attribute((packed)) paddr_le_t {
ceph_le32 segment = ceph_le32(NULL_SEG_ID);
@ -238,6 +243,21 @@ using objaddr_t = uint32_t;
constexpr objaddr_t OBJ_ADDR_MAX = std::numeric_limits<objaddr_t>::max();
constexpr objaddr_t OBJ_ADDR_NULL = OBJ_ADDR_MAX - 1;
enum class ool_placement_hint_t {
NONE, /// Denotes empty hint
NUM_HINTS /// Constant for number of hints
};
enum device_type_t {
NONE = 0,
SEGMENTED, // i.e. Hard_Disk, SATA_SSD, NAND_NVME
RANDOM_BLOCK, // i.e. RANDOM_BD
PMEM, // i.e. NVDIMM, PMEM
NUM_TYPES
};
bool need_delayed_allocation(device_type_t type);
/* Monotonically increasing identifier for the location of a
* journal_record.
*/
@ -721,9 +741,148 @@ struct rbm_alloc_delta_t {
op_types_t op;
};
struct extent_info_t {
extent_types_t type = extent_types_t::NONE;
laddr_t addr = L_ADDR_NULL;
extent_len_t len = 0;
extent_info_t() = default;
extent_info_t(const extent_t &et)
: type(et.type), addr(et.addr), len(et.bl.length()) {}
DENC(extent_info_t, v, p) {
DENC_START(1, 1, p);
denc(v.type, p);
denc(v.addr, p);
denc(v.len, p);
DENC_FINISH(p);
}
};
using segment_nonce_t = uint32_t;
/**
* Segment header
*
* Every segment contains an encoded segment_header_t in its first block.
* Our strategy for finding the journal replay point is:
* 1) Find the segment with the highest journal_segment_seq
* 2) Replay starting at record located at that segment's journal_tail
*/
struct segment_header_t {
segment_seq_t journal_segment_seq;
segment_id_t physical_segment_id; // debugging
journal_seq_t journal_tail;
segment_nonce_t segment_nonce;
bool out_of_line;
DENC(segment_header_t, v, p) {
DENC_START(1, 1, p);
denc(v.journal_segment_seq, p);
denc(v.physical_segment_id, p);
denc(v.journal_tail, p);
denc(v.segment_nonce, p);
denc(v.out_of_line, p);
DENC_FINISH(p);
}
};
std::ostream &operator<<(std::ostream &out, const segment_header_t &header);
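/* Illustrative only: a sketch of the replay-point strategy documented
 * above, given headers collected via Scanner::read_segment_header (the
 * real logic lives in Journal::prep_replay_segments); `headers` is a
 * hypothetical std::vector<std::pair<segment_id_t, segment_header_t>>.
 *
 *   // 1) the segment with the highest journal_segment_seq is the head
 *   auto head = std::max_element(
 *     headers.begin(), headers.end(),
 *     [](const auto &lhs, const auto &rhs) {
 *       return lhs.second.journal_segment_seq <
 *              rhs.second.journal_segment_seq;
 *     });
 *   // 2) replay starts at that segment's journal_tail
 *   journal_seq_t replay_from = head->second.journal_tail;
 */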
struct record_header_t {
// Fixed portion
extent_len_t mdlength; // block aligned, length of metadata
extent_len_t dlength; // block aligned, length of data
uint32_t deltas; // number of deltas
uint32_t extents; // number of extents
segment_nonce_t segment_nonce;// nonce of containing segment
segment_off_t committed_to; // records in this segment prior to committed_to
// have been fully written
checksum_t data_crc; // crc of data payload
DENC(record_header_t, v, p) {
DENC_START(1, 1, p);
denc(v.mdlength, p);
denc(v.dlength, p);
denc(v.deltas, p);
denc(v.extents, p);
denc(v.segment_nonce, p);
denc(v.committed_to, p);
denc(v.data_crc, p);
DENC_FINISH(p);
}
};
std::ostream &operator<<(std::ostream &out, const extent_info_t &header);
struct record_size_t {
extent_len_t mdlength = 0;
extent_len_t dlength = 0;
record_size_t(
extent_len_t mdlength,
extent_len_t dlength)
: mdlength(mdlength), dlength(dlength) {}
};
extent_len_t get_encoded_record_raw_mdlength(
const record_t &record,
size_t block_size);
/**
* Return <mdlength, dlength> pair denoting length of
* metadata and blocks respectively.
*/
record_size_t get_encoded_record_length(
const record_t &record,
size_t block_size);
ceph::bufferlist encode_record(
record_size_t rsize,
record_t &&record,
size_t block_size,
segment_off_t committed_to,
segment_nonce_t current_segment_nonce = 0);
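/* Illustrative only: a minimal sketch of the intended call sequence,
 * assuming a 4KiB block size; `data_bl`, `committed_to` and `nonce` are
 * hypothetical locals.  get_encoded_record_length() rounds the metadata
 * (header + crc + extent infos + deltas) up to a block boundary, so the
 * data portion starts block-aligned.
 *
 *   record_t record;
 *   record.extents.push_back(
 *     extent_t{extent_types_t::TEST_BLOCK, L_ADDR_NULL, data_bl});
 *   auto rsize = get_encoded_record_length(record, 4096);
 *   auto total = rsize.mdlength + rsize.dlength;  // on-disk footprint
 *   auto bl = encode_record(
 *     rsize, std::move(record), 4096, committed_to, nonce);
 *   assert(bl.length() == total);
 */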
/// scan segment for end incrementally
struct scan_valid_records_cursor {
bool last_valid_header_found = false;
paddr_t offset;
paddr_t last_committed;
struct found_record_t {
paddr_t offset;
record_header_t header;
bufferlist mdbuffer;
found_record_t(
paddr_t offset,
const record_header_t &header,
const bufferlist &mdbuffer)
: offset(offset), header(header), mdbuffer(mdbuffer) {}
};
std::deque<found_record_t> pending_records;
bool is_complete() const {
return last_valid_header_found && pending_records.empty();
}
paddr_t get_offset() const {
return offset;
}
scan_valid_records_cursor(
paddr_t offset)
: offset(offset) {}
};
}
WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::seastore_meta_t)
WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::paddr_t)
WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::journal_seq_t)
WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::delta_info_t)
WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::record_header_t)
WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::extent_info_t)
WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::segment_header_t)

View File

@ -144,9 +144,13 @@ void SpaceTrackerDetailed::dump_usage(segment_id_t id) const
segment_usage[id].dump_usage(block_size);
}
SegmentCleaner::SegmentCleaner(config_t config, bool detailed)
SegmentCleaner::SegmentCleaner(
config_t config,
ScannerRef&& scr,
bool detailed)
: detailed(detailed),
config(config),
scanner(std::move(scr)),
gc_process(*this)
{
register_metrics();
@ -181,9 +185,10 @@ SegmentCleaner::get_segment_ret SegmentCleaner::get_segment()
void SegmentCleaner::update_journal_tail_target(journal_seq_t target)
{
logger().debug(
"{}: {}",
"{}: {}, current tail target {}",
__func__,
target);
target,
journal_tail_target);
assert(journal_tail_target == journal_seq_t() || target >= journal_tail_target);
if (journal_tail_target == journal_seq_t() || target > journal_tail_target) {
journal_tail_target = target;
@ -304,7 +309,7 @@ SegmentCleaner::gc_reclaim_space_ret SegmentCleaner::gc_reclaim_space()
}
next.offset = 0;
scan_cursor =
std::make_unique<ExtentCallbackInterface::scan_extents_cursor>(
std::make_unique<Scanner::scan_extents_cursor>(
next);
logger().debug(
"SegmentCleaner::do_gc: starting gc on segment {}",
@ -313,7 +318,7 @@ SegmentCleaner::gc_reclaim_space_ret SegmentCleaner::gc_reclaim_space()
ceph_assert(!scan_cursor->is_complete());
}
return ecb->scan_extents(
return scanner->scan_extents(
*scan_cursor,
config.reclaim_bytes_stride
).safe_then([this](auto &&_extents) {
@ -372,4 +377,42 @@ SegmentCleaner::gc_reclaim_space_ret SegmentCleaner::gc_reclaim_space()
});
}
SegmentCleaner::init_segments_ret SegmentCleaner::init_segments() {
return seastar::do_with(
std::vector<std::pair<segment_id_t, segment_header_t>>(),
[this](auto& segments) {
return crimson::do_for_each(
boost::make_counting_iterator(segment_id_t{0}),
boost::make_counting_iterator(segment_id_t{num_segments}),
[this, &segments](auto segment_id) {
return scanner->read_segment_header(segment_id)
.safe_then([&segments, segment_id, this](auto header) {
if (header.out_of_line) {
logger().debug("Scanner::init_segments: out-of-line segment {}", segment_id);
init_mark_segment_closed(
segment_id,
header.journal_segment_seq,
true);
} else {
logger().debug("Scanner::init_segments: journal segment {}", segment_id);
segments.emplace_back(std::make_pair(segment_id, std::move(header)));
}
return seastar::now();
}).handle_error(
crimson::ct_error::enoent::handle([](auto) {
return init_segments_ertr::now();
}),
crimson::ct_error::enodata::handle([](auto) {
return init_segments_ertr::now();
}),
crimson::ct_error::input_output_error::pass_further{}
);
}).safe_then([&segments] {
return seastar::make_ready_future<
std::vector<std::pair<segment_id_t, segment_header_t>>>(
std::move(segments));
});
});
}
}

View File

@ -25,8 +25,11 @@ struct segment_info_t {
segment_seq_t journal_segment_seq = NULL_SEG_SEQ;
bool out_of_line = false;
bool is_in_journal(journal_seq_t tail_committed) const {
return journal_segment_seq != NULL_SEG_SEQ &&
return !out_of_line &&
journal_segment_seq != NULL_SEG_SEQ &&
tail_committed.segment_seq <= journal_segment_seq;
}
@ -43,6 +46,35 @@ struct segment_info_t {
}
};
/**
* Callback interface for managing available segments
*/
class SegmentProvider {
public:
using get_segment_ertr = crimson::errorator<
crimson::ct_error::input_output_error>;
using get_segment_ret = get_segment_ertr::future<segment_id_t>;
virtual get_segment_ret get_segment() = 0;
virtual void close_segment(segment_id_t) {}
virtual void set_journal_segment(
segment_id_t segment,
segment_seq_t seq) {}
virtual journal_seq_t get_journal_tail_target() const = 0;
virtual void update_journal_tail_committed(journal_seq_t tail_committed) = 0;
virtual void init_mark_segment_closed(
segment_id_t segment,
segment_seq_t seq,
bool out_of_line) {}
virtual segment_seq_t get_seq(segment_id_t id) { return 0; }
virtual ~SegmentProvider() {}
};
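/* Illustrative only: a minimal provider sketch overriding just the pure
 * virtuals, handing out segment ids sequentially (the journal_test_t
 * fixture later in this change does essentially this; SegmentCleaner is
 * the real implementation).
 *
 *   struct trivial_provider final : SegmentProvider {
 *     segment_id_t next = 0;
 *     get_segment_ret get_segment() final {
 *       return get_segment_ret(
 *         get_segment_ertr::ready_future_marker{}, next++);
 *     }
 *     journal_seq_t get_journal_tail_target() const final { return {}; }
 *     void update_journal_tail_committed(journal_seq_t) final {}
 *   };
 */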
class SpaceTrackerI {
public:
virtual int64_t allocate(
@ -81,7 +113,7 @@ class SpaceTrackerSimple : public SpaceTrackerI {
return live_bytes_by_segment[segment];
}
public:
SpaceTrackerSimple(size_t num_segments)
SpaceTrackerSimple(segment_id_t num_segments)
: live_bytes_by_segment(num_segments, 0) {}
int64_t allocate(
@ -163,7 +195,7 @@ class SpaceTrackerDetailed : public SpaceTrackerI {
std::vector<SegmentMap> segment_usage;
public:
SpaceTrackerDetailed(size_t num_segments, size_t segment_size, size_t block_size)
SpaceTrackerDetailed(segment_id_t num_segments, size_t segment_size, size_t block_size)
: block_size(block_size),
segment_size(segment_size),
segment_usage(num_segments, segment_size / block_size) {}
@ -208,7 +240,7 @@ public:
};
class SegmentCleaner : public JournalSegmentProvider {
class SegmentCleaner : public SegmentProvider {
public:
/// Config
struct config_t {
@ -318,18 +350,6 @@ public:
laddr_t laddr,
segment_off_t len) = 0;
/**
* scan_extents
*
* Interface shim for Journal::scan_extents
*/
using scan_extents_cursor = Journal::scan_valid_records_cursor;
using scan_extents_ertr = Journal::scan_extents_ertr;
using scan_extents_ret = Journal::scan_extents_ret;
virtual scan_extents_ret scan_extents(
scan_extents_cursor &cursor,
extent_len_t bytes_to_read) = 0;
/**
* release_segment
*
@ -359,10 +379,12 @@ private:
const bool detailed;
const config_t config;
size_t num_segments = 0;
segment_id_t num_segments = 0;
size_t segment_size = 0;
size_t block_size = 0;
ScannerRef scanner;
SpaceTrackerIRef space_tracker;
std::vector<segment_info_t> segments;
size_t empty_segments;
@ -390,7 +412,10 @@ private:
std::optional<seastar::promise<>> blocked_io_wake;
public:
SegmentCleaner(config_t config, bool detailed = false);
SegmentCleaner(
config_t config,
ScannerRef&& scanner,
bool detailed = false);
void mount(SegmentManager &sm) {
init_complete = false;
@ -417,6 +442,13 @@ public:
empty_segments = num_segments;
}
using init_segments_ertr = crimson::errorator<
crimson::ct_error::input_output_error>;
using init_segments_ret_bare =
std::vector<std::pair<segment_id_t, segment_header_t>>;
using init_segments_ret = init_segments_ertr::future<init_segments_ret_bare>;
init_segments_ret init_segments();
get_segment_ret get_segment() final;
void close_segment(segment_id_t segment) final;
@ -456,13 +488,18 @@ public:
return journal_head;
}
void init_mark_segment_closed(segment_id_t segment, segment_seq_t seq) final {
void init_mark_segment_closed(
segment_id_t segment,
segment_seq_t seq,
bool out_of_line) final
{
crimson::get_logger(ceph_subsys_seastore).debug(
"SegmentCleaner::init_mark_segment_closed: segment {}, seq {}",
segment,
seq);
mark_closed(segment);
segments[segment].journal_segment_seq = seq;
segments[segment].out_of_line = out_of_line;
}
segment_seq_t get_seq(segment_id_t id) final {
@ -606,7 +643,7 @@ private:
// GC status helpers
std::unique_ptr<
ExtentCallbackInterface::scan_extents_cursor
Scanner::scan_extents_cursor
> scan_cursor;
/**
@ -684,7 +721,7 @@ private:
} gc_process;
using gc_ertr = work_ertr::extend_ertr<
ExtentCallbackInterface::scan_extents_ertr
Scanner::scan_extents_ertr
>;
gc_cycle_ret do_gc_cycle();
@ -812,6 +849,7 @@ private:
"gc_should_reclaim_space {}, "
"journal_head {}, "
"journal_tail_target {}, "
"journal_tail_commit {}, "
"dirty_tail {}, "
"dirty_tail_limit {}, "
"gc_should_trim_journal {}, ",
@ -827,6 +865,7 @@ private:
gc_should_reclaim_space(),
journal_head,
journal_tail_target,
journal_tail_committed,
get_dirty_tail(),
get_dirty_tail_limit(),
gc_should_trim_journal()

View File

@ -7,6 +7,7 @@
#include <boost/intrusive/list.hpp>
#include "crimson/common/log.h"
#include "crimson/os/seastore/ordering_handle.h"
#include "crimson/os/seastore/seastore_types.h"
#include "crimson/os/seastore/cached_extent.h"
@ -23,6 +24,12 @@ class Transaction;
* Representation of in-progress mutation. Used exclusively through Cache methods.
*/
class Transaction {
struct cached_extent_disposer {
void operator() (CachedExtent* extent) {
extent->parent_index = nullptr;
extent->state = CachedExtent::extent_state_t::INVALID;
}
};
public:
using Ref = std::unique_ptr<Transaction>;
using on_destruct_func_t = std::function<void(Transaction&)>;
@ -39,6 +46,11 @@ public:
if (out)
*out = CachedExtentRef(&*iter);
return get_extent_ret::PRESENT;
} else if (auto iter = delayed_set.find_offset(addr);
iter != delayed_set.end()) {
if (out)
*out = CachedExtentRef(&*iter);
return get_extent_ret::PRESENT;
} else if (
auto iter = read_set.find(addr);
iter != read_set.end()) {
@ -60,7 +72,12 @@ public:
// will affect relative paddrs, and it should be rare to retire a fresh
// extent.
ref->state = CachedExtent::extent_state_t::INVALID;
write_set.erase(*ref);
if (ref->is_inline()) {
write_set.erase(*ref);
} else {
// if ref is not inline, its paddr is still delayed, so it must be in the delayed set
delayed_set.erase(*ref);
}
} else if (ref->is_mutation_pending()) {
ref->state = CachedExtent::extent_state_t::INVALID;
write_set.erase(*ref);
@ -83,11 +100,37 @@ public:
ceph_assert(inserted);
}
void add_fresh_extent(CachedExtentRef ref) {
void add_fresh_extent(
CachedExtentRef ref,
bool delayed = false) {
ceph_assert(!is_weak());
fresh_block_list.push_back(ref);
if (delayed) {
assert(ref->is_logical());
ref->set_paddr(delayed_temp_paddr(delayed_temp_offset));
delayed_temp_offset += ref->get_length();
delayed_alloc_list.emplace_back(ref->cast<LogicalCachedExtent>());
delayed_set.insert(*ref);
} else {
ref->set_paddr(make_record_relative_paddr(offset));
offset += ref->get_length();
fresh_block_list.push_back(ref);
write_set.insert(*ref);
}
}
void mark_delayed_extent_inline(LogicalCachedExtentRef& ref) {
ref->set_paddr(make_record_relative_paddr(offset));
offset += ref->get_length();
delayed_set.erase(*ref);
fresh_block_list.push_back(ref);
write_set.insert(*ref);
}
void mark_delayed_extent_ool(LogicalCachedExtentRef& ref) {
assert(!ref->get_paddr().is_null());
assert(!ref->is_inline());
delayed_set.erase(*ref);
fresh_block_list.push_back(ref);
write_set.insert(*ref);
}
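// Delayed-extent lifecycle (decision logic lives in
// ExtentPlacementManager::delayed_alloc_or_ool_write): add_fresh_extent()
// with delayed=true parks a logical extent in delayed_set under a
// DELAYED_TEMP placeholder paddr; at submission it is either promoted
// into the journal record via mark_delayed_extent_inline() (record
// relative paddr) or written out of line first and registered via
// mark_delayed_extent_ool() with its final absolute paddr.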
@ -133,6 +176,10 @@ public:
return fresh_block_list;
}
auto& get_delayed_alloc_list() {
return delayed_alloc_list;
}
const auto &get_mutated_block_list() {
return mutated_block_list;
}
@ -183,11 +230,8 @@ public:
~Transaction() {
on_destruct(*this);
for (auto i = write_set.begin();
i != write_set.end();) {
i->state = CachedExtent::extent_state_t::INVALID;
write_set.erase(*i++);
}
write_set.clear_and_dispose(cached_extent_disposer());
delayed_set.clear_and_dispose(cached_extent_disposer());
}
friend class crimson::os::seastore::SeaStore;
@ -196,10 +240,13 @@ public:
void reset_preserve_handle(journal_seq_t initiated_after) {
root.reset();
offset = 0;
delayed_temp_offset = 0;
read_set.clear();
write_set.clear();
write_set.clear_and_dispose(cached_extent_disposer());
delayed_set.clear_and_dispose(cached_extent_disposer());
fresh_block_list.clear();
mutated_block_list.clear();
delayed_alloc_list.clear();
retired_set.clear();
onode_tree_stats = {};
lba_tree_stats = {};
@ -245,12 +292,18 @@ private:
RootBlockRef root; ///< ref to root if read or written by transaction
segment_off_t offset = 0; ///< relative offset of next block
segment_off_t delayed_temp_offset = 0;
read_set_t<Transaction> read_set; ///< set of extents read by paddr
ExtentIndex write_set; ///< set of extents written by paddr
ExtentIndex delayed_set; ///< set of extents whose paddr
/// allocation are delayed
std::list<CachedExtentRef> fresh_block_list; ///< list of fresh blocks
std::list<CachedExtentRef> mutated_block_list; ///< list of mutated blocks
///< list of ool extents whose addresses are not
///< determined until transaction submission
std::list<LogicalCachedExtentRef> delayed_alloc_list;
pextent_set_t retired_set; ///< list of extents mutated by this transaction

View File

@ -16,12 +16,14 @@ TransactionManager::TransactionManager(
SegmentCleanerRef _segment_cleaner,
JournalRef _journal,
CacheRef _cache,
LBAManagerRef _lba_manager)
LBAManagerRef _lba_manager,
ExtentPlacementManagerRef&& epm)
: segment_manager(_segment_manager),
segment_cleaner(std::move(_segment_cleaner)),
cache(std::move(_cache)),
lba_manager(std::move(_lba_manager)),
journal(std::move(_journal))
journal(std::move(_journal)),
epm(std::move(epm))
{
segment_cleaner->set_extent_callback(this);
journal->set_write_pipeline(&write_pipeline);
@ -63,8 +65,16 @@ TransactionManager::mount_ertr::future<> TransactionManager::mount()
LOG_PREFIX(TransactionManager::mount);
cache->init();
segment_cleaner->mount(segment_manager);
return journal->replay([this](auto seq, auto paddr, const auto &e) {
return cache->replay_delta(seq, paddr, e);
return segment_cleaner->init_segments().safe_then(
[this](auto&& segments) {
return journal->replay(
std::move(segments),
[this](auto seq, auto paddr, const auto &e) {
auto fut = cache->replay_delta(seq, paddr, e);
segment_cleaner->update_journal_tail_target(
cache->get_oldest_dirty_from().value_or(seq));
return fut;
});
}).safe_then([this] {
return journal->open_for_write();
}).safe_then([this, FNAME](auto addr) {
@ -227,10 +237,15 @@ TransactionManager::submit_transaction_direct(
{
LOG_PREFIX(TransactionManager::submit_transaction_direct);
DEBUGT("about to prepare", tref);
return trans_intr::make_interruptible(
tref.get_handle().enter(write_pipeline.prepare)
).then_interruptible([this, FNAME, &tref]() mutable
-> submit_transaction_iertr::future<> {
return epm->delayed_alloc_or_ool_write(tref)
.handle_error_interruptible(
crimson::ct_error::input_output_error::pass_further(),
crimson::ct_error::assert_all("invalid error")
).si_then([&tref, this] {
return tref.get_handle().enter(write_pipeline.prepare);
}).si_then([this, FNAME, &tref]() mutable
-> submit_transaction_iertr::future<> {
auto record = cache->prepare_record(tref);
tref.get_handle().maybe_release_collection_lock();
@ -276,6 +291,49 @@ TransactionManager::get_next_dirty_extents(
return cache->get_next_dirty_extents(t, seq, max_bytes);
}
TransactionManager::rewrite_extent_ret
TransactionManager::rewrite_logical_extent(
Transaction& t,
LogicalCachedExtentRef extent)
{
LOG_PREFIX(TransactionManager::rewrite_logical_extent);
if (extent->has_been_invalidated()) {
ERRORT("{} has been invalidated", t, *extent);
}
assert(!extent->has_been_invalidated());
DEBUGT("rewriting {}", t, *extent);
auto lextent = extent->cast<LogicalCachedExtent>();
cache->retire_extent(t, extent);
auto nlextent = epm->alloc_new_extent_by_type(
t,
lextent->get_type(),
lextent->get_length())->cast<LogicalCachedExtent>();
lextent->get_bptr().copy_out(
0,
lextent->get_length(),
nlextent->get_bptr().c_str());
nlextent->set_laddr(lextent->get_laddr());
nlextent->set_pin(lextent->get_pin().duplicate());
DEBUGT(
"rewriting {} into {}",
t,
*lextent,
*nlextent);
if (need_delayed_allocation(extent->backend_type)) {
// keep the old paddr so the later mapping update can assert against it
nlextent->set_paddr(lextent->get_paddr());
return rewrite_extent_iertr::now();
}
return lba_manager->update_mapping(
t,
lextent->get_laddr(),
lextent->get_paddr(),
nlextent->get_paddr());
}
TransactionManager::rewrite_extent_ret TransactionManager::rewrite_extent(
Transaction &t,
CachedExtentRef extent)
@ -295,7 +353,12 @@ TransactionManager::rewrite_extent_ret TransactionManager::rewrite_extent(
cache->duplicate_for_write(t, extent);
return rewrite_extent_iertr::now();
}
return lba_manager->rewrite_extent(t, extent);
if (extent->is_logical()) {
return rewrite_logical_extent(t, extent->cast<LogicalCachedExtent>());
} else {
return lba_manager->rewrite_extent(t, extent);
}
}
TransactionManager::get_extent_if_live_ret TransactionManager::get_extent_if_live(

View File

@ -27,6 +27,7 @@
#include "crimson/os/seastore/segment_manager.h"
#include "crimson/os/seastore/lba_manager.h"
#include "crimson/os/seastore/journal.h"
#include "crimson/os/seastore/extent_placement_manager.h"
namespace crimson::os::seastore {
class Journal;
@ -69,7 +70,8 @@ public:
SegmentCleanerRef segment_cleaner,
JournalRef journal,
CacheRef cache,
LBAManagerRef lba_manager);
LBAManagerRef lba_manager,
ExtentPlacementManagerRef&& epm);
/// Writes initial metadata to disk
using mkfs_ertr = base_ertr;
@ -282,7 +284,7 @@ public:
Transaction &t,
laddr_t hint,
extent_len_t len) {
auto ext = cache->alloc_new_extent<T>(
auto ext = epm->alloc_new_extent<T>(
t,
len);
return lba_manager->alloc_extent(
@ -290,10 +292,12 @@ public:
hint,
len,
ext->get_paddr()
).si_then([ext=std::move(ext), len, this](auto &&ref) mutable {
).si_then([ext=std::move(ext), len, hint, &t, this](auto &&ref) mutable {
LOG_PREFIX(TransactionManager::alloc_extent);
ext->set_pin(std::move(ref));
stats.extents_allocated_total++;
stats.extents_allocated_bytes += len;
DEBUGT("new extent: {}, hint: {}", t, *ext, hint);
return alloc_extent_iertr::make_ready_future<TCachedExtentRef<T>>(
std::move(ext));
});
@ -373,18 +377,6 @@ public:
laddr_t laddr,
segment_off_t len) final;
using scan_extents_cursor =
SegmentCleaner::ExtentCallbackInterface::scan_extents_cursor;
using scan_extents_ertr =
SegmentCleaner::ExtentCallbackInterface::scan_extents_ertr;
using scan_extents_ret =
SegmentCleaner::ExtentCallbackInterface::scan_extents_ret;
scan_extents_ret scan_extents(
scan_extents_cursor &cursor,
extent_len_t bytes_to_read) final {
return journal->scan_extents(cursor, bytes_to_read);
}
using release_segment_ret =
SegmentCleaner::ExtentCallbackInterface::release_segment_ret;
release_segment_ret release_segment(
@ -509,6 +501,7 @@ private:
CacheRef cache;
LBAManagerRef lba_manager;
JournalRef journal;
ExtentPlacementManagerRef epm;
WritePipeline write_pipeline;
@ -523,6 +516,9 @@ private:
seastar::metrics::metric_group metrics;
void register_metrics();
rewrite_extent_ret rewrite_logical_extent(
Transaction& t,
LogicalCachedExtentRef extent);
public:
// Testing interfaces
auto get_segment_cleaner() {

View File

@ -208,6 +208,7 @@ int main(int argc, char* argv[])
"This is normally used in combination with --mkfs")
("mkfs", "create a [new] data directory")
("debug", "enable debug output on all loggers")
("trace", "enable trace output on all loggers")
("no-mon-config", "do not retrieve configuration from monitors on boot")
("prometheus_port", bpo::value<uint16_t>()->default_value(0),
"Prometheus port. Set to zero to disable")
@ -247,6 +248,11 @@ int main(int argc, char* argv[])
seastar::log_level::debug
);
}
if (config.count("trace")) {
seastar::global_logger_registry().set_all_loggers_level(
seastar::log_level::trace
);
}
sharded_conf().start(init_params.name, cluster_name).get();
auto stop_conf = seastar::defer([] {
sharded_conf().stop().get();

View File

@ -127,14 +127,28 @@ seastar::future<bufferlist> TMDriver::read(
void TMDriver::init()
{
auto scanner = std::make_unique<Scanner>(*segment_manager);
auto& scanner_ref = *scanner.get();
auto segment_cleaner = std::make_unique<SegmentCleaner>(
SegmentCleaner::config_t::get_default(),
std::move(scanner),
false /* detailed */);
segment_cleaner->mount(*segment_manager);
auto journal = std::make_unique<Journal>(*segment_manager);
auto journal = std::make_unique<Journal>(*segment_manager, scanner_ref);
auto cache = std::make_unique<Cache>(*segment_manager);
auto lba_manager = lba_manager::create_lba_manager(*segment_manager, *cache);
auto epm = std::make_unique<ExtentPlacementManager>(*cache, *lba_manager);
epm->add_allocator(
device_type_t::SEGMENTED,
std::make_unique<SegmentedAllocator>(
*segment_cleaner,
*segment_manager,
*lba_manager,
*journal,
*cache));
journal->set_segment_provider(&*segment_cleaner);
tm = std::make_unique<TransactionManager>(
@ -142,7 +156,8 @@ void TMDriver::init()
std::move(segment_cleaner),
std::move(journal),
std::move(cache),
std::move(lba_manager));
std::move(lba_manager),
std::move(epm));
}
void TMDriver::clear()

View File

@ -84,16 +84,18 @@ struct fltree_onode_manager_test_t
).then([this] {
return tm->mount(
).safe_then([this] {
return seastar::do_with(
create_mutate_transaction(),
[this](auto &ref_t) {
return with_trans_intr(*ref_t, [&](auto &t) {
return manager->mkfs(t
).si_then([this, &t] {
return submit_transaction_fut2(t);
return repeat_eagain([this] {
return seastar::do_with(
create_mutate_transaction(),
[this](auto &ref_t) {
return with_trans_intr(*ref_t, [&](auto &t) {
return manager->mkfs(t
).si_then([this, &t] {
return submit_transaction_fut2(t);
});
});
});
});
});
}).safe_then([this] {
return tm->close();
}).handle_error(

View File

@ -25,8 +25,9 @@ using namespace crimson::os::seastore::lba_manager;
using namespace crimson::os::seastore::lba_manager::btree;
struct btree_lba_manager_test :
public seastar_test_suite_t, JournalSegmentProvider {
public seastar_test_suite_t, SegmentProvider {
segment_manager::EphemeralSegmentManagerRef segment_manager;
ScannerRef scanner;
Journal journal;
Cache cache;
BtreeLBAManagerRef lba_manager;
@ -37,7 +38,8 @@ struct btree_lba_manager_test :
btree_lba_manager_test()
: segment_manager(segment_manager::create_test_ephemeral()),
journal(*segment_manager),
scanner(new Scanner(*segment_manager)),
journal(*segment_manager, *scanner),
cache(*segment_manager),
lba_manager(new BtreeLBAManager(*segment_manager, cache)),
block_size(segment_manager->get_block_size())

View File

@ -6,6 +6,7 @@
#include <random>
#include "crimson/common/log.h"
#include "crimson/os/seastore/segment_cleaner.h"
#include "crimson/os/seastore/journal.h"
#include "crimson/os/seastore/segment_manager/ephemeral.h"
@ -62,7 +63,7 @@ struct record_validator_t {
}
};
struct journal_test_t : seastar_test_suite_t, JournalSegmentProvider {
struct journal_test_t : seastar_test_suite_t, SegmentProvider {
segment_manager::EphemeralSegmentManagerRef segment_manager;
WritePipeline pipeline;
std::unique_ptr<Journal> journal;
@ -73,11 +74,13 @@ struct journal_test_t : seastar_test_suite_t, JournalSegmentProvider {
const segment_off_t block_size;
ScannerRef scanner;
journal_test_t()
: segment_manager(segment_manager::create_test_ephemeral()),
block_size(segment_manager->get_block_size())
{
}
block_size(segment_manager->get_block_size()),
scanner(std::make_unique<Scanner>(*segment_manager))
{}
segment_id_t next = 0;
get_segment_ret get_segment() final {
@ -90,7 +93,7 @@ struct journal_test_t : seastar_test_suite_t, JournalSegmentProvider {
void update_journal_tail_committed(journal_seq_t paddr) final {}
seastar::future<> set_up_fut() final {
journal.reset(new Journal(*segment_manager));
journal.reset(new Journal(*segment_manager, *scanner));
journal->set_segment_provider(this);
journal->set_write_pipeline(&pipeline);
return segment_manager->init(
@ -107,12 +110,43 @@ struct journal_test_t : seastar_test_suite_t, JournalSegmentProvider {
auto replay(T &&f) {
return journal->close(
).safe_then([this, f=std::move(f)]() mutable {
journal.reset(new Journal(*segment_manager));
journal.reset(new Journal(*segment_manager, *scanner));
journal->set_segment_provider(this);
journal->set_write_pipeline(&pipeline);
return journal->replay(std::forward<T>(std::move(f)));
}).safe_then([this] {
return journal->open_for_write();
return seastar::do_with(
std::vector<std::pair<segment_id_t, segment_header_t>>(),
[this](auto& segments) {
return crimson::do_for_each(
boost::make_counting_iterator(segment_id_t{0}),
boost::make_counting_iterator(segment_manager->get_num_segments()),
[this, &segments](auto segment_id) {
return scanner->read_segment_header(segment_id)
.safe_then([&segments, segment_id](auto header) {
if (!header.out_of_line) {
segments.emplace_back(std::make_pair(segment_id, std::move(header)));
}
return seastar::now();
}).handle_error(
crimson::ct_error::enoent::handle([](auto) {
return SegmentCleaner::init_segments_ertr::now();
}),
crimson::ct_error::enodata::handle([](auto) {
return SegmentCleaner::init_segments_ertr::now();
}),
crimson::ct_error::input_output_error::pass_further{}
);
}).safe_then([&segments] {
return seastar::make_ready_future<
std::vector<std::pair<segment_id_t, segment_header_t>>>(
std::move(segments));
});
}).safe_then([this, f=std::move(f)](auto&& segments) mutable {
return journal->replay(
std::move(segments),
std::forward<T>(std::move(f)));
}).safe_then([this] {
return journal->open_for_write();
});
});
}

View File

@ -70,13 +70,27 @@ protected:
auto get_transaction_manager(
SegmentManager &segment_manager) {
auto scanner = std::make_unique<Scanner>(segment_manager);
auto& scanner_ref = *scanner.get();
auto segment_cleaner = std::make_unique<SegmentCleaner>(
SegmentCleaner::config_t::get_default(),
std::move(scanner),
true);
auto journal = std::make_unique<Journal>(segment_manager);
auto journal = std::make_unique<Journal>(segment_manager, scanner_ref);
auto cache = std::make_unique<Cache>(segment_manager);
auto lba_manager = lba_manager::create_lba_manager(segment_manager, *cache);
auto epm = std::make_unique<ExtentPlacementManager>(*cache, *lba_manager);
epm->add_allocator(
device_type_t::SEGMENTED,
std::make_unique<SegmentedAllocator>(
*segment_cleaner,
segment_manager,
*lba_manager,
*journal,
*cache));
journal->set_segment_provider(&*segment_cleaner);
return std::make_unique<TransactionManager>(
@ -84,7 +98,8 @@ auto get_transaction_manager(
std::move(segment_cleaner),
std::move(journal),
std::move(cache),
std::move(lba_manager));
std::move(lba_manager),
std::move(epm));
}
auto get_seastore(SegmentManagerRef sm) {

View File

@ -141,6 +141,7 @@ extra_conf=""
new=0
standby=0
debug=0
trace=0
ip=""
nodaemon=0
redirect=0
@ -197,6 +198,7 @@ read -r -d '' usage <<EOF || true
usage: $0 [option]... \nex: MON=3 OSD=1 MDS=1 MGR=1 RGW=1 NFS=1 $0 -n -d
options:
-d, --debug
-t, --trace
-s, --standby_mds: Generate standby-replay MDS for each active
-l, --localhost: use localhost instead of hostname
-i <ip>: bind to specific ip
@ -270,6 +272,9 @@ case $1 in
-d | --debug)
debug=1
;;
-t | --trace)
trace=1
;;
-s | --standby_mds)
standby=1
;;
@ -916,6 +921,9 @@ start_osd() {
if [ "$debug" -ne 0 ]; then
extra_seastar_args+=" --debug"
fi
if [ "$trace" -ne 0]; then
extra_seastar_args+=" --trace"
fi
fi
if [ "$new" -eq 1 -o $inc_osd_num -gt 0 ]; then
wconf <<EOF