crimson/net: support the lossless connection and auth

Signed-off-by: Kefu Chai <kchai@redhat.com>
This commit is contained in:
Kefu Chai 2018-05-25 14:40:54 +08:00
parent b00346c759
commit a4c7dcd192
12 changed files with 734 additions and 85 deletions

View File

@ -1,4 +1,5 @@
# translation units collected into the crimson_net_srcs list
set(crimson_net_srcs
Dispatcher.cc
Errors.cc
SocketConnection.cc
SocketMessenger.cc)

View File

@ -14,6 +14,7 @@
#pragma once
#include <queue>
#include <boost/smart_ptr/intrusive_ref_counter.hpp>
#include <core/future.hh>
@ -21,6 +22,8 @@
namespace ceph::net {
using seq_num_t = uint64_t;
class Connection : public boost::intrusive_ref_counter<Connection> {
protected:
Messenger *const messenger;
@ -42,7 +45,8 @@ class Connection : public boost::intrusive_ref_counter<Connection> {
virtual bool is_connected() = 0;
/// complete a handshake from the client's perspective
virtual seastar::future<> client_handshake() = 0;
virtual seastar::future<> client_handshake(entity_type_t peer_type,
entity_type_t host_type) = 0;
/// complete a handshake from the server's perspective
virtual seastar::future<> server_handshake() = 0;
@ -55,6 +59,35 @@ class Connection : public boost::intrusive_ref_counter<Connection> {
/// close the connection and cancel any pending futures from read/send
virtual seastar::future<> close() = 0;
/// move all messages in the sent list back into the queue
virtual void requeue_sent() = 0;
/// get all messages in the out queue
virtual std::tuple<seq_num_t, std::queue<MessageRef>> get_out_queue() = 0;
public:
/// the states a connection can report through get_state()
enum class state_t {
none,
open,
standby,
closed,
wait
};
/// the number of connections initiated in this session, increment when a
/// new connection is established
virtual uint32_t connect_seq() const = 0;
/// the client side should connect to us with a gseq. it will be reset with
/// the one of the existing connection if it's greater.
virtual uint32_t peer_global_seq() const = 0;
virtual seq_num_t rx_seq_num() const = 0;
/// current state of connection
virtual state_t get_state() const = 0;
virtual bool is_server_side() const = 0;
virtual bool is_lossy() const = 0;
};
} // namespace ceph::net

View File

@ -0,0 +1,11 @@
#include "auth/Auth.h"
#include "Dispatcher.h"
namespace ceph::net
{
/// Default implementation: hands back an already-resolved future holding a
/// null authorizer. Both arguments are ignored.
seastar::future<std::unique_ptr<AuthAuthorizer>>
Dispatcher::ms_get_authorizer(peer_type_t, bool force_new)
{
  using authorizer_ptr = std::unique_ptr<AuthAuthorizer>;
  return seastar::make_ready_future<authorizer_ptr>(authorizer_ptr{});
}
}

View File

@ -18,8 +18,9 @@
#include "Fwd.h"
namespace ceph {
namespace net {
class AuthAuthorizer;
namespace ceph::net {
class Dispatcher {
public:
@ -45,8 +46,14 @@ class Dispatcher {
return seastar::make_ready_future<>();
}
// TODO: authorizer
/// Default implementation: resolves immediately with tag 0 and an empty
/// reply buffer. All three arguments are ignored.
virtual seastar::future<msgr_tag_t, bufferlist>
ms_verify_authorizer(peer_type_t,
                     auth_proto_t,
                     bufferlist&) {
  constexpr msgr_tag_t no_error_tag = 0;
  return seastar::make_ready_future<msgr_tag_t, bufferlist>(no_error_tag,
                                                            bufferlist{});
}
virtual seastar::future<std::unique_ptr<AuthAuthorizer>>
ms_get_authorizer(peer_type_t, bool force_new);
};
} // namespace net
} // namespace ceph
} // namespace ceph::net

View File

@ -18,12 +18,18 @@
#include "Errors.h"
#include "msg/msg_types.h"
#include "msg/Message.h"
using peer_type_t = int;
using auth_proto_t = int;
class Message;
using MessageRef = boost::intrusive_ptr<Message>;
namespace ceph::net {
using msgr_tag_t = uint8_t;
class Connection;
using ConnectionRef = boost::intrusive_ptr<Connection>;

View File

@ -25,6 +25,8 @@ namespace ceph::net {
class Messenger {
entity_name_t my_name;
entity_addr_t my_addr;
uint32_t global_seq = 0;
uint32_t crc_flags = 0;
public:
Messenger(const entity_name_t& name)
@ -45,11 +47,42 @@ class Messenger {
virtual seastar::future<> start(Dispatcher *dispatcher) = 0;
/// establish a client connection and complete a handshake
virtual seastar::future<ConnectionRef> connect(const entity_addr_t& addr) = 0;
virtual seastar::future<ConnectionRef> connect(const entity_addr_t& addr,
entity_type_t peer_type) = 0;
/// stop listening and wait for all connections to close. safe to destruct
/// after this future becomes available
virtual seastar::future<> shutdown() = 0;
/// Bump and return the messenger-wide global sequence number.
/// @param old a sequence number seen elsewhere; when it is ahead of ours we
///            first catch up to it, so the returned value is always > old
/// @returns the freshly incremented global_seq
uint32_t get_global_seq(uint32_t old=0) {
  const uint32_t base = (old > global_seq) ? old : global_seq;
  global_seq = base + 1;
  return global_seq;
}
/// Look up an existing connection for the given peer address.
/// Placeholder: the address is ignored and a null reference is always returned.
ConnectionRef lookup_conn(const entity_addr_t&) {
  // TODO: replace handling
  return ConnectionRef{};
}
// @returns a tuple of <tag, auth_reply>
virtual seastar::future<msgr_tag_t, /// tag for error, 0 if authorized
bufferlist> /// auth_reply
verify_authorizer(peer_type_t peer_type,
auth_proto_t protocol,
bufferlist& auth) = 0;
virtual seastar::future<std::unique_ptr<AuthAuthorizer>>
get_authorizer(peer_type_t peer_type,
bool force_new) = 0;
/// @returns the crc flag bits accumulated via set_crc_data()/set_crc_header()
uint32_t get_crc_flags() const {
return crc_flags;
}
/// turn on the MSG_CRC_DATA bit in crc_flags; other bits are left untouched
void set_crc_data() {
  crc_flags = crc_flags | MSG_CRC_DATA;
}
/// turn on the MSG_CRC_HEADER bit in crc_flags; other bits are left untouched
void set_crc_header() {
  crc_flags = crc_flags | MSG_CRC_HEADER;
}
};
} // namespace ceph::net

View File

@ -14,10 +14,16 @@
#include <algorithm>
#include <core/shared_future.hh>
#include <core/sleep.hh>
#include "Config.h"
#include "Messenger.h"
#include "SocketConnection.h"
#include "include/msgr.h"
#include "include/random.h"
#include "auth/Auth.h"
#include "auth/AuthSessionHandler.h"
#include "msg/Message.h"
using namespace ceph::net;
@ -88,6 +94,9 @@ struct bufferlist_consumer {
seastar::future<bufferlist> SocketConnection::read(size_t bytes)
{
if (bytes == 0) {
return seastar::make_ready_future<bufferlist>();
}
r.buffer.clear();
r.remaining = bytes;
return in.consume(bufferlist_consumer{r.buffer, r.remaining})
@ -113,35 +122,16 @@ void SocketConnection::read_tags_until_next_message()
// stop looping and notify read_header()
return seastar::make_ready_future<seastar::stop_iteration>(
seastar::stop_iteration::yes);
case CEPH_MSGR_TAG_ACK:
return in.read_exactly(sizeof(ceph_le64))
.then([] (auto buf) {
auto seq = reinterpret_cast<const ceph_le64*>(buf.get());
std::cout << "ack " << *seq << std::endl;
return seastar::stop_iteration::no;
});
return handle_ack();
case CEPH_MSGR_TAG_KEEPALIVE:
break;
case CEPH_MSGR_TAG_KEEPALIVE2:
return in.read_exactly(sizeof(ceph_timespec))
.then([] (auto buf) {
auto t = reinterpret_cast<const ceph_timespec*>(buf.get());
std::cout << "keepalive2 " << t->tv_sec << std::endl;
// TODO: schedule ack
return seastar::stop_iteration::no;
});
return handle_keepalive2()
.then([this] { return seastar::stop_iteration::no; });
case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
return in.read_exactly(sizeof(ceph_timespec))
.then([] (auto buf) {
auto t = reinterpret_cast<const ceph_timespec*>(buf.get());
std::cout << "keepalive2 ack " << t->tv_sec << std::endl;
return seastar::stop_iteration::no;
});
return handle_keepalive2_ack()
.then([this] { return seastar::stop_iteration::no; });
case CEPH_MSGR_TAG_CLOSE:
std::cout << "close" << std::endl;
break;
@ -156,6 +146,35 @@ void SocketConnection::read_tags_until_next_message()
});
}
/// Consume the payload of an ACK tag: read the acknowledged sequence number
/// from the input stream and prune the @c sent queue accordingly.
/// @returns stop_iteration::no so the tag-reading loop keeps going
seastar::future<seastar::stop_iteration> SocketConnection::handle_ack()
{
  constexpr auto ack_payload_size = sizeof(ceph_le64);
  return in.read_exactly(ack_payload_size).then([this] (auto buf) {
    const auto* acked = reinterpret_cast<const ceph_le64*>(buf.get());
    discard_up_to(&sent, *acked);
    return seastar::stop_iteration::no;
  });
}
/// Drop acknowledged messages from the head of @c queue.
///
/// @param queue the queue to prune, ordered by ascending sequence number
/// @param seq   the sequence number acknowledged by the peer
///
/// The acknowledged number is the seq of the last message the peer received,
/// so the message carrying exactly @c seq is covered by the ack as well and
/// must be discarded (inclusive bound). Keeping it would resend a message the
/// peer already has.
void SocketConnection::discard_up_to(std::queue<MessageRef>* queue,
                                     seq_num_t seq)
{
  while (!queue->empty() &&
         queue->front()->get_seq() <= seq) {
    queue->pop();
  }
}
/// Move every sent-but-unacked message back onto the outgoing queue, in
/// order, and rewind out_seq by the number of messages moved.
void SocketConnection::requeue_sent()
{
  out_seq -= sent.size();
  for (; !sent.empty(); sent.pop()) {
    out_q.push(sent.front());
  }
}
seastar::future<MessageRef> SocketConnection::read_message()
{
return on_message.get_future()
@ -190,18 +209,75 @@ seastar::future<MessageRef> SocketConnection::read_message()
m.front, m.middle, m.data, nullptr);
constexpr bool add_ref = false; // Message starts with 1 ref
return MessageRef{msg, add_ref};
}).then([this] (MessageRef msg) {
if (msg) {
// TODO: set time stamps
msg->set_byte_throttler(policy.throttler_bytes);
if (!update_rx_seq(msg->get_seq())) {
msg.reset();
}
}
return msg;
});
}
/// Validate the sequence number of a freshly received message.
///
/// Only the direct successor of in_seq is accepted, in which case in_seq is
/// advanced. Old (<= in_seq) and skipped (> in_seq + 1) numbers are rejected;
/// depending on configuration they additionally trip an assert.
///
/// @returns true if @c seq was accepted and in_seq updated, false otherwise
bool SocketConnection::update_rx_seq(seq_num_t seq)
{
  // old message: already seen
  if (seq <= in_seq) {
    const bool die_on_old = has_feature(CEPH_FEATURE_RECONNECT_SEQ) &&
                            conf.ms_die_on_old_message;
    if (die_on_old) {
      assert(0 == "old msgs despite reconnect_seq feature");
    }
    return false;
  }
  // gap: at least one message went missing
  if (seq > in_seq + 1) {
    if (conf.ms_die_on_skipped_message) {
      assert(0 == "skipped incoming seq");
    }
    return false;
  }
  in_seq = seq;
  return true;
}
/// Encode @c msg and write it to the output stream.
///
/// The message gets the next outgoing sequence number and is encoded with the
/// negotiated features and the messenger's crc flags. The wire image is built
/// exactly once: tag, header, payload, middle, data and footer. Peers without
/// CEPH_FEATURE_MSG_AUTH receive the old footer layout, whose crc fields are
/// zeroed unless the matching crc flag is enabled.
///
/// After the stream is flushed the message is remembered in @c sent, unless
/// the connection is lossy.
///
/// @param msg the message to send
/// @returns a future that resolves once the message has been flushed
seastar::future<> SocketConnection::write_message(MessageRef msg)
{
  msg->set_seq(++out_seq);
  const auto crc_flags = get_messenger()->get_crc_flags();
  msg->encode(features, crc_flags);

  bufferlist bl;
  bl.append(CEPH_MSGR_TAG_MSG);
  auto& header = msg->get_header();
  bl.append(reinterpret_cast<const char*>(&header), sizeof(header));
  bl.append(msg->get_payload());
  bl.append(msg->get_middle());
  bl.append(msg->get_data());

  auto& footer = msg->get_footer();
  if (has_feature(CEPH_FEATURE_MSG_AUTH)) {
    bl.append(reinterpret_cast<const char*>(&footer), sizeof(footer));
  } else {
    ceph_msg_footer_old old_footer;
    if (crc_flags & MSG_CRC_HEADER) {
      old_footer.front_crc = footer.front_crc;
      old_footer.middle_crc = footer.middle_crc;
    } else {
      old_footer.front_crc = old_footer.middle_crc = 0;
    }
    if (crc_flags & MSG_CRC_DATA) {
      old_footer.data_crc = footer.data_crc;
    } else {
      old_footer.data_crc = 0;
    }
    old_footer.flags = footer.flags;
    bl.append(reinterpret_cast<const char*>(&old_footer), sizeof(old_footer));
  }

  // write as a seastar::net::packet
  return out.write(std::move(bl))
    .then([this] { return out.flush(); })
    .then([this, msg = std::move(msg)] () mutable {
      // only lossless connections need to keep messages around for resend
      if (!policy.lossy) {
        sent.push(std::move(msg));
      }
    });
}
seastar::future<> SocketConnection::send(MessageRef msg)
@ -299,29 +375,379 @@ bufferptr create_static(T& obj)
return buffer::create_static(sizeof(obj), reinterpret_cast<char*>(&obj));
}
/// Decide whether message signatures have to be required from the peer.
///
/// Only relevant when the peer authenticates with cephx. The global
/// cephx_require_signatures setting wins; otherwise OSD/MDS peers follow the
/// cluster setting and every other peer type follows the service setting.
bool SocketConnection::require_auth_feature() const
{
  const bool uses_cephx =
    (h.connect.authorizer_protocol == CEPH_AUTH_CEPHX);
  if (!uses_cephx) {
    return false;
  }
  if (conf.cephx_require_signatures) {
    return true;
  }
  const bool cluster_peer = (h.connect.host_type == CEPH_ENTITY_TYPE_OSD ||
                             h.connect.host_type == CEPH_ENTITY_TYPE_MDS);
  if (cluster_peer) {
    return conf.cephx_cluster_require_signatures;
  }
  return conf.cephx_service_require_signatures;
}
/// Pick the protocol version to use with a peer of the given type.
///
/// @param peer_type entity type of the peer
/// @param connect   true when we are the connecting side; the version is then
///                  keyed on the peer's type, otherwise on our own type
/// @returns the protocol version, or 0 for an entity type without one
uint32_t SocketConnection::get_proto_version(entity_type_t peer_type, bool connect) const
{
  constexpr entity_type_t my_type = CEPH_ENTITY_TYPE_OSD;
  // see also OSD.h, unlike other connection of simple/async messenger,
  // crimson msgr is only used by osd
  constexpr uint32_t CEPH_OSD_PROTOCOL = 10;

  // internal
  if (peer_type == my_type) {
    return CEPH_OSD_PROTOCOL;
  }

  // public
  const entity_type_t keyed_type = connect ? peer_type : my_type;
  if (keyed_type == CEPH_ENTITY_TYPE_OSD) {
    return CEPH_OSDC_PROTOCOL;
  }
  if (keyed_type == CEPH_ENTITY_TYPE_MDS) {
    return CEPH_MDSC_PROTOCOL;
  }
  if (keyed_type == CEPH_ENTITY_TYPE_MON) {
    return CEPH_MONC_PROTOCOL;
  }
  return 0;
}
/// Server side: process one connect request from the peer.
///
/// Reads the ceph_msg_connect header and the authorizer that follows it, then
/// checks, in order: protocol version, required features, and the authorizer
/// (delegated to the messenger). A non-zero tag from any of those checks is
/// sent back as the reply. Otherwise an already known connection to the same
/// peer is resolved by handle_connect_with_existing(); a reconnect attempt
/// (connect_seq > 0) without an existing session is answered with
/// RESETSESSION; and a fresh session is accepted with READY.
///
/// The function no longer starts by unconditionally replying READY: that
/// early return made the whole negotiation below it unreachable.
///
/// @returns a future that resolves once the reply has been sent; @c state is
///          only set to open when the connection was accepted
seastar::future<> SocketConnection::handle_connect()
{
  return read(sizeof(h.connect))
    .then([this](bufferlist bl) {
      auto p = bl.cbegin();
      ::decode(h.connect, p);
      return read(h.connect.authorizer_len);
    }).then([this] (bufferlist authorizer) {
      if (h.connect.protocol_version != get_proto_version(h.connect.host_type, false)) {
        return seastar::make_ready_future<msgr_tag_t, bufferlist>(
          CEPH_MSGR_TAG_BADPROTOVER, bufferlist{});
      }
      if (require_auth_feature()) {
        policy.features_required |= CEPH_FEATURE_MSG_AUTH;
      }
      if (auto feat_missing = policy.features_required & ~(uint64_t)h.connect.features;
          feat_missing != 0) {
        return seastar::make_ready_future<msgr_tag_t, bufferlist>(
          CEPH_MSGR_TAG_FEATURES, bufferlist{});
      }
      return get_messenger()->verify_authorizer(get_peer_type(),
                                                h.connect.authorizer_protocol,
                                                authorizer);
    }).then([this] (ceph::net::msgr_tag_t tag, bufferlist&& authorizer_reply) {
      // a non-zero tag reports why the request was rejected
      if (tag) {
        return send_connect_reply(tag, std::move(authorizer_reply));
      }
      if (auto existing = get_messenger()->lookup_conn(peer_addr); existing) {
        return handle_connect_with_existing(existing, std::move(authorizer_reply));
      } else if (h.connect.connect_seq > 0) {
        return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION,
                                  std::move(authorizer_reply));
      }
      h.connect_seq = h.connect.connect_seq + 1;
      h.peer_global_seq = h.connect.global_seq;
      // h.reply.features is only filled in by send_connect_reply_ready()
      // below (with policy.features_supported), so intersect with the policy
      // value directly instead of reading the not-yet-written reply field
      set_features((uint64_t)policy.features_supported & (uint64_t)h.connect.features);
      // TODO: cct
      return send_connect_reply_ready(CEPH_MSGR_TAG_READY, std::move(authorizer_reply));
    });
}
seastar::future<> SocketConnection::handle_connect_reply()
/// Send a connect reply that does NOT complete the handshake (error or retry
/// tags).
///
/// Fills in the tag, the feature bits (features common to the peer and our
/// supported set, plus everything we require) and the authorizer length,
/// then writes the reply header followed by the authorizer reply and
/// flushes.
///
/// @param tag              reply tag to send
/// @param authorizer_reply authorizer payload appended after the header; may
///                         be empty
/// @returns a future that resolves once the reply has been flushed
seastar::future<>
SocketConnection::send_connect_reply(msgr_tag_t tag,
                                     bufferlist&& authorizer_reply)
{
  h.reply.tag = tag;
  h.reply.features = static_cast<uint64_t>((h.connect.features &
                                            policy.features_supported) |
                                           policy.features_required);
  h.reply.authorizer_len = authorizer_reply.length();
  return out.write(reinterpret_cast<const char*>(&h.reply), sizeof(h.reply))
    .then([this, reply=std::move(authorizer_reply)]() mutable {
      // the write future must be returned so that the flush below waits for
      // it; nothing is written for an empty reply, matching
      // send_connect_reply_ready()
      if (reply.length()) {
        return out.write(std::move(reply));
      } else {
        return seastar::now();
      }
    }).then([this] {
      return out.flush();
    });
}
/// Send the connect reply that accepts the connection (READY or SEQ) and
/// mark the connection open.
///
/// With the SEQ tag our in_seq is sent after the reply, and the sequence
/// number the peer sends back is used to prune out_q.
///
/// @param tag              CEPH_MSGR_TAG_READY or CEPH_MSGR_TAG_SEQ
/// @param authorizer_reply authorizer payload appended after the header; may
///                         be empty
seastar::future<>
SocketConnection::send_connect_reply_ready(msgr_tag_t tag,
                                           bufferlist&& authorizer_reply)
{
  h.reply.tag = tag;
  h.reply.features = policy.features_supported;
  h.reply.global_seq = get_messenger()->get_global_seq();
  h.reply.connect_seq = h.connect_seq;
  h.reply.flags = policy.lossy ? CEPH_MSG_CONNECT_LOSSY : 0;
  h.reply.authorizer_len = authorizer_reply.length();

  const auto* reply_header = reinterpret_cast<const char*>(&h.reply);
  return out.write(reply_header, sizeof(h.reply))
    .then([this, reply=std::move(authorizer_reply)]() mutable {
      if (reply.length() == 0) {
        return seastar::now();
      }
      return out.write(std::move(reply));
    }).then([this] {
      if (h.reply.tag != CEPH_MSGR_TAG_SEQ) {
        return out.flush();
      }
      // exchange sequence numbers: ours goes out first, then the peer's
      // acked seq comes in
      const auto* rx_seq = reinterpret_cast<const char*>(&in_seq);
      return out.write(rx_seq, sizeof(in_seq))
        .then([this] {
          return out.flush();
        }).then([this] {
          return in.read_exactly(sizeof(seq_num_t));
        }).then([this] (auto buf) {
          const auto* peer_acked = reinterpret_cast<const seq_num_t*>(buf.get());
          discard_up_to(&out_q, *peer_acked);
        });
    }).then([this] {
      state = state_t::open;
    });
}
/// Handle a KEEPALIVE2 tag: read the peer's timestamp, remember it in
/// k.reply_stamp and echo it back behind a KEEPALIVE2_ACK tag.
///
/// Every write future is returned from its continuation so that the flush
/// only runs after both the tag and the timestamp have been handed to the
/// output stream.
///
/// @returns a future that resolves once the ack has been flushed
seastar::future<>
SocketConnection::handle_keepalive2()
{
  return in.read_exactly(sizeof(ceph_timespec))
    .then([this] (auto buf) {
      auto t = reinterpret_cast<const ceph_timespec*>(buf.get());
      k.reply_stamp = *t;
      std::cout << "keepalive2 " << t->tv_sec << std::endl;
      char tag = CEPH_MSGR_TAG_KEEPALIVE2_ACK;
      return out.write(reinterpret_cast<const char*>(&tag), sizeof(tag));
    }).then([this] {
      return out.write(reinterpret_cast<const char*>(&k.reply_stamp),
                       sizeof(k.reply_stamp));
    }).then([this] {
      return out.flush();
    });
}
/// Handle a KEEPALIVE2_ACK tag: read the echoed timestamp and store it in
/// k.ack_stamp.
seastar::future<>
SocketConnection::handle_keepalive2_ack()
{
  constexpr auto stamp_size = sizeof(ceph_timespec);
  return in.read_exactly(stamp_size).then([this] (auto buf) {
    const auto& stamp = *reinterpret_cast<const ceph_timespec*>(buf.get());
    k.ack_stamp = stamp;
    std::cout << "keepalive2 ack " << stamp.tv_sec << std::endl;
  });
}
/// Server side: resolve an incoming connect request against a connection we
/// already hold to the same peer.
///
/// The checks run top to bottom and the first one that matches decides
/// whether the peer is told to retry (RETRY_GLOBAL / RETRY_SESSION), to wait
/// (WAIT), to reset its session (RESETSESSION), or whether the incoming
/// connection replaces the existing one.
///
/// @param existing         the connection already registered for the peer
/// @param authorizer_reply authorizer payload to send if the incoming
///                         connection is accepted
seastar::future<>
SocketConnection::handle_connect_with_existing(ConnectionRef existing, bufferlist&& authorizer_reply)
{
  if (h.connect.global_seq < existing->peer_global_seq()) {
    h.reply.global_seq = existing->peer_global_seq();
    return send_connect_reply(CEPH_MSGR_TAG_RETRY_GLOBAL);
  }
  if (existing->is_lossy()) {
    return replace_existing(existing, std::move(authorizer_reply));
  }
  if (h.connect.connect_seq == 0 && existing->connect_seq() > 0) {
    return replace_existing(existing, std::move(authorizer_reply), true);
  }
  if (h.connect.connect_seq < existing->connect_seq()) {
    // old attempt, or we sent READY but they didn't get it.
    h.reply.connect_seq = existing->connect_seq() + 1;
    return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION);
  }
  if (h.connect.connect_seq == existing->connect_seq()) {
    // if the existing connection successfully opened, and/or
    // subsequently went to standby, then the peer should bump
    // their connect_seq and retry: this is not a connection race
    // we need to resolve here.
    if (existing->get_state() == state_t::open ||
        existing->get_state() == state_t::standby) {
      if (policy.resetcheck && existing->connect_seq() == 0) {
        return replace_existing(existing, std::move(authorizer_reply));
      }
      h.reply.connect_seq = existing->connect_seq() + 1;
      return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION);
    }
    if (get_peer_addr() < get_my_addr() ||
        existing->is_server_side()) {
      // incoming wins
      return replace_existing(existing, std::move(authorizer_reply));
    }
    return send_connect_reply(CEPH_MSGR_TAG_WAIT);
  }
  // from here on the peer's connect_seq is ahead of the existing one
  if (policy.resetcheck &&
      existing->connect_seq() == 0) {
    return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION);
  }
  return replace_existing(existing, std::move(authorizer_reply));
}
/// Take over from @c existing: unregister it, inherit its sequence state if
/// it is lossless, and accept the incoming connection.
///
/// @param existing           the connection being replaced
/// @param authorizer_reply   authorizer payload sent with the accepting reply
/// @param is_reset_from_peer true if the peer restarted its session; the
///                           reply is then READY and in_seq starts from 0
seastar::future<> SocketConnection::replace_existing(ConnectionRef existing,
                                                     bufferlist&& authorizer_reply,
                                                     bool is_reset_from_peer)
{
  // SEQ is only used when the peer supports it and did not reset
  const bool exchange_seq =
    (h.connect.features & CEPH_FEATURE_RECONNECT_SEQ) && !is_reset_from_peer;
  const msgr_tag_t reply_tag =
    exchange_seq ? CEPH_MSGR_TAG_SEQ : CEPH_MSGR_TAG_READY;

  get_messenger()->unregister_conn(existing);

  if (!existing->is_lossy()) {
    // reset the in_seq if this is a hard reset from peer,
    // otherwise we respect our original connection's value
    if (is_reset_from_peer) {
      in_seq = 0;
    } else {
      in_seq = existing->rx_seq_num();
    }
    // steal outgoing queue and out_seq
    existing->requeue_sent();
    std::tie(out_seq, out_q) = existing->get_out_queue();
  }
  return send_connect_reply_ready(reply_tag, std::move(authorizer_reply));
}
/// Client side: act on the tag of the server's connect reply.
///
/// Retryable tags adjust the handshake state (or back off through fault())
/// and return without touching @c state. SEQ exchanges sequence numbers with
/// the server and then finishes as READY. READY records the negotiated
/// values and sets @c state to open.
///
/// @param tag the tag to act on. This is h.reply.tag on the first call, but
///            the SEQ branch re-enters with CEPH_MSGR_TAG_READY, so the
///            decisions below must be based on @c tag and not on h.reply.tag
///            (which would still read SEQ and re-enter the SEQ branch again)
/// @throws std::system_error (negotiation_failure) on a repeated
///         BADAUTHORIZER reply or on a tag that is not handled here
seastar::future<> SocketConnection::handle_connect_reply(msgr_tag_t tag)
{
  switch (tag) {
  case CEPH_MSGR_TAG_FEATURES:
    return fault();
  case CEPH_MSGR_TAG_BADPROTOVER:
    return fault();
  case CEPH_MSGR_TAG_BADAUTHORIZER:
    if (h.got_bad_auth) {
      throw std::system_error(make_error_code(error::negotiation_failure));
    }
    h.got_bad_auth = true;
    // try harder
    return get_messenger()->get_authorizer(h.peer_type, true)
      .then([this](auto&& auth) {
        h.authorizer = std::move(auth);
        return seastar::now();
      });
  case CEPH_MSGR_TAG_RESETSESSION:
    reset_session();
    return seastar::now();
  case CEPH_MSGR_TAG_RETRY_GLOBAL:
    h.global_seq = get_messenger()->get_global_seq(h.reply.global_seq);
    return seastar::now();
  case CEPH_MSGR_TAG_RETRY_SESSION:
    assert(h.reply.connect_seq > h.connect_seq);
    h.connect_seq = h.reply.connect_seq;
    return seastar::now();
  case CEPH_MSGR_TAG_WAIT:
    return fault();
  case CEPH_MSGR_TAG_SEQ:
    break;
  case CEPH_MSGR_TAG_READY:
    break;
  }
  if (auto missing = (policy.features_required & ~(uint64_t)h.reply.features);
      missing) {
    return fault();
  }
  if (tag == CEPH_MSGR_TAG_SEQ) {
    // the server sends the seq it has received up to, we answer with ours
    return in.read_exactly(sizeof(seq_num_t))
      .then([this] (auto buf) {
        auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get());
        discard_up_to(&out_q, *acked_seq);
      }).then([this] {
        return out.write(reinterpret_cast<const char*>(&in_seq), sizeof(in_seq));
      }).then([this] {
        return out.flush();
      }).then([this] {
        return handle_connect_reply(CEPH_MSGR_TAG_READY);
      });
  }
  if (tag == CEPH_MSGR_TAG_READY) {
    // hooray!
    h.peer_global_seq = h.reply.global_seq;
    policy.lossy = h.reply.flags & CEPH_MSG_CONNECT_LOSSY;
    state = state_t::open;
    h.connect_seq++;
    h.backoff = 0ms;
    set_features(h.reply.features & h.connect.features);
    if (h.authorizer) {
      session_security.reset(
        get_auth_session_handler(nullptr,
                                 h.authorizer->protocol,
                                 h.authorizer->session_key,
                                 features));
    }
    h.authorizer.reset();
    return seastar::now();
  }
  // unknown tag
  throw std::system_error(make_error_code(error::negotiation_failure));
}
seastar::future<> SocketConnection::client_handshake()
/// Forget all per-session state: drop queued and unacked messages, zero
/// in_seq and the connect_seq, and pick a fresh starting out_seq.
void SocketConnection::reset_session()
{
  out_q = decltype(out_q){};
  sent = decltype(sent){};
  in_seq = 0;
  h.connect_seq = 0;

  // Constant to limit starting sequence number to 2^31. Nothing special
  // about it, just a big number.
  constexpr uint64_t SEQ_MASK = 0x7fffffff;
  if (!has_feature(CEPH_FEATURE_MSG_AUTH)) {
    // previously, seq #'s always started at 0.
    out_seq = 0;
    return;
  }
  // Set out_seq to a random value, so CRC won't be predictable.
  out_seq = ceph::util::generate_random_number<uint64_t>(0, SEQ_MASK);
}
/// Client side: run one connect attempt.
///
/// Fills in ceph_msg_connect, obtains an authorizer from the messenger,
/// sends the request (followed by the authorizer payload, if any), reads the
/// reply header and its authorizer payload, verifies that payload against
/// our authorizer and finally hands the reply tag to handle_connect_reply().
///
/// @param peer_type entity type of the peer we are connecting to
/// @param host_type our own entity type, announced to the peer
/// @throws std::system_error (negotiation_failure) if the authorizer rejects
///         the server's reply
seastar::future<> SocketConnection::connect(entity_type_t peer_type,
                                            entity_type_t host_type)
{
  // encode ceph_msg_connect
  h.peer_type = peer_type;
  memset(&h.connect, 0, sizeof(h.connect));
  h.connect.features = policy.features_supported;
  h.connect.host_type = host_type;
  h.connect.global_seq = h.global_seq;
  h.connect.connect_seq = h.connect_seq;
  h.connect.protocol_version = get_proto_version(peer_type, true);
  // this is fyi, actually, server decides!
  h.connect.flags = policy.lossy ? CEPH_MSG_CONNECT_LOSSY : 0;

  return get_messenger()->get_authorizer(peer_type, false)
    .then([this](auto&& auth) {
      h.authorizer = std::move(auth);
      const bool with_auth = static_cast<bool>(h.authorizer);
      if (with_auth) {
        h.connect.authorizer_protocol = h.authorizer->protocol;
        h.connect.authorizer_len = h.authorizer->bl.length();
      } else {
        h.connect.authorizer_protocol = 0;
        h.connect.authorizer_len = 0;
      }
      bufferlist request;
      request.append(create_static(h.connect));
      if (with_auth) {
        request.append(h.authorizer->bl);
      }
      return request;
    }).then([this](bufferlist&& request) {
      return out.write(std::move(request));
    }).then([this] {
      return out.flush();
    }).then([this] {
      // read the reply
      return read(sizeof(h.reply));
    }).then([this] (bufferlist reply_header) {
      auto p = reply_header.cbegin();
      ::decode(h.reply, p);
      assert(p.end());
      return read(h.reply.authorizer_len);
    }).then([this] (bufferlist auth_payload) {
      if (h.authorizer) {
        auto reply = auth_payload.cbegin();
        const bool accepted = h.authorizer->verify_reply(reply);
        if (!accepted) {
          throw std::system_error(make_error_code(error::negotiation_failure));
        }
      }
      return handle_connect_reply(h.reply.tag);
    });
}
seastar::future<> SocketConnection::client_handshake(entity_type_t peer_type,
entity_type_t host_type)
{
// read server's handshake header
return read(server_header_size)
@ -344,24 +770,11 @@ seastar::future<> SocketConnection::client_handshake()
bufferlist bl;
bl.append(buffer::create_static(banner_size, banner));
::encode(my_addr, bl, 0);
// encode ceph_msg_connect
memset(&h.connect, 0, sizeof(h.connect));
h.connect.protocol_version = CEPH_OSDC_PROTOCOL;
bl.append(create_static(h.connect));
// TODO: append authorizer
return out.write(std::move(bl))
.then([this] { return out.flush(); });
}).then([this] {
// read the reply
return read(sizeof(h.reply));
}).then([this] (bufferlist bl) {
auto p = bl.begin();
::decode(h.reply, p);
// TODO: read authorizer
assert(p.end());
return handle_connect_reply();
return out.write(std::move(bl)).then([this] { return out.flush(); });
}).then([=] {
}).then([=] {
return seastar::do_until([=] { return state == state_t::open; },
[=] { return connect(peer_type, host_type); });
}).then([this] {
// start background processing of tags
read_tags_until_next_message();
@ -382,17 +795,19 @@ seastar::future<> SocketConnection::server_handshake()
.then([this] { return out.flush(); })
.then([this] {
// read client's handshake header and connect request
return read(client_header_size + sizeof(h.connect));
return read(client_header_size);
}).then([this] (bufferlist bl) {
auto p = bl.cbegin();
validate_banner(p);
entity_addr_t addr;
::decode(addr, p);
::decode(h.connect, p);
assert(p.end());
// TODO: read authorizer
return handle_connect();
if (!addr.is_blank_ip()) {
peer_addr = addr;
}
}).then([this] {
return seastar::do_until([this] { return state == state_t::open; },
[this] { return handle_connect(); });
}).then([this] {
// start background processing of tags
read_tags_until_next_message();
@ -401,3 +816,16 @@ seastar::future<> SocketConnection::server_handshake()
fut.forward_to(std::move(h.promise));
});
}
/// Back off before the next attempt: the delay starts at
/// conf.ms_initial_backoff, doubles on every further call and is capped at
/// conf.ms_max_backoff.
/// @returns a future that resolves after sleeping for the current backoff
seastar::future<> SocketConnection::fault()
{
  const bool first_fault = (h.backoff.count() == 0);
  if (first_fault) {
    h.backoff = conf.ms_initial_backoff;
  } else {
    h.backoff += h.backoff;
  }
  const bool over_limit = h.backoff > conf.ms_max_backoff;
  if (over_limit) {
    h.backoff = conf.ms_max_backoff;
  }
  return seastar::sleep(h.backoff);
}

View File

@ -16,16 +16,20 @@
#include <core/reactor.hh>
#include "msg/Policy.h"
#include "Connection.h"
namespace ceph {
namespace net {
class AuthSessionHandler;
namespace ceph::net {
class SocketConnection : public Connection {
seastar::connected_socket socket;
seastar::input_stream<char> in;
seastar::output_stream<char> out;
state_t state = state_t::none;
/// buffer state for read()
struct Reader {
bufferlist buffer;
@ -39,13 +43,40 @@ class SocketConnection : public Connection {
/// state kept while negotiating a connection
///
/// Every member carries an initializer: fault() reads @c backoff and
/// connect() reads @c global_seq before anything assigns them, and the reply
/// struct is written to the wire with only some of its fields filled in.
struct Handshake {
  /// connect request: built by the client, decoded by the server
  ceph_msg_connect connect{};
  /// connect reply: built by the server, decoded by the client
  ceph_msg_connect_reply reply{};
  /// set after the first BADAUTHORIZER reply; a second one is fatal
  bool got_bad_auth = false;
  /// authorizer obtained from the messenger for the current attempt
  std::unique_ptr<AuthAuthorizer> authorizer;
  /// entity type of the peer we connect to
  peer_type_t peer_type = 0;
  /// current delay used by fault(); zero means "no fault yet"
  std::chrono::milliseconds backoff{0};
  uint32_t connect_seq = 0;
  uint32_t peer_global_seq = 0;
  /// global seq announced in our connect requests
  uint32_t global_seq = 0;
  seastar::promise<> promise;
} h;
/// server side of handshake negotiation
seastar::future<> handle_connect();
seastar::future<> handle_connect_with_existing(ConnectionRef existing,
bufferlist&& authorizer_reply);
seastar::future<> replace_existing(ConnectionRef existing,
bufferlist&& authorizer_reply,
bool is_reset_from_peer = false);
seastar::future<> send_connect_reply(ceph::net::msgr_tag_t tag,
bufferlist&& authorizer_reply = {});
seastar::future<> send_connect_reply_ready(ceph::net::msgr_tag_t tag,
bufferlist&& authorizer_reply);
seastar::future<> handle_keepalive2();
seastar::future<> handle_keepalive2_ack();
bool require_auth_feature() const;
/// @returns the host_type field of the stored connect request (h.connect)
int get_peer_type() const {
return h.connect.host_type;
}
uint32_t get_proto_version(entity_type_t peer_type, bool connec) const;
/// client side of handshake negotiation
seastar::future<> handle_connect_reply();
seastar::future<> connect(entity_type_t peer_type, entity_type_t host_type);
seastar::future<> handle_connect_reply(ceph::net::msgr_tag_t tag);
void reset_session();
/// state for an incoming message
struct MessageReader {
@ -61,6 +92,7 @@ class SocketConnection : public Connection {
seastar::promise<> on_message;
void read_tags_until_next_message();
seastar::future<seastar::stop_iteration> handle_ack();
/// becomes available when handshake completes, and when all previous messages
/// have been sent to the output stream. send() chains new messages as
@ -70,6 +102,39 @@ class SocketConnection : public Connection {
/// encode/write a message
seastar::future<> write_message(MessageRef msg);
ceph::net::Policy policy;
uint64_t features;
/// replace the feature bits stored for this connection
void set_features(uint64_t new_features) {
features = new_features;
}
/// @returns true if any bit of @c feature is set in the stored features
bool has_feature(uint64_t feature) const {
  return (features & feature) != 0;
}
/// the seq num of the last transmitted message
seq_num_t out_seq = 0;
/// the seq num of the last received message
seq_num_t in_seq = 0;
/// update the seq num of last received message
/// @returns true if the @c seq is valid, and @c in_seq is updated,
/// false otherwise.
bool update_rx_seq(seq_num_t seq);
std::unique_ptr<AuthSessionHandler> session_security;
// messages to be resent after connection gets reset
std::queue<MessageRef> out_q;
// messages sent, but not yet acked by peer
std::queue<MessageRef> sent;
static void discard_up_to(std::queue<MessageRef>*, seq_num_t);
/// timestamps recorded by the keepalive2 handlers
struct Keepalive {
// stamp received with KEEPALIVE2, echoed back in the ack
ceph_timespec reply_stamp;
// stamp received with KEEPALIVE2_ACK
ceph_timespec ack_stamp;
} k;
seastar::future<> fault();
public:
SocketConnection(Messenger *messenger,
const entity_addr_t& my_addr,
@ -79,7 +144,8 @@ class SocketConnection : public Connection {
bool is_connected() override;
seastar::future<> client_handshake() override;
seastar::future<> client_handshake(entity_type_t peer_type,
entity_type_t host_type) override;
seastar::future<> server_handshake() override;
@ -88,7 +154,32 @@ class SocketConnection : public Connection {
seastar::future<> send(MessageRef msg) override;
seastar::future<> close() override;
/// @returns the connect_seq tracked in the handshake state
uint32_t connect_seq() const override {
return h.connect_seq;
}
/// @returns the global seq last recorded from the peer during the handshake
uint32_t peer_global_seq() const override {
return h.peer_global_seq;
}
/// @returns the seq num of the last received message
// marked override like the other Connection accessors, so a signature drift
// in the base class is caught at compile time
seq_num_t rx_seq_num() const override {
  return in_seq;
}
/// @returns the current connection state
state_t get_state() const override {
return state;
}
/// @returns the server flag of this connection's policy
bool is_server_side() const override {
return policy.server;
}
/// @returns the lossy flag of this connection's policy
bool is_lossy() const override {
return policy.lossy;
}
private:
void requeue_sent() override;
/// Hand over out_seq together with the outgoing queue. The queue is moved
/// out of this connection, so out_q is left in a moved-from state.
std::tuple<seq_num_t, std::queue<MessageRef>> get_out_queue() override {
  return std::make_tuple(out_seq, std::move(out_q));
}
};
} // namespace net
} // namespace ceph
} // namespace ceph::net

View File

@ -12,6 +12,7 @@
*
*/
#include "auth/Auth.h"
#include "SocketMessenger.h"
#include "SocketConnection.h"
#include "Dispatcher.h"
@ -113,8 +114,9 @@ seastar::future<> SocketMessenger::start(Dispatcher *disp)
return seastar::now();
}
seastar::future<ceph::net::ConnectionRef> SocketMessenger::connect(const entity_addr_t& addr,
const entity_addr_t& myaddr)
seastar::future<ceph::net::ConnectionRef>
SocketMessenger::connect(const entity_addr_t& addr, entity_type_t peer_type,
const entity_addr_t& myaddr, entity_type_t host_type)
{
if (auto found = std::find_if(connections.begin(),
connections.end(),
@ -129,11 +131,12 @@ seastar::future<ceph::net::ConnectionRef> SocketMessenger::connect(const entity_
ConnectionRef conn = new SocketConnection(this, get_myaddr(), addr,
std::move(socket));
// complete the handshake before returning to the caller
return conn->client_handshake()
return conn->client_handshake(peer_type, host_type)
.handle_exception([conn] (std::exception_ptr eptr) {
// close the connection before returning errors
return seastar::make_exception_future<>(eptr)
.finally([conn] { return conn->close(); });
// TODO: retry on fault
}).then([=] {
dispatcher->ms_handle_connect(conn);
// dispatch replies on this connection
@ -154,3 +157,27 @@ seastar::future<> SocketMessenger::shutdown()
return conn->close();
}).finally([this] { connections.clear(); });
}
/// Forward authorizer verification to the dispatcher.
/// Without a dispatcher the request is rejected with
/// CEPH_MSGR_TAG_BADAUTHORIZER and an empty reply.
seastar::future<msgr_tag_t, bufferlist>
SocketMessenger::verify_authorizer(peer_type_t peer_type,
                                   auth_proto_t protocol,
                                   bufferlist& auth)
{
  if (!dispatcher) {
    return seastar::make_ready_future<msgr_tag_t, bufferlist>(
      CEPH_MSGR_TAG_BADAUTHORIZER,
      bufferlist{});
  }
  return dispatcher->ms_verify_authorizer(peer_type, protocol, auth);
}
/// Ask the dispatcher for an authorizer for the given peer type.
/// Without a dispatcher a null authorizer is returned.
seastar::future<std::unique_ptr<AuthAuthorizer>>
SocketMessenger::get_authorizer(peer_type_t peer_type, bool force_new)
{
  if (!dispatcher) {
    return seastar::make_ready_future<std::unique_ptr<AuthAuthorizer>>(nullptr);
  }
  return dispatcher->ms_get_authorizer(peer_type, force_new);
}

View File

@ -22,9 +22,11 @@
namespace ceph::net {
class SocketMessenger : public Messenger {
class SocketMessenger final : public Messenger {
boost::optional<seastar::server_socket> listener;
Dispatcher *dispatcher = nullptr;
uint32_t global_seq = 0;
std::list<ConnectionRef> connections;
seastar::future<> dispatch(ConnectionRef conn);
@ -40,9 +42,18 @@ class SocketMessenger : public Messenger {
seastar::future<> start(Dispatcher *dispatcher) override;
seastar::future<ConnectionRef> connect(const entity_addr_t& addr,
const entity_addr_t& myaddr) override;
entity_type_t peer_type,
const entity_addr_t& myaddr,
entity_type_t host_type) override;
seastar::future<> shutdown() override;
seastar::future<msgr_tag_t, bufferlist>
verify_authorizer(peer_type_t peer_type,
auth_proto_t protocol,
bufferlist& auth) override;
seastar::future<std::unique_ptr<AuthAuthorizer>>
get_authorizer(peer_type_t peer_type,
bool force_new) override;
};
} // namespace ceph::net

View File

@ -15,7 +15,8 @@ target_link_libraries(unittest_seastar_denc ceph-common global Seastar::seastar)
set(test_messenger_srcs
test_messenger.cc
$<TARGET_OBJECTS:seastar_buffer_obj>
$<TARGET_OBJECTS:crimson_net_objs>)
$<TARGET_OBJECTS:crimson_net_objs>
$<TARGET_OBJECTS:crimson_thread_objs>)
add_executable(unittest_seastar_messenger ${test_messenger_srcs})
add_ceph_unittest(unittest_seastar_messenger)
target_link_libraries(unittest_seastar_messenger ceph-common Seastar::seastar)

View File

@ -46,7 +46,7 @@ static seastar::future<> test_echo()
.then([&] {
return t.client.messenger.start(&t.client.dispatcher)
.then([&] {
return t.client.messenger.connect(t.addr);
return t.client.messenger.connect(t.addr, entity_name_t::TYPE_OSD);
}).then([] (ceph::net::ConnectionRef conn) {
std::cout << "client connected" << std::endl;
return conn->send(MessageRef{new MPing(), false});