mirror of
https://github.com/ceph/ceph
synced 2025-01-03 01:22:53 +00:00
crimson/os/seastore/journal: add scan_valid_records
scan_valid_records validates record checksums and uses last_comitted to avoid checking data crcs except on the final few records. Will replace scan_segment in a few patches. Signed-off-by: Samuel Just <sjust@redhat.com>
This commit is contained in:
parent
ac08cfa188
commit
68c17a0252
@ -160,6 +160,20 @@ bool Journal::validate_metadata(const bufferlist &bl)
|
||||
return test_crc == recorded_crc;
|
||||
}
|
||||
|
||||
Journal::read_validate_data_ret Journal::read_validate_data(
|
||||
paddr_t record_base,
|
||||
const record_header_t &header)
|
||||
{
|
||||
return segment_manager.read(
|
||||
record_base.add_offset(header.mdlength),
|
||||
header.dlength
|
||||
).safe_then([=, &header](auto bptr) {
|
||||
bufferlist bl;
|
||||
bl.append(bptr);
|
||||
return bl.crc32c(-1) == header.data_crc;
|
||||
});
|
||||
}
|
||||
|
||||
Journal::write_record_ret Journal::write_record(
|
||||
record_size_t rsize,
|
||||
record_t &&record)
|
||||
@ -722,4 +736,108 @@ Journal::scan_segment_ret Journal::scan_segment(
|
||||
});
|
||||
}
|
||||
|
||||
Journal::scan_valid_records_ret Journal::scan_valid_records(
|
||||
scan_valid_records_cursor &cursor,
|
||||
segment_nonce_t nonce,
|
||||
size_t budget,
|
||||
found_record_handler_t &handler)
|
||||
{
|
||||
if (cursor.offset.offset == 0) {
|
||||
cursor.offset.offset = block_size;
|
||||
}
|
||||
auto retref = std::make_unique<size_t>(0);
|
||||
auto budget_used = *retref;
|
||||
return crimson::do_until(
|
||||
[=, &cursor, &budget_used, &handler]() mutable
|
||||
-> scan_valid_records_ertr::future<bool> {
|
||||
return [=, &handler, &cursor, &budget_used] {
|
||||
if (!cursor.last_valid_header_found) {
|
||||
return read_validate_record_metadata(cursor.offset, nonce
|
||||
).safe_then([=, &cursor, &handler](auto md) {
|
||||
logger().debug(
|
||||
"Journal::scan_valid_records: read complete {}",
|
||||
cursor.offset);
|
||||
if (!md) {
|
||||
logger().debug(
|
||||
"Journal::scan_valid_records: found invalid header at {}, presumably at end",
|
||||
cursor.offset);
|
||||
cursor.last_valid_header_found = true;
|
||||
return scan_valid_records_ertr::now();
|
||||
} else {
|
||||
logger().debug(
|
||||
"Journal::scan_valid_records: valid record read at {}",
|
||||
cursor.offset);
|
||||
cursor.last_committed = paddr_t{
|
||||
cursor.offset.segment,
|
||||
md->first.committed_to};
|
||||
cursor.pending_records.emplace_back(
|
||||
cursor.offset,
|
||||
md->first,
|
||||
md->second);
|
||||
cursor.offset.offset +=
|
||||
md->first.dlength + md->first.mdlength;
|
||||
return scan_valid_records_ertr::now();
|
||||
}
|
||||
}).safe_then([=, &cursor, &budget_used, &handler] {
|
||||
return crimson::do_until(
|
||||
[=, &budget_used, &cursor, &handler] {
|
||||
logger().debug(
|
||||
"Journal::scan_valid_records: valid record read, processing queue");
|
||||
if (cursor.pending_records.empty()) {
|
||||
/* This is only possible if the segment is empty.
|
||||
* A record's last_commited must be prior to its own
|
||||
* location since it itself cannot yet have been committed
|
||||
* at its own time of submission. Thus, the most recently
|
||||
* read record must always fall after cursor.last_committed */
|
||||
return scan_valid_records_ertr::make_ready_future<bool>(true);
|
||||
}
|
||||
auto &next = cursor.pending_records.front();
|
||||
if (next.offset > cursor.last_committed) {
|
||||
return scan_valid_records_ertr::make_ready_future<bool>(true);
|
||||
}
|
||||
budget_used +=
|
||||
next.header.dlength + next.header.mdlength;
|
||||
return handler(
|
||||
next.offset,
|
||||
next.header,
|
||||
next.mdbuffer
|
||||
).safe_then([&cursor] {
|
||||
cursor.pending_records.pop_front();
|
||||
return scan_valid_records_ertr::make_ready_future<bool>(false);
|
||||
});
|
||||
});
|
||||
});
|
||||
} else {
|
||||
assert(!cursor.pending_records.empty());
|
||||
auto &next = cursor.pending_records.front();
|
||||
return read_validate_data(next.offset, next.header
|
||||
).safe_then([=, &budget_used, &next, &cursor, &handler](auto valid) {
|
||||
if (!valid) {
|
||||
cursor.pending_records.clear();
|
||||
return scan_valid_records_ertr::now();
|
||||
}
|
||||
budget_used +=
|
||||
next.header.dlength + next.header.mdlength;
|
||||
return handler(
|
||||
next.offset,
|
||||
next.header,
|
||||
next.mdbuffer
|
||||
).safe_then([&cursor] {
|
||||
cursor.pending_records.pop_front();
|
||||
return scan_valid_records_ertr::now();
|
||||
});
|
||||
});
|
||||
}
|
||||
}().safe_then([=, &budget_used, &cursor] {
|
||||
return scan_valid_records_ertr::make_ready_future<bool>(
|
||||
cursor.is_complete() || budget_used >= budget);
|
||||
});
|
||||
}).safe_then([retref=std::move(retref)]() mutable -> scan_valid_records_ret {
|
||||
return scan_valid_records_ret(
|
||||
scan_valid_records_ertr::ready_future_marker{},
|
||||
std::move(*retref));
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -366,6 +366,56 @@ private:
|
||||
delta_scan_handler_t *delta_handler,
|
||||
extent_handler_t *extent_info_handler
|
||||
);
|
||||
public:
|
||||
/// scan segment for end incrementally
|
||||
struct scan_valid_records_cursor {
|
||||
bool last_valid_header_found = false;
|
||||
paddr_t offset;
|
||||
paddr_t last_committed;
|
||||
|
||||
struct found_record_t {
|
||||
paddr_t offset;
|
||||
record_header_t header;
|
||||
bufferlist mdbuffer;
|
||||
|
||||
found_record_t(
|
||||
paddr_t offset,
|
||||
const record_header_t &header,
|
||||
const bufferlist &mdbuffer)
|
||||
: offset(offset), header(header), mdbuffer(mdbuffer) {}
|
||||
};
|
||||
std::deque<found_record_t> pending_records;
|
||||
|
||||
bool is_complete() const {
|
||||
return last_valid_header_found && pending_records.empty();
|
||||
}
|
||||
|
||||
paddr_t get_offset() const {
|
||||
return offset;
|
||||
}
|
||||
|
||||
scan_valid_records_cursor(
|
||||
paddr_t offset)
|
||||
: offset(offset) {}
|
||||
};
|
||||
private:
|
||||
|
||||
using scan_valid_records_ertr = SegmentManager::read_ertr;
|
||||
using scan_valid_records_ret = scan_valid_records_ertr::future<
|
||||
size_t>;
|
||||
using found_record_handler_t = std::function<
|
||||
scan_valid_records_ertr::future<>(
|
||||
paddr_t record_block_base,
|
||||
// callee may assume header and bl will remain valid until
|
||||
// returned future resolves
|
||||
const record_header_t &header,
|
||||
const bufferlist &bl)>;
|
||||
scan_valid_records_ret scan_valid_records(
|
||||
scan_valid_records_cursor &cursor, ///< [in, out] cursor, updated during call
|
||||
segment_nonce_t nonce, ///< [in] nonce for segment
|
||||
size_t budget, ///< [in] max budget to use
|
||||
found_record_handler_t &handler ///< [in] handler for records
|
||||
); ///< @return used budget
|
||||
|
||||
/// replays records starting at start through end of segment
|
||||
replay_ertr::future<>
|
||||
|
Loading…
Reference in New Issue
Block a user