mirror of
https://github.com/ceph/ceph
synced 2025-01-02 00:52:22 +00:00
msgr: NOSRCADDR protocol feature drops src/orig_src in msg header
This lets us drop the src, orig_src ceph_entity_addr's from the message header, saving about 160 bytes per message. The feature is optional. We can still talk to peers who use the old protocol.
This commit is contained in:
parent
9a4b7686c2
commit
ddf61d067c
@ -54,14 +54,15 @@
|
||||
* feature bits
|
||||
*/
|
||||
#define CEPH_FEATURE_UID 1
|
||||
#define CEPH_FEATURE_NOSRCADDR 2
|
||||
|
||||
#define CEPH_FEATURE_SUPPORTED_MON CEPH_FEATURE_UID
|
||||
#define CEPH_FEATURE_SUPPORTED_MON CEPH_FEATURE_UID|CEPH_FEATURE_NOSRCADDR
|
||||
#define CEPH_FEATURE_REQUIRED_MON CEPH_FEATURE_UID
|
||||
#define CEPH_FEATURE_SUPPORTED_MDS CEPH_FEATURE_UID
|
||||
#define CEPH_FEATURE_SUPPORTED_MDS CEPH_FEATURE_UID|CEPH_FEATURE_NOSRCADDR
|
||||
#define CEPH_FEATURE_REQUIRED_MDS CEPH_FEATURE_UID
|
||||
#define CEPH_FEATURE_SUPPORTED_OSD CEPH_FEATURE_UID
|
||||
#define CEPH_FEATURE_SUPPORTED_OSD CEPH_FEATURE_UID|CEPH_FEATURE_NOSRCADDR
|
||||
#define CEPH_FEATURE_REQUIRED_OSD CEPH_FEATURE_UID
|
||||
#define CEPH_FEATURE_SUPPORTED_CLIENT 0
|
||||
#define CEPH_FEATURE_SUPPORTED_CLIENT CEPH_FEATURE_NOSRCADDR
|
||||
#define CEPH_FEATURE_REQUIRED_CLIENT 0
|
||||
|
||||
|
||||
|
@ -119,7 +119,7 @@ struct ceph_msg_connect_reply {
|
||||
/*
|
||||
* message header
|
||||
*/
|
||||
struct ceph_msg_header {
|
||||
struct ceph_msg_header_old {
|
||||
__le64 seq; /* message seq# for this session */
|
||||
__le64 tid; /* transaction id */
|
||||
__le16 type; /* message type */
|
||||
@ -137,6 +137,24 @@ struct ceph_msg_header {
|
||||
__le32 crc; /* header crc32c */
|
||||
} __attribute__ ((packed));
|
||||
|
||||
struct ceph_msg_header {
|
||||
__le64 seq; /* message seq# for this session */
|
||||
__le64 tid; /* transaction id */
|
||||
__le16 type; /* message type */
|
||||
__le16 priority; /* priority. higher value == higher priority */
|
||||
__le16 version; /* version of message encoding */
|
||||
|
||||
__le32 front_len; /* bytes in main payload */
|
||||
__le32 middle_len;/* bytes in middle payload */
|
||||
__le32 data_len; /* bytes of data payload */
|
||||
__le16 data_off; /* sender: include full offset;
|
||||
receiver: mask against ~PAGE_MASK */
|
||||
|
||||
struct ceph_entity_name src;
|
||||
__le32 reserved;
|
||||
__le32 crc; /* header crc32c */
|
||||
} __attribute__ ((packed));
|
||||
|
||||
#define CEPH_MSG_PRIO_LOW 64
|
||||
#define CEPH_MSG_PRIO_DEFAULT 127
|
||||
#define CEPH_MSG_PRIO_HIGH 196
|
||||
|
@ -320,7 +320,7 @@ public:
|
||||
return entity_inst_t(get_source(), get_source_addr());
|
||||
}
|
||||
entity_name_t get_source() {
|
||||
return entity_name_t(header.src.name);
|
||||
return entity_name_t(header.src);
|
||||
}
|
||||
entity_addr_t get_source_addr() {
|
||||
if (connection)
|
||||
|
@ -380,8 +380,7 @@ void SimpleMessenger::prepare_dest(const entity_inst_t& inst)
|
||||
int SimpleMessenger::send_message(Message *m, const entity_inst_t& dest)
|
||||
{
|
||||
// set envelope
|
||||
m->get_header().src = get_myinst();
|
||||
m->get_header().orig_src = m->get_header().src;
|
||||
m->get_header().src = get_myname();
|
||||
|
||||
if (!m->get_priority()) m->set_priority(get_default_send_priority());
|
||||
|
||||
@ -399,8 +398,7 @@ int SimpleMessenger::send_message(Message *m, const entity_inst_t& dest)
|
||||
int SimpleMessenger::lazy_send_message(Message *m, const entity_inst_t& dest)
|
||||
{
|
||||
// set envelope
|
||||
m->get_header().src = get_myinst();
|
||||
m->get_header().orig_src = m->get_header().src;
|
||||
m->get_header().src = get_myname();
|
||||
|
||||
if (!m->get_priority()) m->set_priority(get_default_send_priority());
|
||||
|
||||
@ -1684,33 +1682,37 @@ Message *SimpleMessenger::Pipe::read_message()
|
||||
|
||||
ceph_msg_header header;
|
||||
ceph_msg_footer footer;
|
||||
|
||||
if (tcp_read( sd, (char*)&header, sizeof(header) ) < 0)
|
||||
return 0;
|
||||
__u32 header_crc;
|
||||
|
||||
if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) {
|
||||
if (tcp_read( sd, (char*)&header, sizeof(header) ) < 0)
|
||||
return 0;
|
||||
header_crc = crc32c_le(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc));
|
||||
} else {
|
||||
ceph_msg_header_old oldheader;
|
||||
if (tcp_read( sd, (char*)&oldheader, sizeof(oldheader) ) < 0)
|
||||
return 0;
|
||||
// this is fugly
|
||||
memcpy(&header, &oldheader, sizeof(header));
|
||||
header.src = oldheader.src.name;
|
||||
header.reserved = oldheader.reserved;
|
||||
header.crc = oldheader.crc;
|
||||
header_crc = crc32c_le(0, (unsigned char *)&oldheader, sizeof(oldheader) - sizeof(oldheader.crc));
|
||||
}
|
||||
|
||||
dout(20) << "reader got envelope type=" << header.type
|
||||
<< " src " << header.src
|
||||
<< " src " << entity_name_t(header.src)
|
||||
<< " front=" << header.front_len
|
||||
<< " data=" << header.data_len
|
||||
<< " off " << header.data_off
|
||||
<< dendl;
|
||||
|
||||
// verify header crc
|
||||
__u32 header_crc = crc32c_le(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc));
|
||||
if (header_crc != header.crc) {
|
||||
dout(0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl;
|
||||
return 0;
|
||||
}
|
||||
|
||||
// ok, now it's safe to change the header..
|
||||
// munge source address?
|
||||
entity_addr_t srcaddr = header.src.addr;
|
||||
if (srcaddr.is_blank_addr()) {
|
||||
dout(10) << "reader munging src addr " << header.src << " to be " << peer_addr << dendl;
|
||||
ceph_entity_addr enc_peer_addr = peer_addr;
|
||||
header.orig_src.addr.in_addr = header.src.addr.in_addr = enc_peer_addr.in_addr;
|
||||
}
|
||||
|
||||
// read front
|
||||
bufferlist front;
|
||||
int front_len = header.front_len;
|
||||
@ -1780,7 +1782,7 @@ Message *SimpleMessenger::Pipe::read_message()
|
||||
dout(10) << "aborted = " << aborted << dendl;
|
||||
if (aborted) {
|
||||
dout(0) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length()
|
||||
<< " byte message from " << header.src << ".. ABORTED" << dendl;
|
||||
<< " byte message.. ABORTED" << dendl;
|
||||
// MEH FIXME
|
||||
Message *m = new MGenericMessage(CEPH_MSG_PING);
|
||||
header.type = CEPH_MSG_PING;
|
||||
@ -1789,7 +1791,7 @@ Message *SimpleMessenger::Pipe::read_message()
|
||||
}
|
||||
|
||||
dout(20) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length()
|
||||
<< " byte message from " << header.src << dendl;
|
||||
<< " byte message" << dendl;
|
||||
return decode_message(header, footer, front, middle, data);
|
||||
}
|
||||
|
||||
@ -1954,10 +1956,25 @@ int SimpleMessenger::Pipe::write_message(Message *m)
|
||||
msg.msg_iovlen++;
|
||||
|
||||
// send envelope
|
||||
msgvec[msg.msg_iovlen].iov_base = (char*)&header;
|
||||
msgvec[msg.msg_iovlen].iov_len = sizeof(header);
|
||||
msglen += sizeof(header);
|
||||
msg.msg_iovlen++;
|
||||
ceph_msg_header_old oldheader;
|
||||
if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) {
|
||||
msgvec[msg.msg_iovlen].iov_base = (char*)&header;
|
||||
msgvec[msg.msg_iovlen].iov_len = sizeof(header);
|
||||
msglen += sizeof(header);
|
||||
msg.msg_iovlen++;
|
||||
} else {
|
||||
memcpy(&oldheader, &header, sizeof(header));
|
||||
oldheader.src.name = header.src;
|
||||
oldheader.src.addr = connection_state->get_peer_addr();
|
||||
oldheader.orig_src = oldheader.src;
|
||||
oldheader.reserved = header.reserved;
|
||||
oldheader.crc = crc32c_le(0, (unsigned char*)&oldheader,
|
||||
sizeof(oldheader) - sizeof(oldheader.crc));
|
||||
msgvec[msg.msg_iovlen].iov_base = (char*)&oldheader;
|
||||
msgvec[msg.msg_iovlen].iov_len = sizeof(oldheader);
|
||||
msglen += sizeof(oldheader);
|
||||
msg.msg_iovlen++;
|
||||
}
|
||||
|
||||
// payload (front+data)
|
||||
list<bufferptr>::const_iterator pb = blist.buffers().begin();
|
||||
|
Loading…
Reference in New Issue
Block a user