msg: start on SocketConnection negotiation

Signed-off-by: Casey Bodley <cbodley@redhat.com>
This commit is contained in:
Casey Bodley 2017-10-21 16:34:12 -04:00 committed by Kefu Chai
parent cf37715882
commit 3e5621b09d
4 changed files with 188 additions and 3 deletions

View File

@ -25,6 +25,12 @@ const std::error_category& net_category()
std::string message(int ev) const override {
switch (static_cast<error>(ev)) {
case error::bad_connect_banner:
return "bad connect banner";
case error::bad_peer_address:
return "bad peer address";
case error::negotiation_failure:
return "negotiation failure";
case error::read_eof:
return "read eof";
case error::connection_aborted:

View File

@ -20,6 +20,9 @@ namespace ceph::net {
/// net error codes
enum class error {
bad_connect_banner,
bad_peer_address,
negotiation_failure,
read_eof,
connection_aborted,
connection_refused,

View File

@ -12,6 +12,7 @@
*
*/
#include <algorithm>
#include <core/shared_future.hh>
#include "SocketConnection.h"
@ -28,7 +29,7 @@ SocketConnection::SocketConnection(Messenger *messenger,
socket(std::move(fd)),
in(socket.input()),
out(socket.output()),
send_ready(seastar::now())
send_ready(h.promise.get_future())
{
}
@ -156,12 +157,175 @@ seastar::future<> SocketConnection::close()
return seastar::when_all(in.close(), out.close()).discard_result();
}
// handshake
/// store the banner in a non-const string for buffer::create_static()
static char banner[] = CEPH_BANNER;
constexpr size_t banner_size = sizeof(CEPH_BANNER)-1;
constexpr size_t client_header_size = banner_size + sizeof(ceph_entity_addr);
constexpr size_t server_header_size = banner_size + 2 * sizeof(ceph_entity_addr);
WRITE_RAW_ENCODER(ceph_msg_connect);
WRITE_RAW_ENCODER(ceph_msg_connect_reply);
std::ostream& operator<<(std::ostream& out, const ceph_msg_connect& c)
{
return out << "connect{features=" << std::hex << c.features << std::dec
<< " host_type=" << c.host_type
<< " global_seq=" << c.global_seq
<< " connect_seq=" << c.connect_seq
<< " protocol_version=" << c.protocol_version
<< " authorizer_protocol=" << c.authorizer_protocol
<< " authorizer_len=" << c.authorizer_len
<< " flags=" << std::hex << static_cast<uint16_t>(c.flags) << std::dec << '}';
}
std::ostream& operator<<(std::ostream& out, const ceph_msg_connect_reply& r)
{
return out << "connect_reply{tag=" << static_cast<uint16_t>(r.tag)
<< " features=" << std::hex << r.features << std::dec
<< " global_seq=" << r.global_seq
<< " connect_seq=" << r.connect_seq
<< " protocol_version=" << r.protocol_version
<< " authorizer_len=" << r.authorizer_len
<< " flags=" << std::hex << static_cast<uint16_t>(r.flags) << std::dec << '}';
}
// check that the buffer starts with a valid banner without requiring it to
// be contiguous in memory
static void validate_banner(bufferlist::const_iterator& p)
{
auto b = std::cbegin(banner);
auto end = b + banner_size;
while (b != end) {
const char *buf{nullptr};
auto remaining = std::distance(b, end);
auto len = p.get_ptr_and_advance(remaining, &buf);
if (!std::equal(buf, buf + len, b)) {
throw std::system_error(make_error_code(error::bad_connect_banner));
}
b += len;
}
}
// make sure that we agree with the peer about its address
static void validate_peer_addr(const entity_addr_t& addr,
const entity_addr_t& expected)
{
if (addr == expected) {
return;
}
// ok if server bound anonymously, as long as port/nonce match
if (addr.is_blank_ip() &&
addr.get_port() == expected.get_port() &&
addr.get_nonce() == expected.get_nonce()) {
return;
} else {
throw std::system_error(make_error_code(error::bad_peer_address));
}
}
/// return a static bufferptr to the given object
template <typename T>
bufferptr create_static(T& obj)
{
return buffer::create_static(sizeof(obj), reinterpret_cast<char*>(&obj));
}
seastar::future<> SocketConnection::handle_connect()
{
memset(&h.reply, 0, sizeof(h.reply));
h.reply.protocol_version = CEPH_OSDC_PROTOCOL;
h.reply.tag = CEPH_MSGR_TAG_READY;
bufferlist bl;
bl.append(create_static(h.reply));
return out.write(std::move(bl))
.then([this] { return out.flush(); });
}
seastar::future<> SocketConnection::handle_connect_reply()
{
if (h.reply.tag != CEPH_MSGR_TAG_READY) {
throw std::system_error(make_error_code(error::negotiation_failure));
}
return seastar::now();
}
seastar::future<> SocketConnection::client_handshake()
{
return seastar::now(); // TODO
// read server's handshake header
return read(server_header_size)
.then([this] (bufferlist headerbl) {
auto p = headerbl.cbegin();
validate_banner(p);
entity_addr_t saddr, caddr;
::decode(saddr, p);
::decode(caddr, p);
assert(p.end());
validate_peer_addr(saddr, peer_addr);
if (my_addr != caddr) {
// take peer's address for me, but preserve my port/nonce
caddr.set_port(my_addr.get_port());
caddr.nonce = my_addr.nonce;
my_addr = caddr;
}
// encode/send client's handshake header
bufferlist bl;
bl.append(buffer::create_static(banner_size, banner));
::encode(my_addr, bl, 0);
// encode ceph_msg_connect
memset(&h.connect, 0, sizeof(h.connect));
h.connect.protocol_version = CEPH_OSDC_PROTOCOL;
bl.append(create_static(h.connect));
// TODO: append authorizer
return out.write(std::move(bl))
.then([this] { return out.flush(); });
}).then([this] {
// read the reply
return read(sizeof(h.reply));
}).then([this] (bufferlist bl) {
auto p = bl.begin();
::decode(h.reply, p);
// TODO: read authorizer
assert(p.end());
return handle_connect_reply();
}).then_wrapped([this] (auto fut) {
// satisfy the handshake's promise
fut.forward_to(std::move(h.promise));
});
}
seastar::future<> SocketConnection::server_handshake()
{
return seastar::now(); // TODO
// encode/send server's handshake header
bufferlist bl;
bl.append(buffer::create_static(banner_size, banner));
::encode(my_addr, bl, 0);
::encode(peer_addr, bl, 0);
return out.write(std::move(bl))
.then([this] { return out.flush(); })
.then([this] {
// read client's handshake header and connect request
return read(client_header_size + sizeof(h.connect));
}).then([this] (bufferlist bl) {
auto p = bl.cbegin();
validate_banner(p);
entity_addr_t addr;
::decode(addr, p);
::decode(h.connect, p);
assert(p.end());
// TODO: read authorizer
return handle_connect();
}).then_wrapped([this] (auto fut) {
// satisfy the handshake's promise
fut.forward_to(std::move(h.promise));
});
}

View File

@ -35,6 +35,18 @@ class SocketConnection : public Connection {
/// read the requested number of bytes into a bufferlist
seastar::future<bufferlist> read(size_t bytes);
/// state for handshake
struct Handshake {
ceph_msg_connect connect;
ceph_msg_connect_reply reply;
seastar::promise<> promise;
} h;
/// server side of handshake negotiation
seastar::future<> handle_connect();
/// client side of handshake negotiation
seastar::future<> handle_connect_reply();
/// state for an incoming message
struct MessageReader {
ceph_msg_header header;