mirror of
https://github.com/ceph/ceph
synced 2025-02-20 17:37:29 +00:00
Merge pull request #16981 from yuyuyu101/wip_rdma_mt_fix
msg/async/rdma: fixes crash for multi rados client within one process
This commit is contained in:
commit
02a212c9a3
@ -18,6 +18,8 @@
|
||||
#include "common/errno.h"
|
||||
#include "common/debug.h"
|
||||
#include "RDMAStack.h"
|
||||
#include <sys/time.h>
|
||||
#include <sys/resource.h>
|
||||
|
||||
#define dout_subsys ceph_subsys_ms
|
||||
#undef dout_prefix
|
||||
@ -776,9 +778,45 @@ int Infiniband::MemoryManager::get_send_buffers(std::vector<Chunk*> &c, size_t b
|
||||
return send->get_buffers(c, bytes);
|
||||
}
|
||||
|
||||
Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num)
|
||||
: cct(cct), lock("IB lock"), device_name(device_name), port_num(port_num)
|
||||
static std::atomic<bool> init_prereq = {false};
|
||||
|
||||
/**
 * Run the process-level checks and setup required before RDMA is used.
 *
 * Calls ibv_fork_init(), exports RDMAV_HUGEPAGES_SAFE when
 * ms_async_rdma_enable_hugepage is enabled, and logs a warning when the
 * RLIMIT_MEMLOCK limits are not unlimited. The process is aborted if
 * ibv_fork_init() or setenv() fails. init_prereq is set to true on
 * completion.
 *
 * @param cct context used for configuration lookup and logging
 */
void Infiniband::verify_prereq(CephContext *cct) {
  // On RDMA, ibv_fork_init() MUST be called before fork.
  int rc = ibv_fork_init();
  if (rc) {
    lderr(cct) << __func__ << " failed to call ibv_fork_init(). On RDMA must be called before fork. Application aborts." << dendl;
    ceph_abort();
  }

  ldout(cct, 20) << __func__ << " ms_async_rdma_enable_hugepage value is: " << cct->_conf->ms_async_rdma_enable_hugepage << dendl;
  if (cct->_conf->ms_async_rdma_enable_hugepage) {
    rc = setenv("RDMAV_HUGEPAGES_SAFE", "1", 1);
    // Check the result before reading the variable back: if setenv() failed,
    // getenv() may return a null pointer, which must not be streamed.
    if (rc) {
      lderr(cct) << __func__ << " failed to export RDMAV_HUGEPAGES_SAFE. On RDMA must be exported before using huge pages. Application aborts." << dendl;
      ceph_abort();
    }
    const char *hugepages_safe = getenv("RDMAV_HUGEPAGES_SAFE");
    ldout(cct, 0) << __func__ << " RDMAV_HUGEPAGES_SAFE is set as: " << (hugepages_safe ? hugepages_safe : "(unset)") << dendl;
  }

  // Check ulimit
  struct rlimit limit;
  if (getrlimit(RLIMIT_MEMLOCK, &limit) != 0) {
    // Capture errno immediately so logging cannot overwrite it; `limit` is
    // not valid here, so the comparison below is skipped.
    const int err = errno;
    lderr(cct) << __func__ << " getrlimit(RLIMIT_MEMLOCK) failed (errno=" << err << "); unable to check the memlock limit." << dendl;
  } else if (limit.rlim_cur != RLIM_INFINITY || limit.rlim_max != RLIM_INFINITY) {
    lderr(cct) << __func__ << " !!! WARNING !!! For RDMA to work properly user memlock (ulimit -l) must be big enough to allow large amount of registered memory."
                              " We recommend setting this parameter to infinity" << dendl;
  }
  init_prereq = true;
}
|
||||
|
||||
/**
 * Construct an Infiniband object whose device name and port number are
 * read from the ms_async_rdma_device_name / ms_async_rdma_port_num
 * configuration values of the given context.
 *
 * verify_prereq() is invoked first when init_prereq is not yet set.
 */
