mirror of
https://github.com/ceph/ceph
synced 2025-01-02 00:52:22 +00:00
msgr: clean up crc handling
This commit is contained in:
parent
e6dd588fb7
commit
6f8d6bb66e
@ -1027,7 +1027,7 @@ static int read_message_partial(struct ceph_connection *con)
|
||||
struct ceph_msg *m = con->in_msg;
|
||||
void *p;
|
||||
int ret;
|
||||
int want, left;
|
||||
int to, want, left;
|
||||
unsigned front_len, data_len, data_off;
|
||||
|
||||
dout(20, "read_message_partial con %p msg %p\n", con, m);
|
||||
@ -1132,8 +1132,9 @@ static int read_message_partial(struct ceph_connection *con)
|
||||
|
||||
no_data:
|
||||
/* footer */
|
||||
while (con->in_base_pos < sizeof(m->hdr) + sizeof(m->footer)) {
|
||||
left = sizeof(m->hdr) + sizeof(m->footer) - con->in_base_pos;
|
||||
to = sizeof(m->hdr) + sizeof(m->footer);
|
||||
while (con->in_base_pos < to) {
|
||||
left = to - con->in_base_pos;
|
||||
ret = ceph_tcp_recvmsg(con->sock, (char *)&m->footer +
|
||||
(con->in_base_pos - sizeof(m->hdr)),
|
||||
left);
|
||||
@ -1141,20 +1142,19 @@ no_data:
|
||||
return ret;
|
||||
con->in_base_pos += ret;
|
||||
}
|
||||
|
||||
dout(20, "read_message_partial got msg %p\n", m);
|
||||
|
||||
/* crc ok? */
|
||||
if (con->in_front_crc != con->in_msg->footer.front_crc) {
|
||||
if (con->in_front_crc != m->footer.front_crc) {
|
||||
derr(0, "read_message_partial %p front crc %u != expected %u\n",
|
||||
con->in_msg,
|
||||
con->in_front_crc, con->in_msg->footer.front_crc);
|
||||
con->in_front_crc, m->footer.front_crc);
|
||||
return -EIO;
|
||||
}
|
||||
if (con->in_data_crc != con->in_msg->footer.data_crc) {
|
||||
if (con->in_data_crc != m->footer.data_crc) {
|
||||
derr(0, "read_message_partial %p data crc %u != expected %u\n",
|
||||
con->in_msg,
|
||||
con->in_data_crc, con->in_msg->footer.data_crc);
|
||||
con->in_data_crc, m->footer.data_crc);
|
||||
return -EIO;
|
||||
}
|
||||
|
||||
|
@ -418,9 +418,6 @@ decode_message(ceph_msg_header& header, ceph_msg_footer& footer,
|
||||
|
||||
m->decode_payload();
|
||||
|
||||
m->front_crc = front_crc;
|
||||
m->data_crc = data_crc;
|
||||
|
||||
// done!
|
||||
return m;
|
||||
}
|
||||
|
@ -126,8 +126,6 @@ protected:
|
||||
friend class Messenger;
|
||||
|
||||
public:
|
||||
__u32 front_crc, data_crc;
|
||||
|
||||
Message() { };
|
||||
Message(int t) {
|
||||
header.type = t;
|
||||
@ -154,7 +152,16 @@ public:
|
||||
void set_recv_stamp(utime_t t) { recv_stamp = t; }
|
||||
utime_t get_recv_stamp() { return recv_stamp; }
|
||||
|
||||
// HEADERELOPE ----
|
||||
void calc_header_crc() {
|
||||
header.crc = crc32c_le(0, (unsigned char*)&header,
|
||||
sizeof(header) - sizeof(header.crc));
|
||||
}
|
||||
void calc_front_crc() {
|
||||
footer.front_crc = payload.crc32c(0);
|
||||
}
|
||||
void calc_data_crc() {
|
||||
footer.data_crc = data.crc32c(0);
|
||||
}
|
||||
|
||||
// type
|
||||
int get_type() { return header.type; }
|
||||
|
@ -593,7 +593,7 @@ void Rank::EntityMessenger::dispatch_entry()
|
||||
<< " <== " << m->get_source_inst()
|
||||
<< " ==== " << *m
|
||||
<< " ==== " << m->get_payload().length() << "+" << m->get_data().length()
|
||||
<< " (" << m->front_crc << " " << m->data_crc << ")"
|
||||
<< " (" << m->get_footer().front_crc << " " << m->get_footer().data_crc << ")"
|
||||
<< " " << m
|
||||
<< dendl;
|
||||
dispatch(m);
|
||||
@ -665,12 +665,15 @@ int Rank::EntityMessenger::send_message(Message *m, entity_inst_t dest)
|
||||
m->set_source_inst(_myinst);
|
||||
m->set_orig_source_inst(_myinst);
|
||||
m->set_dest_inst(dest);
|
||||
m->calc_data_crc();
|
||||
|
||||
dout(1) << m->get_source()
|
||||
<< " --> " << dest.name << " " << dest.addr
|
||||
<< " -- " << *m
|
||||
<< " -- " << m
|
||||
<< dendl;
|
||||
<< " -- ?+" << m->get_data().length()
|
||||
<< " (? " << m->get_footer().data_crc << ")"
|
||||
<< " " << m
|
||||
<< dendl;
|
||||
|
||||
rank.submit_message(m, dest.addr);
|
||||
|
||||
@ -682,11 +685,14 @@ int Rank::EntityMessenger::forward_message(Message *m, entity_inst_t dest)
|
||||
// set envelope
|
||||
m->set_source_inst(_myinst);
|
||||
m->set_dest_inst(dest);
|
||||
m->calc_data_crc();
|
||||
|
||||
dout(1) << m->get_source()
|
||||
<< " **> " << dest.name << " " << dest.addr
|
||||
<< " -- " << *m
|
||||
<< " -- " << m
|
||||
<< " -- ?+" << m->get_data().length()
|
||||
<< " (? " << m->get_footer().data_crc << ")"
|
||||
<< " " << m
|
||||
<< dendl;
|
||||
|
||||
rank.submit_message(m, dest.addr);
|
||||
@ -702,11 +708,14 @@ int Rank::EntityMessenger::lazy_send_message(Message *m, entity_inst_t dest)
|
||||
m->set_source_inst(_myinst);
|
||||
m->set_orig_source_inst(_myinst);
|
||||
m->set_dest_inst(dest);
|
||||
m->calc_data_crc();
|
||||
|
||||
dout(1) << "lazy " << m->get_source()
|
||||
<< " --> " << dest.name << " " << dest.addr
|
||||
<< " -- " << *m
|
||||
<< " -- " << m
|
||||
<< " -- ?+" << m->get_data().length()
|
||||
<< " (? " << m->get_footer().data_crc << ")"
|
||||
<< " " << m
|
||||
<< dendl;
|
||||
|
||||
rank.submit_message(m, dest.addr, true);
|
||||
@ -1638,21 +1647,18 @@ void Rank::Pipe::writer()
|
||||
// encode and copy out of *m
|
||||
if (m->empty_payload())
|
||||
m->encode_payload();
|
||||
bufferlist payload, data;
|
||||
payload.claim(m->get_payload());
|
||||
data.claim(m->get_data());
|
||||
ceph_msg_header hdr = m->get_header();
|
||||
m->calc_front_crc();
|
||||
|
||||
lock.Lock();
|
||||
sent.push_back(m); // move to sent list
|
||||
lock.Unlock();
|
||||
|
||||
dout(20) << "writer sending " << m->get_seq() << " " << m << dendl;
|
||||
int rc = write_message(m, &hdr, payload, data);
|
||||
int rc = write_message(m);
|
||||
lock.Lock();
|
||||
|
||||
if (rc < 0) {
|
||||
derr(1) << "writer error sending " << m << " to " << hdr.dst << ", "
|
||||
derr(1) << "writer error sending " << m << " to " << m->get_header().dst << ", "
|
||||
<< errno << ": " << strerror(errno) << dendl;
|
||||
fault();
|
||||
}
|
||||
@ -1710,7 +1716,7 @@ Message *Rank::Pipe::read_message()
|
||||
// verify header crc
|
||||
__u32 header_crc = crc32c_le(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc));
|
||||
if (header_crc != header.crc) {
|
||||
dout(0) << "reader got bad header crc " << header_crc << " != " << header_crc << dendl;
|
||||
dout(0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl;
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -1863,25 +1869,21 @@ int Rank::Pipe::write_ack(unsigned seq)
|
||||
}
|
||||
|
||||
|
||||
int Rank::Pipe::write_message(Message *m, ceph_msg_header *header,
|
||||
bufferlist &payload, bufferlist &data)
|
||||
int Rank::Pipe::write_message(Message *m)
|
||||
{
|
||||
struct ceph_msg_footer f;
|
||||
memset(&f, 0, sizeof(f));
|
||||
ceph_msg_header& header = m->get_header();
|
||||
ceph_msg_footer& footer = m->get_footer();
|
||||
|
||||
// get envelope, buffers
|
||||
header->front_len = payload.length();
|
||||
header->data_len = data.length();
|
||||
header.front_len = m->get_payload().length();
|
||||
header.data_len = m->get_data().length();
|
||||
footer.aborted = 0;
|
||||
m->calc_header_crc();
|
||||
|
||||
// calculate header, footer crc
|
||||
header->crc = crc32c_le(0, (unsigned char*)header, sizeof(*header) - sizeof(header->crc));
|
||||
f.front_crc = payload.crc32c(0);
|
||||
f.data_crc = data.crc32c(0);
|
||||
|
||||
bufferlist blist = payload;
|
||||
blist.append(data);
|
||||
bufferlist blist = m->get_payload();
|
||||
blist.append(m->get_data());
|
||||
|
||||
dout(20) << "write_message " << m << " to " << header->dst << dendl;
|
||||
dout(20) << "write_message " << m << " to " << header.dst << dendl;
|
||||
|
||||
// set up msghdr and iovecs
|
||||
struct msghdr msg;
|
||||
@ -1898,9 +1900,9 @@ int Rank::Pipe::write_message(Message *m, ceph_msg_header *header,
|
||||
msg.msg_iovlen++;
|
||||
|
||||
// send envelope
|
||||
msgvec[msg.msg_iovlen].iov_base = (char*)header;
|
||||
msgvec[msg.msg_iovlen].iov_len = sizeof(*header);
|
||||
msglen += sizeof(*header);
|
||||
msgvec[msg.msg_iovlen].iov_base = (char*)&header;
|
||||
msgvec[msg.msg_iovlen].iov_len = sizeof(header);
|
||||
msglen += sizeof(header);
|
||||
msg.msg_iovlen++;
|
||||
|
||||
// payload (front+data)
|
||||
@ -1948,10 +1950,10 @@ int Rank::Pipe::write_message(Message *m, ceph_msg_header *header,
|
||||
}
|
||||
assert(left == 0);
|
||||
|
||||
// send data footer
|
||||
msgvec[msg.msg_iovlen].iov_base = (void*)&f;
|
||||
msgvec[msg.msg_iovlen].iov_len = sizeof(f);
|
||||
msglen += sizeof(f);
|
||||
// send footer
|
||||
msgvec[msg.msg_iovlen].iov_base = (void*)&footer;
|
||||
msgvec[msg.msg_iovlen].iov_len = sizeof(footer);
|
||||
msglen += sizeof(footer);
|
||||
msg.msg_iovlen++;
|
||||
|
||||
// send
|
||||
|
@ -133,8 +133,7 @@ private:
|
||||
void writer();
|
||||
|
||||
Message *read_message();
|
||||
int write_message(Message *m, ceph_msg_header *env,
|
||||
bufferlist &payload, bufferlist &data);
|
||||
int write_message(Message *m);
|
||||
int do_sendmsg(int sd, struct msghdr *msg, int len);
|
||||
int write_ack(unsigned s);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user