mirror of
https://github.com/ceph/ceph
synced 2025-02-21 01:47:25 +00:00
msg/async/rdma: refactor QP state switch & ib_cm_meta_t transaction
1. Implement below 3 function in class QueuePair to switch QP state 1) int modify_qp_to_error(void); 2) int modify_qp_to_rts(void); 3) int modify_qp_to_rtr(void); 3. All connection meta data are member of class QueuePair. So, send/recv connection meta data directly in send/recv_cm_meta i.e. change send/recv_cm_meta API without using parameter cm_meta_data. 4. RDMAConnectedSocketImpl need members to track peer_qpn and local_qpn. Signed-off-by: Changcheng Liu <changcheng.liu@aliyun.com>
This commit is contained in:
parent
946769fa12
commit
8870e1bc0d
@ -182,6 +182,89 @@ Infiniband::QueuePair::QueuePair(
|
||||
}
|
||||
}
|
||||
|
||||
int Infiniband::QueuePair::modify_qp_to_error(void)
|
||||
{
|
||||
ibv_qp_attr qpa;
|
||||
memset(&qpa, 0, sizeof(qpa));
|
||||
qpa.qp_state = IBV_QPS_ERR;
|
||||
if (ibv_modify_qp(qp, &qpa, IBV_QP_STATE)) {
|
||||
lderr(cct) << __func__ << " failed to transition to ERROR state: " << cpp_strerror(errno) << dendl;
|
||||
return -1;
|
||||
}
|
||||
ldout(cct, 20) << __func__ << " transition to ERROR state successfully." << dendl;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int Infiniband::QueuePair::modify_qp_to_rts(void)
|
||||
{
|
||||
// move from RTR state RTS
|
||||
ibv_qp_attr qpa;
|
||||
memset(&qpa, 0, sizeof(qpa));
|
||||
qpa.qp_state = IBV_QPS_RTS;
|
||||
/*
|
||||
* How long to wait before retrying if packet lost or server dead.
|
||||
* Supposedly the timeout is 4.096us*2^timeout. However, the actual
|
||||
* timeout appears to be 4.096us*2^(timeout+1), so the setting
|
||||
* below creates a 135ms timeout.
|
||||
*/
|
||||
qpa.timeout = 0x12;
|
||||
// How many times to retry after timeouts before giving up.
|
||||
qpa.retry_cnt = 7;
|
||||
/*
|
||||
* How many times to retry after RNR (receiver not ready) condition
|
||||
* before giving up. Occurs when the remote side has not yet posted
|
||||
* a receive request.
|
||||
*/
|
||||
qpa.rnr_retry = 7; // 7 is infinite retry.
|
||||
qpa.sq_psn = local_cm_meta.psn;
|
||||
qpa.max_rd_atomic = 1;
|
||||
|
||||
int attr_mask = IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC;
|
||||
int r = ibv_modify_qp(qp, &qpa, attr_mask);
|
||||
if (r) {
|
||||
lderr(cct) << __func__ << " failed to transition to RTS state: " << cpp_strerror(errno) << dendl;
|
||||
return -1;
|
||||
}
|
||||
ldout(cct, 20) << __func__ << " transition to RTS state successfully." << dendl;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int Infiniband::QueuePair::modify_qp_to_rtr(void)
|
||||
{
|
||||
// move from INIT to RTR state
|
||||
ibv_qp_attr qpa;
|
||||
memset(&qpa, 0, sizeof(qpa));
|
||||
qpa.qp_state = IBV_QPS_RTR;
|
||||
qpa.path_mtu = IBV_MTU_1024;
|
||||
qpa.dest_qp_num = peer_cm_meta.local_qpn;
|
||||
qpa.rq_psn = peer_cm_meta.psn;
|
||||
qpa.max_dest_rd_atomic = 1;
|
||||
qpa.min_rnr_timer = 0x12;
|
||||
qpa.ah_attr.is_global = 1;
|
||||
qpa.ah_attr.grh.hop_limit = 6;
|
||||
qpa.ah_attr.grh.dgid = peer_cm_meta.gid;
|
||||
qpa.ah_attr.grh.sgid_index = infiniband.get_device()->get_gid_idx();
|
||||
qpa.ah_attr.grh.traffic_class = cct->_conf->ms_async_rdma_dscp;
|
||||
//qpa.ah_attr.grh.flow_label = 0;
|
||||
|
||||
qpa.ah_attr.dlid = peer_cm_meta.lid;
|
||||
qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl;
|
||||
qpa.ah_attr.src_path_bits = 0;
|
||||
qpa.ah_attr.port_num = (uint8_t)(ib_physical_port);
|
||||
|
||||
ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl;
|
||||
|
||||
int attr_mask = IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MIN_RNR_TIMER | IBV_QP_MAX_DEST_RD_ATOMIC;
|
||||
|
||||
int r = ibv_modify_qp(qp, &qpa, attr_mask);
|
||||
if (r) {
|
||||
lderr(cct) << __func__ << " failed to transition to RTR state: " << cpp_strerror(errno) << dendl;
|
||||
return -1;
|
||||
}
|
||||
ldout(cct, 20) << __func__ << " transition to RTR state successfully." << dendl;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int Infiniband::QueuePair::modify_qp_to_init(void)
|
||||
{
|
||||
// move from RESET to INIT state
|
||||
@ -296,7 +379,7 @@ void Infiniband::QueuePair::gid_to_wire_gid(const ib_cm_meta_t& cm_meta_data, ch
|
||||
* 0: means got enough buffer
|
||||
* < 0: means error
|
||||
*/
|
||||
int Infiniband::QueuePair::recv_cm_meta(CephContext *cct, int socket_fd, ib_cm_meta_t& cm_meta_data)
|
||||
int Infiniband::QueuePair::recv_cm_meta(CephContext *cct, int socket_fd)
|
||||
{
|
||||
char msg[TCP_MSG_LEN];
|
||||
char gid[33];
|
||||
@ -318,15 +401,15 @@ int Infiniband::QueuePair::recv_cm_meta(CephContext *cct, int socket_fd, ib_cm_m
|
||||
ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl;
|
||||
r = -EINVAL;
|
||||
} else { // valid message
|
||||
sscanf(msg, "%hx:%x:%x:%x:%s", &(cm_meta_data.lid), &(cm_meta_data.local_qpn), &(cm_meta_data.psn), &(cm_meta_data.peer_qpn), gid);
|
||||
wire_gid_to_gid(gid, &cm_meta_data);
|
||||
ldout(cct, 5) << __func__ << " recevd: " << cm_meta_data.lid << ", " << cm_meta_data.local_qpn
|
||||
<< ", " << cm_meta_data.psn << ", " << cm_meta_data.peer_qpn << ", " << gid << dendl;
|
||||
sscanf(msg, "%hx:%x:%x:%x:%s", &(peer_cm_meta.lid), &(peer_cm_meta.local_qpn), &(peer_cm_meta.psn), &(peer_cm_meta.peer_qpn), gid);
|
||||
wire_gid_to_gid(gid, &peer_cm_meta);
|
||||
ldout(cct, 5) << __func__ << " recevd: " << peer_cm_meta.lid << ", " << peer_cm_meta.local_qpn
|
||||
<< ", " << peer_cm_meta.psn << ", " << peer_cm_meta.peer_qpn << ", " << gid << dendl;
|
||||
}
|
||||
return r;
|
||||
}
|
||||
|
||||
int Infiniband::QueuePair::send_cm_meta(CephContext *cct, int socket_fd, ib_cm_meta_t& cm_meta_data)
|
||||
int Infiniband::QueuePair::send_cm_meta(CephContext *cct, int socket_fd)
|
||||
{
|
||||
int retry = 0;
|
||||
ssize_t r;
|
||||
@ -334,10 +417,10 @@ int Infiniband::QueuePair::send_cm_meta(CephContext *cct, int socket_fd, ib_cm_m
|
||||
char msg[TCP_MSG_LEN];
|
||||
char gid[33];
|
||||
retry:
|
||||
gid_to_wire_gid(cm_meta_data, gid);
|
||||
sprintf(msg, "%04x:%08x:%08x:%08x:%s", cm_meta_data.lid, cm_meta_data.local_qpn, cm_meta_data.psn, cm_meta_data.peer_qpn, gid);
|
||||
ldout(cct, 10) << __func__ << " sending: " << cm_meta_data.lid << ", " << cm_meta_data.local_qpn
|
||||
<< ", " << cm_meta_data.psn << ", " << cm_meta_data.peer_qpn << ", " << gid << dendl;
|
||||
gid_to_wire_gid(local_cm_meta, gid);
|
||||
sprintf(msg, "%04x:%08x:%08x:%08x:%s", local_cm_meta.lid, local_cm_meta.local_qpn, local_cm_meta.psn, local_cm_meta.peer_qpn, gid);
|
||||
ldout(cct, 10) << __func__ << " sending: " << local_cm_meta.lid << ", " << local_cm_meta.local_qpn
|
||||
<< ", " << local_cm_meta.psn << ", " << local_cm_meta.peer_qpn << ", " << gid << dendl;
|
||||
r = ::write(socket_fd, msg, sizeof(msg));
|
||||
// Drop incoming qpt
|
||||
if (cct->_conf->ms_inject_socket_failures && socket_fd >= 0) {
|
||||
|
@ -444,6 +444,9 @@ class Infiniband {
|
||||
uint32_t tx_queue_len, uint32_t max_recv_wr, struct rdma_cm_id *cid, uint32_t q_key = 0);
|
||||
~QueuePair();
|
||||
|
||||
int modify_qp_to_error();
|
||||
int modify_qp_to_rts();
|
||||
int modify_qp_to_rtr();
|
||||
int modify_qp_to_init();
|
||||
int init();
|
||||
|
||||
@ -476,8 +479,8 @@ class Infiniband {
|
||||
/*
|
||||
* send/receive connection management meta data
|
||||
*/
|
||||
int send_cm_meta(CephContext *cct, int socket_fd, ib_cm_meta_t& cm_meta_data);
|
||||
int recv_cm_meta(CephContext *cct, int socket_fd, ib_cm_meta_t& cm_meta_data);
|
||||
int send_cm_meta(CephContext *cct, int socket_fd);
|
||||
int recv_cm_meta(CephContext *cct, int socket_fd);
|
||||
void wire_gid_to_gid(const char *wgid, ib_cm_meta_t* cm_meta_data);
|
||||
void gid_to_wire_gid(const ib_cm_meta_t& cm_meta_data, char wgid[]);
|
||||
void add_tx_wr(uint32_t amt) { tx_wr_inflight += amt; }
|
||||
@ -488,6 +491,8 @@ class Infiniband {
|
||||
Infiniband::CompletionQueue* get_rx_cq() const { return rxcq; }
|
||||
int to_dead();
|
||||
bool is_dead() const { return dead; }
|
||||
ib_cm_meta_t& get_peer_cm_meta() { return peer_cm_meta; }
|
||||
ib_cm_meta_t& get_local_cm_meta() { return local_cm_meta; }
|
||||
|
||||
private:
|
||||
CephContext *cct;
|
||||
|
@ -29,11 +29,7 @@ RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, shared_ptr<In
|
||||
{
|
||||
if (!cct->_conf->ms_async_rdma_cm) {
|
||||
qp = ib->create_queue_pair(cct, dispatcher->get_tx_cq(), dispatcher->get_rx_cq(), IBV_QPT_RC, NULL);
|
||||
local_cm_meta.local_qpn = qp->get_local_qp_number();
|
||||
local_cm_meta.psn = qp->get_initial_psn();
|
||||
local_cm_meta.lid = ib->get_lid();
|
||||
local_cm_meta.peer_qpn = 0;
|
||||
local_cm_meta.gid = ib->get_gid();
|
||||
local_qpn = qp->get_local_qp_number();
|
||||
notify_fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
|
||||
dispatcher->register_qp(qp, this);
|
||||
dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
|
||||
@ -46,7 +42,7 @@ RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
|
||||
ldout(cct, 20) << __func__ << " destruct." << dendl;
|
||||
cleanup();
|
||||
worker->remove_pending_conn(this);
|
||||
dispatcher->erase_qpn(local_cm_meta.local_qpn);
|
||||
dispatcher->erase_qpn(local_qpn);
|
||||
|
||||
for (unsigned i=0; i < wc.size(); ++i) {
|
||||
dispatcher->post_chunk_to_pool(reinterpret_cast<Chunk*>(wc[i].wr_id));
|
||||
@ -83,89 +79,20 @@ void RDMAConnectedSocketImpl::get_wc(std::vector<ibv_wc> &w)
|
||||
|
||||
int RDMAConnectedSocketImpl::activate()
|
||||
{
|
||||
ibv_qp_attr qpa;
|
||||
int r;
|
||||
|
||||
// now connect up the qps and switch to RTR
|
||||
memset(&qpa, 0, sizeof(qpa));
|
||||
qpa.qp_state = IBV_QPS_RTR;
|
||||
qpa.path_mtu = IBV_MTU_1024;
|
||||
qpa.dest_qp_num = peer_cm_meta.local_qpn;
|
||||
qpa.rq_psn = peer_cm_meta.psn;
|
||||
qpa.max_dest_rd_atomic = 1;
|
||||
qpa.min_rnr_timer = 12;
|
||||
//qpa.ah_attr.is_global = 0;
|
||||
qpa.ah_attr.is_global = 1;
|
||||
qpa.ah_attr.grh.hop_limit = 6;
|
||||
qpa.ah_attr.grh.dgid = peer_cm_meta.gid;
|
||||
|
||||
qpa.ah_attr.grh.sgid_index = ib->get_device()->get_gid_idx();
|
||||
|
||||
qpa.ah_attr.dlid = peer_cm_meta.lid;
|
||||
qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl;
|
||||
qpa.ah_attr.grh.traffic_class = cct->_conf->ms_async_rdma_dscp;
|
||||
qpa.ah_attr.src_path_bits = 0;
|
||||
qpa.ah_attr.port_num = (uint8_t)(ib->get_ib_physical_port());
|
||||
|
||||
ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl;
|
||||
|
||||
r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE |
|
||||
IBV_QP_AV |
|
||||
IBV_QP_PATH_MTU |
|
||||
IBV_QP_DEST_QPN |
|
||||
IBV_QP_RQ_PSN |
|
||||
IBV_QP_MIN_RNR_TIMER |
|
||||
IBV_QP_MAX_DEST_RD_ATOMIC);
|
||||
if (r) {
|
||||
lderr(cct) << __func__ << " failed to transition to RTR state: "
|
||||
<< cpp_strerror(errno) << dendl;
|
||||
qp->get_local_cm_meta().peer_qpn = qp->get_peer_cm_meta().local_qpn;
|
||||
if (qp->modify_qp_to_rtr() != 0)
|
||||
return -1;
|
||||
}
|
||||
|
||||
ldout(cct, 20) << __func__ << " transition to RTR state successfully." << dendl;
|
||||
|
||||
// now move to RTS
|
||||
qpa.qp_state = IBV_QPS_RTS;
|
||||
|
||||
// How long to wait before retrying if packet lost or server dead.
|
||||
// Supposedly the timeout is 4.096us*2^timeout. However, the actual
|
||||
// timeout appears to be 4.096us*2^(timeout+1), so the setting
|
||||
// below creates a 135ms timeout.
|
||||
qpa.timeout = 14;
|
||||
|
||||
// How many times to retry after timeouts before giving up.
|
||||
qpa.retry_cnt = 7;
|
||||
|
||||
// How many times to retry after RNR (receiver not ready) condition
|
||||
// before giving up. Occurs when the remote side has not yet posted
|
||||
// a receive request.
|
||||
qpa.rnr_retry = 7; // 7 is infinite retry.
|
||||
qpa.sq_psn = local_cm_meta.psn;
|
||||
qpa.max_rd_atomic = 1;
|
||||
|
||||
r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE |
|
||||
IBV_QP_TIMEOUT |
|
||||
IBV_QP_RETRY_CNT |
|
||||
IBV_QP_RNR_RETRY |
|
||||
IBV_QP_SQ_PSN |
|
||||
IBV_QP_MAX_QP_RD_ATOMIC);
|
||||
if (r) {
|
||||
lderr(cct) << __func__ << " failed to transition to RTS state: "
|
||||
<< cpp_strerror(errno) << dendl;
|
||||
if (qp->modify_qp_to_rts() != 0)
|
||||
return -1;
|
||||
}
|
||||
|
||||
// the queue pair should be ready to use once the client has finished
|
||||
// setting up their end.
|
||||
ldout(cct, 20) << __func__ << " transition to RTS state successfully." << dendl;
|
||||
ldout(cct, 20) << __func__ << " QueuePair: " << qp << " with qp:" << qp->get_qp() << dendl;
|
||||
|
||||
if (!is_server) {
|
||||
connected = 1; //indicate successfully
|
||||
ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << local_cm_meta.local_qpn << dendl;
|
||||
ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << local_qpn << dendl;
|
||||
submit(false);
|
||||
}
|
||||
active = true;
|
||||
peer_qpn = qp->get_local_cm_meta().peer_qpn;
|
||||
|
||||
return 0;
|
||||
}
|
||||
@ -189,8 +116,8 @@ int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const S
|
||||
|
||||
ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl;
|
||||
net.set_priority(tcp_fd, opts.priority, peer_addr.get_family());
|
||||
local_cm_meta.peer_qpn = 0;
|
||||
r = qp->send_cm_meta(cct, tcp_fd, local_cm_meta);
|
||||
qp->get_local_cm_meta().peer_qpn = 0;
|
||||
r = qp->send_cm_meta(cct, tcp_fd);
|
||||
if (r < 0)
|
||||
return r;
|
||||
|
||||
@ -199,8 +126,8 @@ int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const S
|
||||
}
|
||||
|
||||
void RDMAConnectedSocketImpl::handle_connection() {
|
||||
ldout(cct, 20) << __func__ << " QP: " << local_cm_meta.local_qpn << " tcp_fd: " << tcp_fd << " notify_fd: " << notify_fd << dendl;
|
||||
int r = qp->recv_cm_meta(cct, tcp_fd, peer_cm_meta);
|
||||
ldout(cct, 20) << __func__ << " QP: " << local_qpn << " tcp_fd: " << tcp_fd << " notify_fd: " << notify_fd << dendl;
|
||||
int r = qp->recv_cm_meta(cct, tcp_fd);
|
||||
if (r <= 0) {
|
||||
if (r != -EAGAIN) {
|
||||
dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
|
||||
@ -216,37 +143,34 @@ void RDMAConnectedSocketImpl::handle_connection() {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!is_server) {// syn + ack from server
|
||||
local_cm_meta.peer_qpn = peer_cm_meta.local_qpn;
|
||||
ldout(cct, 20) << __func__ << " peer msg : < " << peer_cm_meta.local_qpn << ", " << peer_cm_meta.psn
|
||||
<< ", " << peer_cm_meta.lid << ", " << peer_cm_meta.peer_qpn << "> " << dendl;
|
||||
if (!is_server) {// first time: cm meta sync + ack from server
|
||||
if (!connected) {
|
||||
r = activate();
|
||||
ceph_assert(!r);
|
||||
}
|
||||
notify();
|
||||
r = qp->send_cm_meta(cct, tcp_fd, local_cm_meta);
|
||||
r = qp->send_cm_meta(cct, tcp_fd);
|
||||
if (r < 0) {
|
||||
ldout(cct, 1) << __func__ << " send client ack failed." << dendl;
|
||||
dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
|
||||
fault();
|
||||
}
|
||||
} else {
|
||||
if (peer_cm_meta.peer_qpn == 0) {// syn from client
|
||||
if (qp->get_peer_cm_meta().peer_qpn == 0) {// first time: cm meta sync from client
|
||||
if (active) {
|
||||
ldout(cct, 10) << __func__ << " server is already active." << dendl;
|
||||
return ;
|
||||
}
|
||||
r = activate();
|
||||
ceph_assert(!r);
|
||||
r = qp->send_cm_meta(cct, tcp_fd, local_cm_meta);
|
||||
r = qp->send_cm_meta(cct, tcp_fd);
|
||||
if (r < 0) {
|
||||
ldout(cct, 1) << __func__ << " server ack failed." << dendl;
|
||||
dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
|
||||
fault();
|
||||
return ;
|
||||
}
|
||||
} else { // ack from client
|
||||
} else { // second time: cm meta ack from client
|
||||
connected = 1;
|
||||
ldout(cct, 10) << __func__ << " handshake of rdma is done. server connected: " << connected << dendl;
|
||||
//cleanup();
|
||||
@ -260,13 +184,14 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
|
||||
{
|
||||
eventfd_t event_val = 0;
|
||||
int r = eventfd_read(notify_fd, &event_val);
|
||||
ldout(cct, 20) << __func__ << " notify_fd : " << event_val << " in " << local_cm_meta.local_qpn << " r = " << r << dendl;
|
||||
|
||||
ldout(cct, 20) << __func__ << " notify_fd : " << event_val << " in " << local_qpn
|
||||
<< " r = " << r << dendl;
|
||||
|
||||
if (!active) {
|
||||
ldout(cct, 1) << __func__ << " when ib not active. len: " << len << dendl;
|
||||
return -EAGAIN;
|
||||
}
|
||||
|
||||
|
||||
if (0 == connected) {
|
||||
ldout(cct, 1) << __func__ << " when ib not connected. len: " << len <<dendl;
|
||||
return -EAGAIN;
|
||||
@ -275,7 +200,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
|
||||
read = read_buffers(buf,len);
|
||||
|
||||
if (is_server && connected == 0) {
|
||||
ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << local_cm_meta.local_qpn << " peer QP: " << peer_cm_meta.local_qpn << dendl;
|
||||
ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << local_qpn << " peer QP: " << peer_qpn << dendl;
|
||||
connected = 1; //if so, we don't need the last handshake
|
||||
cleanup();
|
||||
submit(false);
|
||||
@ -411,11 +336,11 @@ ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more)
|
||||
std::lock_guard l{lock};
|
||||
pending_bl.claim_append(bl);
|
||||
if (!connected) {
|
||||
ldout(cct, 20) << __func__ << " fake send to upper, QP: " << local_cm_meta.local_qpn << dendl;
|
||||
ldout(cct, 20) << __func__ << " fake send to upper, QP: " << local_qpn << dendl;
|
||||
return bytes;
|
||||
}
|
||||
}
|
||||
ldout(cct, 20) << __func__ << " QP: " << local_cm_meta.local_qpn << dendl;
|
||||
ldout(cct, 20) << __func__ << " QP: " << local_qpn << dendl;
|
||||
ssize_t r = submit(more);
|
||||
if (r < 0 && r != -EAGAIN)
|
||||
return r;
|
||||
@ -522,7 +447,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
|
||||
|
||||
int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
|
||||
{
|
||||
ldout(cct, 20) << __func__ << " QP: " << local_cm_meta.local_qpn << " " << tx_buffers[0] << dendl;
|
||||
ldout(cct, 20) << __func__ << " QP: " << local_qpn << " " << tx_buffers[0] << dendl;
|
||||
vector<Chunk*>::iterator current_buffer = tx_buffers.begin();
|
||||
ibv_sge isge[tx_buffers.size()];
|
||||
uint32_t current_sge = 0;
|
||||
|
@ -30,7 +30,7 @@ RDMAIWARPConnectedSocketImpl::RDMAIWARPConnectedSocketImpl(CephContext *cct, sha
|
||||
}, false);
|
||||
status = RESOURCE_ALLOCATED;
|
||||
local_qpn = qp->get_local_qp_number();
|
||||
local_cm_meta.local_qpn = local_qpn;
|
||||
qp->get_local_cm_meta().local_qpn = local_qpn;
|
||||
} else {
|
||||
is_server = false;
|
||||
cm_channel = rdma_create_event_channel();
|
||||
@ -98,7 +98,7 @@ void RDMAIWARPConnectedSocketImpl::handle_cm_connection() {
|
||||
break;
|
||||
}
|
||||
local_qpn = qp->get_local_qp_number();
|
||||
local_cm_meta.local_qpn = local_qpn;
|
||||
qp->get_local_cm_meta().local_qpn = local_qpn;
|
||||
|
||||
memset(&cm_params, 0, sizeof(cm_params));
|
||||
cm_params.retry_count = RETRY_COUNT;
|
||||
|
@ -185,8 +185,8 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
|
||||
protected:
|
||||
CephContext *cct;
|
||||
Infiniband::QueuePair *qp;
|
||||
ib_cm_meta_t peer_cm_meta;
|
||||
ib_cm_meta_t local_cm_meta;
|
||||
uint32_t peer_qpn = 0;
|
||||
uint32_t local_qpn = 0;
|
||||
int connected;
|
||||
int error;
|
||||
shared_ptr<Infiniband> ib;
|
||||
|
Loading…
Reference in New Issue
Block a user