Merge branch 'msgr' into unstable

This commit is contained in:
Sage Weil 2010-03-30 11:45:00 -07:00
commit 858314367f
8 changed files with 126 additions and 57 deletions

View File

@ -54,14 +54,15 @@
* feature bits
*/
#define CEPH_FEATURE_UID 1
#define CEPH_FEATURE_NOSRCADDR 2
#define CEPH_FEATURE_SUPPORTED_MON CEPH_FEATURE_UID
#define CEPH_FEATURE_SUPPORTED_MON CEPH_FEATURE_UID|CEPH_FEATURE_NOSRCADDR
#define CEPH_FEATURE_REQUIRED_MON CEPH_FEATURE_UID
#define CEPH_FEATURE_SUPPORTED_MDS CEPH_FEATURE_UID
#define CEPH_FEATURE_SUPPORTED_MDS CEPH_FEATURE_UID|CEPH_FEATURE_NOSRCADDR
#define CEPH_FEATURE_REQUIRED_MDS CEPH_FEATURE_UID
#define CEPH_FEATURE_SUPPORTED_OSD CEPH_FEATURE_UID
#define CEPH_FEATURE_SUPPORTED_OSD CEPH_FEATURE_UID|CEPH_FEATURE_NOSRCADDR
#define CEPH_FEATURE_REQUIRED_OSD CEPH_FEATURE_UID
#define CEPH_FEATURE_SUPPORTED_CLIENT 0
#define CEPH_FEATURE_SUPPORTED_CLIENT CEPH_FEATURE_NOSRCADDR
#define CEPH_FEATURE_REQUIRED_CLIENT 0

View File

