Merge branch 'next'

This commit is contained in:
Sage Weil 2013-01-07 13:12:33 -08:00
commit c8f8c7e652
5 changed files with 54 additions and 45 deletions

View File

@ -164,6 +164,15 @@ void Message::encode(uint64_t features, bool datacrc)
header.compat_version = header.version;
}
calc_front_crc();
// update envelope
header.front_len = get_payload().length();
header.middle_len = get_middle().length();
header.data_len = get_data().length();
calc_header_crc();
footer.flags = CEPH_MSG_FOOTER_COMPLETE;
if (datacrc) {
calc_data_crc();
@ -196,10 +205,9 @@ void Message::encode(uint64_t features, bool datacrc)
}
}
#endif
}
else
} else {
footer.flags = (unsigned)footer.flags | CEPH_MSG_FOOTER_NOCRC;
}
}
void Message::dump(Formatter *f) const

View File

@ -1070,14 +1070,17 @@ void Pipe::requeue_sent(uint64_t max_acked)
list<Message*>& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
while (!sent.empty()) {
Message *m = sent.back();
sent.pop_back();
if (m->get_seq() > max_acked) {
sent.pop_back();
ldout(msgr->cct,10) << "requeue_sent " << *m << " for resend seq " << out_seq
<< " (" << m->get_seq() << ")" << dendl;
rq.push_front(m);
out_seq--;
} else
sent.clear();
} else {
ldout(msgr->cct,10) << "requeue_sent " << *m << " for resend seq " << out_seq
<< " <= max_acked " << max_acked << ", discarding" << dendl;
m->put();
}
}
}
@ -1463,18 +1466,44 @@ void Pipe::writer()
sent.push_back(m);
m->get();
}
pipe_lock.Unlock();
ldout(msgr->cct,20) << "writer encoding " << m->get_seq() << " " << m << " " << *m << dendl;
// associate message with Connection (for benefit of encode_payload)
m->set_connection(connection_state->get());
ldout(msgr->cct,20) << "writer encoding " << m->get_seq() << " " << m << " " << *m << dendl;
// encode and copy out of *m
m->encode(connection_state->get_features(), !msgr->cct->_conf->ms_nocrc);
// prepare everything
ceph_msg_header& header = m->get_header();
ceph_msg_footer& footer = m->get_footer();
// Now that we have all the crcs calculated, handle the
// digital signature for the message, if the pipe has session
// security set up. Some session security options do not
// actually calculate and check the signature, but they should
// handle the calls to sign_message and check_signature. PLR
if (session_security == NULL) {
ldout(msgr->cct, 20) << "writer no session security" << dendl;
} else {
if (session_security->sign_message(m)) {
ldout(msgr->cct, 20) << "writer failed to sign seq # " << header.seq
<< "): sig = " << footer.sig << dendl;
} else {
ldout(msgr->cct, 20) << "writer signed seq # " << header.seq
<< "): sig = " << footer.sig << dendl;
}
}
bufferlist blist = m->get_payload();
blist.append(m->get_middle());
blist.append(m->get_data());
pipe_lock.Unlock();
ldout(msgr->cct,20) << "writer sending " << m->get_seq() << " " << m << dendl;
int rc = write_message(m);
int rc = write_message(header, footer, blist);
pipe_lock.Lock();
if (rc < 0) {
@ -1854,39 +1883,10 @@ int Pipe::write_keepalive()
}
int Pipe::write_message(Message *m)
int Pipe::write_message(ceph_msg_header& header, ceph_msg_footer& footer, bufferlist& blist)
{
ceph_msg_header& header = m->get_header();
ceph_msg_footer& footer = m->get_footer();
int ret;
// get envelope, buffers
header.front_len = m->get_payload().length();
header.middle_len = m->get_middle().length();
header.data_len = m->get_data().length();
footer.flags = CEPH_MSG_FOOTER_COMPLETE;
m->calc_header_crc();
// Now that we have all the crcs calculated, handle the digital signature for the message, if the
// pipe has session security set up. Some session security options do not actually calculate and
// check the signature, but they should handle the calls to sign_message and check_signature. PLR
if (session_security == NULL) {
ldout(msgr->cct, 20) << "Pipe: write_message: session security NULL for this pipe." << dendl;
} else {
if (session_security->sign_message(m)) {
ldout(msgr->cct, 20) << "Failed to put signature in client message (seq # " << header.seq << "): sig = " << footer.sig << dendl;
} else {
ldout(msgr->cct, 20) << "Put signature in client message (seq # " << header.seq << "): sig = " << footer.sig << dendl;
}
}
bufferlist blist = m->get_payload();
blist.append(m->get_middle());
blist.append(m->get_data());
ldout(msgr->cct,20) << "write_message " << m << dendl;
// set up msghdr and iovecs
struct msghdr msg;
memset(&msg, 0, sizeof(msg));

View File

@ -178,7 +178,7 @@ class DispatchQueue;
int randomize_out_seq();
int read_message(Message **pm);
int write_message(Message *m);
int write_message(ceph_msg_header& h, ceph_msg_footer& f, bufferlist& body);
/**
* Write the given data (of length len) to the Pipe's socket. This function
* will loop until all passed data has been written out.

View File

@ -23,6 +23,7 @@
#include "include/compat.h"
#include <fcntl.h>
#include <limits.h>
#include <sstream>
#include <stdio.h>
#include <sys/types.h>

View File

@ -399,10 +399,10 @@ void Objecter::scan_requests(bool skipped_map,
list<LingerOp*>& need_resend_linger)
{
// check for changed linger mappings (_before_ regular ops)
for (map<tid_t,LingerOp*>::iterator p = linger_ops.begin();
p != linger_ops.end();
p++) {
LingerOp *op = p->second;
map<tid_t,LingerOp*>::iterator lp = linger_ops.begin();
while (lp != linger_ops.end()) {
LingerOp *op = lp->second;
++lp; // check_linger_pool_dne() may touch linger_ops; prevent iterator invalidation
ldout(cct, 10) << " checking linger op " << op->linger_id << dendl;
int r = recalc_linger_op_target(op);
switch (r) {