mirror of
https://github.com/ceph/ceph
synced 2025-02-24 03:27:10 +00:00
crimson/os/seastore/journal: pass journal_seq with each replay delta
We'll need this to track where extents became dirty during replay. Signed-off-by: Samuel Just <sjust@redhat.com>
This commit is contained in:
parent
7bb393b8c6
commit
0cfc47814d
@ -308,18 +308,22 @@ Cache::close_ertr::future<> Cache::close()
|
||||
}
|
||||
|
||||
Cache::replay_delta_ret
|
||||
Cache::replay_delta(paddr_t record_base, const delta_info_t &delta)
|
||||
Cache::replay_delta(
|
||||
journal_seq_t journal_seq,
|
||||
paddr_t record_base,
|
||||
const delta_info_t &delta)
|
||||
{
|
||||
if (delta.type == extent_types_t::ROOT) {
|
||||
logger().debug("replay_delta: found root delta");
|
||||
root->apply_delta_and_adjust_crc(record_base, delta.bl);
|
||||
root->dirty_from = journal_seq;
|
||||
return replay_delta_ertr::now();
|
||||
} else {
|
||||
return get_extent_by_type(
|
||||
delta.type,
|
||||
delta.paddr,
|
||||
delta.laddr,
|
||||
delta.length).safe_then([this, record_base, delta](auto extent) {
|
||||
delta.length).safe_then([=](auto extent) {
|
||||
logger().debug(
|
||||
"replay_delta: replaying {} on {}",
|
||||
*extent,
|
||||
|
@ -320,7 +320,10 @@ public:
|
||||
using replay_delta_ertr = crimson::errorator<
|
||||
crimson::ct_error::input_output_error>;
|
||||
using replay_delta_ret = replay_delta_ertr::future<>;
|
||||
replay_delta_ret replay_delta(paddr_t record_base, const delta_info_t &delta);
|
||||
replay_delta_ret replay_delta(
|
||||
journal_seq_t seq,
|
||||
paddr_t record_block_base,
|
||||
const delta_info_t &delta);
|
||||
|
||||
/**
|
||||
* init_cached_extents
|
||||
|
@ -216,7 +216,9 @@ Journal::find_replay_segments_fut Journal::find_replay_segments()
|
||||
rt.second.journal_segment_seq;
|
||||
});
|
||||
|
||||
auto replay_from = segments.rbegin()->second.journal_tail.offset;
|
||||
auto journal_tail = segments.rbegin()->second.journal_tail;
|
||||
segment_provider->update_journal_tail_committed(journal_tail);
|
||||
auto replay_from = journal_tail.offset;
|
||||
auto from = segments.begin();
|
||||
if (replay_from != P_ADDR_NULL) {
|
||||
from = std::find_if(
|
||||
@ -228,13 +230,15 @@ Journal::find_replay_segments_fut Journal::find_replay_segments()
|
||||
} else {
|
||||
replay_from = paddr_t{from->first, (segment_off_t)block_size};
|
||||
}
|
||||
auto ret = std::vector<paddr_t>(segments.end() - from);
|
||||
auto ret = std::vector<journal_seq_t>(segments.end() - from);
|
||||
std::transform(
|
||||
from, segments.end(), ret.begin(),
|
||||
[this](const auto &p) {
|
||||
return paddr_t{p.first, (segment_off_t)block_size};
|
||||
return journal_seq_t{
|
||||
p.second.journal_segment_seq,
|
||||
paddr_t{p.first, (segment_off_t)block_size}};
|
||||
});
|
||||
ret[0] = replay_from;
|
||||
ret[0].offset = replay_from;
|
||||
return find_replay_segments_fut(
|
||||
find_replay_segments_ertr::ready_future_marker{},
|
||||
std::move(ret));
|
||||
@ -287,6 +291,7 @@ std::optional<std::vector<delta_info_t>> Journal::try_decode_deltas(
|
||||
{
|
||||
auto bliter = bl.cbegin();
|
||||
bliter += ceph::encoded_sizeof_bounded<record_header_t>();
|
||||
logger().debug("{}: decoding {} deltas", __func__, header.deltas);
|
||||
std::vector<delta_info_t> deltas(header.deltas);
|
||||
for (auto &&i : deltas) {
|
||||
try {
|
||||
@ -300,17 +305,17 @@ std::optional<std::vector<delta_info_t>> Journal::try_decode_deltas(
|
||||
|
||||
Journal::replay_ertr::future<>
|
||||
Journal::replay_segment(
|
||||
paddr_t start,
|
||||
journal_seq_t seq,
|
||||
delta_handler_t &delta_handler)
|
||||
{
|
||||
logger().debug("replay_segment: starting at {}", start);
|
||||
logger().debug("replay_segment: starting at {}", seq);
|
||||
return seastar::do_with(
|
||||
std::move(start),
|
||||
[this, &delta_handler](auto ¤t) {
|
||||
paddr_t(seq.offset),
|
||||
[=, &delta_handler](paddr_t ¤t) {
|
||||
return crimson::do_until(
|
||||
[this, ¤t, &delta_handler]() -> replay_ertr::future<bool> {
|
||||
[=, ¤t, &delta_handler]() -> replay_ertr::future<bool> {
|
||||
return read_record_metadata(current).safe_then
|
||||
([this, ¤t, &delta_handler](auto p)
|
||||
([=, ¤t, &delta_handler](auto p)
|
||||
-> replay_ertr::future<bool> {
|
||||
if (!p.has_value()) {
|
||||
return replay_ertr::make_ready_future<bool>(true);
|
||||
@ -336,11 +341,14 @@ Journal::replay_segment(
|
||||
|
||||
return seastar::do_with(
|
||||
std::move(*deltas),
|
||||
[this, &delta_handler, record_start](auto &deltas) {
|
||||
[=, &delta_handler](auto &deltas) {
|
||||
return crimson::do_for_each(
|
||||
deltas,
|
||||
[this, &delta_handler, record_start](auto &info) {
|
||||
[=, &delta_handler](auto &info) {
|
||||
return delta_handler(
|
||||
journal_seq_t{
|
||||
seq.segment_seq,
|
||||
record_start},
|
||||
record_start.add_offset(block_size),
|
||||
info);
|
||||
});
|
||||
@ -355,7 +363,7 @@ Journal::replay_segment(
|
||||
Journal::replay_ret Journal::replay(delta_handler_t &&delta_handler)
|
||||
{
|
||||
return seastar::do_with(
|
||||
std::move(delta_handler), std::vector<paddr_t>(),
|
||||
std::move(delta_handler), std::vector<journal_seq_t>(),
|
||||
[this](auto&& handler, auto&& segments) mutable -> replay_ret {
|
||||
return find_replay_segments().safe_then(
|
||||
[this, &handler, &segments](auto replay_segs) {
|
||||
|
@ -160,7 +160,9 @@ public:
|
||||
using replay_ertr = SegmentManager::read_ertr;
|
||||
using replay_ret = replay_ertr::future<>;
|
||||
using delta_handler_t = std::function<
|
||||
replay_ret(paddr_t record_start, const delta_info_t&)>;
|
||||
replay_ret(journal_seq_t seq,
|
||||
paddr_t record_block_base,
|
||||
const delta_info_t&)>;
|
||||
replay_ret replay(delta_handler_t &&delta_handler);
|
||||
|
||||
private:
|
||||
@ -229,8 +231,8 @@ private:
|
||||
using find_replay_segments_ertr = crimson::errorator<
|
||||
crimson::ct_error::input_output_error
|
||||
>;
|
||||
using find_replay_segments_fut =
|
||||
find_replay_segments_ertr::future<std::vector<paddr_t>>;
|
||||
using find_replay_segments_fut = find_replay_segments_ertr::future<
|
||||
std::vector<journal_seq_t>>;
|
||||
find_replay_segments_fut find_replay_segments();
|
||||
|
||||
/// read record metadata for record starting at start
|
||||
@ -249,7 +251,7 @@ private:
|
||||
/// replays records starting at start through end of segment
|
||||
replay_ertr::future<>
|
||||
replay_segment(
|
||||
paddr_t start, ///< [in] starting addr
|
||||
journal_seq_t start, ///< [in] starting addr, seq
|
||||
delta_handler_t &delta_handler ///< [in] processes deltas in order
|
||||
);
|
||||
};
|
||||
|
@ -62,11 +62,11 @@ TransactionManager::mkfs_ertr::future<> TransactionManager::mkfs()
|
||||
TransactionManager::mount_ertr::future<> TransactionManager::mount()
|
||||
{
|
||||
cache.init();
|
||||
return journal.replay([this](auto paddr, const auto &e) {
|
||||
return cache.replay_delta(paddr, e);
|
||||
return journal.replay([this](auto seq, auto paddr, const auto &e) {
|
||||
return cache.replay_delta(seq, paddr, e);
|
||||
}).safe_then([this] {
|
||||
return journal.open_for_write();
|
||||
}).safe_then([this] {
|
||||
}).safe_then([this](auto addr) {
|
||||
return seastar::do_with(
|
||||
create_transaction(),
|
||||
[this](auto &t) {
|
||||
|
Loading…
Reference in New Issue
Block a user