Merge pull request #23642 from tchaikov/wip-crimson-msgr

cmake,crimson/net: add keepalive support, and enable unittest_seastar_messenger in "make check"

Reviewed-by: Casey Bodley <cbodley@redhat.com>
This commit is contained in:
Kefu Chai 2018-08-22 16:00:14 +08:00 committed by GitHub
commit a5735b8718
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 123 additions and 31 deletions

View File

@ -59,6 +59,10 @@ class Connection : public boost::intrusive_ref_counter<Connection,
/// send a message over a connection that has completed its handshake
virtual seastar::future<> send(MessageRef msg) = 0;
/// send a keepalive message over a connection that has completed its
/// handshake
virtual seastar::future<> keepalive() = 0;
/// close the connection and cancel any any pending futures from read/send
virtual seastar::future<> close() = 0;

View File

@ -15,6 +15,7 @@
#include <algorithm>
#include <seastar/core/shared_future.hh>
#include <seastar/core/sleep.hh>
#include <seastar/net/packet.hh>
#include "Config.h"
#include "Messenger.h"
@ -242,7 +243,7 @@ seastar::future<MessageRef> SocketConnection::read_message()
bool SocketConnection::update_rx_seq(seq_num_t seq)
{
if (seq <= in_seq) {
if (has_feature(CEPH_FEATURE_RECONNECT_SEQ) &&
if (HAVE_FEATURE(features, RECONNECT_SEQ) &&
conf.ms_die_on_old_message) {
assert(0 == "old msgs despite reconnect_seq feature");
}
@ -270,7 +271,7 @@ seastar::future<> SocketConnection::write_message(MessageRef msg)
bl.append(msg->get_middle());
bl.append(msg->get_data());
auto& footer = msg->get_footer();
if (has_feature(CEPH_FEATURE_MSG_AUTH)) {
if (HAVE_FEATURE(features, MSG_AUTH)) {
bl.append((const char*)&footer, sizeof(footer));
} else {
ceph_msg_footer_old old_footer;
@ -312,6 +313,21 @@ seastar::future<> SocketConnection::send(MessageRef msg)
return f.get_future();
}
seastar::future<> SocketConnection::keepalive()
{
seastar::shared_future<> f = send_ready.then([this] {
k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec(
ceph::coarse_real_clock::now());
seastar::net::packet msg{reinterpret_cast<const char*>(&k.req),
sizeof(k.req)};
return out.write(std::move(msg));
}).then([this] {
return out.flush();
});
send_ready = f.get_future();
return f.get_future();
}
seastar::future<> SocketConnection::close()
{
// unregister_conn() drops a reference, so hold another until completion
@ -539,14 +555,11 @@ SocketConnection::handle_keepalive2()
{
return in.read_exactly(sizeof(ceph_timespec))
.then([this] (auto buf) {
auto t = reinterpret_cast<const ceph_timespec*>(buf.get());
k.reply_stamp = *t;
std::cout << "keepalive2 " << t->tv_sec << std::endl;
char tag = CEPH_MSGR_TAG_KEEPALIVE2_ACK;
return out.write(reinterpret_cast<const char*>(&tag), sizeof(tag));
}).then([this] {
out.write(reinterpret_cast<const char*>(&k.reply_stamp),
sizeof(k.reply_stamp));
k.ack.stamp = *reinterpret_cast<const ceph_timespec*>(buf.get());
std::cout << "keepalive2 " << k.ack.stamp.tv_sec << std::endl;
seastar::net::packet msg{reinterpret_cast<const char*>(&k.ack),
sizeof(k.ack)};
return out.write(std::move(msg));
}).then([this] {
return out.flush();
});
@ -610,7 +623,8 @@ seastar::future<> SocketConnection::replace_existing(ConnectionRef existing,
bool is_reset_from_peer)
{
msgr_tag_t reply_tag;
if ((h.connect.features & CEPH_FEATURE_RECONNECT_SEQ) && !is_reset_from_peer) {
if (HAVE_FEATURE(h.connect.features, RECONNECT_SEQ) &&
!is_reset_from_peer) {
reply_tag = CEPH_MSGR_TAG_SEQ;
} else {
reply_tag = CEPH_MSGR_TAG_READY;
@ -708,7 +722,7 @@ void SocketConnection::reset_session()
decltype(sent){}.swap(sent);
in_seq = 0;
h.connect_seq = 0;
if (has_feature(CEPH_FEATURE_MSG_AUTH)) {
if (HAVE_FEATURE(features, MSG_AUTH)) {
// Set out_seq to a random value, so CRC won't be predictable.
// Constant to limit starting sequence number to 2^31. Nothing special
// about it, just a big number.

View File

@ -109,9 +109,6 @@ class SocketConnection : public Connection {
void set_features(uint64_t new_features) {
features = new_features;
}
bool has_feature(uint64_t feature) const {
return features & feature;
}
/// the seq num of the last transmitted message
seq_num_t out_seq = 0;
@ -131,7 +128,14 @@ class SocketConnection : public Connection {
static void discard_up_to(std::queue<MessageRef>*, seq_num_t);
struct Keepalive {
ceph_timespec reply_stamp;
struct {
const char tag = CEPH_MSGR_TAG_KEEPALIVE2;
ceph_timespec stamp;
} __attribute__((packed)) req;
struct {
const char tag = CEPH_MSGR_TAG_KEEPALIVE2_ACK;
ceph_timespec stamp;
} __attribute__((packed)) ack;
ceph_timespec ack_stamp;
} k;
@ -155,6 +159,8 @@ class SocketConnection : public Connection {
seastar::future<> send(MessageRef msg) override;
seastar::future<> keepalive() override;
seastar::future<> close() override;
uint32_t connect_seq() const override {

View File

@ -10,6 +10,7 @@ add_ceph_unittest(unittest_seastar_denc)
target_link_libraries(unittest_seastar_denc ceph-common global crimson)
add_executable(unittest_seastar_messenger test_messenger.cc)
add_ceph_unittest(unittest_seastar_messenger)
target_link_libraries(unittest_seastar_messenger ceph-common crimson)
add_executable(unittest_seastar_echo

View File

@ -2,11 +2,21 @@
#include "crimson/net/Connection.h"
#include "crimson/net/Dispatcher.h"
#include "crimson/net/SocketMessenger.h"
#include <random>
#include <boost/program_options.hpp>
#include <seastar/core/app-template.hh>
#include <seastar/core/future-util.hh>
#include <seastar/core/reactor.hh>
static seastar::future<> test_echo()
namespace bpo = boost::program_options;
static std::random_device rd;
static std::default_random_engine rng{rd()};
static bool verbose = false;
static seastar::future<> test_echo(unsigned rounds,
double keepalive_ratio)
{
struct test_state {
entity_addr_t addr;
@ -16,7 +26,9 @@ static seastar::future<> test_echo()
struct ServerDispatcher : ceph::net::Dispatcher {
seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
MessageRef m) override {
std::cout << "server got " << *m << std::endl;
if (verbose) {
std::cout << "server got " << *m << std::endl;
}
// reply with a pong
return c->send(MessageRef{new MPing(), false});
}
@ -24,42 +36,86 @@ static seastar::future<> test_echo()
} server;
struct {
unsigned rounds;
std::bernoulli_distribution keepalive_dist{};
ceph::net::SocketMessenger messenger{entity_name_t::OSD(0)};
struct ClientDispatcher : ceph::net::Dispatcher {
seastar::promise<MessageRef> reply;
unsigned count = 0u;
seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
MessageRef m) override {
++count;
if (verbose) {
std::cout << "client ms_dispatch " << count << std::endl;
}
reply.set_value(std::move(m));
return seastar::now();
}
} dispatcher;
seastar::future<> pingpong(ceph::net::ConnectionRef c) {
return seastar::repeat([conn=std::move(c), this] {
if (keepalive_dist(rng)) {
return conn->keepalive().then([] {
return seastar::make_ready_future<seastar::stop_iteration>(
seastar::stop_iteration::no);
});
} else {
return conn->send(MessageRef{new MPing(), false}).then([&] {
return dispatcher.reply.get_future();
}).then([&] (MessageRef msg) {
dispatcher.reply = seastar::promise<MessageRef>{};
if (verbose) {
std::cout << "client got reply " << *msg << std::endl;
}
return seastar::make_ready_future<seastar::stop_iteration>(
seastar::stop_iteration::yes);
});
};
});
}
bool done() const {
return dispatcher.count >= rounds;
}
} client;
};
return seastar::do_with(test_state{},
[] (test_state& t) {
[rounds, keepalive_ratio] (test_state& t) {
// bind the server
t.addr.set_family(AF_INET);
t.addr.set_port(9010);
t.server.messenger.bind(t.addr);
t.client.rounds = rounds;
t.client.keepalive_dist = std::bernoulli_distribution{keepalive_ratio};
return t.server.messenger.start(&t.server.dispatcher)
.then([&] {
return t.client.messenger.start(&t.client.dispatcher)
.then([&] {
return t.client.messenger.connect(t.addr, entity_name_t::TYPE_OSD);
}).then([] (ceph::net::ConnectionRef conn) {
std::cout << "client connected" << std::endl;
return conn->send(MessageRef{new MPing(), false});
}).then([&] {
return t.client.dispatcher.reply.get_future();
}).then([&] (MessageRef msg) {
std::cout << "client got reply " << *msg << std::endl;
return t.client.messenger.connect(t.addr,
entity_name_t::TYPE_OSD);
}).then([&client=t.client] (ceph::net::ConnectionRef conn) {
if (verbose) {
std::cout << "client connected" << std::endl;
}
return seastar::repeat([&client,conn=std::move(conn)] {
return client.pingpong(conn).then([&client] {
return seastar::make_ready_future<seastar::stop_iteration>(
client.done() ?
seastar::stop_iteration::yes :
seastar::stop_iteration::no);
});
});
}).finally([&] {
std::cout << "client shutting down" << std::endl;
if (verbose) {
std::cout << "client shutting down" << std::endl;
}
return t.client.messenger.shutdown();
});
}).finally([&] {
std::cout << "server shutting down" << std::endl;
if (verbose) {
std::cout << "server shutting down" << std::endl;
}
return t.server.messenger.shutdown();
});
});
@ -68,8 +124,19 @@ static seastar::future<> test_echo()
int main(int argc, char** argv)
{
seastar::app_template app;
return app.run(argc, argv, [] {
return test_echo().then([] {
app.add_options()
("verbose,v", bpo::value<bool>()->default_value(false),
"chatty if true")
("rounds", bpo::value<unsigned>()->default_value(512),
"number of pingpong rounds")
("keepalive-ratio", bpo::value<double>()->default_value(0.1),
"ratio of keepalive in ping messages");
return app.run(argc, argv, [&] {
auto&& config = app.configuration();
verbose = config["verbose"].as<bool>();
auto rounds = config["rounds"].as<unsigned>();
auto keepalive_ratio = config["keepalive-ratio"].as<double>();
return test_echo(rounds, keepalive_ratio).then([] {
std::cout << "All tests succeeded" << std::endl;
}).handle_exception([] (auto eptr) {
std::cout << "Test failure" << std::endl;