crimson/onode-staged-tree: cleanup node interfaces

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
This commit is contained in:
Yingxin Cheng 2021-04-01 14:01:42 +08:00
parent 5e8ed3e880
commit 71836adff5
3 changed files with 48 additions and 41 deletions

View File

@ -445,13 +445,13 @@ void Node::as_child(const search_position_t& pos, Ref<InternalNode> parent_node)
template void Node::as_child<true>(const search_position_t&, Ref<InternalNode>);
template void Node::as_child<false>(const search_position_t&, Ref<InternalNode>);
node_future<> Node::insert_parent(
context_t c, const key_view_t& pivot_index, Ref<Node> right_node)
node_future<> Node::apply_split_to_parent(
context_t c, Ref<Node> split_right)
{
assert(!is_root());
// TODO(cross-node string dedup)
return parent_info().ptr->apply_child_split(
c, parent_info().position, pivot_index, this, right_node);
c, this, split_right);
}
node_future<Ref<tree_cursor_t>>
@ -518,36 +518,39 @@ InternalNode::get_next_cursor(context_t c, const search_position_t& pos)
}
node_future<> InternalNode::apply_child_split(
context_t c, const search_position_t& pos, const key_view_t& left_key,
Ref<Node> left_child, Ref<Node> right_child)
context_t c, Ref<Node> left_child, Ref<Node> right_child)
{
auto& left_pos = left_child->parent_info().position;
#ifndef NDEBUG
if (pos.is_end()) {
assert(left_child->parent_info().ptr.get() == this);
assert(!left_child->impl->is_level_tail());
if (left_pos.is_end()) {
assert(impl->is_level_tail());
assert(right_child->impl->is_level_tail());
}
auto _left_key = *left_child->impl->get_pivot_index();
assert(left_key.compare_to(_left_key) == MatchKindCMP::EQ);
#endif
impl->prepare_mutate(c);
auto left_key = *left_child->impl->get_pivot_index();
auto left_child_addr = left_child->impl->laddr();
auto op_right_key = right_child->impl->get_pivot_index();
auto right_child_addr = right_child->impl->laddr();
auto right_key = right_child->impl->get_pivot_index();
if (right_key.has_value()) {
if (op_right_key.has_value()) {
logger().debug("OTree::Internal::Insert: "
"pos({}), left_child({}, {:#x}), right_child({}, {:#x}) ...",
pos, left_key, left_child_addr, *right_key, right_child_addr);
"left_pos({}), left_child({}, {:#x}), right_child({}, {:#x}) ...",
left_pos, left_key, left_child_addr, *op_right_key, right_child_addr);
} else {
logger().debug("OTree::Internal::Insert: "
"pos({}), left_child({}, {:#x}), right_child(N/A, {:#x}) ...",
pos, left_key, left_child_addr, right_child_addr);
"left_pos({}), left_child({}, {:#x}), right_child(N/A, {:#x}) ...",
left_pos, left_key, left_child_addr, right_child_addr);
}
// update pos => left_child to pos => right_child
impl->replace_child_addr(pos, right_child_addr, left_child_addr);
replace_track(pos, right_child, left_child);
// update left_pos => left_child to left_pos => right_child
impl->replace_child_addr(left_pos, right_child_addr, left_child_addr);
replace_track(right_child, left_child);
search_position_t insert_pos = pos;
search_position_t insert_pos = left_pos;
auto [insert_stage, insert_size] = impl->evaluate_insert(
left_key, left_child_addr, insert_pos);
auto free_size = impl->free_size();
@ -556,7 +559,7 @@ node_future<> InternalNode::apply_child_split(
[[maybe_unused]] auto p_value = impl->insert(
left_key, left_child_addr, insert_pos, insert_stage, insert_size);
assert(impl->free_size() == free_size - insert_size);
assert(insert_pos <= pos);
assert(insert_pos <= left_pos);
assert(p_value->value == left_child_addr);
track_insert(insert_pos, insert_stage, left_child, right_child);
validate_tracked_children();
@ -585,11 +588,7 @@ node_future<> InternalNode::apply_child_split(
validate_tracked_children();
right_node->validate_tracked_children();
// propagate index to parent
// TODO: get from trim()
key_view_t pivot_index;
impl->get_largest_slot(nullptr, &pivot_index, nullptr);
return insert_parent(c, pivot_index, right_node);
return apply_split_to_parent(c, right_node);
// TODO (optimize)
// try to acquire space from siblings before split... see btrfs
});
@ -751,7 +750,7 @@ node_future<Ref<Node>> InternalNode::get_or_track_child(
assert(position == child->parent_info().position);
std::ignore = position;
std::ignore = child_addr;
validate_child(*child);
validate_child_tracked(*child);
return child;
});
}
@ -790,12 +789,16 @@ void InternalNode::track_insert(
}
void InternalNode::replace_track(
const search_position_t& position, Ref<Node> new_child, Ref<Node> old_child)
Ref<Node> new_child, Ref<Node> old_child)
{
assert(tracked_child_nodes[position] == old_child);
tracked_child_nodes.erase(position);
new_child->as_child(position, this);
assert(tracked_child_nodes[position] == new_child);
assert(old_child->parent_info().ptr == this);
auto& pos = old_child->parent_info().position;
do_untrack_child(*old_child);
new_child->as_child(pos, this);
// ok, safe to release ref
old_child->_parent_info.reset();
validate_child_tracked(*new_child);
}
void InternalNode::track_split(
@ -1060,11 +1063,8 @@ node_future<Ref<tree_cursor_t>> LeafNode::insert_value(
validate_tracked_cursors();
right_node->validate_tracked_cursors();
// propagate insert to parent
// TODO: get from trim()
key_view_t pivot_index;
impl->get_largest_slot(nullptr, &pivot_index, nullptr);
return insert_parent(c, pivot_index, right_node).safe_then([ret] {
return apply_split_to_parent(c, right_node
).safe_then([ret] {
return ret;
});
// TODO (optimize)

View File

@ -415,12 +415,14 @@ class Node
// as child/non-root
template <bool VALIDATE = true>
void as_child(const search_position_t&, Ref<InternalNode>);
struct parent_info_t {
search_position_t position;
Ref<InternalNode> ptr;
};
const parent_info_t& parent_info() const { return *_parent_info; }
node_future<> insert_parent(context_t, const key_view_t&, Ref<Node> right_node);
node_future<> apply_split_to_parent(context_t, Ref<Node>);
node_future<Ref<tree_cursor_t>> get_next_cursor_from_parent(context_t);
private:
@ -467,9 +469,7 @@ class InternalNode final : public Node {
node_future<Ref<tree_cursor_t>> get_next_cursor(context_t, const search_position_t&);
node_future<> apply_child_split(
context_t, const search_position_t&, const key_view_t& left_key,
Ref<Node> left, Ref<Node> right);
node_future<> apply_child_split(context_t, Ref<Node> left, Ref<Node> right);
template <bool VALIDATE>
void do_track_child(Node& child) {
@ -498,6 +498,13 @@ class InternalNode final : public Node {
}
}
void validate_child_tracked(const Node& child) const {
validate_child(child);
assert(tracked_child_nodes.find(child.parent_info().position) !=
tracked_child_nodes.end());
assert(tracked_child_nodes.find(child.parent_info().position)->second == &child);
}
static node_future<Ref<InternalNode>> allocate_root(
context_t, level_t, laddr_t, Super::URef&&);
@ -516,7 +523,7 @@ class InternalNode final : public Node {
node_future<Ref<Node>> get_or_track_child(context_t, const search_position_t&, laddr_t);
void track_insert(
const search_position_t&, match_stage_t, Ref<Node>, Ref<Node> nxt_child = nullptr);
void replace_track(const search_position_t&, Ref<Node> new_child, Ref<Node> old_child);
void replace_track(Ref<Node> new_child, Ref<Node> old_child);
void track_split(const search_position_t&, Ref<InternalNode>);
void validate_tracked_children() const {
#ifndef NDEBUG

View File

@ -771,7 +771,7 @@ class DummyChildPool {
if (right_child->can_split()) {
splitable_nodes.insert(right_child);
}
return insert_parent(c, *impl->get_pivot_index(), right_child);
return apply_split_to_parent(c, right_child);
}
node_future<> insert_and_split(