msg/async: msgr2: message flow handshake

Signed-off-by: Ricardo Dias <rdias@suse.com>
This commit is contained in:
Ricardo Dias 2018-10-26 16:23:34 +01:00
parent 31d022a0e5
commit 6032ae85f2
No known key found for this signature in database
GPG Key ID: 74390C579BD37B68
2 changed files with 253 additions and 6 deletions

View File

@ -5,6 +5,7 @@
#include "AsyncMessenger.h"
#include "common/errno.h"
#include "include/random.h"
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
@ -31,6 +32,8 @@ ProtocolV2::ProtocolV2(AsyncConnection *connection)
: Protocol(2, connection),
temp_buffer(nullptr),
state(NONE),
peer_required_features(0),
cookie(0),
bannerExchangeCallback(nullptr),
next_frame_len(0) {
temp_buffer = new char[4096];
@ -200,7 +203,8 @@ unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
ldout(cct, 1) << __func__ << " banner peer_type=" << (int)peer_type
<< " supported=" << std::hex << peer_supported_features
<< " required=" << std::hex << peer_required_features << dendl;
<< " required=" << std::hex << peer_required_features
<< std::dec << dendl;
if (connection->get_peer_type() == -1) {
connection->set_peer_type(peer_type);
@ -224,7 +228,7 @@ unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
ldout(cct, 1) << __func__ << " peer does not support all required features"
<< " required=" << std::hex << required_features
<< " supported=" << std::hex << peer_supported_features
<< dendl;
<< std::dec << dendl;
stop();
connection->dispatch_queue->queue_reset(connection);
return nullptr;
@ -292,6 +296,10 @@ CtPtr ProtocolV2::handle_frame(char *buffer, int r) {
return handle_auth_more(buffer, payload_len);
case Tag::AUTH_DONE:
return handle_auth_done(buffer, payload_len);
case Tag::IDENT:
return handle_ident(buffer, payload_len);
case Tag::IDENT_MISSING_FEATURES:
return handle_ident_missing_features(buffer, payload_len);
default:
ceph_abort();
}
@ -349,6 +357,16 @@ CtPtr ProtocolV2::handle_auth_more_write(int r) {
return CONTINUE(read_frame);
}
CtPtr ProtocolV2::handle_ident(char *payload, uint32_t length) {
if (state == CONNECTING) {
return handle_server_ident(payload, length);
}
if (state == ACCEPTING) {
return handle_client_ident(payload, length);
}
ceph_abort("wrong state at handle_ident");
}
/* Client Protocol Methods */
CtPtr ProtocolV2::start_client_banner_exchange() {
@ -440,7 +458,74 @@ CtPtr ProtocolV2::handle_auth_done(char *payload, uint32_t length) {
CtPtr ProtocolV2::send_client_ident() {
ldout(cct, 20) << __func__ << dendl;
return nullptr;
uint64_t flags = 0;
if (connection->policy.lossy) {
flags |= CEPH_MSG_CONNECT_LOSSY;
}
cookie = ceph::util::generate_random_number<uint64_t>(0, -1ll);
IdentFrame ident(messenger->get_myaddrs(), messenger->get_myname().num(),
connection->policy.features_supported,
connection->policy.features_required, flags, cookie);
ldout(cct, 5) << __func__ << " sending identification: "
<< "addrs: " << ident.addrs << " gid: " << ident.gid
<< " features_supported: " << std::hex
<< ident.supported_features
<< " features_required: " << ident.required_features
<< " flags: " << ident.flags << " cookie: " << std::dec
<< ident.cookie << dendl;
bufferlist bl = ident.to_bufferlist();
return WRITE(bl, handle_client_ident_write);
}
CtPtr ProtocolV2::handle_client_ident_write(int r) {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
ldout(cct, 1) << __func__ << " client ident write failed r=" << r << " ("
<< cpp_strerror(r) << ")" << dendl;
return _fault();
}
return CONTINUE(read_frame);
}
CtPtr ProtocolV2::handle_ident_missing_features(char *payload,
uint32_t length) {
ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
IdentMissingFeaturesFrame ident_missing(payload, length);
lderr(cct) << __func__
<< " client does not support all server features: " << std::hex
<< ident_missing.features << std::dec << dendl;
return _fault();
}
CtPtr ProtocolV2::handle_server_ident(char *payload, uint32_t length) {
ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;
IdentFrame server_ident(payload, length);
ldout(cct, 5) << __func__ << " received server identification: "
<< "addrs: " << server_ident.addrs
<< " gid: " << server_ident.gid
<< " features_supported: " << std::hex
<< server_ident.supported_features
<< " features_required: " << server_ident.required_features
<< " flags: " << server_ident.flags << " cookie: " << std::dec
<< server_ident.cookie << dendl;
connection->set_peer_addrs(server_ident.addrs);
connection->peer_global_id = server_ident.gid;
connection->set_features(server_ident.required_features &
connection->policy.features_supported);
state = READY;
return CONTINUE(read_frame);
}
/* Server Protocol Methods */
@ -568,5 +653,89 @@ CtPtr ProtocolV2::handle_auth_done_write(int r) {
return _fault();
}
return nullptr;
return CONTINUE(read_frame);
}
CtPtr ProtocolV2::handle_client_ident(char *payload, uint32_t length) {
ldout(cct, 20) << __func__ << " payload_len=" << std::dec << length << dendl;
IdentFrame client_ident(payload, length);
ldout(cct, 5) << __func__ << " received client identification: "
<< "addrs: " << client_ident.addrs
<< " gid: " << client_ident.gid
<< " features_supported: " << std::hex
<< client_ident.supported_features
<< " features_required: " << client_ident.required_features
<< " flags: " << client_ident.flags << " cookie: " << std::dec
<< client_ident.cookie << dendl;
if (client_ident.addrs.empty()) {
connection->set_peer_addr(connection->target_addr);
} else {
// Should we check if one of the ident.addrs match connection->target_addr
// as we do in ProtocolV1?
connection->set_peer_addrs(client_ident.addrs);
}
uint64_t feat_missing = connection->policy.features_required &
~(uint64_t)client_ident.supported_features;
if (feat_missing) {
ldout(cct, 1) << __func__ << " peer missing required features " << std::hex
<< feat_missing << std::dec << dendl;
IdentMissingFeaturesFrame ident_missing_features(feat_missing);
bufferlist bl;
bl = ident_missing_features.to_bufferlist();
return WRITE(bl, handle_ident_missing_features_write);
}
// if everything is OK reply with server identification
connection->peer_global_id = client_ident.gid;
cookie = client_ident.cookie;
uint64_t flags = 0;
if (connection->policy.lossy) {
flags = flags | CEPH_MSG_CONNECT_LOSSY;
}
IdentFrame ident(messenger->get_myaddrs(), messenger->get_myname().num(),
connection->policy.features_supported,
connection->policy.features_required, flags, cookie);
ldout(cct, 5) << __func__ << " sending identification: "
<< "addrs: " << ident.addrs << " gid: " << ident.gid
<< " features_supported: " << std::hex
<< ident.supported_features
<< " features_required: " << ident.required_features
<< " flags: " << ident.flags << " cookie: " << std::dec
<< ident.cookie << dendl;
bufferlist bl = ident.to_bufferlist();
return WRITE(bl, handle_send_server_ident_write);
}
CtPtr ProtocolV2::handle_ident_missing_features_write(int r) {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
ldout(cct, 1) << __func__ << " ident missing features write failed r=" << r
<< " (" << cpp_strerror(r) << ")" << dendl;
return _fault();
}
return CONTINUE(read_frame);
}
CtPtr ProtocolV2::handle_send_server_ident_write(int r) {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
ldout(cct, 1) << __func__ << " server ident write failed r=" << r << " ("
<< cpp_strerror(r) << ")" << dendl;
return _fault();
}
state = READY;
return CONTINUE(read_frame);
}

