Merge pull request #14088 from Adirl/rdma-cm-2

msg/async/rdma: Move resource handling to Device

Reviewed-by: Haomai Wang <haomai@xsky.com>
Haomai Wang 2017-03-27 22:25:09 +08:00 committed by GitHub
commit 53e0344628
7 changed files with 582 additions and 316 deletions


@@ -15,14 +15,20 @@
*/
#include "Infiniband.h"
#include "RDMAStack.h"
#include "Device.h"
#include "common/errno.h"
#include "common/debug.h"
#include <poll.h>
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix *_dout << "IBDevice "
static const uint32_t MAX_SHARED_RX_SGE_COUNT = 1;
static const uint32_t CQ_DEPTH = 30000;
Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt), port_num(ipn), port_attr(new ibv_port_attr)
{
#ifdef HAVE_IBV_EXP
@@ -102,7 +108,10 @@ Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt
}
Device::Device(CephContext *cct, ibv_device* d): device(d), device_attr(new ibv_device_attr), active_port(nullptr)
Device::Device(CephContext *cct, Infiniband *ib, ibv_device* d)
: cct(cct), device(d), lock("ibdev_lock"),
async_handler(new C_handle_cq_async(this)), infiniband(ib),
device_attr(new ibv_device_attr), active_port(nullptr)
{
if (device == NULL) {
lderr(cct) << __func__ << " device == NULL" << cpp_strerror(errno) << dendl;
@@ -119,10 +128,78 @@ Device::Device(CephContext *cct, ibv_device* d): device(d), device_attr(new ibv_
lderr(cct) << __func__ << " failed to query rdma device. " << cpp_strerror(errno) << dendl;
ceph_abort();
}
tx_cc = create_comp_channel(cct);
assert(tx_cc);
rx_cc = create_comp_channel(cct);
assert(rx_cc);
assert(NetHandler(cct).set_nonblock(ctxt->async_fd) == 0);
}
void Device::init()
{
Mutex::Locker l(lock);
if (initialized)
return;
pd = new ProtectionDomain(cct, this);
max_recv_wr = std::min(device_attr->max_srq_wr, (int)cct->_conf->ms_async_rdma_receive_buffers);
ldout(cct, 1) << __func__ << " assigning: " << max_recv_wr << " receive buffers" << dendl;
max_send_wr = std::min(device_attr->max_qp_wr, (int)cct->_conf->ms_async_rdma_send_buffers);
ldout(cct, 1) << __func__ << " assigning: " << max_send_wr << " send buffers" << dendl;
ldout(cct, 1) << __func__ << " device allow " << device_attr->max_cqe
<< " completion entries" << dendl;
memory_manager = new MemoryManager(this, pd,
cct->_conf->ms_async_rdma_enable_hugepage);
memory_manager->register_rx_tx(
cct->_conf->ms_async_rdma_buffer_size, max_recv_wr, max_send_wr);
srq = create_shared_receive_queue(max_recv_wr, MAX_SHARED_RX_SGE_COUNT);
post_channel_cluster();
tx_cq = create_comp_queue(cct, tx_cc);
assert(tx_cq);
rx_cq = create_comp_queue(cct, rx_cc);
assert(rx_cq);
initialized = true;
ldout(cct, 5) << __func__ << ":" << __LINE__ << " device " << *this << " is initialized" << dendl;
}
void Device::uninit()
{
Mutex::Locker l(lock);
if (!initialized)
return;
tx_cc->ack_events();
rx_cc->ack_events();
initialized = false;
delete rx_cq;
delete tx_cq;
delete rx_cc;
delete tx_cc;
assert(ibv_destroy_srq(srq) == 0);
delete memory_manager;
delete pd;
}
Device::~Device()
{
uninit();
if (active_port) {
delete active_port;
assert(ibv_close_device(ctxt) == 0);
@@ -148,9 +225,159 @@ void Device::binding_port(CephContext *cct, int port_num) {
}
}
/**
* Create a new QueuePair. This factory should be used in preference to
* the QueuePair constructor directly, since this lets derivatives of
* Device, e.g. MockDevice (if it existed),
* return mocked out QueuePair derivatives.
*
* \return
* QueuePair on success or NULL if init fails
* See QueuePair::QueuePair for parameter documentation.
*/
Infiniband::QueuePair* Device::create_queue_pair(CephContext *cct,
ibv_qp_type type)
{
Infiniband::QueuePair *qp = new QueuePair(
cct, *this, type, active_port->get_port_num(), srq, tx_cq, rx_cq, max_send_wr, max_recv_wr);
if (qp->init()) {
delete qp;
return NULL;
}
return qp;
}
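A minimal caller sketch (hypothetical; assumes a Device *d that has been init()-ed and bound to a port, plus a CephContext *cct):

    Infiniband::QueuePair *qp = d->create_queue_pair(cct, IBV_QPT_RC);
    if (qp == NULL) {
      // qp->init() failed inside the factory; the factory already freed it
      return NULL;
    }
    // exchange qpn/psn/lid/gid with the peer, then bring the QP to RTS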
DeviceList::DeviceList(CephContext *cct)
: device_list(ibv_get_device_list(&num))
/**
* Create a shared receive queue. This basically wraps the verbs call.
*
* \param[in] max_wr
* The max number of outstanding work requests in the SRQ.
* \param[in] max_sge
* The max number of scatter elements per WR.
* \return
* A valid ibv_srq pointer, or NULL on error.
*/
ibv_srq* Device::create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge)
{
ibv_srq_init_attr sia;
memset(&sia, 0, sizeof(sia));
sia.srq_context = ctxt;
sia.attr.max_wr = max_wr;
sia.attr.max_sge = max_sge;
return ibv_create_srq(pd->pd, &sia);
}
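The sizing convention is the one Device::init() uses above: clamp the configured buffer count to the device's SRQ limit and fail hard on NULL (sketch; device_attr, cct and MAX_SHARED_RX_SGE_COUNT are the names from this diff):

    uint32_t wr = std::min(device_attr->max_srq_wr,
                           (int)cct->_conf->ms_async_rdma_receive_buffers);
    ibv_srq *srq = create_shared_receive_queue(wr, MAX_SHARED_RX_SGE_COUNT);
    assert(srq); // NULL means the underlying ibv_create_srq() call failed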
Infiniband::CompletionChannel* Device::create_comp_channel(CephContext *c)
{
Infiniband::CompletionChannel *cc = new Infiniband::CompletionChannel(c, *this);
if (cc->init()) {
delete cc;
return NULL;
}
return cc;
}
Infiniband::CompletionQueue* Device::create_comp_queue(
CephContext *cct, CompletionChannel *cc)
{
Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue(
cct, *this, CQ_DEPTH, cc);
if (cq->init()) {
delete cq;
return NULL;
}
return cq;
}
int Device::post_chunk(Chunk* chunk)
{
ibv_sge isge;
isge.addr = reinterpret_cast<uint64_t>(chunk->buffer);
isge.length = chunk->bytes;
isge.lkey = chunk->mr->lkey;
ibv_recv_wr rx_work_request;
memset(&rx_work_request, 0, sizeof(rx_work_request));
rx_work_request.wr_id = reinterpret_cast<uint64_t>(chunk); // stash descriptor ptr
rx_work_request.next = NULL;
rx_work_request.sg_list = &isge;
rx_work_request.num_sge = 1;
ibv_recv_wr *badWorkRequest;
int ret = ibv_post_srq_recv(srq, &rx_work_request, &badWorkRequest);
if (ret)
return -errno;
return 0;
}
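The wr_id stash above is the other half of the receive path: when a completion is polled, the Chunk comes straight back out of the ibv_wc (sketch, mirroring RDMADispatcher::polling() further down):

    ibv_wc wc;
    while (rx_cq->poll_cq(1, &wc) > 0) {
      Chunk *chunk = reinterpret_cast<Chunk*>(wc.wr_id); // the stashed pointer
      // hand chunk->buffer to the owning socket, or repost it via post_chunk(chunk)
    }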
int Device::post_channel_cluster()
{
vector<Chunk*> free_chunks;
int r = memory_manager->get_channel_buffers(free_chunks, 0);
assert(r > 0);
for (vector<Chunk*>::iterator iter = free_chunks.begin(); iter != free_chunks.end(); ++iter) {
r = post_chunk(*iter);
assert(r == 0);
}
return 0;
}
int Device::get_tx_buffers(std::vector<Chunk*> &c, size_t bytes)
{
return memory_manager->get_send_buffers(c, bytes);
}
int Device::poll_tx_cq(int n, ibv_wc *wc)
{
if (!initialized)
return 0;
return tx_cq->poll_cq(n, wc);
}
int Device::poll_rx_cq(int n, ibv_wc *wc)
{
if (!initialized)
return 0;
return rx_cq->poll_cq(n, wc);
}
void Device::rearm_cqs()
{
int ret;
if (!initialized)
return;
ret = tx_cq->rearm_notify();
assert(!ret);
ret = rx_cq->rearm_notify();
assert(!ret);
}
void Device::handle_async_event()
{
ibv_async_event async_event;
ldout(cct, 30) << __func__ << dendl;
while (!ibv_get_async_event(ctxt, &async_event)) {
infiniband->process_async_event(async_event);
ibv_ack_async_event(&async_event);
}
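// ibv_get_async_event() can be polled like this only because the constructor
// put ctxt->async_fd into non-blocking mode; EAGAIN just means "queue drained"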
if (errno != EAGAIN) {
lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
<< " " << cpp_strerror(errno) << ")" << dendl;
}
}
DeviceList::DeviceList(CephContext *cct, Infiniband *ib)
: cct(cct), device_list(ibv_get_device_list(&num))
{
if (device_list == NULL || num == 0) {
lderr(cct) << __func__ << " failed to get rdma device list. " << cpp_strerror(errno) << dendl;
@@ -158,13 +385,29 @@ DeviceList::DeviceList(CephContext *cct)
}
devices = new Device*[num];
for (int i = 0;i < num; ++i) {
devices[i] = new Device(cct, device_list[i]);
poll_fds = new struct pollfd[2 * num];
for (int i = 0; i < num; ++i) {
struct pollfd *pfd = &poll_fds[i * 2];
struct Device *d;
d = new Device(cct, ib, device_list[i]);
devices[i] = d;
pfd[0].fd = d->tx_cc->get_fd();
pfd[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
pfd[0].revents = 0;
pfd[1].fd = d->rx_cc->get_fd();
pfd[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
pfd[1].revents = 0;
}
}
DeviceList::~DeviceList()
{
delete[] poll_fds;
for (int i=0; i < num; ++i) {
delete devices[i];
}
@@ -182,3 +425,73 @@ Device* DeviceList::get_device(const char* device_name)
}
return NULL;
}
int DeviceList::poll_tx(int num_entries, Device **d, ibv_wc *wc)
{
int n = 0;
for (int i = 0; i < num; i++) {
*d = devices[++last_poll_dev % num];
n = (*d)->poll_tx_cq(num_entries, wc);
if (n)
break;
}
return n;
}
int DeviceList::poll_rx(int num_entries, Device **d, ibv_wc *wc)
{
int n = 0;
for (int i = 0; i < num; i++) {
*d = devices[++last_poll_dev % num];
n = (*d)->poll_rx_cq(num_entries, wc);
if (n)
break;
}
return n;
}
int DeviceList::poll_blocking(bool &done)
{
int r = 0;
while (!done && r == 0) {
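// 100 ms timeout: re-check the done flag periodically even when idle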
r = poll(poll_fds, num * 2, 100);
if (r < 0) {
r = -errno;
lderr(cct) << __func__ << " poll failed " << r << dendl;
ceph_abort();
}
}
if (r <= 0)
return r;
for (int i = 0; i < num ; i++) {
Device *d = devices[i];
if (d->tx_cc->get_cq_event())
ldout(cct, 20) << __func__ << " " << *d << ": got tx cq event" << dendl;
if (d->rx_cc->get_cq_event())
ldout(cct, 20) << __func__ << " " << *d << ": got rx cq event" << dendl;
}
return r;
}
void DeviceList::rearm_notify()
{
for (int i = 0; i < num; i++)
devices[i]->rearm_cqs();
}
void DeviceList::handle_async_event()
{
for (int i = 0; i < num; i++)
devices[i]->handle_async_event();
}
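Taken together, DeviceList gives the dispatcher one device-agnostic event loop. Roughly (hypothetical sketch; the real loop is RDMADispatcher::polling() further down, and cct/ib are assumed to exist):

    DeviceList devs(cct, ib);
    Device *d = nullptr;
    ibv_wc wc[16];
    bool done = false;
    while (!done) {
      if (devs.poll_tx(16, &d, wc) > 0)
        continue;                // tx completions, produced by device d
      if (devs.poll_rx(16, &d, wc) > 0)
        continue;                // rx completions, produced by device d
      devs.handle_async_event(); // drain async events (e.g. last-WQE)
      devs.rearm_notify();       // request the next cq event on every device
      devs.poll_blocking(done);  // sleep on the comp-channel fds
    }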


@@ -29,6 +29,15 @@
#include "msg/msg_types.h"
#include "msg/async/net_handler.h"
#include "common/Mutex.h"
#include "msg/async/Event.h"
typedef Infiniband::QueuePair QueuePair;
typedef Infiniband::CompletionChannel CompletionChannel;
typedef Infiniband::CompletionQueue CompletionQueue;
typedef Infiniband::ProtectionDomain ProtectionDomain;
typedef Infiniband::MemoryManager::Cluster Cluster;
typedef Infiniband::MemoryManager::Chunk Chunk;
typedef Infiniband::MemoryManager MemoryManager;
class Port {
struct ibv_context* ctxt;
@@ -49,33 +58,100 @@ class Port {
class Device {
class C_handle_cq_async : public EventCallback {
Device *device;
public:
C_handle_cq_async(Device *d): device(d) {}
void do_request(int fd) {
device->handle_async_event();
}
};
CephContext *cct;
ibv_device *device;
const char* name;
const char *name;
uint8_t port_cnt;
uint32_t max_send_wr;
uint32_t max_recv_wr;
uint32_t max_sge;
Mutex lock; // Protects against concurrent initialization of the device
bool initialized = false;
EventCallbackRef async_handler;
Infiniband *infiniband;
public:
explicit Device(CephContext *c, ibv_device* d);
explicit Device(CephContext *c, Infiniband *ib, ibv_device* d);
~Device();
const char* get_name() { return name;}
void init();
void uninit();
void handle_async_event();
const char* get_name() const { return name;}
uint16_t get_lid() { return active_port->get_lid(); }
ibv_gid get_gid() { return active_port->get_gid(); }
int get_gid_idx() { return active_port->get_gid_idx(); }
void binding_port(CephContext *c, int port_num);
QueuePair* create_queue_pair(CephContext *c, ibv_qp_type type);
ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);
CompletionChannel *create_comp_channel(CephContext *c);
CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL);
int post_chunk(Chunk* chunk);
int post_channel_cluster();
MemoryManager* get_memory_manager() { return memory_manager; }
bool is_tx_buffer(const char* c) { return memory_manager->is_tx_buffer(c);}
bool is_rx_buffer(const char* c) { return memory_manager->is_rx_buffer(c);}
Chunk *get_tx_chunk_by_buffer(const char *c) { return memory_manager->get_tx_chunk_by_buffer(c); }
int get_tx_buffers(std::vector<Chunk*> &c, size_t bytes);
int poll_tx_cq(int n, ibv_wc *wc);
int poll_rx_cq(int n, ibv_wc *wc);
void rearm_cqs();
struct ibv_context *ctxt;
ibv_device_attr *device_attr;
Port* active_port;
MemoryManager* memory_manager = nullptr;
ibv_srq *srq = nullptr;
Infiniband::CompletionQueue *rx_cq = nullptr;
Infiniband::CompletionChannel *rx_cc = nullptr;
Infiniband::CompletionQueue *tx_cq = nullptr;
Infiniband::CompletionChannel *tx_cc = nullptr;
ProtectionDomain *pd = nullptr;
};
inline ostream& operator<<(ostream& out, const Device &d)
{
return out << d.get_name();
}
class DeviceList {
CephContext *cct;
struct ibv_device ** device_list;
int num;
Device** devices;
int last_poll_dev;
struct pollfd *poll_fds;
public:
DeviceList(CephContext *cct);
DeviceList(CephContext *cct, Infiniband *ib);
~DeviceList();
Device* get_device(const char* device_name);
void rearm_notify();
int poll_tx(int n, Device **d, ibv_wc *wc);
int poll_rx(int n, Device **d, ibv_wc *wc);
int poll_blocking(bool &done);
void handle_async_event();
};
#endif


@@ -15,6 +15,7 @@
*/
#include "Infiniband.h"
#include "RDMAStack.h"
#include "Device.h"
#include "common/errno.h"
@@ -24,21 +25,19 @@
#undef dout_prefix
#define dout_prefix *_dout << "Infiniband "
static const uint32_t MAX_SHARED_RX_SGE_COUNT = 1;
static const uint32_t MAX_INLINE_DATA = 0;
static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000");
static const uint32_t CQ_DEPTH = 30000;
Infiniband::QueuePair::QueuePair(
CephContext *c, Infiniband& infiniband, ibv_qp_type type,
CephContext *c, Device &device, ibv_qp_type type,
int port, ibv_srq *srq,
Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq,
uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key)
: cct(c), infiniband(infiniband),
: cct(c), ibdev(device),
type(type),
ctxt(infiniband.device->ctxt),
ctxt(ibdev.ctxt),
ib_physical_port(port),
pd(infiniband.pd->pd),
pd(ibdev.pd->pd),
srq(srq),
qp(NULL),
txcq(txcq),
@@ -54,7 +53,6 @@ Infiniband::QueuePair::QueuePair(
lderr(cct) << __func__ << " invalid queue pair type" << cpp_strerror(errno) << dendl;
ceph_abort();
}
pd = infiniband.pd->pd;
}
int Infiniband::QueuePair::init()
@@ -229,8 +227,8 @@ bool Infiniband::QueuePair::is_error() const
}
Infiniband::CompletionChannel::CompletionChannel(CephContext *c, Infiniband &ib)
: cct(c), infiniband(ib), channel(NULL), cq(NULL), cq_events_that_need_ack(0)
Infiniband::CompletionChannel::CompletionChannel(CephContext *c, Device &ibdev)
: cct(c), ibdev(ibdev), channel(NULL), cq(NULL), cq_events_that_need_ack(0)
{
}
@@ -247,7 +245,7 @@ Infiniband::CompletionChannel::~CompletionChannel()
int Infiniband::CompletionChannel::init()
{
ldout(cct, 20) << __func__ << " started." << dendl;
channel = ibv_create_comp_channel(infiniband.device->ctxt);
channel = ibv_create_comp_channel(ibdev.ctxt);
if (!channel) {
lderr(cct) << __func__ << " failed to create receive completion channel: "
<< cpp_strerror(errno) << dendl;
@@ -303,7 +301,7 @@ Infiniband::CompletionQueue::~CompletionQueue()
int Infiniband::CompletionQueue::init()
{
cq = ibv_create_cq(infiniband.device->ctxt, queue_depth, this, channel->get_channel(), 0);
cq = ibv_create_cq(ibdev.ctxt, queue_depth, this, channel->get_channel(), 0);
if (!cq) {
lderr(cct) << __func__ << " failed to create receive completion queue: "
<< cpp_strerror(errno) << dendl;
@@ -441,7 +439,7 @@ void Infiniband::MemoryManager::Chunk::clear()
void Infiniband::MemoryManager::Chunk::post_srq(Infiniband *ib)
{
ib->post_chunk(this);
ib->device->post_chunk(this);
}
Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s)
@@ -596,150 +594,30 @@ int Infiniband::MemoryManager::get_channel_buffers(std::vector<Chunk*> &chunks,
Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num)
: device_list(new DeviceList(cct))
: device_list(new DeviceList(cct, this))
{
device = device_list->get_device(device_name.c_str());
device->init();
device->binding_port(cct, port_num);
assert(device);
ib_physical_port = device->active_port->get_port_num();
pd = new ProtectionDomain(cct, device);
assert(NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0);
max_recv_wr = device->device_attr->max_srq_wr;
if (max_recv_wr > cct->_conf->ms_async_rdma_receive_buffers) {
max_recv_wr = cct->_conf->ms_async_rdma_receive_buffers;
ldout(cct, 1) << __func__ << " assigning: " << max_recv_wr << " receive buffers" << dendl;
} else {
ldout(cct, 1) << __func__ << " using the max allowed receive buffers: " << max_recv_wr << dendl;
}
max_send_wr = device->device_attr->max_qp_wr;
if (max_send_wr > cct->_conf->ms_async_rdma_send_buffers) {
max_send_wr = cct->_conf->ms_async_rdma_send_buffers;
ldout(cct, 1) << __func__ << " assigning: " << max_send_wr << " send buffers" << dendl;
} else {
ldout(cct, 1) << __func__ << " using the max allowed send buffers: " << max_send_wr << dendl;
}
ldout(cct, 1) << __func__ << " device allow " << device->device_attr->max_cqe
<< " completion entries" << dendl;
memory_manager = new MemoryManager(device, pd,
cct->_conf->ms_async_rdma_enable_hugepage);
memory_manager->register_rx_tx(
cct->_conf->ms_async_rdma_buffer_size, max_recv_wr, max_send_wr);
srq = create_shared_receive_queue(max_recv_wr, MAX_SHARED_RX_SGE_COUNT);
post_channel_cluster();
}
Infiniband::~Infiniband()
{
assert(ibv_destroy_srq(srq) == 0);
delete memory_manager;
delete pd;
if (dispatcher)
dispatcher->polling_stop();
delete device_list;
}
/**
* Create a shared receive queue. This basically wraps the verbs call.
*
* \param[in] max_wr
* The max number of outstanding work requests in the SRQ.
* \param[in] max_sge
* The max number of scatter elements per WR.
* \return
* A valid ibv_srq pointer, or NULL on error.
*/
ibv_srq* Infiniband::create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge)
void Infiniband::set_dispatcher(RDMADispatcher *d)
{
ibv_srq_init_attr sia;
memset(&sia, 0, sizeof(sia));
sia.srq_context = device->ctxt;
sia.attr.max_wr = max_wr;
sia.attr.max_sge = max_sge;
return ibv_create_srq(pd->pd, &sia);
}
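// exactly one side is null: a dispatcher may be installed only when none is
// set, and cleared only when one is set; it is never silently replaced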
assert(!d ^ !dispatcher);
int Infiniband::get_tx_buffers(std::vector<Chunk*> &c, size_t bytes)
{
return memory_manager->get_send_buffers(c, bytes);
}
/**
* Create a new QueuePair. This factory should be used in preference to
* the QueuePair constructor directly, since this lets derivatives of
* Infiniband, e.g. MockInfiniband (if it existed),
* return mocked out QueuePair derivatives.
*
* \return
* QueuePair on success or NULL if init fails
* See QueuePair::QueuePair for parameter documentation.
*/
Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, CompletionQueue *tx, CompletionQueue* rx, ibv_qp_type type)
{
Infiniband::QueuePair *qp = new QueuePair(
cct, *this, type, ib_physical_port, srq, tx, rx, max_send_wr, max_recv_wr);
if (qp->init()) {
delete qp;
return NULL;
}
return qp;
}
int Infiniband::post_chunk(Chunk* chunk)
{
ibv_sge isge;
isge.addr = reinterpret_cast<uint64_t>(chunk->buffer);
isge.length = chunk->bytes;
isge.lkey = chunk->mr->lkey;
ibv_recv_wr rx_work_request;
memset(&rx_work_request, 0, sizeof(rx_work_request));
rx_work_request.wr_id = reinterpret_cast<uint64_t>(chunk);// stash descriptor ptr
rx_work_request.next = NULL;
rx_work_request.sg_list = &isge;
rx_work_request.num_sge = 1;
ibv_recv_wr *badWorkRequest;
int ret = ibv_post_srq_recv(srq, &rx_work_request, &badWorkRequest);
if (ret)
return -errno;
return 0;
}
int Infiniband::post_channel_cluster()
{
vector<Chunk*> free_chunks;
int r = memory_manager->get_channel_buffers(free_chunks, 0);
assert(r > 0);
for (vector<Chunk*>::iterator iter = free_chunks.begin(); iter != free_chunks.end(); ++iter) {
r = post_chunk(*iter);
assert(r == 0);
}
return 0;
}
Infiniband::CompletionChannel* Infiniband::create_comp_channel(CephContext *c)
{
Infiniband::CompletionChannel *cc = new Infiniband::CompletionChannel(c, *this);
if (cc->init()) {
delete cc;
return NULL;
}
return cc;
}
Infiniband::CompletionQueue* Infiniband::create_comp_queue(
CephContext *cct, CompletionChannel *cc)
{
Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue(
cct, *this, CQ_DEPTH, cc);
if (cq->init()) {
delete cq;
return NULL;
}
return cq;
dispatcher = d;
}
// 1 means no valid buffer read, 0 means got enough buffer
@@ -890,3 +768,43 @@ const char* Infiniband::qp_state_string(int status) {
default: return " out of range.";
}
}
void Infiniband::handle_pre_fork()
{
device->uninit();
}
void Infiniband::handle_post_fork()
{
device->init();
}
int Infiniband::poll_tx(int n, Device **d, ibv_wc *wc)
{
return device_list->poll_tx(n, d, wc);
}
int Infiniband::poll_rx(int n, Device **d, ibv_wc *wc)
{
return device_list->poll_rx(n, d, wc);
}
int Infiniband::poll_blocking(bool &done)
{
return device_list->poll_blocking(done);
}
void Infiniband::rearm_notify()
{
device_list->rearm_notify();
}
void Infiniband::handle_async_event()
{
device_list->handle_async_event();
}
void Infiniband::process_async_event(ibv_async_event &async_event)
{
dispatcher->process_async_event(async_event);
}


@@ -47,6 +47,7 @@ class CephContext;
class Port;
class Device;
class DeviceList;
class RDMADispatcher;
class Infiniband {
public:
@@ -141,15 +142,11 @@ class Infiniband {
};
private:
uint32_t max_send_wr;
uint32_t max_recv_wr;
uint32_t max_sge;
uint8_t ib_physical_port;
MemoryManager* memory_manager;
ibv_srq* srq; // shared receive work queue
Device *device;
ProtectionDomain *pd;
DeviceList *device_list;
RDMADispatcher *dispatcher = nullptr;
void wire_gid_to_gid(const char *wgid, union ibv_gid *gid);
void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]);
@@ -157,16 +154,18 @@ class Infiniband {
explicit Infiniband(CephContext *c, const std::string &device_name, uint8_t p);
~Infiniband();
void set_dispatcher(RDMADispatcher *d);
class CompletionChannel {
static const uint32_t MAX_ACK_EVENT = 5000;
CephContext *cct;
Infiniband& infiniband;
Device &ibdev;
ibv_comp_channel *channel;
ibv_cq *cq;
uint32_t cq_events_that_need_ack;
public:
CompletionChannel(CephContext *c, Infiniband &ib);
CompletionChannel(CephContext *c, Device &ibdev);
~CompletionChannel();
int init();
bool get_cq_event();
@@ -182,9 +181,9 @@ class Infiniband {
// You need to call init and it will create a cq and associate to comp channel
class CompletionQueue {
public:
CompletionQueue(CephContext *c, Infiniband &ib,
CompletionQueue(CephContext *c, Device &ibdev,
const uint32_t qd, CompletionChannel *cc)
: cct(c), infiniband(ib), channel(cc), cq(NULL), queue_depth(qd) {}
: cct(c), ibdev(ibdev), channel(cc), cq(NULL), queue_depth(qd) {}
~CompletionQueue();
int init();
int poll_cq(int num_entries, ibv_wc *ret_wc_array);
@@ -194,7 +193,7 @@ class Infiniband {
CompletionChannel* get_cc() const { return channel; }
private:
CephContext *cct;
Infiniband& infiniband; // Infiniband to which this QP belongs
Device &ibdev;
CompletionChannel *channel;
ibv_cq *cq;
uint32_t queue_depth;
@@ -208,7 +207,7 @@ class Infiniband {
// must call plumb() to bring the queue pair to the RTS state.
class QueuePair {
public:
QueuePair(CephContext *c, Infiniband& infiniband, ibv_qp_type type,
QueuePair(CephContext *c, Device &device, ibv_qp_type type,
int ib_physical_port, ibv_srq *srq,
Infiniband::CompletionQueue* txcq,
Infiniband::CompletionQueue* rxcq,
@@ -255,7 +254,7 @@ class Infiniband {
private:
CephContext *cct;
Infiniband& infiniband; // Infiniband to which this QP belongs
Device &ibdev; // Device to which this QP belongs
ibv_qp_type type; // QP type (IBV_QPT_RC, etc.)
ibv_context* ctxt; // device context of the HCA to use
int ib_physical_port;
@@ -274,23 +273,22 @@ class Infiniband {
public:
typedef MemoryManager::Cluster Cluster;
typedef MemoryManager::Chunk Chunk;
QueuePair* create_queue_pair(CephContext *c, CompletionQueue*, CompletionQueue*, ibv_qp_type type);
ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);
int post_chunk(Chunk* chunk);
int post_channel_cluster();
int get_tx_buffers(std::vector<Chunk*> &c, size_t bytes);
CompletionChannel *create_comp_channel(CephContext *c);
CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL);
uint8_t get_ib_physical_port() { return ib_physical_port; }
int send_msg(CephContext *cct, int sd, IBSYNMsg& msg);
int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg);
MemoryManager* get_memory_manager() { return memory_manager; }
Device* get_device() { return device; }
bool is_tx_buffer(const char* c) { return memory_manager->is_tx_buffer(c);}
bool is_rx_buffer(const char* c) { return memory_manager->is_rx_buffer(c);}
Chunk *get_tx_chunk_by_buffer(const char *c) { return memory_manager->get_tx_chunk_by_buffer(c); }
static const char* wc_status_to_string(int status);
static const char* qp_state_string(int status);
void handle_pre_fork();
void handle_post_fork();
int poll_tx(int n, Device **d, ibv_wc *wc);
int poll_rx(int n, Device **d, ibv_wc *wc);
int poll_blocking(bool &done);
void rearm_notify();
void handle_async_event();
void process_async_event(ibv_async_event &async_event);
};
#endif
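Since buffers now belong to a specific device, every polling entry point reports which Device produced the completions, and chunks must go back to that same device. A caller sketch (hypothetical; assumes an Infiniband *ib):

    Device *d = nullptr;
    ibv_wc wc[32];
    int n = ib->poll_rx(32, &d, wc);
    for (int i = 0; i < n; ++i) {
      Chunk *c = reinterpret_cast<Chunk*>(wc[i].wr_id);
      // ... consume the data, then return the chunk to the device it came from
      d->post_chunk(c);
    }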


@@ -31,8 +31,7 @@ RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* i
ibdev = ib->get_device();
ibport = ib->get_ib_physical_port();
qp = infiniband->create_queue_pair(
cct, s->get_tx_cq(), s->get_rx_cq(), IBV_QPT_RC);
qp = ibdev->create_queue_pair(cct, IBV_QPT_RC);
my_msg.qpn = qp->get_local_qp_number();
my_msg.psn = qp->get_initial_psn();
my_msg.lid = ibdev->get_lid();
@@ -57,11 +56,11 @@ RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
error = ECONNRESET;
int ret = 0;
for (unsigned i=0; i < wc.size(); ++i) {
ret = infiniband->post_chunk(reinterpret_cast<Chunk*>(wc[i].wr_id));
ret = ibdev->post_chunk(reinterpret_cast<Chunk*>(wc[i].wr_id));
assert(ret == 0);
}
for (unsigned i=0; i < buffers.size(); ++i) {
ret = infiniband->post_chunk(buffers[i]);
ret = ibdev->post_chunk(buffers[i]);
assert(ret == 0);
}
}
@@ -282,7 +281,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
error = ECONNRESET;
ldout(cct, 20) << __func__ << " got remote close msg..." << dendl;
}
assert(infiniband->post_chunk(chunk) == 0);
assert(ibdev->post_chunk(chunk) == 0);
} else {
if (read == (ssize_t)len) {
buffers.push_back(chunk);
@@ -293,7 +292,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;
} else {
read += chunk->read(buf+read, response->byte_len);
assert(infiniband->post_chunk(chunk) == 0);
assert(ibdev->post_chunk(chunk) == 0);
}
}
}
@@ -320,7 +319,7 @@ ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)
read += tmp;
ldout(cct, 25) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << (*c)->get_offset() << " ,bound: " << (*c)->get_bound() << ". Chunk:" << *c << dendl;
if ((*c)->over()) {
assert(infiniband->post_chunk(*c) == 0);
assert(ibdev->post_chunk(*c) == 0);
ldout(cct, 25) << __func__ << " one chunk over." << dendl;
}
if (read == len) {
@@ -458,7 +457,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
unsigned total = 0;
unsigned need_reserve_bytes = 0;
while (it != pending_bl.buffers().end()) {
if (infiniband->is_tx_buffer(it->raw_c_str())) {
if (ibdev->is_tx_buffer(it->raw_c_str())) {
if (need_reserve_bytes) {
unsigned copied = fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it);
total += copied;
@@ -467,7 +466,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
need_reserve_bytes = 0;
}
assert(copy_it == it);
tx_buffers.push_back(infiniband->get_tx_chunk_by_buffer(it->raw_c_str()));
tx_buffers.push_back(ibdev->get_tx_chunk_by_buffer(it->raw_c_str()));
total += it->length();
++copy_it;
} else {


@@ -14,7 +14,6 @@
*
*/
#include <poll.h>
#include <sys/time.h>
#include <sys/resource.h>
@@ -32,37 +31,22 @@ static Tub<Infiniband> global_infiniband;
RDMADispatcher::~RDMADispatcher()
{
done = true;
t.join();
polling_stop();
ldout(cct, 20) << __func__ << " destructing rdma dispatcher" << dendl;
global_infiniband->set_dispatcher(nullptr);
assert(qp_conns.empty());
assert(num_qp_conn == 0);
assert(dead_queue_pairs.empty());
assert(num_dead_queue_pair == 0);
tx_cc->ack_events();
rx_cc->ack_events();
delete tx_cq;
delete rx_cq;
delete tx_cc;
delete rx_cc;
delete async_handler;
}
RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
: cct(c), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"),
: cct(c), lock("RDMADispatcher::lock"),
w_lock("RDMADispatcher::for worker pending list"), stack(s)
{
tx_cc = global_infiniband->create_comp_channel(c);
assert(tx_cc);
rx_cc = global_infiniband->create_comp_channel(c);
assert(rx_cc);
tx_cq = global_infiniband->create_comp_queue(c, tx_cc);
assert(tx_cq);
rx_cq = global_infiniband->create_comp_queue(c, rx_cc);
assert(rx_cq);
PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);
plb.add_u64_counter(l_msgr_rdma_polling, "polling", "Whether dispatcher thread is polling");
@@ -89,44 +73,46 @@ RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
perf_logger = plb.create_perf_counters();
cct->get_perfcounters_collection()->add(perf_logger);
t = std::thread(&RDMADispatcher::polling, this);
cct->register_fork_watcher(this);
}
void RDMADispatcher::handle_async_event()
void RDMADispatcher::polling_start()
{
ldout(cct, 30) << __func__ << dendl;
while (1) {
ibv_async_event async_event;
if (ibv_get_async_event(global_infiniband->get_device()->ctxt, &async_event)) {
if (errno != EAGAIN)
lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
<< " " << cpp_strerror(errno) << ")" << dendl;
return;
}
perf_logger->inc(l_msgr_rdma_total_async_events);
// FIXME: Currently we must ensure that nothing else puts the QP into the ERROR state,
// otherwise this qp can't be deleted by the current cleanup flow.
if (async_event.event_type == IBV_EVENT_QP_LAST_WQE_REACHED) {
perf_logger->inc(l_msgr_rdma_async_last_wqe_events);
uint64_t qpn = async_event.element.qp->qp_num;
ldout(cct, 10) << __func__ << " event associated qp=" << async_event.element.qp
<< " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
Mutex::Locker l(lock);
RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn);
if (!conn) {
ldout(cct, 1) << __func__ << " missing qp_num=" << qpn << " discard event" << dendl;
} else {
ldout(cct, 1) << __func__ << " it's not forwardly stopped by us, reenable=" << conn << dendl;
conn->fault();
erase_qpn(qpn);
}
t = std::thread(&RDMADispatcher::polling, this);
}
void RDMADispatcher::polling_stop()
{
if (!t.joinable())
return;
done = true;
t.join();
}
void RDMADispatcher::process_async_event(ibv_async_event &async_event)
{
perf_logger->inc(l_msgr_rdma_total_async_events);
// FIXME: Currently we must ensure that nothing else puts the QP into the ERROR state,
// otherwise this qp can't be deleted by the current cleanup flow.
if (async_event.event_type == IBV_EVENT_QP_LAST_WQE_REACHED) {
perf_logger->inc(l_msgr_rdma_async_last_wqe_events);
uint64_t qpn = async_event.element.qp->qp_num;
ldout(cct, 10) << __func__ << " event associated qp=" << async_event.element.qp
<< " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
Mutex::Locker l(lock);
RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn);
if (!conn) {
ldout(cct, 1) << __func__ << " missing qp_num=" << qpn << " discard event" << dendl;
} else {
ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << global_infiniband->get_device()->ctxt
<< " evt: " << ibv_event_type_str(async_event.event_type)
<< dendl;
ldout(cct, 1) << __func__ << " it's not forwardly stopped by us, reenable=" << conn << dendl;
conn->fault();
erase_qpn_lockless(qpn);
}
ibv_ack_async_event(&async_event);
} else {
ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << global_infiniband->get_device()->ctxt
<< " evt: " << ibv_event_type_str(async_event.event_type)
<< dendl;
}
}
@@ -137,23 +123,24 @@ void RDMADispatcher::polling()
std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
std::vector<ibv_wc> tx_cqe;
ldout(cct, 20) << __func__ << " going to poll tx cq: " << tx_cq << " rx cq: " << rx_cq << dendl;
RDMAConnectedSocketImpl *conn = nullptr;
utime_t last_inactive = ceph_clock_now();
bool rearmed = false;
int r = 0;
while (true) {
int tx_ret = tx_cq->poll_cq(MAX_COMPLETIONS, wc);
Device *ibdev;
int tx_ret = global_infiniband->poll_tx(MAX_COMPLETIONS, &ibdev, wc);
if (tx_ret > 0) {
ldout(cct, 20) << __func__ << " tx completion queue got " << tx_ret
<< " responses."<< dendl;
handle_tx_event(wc, tx_ret);
handle_tx_event(ibdev, wc, tx_ret);
}
int rx_ret = rx_cq->poll_cq(MAX_COMPLETIONS, wc);
int rx_ret = global_infiniband->poll_rx(MAX_COMPLETIONS, &ibdev, wc);
if (rx_ret > 0) {
ldout(cct, 20) << __func__ << " rt completion queue got " << rx_ret
ldout(cct, 20) << __func__ << " rx completion queue got " << rx_ret
<< " responses."<< dendl;
perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_ret);
@@ -168,8 +155,8 @@ void RDMADispatcher::polling()
if (response->status == IBV_WC_SUCCESS) {
conn = get_conn_lockless(response->qp_num);
if (!conn) {
assert(global_infiniband->is_rx_buffer(chunk->buffer));
r = global_infiniband->post_chunk(chunk);
assert(ibdev->is_rx_buffer(chunk->buffer));
r = ibdev->post_chunk(chunk);
ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl;
assert(r == 0);
} else {
@@ -179,9 +166,9 @@ void RDMADispatcher::polling()
perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
<< ") status(" << response->status << ":"
<< global_infiniband->wc_status_to_string(response->status) << ")" << dendl;
assert(global_infiniband->is_rx_buffer(chunk->buffer));
r = global_infiniband->post_chunk(chunk);
<< Infiniband::wc_status_to_string(response->status) << ")" << dendl;
assert(ibdev->is_rx_buffer(chunk->buffer));
r = ibdev->post_chunk(chunk);
if (r) {
ldout(cct, 0) << __func__ << " post chunk failed, error: " << cpp_strerror(r) << dendl;
assert(r == 0);
@@ -219,37 +206,21 @@ void RDMADispatcher::polling()
break;
if ((ceph_clock_now() - last_inactive).to_nsec() / 1000 > cct->_conf->ms_async_rdma_polling_us) {
handle_async_event();
global_infiniband->handle_async_event();
if (!rearmed) {
// Clean up cq events after rearm notify to ensure no new incoming event
// arrives between polling and rearm
tx_cq->rearm_notify();
rx_cq->rearm_notify();
global_infiniband->rearm_notify();
rearmed = true;
continue;
}
struct pollfd channel_poll[2];
channel_poll[0].fd = tx_cc->get_fd();
channel_poll[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
channel_poll[0].revents = 0;
channel_poll[1].fd = rx_cc->get_fd();
channel_poll[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
channel_poll[1].revents = 0;
r = 0;
perf_logger->set(l_msgr_rdma_polling, 0);
while (!done && r == 0) {
r = poll(channel_poll, 2, 100);
if (r < 0) {
r = -errno;
lderr(cct) << __func__ << " poll failed " << r << dendl;
ceph_abort();
}
}
if (r > 0 && tx_cc->get_cq_event())
ldout(cct, 20) << __func__ << " got tx cq event." << dendl;
if (r > 0 && rx_cc->get_cq_event())
ldout(cct, 20) << __func__ << " got rx cq event." << dendl;
r = global_infiniband->poll_blocking(done);
if (r > 0)
ldout(cct, 20) << __func__ << " got a cq event." << dendl;
last_inactive = ceph_clock_now();
perf_logger->set(l_msgr_rdma_polling, 1);
rearmed = false;
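The rearmed flag implements the classic ibverbs notification race avoidance. Stripped of the surrounding loop it is (sketch over raw verbs calls; handle() and block_on_channel_fd() are placeholders):

    while (ibv_poll_cq(cq, 1, &wc) > 0)
      handle(wc);              // 1. drain the cq
    ibv_req_notify_cq(cq, 0);  // 2. arm notification for the next completion
    while (ibv_poll_cq(cq, 1, &wc) > 0)
      handle(wc);              // 3. re-poll: a wc may have landed before arming
    block_on_channel_fd();     // 4. only now is it safe to sleep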
@@ -295,9 +266,8 @@ RDMAConnectedSocketImpl* RDMADispatcher::get_conn_lockless(uint32_t qp)
return it->second.second;
}
void RDMADispatcher::erase_qpn(uint32_t qpn)
void RDMADispatcher::erase_qpn_lockless(uint32_t qpn)
{
Mutex::Locker l(lock);
auto it = qp_conns.find(qpn);
if (it == qp_conns.end())
return ;
@@ -307,41 +277,36 @@ void RDMADispatcher::erase_qpn(uint32_t qpn)
--num_qp_conn;
}
void RDMADispatcher::erase_qpn(uint32_t qpn)
{
Mutex::Locker l(lock);
erase_qpn_lockless(qpn);
}
void RDMADispatcher::handle_pre_fork()
{
done = true;
t.join();
polling_stop();
done = false;
tx_cc->ack_events();
rx_cc->ack_events();
delete tx_cq;
delete rx_cq;
delete tx_cc;
delete rx_cc;
global_infiniband->handle_pre_fork();
global_infiniband.destroy();
}
void RDMADispatcher::handle_post_fork()
{
if (!global_infiniband)
if (!global_infiniband) {
global_infiniband.construct(
cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
global_infiniband->set_dispatcher(this);
}
tx_cc = global_infiniband->create_comp_channel(cct);
assert(tx_cc);
rx_cc = global_infiniband->create_comp_channel(cct);
assert(rx_cc);
tx_cq = global_infiniband->create_comp_queue(cct, tx_cc);
assert(tx_cq);
rx_cq = global_infiniband->create_comp_queue(cct, rx_cc);
assert(rx_cq);
global_infiniband->handle_post_fork();
t = std::thread(&RDMADispatcher::polling, this);
polling_start();
}
void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
void RDMADispatcher::handle_tx_event(Device *ibdev, ibv_wc *cqe, int n)
{
std::vector<Chunk*> tx_chunks;
@@ -380,14 +345,14 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
}
// FIXME: why not tx?
if (global_infiniband->get_memory_manager()->is_tx_buffer(chunk->buffer))
if (ibdev->get_memory_manager()->is_tx_buffer(chunk->buffer))
tx_chunks.push_back(chunk);
else
ldout(cct, 1) << __func__ << " not tx buffer, chunk " << chunk << dendl;
}
perf_logger->inc(l_msgr_rdma_tx_total_wc, n);
post_tx_buffer(tx_chunks);
post_tx_buffer(ibdev, tx_chunks);
}
/**
@@ -398,13 +363,13 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
* \return
* 0 if success or -1 for failure
*/
void RDMADispatcher::post_tx_buffer(std::vector<Chunk*> &chunks)
void RDMADispatcher::post_tx_buffer(Device *ibdev, std::vector<Chunk*> &chunks)
{
if (chunks.empty())
return ;
inflight -= chunks.size();
global_infiniband->get_memory_manager()->return_tx(chunks);
ibdev->get_memory_manager()->return_tx(chunks);
ldout(cct, 30) << __func__ << " release " << chunks.size()
<< " chunks, inflight " << inflight << dendl;
notify_pending_workers();
@@ -476,10 +441,12 @@ int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, Co
int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
{
Device *ibdev = o->get_device();
assert(center.in_thread());
int r = global_infiniband->get_tx_buffers(c, bytes);
int r = ibdev->get_tx_buffers(c, bytes);
assert(r >= 0);
size_t got = global_infiniband->get_memory_manager()->get_tx_buffer_size() * r;
size_t got = ibdev->get_memory_manager()->get_tx_buffer_size() * r;
ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered bytes, inflight " << dispatcher->inflight << dendl;
stack->get_dispatcher()->inflight += r;
if (got == bytes)
@@ -543,6 +510,9 @@ RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
dispatcher = new RDMADispatcher(cct, this);
global_infiniband->set_dispatcher(dispatcher);
dispatcher->polling_start();
unsigned num = get_num_worker();
for (unsigned i = 0; i < num; ++i) {
RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i));


@@ -67,10 +67,6 @@ class RDMADispatcher : public CephContext::ForkWatcher {
std::thread t;
CephContext *cct;
Infiniband::CompletionQueue* tx_cq;
Infiniband::CompletionQueue* rx_cq;
Infiniband::CompletionChannel *tx_cc, *rx_cc;
EventCallbackRef async_handler;
bool done = false;
std::atomic<uint64_t> num_dead_queue_pair = {0};
std::atomic<uint64_t> num_qp_conn = {0};
@@ -103,23 +99,18 @@ class RDMADispatcher : public CephContext::ForkWatcher {
std::list<RDMAWorker*> pending_workers;
RDMAStack* stack;
class C_handle_cq_async : public EventCallback {
RDMADispatcher *dispatcher;
public:
C_handle_cq_async(RDMADispatcher *w): dispatcher(w) {}
void do_request(int fd) {
// worker->handle_tx_event();
dispatcher->handle_async_event();
}
};
public:
PerfCounters *perf_logger;
explicit RDMADispatcher(CephContext* c, RDMAStack* s);
virtual ~RDMADispatcher();
void handle_async_event();
void process_async_event(ibv_async_event &async_event);
void polling_start();
void polling_stop();
void polling();
int register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi);
void make_pending_worker(RDMAWorker* w) {
Mutex::Locker l(w_lock);
@@ -130,14 +121,13 @@ class RDMADispatcher : public CephContext::ForkWatcher {
}
RDMAStack* get_stack() { return stack; }
RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp);
void erase_qpn_lockless(uint32_t qpn);
void erase_qpn(uint32_t qpn);
Infiniband::CompletionQueue* get_tx_cq() const { return tx_cq; }
Infiniband::CompletionQueue* get_rx_cq() const { return rx_cq; }
void notify_pending_workers();
virtual void handle_pre_fork() override;
virtual void handle_post_fork() override;
void handle_tx_event(ibv_wc *cqe, int n);
void post_tx_buffer(std::vector<Chunk*> &chunks);
void handle_tx_event(Device *ibdev, ibv_wc *cqe, int n);
void post_tx_buffer(Device *ibdev, std::vector<Chunk*> &chunks);
std::atomic<uint64_t> inflight = {0};
};
@@ -238,6 +228,8 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
RDMAWorker *w);
virtual ~RDMAConnectedSocketImpl();
Device *get_device() { return ibdev; }
void pass_wc(std::vector<ibv_wc> &&v);
void get_wc(std::vector<ibv_wc> &w);
virtual int is_connected() override { return connected; }