Merge pull request #51766 from zhsgao/mds_clear_session_failed

mds: session in the importing state cannot be cleared if an export subtree task is interrupted while the state of importer is acking

Reviewed-by: Venky Shankar <vshankar@redhat.com>
Committed by Venky Shankar via GitHub on 2024-11-14 12:38:35 +05:30 · commit ef3f512306
8 changed files with 137 additions and 12 deletions
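
At a glance: during a subtree export, the importer force-opens client sessions before the exporter journals the export. If the exporter dies around handle_export_ack (kill points 9/10), the importer aborts while in the acking state; before this fix it only dropped the importing refcount and never closed the force-opened sessions, leaving sessions that could not be evicted. A minimal Python sketch of the fixed abort path (an illustrative model only, not Ceph code; all names are made up):

    # Illustrative model of the importer-side abort path this commit fixes.
    IMPORT_LOGGINGSTART, IMPORT_ACKING, IMPORT_ABORTING = range(3)

    class SessionModel:
        def __init__(self, client_opened=False):
            self.client_opened = client_opened  # True only if the client sent OPEN itself
            self.importing_count = 0            # pinned while an import references it

    def import_reverse_model(state, sessions):
        closed = []
        if state in (IMPORT_LOGGINGSTART, IMPORT_ACKING):
            for s in sessions:
                s.importing_count -= 1          # the old code stopped here
                if not s.client_opened:
                    closed.append(s)            # new: close_forced_opened_sessions()
        return closed                           # only afterwards flip to IMPORT_ABORTING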

qa/tasks/cephfs/cephfs_test_case.py

@@ -252,8 +252,8 @@ class CephFSTestCase(CephTestCase):
     def get_session_data(self, client_id):
         return self._session_by_id(client_id)
 
-    def _session_list(self):
-        ls_data = self.fs.mds_asok(['session', 'ls'])
+    def _session_list(self, rank=None, status=None):
+        ls_data = self.fs.rank_asok(['session', 'ls'], rank=rank, status=status)
         ls_data = [s for s in ls_data if s['state'] not in ['stale', 'closed']]
         return ls_data
@@ -269,9 +269,9 @@ class CephFSTestCase(CephTestCase):
     def perf_dump(self, rank=None, status=None):
         return self.fs.rank_asok(['perf', 'dump'], rank=rank, status=status)
 
-    def wait_until_evicted(self, client_id, timeout=30):
+    def wait_until_evicted(self, client_id, rank=None, timeout=30):
         def is_client_evicted():
-            ls = self._session_list()
+            ls = self._session_list(rank=rank)
             for s in ls:
                 if s['id'] == client_id:
                     return False
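
The new rank/status parameters let a test inspect one MDS rank instead of the default admin socket. A usage sketch, assuming the surrounding CephFSTestCase plus a cached status from fs.wait_for_daemons() (the rank number is illustrative):

    status = self.fs.wait_for_daemons()
    sessions = self._session_list(rank=1, status=status)    # non-stale sessions on rank 1
    client_id = self.mount_a.get_global_id()
    self.wait_until_evicted(client_id, rank=1, timeout=60)  # poll rank 1 until it is gone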

qa/tasks/cephfs/filesystem.py

@@ -649,6 +649,8 @@ class FilesystemBase(MDSClusterBase):
     def set_session_timeout(self, timeout):
         self.set_var("session_timeout", "%d" % timeout)
 
+    def set_session_autoclose(self, autoclose_time):
+        self.set_var("session_autoclose", "%d" % autoclose_time)
+
     def set_allow_standby_replay(self, yes):
         self.set_var("allow_standby_replay", yes)

qa/tasks/cephfs/test_exports.py

@@ -724,3 +724,91 @@ class TestDumpExportStates(CephFSTestCase):
         self._test_freeze_tree(state, 0)
         self.assertTrue(type(state['notify_ack_waiting']) is list)
+
+class TestKillExports(CephFSTestCase):
+    MDSS_REQUIRED = 2
+    CLIENTS_REQUIRED = 1
+
+    def setUp(self):
+        CephFSTestCase.setUp(self)
+
+        self.fs.set_max_mds(self.MDSS_REQUIRED)
+        self.status = self.fs.wait_for_daemons()
+
+        self.mount_a.run_shell_payload('mkdir -p test/export')
+
+    def tearDown(self):
+        super().tearDown()
+
+    def _kill_export_as(self, rank, kill):
+        self.fs.rank_asok(['config', 'set', 'mds_kill_export_at', str(kill)], rank=rank, status=self.status)
+
+    def _export_dir(self, path, source, target):
+        self.fs.rank_asok(['export', 'dir', path, str(target)], rank=source, status=self.status)
+
+    def _wait_failover(self):
+        self.wait_until_true(lambda: self.fs.status().hadfailover(self.status), timeout=self.fs.beacon_timeout)
+
+    def _clear_coredump(self, rank):
+        crash_rank = self.fs.get_rank(rank=rank, status=self.status)
+        self.delete_mds_coredump(crash_rank['name'])
+
+    def _run_kill_export(self, kill_at, exporter_rank=0, importer_rank=1, restart=True):
+        self._kill_export_as(exporter_rank, kill_at)
+        self._export_dir("/test", exporter_rank, importer_rank)
+        self._wait_failover()
+        self._clear_coredump(exporter_rank)
+
+        if restart:
+            self.fs.rank_restart(rank=exporter_rank, status=self.status)
+            self.status = self.fs.wait_for_daemons()
+
+    def test_session_cleanup(self):
+        """
+        Test importer's session cleanup after an export subtree task is interrupted.
+        Set 'mds_kill_export_at' to 9 or 10 so that the importer will wait for the exporter
+        to restart while the state is 'acking'.
+
+        See https://tracker.ceph.com/issues/61459
+        """
+        kill_export_at = [9, 10]
+
+        exporter_rank = 0
+        importer_rank = 1
+
+        for kill in kill_export_at:
+            log.info(f"kill_export_at: {kill}")
+            self._run_kill_export(kill, exporter_rank, importer_rank)
+
+            if len(self._session_list(importer_rank, self.status)) > 0:
+                client_id = self.mount_a.get_global_id()
+                self.fs.rank_asok(['session', 'evict', "%s" % client_id], rank=importer_rank, status=self.status)
+
+                # timeout if buggy
+                self.wait_until_evicted(client_id, importer_rank)
+
+            # for multiple tests
+            self.mount_a.remount()
+
+    def test_client_eviction(self):
+        # modify the timeout so that we don't have to wait too long
+        timeout = 30
+        self.fs.set_session_timeout(timeout)
+        self.fs.set_session_autoclose(timeout + 5)
+
+        kill_export_at = [9, 10]
+
+        exporter_rank = 0
+        importer_rank = 1
+
+        for kill in kill_export_at:
+            log.info(f"kill_export_at: {kill}")
+            self._run_kill_export(kill, exporter_rank, importer_rank)
+
+            client_id = self.mount_a.get_global_id()
+            self.wait_until_evicted(client_id, importer_rank, timeout + 10)
+            time.sleep(1)
+
+            # failed if buggy
+            self.mount_a.ls()
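
For local iteration, these cases should run under the usual vstart runner; a sketch (assumes a built tree with a running vstart cluster, paths relative to the build directory):

    # python3 ../qa/tasks/vstart_runner.py tasks.cephfs.test_exports.TestKillExports
    # python3 ../qa/tasks/vstart_runner.py \
    #     tasks.cephfs.test_exports.TestKillExports.test_session_cleanup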

src/mds/Migrator.cc

@@ -1957,10 +1957,10 @@ void Migrator::handle_export_ack(const cref_t<MExportDirAck> &m)
   // this keeps authority().first in sync with subtree auth state in the journal.
   mdcache->adjust_subtree_auth(dir, it->second.peer, mds->get_nodeid());
 
+  ceph_assert(g_conf()->mds_kill_export_at != 10);
   // log export completion, then finish (unfreeze, trigger finish context, etc.)
   mds->mdlog->submit_entry(le, new C_MDS_ExportFinishLogged(this, dir));
   mds->mdlog->flush();
-  ceph_assert(g_conf()->mds_kill_export_at != 10);
 }
 
 void Migrator::export_notify_abort(CDir *dir, export_state_t& stat, set<CDir*>& bounds)
@@ -2844,7 +2844,6 @@ void Migrator::import_reverse(CDir *dir)
   dout(7) << *dir << dendl;
 
   import_state_t& stat = import_state[dir->dirfrag()];
-  stat.state = IMPORT_ABORTING;
 
   set<CDir*> bounds;
   mdcache->get_subtree_bounds(dir, bounds);
@@ -2950,10 +2949,14 @@ void Migrator::import_reverse(CDir *dir)
       }
       in->put(CInode::PIN_IMPORTINGCAPS);
     }
+  }
 
+  if (stat.state == IMPORT_LOGGINGSTART || stat.state == IMPORT_ACKING) {
     for (auto& p : stat.session_map) {
       Session *session = p.second.first;
       session->dec_importing();
     }
+    mds->server->close_forced_opened_sessions(stat.session_map);
   }
 
   // log our failure
@@ -2962,6 +2965,7 @@ void Migrator::import_reverse(CDir *dir)
   mdcache->trim(num_dentries); // try trimming dentries
 
   // notify bystanders; wait in aborting state
+  stat.state = IMPORT_ABORTING;
   import_notify_abort(dir, bounds);
 }
@@ -3054,10 +3058,9 @@ void Migrator::import_logged_start(dirfrag_t df, CDir *dir, mds_rank_t from,
   dout(7) << *dir << dendl;
 
   map<dirfrag_t, import_state_t>::iterator it = import_state.find(dir->dirfrag());
-  if (it == import_state.end() ||
-      it->second.state != IMPORT_LOGGINGSTART) {
+  ceph_assert(it != import_state.end());
+  if (it->second.state != IMPORT_LOGGINGSTART) {
     dout(7) << "import " << df << " must have aborted" << dendl;
     mds->server->finish_force_open_sessions(imported_session_map);
     return;
   }
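
Note the ordering in import_reverse(): the old code flipped stat.state to IMPORT_ABORTING at the top, so the new IMPORT_LOGGINGSTART/IMPORT_ACKING check on the session-cleanup path could never see the pre-abort state; the assignment therefore appears to have moved down to just before import_notify_abort(). A tiny illustrative sketch (plain Python, not Ceph code):

    def cleanup_forced_open_sessions():
        print("dec_importing() + close_forced_opened_sessions()")

    state = "ACKING"                     # importer state when the exporter died
    # Old order set state = "ABORTING" first, making the branch below unreachable.
    if state in ("LOGGINGSTART", "ACKING"):
        cleanup_forced_open_sessions()
    state = "ABORTING"                   # new order: flip only before notifying bystanders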

src/mds/Server.cc

@@ -615,6 +615,9 @@ void Server::handle_client_session(const cref_t<MClientSession> &m)
     mds->send_message(reply, m->get_connection());
     return;
   }
+  if (!session->client_opened) {
+    session->client_opened = true;
+  }
   if (session->is_opening() ||
       session->is_open() ||
       session->is_stale() ||
@@ -1054,7 +1057,7 @@ version_t Server::prepare_force_open_sessions(map<client_t,entity_inst_t>& cm,
   return pv;
 }
 
-void Server::finish_force_open_sessions(const map<client_t,pair<Session*,uint64_t> >& smap,
+void Server::finish_force_open_sessions(map<client_t,pair<Session*,uint64_t> >& smap,
 					bool dec_import)
 {
   /*
@@ -1073,7 +1076,7 @@ void Server::finish_force_open_sessions(const map<client_t,pair<Session*,uint64_t> >& smap,
       dout(10) << "force_open_sessions skipping changed " << session->info.inst << dendl;
     } else {
       dout(10) << "force_open_sessions opened " << session->info.inst << dendl;
-      mds->sessionmap.set_state(session, Session::STATE_OPEN);
+      it.second.second = mds->sessionmap.set_state(session, Session::STATE_OPEN);
       mds->sessionmap.touch_session(session);
       metrics_handler->add_session(session);
@@ -1103,6 +1106,29 @@ void Server::finish_force_open_sessions(const map<client_t,pair<Session*,uint64_t> >& smap,
   dout(10) << __func__ << ": final v " << mds->sessionmap.get_version() << dendl;
 }
 
+void Server::close_forced_opened_sessions(const map<client_t,pair<Session*,uint64_t> >& smap)
+{
+  dout(10) << __func__ << " on " << smap.size() << " clients" << dendl;
+  for (auto &it : smap) {
+    Session *session = it.second.first;
+    uint64_t sseq = it.second.second;
+    if (sseq == 0)
+      continue;
+    if (session->get_state_seq() != sseq) {
+      dout(10) << "skipping changed session (" << session->get_state_name() << ") "
+               << session->info.inst << dendl;
+      continue;
+    }
+    if (session->client_opened)
+      continue;
+    dout(10) << "closing forced opened session (" << session->get_state_name() << ") "
+             << session->info.inst << dendl;
+    ceph_assert(!session->is_importing());
+    journal_close_session(session, Session::STATE_CLOSING, NULL);
+  }
+}
+
 class C_MDS_TerminatedSessions : public ServerContext {
   void finish(int r) override {
     server->terminating_sessions = false;
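
close_forced_opened_sessions() walks the same smap that prepare/finish_force_open_sessions filled, skipping sessions whose state sequence has moved on and sessions the client opened itself. A compact Python model of those guards (illustrative only; journal_close stands in for journal_close_session):

    def close_forced_opened(smap, journal_close):
        # smap: client -> (session, sseq) as filled by finish_force_open_sessions()
        for session, sseq in smap.values():
            if sseq == 0:                    # we never transitioned it to OPEN
                continue
            if session.state_seq != sseq:    # state changed since we opened it; leave it
                continue
            if session.client_opened:        # the client opened it itself; keep it
                continue
            journal_close(session)           # close the purely force-opened session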

src/mds/Server.h

@@ -129,8 +129,9 @@ public:
   version_t prepare_force_open_sessions(std::map<client_t,entity_inst_t> &cm,
 					std::map<client_t,client_metadata_t>& cmm,
 					std::map<client_t,std::pair<Session*,uint64_t> >& smap);
-  void finish_force_open_sessions(const std::map<client_t,std::pair<Session*,uint64_t> >& smap,
+  void finish_force_open_sessions(std::map<client_t,std::pair<Session*,uint64_t> >& smap,
 				  bool dec_import=true);
+  void close_forced_opened_sessions(const std::map<client_t,std::pair<Session*,uint64_t> >& smap);
   void flush_client_sessions(std::set<client_t>& client_set, MDSGatherBuilder& gather);
   void finish_flush_session(Session *session, version_t seq);
   void terminate_sessions();

src/mds/SessionMap.cc

@@ -615,6 +615,7 @@ void Session::dump(Formatter *f, bool cap_dump) const
   f->dump_unsigned("num_completed_requests", get_num_completed_requests());
   f->dump_unsigned("num_completed_flushes", get_num_completed_flushes());
   f->dump_bool("reconnecting", reconnecting);
+  f->dump_int("importing_count", importing_count);
   f->dump_object("recall_caps", recall_caps);
   f->dump_object("release_caps", release_caps);
   f->dump_object("recall_caps_throttle", recall_caps_throttle);

src/mds/SessionMap.h

@@ -417,6 +417,10 @@ public:
   session_info_t info;                         ///< durable bits
   MDSAuthCaps auth_caps;
 
+  // True if the session is opened by the client.
+  // False if the session is forced to open, until it is opened again by the client.
+  bool client_opened = false;
+
   xlist<Session*>::item item_session_list;
   std::list<ceph::ref_t<Message>> preopen_out_queue;  ///< messages for client, queued before they connect