diff --git a/qa/tasks/cephfs/cephfs_test_case.py b/qa/tasks/cephfs/cephfs_test_case.py
index c1312ec5efc..21b96d2b22b 100644
--- a/qa/tasks/cephfs/cephfs_test_case.py
+++ b/qa/tasks/cephfs/cephfs_test_case.py
@@ -252,8 +252,8 @@ class CephFSTestCase(CephTestCase):
     def get_session_data(self, client_id):
         return self._session_by_id(client_id)
 
-    def _session_list(self):
-        ls_data = self.fs.mds_asok(['session', 'ls'])
+    def _session_list(self, rank=None, status=None):
+        ls_data = self.fs.rank_asok(['session', 'ls'], rank=rank, status=status)
         ls_data = [s for s in ls_data if s['state'] not in ['stale', 'closed']]
         return ls_data
 
@@ -269,9 +269,9 @@ class CephFSTestCase(CephTestCase):
     def perf_dump(self, rank=None, status=None):
         return self.fs.rank_asok(['perf', 'dump'], rank=rank, status=status)
 
-    def wait_until_evicted(self, client_id, timeout=30):
+    def wait_until_evicted(self, client_id, rank=None, timeout=30):
         def is_client_evicted():
-            ls = self._session_list()
+            ls = self._session_list(rank=rank)
             for s in ls:
                 if s['id'] == client_id:
                     return False
diff --git a/qa/tasks/cephfs/filesystem.py b/qa/tasks/cephfs/filesystem.py
index 2b7fd2ee569..3846ef23f97 100644
--- a/qa/tasks/cephfs/filesystem.py
+++ b/qa/tasks/cephfs/filesystem.py
@@ -649,6 +649,8 @@ class FilesystemBase(MDSClusterBase):
     def set_session_timeout(self, timeout):
         self.set_var("session_timeout", "%d" % timeout)
 
+    def set_session_autoclose(self, autoclose_time):
+        self.set_var("session_autoclose", "%d" % autoclose_time)
     def set_allow_standby_replay(self, yes):
         self.set_var("allow_standby_replay", yes)
 
diff --git a/qa/tasks/cephfs/test_exports.py b/qa/tasks/cephfs/test_exports.py
index e5ad18dd662..346f139874b 100644
--- a/qa/tasks/cephfs/test_exports.py
+++ b/qa/tasks/cephfs/test_exports.py
@@ -724,3 +724,91 @@ class TestDumpExportStates(CephFSTestCase):
         self._test_freeze_tree(state, 0)
 
         self.assertTrue(type(state['notify_ack_waiting']) is list)
+
+class TestKillExports(CephFSTestCase):
+    MDSS_REQUIRED = 2
+    CLIENTS_REQUIRED = 1
+
+    def setUp(self):
+        CephFSTestCase.setUp(self)
+
+        self.fs.set_max_mds(self.MDSS_REQUIRED)
+        self.status = self.fs.wait_for_daemons()
+
+        self.mount_a.run_shell_payload('mkdir -p test/export')
+
+    def tearDown(self):
+        super().tearDown()
+
+    def _kill_export_as(self, rank, kill):
+        self.fs.rank_asok(['config', 'set', 'mds_kill_export_at', str(kill)], rank=rank, status=self.status)
+
+    def _export_dir(self, path, source, target):
+        self.fs.rank_asok(['export', 'dir', path, str(target)], rank=source, status=self.status)
+
+    def _wait_failover(self):
+        self.wait_until_true(lambda: self.fs.status().hadfailover(self.status), timeout=self.fs.beacon_timeout)
+
+    def _clear_coredump(self, rank):
+        crash_rank = self.fs.get_rank(rank=rank, status=self.status)
+        self.delete_mds_coredump(crash_rank['name'])
+
+    def _run_kill_export(self, kill_at, exporter_rank=0, importer_rank=1, restart=True):
+        self._kill_export_as(exporter_rank, kill_at)
+        self._export_dir("/test", exporter_rank, importer_rank)
+        self._wait_failover()
+        self._clear_coredump(exporter_rank)
+
+        if restart:
+            self.fs.rank_restart(rank=exporter_rank, status=self.status)
+            self.status = self.fs.wait_for_daemons()
+
+    def test_session_cleanup(self):
+        """
+        Test importer's session cleanup after an export subtree task is interrupted.
+        Set 'mds_kill_export_at' to 9 or 10 so that the importer will wait for the exporter
+        to restart while the state is 'acking'.
+
+        See https://tracker.ceph.com/issues/61459
+        """
+
+        kill_export_at = [9, 10]
+
+        exporter_rank = 0
+        importer_rank = 1
+
+        for kill in kill_export_at:
+            log.info(f"kill_export_at: {kill}")
+            self._run_kill_export(kill, exporter_rank, importer_rank)
+
+            if len(self._session_list(importer_rank, self.status)) > 0:
+                client_id = self.mount_a.get_global_id()
+                self.fs.rank_asok(['session', 'evict', "%s" % client_id], rank=importer_rank, status=self.status)
+
+                # timeout if buggy
+                self.wait_until_evicted(client_id, importer_rank)
+
+            # for multiple tests
+            self.mount_a.remount()
+
+    def test_client_eviction(self):
+        # modify the timeout so that we don't have to wait too long
+        timeout = 30
+        self.fs.set_session_timeout(timeout)
+        self.fs.set_session_autoclose(timeout + 5)
+
+        kill_export_at = [9, 10]
+
+        exporter_rank = 0
+        importer_rank = 1
+
+        for kill in kill_export_at:
+            log.info(f"kill_export_at: {kill}")
+            self._run_kill_export(kill, exporter_rank, importer_rank)
+
+            client_id = self.mount_a.get_global_id()
+            self.wait_until_evicted(client_id, importer_rank, timeout + 10)
+            time.sleep(1)
+
+            # failed if buggy
+            self.mount_a.ls()
diff --git a/src/mds/Migrator.cc b/src/mds/Migrator.cc
index 722b6bd7422..6b12f710db4 100644
--- a/src/mds/Migrator.cc
+++ b/src/mds/Migrator.cc
@@ -1957,10 +1957,10 @@ void Migrator::handle_export_ack(const cref_t<MExportDirAck> &m)
   // this keeps authority().first in sync with subtree auth state in the journal.
   mdcache->adjust_subtree_auth(dir, it->second.peer, mds->get_nodeid());
 
+  ceph_assert(g_conf()->mds_kill_export_at != 10);
   // log export completion, then finish (unfreeze, trigger finish context, etc.)
   mds->mdlog->submit_entry(le, new C_MDS_ExportFinishLogged(this, dir));
   mds->mdlog->flush();
-  ceph_assert(g_conf()->mds_kill_export_at != 10);
 }
 
 void Migrator::export_notify_abort(CDir *dir, export_state_t& stat, set<CDir*>& bounds)
@@ -2844,7 +2844,6 @@ void Migrator::import_reverse(CDir *dir)
   dout(7) << *dir << dendl;
 
   import_state_t& stat = import_state[dir->dirfrag()];
-  stat.state = IMPORT_ABORTING;
 
   set<CDir*> bounds;
   mdcache->get_subtree_bounds(dir, bounds);
@@ -2950,10 +2949,14 @@ void Migrator::import_reverse(CDir *dir)
       }
       in->put(CInode::PIN_IMPORTINGCAPS);
     }
+  }
+
+  if (stat.state == IMPORT_LOGGINGSTART || stat.state == IMPORT_ACKING) {
     for (auto& p : stat.session_map) {
       Session *session = p.second.first;
       session->dec_importing();
     }
+    mds->server->close_forced_opened_sessions(stat.session_map);
   }
 
   // log our failure
@@ -2962,6 +2965,7 @@ void Migrator::import_reverse(CDir *dir)
   mdcache->trim(num_dentries); // try trimming dentries
 
   // notify bystanders; wait in aborting state
+  stat.state = IMPORT_ABORTING;
   import_notify_abort(dir, bounds);
 }
 
@@ -3054,10 +3058,9 @@ void Migrator::import_logged_start(dirfrag_t df, CDir *dir, mds_rank_t from,
   dout(7) << *dir << dendl;
 
   map<dirfrag_t,import_state_t>::iterator it = import_state.find(dir->dirfrag());
-  if (it == import_state.end() ||
-      it->second.state != IMPORT_LOGGINGSTART) {
+  ceph_assert(it != import_state.end());
+  if (it->second.state != IMPORT_LOGGINGSTART) {
     dout(7) << "import " << df << " must have aborted" << dendl;
-    mds->server->finish_force_open_sessions(imported_session_map);
     return;
   }
 
diff --git a/src/mds/Server.cc b/src/mds/Server.cc
index cf286b46d46..5874a3dce56 100644
--- a/src/mds/Server.cc
+++ b/src/mds/Server.cc
@@ -615,6 +615,9 @@ void Server::handle_client_session(const cref_t<MClientSession> &m)
       mds->send_message(reply, m->get_connection());
       return;
     }
+    if (!session->client_opened) {
+      session->client_opened = true;
+    }
     if (session->is_opening() ||
        session->is_open() ||
        session->is_stale() ||
@@ -1054,7 +1057,7 @@ version_t Server::prepare_force_open_sessions(map<client_t,entity_inst_t>& cm,
   return pv;
 }
 
-void Server::finish_force_open_sessions(const map<client_t,pair<Session*,uint64_t> >& smap,
+void Server::finish_force_open_sessions(map<client_t,pair<Session*,uint64_t> >& smap,
                                         bool dec_import)
 {
   /*
@@ -1073,7 +1076,7 @@ void Server::finish_force_open_sessions(const map<client_t,pair<Session*,uint64_t> >& smap,
        dout(10) << "force_open_sessions skipping changed " << session->info.inst << dendl;
       } else {
        dout(10) << "force_open_sessions opened " << session->info.inst << dendl;
-       mds->sessionmap.set_state(session, Session::STATE_OPEN);
+       it.second.second = mds->sessionmap.set_state(session, Session::STATE_OPEN);
        mds->sessionmap.touch_session(session);
        metrics_handler->add_session(session);
 
@@ -1103,6 +1106,29 @@ void Server::finish_force_open_sessions(const map<client_t,pair<Session*,uint64_t> >& smap,
   dout(10) << "finish_force_open_sessions final v " << mds->sessionmap.get_version() << dendl;
 }
 
+void Server::close_forced_opened_sessions(const map<client_t,pair<Session*,uint64_t> >& smap)
+{
+  dout(10) << __func__ << " on " << smap.size() << " clients" << dendl;
+
+  for (auto &it : smap) {
+    Session *session = it.second.first;
+    uint64_t sseq = it.second.second;
+    if (sseq == 0)
+      continue;
+    if (session->get_state_seq() != sseq) {
+      dout(10) << "skipping changed session (" << session->get_state_name() << ") "
+               << session->info.inst << dendl;
+      continue;
+    }
+    if (session->client_opened)
+      continue;
+    dout(10) << "closing forced opened session (" << session->get_state_name() << ") "
+             << session->info.inst << dendl;
+    ceph_assert(!session->is_importing());
+    journal_close_session(session, Session::STATE_CLOSING, NULL);
+  }
+}
+
 class C_MDS_TerminatedSessions : public ServerContext {
   void finish(int r) override {
     server->terminating_sessions = false;
diff --git a/src/mds/Server.h b/src/mds/Server.h
index 68842ea01cb..5f9a763e550 100644
--- a/src/mds/Server.h
+++ b/src/mds/Server.h
@@ -129,8 +129,9 @@ public:
   version_t prepare_force_open_sessions(std::map<client_t,entity_inst_t> &cm,
                                         std::map<client_t,client_metadata_t>& cmm,
                                         std::map<client_t,std::pair<Session*,uint64_t> >& smap);
-  void finish_force_open_sessions(const std::map<client_t,std::pair<Session*,uint64_t> >& smap,
+  void finish_force_open_sessions(std::map<client_t,std::pair<Session*,uint64_t> >& smap,
                                   bool dec_import=true);
+  void close_forced_opened_sessions(const std::map<client_t,std::pair<Session*,uint64_t> >& smap);
   void flush_client_sessions(std::set<client_t>& client_set, MDSGatherBuilder& gather);
   void finish_flush_session(Session *session, version_t seq);
   void terminate_sessions();
diff --git a/src/mds/SessionMap.cc b/src/mds/SessionMap.cc
index ba0b0817738..0f6038eb82b 100644
--- a/src/mds/SessionMap.cc
+++ b/src/mds/SessionMap.cc
@@ -615,6 +615,7 @@ void Session::dump(Formatter *f, bool cap_dump) const
   f->dump_unsigned("num_completed_requests", get_num_completed_requests());
   f->dump_unsigned("num_completed_flushes", get_num_completed_flushes());
   f->dump_bool("reconnecting", reconnecting);
+  f->dump_int("importing_count", importing_count);
   f->dump_object("recall_caps", recall_caps);
   f->dump_object("release_caps", release_caps);
   f->dump_object("recall_caps_throttle", recall_caps_throttle);
diff --git a/src/mds/SessionMap.h b/src/mds/SessionMap.h
index 9e82f00a9bf..bfe7dcd4895 100644
--- a/src/mds/SessionMap.h
+++ b/src/mds/SessionMap.h
@@ -417,6 +417,10 @@ public:
   session_info_t info;  ///< durable bits
   MDSAuthCaps auth_caps;
 
+  // True if the session is opened by the client.
+  // False if the session is forced to open, until it is opened again by the client.
+  bool client_opened = false;
+
   xlist<Session*>::item item_session_list;
   std::list<ceph::ref_t<Message>> preopen_out_queue;  ///< messages for client, queued before they connect
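
Note on verification (not part of the patch): the SessionMap.cc hunk above starts dumping the session's importing_count, so it appears in 'session ls' output. Below is a minimal qa-style sketch of how that field could be checked alongside TestKillExports, assuming only the rank_asok() and 'session ls' plumbing already used in this patch; the helper name assert_no_dangling_importers is hypothetical.

    # Hypothetical helper (not part of the patch): after an aborted export,
    # assert that no session on the given rank is left holding an importing ref.
    def assert_no_dangling_importers(fs, rank, status=None):
        # 'session ls' via the admin socket; 'importing_count' is the field
        # added by the SessionMap.cc hunk above.
        for s in fs.rank_asok(['session', 'ls'], rank=rank, status=status):
            assert s.get('importing_count', 0) == 0, \
                "session %s still importing on rank %d" % (s['id'], rank)

In test_session_cleanup, a check like this could run right after _run_kill_export(), before falling back to evicting the client.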