crimson/net: apply strict checks when handle addresses

* Apply strict checks at each step of addresses exchange, make sure we
  fail early when notice inconsistent addresses sent from peer;
* Throw error::bad_peer_address for address validation from peer;
* Stop learning my address before sending it out for peer as identity;
* Fix potential racing when try to learn my address by submit to core 0;

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
This commit is contained in:
Yingxin Cheng 2019-07-16 16:53:40 +08:00
parent 1466c65bfd
commit cb371d7d97
4 changed files with 162 additions and 77 deletions

View File

@ -84,24 +84,6 @@ void validate_banner(bufferlist::const_iterator& p)
}
}
// make sure that we agree with the peer about its address
void validate_peer_addr(const entity_addr_t& addr,
const entity_addr_t& expected)
{
if (addr == expected) {
return;
}
// ok if server bound anonymously, as long as port/nonce match
if (addr.is_blank_ip() &&
addr.get_port() == expected.get_port() &&
addr.get_nonce() == expected.get_nonce()) {
return;
} else {
throw std::system_error(
make_error_code(ceph::net::error::bad_peer_address));
}
}
// return a static bufferptr to the given object
template <typename T>
bufferptr create_static(T& obj)
@ -354,10 +336,22 @@ void ProtocolV1::start_connect(const entity_addr_t& _peer_addr,
::decode(saddr, p);
::decode(caddr, p);
ceph_assert(p.end());
validate_peer_addr(saddr, conn.peer_addr);
if (saddr != conn.peer_addr) {
logger().error("{} my peer_addr {} doesn't match what peer advertized {}",
conn, conn.peer_addr, saddr);
throw std::system_error(
make_error_code(ceph::net::error::bad_peer_address));
}
conn.set_ephemeral_port(caddr.get_port(),
SocketConnection::side_t::connector);
return messenger.learned_addr(caddr);
if (unlikely(caddr.is_msgr2())) {
logger().warn("{} peer sent a v2 address for me: {}",
conn, caddr);
throw std::system_error(
make_error_code(ceph::net::error::bad_peer_address));
}
caddr.set_type(entity_addr_t::TYPE_LEGACY);
return messenger.learned_addr(caddr, conn);
}).then([this] {
// encode/send client's handshake header
bufferlist bl;
@ -624,13 +618,15 @@ void ProtocolV1::start_accept(SocketFRef&& sock,
messenger.accept_conn(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
seastar::with_gate(pending_dispatch, [this] {
// encode/send server's handshake header
bufferlist bl;
bl.append(buffer::create_static(banner_size, banner));
::encode(messenger.get_myaddr(), bl, 0);
::encode(conn.target_addr, bl, 0);
return socket->write_flush(std::move(bl))
.then([this] {
// stop learning my_addr before sending it out, so it won't change
return messenger.learned_addr(messenger.get_myaddr(), conn).then([this] {
// encode/send server's handshake header
bufferlist bl;
bl.append(buffer::create_static(banner_size, banner));
::encode(messenger.get_myaddr(), bl, 0);
::encode(conn.target_addr, bl, 0);
return socket->write_flush(std::move(bl));
}).then([this] {
// read client's handshake header and connect request
return socket->read(client_header_size);
}).then([this] (bufferlist bl) {
@ -639,6 +635,16 @@ void ProtocolV1::start_accept(SocketFRef&& sock,
entity_addr_t addr;
::decode(addr, p);
ceph_assert(p.end());
if ((addr.is_legacy() || addr.is_any()) &&
addr.is_same_host(conn.target_addr)) {
// good
} else {
logger().error("{} peer advertized an invalid peer_addr: {},"
" which should be v1 and the same host with {}.",
conn, addr, conn.peer_addr);
throw std::system_error(
make_error_code(ceph::net::error::bad_peer_address));
}
conn.peer_addr = addr;
conn.target_addr = conn.peer_addr;
return seastar::repeat([this] {

View File

@ -653,35 +653,29 @@ seastar::future<bool> ProtocolV2::client_connect()
client_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
}
// TODO: get socket address and learn(not supported by seastar)
entity_addr_t a;
a.u.sa.sa_family = AF_INET;
a.set_type(entity_addr_t::TYPE_MSGR2);
return messenger.learned_addr(a).then([this] {
uint64_t flags = 0;
if (conn.policy.lossy) {
flags |= CEPH_MSG_CONNECT_LOSSY;
}
uint64_t flags = 0;
if (conn.policy.lossy) {
flags |= CEPH_MSG_CONNECT_LOSSY;
}
auto client_ident = ClientIdentFrame::Encode(
messenger.get_myaddrs(),
conn.target_addr,
messenger.get_myname().num(),
global_seq,
conn.policy.features_supported,
conn.policy.features_required | msgr2_required, flags,
client_cookie);
auto client_ident = ClientIdentFrame::Encode(
messenger.get_myaddrs(),
conn.target_addr,
messenger.get_myname().num(),
global_seq,
conn.policy.features_supported,
conn.policy.features_required | msgr2_required, flags,
client_cookie);
logger().debug("{} WRITE ClientIdentFrame: addrs={}, target={}, gid={},"
" gs={}, features_supported={}, features_required={},"
" flags={}, cookie={}",
conn, messenger.get_myaddrs(), conn.target_addr,
messenger.get_myname().num(), global_seq,
conn.policy.features_supported,
conn.policy.features_required | msgr2_required,
flags, client_cookie);
return write_frame(client_ident);
}).then([this] {
logger().debug("{} WRITE ClientIdentFrame: addrs={}, target={}, gid={},"
" gs={}, features_supported={}, features_required={},"
" flags={}, cookie={}",
conn, messenger.get_myaddrs(), conn.target_addr,
messenger.get_myname().num(), global_seq,
conn.policy.features_supported,
conn.policy.features_required | msgr2_required,
flags, client_cookie);
return write_frame(client_ident).then([this] {
return read_main_preamble();
}).then([this] (Tag tag) {
switch (tag) {
@ -719,13 +713,19 @@ seastar::future<bool> ProtocolV2::client_connect()
if (!server_ident.addrs().contains(conn.target_addr)) {
logger().warn("{} peer identifies as {}, does not include {}",
conn, server_ident.addrs(), conn.target_addr);
abort_in_fault();
throw std::system_error(
make_error_code(ceph::net::error::bad_peer_address));
}
server_cookie = server_ident.cookie();
// TODO: change peer_addr to entity_addrvec_t
ceph_assert(conn.peer_addr == server_ident.addrs().front());
if (server_ident.addrs().front() != conn.peer_addr) {
logger().warn("{} peer advertises as {}, does not match {}",
conn, server_ident.addrs(), conn.peer_addr);
throw std::system_error(
make_error_code(ceph::net::error::bad_peer_address));
}
conn.set_peer_id(server_ident.gid());
conn.set_features(server_ident.supported_features() &
conn.policy.features_supported);
@ -862,12 +862,14 @@ void ProtocolV2::execute_connecting()
}
conn.set_ephemeral_port(_my_addr_from_peer.get_port(),
SocketConnection::side_t::connector);
if (messenger.get_myaddrs().empty() ||
messenger.get_myaddrs().front().is_blank_ip()) {
return messenger.learned_addr(_my_addr_from_peer);
} else {
return seastar::now();
if (unlikely(_my_addr_from_peer.is_legacy())) {
logger().warn("{} peer sent a legacy address for me: {}",
conn, _my_addr_from_peer);
throw std::system_error(
make_error_code(ceph::net::error::bad_peer_address));
}
_my_addr_from_peer.set_type(entity_addr_t::TYPE_MSGR2);
return messenger.learned_addr(_my_addr_from_peer, conn);
}).then([this] {
return client_auth();
}).then([this] {
@ -1065,15 +1067,26 @@ seastar::future<bool> ProtocolV2::server_connect()
if (client_ident.addrs().empty() ||
client_ident.addrs().front() == entity_addr_t()) {
logger().warn("{} oops, client_ident.addrs() is empty", conn);
abort_in_fault();
throw std::system_error(
make_error_code(ceph::net::error::bad_peer_address));
}
if (!messenger.get_myaddrs().contains(client_ident.target_addr())) {
logger().warn("{} peer is trying to reach {} which is not us ({})",
conn, client_ident.target_addr(), messenger.get_myaddrs());
abort_in_fault();
throw std::system_error(
make_error_code(ceph::net::error::bad_peer_address));
}
// TODO: change peer_addr to entity_addrvec_t
entity_addr_t paddr = client_ident.addrs().front();
if ((paddr.is_msgr2() || paddr.is_any()) &&
paddr.is_same_host(conn.target_addr)) {
// good
} else {
logger().warn("{} peer's address {} is not v2 or not the same host with {}",
conn, paddr, conn.target_addr);
throw std::system_error(
make_error_code(ceph::net::error::bad_peer_address));
}
conn.peer_addr = paddr;
logger().debug("{} UPDATE: peer_addr={}", conn, conn.peer_addr);
conn.target_addr = conn.peer_addr;
@ -1306,6 +1319,15 @@ void ProtocolV2::execute_accepting()
conn, ceph_entity_type_name(_peer_type),
conn.policy.lossy, conn.policy.server,
conn.policy.standby, conn.policy.resetcheck);
if (messenger.get_myaddr().get_port() != _my_addr_from_peer.get_port() ||
messenger.get_myaddr().get_nonce() != _my_addr_from_peer.get_nonce()) {
logger().warn("{} my_addr_from_peer {} port/nonce doesn't match myaddr {}",
conn, _my_addr_from_peer, messenger.get_myaddr());
throw std::system_error(
make_error_code(ceph::net::error::bad_peer_address));
}
return messenger.learned_addr(_my_addr_from_peer, conn);
}).then([this] {
return server_auth();
}).then([this] {
return read_main_preamble();

View File

@ -112,6 +112,10 @@ seastar::future<> SocketMessenger::start(Dispatcher *disp) {
seastar::future<ceph::net::ConnectionXRef>
SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type)
{
// make sure we connect to a valid peer_addr
ceph_assert(peer_addr.is_legacy() || peer_addr.is_msgr2());
ceph_assert(peer_addr.get_port() > 0);
auto shard = locate_shard(peer_addr);
return container().invoke_on(shard, [peer_addr, peer_type](auto& msgr) {
return msgr.do_connect(peer_addr, peer_type);
@ -154,6 +158,10 @@ seastar::future<> SocketMessenger::do_start(Dispatcher *disp)
// start listening if bind() was called
if (listener) {
// make sure we have already bound to a valid address
ceph_assert(get_myaddr().is_legacy() || get_myaddr().is_msgr2());
ceph_assert(get_myaddr().get_port() > 0);
seastar::keep_doing([this] {
return Socket::accept(*listener)
.then([this] (SocketFRef socket,
@ -221,21 +229,67 @@ seastar::future<> SocketMessenger::do_shutdown()
});
}
seastar::future<> SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
seastar::future<> SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me, const SocketConnection& conn)
{
if (!get_myaddr().is_blank_ip()) {
// already learned or binded
return seastar::now();
}
// make sure we there's no racing to learn address from peer
return container().invoke_on(0, [peer_addr_for_me, &conn] (auto& msgr) {
if (!msgr.need_addr) {
if ((!msgr.get_myaddr().is_any() &&
msgr.get_myaddr().get_type() != peer_addr_for_me.get_type()) ||
msgr.get_myaddr().get_family() != peer_addr_for_me.get_family() ||
!msgr.get_myaddr().is_same_host(peer_addr_for_me)) {
logger().warn("{} peer_addr_for_me {} type/family/IP doesn't match myaddr {}",
conn, peer_addr_for_me, msgr.get_myaddr());
throw std::system_error(
make_error_code(ceph::net::error::bad_peer_address));
}
return seastar::now();
}
msgr.need_addr = false;
// Only learn IP address if blank.
entity_addr_t addr = get_myaddr();
addr.u = peer_addr_for_me.u;
addr.set_type(peer_addr_for_me.get_type());
addr.set_port(get_myaddr().get_port());
return set_myaddrs(entity_addrvec_t{addr}).then([this, peer_addr_for_me] {
logger().debug("{} learned myaddr={} from {}",
*this, get_myaddr(), peer_addr_for_me);
if (msgr.get_myaddr().get_type() == entity_addr_t::TYPE_NONE) {
// Not bound
entity_addr_t addr = peer_addr_for_me;
addr.set_type(entity_addr_t::TYPE_ANY);
addr.set_port(0);
return msgr.set_myaddrs(entity_addrvec_t{addr}
).then([&msgr, &conn, peer_addr_for_me] {
logger().info("{} learned myaddr={} (unbound) from {}",
conn, msgr.get_myaddr(), peer_addr_for_me);
});
} else {
// Already bound
if (!msgr.get_myaddr().is_any() &&
msgr.get_myaddr().get_type() != peer_addr_for_me.get_type()) {
logger().warn("{} peer_addr_for_me {} type doesn't match myaddr {}",
conn, peer_addr_for_me, msgr.get_myaddr());
throw std::system_error(
make_error_code(ceph::net::error::bad_peer_address));
}
if (msgr.get_myaddr().get_family() != peer_addr_for_me.get_family()) {
logger().warn("{} peer_addr_for_me {} family doesn't match myaddr {}",
conn, peer_addr_for_me, msgr.get_myaddr());
throw std::system_error(
make_error_code(ceph::net::error::bad_peer_address));
}
if (msgr.get_myaddr().is_blank_ip()) {
entity_addr_t addr = peer_addr_for_me;
addr.set_type(msgr.get_myaddr().get_type());
addr.set_port(msgr.get_myaddr().get_port());
return msgr.set_myaddrs(entity_addrvec_t{addr}
).then([&msgr, &conn, peer_addr_for_me] {
logger().info("{} learned myaddr={} (blank IP) from {}",
conn, msgr.get_myaddr(), peer_addr_for_me);
});
} else if (!msgr.get_myaddr().is_same_host(peer_addr_for_me)) {
logger().warn("{} peer_addr_for_me {} IP doesn't match myaddr {}",
conn, peer_addr_for_me, msgr.get_myaddr());
throw std::system_error(
make_error_code(ceph::net::error::bad_peer_address));
} else {
return seastar::now();
}
}
});
}

View File

@ -40,6 +40,8 @@ class SocketMessenger final : public Messenger, public seastar::peering_sharded_
// Distinguish messengers with meaningful names for debugging
const std::string logic_name;
const uint32_t nonce;
// specifying we haven't learned our addr; set false when we find it.
bool need_addr = true;
seastar::future<> accept(seastar::connected_socket socket,
seastar::socket_address paddr);
@ -106,7 +108,8 @@ class SocketMessenger final : public Messenger, public seastar::peering_sharded_
void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) override;
public:
seastar::future<> learned_addr(const entity_addr_t &peer_addr_for_me);
seastar::future<> learned_addr(const entity_addr_t &peer_addr_for_me,
const SocketConnection& conn);
SocketConnectionRef lookup_conn(const entity_addr_t& addr);
void accept_conn(SocketConnectionRef);