mirror of
https://github.com/ceph/ceph
synced 2024-12-18 17:37:38 +00:00
Merge pull request #14297 from Adirl/ibport
msg/async/rdma: Make port number an attribute of the Connection not o… Reviewed-by: Haomai Wang <haomai@xsky.com>
This commit is contained in:
commit
71c78b859a
@ -116,7 +116,7 @@ Port::~Port()
|
||||
Device::Device(CephContext *cct, Infiniband *ib, ibv_device* d)
|
||||
: cct(cct), device(d), lock("ibdev_lock"),
|
||||
async_handler(new C_handle_cq_async(this)), infiniband(ib),
|
||||
device_attr(new ibv_device_attr), active_port(nullptr)
|
||||
device_attr(new ibv_device_attr)
|
||||
{
|
||||
if (device == NULL) {
|
||||
lderr(cct) << __func__ << " device == NULL" << cpp_strerror(errno) << dendl;
|
||||
@ -134,6 +134,15 @@ Device::Device(CephContext *cct, Infiniband *ib, ibv_device* d)
|
||||
ceph_abort();
|
||||
}
|
||||
|
||||
port_cnt = device_attr->phys_port_cnt;
|
||||
ports = new Port *[port_cnt + 1];
|
||||
assert(ports);
|
||||
|
||||
for (int i = 1; i <= port_cnt; i++) {
|
||||
ports[i] = new Port(cct, ctxt, i);
|
||||
assert(ports[i]);
|
||||
}
|
||||
|
||||
tx_cc = create_comp_channel(cct);
|
||||
assert(tx_cc);
|
||||
|
||||
@ -147,6 +156,8 @@ void Device::init(int ibport)
|
||||
{
|
||||
Mutex::Locker l(lock);
|
||||
|
||||
verify_port(ibport);
|
||||
|
||||
if (initialized)
|
||||
return;
|
||||
|
||||
@ -175,8 +186,6 @@ void Device::init(int ibport)
|
||||
rx_cq = create_comp_queue(cct, rx_cc);
|
||||
assert(rx_cq);
|
||||
|
||||
binding_port(cct, ibport);
|
||||
|
||||
initialized = true;
|
||||
|
||||
ldout(cct, 5) << __func__ << ":" << __LINE__ << " device " << *this << " is initialized" << dendl;
|
||||
@ -210,31 +219,35 @@ Device::~Device()
|
||||
|
||||
uninit();
|
||||
|
||||
if (active_port) {
|
||||
delete active_port;
|
||||
assert(ibv_close_device(ctxt) == 0);
|
||||
}
|
||||
for (int i = 1; i <= port_cnt; i++)
|
||||
delete ports[i];
|
||||
delete[] ports;
|
||||
|
||||
assert(ibv_close_device(ctxt) == 0);
|
||||
delete device_attr;
|
||||
}
|
||||
|
||||
void Device::binding_port(CephContext *cct, int port_num) {
|
||||
port_cnt = device_attr->phys_port_cnt;
|
||||
for (uint8_t i = 0; i < port_cnt; ++i) {
|
||||
Port *port = new Port(cct, ctxt, i+1);
|
||||
if (i + 1 == port_num && port->get_port_attr()->state == IBV_PORT_ACTIVE) {
|
||||
active_port = port;
|
||||
ldout(cct, 1) << __func__ << " found active port " << i+1 << dendl;
|
||||
break;
|
||||
} else {
|
||||
ldout(cct, 10) << __func__ << " port " << i+1 << " is not what we want. state: " << port->get_port_attr()->state << ")"<< dendl;
|
||||
}
|
||||
delete port;
|
||||
}
|
||||
if (nullptr == active_port) {
|
||||
void Device::verify_port(int port_num) {
|
||||
if (port_num < 0 || port_num > port_cnt) {
|
||||
lderr(cct) << __func__ << " port not found" << dendl;
|
||||
assert(active_port);
|
||||
ceph_abort();
|
||||
}
|
||||
|
||||
Port *port = ports[port_num];
|
||||
|
||||
if (port->get_port_attr()->state == IBV_PORT_ACTIVE) {
|
||||
ldout(cct, 1) << __func__ << " found active port " << port_num << dendl;
|
||||
} else {
|
||||
ldout(cct, 10) << __func__ << " port " << port_num <<
|
||||
" is not what we want. state: " << port->get_port_attr()->state << ")"<< dendl;
|
||||
ceph_abort();
|
||||
}
|
||||
}
|
||||
|
||||
Port *Device::get_port(int ibport)
|
||||
{
|
||||
assert(ibport > 0 && ibport <= port_cnt);
|
||||
return ports[ibport];
|
||||
}
|
||||
|
||||
/**
|
||||
@ -247,11 +260,11 @@ void Device::binding_port(CephContext *cct, int port_num) {
|
||||
* QueuePair on success or NULL if init fails
|
||||
* See QueuePair::QueuePair for parameter documentation.
|
||||
*/
|
||||
Infiniband::QueuePair* Device::create_queue_pair(CephContext *cct,
|
||||
Infiniband::QueuePair* Device::create_queue_pair(int port,
|
||||
ibv_qp_type type)
|
||||
{
|
||||
Infiniband::QueuePair *qp = new QueuePair(
|
||||
cct, *this, type, active_port->get_port_num(), srq, tx_cq, rx_cq, max_send_wr, max_recv_wr);
|
||||
cct, *this, type, port, srq, tx_cq, rx_cq, max_send_wr, max_recv_wr);
|
||||
if (qp->init()) {
|
||||
delete qp;
|
||||
return NULL;
|
||||
|
@ -71,7 +71,11 @@ class Device {
|
||||
CephContext *cct;
|
||||
ibv_device *device;
|
||||
const char *name;
|
||||
uint8_t port_cnt;
|
||||
|
||||
Port **ports; // Array of Port objects. index is 1 based (IB port #1 is in
|
||||
// index 1). Index 0 is not used
|
||||
|
||||
int port_cnt;
|
||||
|
||||
uint32_t max_send_wr;
|
||||
uint32_t max_recv_wr;
|
||||
@ -82,6 +86,8 @@ class Device {
|
||||
EventCallbackRef async_handler;
|
||||
Infiniband *infiniband;
|
||||
|
||||
void verify_port(int port_num);
|
||||
|
||||
public:
|
||||
explicit Device(CephContext *c, Infiniband *ib, ibv_device* d);
|
||||
~Device();
|
||||
@ -92,12 +98,14 @@ class Device {
|
||||
void handle_async_event();
|
||||
|
||||
const char* get_name() const { return name;}
|
||||
uint16_t get_lid() { return active_port->get_lid(); }
|
||||
ibv_gid get_gid() { return active_port->get_gid(); }
|
||||
int get_gid_idx() { return active_port->get_gid_idx(); }
|
||||
void binding_port(CephContext *c, int port_num);
|
||||
|
||||
QueuePair* create_queue_pair(CephContext *c, ibv_qp_type type);
|
||||
Port *get_port(int ibport);
|
||||
uint16_t get_lid(int p) { return get_port(p)->get_lid(); }
|
||||
ibv_gid get_gid(int p) { return get_port(p)->get_gid(); }
|
||||
int get_gid_idx(int p) { return get_port(p)->get_gid_idx(); }
|
||||
|
||||
QueuePair *create_queue_pair(int port,
|
||||
ibv_qp_type type);
|
||||
ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);
|
||||
CompletionChannel *create_comp_channel(CephContext *c);
|
||||
CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL);
|
||||
@ -115,7 +123,6 @@ class Device {
|
||||
|
||||
struct ibv_context *ctxt;
|
||||
ibv_device_attr *device_attr;
|
||||
Port* active_port;
|
||||
|
||||
MemoryManager* memory_manager = nullptr;
|
||||
ibv_srq *srq = nullptr;
|
||||
|
@ -44,7 +44,7 @@ QueuePair *RDMAConnectedSocketImpl::create_queue_pair(Device *d, int p)
|
||||
ibdev = d;
|
||||
ibport = p;
|
||||
|
||||
qp = ibdev->create_queue_pair(cct, IBV_QPT_RC);
|
||||
qp = ibdev->create_queue_pair(ibport, IBV_QPT_RC);
|
||||
|
||||
local_qpn = qp->get_local_qp_number();
|
||||
|
||||
@ -67,9 +67,9 @@ RDMAConnTCP::RDMAConnTCP(CephContext *cct, RDMAConnectedSocketImpl *sock,
|
||||
|
||||
my_msg.qpn = socket->local_qpn;
|
||||
my_msg.psn = qp->get_initial_psn();
|
||||
my_msg.lid = ibdev->get_lid();
|
||||
my_msg.lid = ibdev->get_lid(ibport);
|
||||
my_msg.peer_qpn = 0;
|
||||
my_msg.gid = ibdev->get_gid();
|
||||
my_msg.gid = ibdev->get_gid(ibport);
|
||||
socket->register_qp(qp);
|
||||
}
|
||||
|
||||
@ -132,6 +132,9 @@ int RDMAConnTCP::activate()
|
||||
ibv_qp_attr qpa;
|
||||
int r;
|
||||
|
||||
Device *ibdev = socket->get_device();
|
||||
int ibport = socket->get_ibport();
|
||||
|
||||
socket->remote_qpn = peer_msg.qpn;
|
||||
|
||||
// now connect up the qps and switch to RTR
|
||||
@ -147,12 +150,12 @@ int RDMAConnTCP::activate()
|
||||
qpa.ah_attr.grh.hop_limit = 6;
|
||||
qpa.ah_attr.grh.dgid = peer_msg.gid;
|
||||
|
||||
qpa.ah_attr.grh.sgid_index = socket->get_device()->get_gid_idx();
|
||||
qpa.ah_attr.grh.sgid_index = ibdev->get_gid_idx(ibport);
|
||||
|
||||
qpa.ah_attr.dlid = peer_msg.lid;
|
||||
qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl;
|
||||
qpa.ah_attr.src_path_bits = 0;
|
||||
qpa.ah_attr.port_num = (uint8_t)socket->get_ibport();
|
||||
qpa.ah_attr.port_num = (uint8_t)ibport;
|
||||
|
||||
ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user