Merge pull request #3038 from ceph/wip-mds-readonly

Wip mds readonly

Reviewed-by: Greg Farnum <gfarnum@redhat.com>
This commit is contained in:
Gregory Farnum 2014-12-10 20:39:37 -08:00
commit a1eb443ee5
22 changed files with 320 additions and 67 deletions

View File

@ -1752,6 +1752,10 @@ void Client::handle_client_session(MClientSession *m)
session->con->send_message(new MClientSession(CEPH_SESSION_FLUSHMSG_ACK, m->get_seq()));
break;
case CEPH_SESSION_FORCE_RO:
force_session_readonly(session);
break;
default:
assert(0);
}
@ -2148,6 +2152,8 @@ void Client::send_reconnect(MetaSession *session)
// trim unused caps to reduce MDS's cache rejoin time
trim_cache_for_reconnect(session);
session->readonly = false;
if (session->release) {
session->release->put();
session->release = NULL;
@ -2541,6 +2547,10 @@ int Client::get_caps(Inode *in, int need, int want, int *phave, loff_t endoff)
}
ldout(cct, 10) << "waiting for caps need " << ccap_string(need) << " want " << ccap_string(want) << dendl;
}
if ((need & CEPH_CAP_FILE_WR) && in->auth_cap &&
in->auth_cap->session->readonly)
return -EROFS;
wait_on_list(in->waitfor_caps);
}
@ -3347,6 +3357,16 @@ void Client::trim_caps(MetaSession *s, int max)
s->s_cap_iterator = NULL;
}
/**
 * Put a MDS session into read-only mode.
 *
 * Sets the session's readonly flag, then walks every cap held on that
 * session and signals the waitfor_caps list of each inode that currently
 * wants CEPH_CAP_FILE_WR. get_caps() checks session->readonly before it
 * waits on that same list.
 *
 * @param s session to mark read-only
 */
