mirror of https://github.com/ceph/ceph
qa: add test for importer's session cleanup after an export subtree task is interrupted
Signed-off-by: Zhansong Gao <zhsgao@hotmail.com>
This commit is contained in:
parent
11a4303d66
commit
eccaf85294
|
@ -252,8 +252,8 @@ class CephFSTestCase(CephTestCase):
|
|||
def get_session_data(self, client_id):
|
||||
return self._session_by_id(client_id)
|
||||
|
||||
def _session_list(self):
|
||||
ls_data = self.fs.mds_asok(['session', 'ls'])
|
||||
def _session_list(self, rank=None, status=None):
|
||||
ls_data = self.fs.rank_asok(['session', 'ls'], rank=rank, status=status)
|
||||
ls_data = [s for s in ls_data if s['state'] not in ['stale', 'closed']]
|
||||
return ls_data
|
||||
|
||||
|
@ -269,9 +269,9 @@ class CephFSTestCase(CephTestCase):
|
|||
def perf_dump(self, rank=None, status=None):
|
||||
return self.fs.rank_asok(['perf', 'dump'], rank=rank, status=status)
|
||||
|
||||
def wait_until_evicted(self, client_id, timeout=30):
|
||||
def wait_until_evicted(self, client_id, rank=None, timeout=30):
|
||||
def is_client_evicted():
|
||||
ls = self._session_list()
|
||||
ls = self._session_list(rank=rank)
|
||||
for s in ls:
|
||||
if s['id'] == client_id:
|
||||
return False
|
||||
|
|
|
@ -724,3 +724,69 @@ class TestDumpExportStates(CephFSTestCase):
|
|||
self._test_freeze_tree(state, 0)
|
||||
|
||||
self.assertTrue(type(state['notify_ack_waiting']) is list)
|
||||
|
||||
class TestKillExports(CephFSTestCase):
|
||||
MDSS_REQUIRED = 2
|
||||
CLIENTS_REQUIRED = 1
|
||||
|
||||
def setUp(self):
|
||||
CephFSTestCase.setUp(self)
|
||||
|
||||
self.fs.set_max_mds(self.MDSS_REQUIRED)
|
||||
self.status = self.fs.wait_for_daemons()
|
||||
|
||||
self.mount_a.run_shell_payload('mkdir -p test/export')
|
||||
|
||||
def tearDown(self):
|
||||
super().tearDown()
|
||||
|
||||
def _kill_export_as(self, rank, kill):
|
||||
self.fs.rank_asok(['config', 'set', 'mds_kill_export_at', str(kill)], rank=rank, status=self.status)
|
||||
|
||||
def _export_dir(self, path, source, target):
|
||||
self.fs.rank_asok(['export', 'dir', path, str(target)], rank=source, status=self.status)
|
||||
|
||||
def _wait_failover(self):
|
||||
self.wait_until_true(lambda: self.fs.status().hadfailover(self.status), timeout=self.fs.beacon_timeout)
|
||||
|
||||
def _clear_coredump(self, rank):
|
||||
crash_rank = self.fs.get_rank(rank=rank, status=self.status)
|
||||
self.delete_mds_coredump(crash_rank['name'])
|
||||
|
||||
def _run_kill_export(self, kill_at, exporter_rank=0, importer_rank=1, restart=True):
|
||||
self._kill_export_as(exporter_rank, kill_at)
|
||||
self._export_dir("/test", exporter_rank, importer_rank)
|
||||
self._wait_failover()
|
||||
self._clear_coredump(exporter_rank)
|
||||
|
||||
if restart:
|
||||
self.fs.rank_restart(rank=exporter_rank, status=self.status)
|
||||
self.status = self.fs.wait_for_daemons()
|
||||
|
||||
def test_session_cleanup(self):
|
||||
"""
|
||||
Test importer's session cleanup after an export subtree task is interrupted.
|
||||
Set 'mds_kill_export_at' to 9 or 10 so that the importer will wait for the exporter
|
||||
to restart while the state is 'acking'.
|
||||
|
||||
See https://tracker.ceph.com/issues/61459
|
||||
"""
|
||||
|
||||
kill_export_at = [9, 10]
|
||||
|
||||
exporter_rank = 0
|
||||
importer_rank = 1
|
||||
|
||||
for kill in kill_export_at:
|
||||
log.info(f"kill_export_at: {kill}")
|
||||
self._run_kill_export(kill, exporter_rank, importer_rank)
|
||||
|
||||
if len(self._session_list(importer_rank, self.status)) > 0:
|
||||
client_id = self.mount_a.get_global_id()
|
||||
self.fs.rank_asok(['session', 'evict', "%s" % client_id], rank=importer_rank, status=self.status)
|
||||
|
||||
# timeout if buggy
|
||||
self.wait_until_evicted(client_id, importer_rank)
|
||||
|
||||
# for multiple tests
|
||||
self.mount_a.remount()
|
Loading…
Reference in New Issue