SimpleMessenger: de-globalize

Signed-off-by: Colin McCabe <colin.mccabe@dreamhost.com>
This commit is contained in:
Colin Patrick McCabe 2011-06-15 15:24:29 -07:00
parent 8069e83c0f
commit 8a0d4864cf
2 changed files with 375 additions and 371 deletions

File diff suppressed because it is too large Load Diff

View File

@ -96,11 +96,11 @@ private:
// incoming
class Accepter : public Thread {
public:
SimpleMessenger *messenger;
SimpleMessenger *msgr;
bool done;
int listen_sd;
Accepter(SimpleMessenger *r) : messenger(r), done(false), listen_sd(-1) {}
Accepter(SimpleMessenger *r) : msgr(r), done(false), listen_sd(-1) {}
void *entry();
void stop();
@ -114,7 +114,7 @@ private:
// pipe
class Pipe : public RefCountedObject {
public:
SimpleMessenger *messenger;
SimpleMessenger *msgr;
ostream& _pipe_prefix(std::ostream *_dout);
enum {
@ -178,20 +178,20 @@ private:
/* Clean up sent list */
void handle_ack(uint64_t seq) {
dout(15) << "reader got ack seq " << seq << dendl;
ldout(msgr->cct, 15) << "reader got ack seq " << seq << dendl;
// trim sent list
while (!sent.empty() &&
sent.front()->get_seq() <= seq) {
Message *m = sent.front();
sent.pop_front();
dout(10) << "reader got ack seq "
ldout(msgr->cct, 10) << "reader got ack seq "
<< seq << " >= " << m->get_seq() << " on " << m << " " << *m << dendl;
m->put();
}
if (sent.empty() && close_on_empty) {
// this is slightly hacky
dout(10) << "reader got last ack, queue empty, closing" << dendl;
ldout(msgr->cct, 10) << "reader got last ack, queue empty, closing" << dendl;
policy.lossy = true;
fault();
}
@ -216,7 +216,7 @@ private:
public:
Pipe(SimpleMessenger *r, int st) :
messenger(r),
msgr(r),
sd(-1),
peer_type(-1),
pipe_lock("SimpleMessenger::Pipe::pipe_lock"),
@ -229,9 +229,9 @@ private:
out_seq(0), in_seq(0), in_seq_acked(0),
reader_thread(this), writer_thread(this) {
connection_state->pipe = get();
messenger->timeout = g_conf->ms_tcp_read_timeout * 1000; //convert to ms
if (messenger->timeout == 0)
messenger->timeout = -1;
msgr->timeout = msgr->cct->_conf->ms_tcp_read_timeout * 1000; //convert to ms
if (msgr->timeout == 0)
msgr->timeout = -1;
}
~Pipe() {
for (map<int, xlist<Pipe *>::item* >::iterator i = queue_items.begin();
@ -251,13 +251,13 @@ private:
assert(pipe_lock.is_locked());
assert(!reader_running);
reader_running = true;
reader_thread.create(g_conf->ms_rwthread_stack_bytes);
reader_thread.create(msgr->cct->_conf->ms_rwthread_stack_bytes);
}
void start_writer() {
assert(pipe_lock.is_locked());
assert(!writer_running);
writer_running = true;
writer_thread.create(g_conf->ms_rwthread_stack_bytes);
writer_thread.create(msgr->cct->_conf->ms_rwthread_stack_bytes);
}
void join_reader() {
if (!reader_running)
@ -281,7 +281,7 @@ private:
void queue_received(Message *m, int priority);
void queue_received(Message *m) {
m->set_recv_stamp(ceph_clock_now(&g_ceph_context));
m->set_recv_stamp(ceph_clock_now(msgr->cct));
// this is just to make sure that a changeset is working
// properly; if you start using the refcounting more and have
@ -485,13 +485,13 @@ private:
// reaper
class ReaperThread : public Thread {
SimpleMessenger *messenger;
SimpleMessenger *msgr;
public:
ReaperThread(SimpleMessenger *m) : messenger(m) {}
ReaperThread(SimpleMessenger *m) : msgr(m) {}
void *entry() {
messenger->get();
messenger->reaper_entry();
messenger->put();
msgr->get();
msgr->reaper_entry();
msgr->put();
return 0;
}
} reaper_thread;
@ -533,20 +533,20 @@ private:
private:
class DispatchThread : public Thread {
SimpleMessenger *messenger;
SimpleMessenger *msgr;
public:
DispatchThread(SimpleMessenger *_messenger) : messenger(_messenger) {}
DispatchThread(SimpleMessenger *_messenger) : msgr(_messenger) {}
void *entry() {
messenger->get();
messenger->dispatch_entry();
messenger->put();
msgr->get();
msgr->dispatch_entry();
msgr->put();
return 0;
}
} dispatch_thread;
void dispatch_entry();
SimpleMessenger *messenger; //hack to make dout macro work, will fix
SimpleMessenger *msgr; //hack to make dout macro work, will fix
int timeout;
public:
@ -554,11 +554,11 @@ public:
Messenger(cct, entity_name_t()),
accepter(this),
lock("SimpleMessenger::lock"), started(false), did_bind(false),
dispatch_throttler(g_conf->ms_dispatch_throttle_bytes), need_addr(true),
dispatch_throttler(cct->_conf->ms_dispatch_throttle_bytes), need_addr(true),
destination_stopped(true), my_type(-1),
global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0),
reaper_thread(this), reaper_started(false), reaper_stop(false),
dispatch_thread(this), messenger(this) {
dispatch_thread(this), msgr(this) {
// for local dmsg delivery
dispatch_queue.local_pipe = new Pipe(this, Pipe::STATE_OPEN);
}
@ -570,7 +570,7 @@ public:
int bind(entity_addr_t bind_addr, int64_t nonce);
int bind(uint64_t nonce) {
return bind(g_conf->public_addr, nonce);
return bind(cct->_conf->public_addr, nonce);
}
int start_with_nonce(uint64_t nonce); // if we didn't bind
int start() { // if we did