From 35deda533e86c8e13b69f7f7d0d77cfe344b2974 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Thu, 13 Sep 2018 16:34:48 -0400 Subject: [PATCH] crimson/net: add unit test for concurrent dispatch Signed-off-by: Casey Bodley --- src/test/crimson/test_messenger.cc | 68 +++++++++++++++++++++++++++++- 1 file changed, 67 insertions(+), 1 deletion(-) diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index 166a9180d8d..c16434113e2 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -121,6 +121,69 @@ static seastar::future<> test_echo(unsigned rounds, }); } +static seastar::future<> test_concurrent_dispatch() +{ + struct test_state { + entity_addr_t addr; + + struct { + ceph::net::SocketMessenger messenger{entity_name_t::OSD(1)}; + class ServerDispatcher : public ceph::net::Dispatcher { + int count = 0; + seastar::promise<> on_second; // satisfied on second dispatch + seastar::promise<> on_done; // satisfied when first dispatch unblocks + public: + seastar::future<> ms_dispatch(ceph::net::ConnectionRef c, + MessageRef m) override { + switch (++count) { + case 1: + // block on the first request until we reenter with the second + return on_second.get_future().then([=] { on_done.set_value(); }); + case 2: + on_second.set_value(); + return seastar::now(); + default: + throw std::runtime_error("unexpected count"); + } + } + seastar::future<> wait() { return on_done.get_future(); } + } dispatcher; + } server; + + struct { + ceph::net::SocketMessenger messenger{entity_name_t::OSD(0)}; + ceph::net::Dispatcher dispatcher; + } client; + }; + return seastar::do_with(test_state{}, + [] (test_state& t) { + // bind the server + t.addr.set_family(AF_INET); + t.addr.set_port(9010); + t.server.messenger.bind(t.addr); + + return t.server.messenger.start(&t.server.dispatcher) + .then([&] { + return t.client.messenger.start(&t.client.dispatcher) + .then([&] { + return t.client.messenger.connect(t.addr, + entity_name_t::TYPE_OSD); + }).then([] (ceph::net::ConnectionRef conn) { + // send two messages + conn->send(MessageRef{new MPing, false}); + conn->send(MessageRef{new MPing, false}); + }).then([&] { + // wait for the server to get both + return t.server.dispatcher.wait(); + }).finally([&] { + return t.client.messenger.shutdown(); + }); + }).finally([&] { + return t.server.messenger.shutdown(); + }); + }); +} + int main(int argc, char** argv) { seastar::app_template app; @@ -136,7 +199,10 @@ int main(int argc, char** argv) verbose = config["verbose"].as(); auto rounds = config["rounds"].as(); auto keepalive_ratio = config["keepalive-ratio"].as(); - return test_echo(rounds, keepalive_ratio).then([] { + return test_echo(rounds, keepalive_ratio) + .then([] { + return test_concurrent_dispatch(); + }).then([] { std::cout << "All tests succeeded" << std::endl; }).handle_exception([] (auto eptr) { std::cout << "Test failure" << std::endl;