diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h index 08b197aef74..4a616da20d6 100644 --- a/src/include/ceph_fs.h +++ b/src/include/ceph_fs.h @@ -26,6 +26,19 @@ #define CEPH_MON_PORT 12345 #define CEPH_FILE_MAX_SIZE (1ULL << 40) // 1 TB + +/* + * protocol versions. increment each time one of these changes. + */ +#define CEPH_BANNER "ceph\n1\n" /* second line is a protocol version. + adjust whenever the wire protocol + changes. */ +#define CEPH_BANNER_MAX_LEN 30 + +#define CEPH_OSD_PROTOCOL 1 +#define CEPH_MDS_PROTOCOL 1 +#define CEPH_MON_PROTOCOL 1 + /* * types in this file are defined as little-endian, and are * primarily intended to describe data structures that pass @@ -377,6 +390,15 @@ struct ceph_entity_inst { } __attribute__ ((packed)); +/* + * connection negotiation + */ +struct ceph_msg_connect { + __le32 global_seq; + __le32 connect_seq; +}; + + /* * message header, footer */ diff --git a/src/kernel/messenger.c b/src/kernel/messenger.c index 2a7813f0749..909c4535690 100644 --- a/src/kernel/messenger.c +++ b/src/kernel/messenger.c @@ -268,7 +268,7 @@ int ceph_tcp_accept(struct socket *lsock, struct ceph_connection *con) struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr; int len; struct socket *sock; - + ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock); if (ret) return ret; @@ -557,7 +557,7 @@ void ceph_queue_con(struct ceph_connection *con) atomic_inc(&con->nref); dout(40, "ceph_queue_con %p %d -> %d\n", con, atomic_read(&con->nref) - 1, atomic_read(&con->nref)); - + set_bit(QUEUED, &con->state); if (test_bit(BUSY, &con->state) || !queue_work(ceph_msgr_wq, &con->work.work)) { @@ -585,10 +585,10 @@ static void ceph_fault(struct ceph_connection *con) } con_close_socket(con); - + /* hmm? */ BUG_ON(test_bit(WAIT, &con->state)); - + /* * If there are no messages in the queue, place the * connection in a STANDBY state. otherwise, retry with @@ -601,7 +601,7 @@ static void ceph_fault(struct ceph_connection *con) spin_unlock(&con->out_queue_lock); return; } - + dout(10, "fault setting BACKOFF\n"); set_bit(BACKOFF, &con->state); @@ -616,7 +616,7 @@ static void ceph_fault(struct ceph_connection *con) con->delay); queue_delayed_work(ceph_msgr_wq, &con->work, round_jiffies_relative(con->delay)); - + list_splice_init(&con->out_sent, &con->out_queue); spin_unlock(&con->out_queue_lock); } @@ -713,13 +713,13 @@ static int write_partial_msg_pages(struct ceph_connection *con, con->out_footer.aborted = cpu_to_le32(con->out_msg->pages == 0); con->out_kvec[0].iov_base = &con->out_footer; - con->out_kvec_bytes = con->out_kvec[0].iov_len = + con->out_kvec_bytes = con->out_kvec[0].iov_len = sizeof(con->out_footer); con->out_kvec_left = 1; con->out_kvec_cur = con->out_kvec; con->out_msg = 0; con->out_more = 0; /* end of message */ - + ret = 1; out: return ret; @@ -784,7 +784,7 @@ static void prepare_write_message(struct ceph_connection *con) */ static void prepare_write_ack(struct ceph_connection *con) { - dout(20, "prepare_write_ack %p %u -> %u\n", con, + dout(20, "prepare_write_ack %p %u -> %u\n", con, con->in_seq_acked, con->in_seq); con->in_seq_acked = con->in_seq; @@ -819,17 +819,19 @@ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt) static void prepare_write_connect(struct ceph_messenger *msgr, struct ceph_connection *con) { - con->out_global_seq = cpu_to_le32(con->global_seq); - con->out_connect_seq = cpu_to_le32(con->connect_seq); + con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); + con->out_connect.global_seq = cpu_to_le32(con->global_seq); - con->out_kvec[0].iov_base = &msgr->inst.addr; - con->out_kvec[0].iov_len = sizeof(msgr->inst.addr); - con->out_kvec[1].iov_base = &con->out_global_seq; - con->out_kvec[1].iov_len = 4; - con->out_kvec[2].iov_base = &con->out_connect_seq; - con->out_kvec[2].iov_len = 4; + con->out_kvec[0].iov_base = CEPH_BANNER; + con->out_kvec[0].iov_len = strlen(CEPH_BANNER); + con->out_kvec[1].iov_base = &msgr->inst.addr; + con->out_kvec[1].iov_len = sizeof(msgr->inst.addr); + con->out_kvec[2].iov_base = &con->out_connect; + con->out_kvec[2].iov_len = sizeof(con->out_connect); con->out_kvec_left = 3; - con->out_kvec_bytes = sizeof(msgr->inst.addr) + 8; + con->out_kvec_bytes = strlen(CEPH_BANNER) + + sizeof(msgr->inst.addr) + + sizeof(con->out_connect); con->out_kvec_cur = con->out_kvec; con->out_more = 0; set_bit(WRITE_PENDING, &con->state); @@ -838,27 +840,29 @@ static void prepare_write_connect(struct ceph_messenger *msgr, static void prepare_write_connect_retry(struct ceph_messenger *msgr, struct ceph_connection *con) { - con->out_global_seq = cpu_to_le32(con->global_seq); - con->out_connect_seq = cpu_to_le32(con->connect_seq); + con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); + con->out_connect.global_seq = cpu_to_le32(con->global_seq); - con->out_kvec[0].iov_base = &con->out_global_seq; - con->out_kvec[0].iov_len = 4; - con->out_kvec[1].iov_base = &con->out_connect_seq; - con->out_kvec[1].iov_len = 4; - con->out_kvec_left = 2; - con->out_kvec_bytes = 8; + con->out_kvec[0].iov_base = &con->out_connect; + con->out_kvec[0].iov_len = sizeof(con->out_connect); + con->out_kvec_left = 1; + con->out_kvec_bytes = sizeof(con->out_connect); con->out_kvec_cur = con->out_kvec; con->out_more = 0; set_bit(WRITE_PENDING, &con->state); } -static void prepare_write_accept_announce(struct ceph_messenger *msgr, - struct ceph_connection *con) +static void prepare_write_accept_hello(struct ceph_messenger *msgr, + struct ceph_connection *con) { - con->out_kvec[0].iov_base = &msgr->inst.addr; - con->out_kvec[0].iov_len = sizeof(msgr->inst.addr); - con->out_kvec_left = 1; - con->out_kvec_bytes = sizeof(msgr->inst.addr); + int len = strlen(CEPH_BANNER); + + con->out_kvec[0].iov_base = CEPH_BANNER; + con->out_kvec[0].iov_len = len; + con->out_kvec[1].iov_base = &msgr->inst.addr; + con->out_kvec[1].iov_len = sizeof(msgr->inst.addr); + con->out_kvec_left = 2; + con->out_kvec_bytes = len + sizeof(msgr->inst.addr); con->out_kvec_cur = con->out_kvec; con->out_more = 0; set_bit(WRITE_PENDING, &con->state); @@ -887,6 +891,9 @@ static void prepare_write_accept_retry(struct ceph_connection *con, char *ptag, con->out_kvec_cur = con->out_kvec; con->out_more = 0; set_bit(WRITE_PENDING, &con->state); + + /* we'll re-read the connect request, but not the hello */ + con->in_base_pos = strlen(CEPH_BANNER) + sizeof(con->msgr->inst.addr); } /* @@ -1123,11 +1130,11 @@ static void process_message(struct ceph_connection *con) /* if first message, set peer_name */ if (con->peer_name.type == 0) con->peer_name = con->in_msg->hdr.src.name; - + spin_lock(&con->out_queue_lock); con->in_seq++; spin_unlock(&con->out_queue_lock); - + dout(1, "===== %p %u from %s%d %d=%s len %d+%d =====\n", con->in_msg, le32_to_cpu(con->in_msg->hdr.seq), ENTITY_NAME(con->in_msg->hdr.src.name), @@ -1195,11 +1202,24 @@ static int read_connect_partial(struct ceph_connection *con) int ret, to; dout(20, "read_connect_partial %p at %d\n", con, con->in_base_pos); - /* actual_peer_addr */ - to = sizeof(con->actual_peer_addr); + /* peer's banner */ + to = strlen(CEPH_BANNER); while (con->in_base_pos < to) { int left = to - con->in_base_pos; int have = con->in_base_pos; + ret = ceph_tcp_recvmsg(con->sock, + (char *)&con->in_banner + have, + left); + if (ret <= 0) + goto out; + con->in_base_pos += ret; + } + + /* peer's addr */ + to += sizeof(con->actual_peer_addr); + while (con->in_base_pos < to) { + int left = to - con->in_base_pos; + int have = sizeof(con->actual_peer_addr) - left; ret = ceph_tcp_recvmsg(con->sock, (char *)&con->actual_peer_addr + have, left); @@ -1218,28 +1238,28 @@ static int read_connect_partial(struct ceph_connection *con) } if (con->in_tag == CEPH_MSGR_TAG_RETRY_SESSION) { - /* peers connect_seq */ - to += sizeof(con->in_connect_seq); + /* peer's connect_seq */ + to += sizeof(con->in_connect.connect_seq); if (con->in_base_pos < to) { int left = to - con->in_base_pos; - int have = sizeof(con->in_connect_seq) - left; + int have = sizeof(con->in_connect.connect_seq) - left; ret = ceph_tcp_recvmsg(con->sock, - (char *)&con->in_connect_seq + - have, left); + (char *)&con->in_connect.connect_seq + + have, left); if (ret <= 0) goto out; con->in_base_pos += ret; } } if (con->in_tag == CEPH_MSGR_TAG_RETRY_GLOBAL) { - /* peers global_seq */ - to += sizeof(con->in_global_seq); + /* peer's global_seq */ + to += sizeof(con->in_connect.global_seq); if (con->in_base_pos < to) { int left = to - con->in_base_pos; - int have = sizeof(con->in_global_seq) - left; + int have = sizeof(con->in_connect.global_seq) - left; ret = ceph_tcp_recvmsg(con->sock, - (char *)&con->in_global_seq + - have, left); + (char *)&con->in_connect.global_seq + + have, left); if (ret <= 0) goto out; con->in_base_pos += ret; @@ -1249,8 +1269,9 @@ static int read_connect_partial(struct ceph_connection *con) out: dout(20, "read_connect_partial %p end at %d ret %d\n", con, con->in_base_pos, ret); - dout(20, "read_connect_partial peer in connect_seq = %u\n", - le32_to_cpu(con->in_connect_seq)); + dout(20, "read_connect_partial connect_seq = %u, global_seq = %u\n", + le32_to_cpu(con->in_connect.connect_seq), + le32_to_cpu(con->in_connect.global_seq)); return ret; /* done */ } @@ -1259,7 +1280,7 @@ out: */ static void reset_connection(struct ceph_connection *con) { - derr(1, "%s%d %u.%u.%u.%u:%u connection reset\n", + derr(1, "%s%d %u.%u.%u.%u:%u connection reset\n", ENTITY_NAME(con->peer_name), IPQUADPORT(con->peer_addr.ipaddr)); @@ -1277,9 +1298,25 @@ static void reset_connection(struct ceph_connection *con) spin_unlock(&con->out_queue_lock); } +static int verify_hello(struct ceph_connection *con) +{ + if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) { + derr(10, "connection from %u.%u.%u.%u:%u with bad banner\n", + IPQUADPORT(con->peer_addr.ipaddr)); + con->error_msg = "protocol error, bad banner"; + return -1; + } + return 0; +} + static int process_connect(struct ceph_connection *con) { dout(20, "process_connect on %p tag %d\n", con, (int)con->in_tag); + + if (verify_hello(con) < 0) + return -1; + + /* verify peer addr */ if (!ceph_entity_addr_is_local(con->peer_addr, con->actual_peer_addr) && con->actual_peer_addr.ipaddr.sin_addr.s_addr != 0) { derr(1, "process_connect wrong peer, want %u.%u.%u.%u:%u/%d, " @@ -1291,10 +1328,11 @@ static int process_connect(struct ceph_connection *con) con->error_msg = "protocol error, wrong peer"; return -1; } + switch (con->in_tag) { case CEPH_MSGR_TAG_RESETSESSION: dout(10, "process_connect got RESET peer seq %u\n", - le32_to_cpu(con->in_connect_seq)); + le32_to_cpu(con->in_connect.connect_seq)); reset_connection(con); prepare_write_connect_retry(con->msgr, con); prepare_read_connect(con); @@ -1303,19 +1341,19 @@ static int process_connect(struct ceph_connection *con) case CEPH_MSGR_TAG_RETRY_SESSION: dout(10, "process_connect got RETRY my seq = %u, peer_seq = %u\n", - le32_to_cpu(con->out_connect_seq), - le32_to_cpu(con->in_connect_seq)); - con->connect_seq = le32_to_cpu(con->in_connect_seq); + le32_to_cpu(con->out_connect.connect_seq), + le32_to_cpu(con->in_connect.connect_seq)); + con->connect_seq = le32_to_cpu(con->in_connect.connect_seq); prepare_write_connect_retry(con->msgr, con); prepare_read_connect(con); break; case CEPH_MSGR_TAG_RETRY_GLOBAL: dout(10, "process_connect got RETRY_GLOBAL my %u, peer_gseq = %u\n", - con->global_seq, le32_to_cpu(con->in_global_seq)); + con->global_seq, le32_to_cpu(con->in_connect.global_seq)); con->global_seq = get_global_seq(con->msgr, - le32_to_cpu(con->in_global_seq)); + le32_to_cpu(con->in_connect.global_seq)); prepare_write_connect_retry(con->msgr, con); prepare_read_connect(con); break; @@ -1346,43 +1384,44 @@ static int read_accept_partial(struct ceph_connection *con) int ret; int to; - /* peer addr */ - to = sizeof(con->peer_addr); + /* banner */ + to = strlen(CEPH_BANNER); while (con->in_base_pos < to) { int left = to - con->in_base_pos; int have = con->in_base_pos; ret = ceph_tcp_recvmsg(con->sock, - (char *)&con->peer_addr + have, left); + (char *)&con->in_banner + have, left); if (ret <= 0) return ret; con->in_base_pos += ret; } - /* global_seq */ - to += sizeof(con->in_global_seq); + /* peer_addr */ + to += sizeof(con->peer_addr); while (con->in_base_pos < to) { int left = to - con->in_base_pos; - int have = sizeof(u32) - left; + int have = sizeof(con->peer_addr) - left; ret = ceph_tcp_recvmsg(con->sock, - (char *)&con->in_global_seq + have, + (char *)&con->peer_addr + have, left); if (ret <= 0) return ret; con->in_base_pos += ret; } - /* connect_seq */ - to += sizeof(con->in_connect_seq); - while (con->in_base_pos < to) { - int left = to - con->in_base_pos; - int have = sizeof(u32) - left; - ret = ceph_tcp_recvmsg(con->sock, - (char *)&con->in_connect_seq + have, - left); - if (ret <= 0) - return ret; - con->in_base_pos += ret; + /* connect */ + to += sizeof(con->in_connect); + while (con->in_base_pos < to) { + int left = to - con->in_base_pos; + int have = sizeof(u32) - left; + ret = ceph_tcp_recvmsg(con->sock, + (char *)&con->in_connect + have, left); + if (ret <= 0) + return ret; + con->in_base_pos += ret; } + + return 1; /* done */ } @@ -1401,7 +1440,7 @@ static void __replace_connection(struct ceph_messenger *msgr, list_splice_init(&new->out_queue, &old->out_queue); spin_unlock(&old->out_queue_lock); - new->connect_seq = le32_to_cpu(new->in_connect_seq); + new->connect_seq = le32_to_cpu(new->in_connect.connect_seq); new->out_seq = old->out_seq; /* replace list entry */ @@ -1418,17 +1457,22 @@ static void __replace_connection(struct ceph_messenger *msgr, /* * call after a new connection's handshake has completed */ -static void process_accept(struct ceph_connection *con) +static int process_accept(struct ceph_connection *con) { struct ceph_connection *existing; struct ceph_messenger *msgr = con->msgr; - u32 peer_gseq = le32_to_cpu(con->in_global_seq); - u32 peer_cseq = le32_to_cpu(con->in_connect_seq); + u32 peer_gseq = le32_to_cpu(con->in_connect.global_seq); + u32 peer_cseq = le32_to_cpu(con->in_connect.connect_seq); + if (verify_hello(con) < 0) + return -1; + + /* connect */ /* do we have an existing connection for this peer? */ if (radix_tree_preload(GFP_NOFS) < 0) { derr(10, "ENOMEM in process_accept\n"); - return; + con->error_msg = "out of memory"; + return -1; } spin_lock(&msgr->con_lock); existing = __get_connection(msgr, &con->peer_addr); @@ -1436,11 +1480,11 @@ static void process_accept(struct ceph_connection *con) if (peer_gseq < existing->global_seq) { /* retry_global */ con->global_seq = existing->global_seq; - con->out_global_seq = + con->out_connect.global_seq = cpu_to_le32(con->global_seq); prepare_write_accept_retry(con, - &tag_retry_global, - &con->out_global_seq); + &tag_retry_global, + &con->out_connect.global_seq); } else if (test_bit(LOSSY, &existing->state)) { dout(20, "process_accept replacing existing LOSSY %p\n", existing); @@ -1458,11 +1502,11 @@ static void process_accept(struct ceph_connection *con) /* old attempt or peer didn't get the READY */ /* send retry with peers connect seq */ con->connect_seq = existing->connect_seq; - con->out_connect_seq = + con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); prepare_write_accept_retry(con, &tag_retry_session, - &con->out_connect_seq); + &con->out_connect.connect_seq); } } else if (peer_cseq == existing->connect_seq && (test_bit(CONNECTING, &existing->state) || @@ -1505,6 +1549,7 @@ static void process_accept(struct ceph_connection *con) ceph_queue_con(con); put_connection(con); + return 0; } @@ -1528,7 +1573,10 @@ more: ret = read_accept_partial(con); if (ret <= 0) goto done; - process_accept(con); /* accepted */ + if (process_accept(con) < 0) { + ret = -1; + goto out; + } goto more; } if (test_bit(CONNECTING, &con->state)) { @@ -1609,7 +1657,7 @@ static void con_work(struct work_struct *work) { struct ceph_connection *con = container_of(work, struct ceph_connection, work.work); - + more: if (test_and_set_bit(BUSY, &con->state) != 0) { dout(10, "con_work %p BUSY already set\n", con); @@ -1617,7 +1665,7 @@ more: } dout(10, "con_work %p start, clearing QUEUED\n", con); clear_bit(QUEUED, &con->state); - + if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */ dout(5, "con_work CLOSED\n"); goto done; @@ -1628,7 +1676,7 @@ more: } if (test_and_clear_bit(BACKOFF, &con->state)) dout(5, "con_work cleared BACKOFF\n"); - + if (test_and_clear_bit(SOCK_CLOSED, &con->state) || try_read(con) < 0 || try_write(con) < 0) @@ -1682,7 +1730,7 @@ static void try_accept(struct work_struct *work) } dout(5, "accepted connection \n"); - prepare_write_accept_announce(msgr, newcon); + prepare_write_accept_hello(msgr, newcon); add_connection_accepting(msgr, newcon); /* hand off to work queue; we may have missed socket state change */ @@ -1746,7 +1794,7 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr) struct ceph_connection *con; dout(2, "destroy %p\n", msgr); - + /* stop listener */ msgr->listen_sock->ops->shutdown(msgr->listen_sock, SHUT_RDWR); sock_release(msgr->listen_sock); @@ -1763,7 +1811,7 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr) dout(40, " get %p %d -> %d\n", con, atomic_read(&con->nref) - 1, atomic_read(&con->nref)); __remove_connection(msgr, con); - + /* in case there's queued work... */ spin_unlock(&msgr->con_lock); if (cancel_delayed_work_sync(&con->work)) @@ -1820,16 +1868,16 @@ struct ceph_msg *ceph_msg_maybe_dup(struct ceph_msg *old) if (atomic_read(&old->nref) == 1) return old; /* we have only ref, all is well */ - - dup = ceph_msg_new(le32_to_cpu(old->hdr.type), + + dup = ceph_msg_new(le32_to_cpu(old->hdr.type), le32_to_cpu(old->hdr.front_len), - le32_to_cpu(old->hdr.data_len), + le32_to_cpu(old->hdr.data_len), le32_to_cpu(old->hdr.data_off), old->pages); BUG_ON(!dup); memcpy(dup->front.iov_base, old->front.iov_base, le32_to_cpu(old->hdr.front_len)); - + /* revoke old message's pages */ mutex_lock(&old->page_mutex); old->pages = 0; @@ -1871,7 +1919,7 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg, derr(10, "ENOMEM in ceph_msg_send\n"); return ret; } - + spin_lock(&msgr->con_lock); con = __get_connection(msgr, &msg->hdr.dst.addr); if (con) { diff --git a/src/kernel/messenger.h b/src/kernel/messenger.h index 33c4a250a3b..846c218b6b4 100644 --- a/src/kernel/messenger.h +++ b/src/kernel/messenger.h @@ -104,14 +104,12 @@ struct ceph_connection { struct ceph_entity_addr peer_addr; /* peer address */ struct ceph_entity_name peer_name; /* peer name */ __u32 connect_seq, global_seq; - __le32 in_connect_seq, out_connect_seq; - __le32 in_global_seq, out_global_seq; + char in_banner[CEPH_BANNER_MAX_LEN]; + struct ceph_msg_connect out_connect, in_connect; + struct ceph_entity_addr actual_peer_addr; __u32 out_seq; /* last message queued for send */ __u32 in_seq, in_seq_acked; /* last message received, acked */ - /* connect state */ - struct ceph_entity_addr actual_peer_addr; - /* out queue */ spinlock_t out_queue_lock; /* protects out_queue, out_sent, out_seq */ struct list_head out_queue; diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 21f30f32dd6..6d2f6d35a6e 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -763,7 +763,15 @@ int Rank::Pipe::accept() assert(state == STATE_ACCEPTING); // announce myself. - int rc = tcp_write(sd, (char*)&rank.rank_addr, sizeof(rank.rank_addr)); + int rc = tcp_write(sd, CEPH_BANNER, strlen(CEPH_BANNER)); + if (rc < 0) { + dout(10) << "accept couldn't write banner" << dendl; + state = STATE_CLOSED; + return -1; + } + + // and my addr + rc = tcp_write(sd, (char*)&rank.rank_addr, sizeof(rank.rank_addr)); if (rc < 0) { dout(10) << "accept couldn't write my addr" << dendl; state = STATE_CLOSED; @@ -772,9 +780,21 @@ int Rank::Pipe::accept() dout(10) << "accept sd=" << sd << dendl; // identify peer + char banner[strlen(CEPH_BANNER)]; + rc = tcp_read(sd, banner, strlen(CEPH_BANNER)); + if (rc < 0) { + dout(10) << "accept couldn't read peer_addr" << dendl; + state = STATE_CLOSED; + return -1; + } + if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) { + dout(10) << "accept peer sent bad banner" << dendl; + state = STATE_CLOSED; + return -1; + } rc = tcp_read(sd, (char*)&peer_addr, sizeof(peer_addr)); if (rc < 0) { - dout(10) << "accept couldn't read peer addr" << dendl; + dout(10) << "accept couldn't read peer_addr" << dendl; state = STATE_CLOSED; return -1; } @@ -792,24 +812,19 @@ int Rank::Pipe::accept() dout(2) << "accept peer says " << old_addr << ", socket says " << peer_addr << dendl; } - __u32 peer_gseq, peer_cseq; + ceph_msg_connect connect; Pipe *existing = 0; // this should roughly mirror pseudocode at // http://ceph.newdream.net/wiki/Messaging_protocol while (1) { - rc = tcp_read(sd, (char*)&peer_gseq, sizeof(peer_gseq)); + rc = tcp_read(sd, (char*)&connect, sizeof(connect)); if (rc < 0) { - dout(10) << "accept couldn't read connect peer_gseq" << dendl; + dout(10) << "accept couldn't read connect" << dendl; goto fail; } - rc = tcp_read(sd, (char*)&peer_cseq, sizeof(peer_cseq)); - if (rc < 0) { - dout(10) << "accept couldn't read connect peer_seq" << dendl; - goto fail; - } - dout(20) << "accept got peer_connect_seq " << peer_cseq << dendl; + dout(20) << "accept got peer_connect_seq " << connect.connect_seq << dendl; rank.lock.Lock(); @@ -818,10 +833,11 @@ int Rank::Pipe::accept() existing = rank.rank_pipe[peer_addr]; existing->lock.Lock(); - if (peer_gseq < existing->peer_global_seq) { + if (connect.global_seq < existing->peer_global_seq) { dout(10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq - << " > " << peer_gseq << ", RETRY_GLOBAL" << dendl; - __u32 gseq = existing->peer_global_seq; // so we can send it below.. + << " > " << connect.global_seq << ", RETRY_GLOBAL" << dendl; + __le32 gseq; + gseq = existing->peer_global_seq; // so we can send it below.. existing->lock.Unlock(); rank.lock.Unlock(); char tag = CEPH_MSGR_TAG_RETRY_GLOBAL; @@ -838,16 +854,17 @@ int Rank::Pipe::accept() goto replace; } - if (peer_cseq < existing->connect_seq) { - if (peer_cseq == 0) { + if (connect.connect_seq < existing->connect_seq) { + if (connect.connect_seq == 0) { dout(-10) << "accept peer reset, then tried to connect to us, replacing" << dendl; existing->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s goto replace; } else { // old attempt, or we sent READY but they didn't get it. dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq - << " > " << peer_cseq << ", RETRY_SESSION" << dendl; - __u32 cseq = existing->connect_seq; // so we can send it below.. + << " > " << connect.connect_seq << ", RETRY_SESSION" << dendl; + __le32 cseq; + cseq = existing->connect_seq; // so we can send it below.. existing->lock.Unlock(); rank.lock.Unlock(); char tag = CEPH_MSGR_TAG_RETRY_SESSION; @@ -859,19 +876,19 @@ int Rank::Pipe::accept() } } - if (peer_cseq == existing->connect_seq) { + if (connect.connect_seq == existing->connect_seq) { // connection race if (peer_addr < rank.rank_addr) { // incoming wins dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq - << " == " << peer_cseq << ", replacing my attempt" << dendl; + << " == " << connect.connect_seq << ", replacing my attempt" << dendl; assert(existing->state == STATE_CONNECTING || existing->state == STATE_WAIT); goto replace; } else { // our existing outgoing wins dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq - << " == " << peer_cseq << ", sending WAIT" << dendl; + << " == " << connect.connect_seq << ", sending WAIT" << dendl; assert(peer_addr > rank.rank_addr); assert(existing->state == STATE_CONNECTING); // this will win existing->lock.Unlock(); @@ -884,10 +901,10 @@ int Rank::Pipe::accept() } } - assert(peer_cseq > existing->connect_seq); - assert(peer_gseq >= existing->peer_global_seq); + assert(connect.connect_seq > existing->connect_seq); + assert(connect.global_seq >= existing->peer_global_seq); if (existing->connect_seq == 0) { - dout(0) << "accept we reset (peer sent cseq " << peer_cseq + dout(0) << "accept we reset (peer sent cseq " << connect.connect_seq << ", " << existing << ".cseq = " << existing->connect_seq << "), sending RESETSESSION" << dendl; rank.lock.Unlock(); @@ -898,15 +915,15 @@ int Rank::Pipe::accept() continue; } else { // reconnect - dout(10) << "accept peer sent cseq " << peer_cseq + dout(10) << "accept peer sent cseq " << connect.connect_seq << " > " << existing->connect_seq << dendl; goto replace; } assert(0); } // existing - else if (peer_cseq > 0) { + else if (connect.connect_seq > 0) { // we reset, and they are opening a new session - dout(0) << "accept we reset (peer sent cseq " << peer_cseq << "), sending RESETSESSION" << dendl; + dout(0) << "accept we reset (peer sent cseq " << connect.connect_seq << "), sending RESETSESSION" << dendl; rank.lock.Unlock(); char tag = CEPH_MSGR_TAG_RESETSESSION; if (tcp_write(sd, &tag, 1) < 0) @@ -942,8 +959,8 @@ int Rank::Pipe::accept() register_pipe(); rank.lock.Unlock(); - connect_seq = peer_cseq + 1; - peer_global_seq = peer_gseq; + connect_seq = connect.connect_seq + 1; + peer_global_seq = connect.global_seq; dout(10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl; // send READY @@ -985,11 +1002,12 @@ int Rank::Pipe::connect() char tag = -1; int rc; struct sockaddr_in myAddr; - entity_addr_t paddr; struct msghdr msg; struct iovec msgvec[2]; int msglen; - + char banner[strlen(CEPH_BANNER)]; + entity_addr_t paddr; + // create socket? newsd = ::socket(AF_INET, SOCK_STREAM, 0); if (newsd < 0) { @@ -1022,6 +1040,29 @@ int Rank::Pipe::connect() dout(0) << "connect couldn't set TCP_NODELAY: " << strerror(errno) << dendl; } + // verify banner + // FIXME: this should be non-blocking, or in some other way verify the banner as we get it. + rc = tcp_read(newsd, (char*)&banner, strlen(CEPH_BANNER)); + if (rc < 0) { + dout(2) << "connect couldn't read banner, " << strerror(errno) << dendl; + goto fail; + } + if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) { + dout(0) << "connect protocol error (bad banner) on peer " << paddr << dendl; + goto fail; + } + + memset(&msg, 0, sizeof(msg)); + msgvec[0].iov_base = banner; + msgvec[0].iov_len = strlen(banner); + msg.msg_iov = msgvec; + msg.msg_iovlen = 1; + msglen = msgvec[0].iov_len; + if (do_sendmsg(newsd, &msg, msglen)) { + dout(2) << "connect couldn't write my banner, " << strerror(errno) << dendl; + goto fail; + } + // identify peer rc = tcp_read(newsd, (char*)&paddr, sizeof(paddr)); if (rc < 0) { @@ -1040,7 +1081,7 @@ int Rank::Pipe::connect() goto fail; } } - + // identify myself, and send initial cseq memset(&msg, 0, sizeof(msg)); msgvec[0].iov_base = (char*)&rank.rank_addr; @@ -1049,19 +1090,20 @@ int Rank::Pipe::connect() msg.msg_iovlen = 1; msglen = msgvec[0].iov_len; if (do_sendmsg(newsd, &msg, msglen)) { - dout(2) << "connect couldn't write self addr, " << strerror(errno) << dendl; + dout(2) << "connect couldn't write my addr, " << strerror(errno) << dendl; goto fail; } while (1) { + ceph_msg_connect connect; + connect.global_seq = gseq; + connect.connect_seq = cseq; memset(&msg, 0, sizeof(msg)); - msgvec[0].iov_base = (char*)&gseq; - msgvec[0].iov_len = sizeof(gseq); - msgvec[1].iov_base = (char*)&cseq; - msgvec[1].iov_len = sizeof(cseq); + msgvec[0].iov_base = (char*)&connect; + msgvec[0].iov_len = sizeof(connect); msg.msg_iov = msgvec; - msg.msg_iovlen = 2; - msglen = msgvec[0].iov_len + msgvec[1].iov_len; + msg.msg_iovlen = 1; + msglen = msgvec[0].iov_len; dout(10) << "connect sending gseq " << gseq << " cseq " << cseq << dendl; if (do_sendmsg(newsd, &msg, msglen)) { @@ -1785,7 +1827,8 @@ int Rank::Pipe::write_ack(unsigned seq) dout(10) << "write_ack " << seq << dendl; char c = CEPH_MSGR_TAG_ACK; - __u32 s = seq;/*cpu_to_le32(seq);*/ + __le32 s; + s = seq; struct msghdr msg; memset(&msg, 0, sizeof(msg));