mirror of
https://github.com/ceph/ceph
synced 2025-01-19 01:21:49 +00:00
Merge pull request #2226 from athanatos/wip-8396
Wip 8396 Reviewed-by: Sage Weil <sage@redhat.com>
This commit is contained in:
commit
de0c7202a0
175
src/osd/OSD.cc
175
src/osd/OSD.cc
@ -1649,7 +1649,7 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
|
||||
disk_tp(cct, "OSD::disk_tp", cct->_conf->osd_disk_threads, "osd_disk_threads"),
|
||||
command_tp(cct, "OSD::command_tp", 1),
|
||||
paused_recovery(false),
|
||||
session_waiting_for_map_lock("OSD::session_waiting_for_map_lock"),
|
||||
session_waiting_lock("OSD::session_waiting_lock"),
|
||||
heartbeat_lock("OSD::heartbeat_lock"),
|
||||
heartbeat_stop(false), heartbeat_update_lock("OSD::heartbeat_update_lock"),
|
||||
heartbeat_need_update(true), heartbeat_epoch(0),
|
||||
@ -2358,7 +2358,7 @@ int OSD::shutdown()
|
||||
|
||||
service.start_shutdown();
|
||||
|
||||
dispatch_sessions_waiting_on_map();
|
||||
clear_waiting_sessions();
|
||||
|
||||
// Shutdown PGs
|
||||
{
|
||||
@ -2616,7 +2616,6 @@ PG *OSD::_open_lock_pg(
|
||||
pg_map[pgid] = pg;
|
||||
pg->get("PGMap"); // because it's in pg_map
|
||||
service.pg_add_epoch(pg->info.pgid, createmap->get_epoch());
|
||||
wake_pg_waiters(pg, pgid);
|
||||
}
|
||||
return pg;
|
||||
}
|
||||
@ -2648,7 +2647,6 @@ void OSD::add_newly_split_pg(PG *pg, PG::RecoveryCtx *rctx)
|
||||
pg->get("PGMap"); // For pg_map
|
||||
pg_map[pg->info.pgid] = pg;
|
||||
service.pg_add_epoch(pg->info.pgid, pg->get_osdmap()->get_epoch());
|
||||
wake_pg_waiters(pg, pg->info.pgid);
|
||||
|
||||
dout(10) << "Adding newly split pg " << *pg << dendl;
|
||||
vector<int> up, acting;
|
||||
@ -2769,20 +2767,30 @@ PG *OSD::_create_lock_pg(
|
||||
|
||||
PG *OSD::get_pg_or_queue_for_pg(const spg_t& pgid, OpRequestRef& op)
|
||||
{
|
||||
{
|
||||
RWLock::RLocker l(pg_map_lock);
|
||||
ceph::unordered_map<spg_t, PG*>::iterator i = pg_map.find(pgid);
|
||||
if (i != pg_map.end())
|
||||
return i->second;
|
||||
}
|
||||
RWLock::WLocker l(pg_map_lock);
|
||||
Session *session = static_cast<Session*>(
|
||||
op->get_req()->get_connection()->get_priv());
|
||||
assert(session);
|
||||
// get_pg_or_queue_for_pg is only called from the fast_dispatch path where
|
||||
// the session_dispatch_lock must already be held.
|
||||
assert(session->session_dispatch_lock.is_locked());
|
||||
RWLock::RLocker l(pg_map_lock);
|
||||
|
||||
ceph::unordered_map<spg_t, PG*>::iterator i = pg_map.find(pgid);
|
||||
if (i != pg_map.end()) {
|
||||
return i->second;
|
||||
if (i == pg_map.end())
|
||||
session->waiting_for_pg[pgid];
|
||||
|
||||
map<spg_t, list<OpRequestRef> >::iterator wlistiter =
|
||||
session->waiting_for_pg.find(pgid);
|
||||
|
||||
PG *out = NULL;
|
||||
if (wlistiter == session->waiting_for_pg.end()) {
|
||||
out = i->second;
|
||||
} else {
|
||||
waiting_for_pg[pgid].push_back(op);
|
||||
return NULL;
|
||||
wlistiter->second.push_back(op);
|
||||
register_session_waiting_on_pg(session, pgid);
|
||||
}
|
||||
session->put();
|
||||
return out;
|
||||
}
|
||||
|
||||
bool OSD::_have_pg(spg_t pgid)
|
||||
@ -4360,6 +4368,7 @@ bool OSD::ms_handle_reset(Connection *con)
|
||||
return false;
|
||||
session->wstate.reset();
|
||||
session->con.reset(NULL); // break con <-> session ref cycle
|
||||
session_handle_reset(session);
|
||||
session->put();
|
||||
return true;
|
||||
}
|
||||
@ -5400,6 +5409,8 @@ bool OSD::ms_dispatch(Message *m)
|
||||
|
||||
void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap)
|
||||
{
|
||||
assert(session->session_dispatch_lock.is_locked());
|
||||
assert(session->osdmap == osdmap);
|
||||
for (list<OpRequestRef>::iterator i = session->waiting_on_map.begin();
|
||||
i != session->waiting_on_map.end() && dispatch_op_fast(*i, osdmap);
|
||||
session->waiting_on_map.erase(i++));
|
||||
@ -5411,6 +5422,85 @@ void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
 * Re-key session->waiting_for_pg from session->osdmap to newmap.
 *
 * The caller must hold session->session_dispatch_lock.  If the session has
 * no map yet, newmap is simply recorded.  If the epochs match, nothing is
 * done.  Otherwise every per-PG wait list is re-examined against newmap:
 *  - lists for pools that no longer exist in newmap are discarded;
 *  - lists for PGs that split between the two maps are handed to
 *    OSD::split_list once per child (presumably moving the ops that now
 *    belong to that child -- confirm against split_list), and each child
 *    that received ops is registered as having this session waiting;
 *  - a list that ends up empty has its session registration cleared,
 *    a non-empty one is put back under its original pgid.
 *
 * @param session  session whose wait lists are updated
 * @param newmap   map to advance to; must not be older than session->osdmap
 */
void OSD::update_waiting_for_pg(Session *session, OSDMapRef newmap)
{
  typedef list<OpRequestRef> op_list_t;
  typedef map<spg_t, op_list_t> wait_map_t;

  assert(session->session_dispatch_lock.is_locked());

  // No map recorded yet: adopt newmap and leave the wait lists untouched.
  if (!session->osdmap) {
    session->osdmap = newmap;
    return;
  }

  // Already current as of this epoch.
  if (newmap->get_epoch() == session->osdmap->get_epoch())
    return;

  assert(newmap->get_epoch() > session->osdmap->get_epoch());

  // Work on a detached copy so entries can be re-inserted into
  // session->waiting_for_pg while we walk the old contents.
  wait_map_t pending;
  pending.swap(session->waiting_for_pg);

  while (!pending.empty()) {
    wait_map_t::iterator cur = pending.begin();
    const spg_t &pgid = cur->first;
    op_list_t &ops = cur->second;

    if (!newmap->have_pg_pool(pgid.pool())) {
      // drop this wait list on the ground
      ops.clear();
    } else {
      assert(session->osdmap->have_pg_pool(pgid.pool()));
      set<spg_t> children;
      if (pgid.is_split(
	    session->osdmap->get_pg_num(pgid.pool()),
	    newmap->get_pg_num(pgid.pool()),
	    &children)) {
	set<spg_t>::iterator child = children.begin();
	while (child != children.end()) {
	  unsigned split_bits = child->get_split_bits(
	    newmap->get_pg_num(child->pool()));
	  op_list_t child_ops;
	  OSD::split_list(&ops, &child_ops, child->ps(), split_bits);
	  if (!child_ops.empty()) {
	    session->waiting_for_pg[*child].swap(child_ops);
	    register_session_waiting_on_pg(session, *child);
	  }
	  ++child;
	}
      }
    }

    if (ops.empty()) {
      // Nothing left waiting on this pgid for this session.
      clear_session_waiting_on_pg(session, pgid);
    } else {
      session->waiting_for_pg[pgid].swap(ops);
    }

    pending.erase(cur);
  }

  session->osdmap = newmap;
}
|
||||
|
||||
/**
 * Tell a session that pgid now exists.
 *
 * The caller must hold session->session_dispatch_lock.  The session's wait
 * lists are first brought up to osdmap; any ops queued under pgid are then
 * appended to session->waiting_on_map and the session's registration for
 * pgid is cleared.
 */
void OSD::session_notify_pg_create(
  Session *session, OSDMapRef osdmap, spg_t pgid)
{
  typedef map<spg_t, list<OpRequestRef> > wait_map_t;

  assert(session->session_dispatch_lock.is_locked());
  update_waiting_for_pg(session, osdmap);

  wait_map_t &by_pg = session->waiting_for_pg;
  wait_map_t::iterator found = by_pg.find(pgid);
  if (found != by_pg.end()) {
    // Move (not copy) the queued ops to the tail of the map-wait queue.
    list<OpRequestRef> &map_queue = session->waiting_on_map;
    map_queue.splice(map_queue.end(), found->second);
    by_pg.erase(found);
  }

  clear_session_waiting_on_pg(session, pgid);
}
|
||||
|
||||
/**
 * Tell a session that pgid is no longer of interest.
 *
 * The caller must hold session->session_dispatch_lock.  After bringing the
 * session's wait lists up to osdmap, any ops still queued under pgid are
 * discarded and the session's registration for pgid is cleared.
 */
void OSD::session_notify_pg_cleared(
  Session *session, OSDMapRef osdmap, spg_t pgid)
{
  assert(session->session_dispatch_lock.is_locked());
  update_waiting_for_pg(session, osdmap);

  map<spg_t, list<OpRequestRef> >::iterator doomed =
    session->waiting_for_pg.find(pgid);
  if (doomed != session->waiting_for_pg.end()) {
    session->waiting_for_pg.erase(doomed);
  }

  clear_session_waiting_on_pg(session, pgid);
}
|
||||
|
||||
void OSD::ms_fast_dispatch(Message *m)
|
||||
{
|
||||
if (service.is_stopping()) {
|
||||
@ -5423,6 +5513,7 @@ void OSD::ms_fast_dispatch(Message *m)
|
||||
assert(session);
|
||||
{
|
||||
Mutex::Locker l(session->session_dispatch_lock);
|
||||
update_waiting_for_pg(session, nextmap);
|
||||
session->waiting_on_map.push_back(op);
|
||||
dispatch_session_waiting(session, nextmap);
|
||||
}
|
||||
@ -6563,43 +6654,30 @@ void OSD::consume_map()
|
||||
|
||||
dispatch_sessions_waiting_on_map();
|
||||
|
||||
// remove any PGs which we no longer host from the waiting_for_pg list
|
||||
set<spg_t> pgs_to_delete;
|
||||
{
|
||||
RWLock::RLocker l(pg_map_lock);
|
||||
map<spg_t, list<OpRequestRef> >::iterator p = waiting_for_pg.begin();
|
||||
while (p != waiting_for_pg.end()) {
|
||||
spg_t pgid = p->first;
|
||||
// remove any PGs which we no longer host from the session waiting_for_pg lists
|
||||
set<spg_t> pgs_to_check;
|
||||
get_pgs_with_waiting_sessions(&pgs_to_check);
|
||||
for (set<spg_t>::iterator p = pgs_to_check.begin();
|
||||
p != pgs_to_check.end();
|
||||
++p) {
|
||||
vector<int> acting;
|
||||
int nrep = osdmap->pg_to_acting_osds(p->pgid, acting);
|
||||
int role = osdmap->calc_pg_role(whoami, acting, nrep);
|
||||
|
||||
vector<int> acting;
|
||||
int nrep = osdmap->pg_to_acting_osds(pgid.pgid, acting);
|
||||
int role = osdmap->calc_pg_role(whoami, acting, nrep);
|
||||
|
||||
if (role < 0) {
|
||||
pgs_to_delete.insert(p->first);
|
||||
/* we can delete list contents under the read lock because
|
||||
* nobody will be adding to them -- everybody is now using a map
|
||||
* new enough that they will simply drop ops instead of adding
|
||||
* them to the list. */
|
||||
dout(10) << " discarding waiting ops for " << pgid << dendl;
|
||||
while (!p->second.empty()) {
|
||||
p->second.pop_front();
|
||||
}
|
||||
if (role < 0) {
|
||||
set<Session*> concerned_sessions;
|
||||
get_sessions_possibly_interested_in_pg(*p, &concerned_sessions);
|
||||
for (set<Session*>::iterator i = concerned_sessions.begin();
|
||||
i != concerned_sessions.end();
|
||||
++i) {
|
||||
{
|
||||
Mutex::Locker l((*i)->session_dispatch_lock);
|
||||
session_notify_pg_cleared(*i, osdmap, *p);
|
||||
}
|
||||
(*i)->put();
|
||||
}
|
||||
++p;
|
||||
}
|
||||
}
|
||||
{
|
||||
RWLock::WLocker l(pg_map_lock);
|
||||
for (set<spg_t>::iterator i = pgs_to_delete.begin();
|
||||
i != pgs_to_delete.end();
|
||||
++i) {
|
||||
map<spg_t, list<OpRequestRef> >::iterator p = waiting_for_pg.find(*i);
|
||||
assert(p->second.empty());
|
||||
waiting_for_pg.erase(p);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// scan pg's
|
||||
{
|
||||
@ -8302,6 +8380,7 @@ struct C_CompleteSplits : public Context {
|
||||
osd->dispatch_context_transaction(rctx, &**i);
|
||||
to_complete.insert((*i)->info.pgid);
|
||||
(*i)->unlock();
|
||||
osd->wake_pg_waiters(&**i, (*i)->info.pgid);
|
||||
to_complete.clear();
|
||||
}
|
||||
|
||||
|
132
src/osd/OSD.h
132
src/osd/OSD.h
@ -1149,6 +1149,9 @@ public:
|
||||
Mutex session_dispatch_lock;
|
||||
list<OpRequestRef> waiting_on_map;
|
||||
|
||||
OSDMapRef osdmap; /// Map as of which waiting_for_pg is current
|
||||
map<spg_t, list<OpRequestRef> > waiting_for_pg;
|
||||
|
||||
Mutex sent_epoch_lock;
|
||||
epoch_t last_sent_epoch;
|
||||
Mutex received_map_lock;
|
||||
@ -1161,24 +1164,54 @@ public:
|
||||
sent_epoch_lock("Session::sent_epoch_lock"), last_sent_epoch(0),
|
||||
received_map_lock("Session::received_map_lock"), received_map_epoch(0)
|
||||
{}
|
||||
|
||||
|
||||
};
|
||||
void update_waiting_for_pg(Session *session, OSDMapRef osdmap);
|
||||
void session_notify_pg_create(Session *session, OSDMapRef osdmap, spg_t pgid);
|
||||
void session_notify_pg_cleared(Session *session, OSDMapRef osdmap, spg_t pgid);
|
||||
void dispatch_session_waiting(Session *session, OSDMapRef osdmap);
|
||||
Mutex session_waiting_for_map_lock;
|
||||
|
||||
Mutex session_waiting_lock;
|
||||
set<Session*> session_waiting_for_map;
|
||||
map<spg_t, set<Session*> > session_waiting_for_pg;
|
||||
|
||||
void clear_waiting_sessions() {
|
||||
Mutex::Locker l(session_waiting_lock);
|
||||
for (map<spg_t, set<Session*> >::iterator i =
|
||||
session_waiting_for_pg.begin();
|
||||
i != session_waiting_for_pg.end();
|
||||
++i) {
|
||||
for (set<Session*>::iterator j = i->second.begin();
|
||||
j != i->second.end();
|
||||
++j) {
|
||||
(*j)->put();
|
||||
}
|
||||
}
|
||||
session_waiting_for_pg.clear();
|
||||
|
||||
for (set<Session*>::iterator i = session_waiting_for_map.begin();
|
||||
i != session_waiting_for_map.end();
|
||||
++i) {
|
||||
(*i)->put();
|
||||
}
|
||||
session_waiting_for_map.clear();
|
||||
}
|
||||
|
||||
/// Caller assumes refs for included Sessions
|
||||
void get_sessions_waiting_for_map(set<Session*> *out) {
|
||||
Mutex::Locker l(session_waiting_for_map_lock);
|
||||
Mutex::Locker l(session_waiting_lock);
|
||||
out->swap(session_waiting_for_map);
|
||||
}
|
||||
/// Record that session has ops waiting for a newer map.
///
/// A reference is taken on the session the first time it is registered;
/// registering an already-registered session is a no-op.
///
/// Takes session_waiting_lock, the same lock clear_waiting_sessions() holds
/// while touching session_waiting_for_map.  Only one locker is declared;
/// two lockers with the same name in one scope would not compile.
void register_session_waiting_on_map(Session *session) {
  Mutex::Locker l(session_waiting_lock);
  if (session_waiting_for_map.count(session) == 0) {
    session->get();
    session_waiting_for_map.insert(session);
  }
}
|
||||
void clear_session_waiting_on_map(Session *session) {
|
||||
Mutex::Locker l(session_waiting_for_map_lock);
|
||||
Mutex::Locker l(session_waiting_lock);
|
||||
set<Session*>::iterator i = session_waiting_for_map.find(session);
|
||||
if (i != session_waiting_for_map.end()) {
|
||||
(*i)->put();
|
||||
@ -1192,11 +1225,82 @@ public:
|
||||
i != sessions_to_check.end();
|
||||
sessions_to_check.erase(i++)) {
|
||||
(*i)->session_dispatch_lock.Lock();
|
||||
update_waiting_for_pg(*i, osdmap);
|
||||
dispatch_session_waiting(*i, osdmap);
|
||||
(*i)->session_dispatch_lock.Unlock();
|
||||
(*i)->put();
|
||||
}
|
||||
}
|
||||
/// Remove session from the set of sessions registered as waiting on pgid.
///
/// If the session was registered, the reference taken at registration is
/// put().  The per-pg entry is removed once no session is left in it.
/// Does nothing if pgid has no entry.
void clear_session_waiting_on_pg(Session *session, spg_t pgid) {
  Mutex::Locker l(session_waiting_lock);

  map<spg_t, set<Session*> >::iterator entry =
    session_waiting_for_pg.find(pgid);
  if (entry != session_waiting_for_pg.end()) {
    set<Session*> &waiters = entry->second;

    set<Session*>::iterator hit = waiters.find(session);
    if (hit != waiters.end()) {
      (*hit)->put();
      waiters.erase(hit);
    }

    // Don't leave empty per-pg sets behind.
    if (waiters.empty()) {
      session_waiting_for_pg.erase(entry);
    }
  }
}
|
||||
/// Detach a session whose connection was reset from all wait registries
/// and release the ops it still has queued.
///
/// Under session->session_dispatch_lock this:
///  - removes the session from the map-wait registry,
///  - removes it from the per-pg registry for every pgid it has a wait
///    list for,
///  - discards the queued ops in waiting_on_map and waiting_for_pg.
///
/// The last step matters because a queued op reaches its Session through
/// op->get_req()->get_connection()->get_priv(); leaving ops queued on the
/// session therefore keeps a session -> op -> connection -> session
/// reference chain alive after the reset.
void session_handle_reset(Session *session) {
  Mutex::Locker l(session->session_dispatch_lock);
  clear_session_waiting_on_map(session);

  // clear_session_waiting_on_pg() only modifies the OSD-wide
  // session_waiting_for_pg registry, never session->waiting_for_pg, so it
  // is safe to call while iterating the session's own map.
  for (map<spg_t, list<OpRequestRef> >::iterator i =
	 session->waiting_for_pg.begin();
       i != session->waiting_for_pg.end();
       ++i) {
    clear_session_waiting_on_pg(session, i->first);
  }

  // Release the op references held by this session.
  session->waiting_on_map.clear();
  session->waiting_for_pg.clear();
}
|
||||
/// Record that session has ops waiting on pgid.
///
/// The per-pg set is created on demand.  A reference is taken on the
/// session the first time it is registered for this pgid; repeated
/// registration is a no-op.
void register_session_waiting_on_pg(Session *session, spg_t pgid) {
  Mutex::Locker l(session_waiting_lock);
  set<Session*> &waiters = session_waiting_for_pg[pgid];
  if (waiters.count(session) == 0) {
    session->get();
    waiters.insert(session);
  }
}
|
||||
/// Collect every session registered as waiting on pgid or on any of its
/// ancestors.
///
/// Starting at pgid, the registry is consulted and then pgid is replaced
/// by pgid.get_parent() until a pg with ps() == 0 has been checked.
///
/// A reference is taken for each session this call adds to *sessions; the
/// caller must put() them.  Sessions already present in *sessions on entry
/// are left alone, so their reference count is not bumped a second time.
///
/// @param pgid      pg to start from (taken by value; walked up in place)
/// @param sessions  out-set receiving the sessions
void get_sessions_possibly_interested_in_pg(
  spg_t pgid, set<Session*> *sessions) {
  Mutex::Locker l(session_waiting_lock);
  while (1) {
    map<spg_t, set<Session*> >::iterator i = session_waiting_for_pg.find(pgid);
    if (i != session_waiting_for_pg.end()) {
      for (set<Session*>::iterator j = i->second.begin();
	   j != i->second.end();
	   ++j) {
	// insert().second is true only for newly added elements, so a
	// session seen under several ancestors is referenced once.
	if (sessions->insert(*j).second) {
	  (*j)->get();
	}
      }
    }
    if (pgid.pgid.ps() == 0) {
      break;
    } else {
      pgid = pgid.get_parent();
    }
  }
}
|
||||
/// Add to *pgs the id of every pg that currently has an entry in the
/// per-pg session registry.  Existing contents of *pgs are kept.
void get_pgs_with_waiting_sessions(set<spg_t> *pgs) {
  Mutex::Locker l(session_waiting_lock);
  map<spg_t, set<Session*> >::iterator entry = session_waiting_for_pg.begin();
  while (entry != session_waiting_for_pg.end()) {
    pgs->insert(entry->first);
    ++entry;
  }
}
|
||||
|
||||
private:
|
||||
/**
|
||||
@ -1587,7 +1691,6 @@ protected:
|
||||
// -- placement groups --
|
||||
RWLock pg_map_lock; // this lock orders *above* individual PG _locks
|
||||
ceph::unordered_map<spg_t, PG*> pg_map; // protected by pg_map lock
|
||||
map<spg_t, list<OpRequestRef> > waiting_for_pg; // protected by pg_map lock
|
||||
|
||||
map<spg_t, list<PG::CephPeeringEvtRef> > peering_wait_for_split;
|
||||
PGRecoveryStats pg_recovery_stats;
|
||||
@ -1657,15 +1760,20 @@ protected:
|
||||
); ///< @return false if there was a map gap between from and now
|
||||
|
||||
void wake_pg_waiters(PG* pg, spg_t pgid) {
|
||||
assert(osd_lock.is_locked());
|
||||
// Need write lock on pg_map_lock
|
||||
map<spg_t, list<OpRequestRef> >::iterator i = waiting_for_pg.find(pgid);
|
||||
if (i != waiting_for_pg.end()) {
|
||||
for (list<OpRequestRef>::iterator j = i->second.begin();
|
||||
j != i->second.end();
|
||||
++j) {
|
||||
enqueue_op(pg, *j);
|
||||
set<Session*> concerned_sessions;
|
||||
get_sessions_possibly_interested_in_pg(pgid, &concerned_sessions);
|
||||
|
||||
for (set<Session*>::iterator i = concerned_sessions.begin();
|
||||
i != concerned_sessions.end();
|
||||
++i) {
|
||||
{
|
||||
Mutex::Locker l((*i)->session_dispatch_lock);
|
||||
session_notify_pg_create(*i, osdmap, pgid);
|
||||
dispatch_session_waiting(*i, osdmap);
|
||||
}
|
||||
waiting_for_pg.erase(i);
|
||||
(*i)->put();
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user