Merge PR #27415 into master

* refs/pull/27415/head:
	qa: decouple session map test from simple msgr
	msg/async: move connection ref
	msg/async: dec active connections when marked down

Reviewed-by: Sage Weil <sage@redhat.com>
Patrick Donnelly 2019-04-08 15:41:48 -07:00
commit 9ce4c2df01
3 changed files with 31 additions and 49 deletions

qa/tasks/cephfs/test_sessionmap.py

@@ -22,47 +22,35 @@ class TestSessionMap(CephFSTestCase):
         self.mount_a.umount_wait()
         self.mount_b.umount_wait()
-        mds_id = self.fs.get_lone_mds_id()
-        self.fs.mon_manager.raw_cluster_cmd("tell", "mds.{0}".format(mds_id), "session", "ls")
+        status = self.fs.status()
+        self.fs.rank_tell(["session", "ls"], status=status)
-        ls_data = self.fs.mds_asok(['session', 'ls'])
+        ls_data = self.fs.rank_asok(['session', 'ls'], status=status)
         self.assertEqual(len(ls_data), 0)
-    def _get_thread_count(self, mds_id):
-        remote = self.fs.mds_daemons[mds_id].remote
-        ps_txt = remote.run(
-            args=["ps", "-ww", "axo", "nlwp,cmd"],
-            stdout=StringIO()
-        ).stdout.getvalue().strip()
-        lines = ps_txt.split("\n")[1:]
-        for line in lines:
-            if "ceph-mds" in line and not "daemon-helper" in line:
-                if line.find("-i {0}".format(mds_id)) != -1:
-                    log.info("Found ps line for daemon: {0}".format(line))
-                    return int(line.split()[0])
-        raise RuntimeError("No process found in ps output for MDS {0}: {1}".format(
-            mds_id, ps_txt
-        ))
+    def _get_connection_count(self, status=None):
+        perf = self.fs.rank_asok(["perf", "dump"], status=status)
+        conn = 0
+        for module, dump in perf.iteritems():
+            if "AsyncMessenger::Worker" in module:
+                conn += dump['msgr_active_connections']
+        return conn
     def test_tell_conn_close(self):
         """
         That when a `tell` command is sent using the python CLI,
-        the thread count goes back to where it started (i.e. we aren't
+        the conn count goes back to where it started (i.e. we aren't
         leaving connections open)
         """
         self.mount_a.umount_wait()
         self.mount_b.umount_wait()
-        mds_id = self.fs.get_lone_mds_id()
+        status = self.fs.status()
+        s = self._get_connection_count(status=status)
+        self.fs.rank_tell(["session", "ls"], status=status)
+        e = self._get_connection_count(status=status)
-        initial_thread_count = self._get_thread_count(mds_id)
-        self.fs.mon_manager.raw_cluster_cmd("tell", "mds.{0}".format(mds_id), "session", "ls")
-        final_thread_count = self._get_thread_count(mds_id)
-        self.assertEqual(initial_thread_count, final_thread_count)
+        self.assertEqual(s, e)
     def test_mount_conn_close(self):
         """
@@ -72,16 +60,15 @@ class TestSessionMap(CephFSTestCase):
         self.mount_a.umount_wait()
         self.mount_b.umount_wait()
-        mds_id = self.fs.get_lone_mds_id()
-        initial_thread_count = self._get_thread_count(mds_id)
+        status = self.fs.status()
+        s = self._get_connection_count(status=status)
         self.mount_a.mount()
         self.mount_a.wait_until_mounted()
-        self.assertGreater(self._get_thread_count(mds_id), initial_thread_count)
+        self.assertGreater(self._get_connection_count(status=status), s)
         self.mount_a.umount_wait()
-        final_thread_count = self._get_thread_count(mds_id)
+        e = self._get_connection_count(status=status)
-        self.assertEqual(initial_thread_count, final_thread_count)
+        self.assertEqual(s, e)
     def test_version_splitting(self):
         """
@@ -102,11 +89,7 @@ class TestSessionMap(CephFSTestCase):
         self.fs.set_max_mds(2)
         self.fs.wait_for_daemons()
-        active_mds_names = self.fs.get_active_names()
-        rank_0_id = active_mds_names[0]
-        rank_1_id = active_mds_names[1]
-        log.info("Ranks 0 and 1 are {0} and {1}".format(
-            rank_0_id, rank_1_id))
+        status = self.fs.status()
         # Bring the clients back
         self.mount_a.mount()
@@ -115,10 +98,10 @@ class TestSessionMap(CephFSTestCase):
         self.mount_b.create_files()
         # See that they've got sessions
-        self.assert_session_count(2, mds_id=rank_0_id)
+        self.assert_session_count(2, mds_id=self.fs.get_rank(status=status)['name'])
         # See that we persist their sessions
-        self.fs.mds_asok(["flush", "journal"], rank_0_id)
+        self.fs.rank_asok(["flush", "journal"], rank=0, status=status)
         table_json = json.loads(self.fs.table_tool(["0", "show", "session"]))
         log.info("SessionMap: {0}".format(json.dumps(table_json, indent=2)))
         self.assertEqual(table_json['0']['result'], 0)
@@ -130,14 +113,14 @@ class TestSessionMap(CephFSTestCase):
         self.mount_b.run_shell(["ls", "-l", "bravo/file"])
         def get_omap_wrs():
-            return self.fs.mds_asok(['perf', 'dump', 'objecter'], rank_1_id)['objecter']['omap_wr']
+            return self.fs.rank_asok(['perf', 'dump', 'objecter'], rank=1, status=status)['objecter']['omap_wr']
         # Flush so that there are no dirty sessions on rank 1
-        self.fs.mds_asok(["flush", "journal"], rank_1_id)
+        self.fs.rank_asok(["flush", "journal"], rank=1, status=status)
         # Export so that we get a force_open to rank 1 for the two sessions from rank 0
         initial_omap_wrs = get_omap_wrs()
-        self.fs.mds_asok(['export', 'dir', '/bravo', '1'], rank_0_id)
+        self.fs.rank_asok(['export', 'dir', '/bravo', '1'], rank=0, status=status)
         # This is the critical (if rather subtle) check: that in the process of doing an export dir,
         # we hit force_open_sessions, and as a result we end up writing out the sessionmap. There
@@ -156,10 +139,10 @@ class TestSessionMap(CephFSTestCase):
         self.mount_b.umount_wait()
         # In-memory sessionmap check
-        self.assert_session_count(0, mds_id=rank_0_id)
+        self.assert_session_count(0, mds_id=self.fs.get_rank(status=status)['name'])
         # On-disk sessionmap check
-        self.fs.mds_asok(["flush", "journal"], rank_0_id)
+        self.fs.rank_asok(["flush", "journal"], rank=0, status=status)
         table_json = json.loads(self.fs.table_tool(["0", "show", "session"]))
         log.info("SessionMap: {0}".format(json.dumps(table_json, indent=2)))
         self.assertEqual(table_json['0']['result'], 0)

src/msg/async/AsyncMessenger.cc

@@ -856,7 +856,6 @@ int AsyncMessenger::accept_conn(AsyncConnectionRef conn)
     // If conn already in, we will return 0
     Mutex::Locker l(deleted_lock);
     if (deleted_conns.erase(existing)) {
-      existing->get_perf_counter()->dec(l_msgr_active_connections);
       conns.erase(it);
     } else if (conn != existing) {
       return -1;
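
For context on the counter change above, a toy sketch (not Ceph code): the active-connections perf counter is now decremented once, when a connection is marked down and handed to deleted_conns via unregister_conn(), so the reap paths shown in accept_conn() and _lookup_conn() no longer touch it. The sketch assumes a plain int in place of the perf counter and std::shared_ptr in place of AsyncConnectionRef.

#include <cassert>
#include <memory>
#include <set>
#include <utility>

// Toy stand-ins, illustration only: a plain int instead of the
// msgr_active_connections perf counter, shared_ptr instead of
// AsyncConnectionRef.
struct Conn {};
using ConnRef = std::shared_ptr<Conn>;

struct ToyMessenger {
  int active_connections = 0;       // mimics the perf counter
  std::set<ConnRef> deleted_conns;  // connections waiting to be reaped

  ConnRef create_conn() {
    ++active_connections;           // counted when it becomes active
    return std::make_shared<Conn>();
  }

  // Mark-down path: decrement exactly once, here.
  void unregister_conn(ConnRef conn) {
    --active_connections;
    deleted_conns.emplace(std::move(conn));
  }

  // Reap path: only drops references; the counter is left alone,
  // matching the lines removed from accept_conn()/_lookup_conn().
  void reap_dead_connections() {
    deleted_conns.clear();
  }
};

int main() {
  ToyMessenger m;
  ConnRef c = m.create_conn();
  m.unregister_conn(std::move(c));
  m.reap_dead_connections();
  assert(m.active_connections == 0);  // decremented once, not twice
}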

src/msg/async/AsyncMessenger.h

@@ -314,7 +314,6 @@ private:
     // lazy delete, see "deleted_conns"
     Mutex::Locker l(deleted_lock);
     if (deleted_conns.erase(p->second)) {
-      p->second->get_perf_counter()->dec(l_msgr_active_connections);
       conns.erase(p);
       return NULL;
     }
@@ -402,7 +401,8 @@ public:
   */
  void unregister_conn(AsyncConnectionRef conn) {
    Mutex::Locker l(deleted_lock);
-    deleted_conns.insert(conn);
+    conn->get_perf_counter()->dec(l_msgr_active_connections);
+    deleted_conns.emplace(std::move(conn));
    if (deleted_conns.size() >= ReapDeadConnectionThreshold) {
      local_worker->center.dispatch_event_external(reap_handler);
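
A note on the std::move in unregister_conn(), as a minimal sketch under the assumption that AsyncConnectionRef behaves like a reference-counted smart pointer (which is what the move spares from an extra copy): the counter is decremented through conn first, and only then is the reference moved into deleted_conns, because conn must not be dereferenced after the move.

#include <cassert>
#include <memory>
#include <set>
#include <utility>

// Illustration only: shared_ptr stands in for the ref-counted
// AsyncConnectionRef, the int for the perf counter.
struct Conn { int* active; };
using ConnRef = std::shared_ptr<Conn>;

std::set<ConnRef> deleted_conns;

void unregister_conn(ConnRef conn) {
  // 1) Use the reference while it is still valid.
  --*(conn->active);
  // 2) Then hand it over without bumping the refcount;
  //    conn is no longer safe to dereference after this line.
  deleted_conns.emplace(std::move(conn));
}

int main() {
  int active = 1;
  ConnRef c = std::make_shared<Conn>(Conn{&active});
  unregister_conn(std::move(c));  // caller moves its reference in as well
  assert(active == 0);
  assert(deleted_conns.size() == 1);
}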