Merge pull request #42080 from cyx1231st/wip-seastore-fix-cache

crimson/os/seastore/cache: misc fixes and cleanup

Reviewed-by: Samuel Just <sjust@redhat.com>
Reviewed-by: Xuehan Xu <xuxuehan@360.cn>
This commit is contained in:
Samuel Just 2021-06-30 21:17:47 -07:00 committed by GitHub
commit e9717a4735
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 97 additions and 84 deletions

View File

@ -31,20 +31,43 @@ Cache::retire_extent_ret Cache::retire_extent_addr(
Transaction &t, paddr_t addr, extent_len_t length)
{
LOG_PREFIX(Cache::retire_extent);
if (auto ext = t.write_set.find_offset(addr); ext != t.write_set.end()) {
DEBUGT("found {} in t.write_set", t, addr);
CachedExtentRef ext;
auto result = t.get_extent(addr, &ext);
if (result == Transaction::get_extent_ret::PRESENT) {
DEBUGT("found {} in t", t, addr);
t.add_to_retired_set(CachedExtentRef(&*ext));
return retire_extent_iertr::now();
} else if (auto iter = extents.find_offset(addr);
iter != extents.end()) {
auto ret = CachedExtentRef(&*iter);
} else if (result == Transaction::get_extent_ret::RETIRED) {
ERRORT("{} is already retired", t, addr);
ceph_abort();
}
// absent from transaction
result = query_cache_for_extent(addr, &ext);
if (result == Transaction::get_extent_ret::PRESENT) {
t.add_to_read_set(ext);
return trans_intr::make_interruptible(
ret->wait_io()
).then_interruptible([&t, ret=std::move(ret)]() mutable {
t.add_to_retired_set(ret);
ext->wait_io()
).then_interruptible([&t, ext=std::move(ext)]() mutable {
t.add_to_retired_set(ext);
return retire_extent_iertr::now();
});
} else {
} else { // result == get_extent_ret::ABSENT
// FIXME this will cause incorrect transaction invalidation because t
// will not be notified if other transactions that modify the extent at
// this addr are committed.
//
// Say that we have transaction A and B, conflicting on extent E at laddr
// L:
// A: dec_ref(L) // cause uncached retirement
// B: read(L) -> E
// B: mutate(E)
// B: submit_transaction() // assume successful
// A: about to submit...
//
// A cannot be invalidated because E is not in A's read-set
//
// TODO: leverage RetiredExtentPlaceholder to fix the issue.
t.add_to_retired_uncached(addr, length);
return retire_extent_iertr::now();
}
@ -125,7 +148,9 @@ void Cache::retire_extent(CachedExtentRef ref)
remove_from_dirty(ref);
ref->dirty_from_or_retired_at = JOURNAL_SEQ_MAX;
retired_extent_gate.add_extent(*ref);
ref->state = CachedExtent::extent_state_t::INVALID;
invalidate(*ref);
extents.erase(*ref);
}
void Cache::replace_extent(CachedExtentRef next, CachedExtentRef prev)
@ -227,9 +252,9 @@ CachedExtentRef Cache::duplicate_for_write(
return ret;
}
std::optional<record_t> Cache::try_construct_record(Transaction &t)
record_t Cache::prepare_record(Transaction &t)
{
LOG_PREFIX(Cache::try_construct_record);
LOG_PREFIX(Cache::prepare_record);
DEBUGT("enter", t);
// Should be valid due to interruptible future
@ -294,11 +319,6 @@ std::optional<record_t> Cache::try_construct_record(Transaction &t)
// Transaction is now a go, set up in-memory cache state
// invalidate now invalid blocks
for (auto &i: t.retired_set) {
DEBUGT("retiring {}", t, *i);
retire_extent(i);
}
for (auto &&i : t.retired_uncached) {
CachedExtentRef to_retire;
if (query_cache_for_extent(i.first, &to_retire) ==
@ -307,12 +327,16 @@ std::optional<record_t> Cache::try_construct_record(Transaction &t)
RetiredExtentPlaceholder
>(i.second);
to_retire->set_paddr(i.first);
to_retire->state = CachedExtent::extent_state_t::CLEAN;
}
t.retired_set.insert(to_retire);
extents.insert(*to_retire);
to_retire->dirty_from_or_retired_at = JOURNAL_SEQ_MAX;
retired_extent_gate.add_extent(*to_retire);
}
for (auto &i: t.retired_set) {
DEBUGT("retiring {}", t, *i);
retire_extent(i);
}
record.extents.reserve(t.fresh_block_list.size());
@ -335,7 +359,7 @@ std::optional<record_t> Cache::try_construct_record(Transaction &t)
});
}
return std::make_optional<record_t>(std::move(record));
return record;
}
void Cache::complete_commit(
@ -462,9 +486,12 @@ Cache::replay_delta(
-> get_extent_ertr::future<CachedExtentRef> {
auto retiter = extents.find_offset(addr);
if (retiter != extents.end()) {
return seastar::make_ready_future<CachedExtentRef>(&*retiter);
CachedExtentRef ret = &*retiter;
return ret->wait_io().then([ret] {
return ret;
});
} else {
return seastar::make_ready_future<CachedExtentRef>();
return seastar::make_ready_future<CachedExtentRef>();
}
};
auto extent_fut = (delta.pversion == 0 ?

View File

@ -231,18 +231,27 @@ public:
get_extent_if_cached_ret get_extent_if_cached(
Transaction &t,
paddr_t offset) {
return seastar::do_with(
CachedExtentRef(),
[this, &t, offset](auto &ret) {
auto status = query_cache_for_extent(t, offset, &ret);
auto wait = seastar::now();
if (status == Transaction::get_extent_ret::PRESENT) {
wait = ret->wait_io();
}
return trans_intr::make_interruptible(
wait.then([ret] { return std::move(ret); })
);
});
CachedExtentRef ret;
auto result = t.get_extent(offset, &ret);
if (result != Transaction::get_extent_ret::ABSENT) {
// including get_extent_ret::RETIRED
return get_extent_if_cached_iertr::make_ready_future<
CachedExtentRef>(ret);
}
// get_extent_ret::PRESENT from transaction
result = query_cache_for_extent(offset, &ret);
if (result == Transaction::get_extent_ret::ABSENT) {
return get_extent_if_cached_iertr::make_ready_future<
CachedExtentRef>();
}
// get_extent_ret::PRESENT from cache
t.add_to_read_set(ret);
return ret->wait_io().then([ret] {
return get_extent_if_cached_iertr::make_ready_future<
CachedExtentRef>(ret);
});
}
/**
@ -308,7 +317,7 @@ public:
laddr_t laddr,
segment_off_t length) {
CachedExtentRef ret;
auto status = query_cache_for_extent(t, offset, &ret);
auto status = t.get_extent(offset, &ret);
if (status == Transaction::get_extent_ret::RETIRED) {
return seastar::make_ready_future<CachedExtentRef>();
} else if (status == Transaction::get_extent_ret::PRESENT) {
@ -317,9 +326,14 @@ public:
return trans_intr::make_interruptible(
get_extent_by_type(type, offset, laddr, length)
).si_then([=, &t](CachedExtentRef ret) {
t.add_to_read_set(ret);
return get_extent_ertr::make_ready_future<CachedExtentRef>(
std::move(ret));
if (!ret->is_valid()) {
t.conflicted = true;
return get_extent_ertr::make_ready_future<CachedExtentRef>();
} else {
t.add_to_read_set(ret);
return get_extent_ertr::make_ready_future<CachedExtentRef>(
std::move(ret));
}
});
}
}
@ -370,14 +384,11 @@ public:
);
/**
* try_construct_record
* prepare_record
*
* First checks for conflicts. If a racing write has mutated/retired
* an extent mutated by this transaction, nullopt will be returned.
*
* Otherwise, a record will be returned valid for use with Journal.
* Construct the record for Journal from transaction.
*/
std::optional<record_t> try_construct_record(
record_t prepare_record(
Transaction &t ///< [in, out] current transaction
);
@ -591,18 +602,6 @@ private:
}
}
Transaction::get_extent_ret query_cache_for_extent(
Transaction &t,
paddr_t offset,
CachedExtentRef *out) {
auto result = t.get_extent(offset, out);
if (result != Transaction::get_extent_ret::ABSENT) {
return result;
} else {
return query_cache_for_extent(offset, out);
}
}
};
using CacheRef = std::unique_ptr<Cache>;

View File

@ -740,7 +740,7 @@ public:
}
std::ostream &print_detail(std::ostream &out) const final {
return out << "RetiredExtentPlaceholder";
return out << ", RetiredExtentPlaceholder";
}
void on_delta_write(paddr_t record_block_offset) final {

View File

@ -109,11 +109,12 @@ class SeastoreNodeExtentManager final: public TransactionManagerHandle {
).safe_then([addr, &t](auto&& e) -> read_ertr::future<NodeExtentRef> {
TRACET("read {}B at {:#x} -- {}",
t, e->get_length(), e->get_laddr(), *e);
if (!e->is_valid() || t.is_conflicted()) {
if (t.is_conflicted()) {
ERRORT("transaction conflict detected on extent read {}", t, *e);
assert(t.is_conflicted());
return crimson::ct_error::eagain::make();
}
assert(e->is_valid());
assert(e->get_laddr() == addr);
std::ignore = addr;
return read_ertr::make_ready_future<NodeExtentRef>(e);

View File

@ -239,14 +239,13 @@ TransactionManager::submit_transaction_direct(
tref.get_handle().enter(write_pipeline.prepare)
).then_interruptible([this, FNAME, &tref]() mutable
-> submit_transaction_iertr::future<> {
auto record = cache->try_construct_record(tref);
assert(record); // interruptible future would have already failed
auto record = cache->prepare_record(tref);
tref.get_handle().maybe_release_collection_lock();
DEBUGT("about to submit to journal", tref);
return journal->submit_record(std::move(*record), tref.get_handle()
return journal->submit_record(std::move(record), tref.get_handle()
).safe_then([this, FNAME, &tref](auto p) mutable {
auto [addr, journal_seq] = p;
DEBUGT("journal commit to {} seq {}", tref, addr, journal_seq);

View File

@ -58,12 +58,8 @@ struct btree_lba_manager_test :
seastar::future<> submit_transaction(TransactionRef t)
{
auto record = cache.try_construct_record(*t);
if (!record) {
ceph_assert(0 == "cannot fail");
}
return journal.submit_record(std::move(*record), t->get_handle()).safe_then(
auto record = cache.prepare_record(*t);
return journal.submit_record(std::move(record), t->get_handle()).safe_then(
[this, t=std::move(t)](auto p) mutable {
auto [addr, seq] = p;
cache.complete_commit(*t, addr, seq);

View File

@ -29,16 +29,12 @@ struct cache_test_t : public seastar_test_suite_t {
: segment_manager(segment_manager::create_test_ephemeral()),
cache(*segment_manager) {}
seastar::future<std::optional<paddr_t>> submit_transaction(
seastar::future<paddr_t> submit_transaction(
TransactionRef t) {
auto record = cache.try_construct_record(*t);
if (!record) {
return seastar::make_ready_future<std::optional<paddr_t>>(
std::nullopt);
}
auto record = cache.prepare_record(*t);
bufferlist bl;
for (auto &&block : record->extents) {
for (auto &&block : record.extents) {
bl.append(block.bl);
}
@ -57,7 +53,7 @@ struct cache_test_t : public seastar_test_suite_t {
).safe_then(
[this, prev, t=std::move(t)]() mutable {
cache.complete_commit(*t, prev, seq /* TODO */);
return seastar::make_ready_future<std::optional<paddr_t>>(prev);
return prev;
},
crimson::ct_error::all_same_way([](auto e) {
ASSERT_FALSE("failed to submit");
@ -90,9 +86,7 @@ struct cache_test_t : public seastar_test_suite_t {
return cache.mkfs(*transaction).safe_then(
[this, &transaction] {
return submit_transaction(std::move(transaction)).then(
[](auto p) {
ASSERT_TRUE(p);
});
[](auto p) {});
});
});
}).handle_error(
@ -120,8 +114,7 @@ TEST_F(cache_test_t, test_addr_fixup)
TestBlockPhysical::SIZE);
extent->set_contents('c');
csum = extent->get_crc32c();
auto ret = submit_transaction(std::move(t)).get0();
ASSERT_TRUE(ret);
submit_transaction(std::move(t)).get0();
addr = extent->get_paddr();
}
{
@ -165,8 +158,7 @@ TEST_F(cache_test_t, test_dirty_extent)
ASSERT_EQ(extent->get_version(), 0);
ASSERT_EQ(csum, extent->get_crc32c());
}
auto ret = submit_transaction(std::move(t)).get0();
ASSERT_TRUE(ret);
submit_transaction(std::move(t)).get0();
addr = extent->get_paddr();
}
{
@ -222,8 +214,7 @@ TEST_F(cache_test_t, test_dirty_extent)
ASSERT_EQ(csum2, extent->get_crc32c());
}
// submit transaction
auto ret = submit_transaction(std::move(t)).get0();
ASSERT_TRUE(ret);
submit_transaction(std::move(t)).get0();
ASSERT_TRUE(extent->is_dirty());
ASSERT_EQ(addr, extent->get_paddr());
ASSERT_EQ(extent->get_version(), 1);