crimson/net: change the static IS_FIXED_CPU to runtime

Otherwise the messenger implementation needs to be templated.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
This commit is contained in:
Yingxin Cheng 2023-04-26 16:13:48 +08:00
parent 7797cabe2a
commit 58491826d5
5 changed files with 69 additions and 74 deletions

View File

@ -340,27 +340,23 @@ Socket::try_trap_post(bp_action_t& trap) {
}
#endif
#define SERVER_SOCKET ShardedServerSocket<IS_FIXED_CPU>
template <bool IS_FIXED_CPU>
SERVER_SOCKET::ShardedServerSocket(
ShardedServerSocket::ShardedServerSocket(
seastar::shard_id sid,
bool is_fixed_cpu,
construct_tag)
: primary_sid{sid}
: primary_sid{sid}, is_fixed_cpu{is_fixed_cpu}
{
}
template <bool IS_FIXED_CPU>
SERVER_SOCKET::~ShardedServerSocket()
ShardedServerSocket::~ShardedServerSocket()
{
assert(!listener);
// detect whether user have called destroy() properly
ceph_assert_always(!service);
}
template <bool IS_FIXED_CPU>
listen_ertr::future<>
SERVER_SOCKET::listen(entity_addr_t addr)
ShardedServerSocket::listen(entity_addr_t addr)
{
ceph_assert_always(seastar::this_shard_id() == primary_sid);
logger().debug("ShardedServerSocket({})::listen()...", addr);
@ -369,7 +365,7 @@ SERVER_SOCKET::listen(entity_addr_t addr)
seastar::socket_address s_addr(addr.in4_addr());
seastar::listen_options lo;
lo.reuse_address = true;
if constexpr (IS_FIXED_CPU) {
if (ss.is_fixed_cpu) {
lo.set_fixed_cpu(ss.primary_sid);
}
ss.listener = seastar::listen(s_addr, lo);
@ -391,9 +387,8 @@ SERVER_SOCKET::listen(entity_addr_t addr)
});
}
template <bool IS_FIXED_CPU>
seastar::future<>
SERVER_SOCKET::accept(accept_func_t &&_fn_accept)
ShardedServerSocket::accept(accept_func_t &&_fn_accept)
{
ceph_assert_always(seastar::this_shard_id() == primary_sid);
logger().debug("ShardedServerSocket({})::accept()...", listen_addr);
@ -407,10 +402,12 @@ SERVER_SOCKET::accept(accept_func_t &&_fn_accept)
return seastar::keep_doing([&ss] {
return ss.listener->accept(
).then([&ss](seastar::accept_result accept_result) {
if constexpr (IS_FIXED_CPU) {
#ifndef NDEBUG
if (ss.is_fixed_cpu) {
// see seastar::listen_options::set_fixed_cpu()
ceph_assert_always(seastar::this_shard_id() == ss.primary_sid);
}
#endif
auto [socket, paddr] = std::move(accept_result);
entity_addr_t peer_addr;
peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
@ -419,8 +416,8 @@ SERVER_SOCKET::accept(accept_func_t &&_fn_accept)
std::move(socket), Socket::side_t::acceptor,
peer_addr.get_port(), Socket::construct_tag{});
logger().debug("ShardedServerSocket({})::accept(): "
"accepted peer {}, socket {}",
ss.listen_addr, peer_addr, fmt::ptr(_socket));
"accepted peer {}, socket {}, is_fixed = {}",
ss.listen_addr, peer_addr, fmt::ptr(_socket), ss.is_fixed_cpu);
std::ignore = seastar::with_gate(
ss.shutdown_gate,
[socket=std::move(_socket), peer_addr, &ss]() mutable {
@ -462,9 +459,8 @@ SERVER_SOCKET::accept(accept_func_t &&_fn_accept)
});
}
template <bool IS_FIXED_CPU>
seastar::future<>
SERVER_SOCKET::shutdown_destroy()
ShardedServerSocket::shutdown_destroy()
{
assert(seastar::this_shard_id() == primary_sid);
logger().debug("ShardedServerSocket({})::shutdown_destroy()...", listen_addr);
@ -490,15 +486,14 @@ SERVER_SOCKET::shutdown_destroy()
});
}
template <bool IS_FIXED_CPU>
seastar::future<SERVER_SOCKET*>
SERVER_SOCKET::create()
seastar::future<ShardedServerSocket*>
ShardedServerSocket::create(bool is_fixed_cpu)
{
auto primary_sid = seastar::this_shard_id();
// start the sharded service: we should only construct/stop shards on #0
return seastar::smp::submit_to(0, [primary_sid] {
return seastar::smp::submit_to(0, [primary_sid, is_fixed_cpu] {
auto service = std::make_unique<sharded_service_t>();
return service->start(primary_sid, construct_tag{}
return service->start(primary_sid, is_fixed_cpu, construct_tag{}
).then([service = std::move(service)]() mutable {
auto p_shard = service.get();
p_shard->local().service = std::move(service);
@ -509,7 +504,4 @@ SERVER_SOCKET::create()
});
}
template class ShardedServerSocket<true>;
template class ShardedServerSocket<false>;
} // namespace crimson::net

View File

@ -144,20 +144,22 @@ using listen_ertr = crimson::errorator<
crimson::ct_error::address_not_available // https://techoverflow.net/2021/08/06/how-i-fixed-python-oserror-errno-99-cannot-assign-requested-address/
>;
template <bool IS_FIXED_CPU>
class ShardedServerSocket
: public seastar::peering_sharded_service<ShardedServerSocket<IS_FIXED_CPU> > {
: public seastar::peering_sharded_service<ShardedServerSocket> {
struct construct_tag {};
public:
ShardedServerSocket(seastar::shard_id sid, construct_tag);
ShardedServerSocket(seastar::shard_id sid, bool is_fixed_cpu, construct_tag);
~ShardedServerSocket();
ShardedServerSocket(ShardedServerSocket&&) = delete;
ShardedServerSocket(const ShardedServerSocket&) = delete;
ShardedServerSocket& operator=(ShardedServerSocket&&) = delete;
ShardedServerSocket& operator=(const ShardedServerSocket&) = delete;
bool is_fixed() const { return is_fixed_cpu; }
listen_ertr::future<> listen(entity_addr_t addr);
using accept_func_t =
@ -166,11 +168,12 @@ public:
seastar::future<> shutdown_destroy();
static seastar::future<ShardedServerSocket*> create();
static seastar::future<ShardedServerSocket*> create(bool is_fixed_cpu);
private:
// the fixed CPU if IS_FIXED_CPU is true
// the fixed CPU if is_fixed_cpu is true
const seastar::shard_id primary_sid;
const bool is_fixed_cpu;
entity_addr_t listen_addr;
std::optional<seastar::server_socket> listener;
seastar::gate shutdown_gate;

View File

@ -92,7 +92,7 @@ SocketMessenger::do_listen(const entity_addrvec_t& addrs)
set_myaddrs(addrs);
return seastar::futurize_invoke([this] {
if (!listener) {
return ShardedServerSocket<true>::create(
return ShardedServerSocket::create(true
).then([this] (auto _listener) {
listener = _listener;
});
@ -218,6 +218,7 @@ seastar::future<> SocketMessenger::start(
ceph_assert(get_myaddr().get_port() > 0);
return listener->accept([this](SocketRef socket, entity_addr_t peer_addr) {
assert(listener->is_fixed());
assert(seastar::this_shard_id() == sid);
assert(get_myaddr().is_msgr2());
SocketConnectionRef conn =

View File

@ -29,7 +29,6 @@
namespace crimson::net {
template <bool IS_FIXED_CPU>
class ShardedServerSocket;
class SocketMessenger final : public Messenger {
@ -169,7 +168,7 @@ private:
crimson::auth::AuthClient* auth_client = nullptr;
crimson::auth::AuthServer* auth_server = nullptr;
ShardedServerSocket<true> *listener = nullptr;
ShardedServerSocket *listener = nullptr;
ChainedDispatchers dispatchers;
std::map<entity_addr_t, SocketConnectionRef> connections;
std::set<SocketConnectionRef> accepting_conns;

View File

@ -70,14 +70,14 @@ future<> test_refused() {
});
}
template <bool IS_FIXED_CPU>
future<> test_bind_same() {
future<> test_bind_same(bool is_fixed_cpu) {
logger().info("test_bind_same()...");
return ShardedServerSocket<IS_FIXED_CPU>::create().then([](auto pss1) {
return ShardedServerSocket::create(is_fixed_cpu
).then([is_fixed_cpu](auto pss1) {
auto saddr = get_server_addr();
return pss1->listen(saddr).safe_then([saddr] {
return pss1->listen(saddr).safe_then([saddr, is_fixed_cpu] {
// try to bind the same address
return ShardedServerSocket<IS_FIXED_CPU>::create(
return ShardedServerSocket::create(is_fixed_cpu
).then([saddr](auto pss2) {
return pss2->listen(saddr).safe_then([] {
logger().error("test_bind_same() should raise address_in_use");
@ -112,10 +112,9 @@ future<> test_bind_same() {
});
}
template <bool IS_FIXED_CPU>
future<> test_accept() {
future<> test_accept(bool is_fixed_cpu) {
logger().info("test_accept()");
return ShardedServerSocket<IS_FIXED_CPU>::create(
return ShardedServerSocket::create(is_fixed_cpu
).then([](auto pss) {
auto saddr = get_server_addr();
return pss->listen(saddr
@ -157,27 +156,29 @@ future<> test_accept() {
});
}
template <bool IS_FIXED_CPU>
class SocketFactory {
static constexpr seastar::shard_id CLIENT_CPU = 0u;
SocketRef client_socket;
seastar::promise<> server_connected;
static constexpr seastar::shard_id SERVER_CPU = 1u;
ShardedServerSocket<IS_FIXED_CPU> *pss = nullptr;
ShardedServerSocket *pss = nullptr;
seastar::shard_id server_socket_CPU;
SocketRef server_socket;
public:
template <typename FuncC, typename FuncS>
static future<> dispatch_sockets(FuncC&& cb_client, FuncS&& cb_server) {
static future<> dispatch_sockets(
bool is_fixed_cpu,
FuncC&& cb_client,
FuncS&& cb_server) {
ceph_assert_always(seastar::this_shard_id() == CLIENT_CPU);
auto owner = std::make_unique<SocketFactory>();
auto psf = owner.get();
auto saddr = get_server_addr();
return seastar::smp::submit_to(SERVER_CPU, [psf, saddr] {
return ShardedServerSocket<IS_FIXED_CPU>::create(
return seastar::smp::submit_to(SERVER_CPU, [psf, saddr, is_fixed_cpu] {
return ShardedServerSocket::create(is_fixed_cpu
).then([psf, saddr](auto pss) {
psf->pss = pss;
return pss->listen(saddr
@ -201,7 +202,7 @@ class SocketFactory {
logger().info("dispatch_sockets(): accepted at shard {}",
seastar::this_shard_id());
psf->server_socket_CPU = seastar::this_shard_id();
if constexpr (IS_FIXED_CPU) {
if (psf->pss->is_fixed()) {
ceph_assert_always(SERVER_CPU == seastar::this_shard_id());
}
psf->server_socket = std::move(socket);
@ -426,10 +427,10 @@ class Connection {
}
};
template <bool IS_FIXED_CPU>
future<> test_read_write() {
future<> test_read_write(bool is_fixed_cpu) {
logger().info("test_read_write()...");
return SocketFactory<IS_FIXED_CPU>::dispatch_sockets(
return SocketFactory::dispatch_sockets(
is_fixed_cpu,
[](auto cs) { return Connection::dispatch_rw_bounded(cs, 128); },
[](auto ss) { return Connection::dispatch_rw_bounded(ss, 128); }
).then([] {
@ -440,10 +441,10 @@ future<> test_read_write() {
});
}
template <bool IS_FIXED_CPU>
future<> test_unexpected_down() {
future<> test_unexpected_down(bool is_fixed_cpu) {
logger().info("test_unexpected_down()...");
return SocketFactory<IS_FIXED_CPU>::dispatch_sockets(
return SocketFactory::dispatch_sockets(
is_fixed_cpu,
[](auto cs) {
return Connection::dispatch_rw_bounded(cs, 128, true
).handle_exception_type([](const std::system_error& e) {
@ -460,10 +461,10 @@ future<> test_unexpected_down() {
});
}
template <bool IS_FIXED_CPU>
future<> test_shutdown_propagated() {
future<> test_shutdown_propagated(bool is_fixed_cpu) {
logger().info("test_shutdown_propagated()...");
return SocketFactory<IS_FIXED_CPU>::dispatch_sockets(
return SocketFactory::dispatch_sockets(
is_fixed_cpu,
[](auto cs) {
logger().debug("test_shutdown_propagated() shutdown client socket");
cs->shutdown();
@ -478,10 +479,10 @@ future<> test_shutdown_propagated() {
});
}
template <bool IS_FIXED_CPU>
future<> test_preemptive_down() {
future<> test_preemptive_down(bool is_fixed_cpu) {
logger().info("test_preemptive_down()...");
return SocketFactory<IS_FIXED_CPU>::dispatch_sockets(
return SocketFactory::dispatch_sockets(
is_fixed_cpu,
[](auto cs) { return Connection::dispatch_rw_unbounded(cs, true); },
[](auto ss) { return Connection::dispatch_rw_unbounded(ss); }
).then([] {
@ -492,19 +493,18 @@ future<> test_preemptive_down() {
});
}
template <bool IS_FIXED_CPU>
future<> do_test_with_type() {
return test_bind_same<IS_FIXED_CPU>(
).then([] {
return test_accept<IS_FIXED_CPU>();
}).then([] {
return test_read_write<IS_FIXED_CPU>();
}).then([] {
return test_unexpected_down<IS_FIXED_CPU>();
}).then([] {
return test_shutdown_propagated<IS_FIXED_CPU>();
}).then([] {
return test_preemptive_down<IS_FIXED_CPU>();
future<> do_test_with_type(bool is_fixed_cpu) {
return test_bind_same(is_fixed_cpu
).then([is_fixed_cpu] {
return test_accept(is_fixed_cpu);
}).then([is_fixed_cpu] {
return test_read_write(is_fixed_cpu);
}).then([is_fixed_cpu] {
return test_unexpected_down(is_fixed_cpu);
}).then([is_fixed_cpu] {
return test_shutdown_propagated(is_fixed_cpu);
}).then([is_fixed_cpu] {
return test_preemptive_down(is_fixed_cpu);
});
}
@ -527,9 +527,9 @@ seastar::future<int> do_test(seastar::app_template& app)
}).then([] {
return test_refused();
}).then([] {
return do_test_with_type<true>();
return do_test_with_type(true);
}).then([] {
return do_test_with_type<false>();
return do_test_with_type(false);
}).then([] {
logger().info("All tests succeeded");
// Seastar has bugs to have events undispatched during shutdown,