Infiniband::Infiniband(CephContext *cct)
  : cct(cct), lock("IB lock"),
    device_name(cct->_conf->ms_async_rdma_device_name),
    port_num(cct->_conf->ms_async_rdma_port_num)
{
  const bool prereq_done = init_prereq;
  if (!prereq_done) {
    verify_prereq(cct);
  }
  ldout(cct, 20) << __func__ << " constructing Infiniband..." << dendl;
}
|
||||
|
||||
void Infiniband::init()
|
||||
@ -836,7 +874,6 @@ void Infiniband::init()
|
||||
srq = create_shared_receive_queue(rx_queue_len, MAX_SHARED_RX_SGE_COUNT);
|
||||
|
||||
post_chunks_to_srq(rx_queue_len); //add to srq
|
||||
dispatcher->polling_start();
|
||||
}
|
||||
|
||||
Infiniband::~Infiniband()
|
||||
@ -844,23 +881,11 @@ Infiniband::~Infiniband()
|
||||
if (!initialized)
|
||||
return;
|
||||
|
||||
if (dispatcher)
|
||||
dispatcher->polling_stop();
|
||||
|
||||
ibv_destroy_srq(srq);
|
||||
delete memory_manager;
|
||||
delete pd;
|
||||
}
|
||||
|
||||
/**
 * Attach or detach the dispatcher pointer held by this object.
 *
 * Exactly one of the incoming pointer and the currently stored pointer
 * must be null (asserted). When a non-null dispatcher is attached, its
 * perf_logger is handed to MemoryManager::RxAllocator.
 *
 * @param d dispatcher to attach, or nullptr to detach the current one
 */
