msg/async/rdma: improves RX buffer management

This commit adds the following changes:
- RX buffers are allocated from a memory pool (boost::pool); a minimal sketch of the pool/allocator interplay follows the sign-off below
- flat memory layout for buffer data/metadata to reduce CPU cache misses
- the number of receive buffers can be much larger than the receive queue length
- new buffers are posted to the SRQ as soon as possible
- stat counters are added

Signed-off-by: Alex Mikheev <alexm@mellanox.com>
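
The pool named in the first bullet is a boost::pool parameterized with a custom allocator (the RxAllocator added below). A minimal, self-contained sketch of that interplay — demo_allocator, the 128-byte chunk size and the printouts are illustrative stand-ins, not the Ceph code; the real allocator registers each block with the NIC via ibv_reg_mr():

    #include <boost/pool/pool.hpp>
    #include <cstddef>
    #include <cstdlib>
    #include <iostream>

    struct demo_allocator {                        // boost::pool's "UserAllocator" concept
      typedef std::size_t size_type;
      typedef std::ptrdiff_t difference_type;
      static char *malloc(const size_type bytes) { // called only when the pool has to grow
        std::cout << "pool grows by " << bytes << " bytes" << std::endl;
        return static_cast<char *>(std::malloc(bytes));
      }
      static void free(char *const block) {        // called when the pool releases a block
        std::free(block);
      }
    };

    int main() {
      boost::pool<demo_allocator> rx_pool(128);    // chunk size, cf. sizeof(Chunk) + buffer_size
      void *a = rx_pool.malloc();                  // first call makes the pool grow once
      void *b = rx_pool.malloc();                  // served from the block already obtained
      rx_pool.free(a);                             // chunks go back to the pool's free list,
      rx_pool.free(b);                             // not to demo_allocator::free
    }                                              // blocks are handed back on pool destruction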
Adir Lev authored 2017-05-03 11:17:51 +00:00; committed by Alex Mikheev
parent f1b1e94494
commit 720d044db1
7 changed files with 308 additions and 174 deletions


@ -165,7 +165,10 @@ OPTION(ms_async_rdma_device_name, OPT_STR)
OPTION(ms_async_rdma_enable_hugepage, OPT_BOOL)
OPTION(ms_async_rdma_buffer_size, OPT_INT)
OPTION(ms_async_rdma_send_buffers, OPT_U32)
// size of the receive buffer pool; 0 means unlimited
OPTION(ms_async_rdma_receive_buffers, OPT_U32)
// max number of receive work requests (WRs) in the SRQ
OPTION(ms_async_rdma_receive_queue_len, OPT_U32)
OPTION(ms_async_rdma_port_num, OPT_U32)
OPTION(ms_async_rdma_polling_us, OPT_U32)
OPTION(ms_async_rdma_local_gid, OPT_STR) // GID format: "fe80:0000:0000:0000:7efe:90ff:fe72:6efe", no zero folding
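
For reference, a hypothetical ceph.conf fragment exercising these options; the values are illustrative only, except that 4096 and 32768 are the defaults added by this commit. With this change ms_async_rdma_receive_buffers must be 0 (unlimited) or at least ms_async_rdma_receive_queue_len, otherwise Infiniband::init() aborts:

    [global]
    ms_async_rdma_receive_queue_len = 4096    # max receive WRs posted to the SRQ
    ms_async_rdma_receive_buffers = 32768     # cap on the rx buffer pool; 0 = unlimited
    ms_async_rdma_send_buffers = 1024         # illustrative
    ms_async_rdma_buffer_size = 32768         # illustrative per-buffer size in bytes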


@ -736,7 +736,11 @@ std::vector<Option> get_global_options() {
.set_description(""),
Option("ms_async_rdma_receive_buffers", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
.set_default(1024)
.set_default(32768)
.set_description(""),
Option("ms_async_rdma_receive_queue_len", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
.set_default(4096)
.set_description(""),
Option("ms_async_rdma_port_num", Option::TYPE_UINT, Option::LEVEL_ADVANCED)


@ -150,7 +150,7 @@ Infiniband::QueuePair::QueuePair(
CephContext *c, Infiniband& infiniband, ibv_qp_type type,
int port, ibv_srq *srq,
Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq,
uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key)
uint32_t tx_queue_len, uint32_t rx_queue_len, uint32_t q_key)
: cct(c), infiniband(infiniband),
type(type),
ctxt(infiniband.device->ctxt),
@ -161,8 +161,8 @@ Infiniband::QueuePair::QueuePair(
txcq(txcq),
rxcq(rxcq),
initial_psn(0),
max_send_wr(max_send_wr),
max_recv_wr(max_recv_wr),
max_send_wr(tx_queue_len),
max_recv_wr(rx_queue_len),
q_key(q_key),
dead(false)
{
@ -192,7 +192,7 @@ int Infiniband::QueuePair::init()
if (qp == NULL) {
lderr(cct) << __func__ << " failed to create queue pair" << cpp_strerror(errno) << dendl;
if (errno == ENOMEM) {
lderr(cct) << __func__ << " try reducing ms_async_rdma_receive_buffers, "
lderr(cct) << __func__ << " try reducing ms_async_rdma_receive_queue_length, "
" ms_async_rdma_send_buffers or"
" ms_async_rdma_buffer_size" << dendl;
}
@ -554,11 +554,6 @@ void Infiniband::MemoryManager::Chunk::clear()
bound = 0;
}
void Infiniband::MemoryManager::Chunk::post_srq(Infiniband *ib)
{
ib->post_chunk(this);
}
Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s)
: manager(m), buffer_size(s), lock("cluster_lock")
{
@ -574,10 +569,7 @@ Infiniband::MemoryManager::Cluster::~Cluster()
}
::free(chunk_base);
if (manager.enabled_huge_page)
manager.free_huge_pages(base);
else
::free(base);
manager.free(base);
}
int Infiniband::MemoryManager::Cluster::fill(uint32_t num)
@ -585,11 +577,8 @@ int Infiniband::MemoryManager::Cluster::fill(uint32_t num)
assert(!base);
num_chunk = num;
uint32_t bytes = buffer_size * num;
if (manager.enabled_huge_page) {
base = (char*)manager.malloc_huge_pages(bytes);
} else {
base = (char*)memalign(CEPH_PAGE_SIZE, bytes);
}
base = (char*)manager.malloc(bytes);
end = base + bytes;
assert(base);
chunk_base = static_cast<Chunk*>(::malloc(sizeof(Chunk) * num));
@ -642,27 +631,96 @@ int Infiniband::MemoryManager::Cluster::get_buffers(std::vector<Chunk*> &chunks,
return r;
}
Infiniband::MemoryManager* Infiniband::MemoryManager::RxAllocator::manager = nullptr;
PerfCounters *Infiniband::MemoryManager::RxAllocator::perf_logger = nullptr;
Infiniband::MemoryManager::MemoryManager(Device *d, ProtectionDomain *p, bool hugepage)
: device(d), pd(p)
unsigned Infiniband::MemoryManager::RxAllocator::n_bufs_allocated = 0;
unsigned Infiniband::MemoryManager::RxAllocator::max_bufs = 0;
char *Infiniband::MemoryManager::RxAllocator::malloc(const size_type bytes)
{
enabled_huge_page = hugepage;
mem_info *m;
Chunk *ch;
size_t rx_buf_size;
unsigned nbufs;
rx_buf_size = sizeof(Chunk) + manager->cct->_conf->ms_async_rdma_buffer_size;
nbufs = bytes/rx_buf_size;
if (max_bufs > 0 && n_bufs_allocated + nbufs > max_bufs) {
return NULL;
}
m = static_cast<mem_info *>(manager->malloc(bytes + sizeof(*m)));
if (!m)
return NULL;
m->mr = ibv_reg_mr(manager->pd->pd, m->chunks, bytes, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
assert(m->mr);
m->nbufs = nbufs;
n_bufs_allocated += nbufs;
// note that the memory can be allocated before perf logger is set
if (perf_logger)
perf_logger->inc(l_msgr_rdma_rx_bufs_total, nbufs);
/* initialize chunks */
ch = m->chunks;
for (unsigned i = 0; i < nbufs; i++) {
ch->lkey = m->mr->lkey;
ch->bytes = manager->cct->_conf->ms_async_rdma_buffer_size;
ch->offset = 0;
ch->buffer = ch->data; // TODO: refactor tx and remove buffer
ch = reinterpret_cast<Chunk *>(reinterpret_cast<char *>(ch) + rx_buf_size);
}
return reinterpret_cast<char *>(m->chunks);
}
void Infiniband::MemoryManager::RxAllocator::free(char * const block)
{
mem_info *m;
m = reinterpret_cast<mem_info *>(block) - 1;
n_bufs_allocated -= m->nbufs;
if (perf_logger)
perf_logger->dec(l_msgr_rdma_rx_bufs_total, m->nbufs);
ibv_dereg_mr(m->mr);
manager->free(m);
}
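
The pointer arithmetic above walks one registered block laid out flat as [mem_info][Chunk|payload][Chunk|payload]..., with a stride of sizeof(Chunk) + ms_async_rdma_buffer_size bytes. A standalone sketch of that layout, with hdr and the 4096-byte payload as illustrative stand-ins for the Ceph types:

    #include <cstddef>
    #include <cstdint>
    #include <cstdio>
    #include <cstdlib>

    struct hdr {                      // stand-in for Chunk's metadata
      uint32_t lkey, bytes, offset;
      char data[0];                   // payload follows in the same allocation (GNU/Clang extension, as in the patch)
    };

    int main() {
      const size_t payload = 4096;                     // cf. ms_async_rdma_buffer_size
      const size_t rec_size = sizeof(hdr) + payload;   // cf. rx_buf_size above
      const unsigned nbufs = 4;
      char *block = static_cast<char *>(std::malloc(nbufs * rec_size));
      for (unsigned i = 0; i < nbufs; i++) {
        hdr *h = reinterpret_cast<hdr *>(block + i * rec_size);
        h->bytes = payload;
        // metadata and data are contiguous, so handling a receive completion does not
        // have to chase a separate buffer pointer into another allocation
        std::printf("record %u: header %p, data %p\n", i, (void *)h, (void *)h->data);
      }
      std::free(block);
    }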
Infiniband::MemoryManager::MemoryManager(CephContext *c, Device *d, ProtectionDomain *p)
: cct(c), device(d), pd(p),
rxbuf_pool(sizeof(Chunk) + c->_conf->ms_async_rdma_buffer_size,
c->_conf->ms_async_rdma_receive_buffers > 0 ?
// if possible, make the initial pool size 2 * receive_queue_len
// so that no pool expansion is needed when the
// first packets arrive.
(c->_conf->ms_async_rdma_receive_buffers < 2 * c->_conf->ms_async_rdma_receive_queue_len ?
c->_conf->ms_async_rdma_receive_buffers : 2 * c->_conf->ms_async_rdma_receive_queue_len) :
// rx pool is infinite, we can set any initial size that we want
2 * c->_conf->ms_async_rdma_receive_queue_len)
{
RxAllocator::set_memory_manager(this);
// remember the setting because cct may not be available when
// global infiniband is destroyed
hp_enabled = cct->_conf->ms_async_rdma_enable_hugepage;
}
Infiniband::MemoryManager::~MemoryManager()
{
if (channel)
delete channel;
if (send)
delete send;
}
void* Infiniband::MemoryManager::malloc_huge_pages(size_t size)
void* Infiniband::MemoryManager::huge_pages_malloc(size_t size)
{
size_t real_size = ALIGN_TO_PAGE_SIZE(size + HUGE_PAGE_SIZE);
char *ptr = (char *)mmap(NULL, real_size, PROT_READ | PROT_WRITE,MAP_PRIVATE | MAP_ANONYMOUS |MAP_POPULATE | MAP_HUGETLB,-1, 0);
if (ptr == MAP_FAILED) {
ptr = (char *)malloc(real_size);
ptr = (char *)std::malloc(real_size);
if (ptr == NULL) return NULL;
real_size = 0;
}
@ -670,7 +728,7 @@ void* Infiniband::MemoryManager::malloc_huge_pages(size_t size)
return ptr + HUGE_PAGE_SIZE;
}
void Infiniband::MemoryManager::free_huge_pages(void *ptr)
void Infiniband::MemoryManager::huge_pages_free(void *ptr)
{
if (ptr == NULL) return;
void *real_ptr = (char *)ptr -HUGE_PAGE_SIZE;
@ -679,15 +737,30 @@ void Infiniband::MemoryManager::free_huge_pages(void *ptr)
if (real_size != 0)
munmap(real_ptr, real_size);
else
free(real_ptr);
std::free(real_ptr);
}
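
huge_pages_malloc() over-allocates by one HUGE_PAGE_SIZE header, records the real mapping size there (the store itself sits in an elided hunk line), and hands out an offset pointer; huge_pages_free() walks back to that header to choose between munmap() and free(). A heap-only sketch of the same bookkeeping pattern, with HDR standing in for HUGE_PAGE_SIZE:

    #include <cstddef>
    #include <cstdio>
    #include <cstdlib>

    static const size_t HDR = 64;                      // stand-in for HUGE_PAGE_SIZE

    void *demo_malloc(size_t size) {
      char *raw = static_cast<char *>(std::malloc(size + HDR));
      if (!raw) return nullptr;
      *reinterpret_cast<size_t *>(raw) = size + HDR;   // remember how big the block really is
      return raw + HDR;                                // callers only ever see the offset pointer
    }

    void demo_free(void *ptr) {
      if (!ptr) return;
      char *raw = static_cast<char *>(ptr) - HDR;
      std::printf("releasing %zu bytes\n", *reinterpret_cast<size_t *>(raw));
      std::free(raw);                                  // the real code picks munmap() vs free() here
    }

    int main() { demo_free(demo_malloc(1000)); }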
void Infiniband::MemoryManager::register_rx_tx(uint32_t size, uint32_t rx_num, uint32_t tx_num)
void* Infiniband::MemoryManager::malloc(size_t size)
{
if (hp_enabled)
return huge_pages_malloc(size);
else
return std::malloc(size);
}
void Infiniband::MemoryManager::free(void *ptr)
{
if (hp_enabled)
huge_pages_free(ptr);
else
std::free(ptr);
}
void Infiniband::MemoryManager::create_tx_pool(uint32_t size, uint32_t tx_num)
{
assert(device);
assert(pd);
channel = new Cluster(*this, size);
channel->fill(rx_num);
send = new Cluster(*this, size);
send->fill(tx_num);
@ -703,12 +776,6 @@ int Infiniband::MemoryManager::get_send_buffers(std::vector<Chunk*> &c, size_t b
return send->get_buffers(c, bytes);
}
int Infiniband::MemoryManager::get_channel_buffers(std::vector<Chunk*> &chunks, size_t bytes)
{
return channel->get_buffers(chunks, bytes);
}
Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num)
: cct(cct), lock("IB lock"), device_name(device_name), port_num(port_num)
{
@ -731,33 +798,44 @@ void Infiniband::init()
pd = new ProtectionDomain(cct, device);
assert(NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0);
max_recv_wr = device->device_attr->max_srq_wr;
if (max_recv_wr > cct->_conf->ms_async_rdma_receive_buffers) {
max_recv_wr = cct->_conf->ms_async_rdma_receive_buffers;
ldout(cct, 1) << __func__ << " assigning: " << max_recv_wr << " receive buffers" << dendl;
rx_queue_len = device->device_attr->max_srq_wr;
if (rx_queue_len > cct->_conf->ms_async_rdma_receive_queue_len) {
rx_queue_len = cct->_conf->ms_async_rdma_receive_queue_len;
ldout(cct, 1) << __func__ << " receive queue length is " << rx_queue_len << " receive buffers" << dendl;
} else {
ldout(cct, 1) << __func__ << " using the max allowed receive buffers: " << max_recv_wr << dendl;
ldout(cct, 0) << __func__ << " requested receive queue length " <<
cct->_conf->ms_async_rdma_receive_queue_len <<
" is too big. Setting " << rx_queue_len << dendl;
}
max_send_wr = device->device_attr->max_qp_wr;
if (max_send_wr > cct->_conf->ms_async_rdma_send_buffers) {
max_send_wr = cct->_conf->ms_async_rdma_send_buffers;
ldout(cct, 1) << __func__ << " assigning: " << max_send_wr << " send buffers" << dendl;
// check for misconfiguration
if (cct->_conf->ms_async_rdma_receive_buffers > 0 &&
rx_queue_len > (unsigned)cct->_conf->ms_async_rdma_receive_buffers) {
lderr(cct) << __func__ << " rdma_receive_queue_len (" <<
rx_queue_len << ") > ms_async_rdma_receive_buffers(" <<
cct->_conf->ms_async_rdma_receive_buffers << ")." << dendl;
ceph_abort();
}
MemoryManager::RxAllocator::set_max_bufs(cct->_conf->ms_async_rdma_receive_buffers);
tx_queue_len = device->device_attr->max_qp_wr;
if (tx_queue_len > cct->_conf->ms_async_rdma_send_buffers) {
tx_queue_len = cct->_conf->ms_async_rdma_send_buffers;
ldout(cct, 1) << __func__ << " assigning: " << tx_queue_len << " send buffers" << dendl;
} else {
ldout(cct, 1) << __func__ << " using the max allowed send buffers: " << max_send_wr << dendl;
ldout(cct, 0) << __func__ << " using the max allowed send buffers: " << tx_queue_len << dendl;
}
ldout(cct, 1) << __func__ << " device allow " << device->device_attr->max_cqe
<< " completion entries" << dendl;
memory_manager = new MemoryManager(device, pd,
cct->_conf->ms_async_rdma_enable_hugepage);
memory_manager->register_rx_tx(
cct->_conf->ms_async_rdma_buffer_size, max_recv_wr, max_send_wr);
memory_manager = new MemoryManager(cct, device, pd);
memory_manager->create_tx_pool(cct->_conf->ms_async_rdma_buffer_size, tx_queue_len);
srq = create_shared_receive_queue(max_recv_wr, MAX_SHARED_RX_SGE_COUNT);
post_channel_cluster();
srq = create_shared_receive_queue(rx_queue_len, MAX_SHARED_RX_SGE_COUNT);
post_chunks_to_srq(rx_queue_len); //add to srq
dispatcher->polling_start();
}
@ -779,6 +857,8 @@ void Infiniband::set_dispatcher(RDMADispatcher *d)
assert(!d ^ !dispatcher);
dispatcher = d;
if (dispatcher != nullptr)
MemoryManager::RxAllocator::set_perf_logger(dispatcher->perf_logger);
}
/**
@ -819,7 +899,7 @@ int Infiniband::get_tx_buffers(std::vector<Chunk*> &c, size_t bytes)
Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, CompletionQueue *tx, CompletionQueue* rx, ibv_qp_type type)
{
Infiniband::QueuePair *qp = new QueuePair(
cct, *this, type, ib_physical_port, srq, tx, rx, max_send_wr, max_recv_wr);
cct, *this, type, ib_physical_port, srq, tx, rx, tx_queue_len, rx_queue_len);
if (qp->init()) {
delete qp;
return NULL;
@ -827,37 +907,36 @@ Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, Completio
return qp;
}
int Infiniband::post_chunk(Chunk* chunk)
void Infiniband::post_chunks_to_srq(int num)
{
ibv_sge isge;
isge.addr = reinterpret_cast<uint64_t>(chunk->buffer);
isge.length = chunk->bytes;
isge.lkey = chunk->mr->lkey;
ibv_recv_wr rx_work_request;
int ret, i = 0;
ibv_sge isge[num];
Chunk *chunk;
ibv_recv_wr rx_work_request[num];
memset(&rx_work_request, 0, sizeof(rx_work_request));
rx_work_request.wr_id = reinterpret_cast<uint64_t>(chunk);// stash descriptor ptr
rx_work_request.next = NULL;
rx_work_request.sg_list = &isge;
rx_work_request.num_sge = 1;
while (i < num) {
chunk = get_memory_manager()->get_rx_buffer();
ibv_recv_wr *badWorkRequest;
int ret = ibv_post_srq_recv(srq, &rx_work_request, &badWorkRequest);
if (ret)
return -errno;
return 0;
}
assert (chunk != NULL);
int Infiniband::post_channel_cluster()
{
vector<Chunk*> free_chunks;
int r = memory_manager->get_channel_buffers(free_chunks, 0);
assert(r > 0);
for (vector<Chunk*>::iterator iter = free_chunks.begin(); iter != free_chunks.end(); ++iter) {
r = post_chunk(*iter);
assert(r == 0);
isge[i].addr = reinterpret_cast<uint64_t>(chunk->data);
isge[i].length = chunk->bytes;
isge[i].lkey = chunk->lkey;
memset(&rx_work_request[i], 0, sizeof(rx_work_request[i]));
rx_work_request[i].wr_id = reinterpret_cast<uint64_t>(chunk);// stash descriptor ptr
if (i == num - 1) {
rx_work_request[i].next = 0;
} else {
rx_work_request[i].next = &rx_work_request[i+1];
}
rx_work_request[i].sg_list = &isge[i];
rx_work_request[i].num_sge = 1;
i++;
}
return 0;
ibv_recv_wr *badworkrequest;
ret = ibv_post_srq_recv(srq, &rx_work_request[0], &badworkrequest);
assert(ret == 0);
}
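
Because this rendering interleaves the removed post_chunk() with its replacement, here is the new batched pattern in isolation: build num receive WRs, chain them through their next pointers, and hand the whole chain to the hardware with a single ibv_post_srq_recv(). This is only a sketch under assumptions — refill_srq() and RxChunk are illustrative stand-ins, and the chunks must already live in memory registered on the protection domain behind the SRQ:

    #include <infiniband/verbs.h>
    #include <cassert>
    #include <cstdint>
    #include <vector>

    struct RxChunk { uint32_t lkey; uint32_t bytes; char data[0]; };   // cf. Chunk above

    // chunks[]: already-registered receive buffers taken from the pool
    void refill_srq(ibv_srq *srq, RxChunk **chunks, int num) {
      std::vector<ibv_sge> sge(num);        // value-initialized, i.e. zeroed
      std::vector<ibv_recv_wr> wr(num);
      for (int i = 0; i < num; i++) {
        sge[i].addr   = reinterpret_cast<uint64_t>(chunks[i]->data);
        sge[i].length = chunks[i]->bytes;
        sge[i].lkey   = chunks[i]->lkey;
        wr[i].wr_id   = reinterpret_cast<uint64_t>(chunks[i]);  // recovered from the completion later
        wr[i].sg_list = &sge[i];
        wr[i].num_sge = 1;
        wr[i].next    = (i + 1 < num) ? &wr[i + 1] : nullptr;   // chain instead of one post per buffer
      }
      ibv_recv_wr *bad = nullptr;
      int rc = ibv_post_srq_recv(srq, wr.data(), &bad);         // one verbs call for the whole batch
      assert(rc == 0);
      (void)rc;
    }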
Infiniband::CompletionChannel* Infiniband::create_comp_channel(CephContext *c)


@ -17,6 +17,12 @@
#ifndef CEPH_INFINIBAND_H
#define CEPH_INFINIBAND_H
#include <boost/pool/pool.hpp>
// need this because boost messes with ceph log/assert definitions
#include <include/assert.h>
#include <infiniband/verbs.h>
#include <string>
#include <vector>
@ -27,6 +33,7 @@
#include "common/debug.h"
#include "common/errno.h"
#include "common/Mutex.h"
#include "common/perf_counters.h"
#include "msg/msg_types.h"
#include "msg/async/net_handler.h"
@ -121,6 +128,50 @@ class DeviceList {
}
};
// stat counters
enum {
l_msgr_rdma_dispatcher_first = 94000,
l_msgr_rdma_polling,
l_msgr_rdma_inflight_tx_chunks,
l_msgr_rdma_rx_bufs_in_use,
l_msgr_rdma_rx_bufs_total,
l_msgr_rdma_tx_total_wc,
l_msgr_rdma_tx_total_wc_errors,
l_msgr_rdma_tx_wc_retry_errors,
l_msgr_rdma_tx_wc_wr_flush_errors,
l_msgr_rdma_rx_total_wc,
l_msgr_rdma_rx_total_wc_errors,
l_msgr_rdma_rx_fin,
l_msgr_rdma_handshake_errors,
l_msgr_rdma_total_async_events,
l_msgr_rdma_async_last_wqe_events,
l_msgr_rdma_created_queue_pair,
l_msgr_rdma_active_queue_pair,
l_msgr_rdma_dispatcher_last,
};
enum {
l_msgr_rdma_first = 95000,
l_msgr_rdma_tx_no_mem,
l_msgr_rdma_tx_parital_mem,
l_msgr_rdma_tx_failed,
l_msgr_rdma_tx_chunks,
l_msgr_rdma_tx_bytes,
l_msgr_rdma_rx_chunks,
l_msgr_rdma_rx_bytes,
l_msgr_rdma_pending_sent_conns,
l_msgr_rdma_last,
};
class RDMADispatcher;
@ -152,14 +203,15 @@ class Infiniband {
bool full();
bool over();
void clear();
void post_srq(Infiniband *ib);
public:
ibv_mr* mr;
uint32_t lkey;
uint32_t bytes;
uint32_t bound;
uint32_t offset;
char* buffer;
char* buffer; // TODO: remove buffer/refactor TX
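// for rx buffers the payload is laid out right after this header in one allocation (the flat layout); tx chunks still use the separate buffer pointer above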
char data[0];
};
class Cluster {
@ -189,17 +241,46 @@ class Infiniband {
Chunk* chunk_base = nullptr;
};
MemoryManager(Device *d, ProtectionDomain *p, bool hugepage);
class RxAllocator {
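// Satisfies boost::pool's UserAllocator concept (static malloc/free plus the size_type/difference_type typedefs), hence the static state and setters below; each malloc() carves one ibv_reg_mr()-registered block into ready-to-post Chunks.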
struct mem_info {
ibv_mr *mr;
unsigned nbufs;
Chunk chunks[0];
};
static MemoryManager *manager;
static unsigned n_bufs_allocated;
static unsigned max_bufs;
static PerfCounters *perf_logger;
public:
typedef std::size_t size_type;
typedef std::ptrdiff_t difference_type;
static char * malloc(const size_type bytes);
static void free(char * const block);
static void set_memory_manager(MemoryManager *m) {
manager = m;
}
static void set_max_bufs(int n) {
max_bufs = n;
}
static void set_perf_logger(PerfCounters *logger) {
perf_logger = logger;
perf_logger->set(l_msgr_rdma_rx_bufs_total, n_bufs_allocated);
}
};
MemoryManager(CephContext *c, Device *d, ProtectionDomain *p);
~MemoryManager();
void* malloc_huge_pages(size_t size);
void free_huge_pages(void *ptr);
void register_rx_tx(uint32_t size, uint32_t rx_num, uint32_t tx_num);
void* malloc(size_t size);
void free(void *ptr);
void create_tx_pool(uint32_t size, uint32_t tx_num);
void return_tx(std::vector<Chunk*> &chunks);
int get_send_buffers(std::vector<Chunk*> &c, size_t bytes);
int get_channel_buffers(std::vector<Chunk*> &chunks, size_t bytes);
bool is_tx_buffer(const char* c) { return send->is_my_buffer(c); }
bool is_rx_buffer(const char* c) { return channel->is_my_buffer(c); }
Chunk *get_tx_chunk_by_buffer(const char *c) {
return send->get_chunk_by_buffer(c);
}
@ -207,18 +288,32 @@ class Infiniband {
return send->buffer_size;
}
bool enabled_huge_page;
Chunk *get_rx_buffer() {
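// pool partitions are sizeof(Chunk) + ms_async_rdma_buffer_size bytes (see the rxbuf_pool constructor), so the cast yields a Chunk header with its payload right behind it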
return reinterpret_cast<Chunk *>(rxbuf_pool.malloc());
}
void release_rx_buffer(Chunk *chunk) {
rxbuf_pool.free(chunk);
}
CephContext *cct;
private:
Cluster* channel;//RECV
// TODO: Cluster -> TxPool txbuf_pool
// chunk layout fix
//
Cluster* send;// SEND
Device *device;
ProtectionDomain *pd;
boost::pool<RxAllocator> rxbuf_pool;
bool hp_enabled;
void* huge_pages_malloc(size_t size);
void huge_pages_free(void *ptr);
};
private:
uint32_t max_send_wr = 0;
uint32_t max_recv_wr = 0;
uint32_t tx_queue_len = 0;
uint32_t rx_queue_len = 0;
uint32_t max_sge = 0;
uint8_t ib_physical_port = 0;
MemoryManager* memory_manager = nullptr;
@ -297,7 +392,7 @@ class Infiniband {
int ib_physical_port, ibv_srq *srq,
Infiniband::CompletionQueue* txcq,
Infiniband::CompletionQueue* rxcq,
uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key = 0);
uint32_t tx_queue_len, uint32_t max_recv_wr, uint32_t q_key = 0);
~QueuePair();
int init();
@ -361,8 +456,10 @@ class Infiniband {
typedef MemoryManager::Chunk Chunk;
QueuePair* create_queue_pair(CephContext *c, CompletionQueue*, CompletionQueue*, ibv_qp_type type);
ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);
int post_chunk(Chunk* chunk);
int post_channel_cluster();
void post_chunks_to_srq(int);
void post_chunk_to_pool(Chunk* chunk) {
get_memory_manager()->release_rx_buffer(chunk);
}
int get_tx_buffers(std::vector<Chunk*> &c, size_t bytes);
CompletionChannel *create_comp_channel(CephContext *c);
CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL);
@ -375,7 +472,6 @@ class Infiniband {
Device* get_device() { return device; }
int get_async_fd() { return device->ctxt->async_fd; }
bool is_tx_buffer(const char* c) { return memory_manager->is_tx_buffer(c);}
bool is_rx_buffer(const char* c) { return memory_manager->is_rx_buffer(c);}
Chunk *get_tx_chunk_by_buffer(const char *c) { return memory_manager->get_tx_chunk_by_buffer(c); }
static const char* wc_status_to_string(int status);
static const char* qp_state_string(int status);


@ -45,23 +45,20 @@ RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
cleanup();
worker->remove_pending_conn(this);
dispatcher->erase_qpn(my_msg.qpn);
for (unsigned i=0; i < wc.size(); ++i) {
dispatcher->post_chunk_to_pool(reinterpret_cast<Chunk*>(wc[i].wr_id));
}
for (unsigned i=0; i < buffers.size(); ++i) {
dispatcher->post_chunk_to_pool(buffers[i]);
}
Mutex::Locker l(lock);
if (notify_fd >= 0)
::close(notify_fd);
if (tcp_fd >= 0)
::close(tcp_fd);
error = ECONNRESET;
int ret = 0;
for (unsigned i=0; i < wc.size(); ++i) {
ret = infiniband->post_chunk(reinterpret_cast<Chunk*>(wc[i].wr_id));
assert(ret == 0);
dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
}
for (unsigned i=0; i < buffers.size(); ++i) {
ret = infiniband->post_chunk(buffers[i]);
assert(ret == 0);
dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
}
}
void RDMAConnectedSocketImpl::pass_wc(std::vector<ibv_wc> &&v)
@ -290,8 +287,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
error = ECONNRESET;
ldout(cct, 20) << __func__ << " got remote close msg..." << dendl;
}
assert(infiniband->post_chunk(chunk) == 0);
dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
dispatcher->post_chunk_to_pool(chunk);
} else {
if (read == (ssize_t)len) {
buffers.push_back(chunk);
@ -302,8 +298,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;
} else {
read += chunk->read(buf+read, response->byte_len);
assert(infiniband->post_chunk(chunk) == 0);
dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
dispatcher->post_chunk_to_pool(chunk);
}
}
}
@ -334,8 +329,7 @@ ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)
read += tmp;
ldout(cct, 25) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << (*c)->get_offset() << " ,bound: " << (*c)->get_bound() << ". Chunk:" << *c << dendl;
if ((*c)->over()) {
assert(infiniband->post_chunk(*c) == 0);
dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
dispatcher->post_chunk_to_pool(*c);
ldout(cct, 25) << __func__ << " one chunk over." << dendl;
}
if (read == len) {


@ -59,7 +59,8 @@ RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
plb.add_u64_counter(l_msgr_rdma_polling, "polling", "Whether dispatcher thread is polling");
plb.add_u64_counter(l_msgr_rdma_inflight_tx_chunks, "inflight_tx_chunks", "The number of inflight tx chunks");
plb.add_u64_counter(l_msgr_rdma_inqueue_rx_chunks, "inqueue_rx_chunks", "The number of inqueue rx chunks");
plb.add_u64_counter(l_msgr_rdma_rx_bufs_in_use, "rx_bufs_in_use", "The number of rx buffers that are holding data and being processed");
plb.add_u64_counter(l_msgr_rdma_rx_bufs_total, "rx_bufs_total", "The total number of rx buffers");
plb.add_u64_counter(l_msgr_rdma_tx_total_wc, "tx_total_wc", "The number of tx work comletions");
plb.add_u64_counter(l_msgr_rdma_tx_total_wc_errors, "tx_total_wc_errors", "The number of tx errors");
@ -140,6 +141,12 @@ void RDMADispatcher::handle_async_event()
}
}
void RDMADispatcher::post_chunk_to_pool(Chunk* chunk) {
Mutex::Locker l(lock);
global_infiniband->post_chunk_to_pool(chunk);
perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
}
void RDMADispatcher::polling()
{
static int MAX_COMPLETIONS = 32;
@ -166,8 +173,10 @@ void RDMADispatcher::polling()
ldout(cct, 20) << __func__ << " rt completion queue got " << rx_ret
<< " responses."<< dendl;
perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_ret);
perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_ret);
Mutex::Locker l(lock);//make sure connected socket alive when pass wc
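// top the SRQ back up immediately: one fresh receive WR per polled completion, independent of when consumers return their chunks to the pool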
global_infiniband->post_chunks_to_srq(rx_ret);
for (int i = 0; i < rx_ret; ++i) {
ibv_wc* response = &wc[i];
Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
@ -178,35 +187,28 @@ void RDMADispatcher::polling()
if (response->status == IBV_WC_SUCCESS) {
conn = get_conn_lockless(response->qp_num);
if (!conn) {
assert(global_infiniband->is_rx_buffer(chunk->buffer));
r = global_infiniband->post_chunk(chunk);
ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl;
assert(r == 0);
global_infiniband->post_chunk_to_pool(chunk);
perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
} else {
polled[conn].push_back(*response);
polled[conn].push_back(*response);
}
} else {
perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
<< ") status(" << response->status << ":"
<< global_infiniband->wc_status_to_string(response->status) << ")" << dendl;
assert(global_infiniband->is_rx_buffer(chunk->buffer));
r = global_infiniband->post_chunk(chunk);
if (r) {
ldout(cct, 0) << __func__ << " post chunk failed, error: " << cpp_strerror(r) << dendl;
assert(r == 0);
}
conn = get_conn_lockless(response->qp_num);
if (conn && conn->is_connected())
conn->fault();
global_infiniband->post_chunk_to_pool(chunk);
}
}
for (auto &&i : polled) {
perf_logger->inc(l_msgr_rdma_inqueue_rx_chunks, i.second.size());
for (auto &&i : polled)
i.first->pass_wc(std::move(i.second));
}
polled.clear();
}
@ -411,7 +413,6 @@ RDMAWorker::RDMAWorker(CephContext *c, unsigned i)
plb.add_u64_counter(l_msgr_rdma_tx_no_mem, "tx_no_mem", "The count of no tx buffer");
plb.add_u64_counter(l_msgr_rdma_tx_parital_mem, "tx_parital_mem", "The count of partial tx buffer");
plb.add_u64_counter(l_msgr_rdma_tx_failed, "tx_failed_post", "The number of tx failed posted");
plb.add_u64_counter(l_msgr_rdma_rx_no_registered_mem, "rx_no_registered_mem", "The count of no registered buffer when receiving");
plb.add_u64_counter(l_msgr_rdma_tx_chunks, "tx_chunks", "The number of tx chunks transmitted");
plb.add_u64_counter(l_msgr_rdma_tx_bytes, "tx_bytes", "The bytes of tx chunks transmitted");


@ -34,34 +34,6 @@ class RDMAServerSocketImpl;
class RDMAStack;
class RDMAWorker;
enum {
l_msgr_rdma_dispatcher_first = 94000,
l_msgr_rdma_polling,
l_msgr_rdma_inflight_tx_chunks,
l_msgr_rdma_inqueue_rx_chunks,
l_msgr_rdma_tx_total_wc,
l_msgr_rdma_tx_total_wc_errors,
l_msgr_rdma_tx_wc_retry_errors,
l_msgr_rdma_tx_wc_wr_flush_errors,
l_msgr_rdma_rx_total_wc,
l_msgr_rdma_rx_total_wc_errors,
l_msgr_rdma_rx_fin,
l_msgr_rdma_handshake_errors,
l_msgr_rdma_total_async_events,
l_msgr_rdma_async_last_wqe_events,
l_msgr_rdma_created_queue_pair,
l_msgr_rdma_active_queue_pair,
l_msgr_rdma_dispatcher_last,
};
class RDMADispatcher {
typedef Infiniband::MemoryManager::Chunk Chunk;
typedef Infiniband::QueuePair QueuePair;
@ -144,24 +116,9 @@ class RDMADispatcher {
void post_tx_buffer(std::vector<Chunk*> &chunks);
std::atomic<uint64_t> inflight = {0};
};
void post_chunk_to_pool(Chunk* chunk);
enum {
l_msgr_rdma_first = 95000,
l_msgr_rdma_tx_no_mem,
l_msgr_rdma_tx_parital_mem,
l_msgr_rdma_tx_failed,
l_msgr_rdma_rx_no_registered_mem,
l_msgr_rdma_tx_chunks,
l_msgr_rdma_tx_bytes,
l_msgr_rdma_rx_chunks,
l_msgr_rdma_rx_bytes,
l_msgr_rdma_pending_sent_conns,
l_msgr_rdma_last,
};
class RDMAWorker : public Worker {