crimson/net: add unit test for concurrent dispatch

Signed-off-by: Casey Bodley <cbodley@redhat.com>
This commit is contained in:
Casey Bodley 2018-09-13 16:34:48 -04:00
parent 43abe8995a
commit 35deda533e

View File

@ -121,6 +121,69 @@ static seastar::future<> test_echo(unsigned rounds,
});
}
static seastar::future<> test_concurrent_dispatch()
{
struct test_state {
entity_addr_t addr;
struct {
ceph::net::SocketMessenger messenger{entity_name_t::OSD(1)};
class ServerDispatcher : public ceph::net::Dispatcher {
int count = 0;
seastar::promise<> on_second; // satisfied on second dispatch
seastar::promise<> on_done; // satisfied when first dispatch unblocks
public:
seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
MessageRef m) override {
switch (++count) {
case 1:
// block on the first request until we reenter with the second
return on_second.get_future().then([=] { on_done.set_value(); });
case 2:
on_second.set_value();
return seastar::now();
default:
throw std::runtime_error("unexpected count");
}
}
seastar::future<> wait() { return on_done.get_future(); }
} dispatcher;
} server;
struct {
ceph::net::SocketMessenger messenger{entity_name_t::OSD(0)};
ceph::net::Dispatcher dispatcher;
} client;
};
return seastar::do_with(test_state{},
[] (test_state& t) {
// bind the server
t.addr.set_family(AF_INET);
t.addr.set_port(9010);
t.server.messenger.bind(t.addr);
return t.server.messenger.start(&t.server.dispatcher)
.then([&] {
return t.client.messenger.start(&t.client.dispatcher)
.then([&] {
return t.client.messenger.connect(t.addr,
entity_name_t::TYPE_OSD);
}).then([] (ceph::net::ConnectionRef conn) {
// send two messages
conn->send(MessageRef{new MPing, false});
conn->send(MessageRef{new MPing, false});
}).then([&] {
// wait for the server to get both
return t.server.dispatcher.wait();
}).finally([&] {
return t.client.messenger.shutdown();
});
}).finally([&] {
return t.server.messenger.shutdown();
});
});
}
int main(int argc, char** argv)
{
seastar::app_template app;
@ -136,7 +199,10 @@ int main(int argc, char** argv)
verbose = config["verbose"].as<bool>();
auto rounds = config["rounds"].as<unsigned>();
auto keepalive_ratio = config["keepalive-ratio"].as<double>();
return test_echo(rounds, keepalive_ratio).then([] {
return test_echo(rounds, keepalive_ratio)
.then([] {
return test_concurrent_dispatch();
}).then([] {
std::cout << "All tests succeeded" << std::endl;
}).handle_exception([] (auto eptr) {
std::cout << "Test failure" << std::endl;