msg/async/rdma: cleanup

Signed-off-by: Haomai Wang <haomai@xsky.com>
This commit is contained in:
Haomai Wang 2017-02-18 01:02:54 +08:00
parent bd527486f6
commit a9b13b21e8
5 changed files with 176 additions and 124 deletions

View File

@ -593,7 +593,7 @@ int Infiniband::MemoryManager::Cluster::add(uint32_t num)
assert(m);
new(chunk) Chunk(m, chunk_size, base+offset);
free_chunks.push_back(chunk);
all_chunks.insert(chunk);
all_buffers.insert(chunk->buffer);
ptr += sizeof(Chunk);
}
return 0;
@ -771,20 +771,6 @@ int Infiniband::get_tx_buffers(std::vector<Chunk*> &c, size_t bytes)
return memory_manager->get_send_buffers(c, bytes);
}
/**
 * Give a chunk back to whichever pool the memory manager says it belongs to.
 *
 * \param c chunk to recycle
 * \return 1 if the chunk was an rx chunk and was handed to post_chunk(),
 *         2 if it was a tx chunk and was handed to return_tx(),
 *         -1 if the memory manager recognises it as neither.
 */
int Infiniband::recall_chunk(Chunk* c)
{
  // rx chunk: hand it to post_chunk(); its return value is not inspected.
  if (memory_manager->is_rx_chunk(c)) {
    post_chunk(c);
    return 1;
  }

  // Neither rx nor tx: nothing to recycle.
  if (!memory_manager->is_tx_chunk(c))
    return -1;

  // tx chunk: return_tx() takes a vector, so wrap the single chunk.
  vector<Chunk*> returned(1, c);
  memory_manager->return_tx(returned);
  return 2;
}
/**
* Create a new QueuePair. This factory should be used in preference to
* the QueuePair constructor directly, since this lets derivatives of

View File

@ -169,12 +169,17 @@ class Infiniband {
int add(uint32_t num);
void take_back(std::vector<Chunk*> &ck);
int get_buffers(std::vector<Chunk*> &chunks, size_t bytes);
// Map a data-buffer address back to its Chunk descriptor.
// The chunk index is the offset of `c` from `base` divided by `chunk_size`;
// the descriptor is then read at that index in the array starting at
// `chunk_base`.
// NOTE(review): no range check is done here - presumably callers only pass
// addresses that belong to this cluster; confirm against call sites.
Chunk *get_chunk_by_buffer(const char *c) {
  const uint32_t chunk_index = (c - base) / chunk_size;
  char *descriptor = chunk_base + sizeof(Chunk) * chunk_index;
  return reinterpret_cast<Chunk*>(descriptor);
}
MemoryManager& manager;
uint32_t chunk_size;
Mutex lock;
std::vector<Chunk*> free_chunks;
std::set<Chunk*> all_chunks;
std::set<const char*> all_buffers;
char* base;
char* chunk_base;
};
@ -188,8 +193,15 @@ class Infiniband {
void return_tx(std::vector<Chunk*> &chunks);
int get_send_buffers(std::vector<Chunk*> &c, size_t bytes);
int get_channel_buffers(std::vector<Chunk*> &chunks, size_t bytes);
// Chunk-pointer membership tests: the result is the count of `c` in the
// send / channel cluster's all_chunks set (0 when absent).
int is_tx_chunk(Chunk* c) { return send->all_chunks.count(c);}
int is_rx_chunk(Chunk* c) { return channel->all_chunks.count(c);}
// Buffer-address membership tests: true when `c` is recorded in the
// send / channel cluster's all_buffers set.
// TODO: optimize via address judgement
bool is_tx_buffer(const char* c) { return send->all_buffers.count(c); }
bool is_rx_buffer(const char* c) { return channel->all_buffers.count(c); }
// Resolve a buffer address to its Chunk via the send cluster. No membership
// check happens here; is_tx_buffer() is the visible way to test first.
Chunk *get_tx_chunk_by_buffer(const char *c) {
return send->get_chunk_by_buffer(c);
}
// Chunk size configured on the send cluster.
uint32_t get_tx_chunk_size() const {
return send->chunk_size;
}
bool enabled_huge_page;
@ -349,9 +361,9 @@ class Infiniband {
MemoryManager* get_memory_manager() { return memory_manager; }
Device* get_device() { return device; }
int get_async_fd() { return device->ctxt->async_fd; }
// Recycle a chunk; defined out of line.
int recall_chunk(Chunk* c);
// Thin forwards to the MemoryManager queries of the same name.
int is_tx_chunk(Chunk* c) { return memory_manager->is_tx_chunk(c); }
int is_rx_chunk(Chunk* c) { return memory_manager->is_rx_chunk(c); }
bool is_tx_buffer(const char* c) { return memory_manager->is_tx_buffer(c);}
bool is_rx_buffer(const char* c) { return memory_manager->is_rx_buffer(c);}
// Forward to MemoryManager::get_tx_chunk_by_buffer(); no membership check is
// made at this level.
Chunk *get_tx_chunk_by_buffer(const char *c) { return memory_manager->get_tx_chunk_by_buffer(c); }
static const char* wc_status_to_string(int status);
static const char* qp_state_string(int status);
};

View File

@ -43,19 +43,24 @@ RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
{
ldout(cct, 20) << __func__ << " destruct." << dendl;
dispatcher->perf_logger->dec(l_msgr_rdma_active_queue_pair);
cleanup();
worker->remove_pending_conn(this);
dispatcher->erase_qpn(my_msg.qpn);
cleanup();
Mutex::Locker l(lock);
if (notify_fd >= 0)
::close(notify_fd);
if (tcp_fd >= 0)
::close(tcp_fd);
error = ECONNRESET;
Mutex::Locker l(lock);
for (unsigned i=0; i < wc.size(); ++i)
infiniband->recall_chunk(reinterpret_cast<Chunk*>(wc[i].wr_id));
for (unsigned i=0; i < buffers.size(); ++i)
infiniband->recall_chunk(buffers[i]);
int ret = 0;
for (unsigned i=0; i < wc.size(); ++i) {
ret = infiniband->post_chunk(reinterpret_cast<Chunk*>(wc[i].wr_id));
assert(ret == 0);
}
for (unsigned i=0; i < buffers.size(); ++i) {
ret = infiniband->post_chunk(buffers[i]);
assert(ret == 0);
}
}
void RDMAConnectedSocketImpl::pass_wc(std::vector<ibv_wc> &&v)
@ -266,27 +271,26 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
ldout(cct, 25) << __func__ << " chunk length: " << response->byte_len << " bytes." << chunk << dendl;
chunk->prepare_read(response->byte_len);
worker->perf_logger->inc(l_msgr_rdma_rx_bytes, response->byte_len);
if (response->byte_len == 0) {
dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin);
if (connected) {
error = ECONNRESET;
assert(infiniband->post_chunk(chunk) == 0);
ldout(cct, 20) << __func__ << " got remote close msg..." << dendl;
}
break;
}
worker->perf_logger->inc(l_msgr_rdma_rx_bytes, response->byte_len);
//assert(response->byte_len);
if (read == (ssize_t)len) {
buffers.push_back(chunk);
ldout(cct, 25) << __func__ << " buffers add a chunk: " << response->byte_len << dendl;
} else if (read + response->byte_len > (ssize_t)len) {
read += chunk->read(buf+read, (ssize_t)len-read);
buffers.push_back(chunk);
ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;
} else {
read += chunk->read(buf+read, response->byte_len);
assert(infiniband->post_chunk(chunk) == 0);
} else {
if (read == (ssize_t)len) {
buffers.push_back(chunk);
ldout(cct, 25) << __func__ << " buffers add a chunk: " << response->byte_len << dendl;
} else if (read + response->byte_len > (ssize_t)len) {
read += chunk->read(buf+read, (ssize_t)len-read);
buffers.push_back(chunk);
ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;
} else {
read += chunk->read(buf+read, response->byte_len);
assert(infiniband->post_chunk(chunk) == 0);
}
}
}
@ -306,7 +310,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)
{
size_t read = 0, tmp = 0;
vector<Chunk*>::iterator c = buffers.begin();
auto c = buffers.begin();
for (; c != buffers.end() ; ++c) {
tmp = (*c)->read(buf+read, len-read);
read += tmp;
@ -404,39 +408,75 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
if (error)
return -error;
Mutex::Locker l(lock);
std::vector<Chunk*> tx_buffers;
size_t bytes = pending_bl.length();
ldout(cct, 20) << __func__ << " we need " << bytes << " bytes. iov size: "
<< pending_bl.buffers().size() << dendl;
if (!bytes)
return 0;
int ret = worker->reserve_message_buffer(this, tx_buffers, bytes);
if (ret == 0) {
ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl;
worker->perf_logger->inc(l_msgr_rdma_tx_no_mem);
return -EAGAIN; // that is ok , cause send will return bytes. == 0 enough buffers, < 0 no buffer, >0 not enough
}
vector<Chunk*>::iterator current_buffer = tx_buffers.begin();
list<bufferptr>::const_iterator it = pending_bl.buffers().begin();
unsigned total = 0;
while (it != pending_bl.buffers().end()) {
const uintptr_t addr = reinterpret_cast<const uintptr_t>(it->c_str());
unsigned copied = 0;
while (copied < it->length()) {
uint32_t r = (*current_buffer)->write((char*)addr+copied, it->length() - copied);
copied += r;
total += r;
if ((*current_buffer)->full()){
++current_buffer;
if (current_buffer == tx_buffers.end())
goto sending;
// Copy the bufferptrs in [start, end) into tx chunks reserved from the worker.
//
// tx_buffers  vector the reserved chunks end up in. The chunks obtained by
//             reserve_message_buffer() are assumed to be appended after the
//             entries already present (the first new chunk is read at the
//             old size()).
// bytes       number of bytes requested from the worker; decremented as data
//             is copied and asserted to be 0 when the whole range was copied.
// start       in/out: on return it points at the first bufferptr that was NOT
//             copied in full (== end when everything was copied).
// end         one past the last bufferptr to copy.
//
// Returns the number of bytes copied, or 0 when no chunk could be reserved.
// A value smaller than the requested `bytes` means the reserved chunks ran
// out before the range was exhausted.
auto fill_tx_via_copy = [this](std::vector<Chunk*> &tx_buffers, unsigned bytes,
                               std::list<bufferptr>::const_iterator &start,
                               std::list<bufferptr>::const_iterator &end) -> unsigned {
  assert(start != end);
  // Index of the first chunk that this call is going to reserve.
  auto chunk_idx = tx_buffers.size();
  int ret = worker->reserve_message_buffer(this, tx_buffers, bytes);
  if (ret == 0) {
    ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl;
    worker->perf_logger->inc(l_msgr_rdma_tx_no_mem);
    return 0;
  }

  unsigned total_copied = 0;
  Chunk *current_chunk = tx_buffers[chunk_idx];
  while (start != end) {
    const uintptr_t addr = reinterpret_cast<const uintptr_t>(start->c_str());
    unsigned copied = 0;
    while (copied < start->length()) {
      uint32_t r = current_chunk->write((char*)addr+copied, start->length() - copied);
      copied += r;
      total_copied += r;
      bytes -= r;
      if (current_chunk->full()) {
        // Check the bound BEFORE indexing: the original read
        // tx_buffers[++chunk_idx] first, which touches one element past the
        // end of the vector when the last reserved chunk fills up.
        if (++chunk_idx == tx_buffers.size()) {
          // Out of chunks. If the current bufferptr happened to be consumed
          // completely, step past it so `start` keeps meaning "first buffer
          // not fully copied"; otherwise an exact fit would return with
          // `start` one short of `end` although every byte was copied.
          if (copied == start->length())
            ++start;
          return total_copied;
        }
        current_chunk = tx_buffers[chunk_idx];
      }
    }
    ++start;
  }
  assert(bytes == 0);
  return total_copied;
};
std::vector<Chunk*> tx_buffers;
std::list<bufferptr>::const_iterator it = pending_bl.buffers().begin();
std::list<bufferptr>::const_iterator copy_it = it;
unsigned total = 0;
unsigned need_reserve_bytes = 0;
while (it != pending_bl.buffers().end()) {
if (infiniband->is_tx_buffer(it->raw_c_str())) {
if (need_reserve_bytes) {
unsigned copied = fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it);
total += copied;
if (copied < need_reserve_bytes)
goto sending;
need_reserve_bytes = 0;
}
assert(copy_it == it);
tx_buffers.push_back(infiniband->get_tx_chunk_by_buffer(it->raw_c_str()));
total += it->length();
++copy_it;
} else {
need_reserve_bytes += it->length();
}
++it;
}
if (need_reserve_bytes)
total += fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it);
sending:
if (total == 0)
return -EAGAIN;
assert(total <= pending_bl.length());
bufferlist swapped;
if (total < pending_bl.length()) {
@ -448,7 +488,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
}
ldout(cct, 20) << __func__ << " left bytes: " << pending_bl.length() << " in buffers "
<< pending_bl.buffers().size() << dendl;
<< pending_bl.buffers().size() << " tx chunks " << tx_buffers.size() << dendl;
int r = post_work_request(tx_buffers);
if (r < 0)
@ -506,7 +546,7 @@ int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
return -errno;
}
worker->perf_logger->inc(l_msgr_rdma_tx_chunks, tx_buffers.size());
ldout(cct, 20) << __func__ << " qp state is : " << Infiniband::qp_state_string(qp->get_state()) << dendl;
ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl;
return 0;
}
@ -541,7 +581,7 @@ void RDMAConnectedSocketImpl::cleanup() {
void RDMAConnectedSocketImpl::notify()
{
uint64_t i = 1;
assert(write(notify_fd, &i, sizeof(i)) == sizeof(i));
write(notify_fd, &i, sizeof(i));
}
void RDMAConnectedSocketImpl::shutdown()

View File

@ -17,6 +17,7 @@
#include <poll.h>
#include "include/str_list.h"
#include "common/deleter.h"
#include "RDMAStack.h"
#define dout_subsys ceph_subsys_ms
@ -129,6 +130,7 @@ void RDMADispatcher::polling()
RDMAConnectedSocketImpl *conn = nullptr;
utime_t last_inactive = ceph_clock_now();
bool rearmed = false;
int ret = 0;
while (true) {
int n = rx_cq->poll_cq(MAX_COMPLETIONS, wc);
@ -191,29 +193,42 @@ void RDMADispatcher::polling()
ibv_wc* response = &wc[i];
Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
ldout(cct, 25) << __func__ << " got chunk=" << chunk << " bytes:" << response->byte_len << " opcode:" << response->opcode << dendl;
if (wc[i].opcode == IBV_WC_SEND) {
tx_cqe.push_back(wc[i]);
ldout(cct, 25) << " got a tx cqe, bytes:" << wc[i].byte_len << dendl;
continue;
}
if (response->status != IBV_WC_SUCCESS) {
perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
<< ") status(" << response->status << ":"
<< ib->wc_status_to_string(response->status) << dendl;
ib->recall_chunk(chunk);
conn = get_conn_lockless(response->qp_num);
if (conn && conn->is_connected())
conn->fault();
notify_pending_workers();
<< ib->wc_status_to_string(response->status) << ")" << dendl;
if (ib->is_rx_buffer(chunk->buffer)) {
ret = ib->post_chunk(chunk);
if (ret) {
ldout(cct, 0) << __func__ << " post chunk failed, error: " << cpp_strerror(ret) << dendl;
assert(ret == 0);
}
conn = get_conn_lockless(response->qp_num);
if (conn && conn->is_connected())
conn->fault();
notify_pending_workers();
} else if (ib->is_tx_buffer(chunk->buffer)) {
tx_cqe.push_back(wc[i]);
} else {
ldout(cct, 0) << __func__ << " unknown chunk: " << chunk << dendl;
}
continue;
}
if (wc[i].opcode == IBV_WC_SEND) {
tx_cqe.push_back(wc[i]);
ldout(cct, 25) << " got a tx cqe, bytes:" << wc[i].byte_len << dendl;
continue;
}
ldout(cct, 25) << __func__ << " got chunk=" << chunk << " bytes:" << response->byte_len << " opcode:" << response->opcode << dendl;
conn = get_conn_lockless(response->qp_num);
if (!conn) {
int ret = ib->recall_chunk(chunk);
ret = ib->post_chunk(chunk);
ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << ret << dendl;
assert(ret == 0);
continue;
}
polled[conn].push_back(*response);
@ -232,11 +247,11 @@ void RDMADispatcher::polling()
}
void RDMADispatcher::notify_pending_workers() {
Mutex::Locker l(w_lock);
if (pending_workers.empty())
return ;
pending_workers.front()->pass_wc(std::move(vector<ibv_wc>()));
pending_workers.pop_front();
Mutex::Locker l(w_lock);
if (pending_workers.empty())
return ;
pending_workers.front()->pass_wc(std::move(vector<ibv_wc>()));
pending_workers.pop_front();
}
int RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi)
@ -337,6 +352,7 @@ RDMAWorker::RDMAWorker(CephContext *c, unsigned i)
plb.add_u64_counter(l_msgr_rdma_tx_no_mem, "tx_no_mem", "The count of no tx buffer");
plb.add_u64_counter(l_msgr_rdma_tx_parital_mem, "tx_parital_mem", "The count of parital tx buffer");
plb.add_u64_counter(l_msgr_rdma_tx_failed, "tx_failed_post", "The number of tx failed posted");
plb.add_u64_counter(l_msgr_rdma_rx_no_registered_mem, "rx_no_registered_mem", "The count of no registered buffer when receiving");
plb.add_u64_counter(l_msgr_rdma_tx_chunks, "tx_chunks", "The number of tx chunks transmitted");
plb.add_u64_counter(l_msgr_rdma_tx_bytes, "tx_bytes", "The bytes of tx chunks transmitted");
@ -380,15 +396,6 @@ void RDMAWorker::pass_wc(std::vector<ibv_wc> &&v)
notify();
}
/**
 * Queue a connection on pending_sent_conns and, unless the `pended` flag is
 * already set, register this worker with the dispatcher through
 * pending_buffers() and set the flag.
 *
 * \param o connection to append to the pending list
 */
