Merge pull request #3429 from liewegas/wip-precalc-pgtemp

RFC: mon: prime pg_temp when osdmap changes

Reviewed-by: Guang Yang <yguang@yahoo-inc.com>
Tested-by: Zhi (David) Zhang <zhangz@yahoo-inc.com>
This commit is contained in:
Sage Weil 2015-05-01 09:24:33 -07:00
commit 29ee7953c9
6 changed files with 206 additions and 10 deletions

View File

@ -195,6 +195,8 @@ OPTION(mon_osd_max_op_age, OPT_DOUBLE, 32) // max op age before we get conce
OPTION(mon_osd_max_split_count, OPT_INT, 32) // largest number of PGs per "involved" OSD to let split create
OPTION(mon_osd_allow_primary_temp, OPT_BOOL, false) // allow primary_temp to be set in the osdmap
OPTION(mon_osd_allow_primary_affinity, OPT_BOOL, false) // allow primary_affinity to be set in the osdmap
OPTION(mon_osd_prime_pg_temp, OPT_BOOL, false) // prime osdmap with pg mapping changes
OPTION(mon_osd_prime_pg_temp_max_time, OPT_FLOAT, .5) // max time to spend priming
OPTION(mon_stat_smooth_intervals, OPT_INT, 2) // smooth stats over last N PGMap maps
OPTION(mon_lease, OPT_FLOAT, 5) // lease interval
OPTION(mon_lease_renew_interval, OPT_FLOAT, 3) // on leader, to renew the lease

View File

