msg/async/rdma: fixes crash in fio

fio creates multiple CephContext instances in a single process.
Crashes happen because the RDMA stack has global resources that
are still in use by one CephContext after they have already been
destroyed by another.

This commit removes the global instances of the RDMA dispatcher and
Infiniband and makes them context (RDMA stack) specific.

Signed-off-by: Adir Lev <adirl@mellanox.com>
Signed-off-by: Alex Mikheev <alexm@mellanox.com>
Author: Alex Mikheev <alexm@mellanox.com>, 2017-06-12 08:32:38 +00:00 (committed by Haomai Wang)
parent 25fce20d63
commit 6b773887a3
4 changed files with 97 additions and 96 deletions
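
To make the lifetime issue concrete, here is a minimal, self-contained toy sketch (hypothetical ToyInfiniband/ToyRDMAStack types, not the real Ceph classes): before this commit a single file-scope instance (the removed global_infiniband) was shared by every CephContext in the process, so one context could tear it down while another was still using it; after the commit each RDMAStack owns its Infiniband and RDMADispatcher as members, so their lifetime is tied to the context that created them.

// Toy sketch only (hypothetical names, not the real Ceph classes).
#include <iostream>
#include <string>
#include <utility>

struct ToyInfiniband {                        // stand-in for Infiniband
  std::string owner;
  explicit ToyInfiniband(std::string o) : owner(std::move(o)) {
    std::cout << "open device for " << owner << "\n";
  }
  ~ToyInfiniband() { std::cout << "close device for " << owner << "\n"; }
};

// Pattern removed by this commit: one global instance shared by every context
// in the process, e.g.
//   static Tub<Infiniband> global_infiniband;
// Whichever context destroys it leaves the remaining contexts with a dangling
// resource.

struct ToyRDMAStack {                         // stand-in for RDMAStack
  ToyInfiniband ib;                           // after the fix: a plain member,
                                              // constructed and destroyed with
                                              // the stack that owns it
  explicit ToyRDMAStack(const std::string &ctx_name) : ib(ctx_name) {}
};

int main() {
  ToyRDMAStack a("ctx-a");                    // fio-like scenario: two contexts
  ToyRDMAStack b("ctx-b");                    // in one process, each with its
  return 0;                                   // own, independently scoped ib
}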

src/msg/async/rdma/Infiniband.cc

@@ -18,6 +18,8 @@
#include "common/errno.h"
#include "common/debug.h"
#include "RDMAStack.h"
#include <sys/time.h>
#include <sys/resource.h>
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
@@ -776,9 +778,45 @@ int Infiniband::MemoryManager::get_send_buffers(std::vector<Chunk*> &c, size_t b
return send->get_buffers(c, bytes);
}
Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num)
: cct(cct), lock("IB lock"), device_name(device_name), port_num(port_num)
bool Infiniband::init_prereq = false;
void Infiniband::verify_prereq(CephContext *cct) {
//On RDMA, ibv_fork_init() MUST be called before fork
int rc = ibv_fork_init();
if (rc) {
lderr(cct) << __func__ << " failed to call ibv_for_init(). On RDMA must be called before fork. Application aborts." << dendl;
ceph_abort();
}
ldout(cct, 20) << __func__ << " ms_async_rdma_enable_hugepage value is: " << cct->_conf->ms_async_rdma_enable_hugepage << dendl;
if (cct->_conf->ms_async_rdma_enable_hugepage){
rc = setenv("RDMAV_HUGEPAGES_SAFE","1",1);
ldout(cct, 20) << __func__ << " RDMAV_HUGEPAGES_SAFE is set as: " << getenv("RDMAV_HUGEPAGES_SAFE") << dendl;
if (rc) {
lderr(cct) << __func__ << " failed to export RDMA_HUGEPAGES_SAFE. On RDMA must be exported before using huge pages. Application aborts." << dendl;
ceph_abort();
}
}
//Check ulimit
struct rlimit limit;
getrlimit(RLIMIT_MEMLOCK, &limit);
if (limit.rlim_cur != RLIM_INFINITY || limit.rlim_max != RLIM_INFINITY) {
lderr(cct) << __func__ << "!!! WARNING !!! For RDMA to work properly user memlock (ulimit -l) must be big enough to allow large amount of registered memory."
" We recommend setting this parameter to infinity" << dendl;
}
init_prereq = true;
}
Infiniband::Infiniband(CephContext *cct)
: cct(cct), lock("IB lock"),
device_name(cct->_conf->ms_async_rdma_device_name),
port_num( cct->_conf->ms_async_rdma_port_num)
{
if (!init_prereq)
verify_prereq(cct);
ldout(cct, 20) << __func__ << " constructing Infiniband..." << dendl;
}
void Infiniband::init()
@@ -836,7 +874,6 @@ void Infiniband::init()
srq = create_shared_receive_queue(rx_queue_len, MAX_SHARED_RX_SGE_COUNT);
post_chunks_to_srq(rx_queue_len); //add to srq
dispatcher->polling_start();
}
Infiniband::~Infiniband()
@@ -844,23 +881,11 @@ Infiniband::~Infiniband()
if (!initialized)
return;
if (dispatcher)
dispatcher->polling_stop();
ibv_destroy_srq(srq);
delete memory_manager;
delete pd;
}
void Infiniband::set_dispatcher(RDMADispatcher *d)
{
assert(!d ^ !dispatcher);
dispatcher = d;
if (dispatcher != nullptr)
MemoryManager::RxAllocator::set_perf_logger(dispatcher->perf_logger);
}
/**
* Create a shared receive queue. This basically wraps the verbs call.
*

src/msg/async/rdma/Infiniband.h

@@ -321,7 +321,6 @@ class Infiniband {
Device *device = NULL;
ProtectionDomain *pd = NULL;
DeviceList *device_list = nullptr;
RDMADispatcher *dispatcher = nullptr;
void wire_gid_to_gid(const char *wgid, union ibv_gid *gid);
void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]);
CephContext *cct;
@@ -329,13 +328,13 @@ class Infiniband {
bool initialized = false;
const std::string &device_name;
uint8_t port_num;
static bool init_prereq;
public:
explicit Infiniband(CephContext *c, const std::string &device_name, uint8_t p);
explicit Infiniband(CephContext *c);
~Infiniband();
void init();
void set_dispatcher(RDMADispatcher *d);
static void verify_prereq(CephContext *cct);
class CompletionChannel {
static const uint32_t MAX_ACK_EVENT = 5000;

src/msg/async/rdma/RDMAStack.cc

@@ -27,28 +27,17 @@
#undef dout_prefix
#define dout_prefix *_dout << "RDMAStack "
static Tub<Infiniband> global_infiniband;
RDMADispatcher::~RDMADispatcher()
{
done = true;
polling_stop();
ldout(cct, 20) << __func__ << " destructing rdma dispatcher" << dendl;
polling_stop();
assert(qp_conns.empty());
assert(num_qp_conn == 0);
assert(dead_queue_pairs.empty());
assert(num_dead_queue_pair == 0);
tx_cc->ack_events();
rx_cc->ack_events();
delete tx_cq;
delete rx_cq;
delete tx_cc;
delete rx_cc;
delete async_handler;
global_infiniband->set_dispatcher(nullptr);
}
RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
@@ -86,13 +75,19 @@ RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
void RDMADispatcher::polling_start()
{
tx_cc = global_infiniband->create_comp_channel(cct);
// take lock because listen/connect can happen from different worker threads
Mutex::Locker l(lock);
if (t.joinable())
return; // dispatcher thread already running
tx_cc = get_stack()->get_infiniband().create_comp_channel(cct);
assert(tx_cc);
rx_cc = global_infiniband->create_comp_channel(cct);
rx_cc = get_stack()->get_infiniband().create_comp_channel(cct);
assert(rx_cc);
tx_cq = global_infiniband->create_comp_queue(cct, tx_cc);
tx_cq = get_stack()->get_infiniband().create_comp_queue(cct, tx_cc);
assert(tx_cq);
rx_cq = global_infiniband->create_comp_queue(cct, rx_cc);
rx_cq = get_stack()->get_infiniband().create_comp_queue(cct, rx_cc);
assert(rx_cq);
t = std::thread(&RDMADispatcher::polling, this);
@@ -100,8 +95,18 @@ void RDMADispatcher::polling_start()
void RDMADispatcher::polling_stop()
{
if (t.joinable())
t.join();
done = true;
if (!t.joinable())
return;
t.join();
tx_cc->ack_events();
rx_cc->ack_events();
delete tx_cq;
delete rx_cq;
delete tx_cc;
delete rx_cc;
}
void RDMADispatcher::handle_async_event()
@@ -109,7 +114,7 @@ void RDMADispatcher::handle_async_event()
ldout(cct, 30) << __func__ << dendl;
while (1) {
ibv_async_event async_event;
if (ibv_get_async_event(global_infiniband->get_device()->ctxt, &async_event)) {
if (ibv_get_async_event(get_stack()->get_infiniband().get_device()->ctxt, &async_event)) {
if (errno != EAGAIN)
lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
<< " " << cpp_strerror(errno) << ")" << dendl;
@@ -133,7 +138,7 @@ void RDMADispatcher::handle_async_event()
erase_qpn_lockless(qpn);
}
} else {
ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << global_infiniband->get_device()->ctxt
ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << get_stack()->get_infiniband().get_device()->ctxt
<< " evt: " << ibv_event_type_str(async_event.event_type)
<< dendl;
}
@@ -143,7 +148,7 @@ void RDMADispatcher::handle_async_event()
void RDMADispatcher::post_chunk_to_pool(Chunk* chunk) {
Mutex::Locker l(lock);
global_infiniband->post_chunk_to_pool(chunk);
get_stack()->get_infiniband().post_chunk_to_pool(chunk);
perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
}
@@ -176,7 +181,7 @@ void RDMADispatcher::polling()
perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_ret);
Mutex::Locker l(lock);//make sure connected socket alive when pass wc
global_infiniband->post_chunks_to_srq(rx_ret);
get_stack()->get_infiniband().post_chunks_to_srq(rx_ret);
for (int i = 0; i < rx_ret; ++i) {
ibv_wc* response = &wc[i];
Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
@@ -188,7 +193,7 @@ void RDMADispatcher::polling()
conn = get_conn_lockless(response->qp_num);
if (!conn) {
ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl;
global_infiniband->post_chunk_to_pool(chunk);
get_stack()->get_infiniband().post_chunk_to_pool(chunk);
perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
} else {
polled[conn].push_back(*response);
@@ -199,12 +204,12 @@ void RDMADispatcher::polling()
ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
<< ") status(" << response->status << ":"
<< global_infiniband->wc_status_to_string(response->status) << ")" << dendl;
<< get_stack()->get_infiniband().wc_status_to_string(response->status) << ")" << dendl;
conn = get_conn_lockless(response->qp_num);
if (conn && conn->is_connected())
conn->fault();
global_infiniband->post_chunk_to_pool(chunk);
get_stack()->get_infiniband().post_chunk_to_pool(chunk);
}
}
for (auto &&i : polled)
@@ -335,7 +340,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
ldout(cct, 25) << __func__ << " QP: " << response->qp_num
<< " len: " << response->byte_len << " , addr:" << chunk
<< " " << global_infiniband->wc_status_to_string(response->status) << dendl;
<< " " << get_stack()->get_infiniband().wc_status_to_string(response->status) << dendl;
if (response->status != IBV_WC_SUCCESS) {
perf_logger->inc(l_msgr_rdma_tx_total_wc_errors);
@@ -350,7 +355,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
} else {
ldout(cct, 1) << __func__ << " send work request returned error for buffer("
<< response->wr_id << ") status(" << response->status << "): "
<< global_infiniband->wc_status_to_string(response->status) << dendl;
<< get_stack()->get_infiniband().wc_status_to_string(response->status) << dendl;
}
Mutex::Locker l(lock);//make sure connected socket alive when pass wc
@@ -366,7 +371,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
//TX completion may come either from regular send message or from 'fin' message.
//In the case of 'fin' wr_id points to the QueuePair.
if (global_infiniband->get_memory_manager()->is_tx_buffer(chunk->buffer)) {
if (get_stack()->get_infiniband().get_memory_manager()->is_tx_buffer(chunk->buffer)) {
tx_chunks.push_back(chunk);
} else if (reinterpret_cast<QueuePair*>(response->wr_id)->get_local_qp_number() == response->qp_num ) {
ldout(cct, 1) << __func__ << " sending of the disconnect msg completed" << dendl;
@@ -394,7 +399,7 @@ void RDMADispatcher::post_tx_buffer(std::vector<Chunk*> &chunks)
return ;
inflight -= chunks.size();
global_infiniband->get_memory_manager()->return_tx(chunks);
get_stack()->get_infiniband().get_memory_manager()->return_tx(chunks);
ldout(cct, 30) << __func__ << " release " << chunks.size()
<< " chunks, inflight " << inflight << dendl;
notify_pending_workers();
@@ -432,15 +437,16 @@ RDMAWorker::~RDMAWorker()
void RDMAWorker::initialize()
{
if (!dispatcher) {
dispatcher = stack->get_dispatcher();
dispatcher = &stack->get_dispatcher();
}
}
int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock)
{
global_infiniband->init();
get_stack()->get_infiniband().init();
dispatcher->polling_start();
auto p = new RDMAServerSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa);
auto p = new RDMAServerSocketImpl(cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this, sa);
int r = p->listen(sa, opt);
if (r < 0) {
delete p;
@@ -453,9 +459,10 @@ int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket
int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
{
global_infiniband->init();
get_stack()->get_infiniband().init();
dispatcher->polling_start();
RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this);
RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this);
int r = p->try_connect(addr, opts);
if (r < 0) {
@@ -471,11 +478,11 @@ int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, Co
int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
{
assert(center.in_thread());
int r = global_infiniband->get_tx_buffers(c, bytes);
int r = get_stack()->get_infiniband().get_tx_buffers(c, bytes);
assert(r >= 0);
size_t got = global_infiniband->get_memory_manager()->get_tx_buffer_size() * r;
size_t got = get_stack()->get_infiniband().get_memory_manager()->get_tx_buffer_size() * r;
ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered bytes, inflight " << dispatcher->inflight << dendl;
stack->get_dispatcher()->inflight += r;
stack->get_dispatcher().inflight += r;
if (got >= bytes)
return r;
@@ -513,50 +520,17 @@ void RDMAWorker::handle_pending_message()
dispatcher->notify_pending_workers();
}
RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
RDMAStack::RDMAStack(CephContext *cct, const string &t)
: NetworkStack(cct, t), ib(cct), dispatcher(cct, this)
{
//
//On RDMA MUST be called before fork
//
int rc = ibv_fork_init();
if (rc) {
lderr(cct) << __func__ << " failed to call ibv_for_init(). On RDMA must be called before fork. Application aborts." << dendl;
ceph_abort();
}
ldout(cct, 1) << __func__ << " ms_async_rdma_enable_hugepage value is: " << cct->_conf->ms_async_rdma_enable_hugepage << dendl;
if (cct->_conf->ms_async_rdma_enable_hugepage) {
rc = setenv("RDMAV_HUGEPAGES_SAFE","1",1);
ldout(cct, 1) << __func__ << " RDMAV_HUGEPAGES_SAFE is set as: " << getenv("RDMAV_HUGEPAGES_SAFE") << dendl;
if (rc) {
lderr(cct) << __func__ << " failed to export RDMA_HUGEPAGES_SAFE. On RDMA must be exported before using huge pages. Application aborts." << dendl;
ceph_abort();
}
}
//Check ulimit
struct rlimit limit;
getrlimit(RLIMIT_MEMLOCK, &limit);
if (limit.rlim_cur != RLIM_INFINITY || limit.rlim_max != RLIM_INFINITY) {
lderr(cct) << __func__ << "!!! WARNING !!! For RDMA to work properly user memlock (ulimit -l) must be big enough to allow large amount of registered memory."
" We recommend setting this parameter to infinity" << dendl;
}
if (!global_infiniband)
global_infiniband.construct(
cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
dispatcher = new RDMADispatcher(cct, this);
global_infiniband->set_dispatcher(dispatcher);
unsigned num = get_num_worker();
for (unsigned i = 0; i < num; ++i) {
RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i));
w->set_stack(this);
}
ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << dispatcher << dendl;
ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << &dispatcher << dendl;
}
RDMAStack::~RDMAStack()
@@ -565,7 +539,7 @@ RDMAStack::~RDMAStack()
unsetenv("RDMAV_HUGEPAGES_SAFE"); //remove env variable on destruction
}
delete dispatcher;
dispatcher.polling_stop();
}
void RDMAStack::spawn_worker(unsigned i, std::function<void ()> &&func)

src/msg/async/rdma/RDMAStack.h

@@ -256,7 +256,9 @@ class RDMAServerSocketImpl : public ServerSocketImpl {
class RDMAStack : public NetworkStack {
vector<std::thread> threads;
RDMADispatcher *dispatcher;
PerfCounters *perf_counter;
Infiniband ib;
RDMADispatcher dispatcher;
std::atomic<bool> fork_finished = {false};
@@ -268,10 +270,11 @@ class RDMAStack : public NetworkStack {
virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override;
virtual void join_worker(unsigned i) override;
RDMADispatcher *get_dispatcher() { return dispatcher; }
RDMADispatcher &get_dispatcher() { return dispatcher; }
Infiniband &get_infiniband() { return ib; }
virtual bool is_ready() override { return fork_finished.load(); };
virtual void ready() override { fork_finished = true; };
};
#endif