test/crimson/test_messenger: implement multi-shard test_echo::test_state::Server

Also introduces ShardedGates.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
This commit is contained in:
Yingxin Cheng 2023-07-07 10:13:18 +08:00
parent 985173419b
commit ec6acfc352

View File

@ -23,6 +23,7 @@
#include <seastar/core/app-template.hh>
#include <seastar/core/do_with.hh>
#include <seastar/core/future-util.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/with_timeout.hh>
@ -52,22 +53,91 @@ static entity_addr_t get_server_addr() {
return saddr;
}
template <typename T, typename... Args>
seastar::future<T*> create_sharded(Args... args) {
// we should only construct/stop shards on #0
return seastar::smp::submit_to(0, [=] {
auto sharded_obj = seastar::make_lw_shared<seastar::sharded<T>>();
return sharded_obj->start(args...
).then([sharded_obj] {
seastar::engine().at_exit([sharded_obj] {
return sharded_obj->stop().then([sharded_obj] {});
});
return sharded_obj.get();
});
}).then([](seastar::sharded<T> *ptr_shard) {
return &ptr_shard->local();
});
}
class ShardedGates
: public seastar::peering_sharded_service<ShardedGates> {
public:
ShardedGates() = default;
~ShardedGates() {
assert(gate.is_closed());
}
template <typename Func>
void dispatch_in_background(const char *what, Func &&f) {
std::ignore = seastar::with_gate(
container().local().gate, std::forward<Func>(f)
).handle_exception([what](std::exception_ptr eptr) {
try {
std::rethrow_exception(eptr);
} catch (std::exception &e) {
logger().error("ShardedGates::dispatch_in_background: "
"{} got exxception {}", what, e.what());
}
});
}
seastar::future<> close() {
return container().invoke_on_all([](auto &local) {
return local.gate.close();
});
}
static seastar::future<ShardedGates*> create() {
return create_sharded<ShardedGates>();
}
// seastar::future<> stop() is intentially not implemented
private:
seastar::gate gate;
};
static seastar::future<> test_echo(unsigned rounds,
double keepalive_ratio)
{
struct test_state {
struct Server final
: public crimson::net::Dispatcher {
ShardedGates &gates;
crimson::net::MessengerRef msgr;
crimson::auth::DummyAuthClientServer dummy_auth;
Server(ShardedGates &gates) : gates{gates} {}
void ms_handle_accept(
crimson::net::ConnectionRef conn,
seastar::shard_id new_shard,
bool is_replace) override {
logger().info("server accepted {}", *conn);
ceph_assert(new_shard == seastar::this_shard_id());
ceph_assert(!is_replace);
}
std::optional<seastar::future<>> ms_dispatch(
crimson::net::ConnectionRef c, MessageRef m) override {
if (verbose) {
logger().info("server got {}", *m);
}
// reply with a pong
std::ignore = c->send(crimson::make_message<MPing>());
gates.dispatch_in_background("echo_send_pong", [c] {
return c->send(crimson::make_message<MPing>());
});
return {seastar::now()};
}
@ -76,7 +146,7 @@ static seastar::future<> test_echo(unsigned rounds,
const uint64_t nonce,
const entity_addr_t& addr) {
msgr = crimson::net::Messenger::create(
name, lname, nonce, true);
name, lname, nonce, false);
msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
msgr->set_auth_client(&dummy_auth);
msgr->set_auth_server(&dummy_auth);
@ -229,46 +299,51 @@ static seastar::future<> test_echo(unsigned rounds,
logger().info("test_echo(rounds={}, keepalive_ratio={}):",
rounds, keepalive_ratio);
auto server1 = seastar::make_shared<test_state::Server>();
auto server2 = seastar::make_shared<test_state::Server>();
auto client1 = seastar::make_shared<test_state::Client>(rounds, keepalive_ratio);
auto client2 = seastar::make_shared<test_state::Client>(rounds, keepalive_ratio);
// start servers and clients
auto addr1 = get_server_addr();
auto addr2 = get_server_addr();
addr1.set_type(entity_addr_t::TYPE_MSGR2);
addr2.set_type(entity_addr_t::TYPE_MSGR2);
return seastar::when_all_succeed(
server1->init(entity_name_t::OSD(0), "server1", 1, addr1),
server2->init(entity_name_t::OSD(1), "server2", 2, addr2),
client1->init(entity_name_t::OSD(2), "client1", 3),
client2->init(entity_name_t::OSD(3), "client2", 4)
// dispatch pingpoing
).then_unpack([client1, client2, server1, server2] {
return ShardedGates::create(
).then([rounds, keepalive_ratio](auto *gates) {
auto server1 = seastar::make_shared<test_state::Server>(*gates);
auto server2 = seastar::make_shared<test_state::Server>(*gates);
auto client1 = seastar::make_shared<test_state::Client>(rounds, keepalive_ratio);
auto client2 = seastar::make_shared<test_state::Client>(rounds, keepalive_ratio);
// start servers and clients
auto addr1 = get_server_addr();
auto addr2 = get_server_addr();
addr1.set_type(entity_addr_t::TYPE_MSGR2);
addr2.set_type(entity_addr_t::TYPE_MSGR2);
return seastar::when_all_succeed(
// test connecting in parallel, accepting in parallel
client1->dispatch_pingpong(server2->msgr->get_myaddr()),
client2->dispatch_pingpong(server1->msgr->get_myaddr()));
// shutdown
}).then_unpack([] {
return seastar::now();
}).then([client1] {
logger().info("client1 shutdown...");
return client1->shutdown();
}).then([client2] {
logger().info("client2 shutdown...");
return client2->shutdown();
}).then([server1] {
logger().info("server1 shutdown...");
return server1->shutdown();
}).then([server2] {
logger().info("server2 shutdown...");
return server2->shutdown();
}).then([] {
logger().info("test_echo() done!\n");
}).handle_exception([server1, server2, client1, client2] (auto eptr) {
logger().error("test_echo() failed: got exception {}", eptr);
throw;
server1->init(entity_name_t::OSD(0), "server1", 1, addr1),
server2->init(entity_name_t::OSD(1), "server2", 2, addr2),
client1->init(entity_name_t::OSD(2), "client1", 3),
client2->init(entity_name_t::OSD(3), "client2", 4)
// dispatch pingpoing
).then_unpack([client1, client2, server1, server2] {
return seastar::when_all_succeed(
// test connecting in parallel, accepting in parallel
client1->dispatch_pingpong(server2->msgr->get_myaddr()),
client2->dispatch_pingpong(server1->msgr->get_myaddr()));
// shutdown
}).then_unpack([] {
return seastar::now();
}).then([client1] {
logger().info("client1 shutdown...");
return client1->shutdown();
}).then([client2] {
logger().info("client2 shutdown...");
return client2->shutdown();
}).then([server1] {
logger().info("server1 shutdown...");
return server1->shutdown();
}).then([server2] {
logger().info("server2 shutdown...");
return server2->shutdown();
}).then([] {
logger().info("test_echo() done!\n");
}).handle_exception([server1, server2, client1, client2] (auto eptr) {
logger().error("test_echo() failed: got exception {}", eptr);
throw;
}).finally([gates] {
return gates->close();
});
});
}