Merge pull request #2631 from ceph/wip-rwtimer

DNM: osdc/Objecter: use SafeTimer; make callbacks race-tolerant

Reviewed-by: Yehuda Sadeh <yehuda@redhat.com>
This commit is contained in:
Sage Weil 2014-10-07 11:41:11 -07:00
commit 887652f66c
2 changed files with 41 additions and 22 deletions

View File

@ -267,7 +267,9 @@ void Objecter::init()
<< cpp_strerror(ret) << dendl;
}
timer_lock.Lock();
timer.init();
timer_lock.Unlock();
initialized.set(1);
}
@ -369,8 +371,9 @@ void Objecter::shutdown()
}
if (tick_event) {
timer.cancel_event(tick_event);
tick_event = NULL;
Mutex::Locker l(timer_lock);
if (timer.cancel_event(tick_event))
tick_event = NULL;
}
if (m_request_state_hook) {
@ -386,7 +389,10 @@ void Objecter::shutdown()
logger = NULL;
}
timer.shutdown();
{
Mutex::Locker l(timer_lock);
timer.shutdown();
}
}
@ -1522,6 +1528,7 @@ void Objecter::_linger_ops_resend(map<uint64_t, LingerOp *>& lresend)
void Objecter::schedule_tick()
{
Mutex::Locker l(timer_lock);
assert(tick_event == NULL);
tick_event = new C_Tick(this);
timer.add_event_after(cct->_conf->objecter_tick_interval, tick_event);
@ -1529,20 +1536,19 @@ void Objecter::schedule_tick()
void Objecter::tick()
{
if (!initialized.read()) {
schedule_tick();
return;
}
assert(rwlock.is_locked());
RWLock::RLocker rl(rwlock);
ldout(cct, 10) << "tick" << dendl;
assert(initialized.read());
// we are only called by C_Tick
assert(tick_event);
tick_event = NULL;
if (!initialized.read()) {
// we raced with shutdown
return;
}
set<OSDSession*> toping;
int r = 0;
@ -1659,13 +1665,12 @@ void Objecter::resend_mon_ops()
class C_CancelOp : public Context
{
Objecter::Op *op;
ceph_tid_t tid;
Objecter *objecter;
public:
C_CancelOp(Objecter::Op *op, Objecter *objecter) : op(op),
objecter(objecter) {}
C_CancelOp(ceph_tid_t t, Objecter *objecter) : tid(t), objecter(objecter) {}
void finish(int r) {
objecter->op_cancel(op->session, op->tid, -ETIMEDOUT);
objecter->op_cancel(tid, -ETIMEDOUT);
}
};
@ -1684,16 +1689,19 @@ ceph_tid_t Objecter::_op_submit_with_budget(Op *op, RWLock::Context& lc)
assert(op->ops.size() == op->out_rval.size());
assert(op->ops.size() == op->out_handler.size());
if (osd_timeout > 0) {
op->ontimeout = new C_CancelOp(op, this);
timer.add_event_after(osd_timeout, op->ontimeout);
}
// throttle. before we look at any state, because
// take_op_budget() may drop our lock while it blocks.
_take_op_budget(op);
return _op_submit(op, lc);
ceph_tid_t tid = _op_submit(op, lc);
if (osd_timeout > 0) {
Mutex::Locker l(timer_lock);
op->ontimeout = new C_CancelOp(tid, this);
timer.add_event_after(osd_timeout, op->ontimeout);
}
return tid;
}
ceph_tid_t Objecter::_op_submit(Op *op, RWLock::Context& lc)
@ -2291,6 +2299,7 @@ void Objecter::_finish_op(Op *op)
put_op_budget(op);
if (op->ontimeout) {
Mutex::Locker l(timer_lock);
timer.cancel_event(op->ontimeout);
}
@ -2991,6 +3000,7 @@ void Objecter::pool_op_submit(PoolOp *op)
{
assert(rwlock.is_locked());
if (mon_timeout > 0) {
Mutex::Locker l(timer_lock);
op->ontimeout = new C_CancelPoolOp(op->tid, this);
timer.add_event_after(mon_timeout, op->ontimeout);
}
@ -3094,6 +3104,7 @@ void Objecter::_finish_pool_op(PoolOp *op)
logger->set(l_osdc_poolop_active, pool_ops.size());
if (op->ontimeout) {
Mutex::Locker l(timer_lock);
timer.cancel_event(op->ontimeout);
}
@ -3127,6 +3138,7 @@ void Objecter::get_pool_stats(list<string>& pools, map<string,pool_stat_t> *resu
op->onfinish = onfinish;
op->ontimeout = NULL;
if (mon_timeout > 0) {
Mutex::Locker l(timer_lock);
op->ontimeout = new C_CancelPoolStatOp(op->tid, this);
timer.add_event_after(mon_timeout, op->ontimeout);
}
@ -3202,6 +3214,7 @@ void Objecter::_finish_pool_stat_op(PoolStatOp *op)
logger->set(l_osdc_poolstat_active, poolstat_ops.size());
if (op->ontimeout) {
Mutex::Locker l(timer_lock);
timer.cancel_event(op->ontimeout);
}
@ -3231,6 +3244,7 @@ void Objecter::get_fs_stats(ceph_statfs& result, Context *onfinish)
op->onfinish = onfinish;
op->ontimeout = NULL;
if (mon_timeout > 0) {
Mutex::Locker l(timer_lock);
op->ontimeout = new C_CancelStatfsOp(op->tid, this);
timer.add_event_after(mon_timeout, op->ontimeout);
}
@ -3305,6 +3319,7 @@ void Objecter::_finish_statfs_op(StatfsOp *op)
logger->set(l_osdc_statfs_active, statfs_ops.size());
if (op->ontimeout) {
Mutex::Locker l(timer_lock);
timer.cancel_event(op->ontimeout);
}
@ -3744,6 +3759,7 @@ int Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
(void)_calc_command_target(c);
_assign_command_session(c);
if (osd_timeout > 0) {
Mutex::Locker l(timer_lock);
c->ontimeout = new C_CancelCommandOp(c->session, tid, this);
timer.add_event_after(osd_timeout, c->ontimeout);
}
@ -3878,6 +3894,7 @@ void Objecter::_finish_command(CommandOp *c, int r, string rs)
c->onfinish->complete(r);
if (c->ontimeout) {
Mutex::Locker l(timer_lock);
timer.cancel_event(c->ontimeout);
}

View File

@ -1035,7 +1035,8 @@ private:
version_t last_seen_pgmap_version;
RWLock rwlock;
RWTimer timer;
Mutex timer_lock;
SafeTimer timer;
PerfCounters *logger;
@ -1602,7 +1603,8 @@ public:
last_seen_osdmap_version(0),
last_seen_pgmap_version(0),
rwlock("Objecter::rwlock"),
timer(cct, rwlock),
timer_lock("Objecter::timer_lock"),
timer(cct, timer_lock, false),
logger(NULL), tick_event(NULL),
m_request_state_hook(NULL),
num_homeless_ops(0),