crimson/onode-staged-tree: remove compile-time NODE_BLOCK_SIZE

Switch to run-time node_size.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
This commit is contained in:
Yingxin Cheng 2021-06-07 13:57:13 +08:00
parent 55605f6e34
commit 38a6b41614
12 changed files with 70 additions and 41 deletions

View File

@ -64,7 +64,6 @@ inline bool is_valid_index(index_t index) { return index < INDEX_UPPER_BOUND; }
// we support up to 64 KiB tree nodes
using node_offset_t = uint16_t;
constexpr node_offset_t DISK_BLOCK_SIZE = 1u << 12;
constexpr node_offset_t NODE_BLOCK_SIZE = DISK_BLOCK_SIZE * 1u;
constexpr auto MAX_NODE_SIZE =
(extent_len_t)std::numeric_limits<node_offset_t>::max() + 1;
@ -176,9 +175,10 @@ inline std::ostream& operator<<(std::ostream& os, const tree_stats_t& stats) {
}
template <typename PtrType>
void reset_ptr(PtrType& ptr, const char* origin_base, const char* new_base) {
void reset_ptr(PtrType& ptr, const char* origin_base,
const char* new_base, extent_len_t node_size) {
assert((const char*)ptr > origin_base);
assert((const char*)ptr - origin_base < NODE_BLOCK_SIZE);
assert((const char*)ptr - origin_base < (int)node_size);
ptr = reinterpret_cast<PtrType>(
(const char*)ptr - origin_base + new_base);
}

View File

@ -191,7 +191,8 @@ void tree_cursor_t::Cache::update_all(const node_version_t& current_version,
key_view = _key_view;
p_value_header = _p_value_header;
assert((const char*)p_value_header > p_node_base);
assert((const char*)p_value_header - p_node_base < NODE_BLOCK_SIZE);
assert((const char*)p_value_header - p_node_base <
(int)ref_leaf_node->get_node_size());
value_payload_mut.reset();
p_value_recorder = nullptr;
@ -211,10 +212,12 @@ void tree_cursor_t::Cache::maybe_duplicate(const node_version_t& current_version
auto current_p_node_base = ref_leaf_node->read();
assert(current_p_node_base != p_node_base);
auto node_size = ref_leaf_node->get_node_size();
version.state = current_version.state;
reset_ptr(p_value_header, p_node_base, current_p_node_base);
key_view->reset_to(p_node_base, current_p_node_base);
reset_ptr(p_value_header, p_node_base,
current_p_node_base, node_size);
key_view->reset_to(p_node_base, current_p_node_base, node_size);
value_payload_mut.reset();
p_value_recorder = nullptr;
@ -1715,6 +1718,11 @@ const char* LeafNode::read() const
return impl->read();
}
extent_len_t LeafNode::get_node_size() const
{
return impl->get_node_size();
}
std::tuple<key_view_t, const value_header_t*>
LeafNode::get_kv(const search_position_t& pos) const
{

View File

@ -613,6 +613,7 @@ class LeafNode final : public Node {
bool is_level_tail() const;
node_version_t get_version() const;
const char* read() const;
extent_len_t get_node_size() const;
std::tuple<key_view_t, const value_header_t*> get_kv(const search_position_t&) const;
eagain_future<Ref<tree_cursor_t>> get_next_cursor(context_t, const search_position_t&);

View File

@ -77,11 +77,13 @@ class DeltaRecorderT final: public DeltaRecorder {
void encode_update_child_addr(
const laddr_t new_addr,
const laddr_packed_t* p_addr,
const char* p_node_start) {
const char* p_node_start,
extent_len_t node_size) {
ceph::encode(node_delta_op_t::UPDATE_CHILD_ADDR, encoded);
ceph::encode(new_addr, encoded);
int node_offset = reinterpret_cast<const char*>(p_addr) - p_node_start;
assert(node_offset > 0 && node_offset <= NODE_BLOCK_SIZE);
assert(node_offset > 0 && node_offset < (int)node_size);
assert(node_offset < (int)MAX_NODE_SIZE);
ceph::encode(static_cast<node_offset_t>(node_offset), encoded);
}
@ -438,11 +440,13 @@ class NodeExtentAccessorT {
assert(extent->is_pending());
assert(state != nextent_state_t::READ_ONLY);
if (state == nextent_state_t::MUTATION_PENDING) {
recorder->encode_update_child_addr(new_addr, p_addr, read().p_start());
recorder->encode_update_child_addr(
new_addr, p_addr, read().p_start(), get_length());
}
#ifndef NDEBUG
test_extent->prepare_replay(extent);
test_recorder->encode_update_child_addr(new_addr, p_addr, read().p_start());
test_recorder->encode_update_child_addr(
new_addr, p_addr, read().p_start(), get_length());
#endif
layout_t::update_child_addr(*mut, new_addr, p_addr);
#ifndef NDEBUG

View File

@ -64,6 +64,7 @@ class NodeImpl {
virtual field_type_t field_type() const = 0;
virtual laddr_t laddr() const = 0;
virtual const char* read() const = 0;
virtual extent_len_t get_node_size() const = 0;
virtual nextent_state_t get_extent_state() const = 0;
virtual void prepare_mutate(context_t) = 0;
virtual bool is_level_tail() const = 0;

View File

@ -102,6 +102,7 @@ class NodeLayoutT final : public InternalNodeImpl, public LeafNodeImpl {
field_type_t field_type() const override { return FIELD_TYPE; }
laddr_t laddr() const override { return extent.get_laddr(); }
const char* read() const override { return extent.read().p_start(); }
extent_len_t get_node_size() const override { return extent.get_length(); }
nextent_state_t get_extent_state() const override { return extent.get_state(); }
void prepare_mutate(context_t c) override { return extent.prepare_mutate(c); }
bool is_level_tail() const override { return extent.read().is_level_tail(); }
@ -394,6 +395,12 @@ class NodeLayoutT final : public InternalNodeImpl, public LeafNodeImpl {
} else {
ceph_abort("impossible path");
}
#ifndef NDEBUG
if (pp_value) {
assert((const char*)(*pp_value) - extent.read().p_start() <
extent.get_length());
}
#endif
}
void get_prev_slot(search_position_t& pos,

View File

@ -56,17 +56,17 @@ void ITER_T::update_size(
{
node_offset_t offset = iter.get_back_offset();
int new_size = change + offset;
assert(new_size > 0 && new_size < NODE_BLOCK_SIZE);
assert(new_size > 0 && new_size < (int)mut.get_length());
mut.copy_in_absolute(
(void*)iter.get_item_range().p_end, node_offset_t(new_size));
}
template <node_type_t NODE_TYPE>
node_offset_t ITER_T::trim_until(NodeExtentMutable&, const ITER_T& iter)
node_offset_t ITER_T::trim_until(NodeExtentMutable& mut, const ITER_T& iter)
{
assert(iter.index() != 0);
size_t ret = iter.p_end() - iter.p_items_start;
assert(ret < NODE_BLOCK_SIZE);
assert(ret < mut.get_length());
return ret;
}
@ -75,7 +75,7 @@ node_offset_t ITER_T::trim_at(
NodeExtentMutable& mut, const ITER_T& iter, node_offset_t trimmed)
{
size_t trim_size = iter.p_start() - iter.p_items_start + trimmed;
assert(trim_size < NODE_BLOCK_SIZE);
assert(trim_size < mut.get_length());
assert(iter.get_back_offset() > trimmed);
node_offset_t new_offset = iter.get_back_offset() - trimmed;
mut.copy_in_absolute((void*)iter.item_range.p_end, new_offset);

View File

@ -64,12 +64,12 @@ class item_iterator_t {
}
node_offset_t size() const {
size_t ret = item_range.p_end - item_range.p_start + sizeof(node_offset_t);
assert(ret < NODE_BLOCK_SIZE);
assert(ret < node_size);
return ret;
};
node_offset_t size_to_nxt() const {
size_t ret = get_key().size() + sizeof(node_offset_t);
assert(ret < NODE_BLOCK_SIZE);
assert(ret < node_size);
return ret;
}
node_offset_t size_overhead() const {
@ -92,8 +92,8 @@ class item_iterator_t {
void encode(const char* p_node_start, ceph::bufferlist& encoded) const {
int start_offset = p_items_start - p_node_start;
int end_offset = p_items_end - p_node_start;
assert(start_offset > 0 && start_offset < NODE_BLOCK_SIZE);
assert(end_offset > 0 && end_offset <= NODE_BLOCK_SIZE);
assert(start_offset > 0 && start_offset < (int)node_size);
assert(end_offset > 0 && end_offset <= (int)node_size);
ceph::encode(static_cast<node_offset_t>(start_offset), encoded);
ceph::encode(static_cast<node_offset_t>(end_offset), encoded);
ceph::encode(_index, encoded);
@ -107,7 +107,7 @@ class item_iterator_t {
node_offset_t end_offset;
ceph::decode(end_offset, delta);
assert(start_offset < end_offset);
assert(end_offset <= NODE_BLOCK_SIZE);
assert(end_offset <= node_size);
index_t index;
ceph::decode(index, delta);

View File

@ -188,7 +188,7 @@ struct string_key_view_t {
}
node_offset_t size() const {
size_t ret = length + sizeof(string_size_t);
assert(ret < NODE_BLOCK_SIZE);
assert(ret < MAX_NODE_SIZE);
return ret;
}
node_offset_t size_logical() const {
@ -217,9 +217,11 @@ struct string_key_view_t {
}
bool operator!=(const string_key_view_t& x) const { return !(*this == x); }
void reset_to(const char* origin_base, const char* new_base) {
reset_ptr(p_key, origin_base, new_base);
reset_ptr(p_length, origin_base, new_base);
void reset_to(const char* origin_base,
const char* new_base,
extent_len_t node_size) {
reset_ptr(p_key, origin_base, new_base, node_size);
reset_ptr(p_length, origin_base, new_base, node_size);
#ifndef NDEBUG
string_size_t current_length;
std::memcpy(&current_length, p_length, sizeof(string_size_t));
@ -397,7 +399,7 @@ struct ns_oid_view_t {
node_offset_t size() const {
if (type() == Type::STR) {
size_t ret = nspace.size() + oid.size();
assert(ret < NODE_BLOCK_SIZE);
assert(ret < MAX_NODE_SIZE);
return ret;
} else {
return sizeof(string_size_t);
@ -417,9 +419,11 @@ struct ns_oid_view_t {
}
bool operator!=(const ns_oid_view_t& x) const { return !(*this == x); }
void reset_to(const char* origin_base, const char* new_base) {
nspace.reset_to(origin_base, new_base);
oid.reset_to(origin_base, new_base);
void reset_to(const char* origin_base,
const char* new_base,
extent_len_t node_size) {
nspace.reset_to(origin_base, new_base, node_size);
oid.reset_to(origin_base, new_base, node_size);
}
template <KeyT KT>
@ -700,18 +704,20 @@ class key_view_t {
replace(key);
}
void reset_to(const char* origin_base, const char* new_base) {
void reset_to(const char* origin_base,
const char* new_base,
extent_len_t node_size) {
if (p_shard_pool != nullptr) {
reset_ptr(p_shard_pool, origin_base, new_base);
reset_ptr(p_shard_pool, origin_base, new_base, node_size);
}
if (p_crush != nullptr) {
reset_ptr(p_crush, origin_base, new_base);
reset_ptr(p_crush, origin_base, new_base, node_size);
}
if (p_ns_oid.has_value()) {
p_ns_oid->reset_to(origin_base, new_base);
p_ns_oid->reset_to(origin_base, new_base, node_size);
}
if (p_snap_gen != nullptr) {
reset_ptr(p_snap_gen, origin_base, new_base);
reset_ptr(p_snap_gen, origin_base, new_base, node_size);
}
}

View File

@ -34,13 +34,13 @@ IA_TEMPLATE(KeyT::VIEW);
IA_TEMPLATE(KeyT::HOBJ);
node_offset_t internal_sub_items_t::trim_until(
NodeExtentMutable&, internal_sub_items_t& items, index_t index)
NodeExtentMutable& mut, internal_sub_items_t& items, index_t index)
{
assert(index != 0);
auto keys = items.keys();
assert(index <= keys);
size_t ret = sizeof(internal_sub_item_t) * (keys - index);
assert(ret < NODE_BLOCK_SIZE);
assert(ret < mut.get_length());
return ret;
}
@ -152,7 +152,7 @@ node_offset_t leaf_sub_items_t::trim_until(
size_trim_offsets);
mut.copy_in_absolute((void*)items.p_num_keys, num_keys_t(index));
size_t ret = size_trim_offsets + (p_shift_start - p_items_start);
assert(ret < NODE_BLOCK_SIZE);
assert(ret < mut.get_length());
return ret;
}

View File

@ -63,7 +63,7 @@ class internal_sub_items_t {
}
node_offset_t size_before(index_t index) const {
size_t ret = index * sizeof(internal_sub_item_t);
assert(ret < NODE_BLOCK_SIZE);
assert(ret < node_size);
return ret;
}
const laddr_packed_t* get_p_value(index_t index) const {
@ -79,7 +79,7 @@ class internal_sub_items_t {
int end_offset = p_end - p_node_start;
assert(start_offset > 0 &&
start_offset < end_offset &&
end_offset < NODE_BLOCK_SIZE);
end_offset < (int)node_size);
ceph::encode(static_cast<node_offset_t>(start_offset), encoded);
ceph::encode(static_cast<node_offset_t>(end_offset), encoded);
}
@ -93,7 +93,7 @@ class internal_sub_items_t {
node_offset_t end_offset;
ceph::decode(end_offset, delta);
assert(start_offset < end_offset);
assert(end_offset <= NODE_BLOCK_SIZE);
assert(end_offset <= node_size);
return internal_sub_items_t({{p_node_start + start_offset,
p_node_start + end_offset},
node_size});
@ -233,7 +233,7 @@ class leaf_sub_items_t {
(index + 1) * sizeof(node_offset_t) +
get_offset(index).value;
}
assert(ret < NODE_BLOCK_SIZE);
assert(ret < node_size);
return ret;
}
node_offset_t size_overhead_at(index_t index) const { return sizeof(node_offset_t); }
@ -252,7 +252,7 @@ class leaf_sub_items_t {
int end_offset = p_end - p_node_start;
assert(start_offset > 0 &&
start_offset < end_offset &&
end_offset < NODE_BLOCK_SIZE);
end_offset < (int)node_size);
ceph::encode(static_cast<node_offset_t>(start_offset), encoded);
ceph::encode(static_cast<node_offset_t>(end_offset), encoded);
}
@ -266,7 +266,7 @@ class leaf_sub_items_t {
node_offset_t end_offset;
ceph::decode(end_offset, delta);
assert(start_offset < end_offset);
assert(end_offset <= NODE_BLOCK_SIZE);
assert(end_offset < node_size);
return leaf_sub_items_t({{p_node_start + start_offset,
p_node_start + end_offset},
node_size});

View File

@ -846,6 +846,8 @@ class DummyChildPool {
field_type_t field_type() const override { return field_type_t::N0; }
const char* read() const override {
ceph_abort("impossible path"); }
extent_len_t get_node_size() const override {
ceph_abort("impossible path"); }
nextent_state_t get_extent_state() const override {
ceph_abort("impossible path"); }
level_t level() const override { return 0u; }