mds: catch damage to dentry's first field

Catch the damage when possible, and abort the MDS before the damaged dentry
can be written to the journal/directory.

This is part of a series to address corruption first observed in [1].
How the corruption is introduced is still unknown.

[1] https://tracker.ceph.com/issues/38452#note-10

Fixes: http://tracker.ceph.com/issues/58482
Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
This commit is contained in:
Patrick Donnelly 2023-01-17 21:29:39 -05:00
parent c9d36056c7
commit 03407528f9
No known key found for this signature in database
GPG Key ID: BE69BB7D36E459B4
7 changed files with 56 additions and 14 deletions

View File

@ -952,6 +952,13 @@ options:
- mds
fmt_desc: The debug subtree invariants (for developers only).
with_legacy: true
- name: mds_abort_on_newly_corrupt_dentry
type: bool
level: advanced
default: true
services:
- mds
fmt_desc: MDS will abort if dentry is detected newly corrupted.
- name: mds_kill_mdstable_at
type: int
level: dev

View File

@ -17,6 +17,7 @@
#include "CDentry.h"
#include "CInode.h"
#include "CDir.h"
#include "SnapClient.h"
#include "MDSRank.h"
#include "MDCache.h"
@ -697,4 +698,25 @@ bool CDentry::scrub(snapid_t next_seq)
return false;
}
/**
 * Detect a damaged `first` snapid on this dentry.
 *
 * The dentry is treated as corrupt when `first > last`, or when the snap
 * client reports its server as ready and `first` exceeds get_last_seq() + 1.
 *
 * On detection the dentry is always reported through dir->go_bad_dentry().
 * When `load` is false and mds_abort_on_newly_corrupt_dentry is set, an error
 * is sent to clog and the MDS aborts.
 *
 * @param load  true when the dentry is being added to the cache (the damage
 *              is recorded in corrupt_first_loaded); false when the dentry is
 *              about to be committed.
 * @return true if corruption was detected, false otherwise.
 */
bool CDentry::check_corruption(bool load)
{
  auto&& sc = dir->mdcache->mds->snapclient;
  auto max_first = sc->get_last_seq() + 1;

  const bool corrupt =
    (first > last) || (sc->is_server_ready() && first > max_first);
  if (!corrupt) {
    return false;
  }

  const bool committing = !load;
  if (committing) {
    derr << "newly corrupt dentry to be committed: " << *this << dendl;
  } else {
    dout(1) << "loaded already corrupt dentry: " << *this << dendl;
    // remember that the damage predates this MDS touching the dentry
    corrupt_first_loaded = true;
  }

  dir->go_bad_dentry(last, get_name());

  if (committing &&
      g_conf().get_val<bool>("mds_abort_on_newly_corrupt_dentry")) {
    dir->mdcache->mds->clog->error()
      << "MDS abort because newly corrupt dentry to be committed: " << *this;
    /* avoid writing out newly corrupted dn */
    ceph_abort("detected newly corrupt dentry");
  }
  return true;
}
MEMPOOL_DEFINE_OBJECT_FACTORY(CDentry, co_dentry, mds_co);

View File

@ -160,6 +160,8 @@ public:
return dentry_key_t(last, name.c_str(), hash);
}
/* Check this dentry's first/last snapid range for corruption. Pass
 * load=true when the dentry is being added to the cache and load=false when
 * it is about to be written out; in the latter case the MDS may abort
 * (mds_abort_on_newly_corrupt_dentry). Returns true if corruption was found. */
bool check_corruption(bool load);
const CDir *get_dir() const { return dir; }
CDir *get_dir() { return dir; }
std::string_view get_name() const { return std::string_view(name); }
@ -367,6 +369,7 @@ public:
__u32 hash;
snapid_t first, last;
/* Set by check_corruption(true): this dentry's first was already corrupt
 * when it was loaded, so CDir::_omap_commit() skips the commit-time check
 * (and possible abort) for it.
 * NOTE(review): presumably aimed at corruption from an earlier Ceph release
 * (Pacific?) -- confirm. */
bool corrupt_first_loaded = false;
elist<CDentry*>::item item_dirty, item_dir_dirty;
elist<CDentry*>::item item_stray;

View File

