crimson/os/seastore: validate node immediately prior to adding pin

Mostly, operating on an invalid extent is harmless by construction --
it'll be caught by the read set check in submit_transaction.  However,
prior to adding an extent to the lba pin set, we need to check that it's
really still live, which we can do by checking that it and its immediate
parent are still valid.  Mutating the target extent would have
invalidated it, and replacing the target extent would necessarily have
involved a mutation to the parent, which would have invalidated it.

Signed-off-by: Samuel Just <sjust@redhat.com>
This commit is contained in:
Samuel Just 2021-01-14 03:43:07 +00:00
parent d244530f92
commit e43e5eadc5
7 changed files with 59 additions and 13 deletions

View File

@ -235,6 +235,11 @@ public:
return state != extent_state_t::INVALID;
}
/// Returns true if extent or prior_instance has been invalidated
bool has_been_invalidated() const {
return !is_valid() || (prior_instance && !prior_instance->is_valid());
}
/**
* get_dirty_from
*
@ -555,6 +560,7 @@ public:
virtual paddr_t get_paddr() const = 0;
virtual laddr_t get_laddr() const = 0;
virtual LBAPinRef duplicate() const = 0;
virtual bool has_been_invalidated() const = 0;
virtual ~LBAPin() {}
};

View File

@ -55,6 +55,7 @@ BtreeLBAManager::get_root(Transaction &t)
unsigned(croot->get_root().lba_depth));
return get_lba_btree_extent(
get_context(t),
croot,
croot->get_root().lba_depth,
croot->get_root().lba_root_addr,
paddr_t());
@ -567,6 +568,7 @@ BtreeLBAManager::update_internal_mapping(
paddr);
return get_lba_btree_extent(
get_context(t),
croot,
croot->get_root().lba_depth,
croot->get_root().lba_root_addr,
paddr_t()).safe_then([=, &t](LBANodeRef broot) {

View File

@ -269,6 +269,10 @@ public:
void take_pin(LBAPin &opin) final {
pin.take_pin(static_cast<BtreeLBAPin&>(opin).pin);
}
bool has_been_invalidated() const final {
return parent->has_been_invalidated();
}
};
}

View File

@ -258,6 +258,7 @@ using get_lba_node_ertr = base_ertr;
using get_lba_node_ret = get_lba_node_ertr::future<LBANodeRef>;
get_lba_node_ret get_lba_btree_extent(
op_context_t c, ///< [in] context structure
CachedExtentRef parent, ///< [in] paddr ref source
depth_t depth, ///< [in] depth of node to fetch
paddr_t offset, ///< [in] physical addr of node
paddr_t base ///< [in] depending on user, block addr or record addr

View File

@ -42,6 +42,7 @@ LBAInternalNode::lookup_ret LBAInternalNode::lookup(
auto iter = lower_bound(addr);
return get_lba_btree_extent(
c,
this,
meta.depth - 1,
iter->get_val(),
get_paddr()).safe_then([c, addr, depth](auto child) {
@ -63,6 +64,7 @@ LBAInternalNode::lookup_range_ret LBAInternalNode::lookup_range(
[this, c, &result, addr, len](const auto &val) mutable {
return get_lba_btree_extent(
c,
this,
get_meta().depth - 1,
val.get_val(),
get_paddr()).safe_then(
@ -90,6 +92,7 @@ LBAInternalNode::insert_ret LBAInternalNode::insert(
auto insertion_pt = get_containing_child(laddr);
return get_lba_btree_extent(
c,
this,
get_meta().depth - 1,
insertion_pt->get_val(),
get_paddr()).safe_then(
@ -125,6 +128,7 @@ LBAInternalNode::mutate_mapping_ret LBAInternalNode::mutate_mapping_internal(
}
return get_lba_btree_extent(
c,
this,
get_meta().depth - 1,
mutation_pt->get_val(),
get_paddr()
@ -180,6 +184,7 @@ LBAInternalNode::mutate_internal_address_ret LBAInternalNode::mutate_internal_ad
auto iter = get_containing_child(laddr);
return get_lba_btree_extent(
c,
this,
get_meta().depth - 1,
iter->get_val(),
get_paddr()
@ -210,6 +215,7 @@ LBAInternalNode::find_hole_ret LBAInternalNode::find_hole(
std::make_optional<laddr_t>(L_ADDR_NULL));
}
return get_lba_btree_extent(c,
this,
get_meta().depth - 1,
i->get_val(),
get_paddr()).safe_then(
@ -246,6 +252,7 @@ LBAInternalNode::scan_mappings_ret LBAInternalNode::scan_mappings(
[=, &f](auto &viter) {
return get_lba_btree_extent(
c,
this,
get_meta().depth - 1,
viter->get_val(),
get_paddr()).safe_then([=, &f](auto child) {
@ -264,6 +271,7 @@ LBAInternalNode::scan_mapped_space_ret LBAInternalNode::scan_mapped_space(
[=, &f](auto &viter) {
return get_lba_btree_extent(
c,
this,
get_meta().depth - 1,
viter->get_val(),
get_paddr()).safe_then([=, &f](auto child) {
@ -350,6 +358,7 @@ LBAInternalNode::merge_entry(
auto donor_iter = donor_is_left ? iter - 1 : iter + 1;
return get_lba_btree_extent(
c,
this,
get_meta().depth - 1,
donor_iter->get_val(),
get_paddr()
@ -643,8 +652,9 @@ LBALeafNode::get_leaf_entries(laddr_t addr, extent_len_t len)
return bound(addr, addr + len);
}
get_lba_node_ertr::future<LBANodeRef> get_lba_btree_extent(
get_lba_node_ret get_lba_btree_extent(
op_context_t c,
CachedExtentRef parent,
depth_t depth,
paddr_t offset,
paddr_t base)
@ -659,17 +669,23 @@ get_lba_node_ertr::future<LBANodeRef> get_lba_btree_extent(
return c.cache.get_extent<LBAInternalNode>(
c.trans,
offset,
LBA_BLOCK_SIZE).safe_then([c](auto ret) {
LBA_BLOCK_SIZE).safe_then([c, parent](auto ret)
-> get_lba_node_ret {
auto meta = ret->get_meta();
if (ret->get_size()) {
ceph_assert(meta.begin <= ret->begin()->get_key());
ceph_assert(meta.end > (ret->end() - 1)->get_key());
}
if (parent->has_been_invalidated() || ret->has_been_invalidated()) {
return crimson::ct_error::eagain::make();
}
if (!ret->is_pending() && !ret->pin.is_linked()) {
ret->pin.set_range(meta);
c.pins.add_pin(ret->pin);
}
return LBANodeRef(ret.detach(), /* add_ref = */ false);
return get_lba_node_ret(
get_lba_node_ertr::ready_future_marker{},
LBANodeRef(ret.detach(), /* add_ref = */ false));
});
} else {
logger().debug(
@ -679,21 +695,28 @@ get_lba_node_ertr::future<LBANodeRef> get_lba_btree_extent(
return c.cache.get_extent<LBALeafNode>(
c.trans,
offset,
LBA_BLOCK_SIZE).safe_then([offset, c](auto ret) {
LBA_BLOCK_SIZE).safe_then([offset, c, parent](auto ret)
-> get_lba_node_ret {
logger().debug(
"get_lba_btree_extent: read leaf at offset {} {}",
"get_lba_btree_extent: read leaf at offset {} {}, parent {}",
offset,
*ret);
*ret,
*parent);
auto meta = ret->get_meta();
if (ret->get_size()) {
ceph_assert(meta.begin <= ret->begin()->get_key());
ceph_assert(meta.end > (ret->end() - 1)->get_key());
}
if (parent->has_been_invalidated() || ret->has_been_invalidated()) {
return crimson::ct_error::eagain::make();
}
if (!ret->is_pending() && !ret->pin.is_linked()) {
ret->pin.set_range(meta);
c.pins.add_pin(ret->pin);
}
return LBANodeRef(ret.detach(), /* add_ref = */ false);
return get_lba_node_ret(
get_lba_node_ertr::ready_future_marker{},
LBANodeRef(ret.detach(), /* add_ref = */ false));
});
}
}

View File

@ -289,11 +289,16 @@ TransactionManager::get_extent_if_live_ret TransactionManager::get_extent_if_liv
addr,
laddr,
len).safe_then(
[this, pin=std::move(pin)](CachedExtentRef ret) mutable {
[this, pin=std::move(pin)](CachedExtentRef ret) mutable
-> get_extent_if_live_ret {
auto lref = ret->cast<LogicalCachedExtent>();
if (!lref->has_pin()) {
lref->set_pin(std::move(pin));
lba_manager.add_pin(lref->get_pin());
if (pin->has_been_invalidated() || lref->has_been_invalidated()) {
return crimson::ct_error::eagain::make();
} else {
lref->set_pin(std::move(pin));
lba_manager.add_pin(lref->get_pin());
}
}
return get_extent_if_live_ret(
get_extent_if_live_ertr::ready_future_marker{},

View File

@ -113,10 +113,15 @@ public:
t,
pin->get_paddr(),
pin->get_length()
).safe_then([this, &pin, &ret_ref](auto ref) mutable {
).safe_then([this, &pin, &ret_ref](auto ref) mutable
-> read_extent_ertr::future<> {
if (!ref->has_pin()) {
ref->set_pin(std::move(pin));
lba_manager.add_pin(ref->get_pin());
if (pin->has_been_invalidated() || ref->has_been_invalidated()) {
return crimson::ct_error::eagain::make();
} else {
ref->set_pin(std::move(pin));
lba_manager.add_pin(ref->get_pin());
}
}
ret_ref.push_back(std::make_pair(ref->get_laddr(), ref));
crimson::get_logger(ceph_subsys_filestore).debug(