Merge pull request #28921 from rzarzynski/wip-msgr-drop-0c

msg/async: drop zero_copy_read() & co from ConnectedSocket.

Reviewed-by: Haomai Wang <haomai@xsky.com>
Reviewed-by: Kefu Chai <kchai@redhat.com>
This commit is contained in:
Kefu Chai 2020-01-07 16:31:49 +08:00 committed by GitHub
commit 726b0de68f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 9 additions and 80 deletions

View File

@ -62,10 +62,6 @@ class PosixConnectedSocketImpl final : public ConnectedSocketImpl {
}
}
ssize_t zero_copy_read(bufferptr&) override {
return -EOPNOTSUPP;
}
ssize_t read(char *buf, size_t len) override {
ssize_t r = ::read(_fd, buf, len);
if (r < 0)

View File

@ -28,7 +28,6 @@ class ConnectedSocketImpl {
virtual ~ConnectedSocketImpl() {}
virtual int is_connected() = 0;
virtual ssize_t read(char*, size_t) = 0;
virtual ssize_t zero_copy_read(bufferptr&) = 0;
virtual ssize_t send(bufferlist &bl, bool more) = 0;
virtual void shutdown() = 0;
virtual void close() = 0;
@ -94,12 +93,6 @@ class ConnectedSocket {
ssize_t read(char* buf, size_t len) {
return _csi->read(buf, len);
}
/// Gets the input stream.
///
/// Gets an object returning data sent from the remote endpoint.
ssize_t zero_copy_read(bufferptr &data) {
return _csi->zero_copy_read(data);
}
/// Gets the output stream.
///
/// Gets an object that sends data to the remote endpoint.
@ -325,8 +318,6 @@ class NetworkStack {
static Worker* create_worker(
CephContext *c, const string &t, unsigned i);
// backend need to override this method if supports zero copy read
virtual bool support_zero_copy_read() const { return false; }
// backend need to override this method if backend doesn't support shared
// listen table.
// For example, posix backend has in kernel global listen table. If one

View File

@ -97,7 +97,8 @@ class NativeConnectedSocketImpl : public ConnectedSocketImpl {
return len - left ? len - left : -EAGAIN;
}
virtual ssize_t zero_copy_read(bufferptr &data) override {
private:
ssize_t zero_copy_read(bufferptr &data) {
auto err = _conn.get_errno();
if (err <= 0)
return err;
@ -171,6 +172,8 @@ class NativeConnectedSocketImpl : public ConnectedSocketImpl {
return _conn.send(Packet(std::move(frags), make_deleter(std::move(del))));
}
}
public:
virtual void shutdown() override {
_conn.close_write();
}
@ -250,7 +253,6 @@ class DPDKStack : public NetworkStack {
explicit DPDKStack(CephContext *cct, const string &t): NetworkStack(cct, t) {
funcs.resize(cct->_conf->ms_async_max_op_threads);
}
virtual bool support_zero_copy_read() const override { return true; }
virtual bool support_local_listen_table() const override { return true; }
virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override;

View File

@ -275,53 +275,6 @@ ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)
return read_size;
}
ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data)
{
if (error)
return -error;
static const int MAX_COMPLETIONS = 16;
ibv_wc wc[MAX_COMPLETIONS];
ssize_t size = 0;
ibv_wc* response;
Chunk* chunk;
bool loaded = false;
auto iter = buffers.begin();
if (iter != buffers.end()) {
chunk = *iter;
// FIXME need to handle release
// auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);
buffers.erase(iter);
loaded = true;
size = chunk->bound;
}
std::vector<ibv_wc> cqe;
get_wc(cqe);
if (cqe.empty())
return size == 0 ? -EAGAIN : size;
ldout(cct, 20) << __func__ << " pool completion queue got " << cqe.size() << " responses."<< dendl;
for (size_t i = 0; i < cqe.size(); ++i) {
response = &wc[i];
chunk = reinterpret_cast<Chunk*>(response->wr_id);
chunk->prepare_read(response->byte_len);
if (!loaded && i == 0) {
// FIXME need to handle release
// auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);
size = chunk->bound;
continue;
}
buffers.push_back(chunk);
iter++;
}
if (size == 0)
return -EAGAIN;
return size;
}
ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more)
{
if (error) {

View File

@ -212,7 +212,6 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
virtual int is_connected() override { return connected; }
virtual ssize_t read(char* buf, size_t len) override;
virtual ssize_t zero_copy_read(bufferptr &data) override;
virtual ssize_t send(bufferlist &bl, bool more) override;
virtual void shutdown() override;
virtual void close() override;
@ -339,7 +338,6 @@ class RDMAStack : public NetworkStack {
public:
explicit RDMAStack(CephContext *cct, const string &t);
virtual ~RDMAStack();
virtual bool support_zero_copy_read() const override { return false; }
virtual bool nonblock_connect_need_writable_event() const override { return false; }
virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override;

View File

@ -853,11 +853,7 @@ class StressFactory {
while (true) {
char buf[4096];
bufferptr data;
if (factory->zero_copy_read) {
r = socket.zero_copy_read(data);
} else {
r = socket.read(buf, sizeof(buf));
}
r = socket.read(buf, sizeof(buf));
ASSERT_TRUE(r == -EAGAIN || (r >= 0 && (size_t)r <= sizeof(buf)));
if (r == 0) {
ASSERT_TRUE(buffers.empty());
@ -865,11 +861,7 @@ class StressFactory {
return ;
} else if (r == -EAGAIN)
break;
if (factory->zero_copy_read) {
buffers.emplace_back(data.c_str(), 0, data.length());
} else {
buffers.emplace_back(buf, 0, r);
}
buffers.emplace_back(buf, 0, r);
std::cerr << " server " << this << " receive " << r << " content: " << std::endl;
}
if (!buffers.empty() && !write_enabled)
@ -961,14 +953,12 @@ class StressFactory {
atomic_int message_count, message_left;
entity_addr_t bind_addr;
std::atomic_bool already_bind = {false};
bool zero_copy_read;
SocketOptions options;
explicit StressFactory(const std::shared_ptr<NetworkStack> &s, const string &addr,
size_t cli, size_t qd, size_t mc, size_t l, bool zero_copy)
size_t cli, size_t qd, size_t mc, size_t l)
: stack(s), rs(128), client_num(cli), queue_depth(qd),
max_message_length(l), message_count(mc), message_left(mc),
zero_copy_read(zero_copy) {
max_message_length(l), message_count(mc), message_left(mc) {
bind_addr.parse(addr.c_str());
rs.prepare(100);
}
@ -1054,8 +1044,7 @@ class StressFactory {
};
TEST_P(NetworkWorkerTest, StressTest) {
StressFactory factory(stack, get_addr(), 16, 16, 10000, 1024,
strncmp(GetParam(), "dpdk", 4) == 0);
StressFactory factory(stack, get_addr(), 16, 16, 10000, 1024);
StressFactory *f = &factory;
exec_events([f](Worker *worker) mutable {
f->start(worker);