From 65a294058fc1253d89479ed483f99357241a6771 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Fri, 11 Feb 2022 14:27:54 +0800 Subject: [PATCH 1/6] crimson/os/seastore/segment_cleaner: drop unnecessary future from get_segment() Signed-off-by: Yingxin Cheng --- .../os/seastore/extent_placement_manager.cc | 9 +++--- .../os/seastore/journal/segmented_journal.cc | 5 ++-- src/crimson/os/seastore/segment_cleaner.cc | 28 +++++++++---------- src/crimson/os/seastore/segment_cleaner.h | 7 ++--- .../seastore/test_btree_lba_manager.cc | 6 ++-- .../crimson/seastore/test_seastore_journal.cc | 6 ++-- 6 files changed, 25 insertions(+), 36 deletions(-) diff --git a/src/crimson/os/seastore/extent_placement_manager.cc b/src/crimson/os/seastore/extent_placement_manager.cc index c5331cf74b1..796c63ce932 100644 --- a/src/crimson/os/seastore/extent_placement_manager.cc +++ b/src/crimson/os/seastore/extent_placement_manager.cc @@ -251,11 +251,10 @@ SegmentedAllocator::Writer::roll_segment(bool set_rolling) { }); } - return segment_provider.get_segment( - segment_manager.get_device_id(), OOL_SEG_SEQ - ).safe_then([this](auto segment) { - return segment_manager.open(segment); - }).safe_then([this](auto segref) { + auto new_segment_id = segment_provider.get_segment( + segment_manager.get_device_id(), OOL_SEG_SEQ); + return segment_manager.open(new_segment_id + ).safe_then([this](auto segref) { LOG_PREFIX(SegmentedAllocator::Writer::roll_segment); DEBUG("opened new segment: {}", segref->get_segment_id()); return init_segment(*segref).safe_then([segref=std::move(segref), this] { diff --git a/src/crimson/os/seastore/journal/segmented_journal.cc b/src/crimson/os/seastore/journal/segmented_journal.cc index 351ef6a2e1a..2fbfa3560a2 100644 --- a/src/crimson/os/seastore/journal/segmented_journal.cc +++ b/src/crimson/os/seastore/journal/segmented_journal.cc @@ -434,10 +434,9 @@ SegmentedJournal::JournalSegmentManager::roll() current_journal_segment->close() : Segment::close_ertr::now() ).safe_then([this] { - return segment_provider.get_segment( + auto new_segment_id = segment_provider->get_segment( get_device_id(), next_journal_segment_seq); - }).safe_then([this](auto segment) { - return segment_manager.open(segment); + return segment_manager.open(new_segment_id); }).safe_then([this](auto sref) { current_journal_segment = sref; return initialize_segment(*current_journal_segment); diff --git a/src/crimson/os/seastore/segment_cleaner.cc b/src/crimson/os/seastore/segment_cleaner.cc index 2f6a418fa1f..ae7c0fd17a2 100644 --- a/src/crimson/os/seastore/segment_cleaner.cc +++ b/src/crimson/os/seastore/segment_cleaner.cc @@ -207,28 +207,26 @@ void SegmentCleaner::register_metrics() }); } -SegmentCleaner::get_segment_ret SegmentCleaner::get_segment( - device_id_t id, segment_seq_t seq) +segment_id_t SegmentCleaner::get_segment( + device_id_t device_id, segment_seq_t seq) { + LOG_PREFIX(SegmentCleaner::get_segment); assert(segment_seq_to_type(seq) != segment_type_t::NULL_SEG); - for (auto it = segments.device_begin(id); - it != segments.device_end(id); + for (auto it = segments.device_begin(device_id); + it != segments.device_end(device_id); ++it) { - auto id = it->first; + auto seg_id = it->first; auto& segment_info = it->second; if (segment_info.is_empty()) { - logger().debug("{}: returning segment {} {}", - __func__, id, segment_seq_printer_t{seq}); - mark_open(id, seq); - return get_segment_ret( - get_segment_ertr::ready_future_marker{}, - id); + DEBUG("returning segment {} {}", seg_id, segment_seq_printer_t{seq}); + 
mark_open(seg_id, seq); + return seg_id; } } - assert(0 == "out of space handling todo"); - return get_segment_ret( - get_segment_ertr::ready_future_marker{}, - NULL_SEG_ID); + ERROR("(TODO) handle out of space from device {} with segment_seq={}", + device_id, segment_seq_printer_t{seq}); + ceph_abort(); + return NULL_SEG_ID; } void SegmentCleaner::update_journal_tail_target(journal_seq_t target) diff --git a/src/crimson/os/seastore/segment_cleaner.h b/src/crimson/os/seastore/segment_cleaner.h index e170071f6f3..a4d47407f5b 100644 --- a/src/crimson/os/seastore/segment_cleaner.h +++ b/src/crimson/os/seastore/segment_cleaner.h @@ -272,10 +272,7 @@ private: */ class SegmentProvider { public: - using get_segment_ertr = crimson::errorator< - crimson::ct_error::input_output_error>; - using get_segment_ret = get_segment_ertr::future; - virtual get_segment_ret get_segment( + virtual segment_id_t get_segment( device_id_t id, segment_seq_t seq) = 0; virtual void close_segment(segment_id_t) {} @@ -681,7 +678,7 @@ public: using mount_ret = mount_ertr::future<>; mount_ret mount(device_id_t pdevice_id, std::vector& sms); - get_segment_ret get_segment( + segment_id_t get_segment( device_id_t id, segment_seq_t seq) final; void close_segment(segment_id_t segment) final; diff --git a/src/test/crimson/seastore/test_btree_lba_manager.cc b/src/test/crimson/seastore/test_btree_lba_manager.cc index 58ac9393535..a92e2926656 100644 --- a/src/test/crimson/seastore/test_btree_lba_manager.cc +++ b/src/test/crimson/seastore/test_btree_lba_manager.cc @@ -43,14 +43,12 @@ struct btree_test_base : void update_segment_avail_bytes(paddr_t offset) final {} - get_segment_ret get_segment(device_id_t id, segment_seq_t seq) final { + segment_id_t get_segment(device_id_t id, segment_seq_t seq) final { auto ret = next; next = segment_id_t{ next.device_id(), next.device_segment_id() + 1}; - return get_segment_ret( - get_segment_ertr::ready_future_marker{}, - ret); + return ret; } journal_seq_t get_journal_tail_target() const final { return journal_seq_t{}; } diff --git a/src/test/crimson/seastore/test_seastore_journal.cc b/src/test/crimson/seastore/test_seastore_journal.cc index 1a25392974c..65ff64c5216 100644 --- a/src/test/crimson/seastore/test_seastore_journal.cc +++ b/src/test/crimson/seastore/test_seastore_journal.cc @@ -84,14 +84,12 @@ struct journal_test_t : seastar_test_suite_t, SegmentProvider { void update_segment_avail_bytes(paddr_t offset) final {} - get_segment_ret get_segment(device_id_t id, segment_seq_t seq) final { + segment_id_t get_segment(device_id_t id, segment_seq_t seq) final { auto ret = next; next = segment_id_t{ next.device_id(), next.device_segment_id() + 1}; - return get_segment_ret( - get_segment_ertr::ready_future_marker{}, - ret); + return ret; } journal_seq_t get_journal_tail_target() const final { return journal_seq_t{}; } From 7ebf3590de92cc64e6e25322934cc0cf71ebedc0 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Tue, 15 Feb 2022 09:28:04 +0800 Subject: [PATCH 2/6] crimson/os/seastore/epm: simplify gating writes for Writer Dropped open_segment_wrapper_t. 
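For illustration only, a minimal sketch of the gating pattern this change switches to (assuming Seastar's seastar::gate / with_gate API; gated_writer and do_io are placeholder names, not the crimson code): every write is admitted through a single gate, so stop() only has to close the gate to wait for all in-flight I/O, instead of tracking per-segment inflight futures in open_segment_wrapper_t.

  #include <seastar/core/future.hh>
  #include <seastar/core/gate.hh>

  class gated_writer {
    seastar::gate write_guard;

    // stands in for the actual segment write
    seastar::future<> do_io() { return seastar::now(); }

  public:
    // every write holds the gate while it is in flight
    seastar::future<> write() {
      return seastar::with_gate(write_guard, [this] {
        return do_io();
      });
    }

    // closing the gate waits for every write admitted above
    seastar::future<> stop() {
      return write_guard.close();
    }
  };

The same idea lets roll_segment() close the outgoing segment under the gate, so no per-segment list of inflight writes is needed.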
Signed-off-by: Yingxin Cheng --- .../os/seastore/extent_placement_manager.cc | 125 ++++++++---------- .../os/seastore/extent_placement_manager.h | 27 ++-- 2 files changed, 66 insertions(+), 86 deletions(-) diff --git a/src/crimson/os/seastore/extent_placement_manager.cc b/src/crimson/os/seastore/extent_placement_manager.cc index 796c63ce932..3d92d4b329e 100644 --- a/src/crimson/os/seastore/extent_placement_manager.cc +++ b/src/crimson/os/seastore/extent_placement_manager.cc @@ -37,25 +37,23 @@ SegmentedAllocator::Writer::_write( Transaction& t, ool_record_t& record) { + LOG_PREFIX(SegmentedAllocator::Writer::_write); auto record_size = record.get_encoded_record_length(); allocated_to += record_size.get_encoded_length(); segment_provider.update_segment_avail_bytes( paddr_t::make_seg_paddr( - current_segment->segment->get_segment_id(), + current_segment->get_segment_id(), allocated_to)); bufferlist bl = record.encode( - current_segment->segment->get_segment_id(), + current_segment->get_segment_id(), 0); - seastar::promise<> pr; - current_segment->inflight_writes.emplace_back(pr.get_future()); - LOG_PREFIX(SegmentedAllocator::Writer::_write); DEBUGT( "written {} extents, {} bytes to segment {} at {}", t, record.get_num_extents(), bl.length(), - current_segment->segment->get_segment_id(), + current_segment->get_segment_id(), record.get_base()); // account transactional ool writes before write() @@ -68,18 +66,10 @@ SegmentedAllocator::Writer::_write( stats.num_records += 1; return trans_intr::make_interruptible( - current_segment->segment->write(record.get_base(), bl).safe_then( - [this, FNAME, pr=std::move(pr), &t, - it=(--current_segment->inflight_writes.end()), - cs=current_segment]() mutable { - if (cs->outdated) { - DEBUGT("segment rolled", t); - pr.set_value(); - } else{ - DEBUGT("segment not rolled", t); - current_segment->inflight_writes.erase(it); - } - return seastar::now(); + current_segment->write(record.get_base(), bl + ).safe_then([FNAME, &t, &record, cs=current_segment] { + DEBUGT("written {} {}", + t, cs->get_segment_id(), record.get_base()); }) ).si_then([FNAME, &record, &t] { for (auto& ool_extent : record.get_extents()) { @@ -140,7 +130,7 @@ SegmentedAllocator::Writer::write( "end of segment, writing {} extents to segment {} at {}", t, num_extents, - current_segment->segment->get_segment_id(), + current_segment->get_segment_id(), allocated_to); return (num_extents ? 
_write(t, record) : @@ -160,7 +150,7 @@ SegmentedAllocator::Writer::write( "writing {} extents to segment {} at {}", t, record.get_num_extents(), - current_segment->segment->get_segment_id(), + current_segment->get_segment_id(), allocated_to); return _write(t, record); } @@ -173,22 +163,26 @@ SegmentedAllocator::Writer::write( }); }; - if (rolling_segment) { - return segment_rotation_guard.wait([this] { - return !rolling_segment; - }, std::move(write_func)); + return seastar::with_gate(write_guard, + [this, write_func=std::move(write_func)]() mutable + { + if (rolling_segment) { + return segment_rotation_guard.wait([this] { + return !rolling_segment; + }, std::move(write_func)); - } else if (!current_segment) { - return trans_intr::make_interruptible(roll_segment(true)).si_then( - [write_func=std::move(write_func)] { - return write_func(); - }); - } - return write_func(); + } else if (!current_segment) { + return trans_intr::make_interruptible(roll_segment(true)).si_then( + [write_func=std::move(write_func)] { + return write_func(); + }); + } + return write_func(); + }); } bool SegmentedAllocator::Writer::_needs_roll(seastore_off_t length) const { - return allocated_to + length > current_segment->segment->get_write_capacity(); + return allocated_to + length > current_segment->get_write_capacity(); } SegmentedAllocator::Writer::init_segment_ertr::future<> @@ -225,44 +219,39 @@ SegmentedAllocator::Writer::roll_segment(bool set_rolling) { rolling_segment = true; } assert(rolling_segment); - if (current_segment) { - (void) seastar::with_gate(writer_guard, [this] { - auto fut = seastar::now(); - if (!current_segment->inflight_writes.empty()) { - fut = seastar::when_all_succeed( - current_segment->inflight_writes.begin(), - current_segment->inflight_writes.end()); - } - current_segment->outdated = true; - return fut.then( - [cs=std::move(current_segment), this, it=(--open_segments.end())] { - return cs->segment->close().safe_then([this, cs, it] { - LOG_PREFIX(SegmentedAllocator::Writer::roll_segment); - assert((*it).get() == cs.get()); - segment_provider.close_segment(cs->segment->get_segment_id()); - open_segments.erase(it); - DEBUG("closed segment: {}", cs->segment->get_segment_id()); + return [this, FNAME] { + if (current_segment) { + auto seg_to_close = std::move(current_segment); + if (write_guard.is_closed()) { + DEBUG("write_guard is closed, should be stopping"); + return seg_to_close->close( + ).safe_then([seg_to_close=std::move(seg_to_close)] {}); + } else { + DEBUG("rolling OOL segment, close {} ...", seg_to_close->get_segment_id()); + (void) seastar::with_gate(write_guard, + [this, seg_to_close=std::move(seg_to_close)]() mutable + { + return seg_to_close->close( + ).safe_then([this, seg_to_close=std::move(seg_to_close)] { + segment_provider.close_segment(seg_to_close->get_segment_id()); + }); }); - }); - }).handle_exception_type([](seastar::gate_closed_exception e) { - LOG_PREFIX(SegmentedAllocator::Writer::roll_segment); - DEBUG(" writer_guard closed, should be stopping"); - return seastar::now(); - }); - } - - auto new_segment_id = segment_provider.get_segment( - segment_manager.get_device_id(), OOL_SEG_SEQ); - return segment_manager.open(new_segment_id - ).safe_then([this](auto segref) { - LOG_PREFIX(SegmentedAllocator::Writer::roll_segment); + return Segment::close_ertr::now(); + } + } else { + DEBUG("rolling OOL segment, no current ..."); + return Segment::close_ertr::now(); + } + }().safe_then([this] { + auto new_segment_id = segment_provider.get_segment( + 
segment_manager.get_device_id(), OOL_SEG_SEQ); + return segment_manager.open(new_segment_id); + }).safe_then([this, FNAME](auto segref) { DEBUG("opened new segment: {}", segref->get_segment_id()); - return init_segment(*segref).safe_then([segref=std::move(segref), this] { - LOG_PREFIX(SegmentedAllocator::Writer::roll_segment); - assert(!current_segment.get()); - current_segment.reset(new open_segment_wrapper_t()); - current_segment->segment = segref; - open_segments.emplace_back(current_segment); + return init_segment(*segref + ).safe_then([segref=std::move(segref), this, FNAME] { + assert(!current_segment); + current_segment = segref; rolling_segment = false; segment_rotation_guard.broadcast(); DEBUG("inited new segment: {}", segref->get_segment_id()); diff --git a/src/crimson/os/seastore/extent_placement_manager.h b/src/crimson/os/seastore/extent_placement_manager.h index 485ed4a069b..5d97534f858 100644 --- a/src/crimson/os/seastore/extent_placement_manager.h +++ b/src/crimson/os/seastore/extent_placement_manager.h @@ -151,17 +151,6 @@ public: }; using ExtentAllocatorRef = std::unique_ptr; -struct open_segment_wrapper_t : public boost::intrusive_ref_counter< - open_segment_wrapper_t, - boost::thread_unsafe_counter> { - SegmentRef segment; - std::list> inflight_writes; - bool outdated = false; -}; - -using open_segment_wrapper_ref = - boost::intrusive_ptr; - class SegmentProvider; /** @@ -191,11 +180,14 @@ class SegmentedAllocator : public ExtentAllocator { write_iertr::future<> write( Transaction& t, std::list& extent) final; + stop_ertr::future<> stop() final { - return writer_guard.close().then([this] { - return crimson::do_for_each(open_segments, [](auto& seg_wrapper) { - return seg_wrapper->segment->close(); - }); + return write_guard.close().then([this] { + if (current_segment) { + return current_segment->close(); + } else { + return Segment::close_ertr::now(); + } }); } private: @@ -219,11 +211,10 @@ class SegmentedAllocator : public ExtentAllocator { SegmentProvider& segment_provider; SegmentManager& segment_manager; - open_segment_wrapper_ref current_segment; - std::list open_segments; + SegmentRef current_segment; seastore_off_t allocated_to = 0; crimson::condition_variable segment_rotation_guard; - seastar::gate writer_guard; + seastar::gate write_guard; bool rolling_segment = false; }; public: From d2e3bb6f7aa568e9d452acd7625d18c66a0d34ce Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Tue, 15 Feb 2022 22:26:18 +0800 Subject: [PATCH 3/6] crimson/os/seastore/epm: replace condition_variable by a shared_promise Signed-off-by: Yingxin Cheng --- .../os/seastore/extent_placement_manager.cc | 194 ++++++++---------- .../os/seastore/extent_placement_manager.h | 15 +- 2 files changed, 94 insertions(+), 115 deletions(-) diff --git a/src/crimson/os/seastore/extent_placement_manager.cc b/src/crimson/os/seastore/extent_placement_manager.cc index 3d92d4b329e..5ba17f6038b 100644 --- a/src/crimson/os/seastore/extent_placement_manager.cc +++ b/src/crimson/os/seastore/extent_placement_manager.cc @@ -38,6 +38,7 @@ SegmentedAllocator::Writer::_write( ool_record_t& record) { LOG_PREFIX(SegmentedAllocator::Writer::_write); + record.set_base(allocated_to); auto record_size = record.get_encoded_record_length(); allocated_to += record_size.get_encoded_length(); segment_provider.update_segment_avail_bytes( @@ -65,33 +66,80 @@ SegmentedAllocator::Writer::_write( stats.data_bytes += record_size.dlength; stats.num_records += 1; + for (auto& ool_extent : record.get_extents()) { + auto& lextent = 
ool_extent.get_lextent(); + auto paddr = ool_extent.get_ool_paddr(); + TRACET("ool extent written at {} -- {}", t, *lextent, paddr); + lextent->hint = placement_hint_t::NUM_HINTS; // invalidate hint + t.mark_delayed_extent_ool(lextent, paddr); + } + return trans_intr::make_interruptible( current_segment->write(record.get_base(), bl - ).safe_then([FNAME, &t, &record, cs=current_segment] { - DEBUGT("written {} {}", - t, cs->get_segment_id(), record.get_base()); + ).safe_then([FNAME, &t, base=record.get_base(), cs=current_segment] { + DEBUGT("written {} {}", t, cs->get_segment_id(), base); }) - ).si_then([FNAME, &record, &t] { - for (auto& ool_extent : record.get_extents()) { - auto& lextent = ool_extent.get_lextent(); - auto paddr = ool_extent.get_ool_paddr(); - TRACET("ool extent written at {} -- {}", t, *lextent, paddr); - lextent->hint = placement_hint_t::NUM_HINTS; // invalidate hint - t.mark_delayed_extent_ool(lextent, paddr); - } - record.clear(); - }); + ); } -void SegmentedAllocator::Writer::add_extent_to_write( - ool_record_t& record, - LogicalCachedExtentRef& extent) { - logger().debug( - "SegmentedAllocator::Writer::add_extent_to_write: " - "add extent {} to record", - extent); - extent->prepare_write(); - record.add_extent(extent); +SegmentedAllocator::Writer::write_iertr::future<> +SegmentedAllocator::Writer::do_write( + Transaction& t, + std::list& extents) +{ + LOG_PREFIX(SegmentedAllocator::Writer::do_write); + assert(!extents.empty()); + if (roll_promise.has_value()) { + return roll_promise->get_shared_future( + ).then([this, &t, &extents] { + return do_write(t, extents); + }); + } + assert(current_segment); + + ool_record_t record(segment_manager.get_block_size()); + for (auto it = extents.begin(); it != extents.end();) { + auto& extent = *it; + auto wouldbe_length = record.get_wouldbe_encoded_record_length(extent); + if (_needs_roll(wouldbe_length)) { + // reached the segment end, write and roll + assert(!roll_promise.has_value()); + roll_promise = seastar::shared_promise<>(); + auto num_extents = record.get_num_extents(); + DEBUGT( + "end of segment, writing {} extents to segment {} at {}", + t, + num_extents, + current_segment->get_segment_id(), + allocated_to); + return (num_extents ? 
+ _write(t, record) : + write_iertr::now() + ).si_then([this] { + return roll_segment(); + }).finally([this] { + roll_promise->set_value(); + roll_promise.reset(); + }).si_then([this, &t, &extents] { + if (!extents.empty()) { + return do_write(t, extents); + } + return write_iertr::now(); + }); + } + DEBUGT("add extent to record -- {}", t, *extent); + extent->prepare_write(); + record.add_extent(extent); + it = extents.erase(it); + } + + DEBUGT( + "writing {} extents to segment {} at {}", + t, + record.get_num_extents(), + current_segment->get_segment_id(), + allocated_to); + return _write(t, record); } SegmentedAllocator::Writer::write_iertr::future<> @@ -99,85 +147,22 @@ SegmentedAllocator::Writer::write( Transaction& t, std::list& extents) { - auto write_func = [this, &extents, &t] { - return seastar::do_with(ool_record_t(segment_manager.get_block_size()), - [this, &extents, &t](auto& record) { - return trans_intr::repeat([this, &record, &t, &extents]() - -> write_iertr::future { - if (extents.empty()) { - return seastar::make_ready_future< - seastar::stop_iteration>(seastar::stop_iteration::yes); - } - - return segment_rotation_guard.wait( - [this] { - return !rolling_segment; - }, - [this, &record, &extents, &t]() -> write_iertr::future<> { - LOG_PREFIX(SegmentedAllocator::Writer::write); - record.set_base(allocated_to); - for (auto it = extents.begin(); - it != extents.end();) { - auto& extent = *it; - auto wouldbe_length = - record.get_wouldbe_encoded_record_length(extent); - if (_needs_roll(wouldbe_length)) { - // reached the segment end, write and roll - assert(!rolling_segment); - rolling_segment = true; - auto num_extents = record.get_num_extents(); - DEBUGT( - "end of segment, writing {} extents to segment {} at {}", - t, - num_extents, - current_segment->get_segment_id(), - allocated_to); - return (num_extents ? 
- _write(t, record) : - write_iertr::now() - ).si_then([this]() mutable { - return roll_segment(false); - }).finally([this] { - rolling_segment = false; - segment_rotation_guard.broadcast(); - }); - } - add_extent_to_write(record, extent); - it = extents.erase(it); - } - - DEBUGT( - "writing {} extents to segment {} at {}", - t, - record.get_num_extents(), - current_segment->get_segment_id(), - allocated_to); - return _write(t, record); - } - ).si_then([]() - -> write_iertr::future { - return seastar::make_ready_future< - seastar::stop_iteration>(seastar::stop_iteration::no); - }); - }); - }); - }; - - return seastar::with_gate(write_guard, - [this, write_func=std::move(write_func)]() mutable - { - if (rolling_segment) { - return segment_rotation_guard.wait([this] { - return !rolling_segment; - }, std::move(write_func)); - - } else if (!current_segment) { - return trans_intr::make_interruptible(roll_segment(true)).si_then( - [write_func=std::move(write_func)] { - return write_func(); + if (extents.empty()) { + return write_iertr::now(); + } + return seastar::with_gate(write_guard, [this, &t, &extents] { + if (!roll_promise.has_value() && !current_segment) { + roll_promise = seastar::shared_promise<>(); + return trans_intr::make_interruptible( + roll_segment().finally([this] { + roll_promise->set_value(); + roll_promise.reset(); + }) + ).si_then([this, &t, &extents] { + return do_write(t, extents); }); } - return write_func(); + return do_write(t, extents); }); } @@ -212,13 +197,10 @@ SegmentedAllocator::Writer::init_segment(Segment& segment) { } SegmentedAllocator::Writer::roll_segment_ertr::future<> -SegmentedAllocator::Writer::roll_segment(bool set_rolling) { +SegmentedAllocator::Writer::roll_segment() { LOG_PREFIX(SegmentedAllocator::Writer::roll_segment); - DEBUG("set_rolling {}", set_rolling); - if (set_rolling) { - rolling_segment = true; - } - assert(rolling_segment); + DEBUG("start"); + assert(roll_promise.has_value()); return [this, FNAME] { if (current_segment) { auto seg_to_close = std::move(current_segment); @@ -252,8 +234,6 @@ SegmentedAllocator::Writer::roll_segment(bool set_rolling) { ).safe_then([segref=std::move(segref), this, FNAME] { assert(!current_segment); current_segment = segref; - rolling_segment = false; - segment_rotation_guard.broadcast(); DEBUG("inited new segment: {}", segref->get_segment_id()); }); }).handle_error( diff --git a/src/crimson/os/seastore/extent_placement_manager.h b/src/crimson/os/seastore/extent_placement_manager.h index 5d97534f858..c582f04e589 100644 --- a/src/crimson/os/seastore/extent_placement_manager.h +++ b/src/crimson/os/seastore/extent_placement_manager.h @@ -4,8 +4,8 @@ #pragma once #include "seastar/core/gate.hh" +#include "seastar/core/shared_future.hh" -#include "crimson/common/condition_variable.h" #include "crimson/os/seastore/cached_extent.h" #include "crimson/os/seastore/logging.h" #include "crimson/os/seastore/segment_manager.h" @@ -191,6 +191,10 @@ class SegmentedAllocator : public ExtentAllocator { }); } private: + write_iertr::future<> do_write( + Transaction& t, + std::list& extent); + bool _needs_roll(seastore_off_t length) const; write_iertr::future<> _write( @@ -199,23 +203,18 @@ class SegmentedAllocator : public ExtentAllocator { using roll_segment_ertr = crimson::errorator< crimson::ct_error::input_output_error>; - roll_segment_ertr::future<> roll_segment(bool); + roll_segment_ertr::future<> roll_segment(); using init_segment_ertr = crimson::errorator< crimson::ct_error::input_output_error>; 
init_segment_ertr::future<> init_segment(Segment& segment); - void add_extent_to_write( - ool_record_t&, - LogicalCachedExtentRef& extent); - SegmentProvider& segment_provider; SegmentManager& segment_manager; SegmentRef current_segment; seastore_off_t allocated_to = 0; - crimson::condition_variable segment_rotation_guard; + std::optional> roll_promise; seastar::gate write_guard; - bool rolling_segment = false; }; public: SegmentedAllocator( From a1e549075859b9385368fd4a6ca541ee6366306c Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Fri, 18 Feb 2022 10:41:14 +0800 Subject: [PATCH 4/6] crimson/os/seastore/journal: cleanup, move commit_to to RecordSubmitter Signed-off-by: Yingxin Cheng --- .../os/seastore/journal/segmented_journal.cc | 37 ++++++------------- .../os/seastore/journal/segmented_journal.h | 17 ++++----- 2 files changed, 19 insertions(+), 35 deletions(-) diff --git a/src/crimson/os/seastore/journal/segmented_journal.cc b/src/crimson/os/seastore/journal/segmented_journal.cc index 2fbfa3560a2..851e6cc2182 100644 --- a/src/crimson/os/seastore/journal/segmented_journal.cc +++ b/src/crimson/os/seastore/journal/segmented_journal.cc @@ -66,7 +66,8 @@ SegmentedJournal::open_for_write_ret SegmentedJournal::open_for_write() SegmentedJournal::close_ertr::future<> SegmentedJournal::close() { LOG_PREFIX(Journal::close); - INFO("closing"); + INFO("closing, committed_to={}", + record_submitter.get_committed_to()); metrics.clear(); return journal_segment_manager.close(); } @@ -387,12 +388,10 @@ SegmentedJournal::JournalSegmentManager::close() { LOG_PREFIX(JournalSegmentManager::close); if (current_journal_segment) { - INFO("segment_id={}, seq={}, " - "written_to={}, committed_to={}, nonce={}", + INFO("segment_id={}, seq={}, written_to={}, nonce={}", current_journal_segment->get_segment_id(), get_segment_seq(), written_to, - committed_to, current_segment_nonce); } else { INFO("no current journal segment"); @@ -420,12 +419,10 @@ SegmentedJournal::JournalSegmentManager::roll() current_journal_segment->get_segment_id() : NULL_SEG_ID; if (current_journal_segment) { - INFO("closing segment {}, seq={}, " - "written_to={}, committed_to={}, nonce={}", + INFO("closing segment {}, seq={}, written_to={}, nonce={}", old_segment_id, get_segment_seq(), written_to, - committed_to, current_segment_nonce); } @@ -481,16 +478,6 @@ SegmentedJournal::JournalSegmentManager::write(ceph::bufferlist to_write) }); } -void SegmentedJournal::JournalSegmentManager::mark_committed( - const journal_seq_t& new_committed_to) -{ - LOG_PREFIX(JournalSegmentManager::mark_committed); - TRACE("{} => {}", committed_to, new_committed_to); - assert(committed_to == JOURNAL_SEQ_NULL || - committed_to <= new_committed_to); - committed_to = new_committed_to; -} - SegmentedJournal::JournalSegmentManager::initialize_segment_ertr::future<> SegmentedJournal::JournalSegmentManager::initialize_segment(Segment& segment) { @@ -767,11 +754,10 @@ void SegmentedJournal::RecordSubmitter::flush_current_batch() increment_io(); auto num = p_batch->get_num_records(); - auto committed_to = journal_segment_manager.get_committed_to(); auto [to_write, sizes] = p_batch->encode_batch( - committed_to, journal_segment_manager.get_nonce()); + journal_committed_to, journal_segment_manager.get_nonce()); DEBUG("{} records, {}, committed_to={}, outstanding_io={} ...", - num, sizes, committed_to, num_outstanding_io); + num, sizes, journal_committed_to, num_outstanding_io); account_submission(num, sizes); std::ignore = journal_segment_manager.write(to_write 
).safe_then([this, p_batch, FNAME, num, sizes=sizes](auto write_result) { @@ -803,14 +789,13 @@ SegmentedJournal::RecordSubmitter::submit_pending( if (do_flush && p_current_batch->is_empty()) { // fast path with direct write increment_io(); - auto committed_to = journal_segment_manager.get_committed_to(); auto [to_write, sizes] = p_current_batch->submit_pending_fast( std::move(record), journal_segment_manager.get_block_size(), - committed_to, + journal_committed_to, journal_segment_manager.get_nonce()); DEBUG("H{} fast submit {}, committed_to={}, outstanding_io={} ...", - (void*)&handle, sizes, committed_to, num_outstanding_io); + (void*)&handle, sizes, journal_committed_to, num_outstanding_io); account_submission(1, sizes); return journal_segment_manager.write(to_write ).safe_then([mdlength = sizes.get_mdlength()](auto write_result) { @@ -844,8 +829,10 @@ SegmentedJournal::RecordSubmitter::submit_pending( return handle.enter(write_pipeline->finalize ).then([this, FNAME, submit_result, &handle] { DEBUG("H{} finish with {}", (void*)&handle, submit_result); - journal_segment_manager.mark_committed( - submit_result.write_result.get_end_seq()); + auto new_committed_to = submit_result.write_result.get_end_seq(); + assert(journal_committed_to == JOURNAL_SEQ_NULL || + journal_committed_to <= new_committed_to); + journal_committed_to = new_committed_to; return submit_result; }); }); diff --git a/src/crimson/os/seastore/journal/segmented_journal.h b/src/crimson/os/seastore/journal/segmented_journal.h index 49091a65a6f..23e8dbb0471 100644 --- a/src/crimson/os/seastore/journal/segmented_journal.h +++ b/src/crimson/os/seastore/journal/segmented_journal.h @@ -86,10 +86,6 @@ private: return current_segment_nonce; } - journal_seq_t get_committed_to() const { - return committed_to; - } - segment_seq_t get_segment_seq() const { return next_journal_segment_seq - 1; } @@ -120,9 +116,6 @@ private: using write_ertr = base_ertr; using write_ret = write_ertr::future; write_ret write(ceph::bufferlist to_write); - - // mark write committed in order - void mark_committed(const journal_seq_t& new_committed_to); private: journal_seq_t get_current_write_seq() const { @@ -139,7 +132,6 @@ private: current_segment_nonce = 0; current_journal_segment.reset(); written_to = 0; - committed_to = JOURNAL_SEQ_NULL; } // prepare segment for writes, writes out segment header @@ -154,8 +146,6 @@ private: SegmentRef current_journal_segment; seastore_off_t written_to; - // committed_to may be in a previous journal segment - journal_seq_t committed_to; }; class RecordBatch { @@ -322,6 +312,10 @@ private: return stats.record_group_data_bytes; } + journal_seq_t get_committed_to() const { + return journal_committed_to; + } + void reset_stats() { stats = {}; } @@ -378,6 +372,9 @@ private: WritePipeline* write_pipeline = nullptr; JournalSegmentManager& journal_segment_manager; + // committed_to may be in a previous journal segment + journal_seq_t journal_committed_to = JOURNAL_SEQ_NULL; + std::unique_ptr batches; std::size_t current_batch_index; // should not be nullptr after constructed From b0db4be190390a1ed8325425a8669a22dc028ea4 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Fri, 18 Feb 2022 14:47:17 +0800 Subject: [PATCH 5/6] crimson/os/seastore: introduce SegmentAllocator and integrate with Journal Signed-off-by: Yingxin Cheng --- src/crimson/os/seastore/CMakeLists.txt | 1 + .../os/seastore/journal/segment_allocator.cc | 210 +++++++++++++++++ .../os/seastore/journal/segment_allocator.h | 124 ++++++++++ 
.../os/seastore/journal/segmented_journal.cc | 218 +++--------------- .../os/seastore/journal/segmented_journal.h | 102 +------- 5 files changed, 372 insertions(+), 283 deletions(-) create mode 100644 src/crimson/os/seastore/journal/segment_allocator.cc create mode 100644 src/crimson/os/seastore/journal/segment_allocator.h diff --git a/src/crimson/os/seastore/CMakeLists.txt b/src/crimson/os/seastore/CMakeLists.txt index 5664bcbdb66..cec75b10471 100644 --- a/src/crimson/os/seastore/CMakeLists.txt +++ b/src/crimson/os/seastore/CMakeLists.txt @@ -39,6 +39,7 @@ set(crimson_seastore_srcs random_block_manager/nvme_manager.cc random_block_manager/nvmedevice.cc journal/segmented_journal.cc + journal/segment_allocator.cc journal.cc ../../../test/crimson/seastore/test_block.cc ${PROJECT_SOURCE_DIR}/src/os/Transaction.cc diff --git a/src/crimson/os/seastore/journal/segment_allocator.cc b/src/crimson/os/seastore/journal/segment_allocator.cc new file mode 100644 index 00000000000..b1245c61a17 --- /dev/null +++ b/src/crimson/os/seastore/journal/segment_allocator.cc @@ -0,0 +1,210 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab expandtab + +#include "segment_allocator.h" + +#include "crimson/os/seastore/logging.h" +#include "crimson/os/seastore/segment_cleaner.h" + +SET_SUBSYS(seastore_journal); + +namespace crimson::os::seastore::journal { + +static segment_nonce_t generate_nonce( + segment_seq_t seq, + const seastore_meta_t &meta) +{ + return ceph_crc32c( + seq, + reinterpret_cast(meta.seastore_id.bytes()), + sizeof(meta.seastore_id.uuid)); +} + +SegmentAllocator::SegmentAllocator( + segment_type_t type, + SegmentProvider &sp, + SegmentManager &sm) + : type{type}, + segment_provider{sp}, + segment_manager{sm} +{ + ceph_assert(type != segment_type_t::NULL_SEG); + reset(); +} + +void SegmentAllocator::set_next_segment_seq(segment_seq_t seq) +{ + LOG_PREFIX(SegmentAllocator::set_next_segment_seq); + INFO("{} {} next_segment_seq={}", + type, get_device_id(), segment_seq_printer_t{seq}); + assert(type == segment_seq_to_type(seq)); + next_segment_seq = seq; +} + +SegmentAllocator::open_ertr::future +SegmentAllocator::open() +{ + LOG_PREFIX(SegmentAllocator::open); + ceph_assert(!current_segment); + segment_seq_t new_segment_seq; + if (type == segment_type_t::JOURNAL) { + new_segment_seq = next_segment_seq++; + } else { // OOL + new_segment_seq = next_segment_seq; + } + assert(new_segment_seq == get_current_segment_seq()); + ceph_assert(segment_seq_to_type(new_segment_seq) == type); + auto new_segment_id = segment_provider.get_segment( + get_device_id(), new_segment_seq); + return segment_manager.open(new_segment_id + ).handle_error( + open_ertr::pass_further{}, + crimson::ct_error::assert_all{ + "Invalid error in SegmentAllocator::open open" + } + ).safe_then([this, FNAME, new_segment_seq](auto sref) { + // initialize new segment + journal_seq_t new_journal_tail; + if (type == segment_type_t::JOURNAL) { + new_journal_tail = segment_provider.get_journal_tail_target(); + current_segment_nonce = generate_nonce( + new_segment_seq, segment_manager.get_meta()); + } else { // OOL + new_journal_tail = NO_DELTAS; + assert(current_segment_nonce == 0); + } + segment_id_t segment_id = sref->get_segment_id(); + auto header = segment_header_t{ + new_segment_seq, + segment_id, + new_journal_tail, + current_segment_nonce}; + INFO("{} {} writing header to new segment ... 
-- {}", + type, get_device_id(), header); + + auto header_length = segment_manager.get_block_size(); + bufferlist bl; + encode(header, bl); + bufferptr bp(ceph::buffer::create_page_aligned(header_length)); + bp.zero(); + auto iter = bl.cbegin(); + iter.copy(bl.length(), bp.c_str()); + bl.clear(); + bl.append(bp); + + ceph_assert(sref->get_write_ptr() == 0); + assert((unsigned)header_length == bl.length()); + written_to = header_length; + auto new_journal_seq = journal_seq_t{ + new_segment_seq, + paddr_t::make_seg_paddr(segment_id, written_to)}; + return sref->write(0, bl + ).handle_error( + open_ertr::pass_further{}, + crimson::ct_error::assert_all{ + "Invalid error in SegmentAllocator::open write" + } + ).safe_then([this, + FNAME, + new_journal_seq, + new_journal_tail, + sref=std::move(sref)]() mutable { + ceph_assert(!current_segment); + current_segment = std::move(sref); + if (type == segment_type_t::JOURNAL) { + segment_provider.update_journal_tail_committed(new_journal_tail); + } + DEBUG("{} {} rolled new segment id={}", + type, get_device_id(), current_segment->get_segment_id()); + ceph_assert(new_journal_seq.segment_seq == get_current_segment_seq()); + return new_journal_seq; + }); + }); +} + +SegmentAllocator::roll_ertr::future<> +SegmentAllocator::roll() +{ + ceph_assert(can_write()); + return close_segment(true).safe_then([this] { + return open().discard_result(); + }); +} + +SegmentAllocator::write_ret +SegmentAllocator::write(ceph::bufferlist to_write) +{ + LOG_PREFIX(SegmentAllocator::write); + assert(can_write()); + auto write_length = to_write.length(); + auto write_start_offset = written_to; + auto write_start_seq = journal_seq_t{ + get_current_segment_seq(), + paddr_t::make_seg_paddr( + current_segment->get_segment_id(), write_start_offset) + }; + TRACE("{} {} {}~{}", type, get_device_id(), write_start_seq, write_length); + assert(write_length > 0); + assert((write_length % segment_manager.get_block_size()) == 0); + assert(!needs_roll(write_length)); + + auto write_result = write_result_t{ + write_start_seq, + static_cast(write_length) + }; + written_to += write_length; + return current_segment->write( + write_start_offset, to_write + ).handle_error( + write_ertr::pass_further{}, + crimson::ct_error::assert_all{ + "Invalid error in SegmentAllocator::write" + } + ).safe_then([write_result] { + return write_result; + }); +} + +SegmentAllocator::close_ertr::future<> +SegmentAllocator::close() +{ + return [this] { + LOG_PREFIX(SegmentAllocator::close); + if (current_segment) { + return close_segment(false); + } else { + INFO("{} {} no current segment", type, get_device_id()); + return close_segment_ertr::now(); + } + }().finally([this] { + reset(); + }); +} + +SegmentAllocator::close_segment_ertr::future<> +SegmentAllocator::close_segment(bool is_rolling) +{ + LOG_PREFIX(SegmentAllocator::close_segment); + assert(can_write()); + auto seg_to_close = std::move(current_segment); + auto close_segment_id = seg_to_close->get_segment_id(); + INFO("{} {} close segment id={}, seq={}, written_to={}, nonce={}", + type, get_device_id(), + close_segment_id, + segment_seq_printer_t{get_current_segment_seq()}, + written_to, + current_segment_nonce); + if (is_rolling) { + segment_provider.close_segment(close_segment_id); + } + return seg_to_close->close( + ).safe_then([seg_to_close=std::move(seg_to_close)] { + }).handle_error( + close_segment_ertr::pass_further{}, + crimson::ct_error::assert_all{ + "Invalid error in SegmentAllocator::close_segment" + } + ); +} + +} diff --git 
a/src/crimson/os/seastore/journal/segment_allocator.h b/src/crimson/os/seastore/journal/segment_allocator.h new file mode 100644 index 00000000000..ad36cfd2593 --- /dev/null +++ b/src/crimson/os/seastore/journal/segment_allocator.h @@ -0,0 +1,124 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab expandtab + +#pragma once + +#include "include/buffer.h" + +#include "crimson/common/errorator.h" +#include "crimson/os/seastore/segment_manager.h" + +namespace crimson::os::seastore { + class SegmentProvider; +} + +namespace crimson::os::seastore::journal { + +/** + * SegmentAllocator + * + * Maintain an available segment for writes. + */ +class SegmentAllocator { + using base_ertr = crimson::errorator< + crimson::ct_error::input_output_error>; + + public: + SegmentAllocator(segment_type_t type, + SegmentProvider &sp, + SegmentManager &sm); + + device_id_t get_device_id() const { + return segment_manager.get_device_id(); + } + + seastore_off_t get_block_size() const { + return segment_manager.get_block_size(); + } + + extent_len_t get_max_write_length() const { + return segment_manager.get_segment_size() - + p2align(ceph::encoded_sizeof_bounded(), + size_t(segment_manager.get_block_size())); + } + + device_segment_id_t get_num_segments() const { + return segment_manager.get_num_segments(); + } + + bool can_write() const { + return !!current_segment; + } + + segment_nonce_t get_nonce() const { + assert(can_write()); + return current_segment_nonce; + } + + void set_next_segment_seq(segment_seq_t); + + // returns true iff the current segment has insufficient space + bool needs_roll(std::size_t length) const { + assert(can_write()); + auto write_capacity = current_segment->get_write_capacity(); + return length + written_to > std::size_t(write_capacity); + } + + // open for write + using open_ertr = base_ertr; + using open_ret = open_ertr::future; + open_ret open(); + + // close the current segment and initialize next one + using roll_ertr = base_ertr; + roll_ertr::future<> roll(); + + // write the buffer, return the write result + // + // May be called concurrently, but writes may complete in any order. + // If rolling/opening, no write is allowed. 
+ using write_ertr = base_ertr; + using write_ret = write_ertr::future; + write_ret write(ceph::bufferlist to_write); + + using close_ertr = base_ertr; + close_ertr::future<> close(); + + private: + void reset() { + current_segment.reset(); + if (type == segment_type_t::JOURNAL) { + next_segment_seq = 0; + } else { // OOL + next_segment_seq = OOL_SEG_SEQ; + } + current_segment_nonce = 0; + written_to = 0; + } + + // FIXME: remove the unnecessary is_rolling + using close_segment_ertr = base_ertr; + close_segment_ertr::future<> close_segment(bool is_rolling); + + segment_seq_t get_current_segment_seq() const { + segment_seq_t ret; + if (type == segment_type_t::JOURNAL) { + assert(next_segment_seq != 0); + ret = next_segment_seq - 1; + } else { // OOL + ret = next_segment_seq; + } + assert(segment_seq_to_type(ret) == type); + return ret; + } + + const segment_type_t type; // JOURNAL or OOL + SegmentProvider &segment_provider; + SegmentManager &segment_manager; + SegmentRef current_segment; + segment_seq_t next_segment_seq; + segment_nonce_t current_segment_nonce; + seastore_off_t written_to; +}; + +} diff --git a/src/crimson/os/seastore/journal/segmented_journal.cc b/src/crimson/os/seastore/journal/segmented_journal.cc index 851e6cc2182..ce677b55c9d 100644 --- a/src/crimson/os/seastore/journal/segmented_journal.cc +++ b/src/crimson/os/seastore/journal/segmented_journal.cc @@ -26,22 +26,14 @@ SET_SUBSYS(seastore_journal); namespace crimson::os::seastore::journal { -segment_nonce_t generate_nonce( - segment_seq_t seq, - const seastore_meta_t &meta) -{ - return ceph_crc32c( - seq, - reinterpret_cast(meta.seastore_id.bytes()), - sizeof(meta.seastore_id.uuid)); -} - SegmentedJournal::SegmentedJournal( SegmentManager &segment_manager, ExtentReader &scanner, SegmentProvider &segment_provider) : segment_provider(segment_provider), - journal_segment_manager(segment_manager, segment_provider), + journal_segment_allocator(segment_type_t::JOURNAL, + segment_provider, + segment_manager), record_submitter(crimson::common::get_conf( "seastore_journal_iodepth_limit"), crimson::common::get_conf( @@ -50,7 +42,7 @@ SegmentedJournal::SegmentedJournal( "seastore_journal_batch_flush_size"), crimson::common::get_conf( "seastore_journal_batch_preferred_fullness"), - journal_segment_manager), + journal_segment_allocator), scanner(scanner) { register_metrics(); @@ -59,8 +51,8 @@ SegmentedJournal::SegmentedJournal( SegmentedJournal::open_for_write_ret SegmentedJournal::open_for_write() { LOG_PREFIX(Journal::open_for_write); - INFO("device_id={}", journal_segment_manager.get_device_id()); - return journal_segment_manager.open(); + INFO("device_id={}", journal_segment_allocator.get_device_id()); + return journal_segment_allocator.open(); } SegmentedJournal::close_ertr::future<> SegmentedJournal::close() @@ -69,7 +61,7 @@ SegmentedJournal::close_ertr::future<> SegmentedJournal::close() INFO("closing, committed_to={}", record_submitter.get_committed_to()); metrics.clear(); - return journal_segment_manager.close(); + return journal_segment_allocator.close(); } SegmentedJournal::prep_replay_segments_fut @@ -89,15 +81,15 @@ SegmentedJournal::prep_replay_segments( rt.second.journal_segment_seq; }); - journal_segment_manager.set_segment_seq( - segments.rbegin()->second.journal_segment_seq); + journal_segment_allocator.set_next_segment_seq( + segments.rbegin()->second.journal_segment_seq + 1); std::for_each( segments.begin(), segments.end(), [this, FNAME](auto &seg) { if (seg.first != seg.second.physical_segment_id || - 
seg.first.device_id() != journal_segment_manager.get_device_id() || + seg.first.device_id() != journal_segment_allocator.get_device_id() || seg.second.get_type() != segment_type_t::JOURNAL) { ERROR("illegal journal segment for replay -- {}", seg.second); ceph_abort(); @@ -124,7 +116,7 @@ SegmentedJournal::prep_replay_segments( } else { replay_from = paddr_t::make_seg_paddr( from->first, - journal_segment_manager.get_block_size()); + journal_segment_allocator.get_block_size()); } auto num_segments = segments.end() - from; @@ -138,7 +130,7 @@ SegmentedJournal::prep_replay_segments( p.second.journal_segment_seq, paddr_t::make_seg_paddr( p.first, - journal_segment_manager.get_block_size()) + journal_segment_allocator.get_block_size()) }; return std::make_pair(ret, p.second); }); @@ -254,10 +246,10 @@ SegmentedJournal::find_journal_segments() return crimson::do_for_each( boost::counting_iterator(0), boost::counting_iterator( - journal_segment_manager.get_num_segments()), + journal_segment_allocator.get_num_segments()), [this, &ret](device_segment_id_t d_segment_id) { segment_id_t segment_id{ - journal_segment_manager.get_device_id(), + journal_segment_allocator.get_device_id(), d_segment_id}; return scanner.read_segment_header( segment_id @@ -367,154 +359,6 @@ void SegmentedJournal::register_metrics() ); } -SegmentedJournal::JournalSegmentManager::JournalSegmentManager( - SegmentManager& segment_manager, - SegmentProvider& segment_provider) - : segment_provider{segment_provider}, segment_manager{segment_manager} -{ - reset(); -} - -SegmentedJournal::JournalSegmentManager::open_ret -SegmentedJournal::JournalSegmentManager::open() -{ - return roll().safe_then([this] { - return get_current_write_seq(); - }); -} - -SegmentedJournal::JournalSegmentManager::close_ertr::future<> -SegmentedJournal::JournalSegmentManager::close() -{ - LOG_PREFIX(JournalSegmentManager::close); - if (current_journal_segment) { - INFO("segment_id={}, seq={}, written_to={}, nonce={}", - current_journal_segment->get_segment_id(), - get_segment_seq(), - written_to, - current_segment_nonce); - } else { - INFO("no current journal segment"); - } - - return ( - current_journal_segment ? - current_journal_segment->close() : - Segment::close_ertr::now() - ).handle_error( - close_ertr::pass_further{}, - crimson::ct_error::assert_all{ - "Invalid error in JournalSegmentManager::close()" - } - ).finally([this] { - reset(); - }); -} - -SegmentedJournal::JournalSegmentManager::roll_ertr::future<> -SegmentedJournal::JournalSegmentManager::roll() -{ - LOG_PREFIX(JournalSegmentManager::roll); - auto old_segment_id = current_journal_segment ? - current_journal_segment->get_segment_id() : - NULL_SEG_ID; - if (current_journal_segment) { - INFO("closing segment {}, seq={}, written_to={}, nonce={}", - old_segment_id, - get_segment_seq(), - written_to, - current_segment_nonce); - } - - return ( - current_journal_segment ? 
- current_journal_segment->close() : - Segment::close_ertr::now() - ).safe_then([this] { - auto new_segment_id = segment_provider->get_segment( - get_device_id(), next_journal_segment_seq); - return segment_manager.open(new_segment_id); - }).safe_then([this](auto sref) { - current_journal_segment = sref; - return initialize_segment(*current_journal_segment); - }).safe_then([this, old_segment_id] { - if (old_segment_id != NULL_SEG_ID) { - segment_provider.close_segment(old_segment_id); - } - }).handle_error( - roll_ertr::pass_further{}, - crimson::ct_error::assert_all{ - "Invalid error in JournalSegmentManager::roll" - } - ); -} - -SegmentedJournal::JournalSegmentManager::write_ret -SegmentedJournal::JournalSegmentManager::write(ceph::bufferlist to_write) -{ - LOG_PREFIX(JournalSegmentManager::write); - auto write_length = to_write.length(); - auto write_start_seq = get_current_write_seq(); - TRACE("{}~{}", write_start_seq, write_length); - assert(write_length > 0); - assert((write_length % segment_manager.get_block_size()) == 0); - assert(!needs_roll(write_length)); - - auto write_start_offset = written_to; - written_to += write_length; - auto write_result = write_result_t{ - write_start_seq, - static_cast(write_length) - }; - return current_journal_segment->write( - write_start_offset, to_write - ).handle_error( - write_ertr::pass_further{}, - crimson::ct_error::assert_all{ - "Invalid error in JournalSegmentManager::write" - } - ).safe_then([write_result] { - return write_result; - }); -} - -SegmentedJournal::JournalSegmentManager::initialize_segment_ertr::future<> -SegmentedJournal::JournalSegmentManager::initialize_segment(Segment& segment) -{ - LOG_PREFIX(JournalSegmentManager::initialize_segment); - auto new_tail = segment_provider.get_journal_tail_target(); - // write out header - ceph_assert(segment.get_write_ptr() == 0); - bufferlist bl; - - segment_seq_t seq = next_journal_segment_seq++; - current_segment_nonce = generate_nonce( - seq, segment_manager.get_meta()); - auto header = segment_header_t{ - seq, - segment.get_segment_id(), - new_tail, - current_segment_nonce}; - INFO("writing {} ...", header); - ceph_assert(header.get_type() == segment_type_t::JOURNAL); - encode(header, bl); - - bufferptr bp( - ceph::buffer::create_page_aligned( - segment_manager.get_block_size())); - bp.zero(); - auto iter = bl.cbegin(); - iter.copy(bl.length(), bp.c_str()); - bl.clear(); - bl.append(bp); - - written_to = 0; - return write(bl - ).safe_then([this, new_tail](auto) { - segment_provider.update_journal_tail_committed(new_tail); - }); -} - SegmentedJournal::RecordBatch::add_pending_ret SegmentedJournal::RecordBatch::add_pending( record_t&& record, @@ -630,10 +474,10 @@ SegmentedJournal::RecordSubmitter::RecordSubmitter( std::size_t batch_capacity, std::size_t batch_flush_size, double preferred_fullness, - JournalSegmentManager& jsm) + SegmentAllocator& jsa) : io_depth_limit{io_depth}, preferred_fullness{preferred_fullness}, - journal_segment_manager{jsm}, + journal_segment_allocator{jsa}, batches(new RecordBatch[io_depth + 1]) { LOG_PREFIX(RecordSubmitter); @@ -664,9 +508,9 @@ SegmentedJournal::RecordSubmitter::submit( assert(write_pipeline); auto expected_size = record_group_size_t( record.size, - journal_segment_manager.get_block_size() + journal_segment_allocator.get_block_size() ).get_encoded_length(); - auto max_record_length = journal_segment_manager.get_max_write_length(); + auto max_record_length = journal_segment_allocator.get_max_write_length(); if (expected_size > max_record_length) 
{ ERROR("H{} {} exceeds max record size {}", (void*)&handle, record, max_record_length); @@ -755,11 +599,11 @@ void SegmentedJournal::RecordSubmitter::flush_current_batch() increment_io(); auto num = p_batch->get_num_records(); auto [to_write, sizes] = p_batch->encode_batch( - journal_committed_to, journal_segment_manager.get_nonce()); + journal_committed_to, journal_segment_allocator.get_nonce()); DEBUG("{} records, {}, committed_to={}, outstanding_io={} ...", num, sizes, journal_committed_to, num_outstanding_io); account_submission(num, sizes); - std::ignore = journal_segment_manager.write(to_write + std::ignore = journal_segment_allocator.write(to_write ).safe_then([this, p_batch, FNAME, num, sizes=sizes](auto write_result) { TRACE("{} records, {}, write done with {}", num, sizes, write_result); finish_submit_batch(p_batch, write_result); @@ -791,13 +635,13 @@ SegmentedJournal::RecordSubmitter::submit_pending( increment_io(); auto [to_write, sizes] = p_current_batch->submit_pending_fast( std::move(record), - journal_segment_manager.get_block_size(), + journal_segment_allocator.get_block_size(), journal_committed_to, - journal_segment_manager.get_nonce()); + journal_segment_allocator.get_nonce()); DEBUG("H{} fast submit {}, committed_to={}, outstanding_io={} ...", (void*)&handle, sizes, journal_committed_to, num_outstanding_io); account_submission(1, sizes); - return journal_segment_manager.write(to_write + return journal_segment_allocator.write(to_write ).safe_then([mdlength = sizes.get_mdlength()](auto write_result) { return record_locator_t{ write_result.start_seq.offset.add_offset(mdlength), @@ -811,7 +655,7 @@ SegmentedJournal::RecordSubmitter::submit_pending( auto write_fut = p_current_batch->add_pending( std::move(record), handle, - journal_segment_manager.get_block_size()); + journal_segment_allocator.get_block_size()); if (do_flush) { DEBUG("H{} added pending and flush", (void*)&handle); flush_current_batch(); @@ -851,15 +695,15 @@ SegmentedJournal::RecordSubmitter::do_submit( // can increment io depth assert(!wait_submit_promise.has_value()); auto maybe_new_size = p_current_batch->can_batch( - record, journal_segment_manager.get_block_size()); + record, journal_segment_allocator.get_block_size()); if (!maybe_new_size.has_value() || (maybe_new_size->get_encoded_length() > - journal_segment_manager.get_max_write_length())) { + journal_segment_allocator.get_max_write_length())) { TRACE("H{} flush", (void*)&handle); assert(p_current_batch->is_pending()); flush_current_batch(); return do_submit(std::move(record), handle); - } else if (journal_segment_manager.needs_roll( + } else if (journal_segment_allocator.needs_roll( maybe_new_size->get_encoded_length())) { if (p_current_batch->is_pending()) { TRACE("H{} flush and roll", (void*)&handle); @@ -867,7 +711,7 @@ SegmentedJournal::RecordSubmitter::do_submit( } else { TRACE("H{} roll", (void*)&handle); } - return journal_segment_manager.roll( + return journal_segment_allocator.roll( ).safe_then([this, record=std::move(record), &handle]() mutable { return do_submit(std::move(record), handle); }); @@ -881,11 +725,11 @@ SegmentedJournal::RecordSubmitter::do_submit( assert(state == state_t::FULL); // cannot increment io depth auto maybe_new_size = p_current_batch->can_batch( - record, journal_segment_manager.get_block_size()); + record, journal_segment_allocator.get_block_size()); if (!maybe_new_size.has_value() || (maybe_new_size->get_encoded_length() > - journal_segment_manager.get_max_write_length()) || - 
journal_segment_manager.needs_roll( + journal_segment_allocator.get_max_write_length()) || + journal_segment_allocator.needs_roll( maybe_new_size->get_encoded_length())) { if (!wait_submit_promise.has_value()) { wait_submit_promise = seastar::promise<>(); diff --git a/src/crimson/os/seastore/journal/segmented_journal.h b/src/crimson/os/seastore/journal/segmented_journal.h index 23e8dbb0471..17e4056e715 100644 --- a/src/crimson/os/seastore/journal/segmented_journal.h +++ b/src/crimson/os/seastore/journal/segmented_journal.h @@ -18,10 +18,10 @@ #include "crimson/os/seastore/segment_cleaner.h" #include "crimson/os/seastore/journal.h" #include "crimson/os/seastore/extent_reader.h" -#include "crimson/os/seastore/segment_manager.h" #include "crimson/os/seastore/ordering_handle.h" #include "crimson/os/seastore/seastore_types.h" #include "crimson/osd/exceptions.h" +#include "segment_allocator.h" namespace crimson::os::seastore::journal { @@ -58,96 +58,6 @@ public: } private: - class JournalSegmentManager { - public: - JournalSegmentManager(SegmentManager&, SegmentProvider&); - - using base_ertr = crimson::errorator< - crimson::ct_error::input_output_error>; - extent_len_t get_max_write_length() const { - return segment_manager.get_segment_size() - - p2align(ceph::encoded_sizeof_bounded(), - size_t(segment_manager.get_block_size())); - } - - device_id_t get_device_id() const { - return segment_manager.get_device_id(); - } - - device_segment_id_t get_num_segments() const { - return segment_manager.get_num_segments(); - } - - seastore_off_t get_block_size() const { - return segment_manager.get_block_size(); - } - - segment_nonce_t get_nonce() const { - return current_segment_nonce; - } - - segment_seq_t get_segment_seq() const { - return next_journal_segment_seq - 1; - } - - void set_segment_seq(segment_seq_t current_seq) { - next_journal_segment_seq = (current_seq + 1); - } - - using open_ertr = base_ertr; - using open_ret = open_ertr::future; - open_ret open(); - - using close_ertr = base_ertr; - close_ertr::future<> close(); - - // returns true iff the current segment has insufficient space - bool needs_roll(std::size_t length) const { - auto write_capacity = current_journal_segment->get_write_capacity(); - return length + written_to > std::size_t(write_capacity); - } - - // close the current segment and initialize next one - using roll_ertr = base_ertr; - roll_ertr::future<> roll(); - - // write the buffer, return the write result - // May be called concurrently, writes may complete in any order. 
- using write_ertr = base_ertr; - using write_ret = write_ertr::future; - write_ret write(ceph::bufferlist to_write); - - private: - journal_seq_t get_current_write_seq() const { - assert(current_journal_segment); - return journal_seq_t{ - get_segment_seq(), - paddr_t::make_seg_paddr(current_journal_segment->get_segment_id(), - written_to) - }; - } - - void reset() { - next_journal_segment_seq = 0; - current_segment_nonce = 0; - current_journal_segment.reset(); - written_to = 0; - } - - // prepare segment for writes, writes out segment header - using initialize_segment_ertr = base_ertr; - initialize_segment_ertr::future<> initialize_segment(Segment&); - - SegmentProvider& segment_provider; - SegmentManager& segment_manager; - - segment_seq_t next_journal_segment_seq; - segment_nonce_t current_segment_nonce; - - SegmentRef current_journal_segment; - seastore_off_t written_to; - }; - class RecordBatch { enum class state_t { EMPTY = 0, @@ -212,7 +122,7 @@ private: // // Set write_result_t::write_length to 0 if the record is not the first one // in the batch. - using add_pending_ertr = JournalSegmentManager::write_ertr; + using add_pending_ertr = SegmentAllocator::write_ertr; using add_pending_ret = add_pending_ertr::future; add_pending_ret add_pending( record_t&&, @@ -290,7 +200,7 @@ private: std::size_t batch_capacity, std::size_t batch_flush_size, double preferred_fullness, - JournalSegmentManager&); + SegmentAllocator&); grouped_io_stats get_record_batch_stats() const { return stats.record_batch_stats; @@ -355,7 +265,7 @@ private: void flush_current_batch(); - using submit_pending_ertr = JournalSegmentManager::write_ertr; + using submit_pending_ertr = SegmentAllocator::write_ertr; using submit_pending_ret = submit_pending_ertr::future< record_locator_t>; submit_pending_ret submit_pending( @@ -371,7 +281,7 @@ private: double preferred_fullness; WritePipeline* write_pipeline = nullptr; - JournalSegmentManager& journal_segment_manager; + SegmentAllocator& journal_segment_allocator; // committed_to may be in a previous journal segment journal_seq_t journal_committed_to = JOURNAL_SEQ_NULL; @@ -392,7 +302,7 @@ private: }; SegmentProvider& segment_provider; - JournalSegmentManager journal_segment_manager; + SegmentAllocator journal_segment_allocator; RecordSubmitter record_submitter; ExtentReader& scanner; seastar::metrics::metric_group metrics; From 53279970327348ea588caab5df54b92f72abedc7 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Fri, 18 Feb 2022 22:26:38 +0800 Subject: [PATCH 6/6] crimson/os/seastore/epm: integrate SegmentAllocator with Writer Signed-off-by: Yingxin Cheng --- .../os/seastore/extent_placement_manager.cc | 152 +++--------------- .../os/seastore/extent_placement_manager.h | 35 +--- .../os/seastore/journal/segment_allocator.cc | 14 +- .../os/seastore/journal/segment_allocator.h | 10 ++ 4 files changed, 55 insertions(+), 156 deletions(-) diff --git a/src/crimson/os/seastore/extent_placement_manager.cc b/src/crimson/os/seastore/extent_placement_manager.cc index 5ba17f6038b..7dc44e78b01 100644 --- a/src/crimson/os/seastore/extent_placement_manager.cc +++ b/src/crimson/os/seastore/extent_placement_manager.cc @@ -3,14 +3,6 @@ #include "crimson/os/seastore/extent_placement_manager.h" -#include "crimson/os/seastore/segment_cleaner.h" - -namespace { - seastar::logger& logger() { - return crimson::get_logger(ceph_subsys_seastore_tm); - } -} - SET_SUBSYS(seastore_tm); namespace crimson::os::seastore { @@ -18,18 +10,15 @@ namespace crimson::os::seastore { 
 SegmentedAllocator::SegmentedAllocator(
   SegmentProvider& sp,
   SegmentManager& sm)
-  : segment_provider(sp),
-    segment_manager(sm)
 {
   std::generate_n(
     std::back_inserter(writers),
     crimson::common::get_conf<uint64_t>(
       "seastore_init_rewrite_segments_num_per_device"),
     [&] {
-      return Writer{
-        segment_provider,
-        segment_manager};
-    });
+      return Writer{sp, sm};
+    }
+  );
 }
 
 SegmentedAllocator::Writer::write_iertr::future<>
@@ -38,23 +27,19 @@ SegmentedAllocator::Writer::_write(
   ool_record_t& record)
 {
   LOG_PREFIX(SegmentedAllocator::Writer::_write);
-  record.set_base(allocated_to);
+  record.set_base(segment_allocator.get_written_to());
   auto record_size = record.get_encoded_record_length();
-  allocated_to += record_size.get_encoded_length();
-  segment_provider.update_segment_avail_bytes(
-    paddr_t::make_seg_paddr(
-      current_segment->get_segment_id(),
-      allocated_to));
   bufferlist bl = record.encode(
-      current_segment->get_segment_id(),
-      0);
+      segment_allocator.get_segment_id(),
+      segment_allocator.get_nonce());
+  assert(bl.length() == record_size.get_encoded_length());
 
   DEBUGT(
     "written {} extents, {} bytes to segment {} at {}",
     t,
     record.get_num_extents(),
     bl.length(),
-    current_segment->get_segment_id(),
+    segment_allocator.get_segment_id(),
     record.get_base());
 
   // account transactional ool writes before write()
@@ -75,10 +60,7 @@ SegmentedAllocator::Writer::_write(
   }
 
   return trans_intr::make_interruptible(
-    current_segment->write(record.get_base(), bl
-    ).safe_then([FNAME, &t, base=record.get_base(), cs=current_segment] {
-      DEBUGT("written {} {}", t, cs->get_segment_id(), base);
-    })
+    segment_allocator.write(bl).discard_result()
   );
 }
 
@@ -90,33 +72,29 @@ SegmentedAllocator::Writer::do_write(
   LOG_PREFIX(SegmentedAllocator::Writer::do_write);
   assert(!extents.empty());
   if (roll_promise.has_value()) {
-    return roll_promise->get_shared_future(
-    ).then([this, &t, &extents] {
+    return trans_intr::make_interruptible(
+      roll_promise->get_shared_future()
+    ).then_interruptible([this, &t, &extents] {
       return do_write(t, extents);
     });
   }
-  assert(current_segment);
+  assert(segment_allocator.can_write());
 
-  ool_record_t record(segment_manager.get_block_size());
+  ool_record_t record(segment_allocator.get_block_size());
 
   for (auto it = extents.begin(); it != extents.end();) {
     auto& extent = *it;
     auto wouldbe_length = record.get_wouldbe_encoded_record_length(extent);
-    if (_needs_roll(wouldbe_length)) {
+    if (segment_allocator.needs_roll(wouldbe_length)) {
       // reached the segment end, write and roll
       assert(!roll_promise.has_value());
       roll_promise = seastar::shared_promise<>();
       auto num_extents = record.get_num_extents();
-      DEBUGT(
-        "end of segment, writing {} extents to segment {} at {}",
-        t,
-        num_extents,
-        current_segment->get_segment_id(),
-        allocated_to);
+      DEBUGT("end of segment, writing {} extents", t, num_extents);
      return (num_extents ?
_write(t, record) : write_iertr::now() ).si_then([this] { - return roll_segment(); + return segment_allocator.roll(); }).finally([this] { roll_promise->set_value(); roll_promise.reset(); @@ -133,12 +111,7 @@ SegmentedAllocator::Writer::do_write( it = extents.erase(it); } - DEBUGT( - "writing {} extents to segment {} at {}", - t, - record.get_num_extents(), - current_segment->get_segment_id(), - allocated_to); + DEBUGT("writing {} extents", t, record.get_num_extents()); return _write(t, record); } @@ -151,14 +124,15 @@ SegmentedAllocator::Writer::write( return write_iertr::now(); } return seastar::with_gate(write_guard, [this, &t, &extents] { - if (!roll_promise.has_value() && !current_segment) { + if (!roll_promise.has_value() && + !segment_allocator.can_write()) { roll_promise = seastar::shared_promise<>(); return trans_intr::make_interruptible( - roll_segment().finally([this] { - roll_promise->set_value(); - roll_promise.reset(); - }) - ).si_then([this, &t, &extents] { + segment_allocator.open().discard_result() + ).finally([this] { + roll_promise->set_value(); + roll_promise.reset(); + }).si_then([this, &t, &extents] { return do_write(t, extents); }); } @@ -166,80 +140,4 @@ SegmentedAllocator::Writer::write( }); } -bool SegmentedAllocator::Writer::_needs_roll(seastore_off_t length) const { - return allocated_to + length > current_segment->get_write_capacity(); -} - -SegmentedAllocator::Writer::init_segment_ertr::future<> -SegmentedAllocator::Writer::init_segment(Segment& segment) { - bufferptr bp( - ceph::buffer::create_page_aligned( - segment_manager.get_block_size())); - bp.zero(); - auto header =segment_header_t{ - OOL_SEG_SEQ, - segment.get_segment_id(), - NO_DELTAS, 0}; - logger().debug("SegmentedAllocator::Writer::init_segment: initting {}, {}", - segment.get_segment_id(), - header); - ceph::bufferlist bl; - encode(header, bl); - bl.cbegin().copy(bl.length(), bp.c_str()); - bl.clear(); - bl.append(bp); - allocated_to = segment_manager.get_block_size(); - return segment.write(0, bl).handle_error( - crimson::ct_error::input_output_error::pass_further{}, - crimson::ct_error::assert_all{ - "Invalid error when initing segment"} - ); -} - -SegmentedAllocator::Writer::roll_segment_ertr::future<> -SegmentedAllocator::Writer::roll_segment() { - LOG_PREFIX(SegmentedAllocator::Writer::roll_segment); - DEBUG("start"); - assert(roll_promise.has_value()); - return [this, FNAME] { - if (current_segment) { - auto seg_to_close = std::move(current_segment); - if (write_guard.is_closed()) { - DEBUG("write_guard is closed, should be stopping"); - return seg_to_close->close( - ).safe_then([seg_to_close=std::move(seg_to_close)] {}); - } else { - DEBUG("rolling OOL segment, close {} ...", seg_to_close->get_segment_id()); - (void) seastar::with_gate(write_guard, - [this, seg_to_close=std::move(seg_to_close)]() mutable - { - return seg_to_close->close( - ).safe_then([this, seg_to_close=std::move(seg_to_close)] { - segment_provider.close_segment(seg_to_close->get_segment_id()); - }); - }); - return Segment::close_ertr::now(); - } - } else { - DEBUG("rolling OOL segment, no current ..."); - return Segment::close_ertr::now(); - } - }().safe_then([this] { - auto new_segment_id = segment_provider.get_segment( - segment_manager.get_device_id(), OOL_SEG_SEQ); - return segment_manager.open(new_segment_id); - }).safe_then([this, FNAME](auto segref) { - DEBUG("opened new segment: {}", segref->get_segment_id()); - return init_segment(*segref - ).safe_then([segref=std::move(segref), this, FNAME] { - 
-      assert(!current_segment);
-      current_segment = segref;
-      DEBUG("inited new segment: {}", segref->get_segment_id());
-    });
-  }).handle_error(
-    roll_segment_ertr::pass_further{},
-    crimson::ct_error::all_same_way([] { ceph_assert(0 == "TODO"); })
-  );
-}
-
 }
diff --git a/src/crimson/os/seastore/extent_placement_manager.h b/src/crimson/os/seastore/extent_placement_manager.h
index c582f04e589..7034618ade9 100644
--- a/src/crimson/os/seastore/extent_placement_manager.h
+++ b/src/crimson/os/seastore/extent_placement_manager.h
@@ -7,8 +7,8 @@
 #include "seastar/core/shared_future.hh"
 
 #include "crimson/os/seastore/cached_extent.h"
+#include "crimson/os/seastore/journal/segment_allocator.h"
 #include "crimson/os/seastore/logging.h"
-#include "crimson/os/seastore/segment_manager.h"
 #include "crimson/os/seastore/transaction.h"
 
 namespace crimson::os::seastore {
@@ -169,12 +169,9 @@ class SegmentProvider;
 class SegmentedAllocator : public ExtentAllocator {
   class Writer : public ExtentOolWriter {
   public:
-    Writer(
-      SegmentProvider& sp,
-      SegmentManager& sm)
-      : segment_provider(sp),
-        segment_manager(sm)
-    {}
+    Writer(SegmentProvider& sp, SegmentManager& sm)
+      : segment_allocator(segment_type_t::OOL, sp, sm) {}
+
     Writer(Writer &&) = default;
 
     write_iertr::future<> write(
@@ -183,36 +180,20 @@
 
     stop_ertr::future<> stop() final {
       return write_guard.close().then([this] {
-        if (current_segment) {
-          return current_segment->close();
-        } else {
-          return Segment::close_ertr::now();
-        }
+        return segment_allocator.close();
       });
     }
+
   private:
     write_iertr::future<> do_write(
       Transaction& t,
       std::list<LogicalCachedExtentRef>& extent);
 
-    bool _needs_roll(seastore_off_t length) const;
-
     write_iertr::future<> _write(
       Transaction& t,
       ool_record_t& record);
 
-    using roll_segment_ertr = crimson::errorator<
-      crimson::ct_error::input_output_error>;
-    roll_segment_ertr::future<> roll_segment();
-
-    using init_segment_ertr = crimson::errorator<
-      crimson::ct_error::input_output_error>;
-    init_segment_ertr::future<> init_segment(Segment& segment);
-
-    SegmentProvider& segment_provider;
-    SegmentManager& segment_manager;
-    SegmentRef current_segment;
-    seastore_off_t allocated_to = 0;
+    journal::SegmentAllocator segment_allocator;
     std::optional<seastar::shared_promise<>> roll_promise;
     seastar::gate write_guard;
   };
@@ -251,8 +232,6 @@ public:
     });
   }
 private:
-  SegmentProvider& segment_provider;
-  SegmentManager& segment_manager;
   std::vector<Writer> writers;
 };
 
diff --git a/src/crimson/os/seastore/journal/segment_allocator.cc b/src/crimson/os/seastore/journal/segment_allocator.cc
index b1245c61a17..5a55dc1c9fa 100644
--- a/src/crimson/os/seastore/journal/segment_allocator.cc
+++ b/src/crimson/os/seastore/journal/segment_allocator.cc
@@ -98,6 +98,11 @@ SegmentAllocator::open()
   auto new_journal_seq = journal_seq_t{
     new_segment_seq,
     paddr_t::make_seg_paddr(segment_id, written_to)};
+  if (type == segment_type_t::OOL) {
+    // FIXME: improve the special handling for OOL
+    segment_provider.update_segment_avail_bytes(
+      new_journal_seq.offset);
+  }
   return sref->write(0, bl
   ).handle_error(
     open_ertr::pass_further{},
@@ -153,6 +158,13 @@ SegmentAllocator::write(ceph::bufferlist to_write)
     static_cast<seastore_off_t>(write_length)
   };
   written_to += write_length;
+  if (type == segment_type_t::OOL) {
+    // FIXME: improve the special handling for OOL
+    segment_provider.update_segment_avail_bytes(
+      paddr_t::make_seg_paddr(
+        current_segment->get_segment_id(), written_to)
+    );
+  }
   return current_segment->write(
     write_start_offset, to_write
   ).handle_error(
@@ -160,7 +172,7 @@
SegmentAllocator::write(ceph::bufferlist to_write) crimson::ct_error::assert_all{ "Invalid error in SegmentAllocator::write" } - ).safe_then([write_result] { + ).safe_then([write_result, cs=current_segment] { return write_result; }); } diff --git a/src/crimson/os/seastore/journal/segment_allocator.h b/src/crimson/os/seastore/journal/segment_allocator.h index ad36cfd2593..5fec29145b8 100644 --- a/src/crimson/os/seastore/journal/segment_allocator.h +++ b/src/crimson/os/seastore/journal/segment_allocator.h @@ -50,11 +50,21 @@ class SegmentAllocator { return !!current_segment; } + segment_id_t get_segment_id() const { + assert(can_write()); + return current_segment->get_segment_id(); + } + segment_nonce_t get_nonce() const { assert(can_write()); return current_segment_nonce; } + seastore_off_t get_written_to() const { + assert(can_write()); + return written_to; + } + void set_next_segment_seq(segment_seq_t); // returns true iff the current segment has insufficient space
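
Note on the concurrency pattern used above: the ool Writer now relies on the same gating scheme the journal side uses, a seastar::shared_promise that parks submitters while the SegmentAllocator opens or rolls a segment, plus a seastar::gate so stop() drains in-flight writes before closing the allocator. The sketch below is a self-contained toy model of only that pattern, for readers unfamiliar with these Seastar primitives. toy_writer, its capacity constant, roll_segment() and the main() harness are invented for illustration and are not seastore APIs; the errorator and interruptible-future layers of the real code are omitted, and header paths may vary between Seastar versions.

// Illustrative only -- models the shared_promise + gate pattern, not seastore.
#include <seastar/core/app-template.hh>
#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/shared_future.hh>
#include <seastar/core/sleep.hh>
#include <chrono>
#include <iostream>
#include <memory>
#include <optional>
#include <vector>

class toy_writer {
  static constexpr int capacity = 100;           // pretend segment size
  std::optional<seastar::shared_promise<>> roll_promise;
  seastar::gate write_guard;
  bool segment_open = false;
  int segment_no = 0;
  int written = 0;

  // stand-in for opening/rolling a segment: asynchronous, takes "a while"
  seastar::future<> roll_segment() {
    return seastar::sleep(std::chrono::milliseconds(1)).then([this] {
      segment_open = true;
      written = 0;
      std::cout << "opened segment " << ++segment_no << "\n";
    });
  }

  seastar::future<> do_write(int len) {
    if (roll_promise.has_value()) {
      // someone else is rolling: wait on the shared future, then retry
      return roll_promise->get_shared_future().then([this, len] {
        return do_write(len);
      });
    }
    if (written + len > capacity) {
      // segment full: this submitter rolls, later submitters queue up above
      roll_promise = seastar::shared_promise<>();
      segment_open = false;
      return roll_segment().finally([this] {
        roll_promise->set_value();
        roll_promise.reset();
      }).then([this, len] { return do_write(len); });
    }
    written += len;                               // "write" to the segment
    return seastar::make_ready_future<>();
  }

public:
  seastar::future<> write(int len) {
    // every write holds the gate so stop() waits for it
    return seastar::with_gate(write_guard, [this, len] {
      if (!roll_promise.has_value() && !segment_open) {
        // lazily open the first segment, gating concurrent writers
        roll_promise = seastar::shared_promise<>();
        return roll_segment().finally([this] {
          roll_promise->set_value();
          roll_promise.reset();
        }).then([this, len] { return do_write(len); });
      }
      return do_write(len);
    });
  }

  seastar::future<> stop() {
    return write_guard.close();                   // drain in-flight writes
  }
};

int main(int argc, char** argv) {
  seastar::app_template app;
  return app.run(argc, argv, [] {
    auto owned = std::make_unique<toy_writer>();
    auto& w = *owned;
    // three concurrent writes of 60 against a capacity of 100: the second
    // triggers a roll, the third waits on the shared promise instead
    return seastar::parallel_for_each(std::vector<int>{60, 60, 60},
        [&w](int len) { return w.write(len); })
      .then([&w] { return w.stop(); })
      .finally([owned = std::move(owned)] {});
  });
}

The property this mirrors from the patch is that at most one roll/open is in flight at a time: later submitters chain onto the shared future rather than racing to open segments, and stop() cannot complete while the gate still holds writers.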