Merge pull request #9937 from stiopaa1/msg_asyncMess_moveWorkerClass

msg/AsyncMessenger: move Worker class to cc file

Reviewed-by: Sage Weil <sage@redhat.com>
This commit is contained in:
Sage Weil 2016-06-27 17:58:52 -04:00 committed by GitHub
commit 356d96dbd5
2 changed files with 53 additions and 50 deletions

View File

@ -50,6 +50,47 @@ static ostream& _prefix(std::ostream *_dout, WorkerPool *p) {
} }
class Worker : public Thread {
static const uint64_t InitEventNumber = 5000;
static const uint64_t EventMaxWaitUs = 30000000;
CephContext *cct;
WorkerPool *pool;
bool done;
int id;
PerfCounters *perf_logger;
public:
EventCenter center;
std::atomic_uint references;
Worker(CephContext *c, WorkerPool *p, int i)
: cct(c), pool(p), done(false), id(i), perf_logger(NULL), center(c), references(0) {
center.init(InitEventNumber);
char name[128];
sprintf(name, "AsyncMessenger::Worker-%d", id);
// initialize perf_logger
PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last);
plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages");
plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages");
plb.add_u64_counter(l_msgr_send_messages_inline, "msgr_send_messages_inline", "Network sent inline messages");
plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes");
plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network received bytes");
plb.add_u64_counter(l_msgr_created_connections, "msgr_created_connections", "Created connection number");
plb.add_u64_counter(l_msgr_active_connections, "msgr_active_connections", "Active connection number");
perf_logger = plb.create_perf_counters();
cct->get_perfcounters_collection()->add(perf_logger);
}
~Worker() {
if (perf_logger) {
cct->get_perfcounters_collection()->remove(perf_logger);
delete perf_logger;
}
}
void *entry();
void stop();
PerfCounters *get_perf_counter() { return perf_logger; }
};
/******************* /*******************
* Processor * Processor
@ -850,6 +891,16 @@ int AsyncMessenger::get_proto_version(int peer_type, bool connect)
return 0; return 0;
} }
void AsyncMessenger::unregister_conn(AsyncConnectionRef conn) {
Mutex::Locker l(deleted_lock);
conn->release_worker();
deleted_conns.insert(conn);
if (deleted_conns.size() >= ReapDeadConnectionThreshold) {
local_worker->center.dispatch_event_external(reap_handler);
}
}
void AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) void AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
{ {
// be careful here: multiple threads may block here, and readers of // be careful here: multiple threads may block here, and readers of

View File

@ -56,47 +56,7 @@ enum {
}; };
class Worker : public Thread { class Worker;
static const uint64_t InitEventNumber = 5000;
static const uint64_t EventMaxWaitUs = 30000000;
CephContext *cct;
WorkerPool *pool;
bool done;
int id;
PerfCounters *perf_logger;
public:
EventCenter center;
std::atomic_uint references;
Worker(CephContext *c, WorkerPool *p, int i)
: cct(c), pool(p), done(false), id(i), perf_logger(NULL), center(c), references(0) {
center.init(InitEventNumber);
char name[128];
sprintf(name, "AsyncMessenger::Worker-%d", id);
// initialize perf_logger
PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last);
plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages");
plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages");
plb.add_u64_counter(l_msgr_send_messages_inline, "msgr_send_messages_inline", "Network sent inline messages");
plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes");
plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network received bytes");
plb.add_u64_counter(l_msgr_created_connections, "msgr_created_connections", "Created connection number");
plb.add_u64_counter(l_msgr_active_connections, "msgr_active_connections", "Active connection number");
perf_logger = plb.create_perf_counters();
cct->get_perfcounters_collection()->add(perf_logger);
}
~Worker() {
if (perf_logger) {
cct->get_perfcounters_collection()->remove(perf_logger);
delete perf_logger;
}
}
void *entry();
void stop();
PerfCounters *get_perf_counter() { return perf_logger; }
};
/** /**
* If the Messenger binds to a specific address, the Processor runs * If the Messenger binds to a specific address, the Processor runs
@ -481,15 +441,7 @@ public:
* *
* See "deleted_conns" * See "deleted_conns"
*/ */
void unregister_conn(AsyncConnectionRef conn) { void unregister_conn(AsyncConnectionRef conn);
Mutex::Locker l(deleted_lock);
conn->release_worker();
deleted_conns.insert(conn);
if (deleted_conns.size() >= ReapDeadConnectionThreshold) {
local_worker->center.dispatch_event_external(reap_handler);
}
}
/** /**
* Reap dead connection from `deleted_conns` * Reap dead connection from `deleted_conns`