diff --git a/src/msg/async/rdma/Infiniband.cc b/src/msg/async/rdma/Infiniband.cc index 2c2e79a2cdf..eeb6ffe2f11 100644 --- a/src/msg/async/rdma/Infiniband.cc +++ b/src/msg/async/rdma/Infiniband.cc @@ -593,7 +593,7 @@ int Infiniband::MemoryManager::Cluster::add(uint32_t num) assert(m); new(chunk) Chunk(m, chunk_size, base+offset); free_chunks.push_back(chunk); - all_chunks.insert(chunk); + all_buffers.insert(chunk->buffer); ptr += sizeof(Chunk); } return 0; @@ -771,20 +771,6 @@ int Infiniband::get_tx_buffers(std::vector &c, size_t bytes) return memory_manager->get_send_buffers(c, bytes); } -int Infiniband::recall_chunk(Chunk* c) -{ - if (memory_manager->is_rx_chunk(c)) { - post_chunk(c); - return 1; - } else if (memory_manager->is_tx_chunk(c)) { - vector v; - v.push_back(c); - memory_manager->return_tx(v); - return 2; - } - return -1; -} - /** * Create a new QueuePair. This factory should be used in preference to * the QueuePair constructor directly, since this lets derivatives of diff --git a/src/msg/async/rdma/Infiniband.h b/src/msg/async/rdma/Infiniband.h index 20bdcccbefb..d09bf80ce37 100644 --- a/src/msg/async/rdma/Infiniband.h +++ b/src/msg/async/rdma/Infiniband.h @@ -169,12 +169,17 @@ class Infiniband { int add(uint32_t num); void take_back(std::vector &ck); int get_buffers(std::vector &chunks, size_t bytes); + Chunk *get_chunk_by_buffer(const char *c) { + uint32_t idx = (c - base) / chunk_size; + Chunk *chunk = reinterpret_cast(chunk_base + sizeof(Chunk) * idx); + return chunk; + } MemoryManager& manager; uint32_t chunk_size; Mutex lock; std::vector free_chunks; - std::set all_chunks; + std::set all_buffers; char* base; char* chunk_base; }; @@ -188,8 +193,15 @@ class Infiniband { void return_tx(std::vector &chunks); int get_send_buffers(std::vector &c, size_t bytes); int get_channel_buffers(std::vector &chunks, size_t bytes); - int is_tx_chunk(Chunk* c) { return send->all_chunks.count(c);} - int is_rx_chunk(Chunk* c) { return channel->all_chunks.count(c);} + // TODO: optimize via address judgement + bool is_tx_buffer(const char* c) { return send->all_buffers.count(c); } + bool is_rx_buffer(const char* c) { return channel->all_buffers.count(c); } + Chunk *get_tx_chunk_by_buffer(const char *c) { + return send->get_chunk_by_buffer(c); + } + uint32_t get_tx_chunk_size() const { + return send->chunk_size; + } bool enabled_huge_page; @@ -349,9 +361,9 @@ class Infiniband { MemoryManager* get_memory_manager() { return memory_manager; } Device* get_device() { return device; } int get_async_fd() { return device->ctxt->async_fd; } - int recall_chunk(Chunk* c); - int is_tx_chunk(Chunk* c) { return memory_manager->is_tx_chunk(c); } - int is_rx_chunk(Chunk* c) { return memory_manager->is_rx_chunk(c); } + bool is_tx_buffer(const char* c) { return memory_manager->is_tx_buffer(c);} + bool is_rx_buffer(const char* c) { return memory_manager->is_rx_buffer(c);} + Chunk *get_tx_chunk_by_buffer(const char *c) { return memory_manager->get_tx_chunk_by_buffer(c); } static const char* wc_status_to_string(int status); static const char* qp_state_string(int status); }; diff --git a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc index 2747193c0ba..a9666b66382 100644 --- a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc +++ b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc @@ -43,19 +43,24 @@ RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl() { ldout(cct, 20) << __func__ << " destruct." << dendl; dispatcher->perf_logger->dec(l_msgr_rdma_active_queue_pair); + cleanup(); worker->remove_pending_conn(this); dispatcher->erase_qpn(my_msg.qpn); - cleanup(); + Mutex::Locker l(lock); if (notify_fd >= 0) ::close(notify_fd); if (tcp_fd >= 0) ::close(tcp_fd); error = ECONNRESET; - Mutex::Locker l(lock); - for (unsigned i=0; i < wc.size(); ++i) - infiniband->recall_chunk(reinterpret_cast(wc[i].wr_id)); - for (unsigned i=0; i < buffers.size(); ++i) - infiniband->recall_chunk(buffers[i]); + int ret = 0; + for (unsigned i=0; i < wc.size(); ++i) { + ret = infiniband->post_chunk(reinterpret_cast(wc[i].wr_id)); + assert(ret == 0); + } + for (unsigned i=0; i < buffers.size(); ++i) { + ret = infiniband->post_chunk(buffers[i]); + assert(ret == 0); + } } void RDMAConnectedSocketImpl::pass_wc(std::vector &&v) @@ -266,27 +271,26 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len) Chunk* chunk = reinterpret_cast(response->wr_id); ldout(cct, 25) << __func__ << " chunk length: " << response->byte_len << " bytes." << chunk << dendl; chunk->prepare_read(response->byte_len); + worker->perf_logger->inc(l_msgr_rdma_rx_bytes, response->byte_len); if (response->byte_len == 0) { dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin); if (connected) { error = ECONNRESET; - assert(infiniband->post_chunk(chunk) == 0); ldout(cct, 20) << __func__ << " got remote close msg..." << dendl; } - break; - } - worker->perf_logger->inc(l_msgr_rdma_rx_bytes, response->byte_len); - //assert(response->byte_len); - if (read == (ssize_t)len) { - buffers.push_back(chunk); - ldout(cct, 25) << __func__ << " buffers add a chunk: " << response->byte_len << dendl; - } else if (read + response->byte_len > (ssize_t)len) { - read += chunk->read(buf+read, (ssize_t)len-read); - buffers.push_back(chunk); - ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl; - } else { - read += chunk->read(buf+read, response->byte_len); assert(infiniband->post_chunk(chunk) == 0); + } else { + if (read == (ssize_t)len) { + buffers.push_back(chunk); + ldout(cct, 25) << __func__ << " buffers add a chunk: " << response->byte_len << dendl; + } else if (read + response->byte_len > (ssize_t)len) { + read += chunk->read(buf+read, (ssize_t)len-read); + buffers.push_back(chunk); + ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl; + } else { + read += chunk->read(buf+read, response->byte_len); + assert(infiniband->post_chunk(chunk) == 0); + } } } @@ -306,7 +310,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len) ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len) { size_t read = 0, tmp = 0; - vector::iterator c = buffers.begin(); + auto c = buffers.begin(); for (; c != buffers.end() ; ++c) { tmp = (*c)->read(buf+read, len-read); read += tmp; @@ -404,39 +408,75 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more) if (error) return -error; Mutex::Locker l(lock); - std::vector tx_buffers; size_t bytes = pending_bl.length(); ldout(cct, 20) << __func__ << " we need " << bytes << " bytes. iov size: " << pending_bl.buffers().size() << dendl; if (!bytes) return 0; - int ret = worker->reserve_message_buffer(this, tx_buffers, bytes); - if (ret == 0) { - ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl; - worker->perf_logger->inc(l_msgr_rdma_tx_no_mem); - return -EAGAIN; // that is ok , cause send will return bytes. == 0 enough buffers, < 0 no buffer, >0 not enough - } - vector::iterator current_buffer = tx_buffers.begin(); - list::const_iterator it = pending_bl.buffers().begin(); - unsigned total = 0; - while (it != pending_bl.buffers().end()) { - const uintptr_t addr = reinterpret_cast(it->c_str()); - unsigned copied = 0; - while (copied < it->length()) { - uint32_t r = (*current_buffer)->write((char*)addr+copied, it->length() - copied); - copied += r; - total += r; - if ((*current_buffer)->full()){ - ++current_buffer; - if (current_buffer == tx_buffers.end()) - goto sending; + auto fill_tx_via_copy = [this](std::vector &tx_buffers, unsigned bytes, + std::list::const_iterator &start, + std::list::const_iterator &end) -> unsigned { + assert(start != end); + auto chunk_idx = tx_buffers.size(); + int ret = worker->reserve_message_buffer(this, tx_buffers, bytes); + if (ret == 0) { + ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl; + worker->perf_logger->inc(l_msgr_rdma_tx_no_mem); + return 0; + } + + unsigned total_copied = 0; + Chunk *current_chunk = tx_buffers[chunk_idx]; + while (start != end) { + const uintptr_t addr = reinterpret_cast(start->c_str()); + unsigned copied = 0; + while (copied < start->length()) { + uint32_t r = current_chunk->write((char*)addr+copied, start->length() - copied); + copied += r; + total_copied += r; + bytes -= r; + if (current_chunk->full()){ + current_chunk = tx_buffers[++chunk_idx]; + if (chunk_idx == tx_buffers.size()) + return total_copied; + } } + ++start; + } + assert(bytes == 0); + return total_copied; + }; + + std::vector tx_buffers; + std::list::const_iterator it = pending_bl.buffers().begin(); + std::list::const_iterator copy_it = it; + unsigned total = 0; + unsigned need_reserve_bytes = 0; + while (it != pending_bl.buffers().end()) { + if (infiniband->is_tx_buffer(it->raw_c_str())) { + if (need_reserve_bytes) { + unsigned copied = fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it); + total += copied; + if (copied < need_reserve_bytes) + goto sending; + need_reserve_bytes = 0; + } + assert(copy_it == it); + tx_buffers.push_back(infiniband->get_tx_chunk_by_buffer(it->raw_c_str())); + total += it->length(); + ++copy_it; + } else { + need_reserve_bytes += it->length(); } ++it; } + if (need_reserve_bytes) + total += fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it); sending: + if (total == 0) + return -EAGAIN; assert(total <= pending_bl.length()); bufferlist swapped; if (total < pending_bl.length()) { @@ -448,7 +488,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more) } ldout(cct, 20) << __func__ << " left bytes: " << pending_bl.length() << " in buffers " - << pending_bl.buffers().size() << dendl; + << pending_bl.buffers().size() << " tx chunks " << tx_buffers.size() << dendl; int r = post_work_request(tx_buffers); if (r < 0) @@ -506,7 +546,7 @@ int RDMAConnectedSocketImpl::post_work_request(std::vector &tx_buffers) return -errno; } worker->perf_logger->inc(l_msgr_rdma_tx_chunks, tx_buffers.size()); - ldout(cct, 20) << __func__ << " qp state is : " << Infiniband::qp_state_string(qp->get_state()) << dendl; + ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl; return 0; } @@ -541,7 +581,7 @@ void RDMAConnectedSocketImpl::cleanup() { void RDMAConnectedSocketImpl::notify() { uint64_t i = 1; - assert(write(notify_fd, &i, sizeof(i)) == sizeof(i)); + write(notify_fd, &i, sizeof(i)); } void RDMAConnectedSocketImpl::shutdown() diff --git a/src/msg/async/rdma/RDMAStack.cc b/src/msg/async/rdma/RDMAStack.cc index c1679572044..f5e81dd9d3c 100644 --- a/src/msg/async/rdma/RDMAStack.cc +++ b/src/msg/async/rdma/RDMAStack.cc @@ -17,6 +17,7 @@ #include #include "include/str_list.h" +#include "common/deleter.h" #include "RDMAStack.h" #define dout_subsys ceph_subsys_ms @@ -129,6 +130,7 @@ void RDMADispatcher::polling() RDMAConnectedSocketImpl *conn = nullptr; utime_t last_inactive = ceph_clock_now(); bool rearmed = false; + int ret = 0; while (true) { int n = rx_cq->poll_cq(MAX_COMPLETIONS, wc); @@ -191,29 +193,42 @@ void RDMADispatcher::polling() ibv_wc* response = &wc[i]; Chunk* chunk = reinterpret_cast(response->wr_id); + ldout(cct, 25) << __func__ << " got chunk=" << chunk << " bytes:" << response->byte_len << " opcode:" << response->opcode << dendl; + + if (wc[i].opcode == IBV_WC_SEND) { + tx_cqe.push_back(wc[i]); + ldout(cct, 25) << " got a tx cqe, bytes:" << wc[i].byte_len << dendl; + continue; + } + if (response->status != IBV_WC_SUCCESS) { perf_logger->inc(l_msgr_rdma_rx_total_wc_errors); ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk << ") status(" << response->status << ":" - << ib->wc_status_to_string(response->status) << dendl; - ib->recall_chunk(chunk); - conn = get_conn_lockless(response->qp_num); - if (conn && conn->is_connected()) - conn->fault(); - notify_pending_workers(); + << ib->wc_status_to_string(response->status) << ")" << dendl; + if (ib->is_rx_buffer(chunk->buffer)) { + ret = ib->post_chunk(chunk); + if (ret) { + ldout(cct, 0) << __func__ << " post chunk failed, error: " << cpp_strerror(ret) << dendl; + assert(ret == 0); + } + conn = get_conn_lockless(response->qp_num); + if (conn && conn->is_connected()) + conn->fault(); + notify_pending_workers(); + } else if (ib->is_tx_buffer(chunk->buffer)) { + tx_cqe.push_back(wc[i]); + } else { + ldout(cct, 0) << __func__ << " unknown chunk: " << chunk << dendl; + } continue; } - if (wc[i].opcode == IBV_WC_SEND) { - tx_cqe.push_back(wc[i]); - ldout(cct, 25) << " got a tx cqe, bytes:" << wc[i].byte_len << dendl; - continue; - } - ldout(cct, 25) << __func__ << " got chunk=" << chunk << " bytes:" << response->byte_len << " opcode:" << response->opcode << dendl; conn = get_conn_lockless(response->qp_num); if (!conn) { - int ret = ib->recall_chunk(chunk); + ret = ib->post_chunk(chunk); ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << ret << dendl; + assert(ret == 0); continue; } polled[conn].push_back(*response); @@ -232,11 +247,11 @@ void RDMADispatcher::polling() } void RDMADispatcher::notify_pending_workers() { - Mutex::Locker l(w_lock); - if (pending_workers.empty()) - return ; - pending_workers.front()->pass_wc(std::move(vector())); - pending_workers.pop_front(); + Mutex::Locker l(w_lock); + if (pending_workers.empty()) + return ; + pending_workers.front()->pass_wc(std::move(vector())); + pending_workers.pop_front(); } int RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi) @@ -337,6 +352,7 @@ RDMAWorker::RDMAWorker(CephContext *c, unsigned i) plb.add_u64_counter(l_msgr_rdma_tx_no_mem, "tx_no_mem", "The count of no tx buffer"); plb.add_u64_counter(l_msgr_rdma_tx_parital_mem, "tx_parital_mem", "The count of parital tx buffer"); plb.add_u64_counter(l_msgr_rdma_tx_failed, "tx_failed_post", "The number of tx failed posted"); + plb.add_u64_counter(l_msgr_rdma_rx_no_registered_mem, "rx_no_registered_mem", "The count of no registered buffer when receiving"); plb.add_u64_counter(l_msgr_rdma_tx_chunks, "tx_chunks", "The number of tx chunks transmitted"); plb.add_u64_counter(l_msgr_rdma_tx_bytes, "tx_bytes", "The bytes of tx chunks transmitted"); @@ -380,15 +396,6 @@ void RDMAWorker::pass_wc(std::vector &&v) notify(); } -void RDMAWorker::add_pending_conn(RDMAConnectedSocketImpl* o) -{ - pending_sent_conns.push_back(o); - if (!pended) { - dispatcher->pending_buffers(this); - pended = true; - } -} - void RDMAWorker::get_wc(std::vector &w) { Mutex::Locker l(lock); @@ -429,15 +436,20 @@ int RDMAWorker::reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vectorget_tx_buffers(c, bytes); if (r > 0) { - stack->get_dispatcher()->inflight += c.size(); - ldout(cct, 30) << __func__ << " reserve " << c.size() << " chunks, inflight " << stack->get_dispatcher()->inflight << dendl; + stack->get_dispatcher()->inflight += r; + ldout(cct, 30) << __func__ << " reserve " << r << " chunks, inflight " << dispatcher->inflight << dendl; return r; } assert(r == 0); - if (pending_sent_conns.back() != o) - pending_sent_conns.push_back(o); - dispatcher->pending_buffers(this); + if (o) { + { + Mutex::Locker l(lock); + if (pending_sent_conns.back() != o) + pending_sent_conns.push_back(o); + } + dispatcher->pending_buffers(this); + } return r; } @@ -449,35 +461,36 @@ int RDMAWorker::reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector &chunks) +void RDMAWorker::post_tx_buffer(std::vector &chunks) { if (chunks.empty()) - return 0; + return ; - stack->get_dispatcher()->inflight -= chunks.size(); + dispatcher->inflight -= chunks.size(); memory_manager->return_tx(chunks); ldout(cct, 30) << __func__ << " release " << chunks.size() << " chunks, inflight " << stack->get_dispatcher()->inflight << dendl; pended = false; std::set done; + Mutex::Locker l(lock); while (!pending_sent_conns.empty()) { RDMAConnectedSocketImpl *o = pending_sent_conns.front(); - if (done.count(o) == 0) { - done.insert(o); - } else { - pending_sent_conns.pop_front(); - continue; - } - ssize_t r = o->submit(false); - ldout(cct, 20) << __func__ << " sent pending bl socket=" << o << " r=" << r << dendl; - if (r < 0) { - if (r == -EAGAIN) - break; - o->fault(); - } pending_sent_conns.pop_front(); + if (!done.count(o)) { + lock.Unlock(); + done.insert(o); + ssize_t r = o->submit(false); + ldout(cct, 20) << __func__ << " sent pending bl socket=" << o << " r=" << r << dendl; + lock.Lock(); + if (r < 0) { + if (r == -EAGAIN) { + pending_sent_conns.push_front(o); + break; + } + o->fault(); + } + } } - return 0; } void RDMAWorker::handle_tx_event() @@ -507,7 +520,8 @@ void RDMAWorker::handle_tx_event() << infiniband->wc_status_to_string(response->status) << dendl; } RDMAConnectedSocketImpl *conn = stack->get_dispatcher()->get_conn_by_qp(response->qp_num); - if (conn) { + + if (conn && conn->is_connected()) { ldout(cct, 25) << __func__ << " qp state is : " << conn->get_qp_state() << dendl;//wangzhi conn->fault(); } else { @@ -515,12 +529,9 @@ void RDMAWorker::handle_tx_event() } } - //assert(memory_manager->is_tx_chunk(chunk)); - if (memory_manager->is_tx_chunk(chunk)) { + // FIXME: why not tx? + if (memory_manager->is_tx_buffer(chunk->buffer)) tx_chunks.push_back(chunk); - } else { - ldout(cct, 1) << __func__ << " a outter chunk: " << chunk << dendl;//fin - } } perf_logger->inc(l_msgr_rdma_tx_total_wc, cqe.size()); diff --git a/src/msg/async/rdma/RDMAStack.h b/src/msg/async/rdma/RDMAStack.h index 7386b3ba928..5bb200c3cb8 100644 --- a/src/msg/async/rdma/RDMAStack.h +++ b/src/msg/async/rdma/RDMAStack.h @@ -140,6 +140,7 @@ enum { l_msgr_rdma_tx_no_mem, l_msgr_rdma_tx_parital_mem, l_msgr_rdma_tx_failed, + l_msgr_rdma_rx_no_registered_mem, l_msgr_rdma_tx_chunks, l_msgr_rdma_tx_bytes, @@ -187,9 +188,11 @@ class RDMAWorker : public Worker { virtual void initialize() override; RDMAStack *get_stack() { return stack; } int reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector &c, size_t bytes); - int post_tx_buffer(std::vector &chunks); - void add_pending_conn(RDMAConnectedSocketImpl* o); - void remove_pending_conn(RDMAConnectedSocketImpl *o) { pending_sent_conns.remove(o); } + void post_tx_buffer(std::vector &chunks); + void remove_pending_conn(RDMAConnectedSocketImpl *o) { + Mutex::Locker l(lock); + pending_sent_conns.remove(o); + } void handle_tx_event(); void set_ib(Infiniband* ib) { infiniband = ib; } void set_stack(RDMAStack *s) { stack = s; }