From 23d4488e376de8f1a5c369365d91021ee8c9a324 Mon Sep 17 00:00:00 2001 From: Michal Jarzabek Date: Sun, 26 Jun 2016 08:10:39 +0100 Subject: [PATCH] msg/AsyncMessenger: move Worker class to cc file Signed-off-by: Michal Jarzabek --- src/msg/async/AsyncMessenger.cc | 51 ++++++++++++++++++++++++++++++++ src/msg/async/AsyncMessenger.h | 52 ++------------------------------- 2 files changed, 53 insertions(+), 50 deletions(-) diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 02610b53731..52e71e0e18e 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -50,6 +50,47 @@ static ostream& _prefix(std::ostream *_dout, WorkerPool *p) { } +class Worker : public Thread { + static const uint64_t InitEventNumber = 5000; + static const uint64_t EventMaxWaitUs = 30000000; + CephContext *cct; + WorkerPool *pool; + bool done; + int id; + PerfCounters *perf_logger; + + public: + EventCenter center; + std::atomic_uint references; + Worker(CephContext *c, WorkerPool *p, int i) + : cct(c), pool(p), done(false), id(i), perf_logger(NULL), center(c), references(0) { + center.init(InitEventNumber); + char name[128]; + sprintf(name, "AsyncMessenger::Worker-%d", id); + // initialize perf_logger + PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last); + + plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages"); + plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages"); + plb.add_u64_counter(l_msgr_send_messages_inline, "msgr_send_messages_inline", "Network sent inline messages"); + plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes"); + plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network received bytes"); + plb.add_u64_counter(l_msgr_created_connections, "msgr_created_connections", "Created connection number"); + plb.add_u64_counter(l_msgr_active_connections, "msgr_active_connections", "Active connection number"); + + perf_logger = plb.create_perf_counters(); + cct->get_perfcounters_collection()->add(perf_logger); + } + ~Worker() { + if (perf_logger) { + cct->get_perfcounters_collection()->remove(perf_logger); + delete perf_logger; + } + } + void *entry(); + void stop(); + PerfCounters *get_perf_counter() { return perf_logger; } +}; /******************* * Processor @@ -850,6 +891,16 @@ int AsyncMessenger::get_proto_version(int peer_type, bool connect) return 0; } +void AsyncMessenger::unregister_conn(AsyncConnectionRef conn) { + Mutex::Locker l(deleted_lock); + conn->release_worker(); + deleted_conns.insert(conn); + + if (deleted_conns.size() >= ReapDeadConnectionThreshold) { + local_worker->center.dispatch_event_external(reap_handler); + } +} + void AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) { // be careful here: multiple threads may block here, and readers of diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index 2b8570cb6d1..eacab4f1017 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -56,47 +56,7 @@ enum { }; -class Worker : public Thread { - static const uint64_t InitEventNumber = 5000; - static const uint64_t EventMaxWaitUs = 30000000; - CephContext *cct; - WorkerPool *pool; - bool done; - int id; - PerfCounters *perf_logger; - - public: - EventCenter center; - std::atomic_uint references; - Worker(CephContext *c, WorkerPool *p, int i) - : cct(c), pool(p), done(false), id(i), perf_logger(NULL), center(c), references(0) { - center.init(InitEventNumber); - char name[128]; - sprintf(name, "AsyncMessenger::Worker-%d", id); - // initialize perf_logger - PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last); - - plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages"); - plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages"); - plb.add_u64_counter(l_msgr_send_messages_inline, "msgr_send_messages_inline", "Network sent inline messages"); - plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes"); - plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network received bytes"); - plb.add_u64_counter(l_msgr_created_connections, "msgr_created_connections", "Created connection number"); - plb.add_u64_counter(l_msgr_active_connections, "msgr_active_connections", "Active connection number"); - - perf_logger = plb.create_perf_counters(); - cct->get_perfcounters_collection()->add(perf_logger); - } - ~Worker() { - if (perf_logger) { - cct->get_perfcounters_collection()->remove(perf_logger); - delete perf_logger; - } - } - void *entry(); - void stop(); - PerfCounters *get_perf_counter() { return perf_logger; } -}; +class Worker; /** * If the Messenger binds to a specific address, the Processor runs @@ -481,15 +441,7 @@ public: * * See "deleted_conns" */ - void unregister_conn(AsyncConnectionRef conn) { - Mutex::Locker l(deleted_lock); - conn->release_worker(); - deleted_conns.insert(conn); - - if (deleted_conns.size() >= ReapDeadConnectionThreshold) { - local_worker->center.dispatch_event_external(reap_handler); - } - } + void unregister_conn(AsyncConnectionRef conn); /** * Reap dead connection from `deleted_conns`