messenger protocol changes

There is now a plaintext banner that is exchanged so that both ends can
verify they are talking to the right software.  This string should be
changed each time the wire protocol changes.

There are also mon, osd, and mds specific protocol versions defined, but
these are not yet used.
This commit is contained in:
Sage Weil 2008-10-01 15:03:41 -07:00
parent 5ca0b3305d
commit 73c689cb5d
4 changed files with 251 additions and 140 deletions

View File

@ -26,6 +26,19 @@
#define CEPH_MON_PORT 12345
#define CEPH_FILE_MAX_SIZE (1ULL << 40) // 1 TB
/*
* protocol versions. increment each time one of these changes.
*/
#define CEPH_BANNER "ceph\n1\n" /* second line is a protocol version.
adjust whenever the wire protocol
changes. */
#define CEPH_BANNER_MAX_LEN 30
#define CEPH_OSD_PROTOCOL 1
#define CEPH_MDS_PROTOCOL 1
#define CEPH_MON_PROTOCOL 1
/*
* types in this file are defined as little-endian, and are
* primarily intended to describe data structures that pass
@ -377,6 +390,15 @@ struct ceph_entity_inst {
} __attribute__ ((packed));
/*
* connection negotiation
*/
struct ceph_msg_connect {
__le32 global_seq;
__le32 connect_seq;
};
/*
* message header, footer
*/

View File