void Client::force_session_readonly(MetaSession *s)
{
  s->readonly = true;

  for (xlist<Cap*>::iterator cap_it = s->caps.begin(); !cap_it.end(); ++cap_it) {
    Inode *inode = (*cap_it)->inode;
    // only inodes that want write caps have waiters we need to wake
    if (!(inode->caps_wanted() & CEPH_CAP_FILE_WR))
      continue;
    signal_cond_list(inode->waitfor_caps);
  }
}
void Client::mark_caps_dirty(Inode *in, int caps)
{
ldout(cct, 10) << "mark_caps_dirty " << *in << " " << ccap_string(in->dirty_caps) << " -> "

View File

@ -433,7 +433,10 @@ protected:
void dump_inode(Formatter *f, Inode *in, set<Inode*>& did, bool disconnected);
void dump_cache(Formatter *f); // debug
// force read-only
void force_session_readonly(MetaSession *s);
// trace generation
ofstream traceout;

View File

@ -37,6 +37,8 @@ struct MetaSession {
STATE_STALE,
} state;
bool readonly;
list<Context*> waiting_for_open;
xlist<Cap*> caps;
@ -52,7 +54,7 @@ struct MetaSession {
MetaSession()
: mds_num(-1), con(NULL),
seq(0), cap_gen(0), cap_renew_seq(0), num_caps(0),
state(STATE_NEW), s_cap_iterator(NULL),
state(STATE_NEW), readonly(false), s_cap_iterator(NULL),
release(NULL)
{}
~MetaSession();

View File

@ -408,6 +408,8 @@ OPTION(mds_snap_min_uid, OPT_U32, 0) // The minimum UID required to create a sna
OPTION(mds_snap_max_uid, OPT_U32, 65536) // The maximum UID allowed to create a snapshot
OPTION(mds_verify_backtrace, OPT_U32, 1)
OPTION(mds_action_on_write_error, OPT_U32, 1) // 0: ignore; 1: force readonly; 2: crash
// If true, compact leveldb store on mount
OPTION(osd_compact_leveldb_on_mount, OPT_BOOL, false)

View File

@ -285,6 +285,7 @@ enum {
CEPH_SESSION_RECALL_STATE,
CEPH_SESSION_FLUSHMSG,
CEPH_SESSION_FLUSHMSG_ACK,
CEPH_SESSION_FORCE_RO,
};
extern const char *ceph_session_op_name(int op);

View File

@ -121,6 +121,7 @@ ostream& operator<<(ostream& out, CDir& dir)
if (dir.state_test(CDir::STATE_FREEZINGDIR)) out << "|freezingdir";
if (dir.state_test(CDir::STATE_EXPORTBOUND)) out << "|exportbound";
if (dir.state_test(CDir::STATE_IMPORTBOUND)) out << "|importbound";
if (dir.state_test(CDir::STATE_BADFRAG)) out << "|badfrag";
// fragstat
out << " " << dir.fnode.fragstat;
@ -1521,8 +1522,7 @@ void CDir::_omap_fetched(bufferlist& hdrbl, map<string, bufferlist>& omap,
dout(0) << "_fetched missing object for " << *this << dendl;
clog->error() << "dir " << dirfrag() << " object missing on disk; some files may be lost\n";
log_mark_dirty();
state_set(STATE_BADFRAG);
// mark complete, !fetching
mark_complete();
state_clear(STATE_FETCHING);
@ -1835,8 +1835,7 @@ class C_IO_Dir_Committed : public CDirIOContext {
public:
C_IO_Dir_Committed(CDir *d, version_t v) : CDirIOContext(d), version(v) { }
void finish(int r) {
assert(r == 0);
dir->_committed(version);
dir->_committed(r, version);
}
};
@ -1913,6 +1912,11 @@ void CDir::_omap_commit(int op_prio)
if (write_size >= max_write_size) {
ObjectOperation op;
op.priority = op_prio;
// don't create new dirfrag blindly
if (!is_new() && !state_test(CDir::STATE_FRAGMENTING))
op.stat(NULL, (utime_t*)NULL, NULL);
op.tmap_to_omap(true); // convert tmap to omap
if (!to_set.empty())
@ -1931,6 +1935,11 @@ void CDir::_omap_commit(int op_prio)
ObjectOperation op;
op.priority = op_prio;
// don't create new dirfrag blindly
if (!is_new() && !state_test(CDir::STATE_FRAGMENTING))
op.stat(NULL, (utime_t*)NULL, NULL);
op.tmap_to_omap(true); // convert tmap to omap
/*
@ -2040,8 +2049,16 @@ void CDir::_commit(version_t want, int op_prio)
*
* @param v version i just committed
*/
void CDir::_committed(version_t v)
void CDir::_committed(int r, version_t v)
{
if (r < 0) {
dout(1) << "commit error " << r << " v " << v << dendl;
cache->mds->clog->error() << "failed to commit dir " << dirfrag() << " object,"
<< " errno " << r << "\n";
cache->mds->handle_write_error(r);
return;
}
dout(10) << "_committed v " << v << " on " << *this << dendl;
assert(is_auth());

View File

@ -106,6 +106,7 @@ public:
static const unsigned STATE_DNPINNEDFRAG = (1<<16); // dir is refragmenting
static const unsigned STATE_ASSIMRSTAT = (1<<17); // assimilating inode->frag rstats
static const unsigned STATE_DIRTYDFT = (1<<18); // dirty dirfragtree
static const unsigned STATE_BADFRAG = (1<<19); // bad dirfrag
// common states
static const unsigned STATE_CLEAN = 0;
@ -114,7 +115,7 @@ public:
// these state bits are preserved by an import/export
// ...except if the directory is hashed, in which case none of them are!
static const unsigned MASK_STATE_EXPORTED =
(STATE_COMPLETE|STATE_DIRTY|STATE_DIRTYDFT);
(STATE_COMPLETE|STATE_DIRTY|STATE_DIRTYDFT|STATE_BADFRAG);
static const unsigned MASK_STATE_IMPORT_KEPT =
(
STATE_IMPORTING
@ -220,6 +221,8 @@ public:
bool is_new() { return item_new.is_on_list(); }
void mark_new(LogSegment *ls);
// True when this dirfrag has the STATE_BADFRAG state bit set.
bool is_bad()
{
  return state_test(STATE_BADFRAG);
}
public:
typedef std::map<dentry_key_t, CDentry*> map_t;
protected:
@ -498,7 +501,7 @@ protected:
void _commit(version_t want, int op_prio);
void _omap_commit(int op_prio);
void _encode_dentry(CDentry *dn, bufferlist& bl, const std::set<snapid_t> *snaps);
void _committed(version_t v);
void _committed(int r, version_t v);
public:
#if 0 // unused?
void wait_for_commit(Context *c, version_t v=0);

View File

@ -904,8 +904,7 @@ struct C_IO_Inode_Stored : public CInodeIOContext {
Context *fin;
C_IO_Inode_Stored(CInode *i, version_t v, Context *f) : CInodeIOContext(i), version(v), fin(f) {}
void finish(int r) {
assert(r == 0);
in->_stored(version, fin);
in->_stored(r, version, fin);
}
};
@ -943,9 +942,17 @@ void CInode::store(MDSInternalContextBase *fin)
NULL, newfin);
}
void CInode::_stored(version_t v, Context *fin)
void CInode::_stored(int r, version_t v, Context *fin)
{
dout(10) << "_stored " << v << " " << *this << dendl;
if (r < 0) {
dout(1) << "store error " << r << " v " << v << " on " << *this << dendl;
mdcache->mds->clog->error() << "failed to store ino " << ino() << " object,"
<< " errno " << r << "\n";
mdcache->mds->handle_write_error(r);
return;
}
dout(10) << "_stored " << v << " on " << *this << dendl;
if (v == get_projected_version())
mark_clean();
@ -1062,8 +1069,7 @@ struct C_IO_Inode_StoredBacktrace : public CInodeIOContext {
Context *fin;
C_IO_Inode_StoredBacktrace(CInode *i, version_t v, Context *f) : CInodeIOContext(i), version(v), fin(f) {}
void finish(int r) {
assert(r == 0);
in->_stored_backtrace(version, fin);
in->_stored_backtrace(r, version, fin);
}
};
@ -1130,9 +1136,17 @@ void CInode::store_backtrace(MDSInternalContextBase *fin, int op_prio)
gather.activate();
}
void CInode::_stored_backtrace(version_t v, Context *fin)
void CInode::_stored_backtrace(int r, version_t v, Context *fin)
{
dout(10) << "_stored_backtrace" << dendl;
if (r < 0) {
dout(1) << "store backtrace error " << r << " v " << v << dendl;
mdcache->mds->clog->error() << "failed to store backtrace on dir ino "
<< ino() << " object, errno " << r << "\n";
mdcache->mds->handle_write_error(r);
return;
}
dout(10) << "_stored_backtrace v " << v << dendl;
auth_unpin(this);
if (v == inode.backtrace_version)
@ -2500,6 +2514,8 @@ void CInode::decode_snap(bufferlist::iterator& p)
client_t CInode::calc_ideal_loner()
{
if (mdcache->is_readonly())
return -1;
if (!mds_caps_wanted.empty())
return -1;

View File

@ -544,7 +544,7 @@ public:
void mark_clean();
void store(MDSInternalContextBase *fin);
void _stored(version_t cv, Context *fin);
void _stored(int r, version_t cv, Context *fin);
/**
* Flush a CInode to disk. This includes the backtrace, the parent
* directory's link, and the Inode object itself (if a base directory).
@ -559,7 +559,7 @@ public:
void build_backtrace(int64_t pool, inode_backtrace_t& bt);
void store_backtrace(MDSInternalContextBase *fin, int op_prio=-1);
void _stored_backtrace(version_t v, Context *fin);
void _stored_backtrace(int r, version_t v, Context *fin);
void fetch_backtrace(Context *fin, bufferlist *backtrace);
void _mark_dirty_parent(LogSegment *ls, bool dirty_pool=false);
void clear_dirty_parent();

View File

@ -3692,6 +3692,14 @@ void Locker::simple_eval(SimpleLock *lock, bool *need_issue)
return;
}
if (mdcache->is_readonly()) {
if (lock->get_state() != LOCK_SYNC) {
dout(10) << "simple_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
simple_sync(lock, need_issue);
}
return;
}
CInode *in = 0;
int wanted = 0;
if (lock->get_type() != CEPH_LOCK_DN) {
@ -4119,6 +4127,14 @@ void Locker::scatter_eval(ScatterLock *lock, bool *need_issue)
dout(20) << " freezing|frozen" << dendl;
return;
}
if (mdcache->is_readonly()) {
if (lock->get_state() != LOCK_SYNC) {
dout(10) << "scatter_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
simple_sync(lock, need_issue);
}
return;
}
if (!lock->is_rdlocked() &&
lock->get_state() != LOCK_MIX &&
@ -4226,6 +4242,14 @@ void Locker::scatter_nudge(ScatterLock *lock, MDSInternalContextBase *c, bool fo
}
*/
if (mdcache->is_readonly()) {
if (lock->get_state() != LOCK_SYNC) {
dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock << " on " << *p << dendl;
simple_sync(static_cast<ScatterLock*>(lock));
}
break;
}
// adjust lock state
dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock << " on " << *p << dendl;
switch (lock->get_type()) {
@ -4468,6 +4492,14 @@ void Locker::file_eval(ScatterLock *lock, bool *need_issue)
if (lock->get_parent()->is_freezing_or_frozen())
return;
if (mdcache->is_readonly()) {
if (lock->get_state() != LOCK_SYNC) {
dout(10) << "file_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
simple_sync(lock, need_issue);
}
return;
}
// excl -> *?
if (lock->get_state() == LOCK_EXCL) {
dout(20) << " is excl" << dendl;

View File

@ -172,6 +172,7 @@ MDCache::MDCache(MDS *m) :
migrator = new Migrator(mds, this);
root = NULL;
myin = NULL;
readonly = false;
stray_index = 0;
for (int i = 0; i < NUM_STRAY; ++i) {
@ -10451,6 +10452,10 @@ public:
bool MDCache::can_fragment(CInode *diri, list<CDir*>& dirs)
{
if (is_readonly()) {
dout(7) << "can_fragment: read-only FS, no fragmenting for now" << dendl;
return false;
}
if (mds->mdsmap->is_degraded()) {
dout(7) << "can_fragment: cluster degraded, no fragmenting for now" << dendl;
return false;
@ -10475,6 +10480,10 @@ bool MDCache::can_fragment(CInode *diri, list<CDir*>& dirs)
dout(7) << "can_fragment: not auth on " << *dir << dendl;
return false;
}
if (dir->is_bad()) {
dout(7) << "can_fragment: bad dirfrag " << *dir << dendl;
return false;
}
if (dir->is_frozen() ||
dir->is_freezing()) {
dout(7) << "can_fragment: can't merge, freezing|frozen. wait for other exports to finish first." << dendl;
@ -11278,6 +11287,29 @@ void MDCache::rollback_uncommitted_fragments()
}
}
void MDCache::force_readonly()
{
if (is_readonly())
return;
dout(1) << "force file system read-only" << dendl;
mds->clog->warn() << "force file system read-only\n";
set_readonly();
mds->server->force_clients_readonly();
// revoke write caps
for (ceph::unordered_map<vinodeno_t,CInode*>::iterator p = inode_map.begin();
p != inode_map.end();
++p) {
CInode *in = p->second;
if (in->is_head())
mds->locker->eval(in, CEPH_CAP_LOCKS);
}
mds->mdlog->flush();
}
// ==============================================================
@ -11576,6 +11608,11 @@ void MDCache::scrub_dentry_work(MDRequestRef& mdr)
void MDCache::flush_dentry(const string& path, Context *fin)
{
if (is_readonly()) {
dout(10) << __func__ << ": read-only FS" << dendl;
fin->complete(-EROFS);
return;
}
dout(10) << "flush_dentry " << path << dendl;
MDRequestRef mdr = request_start_internal(CEPH_MDS_OP_FLUSH);
filepath fp(path.c_str());

View File

@ -90,6 +90,9 @@ class MDCache {
CInode *root; // root inode
CInode *myin; // .ceph/mds%d dir
bool readonly;
// Set the cache's readonly flag; nothing in view clears it again.
void set_readonly()
{
  readonly = true;
}
CInode *strays[NUM_STRAY]; // my stray dir
int stray_index;
@ -103,6 +106,8 @@ public:
void advance_stray() {
stray_index = (stray_index+1)%NUM_STRAY;
}
// Report whether the cache has been put into read-only mode.
bool is_readonly()
{
  return readonly;
}
void force_readonly();
DecayRate decayrate;
@ -858,7 +863,7 @@ public:
void eval_remote(CDentry *dn);
void maybe_eval_stray(CInode *in, bool delay=false) {
if (in->inode.nlink > 0 || in->is_base())
if (in->inode.nlink > 0 || in->is_base() || is_readonly())
return;
CDentry *dn = in->get_projected_parent_dn();
if (!dn->state_test(CDentry::STATE_PURGING) &&

View File

@ -84,7 +84,8 @@ class C_MDL_WriteError : public MDSIOContextBase {
void finish(int r) {
MDS *mds = get_mds();
// assume journal is reliable, so don't choose action based on
// g_conf->mds_action_on_write_error.
if (r == -EBLACKLISTED) {
derr << "we have been blacklisted (fenced), respawning..." << dendl;
mds->respawn();
@ -489,6 +490,11 @@ void MDLog::trim(int m)
if (m >= 0)
max_events = m;
if (mds->mdcache->is_readonly()) {
dout(10) << "trim, ignoring read-only FS" << dendl;
return;
}
submit_mutex.Lock();
// trim!
@ -561,6 +567,19 @@ void MDLog::trim(int m)
_trim_expired_segments();
}
/**
 * Completion context used while expiring a log segment.
 *
 * On completion, a negative result is first reported through
 * MDS::handle_write_error(); MDLog::_maybe_expired() is then invoked for
 * the segment regardless of the result.
 */
class C_MaybeExpiredSegment : public MDSInternalContext {
  MDLog *mdlog;
  LogSegment *ls;
  int op_prio;

public:
  C_MaybeExpiredSegment(MDLog *mdl, LogSegment *s, int p)
    : MDSInternalContext(mdl->mds),
      mdlog(mdl),
      ls(s),
      op_prio(p)
  {}

  void finish(int res) {
    const bool write_failed = (res < 0);
    if (write_failed) {
      mdlog->mds->handle_write_error(res);
    }
    mdlog->_maybe_expired(ls, op_prio);
  }
};
/**
* Like ::trim, but instead of trimming to max_segments, trim all but the latest
@ -644,6 +663,11 @@ void MDLog::try_expire(LogSegment *ls, int op_prio)
void MDLog::_maybe_expired(LogSegment *ls, int op_prio)
{
if (mds->mdcache->is_readonly()) {
dout(10) << "_maybe_expired, ignoring read-only FS" << dendl;
return;
}
dout(10) << "_maybe_expired segment " << ls->seq << "/" << ls->offset
<< ", " << ls->num_events << " events" << dendl;
try_expire(ls, op_prio);

View File

@ -284,22 +284,13 @@ public:
}
private:
// Completion context that calls MDLog::_maybe_expired(ls, op_prio) when it
// finishes. The completion result `res` is not inspected by this version.
class C_MaybeExpiredSegment : public MDSInternalContext {
MDLog *mdlog;
LogSegment *ls;
int op_prio;
public:
C_MaybeExpiredSegment(MDLog *mdl, LogSegment *s, int p) : MDSInternalContext(mdl->mds), mdlog(mdl), ls(s), op_prio(p) {}
void finish(int res) {
mdlog->_maybe_expired(ls, op_prio);
}
};
void try_expire(LogSegment *ls, int op_prio);
void _maybe_expired(LogSegment *ls, int op_prio);
void _expired(LogSegment *ls);
void _trim_expired_segments();
friend class C_MaybeExpiredSegment;
public:
void trim_expired_segments();
void trim(int max=-1);

View File

@ -299,6 +299,10 @@ bool MDS::asok_command(string command, cmdmap_t& cmdmap, string format,
command_flush_path(f, path);
} else if (command == "flush journal") {
command_flush_journal(f);
} else if (command == "force_readonly") {
mds_lock.Lock();
mdcache->force_readonly();
mds_lock.Unlock();
}
f->flush(ss);
delete f;
@ -361,6 +365,11 @@ int MDS::_command_flush_journal(std::stringstream *ss)
Mutex::Locker l(mds_lock);
if (mdcache->is_readonly()) {
dout(5) << __func__ << ": read-only FS" << dendl;
return -EROFS;
}
// I need to seal off the current segment, and then mark all previous segments
// for expiry
mdlog->start_new_segment();
@ -478,6 +487,11 @@ void MDS::set_up_admin_socket()
asok_hook,
"Flush the journal to the backing store");
assert(0 == r);
r = admin_socket->register_command("force_readonly",
"force_readonly",
asok_hook,
"Force MDS to read-only mode");
assert(0 == r);
}
void MDS::clean_up_admin_socket()
@ -487,6 +501,11 @@ void MDS::clean_up_admin_socket()
admin_socket->unregister_command("dump_ops_in_flight");
admin_socket->unregister_command("dump_historic_ops");
admin_socket->unregister_command("scrub_path");
admin_socket->unregister_command("flush_path");
admin_socket->unregister_command("session evict");
admin_socket->unregister_command("session ls");
admin_socket->unregister_command("flush journal");
admin_socket->unregister_command("force_readonly");
delete asok_hook;
asok_hook = NULL;
}
@ -2242,8 +2261,25 @@ void MDS::respawn()
suicide();
}
/**
 * React to a failed metadata write.
 *
 * -EBLACKLISTED always triggers a respawn. Any other error is handled
 * according to the mds_action_on_write_error option:
 *   0   - log and ignore
 *   1   - log and force the cache read-only
 *   >=2 - log and suicide
 *
 * @param err negative errno reported by the failed write
 */
void MDS::handle_write_error(int err)
{
  if (err == -EBLACKLISTED) {
    derr << "we have been blacklisted (fenced), respawning..." << dendl;
    respawn();
    return;
  }

  if (g_conf->mds_action_on_write_error == 1) {
    derr << "unhandled write error " << cpp_strerror(err) << ", force readonly..." << dendl;
    mdcache->force_readonly();
  } else if (g_conf->mds_action_on_write_error >= 2) {
    derr << "unhandled write error " << cpp_strerror(err) << ", suicide..." << dendl;
    suicide();
  } else {
    // configured to ignore write errors
    derr << "unhandled write error " << cpp_strerror(err) << ", ignore..." << dendl;
  }
}
bool MDS::ms_dispatch(Message *m)
{

View File

@ -441,6 +441,7 @@ private:
void suicide();
void respawn();
void handle_write_error(int err);
void tick();

View File

@ -88,16 +88,15 @@ void MDSTable::save(MDSInternalContextBase *onfinish, version_t v)
void MDSTable::save_2(int r, version_t v)
{
dout(10) << "save_2 v " << v << dendl;
if (r == -EBLACKLISTED) {
mds->suicide();
if (r < 0) {
dout(1) << "save error " << r << " v " << v << dendl;
mds->clog->error() << "failed to store table " << table_name << " object,"
<< " errno " << r << "\n";
mds->handle_write_error(r);
return;
}
if (r < 0) {
dout(10) << "save_2 could not write table: " << r << dendl;
assert(r >= 0);
}
assert(r >= 0);
dout(10) << "save_2 v " << v << dendl;
committed_version = v;
list<MDSInternalContextBase*> ls;

View File

@ -728,6 +728,10 @@ void Migrator::export_dir(CDir *dir, mds_rank_t dest)
assert(dir->is_auth());
assert(dest != mds->get_nodeid());
if (mds->mdcache->is_readonly()) {
dout(7) << "read-only FS, no exports for now" << dendl;
return;
}
if (mds->mdsmap->is_degraded()) {
dout(7) << "cluster degraded, no exports for now" << dendl;
return;
@ -2137,7 +2141,8 @@ void Migrator::handle_export_prep(MExportDirPrep *m)
dout(7) << " all ready, noting auth and freezing import region" << dendl;
bool success = true;
if (dir->get_inode()->filelock.can_wrlock(-1) &&
if (!mds->mdcache->is_readonly() &&
dir->get_inode()->filelock.can_wrlock(-1) &&
dir->get_inode()->nestlock.can_wrlock(-1)) {
it->second.mut = MutationRef(new MutationImpl);
// force some locks. hacky.

View File

@ -224,6 +224,7 @@ struct MDRequestImpl : public MutationImpl, public TrackedOp {
// break rarely-used fields into a separately allocated structure
// to save memory for most ops
struct More {
int slave_error;
set<mds_rank_t> slaves; // mds nodes that have slave requests to me (implies client_request)
set<mds_rank_t> waiting_on_slave; // peers i'm waiting for slavereq replies from.
@ -271,6 +272,7 @@ struct MDRequestImpl : public MutationImpl, public TrackedOp {
filepath filepath2;
More() :
slave_error(0),
has_journaled_slaves(false), slave_update_journaled(false),
srcdn_auth_mds(-1), inode_import_v(0), rename_inode(0),
is_freeze_authpin(false), is_ambiguous_auth(false),

View File

@ -344,6 +344,8 @@ void Server::_session_logged(Session *session, uint64_t state_seq, bool open, ve
mds->sessionmap.touch_session(session);
assert(session->connection != NULL);
session->connection->send_message(new MClientSession(CEPH_SESSION_OPEN));
if (mdcache->is_readonly())
session->connection->send_message(new MClientSession(CEPH_SESSION_FORCE_RO));
} else if (session->is_closing() ||
session->is_killing()) {
// kill any lingering capabilities, leases, requests
@ -448,11 +450,9 @@ void Server::finish_force_open_sessions(map<client_t,entity_inst_t>& cm,
dout(10) << "force_open_sessions opened " << session->info.inst << dendl;
mds->sessionmap.set_state(session, Session::STATE_OPEN);
mds->sessionmap.touch_session(session);
Message *m = new MClientSession(CEPH_SESSION_OPEN);
if (session->connection)
session->connection->send_message(m);
else
session->preopen_out_queue.push_back(m);
mds->send_message_client(new MClientSession(CEPH_SESSION_OPEN), session);
if (mdcache->is_readonly())
mds->send_message_client(new MClientSession(CEPH_SESSION_FORCE_RO), session);
}
} else {
dout(10) << "force_open_sessions skipping already-open " << session->info.inst << dendl;
@ -650,6 +650,7 @@ void Server::handle_client_reconnect(MClientReconnect *m)
delay -= reconnect_start;
dout(10) << " reconnect_start " << reconnect_start << " delay " << delay << dendl;
bool deny = false;
if (!mds->is_reconnect()) {
// XXX maybe in the future we can do better than this?
dout(1) << " no longer in reconnect state, ignoring reconnect, sending close" << dendl;
@ -657,16 +658,20 @@ void Server::handle_client_reconnect(MClientReconnect *m)
<< ceph_mds_state_name(mds->get_state())
<< ") from " << m->get_source_inst()
<< " after " << delay << " (allowed interval " << g_conf->mds_reconnect_timeout << ")\n";
m->get_connection()->send_message(new MClientSession(CEPH_SESSION_CLOSE));
m->put();
return;
}
if (session->is_closed()) {
deny = true;
} else if (session->is_closed()) {
dout(1) << " session is closed, ignoring reconnect, sending close" << dendl;
mds->clog->info() << "denied reconnect attempt (mds is "
<< ceph_mds_state_name(mds->get_state())
<< ") from " << m->get_source_inst() << " (session is closed)\n";
deny = true;
} else if (mdcache->is_readonly()) {
dout(1) << " read-only FS, ignoring reconnect, sending close" << dendl;
mds->clog->info() << "denied reconnect attempt (mds is read-only)\n";
deny = true;
}
if (deny) {
m->get_connection()->send_message(new MClientSession(CEPH_SESSION_CLOSE));
m->put();
return;
@ -836,6 +841,21 @@ void Server::recall_client_state(float ratio)
}
}
void Server::force_clients_readonly()
{
dout(10) << "force_clients_readonly" << dendl;
set<Session*> sessions;
mds->sessionmap.get_client_session_set(sessions);
for (set<Session*>::const_iterator p = sessions.begin();
p != sessions.end();
++p) {
Session *session = *p;
if (!session->info.inst.name.is_client() ||
!(session->is_open() || session->is_stale()))
continue;
mds->send_message_client(new MClientSession(CEPH_SESSION_FORCE_RO), session);
}
}
/*******
* some generic stuff for finishing off requests
@ -1289,6 +1309,19 @@ void Server::dispatch_client_request(MDRequestRef& mdr)
// we shouldn't be waiting on anyone.
assert(mdr->more()->waiting_on_slave.empty());
if (req->get_op() & CEPH_MDS_OP_WRITE) {
if (mdcache->is_readonly()) {
dout(10) << " read-only FS" << dendl;
respond_to_request(mdr, -EROFS);
return;
}
if (mdr->has_more() && mdr->more()->slave_error) {
dout(10) << " got error from slaves" << dendl;
respond_to_request(mdr, mdr->more()->slave_error);
return;
}
}
switch (req->get_op()) {
case CEPH_MDS_OP_LOOKUPHASH:
@ -1717,21 +1750,29 @@ void Server::handle_slave_auth_pin(MDRequestRef& mdr)
// build list of objects
list<MDSCacheObject*> objects;
CInode *auth_pin_freeze = NULL;
bool fail = false, wouldblock = false;
bool fail = false, wouldblock = false, readonly = false;
for (vector<MDSCacheObjectInfo>::iterator p = mdr->slave_request->get_authpins().begin();
p != mdr->slave_request->get_authpins().end();
++p) {
MDSCacheObject *object = mdcache->get_object(*p);
if (!object) {
dout(10) << " don't have " << *p << dendl;
fail = true;
break;
if (mdcache->is_readonly()) {
dout(10) << " read-only FS" << dendl;
readonly = true;
fail = true;
}
if (!fail) {
for (vector<MDSCacheObjectInfo>::iterator p = mdr->slave_request->get_authpins().begin();
p != mdr->slave_request->get_authpins().end();
++p) {
MDSCacheObject *object = mdcache->get_object(*p);
if (!object) {
dout(10) << " don't have " << *p << dendl;
fail = true;
break;
}
objects.push_back(object);
if (*p == mdr->slave_request->get_authpin_freeze())
auth_pin_freeze = static_cast<CInode*>(object);
}
objects.push_back(object);
if (*p == mdr->slave_request->get_authpin_freeze())
auth_pin_freeze = static_cast<CInode*>(object);
}
// can we auth pin them?
@ -1831,6 +1872,8 @@ void Server::handle_slave_auth_pin(MDRequestRef& mdr)
if (wouldblock)
reply->mark_error_wouldblock();
if (readonly)
reply->mark_error_rofs();
mds->send_message_mds(reply, mdr->slave_to_mds);
@ -1873,8 +1916,13 @@ void Server::handle_slave_auth_pin_ack(MDRequestRef& mdr, MMDSSlaveRequest *ack)
}
}
if (ack->is_error_wouldblock())
if (ack->is_error_rofs()) {
mdr->more()->slave_error = -EROFS;
mdr->aborted = true;
} else if (ack->is_error_wouldblock()) {
mdr->more()->slave_error = -EWOULDBLOCK;
mdr->aborted = true;
}
// note slave
mdr->more()->slaves.insert(from);
@ -2576,6 +2624,11 @@ void Server::handle_client_open(MDRequestRef& mdr)
respond_to_request(mdr, -EINVAL);
return;
}
// A writable open cannot be honored while the MDS is read-only: reply
// -EROFS and stop. Without the return, the already-answered request would
// fall through and continue down the normal open path.
if ((cmode & CEPH_FILE_MODE_WR) && mdcache->is_readonly()) {
  dout(7) << "read-only FS" << dendl;
  respond_to_request(mdr, -EROFS);
  return;
}
set<SimpleLock*> rdlocks, wrlocks, xlocks;
CInode *cur = rdlock_path_pin_ref(mdr, 0, rdlocks, need_auth);

View File

@ -102,6 +102,7 @@ public:
void recover_filelocks(CInode *in, bufferlist locks, int64_t client);
void recall_client_state(float ratio);
void force_clients_readonly();
// -- requests --
void handle_client_request(MClientRequest *m);

View File

@ -99,6 +99,7 @@ class MMDSSlaveRequest : public Message {
static const unsigned FLAG_NONBLOCK = 1;
static const unsigned FLAG_WOULDBLOCK = 2;
static const unsigned FLAG_NOTJOURNALED = 4;
static const unsigned FLAG_EROFS = 8;
// for locking
__u16 lock_type; // lock object type
@ -136,6 +137,8 @@ public:
bool is_error_wouldblock() { return (flags & FLAG_WOULDBLOCK); }
void mark_not_journaled() { flags |= FLAG_NOTJOURNALED; }
bool is_not_journaled() { return (flags & FLAG_NOTJOURNALED); }
// Set FLAG_EROFS on this message.
void mark_error_rofs()
{
  flags |= FLAG_EROFS;
}
// True when FLAG_EROFS is set on this message.
bool is_error_rofs()
{
  return (flags & FLAG_EROFS);
}
void set_lock_type(int t) { lock_type = t; }