Add priority option to AsyncReserver class

Add priority to request_reservation()
Change queue to a map of lists keyed by priority
Add priority to the queue_pointers mapped type

Signed-off-by: David Zafman <david.zafman@inktank.com>
David Zafman 2013-05-07 15:58:48 -07:00
parent df049c1c1f
commit 09163a3bbc
2 changed files with 29 additions and 17 deletions


@@ -10,9 +10,9 @@ incoming backfills on a single node.
 Each OSDService now has two AsyncReserver instances: one for backfills going
 from the osd (local_reserver) and one for backfills going to the osd
 (remote_reserver). An AsyncReserver (common/AsyncReserver.h) manages a queue
-of waiting items and a set of current reservation holders. When a slot frees
-up, the AsyncReserver queues the Context* associated with the next item in the
-finisher provided to the constructor.
+by priority of waiting items and a set of current reservation holders. When a
+slot frees up, the AsyncReserver queues the Context* associated with the next
+item on the highest priority queue in the finisher provided to the constructor.
 For a primary to initiate a backfill, it must first obtain a reservation from
 its own local_reserver. Then, it must obtain a reservation from the backfill

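To make the new interface concrete, here is a caller-side sketch of how a primary might take a slot from its local_reserver with the priority argument this commit adds. It is illustrative only: the pg_t key, the C_Reserved callback, the queue_backfill helper, and the priority value 10 are assumptions, while Context, Finisher, and AsyncReserver are the Ceph types named above.

// Hypothetical usage sketch, not code from this commit.
struct C_Reserved : public Context {
  void finish(int r) {
    // Slot granted: the primary can proceed with its backfill.
  }
};

void queue_backfill(AsyncReserver<pg_t> &local_reserver, pg_t pgid) {
  // Higher prio values are served first: do_queues() drains the
  // highest-priority queue before looking at lower ones.
  local_reserver.request_reservation(pgid, new C_Reserved, 10);
}

// Cancelling before the slot is granted deletes the pending callback;
// cancelling afterwards releases the held slot:
//   local_reserver.cancel_reservation(pgid);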
common/AsyncReserver.h

@@ -24,6 +24,10 @@
 /**
  * Manages a configurable number of asynchronous reservations.
  *
+ * Memory usage is linear with the number of items queued and
+ * linear with respect to the total number of priorities used
+ * over all time.
  */
 template <typename T>
 class AsyncReserver {
@@ -31,18 +35,23 @@ class AsyncReserver {
   unsigned max_allowed;
   Mutex lock;
-  list<pair<T, Context*> > queue;
-  map<T, typename list<pair<T, Context*> >::iterator > queue_pointers;
+  map<unsigned, list<pair<T, Context*> > > queues;
+  map<T, pair<unsigned, typename list<pair<T, Context*> >::iterator > > queue_pointers;
   set<T> in_progress;
   void do_queues() {
-    while (in_progress.size() < max_allowed &&
-           !queue.empty()) {
-      pair<T, Context*> p = queue.front();
-      queue_pointers.erase(p.first);
-      queue.pop_front();
-      f->queue(p.second);
-      in_progress.insert(p.first);
+    typename map<unsigned, list<pair<T, Context*> > >::reverse_iterator it;
+    for (it = queues.rbegin();
+         it != queues.rend() && in_progress.size() < max_allowed;
+         ++it) {
+      while (in_progress.size() < max_allowed &&
+             !it->second.empty()) {
+        pair<T, Context*> p = it->second.front();
+        queue_pointers.erase(p.first);
+        it->second.pop_front();
+        f->queue(p.second);
+        in_progress.insert(p.first);
+      }
     }
   }
 public:
@@ -67,13 +76,14 @@ public:
    */
   void request_reservation(
     T item,               ///< [in] reservation key
-    Context *on_reserved  ///< [in] callback to be called on reservation
+    Context *on_reserved, ///< [in] callback to be called on reservation
+    unsigned prio         ///< [in] priority (higher is served first)
     ) {
     Mutex::Locker l(lock);
     assert(!queue_pointers.count(item) &&
            !in_progress.count(item));
-    queue.push_back(make_pair(item, on_reserved));
-    queue_pointers.insert(make_pair(item, --queue.end()));
+    queues[prio].push_back(make_pair(item, on_reserved));
+    queue_pointers.insert(make_pair(item, make_pair(prio, --(queues[prio]).end())));
     do_queues();
   }
@@ -89,14 +99,16 @@ public:
     ) {
     Mutex::Locker l(lock);
     if (queue_pointers.count(item)) {
-      delete queue_pointers[item]->second;
-      queue.erase(queue_pointers[item]);
+      unsigned prio = queue_pointers[item].first;
+      delete queue_pointers[item].second->second;
+      queues[prio].erase(queue_pointers[item].second);
       queue_pointers.erase(item);
     } else {
       in_progress.erase(item);
     }
     do_queues();
   }
+  static const unsigned MAX_PRIORITY = (unsigned)-1;
 };
 #endif
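
As a sanity check on the scheduling change, the following standalone sketch reproduces the new do_queues() loop with plain standard-library containers, leaving out the Finisher, Context, and locking machinery. The priorities 1 and 10, the item names, and max_allowed = 2 are assumed values for illustration.

#include <iostream>
#include <list>
#include <map>
#include <set>
#include <string>

int main() {
  const unsigned max_allowed = 2;                      // free slots
  std::set<std::string> in_progress;                   // granted reservations
  std::map<unsigned, std::list<std::string> > queues;  // prio -> waiters

  queues[1].push_back("low-prio-pg");
  queues[10].push_back("high-prio-pg-a");
  queues[10].push_back("high-prio-pg-b");

  // rbegin() visits the largest key first, so reverse iteration walks
  // the queues from highest priority down, exactly like do_queues().
  std::map<unsigned, std::list<std::string> >::reverse_iterator it;
  for (it = queues.rbegin();
       it != queues.rend() && in_progress.size() < max_allowed;
       ++it) {
    while (in_progress.size() < max_allowed && !it->second.empty()) {
      in_progress.insert(it->second.front());
      it->second.pop_front();
    }
  }

  // Prints only the two priority-10 items; the priority-1 item waits.
  for (std::set<std::string>::iterator i = in_progress.begin();
       i != in_progress.end(); ++i)
    std::cout << *i << std::endl;
  return 0;
}

Because std::map keeps its keys sorted, an item queued at (unsigned)-1 always sits at rbegin() and is drained first, which is what makes reserving that value as MAX_PRIORITY meaningful.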