workqueue: non-inline worker, control methods; debugging

This commit is contained in:
Sage Weil 2008-12-10 12:15:00 -08:00
parent b3affe5edb
commit 642420aa44
8 changed files with 169 additions and 109 deletions

View File

@ -181,6 +181,7 @@ libcommon_a_SOURCES = \
common/sctp_crc32.c\
common/assert.cc \
common/debug.cc \
common/WorkQueue.cc \
mon/MonMap.cc \
mon/MonClient.cc \
osd/OSDMap.cc \

120
src/common/WorkQueue.cc Normal file
View File

@ -0,0 +1,120 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include "WorkQueue.h"
#include "config.h"
#define DOUT_SUBSYS tp
#undef dout_prefix
#define dout_prefix *_dout << dbeginl << pthread_self() << " " << name << " "
void ThreadPool::worker()
{
_lock.Lock();
dout(10) << "worker start" << dendl;
while (!_stop) {
if (!_pause && work_queues.size()) {
_WorkQueue *wq;
int tries = work_queues.size();
bool did = false;
while (tries--) {
last_work_queue++;
last_work_queue %= work_queues.size();
wq = work_queues[last_work_queue];
void *item = wq->_void_dequeue();
if (item) {
processing++;
dout(12) << "worker wq " << wq->name << " start processing " << item << dendl;
_lock.Unlock();
wq->_void_process(item);
_lock.Lock();
dout(15) << "worker wq " << wq->name << " done processing " << item << dendl;
processing--;
if (_pause)
_wait_cond.Signal();
did = true;
break;
}
}
if (did)
continue;
}
dout(15) << "worker waiting" << dendl;
_cond.Wait(_lock);
}
dout(0) << "worker finish" << dendl;
_lock.Unlock();
}
void ThreadPool::start()
{
dout(10) << "start" << dendl;
for (set<WorkThread*>::iterator p = _threads.begin();
p != _threads.end();
p++)
(*p)->create();
dout(15) << "started" << dendl;
}
void ThreadPool::stop(bool clear_after)
{
dout(10) << "stop" << dendl;
_lock.Lock();
_stop = true;
_cond.Signal();
_lock.Unlock();
for (set<WorkThread*>::iterator p = _threads.begin();
p != _threads.end();
p++)
(*p)->join();
_lock.Lock();
for (unsigned i=0; i<work_queues.size(); i++)
work_queues[i]->_clear();
_lock.Unlock();
dout(15) << "stopped" << dendl;
}
void ThreadPool::pause()
{
dout(10) << "pause" << dendl;
_lock.Lock();
assert(!_pause);
_pause = true;
while (processing)
_wait_cond.Wait(_lock);
_lock.Unlock();
dout(15) << "paused" << dendl;
}
void ThreadPool::pause_new()
{
dout(10) << "pause_new" << dendl;
_lock.Lock();
assert(!_pause);
_pause = true;
_lock.Unlock();
}
void ThreadPool::unpause()
{
dout(10) << "unpause" << dendl;
_lock.Lock();
assert(_pause);
_pause = false;
_cond.Signal();
_lock.Unlock();
}

View File

