qa: decouple session map test from simple msgr

Instead of looking at the number of threads (used by the simple messenger) to
judge the coming and going of connections, use the (async) messenger perf
counters.

Plus some other minor improvements.

Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
commit 95714b5c7c (parent 5134a7ddbb)
Author: Patrick Donnelly
Date:   2019-04-05 16:06:22 -07:00

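To make the new measurement concrete: the counters come from the MDS admin-socket "perf dump", where each async messenger worker reports its own msgr_active_connections gauge, and the test simply sums them. Below is a minimal standalone sketch of that summation; it is not part of the commit, and the sample dict (the non-messenger module and all values) is purely illustrative.

# A perf dump is a nested dict keyed by subsystem/module name. The async
# messenger contributes one "AsyncMessenger::Worker-N" entry per worker,
# each carrying its own connection gauge. (Sample data, illustrative only.)
perf = {
    "AsyncMessenger::Worker-0": {"msgr_active_connections": 3},
    "AsyncMessenger::Worker-1": {"msgr_active_connections": 2},
    "AsyncMessenger::Worker-2": {"msgr_active_connections": 1},
    "objecter": {"omap_wr": 7},  # unrelated module, ignored by the sum
}

# Sum the gauge across all messenger workers, as _get_connection_count does,
# giving a connection count that does not depend on the daemon's thread count.
conn = sum(dump["msgr_active_connections"]
           for module, dump in perf.items()
           if "AsyncMessenger::Worker" in module)
assert conn == 6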

@@ -22,47 +22,35 @@ class TestSessionMap(CephFSTestCase):
         self.mount_a.umount_wait()
         self.mount_b.umount_wait()
-        mds_id = self.fs.get_lone_mds_id()
-        self.fs.mon_manager.raw_cluster_cmd("tell", "mds.{0}".format(mds_id), "session", "ls")
+        status = self.fs.status()
+        self.fs.rank_tell(["session", "ls"], status=status)
-        ls_data = self.fs.mds_asok(['session', 'ls'])
+        ls_data = self.fs.rank_asok(['session', 'ls'], status=status)
         self.assertEqual(len(ls_data), 0)
-    def _get_thread_count(self, mds_id):
-        remote = self.fs.mds_daemons[mds_id].remote
-        ps_txt = remote.run(
-            args=["ps", "-ww", "axo", "nlwp,cmd"],
-            stdout=StringIO()
-        ).stdout.getvalue().strip()
-        lines = ps_txt.split("\n")[1:]
-        for line in lines:
-            if "ceph-mds" in line and not "daemon-helper" in line:
-                if line.find("-i {0}".format(mds_id)) != -1:
-                    log.info("Found ps line for daemon: {0}".format(line))
-                    return int(line.split()[0])
-        raise RuntimeError("No process found in ps output for MDS {0}: {1}".format(
-            mds_id, ps_txt
-        ))
+    def _get_connection_count(self, status=None):
+        perf = self.fs.rank_asok(["perf", "dump"], status=status)
+        conn = 0
+        for module, dump in perf.iteritems():
+            if "AsyncMessenger::Worker" in module:
+                conn += dump['msgr_active_connections']
+        return conn
     def test_tell_conn_close(self):
         """
         That when a `tell` command is sent using the python CLI,
-        the thread count goes back to where it started (i.e. we aren't
+        the conn count goes back to where it started (i.e. we aren't
         leaving connections open)
         """
         self.mount_a.umount_wait()
         self.mount_b.umount_wait()
-        mds_id = self.fs.get_lone_mds_id()
+        status = self.fs.status()
+        s = self._get_connection_count(status=status)
+        self.fs.rank_tell(["session", "ls"], status=status)
+        e = self._get_connection_count(status=status)
-        initial_thread_count = self._get_thread_count(mds_id)
-        self.fs.mon_manager.raw_cluster_cmd("tell", "mds.{0}".format(mds_id), "session", "ls")
-        final_thread_count = self._get_thread_count(mds_id)
-        self.assertEqual(initial_thread_count, final_thread_count)
+        self.assertEqual(s, e)
     def test_mount_conn_close(self):
         """
@@ -72,16 +60,15 @@ class TestSessionMap(CephFSTestCase):
         self.mount_a.umount_wait()
         self.mount_b.umount_wait()
-        mds_id = self.fs.get_lone_mds_id()
-        initial_thread_count = self._get_thread_count(mds_id)
+        status = self.fs.status()
+        s = self._get_connection_count(status=status)
         self.mount_a.mount()
         self.mount_a.wait_until_mounted()
-        self.assertGreater(self._get_thread_count(mds_id), initial_thread_count)
+        self.assertGreater(self._get_connection_count(status=status), s)
         self.mount_a.umount_wait()
-        final_thread_count = self._get_thread_count(mds_id)
+        e = self._get_connection_count(status=status)
-        self.assertEqual(initial_thread_count, final_thread_count)
+        self.assertEqual(s, e)
     def test_version_splitting(self):
         """
@@ -102,11 +89,7 @@ class TestSessionMap(CephFSTestCase):
         self.fs.set_max_mds(2)
         self.fs.wait_for_daemons()
-        active_mds_names = self.fs.get_active_names()
-        rank_0_id = active_mds_names[0]
-        rank_1_id = active_mds_names[1]
-        log.info("Ranks 0 and 1 are {0} and {1}".format(
-            rank_0_id, rank_1_id))
+        status = self.fs.status()
         # Bring the clients back
         self.mount_a.mount()
@@ -115,10 +98,10 @@ class TestSessionMap(CephFSTestCase):
         self.mount_b.create_files()
         # See that they've got sessions
-        self.assert_session_count(2, mds_id=rank_0_id)
+        self.assert_session_count(2, mds_id=self.fs.get_rank(status=status)['name'])
         # See that we persist their sessions
-        self.fs.mds_asok(["flush", "journal"], rank_0_id)
+        self.fs.rank_asok(["flush", "journal"], rank=0, status=status)
         table_json = json.loads(self.fs.table_tool(["0", "show", "session"]))
         log.info("SessionMap: {0}".format(json.dumps(table_json, indent=2)))
         self.assertEqual(table_json['0']['result'], 0)
@@ -130,14 +113,14 @@ class TestSessionMap(CephFSTestCase):
         self.mount_b.run_shell(["ls", "-l", "bravo/file"])
         def get_omap_wrs():
-            return self.fs.mds_asok(['perf', 'dump', 'objecter'], rank_1_id)['objecter']['omap_wr']
+            return self.fs.rank_asok(['perf', 'dump', 'objecter'], rank=1, status=status)['objecter']['omap_wr']
         # Flush so that there are no dirty sessions on rank 1
-        self.fs.mds_asok(["flush", "journal"], rank_1_id)
+        self.fs.rank_asok(["flush", "journal"], rank=1, status=status)
         # Export so that we get a force_open to rank 1 for the two sessions from rank 0
         initial_omap_wrs = get_omap_wrs()
-        self.fs.mds_asok(['export', 'dir', '/bravo', '1'], rank_0_id)
+        self.fs.rank_asok(['export', 'dir', '/bravo', '1'], rank=0, status=status)
         # This is the critical (if rather subtle) check: that in the process of doing an export dir,
         # we hit force_open_sessions, and as a result we end up writing out the sessionmap. There
@@ -156,10 +139,10 @@ class TestSessionMap(CephFSTestCase):
         self.mount_b.umount_wait()
         # In-memory sessionmap check
-        self.assert_session_count(0, mds_id=rank_0_id)
+        self.assert_session_count(0, mds_id=self.fs.get_rank(status=status)['name'])
         # On-disk sessionmap check
-        self.fs.mds_asok(["flush", "journal"], rank_0_id)
+        self.fs.rank_asok(["flush", "journal"], rank=0, status=status)
         table_json = json.loads(self.fs.table_tool(["0", "show", "session"]))
         log.info("SessionMap: {0}".format(json.dumps(table_json, indent=2)))
         self.assertEqual(table_json['0']['result'], 0)