diff --git a/qa/tasks/cephfs/test_sessionmap.py b/qa/tasks/cephfs/test_sessionmap.py index 29dd0ff298b..e7de6ef0576 100644 --- a/qa/tasks/cephfs/test_sessionmap.py +++ b/qa/tasks/cephfs/test_sessionmap.py @@ -22,47 +22,35 @@ class TestSessionMap(CephFSTestCase): self.mount_a.umount_wait() self.mount_b.umount_wait() - mds_id = self.fs.get_lone_mds_id() - self.fs.mon_manager.raw_cluster_cmd("tell", "mds.{0}".format(mds_id), "session", "ls") + status = self.fs.status() + self.fs.rank_tell(["session", "ls"], status=status) - ls_data = self.fs.mds_asok(['session', 'ls']) + ls_data = self.fs.rank_asok(['session', 'ls'], status=status) self.assertEqual(len(ls_data), 0) - def _get_thread_count(self, mds_id): - remote = self.fs.mds_daemons[mds_id].remote - - ps_txt = remote.run( - args=["ps", "-ww", "axo", "nlwp,cmd"], - stdout=StringIO() - ).stdout.getvalue().strip() - lines = ps_txt.split("\n")[1:] - - for line in lines: - if "ceph-mds" in line and not "daemon-helper" in line: - if line.find("-i {0}".format(mds_id)) != -1: - log.info("Found ps line for daemon: {0}".format(line)) - return int(line.split()[0]) - - raise RuntimeError("No process found in ps output for MDS {0}: {1}".format( - mds_id, ps_txt - )) + def _get_connection_count(self, status=None): + perf = self.fs.rank_asok(["perf", "dump"], status=status) + conn = 0 + for module, dump in perf.iteritems(): + if "AsyncMessenger::Worker" in module: + conn += dump['msgr_active_connections'] + return conn def test_tell_conn_close(self): """ That when a `tell` command is sent using the python CLI, - the thread count goes back to where it started (i.e. we aren't + the conn count goes back to where it started (i.e. we aren't leaving connections open) """ self.mount_a.umount_wait() self.mount_b.umount_wait() - mds_id = self.fs.get_lone_mds_id() + status = self.fs.status() + s = self._get_connection_count(status=status) + self.fs.rank_tell(["session", "ls"], status=status) + e = self._get_connection_count(status=status) - initial_thread_count = self._get_thread_count(mds_id) - self.fs.mon_manager.raw_cluster_cmd("tell", "mds.{0}".format(mds_id), "session", "ls") - final_thread_count = self._get_thread_count(mds_id) - - self.assertEqual(initial_thread_count, final_thread_count) + self.assertEqual(s, e) def test_mount_conn_close(self): """ @@ -72,16 +60,15 @@ class TestSessionMap(CephFSTestCase): self.mount_a.umount_wait() self.mount_b.umount_wait() - mds_id = self.fs.get_lone_mds_id() - - initial_thread_count = self._get_thread_count(mds_id) + status = self.fs.status() + s = self._get_connection_count(status=status) self.mount_a.mount() self.mount_a.wait_until_mounted() - self.assertGreater(self._get_thread_count(mds_id), initial_thread_count) + self.assertGreater(self._get_connection_count(status=status), s) self.mount_a.umount_wait() - final_thread_count = self._get_thread_count(mds_id) + e = self._get_connection_count(status=status) - self.assertEqual(initial_thread_count, final_thread_count) + self.assertEqual(s, e) def test_version_splitting(self): """ @@ -102,11 +89,7 @@ class TestSessionMap(CephFSTestCase): self.fs.set_max_mds(2) self.fs.wait_for_daemons() - active_mds_names = self.fs.get_active_names() - rank_0_id = active_mds_names[0] - rank_1_id = active_mds_names[1] - log.info("Ranks 0 and 1 are {0} and {1}".format( - rank_0_id, rank_1_id)) + status = self.fs.status() # Bring the clients back self.mount_a.mount() @@ -115,10 +98,10 @@ class TestSessionMap(CephFSTestCase): self.mount_b.create_files() # See that they've got sessions - self.assert_session_count(2, mds_id=rank_0_id) + self.assert_session_count(2, mds_id=self.fs.get_rank(status=status)['name']) # See that we persist their sessions - self.fs.mds_asok(["flush", "journal"], rank_0_id) + self.fs.rank_asok(["flush", "journal"], rank=0, status=status) table_json = json.loads(self.fs.table_tool(["0", "show", "session"])) log.info("SessionMap: {0}".format(json.dumps(table_json, indent=2))) self.assertEqual(table_json['0']['result'], 0) @@ -130,14 +113,14 @@ class TestSessionMap(CephFSTestCase): self.mount_b.run_shell(["ls", "-l", "bravo/file"]) def get_omap_wrs(): - return self.fs.mds_asok(['perf', 'dump', 'objecter'], rank_1_id)['objecter']['omap_wr'] + return self.fs.rank_asok(['perf', 'dump', 'objecter'], rank=1, status=status)['objecter']['omap_wr'] # Flush so that there are no dirty sessions on rank 1 - self.fs.mds_asok(["flush", "journal"], rank_1_id) + self.fs.rank_asok(["flush", "journal"], rank=1, status=status) # Export so that we get a force_open to rank 1 for the two sessions from rank 0 initial_omap_wrs = get_omap_wrs() - self.fs.mds_asok(['export', 'dir', '/bravo', '1'], rank_0_id) + self.fs.rank_asok(['export', 'dir', '/bravo', '1'], rank=0, status=status) # This is the critical (if rather subtle) check: that in the process of doing an export dir, # we hit force_open_sessions, and as a result we end up writing out the sessionmap. There @@ -156,10 +139,10 @@ class TestSessionMap(CephFSTestCase): self.mount_b.umount_wait() # In-memory sessionmap check - self.assert_session_count(0, mds_id=rank_0_id) + self.assert_session_count(0, mds_id=self.fs.get_rank(status=status)['name']) # On-disk sessionmap check - self.fs.mds_asok(["flush", "journal"], rank_0_id) + self.fs.rank_asok(["flush", "journal"], rank=0, status=status) table_json = json.loads(self.fs.table_tool(["0", "show", "session"])) log.info("SessionMap: {0}".format(json.dumps(table_json, indent=2))) self.assertEqual(table_json['0']['result'], 0)