Merge pull request #45041 from cyx1231st/wip-crimson-cleanup-epm-segment-writer

crimson/os/seastore: introduce SegmentAllocator for Journal and ExtentPlacementManager

Reviewed-by: Samuel Just <sjust@redhat.com>
Samuel Just 2022-02-24 14:23:12 -08:00 committed by GitHub
commit 8c54b697d9
11 changed files with 530 additions and 610 deletions
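In effect, this change replaces two near-duplicate segment write paths (SegmentedJournal::JournalSegmentManager and the hand-rolled segment handling in SegmentedAllocator::Writer) with a single journal::SegmentAllocator that owns the current segment, the written_to cursor, and the open/roll/close lifecycle. As a rough orientation before the diffs, here is a minimal standalone model of that write-cursor bookkeeping; the class and sizes are illustrative assumptions (plain integers, no Segment/SegmentManager, no seastar futures or errorators), not the seastore API.

#include <cassert>
#include <cstdint>
#include <iostream>

// Minimal standalone model of the SegmentAllocator write cursor:
// a segment is opened (the header occupies the first block), writes advance
// written_to, and needs_roll() reports when a write would overflow the
// segment so the caller rolls to a fresh one. Plain integers stand in for
// Segment/SegmentManager and no futures are involved.
class SegmentCursorModel {
  const std::uint64_t segment_size;
  const std::uint64_t block_size;
  bool is_open = false;
  std::uint64_t written_to = 0;

public:
  SegmentCursorModel(std::uint64_t seg_size, std::uint64_t blk_size)
    : segment_size(seg_size), block_size(blk_size) {}

  void open() {                       // models SegmentAllocator::open()
    assert(!is_open);
    is_open = true;
    written_to = block_size;          // segment header takes the first block
  }

  bool can_write() const { return is_open; }

  bool needs_roll(std::uint64_t length) const {
    assert(can_write());
    return written_to + length > segment_size;
  }

  std::uint64_t write(std::uint64_t length) {  // returns the write offset
    assert(can_write());
    assert(length % block_size == 0);
    assert(!needs_roll(length));
    auto offset = written_to;
    written_to += length;
    return offset;
  }

  void roll() {                       // close the current segment, open the next
    assert(can_write());
    is_open = false;
    open();
  }
};

int main() {
  SegmentCursorModel cursor(1 << 20, 4096);    // 1 MiB segment, 4 KiB blocks
  cursor.open();
  std::uint64_t record = 64 * 4096;
  for (int i = 0; i < 8; ++i) {
    if (cursor.needs_roll(record)) {
      cursor.roll();                  // mirrors the rolling in do_write()/do_submit()
    }
    std::cout << "record " << i << " at offset " << cursor.write(record) << "\n";
  }
}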

View File

@ -39,6 +39,7 @@ set(crimson_seastore_srcs
random_block_manager/nvme_manager.cc
random_block_manager/nvmedevice.cc
journal/segmented_journal.cc
journal/segment_allocator.cc
journal.cc
../../../test/crimson/seastore/test_block.cc
${PROJECT_SOURCE_DIR}/src/os/Transaction.cc

View File

@ -3,14 +3,6 @@
#include "crimson/os/seastore/extent_placement_manager.h"
#include "crimson/os/seastore/segment_cleaner.h"
namespace {
seastar::logger& logger() {
return crimson::get_logger(ceph_subsys_seastore_tm);
}
}
SET_SUBSYS(seastore_tm);
namespace crimson::os::seastore {
@ -18,18 +10,15 @@ namespace crimson::os::seastore {
SegmentedAllocator::SegmentedAllocator(
SegmentProvider& sp,
SegmentManager& sm)
: segment_provider(sp),
segment_manager(sm)
{
std::generate_n(
std::back_inserter(writers),
crimson::common::get_conf<uint64_t>(
"seastore_init_rewrite_segments_num_per_device"),
[&] {
return Writer{
segment_provider,
segment_manager};
});
return Writer{sp, sm};
}
);
}
SegmentedAllocator::Writer::write_iertr::future<>
@ -37,25 +26,20 @@ SegmentedAllocator::Writer::_write(
Transaction& t,
ool_record_t& record)
{
auto record_size = record.get_encoded_record_length();
allocated_to += record_size.get_encoded_length();
segment_provider.update_segment_avail_bytes(
paddr_t::make_seg_paddr(
current_segment->segment->get_segment_id(),
allocated_to));
bufferlist bl = record.encode(
current_segment->segment->get_segment_id(),
0);
seastar::promise<> pr;
current_segment->inflight_writes.emplace_back(pr.get_future());
LOG_PREFIX(SegmentedAllocator::Writer::_write);
record.set_base(segment_allocator.get_written_to());
auto record_size = record.get_encoded_record_length();
bufferlist bl = record.encode(
segment_allocator.get_segment_id(),
segment_allocator.get_nonce());
assert(bl.length() == record_size.get_encoded_length());
DEBUGT(
"written {} extents, {} bytes to segment {} at {}",
t,
record.get_num_extents(),
bl.length(),
current_segment->segment->get_segment_id(),
segment_allocator.get_segment_id(),
record.get_base());
// account transactional ool writes before write()
@ -67,41 +51,68 @@ SegmentedAllocator::Writer::_write(
stats.data_bytes += record_size.dlength;
stats.num_records += 1;
for (auto& ool_extent : record.get_extents()) {
auto& lextent = ool_extent.get_lextent();
auto paddr = ool_extent.get_ool_paddr();
TRACET("ool extent written at {} -- {}", t, *lextent, paddr);
lextent->hint = placement_hint_t::NUM_HINTS; // invalidate hint
t.mark_delayed_extent_ool(lextent, paddr);
}
return trans_intr::make_interruptible(
current_segment->segment->write(record.get_base(), bl).safe_then(
[this, FNAME, pr=std::move(pr), &t,
it=(--current_segment->inflight_writes.end()),
cs=current_segment]() mutable {
if (cs->outdated) {
DEBUGT("segment rolled", t);
pr.set_value();
} else {
DEBUGT("segment not rolled", t);
current_segment->inflight_writes.erase(it);
}
return seastar::now();
})
).si_then([FNAME, &record, &t] {
for (auto& ool_extent : record.get_extents()) {
auto& lextent = ool_extent.get_lextent();
auto paddr = ool_extent.get_ool_paddr();
TRACET("ool extent written at {} -- {}", t, *lextent, paddr);
lextent->hint = placement_hint_t::NUM_HINTS; // invalidate hint
t.mark_delayed_extent_ool(lextent, paddr);
}
record.clear();
});
segment_allocator.write(bl).discard_result()
);
}
void SegmentedAllocator::Writer::add_extent_to_write(
ool_record_t& record,
LogicalCachedExtentRef& extent) {
logger().debug(
"SegmentedAllocator::Writer::add_extent_to_write: "
"add extent {} to record",
extent);
extent->prepare_write();
record.add_extent(extent);
SegmentedAllocator::Writer::write_iertr::future<>
SegmentedAllocator::Writer::do_write(
Transaction& t,
std::list<LogicalCachedExtentRef>& extents)
{
LOG_PREFIX(SegmentedAllocator::Writer::do_write);
assert(!extents.empty());
if (roll_promise.has_value()) {
return trans_intr::make_interruptible(
roll_promise->get_shared_future()
).then_interruptible([this, &t, &extents] {
return do_write(t, extents);
});
}
assert(segment_allocator.can_write());
ool_record_t record(segment_allocator.get_block_size());
for (auto it = extents.begin(); it != extents.end();) {
auto& extent = *it;
auto wouldbe_length = record.get_wouldbe_encoded_record_length(extent);
if (segment_allocator.needs_roll(wouldbe_length)) {
// reached the segment end, write and roll
assert(!roll_promise.has_value());
roll_promise = seastar::shared_promise<>();
auto num_extents = record.get_num_extents();
DEBUGT("end of segment, writing {} extents", t, num_extents);
return (num_extents ?
_write(t, record) :
write_iertr::now()
).si_then([this] {
return segment_allocator.roll();
}).finally([this] {
roll_promise->set_value();
roll_promise.reset();
}).si_then([this, &t, &extents] {
if (!extents.empty()) {
return do_write(t, extents);
}
return write_iertr::now();
});
}
DEBUGT("add extent to record -- {}", t, *extent);
extent->prepare_write();
record.add_extent(extent);
it = extents.erase(it);
}
DEBUGT("writing {} extents", t, record.get_num_extents());
return _write(t, record);
}
SegmentedAllocator::Writer::write_iertr::future<>
@ -109,169 +120,24 @@ SegmentedAllocator::Writer::write(
Transaction& t,
std::list<LogicalCachedExtentRef>& extents)
{
auto write_func = [this, &extents, &t] {
return seastar::do_with(ool_record_t(segment_manager.get_block_size()),
[this, &extents, &t](auto& record) {
return trans_intr::repeat([this, &record, &t, &extents]()
-> write_iertr::future<seastar::stop_iteration> {
if (extents.empty()) {
return seastar::make_ready_future<
seastar::stop_iteration>(seastar::stop_iteration::yes);
}
return segment_rotation_guard.wait(
[this] {
return !rolling_segment;
},
[this, &record, &extents, &t]() -> write_iertr::future<> {
LOG_PREFIX(SegmentedAllocator::Writer::write);
record.set_base(allocated_to);
for (auto it = extents.begin();
it != extents.end();) {
auto& extent = *it;
auto wouldbe_length =
record.get_wouldbe_encoded_record_length(extent);
if (_needs_roll(wouldbe_length)) {
// reached the segment end, write and roll
assert(!rolling_segment);
rolling_segment = true;
auto num_extents = record.get_num_extents();
DEBUGT(
"end of segment, writing {} extents to segment {} at {}",
t,
num_extents,
current_segment->segment->get_segment_id(),
allocated_to);
return (num_extents ?
_write(t, record) :
write_iertr::now()
).si_then([this]() mutable {
return roll_segment(false);
}).finally([this] {
rolling_segment = false;
segment_rotation_guard.broadcast();
});
}
add_extent_to_write(record, extent);
it = extents.erase(it);
}
DEBUGT(
"writing {} extents to segment {} at {}",
t,
record.get_num_extents(),
current_segment->segment->get_segment_id(),
allocated_to);
return _write(t, record);
}
).si_then([]()
-> write_iertr::future<seastar::stop_iteration> {
return seastar::make_ready_future<
seastar::stop_iteration>(seastar::stop_iteration::no);
});
if (extents.empty()) {
return write_iertr::now();
}
return seastar::with_gate(write_guard, [this, &t, &extents] {
if (!roll_promise.has_value() &&
!segment_allocator.can_write()) {
roll_promise = seastar::shared_promise<>();
return trans_intr::make_interruptible(
segment_allocator.open().discard_result()
).finally([this] {
roll_promise->set_value();
roll_promise.reset();
}).si_then([this, &t, &extents] {
return do_write(t, extents);
});
});
};
if (rolling_segment) {
return segment_rotation_guard.wait([this] {
return !rolling_segment;
}, std::move(write_func));
} else if (!current_segment) {
return trans_intr::make_interruptible(roll_segment(true)).si_then(
[write_func=std::move(write_func)] {
return write_func();
});
}
return write_func();
}
bool SegmentedAllocator::Writer::_needs_roll(seastore_off_t length) const {
return allocated_to + length > current_segment->segment->get_write_capacity();
}
SegmentedAllocator::Writer::init_segment_ertr::future<>
SegmentedAllocator::Writer::init_segment(Segment& segment) {
bufferptr bp(
ceph::buffer::create_page_aligned(
segment_manager.get_block_size()));
bp.zero();
auto header = segment_header_t{
OOL_SEG_SEQ,
segment.get_segment_id(),
NO_DELTAS, 0};
logger().debug("SegmentedAllocator::Writer::init_segment: initting {}, {}",
segment.get_segment_id(),
header);
ceph::bufferlist bl;
encode(header, bl);
bl.cbegin().copy(bl.length(), bp.c_str());
bl.clear();
bl.append(bp);
allocated_to = segment_manager.get_block_size();
return segment.write(0, bl).handle_error(
crimson::ct_error::input_output_error::pass_further{},
crimson::ct_error::assert_all{
"Invalid error when initing segment"}
);
}
SegmentedAllocator::Writer::roll_segment_ertr::future<>
SegmentedAllocator::Writer::roll_segment(bool set_rolling) {
LOG_PREFIX(SegmentedAllocator::Writer::roll_segment);
DEBUG("set_rolling {}", set_rolling);
if (set_rolling) {
rolling_segment = true;
}
assert(rolling_segment);
if (current_segment) {
(void) seastar::with_gate(writer_guard, [this] {
auto fut = seastar::now();
if (!current_segment->inflight_writes.empty()) {
fut = seastar::when_all_succeed(
current_segment->inflight_writes.begin(),
current_segment->inflight_writes.end());
}
current_segment->outdated = true;
return fut.then(
[cs=std::move(current_segment), this, it=(--open_segments.end())] {
return cs->segment->close().safe_then([this, cs, it] {
LOG_PREFIX(SegmentedAllocator::Writer::roll_segment);
assert((*it).get() == cs.get());
segment_provider.close_segment(cs->segment->get_segment_id());
open_segments.erase(it);
DEBUG("closed segment: {}", cs->segment->get_segment_id());
});
});
}).handle_exception_type([](seastar::gate_closed_exception e) {
LOG_PREFIX(SegmentedAllocator::Writer::roll_segment);
DEBUG(" writer_guard closed, should be stopping");
return seastar::now();
});
}
return segment_provider.get_segment(
segment_manager.get_device_id(), OOL_SEG_SEQ
).safe_then([this](auto segment) {
return segment_manager.open(segment);
}).safe_then([this](auto segref) {
LOG_PREFIX(SegmentedAllocator::Writer::roll_segment);
DEBUG("opened new segment: {}", segref->get_segment_id());
return init_segment(*segref).safe_then([segref=std::move(segref), this] {
LOG_PREFIX(SegmentedAllocator::Writer::roll_segment);
assert(!current_segment.get());
current_segment.reset(new open_segment_wrapper_t());
current_segment->segment = segref;
open_segments.emplace_back(current_segment);
rolling_segment = false;
segment_rotation_guard.broadcast();
DEBUG("inited new segment: {}", segref->get_segment_id());
});
}).handle_error(
roll_segment_ertr::pass_further{},
crimson::ct_error::all_same_way([] { ceph_assert(0 == "TODO"); })
);
}
return do_write(t, extents);
});
}
}
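The rewritten write()/do_write() above serialize concurrent writers around a segment roll with an optional shared promise: the first writer that hits end-of-segment (or finds no writable segment) installs roll_promise, later writers chain onto its shared future and retry once the roll resolves. Below is a simplified synchronous illustration of that gating idea using std::promise and std::shared_future; it is a stand-in for the seastar::shared_promise plumbing, not the actual crimson code.

#include <cassert>
#include <future>
#include <optional>

// Simplified model of the roll_promise gating in SegmentedAllocator::Writer:
// while a roll is in flight the shared future exists, later writers wait on
// it and then retry; the roller resolves and clears the promise when done.
struct RollGate {
  std::optional<std::promise<void>> roll_promise;
  std::optional<std::shared_future<void>> roll_future;

  bool rolling() const { return roll_future.has_value(); }

  // first writer reaching end-of-segment begins the roll
  void begin_roll() {
    assert(!rolling());
    roll_promise.emplace();
    roll_future = roll_promise->get_future().share();
  }

  // other writers park on the shared future, then retry their write
  std::shared_future<void> wait_for_roll() {
    assert(rolling());
    return *roll_future;
  }

  // roller finishes: wake all waiters and clear the gate
  void end_roll() {
    roll_promise->set_value();
    roll_promise.reset();
    roll_future.reset();
  }
};

int main() {
  RollGate gate;
  gate.begin_roll();
  auto waiter = gate.wait_for_roll();  // a concurrent writer would block here
  gate.end_roll();
  waiter.get();                        // returns immediately once resolved
}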

View File

@ -4,11 +4,11 @@
#pragma once
#include "seastar/core/gate.hh"
#include "seastar/core/shared_future.hh"
#include "crimson/common/condition_variable.h"
#include "crimson/os/seastore/cached_extent.h"
#include "crimson/os/seastore/journal/segment_allocator.h"
#include "crimson/os/seastore/logging.h"
#include "crimson/os/seastore/segment_manager.h"
#include "crimson/os/seastore/transaction.h"
namespace crimson::os::seastore {
@ -151,17 +151,6 @@ public:
};
using ExtentAllocatorRef = std::unique_ptr<ExtentAllocator>;
struct open_segment_wrapper_t : public boost::intrusive_ref_counter<
open_segment_wrapper_t,
boost::thread_unsafe_counter> {
SegmentRef segment;
std::list<seastar::future<>> inflight_writes;
bool outdated = false;
};
using open_segment_wrapper_ref =
boost::intrusive_ptr<open_segment_wrapper_t>;
class SegmentProvider;
/**
@ -180,51 +169,33 @@ class SegmentProvider;
class SegmentedAllocator : public ExtentAllocator {
class Writer : public ExtentOolWriter {
public:
Writer(
SegmentProvider& sp,
SegmentManager& sm)
: segment_provider(sp),
segment_manager(sm)
{}
Writer(SegmentProvider& sp, SegmentManager& sm)
: segment_allocator(segment_type_t::OOL, sp, sm) {}
Writer(Writer &&) = default;
write_iertr::future<> write(
Transaction& t,
std::list<LogicalCachedExtentRef>& extent) final;
stop_ertr::future<> stop() final {
return writer_guard.close().then([this] {
return crimson::do_for_each(open_segments, [](auto& seg_wrapper) {
return seg_wrapper->segment->close();
});
return write_guard.close().then([this] {
return segment_allocator.close();
});
}
private:
bool _needs_roll(seastore_off_t length) const;
write_iertr::future<> do_write(
Transaction& t,
std::list<LogicalCachedExtentRef>& extent);
write_iertr::future<> _write(
Transaction& t,
ool_record_t& record);
using roll_segment_ertr = crimson::errorator<
crimson::ct_error::input_output_error>;
roll_segment_ertr::future<> roll_segment(bool);
using init_segment_ertr = crimson::errorator<
crimson::ct_error::input_output_error>;
init_segment_ertr::future<> init_segment(Segment& segment);
void add_extent_to_write(
ool_record_t&,
LogicalCachedExtentRef& extent);
SegmentProvider& segment_provider;
SegmentManager& segment_manager;
open_segment_wrapper_ref current_segment;
std::list<open_segment_wrapper_ref> open_segments;
seastore_off_t allocated_to = 0;
crimson::condition_variable segment_rotation_guard;
seastar::gate writer_guard;
bool rolling_segment = false;
journal::SegmentAllocator segment_allocator;
std::optional<seastar::shared_promise<>> roll_promise;
seastar::gate write_guard;
};
public:
SegmentedAllocator(
@ -261,8 +232,6 @@ public:
});
}
private:
SegmentProvider& segment_provider;
SegmentManager& segment_manager;
std::vector<Writer> writers;
};

View File

@ -0,0 +1,222 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
// vim: ts=8 sw=2 smarttab expandtab
#include "segment_allocator.h"
#include "crimson/os/seastore/logging.h"
#include "crimson/os/seastore/segment_cleaner.h"
SET_SUBSYS(seastore_journal);
namespace crimson::os::seastore::journal {
static segment_nonce_t generate_nonce(
segment_seq_t seq,
const seastore_meta_t &meta)
{
return ceph_crc32c(
seq,
reinterpret_cast<const unsigned char *>(meta.seastore_id.bytes()),
sizeof(meta.seastore_id.uuid));
}
SegmentAllocator::SegmentAllocator(
segment_type_t type,
SegmentProvider &sp,
SegmentManager &sm)
: type{type},
segment_provider{sp},
segment_manager{sm}
{
ceph_assert(type != segment_type_t::NULL_SEG);
reset();
}
void SegmentAllocator::set_next_segment_seq(segment_seq_t seq)
{
LOG_PREFIX(SegmentAllocator::set_next_segment_seq);
INFO("{} {} next_segment_seq={}",
type, get_device_id(), segment_seq_printer_t{seq});
assert(type == segment_seq_to_type(seq));
next_segment_seq = seq;
}
SegmentAllocator::open_ertr::future<journal_seq_t>
SegmentAllocator::open()
{
LOG_PREFIX(SegmentAllocator::open);
ceph_assert(!current_segment);
segment_seq_t new_segment_seq;
if (type == segment_type_t::JOURNAL) {
new_segment_seq = next_segment_seq++;
} else { // OOL
new_segment_seq = next_segment_seq;
}
assert(new_segment_seq == get_current_segment_seq());
ceph_assert(segment_seq_to_type(new_segment_seq) == type);
auto new_segment_id = segment_provider.get_segment(
get_device_id(), new_segment_seq);
return segment_manager.open(new_segment_id
).handle_error(
open_ertr::pass_further{},
crimson::ct_error::assert_all{
"Invalid error in SegmentAllocator::open open"
}
).safe_then([this, FNAME, new_segment_seq](auto sref) {
// initialize new segment
journal_seq_t new_journal_tail;
if (type == segment_type_t::JOURNAL) {
new_journal_tail = segment_provider.get_journal_tail_target();
current_segment_nonce = generate_nonce(
new_segment_seq, segment_manager.get_meta());
} else { // OOL
new_journal_tail = NO_DELTAS;
assert(current_segment_nonce == 0);
}
segment_id_t segment_id = sref->get_segment_id();
auto header = segment_header_t{
new_segment_seq,
segment_id,
new_journal_tail,
current_segment_nonce};
INFO("{} {} writing header to new segment ... -- {}",
type, get_device_id(), header);
auto header_length = segment_manager.get_block_size();
bufferlist bl;
encode(header, bl);
bufferptr bp(ceph::buffer::create_page_aligned(header_length));
bp.zero();
auto iter = bl.cbegin();
iter.copy(bl.length(), bp.c_str());
bl.clear();
bl.append(bp);
ceph_assert(sref->get_write_ptr() == 0);
assert((unsigned)header_length == bl.length());
written_to = header_length;
auto new_journal_seq = journal_seq_t{
new_segment_seq,
paddr_t::make_seg_paddr(segment_id, written_to)};
if (type == segment_type_t::OOL) {
// FIXME: improve the special handling for OOL
segment_provider.update_segment_avail_bytes(
new_journal_seq.offset);
}
return sref->write(0, bl
).handle_error(
open_ertr::pass_further{},
crimson::ct_error::assert_all{
"Invalid error in SegmentAllocator::open write"
}
).safe_then([this,
FNAME,
new_journal_seq,
new_journal_tail,
sref=std::move(sref)]() mutable {
ceph_assert(!current_segment);
current_segment = std::move(sref);
if (type == segment_type_t::JOURNAL) {
segment_provider.update_journal_tail_committed(new_journal_tail);
}
DEBUG("{} {} rolled new segment id={}",
type, get_device_id(), current_segment->get_segment_id());
ceph_assert(new_journal_seq.segment_seq == get_current_segment_seq());
return new_journal_seq;
});
});
}
SegmentAllocator::roll_ertr::future<>
SegmentAllocator::roll()
{
ceph_assert(can_write());
return close_segment(true).safe_then([this] {
return open().discard_result();
});
}
SegmentAllocator::write_ret
SegmentAllocator::write(ceph::bufferlist to_write)
{
LOG_PREFIX(SegmentAllocator::write);
assert(can_write());
auto write_length = to_write.length();
auto write_start_offset = written_to;
auto write_start_seq = journal_seq_t{
get_current_segment_seq(),
paddr_t::make_seg_paddr(
current_segment->get_segment_id(), write_start_offset)
};
TRACE("{} {} {}~{}", type, get_device_id(), write_start_seq, write_length);
assert(write_length > 0);
assert((write_length % segment_manager.get_block_size()) == 0);
assert(!needs_roll(write_length));
auto write_result = write_result_t{
write_start_seq,
static_cast<seastore_off_t>(write_length)
};
written_to += write_length;
if (type == segment_type_t::OOL) {
// FIXME: improve the special handling for OOL
segment_provider.update_segment_avail_bytes(
paddr_t::make_seg_paddr(
current_segment->get_segment_id(), written_to)
);
}
return current_segment->write(
write_start_offset, to_write
).handle_error(
write_ertr::pass_further{},
crimson::ct_error::assert_all{
"Invalid error in SegmentAllocator::write"
}
).safe_then([write_result, cs=current_segment] {
return write_result;
});
}
SegmentAllocator::close_ertr::future<>
SegmentAllocator::close()
{
return [this] {
LOG_PREFIX(SegmentAllocator::close);
if (current_segment) {
return close_segment(false);
} else {
INFO("{} {} no current segment", type, get_device_id());
return close_segment_ertr::now();
}
}().finally([this] {
reset();
});
}
SegmentAllocator::close_segment_ertr::future<>
SegmentAllocator::close_segment(bool is_rolling)
{
LOG_PREFIX(SegmentAllocator::close_segment);
assert(can_write());
auto seg_to_close = std::move(current_segment);
auto close_segment_id = seg_to_close->get_segment_id();
INFO("{} {} close segment id={}, seq={}, written_to={}, nonce={}",
type, get_device_id(),
close_segment_id,
segment_seq_printer_t{get_current_segment_seq()},
written_to,
current_segment_nonce);
if (is_rolling) {
segment_provider.close_segment(close_segment_id);
}
return seg_to_close->close(
).safe_then([seg_to_close=std::move(seg_to_close)] {
}).handle_error(
close_segment_ertr::pass_further{},
crimson::ct_error::assert_all{
"Invalid error in SegmentAllocator::close_segment"
}
);
}
}

View File

@ -0,0 +1,134 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
// vim: ts=8 sw=2 smarttab expandtab
#pragma once
#include "include/buffer.h"
#include "crimson/common/errorator.h"
#include "crimson/os/seastore/segment_manager.h"
namespace crimson::os::seastore {
class SegmentProvider;
}
namespace crimson::os::seastore::journal {
/**
* SegmentAllocator
*
* Maintain an available segment for writes.
*/
class SegmentAllocator {
using base_ertr = crimson::errorator<
crimson::ct_error::input_output_error>;
public:
SegmentAllocator(segment_type_t type,
SegmentProvider &sp,
SegmentManager &sm);
device_id_t get_device_id() const {
return segment_manager.get_device_id();
}
seastore_off_t get_block_size() const {
return segment_manager.get_block_size();
}
extent_len_t get_max_write_length() const {
return segment_manager.get_segment_size() -
p2align(ceph::encoded_sizeof_bounded<segment_header_t>(),
size_t(segment_manager.get_block_size()));
}
device_segment_id_t get_num_segments() const {
return segment_manager.get_num_segments();
}
bool can_write() const {
return !!current_segment;
}
segment_id_t get_segment_id() const {
assert(can_write());
return current_segment->get_segment_id();
}
segment_nonce_t get_nonce() const {
assert(can_write());
return current_segment_nonce;
}
seastore_off_t get_written_to() const {
assert(can_write());
return written_to;
}
void set_next_segment_seq(segment_seq_t);
// returns true iff the current segment has insufficient space
bool needs_roll(std::size_t length) const {
assert(can_write());
auto write_capacity = current_segment->get_write_capacity();
return length + written_to > std::size_t(write_capacity);
}
// open for write
using open_ertr = base_ertr;
using open_ret = open_ertr::future<journal_seq_t>;
open_ret open();
// close the current segment and initialize next one
using roll_ertr = base_ertr;
roll_ertr::future<> roll();
// write the buffer, return the write result
//
// May be called concurrently, but writes may complete in any order.
// If rolling/opening, no write is allowed.
using write_ertr = base_ertr;
using write_ret = write_ertr::future<write_result_t>;
write_ret write(ceph::bufferlist to_write);
using close_ertr = base_ertr;
close_ertr::future<> close();
private:
void reset() {
current_segment.reset();
if (type == segment_type_t::JOURNAL) {
next_segment_seq = 0;
} else { // OOL
next_segment_seq = OOL_SEG_SEQ;
}
current_segment_nonce = 0;
written_to = 0;
}
// FIXME: remove the unnecessary is_rolling
using close_segment_ertr = base_ertr;
close_segment_ertr::future<> close_segment(bool is_rolling);
segment_seq_t get_current_segment_seq() const {
segment_seq_t ret;
if (type == segment_type_t::JOURNAL) {
assert(next_segment_seq != 0);
ret = next_segment_seq - 1;
} else { // OOL
ret = next_segment_seq;
}
assert(segment_seq_to_type(ret) == type);
return ret;
}
const segment_type_t type; // JOURNAL or OOL
SegmentProvider &segment_provider;
SegmentManager &segment_manager;
SegmentRef current_segment;
segment_seq_t next_segment_seq;
segment_nonce_t current_segment_nonce;
seastore_off_t written_to;
};
}
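One detail worth noting in the header above is the sequence bookkeeping: for JOURNAL segments, open() consumes next_segment_seq and increments it, so the current segment's seq is next_segment_seq - 1, while OOL segments always reuse a single reserved sequence value. The sketch below models just that bookkeeping in plain C++; OOL_SEQ is an arbitrary stand-in for the real OOL_SEG_SEQ constant.

#include <cassert>
#include <cstdint>
#include <iostream>

// Simplified model of the JOURNAL vs OOL sequence handling in
// SegmentAllocator: journal segments get an increasing sequence on each
// open(), out-of-line segments always carry one reserved sequence value.
enum class SegType { JOURNAL, OOL };
constexpr std::uint32_t OOL_SEQ = 0xFFFFFFF0;  // stand-in for OOL_SEG_SEQ

struct SeqModel {
  SegType type;
  std::uint32_t next_seq;

  explicit SeqModel(SegType t)
    : type(t), next_seq(t == SegType::JOURNAL ? 0 : OOL_SEQ) {}

  std::uint32_t open() {                 // seq assigned to the new segment
    return type == SegType::JOURNAL ? next_seq++ : next_seq;
  }

  std::uint32_t current_seq() const {    // seq of the currently open segment
    if (type == SegType::JOURNAL) {
      assert(next_seq != 0);             // only valid after the first open()
      return next_seq - 1;
    }
    return next_seq;
  }
};

int main() {
  SeqModel journal(SegType::JOURNAL);
  SeqModel ool(SegType::OOL);
  std::uint32_t a = journal.open();      // 0
  std::uint32_t b = journal.open();      // 1, current_seq() is now 1
  std::uint32_t c = ool.open();          // always OOL_SEQ
  std::cout << a << " " << b << " " << journal.current_seq() << " "
            << c << " " << ool.current_seq() << "\n";
}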

View File

@ -26,22 +26,14 @@ SET_SUBSYS(seastore_journal);
namespace crimson::os::seastore::journal {
segment_nonce_t generate_nonce(
segment_seq_t seq,
const seastore_meta_t &meta)
{
return ceph_crc32c(
seq,
reinterpret_cast<const unsigned char *>(meta.seastore_id.bytes()),
sizeof(meta.seastore_id.uuid));
}
SegmentedJournal::SegmentedJournal(
SegmentManager &segment_manager,
ExtentReader &scanner,
SegmentProvider &segment_provider)
: segment_provider(segment_provider),
journal_segment_manager(segment_manager, segment_provider),
journal_segment_allocator(segment_type_t::JOURNAL,
segment_provider,
segment_manager),
record_submitter(crimson::common::get_conf<uint64_t>(
"seastore_journal_iodepth_limit"),
crimson::common::get_conf<uint64_t>(
@ -50,7 +42,7 @@ SegmentedJournal::SegmentedJournal(
"seastore_journal_batch_flush_size"),
crimson::common::get_conf<double>(
"seastore_journal_batch_preferred_fullness"),
journal_segment_manager),
journal_segment_allocator),
scanner(scanner)
{
register_metrics();
@ -59,16 +51,17 @@ SegmentedJournal::SegmentedJournal(
SegmentedJournal::open_for_write_ret SegmentedJournal::open_for_write()
{
LOG_PREFIX(Journal::open_for_write);
INFO("device_id={}", journal_segment_manager.get_device_id());
return journal_segment_manager.open();
INFO("device_id={}", journal_segment_allocator.get_device_id());
return journal_segment_allocator.open();
}
SegmentedJournal::close_ertr::future<> SegmentedJournal::close()
{
LOG_PREFIX(Journal::close);
INFO("closing");
INFO("closing, committed_to={}",
record_submitter.get_committed_to());
metrics.clear();
return journal_segment_manager.close();
return journal_segment_allocator.close();
}
SegmentedJournal::prep_replay_segments_fut
@ -88,15 +81,15 @@ SegmentedJournal::prep_replay_segments(
rt.second.journal_segment_seq;
});
journal_segment_manager.set_segment_seq(
segments.rbegin()->second.journal_segment_seq);
journal_segment_allocator.set_next_segment_seq(
segments.rbegin()->second.journal_segment_seq + 1);
std::for_each(
segments.begin(),
segments.end(),
[this, FNAME](auto &seg)
{
if (seg.first != seg.second.physical_segment_id ||
seg.first.device_id() != journal_segment_manager.get_device_id() ||
seg.first.device_id() != journal_segment_allocator.get_device_id() ||
seg.second.get_type() != segment_type_t::JOURNAL) {
ERROR("illegal journal segment for replay -- {}", seg.second);
ceph_abort();
@ -123,7 +116,7 @@ SegmentedJournal::prep_replay_segments(
} else {
replay_from = paddr_t::make_seg_paddr(
from->first,
journal_segment_manager.get_block_size());
journal_segment_allocator.get_block_size());
}
auto num_segments = segments.end() - from;
@ -137,7 +130,7 @@ SegmentedJournal::prep_replay_segments(
p.second.journal_segment_seq,
paddr_t::make_seg_paddr(
p.first,
journal_segment_manager.get_block_size())
journal_segment_allocator.get_block_size())
};
return std::make_pair(ret, p.second);
});
@ -253,10 +246,10 @@ SegmentedJournal::find_journal_segments()
return crimson::do_for_each(
boost::counting_iterator<device_segment_id_t>(0),
boost::counting_iterator<device_segment_id_t>(
journal_segment_manager.get_num_segments()),
journal_segment_allocator.get_num_segments()),
[this, &ret](device_segment_id_t d_segment_id) {
segment_id_t segment_id{
journal_segment_manager.get_device_id(),
journal_segment_allocator.get_device_id(),
d_segment_id};
return scanner.read_segment_header(
segment_id
@ -366,169 +359,6 @@ void SegmentedJournal::register_metrics()
);
}
SegmentedJournal::JournalSegmentManager::JournalSegmentManager(
SegmentManager& segment_manager,
SegmentProvider& segment_provider)
: segment_provider{segment_provider}, segment_manager{segment_manager}
{
reset();
}
SegmentedJournal::JournalSegmentManager::open_ret
SegmentedJournal::JournalSegmentManager::open()
{
return roll().safe_then([this] {
return get_current_write_seq();
});
}
SegmentedJournal::JournalSegmentManager::close_ertr::future<>
SegmentedJournal::JournalSegmentManager::close()
{
LOG_PREFIX(JournalSegmentManager::close);
if (current_journal_segment) {
INFO("segment_id={}, seq={}, "
"written_to={}, committed_to={}, nonce={}",
current_journal_segment->get_segment_id(),
get_segment_seq(),
written_to,
committed_to,
current_segment_nonce);
} else {
INFO("no current journal segment");
}
return (
current_journal_segment ?
current_journal_segment->close() :
Segment::close_ertr::now()
).handle_error(
close_ertr::pass_further{},
crimson::ct_error::assert_all{
"Invalid error in JournalSegmentManager::close()"
}
).finally([this] {
reset();
});
}
SegmentedJournal::JournalSegmentManager::roll_ertr::future<>
SegmentedJournal::JournalSegmentManager::roll()
{
LOG_PREFIX(JournalSegmentManager::roll);
auto old_segment_id = current_journal_segment ?
current_journal_segment->get_segment_id() :
NULL_SEG_ID;
if (current_journal_segment) {
INFO("closing segment {}, seq={}, "
"written_to={}, committed_to={}, nonce={}",
old_segment_id,
get_segment_seq(),
written_to,
committed_to,
current_segment_nonce);
}
return (
current_journal_segment ?
current_journal_segment->close() :
Segment::close_ertr::now()
).safe_then([this] {
return segment_provider.get_segment(
get_device_id(), next_journal_segment_seq);
}).safe_then([this](auto segment) {
return segment_manager.open(segment);
}).safe_then([this](auto sref) {
current_journal_segment = sref;
return initialize_segment(*current_journal_segment);
}).safe_then([this, old_segment_id] {
if (old_segment_id != NULL_SEG_ID) {
segment_provider.close_segment(old_segment_id);
}
}).handle_error(
roll_ertr::pass_further{},
crimson::ct_error::assert_all{
"Invalid error in JournalSegmentManager::roll"
}
);
}
SegmentedJournal::JournalSegmentManager::write_ret
SegmentedJournal::JournalSegmentManager::write(ceph::bufferlist to_write)
{
LOG_PREFIX(JournalSegmentManager::write);
auto write_length = to_write.length();
auto write_start_seq = get_current_write_seq();
TRACE("{}~{}", write_start_seq, write_length);
assert(write_length > 0);
assert((write_length % segment_manager.get_block_size()) == 0);
assert(!needs_roll(write_length));
auto write_start_offset = written_to;
written_to += write_length;
auto write_result = write_result_t{
write_start_seq,
static_cast<seastore_off_t>(write_length)
};
return current_journal_segment->write(
write_start_offset, to_write
).handle_error(
write_ertr::pass_further{},
crimson::ct_error::assert_all{
"Invalid error in JournalSegmentManager::write"
}
).safe_then([write_result] {
return write_result;
});
}
void SegmentedJournal::JournalSegmentManager::mark_committed(
const journal_seq_t& new_committed_to)
{
LOG_PREFIX(JournalSegmentManager::mark_committed);
TRACE("{} => {}", committed_to, new_committed_to);
assert(committed_to == JOURNAL_SEQ_NULL ||
committed_to <= new_committed_to);
committed_to = new_committed_to;
}
SegmentedJournal::JournalSegmentManager::initialize_segment_ertr::future<>
SegmentedJournal::JournalSegmentManager::initialize_segment(Segment& segment)
{
LOG_PREFIX(JournalSegmentManager::initialize_segment);
auto new_tail = segment_provider.get_journal_tail_target();
// write out header
ceph_assert(segment.get_write_ptr() == 0);
bufferlist bl;
segment_seq_t seq = next_journal_segment_seq++;
current_segment_nonce = generate_nonce(
seq, segment_manager.get_meta());
auto header = segment_header_t{
seq,
segment.get_segment_id(),
new_tail,
current_segment_nonce};
INFO("writing {} ...", header);
ceph_assert(header.get_type() == segment_type_t::JOURNAL);
encode(header, bl);
bufferptr bp(
ceph::buffer::create_page_aligned(
segment_manager.get_block_size()));
bp.zero();
auto iter = bl.cbegin();
iter.copy(bl.length(), bp.c_str());
bl.clear();
bl.append(bp);
written_to = 0;
return write(bl
).safe_then([this, new_tail](auto) {
segment_provider.update_journal_tail_committed(new_tail);
});
}
SegmentedJournal::RecordBatch::add_pending_ret
SegmentedJournal::RecordBatch::add_pending(
record_t&& record,
@ -644,10 +474,10 @@ SegmentedJournal::RecordSubmitter::RecordSubmitter(
std::size_t batch_capacity,
std::size_t batch_flush_size,
double preferred_fullness,
JournalSegmentManager& jsm)
SegmentAllocator& jsa)
: io_depth_limit{io_depth},
preferred_fullness{preferred_fullness},
journal_segment_manager{jsm},
journal_segment_allocator{jsa},
batches(new RecordBatch[io_depth + 1])
{
LOG_PREFIX(RecordSubmitter);
@ -678,9 +508,9 @@ SegmentedJournal::RecordSubmitter::submit(
assert(write_pipeline);
auto expected_size = record_group_size_t(
record.size,
journal_segment_manager.get_block_size()
journal_segment_allocator.get_block_size()
).get_encoded_length();
auto max_record_length = journal_segment_manager.get_max_write_length();
auto max_record_length = journal_segment_allocator.get_max_write_length();
if (expected_size > max_record_length) {
ERROR("H{} {} exceeds max record size {}",
(void*)&handle, record, max_record_length);
@ -768,13 +598,12 @@ void SegmentedJournal::RecordSubmitter::flush_current_batch()
increment_io();
auto num = p_batch->get_num_records();
auto committed_to = journal_segment_manager.get_committed_to();
auto [to_write, sizes] = p_batch->encode_batch(
committed_to, journal_segment_manager.get_nonce());
journal_committed_to, journal_segment_allocator.get_nonce());
DEBUG("{} records, {}, committed_to={}, outstanding_io={} ...",
num, sizes, committed_to, num_outstanding_io);
num, sizes, journal_committed_to, num_outstanding_io);
account_submission(num, sizes);
std::ignore = journal_segment_manager.write(to_write
std::ignore = journal_segment_allocator.write(to_write
).safe_then([this, p_batch, FNAME, num, sizes=sizes](auto write_result) {
TRACE("{} records, {}, write done with {}", num, sizes, write_result);
finish_submit_batch(p_batch, write_result);
@ -804,16 +633,15 @@ SegmentedJournal::RecordSubmitter::submit_pending(
if (do_flush && p_current_batch->is_empty()) {
// fast path with direct write
increment_io();
auto committed_to = journal_segment_manager.get_committed_to();
auto [to_write, sizes] = p_current_batch->submit_pending_fast(
std::move(record),
journal_segment_manager.get_block_size(),
committed_to,
journal_segment_manager.get_nonce());
journal_segment_allocator.get_block_size(),
journal_committed_to,
journal_segment_allocator.get_nonce());
DEBUG("H{} fast submit {}, committed_to={}, outstanding_io={} ...",
(void*)&handle, sizes, committed_to, num_outstanding_io);
(void*)&handle, sizes, journal_committed_to, num_outstanding_io);
account_submission(1, sizes);
return journal_segment_manager.write(to_write
return journal_segment_allocator.write(to_write
).safe_then([mdlength = sizes.get_mdlength()](auto write_result) {
return record_locator_t{
write_result.start_seq.offset.add_offset(mdlength),
@ -827,7 +655,7 @@ SegmentedJournal::RecordSubmitter::submit_pending(
auto write_fut = p_current_batch->add_pending(
std::move(record),
handle,
journal_segment_manager.get_block_size());
journal_segment_allocator.get_block_size());
if (do_flush) {
DEBUG("H{} added pending and flush", (void*)&handle);
flush_current_batch();
@ -845,8 +673,10 @@ SegmentedJournal::RecordSubmitter::submit_pending(
return handle.enter(write_pipeline->finalize
).then([this, FNAME, submit_result, &handle] {
DEBUG("H{} finish with {}", (void*)&handle, submit_result);
journal_segment_manager.mark_committed(
submit_result.write_result.get_end_seq());
auto new_committed_to = submit_result.write_result.get_end_seq();
assert(journal_committed_to == JOURNAL_SEQ_NULL ||
journal_committed_to <= new_committed_to);
journal_committed_to = new_committed_to;
return submit_result;
});
});
@ -865,15 +695,15 @@ SegmentedJournal::RecordSubmitter::do_submit(
// can increment io depth
assert(!wait_submit_promise.has_value());
auto maybe_new_size = p_current_batch->can_batch(
record, journal_segment_manager.get_block_size());
record, journal_segment_allocator.get_block_size());
if (!maybe_new_size.has_value() ||
(maybe_new_size->get_encoded_length() >
journal_segment_manager.get_max_write_length())) {
journal_segment_allocator.get_max_write_length())) {
TRACE("H{} flush", (void*)&handle);
assert(p_current_batch->is_pending());
flush_current_batch();
return do_submit(std::move(record), handle);
} else if (journal_segment_manager.needs_roll(
} else if (journal_segment_allocator.needs_roll(
maybe_new_size->get_encoded_length())) {
if (p_current_batch->is_pending()) {
TRACE("H{} flush and roll", (void*)&handle);
@ -881,7 +711,7 @@ SegmentedJournal::RecordSubmitter::do_submit(
} else {
TRACE("H{} roll", (void*)&handle);
}
return journal_segment_manager.roll(
return journal_segment_allocator.roll(
).safe_then([this, record=std::move(record), &handle]() mutable {
return do_submit(std::move(record), handle);
});
@ -895,11 +725,11 @@ SegmentedJournal::RecordSubmitter::do_submit(
assert(state == state_t::FULL);
// cannot increment io depth
auto maybe_new_size = p_current_batch->can_batch(
record, journal_segment_manager.get_block_size());
record, journal_segment_allocator.get_block_size());
if (!maybe_new_size.has_value() ||
(maybe_new_size->get_encoded_length() >
journal_segment_manager.get_max_write_length()) ||
journal_segment_manager.needs_roll(
journal_segment_allocator.get_max_write_length()) ||
journal_segment_allocator.needs_roll(
maybe_new_size->get_encoded_length())) {
if (!wait_submit_promise.has_value()) {
wait_submit_promise = seastar::promise<>();

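The do_submit() logic above decides, per record, whether to add it to the current batch, flush the batch and retry, roll the segment and retry, or wait for I/O depth. The function below condenses that decision order into a plain sketch; the enum and parameter names are illustrative assumptions drawn from the diff, not the RecordSubmitter API.

#include <cstddef>
#include <iostream>

// Condensed model of the decision in RecordSubmitter::do_submit(): the order
// of checks (fits in one record? then needs_roll? then io depth) follows the
// diff above; names are illustrative, not the seastore API.
enum class Action { FLUSH_AND_RETRY, ROLL_AND_RETRY, WAIT_AND_RETRY, BATCH };

Action decide(bool can_batch,            // batch has room for this record
              std::size_t encoded_len,   // record group size after batching
              std::size_t max_record_len,
              bool segment_needs_roll,   // SegmentAllocator::needs_roll()
              bool io_depth_available) {
  if (!can_batch || encoded_len > max_record_len) {
    // cannot grow the current batch: flush it, or wait if io depth is full
    return io_depth_available ? Action::FLUSH_AND_RETRY : Action::WAIT_AND_RETRY;
  }
  if (segment_needs_roll) {
    // flush whatever is pending, roll the segment, then retry the record
    return io_depth_available ? Action::ROLL_AND_RETRY : Action::WAIT_AND_RETRY;
  }
  return Action::BATCH;                  // add to the current batch
}

int main() {
  std::cout << static_cast<int>(decide(true, 4096, 1 << 20, false, true)) << "\n";
}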
View File

@ -18,10 +18,10 @@
#include "crimson/os/seastore/segment_cleaner.h"
#include "crimson/os/seastore/journal.h"
#include "crimson/os/seastore/extent_reader.h"
#include "crimson/os/seastore/segment_manager.h"
#include "crimson/os/seastore/ordering_handle.h"
#include "crimson/os/seastore/seastore_types.h"
#include "crimson/osd/exceptions.h"
#include "segment_allocator.h"
namespace crimson::os::seastore::journal {
@ -58,106 +58,6 @@ public:
}
private:
class JournalSegmentManager {
public:
JournalSegmentManager(SegmentManager&, SegmentProvider&);
using base_ertr = crimson::errorator<
crimson::ct_error::input_output_error>;
extent_len_t get_max_write_length() const {
return segment_manager.get_segment_size() -
p2align(ceph::encoded_sizeof_bounded<segment_header_t>(),
size_t(segment_manager.get_block_size()));
}
device_id_t get_device_id() const {
return segment_manager.get_device_id();
}
device_segment_id_t get_num_segments() const {
return segment_manager.get_num_segments();
}
seastore_off_t get_block_size() const {
return segment_manager.get_block_size();
}
segment_nonce_t get_nonce() const {
return current_segment_nonce;
}
journal_seq_t get_committed_to() const {
return committed_to;
}
segment_seq_t get_segment_seq() const {
return next_journal_segment_seq - 1;
}
void set_segment_seq(segment_seq_t current_seq) {
next_journal_segment_seq = (current_seq + 1);
}
using open_ertr = base_ertr;
using open_ret = open_ertr::future<journal_seq_t>;
open_ret open();
using close_ertr = base_ertr;
close_ertr::future<> close();
// returns true iff the current segment has insufficient space
bool needs_roll(std::size_t length) const {
auto write_capacity = current_journal_segment->get_write_capacity();
return length + written_to > std::size_t(write_capacity);
}
// close the current segment and initialize next one
using roll_ertr = base_ertr;
roll_ertr::future<> roll();
// write the buffer, return the write result
// May be called concurrently, writes may complete in any order.
using write_ertr = base_ertr;
using write_ret = write_ertr::future<write_result_t>;
write_ret write(ceph::bufferlist to_write);
// mark write committed in order
void mark_committed(const journal_seq_t& new_committed_to);
private:
journal_seq_t get_current_write_seq() const {
assert(current_journal_segment);
return journal_seq_t{
get_segment_seq(),
paddr_t::make_seg_paddr(current_journal_segment->get_segment_id(),
written_to)
};
}
void reset() {
next_journal_segment_seq = 0;
current_segment_nonce = 0;
current_journal_segment.reset();
written_to = 0;
committed_to = JOURNAL_SEQ_NULL;
}
// prepare segment for writes, writes out segment header
using initialize_segment_ertr = base_ertr;
initialize_segment_ertr::future<> initialize_segment(Segment&);
SegmentProvider& segment_provider;
SegmentManager& segment_manager;
segment_seq_t next_journal_segment_seq;
segment_nonce_t current_segment_nonce;
SegmentRef current_journal_segment;
seastore_off_t written_to;
// committed_to may be in a previous journal segment
journal_seq_t committed_to;
};
class RecordBatch {
enum class state_t {
EMPTY = 0,
@ -222,7 +122,7 @@ private:
//
// Set write_result_t::write_length to 0 if the record is not the first one
// in the batch.
using add_pending_ertr = JournalSegmentManager::write_ertr;
using add_pending_ertr = SegmentAllocator::write_ertr;
using add_pending_ret = add_pending_ertr::future<record_locator_t>;
add_pending_ret add_pending(
record_t&&,
@ -300,7 +200,7 @@ private:
std::size_t batch_capacity,
std::size_t batch_flush_size,
double preferred_fullness,
JournalSegmentManager&);
SegmentAllocator&);
grouped_io_stats get_record_batch_stats() const {
return stats.record_batch_stats;
@ -322,6 +222,10 @@ private:
return stats.record_group_data_bytes;
}
journal_seq_t get_committed_to() const {
return journal_committed_to;
}
void reset_stats() {
stats = {};
}
@ -361,7 +265,7 @@ private:
void flush_current_batch();
using submit_pending_ertr = JournalSegmentManager::write_ertr;
using submit_pending_ertr = SegmentAllocator::write_ertr;
using submit_pending_ret = submit_pending_ertr::future<
record_locator_t>;
submit_pending_ret submit_pending(
@ -377,7 +281,10 @@ private:
double preferred_fullness;
WritePipeline* write_pipeline = nullptr;
JournalSegmentManager& journal_segment_manager;
SegmentAllocator& journal_segment_allocator;
// committed_to may be in a previous journal segment
journal_seq_t journal_committed_to = JOURNAL_SEQ_NULL;
std::unique_ptr<RecordBatch[]> batches;
std::size_t current_batch_index;
// should not be nullptr after constructed
@ -395,7 +302,7 @@ private:
};
SegmentProvider& segment_provider;
JournalSegmentManager journal_segment_manager;
SegmentAllocator journal_segment_allocator;
RecordSubmitter record_submitter;
ExtentReader& scanner;
seastar::metrics::metric_group metrics;

View File

@ -207,28 +207,26 @@ void SegmentCleaner::register_metrics()
});
}
SegmentCleaner::get_segment_ret SegmentCleaner::get_segment(
device_id_t id, segment_seq_t seq)
segment_id_t SegmentCleaner::get_segment(
device_id_t device_id, segment_seq_t seq)
{
LOG_PREFIX(SegmentCleaner::get_segment);
assert(segment_seq_to_type(seq) != segment_type_t::NULL_SEG);
for (auto it = segments.device_begin(id);
it != segments.device_end(id);
for (auto it = segments.device_begin(device_id);
it != segments.device_end(device_id);
++it) {
auto id = it->first;
auto seg_id = it->first;
auto& segment_info = it->second;
if (segment_info.is_empty()) {
logger().debug("{}: returning segment {} {}",
__func__, id, segment_seq_printer_t{seq});
mark_open(id, seq);
return get_segment_ret(
get_segment_ertr::ready_future_marker{},
id);
DEBUG("returning segment {} {}", seg_id, segment_seq_printer_t{seq});
mark_open(seg_id, seq);
return seg_id;
}
}
assert(0 == "out of space handling todo");
return get_segment_ret(
get_segment_ertr::ready_future_marker{},
NULL_SEG_ID);
ERROR("(TODO) handle out of space from device {} with segment_seq={}",
device_id, segment_seq_printer_t{seq});
ceph_abort();
return NULL_SEG_ID;
}
void SegmentCleaner::update_journal_tail_target(journal_seq_t target)

View File

@ -272,10 +272,7 @@ private:
*/
class SegmentProvider {
public:
using get_segment_ertr = crimson::errorator<
crimson::ct_error::input_output_error>;
using get_segment_ret = get_segment_ertr::future<segment_id_t>;
virtual get_segment_ret get_segment(
virtual segment_id_t get_segment(
device_id_t id, segment_seq_t seq) = 0;
virtual void close_segment(segment_id_t) {}
@ -681,7 +678,7 @@ public:
using mount_ret = mount_ertr::future<>;
mount_ret mount(device_id_t pdevice_id, std::vector<SegmentManager*>& sms);
get_segment_ret get_segment(
segment_id_t get_segment(
device_id_t id, segment_seq_t seq) final;
void close_segment(segment_id_t segment) final;

View File

@ -43,14 +43,12 @@ struct btree_test_base :
void update_segment_avail_bytes(paddr_t offset) final {}
get_segment_ret get_segment(device_id_t id, segment_seq_t seq) final {
segment_id_t get_segment(device_id_t id, segment_seq_t seq) final {
auto ret = next;
next = segment_id_t{
next.device_id(),
next.device_segment_id() + 1};
return get_segment_ret(
get_segment_ertr::ready_future_marker{},
ret);
return ret;
}
journal_seq_t get_journal_tail_target() const final { return journal_seq_t{}; }

View File

@ -84,14 +84,12 @@ struct journal_test_t : seastar_test_suite_t, SegmentProvider {
void update_segment_avail_bytes(paddr_t offset) final {}
get_segment_ret get_segment(device_id_t id, segment_seq_t seq) final {
segment_id_t get_segment(device_id_t id, segment_seq_t seq) final {
auto ret = next;
next = segment_id_t{
next.device_id(),
next.device_segment_id() + 1};
return get_segment_ret(
get_segment_ertr::ready_future_marker{},
ret);
return ret;
}
journal_seq_t get_journal_tail_target() const final { return journal_seq_t{}; }