From 03bc0dcddafc8b7f899b29552d84d24d076bfb3f Mon Sep 17 00:00:00 2001 From: Ricardo Dias Date: Wed, 30 Jan 2019 22:15:29 +0000 Subject: [PATCH] msg/async: msgr2: send entity type and peer_address in Tag::HELLO frame Signed-off-by: Ricardo Dias --- src/msg/async/AsyncConnection.cc | 3 +- src/msg/async/ProtocolV2.cc | 232 +++++++++++++++++-------------- src/msg/async/ProtocolV2.h | 14 +- 3 files changed, 140 insertions(+), 109 deletions(-) diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index d2eaa558745..1ed18db114d 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -386,7 +386,7 @@ void AsyncConnection::process() { ssize_t r = cs.is_connected(); if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " reconnect failed to " - << target_addr << dendl; + << target_addr << dendl; if (r == -ECONNREFUSED) { ldout(async_msgr->cct, 2) << __func__ << " connection refused!" << dendl; @@ -479,6 +479,7 @@ void AsyncConnection::accept(ConnectedSocket socket, std::lock_guard l(lock); cs = std::move(socket); socket_addr = listen_addr; + target_addr = peer_addr; // until we know better state = STATE_ACCEPTING; protocol->accept(); // rescheduler connection in order to avoid lock dep diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index 49613bfbcbb..cb3745d6b46 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -138,17 +138,20 @@ protected: // this tuple is only used when decoding values from a payload buffer std::tuple _values; - // required only when signing and encryting payload, otherwise is null + // required for using econding/decoding features or when signing and + // encryting payload, otherwise is null ProtocolV2 *protocol; + uint64_t features; + template inline void _encode_payload_each(T &t) { if constexpr (std::is_same()) { this->payload.claim_append((bufferlist &)t); } else if constexpr (std::is_same const>()) { - encode((uint32_t)t.size(), this->payload, -1ll); + encode((uint32_t)t.size(), this->payload, features); for (const auto &elem : t) { - encode(elem, this->payload, 0); + encode(elem, this->payload, features); } } else if constexpr (std::is_same()) { this->payload.append((char *)&t, sizeof(t)); @@ -157,7 +160,7 @@ protected: protocol->sign_payload(this->payload); protocol->encrypt_payload(this->payload); } else { - encode(t, this->payload, -1ll); + encode(t, this->payload, features); } } @@ -198,12 +201,12 @@ protected: } public: - PayloadFrame(const Args &... args) : protocol(nullptr) { + PayloadFrame(const Args &... args) : protocol(nullptr), features(0) { (_encode_payload_each(args), ...); } PayloadFrame(ProtocolV2 *protocol, const Args &... args) - : protocol(protocol) { + : protocol(protocol), features(protocol->connection_features) { (_encode_payload_each(args), ...); } @@ -218,6 +221,16 @@ public: } }; +struct HelloFrame : public PayloadFrame { // peer_addr + const ProtocolV2::Tag tag = ProtocolV2::Tag::HELLO; + using PayloadFrame::PayloadFrame; + + inline uint8_t &entity_type() { return get_val<0>(); } + inline entity_addr_t &peer_addr() { return get_val<1>(); } +}; + struct AuthRequestFrame : public PayloadFrame { const ProtocolV2::Tag tag = ProtocolV2::Tag::AUTH_REQUEST; @@ -299,19 +312,18 @@ struct ClientIdentFrame struct ServerIdentFrame : public SignedEncryptedFrame { const ProtocolV2::Tag tag = ProtocolV2::Tag::IDENT; using SignedEncryptedFrame::SignedEncryptedFrame; inline entity_addrvec_t &addrs() { return get_val<0>(); } - inline entity_addr_t &peer_addr() { return get_val<1>(); } - inline int64_t &gid() { return get_val<2>(); } - inline uint64_t &global_seq() { return get_val<3>(); } - inline uint64_t &supported_features() { return get_val<4>(); } - inline uint64_t &required_features() { return get_val<5>(); } - inline uint64_t &flags() { return get_val<6>(); } - inline uint64_t &cookie() { return get_val<7>(); } + inline int64_t &gid() { return get_val<1>(); } + inline uint64_t &global_seq() { return get_val<2>(); } + inline uint64_t &supported_features() { return get_val<3>(); } + inline uint64_t &required_features() { return get_val<4>(); } + inline uint64_t &flags() { return get_val<5>(); } + inline uint64_t &cookie() { return get_val<6>(); } }; struct ReconnectFrame @@ -1204,38 +1216,24 @@ CtPtr ProtocolV2::_banner_exchange(CtPtr callback) { ldout(cct, 20) << __func__ << dendl; bannerExchangeCallback = callback; - uint8_t type = messenger->get_mytype(); - __le64 supported_features = CEPH_MSGR2_SUPPORTED_FEATURES; - __le64 required_features = CEPH_MSGR2_REQUIRED_FEATURES; - - size_t banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX); - __le16 banner_payload_len = sizeof(uint8_t) + 2 * sizeof(__le64); - size_t banner_len = banner_prefix_len + sizeof(__le16) + banner_payload_len; - char banner[banner_len]; - uint8_t offset = 0; - memcpy(banner, CEPH_BANNER_V2_PREFIX, banner_prefix_len); - offset += banner_prefix_len; - memcpy(banner + offset, (void *)&banner_payload_len, sizeof(__le16)); - offset += sizeof(__le16); - memcpy(banner + offset, (void *)&type, sizeof(uint8_t)); - offset += sizeof(uint8_t); - memcpy(banner + offset, (void *)&supported_features, sizeof(__le64)); - offset += sizeof(__le64); - memcpy(banner + offset, (void *)&required_features, sizeof(__le64)); + bufferlist banner_payload; + encode((uint64_t)CEPH_MSGR2_SUPPORTED_FEATURES, banner_payload, 0); + encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0); bufferlist bl; - bl.append(banner, banner_len); + bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX)); + encode((uint16_t)banner_payload.length(), bl, 0); + bl.claim_append(banner_payload); return WRITE(bl, "banner", _wait_for_peer_banner); } CtPtr ProtocolV2::_wait_for_peer_banner() { - unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(__le16) + - sizeof(uint8_t) + 2 * sizeof(__le64); - return READ(banner_len, _banner_exchange_handle_peer_banner); + unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(__le16); + return READ(banner_len, _handle_peer_banner); } -CtPtr ProtocolV2::_banner_exchange_handle_peer_banner(char *buffer, int r) { +CtPtr ProtocolV2::_handle_peer_banner(char *buffer, int r) { ldout(cct, 20) << __func__ << " r=" << r << dendl; if (r < 0) { @@ -1256,58 +1254,56 @@ CtPtr ProtocolV2::_banner_exchange_handle_peer_banner(char *buffer, int r) { return _fault(); } - uint8_t peer_type = 0; - __le64 peer_supported_features; - __le64 peer_required_features; - - uint8_t offset = banner_prefix_len; - __le16 banner_payload_len = *(__le16 *)(buffer + offset); - - // V2 banner len check - if (banner_payload_len != (sizeof(uint8_t) + 2 * sizeof(__le64))) { - lderr(cct) << __func__ << " bad banner length: " << banner_payload_len - << dendl; + uint16_t payload_len; + bufferlist bl; + bl.push_back( + buffer::create_static(sizeof(__le16), buffer + banner_prefix_len)); + auto ti = bl.cbegin(); + try { + decode(payload_len, ti); + } catch (const buffer::error &e) { + lderr(cct) << __func__ << " decode banner payload len failed " << dendl; return _fault(); } - offset += sizeof(__le16); - peer_type = *(uint8_t *)(buffer + offset); - offset += sizeof(uint8_t); - peer_supported_features = *(__le64 *)(buffer + offset); - offset += sizeof(__le64); - peer_required_features = *(__le64 *)(buffer + offset); + ceph_assert(payload_len <= 4096); // if we need more then we need to increase + // temp_buffer size as well - if (connection->get_peer_type() == -1) { - connection->set_peer_type(peer_type); + next_payload_len = payload_len; + return READ(next_payload_len, _handle_peer_banner_payload); +} - ceph_assert(state == ACCEPTING); - connection->policy = messenger->get_policy(peer_type); - ldout(cct, 10) << __func__ << " accept of host_type " << (int)peer_type - << ", policy.lossy=" << connection->policy.lossy - << " policy.server=" << connection->policy.server - << " policy.standby=" << connection->policy.standby - << " policy.resetcheck=" << connection->policy.resetcheck - << dendl; - } else { - if (connection->get_peer_type() != peer_type) { - ldout(cct, 1) << __func__ << " connection peer type does not match what" - << " peer advertises " << connection->get_peer_type() - << " != " << (int)peer_type << dendl; - stop(); - connection->dispatch_queue->queue_reset(connection); - return nullptr; - } +CtPtr ProtocolV2::_handle_peer_banner_payload(char *buffer, int r) { + ldout(cct, 20) << __func__ << " r=" << r << dendl; + + if (r < 0) { + ldout(cct, 1) << __func__ << " read peer banner payload failed r=" << r + << " (" << cpp_strerror(r) << ")" << dendl; + return _fault(); } - ldout(cct, 1) << __func__ << " peer_type=" << (int)peer_type - << " supported=" << std::hex << peer_supported_features - << " required=" << std::hex << peer_required_features - << std::dec << dendl; + uint64_t peer_supported_features; + uint64_t peer_required_features; + + bufferlist bl; + bl.push_back(buffer::create_static(next_payload_len, buffer)); + auto ti = bl.cbegin(); + try { + decode(peer_supported_features, ti); + decode(peer_required_features, ti); + } catch (const buffer::error &e) { + lderr(cct) << __func__ << " decode banner payload failed " << dendl; + return _fault(); + } + + ldout(cct, 1) << __func__ << " supported=" << std::hex + << peer_supported_features << " required=" << std::hex + << peer_required_features << std::dec << dendl; // Check feature bit compatibility - __le64 supported_features = CEPH_MSGR2_SUPPORTED_FEATURES; - __le64 required_features = CEPH_MSGR2_REQUIRED_FEATURES; + uint64_t supported_features = CEPH_MSGR2_SUPPORTED_FEATURES; + uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES; if ((required_features & peer_supported_features) != required_features) { ldout(cct, 1) << __func__ << " peer does not support all required features" @@ -1328,15 +1324,43 @@ CtPtr ProtocolV2::_banner_exchange_handle_peer_banner(char *buffer, int r) { } this->peer_required_features = peer_required_features; + if (this->peer_required_features == 0) { + this->connection_features = CEPH_FEATURE_MSG_ADDR2; + } - if (cct->_conf->ms_inject_internal_delays && - cct->_conf->ms_inject_socket_failures) { - if (rand() % cct->_conf->ms_inject_socket_failures == 0) { - ldout(cct, 10) << __func__ << " sleep for " - << cct->_conf->ms_inject_internal_delays << dendl; - utime_t t; - t.set_from_double(cct->_conf->ms_inject_internal_delays); - t.sleep(); + HelloFrame hello(this, messenger->get_mytype(), connection->target_addr); + return WRITE(hello.get_buffer(), "hello frame", read_frame); +} + +CtPtr ProtocolV2::handle_hello(char *payload, uint32_t length) { + ldout(cct, 20) << __func__ << " payload_len=" << std::dec << length << dendl; + + HelloFrame hello(payload, length); + + ldout(cct, 5) << __func__ << " received hello:" + << " peer_type=" << (int)hello.entity_type() + << " peer_addr_for_me=" << hello.peer_addr() << dendl; + + if (connection->get_peer_type() == -1) { + connection->set_peer_type(hello.entity_type()); + + ceph_assert(state == ACCEPTING); + connection->policy = messenger->get_policy(hello.entity_type()); + ldout(cct, 10) << __func__ << " accept of host_type " + << (int)hello.entity_type() + << ", policy.lossy=" << connection->policy.lossy + << " policy.server=" << connection->policy.server + << " policy.standby=" << connection->policy.standby + << " policy.resetcheck=" << connection->policy.resetcheck + << dendl; + } else { + if (connection->get_peer_type() != hello.entity_type()) { + ldout(cct, 1) << __func__ << " connection peer type does not match what" + << " peer advertises " << connection->get_peer_type() + << " != " << (int)hello.entity_type() << dendl; + stop(); + connection->dispatch_queue->queue_reset(connection); + return nullptr; } } @@ -1385,6 +1409,7 @@ CtPtr ProtocolV2::handle_read_frame_length_and_tag(char *buffer, int r) { << " tag=" << static_cast(next_tag) << dendl; switch (next_tag) { + case Tag::HELLO: case Tag::AUTH_REQUEST: case Tag::AUTH_BAD_METHOD: case Tag::AUTH_BAD_AUTH: @@ -1427,6 +1452,8 @@ CtPtr ProtocolV2::handle_frame_payload(char *buffer, int r) { } switch (next_tag) { + case Tag::HELLO: + return handle_hello(buffer, next_payload_len); case Tag::AUTH_REQUEST: return handle_auth_request(buffer, next_payload_len); case Tag::AUTH_BAD_METHOD: @@ -2208,30 +2235,30 @@ CtPtr ProtocolV2::send_client_ident() { messenger->get_myaddrs().front().is_blank_ip()) { sockaddr_storage ss; socklen_t len = sizeof(ss); - getsockname(connection->cs.fd(), (sockaddr*)&ss, &len); - ldout(cct,1) << __func__ << " getsockname reveals I am " << (sockaddr*)&ss - << " when talking to " << connection->target_addr << dendl; + getsockname(connection->cs.fd(), (sockaddr *)&ss, &len); + ldout(cct, 1) << __func__ << " getsockname reveals I am " << (sockaddr *)&ss + << " when talking to " << connection->target_addr << dendl; entity_addr_t a; a.set_type(entity_addr_t::TYPE_MSGR2); // anything but NONE; learned_addr ignores this - a.set_sockaddr((sockaddr*)&ss); + a.set_sockaddr((sockaddr *)&ss); a.set_port(0); connection->lock.unlock(); messenger->learned_addr(a); if (cct->_conf->ms_inject_internal_delays && - cct->_conf->ms_inject_socket_failures) { + cct->_conf->ms_inject_socket_failures) { if (rand() % cct->_conf->ms_inject_socket_failures == 0) { - ldout(cct, 10) << __func__ << " sleep for " - << cct->_conf->ms_inject_internal_delays << dendl; - utime_t t; - t.set_from_double(cct->_conf->ms_inject_internal_delays); - t.sleep(); + ldout(cct, 10) << __func__ << " sleep for " + << cct->_conf->ms_inject_internal_delays << dendl; + utime_t t; + t.set_from_double(cct->_conf->ms_inject_internal_delays); + t.sleep(); } } connection->lock.lock(); if (state != CONNECTING) { ldout(cct, 1) << __func__ - << " state changed while learned_addr, mark_down or " - << " replacing must be happened just now" << dendl; + << " state changed while learned_addr, mark_down or " + << " replacing must be happened just now" << dendl; return nullptr; } } @@ -2352,7 +2379,6 @@ CtPtr ProtocolV2::handle_server_ident(char *payload, uint32_t length) { ServerIdentFrame server_ident(this, payload, length); ldout(cct, 5) << __func__ << " received server identification:" << " addrs=" << server_ident.addrs() - << " my_addr=" << server_ident.peer_addr() << " gid=" << server_ident.gid() << " global_seq=" << server_ident.global_seq() << " features_supported=" << std::hex @@ -2572,6 +2598,7 @@ CtPtr ProtocolV2::handle_client_ident(char *payload, uint32_t length) { if (client_ident.addrs().empty()) { return _fault(); // a v2 peer should never do this } + connection->set_peer_addrs(client_ident.addrs()); connection->target_addr = connection->_infer_target_addr(client_ident.addrs()); @@ -2967,13 +2994,12 @@ CtPtr ProtocolV2::send_server_ident() { uint64_t gs = messenger->get_global_seq(); ServerIdentFrame server_ident( - this, messenger->get_myaddrs(), connection->target_addr, - messenger->get_myname().num(), gs, connection->policy.features_supported, + this, messenger->get_myaddrs(), messenger->get_myname().num(), gs, + connection->policy.features_supported, connection->policy.features_required, flags, cookie); ldout(cct, 5) << __func__ << " sending identification:" << " addrs=" << messenger->get_myaddrs() - << " target_addr=" << connection->target_addr << " gid=" << messenger->get_myname().num() << " global_seq=" << gs << " features_supported=" << std::hex << connection->policy.features_supported diff --git a/src/msg/async/ProtocolV2.h b/src/msg/async/ProtocolV2.h index d6721ffcf3d..5ac7f72c700 100644 --- a/src/msg/async/ProtocolV2.h +++ b/src/msg/async/ProtocolV2.h @@ -47,7 +47,8 @@ private: public: enum class Tag : uint32_t { - AUTH_REQUEST = 1, + HELLO = 1, + AUTH_REQUEST, AUTH_BAD_METHOD, AUTH_BAD_AUTH, AUTH_MORE, @@ -81,7 +82,6 @@ private: std::shared_ptr session_security; std::unique_ptr authorizer_challenge; uint64_t auth_flags; - uint64_t connection_features; uint64_t cookie; uint64_t global_seq; uint64_t connect_seq; @@ -142,12 +142,14 @@ private: void handle_message_ack(uint64_t seq); CONTINUATION_DECL(ProtocolV2, _wait_for_peer_banner); - READ_HANDLER_CONTINUATION_DECL(ProtocolV2, - _banner_exchange_handle_peer_banner); + READ_HANDLER_CONTINUATION_DECL(ProtocolV2, _handle_peer_banner); + READ_HANDLER_CONTINUATION_DECL(ProtocolV2, _handle_peer_banner_payload); Ct *_banner_exchange(Ct *callback); Ct *_wait_for_peer_banner(); - Ct *_banner_exchange_handle_peer_banner(char *buffer, int r); + Ct *_handle_peer_banner(char *buffer, int r); + Ct *_handle_peer_banner_payload(char *buffer, int r); + Ct *handle_hello(char *payload, uint32_t length); CONTINUATION_DECL(ProtocolV2, read_frame); READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_length_and_tag); @@ -191,6 +193,8 @@ private: Ct *handle_message_ack(char *payload, uint32_t length); public: + uint64_t connection_features; + ProtocolV2(AsyncConnection *connection); virtual ~ProtocolV2();