Merge pull request #27429 from cyx1231st/wip-seastar-msgr-perf2

test/crimson: include writes in perf_crimson/async_server

Reviewed-by: Kefu Chai <kchai@redhat.com>
This commit is contained in:
Kefu Chai 2019-04-09 11:27:52 +08:00 committed by GitHub
commit 539b8b40e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 144 additions and 68 deletions

View File

@ -17,8 +17,8 @@ namespace {
constexpr int CEPH_OSD_PROTOCOL = 10;
struct Server {
Server(CephContext* cct)
: dummy_auth(cct), dispatcher(cct)
Server(CephContext* cct, unsigned msg_len)
: dummy_auth(cct), dispatcher(cct, msg_len)
{
msgr.reset(Messenger::create(cct, "async", entity_name_t::OSD(0), "server", 0, 0));
dummy_auth.auth_registry.refresh_config();
@ -31,9 +31,14 @@ struct Server {
DummyAuthClientServer dummy_auth;
unique_ptr<Messenger> msgr;
struct ServerDispatcher : Dispatcher {
ServerDispatcher(CephContext* cct)
: Dispatcher(cct)
{}
unsigned msg_len = 0;
bufferlist msg_data;
ServerDispatcher(CephContext* cct, unsigned msg_len)
: Dispatcher(cct), msg_len(msg_len)
{
msg_data.append_zero(msg_len);
}
bool ms_can_fast_dispatch_any() const override {
return true;
}
@ -42,8 +47,15 @@ struct Server {
}
void ms_fast_dispatch(Message* m) override {
ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
MOSDOp *req = static_cast<MOSDOp*>(m);
m->get_connection()->send_message(new MOSDOpReply(req, 0, 0, 0, false));
const static pg_t pgid;
const static object_locator_t oloc;
const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
pgid.pool(), oloc.nspace);
static spg_t spgid(pgid);
MOSDOp *rep = new MOSDOp(0, 0, hobj, spgid, 0, 0, 0);
bufferlist data(msg_data);
rep->write(0, msg_len, data);
m->get_connection()->send_message(rep);
m->put();
}
bool ms_dispatch(Message*) override {
@ -62,10 +74,10 @@ struct Server {
}
static void run(CephContext* cct, entity_addr_t addr)
static void run(CephContext* cct, entity_addr_t addr, unsigned bs)
{
std::cout << "async server listening at " << addr << std::endl;
Server server{cct};
Server server{cct, bs};
server.msgr->bind(addr);
server.msgr->add_dispatcher_head(&server.dispatcher);
server.msgr->start();
@ -79,7 +91,9 @@ int main(int argc, char** argv)
desc.add_options()
("help,h", "show help message")
("addr", po::value<std::string>()->default_value("v1:0.0.0.0:9010"),
"server address");
"server address")
("bs", po::value<unsigned>()->default_value(0),
"server block size");
po::variables_map vm;
std::vector<std::string> unrecognized_options;
try {
@ -102,6 +116,7 @@ int main(int argc, char** argv)
auto addr = vm["addr"].as<std::string>();
entity_addr_t target_addr;
target_addr.parse(addr.c_str(), nullptr);
auto bs = vm["bs"].as<unsigned>();
std::vector<const char*> args(argv, argv + argc);
auto cct = global_init(nullptr, args,
@ -109,5 +124,5 @@ int main(int argc, char** argv)
CODE_ENVIRONMENT_UTILITY,
CINIT_FLAG_NO_MON_CONFIG);
common_init_finish(cct.get());
run(cct.get(), target_addr);
run(cct.get(), target_addr, bs);
}

View File

@ -14,7 +14,6 @@
#include "common/ceph_time.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
#include "crimson/auth/DummyAuth.h"
#include "crimson/common/log.h"
@ -39,13 +38,68 @@ enum class perf_mode_t {
server
};
static seastar::future<> run(unsigned rounds,
unsigned jobs,
unsigned bs,
unsigned depth,
std::string addr,
perf_mode_t mode,
unsigned core)
struct client_config {
entity_addr_t server_addr;
unsigned block_size;
unsigned rounds;
unsigned jobs;
unsigned depth;
std::string str() const {
std::ostringstream out;
out << "client[>> " << server_addr
<< "](bs=" << block_size
<< ", rounds=" << rounds
<< ", jobs=" << jobs
<< ", depth=" << depth
<< ")";
return out.str();
}
static client_config load(bpo::variables_map& options) {
client_config conf;
entity_addr_t addr;
ceph_assert(addr.parse(options["addr"].as<std::string>().c_str(), nullptr));
conf.server_addr = addr;
conf.block_size = options["cbs"].as<unsigned>();
conf.rounds = options["rounds"].as<unsigned>();
conf.jobs = options["jobs"].as<unsigned>();
conf.depth = options["depth"].as<unsigned>();
return conf;
}
};
struct server_config {
entity_addr_t addr;
unsigned block_size;
unsigned core;
std::string str() const {
std::ostringstream out;
out << "server[" << addr
<< "](bs=" << block_size
<< ", core=" << core
<< ")";
return out.str();
}
static server_config load(bpo::variables_map& options) {
server_config conf;
entity_addr_t addr;
ceph_assert(addr.parse(options["addr"].as<std::string>().c_str(), nullptr));
conf.addr = addr;
conf.block_size = options["sbs"].as<unsigned>();
conf.core = options["core"].as<unsigned>();
return conf;
}
};
static seastar::future<> run(
perf_mode_t mode,
const client_config& client_conf,
const server_config& server_conf)
{
struct test_state {
struct Server final
@ -56,12 +110,16 @@ static seastar::future<> run(unsigned rounds,
const seastar::shard_id sid;
const seastar::shard_id msgr_sid;
std::string lname;
unsigned msg_len;
bufferlist msg_data;
Server(unsigned msgr_core)
Server(unsigned msgr_core, unsigned msg_len)
: sid{seastar::engine().cpu_id()},
msgr_sid{msgr_core} {
msgr_sid{msgr_core},
msg_len{msg_len} {
lname = "server#";
lname += std::to_string(sid);
msg_data.append_zero(msg_len);
}
Dispatcher* get_local_shard() override {
@ -73,9 +131,18 @@ static seastar::future<> run(unsigned rounds,
seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
MessageRef m) override {
ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
// reply
Ref<MOSDOp> req = boost::static_pointer_cast<MOSDOp>(m);
return c->send(MessageRef{ new MOSDOpReply(req.get(), 0, 0, 0, false), false });
// server replies with MOSDOp to generate server-side write workload
const static pg_t pgid;
const static object_locator_t oloc;
const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
pgid.pool(), oloc.nspace);
static spg_t spgid(pgid);
MOSDOp *rep = new MOSDOp(0, 0, hobj, spgid, 0, 0, 0);
bufferlist data(msg_data);
rep->write(0, msg_len, data);
MessageRef msg = {rep, false};
return c->send(msg);
}
seastar::future<> init(const entity_addr_t& addr) {
@ -142,9 +209,7 @@ static seastar::future<> run(unsigned rounds,
depth{depth} {
lname = "client#";
lname += std::to_string(sid);
bufferptr ptr(msg_len);
memset(ptr.c_str(), 0, msg_len);
msg_data.append(ptr);
msg_data.append_zero(msg_len);
}
Dispatcher* get_local_shard() override {
@ -161,13 +226,14 @@ static seastar::future<> run(unsigned rounds,
}
seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
MessageRef m) override {
ceph_assert(m->get_type() == CEPH_MSG_OSD_OPREPLY);
// server replies with MOSDOp to generate server-side write workload
ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
depth.signal(1);
ceph_assert(active_session);
++(active_session->received_count);
if (active_session->received_count == rounds) {
logger().info("{}: finished receiving {} OPREPLYs", *c, active_session->received_count);
logger().info("{}: finished receiving {} REPLYs", *c, active_session->received_count);
active_session->finish_time = mono_clock::now();
active_session->done.set_value();
}
@ -290,24 +356,24 @@ static seastar::future<> run(unsigned rounds,
};
return seastar::when_all_succeed(
ceph::net::create_sharded<test_state::Server>(core),
ceph::net::create_sharded<test_state::Client>(jobs, rounds, bs, depth))
ceph::net::create_sharded<test_state::Server>(server_conf.core, server_conf.block_size),
ceph::net::create_sharded<test_state::Client>(client_conf.jobs, client_conf.rounds,
client_conf.block_size, client_conf.depth))
.then([=](test_state::Server *server,
test_state::Client *client) {
entity_addr_t target_addr;
target_addr.parse(addr.c_str(), nullptr);
if (mode == perf_mode_t::both) {
logger().info("\nperf settings:\n mode=server+client\n server addr={}\n server core={}\n rounds={}\n client jobs={}\n bs={}\n depth={}\n",
addr, core, rounds, jobs, bs, depth);
ceph_assert(seastar::smp::count >= std::max(1+jobs, 1+core));
ceph_assert(core == 0 || core > jobs);
ceph_assert(jobs > 0);
logger().info("\nperf settings:\n {}\n {}\n",
client_conf.str(), server_conf.str());
ceph_assert(seastar::smp::count >= 1+client_conf.jobs);
ceph_assert(client_conf.jobs > 0);
ceph_assert(seastar::smp::count >= 1+server_conf.core);
ceph_assert(server_conf.core == 0 || server_conf.core > client_conf.jobs);
return seastar::when_all_succeed(
server->init(target_addr),
server->init(server_conf.addr),
client->init())
// dispatch ops
.then([client, target_addr] {
return client->dispatch_messages(target_addr);
.then([client, addr = client_conf.server_addr] {
return client->dispatch_messages(addr);
// shutdown
}).finally([client] {
return client->shutdown();
@ -315,23 +381,21 @@ static seastar::future<> run(unsigned rounds,
return server->shutdown();
});
} else if (mode == perf_mode_t::client) {
logger().info("\nperf settings:\n mode=client\n server addr={}\n rounds={}\n client jobs={}\n bs={}\n depth={}\n",
addr, rounds, jobs, bs, depth);
ceph_assert(seastar::smp::count >= 1+jobs);
ceph_assert(jobs > 0);
logger().info("\nperf settings:\n {}\n", client_conf.str());
ceph_assert(seastar::smp::count >= 1+client_conf.jobs);
ceph_assert(client_conf.jobs > 0);
return client->init()
// dispatch ops
.then([client, target_addr] {
return client->dispatch_messages(target_addr);
.then([client, addr = client_conf.server_addr] {
return client->dispatch_messages(addr);
// shutdown
}).finally([client] {
return client->shutdown();
});
} else { // mode == perf_mode_t::server
ceph_assert(seastar::smp::count >= 1+core);
logger().info("\nperf settings:\n mode=server\n server addr={}\n server core={}\n",
addr, core);
return server->init(target_addr)
ceph_assert(seastar::smp::count >= 1+server_conf.core);
logger().info("\nperf settings:\n {}\n", server_conf.str());
return server->init(server_conf.addr)
// dispatch ops
.then([server] {
return server->msgr->wait();
@ -349,33 +413,30 @@ int main(int argc, char** argv)
{
seastar::app_template app;
app.add_options()
("addr", bpo::value<std::string>()->default_value("v1:0.0.0.0:9010"),
"server address")
("core", bpo::value<unsigned>()->default_value(0),
"server running core")
("mode", bpo::value<unsigned>()->default_value(0),
"0: both, 1:client, 2:server")
("addr", bpo::value<std::string>()->default_value("v1:0.0.0.0:9010"),
"server address")
("rounds", bpo::value<unsigned>()->default_value(65536),
"number of messaging rounds")
"number of client messaging rounds")
("jobs", bpo::value<unsigned>()->default_value(1),
"number of jobs (client messengers)")
("bs", bpo::value<unsigned>()->default_value(4096),
"block size")
"number of client jobs (messengers)")
("cbs", bpo::value<unsigned>()->default_value(4096),
"client block size")
("depth", bpo::value<unsigned>()->default_value(512),
"io depth");
"client io depth")
("core", bpo::value<unsigned>()->default_value(0),
"server running core")
("sbs", bpo::value<unsigned>()->default_value(0),
"server block size");
return app.run(argc, argv, [&app] {
auto&& config = app.configuration();
auto rounds = config["rounds"].as<unsigned>();
auto jobs = config["jobs"].as<unsigned>();
auto bs = config["bs"].as<unsigned>();
auto depth = config["depth"].as<unsigned>();
auto addr = config["addr"].as<std::string>();
auto core = config["core"].as<unsigned>();
auto mode = config["mode"].as<unsigned>();
ceph_assert(mode <= 2);
auto _mode = static_cast<perf_mode_t>(mode);
return run(rounds, jobs, bs, depth, addr, _mode, core)
.then([] {
auto server_conf = server_config::load(config);
auto client_conf = client_config::load(config);
return run(_mode, client_conf, server_conf).then([] {
logger().info("\nsuccessful!\n");
}).handle_exception([] (auto eptr) {
logger().info("\nfailed!\n");