msg/Connection: protect peer_addrs with safe_item_history<>

The peer_addrs can be updated during the initial connection handshake,
but we don't want users (e.g., dout()) to race with an update and
wander off into bad memory.

We use the same strategy for Messenger's my_addrs.

Fixes: http://tracker.ceph.com/issues/37807
Signed-off-by: Sage Weil <sage@redhat.com>
This commit is contained in:
Sage Weil 2019-01-07 20:38:48 -06:00
parent d49e2e9075
commit f1c9bd1766
7 changed files with 26 additions and 25 deletions

View File

@ -631,7 +631,7 @@ void MDSDaemon::handle_command(const MCommand::const_ref &m)
if (!session->auth_caps.allow_all()) {
dout(1) << __func__
<< ": received command from client without `tell` capability: "
<< m->get_connection()->peer_addrs << dendl;
<< *m->get_connection()->peer_addrs << dendl;
ss << "permission denied";
r = -EPERM;

View File

@ -28,6 +28,7 @@
#include "include/ceph_assert.h" // Because intusive_ptr clobbers our assert...
#include "include/buffer.h"
#include "include/types.h"
#include "common/item_history.h"
#include "msg/MessageRef.h"
@ -42,7 +43,7 @@ struct Connection : public RefCountedObject {
Messenger *msgr;
RefCountedPtr priv;
int peer_type;
entity_addrvec_t peer_addrs;
safe_item_history<entity_addrvec_t> peer_addrs;
utime_t last_keepalive, last_keepalive_ack;
private:
uint64_t features;
@ -177,10 +178,10 @@ public:
virtual entity_addr_t get_peer_socket_addr() const = 0;
entity_addr_t get_peer_addr() const {
return peer_addrs.front();
return peer_addrs->front();
}
const entity_addrvec_t& get_peer_addrs() const {
return peer_addrs;
return *peer_addrs;
}
void set_peer_addr(const entity_addr_t& a) {
peer_addrs = entity_addrvec_t(a);

View File

@ -37,7 +37,7 @@
#define dout_prefix _conn_prefix(_dout)
ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) {
return *_dout << "-- " << async_msgr->get_myaddrs() << " >> "
<< peer_addrs << " conn(" << this
<< *peer_addrs << " conn(" << this
<< (msgr2 ? " msgr2" : " legacy")
<< " :" << port
<< " s=" << get_state_name(state)

View File

@ -594,7 +594,7 @@ AsyncConnectionRef AsyncMessenger::create_connect(
conn->connect(addrs, type, target);
ceph_assert(!conns.count(addrs));
ldout(cct, 10) << __func__ << " " << conn << " " << addrs << " "
<< conn->peer_addrs << dendl;
<< *conn->peer_addrs << dendl;
conns[addrs] = conn;
w->get_perf_counter()->inc(l_msgr_active_connections);
@ -816,7 +816,7 @@ int AsyncMessenger::get_proto_version(int peer_type, bool connect) const
int AsyncMessenger::accept_conn(AsyncConnectionRef conn)
{
Mutex::Locker l(lock);
auto it = conns.find(conn->peer_addrs);
auto it = conns.find(*conn->peer_addrs);
if (it != conns.end()) {
AsyncConnectionRef existing = it->second;
@ -830,8 +830,8 @@ int AsyncMessenger::accept_conn(AsyncConnectionRef conn)
return -1;
}
}
ldout(cct, 10) << __func__ << " " << conn << " " << conn->peer_addrs << dendl;
conns[conn->peer_addrs] = conn;
ldout(cct, 10) << __func__ << " " << conn << " " << *conn->peer_addrs << dendl;
conns[*conn->peer_addrs] = conn;
conn->get_perf_counter()->inc(l_msgr_active_connections);
accepting_conns.erase(conn);
return 0;
@ -891,7 +891,7 @@ int AsyncMessenger::reap_dead()
auto it = deleted_conns.begin();
AsyncConnectionRef p = *it;
ldout(cct, 5) << __func__ << " delete " << p << dendl;
auto conns_it = conns.find(p->peer_addrs);
auto conns_it = conns.find(*p->peer_addrs);
if (conns_it != conns.end() && conns_it->second == p)
conns.erase(conns_it);
accepting_conns.erase(p);

View File

@ -15,7 +15,7 @@
#define dout_prefix _conn_prefix(_dout)
ostream &ProtocolV1::_conn_prefix(std::ostream *_dout) {
return *_dout << "--1- " << messenger->get_myaddrs().legacy_addr() << " >> "
<< connection->peer_addrs.legacy_addr() << " conn("
<< connection->peer_addrs->legacy_addr() << " conn("
<< connection << (connection->msgr2 ? " msgr2" : " legacy")
<< " :" << connection->port << " s=" << get_state_name(state)
<< " pgs=" << peer_global_seq << " cs=" << connect_seq
@ -1356,7 +1356,7 @@ CtPtr ProtocolV1::handle_server_banner_and_identify(char *buffer, int r) {
ldout(cct, 20) << __func__ << " connect read peer addr " << paddr
<< " on socket " << connection->cs.fd() << dendl;
entity_addr_t peer_addr = connection->peer_addrs.legacy_addr();
entity_addr_t peer_addr = connection->peer_addrs->legacy_addr();
if (peer_addr != paddr) {
if (paddr.is_blank_ip() && peer_addr.get_port() == paddr.get_port() &&
peer_addr.get_nonce() == paddr.get_nonce()) {
@ -1953,7 +1953,7 @@ CtPtr ProtocolV1::handle_connect_message_2() {
ldout(cct, 10) << __func__ << " accept setting up session_security." << dendl;
// existing?
AsyncConnectionRef existing = messenger->lookup_conn(connection->peer_addrs);
AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);
connection->inject_delay();
@ -2082,7 +2082,7 @@ CtPtr ProtocolV1::handle_connect_message_2() {
}
// connection race?
if (connection->peer_addrs.legacy_addr() < messenger->get_myaddr() ||
if (connection->peer_addrs->legacy_addr() < messenger->get_myaddr() ||
existing->policy.server) {
// incoming wins
ldout(cct, 10) << __func__ << " accept connection race, existing "
@ -2096,7 +2096,7 @@ CtPtr ProtocolV1::handle_connect_message_2() {
<< __func__ << " accept connection race, existing " << existing
<< ".cseq " << exproto->connect_seq
<< " == " << connect_msg.connect_seq << ", sending WAIT" << dendl;
ceph_assert(connection->peer_addrs.legacy_addr() >
ceph_assert(connection->peer_addrs->legacy_addr() >
messenger->get_myaddr());
existing->lock.unlock();
return send_connect_message_reply(CEPH_MSGR_TAG_WAIT, reply,
@ -2375,7 +2375,7 @@ CtPtr ProtocolV1::open(ceph_msg_connect_reply &reply,
replacing = false;
if (r < 0) {
ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
<< connection->peer_addrs.legacy_addr()
<< connection->peer_addrs->legacy_addr()
<< " just fail later one(this)" << dendl;
ldout(cct, 10) << "accept fault after register" << dendl;
connection->inject_delay();

View File

@ -15,7 +15,7 @@
#define dout_prefix _conn_prefix(_dout)
ostream &ProtocolV2::_conn_prefix(std::ostream *_dout) {
return *_dout << "--2- " << messenger->get_myaddrs() << " >> "
<< connection->peer_addrs << " conn("
<< *connection->peer_addrs << " conn("
<< connection << (connection->msgr2 ? " msgr2" : " legacy")
<< " :" << connection->port << " s=" << get_state_name(state)
<< " pgs=" << peer_global_seq << " cs=" << connect_seq
@ -1376,10 +1376,10 @@ CtPtr ProtocolV2::handle_server_addrvec_and_identify(char *buffer, int r) {
// may be trying to connect to a v2 addr, and the remote may
// identify themselves by several other addrs as well. This happens
// with mon discovery.
if (!connection->peer_addrs.contains(peer_addr)) {
if (!connection->peer_addrs->contains(peer_addr)) {
ldout(cct, 10) << __func__ << " server claims to be " << peer_addr
<< " (of " << paddrs << "), but we are trying to reach "
<< connection->peer_addrs << dendl;
<< *connection->peer_addrs << dendl;
return _fault();
}
@ -2000,9 +2000,9 @@ CtPtr ProtocolV2::handle_connect_message_2() {
ldout(cct, 10) << __func__ << " accept setting up session_security." << dendl;
// existing?
AsyncConnectionRef existing = messenger->lookup_conn(connection->peer_addrs);
AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);
ldout(cct, 10) << __func__ << " existing " << existing
<< " on " << connection->peer_addrs << dendl;
<< " on " << *connection->peer_addrs << dendl;
connection->inject_delay();
connection->lock.lock();
@ -2130,7 +2130,7 @@ CtPtr ProtocolV2::handle_connect_message_2() {
}
// connection race?
if (connection->peer_addrs.legacy_addr() < messenger->get_myaddr() ||
if (connection->peer_addrs->legacy_addr() < messenger->get_myaddr() ||
existing->policy.server) {
// incoming wins
ldout(cct, 10) << __func__ << " accept connection race, existing "
@ -2144,7 +2144,7 @@ CtPtr ProtocolV2::handle_connect_message_2() {
<< __func__ << " accept connection race, existing " << existing
<< ".cseq " << exproto->connect_seq
<< " == " << connect_msg.connect_seq << ", sending WAIT" << dendl;
ceph_assert(connection->peer_addrs.legacy_addr() >
ceph_assert(connection->peer_addrs->legacy_addr() >
messenger->get_myaddr());
existing->lock.unlock();
return send_connect_message_reply(CEPH_MSGR_TAG_WAIT, reply,
@ -2414,7 +2414,7 @@ CtPtr ProtocolV2::open(ceph_msg_connect_reply &reply,
replacing = false;
if (r < 0) {
ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
<< connection->peer_addrs.legacy_addr()
<< connection->peer_addrs->legacy_addr()
<< " just fail later one(this)" << dendl;
ldout(cct, 10) << "accept fault after register" << dendl;
connection->inject_delay();

View File

@ -49,7 +49,7 @@ public:
void mark_disposable() override;
entity_addr_t get_peer_socket_addr() const override {
return peer_addrs.front();
return peer_addrs->front();
}
}; /* PipeConnection */