mirror of
https://github.com/ceph/ceph
synced 2025-02-20 17:37:29 +00:00
crimson/os/seastore: genralize read_validate_record_metadata and read_validate_data
Signed-off-by: Myoungwon Oh <myoungwon.oh@samsung.com>
This commit is contained in:
parent
a10f2855da
commit
15ae64c331
@ -109,63 +109,6 @@ CircularBoundedJournal::do_submit_record(
|
||||
});
|
||||
}
|
||||
|
||||
RecordScanner::read_validate_record_metadata_ret
|
||||
CircularBoundedJournal::read_validate_record_metadata(
|
||||
scan_valid_records_cursor &cursor,
|
||||
segment_nonce_t nonce)
|
||||
{
|
||||
LOG_PREFIX(CircularBoundedJournal::read_validate_record_metadata);
|
||||
paddr_t start = cursor.seq.offset;
|
||||
return read_record(start, nonce
|
||||
).safe_then([FNAME, &cursor, this](auto ret) {
|
||||
if (!ret.has_value()) {
|
||||
return read_validate_record_metadata_ret(
|
||||
read_validate_record_metadata_ertr::ready_future_marker{},
|
||||
std::nullopt);
|
||||
}
|
||||
auto [r_header, bl] = *ret;
|
||||
auto print_invalid = [FNAME](auto &r_header) {
|
||||
DEBUG("invalid header: {}", r_header);
|
||||
return read_validate_record_metadata_ret(
|
||||
read_validate_record_metadata_ertr::ready_future_marker{},
|
||||
std::nullopt);
|
||||
};
|
||||
if (cursor.seq.offset == convert_abs_addr_to_paddr(
|
||||
get_records_start(), get_device_id())) {
|
||||
if ((r_header.committed_to.segment_seq == NULL_SEG_SEQ &&
|
||||
cursor.seq.segment_seq != 0) ||
|
||||
r_header.committed_to.segment_seq != cursor.seq.segment_seq - 1) {
|
||||
return print_invalid(r_header);
|
||||
}
|
||||
} else if (r_header.committed_to.segment_seq != cursor.seq.segment_seq) {
|
||||
return print_invalid(r_header);
|
||||
}
|
||||
|
||||
bufferlist mdbuf;
|
||||
mdbuf.substr_of(bl, 0, r_header.mdlength);
|
||||
DEBUG("header: {}", r_header);
|
||||
return read_validate_record_metadata_ret(
|
||||
read_validate_record_metadata_ertr::ready_future_marker{},
|
||||
std::make_pair(std::move(r_header), std::move(mdbuf)));
|
||||
});
|
||||
}
|
||||
|
||||
RecordScanner::read_validate_data_ret CircularBoundedJournal::read_validate_data(
|
||||
paddr_t record_base,
|
||||
const record_group_header_t &header)
|
||||
{
|
||||
return read_record(record_base, header.segment_nonce
|
||||
).safe_then([](auto ret) {
|
||||
// read_record would return non-empty value if the record is valid
|
||||
if (!ret.has_value()) {
|
||||
return read_validate_data_ret(
|
||||
read_validate_data_ertr::ready_future_marker{},
|
||||
false);
|
||||
}
|
||||
return read_validate_data_ertr::make_ready_future<bool>(true);
|
||||
});
|
||||
}
|
||||
|
||||
Journal::replay_ret CircularBoundedJournal::replay_segment(
|
||||
cbj_delta_handler_t &handler, scan_valid_records_cursor& cursor)
|
||||
{
|
||||
@ -288,6 +231,42 @@ Journal::replay_ret CircularBoundedJournal::scan_valid_record_delta(
|
||||
});
|
||||
}
|
||||
|
||||
RecordScanner::read_ret CircularBoundedJournal::read(paddr_t start, size_t len)
|
||||
{
|
||||
LOG_PREFIX(CircularBoundedJournal::read);
|
||||
rbm_abs_addr addr = convert_paddr_to_abs_addr(start);
|
||||
DEBUG("reading data from addr {} read length {}", addr, len);
|
||||
auto bptr = bufferptr(ceph::buffer::create_page_aligned(len));
|
||||
return cjs.read(addr, bptr
|
||||
).safe_then([bptr=std::move(bptr)]() {
|
||||
return read_ret(
|
||||
RecordScanner::read_ertr::ready_future_marker{},
|
||||
std::move(bptr)
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
bool CircularBoundedJournal::is_record_segment_seq_invalid(
|
||||
scan_valid_records_cursor &cursor,
|
||||
record_group_header_t &r_header)
|
||||
{
|
||||
LOG_PREFIX(CircularBoundedJournal::is_record_segment_seq_invalid);
|
||||
auto print_invalid = [FNAME](auto &r_header) {
|
||||
DEBUG("invalid header: {}", r_header);
|
||||
return true;
|
||||
};
|
||||
if (cursor.seq.offset == convert_abs_addr_to_paddr(
|
||||
get_records_start(), get_device_id())) {
|
||||
if ((r_header.committed_to.segment_seq == NULL_SEG_SEQ &&
|
||||
cursor.seq.segment_seq != 0) ||
|
||||
r_header.committed_to.segment_seq != cursor.seq.segment_seq - 1) {
|
||||
return print_invalid(r_header);
|
||||
}
|
||||
} else if (r_header.committed_to.segment_seq != cursor.seq.segment_seq) {
|
||||
return print_invalid(r_header);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
Journal::replay_ret CircularBoundedJournal::replay(
|
||||
delta_handler_t &&delta_handler)
|
||||
|
@ -188,18 +188,21 @@ public:
|
||||
cursor.seq.segment_seq += 1;
|
||||
}
|
||||
|
||||
void initialize_cursor(scan_valid_records_cursor& cursor) final {};
|
||||
void initialize_cursor(scan_valid_records_cursor& cursor) final {
|
||||
cursor.block_size = get_block_size();
|
||||
};
|
||||
|
||||
Journal::replay_ret replay_segment(
|
||||
cbj_delta_handler_t &handler, scan_valid_records_cursor& cursor);
|
||||
|
||||
read_validate_record_metadata_ret read_validate_record_metadata(
|
||||
scan_valid_records_cursor &cursor,
|
||||
segment_nonce_t nonce) final;
|
||||
read_ret read(paddr_t start, size_t len) final;
|
||||
|
||||
read_validate_data_ret read_validate_data(
|
||||
paddr_t record_base,
|
||||
const record_group_header_t &header) final;
|
||||
bool is_record_segment_seq_invalid(scan_valid_records_cursor &cursor,
|
||||
record_group_header_t &h) final;
|
||||
|
||||
int64_t get_segment_end_offset(paddr_t addr) final {
|
||||
return get_journal_end();
|
||||
}
|
||||
|
||||
// Test interfaces
|
||||
|
||||
|
@ -105,6 +105,103 @@ RecordScanner::scan_valid_records(
|
||||
});
|
||||
}
|
||||
|
||||
RecordScanner::read_validate_record_metadata_ret
|
||||
RecordScanner::read_validate_record_metadata(
|
||||
scan_valid_records_cursor &cursor,
|
||||
segment_nonce_t nonce)
|
||||
{
|
||||
LOG_PREFIX(RecordScanner::read_validate_record_metadata);
|
||||
paddr_t start = cursor.seq.offset;
|
||||
auto block_size = cursor.get_block_size();
|
||||
if (get_segment_off(cursor.seq.offset) + block_size > get_segment_end_offset(cursor.seq.offset)) {
|
||||
DEBUG("failed -- record group header block {}~4096 > segment_size {}",
|
||||
start, get_segment_end_offset(cursor.seq.offset));
|
||||
return read_validate_record_metadata_ret(
|
||||
read_validate_record_metadata_ertr::ready_future_marker{},
|
||||
std::nullopt);
|
||||
}
|
||||
TRACE("reading record group header block {}~4096", start);
|
||||
return read(start, block_size
|
||||
).safe_then([=](bufferptr bptr) mutable
|
||||
-> read_validate_record_metadata_ret {
|
||||
bufferlist bl;
|
||||
bl.append(bptr);
|
||||
auto maybe_header = try_decode_records_header(bl, nonce);
|
||||
if (!maybe_header.has_value()) {
|
||||
return read_validate_record_metadata_ret(
|
||||
read_validate_record_metadata_ertr::ready_future_marker{},
|
||||
std::nullopt);
|
||||
}
|
||||
|
||||
auto& header = *maybe_header;
|
||||
if (header.mdlength < block_size ||
|
||||
header.mdlength % block_size != 0 ||
|
||||
header.dlength % block_size != 0 ||
|
||||
(header.committed_to != JOURNAL_SEQ_NULL &&
|
||||
get_segment_off(header.committed_to.offset) %
|
||||
cursor.get_block_size() != 0) ||
|
||||
(get_segment_off(cursor.seq.offset) + header.mdlength + header.dlength >
|
||||
get_segment_end_offset(cursor.seq.offset))) {
|
||||
ERROR("failed, invalid record group header {}", header);
|
||||
return crimson::ct_error::input_output_error::make();
|
||||
}
|
||||
|
||||
if (is_record_segment_seq_invalid(cursor, header)) {
|
||||
return read_validate_record_metadata_ret(
|
||||
read_validate_record_metadata_ertr::ready_future_marker{},
|
||||
std::nullopt);
|
||||
}
|
||||
|
||||
if (header.mdlength == block_size) {
|
||||
return read_validate_record_metadata_ret(
|
||||
read_validate_record_metadata_ertr::ready_future_marker{},
|
||||
std::make_pair(std::move(header), std::move(bl))
|
||||
);
|
||||
}
|
||||
|
||||
paddr_t rest_start = cursor.seq.offset.add_offset(block_size);
|
||||
auto rest_len = header.mdlength - block_size;
|
||||
TRACE("reading record group header rest {}~{}", rest_start, rest_len);
|
||||
return read(rest_start, rest_len
|
||||
).safe_then([header=std::move(header), bl=std::move(bl)
|
||||
](auto&& bptail) mutable {
|
||||
bl.push_back(bptail);
|
||||
return read_validate_record_metadata_ret(
|
||||
read_validate_record_metadata_ertr::ready_future_marker{},
|
||||
std::make_pair(std::move(header), std::move(bl)));
|
||||
});
|
||||
}).safe_then([](auto p) {
|
||||
if (p && validate_records_metadata(p->second)) {
|
||||
return read_validate_record_metadata_ret(
|
||||
read_validate_record_metadata_ertr::ready_future_marker{},
|
||||
std::move(*p)
|
||||
);
|
||||
} else {
|
||||
return read_validate_record_metadata_ret(
|
||||
read_validate_record_metadata_ertr::ready_future_marker{},
|
||||
std::nullopt);
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
RecordScanner::read_validate_data_ret RecordScanner::read_validate_data(
|
||||
paddr_t record_base,
|
||||
const record_group_header_t &header)
|
||||
{
|
||||
LOG_PREFIX(RecordScanner::read_validate_data);
|
||||
auto data_addr = record_base.add_offset(header.mdlength);
|
||||
TRACE("reading record group data blocks {}~{}", data_addr, header.dlength);
|
||||
return read(
|
||||
data_addr,
|
||||
header.dlength
|
||||
).safe_then([=, &header](auto bptr) {
|
||||
bufferlist bl;
|
||||
bl.append(bptr);
|
||||
return validate_records_data(header, bl);
|
||||
});
|
||||
}
|
||||
|
||||
RecordScanner::consume_record_group_ertr::future<>
|
||||
RecordScanner::consume_next_records(
|
||||
scan_valid_records_cursor& cursor,
|
||||
|
@ -30,6 +30,16 @@ public:
|
||||
found_record_handler_t &handler ///< [in] handler for records
|
||||
); ///< @return used budget
|
||||
|
||||
device_off_t get_segment_off(paddr_t addr) const {
|
||||
if (addr.get_addr_type() == paddr_types_t::SEGMENT) {
|
||||
auto& seg_addr = addr.as_seg_paddr();
|
||||
return seg_addr.get_segment_off();
|
||||
}
|
||||
assert(addr.get_addr_type() == paddr_types_t::RANDOM_BLOCK);
|
||||
auto& blk_addr = addr.as_blk_paddr();
|
||||
return blk_addr.get_device_off();
|
||||
}
|
||||
|
||||
protected:
|
||||
/// read record metadata for record starting at start
|
||||
using read_validate_record_metadata_ertr = read_ertr;
|
||||
@ -37,18 +47,26 @@ protected:
|
||||
read_validate_record_metadata_ertr::future<
|
||||
std::optional<std::pair<record_group_header_t, bufferlist>>
|
||||
>;
|
||||
virtual read_validate_record_metadata_ret read_validate_record_metadata(
|
||||
read_validate_record_metadata_ret read_validate_record_metadata(
|
||||
scan_valid_records_cursor &cursor,
|
||||
segment_nonce_t nonce) = 0;
|
||||
segment_nonce_t nonce);
|
||||
|
||||
/// read and validate data
|
||||
using read_validate_data_ertr = read_ertr;
|
||||
using read_validate_data_ret = read_validate_data_ertr::future<bool>;
|
||||
virtual read_validate_data_ret read_validate_data(
|
||||
read_validate_data_ret read_validate_data(
|
||||
paddr_t record_base,
|
||||
const record_group_header_t &header ///< caller must ensure lifetime through
|
||||
/// future resolution
|
||||
) = 0;
|
||||
);
|
||||
|
||||
virtual bool is_record_segment_seq_invalid(scan_valid_records_cursor &cursor,
|
||||
record_group_header_t &h) = 0;
|
||||
|
||||
virtual int64_t get_segment_end_offset(paddr_t addr) = 0;
|
||||
|
||||
using read_ret = read_ertr::future<bufferptr>;
|
||||
virtual read_ret read(paddr_t start, size_t len) = 0;
|
||||
|
||||
using consume_record_group_ertr = scan_valid_records_ertr;
|
||||
consume_record_group_ertr::future<> consume_next_records(
|
||||
|
@ -2056,6 +2056,7 @@ struct scan_valid_records_cursor {
|
||||
journal_seq_t seq;
|
||||
journal_seq_t last_committed;
|
||||
std::size_t num_consumed_records = 0;
|
||||
extent_len_t block_size = 0;
|
||||
|
||||
struct found_record_group_t {
|
||||
paddr_t offset;
|
||||
@ -2082,6 +2083,10 @@ struct scan_valid_records_cursor {
|
||||
return seq.offset.as_seg_paddr().get_segment_off();
|
||||
}
|
||||
|
||||
extent_len_t get_block_size() const {
|
||||
return block_size;
|
||||
}
|
||||
|
||||
void increment_seq(segment_off_t off) {
|
||||
seq.offset = seq.offset.add_offset(off);
|
||||
}
|
||||
|
@ -102,102 +102,24 @@ void SegmentManagerGroup::initialize_cursor(
|
||||
INFO("start to scan segment {}", cursor.get_segment_id());
|
||||
cursor.increment_seq(segment_manager.get_block_size());
|
||||
}
|
||||
cursor.block_size = segment_manager.get_block_size();
|
||||
}
|
||||
|
||||
SegmentManagerGroup::read_validate_record_metadata_ret
|
||||
SegmentManagerGroup::read_validate_record_metadata(
|
||||
scan_valid_records_cursor &cursor,
|
||||
segment_nonce_t nonce)
|
||||
SegmentManagerGroup::read_ret
|
||||
SegmentManagerGroup::read(paddr_t start, size_t len)
|
||||
{
|
||||
LOG_PREFIX(SegmentManagerGroup::read_validate_record_metadata);
|
||||
paddr_t start = cursor.seq.offset;
|
||||
auto& seg_addr = start.as_seg_paddr();
|
||||
assert(has_device(seg_addr.get_segment_id().device_id()));
|
||||
auto& segment_manager = *segment_managers[seg_addr.get_segment_id().device_id()];
|
||||
auto block_size = segment_manager.get_block_size();
|
||||
auto segment_size = static_cast<int64_t>(segment_manager.get_segment_size());
|
||||
if (seg_addr.get_segment_off() + block_size > segment_size) {
|
||||
DEBUG("failed -- record group header block {}~4096 > segment_size {}", start, segment_size);
|
||||
return read_validate_record_metadata_ret(
|
||||
read_validate_record_metadata_ertr::ready_future_marker{},
|
||||
std::nullopt);
|
||||
}
|
||||
TRACE("reading record group header block {}~4096", start);
|
||||
return segment_manager.read(start, block_size
|
||||
).safe_then([=, &segment_manager](bufferptr bptr) mutable
|
||||
-> read_validate_record_metadata_ret {
|
||||
auto block_size = segment_manager.get_block_size();
|
||||
bufferlist bl;
|
||||
bl.append(bptr);
|
||||
auto maybe_header = try_decode_records_header(bl, nonce);
|
||||
if (!maybe_header.has_value()) {
|
||||
return read_validate_record_metadata_ret(
|
||||
read_validate_record_metadata_ertr::ready_future_marker{},
|
||||
std::nullopt);
|
||||
}
|
||||
auto& seg_addr = start.as_seg_paddr();
|
||||
auto& header = *maybe_header;
|
||||
if (header.mdlength < block_size ||
|
||||
header.mdlength % block_size != 0 ||
|
||||
header.dlength % block_size != 0 ||
|
||||
(header.committed_to != JOURNAL_SEQ_NULL &&
|
||||
header.committed_to.offset.as_seg_paddr().get_segment_off() % block_size != 0) ||
|
||||
(seg_addr.get_segment_off() + header.mdlength + header.dlength > segment_size)) {
|
||||
ERROR("failed, invalid record group header {}", start);
|
||||
return crimson::ct_error::input_output_error::make();
|
||||
}
|
||||
if (header.mdlength == block_size) {
|
||||
return read_validate_record_metadata_ret(
|
||||
read_validate_record_metadata_ertr::ready_future_marker{},
|
||||
std::make_pair(std::move(header), std::move(bl))
|
||||
);
|
||||
}
|
||||
|
||||
auto rest_start = paddr_t::make_seg_paddr(
|
||||
seg_addr.get_segment_id(),
|
||||
seg_addr.get_segment_off() + block_size
|
||||
);
|
||||
auto rest_len = header.mdlength - block_size;
|
||||
TRACE("reading record group header rest {}~{}", rest_start, rest_len);
|
||||
return segment_manager.read(rest_start, rest_len
|
||||
).safe_then([header=std::move(header), bl=std::move(bl)
|
||||
](auto&& bptail) mutable {
|
||||
bl.push_back(bptail);
|
||||
return read_validate_record_metadata_ret(
|
||||
read_validate_record_metadata_ertr::ready_future_marker{},
|
||||
std::make_pair(std::move(header), std::move(bl)));
|
||||
});
|
||||
}).safe_then([](auto p) {
|
||||
if (p && validate_records_metadata(p->second)) {
|
||||
return read_validate_record_metadata_ret(
|
||||
read_validate_record_metadata_ertr::ready_future_marker{},
|
||||
std::move(*p)
|
||||
);
|
||||
} else {
|
||||
return read_validate_record_metadata_ret(
|
||||
read_validate_record_metadata_ertr::ready_future_marker{},
|
||||
std::nullopt);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
SegmentManagerGroup::read_validate_data_ret
|
||||
SegmentManagerGroup::read_validate_data(
|
||||
paddr_t record_base,
|
||||
const record_group_header_t &header)
|
||||
{
|
||||
LOG_PREFIX(SegmentManagerGroup::read_validate_data);
|
||||
assert(has_device(record_base.get_device_id()));
|
||||
auto& segment_manager = *segment_managers[record_base.get_device_id()];
|
||||
auto data_addr = record_base.add_offset(header.mdlength);
|
||||
TRACE("reading record group data blocks {}~{}", data_addr, header.dlength);
|
||||
LOG_PREFIX(SegmentManagerGroup::read);
|
||||
assert(has_device(start.get_device_id()));
|
||||
auto& segment_manager = *segment_managers[start.get_device_id()];
|
||||
TRACE("reading data {}~{}", start, len);
|
||||
return segment_manager.read(
|
||||
data_addr,
|
||||
header.dlength
|
||||
).safe_then([=, &header](auto bptr) {
|
||||
bufferlist bl;
|
||||
bl.append(bptr);
|
||||
return validate_records_data(header, bl);
|
||||
start,
|
||||
len
|
||||
).safe_then([](auto bptr) {
|
||||
return read_ret(
|
||||
read_ertr::ready_future_marker{},
|
||||
std::move(bptr)
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -128,15 +128,18 @@ private:
|
||||
|
||||
void initialize_cursor(scan_valid_records_cursor &cursor) final;
|
||||
|
||||
read_validate_record_metadata_ret read_validate_record_metadata(
|
||||
scan_valid_records_cursor &cursor,
|
||||
segment_nonce_t nonce) final;
|
||||
read_ret read(paddr_t start, size_t len) final;
|
||||
|
||||
read_validate_data_ret read_validate_data(
|
||||
paddr_t record_base,
|
||||
const record_group_header_t &header ///< caller must ensure lifetime through
|
||||
/// future resolution
|
||||
) final;
|
||||
bool is_record_segment_seq_invalid(scan_valid_records_cursor &cursor,
|
||||
record_group_header_t &header) final {
|
||||
return false;
|
||||
}
|
||||
|
||||
int64_t get_segment_end_offset(paddr_t addr) final {
|
||||
auto& seg_addr = addr.as_seg_paddr();
|
||||
auto& segment_manager = *segment_managers[seg_addr.get_segment_id().device_id()];
|
||||
return static_cast<int64_t>(segment_manager.get_segment_size());
|
||||
}
|
||||
|
||||
std::vector<SegmentManager*> segment_managers;
|
||||
std::set<device_id_t> device_ids;
|
||||
|
Loading…
Reference in New Issue
Block a user