Merge branch 'wip-cond'

Reviewed-by: Greg Farnum <greg@inktank.com>
This commit is contained in:
Sage Weil 2012-07-06 20:01:33 -07:00
commit dddf783fa4
6 changed files with 70 additions and 32 deletions

View File

@ -5143,8 +5143,12 @@ int Client::_read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl)
r = objectcacher->file_read(&in->oset, &in->layout, in->snapid,
off, len, bl, 0, onfinish);
if (r == 0) {
client_lock.Unlock();
flock.Lock();
while (!done)
cond.Wait(client_lock);
cond.Wait(flock);
flock.Unlock();
client_lock.Lock();
r = rvalue;
} else {
// it was cached.
@ -5175,8 +5179,12 @@ int Client::_read_sync(Fh *f, uint64_t off, uint64_t len, bufferlist *bl)
pos, left, &tbl, 0,
in->truncate_size, in->truncate_seq,
onfinish);
client_lock.Unlock();
flock.Lock();
while (!done)
cond.Wait(client_lock);
cond.Wait(flock);
flock.Unlock();
client_lock.Lock();
if (r < 0)
return r;
@ -5342,8 +5350,12 @@ int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf)
in->truncate_size, in->truncate_seq,
onfinish, onsafe);
client_lock.Unlock();
flock.Lock();
while (!done)
cond.Wait(client_lock);
cond.Wait(flock);
flock.Unlock();
client_lock.Lock();
}
// time

View File

@ -29,12 +29,14 @@ class Cond {
// my bits
pthread_cond_t _c;
Mutex *waiter_mutex;
// don't allow copying.
void operator=(Cond &C) {}
Cond( const Cond &C ) {}
public:
Cond() {
Cond() : waiter_mutex(NULL) {
int r = pthread_cond_init(&_c,NULL);
assert(r == 0);
}
@ -44,15 +46,11 @@ class Cond {
int Wait(Mutex &mutex) {
assert(mutex.is_locked());
--mutex.nlock;
int r = pthread_cond_wait(&_c, &mutex._m);
++mutex.nlock;
return r;
}
int Wait(Mutex &mutex, char* s) {
//cout << "Wait: " << s << endl;
assert(mutex.is_locked());
// make sure this cond is used with one mutex only
assert(waiter_mutex == NULL || waiter_mutex == &mutex);
waiter_mutex = &mutex;
--mutex.nlock;
int r = pthread_cond_wait(&_c, &mutex._m);
++mutex.nlock;
@ -61,6 +59,11 @@ class Cond {
int WaitUntil(Mutex &mutex, utime_t when) {
assert(mutex.is_locked());
// make sure this cond is used with one mutex only
assert(waiter_mutex == NULL || waiter_mutex == &mutex);
waiter_mutex = &mutex;
struct timespec ts;
when.to_timespec(&ts);
--mutex.nlock;
@ -74,17 +77,31 @@ class Cond {
return WaitUntil(mutex, when);
}
int SloppySignal() {
int r = pthread_cond_broadcast(&_c);
return r;
}
int Signal() {
//int r = pthread_cond_signal(&_c);
// make sure signaler is holding the waiter's lock.
assert(waiter_mutex == NULL ||
waiter_mutex->is_locked());
int r = pthread_cond_broadcast(&_c);
return r;
}
int SignalOne() {
// make sure signaler is holding the waiter's lock.
assert(waiter_mutex == NULL ||
waiter_mutex->is_locked());
int r = pthread_cond_signal(&_c);
return r;
}
int SignalAll() {
//int r = pthread_cond_signal(&_c);
// make sure signaler is holding the waiter's lock.
assert(waiter_mutex == NULL ||
waiter_mutex->is_locked());
int r = pthread_cond_broadcast(&_c);
return r;
}

View File

@ -108,8 +108,11 @@ public:
void unlock() {
pool->unlock();
}
void kick() {
pool->kick();
void wake() {
pool->wake();
}
void _wake() {
pool->_wake();
}
void drain() {
pool->drain(this);
@ -168,8 +171,13 @@ public:
void unlock() {
pool->unlock();
}
void kick() {
pool->kick();
/// wake up the thread pool (without lock held)
void wake() {
pool->wake();
}
/// wake up the thread pool (with lock already held)
void _wake() {
pool->_wake();
}
void drain() {
pool->drain(this);
@ -251,8 +259,14 @@ public:
void wait(Cond &c) {
c.Wait(_lock);
}
/// wake up a waiter
void kick() {
/// wake up a waiter (with lock already held)
void _wake() {
_cond.Signal();
}
/// wake up a waiter (without lock held)
void wake() {
Mutex::Locker l(_lock);
_cond.Signal();
}

View File

@ -1382,10 +1382,6 @@ int librados::IoCtxImpl::watch(const object_t& oid, uint64_t ver,
int librados::IoCtxImpl::_notify_ack(const object_t& oid,
uint64_t notify_id, uint64_t ver)
{
Mutex mylock("IoCtxImpl::watch::mylock");
Cond cond;
eversion_t objver;
::ObjectOperation rd;
prepare_assert_ops(&rd);
rd.notify_ack(notify_id, ver);
@ -1600,8 +1596,10 @@ void librados::IoCtxImpl::C_NotifyComplete::notify(uint8_t opcode,
uint64_t ver,
bufferlist& bl)
{
lock->Lock();
*done = true;
cond->Signal();
lock->Unlock();
}
/////////////////////////// WatchContext ///////////////////////////////

View File

@ -734,7 +734,7 @@ int FileJournal::check_for_full(uint64_t seq, off64_t pos, off64_t size)
if (room < (header.max_size >> 1) &&
room + size > (header.max_size >> 1)) {
dout(10) << " passing half full mark, triggering commit" << dendl;
do_sync_cond->Signal(); // initiate a real commit so we can trim
do_sync_cond->SloppySignal(); // initiate a real commit so we can trim
}
}

View File

@ -1855,7 +1855,7 @@ void OSD::tick()
logger->set(l_osd_buf, buffer::get_total_alloc());
// periodically kick recovery work queue
recovery_tp.kick();
recovery_tp.wake();
if (service.scrub_should_schedule()) {
sched_scrub();
@ -2564,7 +2564,7 @@ void OSD::do_command(Connection *con, tid_t tid, vector<string>& cmd, bufferlist
<< "to " << g_conf->osd_recovery_delay_start;
defer_recovery_until = ceph_clock_now(g_ceph_context);
defer_recovery_until += g_conf->osd_recovery_delay_start;
recovery_wq.kick();
recovery_wq.wake();
}
}
@ -4917,7 +4917,7 @@ void OSD::finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue)
recovery_wq._queue_front(pg);
}
recovery_wq.kick();
recovery_wq._wake();
recovery_wq.unlock();
}
@ -4926,10 +4926,7 @@ void OSD::defer_recovery(PG *pg)
dout(10) << "defer_recovery " << *pg << dendl;
// move pg to the end of the queue...
recovery_wq.lock();
recovery_wq._enqueue(pg);
recovery_wq.kick();
recovery_wq.unlock();
recovery_wq.queue(pg);
}