msgr: move the delay queue initialization into start_reader

The Pipe doesn't know the peer type in the constructor. It
doesn't always know in start_reader either, so this needs more work,
but at least it knows more frequently than it did.

Signed-off-by: Greg Farnum <greg@inktank.com>
This commit is contained in:
Greg Farnum 2012-11-27 11:02:07 -08:00 committed by Sage Weil
parent 90f66980bf
commit 0e92f89204

View File

@ -87,15 +87,6 @@ Pipe::Pipe(SimpleMessenger *r, int st, Connection *con)
msgr->timeout = msgr->cct->_conf->ms_tcp_read_timeout * 1000; //convert to ms
if (msgr->timeout == 0)
msgr->timeout = -1;
if (msgr->cct->_conf->ms_inject_delay_type.find(ceph_entity_type_name(connection_state->peer_type))
!= string::npos) {
lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Pipe " << this << dendl;
dispatch_thread = new DelayedDelivery(this);
delay_queue = new std::deque< Message * >();
delay_lock = new Mutex("delay_lock");
delay_cond = new Cond();
}
}
Pipe::~Pipe()
@ -144,6 +135,19 @@ void Pipe::start_reader()
}
reader_running = true;
reader_thread.create(msgr->cct->_conf->ms_rwthread_stack_bytes);
if (!dispatch_thread &&
msgr->cct->_conf->ms_inject_delay_type.find(ceph_entity_type_name(connection_state->peer_type))
!= string::npos) {
lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Pipe " << this << dendl;
dispatch_thread = new DelayedDelivery(this);
delay_queue = new std::deque< Message * >();
delay_lock = new Mutex("delay_lock");
delay_cond = new Cond();
} else
lsubdout(msgr->cct, ms, 1) << "Pipe " << this << " peer is " << ceph_entity_type_name(connection_state->peer_type)
<< "; NOT injecting delays because it does not match "
<< msgr->cct->_conf->ms_inject_delay_type << dendl;
if (dispatch_thread && stop_delayed_delivery) {
lsubdout(msgr->cct, ms, 1) << "running delayed dispatch thread on Pipe " << this << dendl;
delay_lock->Lock();