msg/async/rdma: use shared_ptr to manage RDMADispatcher obj

1. Don't use bare pointer to manage RDMADispatcher obj.

2. access RDMADispatcher obj directly instead of accessing it
from RDMAStack. This could avoid caching RDMAStack obj in
RDMAWorker & RDMADispatcher.

Signed-off-by: Changcheng Liu <changcheng.liu@aliyun.com>
This commit is contained in:
Changcheng Liu 2019-08-07 15:08:15 +08:00 committed by Kefu Chai
parent 923b30f57e
commit 44a1820da8
6 changed files with 35 additions and 29 deletions

View File

@ -19,15 +19,16 @@
#undef dout_prefix
#define dout_prefix *_dout << " RDMAConnectedSocketImpl "
RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband> &ib, RDMADispatcher* s,
RDMAWorker *w)
RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband> &ib,
shared_ptr<RDMADispatcher>& rdma_dispatcher,
RDMAWorker *w)
: cct(cct), connected(0), error(0), ib(ib),
dispatcher(s), worker(w),
dispatcher(rdma_dispatcher), worker(w),
is_server(false), con_handler(new C_handle_connection(this)),
active(false), pending(false)
{
if (!cct->_conf->ms_async_rdma_cm) {
qp = ib->create_queue_pair(cct, s->get_tx_cq(), s->get_rx_cq(), IBV_QPT_RC, NULL);
qp = ib->create_queue_pair(cct, dispatcher->get_tx_cq(), dispatcher->get_rx_cq(), IBV_QPT_RC, NULL);
my_msg.qpn = qp->get_local_qp_number();
my_msg.psn = qp->get_initial_psn();
my_msg.lid = ib->get_lid();

View File

@ -7,9 +7,10 @@
#define TIMEOUT_MS 3000
#define RETRY_COUNT 7
RDMAIWARPConnectedSocketImpl::RDMAIWARPConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib, RDMADispatcher* s,
RDMAWorker *w, RDMACMInfo *info)
: RDMAConnectedSocketImpl(cct, ib, s, w), cm_con_handler(new C_handle_cm_connection(this))
RDMAIWARPConnectedSocketImpl::RDMAIWARPConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib,
shared_ptr<RDMADispatcher>& rdma_dispatcher,
RDMAWorker *w, RDMACMInfo *info)
: RDMAConnectedSocketImpl(cct, ib, rdma_dispatcher, w), cm_con_handler(new C_handle_cm_connection(this))
{
status = IDLE;
notify_fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);

View File

@ -9,8 +9,9 @@
RDMAIWARPServerSocketImpl::RDMAIWARPServerSocketImpl(
CephContext *cct, shared_ptr<Infiniband>& ib,
RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a, unsigned addr_slot)
: RDMAServerSocketImpl(cct, ib, s, w, a, addr_slot)
shared_ptr<RDMADispatcher>& rdma_dispatcher, RDMAWorker *w,
entity_addr_t& a, unsigned addr_slot)
: RDMAServerSocketImpl(cct, ib, rdma_dispatcher, w, a, addr_slot)
{
}

View File

@ -25,11 +25,12 @@
#define dout_prefix *_dout << " RDMAServerSocketImpl "
RDMAServerSocketImpl::RDMAServerSocketImpl(
CephContext *cct, shared_ptr<Infiniband>& ib, RDMADispatcher *s, RDMAWorker *w,
entity_addr_t& a, unsigned slot)
CephContext *cct, shared_ptr<Infiniband>& ib,
shared_ptr<RDMADispatcher>& rdma_dispatcher,
RDMAWorker *w, entity_addr_t& a, unsigned slot)
: ServerSocketImpl(a.get_type(), slot),
cct(cct), net(cct), server_setup_socket(-1), ib(ib),
dispatcher(s), worker(w), sa(a)
dispatcher(rdma_dispatcher), worker(w), sa(a)
{
}

View File

@ -560,9 +560,7 @@ RDMAWorker::~RDMAWorker()
void RDMAWorker::initialize()
{
if (!dispatcher) {
dispatcher = &stack->get_dispatcher();
}
ceph_assert(dispatcher);
}
int RDMAWorker::listen(entity_addr_t &sa, unsigned addr_slot,
@ -655,7 +653,8 @@ void RDMAWorker::handle_pending_message()
}
RDMAStack::RDMAStack(CephContext *cct, const string &t)
: NetworkStack(cct, t), ib(make_shared<Infiniband>(cct)), dispatcher(cct, ib)
: NetworkStack(cct, t), ib(make_shared<Infiniband>(cct)),
rdma_dispatcher(make_shared<RDMADispatcher>(cct, ib))
{
ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
@ -663,9 +662,10 @@ RDMAStack::RDMAStack(CephContext *cct, const string &t)
for (unsigned i = 0; i < num; ++i) {
RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i));
w->set_stack(this);
w->set_dispatcher(rdma_dispatcher);
w->set_ib(ib);
}
ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << &dispatcher << dendl;
ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << rdma_dispatcher.get() << dendl;
}
RDMAStack::~RDMAStack()

