From af2f1ecd2700ef3fad7e7d16d61f8ee86794b3ae Mon Sep 17 00:00:00 2001 From: Amir Vadai Date: Wed, 22 Mar 2017 12:41:12 +0200 Subject: [PATCH] msg/async/rdma: Make port number an attribute of the Connection not of the Device Since multiple connections on different ports can exist, we shouldn't use device->active_port; use conn->ibport instead. In other words, the Device object no longer has an active_port; instead, every port-specific action (create_qp, get_lid, get_gid, etc.) needs to specify the port number. The port number is known to the connection (RDMAConnectedSocket*), which is the caller of those actions. Issue: 995322 Change-Id: I482cb87c04ba99845dc44f6dd0547835fe814ebf Signed-off-by: Amir Vadai --- src/msg/async/rdma/Device.cc | 61 +++++++++++-------- src/msg/async/rdma/Device.h | 21 ++++--- src/msg/async/rdma/RDMAConnectedSocketImpl.cc | 13 ++-- 3 files changed, 59 insertions(+), 36 deletions(-) diff --git a/src/msg/async/rdma/Device.cc b/src/msg/async/rdma/Device.cc index fc09063989f..392ed7f69c7 100644 --- a/src/msg/async/rdma/Device.cc +++ b/src/msg/async/rdma/Device.cc @@ -116,7 +116,7 @@ Port::~Port() Device::Device(CephContext *cct, Infiniband *ib, ibv_device* d) : cct(cct), device(d), lock("ibdev_lock"), async_handler(new C_handle_cq_async(this)), infiniband(ib), - device_attr(new ibv_device_attr), active_port(nullptr) + device_attr(new ibv_device_attr) { if (device == NULL) { lderr(cct) << __func__ << " device == NULL" << cpp_strerror(errno) << dendl; @@ -134,6 +134,15 @@ Device::Device(CephContext *cct, Infiniband *ib, ibv_device* d) ceph_abort(); } + port_cnt = device_attr->phys_port_cnt; + ports = new Port *[port_cnt + 1]; + assert(ports); + + for (int i = 1; i <= port_cnt; i++) { + ports[i] = new Port(cct, ctxt, i); + assert(ports[i]); + } + tx_cc = create_comp_channel(cct); assert(tx_cc); @@ -147,6 +156,8 @@ void Device::init(int ibport) { Mutex::Locker l(lock); + verify_port(ibport); + if (initialized) return; @@ -175,8 
+186,6 @@ void Device::init(int ibport) rx_cq = create_comp_queue(cct, rx_cc); assert(rx_cq); - binding_port(cct, ibport); - initialized = true; ldout(cct, 5) << __func__ << ":" << __LINE__ << " device " << *this << " is initialized" << dendl; @@ -210,31 +219,35 @@ Device::~Device() uninit(); - if (active_port) { - delete active_port; - assert(ibv_close_device(ctxt) == 0); - } + for (int i = 1; i <= port_cnt; i++) + delete ports[i]; + delete[] ports; + assert(ibv_close_device(ctxt) == 0); delete device_attr; } -void Device::binding_port(CephContext *cct, int port_num) { - port_cnt = device_attr->phys_port_cnt; - for (uint8_t i = 0; i < port_cnt; ++i) { - Port *port = new Port(cct, ctxt, i+1); - if (i + 1 == port_num && port->get_port_attr()->state == IBV_PORT_ACTIVE) { - active_port = port; - ldout(cct, 1) << __func__ << " found active port " << i+1 << dendl; - break; - } else { - ldout(cct, 10) << __func__ << " port " << i+1 << " is not what we want. state: " << port->get_port_attr()->state << ")"<< dendl; - } - delete port; - } - if (nullptr == active_port) { +/* Aborts unless port_num is in [1, port_cnt] and that port's state is IBV_PORT_ACTIVE. */ void Device::verify_port(int port_num) { + if (port_num <= 0 || port_num > port_cnt) { /* ports[] is 1-based; slot 0 is never populated, so 0 must be rejected too */ lderr(cct) << __func__ << " port not found" << dendl; - assert(active_port); + ceph_abort(); } + + Port *port = ports[port_num]; + + if (port->get_port_attr()->state == IBV_PORT_ACTIVE) { + ldout(cct, 1) << __func__ << " found active port " << port_num << dendl; + } else { + lderr(cct) << __func__ << " port " << port_num << + " is not what we want. state: " << port->get_port_attr()->state << ")"<< dendl; + ceph_abort(); + } +} + +Port *Device::get_port(int ibport) +{ + assert(ibport > 0 && ibport <= port_cnt); + return ports[ibport]; } /** @@ -247,11 +260,11 @@ void Device::binding_port(CephContext *cct, int port_num) { * QueuePair on success or NULL if init fails * See QueuePair::QueuePair for parameter documentation. 
*/ -Infiniband::QueuePair* Device::create_queue_pair(CephContext *cct, +Infiniband::QueuePair* Device::create_queue_pair(int port, ibv_qp_type type) { Infiniband::QueuePair *qp = new QueuePair( - cct, *this, type, active_port->get_port_num(), srq, tx_cq, rx_cq, max_send_wr, max_recv_wr); + cct, *this, type, port, srq, tx_cq, rx_cq, max_send_wr, max_recv_wr); if (qp->init()) { delete qp; return NULL; diff --git a/src/msg/async/rdma/Device.h b/src/msg/async/rdma/Device.h index 636fd76eb93..5ba7d5ca4ef 100644 --- a/src/msg/async/rdma/Device.h +++ b/src/msg/async/rdma/Device.h @@ -71,7 +71,11 @@ class Device { CephContext *cct; ibv_device *device; const char *name; - uint8_t port_cnt; + + Port **ports; // Array of Port objects. index is 1 based (IB port #1 is in + // index 1). Index 0 is not used + + int port_cnt; uint32_t max_send_wr; uint32_t max_recv_wr; @@ -82,6 +86,8 @@ class Device { EventCallbackRef async_handler; Infiniband *infiniband; + void verify_port(int port_num); + public: explicit Device(CephContext *c, Infiniband *ib, ibv_device* d); ~Device(); @@ -92,12 +98,14 @@ class Device { void handle_async_event(); const char* get_name() const { return name;} - uint16_t get_lid() { return active_port->get_lid(); } - ibv_gid get_gid() { return active_port->get_gid(); } - int get_gid_idx() { return active_port->get_gid_idx(); } - void binding_port(CephContext *c, int port_num); - QueuePair* create_queue_pair(CephContext *c, ibv_qp_type type); + Port *get_port(int ibport); + uint16_t get_lid(int p) { return get_port(p)->get_lid(); } + ibv_gid get_gid(int p) { return get_port(p)->get_gid(); } + int get_gid_idx(int p) { return get_port(p)->get_gid_idx(); } + + QueuePair *create_queue_pair(int port, + ibv_qp_type type); ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge); CompletionChannel *create_comp_channel(CephContext *c); CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL); @@ -115,7 +123,6 @@ class Device { struct 
ibv_context *ctxt; ibv_device_attr *device_attr; - Port* active_port; MemoryManager* memory_manager = nullptr; ibv_srq *srq = nullptr; diff --git a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc index 80f3c014adb..83c8b5387b9 100644 --- a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc +++ b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc @@ -44,7 +44,7 @@ QueuePair *RDMAConnectedSocketImpl::create_queue_pair(Device *d, int p) ibdev = d; ibport = p; - qp = ibdev->create_queue_pair(cct, IBV_QPT_RC); + qp = ibdev->create_queue_pair(ibport, IBV_QPT_RC); local_qpn = qp->get_local_qp_number(); @@ -67,9 +67,9 @@ RDMAConnTCP::RDMAConnTCP(CephContext *cct, RDMAConnectedSocketImpl *sock, my_msg.qpn = socket->local_qpn; my_msg.psn = qp->get_initial_psn(); - my_msg.lid = ibdev->get_lid(); + my_msg.lid = ibdev->get_lid(ibport); my_msg.peer_qpn = 0; - my_msg.gid = ibdev->get_gid(); + my_msg.gid = ibdev->get_gid(ibport); socket->register_qp(qp); } @@ -132,6 +132,9 @@ int RDMAConnTCP::activate() ibv_qp_attr qpa; int r; + Device *ibdev = socket->get_device(); + int ibport = socket->get_ibport(); + socket->remote_qpn = peer_msg.qpn; // now connect up the qps and switch to RTR @@ -147,12 +150,12 @@ int RDMAConnTCP::activate() qpa.ah_attr.grh.hop_limit = 6; qpa.ah_attr.grh.dgid = peer_msg.gid; - qpa.ah_attr.grh.sgid_index = socket->get_device()->get_gid_idx(); + qpa.ah_attr.grh.sgid_index = ibdev->get_gid_idx(ibport); qpa.ah_attr.dlid = peer_msg.lid; qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl; qpa.ah_attr.src_path_bits = 0; - qpa.ah_attr.port_num = (uint8_t)socket->get_ibport(); + qpa.ah_attr.port_num = (uint8_t)ibport; ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl;