Mirror of https://github.com/ceph/ceph (synced 2025-02-20 17:37:29 +00:00)
Merge PR #26468 into master
* refs/pull/26468/head:
	qa: config recall settings to test cache drop
	qa: check cache dump works without timeout
	mds: add 2nd order recall throttle
	mds: drive log flush and cache trim during recall
	mds: avoid gather assertion when subs exist
	mds: output full details for recall threshold

Reviewed-by: Patrick Donnelly <pdonnell@redhat.com>
commit 0989abe3fe
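This merge adds a "cache drop" command on the MDS tell interface (exercised by the qa changes below) together with new recall throttling in the MDS. A minimal sketch of invoking it outside the qa framework, assuming a ceph CLI on PATH; the helper name is illustrative, and the tests below go through mon_manager.raw_cluster_cmd instead:

import json
import subprocess

def drop_mds_cache(mds_id, timeout=None):
    # "ceph tell mds.<id> cache drop [<timeout>]" asks the MDS to recall client
    # state, flush its journal and trim its cache, and returns a JSON summary.
    cmd = ["ceph", "tell", "mds.{}".format(mds_id), "cache", "drop"]
    if timeout is not None:
        cmd.append(str(timeout))  # seconds before client recall gives up
    return json.loads(subprocess.check_output(cmd))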
@@ -225,57 +225,91 @@ class TestMisc(CephFSTestCase):
        info = self.fs.mds_asok(['dump', 'inode', hex(ino)])
        assert info['path'] == "/foo"

    def _run_drop_cache_cmd(self, timeout):
        drop_res = None

class TestCacheDrop(CephFSTestCase):
    CLIENTS_REQUIRED = 1

    def _run_drop_cache_cmd(self, timeout=None):
        result = None
        mds_id = self.fs.get_lone_mds_id()
        drop_res = json.loads(
            self.fs.mon_manager.raw_cluster_cmd("tell", "mds.{0}".format(mds_id),
                                                "cache", "drop", str(timeout)))
        return drop_res

    def _drop_cache_command(self, timeout):
        self.mount_b.umount_wait()
        ls_data = self.fs.mds_asok(['session', 'ls'])
        self.assert_session_count(1, ls_data)
        if timeout is not None:
            result = self.fs.mon_manager.raw_cluster_cmd("tell", "mds.{0}".format(mds_id),
                                                          "cache", "drop", str(timeout))
        else:
            result = self.fs.mon_manager.raw_cluster_cmd("tell", "mds.{0}".format(mds_id),
                                                          "cache", "drop")
        return json.loads(result)

    def _setup(self, max_caps=20, threshold=400):
        # create some files
        self.mount_a.create_n_files("dc-dir/dc-file", 1000)
        # drop cache
        drop_res = self._run_drop_cache_cmd(timeout)
        self.mount_a.create_n_files("dc-dir/dc-file", 1000, sync=True)

        self.assertTrue(drop_res['client_recall']['return_code'] == 0)
        self.assertTrue(drop_res['flush_journal']['return_code'] == 0)

    def _drop_cache_command_timeout(self, timeout):
        self.mount_b.umount_wait()
        ls_data = self.fs.mds_asok(['session', 'ls'])
        self.assert_session_count(1, ls_data)

        # create some files
        self.mount_a.create_n_files("dc-dir/dc-file-t", 1000)

        # simulate client death and try drop cache
        self.mount_a.kill()
        drop_res = self._run_drop_cache_cmd(timeout)

        self.assertTrue(drop_res['client_recall']['return_code'] == -errno.ETIMEDOUT)
        self.assertTrue(drop_res['flush_journal']['return_code'] == 0)

        self.mount_a.kill_cleanup()
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()
        # Reduce this so the MDS doesn't recall the maximum for simple tests
        self.fs.rank_asok(['config', 'set', 'mds_recall_max_caps', str(max_caps)])
        self.fs.rank_asok(['config', 'set', 'mds_recall_max_decay_threshold', str(threshold)])

    def test_drop_cache_command(self):
        """
        Basic test for checking drop cache command using tell interface.
        Basic test for checking drop cache command.
        Confirm it halts without a timeout.
        Note that the cache size post trimming is not checked here.
        """
        self._drop_cache_command(10)
        mds_min_caps_per_client = int(self.fs.get_config("mds_min_caps_per_client"))
        self._setup()
        result = self._run_drop_cache_cmd()
        self.assertTrue(result['client_recall']['return_code'] == 0)
        self.assertTrue(result['flush_journal']['return_code'] == 0)
        # It should take at least 1 second
        self.assertTrue(result['duration'] > 1)
        self.assertGreaterEqual(result['trim_cache']['trimmed'], 1000-2*mds_min_caps_per_client)

    def test_drop_cache_command_timeout(self):
        """
        Basic test for checking drop cache command.
        Confirm recall halts early via a timeout.
        Note that the cache size post trimming is not checked here.
        """
        self._setup()
        result = self._run_drop_cache_cmd(timeout=10)
        self.assertTrue(result['client_recall']['return_code'] == -errno.ETIMEDOUT)
        self.assertTrue(result['flush_journal']['return_code'] == 0)
        self.assertTrue(result['duration'] > 10)
        self.assertGreaterEqual(result['trim_cache']['trimmed'], 100) # we did something, right?

    def test_drop_cache_command_dead_timeout(self):
        """
        Check drop cache command with non-responding client using tell
        interface. Note that the cache size post trimming is not checked
        here.
        """
        self._drop_cache_command_timeout(5)
        self._setup()
        self.mount_a.kill()
        # Note: recall is subject to the timeout. The journal flush will
        # be delayed due to the client being dead.
        result = self._run_drop_cache_cmd(timeout=5)
        self.assertTrue(result['client_recall']['return_code'] == -errno.ETIMEDOUT)
        self.assertTrue(result['flush_journal']['return_code'] == 0)
        self.assertTrue(result['duration'] > 5)
        self.assertTrue(result['duration'] < 120)
        self.assertEqual(0, result['trim_cache']['trimmed'])
        self.mount_a.kill_cleanup()
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

    def test_drop_cache_command_dead(self):
        """
        Check drop cache command with non-responding client using tell
        interface. Note that the cache size post trimming is not checked
        here.
        """
        self._setup()
        self.mount_a.kill()
        result = self._run_drop_cache_cmd()
        self.assertTrue(result['client_recall']['return_code'] == 0)
        self.assertTrue(result['flush_journal']['return_code'] == 0)
        self.assertTrue(result['duration'] > 5)
        self.assertTrue(result['duration'] < 120)
        self.assertEqual(0, result['trim_cache']['trimmed'])
        self.mount_a.kill_cleanup()
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

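The assertions above pin down the shape of the JSON summary returned by "cache drop". A condensed sketch of the fields the tests rely on; the helper name is illustrative only, and -errno.ETIMEDOUT shows up when recall hits the timeout or the client is unresponsive:

import errno

def check_drop_result(result, expect_recall_timeout=False):
    # Mirrors the checks in TestCacheDrop above; illustrative helper only.
    expected = -errno.ETIMEDOUT if expect_recall_timeout else 0
    assert result['client_recall']['return_code'] == expected
    assert result['flush_journal']['return_code'] == 0
    assert result['duration'] > 0                # seconds spent in the operation
    assert result['trim_cache']['trimmed'] >= 0  # cache items trimmed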
@@ -313,6 +313,15 @@ private:
    Context *timer_task = nullptr;
  };

  auto do_trim() {
    auto [throttled, count] = mdcache->trim(UINT64_MAX);
    dout(10) << __func__
             << (throttled ? " (throttled)" : "")
             << " trimmed " << count << " caps" << dendl;
    dentries_trimmed += count;
    return std::make_pair(throttled, count);
  }

  void recall_client_state() {
    dout(20) << __func__ << dendl;
    auto now = mono_clock::now();
@@ -333,12 +342,15 @@ private:
        ctx->start_timer();
        gather->set_finisher(new MDSInternalContextWrapper(mds, ctx));
        gather->activate();
        mdlog->flush(); /* use down-time to incrementally flush log */
        do_trim(); /* use down-time to incrementally trim cache */
      } else {
        if (!gather->has_subs()) {
          delete gather;
          return handle_recall_client_state(0);
        } else if (recall_timeout > 0 && duration > recall_timeout) {
          delete gather;
          gather->set_finisher(new C_MDSInternalNoop);
          gather->activate();
          return handle_recall_client_state(-ETIMEDOUT);
        } else {
          uint64_t remaining = (recall_timeout == 0 ? 0 : recall_timeout-duration);
@@ -400,11 +412,7 @@ private:
  void trim_cache() {
    dout(20) << __func__ << dendl;

    auto [throttled, count] = mdcache->trim(UINT64_MAX);
    dout(10) << __func__
             << (throttled ? " (throttled)" : "")
             << " trimmed " << count << " caps" << dendl;
    dentries_trimmed += count;
    auto [throttled, count] = do_trim();
    if (throttled && count > 0) {
      auto timer = new FunctionContext([this](int _) {
        trim_cache();
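The new trim path above calls do_trim() and, when the cache reports the trim was throttled but still made progress, re-arms a timer to continue trimming. A rough Python model of that retry-until-done pattern, under the assumption that the trim step reports (throttled, count); the MDS re-arms a timer rather than sleeping:

import time

def trim_until_done(trim_step, delay=1.0):
    # trim_step() -> (throttled, count); retry while throttled and progressing.
    total = 0
    while True:
        throttled, count = trim_step()
        total += count
        if throttled and count > 0:
            time.sleep(delay)  # stand-in for the MDS timer context
            continue
        return total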
@@ -1579,11 +1579,16 @@ std::pair<bool, uint64_t> Server::recall_client_state(MDSGatherBuilder* gather,
    uint64_t recall = std::min<uint64_t>(recall_max_caps, num_caps-newlim);
    newlim = num_caps-recall;
    const uint64_t session_recall_throttle = session->get_recall_caps_throttle();
    const uint64_t session_recall_throttle2o = session->get_recall_caps_throttle2o();
    const uint64_t global_recall_throttle = recall_throttle.get();
    if (session_recall_throttle+recall > recall_max_decay_threshold) {
      dout(15) << " session recall threshold (" << recall_max_decay_threshold << ") hit at " << session_recall_throttle << "; skipping!" << dendl;
      throttled = true;
      continue;
    } else if (session_recall_throttle2o+recall > recall_max_caps*2) {
      dout(15) << " session recall 2nd-order threshold (" << 2*recall_max_caps << ") hit at " << session_recall_throttle2o << "; skipping!" << dendl;
      throttled = true;
      continue;
    } else if (global_recall_throttle+recall > recall_global_max_decay_threshold) {
      dout(15) << " global recall threshold (" << recall_global_max_decay_threshold << ") hit at " << global_recall_throttle << "; skipping!" << dendl;
      throttled = true;
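The hunk above shows the gating order before a RECALL is sent to a session: the per-session recall throttle is checked against mds_recall_max_decay_threshold, the new second-order throttle against 2*mds_recall_max_caps, and the global throttle against recall_global_max_decay_threshold. A rough Python model of that decision (not MDS code; parameter names mirror the variables above):

def may_recall(recall, session_throttle, session_throttle2o, global_throttle,
               recall_max_caps, recall_max_decay_threshold,
               recall_global_max_decay_threshold):
    # All three counters decay over time; skip (and mark throttled) if sending
    # `recall` more caps would push any of them past its threshold.
    if session_throttle + recall > recall_max_decay_threshold:
        return False  # per-session throttle hit
    if session_throttle2o + recall > 2 * recall_max_caps:
        return False  # second-order throttle: recalling too quickly
    if global_throttle + recall > recall_global_max_decay_threshold:
        return False  # global throttle across all sessions
    return True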
@@ -1602,7 +1607,9 @@ std::pair<bool, uint64_t> Server::recall_client_state(MDSGatherBuilder* gather,
       * session threshold for the session's cap recall throttle.
       */
      dout(15) << " 2*session_release < session_recall"
                  " (2*" << session_release << " < " << session_recall << ");"
                  " (2*" << session_release << " < " << session_recall << ") &&"
                  " 2*session_recall < recall_max_decay_threshold"
                  " (2*" << session_recall << " > " << recall_max_decay_threshold << ")"
                  " Skipping because we are unlikely to get more released." << dendl;
      continue;
    } else if (recall < recall_max_caps && 2*recall < session_recall) {
@@ -883,6 +883,7 @@ uint64_t Session::notify_recall_sent(size_t new_limit)
   * throttle future RECALL messages).
   */
  recall_caps_throttle.hit(count);
  recall_caps_throttle2o.hit(count);
  recall_caps.hit(count);
  return new_change;
}
@@ -120,6 +120,8 @@ private:
  DecayCounter release_caps;
  // throttle on caps recalled
  DecayCounter recall_caps_throttle;
  // second order throttle that prevents recalling too quickly
  DecayCounter recall_caps_throttle2o;
  // New limit in SESSION_RECALL
  uint32_t recall_limit = 0;

@@ -185,6 +187,9 @@ public:
  auto get_recall_caps_throttle() const {
    return recall_caps_throttle.get();
  }
  auto get_recall_caps_throttle2o() const {
    return recall_caps_throttle2o.get();
  }
  auto get_recall_caps() const {
    return recall_caps.get();
  }
@@ -392,6 +397,7 @@ public:
    recall_caps(g_conf().get_val<double>("mds_recall_warning_decay_rate")),
    release_caps(g_conf().get_val<double>("mds_recall_warning_decay_rate")),
    recall_caps_throttle(g_conf().get_val<double>("mds_recall_max_decay_rate")),
    recall_caps_throttle2o(0.5),
    birth_time(clock::now()),
    auth_caps(g_ceph_context),
    item_session_list(this),
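The constructor hunk above gives the new second-order throttle a fixed 0.5 second half-life, while the first-order throttle keeps the configurable mds_recall_max_decay_rate. Assuming DecayCounter is Ceph's exponentially decaying counter (halve every half_life seconds, hit() adds to the value), a simplified Python model of how the throttles above behave:

import time

class DecayingCounter:
    # Illustrative stand-in for DecayCounter; not the Ceph implementation.
    def __init__(self, half_life):
        self.half_life = half_life
        self.value = 0.0
        self.last = time.monotonic()

    def _decay(self):
        now = time.monotonic()
        self.value *= 0.5 ** ((now - self.last) / self.half_life)
        self.last = now

    def hit(self, n):
        self._decay()
        self.value += n

    def get(self):
        self._decay()
        return self.value

# recall_caps_throttle2o(0.5): a 0.5s half-life makes the 2nd-order throttle
# react to, and forget, recent recalls much faster than the 1st-order one.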