mirror of
https://github.com/ceph/ceph
synced 2024-12-22 03:22:00 +00:00
mds: dynamically adjust priority of committing dirfrags
Adjust priority of committing dirfrags according to number of expiring log segments. The more expiring log segments, the higher priority. Because it mean MDS does not trim log segments quickly enough. Signed-off-by: Yan, Zheng <zheng.z.yan@intel.com>
This commit is contained in:
parent
908fa5edcc
commit
924064f83b
@ -1775,7 +1775,7 @@ void CDir::_omap_fetched(bufferlist& hdrbl, map<string, bufferlist>& omap,
|
||||
* @param want - min version i want committed
|
||||
* @param c - callback for completion
|
||||
*/
|
||||
void CDir::commit(version_t want, Context *c, bool ignore_authpinnability)
|
||||
void CDir::commit(version_t want, Context *c, bool ignore_authpinnability, int op_prio)
|
||||
{
|
||||
dout(10) << "commit want " << want << " on " << *this << dendl;
|
||||
if (want == 0) want = get_version();
|
||||
@ -1797,7 +1797,7 @@ void CDir::commit(version_t want, Context *c, bool ignore_authpinnability)
|
||||
waiting_for_commit[want].push_back(c);
|
||||
|
||||
// ok.
|
||||
_commit(want);
|
||||
_commit(want, op_prio);
|
||||
}
|
||||
|
||||
class C_Dir_Committed : public Context {
|
||||
@ -1815,13 +1815,16 @@ public:
|
||||
* Flush out the modified dentries in this dir. Keep the bufferlist
|
||||
* below max_write_size;
|
||||
*/
|
||||
void CDir::_omap_commit()
|
||||
void CDir::_omap_commit(int op_prio)
|
||||
{
|
||||
dout(10) << "_omap_commit" << dendl;
|
||||
|
||||
unsigned max_write_size = cache->max_dir_commit_size;
|
||||
unsigned write_size = 0;
|
||||
|
||||
if (op_prio < 0)
|
||||
op_prio = CEPH_MSG_PRIO_DEFAULT;
|
||||
|
||||
// snap purge?
|
||||
const set<snapid_t> *snaps = NULL;
|
||||
SnapRealm *realm = inode->find_snaprealm();
|
||||
@ -1877,7 +1880,7 @@ void CDir::_omap_commit()
|
||||
|
||||
if (write_size >= max_write_size) {
|
||||
ObjectOperation op;
|
||||
op.priority = CEPH_MSG_PRIO_LOW; // set priority lower than journal!
|
||||
op.priority = op_prio;
|
||||
op.tmap_to_omap(true); // convert tmap to omap
|
||||
|
||||
if (!to_set.empty())
|
||||
@ -1895,7 +1898,7 @@ void CDir::_omap_commit()
|
||||
}
|
||||
|
||||
ObjectOperation op;
|
||||
op.priority = CEPH_MSG_PRIO_LOW; // set priority lower than journal!
|
||||
op.priority = op_prio;
|
||||
op.tmap_to_omap(true); // convert tmap to omap
|
||||
|
||||
/*
|
||||
@ -1968,7 +1971,7 @@ void CDir::_encode_dentry(CDentry *dn, bufferlist& bl,
|
||||
}
|
||||
}
|
||||
|
||||
void CDir::_commit(version_t want)
|
||||
void CDir::_commit(version_t want, int op_prio)
|
||||
{
|
||||
dout(10) << "_commit want " << want << " on " << *this << dendl;
|
||||
|
||||
@ -2008,7 +2011,7 @@ void CDir::_commit(version_t want)
|
||||
|
||||
if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_c);
|
||||
|
||||
_omap_commit();
|
||||
_omap_commit(op_prio);
|
||||
}
|
||||
|
||||
|
||||
@ -2090,7 +2093,7 @@ void CDir::_committed(version_t v)
|
||||
++n;
|
||||
if (p->first > committed_version) {
|
||||
dout(10) << " there are waiters for " << p->first << ", committing again" << dendl;
|
||||
_commit(p->first);
|
||||
_commit(p->first, -1);
|
||||
break;
|
||||
}
|
||||
cache->mds->queue_waiters(p->second);
|
||||
|
@ -498,14 +498,15 @@ protected:
|
||||
|
||||
// -- commit --
|
||||
map<version_t, list<Context*> > waiting_for_commit;
|
||||
void _commit(version_t want);
|
||||
void _omap_commit();
|
||||
void _commit(version_t want, int op_prio);
|
||||
void _omap_commit(int op_prio);
|
||||
void _encode_dentry(CDentry *dn, bufferlist& bl, const set<snapid_t> *snaps);
|
||||
void _committed(version_t v);
|
||||
public:
|
||||
void wait_for_commit(Context *c, version_t v=0);
|
||||
void commit_to(version_t want);
|
||||
void commit(version_t want, Context *c, bool ignore_authpinnability=false);
|
||||
void commit(version_t want, Context *c,
|
||||
bool ignore_authpinnability=false, int op_prio=-1);
|
||||
|
||||
// -- dirtyness --
|
||||
version_t get_committing_version() { return committing_version; }
|
||||
|
@ -67,7 +67,7 @@ class LogSegment {
|
||||
map<int,version_t> tablev;
|
||||
|
||||
// try to expire
|
||||
void try_to_expire(MDS *mds, C_GatherBuilder &gather_bld);
|
||||
void try_to_expire(MDS *mds, C_GatherBuilder &gather_bld, int op_prio);
|
||||
|
||||
// cons
|
||||
LogSegment(loff_t off) :
|
||||
|
@ -333,9 +333,14 @@ void MDLog::trim(int m)
|
||||
if (stop < ceph_clock_now(g_ceph_context))
|
||||
break;
|
||||
|
||||
if ((int)expiring_segments.size() >= g_conf->mds_log_max_expiring)
|
||||
int num_expiring_segments = (int)expiring_segments.size();
|
||||
if (num_expiring_segments >= g_conf->mds_log_max_expiring)
|
||||
break;
|
||||
|
||||
int op_prio = CEPH_MSG_PRIO_LOW +
|
||||
(CEPH_MSG_PRIO_HIGH - CEPH_MSG_PRIO_LOW) *
|
||||
num_expiring_segments / g_conf->mds_log_max_expiring;
|
||||
|
||||
// look at first segment
|
||||
LogSegment *ls = p->second;
|
||||
assert(ls);
|
||||
@ -351,7 +356,7 @@ void MDLog::trim(int m)
|
||||
} else if (expired_segments.count(ls)) {
|
||||
dout(5) << "trim already expired segment " << ls->offset << ", " << ls->num_events << " events" << dendl;
|
||||
} else {
|
||||
try_expire(ls);
|
||||
try_expire(ls, op_prio);
|
||||
}
|
||||
}
|
||||
|
||||
@ -360,16 +365,16 @@ void MDLog::trim(int m)
|
||||
}
|
||||
|
||||
|
||||
void MDLog::try_expire(LogSegment *ls)
|
||||
void MDLog::try_expire(LogSegment *ls, int op_prio)
|
||||
{
|
||||
C_GatherBuilder gather_bld(g_ceph_context);
|
||||
ls->try_to_expire(mds, gather_bld);
|
||||
ls->try_to_expire(mds, gather_bld, op_prio);
|
||||
if (gather_bld.has_subs()) {
|
||||
assert(expiring_segments.count(ls) == 0);
|
||||
expiring_segments.insert(ls);
|
||||
expiring_events += ls->num_events;
|
||||
dout(5) << "try_expire expiring segment " << ls->offset << dendl;
|
||||
gather_bld.set_finisher(new C_MaybeExpiredSegment(this, ls));
|
||||
gather_bld.set_finisher(new C_MaybeExpiredSegment(this, ls, op_prio));
|
||||
gather_bld.activate();
|
||||
} else {
|
||||
dout(10) << "try_expire expired segment " << ls->offset << dendl;
|
||||
@ -380,13 +385,13 @@ void MDLog::try_expire(LogSegment *ls)
|
||||
logger->set(l_mdl_evexg, expiring_events);
|
||||
}
|
||||
|
||||
void MDLog::_maybe_expired(LogSegment *ls)
|
||||
void MDLog::_maybe_expired(LogSegment *ls, int op_prio)
|
||||
{
|
||||
dout(10) << "_maybe_expired segment " << ls->offset << " " << ls->num_events << " events" << dendl;
|
||||
assert(expiring_segments.count(ls));
|
||||
expiring_segments.erase(ls);
|
||||
expiring_events -= ls->num_events;
|
||||
try_expire(ls);
|
||||
try_expire(ls, op_prio);
|
||||
}
|
||||
|
||||
void MDLog::_trim_expired_segments()
|
||||
|
@ -219,15 +219,16 @@ private:
|
||||
class C_MaybeExpiredSegment : public Context {
|
||||
MDLog *mdlog;
|
||||
LogSegment *ls;
|
||||
int op_prio;
|
||||
public:
|
||||
C_MaybeExpiredSegment(MDLog *mdl, LogSegment *s) : mdlog(mdl), ls(s) {}
|
||||
C_MaybeExpiredSegment(MDLog *mdl, LogSegment *s, int p) : mdlog(mdl), ls(s), op_prio(p) {}
|
||||
void finish(int res) {
|
||||
mdlog->_maybe_expired(ls);
|
||||
mdlog->_maybe_expired(ls, op_prio);
|
||||
}
|
||||
};
|
||||
|
||||
void try_expire(LogSegment *ls);
|
||||
void _maybe_expired(LogSegment *ls);
|
||||
void try_expire(LogSegment *ls, int op_prio);
|
||||
void _maybe_expired(LogSegment *ls, int op_prio);
|
||||
void _expired(LogSegment *ls);
|
||||
void _trim_expired_segments();
|
||||
|
||||
|
@ -62,7 +62,7 @@
|
||||
// -----------------------
|
||||
// LogSegment
|
||||
|
||||
void LogSegment::try_to_expire(MDS *mds, C_GatherBuilder &gather_bld)
|
||||
void LogSegment::try_to_expire(MDS *mds, C_GatherBuilder &gather_bld, int op_prio)
|
||||
{
|
||||
set<CDir*> commit;
|
||||
|
||||
@ -103,7 +103,7 @@ void LogSegment::try_to_expire(MDS *mds, C_GatherBuilder &gather_bld)
|
||||
assert(dir->is_auth());
|
||||
if (dir->can_auth_pin()) {
|
||||
dout(15) << "try_to_expire committing " << *dir << dendl;
|
||||
dir->commit(0, gather_bld.new_sub());
|
||||
dir->commit(0, gather_bld.new_sub(), false, op_prio);
|
||||
} else {
|
||||
dout(15) << "try_to_expire waiting for unfreeze on " << *dir << dendl;
|
||||
dir->add_waiter(CDir::WAIT_UNFREEZE, gather_bld.new_sub());
|
||||
|
Loading…
Reference in New Issue
Block a user