crimson/net: implement factory method of Socket

Signed-off-by: Yingxin Cheng <yingxincheng@gmail.com>
This commit is contained in:
Yingxin Cheng 2019-02-15 11:34:16 +08:00 committed by Kefu Chai
parent 9282d1a5d2
commit 32ad076ceb
3 changed files with 40 additions and 16 deletions

View File

@ -296,14 +296,16 @@ void ProtocolV1::start_connect(const entity_addr_t& _peer_addr,
messenger.register_conn(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
seastar::with_gate(pending_dispatch, [this] {
return seastar::connect(conn.peer_addr.in4_addr())
.then([this](seastar::connected_socket fd) {
return Socket::connect(conn.peer_addr)
.then([this](SocketFRef sock) {
socket = std::move(sock);
if (state == state_t::closing) {
fd.shutdown_input();
fd.shutdown_output();
throw std::system_error(make_error_code(error::connection_aborted));
return socket->close().then([] {
throw std::system_error(make_error_code(error::connection_aborted));
});
}
socket = seastar::make_foreign(std::make_unique<Socket>(std::move(fd)));
return seastar::now();
}).then([this] {
// read server's handshake header
return socket->read(server_header_size);
}).then([this] (bufferlist headerbl) {

View File

@ -8,6 +8,7 @@
#include <seastar/net/packet.hh>
#include "include/buffer.h"
#include "msg/msg_types.h"
namespace ceph::net {
@ -27,14 +28,39 @@ class Socket
size_t remaining;
} r;
struct construct_tag {};
public:
explicit Socket(seastar::connected_socket&& _socket)
Socket(seastar::connected_socket&& _socket, construct_tag)
: sid{seastar::engine().cpu_id()},
socket(std::move(_socket)),
in(socket.input()),
out(socket.output()) {}
Socket(Socket&& o) = delete;
static seastar::future<SocketFRef>
connect(const entity_addr_t& peer_addr) {
return seastar::connect(peer_addr.in4_addr())
.then([] (seastar::connected_socket socket) {
return seastar::make_foreign(std::make_unique<Socket>(std::move(socket),
construct_tag{}));
});
}
static seastar::future<SocketFRef, entity_addr_t>
accept(seastar::server_socket& listener) {
return listener.accept().then([] (seastar::connected_socket socket,
seastar::socket_address paddr) {
entity_addr_t peer_addr;
peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
return seastar::make_ready_future<SocketFRef, entity_addr_t>(
seastar::make_foreign(std::make_unique<Socket>(std::move(socket),
construct_tag{})),
peer_addr);
});
}
/// read the requested number of bytes into a bufferlist
seastar::future<bufferlist> read(size_t bytes);
using tmp_buf = seastar::temporary_buffer<char>;

View File

@ -146,18 +146,14 @@ seastar::future<> SocketMessenger::do_start(Dispatcher *disp)
// start listening if bind() was called
if (listener) {
seastar::keep_doing([this] {
return listener->accept()
.then([this] (seastar::connected_socket socket,
seastar::socket_address paddr) {
// allocate the connection
entity_addr_t peer_addr;
peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
return Socket::accept(*listener)
.then([this] (SocketFRef socket,
entity_addr_t peer_addr) {
auto shard = locate_shard(peer_addr);
// don't wait before accepting another
#warning fixme
// we currently do dangerous i/o from a Connection core, different from the Socket core.
auto sock = seastar::make_foreign(std::make_unique<Socket>(std::move(socket)));
// don't wait before accepting another
container().invoke_on(shard, [sock = std::move(sock), peer_addr, this](auto& msgr) mutable {
container().invoke_on(shard, [sock = std::move(socket), peer_addr, this](auto& msgr) mutable {
SocketConnectionRef conn = seastar::make_shared<SocketConnection>(msgr, *msgr.dispatcher);
conn->start_accept(std::move(sock), peer_addr);
});