From 94137309db23e71a3a5014c32eebe9d6f06a8a03 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 25 Mar 2010 13:46:16 -0700 Subject: [PATCH 1/6] msgr: make Message::get_orig_* differ only when explicitly directed to This paves the way for removal of the orig_src field from the message header. --- src/messages/MForward.h | 1 + src/msg/Message.h | 26 ++++++++++++++++++++------ 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/src/messages/MForward.h b/src/messages/MForward.h index 633fd948f33..7a7bc2a7f11 100644 --- a/src/messages/MForward.h +++ b/src/messages/MForward.h @@ -55,6 +55,7 @@ struct MForward : public Message { ::decode(client, p); ::decode(client_caps, p); msg = (PaxosServiceMessage *)decode_message(p); + msg->set_orig_source_inst(client); } const char *get_type_name() { return "forward"; } diff --git a/src/msg/Message.h b/src/msg/Message.h index a9a8f2e5eaa..a93da15be84 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -220,15 +220,18 @@ protected: Connection *connection; friend class Messenger; + + bool _forwarded; + entity_inst_t _orig_source_inst; public: atomic_t nref; - Message() : connection(NULL), nref(0) { + Message() : connection(NULL), _forwarded(false), nref(0) { memset(&header, 0, sizeof(header)); memset(&footer, 0, sizeof(footer)); }; - Message(int t) : connection(NULL), nref(0) { + Message(int t) : connection(NULL), _forwarded(false), nref(0) { memset(&header, 0, sizeof(header)); header.type = t; header.version = 1; @@ -313,10 +316,21 @@ public: entity_addr_t get_source_addr() { return entity_addr_t(header.src.addr); } void set_source_inst(entity_inst_t& inst) { header.src = inst; } - entity_inst_t get_orig_source_inst() { return entity_inst_t(header.orig_src); } - entity_name_t get_orig_source() { return entity_name_t(header.orig_src.name); } - entity_addr_t get_orig_source_addr() { return entity_addr_t(header.orig_src.addr); } - void set_orig_source_inst(entity_inst_t &i) { header.orig_src = i; } + entity_inst_t get_orig_source_inst() { + if (_forwarded) + return _orig_source_inst; + return get_source_inst(); + } + entity_name_t get_orig_source() { + return get_orig_source_inst().name; + } + entity_addr_t get_orig_source_addr() { + return get_orig_source_inst().addr; + } + void set_orig_source_inst(entity_inst_t& i) { + _forwarded = true; + _orig_source_inst = i; + } // virtual bits virtual void decode_payload() = 0; From cd102fb61c05e9efe82f5315cc11ece8b8ded899 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 25 Mar 2010 14:49:56 -0700 Subject: [PATCH 2/6] msgr: use connection for src addr --- src/msg/Message.h | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/msg/Message.h b/src/msg/Message.h index a93da15be84..212649287d7 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -311,11 +311,19 @@ public: void set_priority(__s16 p) { header.priority = p; } // source/dest - entity_inst_t get_source_inst() { return entity_inst_t(header.src); } - entity_name_t get_source() { return entity_name_t(header.src.name); } - entity_addr_t get_source_addr() { return entity_addr_t(header.src.addr); } - void set_source_inst(entity_inst_t& inst) { header.src = inst; } + entity_inst_t get_source_inst() { + return entity_inst_t(get_source(), get_source_addr()); + } + entity_name_t get_source() { + return entity_name_t(header.src.name); + } + entity_addr_t get_source_addr() { + if (connection) + return connection->get_peer_addr(); + return entity_addr_t(); + } + // forwarded? entity_inst_t get_orig_source_inst() { if (_forwarded) return _orig_source_inst; From 9a4b7686c28dde0d67fed8f0fa0f60bc0c37ba6d Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 25 Mar 2010 16:01:34 -0700 Subject: [PATCH 3/6] msgr: put features in connection_state --- src/msg/Message.h | 7 ++++++- src/msg/SimpleMessenger.cc | 7 ++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/msg/Message.h b/src/msg/Message.h index 212649287d7..601b2ac69cc 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -163,9 +163,10 @@ struct Connection : public RefCountedObject { RefCountedObject *priv; int peer_type; entity_addr_t peer_addr; + unsigned features; public: - Connection() : nref(1), lock("Connection::lock"), priv(NULL), peer_type(-1) {} + Connection() : nref(1), lock("Connection::lock"), priv(NULL), peer_type(-1), features(0) {} ~Connection() { //generic_dout(0) << "~Connection " << this << dendl; if (priv) { @@ -202,6 +203,10 @@ public: const entity_addr_t& get_peer_addr() { return peer_addr; } void set_peer_addr(const entity_addr_t& a) { peer_addr = a; } + int get_features() const { return features; } + bool has_feature(int f) const { return features & f; } + void set_features(unsigned f) { features = f; } + void set_feature(unsigned f) { features |= f; } }; diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 11265737f06..808206bae77 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -848,6 +848,9 @@ int SimpleMessenger::Pipe::accept() if (policy.lossy) reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY; + connection_state->set_features((int)reply.features & (int)connect.features); + dout(10) << "accept features " << connection_state->get_features() << dendl; + // ok! register_pipe(); messenger->lock.Unlock(); @@ -1157,7 +1160,9 @@ int SimpleMessenger::Pipe::connect() connect_seq = cseq + 1; assert(connect_seq == reply.connect_seq); backoff = utime_t(); - dout(20) << "connect success " << connect_seq << ", lossy = " << policy.lossy << dendl; + connection_state->set_features((unsigned)reply.features & (unsigned)connect.features); + dout(20) << "connect success " << connect_seq << ", lossy = " << policy.lossy + << ", features " << connection_state->get_features() << dendl; if (!messenger->destination_stopped) { Connection * cstate = connection_state->get(); From ddf61d067ca3e2e074c4f5ee7be24154795da4ff Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 25 Mar 2010 16:04:18 -0700 Subject: [PATCH 4/6] msgr: NOSRCADDR protocol feature drops src/orig_src in msg header This lets us drop the src, orig_src ceph_entity_addr's from the message header, saving about 160 bytes per message. The feature is optional. We can still talk to peers who use the old protocol. --- src/include/ceph_fs.h | 9 +++--- src/include/msgr.h | 20 +++++++++++- src/msg/Message.h | 2 +- src/msg/SimpleMessenger.cc | 65 ++++++++++++++++++++++++-------------- 4 files changed, 66 insertions(+), 30 deletions(-) diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h index d39db15824a..3fc6feaae0d 100644 --- a/src/include/ceph_fs.h +++ b/src/include/ceph_fs.h @@ -54,14 +54,15 @@ * feature bits */ #define CEPH_FEATURE_UID 1 +#define CEPH_FEATURE_NOSRCADDR 2 -#define CEPH_FEATURE_SUPPORTED_MON CEPH_FEATURE_UID +#define CEPH_FEATURE_SUPPORTED_MON CEPH_FEATURE_UID|CEPH_FEATURE_NOSRCADDR #define CEPH_FEATURE_REQUIRED_MON CEPH_FEATURE_UID -#define CEPH_FEATURE_SUPPORTED_MDS CEPH_FEATURE_UID +#define CEPH_FEATURE_SUPPORTED_MDS CEPH_FEATURE_UID|CEPH_FEATURE_NOSRCADDR #define CEPH_FEATURE_REQUIRED_MDS CEPH_FEATURE_UID -#define CEPH_FEATURE_SUPPORTED_OSD CEPH_FEATURE_UID +#define CEPH_FEATURE_SUPPORTED_OSD CEPH_FEATURE_UID|CEPH_FEATURE_NOSRCADDR #define CEPH_FEATURE_REQUIRED_OSD CEPH_FEATURE_UID -#define CEPH_FEATURE_SUPPORTED_CLIENT 0 +#define CEPH_FEATURE_SUPPORTED_CLIENT CEPH_FEATURE_NOSRCADDR #define CEPH_FEATURE_REQUIRED_CLIENT 0 diff --git a/src/include/msgr.h b/src/include/msgr.h index 72c7623e872..892a0298dfd 100644 --- a/src/include/msgr.h +++ b/src/include/msgr.h @@ -119,7 +119,7 @@ struct ceph_msg_connect_reply { /* * message header */ -struct ceph_msg_header { +struct ceph_msg_header_old { __le64 seq; /* message seq# for this session */ __le64 tid; /* transaction id */ __le16 type; /* message type */ @@ -137,6 +137,24 @@ struct ceph_msg_header { __le32 crc; /* header crc32c */ } __attribute__ ((packed)); +struct ceph_msg_header { + __le64 seq; /* message seq# for this session */ + __le64 tid; /* transaction id */ + __le16 type; /* message type */ + __le16 priority; /* priority. higher value == higher priority */ + __le16 version; /* version of message encoding */ + + __le32 front_len; /* bytes in main payload */ + __le32 middle_len;/* bytes in middle payload */ + __le32 data_len; /* bytes of data payload */ + __le16 data_off; /* sender: include full offset; + receiver: mask against ~PAGE_MASK */ + + struct ceph_entity_name src; + __le32 reserved; + __le32 crc; /* header crc32c */ +} __attribute__ ((packed)); + #define CEPH_MSG_PRIO_LOW 64 #define CEPH_MSG_PRIO_DEFAULT 127 #define CEPH_MSG_PRIO_HIGH 196 diff --git a/src/msg/Message.h b/src/msg/Message.h index 601b2ac69cc..2d1f329f0fd 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -320,7 +320,7 @@ public: return entity_inst_t(get_source(), get_source_addr()); } entity_name_t get_source() { - return entity_name_t(header.src.name); + return entity_name_t(header.src); } entity_addr_t get_source_addr() { if (connection) diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 808206bae77..babbdf7c3b6 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -380,8 +380,7 @@ void SimpleMessenger::prepare_dest(const entity_inst_t& inst) int SimpleMessenger::send_message(Message *m, const entity_inst_t& dest) { // set envelope - m->get_header().src = get_myinst(); - m->get_header().orig_src = m->get_header().src; + m->get_header().src = get_myname(); if (!m->get_priority()) m->set_priority(get_default_send_priority()); @@ -399,8 +398,7 @@ int SimpleMessenger::send_message(Message *m, const entity_inst_t& dest) int SimpleMessenger::lazy_send_message(Message *m, const entity_inst_t& dest) { // set envelope - m->get_header().src = get_myinst(); - m->get_header().orig_src = m->get_header().src; + m->get_header().src = get_myname(); if (!m->get_priority()) m->set_priority(get_default_send_priority()); @@ -1684,33 +1682,37 @@ Message *SimpleMessenger::Pipe::read_message() ceph_msg_header header; ceph_msg_footer footer; - - if (tcp_read( sd, (char*)&header, sizeof(header) ) < 0) - return 0; + __u32 header_crc; + + if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) { + if (tcp_read( sd, (char*)&header, sizeof(header) ) < 0) + return 0; + header_crc = crc32c_le(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc)); + } else { + ceph_msg_header_old oldheader; + if (tcp_read( sd, (char*)&oldheader, sizeof(oldheader) ) < 0) + return 0; + // this is fugly + memcpy(&header, &oldheader, sizeof(header)); + header.src = oldheader.src.name; + header.reserved = oldheader.reserved; + header.crc = oldheader.crc; + header_crc = crc32c_le(0, (unsigned char *)&oldheader, sizeof(oldheader) - sizeof(oldheader.crc)); + } dout(20) << "reader got envelope type=" << header.type - << " src " << header.src + << " src " << entity_name_t(header.src) << " front=" << header.front_len << " data=" << header.data_len << " off " << header.data_off << dendl; // verify header crc - __u32 header_crc = crc32c_le(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc)); if (header_crc != header.crc) { dout(0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl; return 0; } - // ok, now it's safe to change the header.. - // munge source address? - entity_addr_t srcaddr = header.src.addr; - if (srcaddr.is_blank_addr()) { - dout(10) << "reader munging src addr " << header.src << " to be " << peer_addr << dendl; - ceph_entity_addr enc_peer_addr = peer_addr; - header.orig_src.addr.in_addr = header.src.addr.in_addr = enc_peer_addr.in_addr; - } - // read front bufferlist front; int front_len = header.front_len; @@ -1780,7 +1782,7 @@ Message *SimpleMessenger::Pipe::read_message() dout(10) << "aborted = " << aborted << dendl; if (aborted) { dout(0) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length() - << " byte message from " << header.src << ".. ABORTED" << dendl; + << " byte message.. ABORTED" << dendl; // MEH FIXME Message *m = new MGenericMessage(CEPH_MSG_PING); header.type = CEPH_MSG_PING; @@ -1789,7 +1791,7 @@ Message *SimpleMessenger::Pipe::read_message() } dout(20) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length() - << " byte message from " << header.src << dendl; + << " byte message" << dendl; return decode_message(header, footer, front, middle, data); } @@ -1954,10 +1956,25 @@ int SimpleMessenger::Pipe::write_message(Message *m) msg.msg_iovlen++; // send envelope - msgvec[msg.msg_iovlen].iov_base = (char*)&header; - msgvec[msg.msg_iovlen].iov_len = sizeof(header); - msglen += sizeof(header); - msg.msg_iovlen++; + ceph_msg_header_old oldheader; + if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) { + msgvec[msg.msg_iovlen].iov_base = (char*)&header; + msgvec[msg.msg_iovlen].iov_len = sizeof(header); + msglen += sizeof(header); + msg.msg_iovlen++; + } else { + memcpy(&oldheader, &header, sizeof(header)); + oldheader.src.name = header.src; + oldheader.src.addr = connection_state->get_peer_addr(); + oldheader.orig_src = oldheader.src; + oldheader.reserved = header.reserved; + oldheader.crc = crc32c_le(0, (unsigned char*)&oldheader, + sizeof(oldheader) - sizeof(oldheader.crc)); + msgvec[msg.msg_iovlen].iov_base = (char*)&oldheader; + msgvec[msg.msg_iovlen].iov_len = sizeof(oldheader); + msglen += sizeof(oldheader); + msg.msg_iovlen++; + } // payload (front+data) list::const_iterator pb = blist.buffers().begin(); From bf62cc01c4ff5d2cb58f34fc9abca95e265312c7 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 25 Mar 2010 16:09:42 -0700 Subject: [PATCH 5/6] mds: don't use get_orig_source MDS doesn't do any request forwarding, so there is no need. --- src/mds/MDS.cc | 4 ++-- src/mds/Server.cc | 26 +++++++++++++------------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/mds/MDS.cc b/src/mds/MDS.cc index 584609bb5e8..f23c0111546 100644 --- a/src/mds/MDS.cc +++ b/src/mds/MDS.cc @@ -324,7 +324,7 @@ void MDS::forward_message_mds(Message *m, int mds) // client request? if (m->get_type() == CEPH_MSG_CLIENT_REQUEST && - ((MClientRequest*)m)->get_orig_source().is_client()) { + ((MClientRequest*)m)->get_source().is_client()) { MClientRequest *creq = (MClientRequest*)m; creq->inc_num_fwd(); // inc forward counter @@ -340,7 +340,7 @@ void MDS::forward_message_mds(Message *m, int mds) // tell the client where it should go messenger->send_message(new MClientRequestForward(creq->get_tid(), mds, creq->get_num_fwd(), client_must_resend), - creq->get_orig_source_inst()); + creq->get_source_inst()); if (client_must_resend) { delete m; diff --git a/src/mds/Server.cc b/src/mds/Server.cc index d74bf4b04b4..39efa612421 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -100,7 +100,7 @@ void Server::dispatch(Message *m) // active? if (!mds->is_active() && - !(mds->is_stopping() && m->get_orig_source().is_mds())) { + !(mds->is_stopping() && m->get_source().is_mds())) { if ((mds->is_reconnect() || mds->get_want_state() == CEPH_MDS_STATE_RECONNECT) && m->get_type() == CEPH_MSG_CLIENT_REQUEST && ((MClientRequest*)m)->is_replay()) { @@ -752,7 +752,7 @@ void Server::early_reply(MDRequest *mdr, CInode *tracei, CDentry *tracedn) } MClientRequest *req = mdr->client_request; - entity_inst_t client_inst = req->get_orig_source_inst(); + entity_inst_t client_inst = req->get_source_inst(); if (client_inst.name.is_mds()) return; @@ -824,7 +824,7 @@ void Server::reply_request(MDRequest *mdr, MClientReply *reply, CInode *tracei, bool is_replay = mdr->client_request->is_replay(); bool did_early_reply = mdr->did_early_reply; Session *session = mdr->session; - entity_inst_t client_inst = req->get_orig_source_inst(); + entity_inst_t client_inst = req->get_source_inst(); int dentry_wanted = req->get_dentry_wanted(); if (!did_early_reply && !is_replay) { @@ -986,10 +986,10 @@ void Server::handle_client_request(MClientRequest *req) // active session? Session *session = 0; - if (req->get_orig_source().is_client()) { + if (req->get_source().is_client()) { session = get_session(req); if (!session) { - dout(5) << "no session for " << req->get_orig_source() << ", dropping" << dendl; + dout(5) << "no session for " << req->get_source() << ", dropping" << dendl; delete req; return; } @@ -1014,7 +1014,7 @@ void Server::handle_client_request(MClientRequest *req) assert(session); if (session->have_completed_request(req->get_reqid().tid)) { dout(5) << "already completed " << req->get_reqid() << dendl; - mds->messenger->send_message(new MClientReply(req, 0), req->get_orig_source_inst()); + mds->messenger->send_message(new MClientReply(req, 0), req->get_source_inst()); if (req->is_replay()) mds->queue_one_replay(); @@ -1571,7 +1571,7 @@ CInode* Server::prepare_new_inode(MDRequest *mdr, CDir *dir, inodeno_t useino, u if (useino && useino != in->inode.ino) { dout(0) << "WARNING: client specified " << useino << " and i allocated " << in->inode.ino << dendl; stringstream ss; - ss << mdr->client_request->get_orig_source() << " specified ino " << useino + ss << mdr->client_request->get_source() << " specified ino " << useino << " but mds" << mds->whoami << " allocated " << in->inode.ino; mds->logclient.log(LOG_ERROR, ss); //assert(0); // just for now. @@ -1630,7 +1630,7 @@ void Server::journal_allocated_inos(MDRequest *mdr, EMetaBlob *blob) blob->set_ino_alloc(mdr->alloc_ino, mdr->used_prealloc_ino, mdr->prealloc_inos, - mdr->client_request->get_orig_source(), + mdr->client_request->get_source(), mds->sessionmap.projected, mds->inotable->get_projected_version()); } @@ -1923,7 +1923,7 @@ void Server::handle_client_stat(MDRequest *mdr) return; mds->balancer->hit_inode(g_clock.now(), ref, META_POP_IRD, - mdr->client_request->get_orig_source().num()); + mdr->client_request->get_source().num()); // reply dout(10) << "reply to stat on " << *req << dendl; @@ -2107,12 +2107,12 @@ void Server::handle_client_open(MDRequest *mdr) Capability *cap = mds->locker->issue_new_caps(cur, cmode, mdr->session, 0, req->is_replay()); if (cap) dout(12) << "open issued caps " << ccap_string(cap->pending()) - << " for " << req->get_orig_source() + << " for " << req->get_source() << " on " << *cur << dendl; } else { int caps = ceph_caps_for_mode(cmode); dout(12) << "open issued IMMUTABLE SNAP caps " << ccap_string(caps) - << " for " << req->get_orig_source() + << " for " << req->get_source() << " snapid " << mdr->snapid << " on " << *cur << dendl; mdr->snap_caps = caps; @@ -2140,7 +2140,7 @@ void Server::handle_client_open(MDRequest *mdr) mds->balancer->hit_inode(mdr->now, cur, META_POP_IWR); else mds->balancer->hit_inode(mdr->now, cur, META_POP_IRD, - mdr->client_request->get_orig_source().num()); + mdr->client_request->get_source().num()); CDentry *dn = 0; if (req->get_dentry_wanted()) { @@ -2282,7 +2282,7 @@ void Server::handle_client_openc(MDRequest *mdr) void Server::handle_client_readdir(MDRequest *mdr) { MClientRequest *req = mdr->client_request; - client_t client = req->get_orig_source().num(); + client_t client = req->get_source().num(); set rdlocks, wrlocks, xlocks; CInode *diri = rdlock_path_pin_ref(mdr, 0, rdlocks, false); if (!diri) return; From 3edc9d67bdf646960d945855849fac48cb7ad67e Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 26 Mar 2010 12:31:39 -0700 Subject: [PATCH 6/6] msgr: source, not orig_source, in dbg output --- src/msg/Messenger.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index 9572ed2b8a1..0eb55a78cb9 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -97,7 +97,7 @@ protected: if ((*p)->ms_dispatch(m)) return; generic_dout(0) << "unhandled message " << m << " " << *m - << " from " << m->get_orig_source_inst() + << " from " << m->get_source_inst() << dendl; assert(0); }