osd: protect per-pg heartbeat peers with inner lock

Currently we update the overall heartbeat peers by looking directly at
per-pg state.  This is potentially problematic now (#2033), and definitely
so in the future when we push more peering operations into the work queues.

Create a per-pg set of peers, protected by an inner lock, and update it
using PG::update_heartbeat_peers() when appropriate under pg->lock.  Then
aggregate it into the osd peer list in OSD::update_heartbeat_peers() under
osd_lock and the inner lock.

We could probably have reused osd->heartbeat_lock instead of adding a
new pg->heartbeat_peer_lock, but the finer-grained locking can't hurt.

Signed-off-by: Sage Weil <sage.weil@dreamhost.com>
Sage Weil 2012-02-12 18:08:34 -08:00
parent 508be8e3b3
commit 72a5610879
3 changed files with 55 additions and 14 deletions
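
A minimal sketch of the two-level locking scheme described in the commit
message, using simplified stand-in names and std::mutex instead of the Ceph
Mutex class (this is not the code in this commit, just an illustration of the
pattern: the per-PG set is written under pg->lock plus the inner lock, and read
under osd_lock plus the inner lock):

#include <map>
#include <mutex>
#include <set>

struct PG {
  std::mutex lock;                 // stand-in for pg->lock
  std::mutex heartbeat_peer_lock;  // inner lock; guards only heartbeat_peers
  std::set<int> heartbeat_peers;

  // Called with pg->lock held whenever acting/up/peer_info change.
  void update_heartbeat_peers(const std::set<int>& peers) {
    std::lock_guard<std::mutex> hl(heartbeat_peer_lock);
    heartbeat_peers = peers;
  }
};

struct OSD {
  std::mutex osd_lock;
  std::map<int, PG*> pg_map;        // pg id -> PG (simplified key type)
  std::set<int> heartbeat_targets;  // aggregated osd peer list

  // Called under osd_lock; only the per-PG inner lock is taken here,
  // never pg->lock.
  void update_heartbeat_peers() {
    std::lock_guard<std::mutex> ol(osd_lock);
    heartbeat_targets.clear();
    for (auto& i : pg_map) {
      PG* pg = i.second;
      std::lock_guard<std::mutex> hl(pg->heartbeat_peer_lock);
      heartbeat_targets.insert(pg->heartbeat_peers.begin(),
                               pg->heartbeat_peers.end());
    }
  }
};

int main() {
  OSD osd;
  PG pg;
  osd.pg_map[1] = &pg;
  {
    std::lock_guard<std::mutex> pl(pg.lock);  // peering path holds pg->lock
    pg.update_heartbeat_peers({2, 3, 5});
  }
  osd.update_heartbeat_peers();               // aggregation path
  return 0;
}

The point of the inner lock is that the aggregation side never has to take
pg->lock, so it cannot block on (or race with) peering work that holds
pg->lock in the work queues.
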

src/osd/OSD.cc

@@ -1463,18 +1463,14 @@ void OSD::update_heartbeat_peers()
       i != pg_map.end();
       i++) {
    PG *pg = i->second;
    // replicas (new and old) ping primary.
    if (pg->get_role() == 0) {
      assert(pg->acting[0] == whoami);
      for (unsigned i=0; i<pg->acting.size(); i++)
        _add_heartbeat_source(pg->acting[i], old_from, old_from_stamp, old_con);
      for (unsigned i=0; i<pg->up.size(); i++)
        _add_heartbeat_source(pg->up[i], old_from, old_from_stamp, old_con);
      for (map<int,pg_info_t>::iterator p = pg->peer_info.begin(); p != pg->peer_info.end(); ++p)
        if (osdmap->is_up(p->first))
          _add_heartbeat_source(p->first, old_from, old_from_stamp, old_con);
    }
    pg->heartbeat_peer_lock.Lock();
    dout(20) << *pg << " heartbeat_peers " << pg->heartbeat_peers << dendl;
    for (set<int>::iterator p = pg->heartbeat_peers.begin();
         p != pg->heartbeat_peers.end();
         ++p)
      if (osdmap->is_up(*p))
        _add_heartbeat_source(*p, old_from, old_from_stamp, old_con);
    pg->heartbeat_peer_lock.Unlock();
  }
  for (map<int,epoch_t>::iterator p = old_from.begin();

src/osd/PG.cc

@@ -248,6 +248,10 @@ bool PG::proc_replica_info(int from, pg_info_t &oinfo)
  }
  update_stats();
  // was this a new info? if so, update peers!
  if (p == peer_info.end())
    update_heartbeat_peers();
  return true;
}
@@ -796,15 +800,21 @@ bool PG::adjust_need_up_thru(const OSDMapRef osdmap)
void PG::remove_down_peer_info(const OSDMapRef osdmap)
{
  // Remove any downed osds from peer_info
  bool removed = false;
  map<int,pg_info_t>::iterator p = peer_info.begin();
  while (p != peer_info.end()) {
    if (!osdmap->is_up(p->first)) {
      dout(10) << " dropping down osd." << p->first << " info " << p->second << dendl;
      peer_missing.erase(p->first);
      peer_info.erase(p++);
      removed = true;
    } else
      p++;
  }
  // if we removed anyone, update peers (which include peer_info)
  if (removed)
    update_heartbeat_peers();
}
/*
@@ -1602,6 +1612,7 @@ void PG::purge_strays()
{
  dout(10) << "purge_strays " << stray_set << dendl;
  bool removed = false;
  for (set<int>::iterator p = stray_set.begin();
       p != stray_set.end();
       p++) {
@@ -1613,8 +1624,13 @@
      dout(10) << "not sending PGRemove to down osd." << *p << dendl;
    }
    peer_info.erase(*p);
    removed = true;
  }
  // if we removed anyone, update peers (which include peer_info)
  if (removed)
    update_heartbeat_peers();
  stray_set.clear();
  // clear _requested maps; we may have to peer() again if we discover
@@ -1623,8 +1639,22 @@
  peer_missing_requested.clear();
}

void PG::update_heartbeat_peers()
{
  assert(is_locked());
  heartbeat_peer_lock.Lock();
  heartbeat_peers.clear();
  if (role == 0) {
    for (unsigned i=0; i<acting.size(); i++)
      heartbeat_peers.insert(acting[i]);
    for (unsigned i=0; i<up.size(); i++)
      heartbeat_peers.insert(up[i]);
    for (map<int,pg_info_t>::iterator p = peer_info.begin(); p != peer_info.end(); ++p)
      heartbeat_peers.insert(p->first);
  }
  dout(10) << "update_heartbeat_peers " << heartbeat_peers << dendl;
  heartbeat_peer_lock.Unlock();
}

void PG::update_stats()
{
@@ -3628,6 +3658,7 @@ boost::statechart::result PG::RecoveryState::Initial::react(const MNotifyRec& no
{
  PG *pg = context< RecoveryMachine >().pg;
  pg->proc_replica_info(notify.from, notify.info);
  pg->update_heartbeat_peers();
  return transit< Primary >();
}
@@ -3708,6 +3739,9 @@ boost::statechart::result PG::RecoveryState::Reset::react(const ActMap&)
    context< RecoveryMachine >().send_notify(pg->get_primary(),
                                             pg->info);
  }
  pg->update_heartbeat_peers();
  return transit< Started >();
}
@@ -3853,6 +3887,7 @@ boost::statechart::result PG::RecoveryState::Active::react(const ActMap&)
  dout(10) << "Active: handling ActMap" << dendl;
  assert(pg->is_active());
  assert(pg->is_primary());
  pg->check_recovery_op_pulls(pg->get_osdmap());
  if (g_conf->osd_check_for_log_corruption)

src/osd/PG.h

@@ -561,6 +561,13 @@ protected:
  epoch_t last_peering_reset;
  /* heartbeat peers */
public:
  Mutex heartbeat_peer_lock;
  set<int> heartbeat_peers;
protected:
  /**
   * BackfillInterval
   *
@@ -740,6 +747,8 @@ public:
  void purge_strays();
  void update_heartbeat_peers();
  Context *finish_sync_event;
  void finish_recovery(ObjectStore::Transaction& t, list<Context*>& tfin);
@@ -1237,6 +1246,7 @@ public:
    state(0),
    need_up_thru(false),
    last_peering_reset(0),
    heartbeat_peer_lock("PG::heartbeat_peer_lock"),
    backfill_target(-1),
    pg_stats_lock("PG::pg_stats_lock"),
    pg_stats_valid(false),