Mirror of https://github.com/ceph/ceph, synced 2025-03-30 07:19:14 +00:00.

commit a3591378a5

Merge PR #39138 into master

* refs/pull/39138/head:
	qa: valgrind test for cephfs-mirror daemon
	cephfs-mirror: use preforker for daemonizing
	test: adjust sleep time to account for valgrind runs
	cephfs-mirror: gracefully shutdown threads, timers, etc..
	cephfs-mirror: call ceph_release() to cleanup mount alloc
	cephfs-mirror: shutdown filesystem/cluster connections on shutdown
	cephfs-mirror: set init failed flag on FSMirror::init() failure

Reviewed-by: Patrick Donnelly <pdonnell@redhat.com>
qa/suites/fs/valgrind/.qa (new symlink, 1 line)
@@ -0,0 +1 @@
+../.qa/

qa/suites/fs/valgrind/begin.yaml (new symlink, 1 line)
@@ -0,0 +1 @@
+.qa/cephfs/begin.yaml

qa/suites/fs/valgrind/centos_latest.yaml (new symlink, 1 line)
@@ -0,0 +1 @@
+.qa/distros/supported/centos_latest.yaml

qa/suites/fs/valgrind/mirror/% (new empty file, 0 lines)

qa/suites/fs/valgrind/mirror/.qa (new symlink, 1 line)
@@ -0,0 +1 @@
+../.qa/
New file, 7 lines (path not preserved in this view):
@@ -0,0 +1,7 @@
+meta:
+- desc: run one cephfs-mirror daemon on primary cluster
+
+tasks:
+- cephfs-mirror:
+    client: client.mirror
+    valgrind: [--tool=memcheck, --leak-check=full, --show-reachable=yes]
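The memcheck invocation above is what gates the whole suite: --leak-check=full makes valgrind attribute every unfreed allocation to its allocation site, and --show-reachable=yes additionally lists blocks still pointed to at exit. As a hedged illustration (mine, not part of the PR), this is the kind of defect those flags catch in a long-lived daemon:

    // Hypothetical example, not PR code: memcheck with --leak-check=full
    // reports the buffer below as "definitely lost" because the last
    // pointer to it is dropped before exit.
    #include <cstring>

    int main() {
      char *buf = new char[64];
      std::strcpy(buf, "cephfs-mirror");   // use the allocation
      buf = nullptr;                       // last reference gone -> leak
      return 0;                            // --show-reachable=yes would also
                                           // list still-referenced blocks
    }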
qa/suites/fs/valgrind/mirror/clients/mirror.yaml (new file, 18 lines)
@@ -0,0 +1,18 @@
+meta:
+- desc: configure the permissions for client.mirror
+overrides:
+  ceph:
+    conf:
+      client:
+        debug cephfs_mirror: 20
+        log to stderr: false
+      # make these predictable
+      client.mirror:
+        admin socket: /var/run/ceph/cephfs-mirror.asok
+        pid file: /var/run/ceph/cephfs-mirror.pid
+tasks:
+- exec:
+    client.mirror:
+      - "sudo ceph auth caps client.mirror mon 'allow r' mds 'allow r' osd 'allow rw tag cephfs metadata=*, allow r tag cephfs data=*' mgr 'allow r'"
+    client.mirror_remote:
+      - "sudo ceph auth caps client.mirror_remote mon 'allow r' mds 'allow rwps' osd 'allow rw tag cephfs *=*' mgr 'allow r'"
qa/suites/fs/valgrind/mirror/cluster/1-node.yaml (new file, 17 lines)
@@ -0,0 +1,17 @@
+meta:
+- desc: 1 ceph cluster with 1 mon, 1 mgr, 3 osds, 5 mdss
+roles:
+- - mon.a
+  - mgr.x
+  - mds.a
+  - mds.b
+  - mds.c
+  - mds.d
+  - mds.e
+  - osd.0
+  - osd.1
+  - osd.2
+  - client.0
+  - client.1
+  - client.mirror
+  - client.mirror_remote
qa/suites/fs/valgrind/mirror/mount/fuse.yaml (new file, 2 lines)
@@ -0,0 +1,2 @@
+tasks:
+- ceph-fuse: [client.0, client.1]
qa/suites/fs/valgrind/mirror/overrides/.qa (new symlink, 1 line)
@@ -0,0 +1 @@
+../.qa

qa/suites/fs/valgrind/mirror/overrides/whitelist_health.yaml (new symlink, 1 line)
@@ -0,0 +1 @@
+.qa/cephfs/overrides/whitelist_health.yaml
qa/suites/fs/valgrind/mirror/tasks/mirror.yaml (new file, 10 lines)
@@ -0,0 +1,10 @@
+overrides:
+  ceph:
+    conf:
+      mgr:
+        debug client: 10
+
+tasks:
+- cephfs_test_runner:
+    modules:
+      - tasks.cephfs.test_mirroring
qa/suites/fs/valgrind/notcmalloc.yaml (new file, 8 lines)
@@ -0,0 +1,8 @@
+meta:
+- desc: use notcmalloc version for valgrind
+
+overrides:
+  install:
+    ceph:
+      flavor: notcmalloc
+      debuginfo: true
qa/tasks/cephfs/test_mirroring.py
@@ -596,7 +596,7 @@ class TestMirroring(CephFSTestCase):
         blocklist = self.get_blocklisted_instances()
         self.assertTrue(rados_inst in blocklist)

-        time.sleep(240)
+        time.sleep(500)
         self.check_peer_status(self.primary_fs_name, self.primary_fs_id,
                                "client.mirror_remote@ceph", '/d0', 'snap0', expected_snap_count=1)
         self.verify_snapshot('d0', 'snap0')
qa/tasks/cephfs_mirror.py
@@ -41,8 +41,15 @@ class CephFSMirror(Task):
             'term',
         ]

+        if 'valgrind' in self.config:
+            args = misc.get_valgrind_args(
+                testdir, 'cephfs-mirror-{id}'.format(id=self.client),
+                args, self.config.get('valgrind'))
+
         args.extend([
             'cephfs-mirror',
             '--cluster',
             self.cluster_name,
             '--id',
             self.client_id,
         ])
src/tools/cephfs_mirror/FSMirror.cc
@@ -104,7 +104,6 @@ FSMirror::~FSMirror() {
     std::scoped_lock locker(m_lock);
     delete m_instance_watcher;
     delete m_mirror_watcher;
-    m_cluster.reset();
   }
   // outside the lock so that in-progress commands can acquire
   // lock and finish executing.

@@ -123,6 +122,7 @@ void FSMirror::shutdown_replayer(PeerReplayer *peer_replayer) {

 void FSMirror::cleanup() {
   dout(20) << dendl;
   ceph_unmount(m_mount);
+  ceph_release(m_mount);
   m_ioctx.close();
   m_cluster.reset();
 }
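The two cleanup() lines encode the libcephfs handle lifecycle: ceph_unmount() only detaches from the filesystem, while the ceph_mount_info allocated by ceph_create() stays live until ceph_release(); the missing release is exactly what memcheck reports as a leak. A minimal sketch of the full lifecycle (my illustration, error handling abbreviated):

    // Sketch of the libcephfs handle lifecycle the fix completes.
    #include <cephfs/libcephfs.h>

    int use_fs_once() {
      struct ceph_mount_info *cmount = nullptr;
      int r = ceph_create(&cmount, "admin");  // allocates the handle
      if (r < 0)
        return r;
      r = ceph_mount(cmount, "/");            // attaches to the filesystem
      if (r < 0) {
        ceph_release(cmount);                 // required even if mount failed
        return r;
      }
      /* ... mirror work ... */
      ceph_unmount(cmount);                   // detaches; handle still allocated
      ceph_release(cmount);                   // frees what memcheck flagged
      return 0;
    }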
@@ -134,12 +134,14 @@ void FSMirror::init(Context *on_finish) {
   int r = connect(g_ceph_context->_conf->name.to_str(),
                   g_ceph_context->_conf->cluster, &m_cluster);
   if (r < 0) {
+    m_init_failed = true;
     on_finish->complete(r);
     return;
   }

   r = m_cluster->ioctx_create2(m_pool_id, m_ioctx);
   if (r < 0) {
+    m_init_failed = true;
     m_cluster.reset();
     derr << ": error accessing local pool (id=" << m_pool_id << "): "
          << cpp_strerror(r) << dendl;

@@ -149,6 +151,7 @@ void FSMirror::init(Context *on_finish) {
   r = mount(m_cluster, m_filesystem, true, &m_mount);
   if (r < 0) {
+    m_init_failed = true;
     m_ioctx.close();
     m_cluster.reset();
     on_finish->complete(r);
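The pattern here: every early return in init() records the failure so the shutdown path knows which resources were actually set up. A stripped-down sketch of the idea (hypothetical names, not FSMirror's real members):

    // Hypothetical sketch: record partial-init failure so shutdown can
    // skip teardown of components that never came up.
    #include <mutex>

    class Component {
      std::mutex lock;
      bool init_failed = false;
      bool watcher_ready = false;

    public:
      int init(bool connect_ok) {
        std::scoped_lock locker(lock);
        if (!connect_ok) {
          init_failed = true;     // every early exit records the failure
          return -1;
        }
        watcher_ready = true;
        return 0;
      }

      void shutdown() {
        std::scoped_lock locker(lock);
        if (init_failed)          // nothing to tear down
          return;
        if (watcher_ready) {
          /* shut the watcher down */
        }
      }
    };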
@@ -170,13 +173,12 @@ void FSMirror::shutdown(Context *on_finish) {
     if (m_on_init_finish != nullptr) {
       dout(10) << ": delaying shutdown -- init in progress" << dendl;
       m_on_shutdown_finish = new LambdaContext([this, on_finish](int r) {
           if (r < 0) {
+            cleanup();
             on_finish->complete(0);
             return;
           }
           m_on_shutdown_finish = on_finish;
-          shutdown_mirror_watcher();
+          shutdown_peer_replayers();
         });
       return;
     }
@@ -184,11 +186,18 @@ void FSMirror::shutdown(Context *on_finish) {
     m_on_shutdown_finish = on_finish;
   }

-  for (auto &[peer, peer_replayer] : m_peer_replayers) {
-    dout(5) << ": shutting down replayer for peer=" << peer << dendl;
-    shutdown_replayer(peer_replayer.get());
-  }
-  m_peer_replayers.clear();
-
-  shutdown_mirror_watcher();
+  shutdown_peer_replayers();
+}
+
+void FSMirror::shutdown_peer_replayers() {
+  dout(20) << dendl;
+
+  for (auto &[peer, peer_replayer] : m_peer_replayers) {
+    dout(5) << ": shutting down replayer for peer=" << peer << dendl;
+    shutdown_replayer(peer_replayer.get());
+  }
+  m_peer_replayers.clear();
+
+  shutdown_mirror_watcher();
 }
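shutdown() now fans out through a fixed sequence: peer replayers, then the mirror watcher, then the instance watcher, with cleanup() last; each stage hands the next one a Context that fires on completion, and LambdaContext wraps a plain lambda taking the completion code. A stripped-down version of that callback machinery (my stand-ins, not Ceph's actual headers):

    // Minimal stand-ins for Ceph's Context/LambdaContext completion pattern,
    // showing how one async shutdown stage chains into the next.
    #include <iostream>
    #include <utility>

    struct Context {
      virtual ~Context() = default;
      virtual void finish(int r) = 0;
      void complete(int r) { finish(r); delete this; }  // one-shot
    };

    template <typename F>
    struct LambdaContext : Context {
      F f;
      explicit LambdaContext(F &&f) : f(std::move(f)) {}
      void finish(int r) override { f(r); }
    };

    void shutdown_stage(const char *name, Context *on_finish) {
      std::cout << "shutting down " << name << "\n";
      on_finish->complete(0);  // pretend the async operation finished
    }

    int main() {
      auto *last = new LambdaContext([](int r) {
        std::cout << "cleanup, r=" << r << "\n";
      });
      auto *mid = new LambdaContext([last](int) {
        shutdown_stage("mirror watcher", last);
      });
      shutdown_stage("peer replayers", mid);
      return 0;
    }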
@@ -196,8 +205,11 @@ void FSMirror::init_instance_watcher(Context *on_finish) {
   dout(20) << dendl;

   m_on_init_finish = new LambdaContext([this, on_finish](int r) {
-      if (r < 0) {
-        m_init_failed = true;
+      {
+        std::scoped_lock locker(m_lock);
+        if (r < 0) {
+          m_init_failed = true;
+        }
       }
       on_finish->complete(r);
       if (m_on_shutdown_finish != nullptr) {
@@ -281,12 +293,14 @@ void FSMirror::shutdown_instance_watcher() {
   std::scoped_lock locker(m_lock);
   Context *ctx = new C_CallbackAdapter<
     FSMirror, &FSMirror::handle_shutdown_instance_watcher>(this);
-  m_instance_watcher->shutdown(ctx);
+  m_instance_watcher->shutdown(new C_AsyncCallback<ContextWQ>(m_work_queue, ctx));
 }

 void FSMirror::handle_shutdown_instance_watcher(int r) {
   dout(20) << ": r=" << r << dendl;

+  cleanup();
+
   Context *on_init_finish = nullptr;
   Context *on_shutdown_finish = nullptr;
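Queueing the completion through m_work_queue means handle_shutdown_instance_watcher() runs later on a worker thread, not inline on the watcher's own call path while m_lock is held; that is what lets in-flight operations finish before cleanup(). The general deferral trick, with hypothetical names (Ceph's ContextWQ/C_AsyncCallback differ in detail):

    // Hypothetical sketch: defer a callback to a worker thread so the
    // caller can return (and release its locks) before the handler runs.
    #include <condition_variable>
    #include <deque>
    #include <functional>
    #include <mutex>
    #include <thread>

    class WorkQueue {
      std::mutex lock;
      std::condition_variable cond;
      std::deque<std::function<void()>> items;

    public:
      void queue(std::function<void()> fn) {
        { std::scoped_lock l(lock); items.push_back(std::move(fn)); }
        cond.notify_one();
      }
      void run_one() {  // worker body, single item for brevity
        std::unique_lock l(lock);
        cond.wait(l, [this] { return !items.empty(); });
        auto fn = std::move(items.front());
        items.pop_front();
        l.unlock();
        fn();  // runs on the worker, outside the queue lock
      }
    };

    int main() {
      WorkQueue wq;
      std::thread worker([&] { wq.run_one(); });
      wq.queue([] { /* handle_shutdown_instance_watcher(0); */ });
      worker.join();
      return 0;
    }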
src/tools/cephfs_mirror/FSMirror.h
@@ -128,6 +128,8 @@ private:
   void init_mirror_watcher();
   void handle_init_mirror_watcher(int r);

+  void shutdown_peer_replayers();
+
   void shutdown_mirror_watcher();
   void handle_shutdown_mirror_watcher(int r);
src/tools/cephfs_mirror/Mirror.cc
@@ -35,11 +35,6 @@ public:
     : SafeTimer(cct, timer_lock, true) {
     init();
   }
-
-  ~SafeTimerSingleton() {
-    std::scoped_lock locker(timer_lock);
-    shutdown();
-  }
 };

 class ThreadPoolSingleton : public ThreadPool {

@@ -52,13 +47,6 @@ public:

     start();
   }
-
-  ~ThreadPoolSingleton() override {
-    work_queue->drain();
-    delete work_queue;
-
-    stop();
-  }
 };

 } // anonymous namespace
@@ -212,6 +200,7 @@ Mirror::Mirror(CephContext *cct, const std::vector<const char*> &args,
       "cephfs::mirror::thread_pool", false, cct));
   auto safe_timer = &(cct->lookup_or_create_singleton_object<SafeTimerSingleton>(
       "cephfs::mirror::safe_timer", false, cct));
+  m_thread_pool = thread_pool;
   m_work_queue = thread_pool->work_queue;
   m_timer = safe_timer;
   m_timer_lock = &safe_timer->timer_lock;

@@ -221,12 +210,25 @@ Mirror::Mirror(CephContext *cct, const std::vector<const char*> &args,

 Mirror::~Mirror() {
   dout(10) << dendl;
+  {
+    std::scoped_lock timer_lock(*m_timer_lock);
+    m_timer->shutdown();
+  }
+
+  m_work_queue->drain();
+  delete m_work_queue;
   {
     std::scoped_lock locker(m_lock);
+    m_thread_pool->stop();
     m_cluster_watcher.reset();
   }
 }

 int Mirror::init_mon_client() {
   dout(20) << dendl;

   m_monc->set_messenger(m_msgr);
   m_msgr->add_dispatcher_head(m_monc);
   m_monc->set_want_keys(CEPH_ENTITY_TYPE_MON);

   int r = m_monc->init();
@@ -262,20 +264,14 @@ void Mirror::shutdown() {
   dout(20) << dendl;

   std::unique_lock locker(m_lock);
-  if (m_mirror_actions.empty()) {
-    return;
-  }
-
   m_stopping = true;
   m_cond.notify_all();
-  m_cond.wait(locker, [this] {return m_stopped;});
 }

 void Mirror::handle_signal(int signum) {
   dout(10) << ": signal=" << signum << dendl;
   ceph_assert(signum == SIGTERM || signum == SIGINT);
   shutdown();
-  ::exit(0);
 }

 void Mirror::handle_enable_mirroring(const Filesystem &filesystem,
@@ -537,9 +533,6 @@ void Mirror::run() {
       dout(10) << ": shutdown filesystem=" << filesystem << ", r=" << r << dendl;
     }
   }
-
-  m_stopped = true;
-  m_cond.notify_all();
 }

 } // namespace mirror
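With the m_stopped handshake gone, the run loop's only exit condition is m_stopping: shutdown() flips it under the lock and notifies, and run() drains its filesystems and returns on its own thread. The bare shape of that pattern (my sketch):

    // Sketch of the stop-flag pattern left after this change.
    #include <chrono>
    #include <condition_variable>
    #include <mutex>

    std::mutex m_lock;
    std::condition_variable m_cond;
    bool m_stopping = false;

    void run() {
      std::unique_lock locker(m_lock);
      while (!m_stopping) {
        /* ... periodic mirror work ... */
        m_cond.wait_for(locker, std::chrono::seconds(1),
                        [] { return m_stopping; });
      }
      /* drain filesystems, then return */
    }

    void shutdown() {
      std::unique_lock locker(m_lock);
      m_stopping = true;
      m_cond.notify_all();
    }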
src/tools/cephfs_mirror/Mirror.h
@@ -9,6 +9,7 @@
 #include <vector>

 #include "common/ceph_mutex.h"
+#include "common/WorkQueue.h"
 #include "mds/FSMap.h"
 #include "ClusterWatcher.h"
 #include "FSMirror.h"

@@ -88,13 +89,13 @@ private:
   Messenger *m_msgr;
   ClusterListener m_listener;

+  ThreadPool *m_thread_pool = nullptr;
   ContextWQ *m_work_queue = nullptr;
   SafeTimer *m_timer = nullptr;
   ceph::mutex *m_timer_lock = nullptr;
   Context *m_timer_task = nullptr;

   bool m_stopping = false;
-  bool m_stopped = false;
   std::unique_ptr<ClusterWatcher> m_cluster_watcher;
   std::map<Filesystem, MirrorAction> m_mirror_actions;
src/tools/cephfs_mirror/PeerReplayer.cc
@@ -187,6 +187,7 @@ void PeerReplayer::shutdown() {
   }
   m_replayers.clear();
   ceph_unmount(m_remote_mount);
+  ceph_release(m_remote_mount);
   m_remote_mount = nullptr;
   m_remote_cluster.reset();
 }
src/tools/cephfs_mirror/main.cc
@@ -6,6 +6,7 @@
 #include "common/debug.h"
 #include "common/errno.h"
 #include "common/async/context_pool.h"
+#include "common/Preforker.h"
 #include "global/global_init.h"
 #include "global/signal_handler.h"
 #include "mon/MonClient.h"
@@ -47,12 +48,32 @@ int main(int argc, const char **argv) {
                          CODE_ENVIRONMENT_DAEMON,
                          CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);

-  if (g_conf()->daemonize) {
-    global_init_daemonize(g_ceph_context);
-  }
+  Preforker forker;
+  if (global_init_prefork(g_ceph_context) >= 0) {
+    std::string err;
+    int r = forker.prefork(err);
+    if (r < 0) {
+      cerr << err << std::endl;
+      return r;
+    }
+    if (forker.is_parent()) {
+      g_ceph_context->_log->start();
+      if (forker.parent_wait(err) != 0) {
+        return -ENXIO;
+      }
+      return 0;
+    }
+    global_init_postfork_start(g_ceph_context);
+  }

   common_init_finish(g_ceph_context);

+  bool daemonize = g_conf().get_val<bool>("daemonize");
+  if (daemonize) {
+    global_init_postfork_finish(g_ceph_context);
+    forker.daemonize();
+  }
+
   init_async_signal_handler();
   register_async_signal_handler_oneshot(SIGINT, handle_signal);
   register_async_signal_handler_oneshot(SIGTERM, handle_signal);
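Preforker splits startup in two: the parent forks before heavy initialization, then blocks until the child reports how init went, so the exit status of the foreground process reflects whether the daemon actually came up. A self-contained sketch of that flow in raw POSIX (illustration only; Ceph's Preforker carries the status over a pipe much like this, plus the daemonize()/signal_exit() plumbing):

    // Hedged sketch of the prefork pattern, not Ceph's Preforker itself.
    #include <sys/types.h>
    #include <unistd.h>

    int main() {
      int fds[2];
      if (pipe(fds) < 0)
        return 1;

      pid_t pid = fork();           // fork early, before most init
      if (pid < 0)
        return 1;

      if (pid > 0) {                // parent: wait for the child's verdict
        close(fds[1]);
        int status = 1;
        if (read(fds[0], &status, sizeof(status)) != sizeof(status))
          status = 1;               // child died before reporting
        return status;              // propagate the child's init status
      }

      close(fds[0]);                // child: run initialization...
      int status = 0;               // ...and report the result
      (void)write(fds[1], &status, sizeof(status));
      close(fds[1]);
      // a real daemon would now detach (setsid, reopen stdio) and run;
      // forker.daemonize() plays that role in the code above.
      return 0;
    }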
@@ -82,12 +103,18 @@ int main(int argc, const char **argv) {
   }

   mirror->run();
+  delete mirror;

 cleanup:
   monc.shutdown();
 cleanup_messenger:
   msgr->shutdown();
-  delete mirror;
   msgr->wait();
   delete msgr;

-  return r < 0 ? EXIT_SUCCESS : EXIT_FAILURE;
+  unregister_async_signal_handler(SIGINT, handle_signal);
+  unregister_async_signal_handler(SIGTERM, handle_signal);
+  shutdown_async_signal_handler();
+
+  return forker.signal_exit(r);
 }