diff --git a/src/crimson/os/seastore/cache.cc b/src/crimson/os/seastore/cache.cc index 908600341a0..e1e557baf40 100644 --- a/src/crimson/os/seastore/cache.cc +++ b/src/crimson/os/seastore/cache.cc @@ -31,20 +31,43 @@ Cache::retire_extent_ret Cache::retire_extent_addr( Transaction &t, paddr_t addr, extent_len_t length) { LOG_PREFIX(Cache::retire_extent); - if (auto ext = t.write_set.find_offset(addr); ext != t.write_set.end()) { - DEBUGT("found {} in t.write_set", t, addr); + CachedExtentRef ext; + auto result = t.get_extent(addr, &ext); + if (result == Transaction::get_extent_ret::PRESENT) { + DEBUGT("found {} in t", t, addr); t.add_to_retired_set(CachedExtentRef(&*ext)); return retire_extent_iertr::now(); - } else if (auto iter = extents.find_offset(addr); - iter != extents.end()) { - auto ret = CachedExtentRef(&*iter); + } else if (result == Transaction::get_extent_ret::RETIRED) { + ERRORT("{} is already retired", t, addr); + ceph_abort(); + } + + // absent from transaction + result = query_cache_for_extent(addr, &ext); + if (result == Transaction::get_extent_ret::PRESENT) { + t.add_to_read_set(ext); return trans_intr::make_interruptible( - ret->wait_io() - ).then_interruptible([&t, ret=std::move(ret)]() mutable { - t.add_to_retired_set(ret); + ext->wait_io() + ).then_interruptible([&t, ext=std::move(ext)]() mutable { + t.add_to_retired_set(ext); return retire_extent_iertr::now(); }); - } else { + } else { // result == get_extent_ret::ABSENT + // FIXME this will cause incorrect transaction invalidation because t + // will not be notified if other transactions that modify the extent at + // this addr are committed. + // + // Say that we have transaction A and B, conflicting on extent E at laddr + // L: + // A: dec_ref(L) // cause uncached retirement + // B: read(L) -> E + // B: mutate(E) + // B: submit_transaction() // assume successful + // A: about to submit... 
+ // + // A cannot be invalidated because E is not in A's read-set + // + // TODO: leverage RetiredExtentPlaceholder to fix the issue. t.add_to_retired_uncached(addr, length); return retire_extent_iertr::now(); } @@ -125,7 +148,9 @@ void Cache::retire_extent(CachedExtentRef ref) remove_from_dirty(ref); ref->dirty_from_or_retired_at = JOURNAL_SEQ_MAX; retired_extent_gate.add_extent(*ref); - ref->state = CachedExtent::extent_state_t::INVALID; + + invalidate(*ref); + extents.erase(*ref); } void Cache::replace_extent(CachedExtentRef next, CachedExtentRef prev) @@ -227,9 +252,9 @@ CachedExtentRef Cache::duplicate_for_write( return ret; } -std::optional<record_t> Cache::try_construct_record(Transaction &t) +record_t Cache::prepare_record(Transaction &t) { - LOG_PREFIX(Cache::try_construct_record); + LOG_PREFIX(Cache::prepare_record); DEBUGT("enter", t); // Should be valid due to interruptible future @@ -294,11 +319,6 @@ std::optional<record_t> Cache::try_construct_record(Transaction &t) // Transaction is now a go, set up in-memory cache state // invalidate now invalid blocks - for (auto &i: t.retired_set) { - DEBUGT("retiring {}", t, *i); - retire_extent(i); - } - for (auto &&i : t.retired_uncached) { CachedExtentRef to_retire; if (query_cache_for_extent(i.first, &to_retire) == @@ -307,12 +327,16 @@ std::optional<record_t> Cache::try_construct_record(Transaction &t) RetiredExtentPlaceholder >(i.second); to_retire->set_paddr(i.first); + to_retire->state = CachedExtent::extent_state_t::CLEAN; } t.retired_set.insert(to_retire); extents.insert(*to_retire); - to_retire->dirty_from_or_retired_at = JOURNAL_SEQ_MAX; - retired_extent_gate.add_extent(*to_retire); + } + + for (auto &i: t.retired_set) { + DEBUGT("retiring {}", t, *i); + retire_extent(i); } record.extents.reserve(t.fresh_block_list.size()); @@ -335,7 +359,7 @@ std::optional<record_t> Cache::try_construct_record(Transaction &t) }); } - return std::make_optional<record_t>(std::move(record)); + return record; } void 
Cache::complete_commit( @@ -462,9 +486,12 @@ Cache::replay_delta( -> get_extent_ertr::future<CachedExtentRef> { auto retiter = extents.find_offset(addr); if (retiter != extents.end()) { - return seastar::make_ready_future<CachedExtentRef>(&*retiter); + CachedExtentRef ret = &*retiter; + return ret->wait_io().then([ret] { + return ret; + }); } else { - return seastar::make_ready_future<CachedExtentRef>(); + return seastar::make_ready_future<CachedExtentRef>(); } }; auto extent_fut = (delta.pversion == 0 ? diff --git a/src/crimson/os/seastore/cache.h b/src/crimson/os/seastore/cache.h index 1a9998cc7f2..943975963d4 100644 --- a/src/crimson/os/seastore/cache.h +++ b/src/crimson/os/seastore/cache.h @@ -231,18 +231,27 @@ public: get_extent_if_cached_ret get_extent_if_cached( Transaction &t, paddr_t offset) { - return seastar::do_with( - CachedExtentRef(), - [this, &t, offset](auto &ret) { - auto status = query_cache_for_extent(t, offset, &ret); - auto wait = seastar::now(); - if (status == Transaction::get_extent_ret::PRESENT) { - wait = ret->wait_io(); - } - return trans_intr::make_interruptible( - wait.then([ret] { return std::move(ret); }) - ); - }); + CachedExtentRef ret; + auto result = t.get_extent(offset, &ret); + if (result != Transaction::get_extent_ret::ABSENT) { + // including get_extent_ret::RETIRED + return get_extent_if_cached_iertr::make_ready_future< + CachedExtentRef>(ret); + } + + // get_extent_ret::ABSENT from transaction + result = query_cache_for_extent(offset, &ret); + if (result == Transaction::get_extent_ret::ABSENT) { + return get_extent_if_cached_iertr::make_ready_future< + CachedExtentRef>(); + } + + // get_extent_ret::PRESENT from cache + t.add_to_read_set(ret); + return ret->wait_io().then([ret] { + return get_extent_if_cached_iertr::make_ready_future< + CachedExtentRef>(ret); + }); } /** @@ -308,7 +317,7 @@ public: laddr_t laddr, segment_off_t length) { CachedExtentRef ret; - auto status = query_cache_for_extent(t, offset, &ret); + auto 
status = t.get_extent(offset, &ret); if (status == Transaction::get_extent_ret::RETIRED) { return seastar::make_ready_future<CachedExtentRef>(); } else if (status == Transaction::get_extent_ret::PRESENT) { @@ -317,9 +326,14 @@ public: return trans_intr::make_interruptible( get_extent_by_type(type, offset, laddr, length) ).si_then([=, &t](CachedExtentRef ret) { - t.add_to_read_set(ret); - return get_extent_ertr::make_ready_future<CachedExtentRef>( - std::move(ret)); + if (!ret->is_valid()) { + t.conflicted = true; + return get_extent_ertr::make_ready_future<CachedExtentRef>(); + } else { + t.add_to_read_set(ret); + return get_extent_ertr::make_ready_future<CachedExtentRef>( + std::move(ret)); + } }); } } @@ -370,14 +384,11 @@ public: ); /** - * try_construct_record + * prepare_record * - * First checks for conflicts. If a racing write has mutated/retired - * an extent mutated by this transaction, nullopt will be returned. - * - * Otherwise, a record will be returned valid for use with Journal. + * Construct the record for Journal from transaction. 
*/ - std::optional<record_t> try_construct_record( + record_t prepare_record( Transaction &t ///< [in, out] current transaction ); @@ -591,18 +602,6 @@ private: } } - Transaction::get_extent_ret query_cache_for_extent( - Transaction &t, - paddr_t offset, - CachedExtentRef *out) { - auto result = t.get_extent(offset, out); - if (result != Transaction::get_extent_ret::ABSENT) { - return result; - } else { - return query_cache_for_extent(offset, out); - } - } - }; using CacheRef = std::unique_ptr<Cache>; diff --git a/src/crimson/os/seastore/cached_extent.h b/src/crimson/os/seastore/cached_extent.h index 856791800b9..a5dde1a869e 100644 --- a/src/crimson/os/seastore/cached_extent.h +++ b/src/crimson/os/seastore/cached_extent.h @@ -740,7 +740,7 @@ public: } std::ostream &print_detail(std::ostream &out) const final { - return out << "RetiredExtentPlaceholder"; + return out << ", RetiredExtentPlaceholder"; } void on_delta_write(paddr_t record_block_offset) final { diff --git a/src/crimson/os/seastore/onode_manager/staged-fltree/node_extent_manager/seastore.h b/src/crimson/os/seastore/onode_manager/staged-fltree/node_extent_manager/seastore.h index f2d5a48a3db..deb873265e9 100644 --- a/src/crimson/os/seastore/onode_manager/staged-fltree/node_extent_manager/seastore.h +++ b/src/crimson/os/seastore/onode_manager/staged-fltree/node_extent_manager/seastore.h @@ -109,11 +109,12 @@ class SeastoreNodeExtentManager final: public TransactionManagerHandle { ).safe_then([addr, &t](auto&& e) -> read_ertr::future<NodeExtentRef> { TRACET("read {}B at {:#x} -- {}", t, e->get_length(), e->get_laddr(), *e); - if (!e->is_valid() || t.is_conflicted()) { + if (t.is_conflicted()) { ERRORT("transaction conflict detected on extent read {}", t, *e); assert(t.is_conflicted()); return crimson::ct_error::eagain::make(); } + assert(e->is_valid()); assert(e->get_laddr() == addr); std::ignore = addr; return read_ertr::make_ready_future<NodeExtentRef>(e); diff --git 
a/src/crimson/os/seastore/transaction_manager.cc b/src/crimson/os/seastore/transaction_manager.cc index 8f34581eb44..603ffc49248 100644 --- a/src/crimson/os/seastore/transaction_manager.cc +++ b/src/crimson/os/seastore/transaction_manager.cc @@ -239,14 +239,13 @@ TransactionManager::submit_transaction_direct( tref.get_handle().enter(write_pipeline.prepare) ).then_interruptible([this, FNAME, &tref]() mutable -> submit_transaction_iertr::future<> { - auto record = cache->try_construct_record(tref); - assert(record); // interruptible future would have already failed + auto record = cache->prepare_record(tref); tref.get_handle().maybe_release_collection_lock(); DEBUGT("about to submit to journal", tref); - return journal->submit_record(std::move(*record), tref.get_handle() + return journal->submit_record(std::move(record), tref.get_handle() ).safe_then([this, FNAME, &tref](auto p) mutable { auto [addr, journal_seq] = p; DEBUGT("journal commit to {} seq {}", tref, addr, journal_seq); diff --git a/src/test/crimson/seastore/test_btree_lba_manager.cc b/src/test/crimson/seastore/test_btree_lba_manager.cc index a4b6d1db339..596e8362f6b 100644 --- a/src/test/crimson/seastore/test_btree_lba_manager.cc +++ b/src/test/crimson/seastore/test_btree_lba_manager.cc @@ -58,12 +58,8 @@ struct btree_lba_manager_test : seastar::future<> submit_transaction(TransactionRef t) { - auto record = cache.try_construct_record(*t); - if (!record) { - ceph_assert(0 == "cannot fail"); - } - - return journal.submit_record(std::move(*record), t->get_handle()).safe_then( + auto record = cache.prepare_record(*t); + return journal.submit_record(std::move(record), t->get_handle()).safe_then( [this, t=std::move(t)](auto p) mutable { auto [addr, seq] = p; cache.complete_commit(*t, addr, seq); diff --git a/src/test/crimson/seastore/test_seastore_cache.cc b/src/test/crimson/seastore/test_seastore_cache.cc index 31b77209090..42d285671e9 100644 --- a/src/test/crimson/seastore/test_seastore_cache.cc +++ 
b/src/test/crimson/seastore/test_seastore_cache.cc @@ -29,16 +29,12 @@ struct cache_test_t : public seastar_test_suite_t { : segment_manager(segment_manager::create_test_ephemeral()), cache(*segment_manager) {} - seastar::future<std::optional<paddr_t>> submit_transaction( + seastar::future<paddr_t> submit_transaction( TransactionRef t) { - auto record = cache.try_construct_record(*t); - if (!record) { - return seastar::make_ready_future<std::optional<paddr_t>>( - std::nullopt); - } + auto record = cache.prepare_record(*t); bufferlist bl; - for (auto &&block : record->extents) { + for (auto &&block : record.extents) { bl.append(block.bl); } @@ -57,7 +53,7 @@ struct cache_test_t : public seastar_test_suite_t { ).safe_then( [this, prev, t=std::move(t)]() mutable { cache.complete_commit(*t, prev, seq /* TODO */); - return seastar::make_ready_future<std::optional<paddr_t>>(prev); + return prev; }, crimson::ct_error::all_same_way([](auto e) { ASSERT_FALSE("failed to submit"); @@ -90,9 +86,7 @@ struct cache_test_t : public seastar_test_suite_t { return cache.mkfs(*transaction).safe_then( [this, &transaction] { return submit_transaction(std::move(transaction)).then( - [](auto p) { - ASSERT_TRUE(p); - }); + [](auto p) {}); }); }); }).handle_error( @@ -120,8 +114,7 @@ TEST_F(cache_test_t, test_addr_fixup) TestBlockPhysical::SIZE); extent->set_contents('c'); csum = extent->get_crc32c(); - auto ret = submit_transaction(std::move(t)).get0(); - ASSERT_TRUE(ret); + submit_transaction(std::move(t)).get0(); addr = extent->get_paddr(); } { @@ -165,8 +158,7 @@ TEST_F(cache_test_t, test_dirty_extent) ASSERT_EQ(extent->get_version(), 0); ASSERT_EQ(csum, extent->get_crc32c()); } - auto ret = submit_transaction(std::move(t)).get0(); - ASSERT_TRUE(ret); + submit_transaction(std::move(t)).get0(); addr = extent->get_paddr(); } { @@ -222,8 +214,7 @@ TEST_F(cache_test_t, test_dirty_extent) ASSERT_EQ(csum2, extent->get_crc32c()); } // submit transaction - auto ret = 
submit_transaction(std::move(t)).get0(); - ASSERT_TRUE(ret); + submit_transaction(std::move(t)).get0(); ASSERT_TRUE(extent->is_dirty()); ASSERT_EQ(addr, extent->get_paddr()); ASSERT_EQ(extent->get_version(), 1);