crimson/net: improve get_global_seq()

Implement single global_seq and non-racing get_global_seq() interface.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
This commit is contained in:
Yingxin Cheng 2019-08-07 19:14:27 +08:00
parent 9c9576cefd
commit 03fbb3603a
5 changed files with 93 additions and 76 deletions

View File

@ -35,7 +35,6 @@ using SocketPolicy = ceph::net::Policy<Throttle>;
class Messenger {
entity_name_t my_name;
entity_addrvec_t my_addrs;
uint32_t global_seq = 0;
uint32_t crc_flags = 0;
ceph::auth::AuthClient* auth_client = nullptr;
ceph::auth::AuthServer* auth_server = nullptr;
@ -79,13 +78,6 @@ public:
/// after this future becomes available
virtual seastar::future<> shutdown() = 0;
uint32_t get_global_seq(uint32_t old=0) {
if (old > global_seq) {
global_seq = old;
}
return ++global_seq;
}
uint32_t get_crc_flags() const {
return crc_flags;
}

View File

@ -184,8 +184,10 @@ ProtocolV1::handle_connect_reply(msgr_tag_t tag)
reset_session();
return seastar::make_ready_future<stop_t>(stop_t::no);
case CEPH_MSGR_TAG_RETRY_GLOBAL:
h.global_seq = messenger.get_global_seq(h.reply.global_seq);
return seastar::make_ready_future<stop_t>(stop_t::no);
return messenger.get_global_seq(h.reply.global_seq).then([this] (auto gs) {
h.global_seq = gs;
return seastar::make_ready_future<stop_t>(stop_t::no);
});
case CEPH_MSGR_TAG_RETRY_SESSION:
ceph_assert(h.reply.connect_seq > h.connect_seq);
h.connect_seq = h.reply.connect_seq;
@ -327,6 +329,9 @@ void ProtocolV1::start_connect(const entity_addr_t& _peer_addr,
}
return seastar::now();
}).then([this] {
return messenger.get_global_seq();
}).then([this] (auto gs) {
h.global_seq = gs;
// read server's handshake header
return socket->read(server_header_size);
}).then([this] (bufferlist headerbl) {
@ -357,7 +362,6 @@ void ProtocolV1::start_connect(const entity_addr_t& _peer_addr,
bufferlist bl;
bl.append(buffer::create_static(banner_size, banner));
::encode(messenger.get_myaddr(), bl, 0);
h.global_seq = messenger.get_global_seq();
return socket->write_flush(std::move(bl));
}).then([=] {
return seastar::repeat([this] {
@ -399,25 +403,27 @@ seastar::future<stop_t> ProtocolV1::send_connect_reply(
seastar::future<stop_t> ProtocolV1::send_connect_reply_ready(
msgr_tag_t tag, bufferlist&& authorizer_reply)
{
h.global_seq = messenger.get_global_seq();
h.reply.tag = tag;
h.reply.features = conn.policy.features_supported;
h.reply.global_seq = h.global_seq;
h.reply.connect_seq = h.connect_seq;
h.reply.flags = 0;
if (conn.policy.lossy) {
h.reply.flags = h.reply.flags | CEPH_MSG_CONNECT_LOSSY;
}
h.reply.authorizer_len = authorizer_reply.length();
return messenger.get_global_seq(
).then([this, tag, auth_len = authorizer_reply.length()] (auto gs) {
h.global_seq = gs;
h.reply.tag = tag;
h.reply.features = conn.policy.features_supported;
h.reply.global_seq = h.global_seq;
h.reply.connect_seq = h.connect_seq;
h.reply.flags = 0;
if (conn.policy.lossy) {
h.reply.flags = h.reply.flags | CEPH_MSG_CONNECT_LOSSY;
}
h.reply.authorizer_len = auth_len;
session_security.reset(
get_auth_session_handler(nullptr,
auth_meta->auth_method,
auth_meta->session_key,
conn.features));
session_security.reset(
get_auth_session_handler(nullptr,
auth_meta->auth_method,
auth_meta->session_key,
conn.features));
return socket->write(make_static_packet(h.reply))
.then([this, reply=std::move(authorizer_reply)]() mutable {
return socket->write(make_static_packet(h.reply));
}).then([this, reply=std::move(authorizer_reply)]() mutable {
if (reply.length()) {
return socket->write(std::move(reply));
} else {

View File

@ -777,9 +777,11 @@ seastar::future<bool> ProtocolV2::client_reconnect()
auto retry = RetryGlobalFrame::Decode(rx_segments_data.back());
logger().warn("{} GOT RetryGlobalFrame: gs={}",
conn, retry.global_seq());
global_seq = messenger.get_global_seq(retry.global_seq());
logger().warn("{} UPDATE: gs={}", conn, global_seq);
return client_reconnect();
return messenger.get_global_seq(retry.global_seq()).then([this] (auto gs) {
global_seq = gs;
logger().warn("{} UPDATE: gs={}", conn, global_seq);
return client_reconnect();
});
});
case Tag::SESSION_RETRY:
return read_frame_payload().then([this] {
@ -835,11 +837,12 @@ void ProtocolV2::execute_connecting()
seastar::with_gate(pending_dispatch, [this] {
// we don't know my socket_port yet
conn.set_ephemeral_port(0, SocketConnection::side_t::none);
enable_recording();
global_seq = messenger.get_global_seq();
logger().debug("{} UPDATE: gs={}", conn, global_seq);
return Socket::connect(conn.peer_addr)
.then([this](SocketFRef sock) {
return messenger.get_global_seq().then([this] (auto gs) {
global_seq = gs;
enable_recording();
logger().debug("{} UPDATE: gs={}", conn, global_seq);
return Socket::connect(conn.peer_addr);
}).then([this](SocketFRef sock) {
logger().debug("{} socket connected", conn);
socket = std::move(sock);
if (state == state_t::CLOSING) {
@ -1409,52 +1412,55 @@ seastar::future<> ProtocolV2::send_server_ident()
{
// send_server_ident() logic
// this is required for the case when this connection is being replaced
// TODO
// out_seq = discard_requeued_up_to(out_seq, 0);
conn.in_seq = 0;
if (!conn.policy.lossy) {
server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
}
uint64_t flags = 0;
if (conn.policy.lossy) {
flags = flags | CEPH_MSG_CONNECT_LOSSY;
}
// refered to async-conn v2: not assign gs to global_seq
uint64_t gs = messenger.get_global_seq();
auto server_ident = ServerIdentFrame::Encode(
messenger.get_myaddrs(),
messenger.get_myname().num(),
gs,
conn.policy.features_supported,
conn.policy.features_required | msgr2_required,
flags,
server_cookie);
return messenger.get_global_seq().then([this] (auto gs) {
logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq);
logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={},"
" gs={}, features_supported={}, features_required={},"
" flags={}, cookie={}",
conn, messenger.get_myaddrs(), messenger.get_myname().num(),
gs, conn.policy.features_supported,
conn.policy.features_required | msgr2_required,
flags, server_cookie);
// this is required for the case when this connection is being replaced
// TODO
// out_seq = discard_requeued_up_to(out_seq, 0);
conn.in_seq = 0;
conn.set_features(connection_features);
if (!conn.policy.lossy) {
server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
}
// notify
seastar::with_gate(pending_dispatch, [this] {
return dispatcher.ms_handle_accept(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()))
.handle_exception([this] (std::exception_ptr eptr) {
logger().error("{} ms_handle_accept caught exception: {}", conn, eptr);
ceph_abort("unecpected exception from ms_handle_accept()");
uint64_t flags = 0;
if (conn.policy.lossy) {
flags = flags | CEPH_MSG_CONNECT_LOSSY;
}
auto server_ident = ServerIdentFrame::Encode(
messenger.get_myaddrs(),
messenger.get_myname().num(),
gs,
conn.policy.features_supported,
conn.policy.features_required | msgr2_required,
flags,
server_cookie);
logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={},"
" gs={}, features_supported={}, features_required={},"
" flags={}, cookie={}",
conn, messenger.get_myaddrs(), messenger.get_myname().num(),
gs, conn.policy.features_supported,
conn.policy.features_required | msgr2_required,
flags, server_cookie);
conn.set_features(connection_features);
// notify
seastar::with_gate(pending_dispatch, [this] {
return dispatcher.ms_handle_accept(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()))
.handle_exception([this] (std::exception_ptr eptr) {
logger().error("{} ms_handle_accept caught exception: {}", conn, eptr);
ceph_abort("unecpected exception from ms_handle_accept()");
});
});
});
return write_frame(server_ident);
return write_frame(server_ident);
});
}
// REPLACING state

View File

@ -372,3 +372,14 @@ void SocketMessenger::unregister_conn(SocketConnectionRef conn)
ceph_assert(found->second == conn);
connections.erase(found);
}
seastar::future<uint32_t>
SocketMessenger::get_global_seq(uint32_t old)
{
return container().invoke_on(0, [old] (auto& msgr) {
if (old > msgr.global_seq) {
msgr.global_seq = old;
}
return ++msgr.global_seq;
});
}

View File

@ -42,6 +42,7 @@ class SocketMessenger final : public Messenger, public seastar::peering_sharded_
const uint32_t nonce;
// specifying we haven't learned our addr; set false when we find it.
bool need_addr = true;
uint32_t global_seq = 0;
seastar::future<> accept(seastar::connected_socket socket,
seastar::socket_address paddr);
@ -108,6 +109,7 @@ class SocketMessenger final : public Messenger, public seastar::peering_sharded_
void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) override;
public:
seastar::future<uint32_t> get_global_seq(uint32_t old=0);
seastar::future<> learned_addr(const entity_addr_t &peer_addr_for_me,
const SocketConnection& conn);