Merge PR #49553 into main

* refs/pull/49553/head:
	mds: wait reintegrate to finish when unlinking
	message: make MClientReply inherit MMDSOp
	mds: notify the waiters in replica MDSs

Reviewed-by: Patrick Donnelly <pdonnell@redhat.com>
This commit is contained in:
Patrick Donnelly 2023-05-22 15:42:28 -04:00
commit 6f1eae80f8
No known key found for this signature in database
GPG Key ID: BE69BB7D36E459B4
9 changed files with 148 additions and 9 deletions

View File

@ -108,6 +108,8 @@ ostream& operator<<(ostream& out, const CDentry& dn)
out << " state=" << dn.get_state();
if (dn.is_new()) out << "|new";
if (dn.state_test(CDentry::STATE_BOTTOMLRU)) out << "|bottomlru";
if (dn.state_test(CDentry::STATE_UNLINKING)) out << "|unlinking";
if (dn.state_test(CDentry::STATE_REINTEGRATING)) out << "|reintegrating";
if (dn.get_num_ref()) {
out << " |";

View File

@ -87,6 +87,7 @@ public:
static const int STATE_PURGINGPINNED = (1<<5);
static const int STATE_BOTTOMLRU = (1<<6);
static const int STATE_UNLINKING = (1<<7);
static const int STATE_REINTEGRATING = (1<<8);
// stray dentry needs notification of releasing reference
static const int STATE_STRAY = STATE_NOTIFYREF;
static const int MASK_STATE_IMPORT_KEPT = STATE_BOTTOMLRU;
@ -100,8 +101,9 @@ public:
static const unsigned EXPORT_NONCE = 1;
const static uint64_t WAIT_UNLINK_STATE = (1<<0);
const static uint64_t WAIT_UNLINK_FINISH = (1<<1);
const static uint64_t WAIT_UNLINK_STATE = (1<<0);
const static uint64_t WAIT_UNLINK_FINISH = (1<<1);
const static uint64_t WAIT_REINTEGRATE_FINISH = (1<<2);
uint32_t replica_unlinking_ref = 0;
CDentry(std::string_view n, __u32 h,

View File

@ -11349,6 +11349,11 @@ void MDCache::handle_dentry_unlink(const cref_t<MDentryUnlink> &m)
}
ceph_assert(dnl->is_null());
dn->state_clear(CDentry::STATE_UNLINKING);
MDSContext::vec finished;
dn->take_waiting(CDentry::WAIT_UNLINK_FINISH, finished);
mds->queue_waiters(finished);
}
}

View File

@ -1178,6 +1178,7 @@ bool MDSRank::is_valid_message(const cref_t<Message> &m) {
type == CEPH_MSG_CLIENT_RECONNECT ||
type == CEPH_MSG_CLIENT_RECLAIM ||
type == CEPH_MSG_CLIENT_REQUEST ||
type == CEPH_MSG_CLIENT_REPLY ||
type == MSG_MDS_PEER_REQUEST ||
type == MSG_MDS_HEARTBEAT ||
type == MSG_MDS_TABLE_REQUEST ||
@ -1231,6 +1232,7 @@ void MDSRank::handle_message(const cref_t<Message> &m)
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_CLIENT);
// fall-thru
case CEPH_MSG_CLIENT_REQUEST:
case CEPH_MSG_CLIENT_REPLY:
server->dispatch(m);
break;
case MSG_MDS_PEER_REQUEST:

View File

@ -151,6 +151,29 @@ class Finisher;
class ScrubStack;
class C_ExecAndReply;
/**
 * Record of a request that this MDS issued itself and is still waiting on.
 *
 * Instances are kept in MDSRank::internal_client_requests, keyed by tid.
 * When a dentry is supplied, a PIN_PURGING reference is taken on it for the
 * lifetime of this object and dropped again in the destructor.
 *
 * The object owns exactly one pin, so it must never be copied: a copy would
 * drop the same pin a second time when it is destroyed. Construct it in
 * place (e.g. std::map::emplace with std::piecewise_construct).
 */
struct MDSMetaRequest {
private:
  int _op;            // operation code of the issued request
  CDentry *_dentry;   // pinned dentry tied to the request, may be nullptr
  ceph_tid_t _tid;    // transaction id the request was sent with
public:
  /**
   * @param op   operation code of the request
   * @param dn   dentry to pin while the request is outstanding, or nullptr
   * @param tid  transaction id of the request
   */
  explicit MDSMetaRequest(int op, CDentry *dn, ceph_tid_t tid) :
    _op(op), _dentry(dn), _tid(tid) {
    if (_dentry) {
      _dentry->get(CDentry::PIN_PURGING);
    }
  }

  // Copying would duplicate ownership of the single pin taken above and
  // lead to an unbalanced put() in the second destructor.
  MDSMetaRequest(const MDSMetaRequest&) = delete;
  MDSMetaRequest& operator=(const MDSMetaRequest&) = delete;

