Merge pull request #29057 from cyx1231st/wip-seastar-msgr-loggingv2

crimson/net: clean-up and fixes of messenger

Reviewed-by: Kefu Chai <kchai@redhat.com>
This commit is contained in:
Kefu Chai 2019-07-19 18:34:48 +08:00 committed by GitHub
commit 86ac0ab529
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 508 additions and 273 deletions

View File

@ -83,7 +83,7 @@ public:
// v1 and v2
seastar::future<> close();
bool is_my_peer(const entity_addr_t& addr) const;
AuthAuthorizer* get_authorizer(peer_type_t peer) const;
AuthAuthorizer* get_authorizer(entity_type_t peer) const;
KeyStore& get_keys();
seastar::future<> renew_tickets();
seastar::future<> renew_rotating_keyring();
@ -176,7 +176,7 @@ seastar::future<> Connection::renew_rotating_keyring()
});
}
AuthAuthorizer* Connection::get_authorizer(peer_type_t peer) const
AuthAuthorizer* Connection::get_authorizer(entity_type_t peer) const
{
if (auth) {
return auth->build_authorizer(peer);

View File

@ -25,14 +25,17 @@ namespace ceph::net {
using seq_num_t = uint64_t;
class Connection : public seastar::enable_shared_from_this<Connection> {
entity_name_t peer_name = {0, -1};
protected:
entity_addr_t peer_addr;
peer_type_t peer_type = -1;
int64_t peer_id = -1;
using clock_t = seastar::lowres_system_clock;
clock_t::time_point last_keepalive;
clock_t::time_point last_keepalive_ack;
void set_peer_type(entity_type_t peer_type) { peer_name._type = peer_type; }
void set_peer_id(int64_t peer_id) { peer_name._num = peer_id; }
public:
uint64_t peer_global_id = 0;
@ -42,15 +45,15 @@ class Connection : public seastar::enable_shared_from_this<Connection> {
virtual Messenger* get_messenger() const = 0;
const entity_addr_t& get_peer_addr() const { return peer_addr; }
virtual int get_peer_type() const = 0;
const entity_name_t& get_peer_name() const { return peer_name; }
entity_type_t get_peer_type() const { return peer_name.type(); }
int64_t get_peer_id() const { return peer_name.num(); }
bool peer_is_mon() const { return get_peer_type() == CEPH_ENTITY_TYPE_MON; }
bool peer_is_mgr() const { return get_peer_type() == CEPH_ENTITY_TYPE_MGR; }
bool peer_is_mds() const { return get_peer_type() == CEPH_ENTITY_TYPE_MDS; }
bool peer_is_osd() const { return get_peer_type() == CEPH_ENTITY_TYPE_OSD; }
bool peer_is_client() const {
return get_peer_type() == CEPH_ENTITY_TYPE_CLIENT;
}
bool peer_is_mon() const { return peer_name.is_mon(); }
bool peer_is_mgr() const { return peer_name.is_mgr(); }
bool peer_is_mds() const { return peer_name.is_mds(); }
bool peer_is_osd() const { return peer_name.is_osd(); }
bool peer_is_client() const { return peer_name.is_client(); }
/// true if the handshake has completed and no errors have been encountered
virtual seastar::future<bool> is_connected() = 0;

View File

@ -48,7 +48,7 @@ class Dispatcher {
}
virtual seastar::future<msgr_tag_t, bufferlist>
ms_verify_authorizer(peer_type_t,
ms_verify_authorizer(entity_type_t,
auth_proto_t,
bufferlist&) {
return seastar::make_ready_future<msgr_tag_t, bufferlist>(0, bufferlist{});

View File

@ -20,7 +20,6 @@
#include "msg/msg_types.h"
#include "msg/Message.h"
using peer_type_t = int;
using auth_proto_t = int;
class AuthConnectionMeta;

View File

@ -47,7 +47,7 @@ public:
{}
virtual ~Messenger() {}
int get_mytype() const { return my_name.type(); }
entity_type_t get_mytype() const { return my_name.type(); }
const entity_name_t& get_myname() const { return my_name; }
const entity_addrvec_t& get_myaddrs() const { return my_addrs; }
entity_addr_t get_myaddr() const { return my_addrs.front(); }

View File

@ -84,24 +84,6 @@ void validate_banner(bufferlist::const_iterator& p)
}
}
// make sure that we agree with the peer about its address
void validate_peer_addr(const entity_addr_t& addr,
const entity_addr_t& expected)
{
if (addr == expected) {
return;
}
// ok if server bound anonymously, as long as port/nonce match
if (addr.is_blank_ip() &&
addr.get_port() == expected.get_port() &&
addr.get_nonce() == expected.get_nonce()) {
return;
} else {
throw std::system_error(
make_error_code(ceph::net::error::bad_peer_address));
}
}
// return a static bufferptr to the given object
template <typename T>
bufferptr create_static(T& obj)
@ -258,7 +240,7 @@ ProtocolV1::handle_connect_reply(msgr_tag_t tag)
ceph::bufferlist ProtocolV1::get_auth_payload()
{
// only non-mons connectings to mons use MAuth messages
if (conn.peer_type == CEPH_ENTITY_TYPE_MON &&
if (conn.peer_is_mon() &&
messenger.get_mytype() != CEPH_ENTITY_TYPE_MON) {
return {};
} else {
@ -284,7 +266,7 @@ ProtocolV1::repeat_connect()
h.connect.host_type = messenger.get_myname().type();
h.connect.global_seq = h.global_seq;
h.connect.connect_seq = h.connect_seq;
h.connect.protocol_version = get_proto_version(conn.peer_type, true);
h.connect.protocol_version = get_proto_version(conn.get_peer_type(), true);
// this is fyi, actually, server decides!
h.connect.flags = conn.policy.lossy ? CEPH_MSG_CONNECT_LOSSY : 0;
@ -325,9 +307,13 @@ void ProtocolV1::start_connect(const entity_addr_t& _peer_addr,
state = state_t::connecting;
set_write_state(write_state_t::delay);
// we don't know my ephemeral port yet
conn.set_ephemeral_port(0, SocketConnection::side_t::none);
ceph_assert(!socket);
conn.peer_addr = _peer_addr;
conn.peer_type = _peer_type;
conn.target_addr = _peer_addr;
conn.set_peer_type(_peer_type);
conn.policy = messenger.get_policy(_peer_type);
messenger.register_conn(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
seastar::with_gate(pending_dispatch, [this] {
@ -350,11 +336,22 @@ void ProtocolV1::start_connect(const entity_addr_t& _peer_addr,
::decode(saddr, p);
::decode(caddr, p);
ceph_assert(p.end());
validate_peer_addr(saddr, conn.peer_addr);
conn.side = SocketConnection::side_t::connector;
conn.socket_port = caddr.get_port();
return messenger.learned_addr(caddr);
if (saddr != conn.peer_addr) {
logger().error("{} my peer_addr {} doesn't match what peer advertized {}",
conn, conn.peer_addr, saddr);
throw std::system_error(
make_error_code(ceph::net::error::bad_peer_address));
}
conn.set_ephemeral_port(caddr.get_port(),
SocketConnection::side_t::connector);
if (unlikely(caddr.is_msgr2())) {
logger().warn("{} peer sent a v2 address for me: {}",
conn, caddr);
throw std::system_error(
make_error_code(ceph::net::error::bad_peer_address));
}
caddr.set_type(entity_addr_t::TYPE_LEGACY);
return messenger.learned_addr(caddr, conn);
}).then([this] {
// encode/send client's handshake header
bufferlist bl;
@ -537,7 +534,14 @@ seastar::future<stop_t> ProtocolV1::repeat_handle_connect()
.then([this](bufferlist bl) {
auto p = bl.cbegin();
::decode(h.connect, p);
conn.peer_type = h.connect.host_type;
conn.set_peer_type(h.connect.host_type);
conn.policy = messenger.get_policy(h.connect.host_type);
if (!conn.policy.lossy && !conn.policy.server && conn.target_addr.get_port() <= 0) {
logger().error("{} we don't know how to reconnect to peer {}",
conn, conn.target_addr);
throw std::system_error(
make_error_code(ceph::net::error::bad_peer_address));
}
return socket->read(h.connect.authorizer_len);
}).then([this] (bufferlist authorizer) {
memset(&h.reply, 0, sizeof(h.reply));
@ -606,21 +610,23 @@ void ProtocolV1::start_accept(SocketFRef&& sock,
set_write_state(write_state_t::delay);
ceph_assert(!socket);
conn.peer_addr.u = _peer_addr.u;
conn.peer_addr.set_port(0);
conn.side = SocketConnection::side_t::acceptor;
conn.socket_port = _peer_addr.get_port();
// until we know better
conn.target_addr = _peer_addr;
conn.set_ephemeral_port(_peer_addr.get_port(),
SocketConnection::side_t::acceptor);
socket = std::move(sock);
messenger.accept_conn(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
seastar::with_gate(pending_dispatch, [this, _peer_addr] {
// encode/send server's handshake header
bufferlist bl;
bl.append(buffer::create_static(banner_size, banner));
::encode(messenger.get_myaddr(), bl, 0);
::encode(_peer_addr, bl, 0);
return socket->write_flush(std::move(bl))
.then([this] {
seastar::with_gate(pending_dispatch, [this] {
// stop learning my_addr before sending it out, so it won't change
return messenger.learned_addr(messenger.get_myaddr(), conn).then([this] {
// encode/send server's handshake header
bufferlist bl;
bl.append(buffer::create_static(banner_size, banner));
::encode(messenger.get_myaddr(), bl, 0);
::encode(conn.target_addr, bl, 0);
return socket->write_flush(std::move(bl));
}).then([this] {
// read client's handshake header and connect request
return socket->read(client_header_size);
}).then([this] (bufferlist bl) {
@ -629,9 +635,18 @@ void ProtocolV1::start_accept(SocketFRef&& sock,
entity_addr_t addr;
::decode(addr, p);
ceph_assert(p.end());
conn.peer_addr.set_type(addr.get_type());
conn.peer_addr.set_port(addr.get_port());
conn.peer_addr.set_nonce(addr.get_nonce());
if ((addr.is_legacy() || addr.is_any()) &&
addr.is_same_host(conn.target_addr)) {
// good
} else {
logger().error("{} peer advertized an invalid peer_addr: {},"
" which should be v1 and the same host with {}.",
conn, addr, conn.peer_addr);
throw std::system_error(
make_error_code(ceph::net::error::bad_peer_address));
}
conn.peer_addr = addr;
conn.target_addr = conn.peer_addr;
return seastar::repeat([this] {
return repeat_handle_connect();
});
@ -699,6 +714,8 @@ ceph::bufferlist ProtocolV1::do_sweep_messages(
if (session_security) {
session_security->sign_message(msg.get());
}
logger().debug("{} --> #{} === {} ({})",
conn, msg->get_seq(), *msg, msg->get_type());
bl.append(CEPH_MSGR_TAG_MSG);
bl.append((const char*)&header, sizeof(header));
bl.append(msg->get_payload());
@ -819,8 +836,8 @@ seastar::future<> ProtocolV1::read_message()
// start dispatch, ignoring exceptions from the application layer
seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] {
logger().debug("{} <= {}@{} === {}", messenger,
msg->get_source(), conn.peer_addr, *msg);
logger().debug("{} <== #{} === {} ({})",
conn, msg->get_seq(), *msg, msg->get_type());
return dispatcher.ms_dispatch(&conn, std::move(msg))
.handle_exception([this] (std::exception_ptr eptr) {
logger().error("{} ms_dispatch caught exception: {}", conn, eptr);

File diff suppressed because it is too large Load Diff

View File

@ -62,7 +62,6 @@ class ProtocolV2 final : public Protocol {
void trigger_state(state_t state, write_state_t write_state, bool reentrant);
entity_name_t peer_name;
uint64_t connection_features = 0;
uint64_t peer_required_features = 0;

View File

@ -67,6 +67,7 @@ class Socket
seastar::socket_address paddr) {
entity_addr_t peer_addr;
peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
peer_addr.set_type(entity_addr_t::TYPE_ANY);
return seastar::make_ready_future<SocketFRef, entity_addr_t>(
seastar::make_foreign(std::make_unique<Socket>(std::move(socket),
construct_tag{})),

View File

@ -121,12 +121,12 @@ seastar::shard_id SocketConnection::shard_id() const {
void SocketConnection::print(ostream& out) const {
messenger.print(out);
if (side == side_t::none) {
out << " >> " << peer_addr;
out << " >> " << get_peer_name() << " " << peer_addr;
} else if (side == side_t::acceptor) {
out << " >> " << peer_addr
<< "@" << socket_port;
out << " >> " << get_peer_name() << " " << peer_addr
<< "@" << ephemeral_port;
} else { // side == side_t::connector
out << "@" << socket_port
<< " >> " << peer_addr;
out << "@" << ephemeral_port
<< " >> " << get_peer_name() << " " << peer_addr;
}
}

View File

@ -32,15 +32,19 @@ class SocketConnection : public Connection {
SocketMessenger& messenger;
std::unique_ptr<Protocol> protocol;
// if acceptor side, socket_port is different from peer_addr.get_port();
// if connector side, socket_port is different from my_addr.get_port().
// if acceptor side, ephemeral_port is different from peer_addr.get_port();
// if connector side, ephemeral_port is different from my_addr.get_port().
enum class side_t {
none,
acceptor,
connector
};
side_t side = side_t::none;
uint16_t socket_port = 0;
uint16_t ephemeral_port = 0;
void set_ephemeral_port(uint16_t port, side_t _side) {
ephemeral_port = port;
side = _side;
}
ceph::net::Policy<ceph::thread::Throttle> policy;
uint64_t features;
@ -74,10 +78,6 @@ class SocketConnection : public Connection {
Messenger* get_messenger() const override;
int get_peer_type() const override {
return peer_type;
}
seastar::future<bool> is_connected() override;
seastar::future<> send(MessageRef msg) override;

View File

@ -59,7 +59,7 @@ seastar::future<> SocketMessenger::bind(const entity_addrvec_t& addrs)
for (auto& addr : my_addrs.v) {
addr.nonce = nonce;
}
logger().info("listening on {}", my_addrs.front().in4_addr());
logger().info("{} listening on {}", *this, my_addrs.front().in4_addr());
return container().invoke_on_all([my_addrs](auto& msgr) {
msgr.do_bind(my_addrs);
}).handle_exception_type([this] (const std::system_error& e) {
@ -112,6 +112,10 @@ seastar::future<> SocketMessenger::start(Dispatcher *disp) {
seastar::future<ceph::net::ConnectionXRef>
SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type)
{
// make sure we connect to a valid peer_addr
ceph_assert(peer_addr.is_legacy() || peer_addr.is_msgr2());
ceph_assert(peer_addr.get_port() > 0);
auto shard = locate_shard(peer_addr);
return container().invoke_on(shard, [peer_addr, peer_type](auto& msgr) {
return msgr.do_connect(peer_addr, peer_type);
@ -154,6 +158,10 @@ seastar::future<> SocketMessenger::do_start(Dispatcher *disp)
// start listening if bind() was called
if (listener) {
// make sure we have already bound to a valid address
ceph_assert(get_myaddr().is_legacy() || get_myaddr().is_msgr2());
ceph_assert(get_myaddr().get_port() > 0);
seastar::keep_doing([this] {
return Socket::accept(*listener)
.then([this] (SocketFRef socket,
@ -221,19 +229,68 @@ seastar::future<> SocketMessenger::do_shutdown()
});
}
seastar::future<> SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
seastar::future<> SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me, const SocketConnection& conn)
{
if (!get_myaddr().is_blank_ip()) {
// already learned or binded
return seastar::now();
}
// make sure we there's no racing to learn address from peer
return container().invoke_on(0, [peer_addr_for_me, &conn] (auto& msgr) {
if (!msgr.need_addr) {
if ((!msgr.get_myaddr().is_any() &&
msgr.get_myaddr().get_type() != peer_addr_for_me.get_type()) ||
msgr.get_myaddr().get_family() != peer_addr_for_me.get_family() ||
!msgr.get_myaddr().is_same_host(peer_addr_for_me)) {
logger().warn("{} peer_addr_for_me {} type/family/IP doesn't match myaddr {}",
conn, peer_addr_for_me, msgr.get_myaddr());
throw std::system_error(
make_error_code(ceph::net::error::bad_peer_address));
}
return seastar::now();
}
msgr.need_addr = false;
// Only learn IP address if blank.
entity_addr_t addr = get_myaddr();
addr.u = peer_addr_for_me.u;
addr.set_type(peer_addr_for_me.get_type());
addr.set_port(get_myaddr().get_port());
return set_myaddrs(entity_addrvec_t{addr});
if (msgr.get_myaddr().get_type() == entity_addr_t::TYPE_NONE) {
// Not bound
entity_addr_t addr = peer_addr_for_me;
addr.set_type(entity_addr_t::TYPE_ANY);
addr.set_port(0);
return msgr.set_myaddrs(entity_addrvec_t{addr}
).then([&msgr, &conn, peer_addr_for_me] {
logger().info("{} learned myaddr={} (unbound) from {}",
conn, msgr.get_myaddr(), peer_addr_for_me);
});
} else {
// Already bound
if (!msgr.get_myaddr().is_any() &&
msgr.get_myaddr().get_type() != peer_addr_for_me.get_type()) {
logger().warn("{} peer_addr_for_me {} type doesn't match myaddr {}",
conn, peer_addr_for_me, msgr.get_myaddr());
throw std::system_error(
make_error_code(ceph::net::error::bad_peer_address));
}
if (msgr.get_myaddr().get_family() != peer_addr_for_me.get_family()) {
logger().warn("{} peer_addr_for_me {} family doesn't match myaddr {}",
conn, peer_addr_for_me, msgr.get_myaddr());
throw std::system_error(
make_error_code(ceph::net::error::bad_peer_address));
}
if (msgr.get_myaddr().is_blank_ip()) {
entity_addr_t addr = peer_addr_for_me;
addr.set_type(msgr.get_myaddr().get_type());
addr.set_port(msgr.get_myaddr().get_port());
return msgr.set_myaddrs(entity_addrvec_t{addr}
).then([&msgr, &conn, peer_addr_for_me] {
logger().info("{} learned myaddr={} (blank IP) from {}",
conn, msgr.get_myaddr(), peer_addr_for_me);
});
} else if (!msgr.get_myaddr().is_same_host(peer_addr_for_me)) {
logger().warn("{} peer_addr_for_me {} IP doesn't match myaddr {}",
conn, peer_addr_for_me, msgr.get_myaddr());
throw std::system_error(
make_error_code(ceph::net::error::bad_peer_address));
} else {
return seastar::now();
}
}
});
}
SocketPolicy SocketMessenger::get_policy(entity_type_t peer_type) const

View File

@ -40,6 +40,8 @@ class SocketMessenger final : public Messenger, public seastar::peering_sharded_
// Distinguish messengers with meaningful names for debugging
const std::string logic_name;
const uint32_t nonce;
// specifying we haven't learned our addr; set false when we find it.
bool need_addr = true;
seastar::future<> accept(seastar::connected_socket socket,
seastar::socket_address paddr);
@ -106,7 +108,8 @@ class SocketMessenger final : public Messenger, public seastar::peering_sharded_
void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) override;
public:
seastar::future<> learned_addr(const entity_addr_t &peer_addr_for_me);
seastar::future<> learned_addr(const entity_addr_t &peer_addr_for_me,
const SocketConnection& conn);
SocketConnectionRef lookup_conn(const entity_addr_t& addr);
void accept_conn(SocketConnectionRef);

View File

@ -40,6 +40,12 @@ seastar::future<> Heartbeat::start(entity_addrvec_t front_addrs,
for (auto& addr : boost::join(front_addrs.v, back_addrs.v)) {
addr.set_port(0);
}
using ceph::net::SocketPolicy;
front_msgr.set_policy(entity_name_t::TYPE_OSD,
SocketPolicy::stateless_server(0));
back_msgr.set_policy(entity_name_t::TYPE_OSD,
SocketPolicy::stateless_server(0));
return seastar::when_all_succeed(start_messenger(front_msgr, front_addrs),
start_messenger(back_msgr, back_addrs))
.then([this] {

View File

@ -198,6 +198,29 @@ seastar::future<> OSD::start()
osdmap = std::move(map);
return load_pgs();
}).then([this] {
uint64_t osd_required =
CEPH_FEATURE_UID |
CEPH_FEATURE_PGID64 |
CEPH_FEATURE_OSDENC;
using ceph::net::SocketPolicy;
public_msgr.set_default_policy(SocketPolicy::stateless_server(0));
public_msgr.set_policy(entity_name_t::TYPE_MON,
SocketPolicy::lossy_client(osd_required));
public_msgr.set_policy(entity_name_t::TYPE_MGR,
SocketPolicy::lossy_client(osd_required));
public_msgr.set_policy(entity_name_t::TYPE_OSD,
SocketPolicy::stateless_server(0));
cluster_msgr.set_default_policy(SocketPolicy::stateless_server(0));
cluster_msgr.set_policy(entity_name_t::TYPE_MON,
SocketPolicy::lossy_client(0));
cluster_msgr.set_policy(entity_name_t::TYPE_OSD,
SocketPolicy::lossless_peer(osd_required));
cluster_msgr.set_policy(entity_name_t::TYPE_CLIENT,
SocketPolicy::stateless_server(0));
dispatchers.push_front(this);
dispatchers.push_front(monc.get());
dispatchers.push_front(mgrc.get());

View File

@ -289,7 +289,6 @@ static seastar::future<> run(
return seastar::now();
}
seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override {
logger().info("{}: connected", *conn);
conn_stats.connected_time = mono_clock::now();
return seastar::now();
}

View File

@ -52,7 +52,7 @@ struct Server {
});
}
seastar::future<ceph::net::msgr_tag_t, bufferlist>
ms_verify_authorizer(peer_type_t peer_type,
ms_verify_authorizer(entity_type_t peer_type,
auth_proto_t protocol,
bufferlist& auth) override {
return seastar::make_ready_future<ceph::net::msgr_tag_t, bufferlist>(

View File

@ -115,7 +115,6 @@ static seastar::future<> test_echo(unsigned rounds,
return seastar::now();
}
seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override {
logger().info("{}: connected to {}", *conn, conn->get_peer_addr());
auto session = seastar::make_shared<PingSession>();
auto [i, added] = sessions.emplace(conn.get(), session);
std::ignore = i;