@ -29,8 +29,10 @@
#include "MDLog.h"
#include "LogSegment.h"
#include "MDBalancer.h"
#include "SnapClient.h"
#include "common/bloom_filter.hpp"
#include "common/likely.h"
#include "include/Context.h"
#include "common/Clock.h"
@ -373,6 +375,9 @@ CDentry* CDir::add_null_dentry(std::string_view dname,
// create dentry
CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), "", first, last);
dn->dir = this;
dn->version = get_projected_version();
dn->check_corruption(true);
if (is_auth()) {
dn->state_set(CDentry::STATE_AUTH);
mdcache->lru.lru_insert_mid(dn);
@ -380,9 +385,6 @@ CDentry* CDir::add_null_dentry(std::string_view dname,
mdcache->bottom_lru.lru_insert_mid(dn);
dn->state_set(CDentry::STATE_BOTTOMLRU);
}
dn->dir = this;
dn->version = get_projected_version();
// add to dir
ceph_assert(items.count(dn->key()) == 0);
@ -419,6 +421,9 @@ CDentry* CDir::add_primary_dentry(std::string_view dname, CInode *in,
// create dentry
CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), std::move(alternate_name), first, last);
dn->dir = this;
dn->version = get_projected_version();
dn->check_corruption(true);
if (is_auth())
dn->state_set(CDentry::STATE_AUTH);
if (is_auth() || !inode->is_stray()) {
@ -428,9 +433,6 @@ CDentry* CDir::add_primary_dentry(std::string_view dname, CInode *in,
dn->state_set(CDentry::STATE_BOTTOMLRU);
}
dn->dir = this;
dn->version = get_projected_version();
// add to dir
ceph_assert(items.count(dn->key()) == 0);
//assert(null_items.count(dn->get_name()) == 0);
@ -469,12 +471,12 @@ CDentry* CDir::add_remote_dentry(std::string_view dname, inodeno_t ino, unsigned
// create dentry
CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), std::move(alternate_name), ino, d_type, first, last);
dn->dir = this;
dn->version = get_projected_version();
dn->check_corruption(true);
if (is_auth())
dn->state_set(CDentry::STATE_AUTH);
mdcache->lru.lru_insert_mid(dn);
dn->dir = this;
dn->version = get_projected_version();
// add to dir
ceph_assert(items.count(dn->key()) == 0);
@ -1797,11 +1799,6 @@ CDentry *CDir::_load_dentry(
<< " [" << first << "," << last << "]"
<< dendl;
if (first > last) {
go_bad_dentry(last, dname);
/* try to continue */
}
bool stale = false;
if (snaps && last != CEPH_NOSNAP) {
set<snapid_t>::const_iterator p = snaps->lower_bound(first);
@ -2562,6 +2559,10 @@ void CDir::_omap_commit(int op_prio)
string key;
dn->key().encode(key);
if (!dn->corrupt_first_loaded) {
dn->check_corruption(false);
}
if (snaps && try_trim_snap_dentry(dn, *snaps)) {
dout(10) << " rm " << key << dendl;
to_remove.emplace_back(std::move(key));

View File

@ -58,6 +58,10 @@ public:
void handle_mds_failure(mds_rank_t mds);
/// Returns the current value of the server_ready flag.
bool is_server_ready() const { return server_ready; }
// child must implement
virtual void resend_queries() = 0;
virtual void handle_query_result(const cref_t<MMDSTableRequest> &m) = 0;

View File

@ -87,6 +87,7 @@ public:
snapid_t get_last_created() const { return cached_last_created; }
snapid_t get_last_destroyed() const { return cached_last_destroyed; }
/// Larger of the cached last-destroyed and last-created snapids.
snapid_t get_last_seq() const {
  return std::max(cached_last_destroyed, cached_last_created);
}
void get_snaps(std::set<snapid_t>& snaps) const;
std::set<snapid_t> filter(const std::set<snapid_t>& snaps) const;

View File

@ -417,6 +417,7 @@ private:
}
void add_null_dentry(dirlump& lump, CDentry *dn, bool dirty) {
// add the dir
dn->check_corruption(false);
lump.nnull++;
lump.add_dnull(dn->get_name(), dn->first, dn->last,
dn->get_projected_version(), dirty);
@ -430,6 +431,7 @@ private:
}
void add_remote_dentry(dirlump& lump, CDentry *dn, bool dirty,
inodeno_t rino=0, unsigned char rdt=0) {
dn->check_corruption(false);
if (!rino) {
rino = dn->get_projected_linkage()->get_remote_ino();
rdt = dn->get_projected_linkage()->get_remote_d_type();
@ -451,6 +453,8 @@ private:
add_primary_dentry(add_dir(dn->get_dir(), false), dn, in, state);
}
void add_primary_dentry(dirlump& lump, CDentry *dn, CInode *in, __u8 state) {
dn->check_corruption(false);
if (!in)
in = dn->get_projected_linkage()->get_inode();