osd: use intrusive list for session op queue

We can avoid these list<> allocations in the fast dispatch path.

Signed-off-by: Sage Weil <sage@redhat.com>
This commit is contained in:
Sage Weil 2016-12-15 14:01:12 -05:00
parent 92d3e4328e
commit 6990fcd1c8
2 changed files with 52 additions and 26 deletions

View File

@ -3273,14 +3273,14 @@ PGRef OSD::get_pg_or_queue_for_pg(const spg_t& pgid, OpRequestRef& op,
if (i == pg_map.end())
session->waiting_for_pg[pgid];
map<spg_t, list<OpRequestRef> >::iterator wlistiter =
session->waiting_for_pg.find(pgid);
auto wlistiter = session->waiting_for_pg.find(pgid);
PG *out = NULL;
if (wlistiter == session->waiting_for_pg.end()) {
out = i->second;
} else {
wlistiter->second.push_back(op);
op->get();
wlistiter->second.push_back(*op);
register_session_waiting_on_pg(session, pgid);
}
return PGRef(out);
@ -6003,9 +6003,17 @@ void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap)
{
assert(session->session_dispatch_lock.is_locked());
assert(session->osdmap == osdmap);
for (list<OpRequestRef>::iterator i = session->waiting_on_map.begin();
i != session->waiting_on_map.end() && dispatch_op_fast(*i, osdmap);
session->waiting_on_map.erase(i++));
auto i = session->waiting_on_map.begin();
while (i != session->waiting_on_map.end()) {
OpRequest *op = &(*i);
session->waiting_on_map.erase(i++);
if (!dispatch_op_fast(op, osdmap)) {
session->waiting_on_map.push_front(*op);
break;
}
op->put();
}
if (session->waiting_on_map.empty()) {
clear_session_waiting_on_map(session);
@ -6029,16 +6037,14 @@ void OSD::update_waiting_for_pg(Session *session, OSDMapRef newmap)
assert(newmap->get_epoch() > session->osdmap->get_epoch());
map<spg_t, list<OpRequestRef> > from;
map<spg_t, boost::intrusive::list<OpRequest> > from;
from.swap(session->waiting_for_pg);
for (map<spg_t, list<OpRequestRef> >::iterator i = from.begin();
i != from.end();
from.erase(i++)) {
for (auto i = from.begin(); i != from.end(); from.erase(i++)) {
set<spg_t> children;
if (!newmap->have_pg_pool(i->first.pool())) {
// drop this wait list on the ground
i->second.clear();
i->second.clear_and_dispose(TrackedOp::Putter());
} else {
assert(session->osdmap->have_pg_pool(i->first.pool()));
if (i->first.is_split(
@ -6050,7 +6056,7 @@ void OSD::update_waiting_for_pg(Session *session, OSDMapRef newmap)
++child) {
unsigned split_bits = child->get_split_bits(
newmap->get_pg_num(child->pool()));
list<OpRequestRef> child_ops;
boost::intrusive::list<OpRequest> child_ops;
OSD::split_list(&i->second, &child_ops, child->ps(), split_bits);
if (!child_ops.empty()) {
session->waiting_for_pg[*child].swap(child_ops);
@ -6074,12 +6080,12 @@ void OSD::session_notify_pg_create(
{
assert(session->session_dispatch_lock.is_locked());
update_waiting_for_pg(session, osdmap);
map<spg_t, list<OpRequestRef> >::iterator i =
session->waiting_for_pg.find(pgid);
auto i = session->waiting_for_pg.find(pgid);
if (i != session->waiting_for_pg.end()) {
session->waiting_on_map.splice(
session->waiting_on_map.begin(),
i->second);
assert(i->second.empty());
session->waiting_for_pg.erase(i);
}
clear_session_waiting_on_pg(session, pgid);
@ -6090,7 +6096,11 @@ void OSD::session_notify_pg_cleared(
{
assert(session->session_dispatch_lock.is_locked());
update_waiting_for_pg(session, osdmap);
session->waiting_for_pg.erase(pgid);
auto i = session->waiting_for_pg.find(pgid);
if (i != session->waiting_for_pg.end()) {
i->second.clear_and_dispose(TrackedOp::Putter());
session->waiting_for_pg.erase(i);
}
session->maybe_reset_osdmap();
clear_session_waiting_on_pg(session, pgid);
}
@ -6116,7 +6126,8 @@ void OSD::ms_fast_dispatch(Message *m)
{
Mutex::Locker l(session->session_dispatch_lock);
update_waiting_for_pg(session, nextmap);
session->waiting_on_map.push_back(op);
op->get();
session->waiting_on_map.push_back(*op);
dispatch_session_waiting(session, nextmap);
}
session->put();
@ -6345,7 +6356,7 @@ void OSD::dispatch_op(OpRequestRef op)
}
}
bool OSD::dispatch_op_fast(OpRequestRef& op, OSDMapRef& osdmap)
bool OSD::dispatch_op_fast(OpRequestRef op, OSDMapRef& osdmap)
{
if (is_stopping()) {
// we're shutting down, so drop the op

View File

@ -1240,7 +1240,7 @@ protected:
void tick_without_osd_lock();
void _dispatch(Message *m);
void dispatch_op(OpRequestRef op);
bool dispatch_op_fast(OpRequestRef& op, OSDMapRef& osdmap);
bool dispatch_op_fast(OpRequestRef op, OSDMapRef& osdmap);
void check_osdmap_features(ObjectStore *store);
@ -1406,14 +1406,27 @@ public:
return false;
}
static void split_list(
boost::intrusive::list<OpRequest> *from,
boost::intrusive::list<OpRequest> *to,
unsigned match,
unsigned bits) {
for (auto i = from->begin(); i != from->end(); ) {
if (split_request(&(*i), match, bits)) {
OpRequest& o = *i;
i = from->erase(i);
to->push_back(o);
} else {
++i;
}
}
}
static void split_list(
list<OpRequestRef> *from,
list<OpRequestRef> *to,
unsigned match,
unsigned bits) {
for (list<OpRequestRef>::iterator i = from->begin();
i != from->end();
) {
for (auto i = from->begin(); i != from->end(); ) {
if (split_request(*i, match, bits)) {
to->push_back(*i);
from->erase(i++);
@ -1431,10 +1444,10 @@ public:
WatchConState wstate;
Mutex session_dispatch_lock;
list<OpRequestRef> waiting_on_map;
boost::intrusive::list<OpRequest> waiting_on_map;
OSDMapRef osdmap; /// Map as of which waiting_for_pg is current
map<spg_t, list<OpRequestRef> > waiting_for_pg;
map<spg_t, boost::intrusive::list<OpRequest> > waiting_for_pg;
Spinlock sent_epoch_lock;
epoch_t last_sent_epoch;
@ -1538,8 +1551,7 @@ private:
Mutex::Locker l(session->session_dispatch_lock);
clear_session_waiting_on_map(session);
for (map<spg_t, list<OpRequestRef> >::const_iterator i =
session->waiting_for_pg.cbegin();
for (auto i = session->waiting_for_pg.cbegin();
i != session->waiting_for_pg.cend();
++i) {
clear_session_waiting_on_pg(session, i->first);
@ -1550,7 +1562,10 @@ private:
* cycles which result.
* Bug #12338
*/
session->waiting_on_map.clear();
session->waiting_on_map.clear_and_dispose(TrackedOp::Putter());
for (auto& i : session->waiting_for_pg) {
i.second.clear_and_dispose(TrackedOp::Putter());
}
session->waiting_for_pg.clear();
session->osdmap.reset();
}