View File

@ -135,7 +135,7 @@ class RDMAWorker : public Worker {
shared_ptr<Infiniband> ib;
EventCallbackRef tx_handler;
std::list<RDMAConnectedSocketImpl*> pending_sent_conns;
RDMADispatcher* dispatcher = nullptr;
shared_ptr<RDMADispatcher> dispatcher;
ceph::mutex lock = ceph::make_mutex("RDMAWorker::lock");
class C_handle_cq_tx : public EventCallback {
@ -164,6 +164,7 @@ class RDMAWorker : public Worker {
}
void handle_pending_message();
void set_stack(RDMAStack *s) { stack = s; }
void set_dispatcher(shared_ptr<RDMADispatcher>& dispatcher) { this->dispatcher = dispatcher; }
void set_ib(shared_ptr<Infiniband> &ib) {this->ib = ib;}
void notify_worker() {
center.dispatch_event_external(tx_handler);
@ -192,7 +193,7 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
int connected;
int error;
shared_ptr<Infiniband> ib;
RDMADispatcher* dispatcher;
shared_ptr<RDMADispatcher> dispatcher;
RDMAWorker* worker;
std::vector<Chunk*> buffers;
int notify_fd = -1;
@ -216,8 +217,8 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
const decltype(std::cbegin(pending_bl.buffers()))& end);
public:
RDMAConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib, RDMADispatcher* s
RDMAWorker *w);
RDMAConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib,
shared_ptr<RDMADispatcher>& rdma_dispatcher, RDMAWorker *w);
virtual ~RDMAConnectedSocketImpl();
void pass_wc(std::vector<ibv_wc> &&v);
@ -273,8 +274,8 @@ enum RDMA_CM_STATUS {
class RDMAIWARPConnectedSocketImpl : public RDMAConnectedSocketImpl {
public:
RDMAIWARPConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib, RDMADispatcher* s,
RDMAWorker *w, RDMACMInfo *info = nullptr);
RDMAIWARPConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib,
shared_ptr<RDMADispatcher>& rdma_dispatcher, RDMAWorker *w, RDMACMInfo *info = nullptr);
~RDMAIWARPConnectedSocketImpl();
virtual int try_connect(const entity_addr_t&, const SocketOptions &opt) override;
virtual void close() override;
@ -314,12 +315,13 @@ class RDMAServerSocketImpl : public ServerSocketImpl {
NetHandler net;
int server_setup_socket;
shared_ptr<Infiniband> ib;
RDMADispatcher *dispatcher;
shared_ptr<RDMADispatcher> dispatcher;
RDMAWorker *worker;
entity_addr_t sa;
public:
RDMAServerSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib, RDMADispatcher *s,
RDMAServerSocketImpl(CephContext *cct, shared_ptr<Infiniband>& ib,
shared_ptr<RDMADispatcher>& rdma_dispatcher,
RDMAWorker *w, entity_addr_t& a, unsigned slot);
virtual int listen(entity_addr_t &sa, const SocketOptions &opt);
@ -331,8 +333,9 @@ class RDMAServerSocketImpl : public ServerSocketImpl {
class RDMAIWARPServerSocketImpl : public RDMAServerSocketImpl {
public:
RDMAIWARPServerSocketImpl(
CephContext *cct, shared_ptr<Infiniband>& ib, RDMADispatcher *s, RDMAWorker *w,
entity_addr_t& addr, unsigned addr_slot);
CephContext *cct, shared_ptr<Infiniband>& ib,
shared_ptr<RDMADispatcher>& rdma_dispatcher,
RDMAWorker* w, entity_addr_t& addr, unsigned addr_slot);
virtual int listen(entity_addr_t &sa, const SocketOptions &opt) override;
virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
virtual void abort_accept() override;
@ -345,7 +348,7 @@ class RDMAStack : public NetworkStack {
vector<std::thread> threads;
PerfCounters *perf_counter;
shared_ptr<Infiniband> ib;
RDMADispatcher dispatcher;
shared_ptr<RDMADispatcher> rdma_dispatcher;
std::atomic<bool> fork_finished = {false};
@ -357,7 +360,6 @@ class RDMAStack : public NetworkStack {
virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override;
virtual void join_worker(unsigned i) override;
RDMADispatcher &get_dispatcher() { return dispatcher; }
virtual bool is_ready() override { return fork_finished.load(); };
virtual void ready() override { fork_finished = true; };
};