msg/async: allow EventCenter::set_owner reentry

If daemonlize we need to respawn event threads, it need to allow set_owner again

Signed-off-by: Haomai Wang <haomai@xsky.com>
This commit is contained in:
Haomai Wang 2016-07-12 14:15:23 +08:00
parent 8d63e2c709
commit 154376b211
3 changed files with 14 additions and 11 deletions

View File

@ -135,16 +135,17 @@ EventCenter::~EventCenter()
void EventCenter::set_owner()
{
cct->lookup_or_create_singleton_object<EventCenter::AssociatedCenters>(
global_centers, "AsyncMessenger::EventCenter::global_center");
assert(global_centers && !global_centers->centers[idx]);
global_centers->centers[idx] = this;
owner = pthread_self();
ldout(cct, 1) << __func__ << " idx=" << idx << " owner=" << owner << dendl;
notify_handler = new C_handle_notify(this, cct);
int r = create_file_event(notify_receive_fd, EVENT_READABLE, notify_handler);
assert(r == 0);
if (!global_centers) {
cct->lookup_or_create_singleton_object<EventCenter::AssociatedCenters>(
global_centers, "AsyncMessenger::EventCenter::global_center");
assert(global_centers && !global_centers->centers[idx]);
global_centers->centers[idx] = this;
notify_handler = new C_handle_notify(this, cct);
int r = create_file_event(notify_receive_fd, EVENT_READABLE, notify_handler);
assert(r == 0);
}
}
int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt)

View File

@ -127,7 +127,7 @@ class EventCenter {
NetHandler net;
EventCallbackRef notify_handler;
unsigned idx = 10000;
AssociatedCenters *global_centers;
AssociatedCenters *global_centers = nullptr;
int process_time_events();
FileEvent *_get_file_event(int fd) {
@ -147,6 +147,7 @@ class EventCenter {
int init(int nevent, unsigned idx);
void set_owner();
pthread_t get_owner() const { return owner; }
unsigned get_id() const { return idx; }
// Used by internal thread

View File

@ -30,9 +30,8 @@ void NetworkStack::add_thread(unsigned i, std::function<void ()> &thread)
Worker *w = workers[i];
thread = std::move(
[this, w]() {
const uint64_t InitEventNumber = 5000;
const uint64_t EventMaxWaitUs = 30000000;
w->center.init(InitEventNumber, w->id);
w->center.set_owner();
ldout(cct, 10) << __func__ << " starting" << dendl;
w->initialize();
w->init_done();
@ -68,9 +67,11 @@ Worker* NetworkStack::create_worker(CephContext *c, const string &type, unsigned
NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(false), cct(c)
{
const uint64_t InitEventNumber = 5000;
num_workers = cct->_conf->ms_async_op_threads;
for (unsigned i = 0; i < num_workers; ++i) {
Worker *w = create_worker(cct, type, i);
w->center.init(InitEventNumber, i);
workers.push_back(w);
}
cct->register_fork_watcher(this);