mirror of
https://github.com/ceph/ceph
synced 2025-02-22 10:37:15 +00:00
Merge pull request #43754 from cyx1231st/wip-seastore-fix-journal-committed-to
crimson/os/seastore: fix ordered updates to JournalSegmentManager::committed_to Reviewed-by: Samuel Just <sjust@redhat.com> Reviewed-by: Xuehan Xu <xxhdx1985126@gmail.com>
This commit is contained in:
commit
5e56258a34
@ -69,7 +69,7 @@ public:
|
||||
extent_offset += extent.get_bptr().length();
|
||||
}
|
||||
assert(extent_offset == (segment_off_t)(base + rsize.mdlength + rsize.dlength));
|
||||
return encode_record(rsize, std::move(record), block_size, base, nonce);
|
||||
return encode_record(rsize, std::move(record), block_size, journal_seq_t(), nonce);
|
||||
}
|
||||
void add_extent(LogicalCachedExtentRef& extent) {
|
||||
extents.emplace_back(extent);
|
||||
@ -87,7 +87,7 @@ public:
|
||||
void set_base(segment_off_t b) {
|
||||
base = b;
|
||||
}
|
||||
segment_off_t get_base() {
|
||||
segment_off_t get_base() const {
|
||||
return base;
|
||||
}
|
||||
void clear() {
|
||||
|
@ -62,7 +62,7 @@ ExtentReader::scan_extents_ret ExtentReader::scan_extents(
|
||||
{
|
||||
auto ret = std::make_unique<scan_extents_ret_bare>();
|
||||
auto* extents = ret.get();
|
||||
return read_segment_header(cursor.get_offset().segment
|
||||
return read_segment_header(cursor.get_segment_id()
|
||||
).handle_error(
|
||||
scan_extents_ertr::pass_further{},
|
||||
crimson::ct_error::assert_all{
|
||||
@ -114,9 +114,9 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records(
|
||||
found_record_handler_t &handler)
|
||||
{
|
||||
auto& segment_manager =
|
||||
*segment_managers[cursor.offset.segment.device_id()];
|
||||
if (cursor.offset.offset == 0) {
|
||||
cursor.offset.offset = segment_manager.get_block_size();
|
||||
*segment_managers[cursor.get_segment_id().device_id()];
|
||||
if (cursor.get_segment_offset() == 0) {
|
||||
cursor.increment(segment_manager.get_block_size());
|
||||
}
|
||||
auto retref = std::make_unique<size_t>(0);
|
||||
auto &budget_used = *retref;
|
||||
@ -125,30 +125,33 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records(
|
||||
-> scan_valid_records_ertr::future<seastar::stop_iteration> {
|
||||
return [=, &handler, &cursor, &budget_used] {
|
||||
if (!cursor.last_valid_header_found) {
|
||||
return read_validate_record_metadata(cursor.offset, nonce
|
||||
return read_validate_record_metadata(cursor.seq.offset, nonce
|
||||
).safe_then([=, &cursor](auto md) {
|
||||
logger().debug(
|
||||
"ExtentReader::scan_valid_records: read complete {}",
|
||||
cursor.offset);
|
||||
cursor.seq);
|
||||
if (!md) {
|
||||
logger().debug(
|
||||
"ExtentReader::scan_valid_records: found invalid header at {}, presumably at end",
|
||||
cursor.offset);
|
||||
cursor.seq);
|
||||
cursor.last_valid_header_found = true;
|
||||
return scan_valid_records_ertr::now();
|
||||
} else {
|
||||
auto new_committed_to = md->first.committed_to;
|
||||
logger().debug(
|
||||
"ExtentReader::scan_valid_records: valid record read at {}",
|
||||
cursor.offset);
|
||||
cursor.last_committed = paddr_t{
|
||||
cursor.offset.segment,
|
||||
md->first.committed_to};
|
||||
"ExtentReader::scan_valid_records: valid record read at {}, now committed at {}",
|
||||
cursor.seq,
|
||||
new_committed_to);
|
||||
ceph_assert(cursor.last_committed == journal_seq_t() ||
|
||||
cursor.last_committed <= new_committed_to);
|
||||
cursor.last_committed = new_committed_to;
|
||||
cursor.pending_records.emplace_back(
|
||||
cursor.offset,
|
||||
cursor.seq.offset,
|
||||
md->first,
|
||||
md->second);
|
||||
cursor.offset.offset +=
|
||||
md->first.dlength + md->first.mdlength;
|
||||
cursor.increment(md->first.dlength + md->first.mdlength);
|
||||
ceph_assert(new_committed_to == journal_seq_t() ||
|
||||
new_committed_to < cursor.seq);
|
||||
return scan_valid_records_ertr::now();
|
||||
}
|
||||
}).safe_then([=, &cursor, &budget_used, &handler] {
|
||||
@ -166,7 +169,9 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records(
|
||||
seastar::stop_iteration>(seastar::stop_iteration::yes);
|
||||
}
|
||||
auto &next = cursor.pending_records.front();
|
||||
if (next.offset > cursor.last_committed) {
|
||||
journal_seq_t next_seq = {cursor.seq.segment_seq, next.offset};
|
||||
if (cursor.last_committed == journal_seq_t() ||
|
||||
next_seq > cursor.last_committed) {
|
||||
return scan_valid_records_ertr::make_ready_future<
|
||||
seastar::stop_iteration>(seastar::stop_iteration::yes);
|
||||
}
|
||||
|
@ -170,7 +170,7 @@ Journal::replay_segment(
|
||||
{
|
||||
logger().debug("Journal::replay_segment: starting at {}", seq);
|
||||
return seastar::do_with(
|
||||
scan_valid_records_cursor(seq.offset),
|
||||
scan_valid_records_cursor(seq),
|
||||
ExtentReader::found_record_handler_t(
|
||||
[=, &handler](paddr_t base,
|
||||
const record_header_t &header,
|
||||
@ -378,13 +378,9 @@ void Journal::JournalSegmentManager::mark_committed(
|
||||
logger().debug(
|
||||
"JournalSegmentManager::mark_committed: committed_to {} => {}",
|
||||
committed_to, new_committed_to);
|
||||
assert(new_committed_to.segment_seq <=
|
||||
get_segment_seq());
|
||||
if (new_committed_to.segment_seq ==
|
||||
get_segment_seq()) {
|
||||
assert(committed_to.offset.offset < new_committed_to.offset.offset);
|
||||
committed_to = new_committed_to;
|
||||
}
|
||||
assert(committed_to == journal_seq_t() ||
|
||||
committed_to <= new_committed_to);
|
||||
committed_to = new_committed_to;
|
||||
}
|
||||
|
||||
Journal::JournalSegmentManager::initialize_segment_ertr::future<>
|
||||
@ -401,7 +397,7 @@ Journal::JournalSegmentManager::initialize_segment(Segment& segment)
|
||||
auto header = segment_header_t{
|
||||
seq,
|
||||
segment.get_segment_id(),
|
||||
segment_provider->get_journal_tail_target(),
|
||||
new_tail,
|
||||
current_segment_nonce,
|
||||
false};
|
||||
logger().debug(
|
||||
@ -421,14 +417,9 @@ Journal::JournalSegmentManager::initialize_segment(Segment& segment)
|
||||
bl.append(bp);
|
||||
|
||||
written_to = 0;
|
||||
// FIXME: improve committed_to to point to another segment
|
||||
committed_to = get_current_write_seq();
|
||||
return write(bl
|
||||
).safe_then([this, new_tail, write_size=bl.length()
|
||||
](journal_seq_t write_start_seq) {
|
||||
auto committed_to = write_start_seq;
|
||||
committed_to.offset.offset += write_size;
|
||||
mark_committed(committed_to);
|
||||
segment_provider->update_journal_tail_committed(new_tail);
|
||||
});
|
||||
}
|
||||
@ -475,7 +466,7 @@ Journal::RecordBatch::add_pending(
|
||||
|
||||
ceph::bufferlist Journal::RecordBatch::encode_records(
|
||||
size_t block_size,
|
||||
segment_off_t committed_to,
|
||||
const journal_seq_t& committed_to,
|
||||
segment_nonce_t segment_nonce)
|
||||
{
|
||||
logger().debug(
|
||||
@ -532,7 +523,7 @@ ceph::bufferlist Journal::RecordBatch::submit_pending_fast(
|
||||
record_t&& record,
|
||||
const record_size_t& rsize,
|
||||
size_t block_size,
|
||||
segment_off_t committed_to,
|
||||
const journal_seq_t& committed_to,
|
||||
segment_nonce_t segment_nonce)
|
||||
{
|
||||
logger().debug(
|
||||
|
@ -140,10 +140,8 @@ private:
|
||||
return current_segment_nonce;
|
||||
}
|
||||
|
||||
segment_off_t get_committed_to() const {
|
||||
assert(committed_to.segment_seq ==
|
||||
get_segment_seq());
|
||||
return committed_to.offset.offset;
|
||||
journal_seq_t get_committed_to() const {
|
||||
return committed_to;
|
||||
}
|
||||
|
||||
segment_seq_t get_segment_seq() const {
|
||||
@ -287,7 +285,7 @@ private:
|
||||
// Encode the batched records for write.
|
||||
ceph::bufferlist encode_records(
|
||||
size_t block_size,
|
||||
segment_off_t committed_to,
|
||||
const journal_seq_t& committed_to,
|
||||
segment_nonce_t segment_nonce);
|
||||
|
||||
// Set the write result and reset for reuse
|
||||
@ -304,7 +302,7 @@ private:
|
||||
record_t&&,
|
||||
const record_size_t&,
|
||||
size_t block_size,
|
||||
segment_off_t committed_to,
|
||||
const journal_seq_t& committed_to,
|
||||
segment_nonce_t segment_nonce);
|
||||
|
||||
private:
|
||||
|
@ -150,7 +150,7 @@ ceph::bufferlist encode_record(
|
||||
record_size_t rsize,
|
||||
record_t &&record,
|
||||
size_t block_size,
|
||||
segment_off_t committed_to,
|
||||
const journal_seq_t& committed_to,
|
||||
segment_nonce_t current_segment_nonce)
|
||||
{
|
||||
bufferlist data_bl;
|
||||
|
@ -1143,11 +1143,11 @@ struct record_header_t {
|
||||
// Fixed portion
|
||||
extent_len_t mdlength; // block aligned, length of metadata
|
||||
extent_len_t dlength; // block aligned, length of data
|
||||
uint32_t deltas; // number of deltas
|
||||
uint32_t extents; // number of extents
|
||||
uint32_t deltas; // number of deltas
|
||||
uint32_t extents; // number of extents
|
||||
segment_nonce_t segment_nonce;// nonce of containing segment
|
||||
segment_off_t committed_to; // records in this segment prior to committed_to
|
||||
// have been fully written
|
||||
journal_seq_t committed_to; // records prior to committed_to have been
|
||||
// fully written, maybe in another segment.
|
||||
checksum_t data_crc; // crc of data payload
|
||||
|
||||
|
||||
@ -1188,14 +1188,14 @@ ceph::bufferlist encode_record(
|
||||
record_size_t rsize,
|
||||
record_t &&record,
|
||||
size_t block_size,
|
||||
segment_off_t committed_to,
|
||||
const journal_seq_t& committed_to,
|
||||
segment_nonce_t current_segment_nonce = 0);
|
||||
|
||||
/// scan segment for end incrementally
|
||||
struct scan_valid_records_cursor {
|
||||
bool last_valid_header_found = false;
|
||||
paddr_t offset;
|
||||
paddr_t last_committed;
|
||||
journal_seq_t seq;
|
||||
journal_seq_t last_committed;
|
||||
|
||||
struct found_record_t {
|
||||
paddr_t offset;
|
||||
@ -1214,13 +1214,21 @@ struct scan_valid_records_cursor {
|
||||
return last_valid_header_found && pending_records.empty();
|
||||
}
|
||||
|
||||
paddr_t get_offset() const {
|
||||
return offset;
|
||||
segment_id_t get_segment_id() const {
|
||||
return seq.offset.segment;
|
||||
}
|
||||
|
||||
segment_off_t get_segment_offset() const {
|
||||
return seq.offset.offset;
|
||||
}
|
||||
|
||||
void increment(segment_off_t off) {
|
||||
seq.offset.offset += off;
|
||||
}
|
||||
|
||||
scan_valid_records_cursor(
|
||||
paddr_t offset)
|
||||
: offset(offset) {}
|
||||
journal_seq_t seq)
|
||||
: seq(seq) {}
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -321,20 +321,18 @@ SegmentCleaner::gc_trim_journal_ret SegmentCleaner::gc_trim_journal()
|
||||
SegmentCleaner::gc_reclaim_space_ret SegmentCleaner::gc_reclaim_space()
|
||||
{
|
||||
if (!scan_cursor) {
|
||||
paddr_t next = P_ADDR_NULL;
|
||||
next.segment = get_next_gc_target();
|
||||
if (next == P_ADDR_NULL) {
|
||||
journal_seq_t next = get_next_gc_target();
|
||||
if (next == journal_seq_t()) {
|
||||
logger().debug(
|
||||
"SegmentCleaner::do_gc: no segments to gc");
|
||||
return seastar::now();
|
||||
}
|
||||
next.offset = 0;
|
||||
scan_cursor =
|
||||
std::make_unique<ExtentReader::scan_extents_cursor>(
|
||||
next);
|
||||
logger().debug(
|
||||
"SegmentCleaner::do_gc: starting gc on segment {}",
|
||||
scan_cursor->get_offset().segment);
|
||||
scan_cursor->seq);
|
||||
} else {
|
||||
ceph_assert(!scan_cursor->is_complete());
|
||||
}
|
||||
@ -384,7 +382,7 @@ SegmentCleaner::gc_reclaim_space_ret SegmentCleaner::gc_reclaim_space()
|
||||
});
|
||||
}).si_then([this, &t] {
|
||||
if (scan_cursor->is_complete()) {
|
||||
t.mark_segment_to_release(scan_cursor->get_offset().segment);
|
||||
t.mark_segment_to_release(scan_cursor->get_segment_id());
|
||||
}
|
||||
return ecb->submit_transaction_direct(t);
|
||||
});
|
||||
|
@ -683,10 +683,6 @@ public:
|
||||
|
||||
void update_journal_tail_target(journal_seq_t target);
|
||||
|
||||
void init_journal_tail(journal_seq_t tail) {
|
||||
journal_tail_target = journal_tail_committed = tail;
|
||||
}
|
||||
|
||||
void init_mkfs(journal_seq_t head) {
|
||||
journal_tail_target = head;
|
||||
journal_tail_committed = head;
|
||||
@ -773,30 +769,32 @@ public:
|
||||
assert(ret >= 0);
|
||||
}
|
||||
|
||||
segment_id_t get_next_gc_target() const {
|
||||
segment_id_t ret = NULL_SEG_ID;
|
||||
journal_seq_t get_next_gc_target() const {
|
||||
segment_id_t id = NULL_SEG_ID;
|
||||
segment_seq_t seq = NULL_SEG_SEQ;
|
||||
int64_t least_live_bytes = std::numeric_limits<int64_t>::max();
|
||||
for (auto it = segments.begin();
|
||||
it != segments.end();
|
||||
++it) {
|
||||
auto id = it->first;
|
||||
auto _id = it->first;
|
||||
const auto& segment_info = it->second;
|
||||
if (segment_info.is_closed() &&
|
||||
!segment_info.is_in_journal(journal_tail_committed) &&
|
||||
space_tracker->get_usage(id) < least_live_bytes) {
|
||||
ret = id;
|
||||
space_tracker->get_usage(_id) < least_live_bytes) {
|
||||
id = _id;
|
||||
seq = segment_info.journal_segment_seq;
|
||||
least_live_bytes = space_tracker->get_usage(id);
|
||||
}
|
||||
}
|
||||
if (ret != NULL_SEG_ID) {
|
||||
if (id != NULL_SEG_ID) {
|
||||
crimson::get_logger(ceph_subsys_seastore).debug(
|
||||
"SegmentCleaner::get_next_gc_target: segment {} seq {}",
|
||||
ret,
|
||||
id,
|
||||
seq);
|
||||
return journal_seq_t{seq, {id, 0}};
|
||||
} else {
|
||||
return journal_seq_t();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
SpaceTrackerIRef get_empty_space_tracker() const {
|
||||
@ -987,7 +985,7 @@ private:
|
||||
if (!scan_cursor)
|
||||
return 0;
|
||||
|
||||
return scan_cursor->get_offset().offset;
|
||||
return scan_cursor->get_segment_offset();
|
||||
}
|
||||
|
||||
/// Returns free space available for writes
|
||||
|
Loading…
Reference in New Issue
Block a user