From 478e9e8395d7e7f113215e8ae69423379f376f80 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Thu, 4 Apr 2019 21:18:00 +0800 Subject: [PATCH 1/2] test/crimson: perf_crimson/async_server write test support Allow server to have write/send workload with specified block size. Signed-off-by: Yingxin Cheng --- src/test/crimson/perf_async_msgr.cc | 37 +++++++++++----- src/test/crimson/perf_crimson_msgr.cc | 61 +++++++++++++++++---------- 2 files changed, 64 insertions(+), 34 deletions(-) diff --git a/src/test/crimson/perf_async_msgr.cc b/src/test/crimson/perf_async_msgr.cc index ca5780c23b5..d64388ec56c 100644 --- a/src/test/crimson/perf_async_msgr.cc +++ b/src/test/crimson/perf_async_msgr.cc @@ -17,8 +17,8 @@ namespace { constexpr int CEPH_OSD_PROTOCOL = 10; struct Server { - Server(CephContext* cct) - : dummy_auth(cct), dispatcher(cct) + Server(CephContext* cct, unsigned msg_len) + : dummy_auth(cct), dispatcher(cct, msg_len) { msgr.reset(Messenger::create(cct, "async", entity_name_t::OSD(0), "server", 0, 0)); dummy_auth.auth_registry.refresh_config(); @@ -31,9 +31,14 @@ struct Server { DummyAuthClientServer dummy_auth; unique_ptr msgr; struct ServerDispatcher : Dispatcher { - ServerDispatcher(CephContext* cct) - : Dispatcher(cct) - {} + unsigned msg_len = 0; + bufferlist msg_data; + + ServerDispatcher(CephContext* cct, unsigned msg_len) + : Dispatcher(cct), msg_len(msg_len) + { + msg_data.append_zero(msg_len); + } bool ms_can_fast_dispatch_any() const override { return true; } @@ -42,8 +47,15 @@ struct Server { } void ms_fast_dispatch(Message* m) override { ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); - MOSDOp *req = static_cast(m); - m->get_connection()->send_message(new MOSDOpReply(req, 0, 0, 0, false)); + const static pg_t pgid; + const static object_locator_t oloc; + const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(), + pgid.pool(), oloc.nspace); + static spg_t spgid(pgid); + MOSDOp *rep = new MOSDOp(0, 0, hobj, spgid, 0, 0, 0); + bufferlist data(msg_data); + rep->write(0, msg_len, data); + m->get_connection()->send_message(rep); m->put(); } bool ms_dispatch(Message*) override { @@ -62,10 +74,10 @@ struct Server { } -static void run(CephContext* cct, entity_addr_t addr) +static void run(CephContext* cct, entity_addr_t addr, unsigned bs) { std::cout << "async server listening at " << addr << std::endl; - Server server{cct}; + Server server{cct, bs}; server.msgr->bind(addr); server.msgr->add_dispatcher_head(&server.dispatcher); server.msgr->start(); @@ -79,7 +91,9 @@ int main(int argc, char** argv) desc.add_options() ("help,h", "show help message") ("addr", po::value()->default_value("v1:0.0.0.0:9010"), - "server address"); + "server address") + ("bs", po::value()->default_value(0), + "server block size"); po::variables_map vm; std::vector unrecognized_options; try { @@ -102,6 +116,7 @@ int main(int argc, char** argv) auto addr = vm["addr"].as(); entity_addr_t target_addr; target_addr.parse(addr.c_str(), nullptr); + auto bs = vm["bs"].as(); std::vector args(argv, argv + argc); auto cct = global_init(nullptr, args, @@ -109,5 +124,5 @@ int main(int argc, char** argv) CODE_ENVIRONMENT_UTILITY, CINIT_FLAG_NO_MON_CONFIG); common_init_finish(cct.get()); - run(cct.get(), target_addr); + run(cct.get(), target_addr, bs); } diff --git a/src/test/crimson/perf_crimson_msgr.cc b/src/test/crimson/perf_crimson_msgr.cc index 0d3f15e5580..ce2005bdb7f 100644 --- a/src/test/crimson/perf_crimson_msgr.cc +++ b/src/test/crimson/perf_crimson_msgr.cc @@ -14,7 +14,6 @@ #include "common/ceph_time.h" #include "messages/MOSDOp.h" -#include "messages/MOSDOpReply.h" #include "crimson/auth/DummyAuth.h" #include "crimson/common/log.h" @@ -41,7 +40,8 @@ enum class perf_mode_t { static seastar::future<> run(unsigned rounds, unsigned jobs, - unsigned bs, + unsigned cbs, + unsigned sbs, unsigned depth, std::string addr, perf_mode_t mode, @@ -56,12 +56,16 @@ static seastar::future<> run(unsigned rounds, const seastar::shard_id sid; const seastar::shard_id msgr_sid; std::string lname; + unsigned msg_len; + bufferlist msg_data; - Server(unsigned msgr_core) + Server(unsigned msgr_core, unsigned msg_len) : sid{seastar::engine().cpu_id()}, - msgr_sid{msgr_core} { + msgr_sid{msgr_core}, + msg_len{msg_len} { lname = "server#"; lname += std::to_string(sid); + msg_data.append_zero(msg_len); } Dispatcher* get_local_shard() override { @@ -73,9 +77,18 @@ static seastar::future<> run(unsigned rounds, seastar::future<> ms_dispatch(ceph::net::ConnectionRef c, MessageRef m) override { ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); - // reply - Ref req = boost::static_pointer_cast(m); - return c->send(MessageRef{ new MOSDOpReply(req.get(), 0, 0, 0, false), false }); + + // server replies with MOSDOp to generate server-side write workload + const static pg_t pgid; + const static object_locator_t oloc; + const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(), + pgid.pool(), oloc.nspace); + static spg_t spgid(pgid); + MOSDOp *rep = new MOSDOp(0, 0, hobj, spgid, 0, 0, 0); + bufferlist data(msg_data); + rep->write(0, msg_len, data); + MessageRef msg = {rep, false}; + return c->send(msg); } seastar::future<> init(const entity_addr_t& addr) { @@ -142,9 +155,7 @@ static seastar::future<> run(unsigned rounds, depth{depth} { lname = "client#"; lname += std::to_string(sid); - bufferptr ptr(msg_len); - memset(ptr.c_str(), 0, msg_len); - msg_data.append(ptr); + msg_data.append_zero(msg_len); } Dispatcher* get_local_shard() override { @@ -161,13 +172,14 @@ static seastar::future<> run(unsigned rounds, } seastar::future<> ms_dispatch(ceph::net::ConnectionRef c, MessageRef m) override { - ceph_assert(m->get_type() == CEPH_MSG_OSD_OPREPLY); + // server replies with MOSDOp to generate server-side write workload + ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); depth.signal(1); ceph_assert(active_session); ++(active_session->received_count); if (active_session->received_count == rounds) { - logger().info("{}: finished receiving {} OPREPLYs", *c, active_session->received_count); + logger().info("{}: finished receiving {} REPLYs", *c, active_session->received_count); active_session->finish_time = mono_clock::now(); active_session->done.set_value(); } @@ -290,15 +302,15 @@ static seastar::future<> run(unsigned rounds, }; return seastar::when_all_succeed( - ceph::net::create_sharded(core), - ceph::net::create_sharded(jobs, rounds, bs, depth)) + ceph::net::create_sharded(core, sbs), + ceph::net::create_sharded(jobs, rounds, cbs, depth)) .then([=](test_state::Server *server, test_state::Client *client) { entity_addr_t target_addr; target_addr.parse(addr.c_str(), nullptr); if (mode == perf_mode_t::both) { - logger().info("\nperf settings:\n mode=server+client\n server addr={}\n server core={}\n rounds={}\n client jobs={}\n bs={}\n depth={}\n", - addr, core, rounds, jobs, bs, depth); + logger().info("\nperf settings:\n mode=server+client\n server addr={}\n server core={}\n rounds={}\n client jobs={}\n client bs={}\n server bs={}\n depth={}\n", + addr, core, rounds, jobs, cbs, sbs, depth); ceph_assert(seastar::smp::count >= std::max(1+jobs, 1+core)); ceph_assert(core == 0 || core > jobs); ceph_assert(jobs > 0); @@ -315,8 +327,8 @@ static seastar::future<> run(unsigned rounds, return server->shutdown(); }); } else if (mode == perf_mode_t::client) { - logger().info("\nperf settings:\n mode=client\n server addr={}\n rounds={}\n client jobs={}\n bs={}\n depth={}\n", - addr, rounds, jobs, bs, depth); + logger().info("\nperf settings:\n mode=client\n server addr={}\n rounds={}\n client jobs={}\n client bs={}\n depth={}\n", + addr, rounds, jobs, cbs, depth); ceph_assert(seastar::smp::count >= 1+jobs); ceph_assert(jobs > 0); return client->init() @@ -329,8 +341,8 @@ static seastar::future<> run(unsigned rounds, }); } else { // mode == perf_mode_t::server ceph_assert(seastar::smp::count >= 1+core); - logger().info("\nperf settings:\n mode=server\n server addr={}\n server core={}\n", - addr, core); + logger().info("\nperf settings:\n mode=server\n server addr={}\n server core={}\n server bs={}\n", + addr, core, sbs); return server->init(target_addr) // dispatch ops .then([server] { @@ -359,22 +371,25 @@ int main(int argc, char** argv) "number of messaging rounds") ("jobs", bpo::value()->default_value(1), "number of jobs (client messengers)") - ("bs", bpo::value()->default_value(4096), + ("cbs", bpo::value()->default_value(4096), "block size") + ("sbs", bpo::value()->default_value(0), + "server block size") ("depth", bpo::value()->default_value(512), "io depth"); return app.run(argc, argv, [&app] { auto&& config = app.configuration(); auto rounds = config["rounds"].as(); auto jobs = config["jobs"].as(); - auto bs = config["bs"].as(); + auto cbs = config["cbs"].as(); + auto sbs = config["sbs"].as(); auto depth = config["depth"].as(); auto addr = config["addr"].as(); auto core = config["core"].as(); auto mode = config["mode"].as(); ceph_assert(mode <= 2); auto _mode = static_cast(mode); - return run(rounds, jobs, bs, depth, addr, _mode, core) + return run(rounds, jobs, cbs, sbs, depth, addr, _mode, core) .then([] { logger().info("\nsuccessful!\n"); }).handle_exception([] (auto eptr) { From e289ec8521866959dae23050d13b414f2597898a Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Mon, 8 Apr 2019 16:49:46 +0800 Subject: [PATCH 2/2] test/crimson: wrap perf parameters as structured data Signed-off-by: Yingxin Cheng --- src/test/crimson/perf_crimson_msgr.cc | 144 +++++++++++++++++--------- 1 file changed, 95 insertions(+), 49 deletions(-) diff --git a/src/test/crimson/perf_crimson_msgr.cc b/src/test/crimson/perf_crimson_msgr.cc index ce2005bdb7f..07f67210a91 100644 --- a/src/test/crimson/perf_crimson_msgr.cc +++ b/src/test/crimson/perf_crimson_msgr.cc @@ -38,14 +38,68 @@ enum class perf_mode_t { server }; -static seastar::future<> run(unsigned rounds, - unsigned jobs, - unsigned cbs, - unsigned sbs, - unsigned depth, - std::string addr, - perf_mode_t mode, - unsigned core) +struct client_config { + entity_addr_t server_addr; + unsigned block_size; + unsigned rounds; + unsigned jobs; + unsigned depth; + + std::string str() const { + std::ostringstream out; + out << "client[>> " << server_addr + << "](bs=" << block_size + << ", rounds=" << rounds + << ", jobs=" << jobs + << ", depth=" << depth + << ")"; + return out.str(); + } + + static client_config load(bpo::variables_map& options) { + client_config conf; + entity_addr_t addr; + ceph_assert(addr.parse(options["addr"].as().c_str(), nullptr)); + + conf.server_addr = addr; + conf.block_size = options["cbs"].as(); + conf.rounds = options["rounds"].as(); + conf.jobs = options["jobs"].as(); + conf.depth = options["depth"].as(); + return conf; + } +}; + +struct server_config { + entity_addr_t addr; + unsigned block_size; + unsigned core; + + std::string str() const { + std::ostringstream out; + out << "server[" << addr + << "](bs=" << block_size + << ", core=" << core + << ")"; + return out.str(); + } + + static server_config load(bpo::variables_map& options) { + server_config conf; + entity_addr_t addr; + ceph_assert(addr.parse(options["addr"].as().c_str(), nullptr)); + + conf.addr = addr; + conf.block_size = options["sbs"].as(); + conf.core = options["core"].as(); + return conf; + } +}; + +static seastar::future<> run( + perf_mode_t mode, + const client_config& client_conf, + const server_config& server_conf) { struct test_state { struct Server final @@ -302,24 +356,24 @@ static seastar::future<> run(unsigned rounds, }; return seastar::when_all_succeed( - ceph::net::create_sharded(core, sbs), - ceph::net::create_sharded(jobs, rounds, cbs, depth)) + ceph::net::create_sharded(server_conf.core, server_conf.block_size), + ceph::net::create_sharded(client_conf.jobs, client_conf.rounds, + client_conf.block_size, client_conf.depth)) .then([=](test_state::Server *server, test_state::Client *client) { - entity_addr_t target_addr; - target_addr.parse(addr.c_str(), nullptr); if (mode == perf_mode_t::both) { - logger().info("\nperf settings:\n mode=server+client\n server addr={}\n server core={}\n rounds={}\n client jobs={}\n client bs={}\n server bs={}\n depth={}\n", - addr, core, rounds, jobs, cbs, sbs, depth); - ceph_assert(seastar::smp::count >= std::max(1+jobs, 1+core)); - ceph_assert(core == 0 || core > jobs); - ceph_assert(jobs > 0); + logger().info("\nperf settings:\n {}\n {}\n", + client_conf.str(), server_conf.str()); + ceph_assert(seastar::smp::count >= 1+client_conf.jobs); + ceph_assert(client_conf.jobs > 0); + ceph_assert(seastar::smp::count >= 1+server_conf.core); + ceph_assert(server_conf.core == 0 || server_conf.core > client_conf.jobs); return seastar::when_all_succeed( - server->init(target_addr), + server->init(server_conf.addr), client->init()) // dispatch ops - .then([client, target_addr] { - return client->dispatch_messages(target_addr); + .then([client, addr = client_conf.server_addr] { + return client->dispatch_messages(addr); // shutdown }).finally([client] { return client->shutdown(); @@ -327,23 +381,21 @@ static seastar::future<> run(unsigned rounds, return server->shutdown(); }); } else if (mode == perf_mode_t::client) { - logger().info("\nperf settings:\n mode=client\n server addr={}\n rounds={}\n client jobs={}\n client bs={}\n depth={}\n", - addr, rounds, jobs, cbs, depth); - ceph_assert(seastar::smp::count >= 1+jobs); - ceph_assert(jobs > 0); + logger().info("\nperf settings:\n {}\n", client_conf.str()); + ceph_assert(seastar::smp::count >= 1+client_conf.jobs); + ceph_assert(client_conf.jobs > 0); return client->init() // dispatch ops - .then([client, target_addr] { - return client->dispatch_messages(target_addr); + .then([client, addr = client_conf.server_addr] { + return client->dispatch_messages(addr); // shutdown }).finally([client] { return client->shutdown(); }); } else { // mode == perf_mode_t::server - ceph_assert(seastar::smp::count >= 1+core); - logger().info("\nperf settings:\n mode=server\n server addr={}\n server core={}\n server bs={}\n", - addr, core, sbs); - return server->init(target_addr) + ceph_assert(seastar::smp::count >= 1+server_conf.core); + logger().info("\nperf settings:\n {}\n", server_conf.str()); + return server->init(server_conf.addr) // dispatch ops .then([server] { return server->msgr->wait(); @@ -361,36 +413,30 @@ int main(int argc, char** argv) { seastar::app_template app; app.add_options() - ("addr", bpo::value()->default_value("v1:0.0.0.0:9010"), - "server address") - ("core", bpo::value()->default_value(0), - "server running core") ("mode", bpo::value()->default_value(0), "0: both, 1:client, 2:server") + ("addr", bpo::value()->default_value("v1:0.0.0.0:9010"), + "server address") ("rounds", bpo::value()->default_value(65536), - "number of messaging rounds") + "number of client messaging rounds") ("jobs", bpo::value()->default_value(1), - "number of jobs (client messengers)") + "number of client jobs (messengers)") ("cbs", bpo::value()->default_value(4096), - "block size") - ("sbs", bpo::value()->default_value(0), - "server block size") + "client block size") ("depth", bpo::value()->default_value(512), - "io depth"); + "client io depth") + ("core", bpo::value()->default_value(0), + "server running core") + ("sbs", bpo::value()->default_value(0), + "server block size"); return app.run(argc, argv, [&app] { auto&& config = app.configuration(); - auto rounds = config["rounds"].as(); - auto jobs = config["jobs"].as(); - auto cbs = config["cbs"].as(); - auto sbs = config["sbs"].as(); - auto depth = config["depth"].as(); - auto addr = config["addr"].as(); - auto core = config["core"].as(); auto mode = config["mode"].as(); ceph_assert(mode <= 2); auto _mode = static_cast(mode); - return run(rounds, jobs, cbs, sbs, depth, addr, _mode, core) - .then([] { + auto server_conf = server_config::load(config); + auto client_conf = client_config::load(config); + return run(_mode, client_conf, server_conf).then([] { logger().info("\nsuccessful!\n"); }).handle_exception([] (auto eptr) { logger().info("\nfailed!\n");