  ~MDSMetaRequest() {
    if (_dentry) {
      _dentry->put(CDentry::PIN_PURGING);
    }
  }

  /// @return the pinned dentry, or nullptr when none was supplied
  CDentry *get_dentry() const { return _dentry; }
  /// @return the operation code given at construction
  int get_op() const { return _op; }
  /// @return the transaction id given at construction
  ceph_tid_t get_tid() const { return _tid; }
};
/**
* The public part of this class's interface is what's exposed to all
* the various subsystems (server, mdcache, etc), such as pointers
@ -416,6 +439,8 @@ class MDSRank {
PerfCounters *logger = nullptr, *mlogger = nullptr;
OpTracker op_tracker;
std::map<ceph_tid_t, MDSMetaRequest> internal_client_requests;
// The last different state I held before current
MDSMap::DaemonState last_state = MDSMap::STATE_BOOT;
// The state assigned to me by the MDSMap

View File

@ -31,6 +31,7 @@
#include "Mutation.h"
#include "MetricsHandler.h"
#include "cephfs_features.h"
#include "MDSContext.h"
#include "msg/Messenger.h"
@ -360,6 +361,9 @@ void Server::dispatch(const cref_t<Message> &m)
case CEPH_MSG_CLIENT_REQUEST:
handle_client_request(ref_cast<MClientRequest>(m));
return;
case CEPH_MSG_CLIENT_REPLY:
handle_client_reply(ref_cast<MClientReply>(m));
return;
case CEPH_MSG_CLIENT_RECLAIM:
handle_client_reclaim(ref_cast<MClientReclaim>(m));
return;
@ -2296,6 +2300,10 @@ void Server::reply_client_request(MDRequestRef& mdr, const ref_t<MClientReply> &
mds->send_message_client(reply, session);
}
if (client_inst.name.is_mds() && reply->get_op() == CEPH_MDS_OP_RENAME) {
mds->send_message(reply, mdr->client_request->get_connection());
}
if (req->is_queued_for_replay() &&
(mdr->has_completed || reply->get_result() < 0)) {
if (reply->get_result() < 0) {
@ -2528,6 +2536,38 @@ void Server::handle_client_request(const cref_t<MClientRequest> &req)
return;
}
void Server::handle_client_reply(const cref_t<MClientReply> &reply)
{
dout(4) << "handle_client_reply " << *reply << dendl;
ceph_assert(reply->is_safe());
ceph_tid_t tid = reply->get_tid();
if (mds->internal_client_requests.count(tid) == 0) {
dout(1) << " no pending request on tid " << tid << dendl;
return;
}
auto &req = mds->internal_client_requests.at(tid);
CDentry *dn = req.get_dentry();
switch (reply->get_op()) {
case CEPH_MDS_OP_RENAME:
if (dn) {
dn->state_clear(CDentry::STATE_REINTEGRATING);
MDSContext::vec finished;
dn->take_waiting(CDentry::WAIT_REINTEGRATE_FINISH, finished);
mds->queue_waiters(finished);
}
break;
default:
dout(5) << " unknown client op " << reply->get_op() << dendl;
}
mds->internal_client_requests.erase(tid);
}
void Server::handle_osd_map()
{
/* Note that we check the OSDMAP_FULL flag directly rather than
@ -6852,6 +6892,45 @@ void Server::wait_for_pending_unlink(CDentry *dn, MDRequestRef& mdr)
dn->add_waiter(CDentry::WAIT_UNLINK_FINISH, new C_WaitUnlinkToFinish(mdcache, dn, fin));
}
struct C_WaitReintegrateToFinish : public MDSContext {
protected:
MDCache *mdcache;
CDentry *dn;
MDSContext *fin;
MDSRank *get_mds() override
{
ceph_assert(mdcache != NULL);
return mdcache->mds;
}
public:
C_WaitReintegrateToFinish(MDCache *m, CDentry *d, MDSContext *f) :
mdcache(m), dn(d), fin(f) {}
void finish(int r) override {
fin->complete(r);
dn->put(CDentry::PIN_PURGING);
}
};
/**
 * Report whether a dentry is flagged as reintegrating.
 *
 * @param dn  dentry to inspect
 * @return true only when the projected linkage of dn is non-null and
 *         STATE_REINTEGRATING is set on it; false otherwise
 */
bool Server::is_reintegrate_pending(CDentry *dn)
{
  // A dentry whose projected linkage is null is never reported as pending.
  if (dn->get_projected_linkage()->is_null()) {
    return false;
  }

  if (dn->state_test(CDentry::STATE_REINTEGRATING)) {
    return true;
  }
  return false;
}
/**
 * Park a request until the reintegrate on a dentry has finished.
 *
 * Drops the locks held by mdr, takes a PIN_PURGING reference on dn, and
 * registers a WAIT_REINTEGRATE_FINISH waiter on dn. The waiter wraps a
 * C_MDS_RetryRequest built for mdr.
 *
 * @param dn   dentry to wait on
 * @param mdr  request to hand to the retry context
 */