@ -268,7 +268,7 @@ int ceph_tcp_accept(struct socket *lsock, struct ceph_connection *con)
struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr;
int len;
struct socket *sock;
ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
if (ret)
return ret;
@ -557,7 +557,7 @@ void ceph_queue_con(struct ceph_connection *con)
atomic_inc(&con->nref);
dout(40, "ceph_queue_con %p %d -> %d\n", con,
atomic_read(&con->nref) - 1, atomic_read(&con->nref));
set_bit(QUEUED, &con->state);
if (test_bit(BUSY, &con->state) ||
!queue_work(ceph_msgr_wq, &con->work.work)) {
@ -585,10 +585,10 @@ static void ceph_fault(struct ceph_connection *con)
}
con_close_socket(con);
/* hmm? */
BUG_ON(test_bit(WAIT, &con->state));
/*
* If there are no messages in the queue, place the
* connection in a STANDBY state. otherwise, retry with
@ -601,7 +601,7 @@ static void ceph_fault(struct ceph_connection *con)
spin_unlock(&con->out_queue_lock);
return;
}
dout(10, "fault setting BACKOFF\n");
set_bit(BACKOFF, &con->state);
@ -616,7 +616,7 @@ static void ceph_fault(struct ceph_connection *con)
con->delay);
queue_delayed_work(ceph_msgr_wq, &con->work,
round_jiffies_relative(con->delay));
list_splice_init(&con->out_sent, &con->out_queue);
spin_unlock(&con->out_queue_lock);
}
@ -713,13 +713,13 @@ static int write_partial_msg_pages(struct ceph_connection *con,
con->out_footer.aborted = cpu_to_le32(con->out_msg->pages == 0);
con->out_kvec[0].iov_base = &con->out_footer;
con->out_kvec_bytes = con->out_kvec[0].iov_len =
con->out_kvec_bytes = con->out_kvec[0].iov_len =
sizeof(con->out_footer);
con->out_kvec_left = 1;
con->out_kvec_cur = con->out_kvec;
con->out_msg = 0;
con->out_more = 0; /* end of message */
ret = 1;
out:
return ret;
@ -784,7 +784,7 @@ static void prepare_write_message(struct ceph_connection *con)
*/
static void prepare_write_ack(struct ceph_connection *con)
{
dout(20, "prepare_write_ack %p %u -> %u\n", con,
dout(20, "prepare_write_ack %p %u -> %u\n", con,
con->in_seq_acked, con->in_seq);
con->in_seq_acked = con->in_seq;
@ -819,17 +819,19 @@ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
static void prepare_write_connect(struct ceph_messenger *msgr,
struct ceph_connection *con)
{
con->out_global_seq = cpu_to_le32(con->global_seq);
con->out_connect_seq = cpu_to_le32(con->connect_seq);
con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
con->out_connect.global_seq = cpu_to_le32(con->global_seq);
con->out_kvec[0].iov_base = &msgr->inst.addr;
con->out_kvec[0].iov_len = sizeof(msgr->inst.addr);
con->out_kvec[1].iov_base = &con->out_global_seq;
con->out_kvec[1].iov_len = 4;
con->out_kvec[2].iov_base = &con->out_connect_seq;
con->out_kvec[2].iov_len = 4;
con->out_kvec[0].iov_base = CEPH_BANNER;
con->out_kvec[0].iov_len = strlen(CEPH_BANNER);
con->out_kvec[1].iov_base = &msgr->inst.addr;
con->out_kvec[1].iov_len = sizeof(msgr->inst.addr);
con->out_kvec[2].iov_base = &con->out_connect;
con->out_kvec[2].iov_len = sizeof(con->out_connect);
con->out_kvec_left = 3;
con->out_kvec_bytes = sizeof(msgr->inst.addr) + 8;
con->out_kvec_bytes = strlen(CEPH_BANNER) +
sizeof(msgr->inst.addr) +
sizeof(con->out_connect);
con->out_kvec_cur = con->out_kvec;
con->out_more = 0;
set_bit(WRITE_PENDING, &con->state);
@ -838,27 +840,29 @@ static void prepare_write_connect(struct ceph_messenger *msgr,
static void prepare_write_connect_retry(struct ceph_messenger *msgr,
struct ceph_connection *con)
{
con->out_global_seq = cpu_to_le32(con->global_seq);
con->out_connect_seq = cpu_to_le32(con->connect_seq);
con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
con->out_connect.global_seq = cpu_to_le32(con->global_seq);
con->out_kvec[0].iov_base = &con->out_global_seq;
con->out_kvec[0].iov_len = 4;
con->out_kvec[1].iov_base = &con->out_connect_seq;
con->out_kvec[1].iov_len = 4;
con->out_kvec_left = 2;
con->out_kvec_bytes = 8;
con->out_kvec[0].iov_base = &con->out_connect;
con->out_kvec[0].iov_len = sizeof(con->out_connect);
con->out_kvec_left = 1;
con->out_kvec_bytes = sizeof(con->out_connect);
con->out_kvec_cur = con->out_kvec;
con->out_more = 0;
set_bit(WRITE_PENDING, &con->state);
}
static void prepare_write_accept_announce(struct ceph_messenger *msgr,
struct ceph_connection *con)
static void prepare_write_accept_hello(struct ceph_messenger *msgr,
struct ceph_connection *con)
{
con->out_kvec[0].iov_base = &msgr->inst.addr;
con->out_kvec[0].iov_len = sizeof(msgr->inst.addr);
con->out_kvec_left = 1;
con->out_kvec_bytes = sizeof(msgr->inst.addr);
int len = strlen(CEPH_BANNER);
con->out_kvec[0].iov_base = CEPH_BANNER;
con->out_kvec[0].iov_len = len;
con->out_kvec[1].iov_base = &msgr->inst.addr;
con->out_kvec[1].iov_len = sizeof(msgr->inst.addr);
con->out_kvec_left = 2;
con->out_kvec_bytes = len + sizeof(msgr->inst.addr);
con->out_kvec_cur = con->out_kvec;
con->out_more = 0;
set_bit(WRITE_PENDING, &con->state);
@ -887,6 +891,9 @@ static void prepare_write_accept_retry(struct ceph_connection *con, char *ptag,
con->out_kvec_cur = con->out_kvec;
con->out_more = 0;
set_bit(WRITE_PENDING, &con->state);
/* we'll re-read the connect request, but not the hello */
con->in_base_pos = strlen(CEPH_BANNER) + sizeof(con->msgr->inst.addr);
}
/*
@ -1123,11 +1130,11 @@ static void process_message(struct ceph_connection *con)
/* if first message, set peer_name */
if (con->peer_name.type == 0)
con->peer_name = con->in_msg->hdr.src.name;
spin_lock(&con->out_queue_lock);
con->in_seq++;
spin_unlock(&con->out_queue_lock);
dout(1, "===== %p %u from %s%d %d=%s len %d+%d =====\n",
con->in_msg, le32_to_cpu(con->in_msg->hdr.seq),
ENTITY_NAME(con->in_msg->hdr.src.name),
@ -1195,11 +1202,24 @@ static int read_connect_partial(struct ceph_connection *con)
int ret, to;
dout(20, "read_connect_partial %p at %d\n", con, con->in_base_pos);
/* actual_peer_addr */
to = sizeof(con->actual_peer_addr);
/* peer's banner */
to = strlen(CEPH_BANNER);
while (con->in_base_pos < to) {
int left = to - con->in_base_pos;
int have = con->in_base_pos;
ret = ceph_tcp_recvmsg(con->sock,
(char *)&con->in_banner + have,
left);
if (ret <= 0)
goto out;
con->in_base_pos += ret;
}
/* peer's addr */
to += sizeof(con->actual_peer_addr);
while (con->in_base_pos < to) {
int left = to - con->in_base_pos;
int have = sizeof(con->actual_peer_addr) - left;
ret = ceph_tcp_recvmsg(con->sock,
(char *)&con->actual_peer_addr + have,
left);
@ -1218,28 +1238,28 @@ static int read_connect_partial(struct ceph_connection *con)
}
if (con->in_tag == CEPH_MSGR_TAG_RETRY_SESSION) {
/* peers connect_seq */
to += sizeof(con->in_connect_seq);
/* peer's connect_seq */
to += sizeof(con->in_connect.connect_seq);
if (con->in_base_pos < to) {
int left = to - con->in_base_pos;
int have = sizeof(con->in_connect_seq) - left;
int have = sizeof(con->in_connect.connect_seq) - left;
ret = ceph_tcp_recvmsg(con->sock,
(char *)&con->in_connect_seq +
have, left);
(char *)&con->in_connect.connect_seq +
have, left);
if (ret <= 0)
goto out;
con->in_base_pos += ret;
}
}
if (con->in_tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
/* peers global_seq */
to += sizeof(con->in_global_seq);
/* peer's global_seq */
to += sizeof(con->in_connect.global_seq);
if (con->in_base_pos < to) {
int left = to - con->in_base_pos;
int have = sizeof(con->in_global_seq) - left;
int have = sizeof(con->in_connect.global_seq) - left;
ret = ceph_tcp_recvmsg(con->sock,
(char *)&con->in_global_seq +
have, left);
(char *)&con->in_connect.global_seq +
have, left);
if (ret <= 0)
goto out;
con->in_base_pos += ret;
@ -1249,8 +1269,9 @@ static int read_connect_partial(struct ceph_connection *con)
out:
dout(20, "read_connect_partial %p end at %d ret %d\n", con,
con->in_base_pos, ret);
dout(20, "read_connect_partial peer in connect_seq = %u\n",
le32_to_cpu(con->in_connect_seq));
dout(20, "read_connect_partial connect_seq = %u, global_seq = %u\n",
le32_to_cpu(con->in_connect.connect_seq),
le32_to_cpu(con->in_connect.global_seq));
return ret; /* done */
}
@ -1259,7 +1280,7 @@ out:
*/
static void reset_connection(struct ceph_connection *con)
{
derr(1, "%s%d %u.%u.%u.%u:%u connection reset\n",
derr(1, "%s%d %u.%u.%u.%u:%u connection reset\n",
ENTITY_NAME(con->peer_name),
IPQUADPORT(con->peer_addr.ipaddr));
@ -1277,9 +1298,25 @@ static void reset_connection(struct ceph_connection *con)
spin_unlock(&con->out_queue_lock);
}
static int verify_hello(struct ceph_connection *con)
{
if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
derr(10, "connection from %u.%u.%u.%u:%u with bad banner\n",
IPQUADPORT(con->peer_addr.ipaddr));
con->error_msg = "protocol error, bad banner";
return -1;
}
return 0;
}
static int process_connect(struct ceph_connection *con)
{
dout(20, "process_connect on %p tag %d\n", con, (int)con->in_tag);
if (verify_hello(con) < 0)
return -1;
/* verify peer addr */
if (!ceph_entity_addr_is_local(con->peer_addr, con->actual_peer_addr) &&
con->actual_peer_addr.ipaddr.sin_addr.s_addr != 0) {
derr(1, "process_connect wrong peer, want %u.%u.%u.%u:%u/%d, "
@ -1291,10 +1328,11 @@ static int process_connect(struct ceph_connection *con)
con->error_msg = "protocol error, wrong peer";
return -1;
}
switch (con->in_tag) {
case CEPH_MSGR_TAG_RESETSESSION:
dout(10, "process_connect got RESET peer seq %u\n",
le32_to_cpu(con->in_connect_seq));
le32_to_cpu(con->in_connect.connect_seq));
reset_connection(con);
prepare_write_connect_retry(con->msgr, con);
prepare_read_connect(con);
@ -1303,19 +1341,19 @@ static int process_connect(struct ceph_connection *con)
case CEPH_MSGR_TAG_RETRY_SESSION:
dout(10,
"process_connect got RETRY my seq = %u, peer_seq = %u\n",
le32_to_cpu(con->out_connect_seq),
le32_to_cpu(con->in_connect_seq));
con->connect_seq = le32_to_cpu(con->in_connect_seq);
le32_to_cpu(con->out_connect.connect_seq),
le32_to_cpu(con->in_connect.connect_seq));
con->connect_seq = le32_to_cpu(con->in_connect.connect_seq);
prepare_write_connect_retry(con->msgr, con);
prepare_read_connect(con);
break;
case CEPH_MSGR_TAG_RETRY_GLOBAL:
dout(10,
"process_connect got RETRY_GLOBAL my %u, peer_gseq = %u\n",
con->global_seq, le32_to_cpu(con->in_global_seq));
con->global_seq, le32_to_cpu(con->in_connect.global_seq));
con->global_seq =
get_global_seq(con->msgr,
le32_to_cpu(con->in_global_seq));
le32_to_cpu(con->in_connect.global_seq));
prepare_write_connect_retry(con->msgr, con);
prepare_read_connect(con);
break;
@ -1346,43 +1384,44 @@ static int read_accept_partial(struct ceph_connection *con)
int ret;
int to;
/* peer addr */
to = sizeof(con->peer_addr);
/* banner */
to = strlen(CEPH_BANNER);
while (con->in_base_pos < to) {
int left = to - con->in_base_pos;
int have = con->in_base_pos;
ret = ceph_tcp_recvmsg(con->sock,
(char *)&con->peer_addr + have, left);
(char *)&con->in_banner + have, left);
if (ret <= 0)
return ret;
con->in_base_pos += ret;
}
/* global_seq */
to += sizeof(con->in_global_seq);
/* peer_addr */
to += sizeof(con->peer_addr);
while (con->in_base_pos < to) {
int left = to - con->in_base_pos;
int have = sizeof(u32) - left;
int have = sizeof(con->peer_addr) - left;
ret = ceph_tcp_recvmsg(con->sock,
(char *)&con->in_global_seq + have,
(char *)&con->peer_addr + have,
left);
if (ret <= 0)
return ret;
con->in_base_pos += ret;
}
/* connect_seq */
to += sizeof(con->in_connect_seq);
while (con->in_base_pos < to) {
int left = to - con->in_base_pos;
int have = sizeof(u32) - left;
ret = ceph_tcp_recvmsg(con->sock,
(char *)&con->in_connect_seq + have,
left);
if (ret <= 0)
return ret;
con->in_base_pos += ret;
/* connect */
to += sizeof(con->in_connect);
while (con->in_base_pos < to) {
int left = to - con->in_base_pos;
int have = sizeof(u32) - left;
ret = ceph_tcp_recvmsg(con->sock,
(char *)&con->in_connect + have, left);
if (ret <= 0)
return ret;
con->in_base_pos += ret;
}
return 1; /* done */
}
@ -1401,7 +1440,7 @@ static void __replace_connection(struct ceph_messenger *msgr,
list_splice_init(&new->out_queue, &old->out_queue);
spin_unlock(&old->out_queue_lock);
new->connect_seq = le32_to_cpu(new->in_connect_seq);
new->connect_seq = le32_to_cpu(new->in_connect.connect_seq);
new->out_seq = old->out_seq;
/* replace list entry */
@ -1418,17 +1457,22 @@ static void __replace_connection(struct ceph_messenger *msgr,
/*
* call after a new connection's handshake has completed
*/
static void process_accept(struct ceph_connection *con)
static int process_accept(struct ceph_connection *con)
{
struct ceph_connection *existing;
struct ceph_messenger *msgr = con->msgr;
u32 peer_gseq = le32_to_cpu(con->in_global_seq);
u32 peer_cseq = le32_to_cpu(con->in_connect_seq);
u32 peer_gseq = le32_to_cpu(con->in_connect.global_seq);
u32 peer_cseq = le32_to_cpu(con->in_connect.connect_seq);
if (verify_hello(con) < 0)
return -1;
/* connect */
/* do we have an existing connection for this peer? */
if (radix_tree_preload(GFP_NOFS) < 0) {
derr(10, "ENOMEM in process_accept\n");
return;
con->error_msg = "out of memory";
return -1;
}
spin_lock(&msgr->con_lock);
existing = __get_connection(msgr, &con->peer_addr);
@ -1436,11 +1480,11 @@ static void process_accept(struct ceph_connection *con)
if (peer_gseq < existing->global_seq) {
/* retry_global */
con->global_seq = existing->global_seq;
con->out_global_seq =
con->out_connect.global_seq =
cpu_to_le32(con->global_seq);
prepare_write_accept_retry(con,
&tag_retry_global,
&con->out_global_seq);
&tag_retry_global,
&con->out_connect.global_seq);
} else if (test_bit(LOSSY, &existing->state)) {
dout(20, "process_accept replacing existing LOSSY %p\n",
existing);
@ -1458,11 +1502,11 @@ static void process_accept(struct ceph_connection *con)
/* old attempt or peer didn't get the READY */
/* send retry with peers connect seq */
con->connect_seq = existing->connect_seq;
con->out_connect_seq =
con->out_connect.connect_seq =
cpu_to_le32(con->connect_seq);
prepare_write_accept_retry(con,
&tag_retry_session,
&con->out_connect_seq);
&con->out_connect.connect_seq);
}
} else if (peer_cseq == existing->connect_seq &&
(test_bit(CONNECTING, &existing->state) ||
@ -1505,6 +1549,7 @@ static void process_accept(struct ceph_connection *con)
ceph_queue_con(con);
put_connection(con);
return 0;
}
@ -1528,7 +1573,10 @@ more:
ret = read_accept_partial(con);
if (ret <= 0)
goto done;
process_accept(con); /* accepted */
if (process_accept(con) < 0) {
ret = -1;
goto out;
}
goto more;
}
if (test_bit(CONNECTING, &con->state)) {
@ -1609,7 +1657,7 @@ static void con_work(struct work_struct *work)
{
struct ceph_connection *con = container_of(work, struct ceph_connection,
work.work);
more:
if (test_and_set_bit(BUSY, &con->state) != 0) {
dout(10, "con_work %p BUSY already set\n", con);
@ -1617,7 +1665,7 @@ more:
}
dout(10, "con_work %p start, clearing QUEUED\n", con);
clear_bit(QUEUED, &con->state);
if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */
dout(5, "con_work CLOSED\n");
goto done;
@ -1628,7 +1676,7 @@ more:
}
if (test_and_clear_bit(BACKOFF, &con->state))
dout(5, "con_work cleared BACKOFF\n");
if (test_and_clear_bit(SOCK_CLOSED, &con->state) ||
try_read(con) < 0 ||
try_write(con) < 0)
@ -1682,7 +1730,7 @@ static void try_accept(struct work_struct *work)
}
dout(5, "accepted connection \n");
prepare_write_accept_announce(msgr, newcon);
prepare_write_accept_hello(msgr, newcon);
add_connection_accepting(msgr, newcon);
/* hand off to work queue; we may have missed socket state change */
@ -1746,7 +1794,7 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr)
struct ceph_connection *con;
dout(2, "destroy %p\n", msgr);
/* stop listener */
msgr->listen_sock->ops->shutdown(msgr->listen_sock, SHUT_RDWR);
sock_release(msgr->listen_sock);
@ -1763,7 +1811,7 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr)
dout(40, " get %p %d -> %d\n", con,
atomic_read(&con->nref) - 1, atomic_read(&con->nref));
__remove_connection(msgr, con);
/* in case there's queued work... */
spin_unlock(&msgr->con_lock);
if (cancel_delayed_work_sync(&con->work))
@ -1820,16 +1868,16 @@ struct ceph_msg *ceph_msg_maybe_dup(struct ceph_msg *old)
if (atomic_read(&old->nref) == 1)
return old; /* we have only ref, all is well */
dup = ceph_msg_new(le32_to_cpu(old->hdr.type),
dup = ceph_msg_new(le32_to_cpu(old->hdr.type),
le32_to_cpu(old->hdr.front_len),
le32_to_cpu(old->hdr.data_len),
le32_to_cpu(old->hdr.data_len),
le32_to_cpu(old->hdr.data_off),
old->pages);
BUG_ON(!dup);
memcpy(dup->front.iov_base, old->front.iov_base,
le32_to_cpu(old->hdr.front_len));
/* revoke old message's pages */
mutex_lock(&old->page_mutex);
old->pages = 0;
@ -1871,7 +1919,7 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg,
derr(10, "ENOMEM in ceph_msg_send\n");
return ret;
}
spin_lock(&msgr->con_lock);
con = __get_connection(msgr, &msg->hdr.dst.addr);
if (con) {

View File

@ -104,14 +104,12 @@ struct ceph_connection {
struct ceph_entity_addr peer_addr; /* peer address */
struct ceph_entity_name peer_name; /* peer name */
__u32 connect_seq, global_seq;
__le32 in_connect_seq, out_connect_seq;
__le32 in_global_seq, out_global_seq;
char in_banner[CEPH_BANNER_MAX_LEN];
struct ceph_msg_connect out_connect, in_connect;
struct ceph_entity_addr actual_peer_addr;
__u32 out_seq; /* last message queued for send */
__u32 in_seq, in_seq_acked; /* last message received, acked */
/* connect state */
struct ceph_entity_addr actual_peer_addr;
/* out queue */
spinlock_t out_queue_lock; /* protects out_queue, out_sent, out_seq */
struct list_head out_queue;

View File

@ -763,7 +763,15 @@ int Rank::Pipe::accept()
assert(state == STATE_ACCEPTING);
// announce myself.
int rc = tcp_write(sd, (char*)&rank.rank_addr, sizeof(rank.rank_addr));
int rc = tcp_write(sd, CEPH_BANNER, strlen(CEPH_BANNER));
if (rc < 0) {
dout(10) << "accept couldn't write banner" << dendl;
state = STATE_CLOSED;
return -1;
}
// and my addr
rc = tcp_write(sd, (char*)&rank.rank_addr, sizeof(rank.rank_addr));
if (rc < 0) {
dout(10) << "accept couldn't write my addr" << dendl;
state = STATE_CLOSED;
@ -772,9 +780,21 @@ int Rank::Pipe::accept()
dout(10) << "accept sd=" << sd << dendl;
// identify peer
char banner[strlen(CEPH_BANNER)];
rc = tcp_read(sd, banner, strlen(CEPH_BANNER));
if (rc < 0) {
dout(10) << "accept couldn't read peer_addr" << dendl;
state = STATE_CLOSED;
return -1;
}
if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
dout(10) << "accept peer sent bad banner" << dendl;
state = STATE_CLOSED;
return -1;
}
rc = tcp_read(sd, (char*)&peer_addr, sizeof(peer_addr));
if (rc < 0) {
dout(10) << "accept couldn't read peer addr" << dendl;
dout(10) << "accept couldn't read peer_addr" << dendl;
state = STATE_CLOSED;
return -1;
}
@ -792,24 +812,19 @@ int Rank::Pipe::accept()
dout(2) << "accept peer says " << old_addr << ", socket says " << peer_addr << dendl;
}
__u32 peer_gseq, peer_cseq;
ceph_msg_connect connect;
Pipe *existing = 0;
// this should roughly mirror pseudocode at
// http://ceph.newdream.net/wiki/Messaging_protocol
while (1) {
rc = tcp_read(sd, (char*)&peer_gseq, sizeof(peer_gseq));
rc = tcp_read(sd, (char*)&connect, sizeof(connect));
if (rc < 0) {
dout(10) << "accept couldn't read connect peer_gseq" << dendl;
dout(10) << "accept couldn't read connect" << dendl;
goto fail;
}
rc = tcp_read(sd, (char*)&peer_cseq, sizeof(peer_cseq));
if (rc < 0) {
dout(10) << "accept couldn't read connect peer_seq" << dendl;
goto fail;
}
dout(20) << "accept got peer_connect_seq " << peer_cseq << dendl;
dout(20) << "accept got peer_connect_seq " << connect.connect_seq << dendl;
rank.lock.Lock();
@ -818,10 +833,11 @@ int Rank::Pipe::accept()
existing = rank.rank_pipe[peer_addr];
existing->lock.Lock();
if (peer_gseq < existing->peer_global_seq) {
if (connect.global_seq < existing->peer_global_seq) {
dout(10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
<< " > " << peer_gseq << ", RETRY_GLOBAL" << dendl;
__u32 gseq = existing->peer_global_seq; // so we can send it below..
<< " > " << connect.global_seq << ", RETRY_GLOBAL" << dendl;
__le32 gseq;
gseq = existing->peer_global_seq; // so we can send it below..
existing->lock.Unlock();
rank.lock.Unlock();
char tag = CEPH_MSGR_TAG_RETRY_GLOBAL;
@ -838,16 +854,17 @@ int Rank::Pipe::accept()
goto replace;
}
if (peer_cseq < existing->connect_seq) {
if (peer_cseq == 0) {
if (connect.connect_seq < existing->connect_seq) {
if (connect.connect_seq == 0) {
dout(-10) << "accept peer reset, then tried to connect to us, replacing" << dendl;
existing->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s
goto replace;
} else {
// old attempt, or we sent READY but they didn't get it.
dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq
<< " > " << peer_cseq << ", RETRY_SESSION" << dendl;
__u32 cseq = existing->connect_seq; // so we can send it below..
<< " > " << connect.connect_seq << ", RETRY_SESSION" << dendl;
__le32 cseq;
cseq = existing->connect_seq; // so we can send it below..
existing->lock.Unlock();
rank.lock.Unlock();
char tag = CEPH_MSGR_TAG_RETRY_SESSION;
@ -859,19 +876,19 @@ int Rank::Pipe::accept()
}
}
if (peer_cseq == existing->connect_seq) {
if (connect.connect_seq == existing->connect_seq) {
// connection race
if (peer_addr < rank.rank_addr) {
// incoming wins
dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
<< " == " << peer_cseq << ", replacing my attempt" << dendl;
<< " == " << connect.connect_seq << ", replacing my attempt" << dendl;
assert(existing->state == STATE_CONNECTING ||
existing->state == STATE_WAIT);
goto replace;
} else {
// our existing outgoing wins
dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
<< " == " << peer_cseq << ", sending WAIT" << dendl;
<< " == " << connect.connect_seq << ", sending WAIT" << dendl;
assert(peer_addr > rank.rank_addr);
assert(existing->state == STATE_CONNECTING); // this will win
existing->lock.Unlock();
@ -884,10 +901,10 @@ int Rank::Pipe::accept()
}
}
assert(peer_cseq > existing->connect_seq);
assert(peer_gseq >= existing->peer_global_seq);
assert(connect.connect_seq > existing->connect_seq);
assert(connect.global_seq >= existing->peer_global_seq);
if (existing->connect_seq == 0) {
dout(0) << "accept we reset (peer sent cseq " << peer_cseq
dout(0) << "accept we reset (peer sent cseq " << connect.connect_seq
<< ", " << existing << ".cseq = " << existing->connect_seq
<< "), sending RESETSESSION" << dendl;
rank.lock.Unlock();
@ -898,15 +915,15 @@ int Rank::Pipe::accept()
continue;
} else {
// reconnect
dout(10) << "accept peer sent cseq " << peer_cseq
dout(10) << "accept peer sent cseq " << connect.connect_seq
<< " > " << existing->connect_seq << dendl;
goto replace;
}
assert(0);
} // existing
else if (peer_cseq > 0) {
else if (connect.connect_seq > 0) {
// we reset, and they are opening a new session
dout(0) << "accept we reset (peer sent cseq " << peer_cseq << "), sending RESETSESSION" << dendl;
dout(0) << "accept we reset (peer sent cseq " << connect.connect_seq << "), sending RESETSESSION" << dendl;
rank.lock.Unlock();
char tag = CEPH_MSGR_TAG_RESETSESSION;
if (tcp_write(sd, &tag, 1) < 0)
@ -942,8 +959,8 @@ int Rank::Pipe::accept()
register_pipe();
rank.lock.Unlock();
connect_seq = peer_cseq + 1;
peer_global_seq = peer_gseq;
connect_seq = connect.connect_seq + 1;
peer_global_seq = connect.global_seq;
dout(10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl;
// send READY
@ -985,11 +1002,12 @@ int Rank::Pipe::connect()
char tag = -1;
int rc;
struct sockaddr_in myAddr;
entity_addr_t paddr;
struct msghdr msg;
struct iovec msgvec[2];
int msglen;
char banner[strlen(CEPH_BANNER)];
entity_addr_t paddr;
// create socket?
newsd = ::socket(AF_INET, SOCK_STREAM, 0);
if (newsd < 0) {
@ -1022,6 +1040,29 @@ int Rank::Pipe::connect()
dout(0) << "connect couldn't set TCP_NODELAY: " << strerror(errno) << dendl;
}
// verify banner
// FIXME: this should be non-blocking, or in some other way verify the banner as we get it.
rc = tcp_read(newsd, (char*)&banner, strlen(CEPH_BANNER));
if (rc < 0) {
dout(2) << "connect couldn't read banner, " << strerror(errno) << dendl;
goto fail;
}
if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
dout(0) << "connect protocol error (bad banner) on peer " << paddr << dendl;
goto fail;
}
memset(&msg, 0, sizeof(msg));
msgvec[0].iov_base = banner;
msgvec[0].iov_len = strlen(banner);
msg.msg_iov = msgvec;
msg.msg_iovlen = 1;
msglen = msgvec[0].iov_len;
if (do_sendmsg(newsd, &msg, msglen)) {
dout(2) << "connect couldn't write my banner, " << strerror(errno) << dendl;
goto fail;
}
// identify peer
rc = tcp_read(newsd, (char*)&paddr, sizeof(paddr));
if (rc < 0) {
@ -1040,7 +1081,7 @@ int Rank::Pipe::connect()
goto fail;
}
}
// identify myself, and send initial cseq
memset(&msg, 0, sizeof(msg));
msgvec[0].iov_base = (char*)&rank.rank_addr;
@ -1049,19 +1090,20 @@ int Rank::Pipe::connect()
msg.msg_iovlen = 1;
msglen = msgvec[0].iov_len;
if (do_sendmsg(newsd, &msg, msglen)) {
dout(2) << "connect couldn't write self addr, " << strerror(errno) << dendl;
dout(2) << "connect couldn't write my addr, " << strerror(errno) << dendl;
goto fail;
}
while (1) {
ceph_msg_connect connect;
connect.global_seq = gseq;
connect.connect_seq = cseq;
memset(&msg, 0, sizeof(msg));
msgvec[0].iov_base = (char*)&gseq;
msgvec[0].iov_len = sizeof(gseq);
msgvec[1].iov_base = (char*)&cseq;
msgvec[1].iov_len = sizeof(cseq);
msgvec[0].iov_base = (char*)&connect;
msgvec[0].iov_len = sizeof(connect);
msg.msg_iov = msgvec;
msg.msg_iovlen = 2;
msglen = msgvec[0].iov_len + msgvec[1].iov_len;
msg.msg_iovlen = 1;
msglen = msgvec[0].iov_len;
dout(10) << "connect sending gseq " << gseq << " cseq " << cseq << dendl;
if (do_sendmsg(newsd, &msg, msglen)) {
@ -1785,7 +1827,8 @@ int Rank::Pipe::write_ack(unsigned seq)
dout(10) << "write_ack " << seq << dendl;
char c = CEPH_MSGR_TAG_ACK;
__u32 s = seq;/*cpu_to_le32(seq);*/
__le32 s;
s = seq;
struct msghdr msg;
memset(&msg, 0, sizeof(msg));