View File

@ -14,6 +14,7 @@ private:
CONNECTING,
START_ACCEPT,
ACCEPTING,
READY,
CLOSED
};
@ -29,7 +30,9 @@ private:
AUTH_BAD_METHOD,
AUTH_BAD_AUTH,
AUTH_MORE,
AUTH_DONE
AUTH_DONE,
IDENT,
IDENT_MISSING_FEATURES,
};
struct Frame {
@ -51,6 +54,12 @@ private:
}
};
struct SignedEncryptedFrame : public Frame {
SignedEncryptedFrame(Tag tag, __le32 payload_len)
: Frame(tag, payload_len) {}
bufferlist to_bufferlist() { return Frame::to_bufferlist(); }
};
struct AuthRequestFrame : public Frame {
__le32 method;
__le32 len;
@ -155,10 +164,68 @@ private:
}
};
struct IdentFrame : public SignedEncryptedFrame {
entity_addrvec_t addrs;
int64_t gid;
uint64_t supported_features; // CEPH_FEATURE_*
uint64_t required_features; // CEPH_FEATURE_*
uint64_t flags; // CEPH_MSG_CONNECT_*
uint64_t cookie;
IdentFrame(entity_addrvec_t addrs, int64_t gid, uint64_t supported_features,
uint64_t required_features, uint64_t flags, uint64_t cookie)
: SignedEncryptedFrame(Tag::IDENT, 0),
addrs(addrs),
gid(gid),
supported_features(supported_features),
required_features(required_features),
flags(flags),
cookie(cookie) {
encode(addrs, payload, -1ll);
encode(gid, payload, -1ll);
encode(supported_features, payload, -1ll);
encode(required_features, payload, -1ll);
encode(flags, payload, -1ll);
encode(cookie, payload, -1ll);
frame_len = sizeof(uint32_t) + payload.length();
}
IdentFrame(char *payload, uint32_t length)
: SignedEncryptedFrame(Tag::IDENT, length) {
bufferlist bl;
bl.append(payload, length);
try {
auto ti = bl.cbegin();
decode(addrs, ti);
decode(gid, ti);
decode(supported_features, ti);
decode(required_features, ti);
decode(flags, ti);
decode(cookie, ti);
} catch (const buffer::error &e) {
}
}
};
struct IdentMissingFeaturesFrame : public SignedEncryptedFrame {
__le64 features;
IdentMissingFeaturesFrame(uint64_t features)
: SignedEncryptedFrame(Tag::IDENT_MISSING_FEATURES, sizeof(uint64_t)),
features(features) {
encode(features, payload, -1ll);
}
IdentMissingFeaturesFrame(char *payload, uint32_t length)
: SignedEncryptedFrame(Tag::IDENT_MISSING_FEATURES, length) {
features = *(uint64_t *)payload;
}
};
char *temp_buffer;
State state;
uint64_t peer_required_features;
uint64_t cookie;
using ProtFuncPtr = void (ProtocolV2::*)();
Ct<ProtocolV2> *bannerExchangeCallback;
@ -194,6 +261,7 @@ private:
Ct<ProtocolV2> *handle_frame(char *buffer, int r);
Ct<ProtocolV2> *handle_auth_more(char *payload, uint32_t length);
Ct<ProtocolV2> *handle_auth_more_write(int r);
Ct<ProtocolV2> *handle_ident(char *payload, uint32_t length);
public:
ProtocolV2(AsyncConnection *connection);
@ -216,6 +284,7 @@ private:
CONTINUATION_DECL(ProtocolV2, start_client_banner_exchange);
CONTINUATION_DECL(ProtocolV2, post_client_banner_exchange);
WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_auth_request_write);
WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_client_ident_write);
Ct<ProtocolV2> *start_client_banner_exchange();
Ct<ProtocolV2> *post_client_banner_exchange();
@ -225,6 +294,9 @@ private:
Ct<ProtocolV2> *handle_auth_bad_auth(char *payload, uint32_t length);
Ct<ProtocolV2> *handle_auth_done(char *payload, uint32_t length);
Ct<ProtocolV2> *send_client_ident();
Ct<ProtocolV2> *handle_client_ident_write(int r);
Ct<ProtocolV2> *handle_ident_missing_features(char *payload, uint32_t length);
Ct<ProtocolV2> *handle_server_ident(char *payload, uint32_t length);
// Server Protocol
CONTINUATION_DECL(ProtocolV2, start_server_banner_exchange);
@ -232,6 +304,9 @@ private:
WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_auth_bad_method_write);
WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_auth_bad_auth_write);
WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_auth_done_write);
WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2,
handle_ident_missing_features_write);
WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_send_server_ident_write);
Ct<ProtocolV2> *start_server_banner_exchange();
Ct<ProtocolV2> *post_server_banner_exchange();
@ -239,6 +314,9 @@ private:
Ct<ProtocolV2> *handle_auth_bad_method_write(int r);
Ct<ProtocolV2> *handle_auth_bad_auth_write(int r);
Ct<ProtocolV2> *handle_auth_done_write(int r);
Ct<ProtocolV2> *handle_client_ident(char *payload, uint32_t length);
Ct<ProtocolV2> *handle_ident_missing_features_write(int r);
Ct<ProtocolV2> *handle_send_server_ident_write(int r);
};
#endif /* _MSG_ASYNC_PROTOCOL_V2_ */