mirror of
https://github.com/ceph/ceph
synced 2025-02-23 02:57:21 +00:00
crimson/os/seastore: count consumed records in cursor with cleanups
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
This commit is contained in:
parent
0d18428f0d
commit
c5e8e7ded9
@ -113,7 +113,7 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records(
|
||||
auto& segment_manager =
|
||||
*segment_managers[cursor.get_segment_id().device_id()];
|
||||
if (cursor.get_segment_offset() == 0) {
|
||||
cursor.increment(segment_manager.get_block_size());
|
||||
cursor.increment_seq(segment_manager.get_block_size());
|
||||
}
|
||||
auto retref = std::make_unique<size_t>(0);
|
||||
auto &budget_used = *retref;
|
||||
@ -135,16 +135,7 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records(
|
||||
auto new_committed_to = header.committed_to;
|
||||
DEBUG("valid record read at {}, now committed at {}",
|
||||
cursor.seq, new_committed_to);
|
||||
ceph_assert(cursor.last_committed == journal_seq_t() ||
|
||||
cursor.last_committed <= new_committed_to);
|
||||
cursor.last_committed = new_committed_to;
|
||||
cursor.pending_record_groups.emplace_back(
|
||||
cursor.seq.offset,
|
||||
header,
|
||||
std::move(md_bl));
|
||||
cursor.increment(header.dlength + header.mdlength);
|
||||
ceph_assert(new_committed_to == journal_seq_t() ||
|
||||
new_committed_to < cursor.seq);
|
||||
cursor.emplace_record_group(header, std::move(md_bl));
|
||||
return scan_valid_records_ertr::now();
|
||||
}
|
||||
}).safe_then([=, &cursor, &budget_used, &handler] {
|
||||
@ -318,7 +309,7 @@ ExtentReader::consume_next_records(
|
||||
next.header,
|
||||
next.mdbuffer
|
||||
).safe_then([&cursor] {
|
||||
cursor.pending_record_groups.pop_front();
|
||||
cursor.pop_record_group();
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -516,5 +516,20 @@ blk_paddr_t convert_paddr_to_blk_paddr(paddr_t addr, size_t block_size,
|
||||
(block_size * blocks_per_segment) + s.get_segment_off());
|
||||
}
|
||||
|
||||
void scan_valid_records_cursor::emplace_record_group(
|
||||
const record_group_header_t& header, ceph::bufferlist&& md_bl)
|
||||
{
|
||||
auto new_committed_to = header.committed_to;
|
||||
ceph_assert(last_committed == journal_seq_t() ||
|
||||
last_committed <= new_committed_to);
|
||||
last_committed = new_committed_to;
|
||||
pending_record_groups.emplace_back(
|
||||
seq.offset,
|
||||
header,
|
||||
std::move(md_bl));
|
||||
increment_seq(header.dlength + header.mdlength);
|
||||
ceph_assert(new_committed_to == journal_seq_t() ||
|
||||
new_committed_to < seq);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1446,6 +1446,7 @@ struct scan_valid_records_cursor {
|
||||
bool last_valid_header_found = false;
|
||||
journal_seq_t seq;
|
||||
journal_seq_t last_committed;
|
||||
std::size_t num_consumed_records = 0;
|
||||
|
||||
struct found_record_group_t {
|
||||
paddr_t offset;
|
||||
@ -1472,12 +1473,20 @@ struct scan_valid_records_cursor {
|
||||
return seq.offset.as_seg_paddr().get_segment_off();
|
||||
}
|
||||
|
||||
void increment(segment_off_t off) {
|
||||
void increment_seq(segment_off_t off) {
|
||||
auto& seg_addr = seq.offset.as_seg_paddr();
|
||||
seg_addr.set_segment_off(
|
||||
seg_addr.get_segment_off() + off);
|
||||
}
|
||||
|
||||
void emplace_record_group(const record_group_header_t&, ceph::bufferlist&&);
|
||||
|
||||
void pop_record_group() {
|
||||
assert(!pending_record_groups.empty());
|
||||
++num_consumed_records;
|
||||
pending_record_groups.pop_front();
|
||||
}
|
||||
|
||||
scan_valid_records_cursor(
|
||||
journal_seq_t seq)
|
||||
: seq(seq) {}
|
||||
|
Loading…
Reference in New Issue
Block a user