Merge pull request #36510 from athanatos/sjust/wip-tm-decref

crimson: remove extent from cache in TransactionManager::dec_ref

Reviewed-by: Kefu Chai <kchai@redhat.com>
This commit is contained in:
Kefu Chai 2020-08-14 03:47:55 +08:00 committed by GitHub
commit 4fea9a8be3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 144 additions and 39 deletions

View File

@ -28,6 +28,24 @@ Cache::~Cache()
ceph_assert(extents.empty());
}
Cache::retire_extent_ret Cache::retire_extent_if_cached(
Transaction &t, paddr_t addr)
{
if (auto ext = t.write_set.find_offset(addr); ext != t.write_set.end()) {
t.add_to_retired_set(CachedExtentRef(&*ext));
return retire_extent_ertr::now();
} else if (auto iter = extents.find_offset(addr);
iter != extents.end()) {
auto ret = CachedExtentRef(&*iter);
return ret->wait_io().then([&t, ret=std::move(ret)]() mutable {
t.add_to_retired_set(ret);
return retire_extent_ertr::now();
});
} else {
return retire_extent_ertr::now();
}
}
void Cache::add_extent(CachedExtentRef ref)
{
assert(ref->is_valid());
@ -187,10 +205,16 @@ void Cache::complete_commit(
for (auto &i: t.fresh_block_list) {
i->set_paddr(cur);
cur.offset += i->get_length();
i->state = CachedExtent::extent_state_t::CLEAN;
i->last_committed_crc = i->get_crc32c();
logger().debug("complete_commit: fresh {}", *i);
i->on_initial_write();
if (!i->is_valid()) {
logger().debug("complete_commit: invalid {}", *i);
continue;
}
i->state = CachedExtent::extent_state_t::CLEAN;
logger().debug("complete_commit: fresh {}", *i);
add_extent(i);
}

View File

@ -90,6 +90,13 @@ public:
t.add_to_retired_set(ref);
}
/// Declare paddr retired in t, noop if not cached
using retire_extent_ertr = crimson::errorator<
crimson::ct_error::input_output_error>;
using retire_extent_ret = retire_extent_ertr::future<>;
retire_extent_ret retire_extent_if_cached(
Transaction &t, paddr_t addr);
/**
* get_root
*

View File

@ -65,6 +65,17 @@ CachedExtent::~CachedExtent()
}
}
std::ostream &LogicalCachedExtent::print_detail(std::ostream &out) const
{
out << ", laddr=" << laddr;
if (pin) {
out << ", pin=" << *pin;
} else {
out << ", pin=empty";
}
return print_detail_l(out);
}
std::ostream &operator<<(std::ostream &out, const LBAPin &rhs)
{
return out << "LBAPin(" << rhs.get_laddr() << "~" << rhs.get_length()

View File

@ -566,8 +566,13 @@ public:
bool is_logical() const final {
return true;
}
std::ostream &print_detail(std::ostream &out) const final;
protected:
virtual void apply_delta(const ceph::bufferlist &bl) = 0;
virtual std::ostream &print_detail_l(std::ostream &out) const {
return out;
}
private:
laddr_t laddr = L_ADDR_NULL;

View File

@ -88,10 +88,15 @@ public:
Transaction &t,
laddr_t off, extent_len_t len, paddr_t addr) = 0;
struct ref_update_result_t {
unsigned refcount = 0;
paddr_t addr;
};
using ref_ertr = crimson::errorator<
crimson::ct_error::enoent,
crimson::ct_error::input_output_error>;
using ref_ret = ref_ertr::future<unsigned>;
using ref_ret = ref_ertr::future<ref_update_result_t>;
/**
* Decrements ref count on extent

View File

@ -356,16 +356,9 @@ BtreeLBAManager::update_refcount_ret BtreeLBAManager::update_refcount(
lba_map_val_t out = in;
ceph_assert((int)out.refcount + delta >= 0);
out.refcount += delta;
if (out.refcount == 0) {
return std::optional<lba_map_val_t>();
} else {
return std::optional<lba_map_val_t>(out);
}
return out;
}).safe_then([](auto result) {
if (!result)
return 0u;
else
return result->refcount;
return ref_update_result_t{result.refcount, result.paddr};
});
}

View File

@ -145,7 +145,7 @@ private:
* Updates mapping, removes if f returns nullopt
*/
using update_mapping_ertr = ref_ertr;
using update_mapping_ret = ref_ertr::future<std::optional<lba_map_val_t>>;
using update_mapping_ret = ref_ertr::future<lba_map_val_t>;
using update_func_t = LBANode::mutate_func_t;
update_mapping_ret update_mapping(
Transaction &t,

View File

@ -129,9 +129,9 @@ struct LBANode : CachedExtent {
crimson::ct_error::input_output_error
>;
using mutate_mapping_ret = mutate_mapping_ertr::future<
std::optional<lba_map_val_t>>;
lba_map_val_t>;
using mutate_func_t = std::function<
std::optional<lba_map_val_t>(const lba_map_val_t &v)
lba_map_val_t(const lba_map_val_t &v)
>;
virtual mutate_mapping_ret mutate_mapping(
op_context_t c,

View File

@ -404,15 +404,12 @@ LBALeafNode::mutate_mapping_ret LBALeafNode::mutate_mapping(
ceph_assert(!at_min_capacity());
auto mutation_pt = find(laddr);
if (mutation_pt == end()) {
ceph_assert(0 == "should be impossible");
return mutate_mapping_ret(
mutate_mapping_ertr::ready_future_marker{},
std::nullopt);
return crimson::ct_error::enoent::make();
}
auto mutated = f(mutation_pt.get_val());
if (mutated) {
journal_update(mutation_pt, *mutated, maybe_get_delta_buffer());
if (mutated.refcount > 0) {
journal_update(mutation_pt, mutated, maybe_get_delta_buffer());
return mutate_mapping_ret(
mutate_mapping_ertr::ready_future_marker{},
mutated);

View File

@ -13,6 +13,8 @@ std::ostream &segment_to_stream(std::ostream &out, const segment_id_t &t)
return out << "BLOCK_REL_SEG";
else if (t == RECORD_REL_SEG_ID)
return out << "RECORD_REL_SEG";
else if (t == FAKE_SEG_ID)
return out << "FAKE_SEG";
else
return out << t;
}

View File

@ -28,6 +28,10 @@ constexpr segment_id_t RECORD_REL_SEG_ID =
constexpr segment_id_t BLOCK_REL_SEG_ID =
std::numeric_limits<segment_id_t>::max() - 3;
// for tests which generate fake paddrs
constexpr segment_id_t FAKE_SEG_ID =
std::numeric_limits<segment_id_t>::max() - 4;
std::ostream &segment_to_stream(std::ostream &, const segment_id_t &t);
// Offset within a segment on disk, see SegmentManager
@ -151,6 +155,9 @@ constexpr paddr_t make_record_relative_paddr(segment_off_t off) {
constexpr paddr_t make_block_relative_paddr(segment_off_t off) {
return paddr_t{BLOCK_REL_SEG_ID, off};
}
constexpr paddr_t make_fake_paddr(segment_off_t off) {
return paddr_t{FAKE_SEG_ID, off};
}
struct paddr_le_t {
ceph_le32 segment = init_le32(NULL_SEG_ID);

View File

@ -95,8 +95,9 @@ TransactionManager::ref_ret TransactionManager::inc_ref(
Transaction &t,
LogicalCachedExtentRef &ref)
{
return lba_manager.incref_extent(t, ref->get_laddr()
).handle_error(
return lba_manager.incref_extent(t, ref->get_laddr()).safe_then([](auto r) {
return r.refcount;
}).handle_error(
ref_ertr::pass_further{},
ct_error::all_same_way([](auto e) {
ceph_assert(0 == "unhandled error, TODO");
@ -107,27 +108,42 @@ TransactionManager::ref_ret TransactionManager::inc_ref(
Transaction &t,
laddr_t offset)
{
return lba_manager.incref_extent(t, offset);
return lba_manager.incref_extent(t, offset).safe_then([](auto result) {
return result.refcount;
});
}
TransactionManager::ref_ret TransactionManager::dec_ref(
Transaction &t,
LogicalCachedExtentRef &ref)
{
return dec_ref(t, ref->get_laddr()
).handle_error(
ref_ertr::pass_further{},
ct_error::all_same_way([](auto e) {
ceph_assert(0 == "unhandled error, TODO");
}));
return lba_manager.decref_extent(t, ref->get_laddr()
).safe_then([this, &t, ref](auto ret) {
if (ret.refcount == 0) {
cache.retire_extent(t, ref);
}
return ret.refcount;
});
}
TransactionManager::ref_ret TransactionManager::dec_ref(
Transaction &t,
laddr_t offset)
{
// TODO: need to retire the extent (only) if it's live, will need cache call
return lba_manager.decref_extent(t, offset);
return lba_manager.decref_extent(t, offset
).safe_then([this, &t](auto result) -> ref_ret {
if (result.refcount == 0) {
return cache.retire_extent_if_cached(t, result.addr).safe_then([] {
return ref_ret(
ref_ertr::ready_future_marker{},
0);
});
} else {
return ref_ret(
ref_ertr::ready_future_marker{},
result.refcount);
}
});
}
TransactionManager::submit_transaction_ertr::future<>

View File

@ -143,8 +143,9 @@ public:
return ret;
}
using ref_ertr = LBAManager::ref_ertr;
using ref_ret = LBAManager::ref_ret;
using ref_ret = ref_ertr::future<unsigned>;
/// Add refcount for ref
ref_ret inc_ref(

View File

@ -147,6 +147,12 @@ struct btree_lba_manager_test :
);
}
segment_off_t next_off = 0;
paddr_t get_paddr() {
next_off += block_size;
return make_fake_paddr(next_off);
}
auto alloc_mapping(
test_transaction_t &t,
laddr_t hint,
@ -208,7 +214,7 @@ struct btree_lba_manager_test :
auto refcnt = lba_manager->decref_extent(
*t.t,
target->first).unsafe_get0();
target->first).unsafe_get0().refcount;
EXPECT_EQ(refcnt, target->second.refcount);
if (target->second.refcount == 0) {
t.mappings.erase(target);
@ -222,7 +228,7 @@ struct btree_lba_manager_test :
target->second.refcount++;
auto refcnt = lba_manager->incref_extent(
*t.t,
target->first).unsafe_get0();
target->first).unsafe_get0().refcount;
EXPECT_EQ(refcnt, target->second.refcount);
}
@ -258,13 +264,12 @@ TEST_F(btree_lba_manager_test, basic)
{
run_async([this] {
laddr_t laddr = 0x12345678 * block_size;
paddr_t paddr = { 1, static_cast<segment_off_t>(block_size * 10) };
{
// write initial mapping
auto t = create_transaction();
check_mappings(t); // check in progress transaction sees mapping
check_mappings(); // check concurrent does not
auto ret = alloc_mapping(t, laddr, block_size, paddr);
auto ret = alloc_mapping(t, laddr, block_size, get_paddr());
submit_test_transaction(std::move(t));
}
check_mappings(); // check new transaction post commit sees it
@ -278,7 +283,7 @@ TEST_F(btree_lba_manager_test, force_split)
auto t = create_transaction();
logger().debug("opened transaction");
for (unsigned j = 0; j < 5; ++j) {
auto ret = alloc_mapping(t, 0, block_size, P_ADDR_MIN);
auto ret = alloc_mapping(t, 0, block_size, get_paddr());
if ((i % 10 == 0) && (j == 3)) {
check_mappings(t);
check_mappings();
@ -298,7 +303,7 @@ TEST_F(btree_lba_manager_test, force_split_merge)
auto t = create_transaction();
logger().debug("opened transaction");
for (unsigned j = 0; j < 5; ++j) {
auto ret = alloc_mapping(t, 0, block_size, P_ADDR_MIN);
auto ret = alloc_mapping(t, 0, block_size, get_paddr());
// just to speed things up a bit
if ((i % 100 == 0) && (j == 3)) {
check_mappings(t);

View File

@ -278,6 +278,38 @@ TEST_F(transaction_manager_test_t, mutate)
});
}
TEST_F(transaction_manager_test_t, create_remove_same_transaction)
{
constexpr laddr_t SIZE = 4096;
run_async([this] {
constexpr laddr_t ADDR = 0xFF * SIZE;
{
auto t = create_transaction();
auto extent = alloc_extent(
t,
ADDR,
SIZE,
'a');
ASSERT_EQ(ADDR, extent->get_laddr());
check_mappings(t);
dec_ref(t, ADDR);
check_mappings(t);
extent = alloc_extent(
t,
ADDR,
SIZE,
'a');
submit_transaction(std::move(t));
check_mappings();
}
replay();
check_mappings();
});
}
TEST_F(transaction_manager_test_t, inc_dec_ref)
{
constexpr laddr_t SIZE = 4096;