mirror of https://github.com/ceph/ceph
mds: client is evicted when an export subtree task is interrupted
The importer will force open some sessions provided by the exporter but the client does not know about the new sessions until the exporter notifies it, and the notifications cannot be sent if the exporter is interrupted. The client does not renew the sessions regularly that it does not know about, so the client will be evicted by the importer after `session_autoclose` seconds (300 seconds by default). The sessions that are forced opened in the importer need to be closed when the import process is reversed. Signed-off-by: Zhansong Gao <zhsgao@hotmail.com>
This commit is contained in:
parent
f23bd5d099
commit
00b0711188
|
@ -2949,10 +2949,14 @@ void Migrator::import_reverse(CDir *dir)
|
|||
}
|
||||
in->put(CInode::PIN_IMPORTINGCAPS);
|
||||
}
|
||||
}
|
||||
|
||||
if (stat.state == IMPORT_LOGGINGSTART || stat.state == IMPORT_ACKING) {
|
||||
for (auto& p : stat.session_map) {
|
||||
Session *session = p.second.first;
|
||||
session->dec_importing();
|
||||
}
|
||||
mds->server->close_forced_opened_sessions(stat.session_map);
|
||||
}
|
||||
|
||||
// log our failure
|
||||
|
@ -3054,10 +3058,9 @@ void Migrator::import_logged_start(dirfrag_t df, CDir *dir, mds_rank_t from,
|
|||
dout(7) << *dir << dendl;
|
||||
|
||||
map<dirfrag_t, import_state_t>::iterator it = import_state.find(dir->dirfrag());
|
||||
if (it == import_state.end() ||
|
||||
it->second.state != IMPORT_LOGGINGSTART) {
|
||||
ceph_assert(it != import_state.end());
|
||||
if (it->second.state != IMPORT_LOGGINGSTART) {
|
||||
dout(7) << "import " << df << " must have aborted" << dendl;
|
||||
mds->server->finish_force_open_sessions(imported_session_map);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -615,6 +615,9 @@ void Server::handle_client_session(const cref_t<MClientSession> &m)
|
|||
mds->send_message(reply, m->get_connection());
|
||||
return;
|
||||
}
|
||||
if (!session->client_opened) {
|
||||
session->client_opened = true;
|
||||
}
|
||||
if (session->is_opening() ||
|
||||
session->is_open() ||
|
||||
session->is_stale() ||
|
||||
|
@ -1054,7 +1057,7 @@ version_t Server::prepare_force_open_sessions(map<client_t,entity_inst_t>& cm,
|
|||
return pv;
|
||||
}
|
||||
|
||||
void Server::finish_force_open_sessions(const map<client_t,pair<Session*,uint64_t> >& smap,
|
||||
void Server::finish_force_open_sessions(map<client_t,pair<Session*,uint64_t> >& smap,
|
||||
bool dec_import)
|
||||
{
|
||||
/*
|
||||
|
@ -1073,7 +1076,7 @@ void Server::finish_force_open_sessions(const map<client_t,pair<Session*,uint64_
|
|||
dout(10) << "force_open_sessions skipping changed " << session->info.inst << dendl;
|
||||
} else {
|
||||
dout(10) << "force_open_sessions opened " << session->info.inst << dendl;
|
||||
mds->sessionmap.set_state(session, Session::STATE_OPEN);
|
||||
it.second.second = mds->sessionmap.set_state(session, Session::STATE_OPEN);
|
||||
mds->sessionmap.touch_session(session);
|
||||
metrics_handler->add_session(session);
|
||||
|
||||
|
@ -1103,6 +1106,29 @@ void Server::finish_force_open_sessions(const map<client_t,pair<Session*,uint64_
|
|||
dout(10) << __func__ << ": final v " << mds->sessionmap.get_version() << dendl;
|
||||
}
|
||||
|
||||
void Server::close_forced_opened_sessions(const map<client_t,pair<Session*,uint64_t> >& smap)
|
||||
{
|
||||
dout(10) << __func__ << " on " << smap.size() << " clients" << dendl;
|
||||
|
||||
for (auto &it : smap) {
|
||||
Session *session = it.second.first;
|
||||
uint64_t sseq = it.second.second;
|
||||
if (sseq == 0)
|
||||
continue;
|
||||
if (session->get_state_seq() != sseq) {
|
||||
dout(10) << "skipping changed session (" << session->get_state_name() << ") "
|
||||
<< session->info.inst << dendl;
|
||||
continue;
|
||||
}
|
||||
if (session->client_opened)
|
||||
continue;
|
||||
dout(10) << "closing forced opened session (" << session->get_state_name() << ") "
|
||||
<< session->info.inst << dendl;
|
||||
ceph_assert(!session->is_importing());
|
||||
journal_close_session(session, Session::STATE_CLOSING, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
class C_MDS_TerminatedSessions : public ServerContext {
|
||||
void finish(int r) override {
|
||||
server->terminating_sessions = false;
|
||||
|
|
|
@ -129,8 +129,9 @@ public:
|
|||
version_t prepare_force_open_sessions(std::map<client_t,entity_inst_t> &cm,
|
||||
std::map<client_t,client_metadata_t>& cmm,
|
||||
std::map<client_t,std::pair<Session*,uint64_t> >& smap);
|
||||
void finish_force_open_sessions(const std::map<client_t,std::pair<Session*,uint64_t> >& smap,
|
||||
void finish_force_open_sessions(std::map<client_t,std::pair<Session*,uint64_t> >& smap,
|
||||
bool dec_import=true);
|
||||
void close_forced_opened_sessions(const std::map<client_t,std::pair<Session*,uint64_t> >& smap);
|
||||
void flush_client_sessions(std::set<client_t>& client_set, MDSGatherBuilder& gather);
|
||||
void finish_flush_session(Session *session, version_t seq);
|
||||
void terminate_sessions();
|
||||
|
|
|
@ -417,6 +417,10 @@ public:
|
|||
session_info_t info; ///< durable bits
|
||||
MDSAuthCaps auth_caps;
|
||||
|
||||
// True if the session is opened by the client.
|
||||
// False if the session is forced to open, until it is opened again by the client.
|
||||
bool client_opened = false;
|
||||
|
||||
xlist<Session*>::item item_session_list;
|
||||
|
||||
std::list<ceph::ref_t<Message>> preopen_out_queue; ///< messages for client, queued before they connect
|
||||
|
|
Loading…
Reference in New Issue