@ -119,7 +119,7 @@ struct ceph_msg_connect_reply {
/*
* message header
*/
struct ceph_msg_header {
struct ceph_msg_header_old {
__le64 seq; /* message seq# for this session */
__le64 tid; /* transaction id */
__le16 type; /* message type */
@ -137,6 +137,24 @@ struct ceph_msg_header {
__le32 crc; /* header crc32c */
} __attribute__ ((packed));
struct ceph_msg_header {
__le64 seq; /* message seq# for this session */
__le64 tid; /* transaction id */
__le16 type; /* message type */
__le16 priority; /* priority. higher value == higher priority */
__le16 version; /* version of message encoding */
__le32 front_len; /* bytes in main payload */
__le32 middle_len;/* bytes in middle payload */
__le32 data_len; /* bytes of data payload */
__le16 data_off; /* sender: include full offset;
receiver: mask against ~PAGE_MASK */
struct ceph_entity_name src;
__le32 reserved;
__le32 crc; /* header crc32c */
} __attribute__ ((packed));
#define CEPH_MSG_PRIO_LOW 64
#define CEPH_MSG_PRIO_DEFAULT 127
#define CEPH_MSG_PRIO_HIGH 196

View File

@ -324,7 +324,7 @@ void MDS::forward_message_mds(Message *m, int mds)
// client request?
if (m->get_type() == CEPH_MSG_CLIENT_REQUEST &&
((MClientRequest*)m)->get_orig_source().is_client()) {
((MClientRequest*)m)->get_source().is_client()) {
MClientRequest *creq = (MClientRequest*)m;
creq->inc_num_fwd(); // inc forward counter
@ -340,7 +340,7 @@ void MDS::forward_message_mds(Message *m, int mds)
// tell the client where it should go
messenger->send_message(new MClientRequestForward(creq->get_tid(), mds, creq->get_num_fwd(),
client_must_resend),
creq->get_orig_source_inst());
creq->get_source_inst());
if (client_must_resend) {
delete m;

View File

@ -100,7 +100,7 @@ void Server::dispatch(Message *m)
// active?
if (!mds->is_active() &&
!(mds->is_stopping() && m->get_orig_source().is_mds())) {
!(mds->is_stopping() && m->get_source().is_mds())) {
if ((mds->is_reconnect() || mds->get_want_state() == CEPH_MDS_STATE_RECONNECT) &&
m->get_type() == CEPH_MSG_CLIENT_REQUEST &&
((MClientRequest*)m)->is_replay()) {
@ -752,7 +752,7 @@ void Server::early_reply(MDRequest *mdr, CInode *tracei, CDentry *tracedn)
}
MClientRequest *req = mdr->client_request;
entity_inst_t client_inst = req->get_orig_source_inst();
entity_inst_t client_inst = req->get_source_inst();
if (client_inst.name.is_mds())
return;
@ -824,7 +824,7 @@ void Server::reply_request(MDRequest *mdr, MClientReply *reply, CInode *tracei,
bool is_replay = mdr->client_request->is_replay();
bool did_early_reply = mdr->did_early_reply;
Session *session = mdr->session;
entity_inst_t client_inst = req->get_orig_source_inst();
entity_inst_t client_inst = req->get_source_inst();
int dentry_wanted = req->get_dentry_wanted();
if (!did_early_reply && !is_replay) {
@ -986,10 +986,10 @@ void Server::handle_client_request(MClientRequest *req)
// active session?
Session *session = 0;
if (req->get_orig_source().is_client()) {
if (req->get_source().is_client()) {
session = get_session(req);
if (!session) {
dout(5) << "no session for " << req->get_orig_source() << ", dropping" << dendl;
dout(5) << "no session for " << req->get_source() << ", dropping" << dendl;
delete req;
return;
}
@ -1014,7 +1014,7 @@ void Server::handle_client_request(MClientRequest *req)
assert(session);
if (session->have_completed_request(req->get_reqid().tid)) {
dout(5) << "already completed " << req->get_reqid() << dendl;
mds->messenger->send_message(new MClientReply(req, 0), req->get_orig_source_inst());
mds->messenger->send_message(new MClientReply(req, 0), req->get_source_inst());
if (req->is_replay())
mds->queue_one_replay();
@ -1571,7 +1571,7 @@ CInode* Server::prepare_new_inode(MDRequest *mdr, CDir *dir, inodeno_t useino, u
if (useino && useino != in->inode.ino) {
dout(0) << "WARNING: client specified " << useino << " and i allocated " << in->inode.ino << dendl;
stringstream ss;
ss << mdr->client_request->get_orig_source() << " specified ino " << useino
ss << mdr->client_request->get_source() << " specified ino " << useino
<< " but mds" << mds->whoami << " allocated " << in->inode.ino;
mds->logclient.log(LOG_ERROR, ss);
//assert(0); // just for now.
@ -1630,7 +1630,7 @@ void Server::journal_allocated_inos(MDRequest *mdr, EMetaBlob *blob)
blob->set_ino_alloc(mdr->alloc_ino,
mdr->used_prealloc_ino,
mdr->prealloc_inos,
mdr->client_request->get_orig_source(),
mdr->client_request->get_source(),
mds->sessionmap.projected,
mds->inotable->get_projected_version());
}
@ -1923,7 +1923,7 @@ void Server::handle_client_stat(MDRequest *mdr)
return;
mds->balancer->hit_inode(g_clock.now(), ref, META_POP_IRD,
mdr->client_request->get_orig_source().num());
mdr->client_request->get_source().num());
// reply
dout(10) << "reply to stat on " << *req << dendl;
@ -2107,12 +2107,12 @@ void Server::handle_client_open(MDRequest *mdr)
Capability *cap = mds->locker->issue_new_caps(cur, cmode, mdr->session, 0, req->is_replay());
if (cap)
dout(12) << "open issued caps " << ccap_string(cap->pending())
<< " for " << req->get_orig_source()
<< " for " << req->get_source()
<< " on " << *cur << dendl;
} else {
int caps = ceph_caps_for_mode(cmode);
dout(12) << "open issued IMMUTABLE SNAP caps " << ccap_string(caps)
<< " for " << req->get_orig_source()
<< " for " << req->get_source()
<< " snapid " << mdr->snapid
<< " on " << *cur << dendl;
mdr->snap_caps = caps;
@ -2140,7 +2140,7 @@ void Server::handle_client_open(MDRequest *mdr)
mds->balancer->hit_inode(mdr->now, cur, META_POP_IWR);
else
mds->balancer->hit_inode(mdr->now, cur, META_POP_IRD,
mdr->client_request->get_orig_source().num());
mdr->client_request->get_source().num());
CDentry *dn = 0;
if (req->get_dentry_wanted()) {
@ -2282,7 +2282,7 @@ void Server::handle_client_openc(MDRequest *mdr)
void Server::handle_client_readdir(MDRequest *mdr)
{
MClientRequest *req = mdr->client_request;
client_t client = req->get_orig_source().num();
client_t client = req->get_source().num();
set<SimpleLock*> rdlocks, wrlocks, xlocks;
CInode *diri = rdlock_path_pin_ref(mdr, 0, rdlocks, false);
if (!diri) return;

View File

@ -55,6 +55,7 @@ struct MForward : public Message {
::decode(client, p);
::decode(client_caps, p);
msg = (PaxosServiceMessage *)decode_message(p);
msg->set_orig_source_inst(client);
}
const char *get_type_name() { return "forward"; }

View File

@ -163,9 +163,10 @@ struct Connection : public RefCountedObject {
RefCountedObject *priv;
int peer_type;
entity_addr_t peer_addr;
unsigned features;
public:
Connection() : nref(1), lock("Connection::lock"), priv(NULL), peer_type(-1) {}
Connection() : nref(1), lock("Connection::lock"), priv(NULL), peer_type(-1), features(0) {}
~Connection() {
//generic_dout(0) << "~Connection " << this << dendl;
if (priv) {
@ -202,6 +203,10 @@ public:
const entity_addr_t& get_peer_addr() { return peer_addr; }
void set_peer_addr(const entity_addr_t& a) { peer_addr = a; }
int get_features() const { return features; }
bool has_feature(int f) const { return features & f; }
void set_features(unsigned f) { features = f; }
void set_feature(unsigned f) { features |= f; }
};
@ -220,15 +225,18 @@ protected:
Connection *connection;
friend class Messenger;
bool _forwarded;
entity_inst_t _orig_source_inst;
public:
atomic_t nref;
Message() : connection(NULL), nref(0) {
Message() : connection(NULL), _forwarded(false), nref(0) {
memset(&header, 0, sizeof(header));
memset(&footer, 0, sizeof(footer));
};
Message(int t) : connection(NULL), nref(0) {
Message(int t) : connection(NULL), _forwarded(false), nref(0) {
memset(&header, 0, sizeof(header));
header.type = t;
header.version = 1;
@ -308,15 +316,34 @@ public:
void set_priority(__s16 p) { header.priority = p; }
// source/dest
entity_inst_t get_source_inst() { return entity_inst_t(header.src); }
entity_name_t get_source() { return entity_name_t(header.src.name); }
entity_addr_t get_source_addr() { return entity_addr_t(header.src.addr); }
void set_source_inst(entity_inst_t& inst) { header.src = inst; }
entity_inst_t get_source_inst() {
return entity_inst_t(get_source(), get_source_addr());
}
entity_name_t get_source() {
return entity_name_t(header.src);
}
entity_addr_t get_source_addr() {
if (connection)
return connection->get_peer_addr();
return entity_addr_t();
}
entity_inst_t get_orig_source_inst() { return entity_inst_t(header.orig_src); }
entity_name_t get_orig_source() { return entity_name_t(header.orig_src.name); }
entity_addr_t get_orig_source_addr() { return entity_addr_t(header.orig_src.addr); }
void set_orig_source_inst(entity_inst_t &i) { header.orig_src = i; }
// forwarded?
entity_inst_t get_orig_source_inst() {
if (_forwarded)
return _orig_source_inst;
return get_source_inst();
}
entity_name_t get_orig_source() {
return get_orig_source_inst().name;
}
entity_addr_t get_orig_source_addr() {
return get_orig_source_inst().addr;
}
void set_orig_source_inst(entity_inst_t& i) {
_forwarded = true;
_orig_source_inst = i;
}
// virtual bits
virtual void decode_payload() = 0;

View File

@ -97,7 +97,7 @@ protected:
if ((*p)->ms_dispatch(m))
return;
generic_dout(0) << "unhandled message " << m << " " << *m
<< " from " << m->get_orig_source_inst()
<< " from " << m->get_source_inst()
<< dendl;
assert(0);
}

View File

@ -380,8 +380,7 @@ void SimpleMessenger::prepare_dest(const entity_inst_t& inst)
int SimpleMessenger::send_message(Message *m, const entity_inst_t& dest)
{
// set envelope
m->get_header().src = get_myinst();
m->get_header().orig_src = m->get_header().src;
m->get_header().src = get_myname();
if (!m->get_priority()) m->set_priority(get_default_send_priority());
@ -399,8 +398,7 @@ int SimpleMessenger::send_message(Message *m, const entity_inst_t& dest)
int SimpleMessenger::lazy_send_message(Message *m, const entity_inst_t& dest)
{
// set envelope
m->get_header().src = get_myinst();
m->get_header().orig_src = m->get_header().src;
m->get_header().src = get_myname();
if (!m->get_priority()) m->set_priority(get_default_send_priority());
@ -848,6 +846,9 @@ int SimpleMessenger::Pipe::accept()
if (policy.lossy)
reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY;
connection_state->set_features((int)reply.features & (int)connect.features);
dout(10) << "accept features " << connection_state->get_features() << dendl;
// ok!
register_pipe();
messenger->lock.Unlock();
@ -1157,7 +1158,9 @@ int SimpleMessenger::Pipe::connect()
connect_seq = cseq + 1;
assert(connect_seq == reply.connect_seq);
backoff = utime_t();
dout(20) << "connect success " << connect_seq << ", lossy = " << policy.lossy << dendl;
connection_state->set_features((unsigned)reply.features & (unsigned)connect.features);
dout(20) << "connect success " << connect_seq << ", lossy = " << policy.lossy
<< ", features " << connection_state->get_features() << dendl;
if (!messenger->destination_stopped) {
Connection * cstate = connection_state->get();
@ -1679,33 +1682,37 @@ Message *SimpleMessenger::Pipe::read_message()
ceph_msg_header header;
ceph_msg_footer footer;
if (tcp_read( sd, (char*)&header, sizeof(header) ) < 0)
return 0;
__u32 header_crc;
if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) {
if (tcp_read( sd, (char*)&header, sizeof(header) ) < 0)
return 0;
header_crc = crc32c_le(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc));
} else {
ceph_msg_header_old oldheader;
if (tcp_read( sd, (char*)&oldheader, sizeof(oldheader) ) < 0)
return 0;
// this is fugly
memcpy(&header, &oldheader, sizeof(header));
header.src = oldheader.src.name;
header.reserved = oldheader.reserved;
header.crc = oldheader.crc;
header_crc = crc32c_le(0, (unsigned char *)&oldheader, sizeof(oldheader) - sizeof(oldheader.crc));
}
dout(20) << "reader got envelope type=" << header.type
<< " src " << header.src
<< " src " << entity_name_t(header.src)
<< " front=" << header.front_len
<< " data=" << header.data_len
<< " off " << header.data_off
<< dendl;
// verify header crc
__u32 header_crc = crc32c_le(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc));
if (header_crc != header.crc) {
dout(0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl;
return 0;
}
// ok, now it's safe to change the header..
// munge source address?
entity_addr_t srcaddr = header.src.addr;
if (srcaddr.is_blank_addr()) {
dout(10) << "reader munging src addr " << header.src << " to be " << peer_addr << dendl;
ceph_entity_addr enc_peer_addr = peer_addr;
header.orig_src.addr.in_addr = header.src.addr.in_addr = enc_peer_addr.in_addr;
}
// read front
bufferlist front;
int front_len = header.front_len;
@ -1775,7 +1782,7 @@ Message *SimpleMessenger::Pipe::read_message()
dout(10) << "aborted = " << aborted << dendl;
if (aborted) {
dout(0) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length()
<< " byte message from " << header.src << ".. ABORTED" << dendl;
<< " byte message.. ABORTED" << dendl;
// MEH FIXME
Message *m = new MGenericMessage(CEPH_MSG_PING);
header.type = CEPH_MSG_PING;
@ -1784,7 +1791,7 @@ Message *SimpleMessenger::Pipe::read_message()
}
dout(20) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length()
<< " byte message from " << header.src << dendl;
<< " byte message" << dendl;
return decode_message(header, footer, front, middle, data);
}
@ -1949,10 +1956,25 @@ int SimpleMessenger::Pipe::write_message(Message *m)
msg.msg_iovlen++;
// send envelope
msgvec[msg.msg_iovlen].iov_base = (char*)&header;
msgvec[msg.msg_iovlen].iov_len = sizeof(header);
msglen += sizeof(header);
msg.msg_iovlen++;
ceph_msg_header_old oldheader;
if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) {
msgvec[msg.msg_iovlen].iov_base = (char*)&header;
msgvec[msg.msg_iovlen].iov_len = sizeof(header);
msglen += sizeof(header);
msg.msg_iovlen++;
} else {
memcpy(&oldheader, &header, sizeof(header));
oldheader.src.name = header.src;
oldheader.src.addr = connection_state->get_peer_addr();
oldheader.orig_src = oldheader.src;
oldheader.reserved = header.reserved;
oldheader.crc = crc32c_le(0, (unsigned char*)&oldheader,
sizeof(oldheader) - sizeof(oldheader.crc));
msgvec[msg.msg_iovlen].iov_base = (char*)&oldheader;
msgvec[msg.msg_iovlen].iov_len = sizeof(oldheader);
msglen += sizeof(oldheader);
msg.msg_iovlen++;
}
// payload (front+data)
list<bufferptr>::const_iterator pb = blist.buffers().begin();