diff --git a/src/msg/async/rdma/Infiniband.cc b/src/msg/async/rdma/Infiniband.cc index cf9031e068d..a6842f0bf68 100644 --- a/src/msg/async/rdma/Infiniband.cc +++ b/src/msg/async/rdma/Infiniband.cc @@ -18,6 +18,8 @@ #include "common/errno.h" #include "common/debug.h" #include "RDMAStack.h" +#include +#include #define dout_subsys ceph_subsys_ms #undef dout_prefix @@ -776,9 +778,45 @@ int Infiniband::MemoryManager::get_send_buffers(std::vector &c, size_t b return send->get_buffers(c, bytes); } -Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num) - : cct(cct), lock("IB lock"), device_name(device_name), port_num(port_num) +static std::atomic init_prereq = {false}; + +void Infiniband::verify_prereq(CephContext *cct) { + + //On RDMA MUST be called before fork + int rc = ibv_fork_init(); + if (rc) { + lderr(cct) << __func__ << " failed to call ibv_for_init(). On RDMA must be called before fork. Application aborts." << dendl; + ceph_abort(); + } + + ldout(cct, 20) << __func__ << " ms_async_rdma_enable_hugepage value is: " << cct->_conf->ms_async_rdma_enable_hugepage << dendl; + if (cct->_conf->ms_async_rdma_enable_hugepage){ + rc = setenv("RDMAV_HUGEPAGES_SAFE","1",1); + ldout(cct, 0) << __func__ << " RDMAV_HUGEPAGES_SAFE is set as: " << getenv("RDMAV_HUGEPAGES_SAFE") << dendl; + if (rc) { + lderr(cct) << __func__ << " failed to export RDMA_HUGEPAGES_SAFE. On RDMA must be exported before using huge pages. Application aborts." << dendl; + ceph_abort(); + } + } + + //Check ulimit + struct rlimit limit; + getrlimit(RLIMIT_MEMLOCK, &limit); + if (limit.rlim_cur != RLIM_INFINITY || limit.rlim_max != RLIM_INFINITY) { + lderr(cct) << __func__ << "!!! WARNING !!! For RDMA to work properly user memlock (ulimit -l) must be big enough to allow large amount of registered memory." + " We recommend setting this parameter to infinity" << dendl; + } + init_prereq = true; +} + +Infiniband::Infiniband(CephContext *cct) + : cct(cct), lock("IB lock"), + device_name(cct->_conf->ms_async_rdma_device_name), + port_num( cct->_conf->ms_async_rdma_port_num) { + if (!init_prereq) + verify_prereq(cct); + ldout(cct, 20) << __func__ << " constructing Infiniband..." << dendl; } void Infiniband::init() @@ -836,7 +874,6 @@ void Infiniband::init() srq = create_shared_receive_queue(rx_queue_len, MAX_SHARED_RX_SGE_COUNT); post_chunks_to_srq(rx_queue_len); //add to srq - dispatcher->polling_start(); } Infiniband::~Infiniband() @@ -844,23 +881,11 @@ Infiniband::~Infiniband() if (!initialized) return; - if (dispatcher) - dispatcher->polling_stop(); - ibv_destroy_srq(srq); delete memory_manager; delete pd; } -void Infiniband::set_dispatcher(RDMADispatcher *d) -{ - assert(!d ^ !dispatcher); - - dispatcher = d; - if (dispatcher != nullptr) - MemoryManager::RxAllocator::set_perf_logger(dispatcher->perf_logger); -} - /** * Create a shared receive queue. This basically wraps the verbs call. * diff --git a/src/msg/async/rdma/Infiniband.h b/src/msg/async/rdma/Infiniband.h index ee8fb275858..d9b196ba557 100644 --- a/src/msg/async/rdma/Infiniband.h +++ b/src/msg/async/rdma/Infiniband.h @@ -321,7 +321,6 @@ class Infiniband { Device *device = NULL; ProtectionDomain *pd = NULL; DeviceList *device_list = nullptr; - RDMADispatcher *dispatcher = nullptr; void wire_gid_to_gid(const char *wgid, union ibv_gid *gid); void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]); CephContext *cct; @@ -331,11 +330,10 @@ class Infiniband { uint8_t port_num; public: - explicit Infiniband(CephContext *c, const std::string &device_name, uint8_t p); + explicit Infiniband(CephContext *c); ~Infiniband(); void init(); - - void set_dispatcher(RDMADispatcher *d); + static void verify_prereq(CephContext *cct); class CompletionChannel { static const uint32_t MAX_ACK_EVENT = 5000; diff --git a/src/msg/async/rdma/RDMAStack.cc b/src/msg/async/rdma/RDMAStack.cc index 94e94dfeae7..4a1961033a3 100644 --- a/src/msg/async/rdma/RDMAStack.cc +++ b/src/msg/async/rdma/RDMAStack.cc @@ -28,28 +28,17 @@ #undef dout_prefix #define dout_prefix *_dout << "RDMAStack " -static Tub global_infiniband; - RDMADispatcher::~RDMADispatcher() { - done = true; - polling_stop(); ldout(cct, 20) << __func__ << " destructing rdma dispatcher" << dendl; + polling_stop(); assert(qp_conns.empty()); assert(num_qp_conn == 0); assert(dead_queue_pairs.empty()); assert(num_dead_queue_pair == 0); - tx_cc->ack_events(); - rx_cc->ack_events(); - delete tx_cq; - delete rx_cq; - delete tx_cc; - delete rx_cc; delete async_handler; - - global_infiniband->set_dispatcher(nullptr); } RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s) @@ -88,13 +77,19 @@ RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s) void RDMADispatcher::polling_start() { - tx_cc = global_infiniband->create_comp_channel(cct); + // take lock because listen/connect can happen from different worker threads + Mutex::Locker l(lock); + + if (t.joinable()) + return; // dispatcher thread already running + + tx_cc = get_stack()->get_infiniband().create_comp_channel(cct); assert(tx_cc); - rx_cc = global_infiniband->create_comp_channel(cct); + rx_cc = get_stack()->get_infiniband().create_comp_channel(cct); assert(rx_cc); - tx_cq = global_infiniband->create_comp_queue(cct, tx_cc); + tx_cq = get_stack()->get_infiniband().create_comp_queue(cct, tx_cc); assert(tx_cq); - rx_cq = global_infiniband->create_comp_queue(cct, rx_cc); + rx_cq = get_stack()->get_infiniband().create_comp_queue(cct, rx_cc); assert(rx_cq); t = std::thread(&RDMADispatcher::polling, this); @@ -102,8 +97,19 @@ void RDMADispatcher::polling_start() void RDMADispatcher::polling_stop() { - if (t.joinable()) - t.join(); + Mutex::Locker l(lock); + done = true; + if (!t.joinable()) + return; + + t.join(); + + tx_cc->ack_events(); + rx_cc->ack_events(); + delete tx_cq; + delete rx_cq; + delete tx_cc; + delete rx_cc; } void RDMADispatcher::handle_async_event() @@ -111,7 +117,7 @@ void RDMADispatcher::handle_async_event() ldout(cct, 30) << __func__ << dendl; while (1) { ibv_async_event async_event; - if (ibv_get_async_event(global_infiniband->get_device()->ctxt, &async_event)) { + if (ibv_get_async_event(get_stack()->get_infiniband().get_device()->ctxt, &async_event)) { if (errno != EAGAIN) lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno << " " << cpp_strerror(errno) << ")" << dendl; @@ -135,7 +141,7 @@ void RDMADispatcher::handle_async_event() erase_qpn_lockless(qpn); } } else { - ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << global_infiniband->get_device()->ctxt + ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << get_stack()->get_infiniband().get_device()->ctxt << " evt: " << ibv_event_type_str(async_event.event_type) << dendl; } @@ -145,7 +151,7 @@ void RDMADispatcher::handle_async_event() void RDMADispatcher::post_chunk_to_pool(Chunk* chunk) { Mutex::Locker l(lock); - global_infiniband->post_chunk_to_pool(chunk); + get_stack()->get_infiniband().post_chunk_to_pool(chunk); perf_logger->dec(l_msgr_rdma_rx_bufs_in_use); } @@ -178,7 +184,7 @@ void RDMADispatcher::polling() perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_ret); Mutex::Locker l(lock);//make sure connected socket alive when pass wc - global_infiniband->post_chunks_to_srq(rx_ret); + get_stack()->get_infiniband().post_chunks_to_srq(rx_ret); for (int i = 0; i < rx_ret; ++i) { ibv_wc* response = &wc[i]; Chunk* chunk = reinterpret_cast(response->wr_id); @@ -190,7 +196,7 @@ void RDMADispatcher::polling() conn = get_conn_lockless(response->qp_num); if (!conn) { ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl; - global_infiniband->post_chunk_to_pool(chunk); + get_stack()->get_infiniband().post_chunk_to_pool(chunk); perf_logger->dec(l_msgr_rdma_rx_bufs_in_use); } else { polled[conn].push_back(*response); @@ -201,12 +207,12 @@ void RDMADispatcher::polling() ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk << ") status(" << response->status << ":" - << global_infiniband->wc_status_to_string(response->status) << ")" << dendl; + << get_stack()->get_infiniband().wc_status_to_string(response->status) << ")" << dendl; conn = get_conn_lockless(response->qp_num); if (conn && conn->is_connected()) conn->fault(); - global_infiniband->post_chunk_to_pool(chunk); + get_stack()->get_infiniband().post_chunk_to_pool(chunk); } } for (auto &&i : polled) @@ -338,7 +344,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n) Chunk* chunk = reinterpret_cast(response->wr_id); ldout(cct, 25) << __func__ << " QP: " << response->qp_num << " len: " << response->byte_len << " , addr:" << chunk - << " " << global_infiniband->wc_status_to_string(response->status) << dendl; + << " " << get_stack()->get_infiniband().wc_status_to_string(response->status) << dendl; if (response->status != IBV_WC_SUCCESS) { perf_logger->inc(l_msgr_rdma_tx_total_wc_errors); @@ -353,7 +359,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n) } else { ldout(cct, 1) << __func__ << " send work request returned error for buffer(" << response->wr_id << ") status(" << response->status << "): " - << global_infiniband->wc_status_to_string(response->status) << dendl; + << get_stack()->get_infiniband().wc_status_to_string(response->status) << dendl; } Mutex::Locker l(lock);//make sure connected socket alive when pass wc @@ -369,7 +375,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n) //TX completion may come either from regular send message or from 'fin' message. //In the case of 'fin' wr_id points to the QueuePair. - if (global_infiniband->get_memory_manager()->is_tx_buffer(chunk->buffer)) { + if (get_stack()->get_infiniband().get_memory_manager()->is_tx_buffer(chunk->buffer)) { tx_chunks.push_back(chunk); } else if (reinterpret_cast(response->wr_id)->get_local_qp_number() == response->qp_num ) { ldout(cct, 1) << __func__ << " sending of the disconnect msg completed" << dendl; @@ -397,7 +403,7 @@ void RDMADispatcher::post_tx_buffer(std::vector &chunks) return ; inflight -= chunks.size(); - global_infiniband->get_memory_manager()->return_tx(chunks); + get_stack()->get_infiniband().get_memory_manager()->return_tx(chunks); ldout(cct, 30) << __func__ << " release " << chunks.size() << " chunks, inflight " << inflight << dendl; notify_pending_workers(); @@ -435,15 +441,16 @@ RDMAWorker::~RDMAWorker() void RDMAWorker::initialize() { if (!dispatcher) { - dispatcher = stack->get_dispatcher(); + dispatcher = &stack->get_dispatcher(); } } int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock) { - global_infiniband->init(); + get_stack()->get_infiniband().init(); + dispatcher->polling_start(); - auto p = new RDMAServerSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa); + auto p = new RDMAServerSocketImpl(cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this, sa); int r = p->listen(sa, opt); if (r < 0) { delete p; @@ -456,9 +463,10 @@ int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) { - global_infiniband->init(); + get_stack()->get_infiniband().init(); + dispatcher->polling_start(); - RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this); + RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this); int r = p->try_connect(addr, opts); if (r < 0) { @@ -474,11 +482,11 @@ int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, Co int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector &c, size_t bytes) { assert(center.in_thread()); - int r = global_infiniband->get_tx_buffers(c, bytes); + int r = get_stack()->get_infiniband().get_tx_buffers(c, bytes); assert(r >= 0); - size_t got = global_infiniband->get_memory_manager()->get_tx_buffer_size() * r; + size_t got = get_stack()->get_infiniband().get_memory_manager()->get_tx_buffer_size() * r; ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered bytes, inflight " << dispatcher->inflight << dendl; - stack->get_dispatcher()->inflight += r; + stack->get_dispatcher().inflight += r; if (got >= bytes) return r; @@ -516,50 +524,17 @@ void RDMAWorker::handle_pending_message() dispatcher->notify_pending_workers(); } -RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t) +RDMAStack::RDMAStack(CephContext *cct, const string &t) + : NetworkStack(cct, t), ib(cct), dispatcher(cct, this) { - // - //On RDMA MUST be called before fork - // - - int rc = ibv_fork_init(); - if (rc) { - lderr(cct) << __func__ << " failed to call ibv_for_init(). On RDMA must be called before fork. Application aborts." << dendl; - ceph_abort(); - } - - ldout(cct, 1) << __func__ << " ms_async_rdma_enable_hugepage value is: " << cct->_conf->ms_async_rdma_enable_hugepage << dendl; - if (cct->_conf->ms_async_rdma_enable_hugepage) { - rc = setenv("RDMAV_HUGEPAGES_SAFE","1",1); - ldout(cct, 1) << __func__ << " RDMAV_HUGEPAGES_SAFE is set as: " << getenv("RDMAV_HUGEPAGES_SAFE") << dendl; - if (rc) { - lderr(cct) << __func__ << " failed to export RDMA_HUGEPAGES_SAFE. On RDMA must be exported before using huge pages. Application aborts." << dendl; - ceph_abort(); - } - } - - //Check ulimit - struct rlimit limit; - getrlimit(RLIMIT_MEMLOCK, &limit); - if (limit.rlim_cur != RLIM_INFINITY || limit.rlim_max != RLIM_INFINITY) { - lderr(cct) << __func__ << "!!! WARNING !!! For RDMA to work properly user memlock (ulimit -l) must be big enough to allow large amount of registered memory." - " We recommend setting this parameter to infinity" << dendl; - } - - if (!global_infiniband) - global_infiniband.construct( - cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num); ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl; - dispatcher = new RDMADispatcher(cct, this); - global_infiniband->set_dispatcher(dispatcher); unsigned num = get_num_worker(); for (unsigned i = 0; i < num; ++i) { RDMAWorker* w = dynamic_cast(get_worker(i)); w->set_stack(this); } - - ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << dispatcher << dendl; + ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << &dispatcher << dendl; } RDMAStack::~RDMAStack() @@ -568,7 +543,7 @@ RDMAStack::~RDMAStack() unsetenv("RDMAV_HUGEPAGES_SAFE"); //remove env variable on destruction } - delete dispatcher; + dispatcher.polling_stop(); } void RDMAStack::spawn_worker(unsigned i, std::function &&func) diff --git a/src/msg/async/rdma/RDMAStack.h b/src/msg/async/rdma/RDMAStack.h index bbb97af1f77..764ea33f39e 100644 --- a/src/msg/async/rdma/RDMAStack.h +++ b/src/msg/async/rdma/RDMAStack.h @@ -256,7 +256,9 @@ class RDMAServerSocketImpl : public ServerSocketImpl { class RDMAStack : public NetworkStack { vector threads; - RDMADispatcher *dispatcher; + PerfCounters *perf_counter; + Infiniband ib; + RDMADispatcher dispatcher; std::atomic fork_finished = {false}; @@ -268,10 +270,11 @@ class RDMAStack : public NetworkStack { virtual void spawn_worker(unsigned i, std::function &&func) override; virtual void join_worker(unsigned i) override; - RDMADispatcher *get_dispatcher() { return dispatcher; } - + RDMADispatcher &get_dispatcher() { return dispatcher; } + Infiniband &get_infiniband() { return ib; } virtual bool is_ready() override { return fork_finished.load(); }; virtual void ready() override { fork_finished = true; }; }; + #endif