Merge branch 'wip-msgr'

Reviewed-by: Greg Farnum <greg@inktank.com>
This commit is contained in:
Sage Weil 2012-09-04 12:17:13 -07:00
commit 46b86426e8
2 changed files with 13 additions and 23 deletions

View File

@ -963,7 +963,7 @@ void Pipe::discard_out_queue()
}
void Pipe::fault(bool onconnect, bool onread)
void Pipe::fault(bool onread)
{
const md_config_t *conf = msgr->cct->_conf;
assert(pipe_lock.is_locked());
@ -975,7 +975,7 @@ void Pipe::fault(bool onconnect, bool onread)
}
char buf[80];
if (!onconnect) ldout(msgr->cct,2) << "fault " << errno << ": " << strerror_r(errno, buf, sizeof(buf)) << dendl;
ldout(msgr->cct,2) << "fault " << errno << ": " << strerror_r(errno, buf, sizeof(buf)) << dendl;
if (state == STATE_CLOSED ||
state == STATE_CLOSING) {
@ -1013,34 +1013,24 @@ void Pipe::fault(bool onconnect, bool onread)
// requeue sent items
requeue_sent();
if (!is_queued()) {
if (state == STATE_CLOSING || onconnect) {
ldout(msgr->cct,10) << "fault on connect, or already closing, and q empty: setting closed." << dendl;
state = STATE_CLOSED;
return;
}
if (policy.standby) {
ldout(msgr->cct,0) << "fault with nothing to send, going to standby" << dendl;
state = STATE_STANDBY;
return;
}
if (policy.standby && !is_queued()) {
ldout(msgr->cct,0) << "fault with nothing to send, going to standby" << dendl;
state = STATE_STANDBY;
return;
}
if (state != STATE_CONNECTING) {
if (policy.server) {
ldout(msgr->cct,0) << "fault, server, going to standby" << dendl;
state = STATE_STANDBY;
} else {
if (!onconnect)
ldout(msgr->cct,0) << "fault, initiating reconnect" << dendl;
ldout(msgr->cct,0) << "fault, initiating reconnect" << dendl;
connect_seq++;
state = STATE_CONNECTING;
}
backoff = utime_t();
} else if (backoff == utime_t()) {
if (!onconnect)
ldout(msgr->cct,0) << "fault" << dendl;
ldout(msgr->cct,0) << "fault" << dendl;
backoff.set_from_double(conf->ms_initial_backoff);
} else {
ldout(msgr->cct,10) << "fault waiting " << backoff << dendl;
@ -1107,7 +1097,7 @@ void Pipe::reader()
if (tcp_read((char*)&tag, 1) < 0) {
pipe_lock.Lock();
ldout(msgr->cct,2) << "reader couldn't read tag, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
fault(false, true);
fault(true);
continue;
}
@ -1125,7 +1115,7 @@ void Pipe::reader()
pipe_lock.Lock();
if (rc < 0) {
ldout(msgr->cct,2) << "reader couldn't read ack seq, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
fault(false, true);
fault(true);
} else if (state != STATE_CLOSED) {
handle_ack(seq);
}
@ -1141,7 +1131,7 @@ void Pipe::reader()
if (!m) {
if (r < 0)
fault(false, true);
fault(true);
continue;
}
@ -1192,7 +1182,7 @@ void Pipe::reader()
else {
ldout(msgr->cct,0) << "reader bad tag " << (int)tag << dendl;
pipe_lock.Lock();
fault(false, true);
fault(true);
}
}

View File

@ -144,7 +144,7 @@ class DispatchQueue;
int write_ack(uint64_t s);
int write_keepalive();
void fault(bool onconnect=false, bool reader=false);
void fault(bool reader=false);
void was_session_reset();