void Server::wait_for_pending_reintegrate(CDentry *dn, MDRequestRef& mdr)
{
  dout(20) << __func__ << " dn " << *dn << dendl;

  mds->locker->drop_locks(mdr.get());

  auto *retry = new C_MDS_RetryRequest(mdcache, mdr);

  // This pin is released by C_WaitReintegrateToFinish::finish().
  dn->get(CDentry::PIN_PURGING);

  auto *waiter = new C_WaitReintegrateToFinish(mdcache, dn, retry);
  dn->add_waiter(CDentry::WAIT_REINTEGRATE_FINISH, waiter);
}
// MKNOD
class C_MDS_mknod_finish : public ServerLogContext {
@ -7922,6 +8001,11 @@ void Server::handle_client_unlink(MDRequestRef& mdr)
if (!dn)
return;
if (is_reintegrate_pending(dn)) {
wait_for_pending_reintegrate(dn, mdr);
return;
}
// notify replica MDSes the dentry is under unlink
if (!dn->state_test(CDentry::STATE_UNLINKING)) {
dn->state_set(CDentry::STATE_UNLINKING);

View File

@ -159,6 +159,7 @@ public:
// -- requests --
void handle_client_request(const cref_t<MClientRequest> &m);
void handle_client_reply(const cref_t<MClientReply> &m);
void journal_and_reply(MDRequestRef& mdr, CInode *tracei, CDentry *tracedn,
LogEvent *le, MDSLogContextBase *fin);
@ -239,6 +240,9 @@ public:
bool is_unlink_pending(CDentry *dn);
void wait_for_pending_unlink(CDentry *dn, MDRequestRef& mdr);
bool is_reintegrate_pending(CDentry *dn);
void wait_for_pending_reintegrate(CDentry *dn, MDRequestRef& mdr);
// open
void handle_client_open(MDRequestRef& mdr);
void handle_client_openc(MDRequestRef& mdr); // O_CREAT variant.

View File

@ -687,19 +687,27 @@ void StrayManager::reintegrate_stray(CDentry *straydn, CDentry *rdn)
dout(10) << __func__ << " " << *straydn << " to " << *rdn << dendl;
logger->inc(l_mdc_strays_reintegrated);
// rename it to remote linkage .
filepath src(straydn->get_name(), straydn->get_dir()->ino());
filepath dst(rdn->get_name(), rdn->get_dir()->ino());
ceph_tid_t tid = mds->issue_tid();
auto req = make_message<MClientRequest>(CEPH_MDS_OP_RENAME);
req->set_filepath(dst);
req->set_filepath2(src);
req->set_tid(mds->issue_tid());
req->set_tid(tid);
rdn->state_set(CDentry::STATE_REINTEGRATING);
mds->internal_client_requests.emplace(std::piecewise_construct,
std::make_tuple(tid),
std::make_tuple(CEPH_MDS_OP_RENAME,
rdn, tid));
mds->send_message_mds(req, rdn->authority().first);
}
void StrayManager::migrate_stray(CDentry *dn, mds_rank_t to)
{
dout(10) << __func__ << " " << *dn << " to mds." << to << dendl;
@ -713,10 +721,17 @@ void StrayManager::migrate_stray(CDentry *dn, mds_rank_t to)
filepath src(dn->get_name(), dirino);
filepath dst(dn->get_name(), MDS_INO_STRAY(to, MDS_INO_STRAY_INDEX(dirino)));
ceph_tid_t tid = mds->issue_tid();
auto req = make_message<MClientRequest>(CEPH_MDS_OP_RENAME);
req->set_filepath(dst);
req->set_filepath2(src);
req->set_tid(mds->issue_tid());
req->set_tid(tid);
mds->internal_client_requests.emplace(std::piecewise_construct,
std::make_tuple(tid),
std::make_tuple(CEPH_MDS_OP_RENAME,
nullptr, tid));
mds->send_message_mds(req, to);
}

View File

@ -312,7 +312,7 @@ public:
} __attribute__ ((__may_alias__));
WRITE_CLASS_ENCODER(openc_response_t)
class MClientReply final : public SafeMessage {
class MClientReply final : public MMDSOp {
public:
// reply data
struct ceph_mds_reply_head head {};
@ -347,9 +347,9 @@ public:
bool is_safe() const { return head.safe; }
protected:
MClientReply() : SafeMessage{CEPH_MSG_CLIENT_REPLY} {}
MClientReply() : MMDSOp{CEPH_MSG_CLIENT_REPLY} {}
MClientReply(const MClientRequest &req, int result = 0) :
SafeMessage{CEPH_MSG_CLIENT_REPLY} {
MMDSOp{CEPH_MSG_CLIENT_REPLY} {
memset(&head, 0, sizeof(head));
header.tid = req.get_tid();
head.op = req.get_op();