void Infiniband::set_dispatcher(RDMADispatcher *d)
{
  assert((d == nullptr) != (dispatcher == nullptr));

  dispatcher = d;
  if (d == nullptr) {
    return;
  }
  MemoryManager::RxAllocator::set_perf_logger(d->perf_logger);
}
|
||||
|
||||
/**
|
||||
* Create a shared receive queue. This basically wraps the verbs call.
|
||||
*
|
||||
|
@ -321,7 +321,6 @@ class Infiniband {
|
||||
Device *device = NULL;
|
||||
ProtectionDomain *pd = NULL;
|
||||
DeviceList *device_list = nullptr;
|
||||
RDMADispatcher *dispatcher = nullptr;
|
||||
void wire_gid_to_gid(const char *wgid, union ibv_gid *gid);
|
||||
void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]);
|
||||
CephContext *cct;
|
||||
@ -331,11 +330,10 @@ class Infiniband {
|
||||
uint8_t port_num;
|
||||
|
||||
public:
|
||||
explicit Infiniband(CephContext *c, const std::string &device_name, uint8_t p);
|
||||
explicit Infiniband(CephContext *c);
|
||||
~Infiniband();
|
||||
void init();
|
||||
|
||||
void set_dispatcher(RDMADispatcher *d);
|
||||
static void verify_prereq(CephContext *cct);
|
||||
|
||||
class CompletionChannel {
|
||||
static const uint32_t MAX_ACK_EVENT = 5000;
|
||||
|
@ -28,28 +28,17 @@
|
||||
#undef dout_prefix
|
||||
#define dout_prefix *_dout << "RDMAStack "
|
||||
|
||||
static Tub<Infiniband> global_infiniband;
|
||||
|
||||
RDMADispatcher::~RDMADispatcher()
|
||||
{
|
||||
done = true;
|
||||
polling_stop();
|
||||
ldout(cct, 20) << __func__ << " destructing rdma dispatcher" << dendl;
|
||||
polling_stop();
|
||||
|
||||
assert(qp_conns.empty());
|
||||
assert(num_qp_conn == 0);
|
||||
assert(dead_queue_pairs.empty());
|
||||
assert(num_dead_queue_pair == 0);
|
||||
|
||||
tx_cc->ack_events();
|
||||
rx_cc->ack_events();
|
||||
delete tx_cq;
|
||||
delete rx_cq;
|
||||
delete tx_cc;
|
||||
delete rx_cc;
|
||||
delete async_handler;
|
||||
|
||||
global_infiniband->set_dispatcher(nullptr);
|
||||
}
|
||||
|
||||
RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
|
||||
@ -88,13 +77,19 @@ RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
|
||||
|
||||
void RDMADispatcher::polling_start()
|
||||
{
|
||||
tx_cc = global_infiniband->create_comp_channel(cct);
|
||||
// take lock because listen/connect can happen from different worker threads
|
||||
Mutex::Locker l(lock);
|
||||
|
||||
if (t.joinable())
|
||||
return; // dispatcher thread already running
|
||||
|
||||
tx_cc = get_stack()->get_infiniband().create_comp_channel(cct);
|
||||
assert(tx_cc);
|
||||
rx_cc = global_infiniband->create_comp_channel(cct);
|
||||
rx_cc = get_stack()->get_infiniband().create_comp_channel(cct);
|
||||
assert(rx_cc);
|
||||
tx_cq = global_infiniband->create_comp_queue(cct, tx_cc);
|
||||
tx_cq = get_stack()->get_infiniband().create_comp_queue(cct, tx_cc);
|
||||
assert(tx_cq);
|
||||
rx_cq = global_infiniband->create_comp_queue(cct, rx_cc);
|
||||
rx_cq = get_stack()->get_infiniband().create_comp_queue(cct, rx_cc);
|
||||
assert(rx_cq);
|
||||
|
||||
t = std::thread(&RDMADispatcher::polling, this);
|
||||
@ -102,8 +97,19 @@ void RDMADispatcher::polling_start()
|
||||
|
||||
void RDMADispatcher::polling_stop()
|
||||
{
|
||||
if (t.joinable())
|
||||
t.join();
|
||||
Mutex::Locker l(lock);
|
||||
done = true;
|
||||
if (!t.joinable())
|
||||
return;
|
||||
|
||||
t.join();
|
||||
|
||||
tx_cc->ack_events();
|
||||
rx_cc->ack_events();
|
||||
delete tx_cq;
|
||||
delete rx_cq;
|
||||
delete tx_cc;
|
||||
delete rx_cc;
|
||||
}
|
||||
|
||||
void RDMADispatcher::handle_async_event()
|
||||
@ -111,7 +117,7 @@ void RDMADispatcher::handle_async_event()
|
||||
ldout(cct, 30) << __func__ << dendl;
|
||||
while (1) {
|
||||
ibv_async_event async_event;
|
||||
if (ibv_get_async_event(global_infiniband->get_device()->ctxt, &async_event)) {
|
||||
if (ibv_get_async_event(get_stack()->get_infiniband().get_device()->ctxt, &async_event)) {
|
||||
if (errno != EAGAIN)
|
||||
lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
|
||||
<< " " << cpp_strerror(errno) << ")" << dendl;
|
||||
@ -135,7 +141,7 @@ void RDMADispatcher::handle_async_event()
|
||||
erase_qpn_lockless(qpn);
|
||||
}
|
||||
} else {
|
||||
ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << global_infiniband->get_device()->ctxt
|
||||
ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << get_stack()->get_infiniband().get_device()->ctxt
|
||||
<< " evt: " << ibv_event_type_str(async_event.event_type)
|
||||
<< dendl;
|
||||
}
|
||||
@ -145,7 +151,7 @@ void RDMADispatcher::handle_async_event()
|
||||
|
||||
void RDMADispatcher::post_chunk_to_pool(Chunk* chunk) {
|
||||
Mutex::Locker l(lock);
|
||||
global_infiniband->post_chunk_to_pool(chunk);
|
||||
get_stack()->get_infiniband().post_chunk_to_pool(chunk);
|
||||
perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
|
||||
}
|
||||
|
||||
@ -178,7 +184,7 @@ void RDMADispatcher::polling()
|
||||
perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_ret);
|
||||
|
||||
Mutex::Locker l(lock);//make sure connected socket alive when pass wc
|
||||
global_infiniband->post_chunks_to_srq(rx_ret);
|
||||
get_stack()->get_infiniband().post_chunks_to_srq(rx_ret);
|
||||
for (int i = 0; i < rx_ret; ++i) {
|
||||
ibv_wc* response = &wc[i];
|
||||
Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
|
||||
@ -190,7 +196,7 @@ void RDMADispatcher::polling()
|
||||
conn = get_conn_lockless(response->qp_num);
|
||||
if (!conn) {
|
||||
ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl;
|
||||
global_infiniband->post_chunk_to_pool(chunk);
|
||||
get_stack()->get_infiniband().post_chunk_to_pool(chunk);
|
||||
perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
|
||||
} else {
|
||||
polled[conn].push_back(*response);
|
||||
@ -201,12 +207,12 @@ void RDMADispatcher::polling()
|
||||
|
||||
ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
|
||||
<< ") status(" << response->status << ":"
|
||||
<< global_infiniband->wc_status_to_string(response->status) << ")" << dendl;
|
||||
<< get_stack()->get_infiniband().wc_status_to_string(response->status) << ")" << dendl;
|
||||
conn = get_conn_lockless(response->qp_num);
|
||||
if (conn && conn->is_connected())
|
||||
conn->fault();
|
||||
|
||||
global_infiniband->post_chunk_to_pool(chunk);
|
||||
get_stack()->get_infiniband().post_chunk_to_pool(chunk);
|
||||
}
|
||||
}
|
||||
for (auto &&i : polled)
|
||||
@ -338,7 +344,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
|
||||
Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
|
||||
ldout(cct, 25) << __func__ << " QP: " << response->qp_num
|
||||
<< " len: " << response->byte_len << " , addr:" << chunk
|
||||
<< " " << global_infiniband->wc_status_to_string(response->status) << dendl;
|
||||
<< " " << get_stack()->get_infiniband().wc_status_to_string(response->status) << dendl;
|
||||
|
||||
if (response->status != IBV_WC_SUCCESS) {
|
||||
perf_logger->inc(l_msgr_rdma_tx_total_wc_errors);
|
||||
@ -353,7 +359,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
|
||||
} else {
|
||||
ldout(cct, 1) << __func__ << " send work request returned error for buffer("
|
||||
<< response->wr_id << ") status(" << response->status << "): "
|
||||
<< global_infiniband->wc_status_to_string(response->status) << dendl;
|
||||
<< get_stack()->get_infiniband().wc_status_to_string(response->status) << dendl;
|
||||
}
|
||||
|
||||
Mutex::Locker l(lock);//make sure connected socket alive when pass wc
|
||||
@ -369,7 +375,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
|
||||
|
||||
//TX completion may come either from regular send message or from 'fin' message.
|
||||
//In the case of 'fin' wr_id points to the QueuePair.
|
||||
if (global_infiniband->get_memory_manager()->is_tx_buffer(chunk->buffer)) {
|
||||
if (get_stack()->get_infiniband().get_memory_manager()->is_tx_buffer(chunk->buffer)) {
|
||||
tx_chunks.push_back(chunk);
|
||||
} else if (reinterpret_cast<QueuePair*>(response->wr_id)->get_local_qp_number() == response->qp_num ) {
|
||||
ldout(cct, 1) << __func__ << " sending of the disconnect msg completed" << dendl;
|
||||
@ -397,7 +403,7 @@ void RDMADispatcher::post_tx_buffer(std::vector<Chunk*> &chunks)
|
||||
return ;
|
||||
|
||||
inflight -= chunks.size();
|
||||
global_infiniband->get_memory_manager()->return_tx(chunks);
|
||||
get_stack()->get_infiniband().get_memory_manager()->return_tx(chunks);
|
||||
ldout(cct, 30) << __func__ << " release " << chunks.size()
|
||||
<< " chunks, inflight " << inflight << dendl;
|
||||
notify_pending_workers();
|
||||
@ -435,15 +441,16 @@ RDMAWorker::~RDMAWorker()
|
||||
void RDMAWorker::initialize()
|
||||
{
|
||||
if (!dispatcher) {
|
||||
dispatcher = stack->get_dispatcher();
|
||||
dispatcher = &stack->get_dispatcher();
|
||||
}
|
||||
}
|
||||
|
||||
int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock)
|
||||
{
|
||||
global_infiniband->init();
|
||||
get_stack()->get_infiniband().init();
|
||||
dispatcher->polling_start();
|
||||
|
||||
auto p = new RDMAServerSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa);
|
||||
auto p = new RDMAServerSocketImpl(cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this, sa);
|
||||
int r = p->listen(sa, opt);
|
||||
if (r < 0) {
|
||||
delete p;
|
||||
@ -456,9 +463,10 @@ int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket
|
||||
|
||||
int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
|
||||
{
|
||||
global_infiniband->init();
|
||||
get_stack()->get_infiniband().init();
|
||||
dispatcher->polling_start();
|
||||
|
||||
RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this);
|
||||
RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this);
|
||||
int r = p->try_connect(addr, opts);
|
||||
|
||||
if (r < 0) {
|
||||
@ -474,11 +482,11 @@ int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, Co
|
||||
int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
|
||||
{
|
||||
assert(center.in_thread());
|
||||
int r = global_infiniband->get_tx_buffers(c, bytes);
|
||||
int r = get_stack()->get_infiniband().get_tx_buffers(c, bytes);
|
||||
assert(r >= 0);
|
||||
size_t got = global_infiniband->get_memory_manager()->get_tx_buffer_size() * r;
|
||||
size_t got = get_stack()->get_infiniband().get_memory_manager()->get_tx_buffer_size() * r;
|
||||
ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered bytes, inflight " << dispatcher->inflight << dendl;
|
||||
stack->get_dispatcher()->inflight += r;
|
||||
stack->get_dispatcher().inflight += r;
|
||||
if (got >= bytes)
|
||||
return r;
|
||||
|
||||
@ -516,50 +524,17 @@ void RDMAWorker::handle_pending_message()
|
||||
dispatcher->notify_pending_workers();
|
||||
}
|
||||
|
||||
RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
|
||||
RDMAStack::RDMAStack(CephContext *cct, const string &t)
|
||||
: NetworkStack(cct, t), ib(cct), dispatcher(cct, this)
|
||||
{
|
||||
//
|
||||
//On RDMA MUST be called before fork
|
||||
//
|
||||
|
||||
int rc = ibv_fork_init();
|
||||
if (rc) {
|
||||
lderr(cct) << __func__ << " failed to call ibv_for_init(). On RDMA must be called before fork. Application aborts." << dendl;
|
||||
ceph_abort();
|
||||
}
|
||||
|
||||
ldout(cct, 1) << __func__ << " ms_async_rdma_enable_hugepage value is: " << cct->_conf->ms_async_rdma_enable_hugepage << dendl;
|
||||
if (cct->_conf->ms_async_rdma_enable_hugepage) {
|
||||
rc = setenv("RDMAV_HUGEPAGES_SAFE","1",1);
|
||||
ldout(cct, 1) << __func__ << " RDMAV_HUGEPAGES_SAFE is set as: " << getenv("RDMAV_HUGEPAGES_SAFE") << dendl;
|
||||
if (rc) {
|
||||
lderr(cct) << __func__ << " failed to export RDMA_HUGEPAGES_SAFE. On RDMA must be exported before using huge pages. Application aborts." << dendl;
|
||||
ceph_abort();
|
||||
}
|
||||
}
|
||||
|
||||
//Check ulimit
|
||||
struct rlimit limit;
|
||||
getrlimit(RLIMIT_MEMLOCK, &limit);
|
||||
if (limit.rlim_cur != RLIM_INFINITY || limit.rlim_max != RLIM_INFINITY) {
|
||||
lderr(cct) << __func__ << "!!! WARNING !!! For RDMA to work properly user memlock (ulimit -l) must be big enough to allow large amount of registered memory."
|
||||
" We recommend setting this parameter to infinity" << dendl;
|
||||
}
|
||||
|
||||
if (!global_infiniband)
|
||||
global_infiniband.construct(
|
||||
cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
|
||||
ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
|
||||
dispatcher = new RDMADispatcher(cct, this);
|
||||
global_infiniband->set_dispatcher(dispatcher);
|
||||
|
||||
unsigned num = get_num_worker();
|
||||
for (unsigned i = 0; i < num; ++i) {
|
||||
RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i));
|
||||
w->set_stack(this);
|
||||
}
|
||||
|
||||
ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << dispatcher << dendl;
|
||||
ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << &dispatcher << dendl;
|
||||
}
|
||||
|
||||
RDMAStack::~RDMAStack()
|
||||
@ -568,7 +543,7 @@ RDMAStack::~RDMAStack()
|
||||
unsetenv("RDMAV_HUGEPAGES_SAFE"); //remove env variable on destruction
|
||||
}
|
||||
|
||||
delete dispatcher;
|
||||
dispatcher.polling_stop();
|
||||
}
|
||||
|
||||
void RDMAStack::spawn_worker(unsigned i, std::function<void ()> &&func)
|
||||
|
@ -256,7 +256,9 @@ class RDMAServerSocketImpl : public ServerSocketImpl {
|
||||
|
||||
class RDMAStack : public NetworkStack {
|
||||
vector<std::thread> threads;
|
||||
RDMADispatcher *dispatcher;
|
||||
PerfCounters *perf_counter;
|
||||
Infiniband ib;
|
||||
RDMADispatcher dispatcher;
|
||||
|
||||
std::atomic<bool> fork_finished = {false};
|
||||
|
||||
@ -268,10 +270,11 @@ class RDMAStack : public NetworkStack {
|
||||
|
||||
virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override;
|
||||
virtual void join_worker(unsigned i) override;
|
||||
RDMADispatcher *get_dispatcher() { return dispatcher; }
|
||||
|
||||
RDMADispatcher &get_dispatcher() { return dispatcher; }
|
||||
Infiniband &get_infiniband() { return ib; }
|
||||
virtual bool is_ready() override { return fork_finished.load(); };
|
||||
virtual void ready() override { fork_finished = true; };
|
||||
};
|
||||
|
||||
|
||||
#endif
|
||||
|
Loading…
Reference in New Issue
Block a user