Merge pull request #37303 from tchaikov/wip-crimson-osd

crimson: do not set_value() twice and fix Message leak

Reviewed-by: Chunmei Liu <chunmei.liu@intel.com>
Reviewed-by: Yingxin Cheng <yingxin.cheng@intel.com>
This commit is contained in:
Kefu Chai 2020-09-23 16:49:12 +08:00 committed by GitHub
commit 8262c5571a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 74 additions and 67 deletions

View File

@ -51,34 +51,34 @@ public:
Connection(const AuthRegistry& auth_registry,
crimson::net::ConnectionRef conn,
KeyRing* keyring);
enum class AuthResult {
enum class auth_result_t {
success = 0,
failure,
canceled
};
seastar::future<> handle_auth_reply(Ref<MAuthReply> m);
// v1
seastar::future<AuthResult> authenticate_v1(
seastar::future<auth_result_t> authenticate_v1(
epoch_t epoch,
const EntityName& name,
uint32_t want_keys);
// v2
seastar::future<AuthResult> authenticate_v2();
seastar::future<auth_result_t> authenticate_v2();
auth::AuthClient::auth_request_t
get_auth_request(const EntityName& name,
uint32_t want_keys);
using secret_t = string;
tuple<CryptoKey, secret_t, bufferlist>
handle_auth_reply_more(const ceph::buffer::list& bl);
tuple<CryptoKey, secret_t, int>
handle_auth_done(uint64_t new_global_id,
const ceph::buffer::list& bl);
int handle_auth_bad_method(uint32_t old_auth_method,
int result,
const std::vector<uint32_t>& allowed_methods,
const std::vector<uint32_t>& allowed_modes);
// v1 and v2
tuple<CryptoKey, secret_t, int>
handle_auth_done(uint64_t new_global_id,
const ceph::buffer::list& bl);
void close();
bool is_my_peer(const entity_addr_t& addr) const;
AuthAuthorizer* get_authorizer(entity_type_t peer) const;
@ -99,8 +99,8 @@ private:
rotating,
general,
};
seastar::future<std::optional<AuthResult>> do_auth_single(request_t);
seastar::future<AuthResult> do_auth(request_t);
seastar::future<std::optional<auth_result_t>> do_auth_single(request_t);
seastar::future<auth_result_t> do_auth(request_t);
private:
bool closed = false;
@ -110,7 +110,7 @@ private:
using clock_t = seastar::lowres_system_clock;
clock_t::time_point auth_start;
crimson::auth::method_t auth_method = 0;
seastar::promise<AuthResult> auth_done;
std::optional<seastar::promise<auth_result_t>> auth_done;
// v1 and v2
const AuthRegistry& auth_registry;
crimson::net::ConnectionRef conn;
@ -141,8 +141,8 @@ seastar::future<> Connection::handle_auth_reply(Ref<MAuthReply> m)
seastar::future<> Connection::renew_tickets()
{
if (auth->need_tickets()) {
return do_auth(request_t::general).then([](AuthResult r) {
if (r != AuthResult::success) {
return do_auth(request_t::general).then([](auth_result_t r) {
if (r != auth_result_t::success) {
throw std::system_error(
make_error_code(
crimson::net::error::negotiation_failure));
@ -166,8 +166,8 @@ seastar::future<> Connection::renew_rotating_keyring()
return seastar::now();
}
last_rotating_renew_sent = now;
return do_auth(request_t::rotating).then([](AuthResult r) {
if (r != AuthResult::success) {
return do_auth(request_t::rotating).then([](auth_result_t r) {
if (r != auth_result_t::success) {
throw std::system_error(make_error_code(
crimson::net::error::negotiation_failure));
}
@ -226,7 +226,7 @@ Connection::setup_session(epoch_t epoch,
return conn->send(m);
}
seastar::future<std::optional<Connection::AuthResult>>
seastar::future<std::optional<Connection::auth_result_t>>
Connection::do_auth_single(Connection::request_t what)
{
auto m = make_message<MAuth>();
@ -254,8 +254,8 @@ Connection::do_auth_single(Connection::request_t what)
if (!m) {
ceph_assert(closed);
logger().info("do_auth: connection closed");
return seastar::make_ready_future<std::optional<Connection::AuthResult>>(
std::make_optional(AuthResult::canceled));
return seastar::make_ready_future<std::optional<Connection::auth_result_t>>(
std::make_optional(auth_result_t::canceled));
}
logger().info(
"do_auth: mon {} => {} returns {}: {}",
@ -270,24 +270,24 @@ Connection::do_auth_single(Connection::request_t what)
ret,
conn->get_peer_addr());
}
return seastar::make_ready_future<std::optional<Connection::AuthResult>>(
return seastar::make_ready_future<std::optional<Connection::auth_result_t>>(
ret == -EAGAIN
? std::nullopt
: std::make_optional(ret == 0
? AuthResult::success
: AuthResult::failure
? auth_result_t::success
: auth_result_t::failure
));
});
}
seastar::future<Connection::AuthResult>
seastar::future<Connection::auth_result_t>
Connection::do_auth(Connection::request_t what) {
return seastar::repeat_until_value([this, what]() {
return do_auth_single(what);
});
}
seastar::future<Connection::AuthResult>
seastar::future<Connection::auth_result_t>
Connection::authenticate_v1(epoch_t epoch,
const EntityName& name,
uint32_t want_keys)
@ -299,7 +299,7 @@ Connection::authenticate_v1(epoch_t epoch,
}).then([name, want_keys, this](Ref<MAuthReply> m) {
if (!m) {
logger().error("authenticate_v1 canceled on {}", name);
return seastar::make_ready_future<AuthResult>(AuthResult::canceled);
return seastar::make_ready_future<auth_result_t>(auth_result_t::canceled);
}
global_id = m->global_id;
auth = create_auth(m->protocol, m->global_id, name, want_keys);
@ -308,7 +308,7 @@ Connection::authenticate_v1(epoch_t epoch,
nullptr, nullptr)) {
case 0:
// none
return seastar::make_ready_future<AuthResult>(AuthResult::success);
return seastar::make_ready_future<auth_result_t>(auth_result_t::success);
case -EAGAIN:
// cephx
return do_auth(request_t::general);
@ -317,15 +317,16 @@ Connection::authenticate_v1(epoch_t epoch,
}
}).handle_exception([](auto ep) {
logger().error("authenticate_v1 failed with {}", ep);
return seastar::make_ready_future<AuthResult>(AuthResult::canceled);
return seastar::make_ready_future<auth_result_t>(auth_result_t::canceled);
});
}
seastar::future<Connection::AuthResult> Connection::authenticate_v2()
seastar::future<Connection::auth_result_t> Connection::authenticate_v2()
{
auth_start = seastar::lowres_system_clock::now();
return conn->send(make_message<MMonGetMap>()).then([this] {
return auth_done.get_future();
auth_done.emplace();
return auth_done->get_future();
});
}
@ -397,7 +398,10 @@ Connection::handle_auth_done(uint64_t new_global_id,
secret_t connection_secret;
int r = auth->handle_response(0, p, &session_key, &connection_secret);
conn->set_last_keepalive_ack(auth_start);
auth_done.set_value(AuthResult::success);
if (auth_done) {
auth_done->set_value(auth_result_t::success);
auth_done.reset();
}
return {session_key, connection_secret, r};
}
@ -418,7 +422,8 @@ int Connection::handle_auth_bad_method(uint32_t old_auth_method,
if (p == auth_supported.end()) {
logger().error("server allowed_methods {} but i only support {}",
allowed_methods, auth_supported);
auth_done.set_exception(std::system_error(make_error_code(
assert(auth_done);
auth_done->set_exception(std::system_error(make_error_code(
crimson::net::error::negotiation_failure)));
return -EACCES;
}
@ -431,8 +436,10 @@ void Connection::close()
{
reply.set_value(Ref<MAuthReply>(nullptr));
reply = {};
auth_done.set_value(AuthResult::canceled);
auth_done = {};
if (auth_done) {
auth_done->set_value(auth_result_t::canceled);
auth_done.reset();
}
if (conn && !std::exchange(closed, true)) {
conn->mark_down();
}
@ -961,7 +968,7 @@ seastar::future<> Client::reopen_session(int rank)
}
logger().info("connecting to mon.{}", rank);
return seastar::futurize_invoke(
[peer, this] () -> seastar::future<Connection::AuthResult> {
[peer, this] () -> seastar::future<Connection::auth_result_t> {
auto conn = msgr.connect(peer, CEPH_ENTITY_TYPE_MON);
auto& mc = pending_conns.emplace_back(
std::make_unique<Connection>(auth_registry, conn, &keyring));
@ -971,45 +978,13 @@ seastar::future<> Client::reopen_session(int rank)
return mc->authenticate_v1(monmap.get_epoch(), entity_name, want_keys)
.handle_exception([conn](auto ep) {
conn->mark_down();
return seastar::make_exception_future<Connection::AuthResult>(ep);
return seastar::make_exception_future<Connection::auth_result_t>(ep);
});
}
}).then([peer, this](auto result) {
if (result == Connection::AuthResult::canceled) {
return seastar::now();
if (result == Connection::auth_result_t::success) {
_finish_auth(peer);
}
if (!is_hunting()) {
return seastar::now();
}
logger().info("found mon.{}", monmap.get_name(peer));
auto found = std::find_if(
pending_conns.begin(), pending_conns.end(),
[peer](auto& conn) {
return conn->is_my_peer(peer);
});
if (found == pending_conns.end()) {
// Happens if another connection has won the race
ceph_assert(active_con && pending_conns.empty());
logger().info(
"no pending connection for mon.{}, peer {}",
monmap.get_name(peer),
peer);
return seastar::now();
}
ceph_assert(!active_con && !pending_conns.empty());
active_con = std::move(*found);
found->reset();
for (auto& conn : pending_conns) {
if (conn) {
conn->close();
}
}
pending_conns.clear();
return seastar::now();
}).then([]() {
logger().debug("reopen_session mon connection attempts complete");
}).handle_exception([](auto ep) {
logger().error("mon connections failed with ep {}", ep);
@ -1024,6 +999,37 @@ seastar::future<> Client::reopen_session(int rank)
});
}
void Client::_finish_auth(const entity_addr_t& peer)
{
if (!is_hunting()) {
return;
}
logger().info("found mon.{}", monmap.get_name(peer));
auto found = std::find_if(
pending_conns.begin(), pending_conns.end(),
[peer](auto& conn) {
return conn->is_my_peer(peer);
});
if (found == pending_conns.end()) {
// Happens if another connection has won the race
ceph_assert(active_con && pending_conns.empty());
logger().info("no pending connection for mon.{}, peer {}",
monmap.get_name(peer), peer);
return;
}
ceph_assert(!active_con && !pending_conns.empty());
active_con = std::move(*found);
found->reset();
for (auto& conn : pending_conns) {
if (conn) {
conn->close();
}
}
pending_conns.clear();
}
Client::command_result_t
Client::run_command(const std::vector<std::string>& cmd,
const bufferlist& bl)

View File

@ -163,6 +163,7 @@ private:
seastar::future<> reopen_session(int rank);
std::vector<unsigned> get_random_mons(unsigned n) const;
seastar::future<> _add_conn(unsigned rank, uint64_t global_id);
void _finish_auth(const entity_addr_t& peer);
crimson::common::Gated gate;
// messages that are waiting for the active_con to be available

View File

@ -159,7 +159,7 @@ public:
void send_cluster_message(
int osd, Message *m,
epoch_t epoch, bool share_map_update=false) final {
(void)shard_services.send_to_osd(osd, m, epoch);
(void)shard_services.send_to_osd(osd, MessageRef{m, false}, epoch);
}
void send_pg_created(pg_t pgid) final {