mirror of
https://github.com/ceph/ceph
synced 2025-01-29 14:34:40 +00:00
Merge PR #25621 into master
* refs/pull/25621/head: mds: allow boot on read-only mds: setup readonly mode for PurgeQueue mds: return string_view for type str mds: add missing locks for PurgeQueue methods mds: delete on_error context on des Reviewed-by: Zheng Yan <zyan@redhat.com>
This commit is contained in:
commit
03990857fa
@ -229,8 +229,15 @@ class FuseMount(CephFSMount):
|
||||
|
||||
# Now that we're mounted, set permissions so that the rest of the test will have
|
||||
# unrestricted access to the filesystem mount.
|
||||
self.client_remote.run(
|
||||
args=['sudo', 'chmod', '1777', self.mountpoint], timeout=(15*60))
|
||||
try:
|
||||
stderr = StringIO()
|
||||
self.client_remote.run(args=['sudo', 'chmod', '1777', self.mountpoint], timeout=(15*60), stderr=stderr)
|
||||
except run.CommandFailedError:
|
||||
stderr = stderr.getvalue()
|
||||
if "Read-only file system".lower() in stderr.lower():
|
||||
pass
|
||||
else:
|
||||
raise
|
||||
|
||||
def _mountpoint_exists(self):
|
||||
return self.client_remote.run(args=["ls", "-d", self.mountpoint], check_status=False, timeout=(15*60)).exitstatus == 0
|
||||
|
@ -12,6 +12,7 @@ DAMAGED_ON_START = "damaged_on_start"
|
||||
DAMAGED_ON_LS = "damaged_on_ls"
|
||||
CRASHED = "server crashed"
|
||||
NO_DAMAGE = "no damage"
|
||||
READONLY = "readonly"
|
||||
FAILED_CLIENT = "client failed"
|
||||
FAILED_SERVER = "server failed"
|
||||
|
||||
@ -161,14 +162,22 @@ class TestDamage(CephFSTestCase):
|
||||
))
|
||||
|
||||
# Blatant corruptions
|
||||
mutations.extend([
|
||||
MetadataMutation(
|
||||
o,
|
||||
"Corrupt {0}".format(o),
|
||||
lambda o=o: self.fs.rados(["put", o, "-"], stdin_data=junk),
|
||||
DAMAGED_ON_START
|
||||
) for o in data_objects
|
||||
])
|
||||
for obj_id in data_objects:
|
||||
if obj_id == "500.00000000":
|
||||
# purge queue corruption results in read-only FS
|
||||
mutations.append(MetadataMutation(
|
||||
obj_id,
|
||||
"Corrupt {0}".format(obj_id),
|
||||
lambda o=obj_id: self.fs.rados(["put", o, "-"], stdin_data=junk),
|
||||
READONLY
|
||||
))
|
||||
else:
|
||||
mutations.append(MetadataMutation(
|
||||
obj_id,
|
||||
"Corrupt {0}".format(obj_id),
|
||||
lambda o=obj_id: self.fs.rados(["put", o, "-"], stdin_data=junk),
|
||||
DAMAGED_ON_START
|
||||
))
|
||||
|
||||
# Truncations
|
||||
for obj_id in data_objects:
|
||||
@ -316,7 +325,17 @@ class TestDamage(CephFSTestCase):
|
||||
else:
|
||||
log.error("Result: Failed to go damaged on mutation '{0}'".format(mutation.desc))
|
||||
results[mutation] = FAILED_SERVER
|
||||
|
||||
elif mutation.expectation == READONLY:
|
||||
proc = self.mount_a.run_shell(["mkdir", "foo"], wait=False)
|
||||
try:
|
||||
proc.wait()
|
||||
except CommandFailedError:
|
||||
stderr = proc.stderr.getvalue()
|
||||
log.info(stderr)
|
||||
if "Read-only file system".lower() in stderr.lower():
|
||||
pass
|
||||
else:
|
||||
raise
|
||||
else:
|
||||
try:
|
||||
wait([proc], 20)
|
||||
|
@ -1476,6 +1476,8 @@ void MDSRank::boot_start(BootStep step, int r)
|
||||
<< cpp_strerror(r);
|
||||
damaged();
|
||||
ceph_assert(r == 0); // Unreachable, damaged() calls respawn()
|
||||
} else if (r == -EROFS) {
|
||||
dout(0) << "boot error forcing transition to read-only; MDS will try to continue" << dendl;
|
||||
} else {
|
||||
// Completely unexpected error, give up and die
|
||||
dout(0) << "boot_start encountered an error, failing" << dendl;
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
#include "PurgeQueue.h"
|
||||
|
||||
#include <string.h>
|
||||
|
||||
#define dout_context cct
|
||||
#define dout_subsys ceph_subsys_mds
|
||||
@ -108,6 +109,7 @@ PurgeQueue::~PurgeQueue()
|
||||
if (logger) {
|
||||
g_ceph_context->get_perfcounters_collection()->remove(logger.get());
|
||||
}
|
||||
delete on_error;
|
||||
}
|
||||
|
||||
void PurgeQueue::create_logger()
|
||||
@ -138,6 +140,12 @@ void PurgeQueue::init()
|
||||
void PurgeQueue::activate()
|
||||
{
|
||||
std::lock_guard l(lock);
|
||||
|
||||
if (readonly) {
|
||||
dout(10) << "skipping activate: PurgeQueue is readonly" << dendl;
|
||||
return;
|
||||
}
|
||||
|
||||
if (journaler.get_read_pos() == journaler.get_write_pos())
|
||||
return;
|
||||
|
||||
@ -192,7 +200,7 @@ void PurgeQueue::open(Context *completion)
|
||||
finish_contexts(g_ceph_context, waiting_for_recovery);
|
||||
} else {
|
||||
derr << "Error " << r << " loading Journaler" << dendl;
|
||||
on_error->complete(r);
|
||||
_go_readonly(r);
|
||||
}
|
||||
}));
|
||||
}
|
||||
@ -200,10 +208,14 @@ void PurgeQueue::open(Context *completion)
|
||||
void PurgeQueue::wait_for_recovery(Context* c)
|
||||
{
|
||||
std::lock_guard l(lock);
|
||||
if (recovered)
|
||||
if (recovered) {
|
||||
c->complete(0);
|
||||
else
|
||||
} else if (readonly) {
|
||||
dout(10) << "cannot wait for recovery: PurgeQueue is readonly" << dendl;
|
||||
c->complete(-EROFS);
|
||||
} else {
|
||||
waiting_for_recovery.push_back(c);
|
||||
}
|
||||
}
|
||||
|
||||
void PurgeQueue::_recover()
|
||||
@ -225,7 +237,7 @@ void PurgeQueue::_recover()
|
||||
if (journaler.get_error()) {
|
||||
int r = journaler.get_error();
|
||||
derr << "Error " << r << " recovering write_pos" << dendl;
|
||||
on_error->complete(r);
|
||||
_go_readonly(r);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -259,8 +271,12 @@ void PurgeQueue::create(Context *fin)
|
||||
journaler.create(&layout, JOURNAL_FORMAT_RESILIENT);
|
||||
journaler.write_head(new FunctionContext([this](int r) {
|
||||
std::lock_guard l(lock);
|
||||
recovered = true;
|
||||
finish_contexts(g_ceph_context, waiting_for_recovery);
|
||||
if (r) {
|
||||
_go_readonly(r);
|
||||
} else {
|
||||
recovered = true;
|
||||
finish_contexts(g_ceph_context, waiting_for_recovery);
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
@ -272,6 +288,12 @@ void PurgeQueue::push(const PurgeItem &pi, Context *completion)
|
||||
dout(4) << "pushing inode " << pi.ino << dendl;
|
||||
std::lock_guard l(lock);
|
||||
|
||||
if (readonly) {
|
||||
dout(10) << "cannot push inode: PurgeQueue is readonly" << dendl;
|
||||
completion->complete(-EROFS);
|
||||
return;
|
||||
}
|
||||
|
||||
// Callers should have waited for open() before using us
|
||||
ceph_assert(!journaler.is_readonly());
|
||||
|
||||
@ -286,7 +308,7 @@ void PurgeQueue::push(const PurgeItem &pi, Context *completion)
|
||||
if (!could_consume) {
|
||||
// Usually, it is not necessary to explicitly flush here, because the reader
|
||||
// will get flushes generated inside Journaler::is_readable. However,
|
||||
// if we remain in a can_consume()==false state for a long period then
|
||||
// if we remain in a _can_consume()==false state for a long period then
|
||||
// we should flush in order to allow MDCache to drop its strays rather
|
||||
// than having them wait for purgequeue to progress.
|
||||
if (!delayed_flush) {
|
||||
@ -332,8 +354,13 @@ uint32_t PurgeQueue::_calculate_ops(const PurgeItem &item) const
|
||||
return ops_required;
|
||||
}
|
||||
|
||||
bool PurgeQueue::can_consume()
|
||||
bool PurgeQueue::_can_consume()
|
||||
{
|
||||
if (readonly) {
|
||||
dout(10) << "can't consume: PurgeQueue is readonly" << dendl;
|
||||
return false;
|
||||
}
|
||||
|
||||
dout(20) << ops_in_flight << "/" << max_purge_ops << " ops, "
|
||||
<< in_flight.size() << "/" << g_conf()->mds_max_purge_files
|
||||
<< " files" << dendl;
|
||||
@ -361,12 +388,23 @@ bool PurgeQueue::can_consume()
|
||||
}
|
||||
}
|
||||
|
||||
void PurgeQueue::_go_readonly(int r)
|
||||
{
|
||||
if (readonly) return;
|
||||
dout(1) << "going readonly because internal IO failed: " << strerror(-r) << dendl;
|
||||
readonly = true;
|
||||
on_error->complete(r);
|
||||
on_error = nullptr;
|
||||
journaler.set_readonly();
|
||||
finish_contexts(g_ceph_context, waiting_for_recovery, r);
|
||||
}
|
||||
|
||||
bool PurgeQueue::_consume()
|
||||
{
|
||||
ceph_assert(lock.is_locked_by_me());
|
||||
|
||||
bool could_consume = false;
|
||||
while(can_consume()) {
|
||||
while(_can_consume()) {
|
||||
|
||||
if (delayed_flush) {
|
||||
// We are now going to read from the journal, so any proactive
|
||||
@ -378,7 +416,7 @@ bool PurgeQueue::_consume()
|
||||
|
||||
if (int r = journaler.get_error()) {
|
||||
derr << "Error " << r << " recovering write_pos" << dendl;
|
||||
on_error->complete(r);
|
||||
_go_readonly(r);
|
||||
return could_consume;
|
||||
}
|
||||
|
||||
@ -392,7 +430,7 @@ bool PurgeQueue::_consume()
|
||||
if (r == 0) {
|
||||
_consume();
|
||||
} else if (r != -EAGAIN) {
|
||||
on_error->complete(r);
|
||||
_go_readonly(r);
|
||||
}
|
||||
}));
|
||||
}
|
||||
@ -414,7 +452,7 @@ bool PurgeQueue::_consume()
|
||||
} catch (const buffer::error &err) {
|
||||
derr << "Decode error at read_pos=0x" << std::hex
|
||||
<< journaler.get_read_pos() << dendl;
|
||||
on_error->complete(0);
|
||||
_go_readonly(EIO);
|
||||
}
|
||||
dout(20) << " executing item (" << item.ino << ")" << dendl;
|
||||
_execute_item(item, journaler.get_read_pos());
|
||||
@ -581,6 +619,11 @@ void PurgeQueue::update_op_limit(const MDSMap &mds_map)
|
||||
{
|
||||
std::lock_guard l(lock);
|
||||
|
||||
if (readonly) {
|
||||
dout(10) << "skipping; PurgeQueue is readonly" << dendl;
|
||||
return;
|
||||
}
|
||||
|
||||
uint64_t pg_count = 0;
|
||||
objecter->with_osdmap([&](const OSDMap& o) {
|
||||
// Number of PGs across all data pools
|
||||
@ -636,6 +679,13 @@ bool PurgeQueue::drain(
|
||||
size_t *in_flight_count
|
||||
)
|
||||
{
|
||||
std::lock_guard l(lock);
|
||||
|
||||
if (readonly) {
|
||||
dout(10) << "skipping drain; PurgeQueue is readonly" << dendl;
|
||||
return true;
|
||||
}
|
||||
|
||||
ceph_assert(progress != nullptr);
|
||||
ceph_assert(progress_total != nullptr);
|
||||
ceph_assert(in_flight_count != nullptr);
|
||||
@ -668,7 +718,7 @@ bool PurgeQueue::drain(
|
||||
return false;
|
||||
}
|
||||
|
||||
std::string PurgeItem::get_type_str() const
|
||||
std::string_view PurgeItem::get_type_str() const
|
||||
{
|
||||
switch(action) {
|
||||
case PurgeItem::NONE: return "NONE";
|
||||
|
@ -76,7 +76,7 @@ public:
|
||||
f->close_section();
|
||||
}
|
||||
|
||||
std::string get_type_str() const;
|
||||
std::string_view get_type_str() const;
|
||||
private:
|
||||
static const std::map<std::string, PurgeItem::Action> actions;
|
||||
};
|
||||
@ -106,6 +106,7 @@ protected:
|
||||
CephContext *cct;
|
||||
const mds_rank_t rank;
|
||||
Mutex lock;
|
||||
bool readonly = false;
|
||||
|
||||
int64_t metadata_pool;
|
||||
|
||||
@ -134,7 +135,7 @@ protected:
|
||||
|
||||
uint32_t _calculate_ops(const PurgeItem &item) const;
|
||||
|
||||
bool can_consume();
|
||||
bool _can_consume();
|
||||
|
||||
// How many bytes were remaining when drain() was first called,
|
||||
// used for indicating progress.
|
||||
@ -164,6 +165,8 @@ protected:
|
||||
bool recovered;
|
||||
std::list<Context*> waiting_for_recovery;
|
||||
|
||||
void _go_readonly(int r);
|
||||
|
||||
public:
|
||||
void init();
|
||||
void activate();
|
||||
|
Loading…
Reference in New Issue
Block a user