msg/Pipe: join previous reader threads

We may stop and then restart the reader thread.  Join previous threads
before we create new ones.

Signed-off-by: Sage Weil <sage@inktank.com>
This commit is contained in:
Sage Weil 2012-11-17 20:56:50 -08:00
parent c4caf871aa
commit c07c93e01d
2 changed files with 8 additions and 2 deletions

View File

@ -62,7 +62,7 @@ Pipe::Pipe(SimpleMessenger *r, int st, Connection *con)
state(st),
session_security(NULL),
connection_state(NULL),
reader_running(false), reader_joining(false), writer_running(false),
reader_running(false), reader_needs_join(false), reader_joining(false), writer_running(false),
in_q(&(r->dispatch_queue)),
keepalive(false),
close_on_empty(false),
@ -118,6 +118,10 @@ void Pipe::start_reader()
{
assert(pipe_lock.is_locked());
assert(!reader_running);
if (reader_needs_join) {
reader_thread.join();
reader_needs_join = false;
}
reader_running = true;
reader_thread.create(msgr->cct->_conf->ms_rwthread_stack_bytes);
}
@ -142,6 +146,7 @@ void Pipe::join_reader()
pipe_lock.Lock();
assert(reader_joining);
reader_joining = false;
reader_needs_join = false;
}
@ -1264,6 +1269,7 @@ void Pipe::reader()
// reap?
reader_running = false;
reader_needs_join = true;
unlock_maybe_reap();
ldout(msgr->cct,10) << "reader done" << dendl;
}

View File

@ -113,7 +113,7 @@ class DispatchQueue;
utime_t backoff; // backoff time
bool reader_running, reader_joining;
bool reader_running, reader_needs_join, reader_joining;
bool writer_running;
map<int, list<Message*> > out_q; // priority queue for outbound msgs