From 924064f83b7fb5d4f0961ee712d410ed1855cba0 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Wed, 16 Apr 2014 10:53:01 +0800 Subject: [PATCH] mds: dynamically adjust priority of committing dirfrags Adjust priority of committing dirfrags according to number of expiring log segments. The more expiring log segments, the higher priority. Because it mean MDS does not trim log segments quickly enough. Signed-off-by: Yan, Zheng --- src/mds/CDir.cc | 19 +++++++++++-------- src/mds/CDir.h | 7 ++++--- src/mds/LogSegment.h | 2 +- src/mds/MDLog.cc | 19 ++++++++++++------- src/mds/MDLog.h | 9 +++++---- src/mds/journal.cc | 4 ++-- 6 files changed, 35 insertions(+), 25 deletions(-) diff --git a/src/mds/CDir.cc b/src/mds/CDir.cc index b89ac08e746..a423ef75dbf 100644 --- a/src/mds/CDir.cc +++ b/src/mds/CDir.cc @@ -1775,7 +1775,7 @@ void CDir::_omap_fetched(bufferlist& hdrbl, map& omap, * @param want - min version i want committed * @param c - callback for completion */ -void CDir::commit(version_t want, Context *c, bool ignore_authpinnability) +void CDir::commit(version_t want, Context *c, bool ignore_authpinnability, int op_prio) { dout(10) << "commit want " << want << " on " << *this << dendl; if (want == 0) want = get_version(); @@ -1797,7 +1797,7 @@ void CDir::commit(version_t want, Context *c, bool ignore_authpinnability) waiting_for_commit[want].push_back(c); // ok. - _commit(want); + _commit(want, op_prio); } class C_Dir_Committed : public Context { @@ -1815,13 +1815,16 @@ public: * Flush out the modified dentries in this dir. Keep the bufferlist * below max_write_size; */ -void CDir::_omap_commit() +void CDir::_omap_commit(int op_prio) { dout(10) << "_omap_commit" << dendl; unsigned max_write_size = cache->max_dir_commit_size; unsigned write_size = 0; + if (op_prio < 0) + op_prio = CEPH_MSG_PRIO_DEFAULT; + // snap purge? const set *snaps = NULL; SnapRealm *realm = inode->find_snaprealm(); @@ -1877,7 +1880,7 @@ void CDir::_omap_commit() if (write_size >= max_write_size) { ObjectOperation op; - op.priority = CEPH_MSG_PRIO_LOW; // set priority lower than journal! + op.priority = op_prio; op.tmap_to_omap(true); // convert tmap to omap if (!to_set.empty()) @@ -1895,7 +1898,7 @@ void CDir::_omap_commit() } ObjectOperation op; - op.priority = CEPH_MSG_PRIO_LOW; // set priority lower than journal! + op.priority = op_prio; op.tmap_to_omap(true); // convert tmap to omap /* @@ -1968,7 +1971,7 @@ void CDir::_encode_dentry(CDentry *dn, bufferlist& bl, } } -void CDir::_commit(version_t want) +void CDir::_commit(version_t want, int op_prio) { dout(10) << "_commit want " << want << " on " << *this << dendl; @@ -2008,7 +2011,7 @@ void CDir::_commit(version_t want) if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_c); - _omap_commit(); + _omap_commit(op_prio); } @@ -2090,7 +2093,7 @@ void CDir::_committed(version_t v) ++n; if (p->first > committed_version) { dout(10) << " there are waiters for " << p->first << ", committing again" << dendl; - _commit(p->first); + _commit(p->first, -1); break; } cache->mds->queue_waiters(p->second); diff --git a/src/mds/CDir.h b/src/mds/CDir.h index 5ef2d65372c..f5762e210c1 100644 --- a/src/mds/CDir.h +++ b/src/mds/CDir.h @@ -498,14 +498,15 @@ protected: // -- commit -- map > waiting_for_commit; - void _commit(version_t want); - void _omap_commit(); + void _commit(version_t want, int op_prio); + void _omap_commit(int op_prio); void _encode_dentry(CDentry *dn, bufferlist& bl, const set *snaps); void _committed(version_t v); public: void wait_for_commit(Context *c, version_t v=0); void commit_to(version_t want); - void commit(version_t want, Context *c, bool ignore_authpinnability=false); + void commit(version_t want, Context *c, + bool ignore_authpinnability=false, int op_prio=-1); // -- dirtyness -- version_t get_committing_version() { return committing_version; } diff --git a/src/mds/LogSegment.h b/src/mds/LogSegment.h index 2fffe3bbaf9..a2b8ace387c 100644 --- a/src/mds/LogSegment.h +++ b/src/mds/LogSegment.h @@ -67,7 +67,7 @@ class LogSegment { map tablev; // try to expire - void try_to_expire(MDS *mds, C_GatherBuilder &gather_bld); + void try_to_expire(MDS *mds, C_GatherBuilder &gather_bld, int op_prio); // cons LogSegment(loff_t off) : diff --git a/src/mds/MDLog.cc b/src/mds/MDLog.cc index 991eaf5d2b9..c224695671b 100644 --- a/src/mds/MDLog.cc +++ b/src/mds/MDLog.cc @@ -333,9 +333,14 @@ void MDLog::trim(int m) if (stop < ceph_clock_now(g_ceph_context)) break; - if ((int)expiring_segments.size() >= g_conf->mds_log_max_expiring) + int num_expiring_segments = (int)expiring_segments.size(); + if (num_expiring_segments >= g_conf->mds_log_max_expiring) break; + int op_prio = CEPH_MSG_PRIO_LOW + + (CEPH_MSG_PRIO_HIGH - CEPH_MSG_PRIO_LOW) * + num_expiring_segments / g_conf->mds_log_max_expiring; + // look at first segment LogSegment *ls = p->second; assert(ls); @@ -351,7 +356,7 @@ void MDLog::trim(int m) } else if (expired_segments.count(ls)) { dout(5) << "trim already expired segment " << ls->offset << ", " << ls->num_events << " events" << dendl; } else { - try_expire(ls); + try_expire(ls, op_prio); } } @@ -360,16 +365,16 @@ void MDLog::trim(int m) } -void MDLog::try_expire(LogSegment *ls) +void MDLog::try_expire(LogSegment *ls, int op_prio) { C_GatherBuilder gather_bld(g_ceph_context); - ls->try_to_expire(mds, gather_bld); + ls->try_to_expire(mds, gather_bld, op_prio); if (gather_bld.has_subs()) { assert(expiring_segments.count(ls) == 0); expiring_segments.insert(ls); expiring_events += ls->num_events; dout(5) << "try_expire expiring segment " << ls->offset << dendl; - gather_bld.set_finisher(new C_MaybeExpiredSegment(this, ls)); + gather_bld.set_finisher(new C_MaybeExpiredSegment(this, ls, op_prio)); gather_bld.activate(); } else { dout(10) << "try_expire expired segment " << ls->offset << dendl; @@ -380,13 +385,13 @@ void MDLog::try_expire(LogSegment *ls) logger->set(l_mdl_evexg, expiring_events); } -void MDLog::_maybe_expired(LogSegment *ls) +void MDLog::_maybe_expired(LogSegment *ls, int op_prio) { dout(10) << "_maybe_expired segment " << ls->offset << " " << ls->num_events << " events" << dendl; assert(expiring_segments.count(ls)); expiring_segments.erase(ls); expiring_events -= ls->num_events; - try_expire(ls); + try_expire(ls, op_prio); } void MDLog::_trim_expired_segments() diff --git a/src/mds/MDLog.h b/src/mds/MDLog.h index 82d51c33d15..6e8e980c94e 100644 --- a/src/mds/MDLog.h +++ b/src/mds/MDLog.h @@ -219,15 +219,16 @@ private: class C_MaybeExpiredSegment : public Context { MDLog *mdlog; LogSegment *ls; + int op_prio; public: - C_MaybeExpiredSegment(MDLog *mdl, LogSegment *s) : mdlog(mdl), ls(s) {} + C_MaybeExpiredSegment(MDLog *mdl, LogSegment *s, int p) : mdlog(mdl), ls(s), op_prio(p) {} void finish(int res) { - mdlog->_maybe_expired(ls); + mdlog->_maybe_expired(ls, op_prio); } }; - void try_expire(LogSegment *ls); - void _maybe_expired(LogSegment *ls); + void try_expire(LogSegment *ls, int op_prio); + void _maybe_expired(LogSegment *ls, int op_prio); void _expired(LogSegment *ls); void _trim_expired_segments(); diff --git a/src/mds/journal.cc b/src/mds/journal.cc index 2bf24c3eb3d..2b8bef0bd8e 100644 --- a/src/mds/journal.cc +++ b/src/mds/journal.cc @@ -62,7 +62,7 @@ // ----------------------- // LogSegment -void LogSegment::try_to_expire(MDS *mds, C_GatherBuilder &gather_bld) +void LogSegment::try_to_expire(MDS *mds, C_GatherBuilder &gather_bld, int op_prio) { set commit; @@ -103,7 +103,7 @@ void LogSegment::try_to_expire(MDS *mds, C_GatherBuilder &gather_bld) assert(dir->is_auth()); if (dir->can_auth_pin()) { dout(15) << "try_to_expire committing " << *dir << dendl; - dir->commit(0, gather_bld.new_sub()); + dir->commit(0, gather_bld.new_sub(), false, op_prio); } else { dout(15) << "try_to_expire waiting for unfreeze on " << *dir << dendl; dir->add_waiter(CDir::WAIT_UNFREEZE, gather_bld.new_sub());