Merge PR #25827 into master

* refs/pull/25827/head:
	test/cls_rgw: fix signed/unsigned warning
	test/msgr/test_msgr: fix signed/unsigned warning
	msg/Connection: protect peer_addrs with safe_item_history<>

Reviewed-by: Gregory Farnum <gfarnum@redhat.com>
Reviewed-by: Greg Farnum <gfarnum@redhat.com>
This commit is contained in:
Sage Weil 2019-01-10 16:29:10 -06:00
commit 888bf53235
9 changed files with 29 additions and 28 deletions

View File

@ -631,7 +631,7 @@ void MDSDaemon::handle_command(const MCommand::const_ref &m)
if (!session->auth_caps.allow_all()) {
dout(1) << __func__
<< ": received command from client without `tell` capability: "
<< m->get_connection()->peer_addrs << dendl;
<< *m->get_connection()->peer_addrs << dendl;
ss << "permission denied";
r = -EPERM;

View File

@ -28,6 +28,7 @@
#include "include/ceph_assert.h" // Because intusive_ptr clobbers our assert...
#include "include/buffer.h"
#include "include/types.h"
#include "common/item_history.h"
#include "msg/MessageRef.h"
@ -42,7 +43,7 @@ struct Connection : public RefCountedObject {
Messenger *msgr;
RefCountedPtr priv;
int peer_type;
entity_addrvec_t peer_addrs;
safe_item_history<entity_addrvec_t> peer_addrs;
utime_t last_keepalive, last_keepalive_ack;
private:
uint64_t features;
@ -177,10 +178,10 @@ public:
virtual entity_addr_t get_peer_socket_addr() const = 0;
entity_addr_t get_peer_addr() const {
return peer_addrs.front();
return peer_addrs->front();
}
const entity_addrvec_t& get_peer_addrs() const {
return peer_addrs;
return *peer_addrs;
}
void set_peer_addr(const entity_addr_t& a) {
peer_addrs = entity_addrvec_t(a);

View File

@ -37,7 +37,7 @@
#define dout_prefix _conn_prefix(_dout)
ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) {
return *_dout << "-- " << async_msgr->get_myaddrs() << " >> "
<< peer_addrs << " conn(" << this
<< *peer_addrs << " conn(" << this
<< (msgr2 ? " msgr2" : " legacy")
<< " :" << port
<< " s=" << get_state_name(state)

View File

@ -594,7 +594,7 @@ AsyncConnectionRef AsyncMessenger::create_connect(
conn->connect(addrs, type, target);
ceph_assert(!conns.count(addrs));
ldout(cct, 10) << __func__ << " " << conn << " " << addrs << " "
<< conn->peer_addrs << dendl;
<< *conn->peer_addrs << dendl;
conns[addrs] = conn;
w->get_perf_counter()->inc(l_msgr_active_connections);
@ -816,7 +816,7 @@ int AsyncMessenger::get_proto_version(int peer_type, bool connect) const
int AsyncMessenger::accept_conn(AsyncConnectionRef conn)
{
Mutex::Locker l(lock);
auto it = conns.find(conn->peer_addrs);
auto it = conns.find(*conn->peer_addrs);
if (it != conns.end()) {
AsyncConnectionRef existing = it->second;
@ -830,8 +830,8 @@ int AsyncMessenger::accept_conn(AsyncConnectionRef conn)
return -1;
}
}
ldout(cct, 10) << __func__ << " " << conn << " " << conn->peer_addrs << dendl;
conns[conn->peer_addrs] = conn;
ldout(cct, 10) << __func__ << " " << conn << " " << *conn->peer_addrs << dendl;
conns[*conn->peer_addrs] = conn;
conn->get_perf_counter()->inc(l_msgr_active_connections);
accepting_conns.erase(conn);
return 0;
@ -891,7 +891,7 @@ int AsyncMessenger::reap_dead()
auto it = deleted_conns.begin();
AsyncConnectionRef p = *it;
ldout(cct, 5) << __func__ << " delete " << p << dendl;
auto conns_it = conns.find(p->peer_addrs);
auto conns_it = conns.find(*p->peer_addrs);
if (conns_it != conns.end() && conns_it->second == p)
conns.erase(conns_it);
accepting_conns.erase(p);

View File

@ -15,7 +15,7 @@
#define dout_prefix _conn_prefix(_dout)
ostream &ProtocolV1::_conn_prefix(std::ostream *_dout) {
return *_dout << "--1- " << messenger->get_myaddrs().legacy_addr() << " >> "
<< connection->peer_addrs.legacy_addr() << " conn("
<< connection->peer_addrs->legacy_addr() << " conn("
<< connection << (connection->msgr2 ? " msgr2" : " legacy")
<< " :" << connection->port << " s=" << get_state_name(state)
<< " pgs=" << peer_global_seq << " cs=" << connect_seq
@ -1356,7 +1356,7 @@ CtPtr ProtocolV1::handle_server_banner_and_identify(char *buffer, int r) {
ldout(cct, 20) << __func__ << " connect read peer addr " << paddr
<< " on socket " << connection->cs.fd() << dendl;
entity_addr_t peer_addr = connection->peer_addrs.legacy_addr();
entity_addr_t peer_addr = connection->peer_addrs->legacy_addr();
if (peer_addr != paddr) {
if (paddr.is_blank_ip() && peer_addr.get_port() == paddr.get_port() &&
peer_addr.get_nonce() == paddr.get_nonce()) {
@ -1960,7 +1960,7 @@ CtPtr ProtocolV1::handle_connect_message_2() {
ldout(cct, 10) << __func__ << " accept setting up session_security." << dendl;
// existing?
AsyncConnectionRef existing = messenger->lookup_conn(connection->peer_addrs);
AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);
connection->inject_delay();
@ -2089,7 +2089,7 @@ CtPtr ProtocolV1::handle_connect_message_2() {
}
// connection race?
if (connection->peer_addrs.legacy_addr() < messenger->get_myaddr() ||
if (connection->peer_addrs->legacy_addr() < messenger->get_myaddr() ||
existing->policy.server) {
// incoming wins
ldout(cct, 10) << __func__ << " accept connection race, existing "
@ -2103,7 +2103,7 @@ CtPtr ProtocolV1::handle_connect_message_2() {
<< __func__ << " accept connection race, existing " << existing
<< ".cseq " << exproto->connect_seq
<< " == " << connect_msg.connect_seq << ", sending WAIT" << dendl;
ceph_assert(connection->peer_addrs.legacy_addr() >
ceph_assert(connection->peer_addrs->legacy_addr() >
messenger->get_myaddr());
existing->lock.unlock();
return send_connect_message_reply(CEPH_MSGR_TAG_WAIT, reply,
@ -2382,7 +2382,7 @@ CtPtr ProtocolV1::open(ceph_msg_connect_reply &reply,
replacing = false;
if (r < 0) {
ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
<< connection->peer_addrs.legacy_addr()
<< connection->peer_addrs->legacy_addr()
<< " just fail later one(this)" << dendl;
ldout(cct, 10) << "accept fault after register" << dendl;
connection->inject_delay();

View File

@ -15,7 +15,7 @@
#define dout_prefix _conn_prefix(_dout)
ostream &ProtocolV2::_conn_prefix(std::ostream *_dout) {
return *_dout << "--2- " << messenger->get_myaddrs() << " >> "
<< connection->peer_addrs << " conn("
<< *connection->peer_addrs << " conn("
<< connection << (connection->msgr2 ? " msgr2" : " legacy")
<< " :" << connection->port << " s=" << get_state_name(state)
<< " pgs=" << peer_global_seq << " cs=" << connect_seq
@ -1376,10 +1376,10 @@ CtPtr ProtocolV2::handle_server_addrvec_and_identify(char *buffer, int r) {
// may be trying to connect to a v2 addr, and the remote may
// identify themselves by several other addrs as well. This happens
// with mon discovery.
if (!connection->peer_addrs.contains(peer_addr)) {
if (!connection->peer_addrs->contains(peer_addr)) {
ldout(cct, 10) << __func__ << " server claims to be " << peer_addr
<< " (of " << paddrs << "), but we are trying to reach "
<< connection->peer_addrs << dendl;
<< *connection->peer_addrs << dendl;
return _fault();
}
@ -2007,9 +2007,9 @@ CtPtr ProtocolV2::handle_connect_message_2() {
ldout(cct, 10) << __func__ << " accept setting up session_security." << dendl;
// existing?
AsyncConnectionRef existing = messenger->lookup_conn(connection->peer_addrs);
AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);
ldout(cct, 10) << __func__ << " existing " << existing
<< " on " << connection->peer_addrs << dendl;
<< " on " << *connection->peer_addrs << dendl;
connection->inject_delay();
connection->lock.lock();
@ -2137,7 +2137,7 @@ CtPtr ProtocolV2::handle_connect_message_2() {
}
// connection race?
if (connection->peer_addrs.legacy_addr() < messenger->get_myaddr() ||
if (connection->peer_addrs->legacy_addr() < messenger->get_myaddr() ||
existing->policy.server) {
// incoming wins
ldout(cct, 10) << __func__ << " accept connection race, existing "
@ -2151,7 +2151,7 @@ CtPtr ProtocolV2::handle_connect_message_2() {
<< __func__ << " accept connection race, existing " << existing
<< ".cseq " << exproto->connect_seq
<< " == " << connect_msg.connect_seq << ", sending WAIT" << dendl;
ceph_assert(connection->peer_addrs.legacy_addr() >
ceph_assert(connection->peer_addrs->legacy_addr() >
messenger->get_myaddr());
existing->lock.unlock();
return send_connect_message_reply(CEPH_MSGR_TAG_WAIT, reply,
@ -2421,7 +2421,7 @@ CtPtr ProtocolV2::open(ceph_msg_connect_reply &reply,
replacing = false;
if (r < 0) {
ldout(cct, 1) << __func__ << " existing race replacing process for addr = "
<< connection->peer_addrs.legacy_addr()
<< connection->peer_addrs->legacy_addr()
<< " just fail later one(this)" << dendl;
ldout(cct, 10) << "accept fault after register" << dendl;
connection->inject_delay();

View File

@ -49,7 +49,7 @@ public:
void mark_disposable() override;
entity_addr_t get_peer_socket_addr() const override {
return peer_addrs.front();
return peer_addrs->front();
}
}; /* PipeConnection */

View File

@ -868,7 +868,7 @@ TEST(cls_rgw, usage_basic)
ret = cls_rgw_usage_log_read(ioctx, oid, "", bucket1, start_epoch, end_epoch,
max_entries, read_iter, usage2, &truncated);
ASSERT_EQ(0, ret);
ASSERT_EQ(100, usage2.size());
ASSERT_EQ(100u, usage2.size());
// delete and read to assert that bucket option is valid for usage trim
ASSERT_EQ(0, cls_rgw_usage_log_trim(ioctx, oid, "", bucket1, start_epoch, end_epoch));
@ -876,7 +876,7 @@ TEST(cls_rgw, usage_basic)
ret = cls_rgw_usage_log_read(ioctx, oid, "", bucket1, start_epoch, end_epoch,
max_entries, read_iter, usage2, &truncated);
ASSERT_EQ(0, ret);
ASSERT_EQ(0, usage2.size());
ASSERT_EQ(0u, usage2.size());
ASSERT_EQ(0, cls_rgw_usage_log_trim(ioctx, oid, "", bucket2, start_epoch, end_epoch));
}

View File

@ -324,7 +324,7 @@ TEST_P(MessengerTest, SimpleMsgr2Test) {
cli_dispatcher.got_new = false;
}
ASSERT_TRUE(conn->is_connected());
ASSERT_EQ(1, static_cast<Session*>(conn->get_priv().get())->get_count());
ASSERT_EQ(1u, static_cast<Session*>(conn->get_priv().get())->get_count());
ASSERT_TRUE(conn->peer_is_osd());
// 2. test rebind port