mirror of
https://github.com/ceph/ceph
synced 2025-01-18 00:43:38 +00:00
Merge PR #36912 into master
* refs/pull/36912/head: mds: defer encoding and storing the inode backtrace; mds: defer encoding and storing the CDir dentries; mds: add error handler with lock support. Reviewed-by: Zheng Yan <zyan@redhat.com>
This commit is contained in:
commit
13a892d8df
333
src/mds/CDir.cc
333
src/mds/CDir.cc
@ -13,6 +13,7 @@
|
||||
*/
|
||||
|
||||
#include <string_view>
|
||||
#include <algorithm>
|
||||
|
||||
#include "include/types.h"
|
||||
|
||||
@ -1748,7 +1749,7 @@ CDentry *CDir::_load_dentry(
|
||||
stale = true;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* look for existing dentry for _last_ snap, because unlink +
|
||||
* create may leave a "hole" (epochs during which the dentry
|
||||
@ -1791,7 +1792,7 @@ CDentry *CDir::_load_dentry(
|
||||
} else {
|
||||
// (remote) link
|
||||
dn = add_remote_dentry(dname, ino, d_type, first, last);
|
||||
|
||||
|
||||
// link to inode?
|
||||
CInode *in = mdcache->get_inode(ino); // we may or may not have it.
|
||||
if (in) {
|
||||
@ -1801,14 +1802,14 @@ CDentry *CDir::_load_dentry(
|
||||
dout(12) << "_fetched got remote link " << ino << " (don't have it)" << dendl;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (type == 'I') {
|
||||
// inode
|
||||
|
||||
|
||||
// Load inode data before looking up or constructing CInode
|
||||
InodeStore inode_data;
|
||||
inode_data.decode_bare(q);
|
||||
|
||||
|
||||
if (stale) {
|
||||
if (!dn) {
|
||||
stale_items.insert(mempool::mds_co::string(key));
|
||||
@ -2160,10 +2161,175 @@ public:
|
||||
dir->_committed(r, version);
|
||||
}
|
||||
// Write a short description of this completion to `out`: a fixed label
// followed by the dirfrag id of the directory being committed.
// Only the renamed "dirfrag_committed" label is emitted; the older
// "dirfrag_commit" line was a leftover that made the label appear twice.
void print(ostream& out) const override {
  out << "dirfrag_committed(" << dir->dirfrag() << ")";
}
|
||||
};
|
||||
|
||||
class C_IO_Dir_Commit_Ops : public Context {
|
||||
public:
|
||||
C_IO_Dir_Commit_Ops(CDir *d, int pr, bufferlist &&bl,
|
||||
vector<dentry_key_t> &&r, vector<CDir::dentry_commit_item> &&s,
|
||||
mempool::mds_co::compact_set<mempool::mds_co::string> &&stale) :
|
||||
dir(d), op_prio(pr) {
|
||||
version = dir->get_version();
|
||||
is_new = dir->is_new();
|
||||
dfts.swap(bl);
|
||||
to_remove.swap(r);
|
||||
to_set.swap(s);
|
||||
stale_items.swap(stale);
|
||||
}
|
||||
|
||||
void finish(int r) override {
|
||||
dir->_omap_commit_ops(r, op_prio, version, is_new, dfts, to_remove, to_set,
|
||||
stale_items);
|
||||
}
|
||||
|
||||
private:
|
||||
CDir *dir;
|
||||
version_t version;
|
||||
int op_prio;
|
||||
bool is_new;
|
||||
bufferlist dfts;
|
||||
vector<dentry_key_t> to_remove;
|
||||
vector<CDir::dentry_commit_item> to_set;
|
||||
mempool::mds_co::compact_set<mempool::mds_co::string> stale_items;
|
||||
};
|
||||
|
||||
// This is not locked by mds_lock
|
||||
void CDir::_omap_commit_ops(int r, int op_prio, version_t version, bool _new, bufferlist &dfts,
|
||||
vector<dentry_key_t>& to_remove, vector<dentry_commit_item> &to_set,
|
||||
mempool::mds_co::compact_set<mempool::mds_co::string> &stales)
|
||||
{
|
||||
dout(10) << __func__ << dendl;
|
||||
|
||||
if (r < 0) {
|
||||
mdcache->mds->handle_write_error_with_lock(r);
|
||||
return;
|
||||
}
|
||||
|
||||
C_GatherBuilder gather(g_ceph_context,
|
||||
new C_OnFinisher(new C_IO_Dir_Committed(this, version),
|
||||
mdcache->mds->finisher));
|
||||
|
||||
SnapContext snapc;
|
||||
object_t oid = get_ondisk_object();
|
||||
object_locator_t oloc(mdcache->mds->mdsmap->get_metadata_pool());
|
||||
|
||||
map<string, bufferlist> _set;
|
||||
set<string> _rm;
|
||||
|
||||
unsigned max_write_size = mdcache->max_dir_commit_size;
|
||||
unsigned write_size = 0;
|
||||
|
||||
auto commit_one = [&](bool header=false) {
|
||||
ObjectOperation op;
|
||||
|
||||
// don't create new dirfrag blindly
|
||||
if (!_new)
|
||||
op.stat(nullptr, nullptr, nullptr);
|
||||
|
||||
/*
|
||||
* save the header at the last moment.. If we were to send it off before
|
||||
* other updates, but die before sending them all, we'd think that the
|
||||
* on-disk state was fully committed even though it wasn't! However, since
|
||||
* the messages are strictly ordered between the MDS and the OSD, and
|
||||
* since messages to a given PG are strictly ordered, if we simply send
|
||||
* the message containing the header off last, we cannot get our header
|
||||
* into an incorrect state.
|
||||
*/
|
||||
if (header) {
|
||||
bufferlist header;
|
||||
encode(*fnode, header);
|
||||
op.omap_set_header(header);
|
||||
}
|
||||
|
||||
op.priority = op_prio;
|
||||
if (!_set.empty())
|
||||
op.omap_set(_set);
|
||||
if (!_rm.empty())
|
||||
op.omap_rm_keys(_rm);
|
||||
mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
|
||||
ceph::real_clock::now(),
|
||||
0, gather.new_sub());
|
||||
write_size = 0;
|
||||
_set.clear();
|
||||
_rm.clear();
|
||||
};
|
||||
|
||||
for (auto &key : stales) {
|
||||
write_size += key.length();
|
||||
_rm.emplace(key);
|
||||
|
||||
if (write_size >= max_write_size)
|
||||
commit_one();
|
||||
}
|
||||
|
||||
for (auto &k : to_remove) {
|
||||
string key;
|
||||
k.encode(key);
|
||||
write_size += key.length();
|
||||
_rm.emplace(std::move(key));
|
||||
|
||||
if (write_size >= max_write_size)
|
||||
commit_one();
|
||||
}
|
||||
|
||||
uint64_t off = 0;
|
||||
bufferlist bl;
|
||||
using ceph::encode;
|
||||
for (auto &item : to_set) {
|
||||
string key;
|
||||
item.key.encode(key);
|
||||
|
||||
encode(item.first, bl);
|
||||
if (item.is_remote) {
|
||||
bl.append('L'); // remote link
|
||||
encode(item.ino, bl);
|
||||
encode(item.d_type, bl);
|
||||
} else {
|
||||
bl.append('I'); // inode
|
||||
|
||||
encode(*item.inode, bl, item.features);
|
||||
|
||||
if (!item.symlink.empty())
|
||||
encode(item.symlink, bl);
|
||||
|
||||
// dirfragtree
|
||||
dfts.splice(0, item.dft_len, &bl);
|
||||
|
||||
if (item.xattrs)
|
||||
encode(*item.xattrs, bl);
|
||||
else
|
||||
encode((__u32)0, bl);
|
||||
|
||||
if (item.snaprealm) {
|
||||
bufferlist snapr_bl;
|
||||
encode(item.srnode, snapr_bl);
|
||||
encode(snapr_bl, bl);
|
||||
} else {
|
||||
encode(bufferlist(), bl);
|
||||
}
|
||||
|
||||
if (item.old_inodes)
|
||||
encode(*item.old_inodes, bl, item.features);
|
||||
else
|
||||
encode((__u32)0, bl);
|
||||
|
||||
encode(item.oldest_snap, bl);
|
||||
encode(item.damage_flags, bl);
|
||||
}
|
||||
off += item.dft_len;
|
||||
|
||||
write_size += key.length() + bl.length();
|
||||
_set[std::move(key)].swap(bl);
|
||||
if (write_size >= max_write_size)
|
||||
commit_one();
|
||||
}
|
||||
|
||||
commit_one(true);
|
||||
gather.activate();
|
||||
}
|
||||
|
||||
/**
|
||||
* Flush out the modified dentries in this dir. Keep the bufferlist
|
||||
* below max_write_size;
|
||||
@ -2172,9 +2338,6 @@ void CDir::_omap_commit(int op_prio)
|
||||
{
|
||||
dout(10) << __func__ << dendl;
|
||||
|
||||
unsigned max_write_size = mdcache->max_dir_commit_size;
|
||||
unsigned write_size = 0;
|
||||
|
||||
if (op_prio < 0)
|
||||
op_prio = CEPH_MSG_PRIO_DEFAULT;
|
||||
|
||||
@ -2191,70 +2354,51 @@ void CDir::_omap_commit(int op_prio)
|
||||
// fnode.snap_purged_thru = realm->get_last_destroyed();
|
||||
}
|
||||
|
||||
set<string> to_remove;
|
||||
map<string, bufferlist> to_set;
|
||||
|
||||
C_GatherBuilder gather(g_ceph_context,
|
||||
new C_OnFinisher(new C_IO_Dir_Committed(this,
|
||||
get_version()),
|
||||
mdcache->mds->finisher));
|
||||
|
||||
SnapContext snapc;
|
||||
object_t oid = get_ondisk_object();
|
||||
object_locator_t oloc(mdcache->mds->mdsmap->get_metadata_pool());
|
||||
|
||||
if (!stale_items.empty()) {
|
||||
for (const auto &p : stale_items) {
|
||||
to_remove.insert(std::string(p));
|
||||
write_size += p.length();
|
||||
}
|
||||
stale_items.clear();
|
||||
size_t count = 0;
|
||||
if (state_test(CDir::STATE_FRAGMENTING) && is_new()) {
|
||||
count = get_num_head_items() && get_num_snap_items();
|
||||
} else {
|
||||
for (elist<CDentry*>::iterator it = dirty_dentries.begin(); !it.end(); ++it)
|
||||
++count;
|
||||
}
|
||||
|
||||
vector<dentry_key_t> to_remove;
|
||||
// reserve enough memory up front; this may be more than is actually needed
|
||||
to_remove.reserve(count);
|
||||
|
||||
vector<dentry_commit_item> to_set;
|
||||
// reserve enough memory up front; this may be more than is actually needed
|
||||
to_set.reserve(count);
|
||||
|
||||
bufferlist dfts(CEPH_PAGE_SIZE);
|
||||
|
||||
auto write_one = [&](CDentry *dn) {
|
||||
string key;
|
||||
dn->key().encode(key);
|
||||
auto key = dn->key();
|
||||
|
||||
if (dn->last != CEPH_NOSNAP &&
|
||||
snaps && try_trim_snap_dentry(dn, *snaps)) {
|
||||
dout(10) << " rm " << key << dendl;
|
||||
write_size += key.length();
|
||||
to_remove.insert(key);
|
||||
to_remove.push_back(key);
|
||||
return;
|
||||
}
|
||||
|
||||
if (dn->get_linkage()->is_null()) {
|
||||
dout(10) << " rm " << dn->get_name() << " " << *dn << dendl;
|
||||
write_size += key.length();
|
||||
to_remove.insert(key);
|
||||
to_remove.push_back(key);
|
||||
} else {
|
||||
dout(10) << " set " << dn->get_name() << " " << *dn << dendl;
|
||||
bufferlist dnbl;
|
||||
_encode_dentry(dn, dnbl, snaps);
|
||||
write_size += key.length() + dnbl.length();
|
||||
to_set[key].swap(dnbl);
|
||||
}
|
||||
|
||||
if (write_size >= max_write_size) {
|
||||
ObjectOperation op;
|
||||
op.priority = op_prio;
|
||||
uint64_t off = dfts.length();
|
||||
// try to reserve more space if there is less
|
||||
// than 1/8 of a page left
|
||||
uint64_t left = CEPH_PAGE_SIZE - off % CEPH_PAGE_SIZE;
|
||||
if (left < CEPH_PAGE_SIZE / 8)
|
||||
dfts.reserve(left + CEPH_PAGE_SIZE);
|
||||
|
||||
// don't create new dirfrag blindly
|
||||
if (!is_new())
|
||||
op.stat(nullptr, nullptr, nullptr);
|
||||
|
||||
if (!to_set.empty())
|
||||
op.omap_set(to_set);
|
||||
if (!to_remove.empty())
|
||||
op.omap_rm_keys(to_remove);
|
||||
|
||||
mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
|
||||
ceph::real_clock::now(),
|
||||
0, gather.new_sub());
|
||||
|
||||
write_size = 0;
|
||||
to_set.clear();
|
||||
to_remove.clear();
|
||||
auto& item = to_set.emplace_back();
|
||||
item.key = key;
|
||||
_parse_dentry(dn, item, snaps, dfts);
|
||||
item.dft_len = dfts.length() - off;
|
||||
}
|
||||
};
|
||||
|
||||
@ -2275,64 +2419,34 @@ void CDir::_omap_commit(int op_prio)
|
||||
}
|
||||
}
|
||||
|
||||
ObjectOperation op;
|
||||
op.priority = op_prio;
|
||||
|
||||
// don't create new dirfrag blindly
|
||||
if (!is_new())
|
||||
op.stat(nullptr, nullptr, nullptr);
|
||||
|
||||
/*
|
||||
* save the header at the last moment.. If we were to send it off before other
|
||||
* updates, but die before sending them all, we'd think that the on-disk state
|
||||
* was fully committed even though it wasn't! However, since the messages are
|
||||
* strictly ordered between the MDS and the OSD, and since messages to a given
|
||||
* PG are strictly ordered, if we simply send the message containing the header
|
||||
* off last, we cannot get our header into an incorrect state.
|
||||
*/
|
||||
bufferlist header;
|
||||
encode(*fnode, header);
|
||||
op.omap_set_header(header);
|
||||
|
||||
if (!to_set.empty())
|
||||
op.omap_set(to_set);
|
||||
if (!to_remove.empty())
|
||||
op.omap_rm_keys(to_remove);
|
||||
|
||||
mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
|
||||
ceph::real_clock::now(),
|
||||
0, gather.new_sub());
|
||||
|
||||
gather.activate();
|
||||
auto c = new C_IO_Dir_Commit_Ops(this, op_prio, std::move(dfts),
|
||||
std::move(to_remove), std::move(to_set),
|
||||
std::move(stale_items));
|
||||
stale_items.clear();
|
||||
mdcache->mds->finisher->queue(c);
|
||||
}
|
||||
|
||||
void CDir::_encode_dentry(CDentry *dn, bufferlist& bl,
|
||||
const set<snapid_t> *snaps)
|
||||
void CDir::_parse_dentry(CDentry *dn, dentry_commit_item &item,
|
||||
const set<snapid_t> *snaps, bufferlist &bl)
|
||||
{
|
||||
// clear dentry NEW flag, if any. we can no longer silently drop it.
|
||||
dn->clear_new();
|
||||
|
||||
encode(dn->first, bl);
|
||||
item.first = dn->first;
|
||||
|
||||
// primary or remote?
|
||||
if (dn->linkage.is_remote()) {
|
||||
inodeno_t ino = dn->linkage.get_remote_ino();
|
||||
unsigned char d_type = dn->linkage.get_remote_d_type();
|
||||
dout(14) << " pos " << bl.length() << " dn '" << dn->get_name() << "' remote ino " << ino << dendl;
|
||||
|
||||
// marker, name, ino
|
||||
bl.append('L'); // remote link
|
||||
encode(ino, bl);
|
||||
encode(d_type, bl);
|
||||
item.is_remote = true;
|
||||
item.ino = dn->linkage.get_remote_ino();
|
||||
item.d_type = dn->linkage.get_remote_d_type();
|
||||
dout(14) << " dn '" << dn->get_name() << "' remote ino " << item.ino << dendl;
|
||||
} else if (dn->linkage.is_primary()) {
|
||||
// primary link
|
||||
CInode *in = dn->linkage.get_inode();
|
||||
ceph_assert(in);
|
||||
|
||||
dout(14) << " pos " << bl.length() << " dn '" << dn->get_name() << "' inode " << *in << dendl;
|
||||
|
||||
|
||||
dout(14) << " dn '" << dn->get_name() << "' inode " << *in << dendl;
|
||||
// marker, name, inode, [symlink string]
|
||||
bl.append('I'); // inode
|
||||
|
||||
if (in->is_multiversion()) {
|
||||
if (!in->snaprealm) {
|
||||
@ -2343,9 +2457,20 @@ void CDir::_encode_dentry(CDentry *dn, bufferlist& bl,
|
||||
}
|
||||
}
|
||||
|
||||
bufferlist snap_blob;
|
||||
in->encode_snap_blob(snap_blob);
|
||||
in->encode_bare(bl, mdcache->mds->mdsmap->get_up_features(), &snap_blob);
|
||||
if (in->snaprealm) {
|
||||
item.snaprealm = true;
|
||||
item.srnode = in->snaprealm->srnode;
|
||||
}
|
||||
item.features = mdcache->mds->mdsmap->get_up_features();
|
||||
item.inode = in->inode;
|
||||
if (in->inode->is_symlink())
|
||||
item.symlink = in->symlink;
|
||||
using ceph::encode;
|
||||
encode(in->dirfragtree, bl);
|
||||
item.xattrs = in->xattrs;
|
||||
item.old_inodes = in->old_inodes;
|
||||
item.oldest_snap = in->oldest_snap;
|
||||
item.damage_flags = in->damage_flags;
|
||||
} else {
|
||||
ceph_assert(!dn->linkage.is_null());
|
||||
}
|
||||
|
@ -39,8 +39,6 @@
|
||||
class CDentry;
|
||||
class MDCache;
|
||||
|
||||
struct ObjectOperation;
|
||||
|
||||
std::ostream& operator<<(std::ostream& out, const class CDir& dir);
|
||||
|
||||
class CDir : public MDSCacheObject, public Counter<CDir> {
|
||||
@ -59,6 +57,27 @@ public:
|
||||
return std::allocate_shared<fnode_t>(allocator, std::forward<Args>(args)...);
|
||||
}
|
||||
|
||||
struct dentry_commit_item {
|
||||
dentry_key_t key;
|
||||
snapid_t first;
|
||||
bool is_remote = false;
|
||||
|
||||
inodeno_t ino;
|
||||
unsigned char d_type;
|
||||
|
||||
bool snaprealm = false;
|
||||
sr_t srnode;
|
||||
|
||||
mempool::mds_co::string symlink;
|
||||
uint64_t features;
|
||||
uint64_t dft_len;
|
||||
CInode::inode_const_ptr inode;
|
||||
CInode::xattr_map_const_ptr xattrs;
|
||||
CInode::old_inode_map_const_ptr old_inodes;
|
||||
snapid_t oldest_snap;
|
||||
damage_flags_t damage_flags;
|
||||
};
|
||||
|
||||
// -- freezing --
|
||||
struct freeze_tree_state_t {
|
||||
CDir *dir; // freezing/frozen tree root
|
||||
@ -661,6 +680,7 @@ protected:
|
||||
friend class C_IO_Dir_OMAP_Fetched;
|
||||
friend class C_IO_Dir_OMAP_FetchedMore;
|
||||
friend class C_IO_Dir_Committed;
|
||||
friend class C_IO_Dir_Commit_Ops;
|
||||
|
||||
void _omap_fetch(MDSContext *fin, const std::set<dentry_key_t>& keys);
|
||||
void _omap_fetch_more(
|
||||
@ -691,8 +711,12 @@ protected:
|
||||
|
||||
// -- commit --
|
||||
void _commit(version_t want, int op_prio);
|
||||
void _omap_commit_ops(int r, int op_prio, version_t version, bool _new, bufferlist &bl,
|
||||
vector<dentry_key_t> &to_remove, vector<dentry_commit_item> &to_set,
|
||||
mempool::mds_co::compact_set<mempool::mds_co::string> &_stale);
|
||||
void _omap_commit(int op_prio);
|
||||
void _encode_dentry(CDentry *dn, ceph::buffer::list& bl, const std::set<snapid_t> *snaps);
|
||||
void _parse_dentry(CDentry *dn, dentry_commit_item &item,
|
||||
const set<snapid_t> *snaps, bufferlist &bl);
|
||||
void _committed(int r, version_t v);
|
||||
|
||||
static fnode_const_ptr empty_fnode;
|
||||
|
@ -45,12 +45,31 @@
|
||||
#include "mds/MDSContinuation.h"
|
||||
#include "mds/InoTable.h"
|
||||
#include "cephfs_features.h"
|
||||
#include "osdc/Objecter.h"
|
||||
|
||||
#define dout_context g_ceph_context
|
||||
#define dout_subsys ceph_subsys_mds
|
||||
#undef dout_prefix
|
||||
#define dout_prefix *_dout << "mds." << mdcache->mds->get_nodeid() << ".cache.ino(" << ino() << ") "
|
||||
|
||||
/**
 * Populate `op` with the writes for one backtrace object: set the priority,
 * issue a non-exclusive create, and store the encoded backtrace in the
 * "parent" xattr. The "layout" xattr is written as well when this operation
 * was constructed with a layout.
 */
void CInodeCommitOperation::update(ObjectOperation &op, inode_backtrace_t *bt) {
  using ceph::encode;

  op.priority = priority;
  op.create(false);

  bufferlist encoded_bt;
  encode(*bt, encoded_bt);
  op.setxattr("parent", encoded_bt);

  // for the old pool there is no need to update the layout
  if (update_layout) {
    bufferlist encoded_layout;
    encode(_layout, encoded_layout, _features);
    op.setxattr("layout", encoded_layout);
  }
}
|
||||
|
||||
class CInodeIOContext : public MDSIOContextBase
|
||||
{
|
||||
@ -1330,7 +1349,53 @@ struct C_IO_Inode_StoredBacktrace : public CInodeIOContext {
|
||||
}
|
||||
};
|
||||
|
||||
void CInode::store_backtrace(MDSContext *fin, int op_prio)
|
||||
struct C_IO_Inode_CommitBacktrace : public Context {
|
||||
CInode *in;
|
||||
version_t version;
|
||||
MDSContext *fin;
|
||||
std::vector<CInodeCommitOperation> ops_vec;
|
||||
inode_backtrace_t bt;
|
||||
|
||||
C_IO_Inode_CommitBacktrace(CInode *i, version_t v, MDSContext *f) :
|
||||
in(i), version(v), fin(f) { }
|
||||
void finish(int r) override {
|
||||
in->_commit_ops(r, version, fin, ops_vec, &bt);
|
||||
}
|
||||
};
|
||||
|
||||
void CInode::_commit_ops(int r, version_t version, MDSContext *fin,
|
||||
std::vector<CInodeCommitOperation> &ops_vec,
|
||||
inode_backtrace_t *bt)
|
||||
{
|
||||
dout(10) << __func__ << dendl;
|
||||
|
||||
if (r < 0) {
|
||||
mdcache->mds->handle_write_error_with_lock(r);
|
||||
return;
|
||||
}
|
||||
|
||||
C_GatherBuilder gather(g_ceph_context,
|
||||
new C_OnFinisher(new C_IO_Inode_StoredBacktrace(this,
|
||||
version,
|
||||
fin),
|
||||
mdcache->mds->finisher));
|
||||
|
||||
SnapContext snapc;
|
||||
object_t oid = get_object_name(ino(), frag_t(), "");
|
||||
|
||||
for (auto &op : ops_vec) {
|
||||
ObjectOperation obj_op;
|
||||
object_locator_t oloc(op.get_pool());
|
||||
op.update(obj_op, bt);
|
||||
mdcache->mds->objecter->mutate(oid, oloc, obj_op, snapc,
|
||||
ceph::real_clock::now(),
|
||||
0, gather.new_sub());
|
||||
}
|
||||
gather.activate();
|
||||
}
|
||||
|
||||
void CInode::_store_backtrace(std::vector<CInodeCommitOperation> &ops_vec,
|
||||
inode_backtrace_t &bt, int op_prio)
|
||||
{
|
||||
dout(10) << __func__ << " on " << *this << dendl;
|
||||
ceph_assert(is_dirty_parent());
|
||||
@ -1341,41 +1406,16 @@ void CInode::store_backtrace(MDSContext *fin, int op_prio)
|
||||
auth_pin(this);
|
||||
|
||||
const int64_t pool = get_backtrace_pool();
|
||||
inode_backtrace_t bt;
|
||||
build_backtrace(pool, bt);
|
||||
bufferlist parent_bl;
|
||||
using ceph::encode;
|
||||
encode(bt, parent_bl);
|
||||
|
||||
ObjectOperation op;
|
||||
op.priority = op_prio;
|
||||
op.create(false);
|
||||
op.setxattr("parent", parent_bl);
|
||||
|
||||
bufferlist layout_bl;
|
||||
encode(get_inode()->layout, layout_bl, mdcache->mds->mdsmap->get_up_features());
|
||||
op.setxattr("layout", layout_bl);
|
||||
|
||||
SnapContext snapc;
|
||||
object_t oid = get_object_name(ino(), frag_t(), "");
|
||||
object_locator_t oloc(pool);
|
||||
Context *fin2 = new C_OnFinisher(
|
||||
new C_IO_Inode_StoredBacktrace(this, get_inode()->backtrace_version, fin),
|
||||
mdcache->mds->finisher);
|
||||
ops_vec.emplace_back(op_prio, pool, get_inode()->layout,
|
||||
mdcache->mds->mdsmap->get_up_features());
|
||||
|
||||
if (!state_test(STATE_DIRTYPOOL) || get_inode()->old_pools.empty()) {
|
||||
dout(20) << __func__ << ": no dirtypool or no old pools" << dendl;
|
||||
mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
|
||||
ceph::real_clock::now(),
|
||||
0, fin2);
|
||||
return;
|
||||
}
|
||||
|
||||
C_GatherBuilder gather(g_ceph_context, fin2);
|
||||
mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
|
||||
ceph::real_clock::now(),
|
||||
0, gather.new_sub());
|
||||
|
||||
// In the case where DIRTYPOOL is set, we update all old pools backtraces
|
||||
// such that anyone reading them will see the new pool ID in
|
||||
// inode_backtrace_t::pool and go read everything else from there.
|
||||
@ -1385,17 +1425,26 @@ void CInode::store_backtrace(MDSContext *fin, int op_prio)
|
||||
|
||||
dout(20) << __func__ << ": updating old pool " << p << dendl;
|
||||
|
||||
ObjectOperation op;
|
||||
op.priority = op_prio;
|
||||
op.create(false);
|
||||
op.setxattr("parent", parent_bl);
|
||||
|
||||
object_locator_t oloc(p);
|
||||
mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
|
||||
ceph::real_clock::now(),
|
||||
0, gather.new_sub());
|
||||
ops_vec.emplace_back(op_prio, p);
|
||||
}
|
||||
gather.activate();
|
||||
}
|
||||
|
||||
/**
 * Queue the storing of this inode's backtrace on the MDS finisher.
 *
 * The backtrace version is read up front and given to the context together
 * with `fin`. _store_backtrace() then fills the context's operation list and
 * backtrace, and the context is queued.
 *
 * @param fin     passed through to the commit context
 * @param op_prio priority forwarded to _store_backtrace()
 */
void CInode::store_backtrace(MDSContext *fin, int op_prio)
{
  auto version = get_inode()->backtrace_version;

  // the operations are built directly inside the context, so no local
  // operation vector is needed here
  auto c = new C_IO_Inode_CommitBacktrace(this, version, fin);
  _store_backtrace(c->ops_vec, c->bt, op_prio);
  mdcache->mds->finisher->queue(c);
}
|
||||
|
||||
/**
 * Batch variant: record this inode, its current backtrace version, and the
 * operations built by _store_backtrace() in `op`. Nothing is queued or sent
 * from here.
 */
void CInode::store_backtrace(CInodeCommitOperations &op, int op_prio)
{
  op.in = this;
  op.version = get_inode()->backtrace_version;

  _store_backtrace(op.ops_vec, op.bt, op_prio);
}
|
||||
|
||||
void CInode::_stored_backtrace(int r, version_t v, Context *fin)
|
||||
|
@ -60,6 +60,34 @@ struct cinode_lock_info_t {
|
||||
int wr_caps;
|
||||
};
|
||||
|
||||
struct CInodeCommitOperation {
|
||||
public:
|
||||
CInodeCommitOperation(int prio, int64_t po)
|
||||
: pool(po), priority(prio) {
|
||||
}
|
||||
CInodeCommitOperation(int prio, int64_t po, file_layout_t l, uint64_t f)
|
||||
: pool(po), priority(prio), _layout(l), _features(f) {
|
||||
update_layout = true;
|
||||
}
|
||||
|
||||
void update(ObjectOperation &op, inode_backtrace_t *bt);
|
||||
int64_t get_pool() { return pool; }
|
||||
|
||||
private:
|
||||
int64_t pool; ///< pool id
|
||||
int priority;
|
||||
bool update_layout = false;
|
||||
file_layout_t _layout;
|
||||
uint64_t _features;
|
||||
};
|
||||
|
||||
struct CInodeCommitOperations {
|
||||
std::vector<CInodeCommitOperation> ops_vec;
|
||||
inode_backtrace_t bt;
|
||||
version_t version;
|
||||
CInode *in;
|
||||
};
|
||||
|
||||
/**
|
||||
* Base class for CInode, containing the backing store data and
|
||||
* serialization methods. This exists so that we can read and
|
||||
@ -763,7 +791,13 @@ class CInode : public MDSCacheObject, public InodeStoreBase, public Counter<CIno
|
||||
void fetch(MDSContext *fin);
|
||||
void _fetched(ceph::buffer::list& bl, ceph::buffer::list& bl2, Context *fin);
|
||||
|
||||
void _commit_ops(int r, version_t version, MDSContext *fin,
|
||||
std::vector<CInodeCommitOperation> &ops_vec,
|
||||
inode_backtrace_t *bt);
|
||||
void build_backtrace(int64_t pool, inode_backtrace_t& bt);
|
||||
void _store_backtrace(std::vector<CInodeCommitOperation> &ops_vec,
|
||||
inode_backtrace_t &bt, int op_prio);
|
||||
void store_backtrace(CInodeCommitOperations &op, int op_prio);
|
||||
void store_backtrace(MDSContext *fin, int op_prio=-1);
|
||||
void _stored_backtrace(int r, version_t v, Context *fin);
|
||||
void fetch_backtrace(Context *fin, ceph::buffer::list *backtrace);
|
||||
|
@ -968,6 +968,12 @@ void MDSRank::handle_write_error(int err)
|
||||
}
|
||||
}
|
||||
|
||||
void MDSRank::handle_write_error_with_lock(int err)
|
||||
{
|
||||
std::scoped_lock l(mds_lock);
|
||||
handle_write_error(err);
|
||||
}
|
||||
|
||||
void *MDSRank::ProgressThread::entry()
|
||||
{
|
||||
std::unique_lock l(mds->mds_lock);
|
||||
|
@ -215,6 +215,7 @@ class MDSRank {
|
||||
}
|
||||
|
||||
void handle_write_error(int err);
|
||||
void handle_write_error_with_lock(int err);
|
||||
|
||||
void update_mlogger();
|
||||
|
||||
|
@ -62,6 +62,40 @@
|
||||
// -----------------------
|
||||
// LogSegment
|
||||
|
||||
struct BatchStoredBacktrace : public MDSContext {
|
||||
MDSContext *fin;
|
||||
MDSRank *mds;
|
||||
|
||||
BatchStoredBacktrace(MDSContext *f, MDSRank *m) : fin(f), mds(m) {}
|
||||
void finish(int r) override {
|
||||
fin->complete(r);
|
||||
}
|
||||
MDSRank *get_mds() override { return mds; };
|
||||
};
|
||||
|
||||
struct BatchCommitBacktrace : public Context {
|
||||
std::vector<CInodeCommitOperations> ops_vec;
|
||||
MDSContext *con;
|
||||
MDSRank *mds;
|
||||
|
||||
BatchCommitBacktrace(std::vector<CInodeCommitOperations> &ops, MDSContext *c,
|
||||
MDSRank *m) : con(c), mds(m) {
|
||||
ops_vec.swap(ops);
|
||||
}
|
||||
void finish(int r) override {
|
||||
MDSGatherBuilder gather(g_ceph_context);
|
||||
|
||||
for (auto &op : ops_vec) {
|
||||
op.in->_commit_ops(r, op.version, gather.new_sub(), op.ops_vec, &op.bt);
|
||||
}
|
||||
if (gather.has_subs()) {
|
||||
gather.set_finisher(new BatchStoredBacktrace(con, mds));
|
||||
std::scoped_lock l(mds->mds_lock);
|
||||
gather.activate();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
void LogSegment::try_to_expire(MDSRank *mds, MDSGatherBuilder &gather_bld, int op_prio)
|
||||
{
|
||||
set<CDir*> commit;
|
||||
@ -187,18 +221,27 @@ void LogSegment::try_to_expire(MDSRank *mds, MDSGatherBuilder &gather_bld, int o
|
||||
|
||||
ceph_assert(g_conf()->mds_kill_journal_expire_at != 3);
|
||||
|
||||
size_t count = 0;
|
||||
for (elist<CInode*>::iterator it = dirty_parent_inodes.begin(); !it.end(); ++it)
|
||||
count++;
|
||||
|
||||
std::vector<CInodeCommitOperations> ops_vec;
|
||||
ops_vec.reserve(count);
|
||||
// backtraces to be stored/updated
|
||||
for (elist<CInode*>::iterator p = dirty_parent_inodes.begin(); !p.end(); ++p) {
|
||||
CInode *in = *p;
|
||||
ceph_assert(in->is_auth());
|
||||
if (in->can_auth_pin()) {
|
||||
dout(15) << "try_to_expire waiting for storing backtrace on " << *in << dendl;
|
||||
in->store_backtrace(gather_bld.new_sub(), op_prio);
|
||||
ops_vec.resize(ops_vec.size() + 1);
|
||||
in->store_backtrace(ops_vec.back(), op_prio);
|
||||
} else {
|
||||
dout(15) << "try_to_expire waiting for unfreeze on " << *in << dendl;
|
||||
in->add_waiter(CInode::WAIT_UNFREEZE, gather_bld.new_sub());
|
||||
}
|
||||
}
|
||||
if (!ops_vec.empty())
|
||||
mds->finisher->queue(new BatchCommitBacktrace(ops_vec, gather_bld.new_sub(), mds));
|
||||
|
||||
ceph_assert(g_conf()->mds_kill_journal_expire_at != 4);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user