osd: schedule agent from a priority queue

We need to focus agent attention on those PGs that most need it.  For
starters, full PGs need immediate attention so that we can unblock IO.
More generally, fuller ones will give us the best payoff in terms of
evicted data vs effort expended finding candidate objects.

Restructure the agent queue with priorities.  Quantize evict_effort so that
PGs do not jump between priorities too frequently.

Signed-off-by: Sage Weil <sage@inktank.com>
This commit is contained in:
Sage Weil 2014-02-12 12:39:25 -08:00
parent a8129829ce
commit 0dd1e07194
5 changed files with 88 additions and 32 deletions

View File

@ -388,7 +388,8 @@ OPTION(osd_backfill_retry_interval, OPT_DOUBLE, 10.0)
// max agent flush ops
OPTION(osd_agent_max_ops, OPT_INT, 4)
// minimum eviction effort the agent will expend (fraction, 0..1)
OPTION(osd_agent_min_evict_effort, OPT_FLOAT, .1)
// quantize evict_effort so PGs do not hop between priority tiers too often
OPTION(osd_agent_quantize_effort, OPT_FLOAT, .1)
// decay atime and hist histograms after how many objects go by
OPTION(osd_agent_hist_halflife, OPT_INT, 1000)

View File

@ -192,7 +192,7 @@ OSDService::OSDService(OSD *osd) :
sched_scrub_lock("OSDService::sched_scrub_lock"), scrubs_pending(0),
scrubs_active(0),
agent_lock("OSD::agent_lock"),
agent_queue_pos(agent_queue.begin()),
agent_valid_iterator(false),
agent_ops(0),
agent_active(true),
agent_thread(this),
@ -466,22 +466,33 @@ void OSDService::agent_entry()
{
dout(10) << __func__ << " start" << dendl;
agent_lock.Lock();
// stick at least one level in there to simplify other paths
if (agent_queue.empty())
agent_queue[0];
while (!agent_stop_flag) {
uint64_t level = agent_queue.rbegin()->first;
set<PGRef>& top = agent_queue.rbegin()->second;
dout(10) << __func__
<< " pgs " << agent_queue.size()
<< " ops " << agent_ops << "/"
<< " tiers " << agent_queue.size()
<< ", top is " << level
<< " with pgs " << top.size()
<< ", ops " << agent_ops << "/"
<< g_conf->osd_agent_max_ops
<< (agent_active ? " active" : " NOT ACTIVE")
<< dendl;
dout(20) << __func__ << " oids " << agent_oids << dendl;
if (agent_ops >= g_conf->osd_agent_max_ops || agent_queue.empty() ||
if (agent_ops >= g_conf->osd_agent_max_ops || top.empty() ||
!agent_active) {
agent_cond.Wait(agent_lock);
continue;
}
if (agent_queue_pos == agent_queue.end())
agent_queue_pos = agent_queue.begin();
if (!agent_valid_iterator || agent_queue_pos == top.end()) {
agent_queue_pos = top.begin();
agent_valid_iterator = true;
}
PGRef pg = *agent_queue_pos;
int max = g_conf->osd_agent_max_ops - agent_ops;
agent_lock.Unlock();

View File

@ -434,8 +434,9 @@ public:
// -- agent shared state --
Mutex agent_lock;
Cond agent_cond;
/// work queue: priority (quantized evict_effort) -> PGs at that tier;
/// the agent services the highest-priority (rbegin) tier first
map<uint64_t, set<PGRef> > agent_queue;
/// cached position within the top tier; meaningful only while
/// agent_valid_iterator is true
set<PGRef>::iterator agent_queue_pos;
bool agent_valid_iterator;   // false when agent_queue_pos may be stale
int agent_ops;               // agent ops currently in flight
set<hobject_t> agent_oids;   // objects with agent ops in flight
bool agent_active;           // is the agent enabled?
@ -452,22 +453,48 @@ public:
void agent_entry();
void agent_stop();
/// enable agent for a pg
void agent_enable_pg(PG *pg) {
Mutex::Locker l(agent_lock);
if (agent_queue.empty())
// Insert pg into the agent queue at the given priority tier.
// Caller must hold agent_lock.
void _enqueue(PG *pg, uint64_t priority) {
// If this creates a tier above the current top, the agent thread's
// cached position no longer points into the top tier.
if (!agent_queue.empty() &&
agent_queue.rbegin()->first < priority)
agent_valid_iterator = false; // inserting higher-priority queue
set<PGRef>& nq = agent_queue[priority];
// Wake the agent thread in case it was waiting on an empty tier.
if (nq.empty())
agent_cond.Signal();
nq.insert(pg);
}
// Remove pg from the tier it was queued at; drop the tier if it becomes
// empty (but always keep at least one tier in the map).
// Caller must hold agent_lock and pass the priority pg was enqueued with.
void _dequeue(PG *pg, uint64_t old_priority) {
set<PGRef>& oq = agent_queue[old_priority];
set<PGRef>::iterator p = oq.find(pg);
assert(p != oq.end());
// Step the agent thread's cached position past the element we are
// about to erase so it is not left dangling.
if (p == agent_queue_pos)
++agent_queue_pos;
oq.erase(p);
if (oq.empty() && agent_queue.size() > 1) {
// If we are erasing the top tier, the cached position pointed into it.
if (agent_queue.rbegin()->first == old_priority)
agent_valid_iterator = false;
agent_queue.erase(old_priority);
}
}
/// enable agent for a pg, queueing it at the given priority tier
void agent_enable_pg(PG *pg, uint64_t priority) {
Mutex::Locker l(agent_lock);
_enqueue(pg, priority);
}
/// adjust priority for an enabled pg (caller passes the priority the pg
/// is currently queued at plus the new one; they must differ)
// NOTE(review): enqueue precedes dequeue; agent_lock is held across both
// calls, so the intermediate state is not observable — confirm before
// reordering.
void agent_adjust_pg(PG *pg, uint64_t old_priority, uint64_t new_priority) {
Mutex::Locker l(agent_lock);
assert(new_priority != old_priority);
_enqueue(pg, new_priority);
_dequeue(pg, old_priority);
}
/// disable agent for a pg
void agent_disable_pg(PG *pg) {
void agent_disable_pg(PG *pg, uint64_t old_priority) {
Mutex::Locker l(agent_lock);
set<PGRef>::iterator p = agent_queue.find(pg);
assert(p != agent_queue.end());
if (p == agent_queue_pos)
++agent_queue_pos;
agent_queue.erase(p);
_dequeue(pg, old_priority);
}
/// note start of an async (flush) op

View File

@ -10462,11 +10462,11 @@ void ReplicatedPG::agent_stop()
if (agent_state && !agent_state->is_idle()) {
agent_state->evict_mode = TierAgentState::EVICT_MODE_IDLE;
agent_state->flush_mode = TierAgentState::FLUSH_MODE_IDLE;
osd->agent_disable_pg(this);
osd->agent_disable_pg(this, agent_state->evict_effort);
}
}
bool ReplicatedPG::agent_choose_mode()
void ReplicatedPG::agent_choose_mode()
{
uint64_t divisor = pool.info.get_pg_num_divisor(info.pgid);
@ -10529,11 +10529,21 @@ bool ReplicatedPG::agent_choose_mode()
evict_mode = TierAgentState::EVICT_MODE_SOME;
uint64_t over = full_micro - evict_target;
uint64_t span = 1000000 - evict_target;
evict_effort = MIN(over * 1000000 / span,
evict_effort = MAX(over * 1000000 / span,
(unsigned)(1000000.0 * g_conf->osd_agent_min_evict_effort));
// quantize effort to avoid too much reordering in the agent_queue.
uint64_t inc = g_conf->osd_agent_quantize_effort * 1000000;
assert(inc > 0);
uint64_t was = evict_effort;
evict_effort -= evict_effort % inc;
if (evict_effort < inc)
evict_effort = inc;
assert(evict_effort >= inc && evict_effort <= 1000000);
dout(30) << __func__ << " evict_effort " << was << " quantized by " << inc << " to " << evict_effort << dendl;
}
bool changed = false;
bool old_idle = agent_state->is_idle();
if (flush_mode != agent_state->flush_mode) {
dout(5) << __func__ << " flush_mode "
<< TierAgentState::get_flush_mode_name(agent_state->flush_mode)
@ -10541,7 +10551,6 @@ bool ReplicatedPG::agent_choose_mode()
<< TierAgentState::get_flush_mode_name(flush_mode)
<< dendl;
agent_state->flush_mode = flush_mode;
changed = true;
}
if (evict_mode != agent_state->evict_mode) {
dout(5) << __func__ << " evict_mode "
@ -10553,8 +10562,8 @@ bool ReplicatedPG::agent_choose_mode()
requeue_ops(waiting_for_cache_not_full);
}
agent_state->evict_mode = evict_mode;
changed = true;
}
uint64_t old_effort = agent_state->evict_effort;
if (evict_effort != agent_state->evict_effort) {
dout(5) << __func__ << " evict_effort "
<< ((float)agent_state->evict_effort / 1000000.0)
@ -10563,13 +10572,21 @@ bool ReplicatedPG::agent_choose_mode()
<< dendl;
agent_state->evict_effort = evict_effort;
}
if (changed) {
if (agent_state->is_idle())
osd->agent_disable_pg(this);
else
osd->agent_enable_pg(this);
// NOTE: we are using evict_effort as a proxy for *all* agent effort
// (including flush). This is probably fine (they should be
// correlated) but it is not precisely correct.
if (agent_state->is_idle()) {
if (!old_idle) {
osd->agent_disable_pg(this, old_effort);
}
} else {
if (old_idle) {
osd->agent_enable_pg(this, agent_state->evict_effort);
} else if (old_effort != agent_state->evict_effort) {
osd->agent_adjust_pg(this, old_effort, agent_state->evict_effort);
}
}
return changed;
}
void ReplicatedPG::agent_estimate_atime_temp(const hobject_t& oid,

View File

@ -702,7 +702,7 @@ protected:
/// clear agent state
void agent_clear();
void agent_choose_mode(); ///< choose (new) agent mode(s)
/// true if we can send an ondisk/commit for v
bool already_complete(eversion_t v) {