From c1f2a8026fe77848af0dd09b5799df6150e89e80 Mon Sep 17 00:00:00 2001
From: Samuel Just
Date: Sun, 17 Jun 2012 16:23:20 -0700
Subject: [PATCH] OSD: peering_wq is now a BatchWorkQueue

process_peering_events now handles multiple pgs at once to better
batch up notifies, etc.

Signed-off-by: Samuel Just
---
 src/common/WorkQueue.h | 15 ++++-----------
 src/osd/OSD.cc         | 25 ++++++++++++-------------
 src/osd/OSD.h          | 22 +++++++++++++---------
 3 files changed, 29 insertions(+), 33 deletions(-)

diff --git a/src/common/WorkQueue.h b/src/common/WorkQueue.h
index ed2bf6c7b3a..83f789aef84 100644
--- a/src/common/WorkQueue.h
+++ b/src/common/WorkQueue.h
@@ -50,22 +50,16 @@ public:
   template<class T>
   class BatchWorkQueue : public WorkQueue_ {
     ThreadPool *pool;
-    const size_t batch_size;

     virtual bool _enqueue(T *) = 0;
     virtual void _dequeue(T *) = 0;
-    virtual T *_dequeue() = 0;
+    virtual void _dequeue(list<T*> *) = 0;
     virtual void _process(const list<T*> &) = 0;
     virtual void _process_finish(const list<T*> &) {}

     void *_void_dequeue() {
       list<T*> *out(new list<T*>);
-      while (out->size() < batch_size) {
-        T *val = _dequeue();
-        if (!val)
-          break;
-        out->push_back(val);
-      }
+      _dequeue(out);
       if (out->size()) {
         return (void *)out;
       } else {
@@ -82,9 +76,8 @@ public:
     }

   public:
-    BatchWorkQueue(string n, time_t ti, time_t sti, ThreadPool* p,
-                   size_t batch_size) :
-      WorkQueue_(n, ti, sti), pool(p), batch_size(batch_size) {
+    BatchWorkQueue(string n, time_t ti, time_t sti, ThreadPool* p)
+      : WorkQueue_(n, ti, sti), pool(p) {
       pool->add_work_queue(this);
     }
     ~BatchWorkQueue() {
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 1a675619f88..6b1789d9db0 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -692,7 +692,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
   admin_ops_hook(NULL),
   op_queue_len(0),
   op_wq(this, g_conf->osd_op_thread_timeout, &op_tp),
-  peering_wq(this, g_conf->osd_op_thread_timeout, &op_tp),
+  peering_wq(this, g_conf->osd_op_thread_timeout, &op_tp, 200),
   map_lock("OSD::map_lock"),
   peer_map_epoch_lock("OSD::peer_map_epoch_lock"),
   outstanding_pg_stats(false),
@@ -3497,8 +3497,6 @@ void OSD::advance_pg(epoch_t osd_epoch, PG *pg, PG::RecoveryCtx *rctx)
     lastmap = nextmap;
   }
   pg->handle_activate_map(rctx);
-  if (pg->dirty_info)
-    pg->write_info(*rctx->transaction);
 }

 /**
@@ -5181,20 +5179,20 @@ void OSDService::queue_for_op(PG *pg)
   op_wq.queue(pg);
 }

-void OSD::process_peering_event(PG *pg)
+void OSD::process_peering_events(const list<PG*> &pgs)
 {
   bool need_up_thru = false;
-  epoch_t same_interval_since;
-  OSDMapRef curmap;
+  epoch_t same_interval_since = 0;
+  OSDMapRef curmap = service.get_osdmap();
   PG::RecoveryCtx rctx = create_context();
-  {
-    map_lock.get_read();
+  for (list<PG*>::const_iterator i = pgs.begin();
+       i != pgs.end();
+       ++i) {
+    PG *pg = *i;
     pg->lock();
-    curmap = osdmap;
-    map_lock.put_read();
     if (pg->deleting) {
       pg->unlock();
-      return;
+      continue;
     }
     advance_pg(curmap->get_epoch(), pg, &rctx);
     if (!pg->peering_queue.empty()) {
@@ -5203,8 +5201,9 @@
       pg->handle_peering_event(evt, &rctx);
     }
     dispatch_context_transaction(rctx, pg);
-    need_up_thru = pg->need_up_thru;
-    same_interval_since = pg->info.history.same_interval_since;
+    need_up_thru = pg->need_up_thru || need_up_thru;
+    same_interval_since = MAX(pg->info.history.same_interval_since,
+                              same_interval_since);
     pg->unlock();
   }
   if (need_up_thru)
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index 529f3f99d28..f897e5b5ff9 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -168,7 +168,7 @@ public:
   PerfCounters *&logger;
   MonClient *&monc;
   ThreadPool::WorkQueue<PG> &op_wq;
-  ThreadPool::WorkQueue<PG> &peering_wq;
+  ThreadPool::BatchWorkQueue<PG> &peering_wq;
   ThreadPool::WorkQueue<PG> &recovery_wq;
   ThreadPool::WorkQueue<PG> &snap_trim_wq;
   ThreadPool::WorkQueue<PG> &scrub_wq;
@@ -566,12 +566,12 @@ private:
   };

   // -- peering queue --
-  struct PeeringWQ : public ThreadPool::WorkQueue<PG> {
+  struct PeeringWQ : public ThreadPool::BatchWorkQueue<PG> {
     list<PG*> peering_queue;
     OSD *osd;
-    PeeringWQ(OSD *o, time_t ti, ThreadPool *tp)
-      : ThreadPool::WorkQueue<PG>(
-        "OSD::PeeringWQ", ti, ti*10, tp), osd(o) {}
+    PeeringWQ(OSD *o, time_t ti, ThreadPool *tp, size_t batch_size)
+      : ThreadPool::BatchWorkQueue<PG>(
+        "OSD::PeeringWQ", ti, ti*10, tp, batch_size), osd(o) {}
     void _dequeue(PG *pg) {
       for (list<PG*>::iterator i = peering_queue.begin();
           i != peering_queue.end();
@@ -598,16 +598,20 @@ private:
       peering_queue.pop_front();
       return retval;
     }
-    void _process(PG *pg) {
-      osd->process_peering_event(pg);
-      pg->put();
+    void _process(const list<PG*> &pgs) {
+      osd->process_peering_events(pgs);
+      for (list<PG*>::const_iterator i = pgs.begin();
+           i != pgs.end();
+           ++i) {
+        (*i)->put();
+      }
     }
     void _clear() {
       assert(peering_queue.empty());
     }
   } peering_wq;

-  void process_peering_event(PG *pg);
+  void process_peering_events(const list<PG*> &pg);

   friend class PG;
   friend class ReplicatedPG;
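
For context, a minimal standalone sketch of the batching pattern this change relies on: a worker thread drains up to batch_size queued items in one pass and hands the whole batch to a single processing call, which is what lets process_peering_events() fold need_up_thru and same_interval_since across many PGs and act on them once per batch. The names below (BatchQueue, Item, process_batch) are illustrative stand-ins, not Ceph's actual ThreadPool/WorkQueue_ API.

// Sketch only: simplified stand-in for the batch work queue pattern above.
#include <condition_variable>
#include <cstdio>
#include <list>
#include <mutex>
#include <thread>

struct Item { int id; };

class BatchQueue {
  std::mutex lock;
  std::condition_variable cond;
  std::list<Item*> queue;
  const size_t batch_size;
  bool stopping;

public:
  explicit BatchQueue(size_t batch_size)
    : batch_size(batch_size), stopping(false) {}

  void enqueue(Item *item) {
    std::lock_guard<std::mutex> l(lock);
    queue.push_back(item);
    cond.notify_one();
  }

  void stop() {
    std::lock_guard<std::mutex> l(lock);
    stopping = true;
    cond.notify_all();
  }

  // Rough analogue of _void_dequeue(): pull up to batch_size items per
  // wakeup instead of a single item.
  bool dequeue_batch(std::list<Item*> *out) {
    std::unique_lock<std::mutex> l(lock);
    cond.wait(l, [this] { return stopping || !queue.empty(); });
    while (!queue.empty() && out->size() < batch_size) {
      out->push_back(queue.front());
      queue.pop_front();
    }
    return !out->empty();
  }
};

// Rough analogue of process_peering_events(): per-batch work (in the patch,
// deciding once whether a new up_thru is needed) happens once per batch
// rather than once per item.
void process_batch(const std::list<Item*> &items) {
  std::printf("processing batch of %zu items\n", items.size());
  for (Item *i : items)
    delete i;  // PeeringWQ::_process() similarly drops its PG refs after the call
}

int main() {
  BatchQueue q(200);  // the patch wires peering_wq up with a batch size of 200
  std::thread worker([&q] {
    std::list<Item*> batch;
    while (q.dequeue_batch(&batch)) {
      process_batch(batch);
      batch.clear();
    }
  });
  for (int i = 0; i < 500; ++i)
    q.enqueue(new Item{i});
  q.stop();
  worker.join();
  return 0;
}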