OSD: Push responsibility for grabbing pg_map_lock up to callers of _remove_pg()

The atomicity requirements of other systems prevent us dropping the PG lock
inside that function, and the PG lock is ordered underneath the pg_map_lock.

Signed-off-by: Greg Farnum <greg@inktank.com>
This commit is contained in:
Greg Farnum 2014-04-10 14:19:46 -07:00
parent 00d36f6e8a
commit 9d8c797e65

View File

@ -1889,11 +1889,9 @@ void OSD::add_newly_split_pg(PG *pg, PG::RecoveryCtx *rctx)
{
epoch_t e(service.get_osdmap()->get_epoch());
pg->get("PGMap"); // For pg_map
{
RWLock::WLocker l(pg_map_lock);
pg_map[pg->info.pgid] = pg;
wake_pg_waiters(pg, pg->info.pgid);
}
pg_map[pg->info.pgid] = pg;
wake_pg_waiters(pg, pg->info.pgid);
dout(10) << "Adding newly split pg " << *pg << dendl;
vector<int> up, acting;
pg->get_osdmap()->pg_to_up_acting_osds(pg->info.pgid.pgid, up, acting);
@ -5901,6 +5899,7 @@ void OSD::consume_map()
for (list<PGRef>::iterator i = to_remove.begin();
i != to_remove.end();
to_remove.erase(i++)) {
RWLock::WLocker locker(pg_map_lock);
(*i)->lock();
_remove_pg(&**i);
(*i)->unlock();
@ -7035,12 +7034,10 @@ void OSD::handle_pg_remove(OpRequestRef op)
continue;
}
{
RWLock::RLocker l(pg_map_lock);
if (pg_map.count(pgid) == 0) {
dout(10) << " don't have pg " << pgid << dendl;
continue;
}
RWLock::WLocker l(pg_map_lock);
if (pg_map.count(pgid) == 0) {
dout(10) << " don't have pg " << pgid << dendl;
continue;
}
dout(5) << "queue_pg_for_deletion: " << pgid << dendl;
PG *pg = _lookup_lock_pg_with_map_lock_held(pgid);
@ -7095,10 +7092,7 @@ void OSD::_remove_pg(PG *pg)
remove_wq.queue(make_pair(PGRef(pg), deleting));
// remove from map
{
RWLock::WLocker l(pg_map_lock);
pg_map.erase(pg->info.pgid);
}
pg_map.erase(pg->info.pgid);
pg->put("PGMap"); // since we've taken it out of map
}
@ -7693,14 +7687,20 @@ struct C_CompleteSplits : public Context {
for (set<boost::intrusive_ptr<PG> >::iterator i = pgs.begin();
i != pgs.end();
++i) {
osd->pg_map_lock.get_write();
(*i)->lock();
osd->add_newly_split_pg(&**i, &rctx);
if (!((*i)->deleting)) {
to_complete.insert((*i)->info.pgid);
osd->service.complete_split(to_complete);
}
osd->pg_map_lock.put_write();
osd->dispatch_context_transaction(rctx, &**i);
if (!((*i)->deleting))
to_complete.insert((*i)->info.pgid);
(*i)->unlock();
to_complete.clear();
}
osd->service.complete_split(to_complete);
osd->dispatch_context(rctx, 0, osd->service.get_osdmap());
}
};