Merge pull request #48773 from myoungwon/wip-fix-out-date-delta

crimson/os/seastore: add deallocation map during replay to filter out out-dated delta

Reviewed-by: Yingxin Cheng <yingxin.cheng@intel.com>
This commit is contained in:
Yingxin 2022-11-22 09:44:48 +08:00 committed by GitHub
commit 6a3c09ed78
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 159 additions and 112 deletions

View File

@ -1762,17 +1762,7 @@ Cache::replay_delta(
DEBUG("replay extent delta at {} {} ... -- {}, prv_extent={}",
journal_seq, record_base, delta, *extent);
if (extent->last_committed_crc != delta.prev_crc) {
// FIXME: we can't rely on crc to detect whether is delta is
// out-of-date.
ERROR("identified delta crc {} doesn't match the extent at {} {}, "
"probably is out-dated -- {}",
delta, journal_seq, record_base, *extent);
ceph_assert(epm.get_journal_type() == journal_type_t::RANDOM_BLOCK);
remove_extent(extent);
return replay_delta_ertr::make_ready_future<bool>(false);
}
assert(extent->last_committed_crc == delta.prev_crc);
assert(extent->version == delta.pversion);
extent->apply_delta_and_adjust_crc(record_base, delta.bl);
extent->set_modify_time(modify_time);

View File

@ -245,6 +245,110 @@ CircularBoundedJournal::read_header()
});
}
Journal::replay_ret CircularBoundedJournal::scan_valid_record_delta(
cbj_delta_handler_t &&delta_handler, journal_seq_t tail)
{
LOG_PREFIX(CircularBoundedJournal::scan_valid_record_delta);
return seastar::do_with(
bool(false),
rbm_abs_addr(get_rbm_addr(tail)),
std::move(delta_handler),
segment_seq_t(NULL_SEG_SEQ),
[this, FNAME](auto &is_rolled, auto &cursor_addr, auto &d_handler, auto &expected_seq) {
return crimson::repeat(
[this, &is_rolled, &cursor_addr, &d_handler, &expected_seq, FNAME]() mutable
-> replay_ertr::future<seastar::stop_iteration> {
paddr_t record_paddr = convert_abs_addr_to_paddr(
cursor_addr,
get_device_id());
return read_record(record_paddr, expected_seq
).safe_then([this, &is_rolled, &cursor_addr, &d_handler, &expected_seq, FNAME](auto ret)
-> replay_ertr::future<seastar::stop_iteration> {
if (!ret.has_value()) {
if (expected_seq == NULL_SEG_SEQ || is_rolled) {
DEBUG("no more records, stop replaying");
return replay_ertr::make_ready_future<
seastar::stop_iteration>(seastar::stop_iteration::yes);
} else {
cursor_addr = get_records_start();
++expected_seq;
is_rolled = true;
return replay_ertr::make_ready_future<
seastar::stop_iteration>(seastar::stop_iteration::no);
}
}
auto [r_header, bl] = *ret;
bufferlist mdbuf;
mdbuf.substr_of(bl, 0, r_header.mdlength);
paddr_t record_block_base = paddr_t::make_blk_paddr(
get_device_id(), cursor_addr + r_header.mdlength);
auto maybe_record_deltas_list = try_decode_deltas(
r_header, mdbuf, record_block_base);
if (!maybe_record_deltas_list) {
// This should be impossible, we did check the crc on the mdbuf
ERROR("unable to decode deltas for record {} at {}",
r_header, record_block_base);
return crimson::ct_error::input_output_error::make();
}
DEBUG("{} at {}", r_header, cursor_addr);
auto write_result = write_result_t{
r_header.committed_to,
bl.length()
};
if (expected_seq == NULL_SEG_SEQ) {
expected_seq = r_header.committed_to.segment_seq;
} else {
assert(expected_seq == r_header.committed_to.segment_seq);
}
cursor_addr += bl.length();
if (cursor_addr >= get_journal_end()) {
assert(cursor_addr == get_journal_end());
cursor_addr = get_records_start();
++expected_seq;
is_rolled = true;
}
paddr_t addr = convert_abs_addr_to_paddr(
cursor_addr,
get_device_id());
set_written_to(
journal_seq_t{expected_seq, addr});
return seastar::do_with(
std::move(*maybe_record_deltas_list),
[write_result,
&d_handler,
FNAME](auto& record_deltas_list) {
return crimson::do_for_each(
record_deltas_list,
[write_result,
&d_handler, FNAME](record_deltas_t& record_deltas) {
auto locator = record_locator_t{
record_deltas.record_block_base,
write_result
};
DEBUG("processing {} deltas at block_base {}",
record_deltas.deltas.size(),
locator);
return crimson::do_for_each(
record_deltas.deltas,
[locator,
&d_handler](auto& p) {
auto& modify_time = p.first;
auto& delta = p.second;
return d_handler(
locator,
delta,
modify_time).discard_result();
});
}).safe_then([]() {
return replay_ertr::make_ready_future<
seastar::stop_iteration>(seastar::stop_iteration::no);
});
});
});
});
});
}
Journal::replay_ret CircularBoundedJournal::replay(
delta_handler_t &&delta_handler)
{
@ -257,116 +361,59 @@ Journal::replay_ret CircularBoundedJournal::replay(
open_for_mount_ertr::pass_further{},
crimson::ct_error::assert_all{
"Invalid error read_header"
}).safe_then([this, FNAME, delta_handler=std::move(delta_handler)](auto p) mutable {
}).safe_then([this, FNAME, delta_handler=std::move(delta_handler)](auto p)
mutable {
auto &[head, bl] = *p;
header = head;
DEBUG("header : {}", header);
initialized = true;
written_to.segment_seq = NULL_SEG_SEQ;
auto tail = get_dirty_tail() <= get_alloc_tail() ?
get_dirty_tail() : get_alloc_tail();
set_written_to(tail);
return seastar::do_with(
bool(false),
rbm_abs_addr(get_rbm_addr(tail)),
std::move(delta_handler),
segment_seq_t(NULL_SEG_SEQ),
[this, FNAME](auto &is_rolled, auto &cursor_addr, auto &d_handler, auto &expected_seq) {
return crimson::repeat(
[this, &is_rolled, &cursor_addr, &d_handler, &expected_seq, FNAME]() mutable
-> replay_ertr::future<seastar::stop_iteration> {
paddr_t record_paddr = convert_abs_addr_to_paddr(
cursor_addr,
get_device_id());
return read_record(record_paddr, expected_seq
).safe_then([this, &is_rolled, &cursor_addr, &d_handler, &expected_seq, FNAME](auto ret)
-> replay_ertr::future<seastar::stop_iteration> {
if (!ret.has_value()) {
if (expected_seq == NULL_SEG_SEQ || is_rolled) {
DEBUG("no more records, stop replaying");
return replay_ertr::make_ready_future<
seastar::stop_iteration>(seastar::stop_iteration::yes);
} else {
cursor_addr = get_records_start();
++expected_seq;
is_rolled = true;
return replay_ertr::make_ready_future<
seastar::stop_iteration>(seastar::stop_iteration::no);
std::map<paddr_t, journal_seq_t>(),
[this](auto &d_handler, auto &map) {
auto build_paddr_seq_map = [&map](
const auto &offsets,
const auto &e,
sea_time_point modify_time)
{
if (e.type == extent_types_t::ALLOC_INFO) {
alloc_delta_t alloc_delta;
decode(alloc_delta, e.bl);
if (alloc_delta.op == alloc_delta_t::op_types_t::CLEAR) {
for (auto &alloc_blk : alloc_delta.alloc_blk_ranges) {
map[alloc_blk.paddr] = offsets.write_result.start_seq;
}
}
auto [r_header, bl] = *ret;
bufferlist mdbuf;
mdbuf.substr_of(bl, 0, r_header.mdlength);
paddr_t record_block_base = paddr_t::make_blk_paddr(
get_device_id(), cursor_addr + r_header.mdlength);
auto maybe_record_deltas_list = try_decode_deltas(
r_header, mdbuf, record_block_base);
if (!maybe_record_deltas_list) {
// This should be impossible, we did check the crc on the mdbuf
ERROR("unable to decode deltas for record {} at {}",
r_header, record_block_base);
return crimson::ct_error::input_output_error::make();
}
return replay_ertr::make_ready_future<bool>(true);
};
written_to.segment_seq = NULL_SEG_SEQ;
auto tail = get_dirty_tail() <= get_alloc_tail() ?
get_dirty_tail() : get_alloc_tail();
set_written_to(tail);
// The first pass to build the paddr->journal_seq_t map
// from extent allocations
return scan_valid_record_delta(std::move(build_paddr_seq_map), tail
).safe_then([this, &map, &d_handler, tail]() {
auto call_d_handler_if_valid = [this, &map, &d_handler](
const auto &offsets,
const auto &e,
sea_time_point modify_time)
{
if (map.find(e.paddr) == map.end() ||
map[e.paddr] <= offsets.write_result.start_seq) {
return d_handler(
offsets,
e,
header.dirty_tail,
header.alloc_tail,
modify_time
);
}
DEBUG("{} at {}", r_header, cursor_addr);
auto write_result = write_result_t{
r_header.committed_to,
bl.length()
};
if (expected_seq == NULL_SEG_SEQ) {
expected_seq = r_header.committed_to.segment_seq;
} else {
assert(expected_seq == r_header.committed_to.segment_seq);
}
cursor_addr += bl.length();
if (cursor_addr >= get_journal_end()) {
assert(cursor_addr == get_journal_end());
cursor_addr = get_records_start();
++expected_seq;
is_rolled = true;
}
paddr_t addr = convert_abs_addr_to_paddr(
cursor_addr,
get_device_id());
set_written_to(
journal_seq_t{expected_seq, addr});
return seastar::do_with(
std::move(*maybe_record_deltas_list),
[this,
write_result,
&d_handler,
FNAME](auto& record_deltas_list) {
return crimson::do_for_each(
record_deltas_list,
[this,
write_result,
&d_handler, FNAME](record_deltas_t& record_deltas) {
auto locator = record_locator_t{
record_deltas.record_block_base,
write_result
};
DEBUG("processing {} deltas at block_base {}",
record_deltas.deltas.size(),
locator);
return crimson::do_for_each(
record_deltas.deltas,
[this,
locator,
&d_handler](auto& p) {
auto& modify_time = p.first;
auto& delta = p.second;
return d_handler(
locator,
delta,
header.dirty_tail,
header.alloc_tail,
modify_time).discard_result();
});
}).safe_then([]() {
return replay_ertr::make_ready_future<
seastar::stop_iteration>(seastar::stop_iteration::no);
});
});
});
return replay_ertr::make_ready_future<bool>(true);
};
// The second pass to replay deltas
return scan_valid_record_delta(std::move(call_d_handler_if_valid), tail);
});
}).safe_then([this]() {
trimmer.update_journal_tails(

View File

@ -277,6 +277,16 @@ public:
}
seastar::future<> finish_commit(transaction_type_t type) final;
using cbj_delta_handler_t = std::function<
replay_ertr::future<bool>(
const record_locator_t&,
const delta_info_t&,
sea_time_point modify_time)>;
Journal::replay_ret scan_valid_record_delta(
cbj_delta_handler_t &&delta_handler,
journal_seq_t tail);
private:
cbj_header_t header;
JournalTrimmer &trimmer;