OSD: peering_wq is now a BatchWorkQueue

process_peering_events now handles multiple pgs at once to better
batch up notifes, etc.

Signed-off-by: Samuel Just <sam.just@inktank.com>
This commit is contained in:
Samuel Just 2012-06-17 16:23:20 -07:00
parent d8a68e76ea
commit c1f2a8026f
3 changed files with 29 additions and 33 deletions

View File

@ -50,22 +50,16 @@ public:
template<class T>
class BatchWorkQueue : public WorkQueue_ {
ThreadPool *pool;
const size_t batch_size;
virtual bool _enqueue(T *) = 0;
virtual void _dequeue(T *) = 0;
virtual T *_dequeue() = 0;
virtual void _dequeue(list<T*> *) = 0;
virtual void _process(const list<T*> &) = 0;
virtual void _process_finish(const list<T*> &) {}
void *_void_dequeue() {
list<T*> *out(new list<T*>);
while (out->size() < batch_size) {
T *val = _dequeue();
if (!val)
break;
out->push_back(val);
}
_dequeue(out);
if (out->size()) {
return (void *)out;
} else {
@ -82,9 +76,8 @@ public:
}
public:
BatchWorkQueue(string n, time_t ti, time_t sti, ThreadPool* p,
size_t batch_size) :
WorkQueue_(n, ti, sti), pool(p), batch_size(batch_size) {
BatchWorkQueue(string n, time_t ti, time_t sti, ThreadPool* p)
: WorkQueue_(n, ti, sti), pool(p) {
pool->add_work_queue(this);
}
~BatchWorkQueue() {

View File

@ -692,7 +692,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
admin_ops_hook(NULL),
op_queue_len(0),
op_wq(this, g_conf->osd_op_thread_timeout, &op_tp),
peering_wq(this, g_conf->osd_op_thread_timeout, &op_tp),
peering_wq(this, g_conf->osd_op_thread_timeout, &op_tp, 200),
map_lock("OSD::map_lock"),
peer_map_epoch_lock("OSD::peer_map_epoch_lock"),
outstanding_pg_stats(false),
@ -3497,8 +3497,6 @@ void OSD::advance_pg(epoch_t osd_epoch, PG *pg, PG::RecoveryCtx *rctx)
lastmap = nextmap;
}
pg->handle_activate_map(rctx);
if (pg->dirty_info)
pg->write_info(*rctx->transaction);
}
/**
@ -5181,20 +5179,20 @@ void OSDService::queue_for_op(PG *pg)
op_wq.queue(pg);
}
void OSD::process_peering_event(PG *pg)
void OSD::process_peering_events(const list<PG*> &pgs)
{
bool need_up_thru = false;
epoch_t same_interval_since;
OSDMapRef curmap;
epoch_t same_interval_since = 0;
OSDMapRef curmap = service.get_osdmap();
PG::RecoveryCtx rctx = create_context();
{
map_lock.get_read();
for (list<PG*>::const_iterator i = pgs.begin();
i != pgs.end();
++i) {
PG *pg = *i;
pg->lock();
curmap = osdmap;
map_lock.put_read();
if (pg->deleting) {
pg->unlock();
return;
continue;
}
advance_pg(curmap->get_epoch(), pg, &rctx);
if (!pg->peering_queue.empty()) {
@ -5203,8 +5201,9 @@ void OSD::process_peering_event(PG *pg)
pg->handle_peering_event(evt, &rctx);
}
dispatch_context_transaction(rctx, pg);
need_up_thru = pg->need_up_thru;
same_interval_since = pg->info.history.same_interval_since;
need_up_thru = pg->need_up_thru || need_up_thru;
same_interval_since = MAX(pg->info.history.same_interval_since,
same_interval_since);
pg->unlock();
}
if (need_up_thru)

View File

@ -168,7 +168,7 @@ public:
PerfCounters *&logger;
MonClient *&monc;
ThreadPool::WorkQueue<PG> &op_wq;
ThreadPool::WorkQueue<PG> &peering_wq;
ThreadPool::BatchWorkQueue<PG> &peering_wq;
ThreadPool::WorkQueue<PG> &recovery_wq;
ThreadPool::WorkQueue<PG> &snap_trim_wq;
ThreadPool::WorkQueue<PG> &scrub_wq;
@ -566,12 +566,12 @@ private:
};
// -- peering queue --
struct PeeringWQ : public ThreadPool::WorkQueue<PG> {
struct PeeringWQ : public ThreadPool::BatchWorkQueue<PG> {
list<PG*> peering_queue;
OSD *osd;
PeeringWQ(OSD *o, time_t ti, ThreadPool *tp)
: ThreadPool::WorkQueue<PG>(
"OSD::PeeringWQ", ti, ti*10, tp), osd(o) {}
PeeringWQ(OSD *o, time_t ti, ThreadPool *tp, size_t batch_size)
: ThreadPool::BatchWorkQueue<PG>(
"OSD::PeeringWQ", ti, ti*10, tp, batch_size), osd(o) {}
void _dequeue(PG *pg) {
for (list<PG*>::iterator i = peering_queue.begin();
@ -598,16 +598,20 @@ private:
peering_queue.pop_front();
return retval;
}
void _process(PG *pg) {
osd->process_peering_event(pg);
pg->put();
void _process(const list<PG *> &pgs) {
osd->process_peering_events(pgs);
for (list<PG *>::const_iterator i = pgs.begin();
i != pgs.end();
++i) {
(*i)->put();
}
}
void _clear() {
assert(peering_queue.empty());
}
} peering_wq;
void process_peering_event(PG *pg);
void process_peering_events(const list<PG*> &pg);
friend class PG;
friend class ReplicatedPG;