msg/async: msgr2: send entity type and peer_address in Tag::HELLO frame

Signed-off-by: Ricardo Dias <rdias@suse.com>
This commit is contained in:
Ricardo Dias 2019-01-30 22:15:29 +00:00 committed by Sage Weil
parent db320dd096
commit 03bc0dcdda
3 changed files with 140 additions and 109 deletions

View File

@ -386,7 +386,7 @@ void AsyncConnection::process() {
ssize_t r = cs.is_connected();
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " reconnect failed to "
<< target_addr << dendl;
<< target_addr << dendl;
if (r == -ECONNREFUSED) {
ldout(async_msgr->cct, 2)
<< __func__ << " connection refused!" << dendl;
@ -479,6 +479,7 @@ void AsyncConnection::accept(ConnectedSocket socket,
std::lock_guard<std::mutex> l(lock);
cs = std::move(socket);
socket_addr = listen_addr;
target_addr = peer_addr; // until we know better
state = STATE_ACCEPTING;
protocol->accept();
// rescheduler connection in order to avoid lock dep

View File

@ -138,17 +138,20 @@ protected:
// this tuple is only used when decoding values from a payload buffer
std::tuple<Args...> _values;
// required only when signing and encryting payload, otherwise is null
// required for using econding/decoding features or when signing and
// encryting payload, otherwise is null
ProtocolV2 *protocol;
uint64_t features;
template <typename T>
inline void _encode_payload_each(T &t) {
if constexpr (std::is_same<T, bufferlist const>()) {
this->payload.claim_append((bufferlist &)t);
} else if constexpr (std::is_same<T, std::vector<uint32_t> const>()) {
encode((uint32_t)t.size(), this->payload, -1ll);
encode((uint32_t)t.size(), this->payload, features);
for (const auto &elem : t) {
encode(elem, this->payload, 0);
encode(elem, this->payload, features);
}
} else if constexpr (std::is_same<T, ceph_msg_header2 const>()) {
this->payload.append((char *)&t, sizeof(t));
@ -157,7 +160,7 @@ protected:
protocol->sign_payload(this->payload);
protocol->encrypt_payload(this->payload);
} else {
encode(t, this->payload, -1ll);
encode(t, this->payload, features);
}
}
@ -198,12 +201,12 @@ protected:
}
public:
PayloadFrame(const Args &... args) : protocol(nullptr) {
PayloadFrame(const Args &... args) : protocol(nullptr), features(0) {
(_encode_payload_each(args), ...);
}
PayloadFrame(ProtocolV2 *protocol, const Args &... args)
: protocol(protocol) {
: protocol(protocol), features(protocol->connection_features) {
(_encode_payload_each(args), ...);
}
@ -218,6 +221,16 @@ public:
}
};
struct HelloFrame : public PayloadFrame<HelloFrame,
uint8_t, // entity type
entity_addr_t> { // peer_addr
const ProtocolV2::Tag tag = ProtocolV2::Tag::HELLO;
using PayloadFrame::PayloadFrame;
inline uint8_t &entity_type() { return get_val<0>(); }
inline entity_addr_t &peer_addr() { return get_val<1>(); }
};
struct AuthRequestFrame
: public PayloadFrame<AuthRequestFrame, uint32_t, uint32_t, bufferlist> {
const ProtocolV2::Tag tag = ProtocolV2::Tag::AUTH_REQUEST;
@ -299,19 +312,18 @@ struct ClientIdentFrame
struct ServerIdentFrame
: public SignedEncryptedFrame<ServerIdentFrame, entity_addrvec_t,
entity_addr_t, int64_t, uint64_t, uint64_t,
int64_t, uint64_t, uint64_t,
uint64_t, uint64_t, uint64_t> {
const ProtocolV2::Tag tag = ProtocolV2::Tag::IDENT;
using SignedEncryptedFrame::SignedEncryptedFrame;
inline entity_addrvec_t &addrs() { return get_val<0>(); }
inline entity_addr_t &peer_addr() { return get_val<1>(); }
inline int64_t &gid() { return get_val<2>(); }
inline uint64_t &global_seq() { return get_val<3>(); }
inline uint64_t &supported_features() { return get_val<4>(); }
inline uint64_t &required_features() { return get_val<5>(); }
inline uint64_t &flags() { return get_val<6>(); }
inline uint64_t &cookie() { return get_val<7>(); }
inline int64_t &gid() { return get_val<1>(); }
inline uint64_t &global_seq() { return get_val<2>(); }
inline uint64_t &supported_features() { return get_val<3>(); }
inline uint64_t &required_features() { return get_val<4>(); }
inline uint64_t &flags() { return get_val<5>(); }
inline uint64_t &cookie() { return get_val<6>(); }
};
struct ReconnectFrame
@ -1204,38 +1216,24 @@ CtPtr ProtocolV2::_banner_exchange(CtPtr callback) {
ldout(cct, 20) << __func__ << dendl;
bannerExchangeCallback = callback;
uint8_t type = messenger->get_mytype();
__le64 supported_features = CEPH_MSGR2_SUPPORTED_FEATURES;
__le64 required_features = CEPH_MSGR2_REQUIRED_FEATURES;
size_t banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
__le16 banner_payload_len = sizeof(uint8_t) + 2 * sizeof(__le64);
size_t banner_len = banner_prefix_len + sizeof(__le16) + banner_payload_len;
char banner[banner_len];
uint8_t offset = 0;
memcpy(banner, CEPH_BANNER_V2_PREFIX, banner_prefix_len);
offset += banner_prefix_len;
memcpy(banner + offset, (void *)&banner_payload_len, sizeof(__le16));
offset += sizeof(__le16);
memcpy(banner + offset, (void *)&type, sizeof(uint8_t));
offset += sizeof(uint8_t);
memcpy(banner + offset, (void *)&supported_features, sizeof(__le64));
offset += sizeof(__le64);
memcpy(banner + offset, (void *)&required_features, sizeof(__le64));
bufferlist banner_payload;
encode((uint64_t)CEPH_MSGR2_SUPPORTED_FEATURES, banner_payload, 0);
encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0);
bufferlist bl;
bl.append(banner, banner_len);
bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX));
encode((uint16_t)banner_payload.length(), bl, 0);
bl.claim_append(banner_payload);
return WRITE(bl, "banner", _wait_for_peer_banner);
}
CtPtr ProtocolV2::_wait_for_peer_banner() {
unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(__le16) +
sizeof(uint8_t) + 2 * sizeof(__le64);
return READ(banner_len, _banner_exchange_handle_peer_banner);
unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(__le16);
return READ(banner_len, _handle_peer_banner);
}
CtPtr ProtocolV2::_banner_exchange_handle_peer_banner(char *buffer, int r) {
CtPtr ProtocolV2::_handle_peer_banner(char *buffer, int r) {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
@ -1256,58 +1254,56 @@ CtPtr ProtocolV2::_banner_exchange_handle_peer_banner(char *buffer, int r) {
return _fault();
}
uint8_t peer_type = 0;
__le64 peer_supported_features;
__le64 peer_required_features;
uint8_t offset = banner_prefix_len;
__le16 banner_payload_len = *(__le16 *)(buffer + offset);
// V2 banner len check
if (banner_payload_len != (sizeof(uint8_t) + 2 * sizeof(__le64))) {
lderr(cct) << __func__ << " bad banner length: " << banner_payload_len
<< dendl;
uint16_t payload_len;
bufferlist bl;
bl.push_back(
buffer::create_static(sizeof(__le16), buffer + banner_prefix_len));
auto ti = bl.cbegin();
try {
decode(payload_len, ti);
} catch (const buffer::error &e) {
lderr(cct) << __func__ << " decode banner payload len failed " << dendl;
return _fault();
}
offset += sizeof(__le16);
peer_type = *(uint8_t *)(buffer + offset);
offset += sizeof(uint8_t);
peer_supported_features = *(__le64 *)(buffer + offset);
offset += sizeof(__le64);
peer_required_features = *(__le64 *)(buffer + offset);
ceph_assert(payload_len <= 4096); // if we need more then we need to increase
// temp_buffer size as well
if (connection->get_peer_type() == -1) {
connection->set_peer_type(peer_type);
next_payload_len = payload_len;
return READ(next_payload_len, _handle_peer_banner_payload);
}
ceph_assert(state == ACCEPTING);
connection->policy = messenger->get_policy(peer_type);
ldout(cct, 10) << __func__ << " accept of host_type " << (int)peer_type
<< ", policy.lossy=" << connection->policy.lossy
<< " policy.server=" << connection->policy.server
<< " policy.standby=" << connection->policy.standby
<< " policy.resetcheck=" << connection->policy.resetcheck
<< dendl;
} else {
if (connection->get_peer_type() != peer_type) {
ldout(cct, 1) << __func__ << " connection peer type does not match what"
<< " peer advertises " << connection->get_peer_type()
<< " != " << (int)peer_type << dendl;
stop();
connection->dispatch_queue->queue_reset(connection);
return nullptr;
}
CtPtr ProtocolV2::_handle_peer_banner_payload(char *buffer, int r) {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
ldout(cct, 1) << __func__ << " read peer banner payload failed r=" << r
<< " (" << cpp_strerror(r) << ")" << dendl;
return _fault();
}
ldout(cct, 1) << __func__ << " peer_type=" << (int)peer_type
<< " supported=" << std::hex << peer_supported_features
<< " required=" << std::hex << peer_required_features
<< std::dec << dendl;
uint64_t peer_supported_features;
uint64_t peer_required_features;
bufferlist bl;
bl.push_back(buffer::create_static(next_payload_len, buffer));
auto ti = bl.cbegin();
try {
decode(peer_supported_features, ti);
decode(peer_required_features, ti);
} catch (const buffer::error &e) {
lderr(cct) << __func__ << " decode banner payload failed " << dendl;
return _fault();
}
ldout(cct, 1) << __func__ << " supported=" << std::hex
<< peer_supported_features << " required=" << std::hex
<< peer_required_features << std::dec << dendl;
// Check feature bit compatibility
__le64 supported_features = CEPH_MSGR2_SUPPORTED_FEATURES;
__le64 required_features = CEPH_MSGR2_REQUIRED_FEATURES;
uint64_t supported_features = CEPH_MSGR2_SUPPORTED_FEATURES;
uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;
if ((required_features & peer_supported_features) != required_features) {
ldout(cct, 1) << __func__ << " peer does not support all required features"
@ -1328,15 +1324,43 @@ CtPtr ProtocolV2::_banner_exchange_handle_peer_banner(char *buffer, int r) {
}
this->peer_required_features = peer_required_features;
if (this->peer_required_features == 0) {
this->connection_features = CEPH_FEATURE_MSG_ADDR2;
}
if (cct->_conf->ms_inject_internal_delays &&
cct->_conf->ms_inject_socket_failures) {
if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
ldout(cct, 10) << __func__ << " sleep for "
<< cct->_conf->ms_inject_internal_delays << dendl;
utime_t t;
t.set_from_double(cct->_conf->ms_inject_internal_delays);
t.sleep();
HelloFrame hello(this, messenger->get_mytype(), connection->target_addr);
return WRITE(hello.get_buffer(), "hello frame", read_frame);
}
CtPtr ProtocolV2::handle_hello(char *payload, uint32_t length) {
ldout(cct, 20) << __func__ << " payload_len=" << std::dec << length << dendl;
HelloFrame hello(payload, length);
ldout(cct, 5) << __func__ << " received hello:"
<< " peer_type=" << (int)hello.entity_type()
<< " peer_addr_for_me=" << hello.peer_addr() << dendl;
if (connection->get_peer_type() == -1) {
connection->set_peer_type(hello.entity_type());
ceph_assert(state == ACCEPTING);
connection->policy = messenger->get_policy(hello.entity_type());
ldout(cct, 10) << __func__ << " accept of host_type "
<< (int)hello.entity_type()
<< ", policy.lossy=" << connection->policy.lossy
<< " policy.server=" << connection->policy.server
<< " policy.standby=" << connection->policy.standby
<< " policy.resetcheck=" << connection->policy.resetcheck
<< dendl;
} else {
if (connection->get_peer_type() != hello.entity_type()) {
ldout(cct, 1) << __func__ << " connection peer type does not match what"
<< " peer advertises " << connection->get_peer_type()
<< " != " << (int)hello.entity_type() << dendl;
stop();
connection->dispatch_queue->queue_reset(connection);
return nullptr;
}
}
@ -1385,6 +1409,7 @@ CtPtr ProtocolV2::handle_read_frame_length_and_tag(char *buffer, int r) {
<< " tag=" << static_cast<uint32_t>(next_tag) << dendl;
switch (next_tag) {
case Tag::HELLO:
case Tag::AUTH_REQUEST:
case Tag::AUTH_BAD_METHOD:
case Tag::AUTH_BAD_AUTH:
@ -1427,6 +1452,8 @@ CtPtr ProtocolV2::handle_frame_payload(char *buffer, int r) {
}
switch (next_tag) {
case Tag::HELLO:
return handle_hello(buffer, next_payload_len);
case Tag::AUTH_REQUEST:
return handle_auth_request(buffer, next_payload_len);
case Tag::AUTH_BAD_METHOD:
@ -2208,30 +2235,30 @@ CtPtr ProtocolV2::send_client_ident() {
messenger->get_myaddrs().front().is_blank_ip()) {
sockaddr_storage ss;
socklen_t len = sizeof(ss);
getsockname(connection->cs.fd(), (sockaddr*)&ss, &len);
ldout(cct,1) << __func__ << " getsockname reveals I am " << (sockaddr*)&ss
<< " when talking to " << connection->target_addr << dendl;
getsockname(connection->cs.fd(), (sockaddr *)&ss, &len);
ldout(cct, 1) << __func__ << " getsockname reveals I am " << (sockaddr *)&ss
<< " when talking to " << connection->target_addr << dendl;
entity_addr_t a;
a.set_type(entity_addr_t::TYPE_MSGR2); // anything but NONE; learned_addr ignores this
a.set_sockaddr((sockaddr*)&ss);
a.set_sockaddr((sockaddr *)&ss);
a.set_port(0);
connection->lock.unlock();
messenger->learned_addr(a);
if (cct->_conf->ms_inject_internal_delays &&
cct->_conf->ms_inject_socket_failures) {
cct->_conf->ms_inject_socket_failures) {
if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
ldout(cct, 10) << __func__ << " sleep for "
<< cct->_conf->ms_inject_internal_delays << dendl;
utime_t t;
t.set_from_double(cct->_conf->ms_inject_internal_delays);
t.sleep();
ldout(cct, 10) << __func__ << " sleep for "
<< cct->_conf->ms_inject_internal_delays << dendl;
utime_t t;
t.set_from_double(cct->_conf->ms_inject_internal_delays);
t.sleep();
}
}
connection->lock.lock();
if (state != CONNECTING) {
ldout(cct, 1) << __func__
<< " state changed while learned_addr, mark_down or "
<< " replacing must be happened just now" << dendl;
<< " state changed while learned_addr, mark_down or "
<< " replacing must be happened just now" << dendl;
return nullptr;
}
}
@ -2352,7 +2379,6 @@ CtPtr ProtocolV2::handle_server_ident(char *payload, uint32_t length) {
ServerIdentFrame server_ident(this, payload, length);
ldout(cct, 5) << __func__ << " received server identification:"
<< " addrs=" << server_ident.addrs()
<< " my_addr=" << server_ident.peer_addr()
<< " gid=" << server_ident.gid()
<< " global_seq=" << server_ident.global_seq()
<< " features_supported=" << std::hex
@ -2572,6 +2598,7 @@ CtPtr ProtocolV2::handle_client_ident(char *payload, uint32_t length) {
if (client_ident.addrs().empty()) {
return _fault(); // a v2 peer should never do this
}
connection->set_peer_addrs(client_ident.addrs());
connection->target_addr = connection->_infer_target_addr(client_ident.addrs());
@ -2967,13 +2994,12 @@ CtPtr ProtocolV2::send_server_ident() {
uint64_t gs = messenger->get_global_seq();
ServerIdentFrame server_ident(
this, messenger->get_myaddrs(), connection->target_addr,
messenger->get_myname().num(), gs, connection->policy.features_supported,
this, messenger->get_myaddrs(), messenger->get_myname().num(), gs,
connection->policy.features_supported,
connection->policy.features_required, flags, cookie);
ldout(cct, 5) << __func__ << " sending identification:"
<< " addrs=" << messenger->get_myaddrs()
<< " target_addr=" << connection->target_addr
<< " gid=" << messenger->get_myname().num()
<< " global_seq=" << gs << " features_supported=" << std::hex
<< connection->policy.features_supported

View File

@ -47,7 +47,8 @@ private:
public:
enum class Tag : uint32_t {
AUTH_REQUEST = 1,
HELLO = 1,
AUTH_REQUEST,
AUTH_BAD_METHOD,
AUTH_BAD_AUTH,
AUTH_MORE,
@ -81,7 +82,6 @@ private:
std::shared_ptr<AuthSessionHandler> session_security;
std::unique_ptr<AuthAuthorizerChallenge> authorizer_challenge;
uint64_t auth_flags;
uint64_t connection_features;
uint64_t cookie;
uint64_t global_seq;
uint64_t connect_seq;
@ -142,12 +142,14 @@ private:
void handle_message_ack(uint64_t seq);
CONTINUATION_DECL(ProtocolV2, _wait_for_peer_banner);
READ_HANDLER_CONTINUATION_DECL(ProtocolV2,
_banner_exchange_handle_peer_banner);
READ_HANDLER_CONTINUATION_DECL(ProtocolV2, _handle_peer_banner);
READ_HANDLER_CONTINUATION_DECL(ProtocolV2, _handle_peer_banner_payload);
Ct<ProtocolV2> *_banner_exchange(Ct<ProtocolV2> *callback);
Ct<ProtocolV2> *_wait_for_peer_banner();
Ct<ProtocolV2> *_banner_exchange_handle_peer_banner(char *buffer, int r);
Ct<ProtocolV2> *_handle_peer_banner(char *buffer, int r);
Ct<ProtocolV2> *_handle_peer_banner_payload(char *buffer, int r);
Ct<ProtocolV2> *handle_hello(char *payload, uint32_t length);
CONTINUATION_DECL(ProtocolV2, read_frame);
READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_length_and_tag);
@ -191,6 +193,8 @@ private:
Ct<ProtocolV2> *handle_message_ack(char *payload, uint32_t length);
public:
uint64_t connection_features;
ProtocolV2(AsyncConnection *connection);
virtual ~ProtocolV2();