diff --git a/src/crimson/os/seastore/extent_reader.cc b/src/crimson/os/seastore/extent_reader.cc index e60296b408b..9ae0fcebf98 100644 --- a/src/crimson/os/seastore/extent_reader.cc +++ b/src/crimson/os/seastore/extent_reader.cc @@ -113,7 +113,7 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records( auto& segment_manager = *segment_managers[cursor.get_segment_id().device_id()]; if (cursor.get_segment_offset() == 0) { - cursor.increment(segment_manager.get_block_size()); + cursor.increment_seq(segment_manager.get_block_size()); } auto retref = std::make_unique(0); auto &budget_used = *retref; @@ -135,16 +135,7 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records( auto new_committed_to = header.committed_to; DEBUG("valid record read at {}, now committed at {}", cursor.seq, new_committed_to); - ceph_assert(cursor.last_committed == journal_seq_t() || - cursor.last_committed <= new_committed_to); - cursor.last_committed = new_committed_to; - cursor.pending_record_groups.emplace_back( - cursor.seq.offset, - header, - std::move(md_bl)); - cursor.increment(header.dlength + header.mdlength); - ceph_assert(new_committed_to == journal_seq_t() || - new_committed_to < cursor.seq); + cursor.emplace_record_group(header, std::move(md_bl)); return scan_valid_records_ertr::now(); } }).safe_then([=, &cursor, &budget_used, &handler] { @@ -318,7 +309,7 @@ ExtentReader::consume_next_records( next.header, next.mdbuffer ).safe_then([&cursor] { - cursor.pending_record_groups.pop_front(); + cursor.pop_record_group(); }); } diff --git a/src/crimson/os/seastore/seastore_types.cc b/src/crimson/os/seastore/seastore_types.cc index 21650e8fc4c..a68aa4de8be 100644 --- a/src/crimson/os/seastore/seastore_types.cc +++ b/src/crimson/os/seastore/seastore_types.cc @@ -516,5 +516,20 @@ blk_paddr_t convert_paddr_to_blk_paddr(paddr_t addr, size_t block_size, (block_size * blocks_per_segment) + s.get_segment_off()); } +void scan_valid_records_cursor::emplace_record_group( + const record_group_header_t& header, ceph::bufferlist&& md_bl) +{ + auto new_committed_to = header.committed_to; + ceph_assert(last_committed == journal_seq_t() || + last_committed <= new_committed_to); + last_committed = new_committed_to; + pending_record_groups.emplace_back( + seq.offset, + header, + std::move(md_bl)); + increment_seq(header.dlength + header.mdlength); + ceph_assert(new_committed_to == journal_seq_t() || + new_committed_to < seq); +} } diff --git a/src/crimson/os/seastore/seastore_types.h b/src/crimson/os/seastore/seastore_types.h index 01bfecc6010..165419ef1f1 100644 --- a/src/crimson/os/seastore/seastore_types.h +++ b/src/crimson/os/seastore/seastore_types.h @@ -1446,6 +1446,7 @@ struct scan_valid_records_cursor { bool last_valid_header_found = false; journal_seq_t seq; journal_seq_t last_committed; + std::size_t num_consumed_records = 0; struct found_record_group_t { paddr_t offset; @@ -1472,12 +1473,20 @@ struct scan_valid_records_cursor { return seq.offset.as_seg_paddr().get_segment_off(); } - void increment(segment_off_t off) { + void increment_seq(segment_off_t off) { auto& seg_addr = seq.offset.as_seg_paddr(); seg_addr.set_segment_off( seg_addr.get_segment_off() + off); } + void emplace_record_group(const record_group_header_t&, ceph::bufferlist&&); + + void pop_record_group() { + assert(!pending_record_groups.empty()); + ++num_consumed_records; + pending_record_groups.pop_front(); + } + scan_valid_records_cursor( journal_seq_t seq) : seq(seq) {}