mirror of
https://github.com/ceph/ceph
synced 2025-01-25 20:45:06 +00:00
test/crimson: split async-msgr out of alien_echo
async-msgr is not compatible with custom-built libs for seastar, so split it. Signed-off-by: Yingxin Cheng <yingxincheng@gmail.com>
This commit is contained in:
parent
d7a279f759
commit
b9b0c61bf7
@ -17,7 +17,11 @@ target_link_libraries(perf_crimson_msgr ceph-common crimson)
|
|||||||
|
|
||||||
add_executable(unittest_seastar_echo
|
add_executable(unittest_seastar_echo
|
||||||
test_alien_echo.cc)
|
test_alien_echo.cc)
|
||||||
target_link_libraries(unittest_seastar_echo ceph-common global crimson)
|
target_link_libraries(unittest_seastar_echo crimson)
|
||||||
|
|
||||||
|
add_executable(unittest_async_echo
|
||||||
|
test_async_echo.cc)
|
||||||
|
target_link_libraries(unittest_async_echo ceph-common global)
|
||||||
|
|
||||||
add_executable(unittest_seastar_thread_pool
|
add_executable(unittest_seastar_thread_pool
|
||||||
test_thread_pool.cc)
|
test_thread_pool.cc)
|
||||||
|
@ -1,10 +1,7 @@
|
|||||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
|
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
|
||||||
|
|
||||||
#include "auth/Auth.h"
|
#include "auth/Auth.h"
|
||||||
#include "global/global_init.h"
|
|
||||||
#include "messages/MPing.h"
|
#include "messages/MPing.h"
|
||||||
#include "msg/Dispatcher.h"
|
|
||||||
#include "msg/Messenger.h"
|
|
||||||
#include "crimson/net/Connection.h"
|
#include "crimson/net/Connection.h"
|
||||||
#include "crimson/net/Dispatcher.h"
|
#include "crimson/net/Dispatcher.h"
|
||||||
#include "crimson/net/Messenger.h"
|
#include "crimson/net/Messenger.h"
|
||||||
@ -98,125 +95,6 @@ struct Client {
|
|||||||
};
|
};
|
||||||
} // namespace seastar_pingpong
|
} // namespace seastar_pingpong
|
||||||
|
|
||||||
namespace native_pingpong {
|
|
||||||
|
|
||||||
constexpr int CEPH_OSD_PROTOCOL = 10;
|
|
||||||
|
|
||||||
struct Server {
|
|
||||||
Server(CephContext* cct, const entity_inst_t& entity)
|
|
||||||
: dispatcher(cct)
|
|
||||||
{
|
|
||||||
msgr.reset(Messenger::create(cct, "async",
|
|
||||||
entity.name, "pong", entity.addr.get_nonce(), 0));
|
|
||||||
msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL);
|
|
||||||
msgr->set_default_policy(Messenger::Policy::stateless_server(0));
|
|
||||||
}
|
|
||||||
unique_ptr<Messenger> msgr;
|
|
||||||
struct ServerDispatcher : Dispatcher {
|
|
||||||
std::mutex mutex;
|
|
||||||
std::condition_variable on_reply;
|
|
||||||
bool replied = false;
|
|
||||||
ServerDispatcher(CephContext* cct)
|
|
||||||
: Dispatcher(cct)
|
|
||||||
{}
|
|
||||||
bool ms_can_fast_dispatch_any() const override {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
bool ms_can_fast_dispatch(const Message* m) const override {
|
|
||||||
return m->get_type() == CEPH_MSG_PING;
|
|
||||||
}
|
|
||||||
void ms_fast_dispatch(Message* m) override {
|
|
||||||
m->get_connection()->send_message(new MPing);
|
|
||||||
m->put();
|
|
||||||
{
|
|
||||||
std::lock_guard lock{mutex};
|
|
||||||
replied = true;
|
|
||||||
}
|
|
||||||
on_reply.notify_one();
|
|
||||||
}
|
|
||||||
bool ms_dispatch(Message*) override {
|
|
||||||
ceph_abort();
|
|
||||||
}
|
|
||||||
bool ms_handle_reset(Connection*) override {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
void ms_handle_remote_reset(Connection*) override {
|
|
||||||
}
|
|
||||||
bool ms_handle_refused(Connection*) override {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
void echo() {
|
|
||||||
replied = false;
|
|
||||||
std::unique_lock lock{mutex};
|
|
||||||
return on_reply.wait(lock, [this] { return replied; });
|
|
||||||
}
|
|
||||||
} dispatcher;
|
|
||||||
void echo() {
|
|
||||||
dispatcher.echo();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
struct Client {
|
|
||||||
unique_ptr<Messenger> msgr;
|
|
||||||
Client(CephContext *cct)
|
|
||||||
: dispatcher(cct)
|
|
||||||
{
|
|
||||||
msgr.reset(Messenger::create(cct, "async",
|
|
||||||
entity_name_t::CLIENT(-1), "ping",
|
|
||||||
getpid(), 0));
|
|
||||||
msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL);
|
|
||||||
msgr->set_default_policy(Messenger::Policy::lossy_client(0));
|
|
||||||
}
|
|
||||||
struct ClientDispatcher : Dispatcher {
|
|
||||||
std::mutex mutex;
|
|
||||||
std::condition_variable on_reply;
|
|
||||||
bool replied = false;
|
|
||||||
|
|
||||||
ClientDispatcher(CephContext* cct)
|
|
||||||
: Dispatcher(cct)
|
|
||||||
{}
|
|
||||||
bool ms_can_fast_dispatch_any() const override {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
bool ms_can_fast_dispatch(const Message* m) const override {
|
|
||||||
return m->get_type() == CEPH_MSG_PING;
|
|
||||||
}
|
|
||||||
void ms_fast_dispatch(Message* m) override {
|
|
||||||
m->put();
|
|
||||||
{
|
|
||||||
std::lock_guard lock{mutex};
|
|
||||||
replied = true;
|
|
||||||
}
|
|
||||||
on_reply.notify_one();
|
|
||||||
}
|
|
||||||
bool ms_dispatch(Message*) override {
|
|
||||||
ceph_abort();
|
|
||||||
}
|
|
||||||
bool ms_handle_reset(Connection *) override {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
void ms_handle_remote_reset(Connection*) override {
|
|
||||||
}
|
|
||||||
bool ms_handle_refused(Connection*) override {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
bool ping(Messenger* msgr, const entity_inst_t& peer) {
|
|
||||||
auto conn = msgr->connect_to(peer.name.type(),
|
|
||||||
entity_addrvec_t{peer.addr});
|
|
||||||
replied = false;
|
|
||||||
conn->send_message(new MPing);
|
|
||||||
std::unique_lock lock{mutex};
|
|
||||||
return on_reply.wait_for(lock, 500ms, [&] {
|
|
||||||
return replied;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
} dispatcher;
|
|
||||||
void ping(const entity_inst_t& peer) {
|
|
||||||
dispatcher.ping(msgr.get(), peer);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
} // namespace native_pingpong
|
|
||||||
|
|
||||||
class SeastarContext {
|
class SeastarContext {
|
||||||
seastar::file_desc begin_fd;
|
seastar::file_desc begin_fd;
|
||||||
ceph::thread::Condition on_end;
|
ceph::thread::Condition on_end;
|
||||||
@ -324,38 +202,6 @@ seastar_echo(const entity_addr_t addr, echo_role role, unsigned count)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void ceph_echo(CephContext* cct,
|
|
||||||
entity_addr_t addr, echo_role role, unsigned count)
|
|
||||||
{
|
|
||||||
std::cout << "ceph/";
|
|
||||||
entity_inst_t entity{entity_name_t::OSD(0), addr};
|
|
||||||
if (role == echo_role::as_server) {
|
|
||||||
std::cout << "server listening at " << addr << std::endl;
|
|
||||||
native_pingpong::Server server{cct, entity};
|
|
||||||
server.msgr->bind(addr);
|
|
||||||
server.msgr->add_dispatcher_head(&server.dispatcher);
|
|
||||||
server.msgr->start();
|
|
||||||
for (unsigned i = 0; i < count; i++) {
|
|
||||||
server.echo();
|
|
||||||
}
|
|
||||||
server.msgr->shutdown();
|
|
||||||
server.msgr->wait();
|
|
||||||
} else {
|
|
||||||
std::cout << "client sending to " << addr << std::endl;
|
|
||||||
native_pingpong::Client client{cct};
|
|
||||||
client.msgr->add_dispatcher_head(&client.dispatcher);
|
|
||||||
client.msgr->start();
|
|
||||||
auto conn = client.msgr->connect_to(entity.name.type(),
|
|
||||||
entity_addrvec_t{entity.addr});
|
|
||||||
for (unsigned i = 0; i < count; i++) {
|
|
||||||
std::cout << "seq=" << i << std::endl;
|
|
||||||
client.ping(entity);
|
|
||||||
}
|
|
||||||
client.msgr->shutdown();
|
|
||||||
client.msgr->wait();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int main(int argc, char** argv)
|
int main(int argc, char** argv)
|
||||||
{
|
{
|
||||||
namespace po = boost::program_options;
|
namespace po = boost::program_options;
|
||||||
@ -364,8 +210,6 @@ int main(int argc, char** argv)
|
|||||||
("help,h", "show help message")
|
("help,h", "show help message")
|
||||||
("role", po::value<std::string>()->default_value("pong"),
|
("role", po::value<std::string>()->default_value("pong"),
|
||||||
"role to play (ping | pong)")
|
"role to play (ping | pong)")
|
||||||
("test", po::value<std::string>()->default_value("seastar"),
|
|
||||||
"messenger to use (seastar | ceph)")
|
|
||||||
("port", po::value<uint16_t>()->default_value(9010),
|
("port", po::value<uint16_t>()->default_value(9010),
|
||||||
"port #")
|
"port #")
|
||||||
("nonce", po::value<uint32_t>()->default_value(42),
|
("nonce", po::value<uint32_t>()->default_value(42),
|
||||||
@ -409,33 +253,23 @@ int main(int argc, char** argv)
|
|||||||
}
|
}
|
||||||
|
|
||||||
auto count = vm["count"].as<unsigned>();
|
auto count = vm["count"].as<unsigned>();
|
||||||
if (vm["test"].as<std::string>() == "seastar") {
|
seastar::app_template app;
|
||||||
seastar::app_template app;
|
SeastarContext sc;
|
||||||
SeastarContext sc;
|
auto job = sc.with_seastar([&] {
|
||||||
auto job = sc.with_seastar([&] {
|
auto fut = seastar::alien::submit_to(0, [addr, role, count] {
|
||||||
auto fut = seastar::alien::submit_to(0, [addr, role, count] {
|
return seastar_echo(addr, role, count);
|
||||||
return seastar_echo(addr, role, count);
|
|
||||||
});
|
|
||||||
fut.wait();
|
|
||||||
});
|
});
|
||||||
std::vector<char*> av{argv[0]};
|
fut.wait();
|
||||||
std::transform(begin(unrecognized_options),
|
});
|
||||||
end(unrecognized_options),
|
std::vector<char*> av{argv[0]};
|
||||||
std::back_inserter(av),
|
std::transform(begin(unrecognized_options),
|
||||||
[](auto& s) {
|
end(unrecognized_options),
|
||||||
return const_cast<char*>(s.c_str());
|
std::back_inserter(av),
|
||||||
});
|
[](auto& s) {
|
||||||
sc.run(app, av.size(), av.data());
|
return const_cast<char*>(s.c_str());
|
||||||
job.join();
|
});
|
||||||
} else {
|
sc.run(app, av.size(), av.data());
|
||||||
std::vector<const char*> args(argv, argv + argc);
|
job.join();
|
||||||
auto cct = global_init(nullptr, args,
|
|
||||||
CEPH_ENTITY_TYPE_CLIENT,
|
|
||||||
CODE_ENVIRONMENT_UTILITY,
|
|
||||||
CINIT_FLAG_NO_MON_CONFIG);
|
|
||||||
common_init_finish(cct.get());
|
|
||||||
ceph_echo(cct.get(), addr, role, count);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
226
src/test/crimson/test_async_echo.cc
Normal file
226
src/test/crimson/test_async_echo.cc
Normal file
@ -0,0 +1,226 @@
|
|||||||
|
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
|
||||||
|
|
||||||
|
#include <boost/program_options/variables_map.hpp>
|
||||||
|
#include <boost/program_options/parsers.hpp>
|
||||||
|
|
||||||
|
#include "auth/Auth.h"
|
||||||
|
#include "global/global_init.h"
|
||||||
|
#include "messages/MPing.h"
|
||||||
|
#include "msg/Dispatcher.h"
|
||||||
|
#include "msg/Messenger.h"
|
||||||
|
|
||||||
|
enum class echo_role {
|
||||||
|
as_server,
|
||||||
|
as_client,
|
||||||
|
};
|
||||||
|
|
||||||
|
namespace native_pingpong {
|
||||||
|
|
||||||
|
constexpr int CEPH_OSD_PROTOCOL = 10;
|
||||||
|
|
||||||
|
struct Server {
|
||||||
|
Server(CephContext* cct, const entity_inst_t& entity)
|
||||||
|
: dispatcher(cct)
|
||||||
|
{
|
||||||
|
msgr.reset(Messenger::create(cct, "async",
|
||||||
|
entity.name, "pong", entity.addr.get_nonce(), 0));
|
||||||
|
msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL);
|
||||||
|
msgr->set_default_policy(Messenger::Policy::stateless_server(0));
|
||||||
|
}
|
||||||
|
unique_ptr<Messenger> msgr;
|
||||||
|
struct ServerDispatcher : Dispatcher {
|
||||||
|
std::mutex mutex;
|
||||||
|
std::condition_variable on_reply;
|
||||||
|
bool replied = false;
|
||||||
|
ServerDispatcher(CephContext* cct)
|
||||||
|
: Dispatcher(cct)
|
||||||
|
{}
|
||||||
|
bool ms_can_fast_dispatch_any() const override {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
bool ms_can_fast_dispatch(const Message* m) const override {
|
||||||
|
return m->get_type() == CEPH_MSG_PING;
|
||||||
|
}
|
||||||
|
void ms_fast_dispatch(Message* m) override {
|
||||||
|
m->get_connection()->send_message(new MPing);
|
||||||
|
m->put();
|
||||||
|
{
|
||||||
|
std::lock_guard lock{mutex};
|
||||||
|
replied = true;
|
||||||
|
}
|
||||||
|
on_reply.notify_one();
|
||||||
|
}
|
||||||
|
bool ms_dispatch(Message*) override {
|
||||||
|
ceph_abort();
|
||||||
|
}
|
||||||
|
bool ms_handle_reset(Connection*) override {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
void ms_handle_remote_reset(Connection*) override {
|
||||||
|
}
|
||||||
|
bool ms_handle_refused(Connection*) override {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
void echo() {
|
||||||
|
replied = false;
|
||||||
|
std::unique_lock lock{mutex};
|
||||||
|
return on_reply.wait(lock, [this] { return replied; });
|
||||||
|
}
|
||||||
|
} dispatcher;
|
||||||
|
void echo() {
|
||||||
|
dispatcher.echo();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
struct Client {
|
||||||
|
unique_ptr<Messenger> msgr;
|
||||||
|
Client(CephContext *cct)
|
||||||
|
: dispatcher(cct)
|
||||||
|
{
|
||||||
|
msgr.reset(Messenger::create(cct, "async",
|
||||||
|
entity_name_t::CLIENT(-1), "ping",
|
||||||
|
getpid(), 0));
|
||||||
|
msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL);
|
||||||
|
msgr->set_default_policy(Messenger::Policy::lossy_client(0));
|
||||||
|
}
|
||||||
|
struct ClientDispatcher : Dispatcher {
|
||||||
|
std::mutex mutex;
|
||||||
|
std::condition_variable on_reply;
|
||||||
|
bool replied = false;
|
||||||
|
|
||||||
|
ClientDispatcher(CephContext* cct)
|
||||||
|
: Dispatcher(cct)
|
||||||
|
{}
|
||||||
|
bool ms_can_fast_dispatch_any() const override {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
bool ms_can_fast_dispatch(const Message* m) const override {
|
||||||
|
return m->get_type() == CEPH_MSG_PING;
|
||||||
|
}
|
||||||
|
void ms_fast_dispatch(Message* m) override {
|
||||||
|
m->put();
|
||||||
|
{
|
||||||
|
std::lock_guard lock{mutex};
|
||||||
|
replied = true;
|
||||||
|
}
|
||||||
|
on_reply.notify_one();
|
||||||
|
}
|
||||||
|
bool ms_dispatch(Message*) override {
|
||||||
|
ceph_abort();
|
||||||
|
}
|
||||||
|
bool ms_handle_reset(Connection *) override {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
void ms_handle_remote_reset(Connection*) override {
|
||||||
|
}
|
||||||
|
bool ms_handle_refused(Connection*) override {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
bool ping(Messenger* msgr, const entity_inst_t& peer) {
|
||||||
|
auto conn = msgr->connect_to(peer.name.type(),
|
||||||
|
entity_addrvec_t{peer.addr});
|
||||||
|
replied = false;
|
||||||
|
conn->send_message(new MPing);
|
||||||
|
std::unique_lock lock{mutex};
|
||||||
|
return on_reply.wait_for(lock, 500ms, [&] {
|
||||||
|
return replied;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} dispatcher;
|
||||||
|
void ping(const entity_inst_t& peer) {
|
||||||
|
dispatcher.ping(msgr.get(), peer);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
} // namespace native_pingpong
|
||||||
|
|
||||||
|
static void ceph_echo(CephContext* cct,
|
||||||
|
entity_addr_t addr, echo_role role, unsigned count)
|
||||||
|
{
|
||||||
|
std::cout << "ceph/";
|
||||||
|
entity_inst_t entity{entity_name_t::OSD(0), addr};
|
||||||
|
if (role == echo_role::as_server) {
|
||||||
|
std::cout << "server listening at " << addr << std::endl;
|
||||||
|
native_pingpong::Server server{cct, entity};
|
||||||
|
server.msgr->bind(addr);
|
||||||
|
server.msgr->add_dispatcher_head(&server.dispatcher);
|
||||||
|
server.msgr->start();
|
||||||
|
for (unsigned i = 0; i < count; i++) {
|
||||||
|
server.echo();
|
||||||
|
}
|
||||||
|
server.msgr->shutdown();
|
||||||
|
server.msgr->wait();
|
||||||
|
} else {
|
||||||
|
std::cout << "client sending to " << addr << std::endl;
|
||||||
|
native_pingpong::Client client{cct};
|
||||||
|
client.msgr->add_dispatcher_head(&client.dispatcher);
|
||||||
|
client.msgr->start();
|
||||||
|
auto conn = client.msgr->connect_to(entity.name.type(),
|
||||||
|
entity_addrvec_t{entity.addr});
|
||||||
|
for (unsigned i = 0; i < count; i++) {
|
||||||
|
std::cout << "seq=" << i << std::endl;
|
||||||
|
client.ping(entity);
|
||||||
|
}
|
||||||
|
client.msgr->shutdown();
|
||||||
|
client.msgr->wait();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char** argv)
|
||||||
|
{
|
||||||
|
namespace po = boost::program_options;
|
||||||
|
po::options_description desc{"Allowed options"};
|
||||||
|
desc.add_options()
|
||||||
|
("help,h", "show help message")
|
||||||
|
("role", po::value<std::string>()->default_value("pong"),
|
||||||
|
"role to play (ping | pong)")
|
||||||
|
("port", po::value<uint16_t>()->default_value(9010),
|
||||||
|
"port #")
|
||||||
|
("nonce", po::value<uint32_t>()->default_value(42),
|
||||||
|
"a unique number to identify the pong server")
|
||||||
|
("count", po::value<unsigned>()->default_value(10),
|
||||||
|
"stop after sending/echoing <count> MPing messages")
|
||||||
|
("v2", po::value<bool>()->default_value(false),
|
||||||
|
"using msgr v2 protocol");
|
||||||
|
po::variables_map vm;
|
||||||
|
std::vector<std::string> unrecognized_options;
|
||||||
|
try {
|
||||||
|
auto parsed = po::command_line_parser(argc, argv)
|
||||||
|
.options(desc)
|
||||||
|
.allow_unregistered()
|
||||||
|
.run();
|
||||||
|
po::store(parsed, vm);
|
||||||
|
if (vm.count("help")) {
|
||||||
|
std::cout << desc << std::endl;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
po::notify(vm);
|
||||||
|
unrecognized_options = po::collect_unrecognized(parsed.options, po::include_positional);
|
||||||
|
} catch(const po::error& e) {
|
||||||
|
std::cerr << "error: " << e.what() << std::endl;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
entity_addr_t addr;
|
||||||
|
if (vm["v2"].as<bool>()) {
|
||||||
|
addr.set_type(entity_addr_t::TYPE_MSGR2);
|
||||||
|
} else {
|
||||||
|
addr.set_type(entity_addr_t::TYPE_LEGACY);
|
||||||
|
}
|
||||||
|
addr.set_family(AF_INET);
|
||||||
|
addr.set_port(vm["port"].as<std::uint16_t>());
|
||||||
|
addr.set_nonce(vm["nonce"].as<std::uint32_t>());
|
||||||
|
|
||||||
|
echo_role role = echo_role::as_server;
|
||||||
|
if (vm["role"].as<std::string>() == "ping") {
|
||||||
|
role = echo_role::as_client;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto count = vm["count"].as<unsigned>();
|
||||||
|
std::vector<const char*> args(argv, argv + argc);
|
||||||
|
auto cct = global_init(nullptr, args,
|
||||||
|
CEPH_ENTITY_TYPE_CLIENT,
|
||||||
|
CODE_ENVIRONMENT_UTILITY,
|
||||||
|
CINIT_FLAG_NO_MON_CONFIG);
|
||||||
|
common_init_finish(cct.get());
|
||||||
|
ceph_echo(cct.get(), addr, role, count);
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user