@ -19,7 +19,8 @@
#include "Cond.h"
#include "Thread.h"
class WorkThreadPool {
class ThreadPool {
string name;
Mutex _lock;
Cond _cond;
bool _stop, _pause;
@ -28,14 +29,15 @@ class WorkThreadPool {
struct _WorkQueue {
string name;
_WorkQueue(string n) : name(n) {}
virtual bool _try_process() = 0;
virtual void _clear() = 0;
virtual void *_void_dequeue() = 0;
virtual void _void_process(void *) = 0;
};
public:
template<class T>
class WorkQueue : public _WorkQueue {
WorkThreadPool *pool;
ThreadPool *pool;
virtual bool _enqueue(T *) = 0;
virtual void _dequeue(T *) = 0;
@ -43,8 +45,15 @@ public:
virtual void _process(T *) = 0;
virtual void _clear() = 0;
void *_void_dequeue() {
return (void *)_dequeue();
}
void _void_process(void *p) {
_process((T *)p);
}
public:
WorkQueue(string n, WorkThreadPool *p) : _WorkQueue(n), pool(p) {
WorkQueue(string n, ThreadPool *p) : _WorkQueue(n), pool(p) {
pool->add_work_queue(this);
}
~WorkQueue() {
@ -69,17 +78,6 @@ public:
pool->_lock.Unlock();
}
bool _try_process() {
T *item = _dequeue();
if (item) {
pool->_lock.Unlock();
_process(item);
pool->_lock.Lock();
return true;
}
return false;
}
void lock() {
pool->lock();
}
@ -99,10 +97,10 @@ private:
// threads
struct WorkThread : public Thread {
WorkThreadPool *pool;
WorkThread(WorkThreadPool *p) : pool(p) {}
ThreadPool *pool;
WorkThread(ThreadPool *p) : pool(p) {}
void *entry() {
pool->entry();
pool->worker();
return 0;
}
};
@ -110,42 +108,11 @@ private:
set<WorkThread*> _threads;
int processing;
void entry() {
_lock.Lock();
//generic_dout(0) << "entry start" << dendl;
while (!_stop) {
if (!_pause && work_queues.size()) {
_WorkQueue *wq;
int tries = work_queues.size();
bool did = false;
while (tries--) {
last_work_queue++;
last_work_queue %= work_queues.size();
wq = work_queues[last_work_queue];
processing++;
//generic_dout(0) << "entry trying wq " << wq->name << dendl;
did = wq->_try_process();
processing--;
//if (did) generic_dout(0) << "entry did wq " << wq->name << dendl;
if (did && _pause)
_wait_cond.Signal();
if (did)
break;
}
if (did)
continue;
}
//generic_dout(0) << "entry waiting" << dendl;
_cond.Wait(_lock);
}
//generic_dout(0) << "entry finish" << dendl;
_lock.Unlock();
}
void worker();
public:
WorkThreadPool(string name, int n=1) :
ThreadPool(string nm, int n=1) :
name(nm),
_lock((new string(name + "::lock"))->c_str()), // deliberately leak this
_stop(false),
_pause(false),
@ -153,7 +120,7 @@ public:
processing(0) {
set_num_threads(n);
}
~WorkThreadPool() {
~ThreadPool() {
for (set<WorkThread*>::iterator p = _threads.begin();
p != _threads.end();
p++)
@ -180,26 +147,6 @@ public:
}
}
void start() {
for (set<WorkThread*>::iterator p = _threads.begin();
p != _threads.end();
p++)
(*p)->create();
}
void stop(bool clear_after=true) {
_lock.Lock();
_stop = true;
_cond.Signal();
_lock.Unlock();
for (set<WorkThread*>::iterator p = _threads.begin();
p != _threads.end();
p++)
(*p)->join();
_lock.Lock();
for (unsigned i=0; i<work_queues.size(); i++)
work_queues[i]->_clear();
_lock.Unlock();
}
void kick() {
_lock.Lock();
_cond.Signal();
@ -216,29 +163,11 @@ public:
_lock.Unlock();
}
void pause() {
_lock.Lock();
assert(!_pause);
_pause = true;
while (processing)
_wait_cond.Wait(_lock);
_lock.Unlock();
}
void pause_new() {
_lock.Lock();
assert(!_pause);
_pause = true;
_lock.Unlock();
}
void unpause() {
_lock.Lock();
assert(_pause);
_pause = false;
_cond.Signal();
_lock.Unlock();
}
void start();
void stop(bool clear_after=true);
void pause();
void pause_new();
void unpause();
};

View File

@ -241,6 +241,7 @@ md_config_t g_conf = {
debug_ms: 0,
debug_mon: 1,
debug_paxos: 0,
debug_tp: 0,
debug_after: 0,
@ -808,6 +809,11 @@ void parse_config_options(std::vector<const char*>& args, bool open)
g_conf.debug_paxos = atoi(args[++i]);
else
g_debug_after_conf.debug_paxos = atoi(args[++i]);
else if (strcmp(args[i], "--debug_tp") == 0)
if (!g_conf.debug_after)
g_conf.debug_tp = atoi(args[++i]);
else
g_debug_after_conf.debug_tp = atoi(args[++i]);
else if (strcmp(args[i], "--debug_after") == 0) {
g_conf.debug_after = atoi(args[++i]);

View File

@ -104,6 +104,7 @@ struct md_config_t {
int debug_ms;
int debug_mon;
int debug_paxos;
int debug_tp;
int debug_after;

View File

@ -41,8 +41,13 @@ void *valloc(size_t);
#endif
#include <iostream>
#include <istream>
#include <iomanip>
#include <list>
#include <string>
using std::istream;
using std::string;
#include "atomic.h"
#include "page.h"

View File

@ -18,7 +18,6 @@
#include "ObjectStore.h"
#include "JournalingObjectStore.h"
#include "common/ThreadPool.h"
#include "common/Mutex.h"
#include "Fake.h"

View File

@ -19,7 +19,6 @@
#include "common/Mutex.h"
#include "common/RWLock.h"
#include "common/ThreadPool.h"
#include "common/Timer.h"
#include "common/WorkQueue.h"
#include "common/LogClient.h"
@ -114,9 +113,9 @@ public:
private:
WorkThreadPool op_tp;
WorkThreadPool recovery_tp;
WorkThreadPool disk_tp;
ThreadPool op_tp;
ThreadPool recovery_tp;
ThreadPool disk_tp;
@ -271,9 +270,9 @@ private:
// -- op queue --
deque<PG*> op_queue;
struct OpWQ : public WorkThreadPool::WorkQueue<PG> {
struct OpWQ : public ThreadPool::WorkQueue<PG> {
OSD *osd;
OpWQ(OSD *o, WorkThreadPool *tp) : WorkThreadPool::WorkQueue<PG>("OSD::OpWQ", tp), osd(o) {}
OpWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue<PG>("OSD::OpWQ", tp), osd(o) {}
bool _enqueue(PG *pg) {
pg->get();
@ -502,9 +501,9 @@ private:
utime_t defer_recovery_until;
int recovery_ops_active;
struct RecoveryWQ : public WorkThreadPool::WorkQueue<PG> {
struct RecoveryWQ : public ThreadPool::WorkQueue<PG> {
OSD *osd;
RecoveryWQ(OSD *o, WorkThreadPool *tp) : WorkThreadPool::WorkQueue<PG>("OSD::RecoveryWQ", tp), osd(o) {}
RecoveryWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue<PG>("OSD::RecoveryWQ", tp), osd(o) {}
bool _enqueue(PG *pg) {
if (!pg->recovery_item.get_xlist()) {
@ -578,9 +577,9 @@ private:
// -- snap trimming --
xlist<PG*> snap_trim_queue;
struct SnapTrimWQ : public WorkThreadPool::WorkQueue<PG> {
struct SnapTrimWQ : public ThreadPool::WorkQueue<PG> {
OSD *osd;
SnapTrimWQ(OSD *o, WorkThreadPool *tp) : WorkThreadPool::WorkQueue<PG>("OSD::SnapTrimWQ", tp), osd(o) {}
SnapTrimWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue<PG>("OSD::SnapTrimWQ", tp), osd(o) {}
bool _enqueue(PG *pg) {
if (pg->snap_trim_item.is_on_xlist())
@ -610,9 +609,9 @@ private:
// -- scrubbing --
xlist<PG*> scrub_queue;
struct ScrubWQ : public WorkThreadPool::WorkQueue<PG> {
struct ScrubWQ : public ThreadPool::WorkQueue<PG> {
OSD *osd;
ScrubWQ(OSD *o, WorkThreadPool *tp) : WorkThreadPool::WorkQueue<PG>("OSD::ScrubWQ", tp), osd(o) {}
ScrubWQ(OSD *o, ThreadPool *tp) : ThreadPool::WorkQueue<PG>("OSD::ScrubWQ", tp), osd(o) {}
bool _enqueue(PG *pg) {
if (pg->scrub_item.is_on_xlist())