void RDMAWorker::add_pending_conn(RDMAConnectedSocketImpl* o)
{
  pending_sent_conns.push_back(o);

  // Already registered with the dispatcher: nothing more to do.
  if (pended)
    return;

  dispatcher->pending_buffers(this);
  pended = true;
}
void RDMAWorker::get_wc(std::vector<ibv_wc> &w)
{
Mutex::Locker l(lock);
@ -429,15 +436,20 @@ int RDMAWorker::reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector<C
{
int r = infiniband->get_tx_buffers(c, bytes);
if (r > 0) {
stack->get_dispatcher()->inflight += c.size();
ldout(cct, 30) << __func__ << " reserve " << c.size() << " chunks, inflight " << stack->get_dispatcher()->inflight << dendl;
stack->get_dispatcher()->inflight += r;
ldout(cct, 30) << __func__ << " reserve " << r << " chunks, inflight " << dispatcher->inflight << dendl;
return r;
}
assert(r == 0);
if (pending_sent_conns.back() != o)
pending_sent_conns.push_back(o);
dispatcher->pending_buffers(this);
if (o) {
{
Mutex::Locker l(lock);
if (pending_sent_conns.back() != o)
pending_sent_conns.push_back(o);
}
dispatcher->pending_buffers(this);
}
return r;
}
@ -449,35 +461,36 @@ int RDMAWorker::reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector<C
* \return
* 0 if success or -1 for failure
*/
int RDMAWorker::post_tx_buffer(std::vector<Chunk*> &chunks)
void RDMAWorker::post_tx_buffer(std::vector<Chunk*> &chunks)
{
if (chunks.empty())
return 0;
return ;
stack->get_dispatcher()->inflight -= chunks.size();
dispatcher->inflight -= chunks.size();
memory_manager->return_tx(chunks);
ldout(cct, 30) << __func__ << " release " << chunks.size() << " chunks, inflight " << stack->get_dispatcher()->inflight << dendl;
pended = false;
std::set<RDMAConnectedSocketImpl*> done;
Mutex::Locker l(lock);
while (!pending_sent_conns.empty()) {
RDMAConnectedSocketImpl *o = pending_sent_conns.front();
if (done.count(o) == 0) {
done.insert(o);
} else {
pending_sent_conns.pop_front();
continue;
}
ssize_t r = o->submit(false);
ldout(cct, 20) << __func__ << " sent pending bl socket=" << o << " r=" << r << dendl;
if (r < 0) {
if (r == -EAGAIN)
break;
o->fault();
}
pending_sent_conns.pop_front();
if (!done.count(o)) {
lock.Unlock();
done.insert(o);
ssize_t r = o->submit(false);
ldout(cct, 20) << __func__ << " sent pending bl socket=" << o << " r=" << r << dendl;
lock.Lock();
if (r < 0) {
if (r == -EAGAIN) {
pending_sent_conns.push_front(o);
break;
}
o->fault();
}
}
}
return 0;
}
void RDMAWorker::handle_tx_event()
@ -507,7 +520,8 @@ void RDMAWorker::handle_tx_event()
<< infiniband->wc_status_to_string(response->status) << dendl;
}
RDMAConnectedSocketImpl *conn = stack->get_dispatcher()->get_conn_by_qp(response->qp_num);
if (conn) {
if (conn && conn->is_connected()) {
ldout(cct, 25) << __func__ << " qp state is : " << conn->get_qp_state() << dendl;//wangzhi
conn->fault();
} else {
@ -515,12 +529,9 @@ void RDMAWorker::handle_tx_event()
}
}
//assert(memory_manager->is_tx_chunk(chunk));
if (memory_manager->is_tx_chunk(chunk)) {
// FIXME: why not tx?
if (memory_manager->is_tx_buffer(chunk->buffer))
tx_chunks.push_back(chunk);
} else {
ldout(cct, 1) << __func__ << " a outter chunk: " << chunk << dendl;//fin
}
}
perf_logger->inc(l_msgr_rdma_tx_total_wc, cqe.size());

View File

@ -140,6 +140,7 @@ enum {
l_msgr_rdma_tx_no_mem,
l_msgr_rdma_tx_parital_mem,
l_msgr_rdma_tx_failed,
l_msgr_rdma_rx_no_registered_mem,
l_msgr_rdma_tx_chunks,
l_msgr_rdma_tx_bytes,
@ -187,9 +188,11 @@ class RDMAWorker : public Worker {
virtual void initialize() override;
RDMAStack *get_stack() { return stack; }
int reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes);
int post_tx_buffer(std::vector<Chunk*> &chunks);
void add_pending_conn(RDMAConnectedSocketImpl* o);
void remove_pending_conn(RDMAConnectedSocketImpl *o) { pending_sent_conns.remove(o); }
void post_tx_buffer(std::vector<Chunk*> &chunks);
// Drop every entry equal to `o` from pending_sent_conns. `lock` is held for
// the duration of the removal.
void remove_pending_conn(RDMAConnectedSocketImpl *o) {
Mutex::Locker l(lock);
pending_sent_conns.remove(o);
}
void handle_tx_event();
void set_ib(Infiniband* ib) { infiniband = ib; }
void set_stack(RDMAStack *s) { stack = s; }