mirror of
https://github.com/ceph/ceph
synced 2025-02-19 00:47:49 +00:00
msg/async/rdma: fix bug event center is blocked by rdma construct connection for transport ib sync msg
We construct a tcp connection to transport ib sync msg, if the remote node is shutdown (shutdown by accident), the net.connect will be blocked until timeout is reached, which cause the event center be blocked. This bug may cause mon probe timeout and osd not reply, and so on. Signed-off-by: Peng Liu <liupeng37@baidu.com>
This commit is contained in:
parent
250a65e045
commit
8b2a95011c
@ -15,6 +15,21 @@
|
||||
*/
|
||||
#include "RDMAStack.h"
|
||||
|
||||
class C_handle_connection_established : public EventCallback {
|
||||
RDMAConnectedSocketImpl *csi;
|
||||
bool active = true;
|
||||
public:
|
||||
C_handle_connection_established(RDMAConnectedSocketImpl *w) : csi(w) {}
|
||||
void do_request(uint64_t fd) final {
|
||||
if (active)
|
||||
csi->handle_connection_established();
|
||||
}
|
||||
void close() {
|
||||
active = false;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
#define dout_subsys ceph_subsys_ms
|
||||
#undef dout_prefix
|
||||
#define dout_prefix *_dout << " RDMAConnectedSocketImpl "
|
||||
@ -25,6 +40,7 @@ RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, shared_ptr<In
|
||||
: cct(cct), connected(0), error(0), ib(ib),
|
||||
dispatcher(rdma_dispatcher), worker(w),
|
||||
is_server(false), con_handler(new C_handle_connection(this)),
|
||||
established_handler(new C_handle_connection_established(this)),
|
||||
active(false), pending(false)
|
||||
{
|
||||
if (!cct->_conf->ms_async_rdma_cm) {
|
||||
@ -101,7 +117,14 @@ int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const S
|
||||
ldout(cct, 20) << __func__ << " nonblock:" << opts.nonblock << ", nodelay:"
|
||||
<< opts.nodelay << ", rbuf_size: " << opts.rcbuf_size << dendl;
|
||||
NetHandler net(cct);
|
||||
tcp_fd = net.connect(peer_addr, opts.connect_bind_addr);
|
||||
|
||||
// we construct a socket to transport ib sync message
|
||||
// but we shouldn't block in tcp connecting
|
||||
if (opts.nonblock) {
|
||||
tcp_fd = net.nonblock_connect(peer_addr, opts.connect_bind_addr);
|
||||
} else {
|
||||
tcp_fd = net.connect(peer_addr, opts.connect_bind_addr);
|
||||
}
|
||||
|
||||
if (tcp_fd < 0) {
|
||||
return -errno;
|
||||
@ -116,12 +139,38 @@ int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const S
|
||||
|
||||
ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl;
|
||||
net.set_priority(tcp_fd, opts.priority, peer_addr.get_family());
|
||||
qp->get_local_cm_meta().peer_qpn = 0;
|
||||
r = qp->send_cm_meta(cct, tcp_fd);
|
||||
if (r < 0)
|
||||
return r;
|
||||
r = 0;
|
||||
if (opts.nonblock) {
|
||||
worker->center.create_file_event(tcp_fd, EVENT_READABLE | EVENT_WRITABLE , established_handler);
|
||||
} else {
|
||||
r = handle_connection_established(false);
|
||||
}
|
||||
return r;
|
||||
}
|
||||
|
||||
int RDMAConnectedSocketImpl::handle_connection_established(bool need_set_fault) {
|
||||
ldout(cct, 20) << __func__ << " start " << dendl;
|
||||
// delete read event
|
||||
worker->center.delete_file_event(tcp_fd, EVENT_READABLE | EVENT_WRITABLE);
|
||||
if (1 == connected) {
|
||||
ldout(cct, 1) << __func__ << " warnning: logic failed " << dendl;
|
||||
if (need_set_fault) {
|
||||
fault();
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
// send handshake msg to server
|
||||
qp->get_local_cm_meta().peer_qpn = 0;
|
||||
int r = qp->send_cm_meta(cct, tcp_fd);
|
||||
if (r < 0) {
|
||||
ldout(cct, 1) << __func__ << " send handshake msg failed." << r << dendl;
|
||||
if (need_set_fault) {
|
||||
fault();
|
||||
}
|
||||
return r;
|
||||
}
|
||||
worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler);
|
||||
ldout(cct, 20) << __func__ << " finish " << dendl;
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -517,11 +566,16 @@ void RDMAConnectedSocketImpl::cleanup() {
|
||||
if (con_handler && tcp_fd >= 0) {
|
||||
(static_cast<C_handle_connection*>(con_handler))->close();
|
||||
worker->center.submit_to(worker->center.get_id(), [this]() {
|
||||
worker->center.delete_file_event(tcp_fd, EVENT_READABLE);
|
||||
worker->center.delete_file_event(tcp_fd, EVENT_READABLE | EVENT_WRITABLE);
|
||||
}, false);
|
||||
delete con_handler;
|
||||
con_handler = nullptr;
|
||||
}
|
||||
if (established_handler) {
|
||||
(static_cast<C_handle_connection_established*>(established_handler))->close();
|
||||
delete established_handler;
|
||||
established_handler = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
void RDMAConnectedSocketImpl::notify()
|
||||
|
@ -189,6 +189,7 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
|
||||
std::vector<ibv_wc> wc;
|
||||
bool is_server;
|
||||
EventCallbackRef con_handler;
|
||||
EventCallbackRef established_handler;
|
||||
int tcp_fd = -1;
|
||||
bool active;// qp is active ?
|
||||
bool pending;
|
||||
@ -225,6 +226,7 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
|
||||
int activate();
|
||||
void fin();
|
||||
void handle_connection();
|
||||
int handle_connection_established(bool need_set_fault = true);
|
||||
void cleanup();
|
||||
void set_accept_fd(int sd);
|
||||
virtual int try_connect(const entity_addr_t&, const SocketOptions &opt);
|
||||
@ -246,6 +248,7 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
|
||||
active = false;
|
||||
}
|
||||
};
|
||||
|
||||
};
|
||||
|
||||
enum RDMA_CM_STATUS {
|
||||
|
Loading…
Reference in New Issue
Block a user