From ddf61d067ca3e2e074c4f5ee7be24154795da4ff Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 25 Mar 2010 16:04:18 -0700 Subject: [PATCH] msgr: NOSRCADDR protocol feature drops src/orig_src in msg header This lets us drop the src, orig_src ceph_entity_addr's from the message header, saving about 160 bytes per message. The feature is optional. We can still talk to peers who use the old protocol. --- src/include/ceph_fs.h | 9 +++--- src/include/msgr.h | 20 +++++++++++- src/msg/Message.h | 2 +- src/msg/SimpleMessenger.cc | 65 ++++++++++++++++++++++++-------------- 4 files changed, 66 insertions(+), 30 deletions(-) diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h index d39db15824a..3fc6feaae0d 100644 --- a/src/include/ceph_fs.h +++ b/src/include/ceph_fs.h @@ -54,14 +54,15 @@ * feature bits */ #define CEPH_FEATURE_UID 1 +#define CEPH_FEATURE_NOSRCADDR 2 -#define CEPH_FEATURE_SUPPORTED_MON CEPH_FEATURE_UID +#define CEPH_FEATURE_SUPPORTED_MON CEPH_FEATURE_UID|CEPH_FEATURE_NOSRCADDR #define CEPH_FEATURE_REQUIRED_MON CEPH_FEATURE_UID -#define CEPH_FEATURE_SUPPORTED_MDS CEPH_FEATURE_UID +#define CEPH_FEATURE_SUPPORTED_MDS CEPH_FEATURE_UID|CEPH_FEATURE_NOSRCADDR #define CEPH_FEATURE_REQUIRED_MDS CEPH_FEATURE_UID -#define CEPH_FEATURE_SUPPORTED_OSD CEPH_FEATURE_UID +#define CEPH_FEATURE_SUPPORTED_OSD CEPH_FEATURE_UID|CEPH_FEATURE_NOSRCADDR #define CEPH_FEATURE_REQUIRED_OSD CEPH_FEATURE_UID -#define CEPH_FEATURE_SUPPORTED_CLIENT 0 +#define CEPH_FEATURE_SUPPORTED_CLIENT CEPH_FEATURE_NOSRCADDR #define CEPH_FEATURE_REQUIRED_CLIENT 0 diff --git a/src/include/msgr.h b/src/include/msgr.h index 72c7623e872..892a0298dfd 100644 --- a/src/include/msgr.h +++ b/src/include/msgr.h @@ -119,7 +119,7 @@ struct ceph_msg_connect_reply { /* * message header */ -struct ceph_msg_header { +struct ceph_msg_header_old { __le64 seq; /* message seq# for this session */ __le64 tid; /* transaction id */ __le16 type; /* message type */ @@ -137,6 +137,24 @@ struct ceph_msg_header { __le32 crc; /* header crc32c */ } __attribute__ ((packed)); +struct ceph_msg_header { + __le64 seq; /* message seq# for this session */ + __le64 tid; /* transaction id */ + __le16 type; /* message type */ + __le16 priority; /* priority. higher value == higher priority */ + __le16 version; /* version of message encoding */ + + __le32 front_len; /* bytes in main payload */ + __le32 middle_len;/* bytes in middle payload */ + __le32 data_len; /* bytes of data payload */ + __le16 data_off; /* sender: include full offset; + receiver: mask against ~PAGE_MASK */ + + struct ceph_entity_name src; + __le32 reserved; + __le32 crc; /* header crc32c */ +} __attribute__ ((packed)); + #define CEPH_MSG_PRIO_LOW 64 #define CEPH_MSG_PRIO_DEFAULT 127 #define CEPH_MSG_PRIO_HIGH 196 diff --git a/src/msg/Message.h b/src/msg/Message.h index 601b2ac69cc..2d1f329f0fd 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -320,7 +320,7 @@ public: return entity_inst_t(get_source(), get_source_addr()); } entity_name_t get_source() { - return entity_name_t(header.src.name); + return entity_name_t(header.src); } entity_addr_t get_source_addr() { if (connection) diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 808206bae77..babbdf7c3b6 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -380,8 +380,7 @@ void SimpleMessenger::prepare_dest(const entity_inst_t& inst) int SimpleMessenger::send_message(Message *m, const entity_inst_t& dest) { // set envelope - m->get_header().src = get_myinst(); - m->get_header().orig_src = m->get_header().src; + m->get_header().src = get_myname(); if (!m->get_priority()) m->set_priority(get_default_send_priority()); @@ -399,8 +398,7 @@ int SimpleMessenger::send_message(Message *m, const entity_inst_t& dest) int SimpleMessenger::lazy_send_message(Message *m, const entity_inst_t& dest) { // set envelope - m->get_header().src = get_myinst(); - m->get_header().orig_src = m->get_header().src; + m->get_header().src = get_myname(); if (!m->get_priority()) m->set_priority(get_default_send_priority()); @@ -1684,33 +1682,37 @@ Message *SimpleMessenger::Pipe::read_message() ceph_msg_header header; ceph_msg_footer footer; - - if (tcp_read( sd, (char*)&header, sizeof(header) ) < 0) - return 0; + __u32 header_crc; + + if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) { + if (tcp_read( sd, (char*)&header, sizeof(header) ) < 0) + return 0; + header_crc = crc32c_le(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc)); + } else { + ceph_msg_header_old oldheader; + if (tcp_read( sd, (char*)&oldheader, sizeof(oldheader) ) < 0) + return 0; + // this is fugly + memcpy(&header, &oldheader, sizeof(header)); + header.src = oldheader.src.name; + header.reserved = oldheader.reserved; + header.crc = oldheader.crc; + header_crc = crc32c_le(0, (unsigned char *)&oldheader, sizeof(oldheader) - sizeof(oldheader.crc)); + } dout(20) << "reader got envelope type=" << header.type - << " src " << header.src + << " src " << entity_name_t(header.src) << " front=" << header.front_len << " data=" << header.data_len << " off " << header.data_off << dendl; // verify header crc - __u32 header_crc = crc32c_le(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc)); if (header_crc != header.crc) { dout(0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl; return 0; } - // ok, now it's safe to change the header.. - // munge source address? - entity_addr_t srcaddr = header.src.addr; - if (srcaddr.is_blank_addr()) { - dout(10) << "reader munging src addr " << header.src << " to be " << peer_addr << dendl; - ceph_entity_addr enc_peer_addr = peer_addr; - header.orig_src.addr.in_addr = header.src.addr.in_addr = enc_peer_addr.in_addr; - } - // read front bufferlist front; int front_len = header.front_len; @@ -1780,7 +1782,7 @@ Message *SimpleMessenger::Pipe::read_message() dout(10) << "aborted = " << aborted << dendl; if (aborted) { dout(0) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length() - << " byte message from " << header.src << ".. ABORTED" << dendl; + << " byte message.. ABORTED" << dendl; // MEH FIXME Message *m = new MGenericMessage(CEPH_MSG_PING); header.type = CEPH_MSG_PING; @@ -1789,7 +1791,7 @@ Message *SimpleMessenger::Pipe::read_message() } dout(20) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length() - << " byte message from " << header.src << dendl; + << " byte message" << dendl; return decode_message(header, footer, front, middle, data); } @@ -1954,10 +1956,25 @@ int SimpleMessenger::Pipe::write_message(Message *m) msg.msg_iovlen++; // send envelope - msgvec[msg.msg_iovlen].iov_base = (char*)&header; - msgvec[msg.msg_iovlen].iov_len = sizeof(header); - msglen += sizeof(header); - msg.msg_iovlen++; + ceph_msg_header_old oldheader; + if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) { + msgvec[msg.msg_iovlen].iov_base = (char*)&header; + msgvec[msg.msg_iovlen].iov_len = sizeof(header); + msglen += sizeof(header); + msg.msg_iovlen++; + } else { + memcpy(&oldheader, &header, sizeof(header)); + oldheader.src.name = header.src; + oldheader.src.addr = connection_state->get_peer_addr(); + oldheader.orig_src = oldheader.src; + oldheader.reserved = header.reserved; + oldheader.crc = crc32c_le(0, (unsigned char*)&oldheader, + sizeof(oldheader) - sizeof(oldheader.crc)); + msgvec[msg.msg_iovlen].iov_base = (char*)&oldheader; + msgvec[msg.msg_iovlen].iov_len = sizeof(oldheader); + msglen += sizeof(oldheader); + msg.msg_iovlen++; + } // payload (front+data) list::const_iterator pb = blist.buffers().begin();