Merge pull request #36842 from cyx1231st/wip-fix-crimson-heartbeat

crimson/osd: fix unexpected connection markdown in heartbeat

Reviewed-by: Xuehan Xu <xxhdx1985126@163.com>
Reviewed-by: Kefu Chai <kchai@redhat.com>
This commit is contained in:
Kefu Chai 2020-08-27 14:25:06 +08:00 committed by GitHub
commit 4e67b77fb5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 29 additions and 21 deletions

View File

@ -71,30 +71,26 @@ void Protocol::close(bool dispatch_reset,
}
set_write_state(write_state_t::drop);
auto gate_closed = gate.close();
auto reset_dispatched = seastar::futurize_invoke([this, dispatch_reset, is_replace] {
if (dispatch_reset) {
dispatcher->ms_handle_reset(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()),
is_replace);
if (dispatch_reset) {
try {
dispatcher->ms_handle_reset(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()),
is_replace);
} catch (...) {
logger().error("{} got unexpected exception in ms_handle_reset() {}",
conn, std::current_exception());
}
return seastar::now();
}).handle_exception([this] (std::exception_ptr eptr) {
logger().error("{} ms_handle_reset caught exception: {}", conn, eptr);
ceph_abort("unexpected exception from ms_handle_reset()");
});
}
// asynchronous operations
assert(!close_ready.valid());
close_ready = seastar::when_all_succeed(
std::move(gate_closed).finally([this] {
if (socket) {
return socket->close();
}
close_ready = std::move(gate_closed).finally([this] {
if (socket) {
return socket->close();
} else {
return seastar::now();
}),
std::move(reset_dispatched)
).then_unpack([] {
return seastar::now();
}
}).finally(std::move(cleanup));
}

View File

@ -1591,6 +1591,12 @@ void ProtocolV2::execute_establishing(
trigger_state(state_t::ESTABLISHING, write_state_t::delay, false);
if (existing_conn) {
existing_conn->protocol->close(dispatch_reset, std::move(accept_me));
if (unlikely(state != state_t::ESTABLISHING)) {
logger().warn("{} triggered {} during execute_establishing(), "
"the accept event will not be delivered!",
conn, get_state_name(state));
abort_protocol();
}
} else {
accept_me();
}

View File

@ -402,7 +402,7 @@ void Heartbeat::Connection::replaced()
racing_detected = true;
logger().warn("Heartbeat::Connection::replaced(): {} racing", *this);
assert(conn != replaced_conn);
assert(!conn->is_connected());
assert(conn->is_connected());
}
void Heartbeat::Connection::reset()

View File

@ -176,6 +176,11 @@ class Heartbeat::Connection {
is_winner_side{is_winner_side} {
connect();
}
Connection(const Connection&) = delete;
Connection(Connection&&) = delete;
Connection& operator=(const Connection&) = delete;
Connection& operator=(Connection&&) = delete;
~Connection();
bool matches(crimson::net::Connection* _conn) const;
@ -235,7 +240,7 @@ class Heartbeat::Connection {
crimson::net::ConnectionRef conn;
bool is_connected = false;
friend std::ostream& operator<<(std::ostream& os, const Connection c) {
friend std::ostream& operator<<(std::ostream& os, const Connection& c) {
if (c.type == type_t::front) {
return os << "con_front(osd." << c.peer << ")";
} else {
@ -393,6 +398,7 @@ class Heartbeat::Peer final : private Heartbeat::ConnectionListener {
~Peer();
Peer(Peer&&) = delete;
Peer(const Peer&) = delete;
Peer& operator=(Peer&&) = delete;
Peer& operator=(const Peer&) = delete;
void set_epoch(epoch_t epoch) { session.set_epoch(epoch); }