@ -904,6 +904,139 @@ void OSDMonitor::create_pending()
OSDMap::remove_down_temps(g_ceph_context, osdmap, &pending_inc);
}
void OSDMonitor::maybe_prime_pg_temp()
{
bool all = false;
if (pending_inc.crush.length()) {
dout(10) << __func__ << " new crush map, all" << dendl;
all = true;
}
if (!pending_inc.new_up_client.empty()) {
dout(10) << __func__ << " new up osds, all" << dendl;
all = true;
}
// check for interesting OSDs
set<int> osds;
for (map<int32_t,uint8_t>::iterator p = pending_inc.new_state.begin();
!all && p != pending_inc.new_state.end();
++p) {
if ((p->second & CEPH_OSD_UP) &&
osdmap.is_up(p->first)) {
osds.insert(p->first);
}
}
for (map<int32_t,uint32_t>::iterator p = pending_inc.new_weight.begin();
!all && p != pending_inc.new_weight.end();
++p) {
if (p->second < osdmap.get_weight(p->first)) {
// weight reduction
osds.insert(p->first);
} else {
dout(10) << __func__ << " osd." << p->first << " weight increase, all"
<< dendl;
all = true;
}
}
if (!all && osds.empty())
return;
OSDMap next;
next.deepish_copy_from(osdmap);
next.apply_incremental(pending_inc);
PGMap *pg_map = &mon->pgmon()->pg_map;
utime_t stop = ceph_clock_now(NULL);
stop += g_conf->mon_osd_prime_pg_temp_max_time;
int chunk = 1000;
int n = chunk;
if (all) {
for (ceph::unordered_map<pg_t, pg_stat_t>::iterator pp =
pg_map->pg_stat.begin();
pp != pg_map->pg_stat.end();
++pp) {
prime_pg_temp(next, pp);
if (--n <= 0) {
n = chunk;
if (ceph_clock_now(NULL) > stop) {
dout(10) << __func__ << " consumed more than "
<< g_conf->mon_osd_prime_pg_temp_max_time
<< " seconds, stopping"
<< dendl;
break;
}
}
}
} else {
dout(10) << __func__ << " " << osds.size() << " interesting osds" << dendl;
for (set<int>::iterator p = osds.begin(); p != osds.end(); ++p) {
n -= prime_pg_temp(next, pg_map, *p);
if (--n <= 0) {
n = chunk;
if (ceph_clock_now(NULL) > stop) {
dout(10) << __func__ << " consumed more than "
<< g_conf->mon_osd_prime_pg_temp_max_time
<< " seconds, stopping"
<< dendl;
break;
}
}
}
}
}
void OSDMonitor::prime_pg_temp(OSDMap& next,
ceph::unordered_map<pg_t, pg_stat_t>::iterator pp)
{
// do not touch a mapping if a change is pending
if (pending_inc.new_pg_temp.count(pp->first))
return;
vector<int> up, acting;
int up_primary, acting_primary;
next.pg_to_up_acting_osds(pp->first, &up, &up_primary, &acting, &acting_primary);
if (acting == pp->second.acting)
return; // no change since last pg update, skip
vector<int> cur_up, cur_acting;
osdmap.pg_to_up_acting_osds(pp->first, &cur_up, &up_primary,
&cur_acting, &acting_primary);
if (cur_acting == acting)
return; // no change this epoch; must be stale pg_stat
if (cur_acting.empty())
return; // if previously empty now we can be no worse off
const pg_pool_t *pool = next.get_pg_pool(pp->first.pool());
if (pool && cur_acting.size() < pool->min_size)
return; // can be no worse off than before
dout(20) << __func__ << " " << pp->first << " " << cur_up << "/" << cur_acting
<< " -> " << up << "/" << acting
<< ", priming " << cur_acting
<< dendl;
pending_inc.new_pg_temp[pp->first] = cur_acting;
}
int OSDMonitor::prime_pg_temp(OSDMap& next, PGMap *pg_map, int osd)
{
dout(10) << __func__ << " osd." << osd << dendl;
int num = 0;
ceph::unordered_map<int, set<pg_t> >::iterator po = pg_map->pg_by_osd.find(osd);
if (po != pg_map->pg_by_osd.end()) {
for (set<pg_t>::iterator p = po->second.begin();
p != po->second.end();
++p, ++num) {
ceph::unordered_map<pg_t, pg_stat_t>::iterator pp = pg_map->pg_stat.find(*p);
if (pp == pg_map->pg_stat.end())
continue;
prime_pg_temp(next, pp);
}
}
return num;
}
/**
* @note receiving a transaction in this function gives a fair amount of
* freedom to the service implementation if it does need it. It shouldn't.
@ -919,6 +1052,9 @@ void OSDMonitor::encode_pending(MonitorDBStore::TransactionRef t)
int r = pending_inc.propagate_snaps_to_tiers(g_ceph_context, osdmap);
assert(r == 0);
if (g_conf->mon_osd_prime_pg_temp)
maybe_prime_pg_temp();
bufferlist bl;
// tell me about it

View File

@ -34,6 +34,8 @@ using namespace std;
#include "Session.h"
class Monitor;
class PGMap;
#include "messages/MOSDBoot.h"
#include "messages/MMonCommand.h"
#include "messages/MOSDMap.h"
@ -200,6 +202,11 @@ private:
void share_map_with_random_osd();
void maybe_prime_pg_temp();
void prime_pg_temp(OSDMap& next,
ceph::unordered_map<pg_t, pg_stat_t>::iterator pp);
int prime_pg_temp(OSDMap& next, PGMap *pg_map, int osd);
void update_logger();
void handle_query(PaxosServiceMessage *m);

View File

@ -360,6 +360,7 @@ void PGMap::calc_stats()
pg_pool_sum.clear();
pg_sum = pool_stat_t();
osd_sum = osd_stat_t();
pg_by_osd.clear();
for (ceph::unordered_map<pg_t,pg_stat_t>::iterator p = pg_stat.begin();
p != pg_stat.end();
@ -380,16 +381,18 @@ void PGMap::update_pg(pg_t pgid, bufferlist& bl)
{
bufferlist::iterator p = bl.begin();
ceph::unordered_map<pg_t,pg_stat_t>::iterator s = pg_stat.find(pgid);
epoch_t old_lec = 0;
epoch_t old_lec = 0, lec;
if (s != pg_stat.end()) {
old_lec = s->second.get_effective_last_epoch_clean();
stat_pg_sub(pgid, s->second);
stat_pg_update(pgid, s->second, p);
lec = s->second.get_effective_last_epoch_clean();
} else {
pg_stat_t& r = pg_stat[pgid];
::decode(r, p);
stat_pg_add(pgid, r);
lec = r.get_effective_last_epoch_clean();
}
pg_stat_t& r = pg_stat[pgid];
::decode(r, p);
stat_pg_add(pgid, r);
epoch_t lec = r.get_effective_last_epoch_clean();
if (min_last_epoch_clean &&
(lec < min_last_epoch_clean || // we did
(lec > min_last_epoch_clean && // we might
@ -456,7 +459,8 @@ void PGMap::remove_osd(int osd)
}
}
void PGMap::stat_pg_add(const pg_t &pgid, const pg_stat_t &s, bool sumonly)
void PGMap::stat_pg_add(const pg_t &pgid, const pg_stat_t &s, bool sumonly,
bool sameosds)
{
pg_pool_sum[pgid.pool()].add(s);
pg_sum.add(s);
@ -472,14 +476,32 @@ void PGMap::stat_pg_add(const pg_t &pgid, const pg_stat_t &s, bool sumonly)
if (s.acting_primary >= 0)
creating_pgs_by_osd[s.acting_primary].insert(pgid);
}
if (sameosds)
return;
for (vector<int>::const_iterator p = s.blocked_by.begin();
p != s.blocked_by.end();
++p) {
++blocked_by_sum[*p];
}
for (vector<int>::const_iterator p = s.acting.begin(); p != s.acting.end(); ++p) {
set<pg_t>& oset = pg_by_osd[*p];
oset.erase(pgid);
if (oset.empty())
pg_by_osd.erase(*p);
}
for (vector<int>::const_iterator p = s.up.begin(); p != s.up.end(); ++p) {
set<pg_t>& oset = pg_by_osd[*p];
oset.erase(pgid);
if (oset.empty())
pg_by_osd.erase(*p);
}
}
void PGMap::stat_pg_sub(const pg_t &pgid, const pg_stat_t &s, bool sumonly)
void PGMap::stat_pg_sub(const pg_t &pgid, const pg_stat_t &s, bool sumonly,
bool sameosds)
{
pool_stat_t& ps = pg_pool_sum[pgid.pool()];
ps.sub(s);
@ -503,6 +525,9 @@ void PGMap::stat_pg_sub(const pg_t &pgid, const pg_stat_t &s, bool sumonly)
}
}
if (sameosds)
return;
for (vector<int>::const_iterator p = s.blocked_by.begin();
p != s.blocked_by.end();
++p) {
@ -512,6 +537,27 @@ void PGMap::stat_pg_sub(const pg_t &pgid, const pg_stat_t &s, bool sumonly)
if (q->second == 0)
blocked_by_sum.erase(q);
}
for (vector<int>::const_iterator p = s.acting.begin(); p != s.acting.end(); ++p)
pg_by_osd[*p].insert(pgid);
for (vector<int>::const_iterator p = s.up.begin(); p != s.up.end(); ++p)
pg_by_osd[*p].insert(pgid);
}
void PGMap::stat_pg_update(const pg_t pgid, pg_stat_t& s,
bufferlist::iterator& blp)
{
pg_stat_t n;
::decode(n, blp);
bool sameosds =
s.acting == n.acting &&
s.up == n.up &&
s.blocked_by == n.blocked_by;
stat_pg_sub(pgid, s, false, sameosds);
s = n;
stat_pg_add(pgid, n, false, sameosds);
}
void PGMap::stat_osd_add(const osd_stat_t &s)

View File

@ -121,6 +121,7 @@ public:
osd_stat_t osd_sum;
mutable epoch_t min_last_epoch_clean;
ceph::unordered_map<int,int> blocked_by_sum;
ceph::unordered_map<int,set<pg_t> > pg_by_osd;
utime_t stamp;
@ -249,8 +250,11 @@ public:
void redo_full_sets();
void register_nearfull_status(int osd, const osd_stat_t& s);
void calc_stats();
void stat_pg_add(const pg_t &pgid, const pg_stat_t &s, bool sumonly=false);
void stat_pg_sub(const pg_t &pgid, const pg_stat_t &s, bool sumonly=false);
void stat_pg_add(const pg_t &pgid, const pg_stat_t &s, bool sumonly=false,
bool sameosds=false);
void stat_pg_sub(const pg_t &pgid, const pg_stat_t &s, bool sumonly=false,
bool sameosds=false);
void stat_pg_update(const pg_t pgid, pg_stat_t &prev, bufferlist::iterator& blp);
void stat_osd_add(const osd_stat_t &s);
void stat_osd_sub(const osd_stat_t &s);

View File

@ -420,6 +420,7 @@ $extra_conf
mon pg warn min per osd = 3
mon osd allow primary affinity = true
mon reweight min pgs per osd = 4
mon osd prime pg temp = true
$DAEMONOPTS
$CMONDEBUG
$extra_conf