mirror of https://github.com/ceph/ceph, synced 2025-01-19 09:32:00 +00:00
Merge pull request #14232 from jcsp/wip-19412

mgr: fix python module teardown & add tests

Reviewed-by: Kefu Chai <kchai@redhat.com>
commit c237e7ed29
@@ -45,7 +45,7 @@ class MgrCluster(CephCluster):


 class MgrTestCase(CephTestCase):
-    REQUIRE_MGRS = 1
+    MGRS_REQUIRED = 1

     def setUp(self):
         super(MgrTestCase, self).setUp()
@@ -53,10 +53,10 @@ class MgrTestCase(CephTestCase):
         # The test runner should have populated this
         assert self.mgr_cluster is not None

-        if len(self.mgr_cluster.mgr_ids) < self.REQUIRE_MGRS:
+        if len(self.mgr_cluster.mgr_ids) < self.MGRS_REQUIRED:
             raise case.SkipTest("Only have {0} manager daemons, "
                                 "{1} are required".format(
-                                    len(self.mgr_cluster.mgr_ids), self.REQUIRE_MGRS))
+                                    len(self.mgr_cluster.mgr_ids), self.MGRS_REQUIRED))

         # Restart all the daemons
         for daemon in self.mgr_cluster.mgr_daemons.values():
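The rename from REQUIRE_MGRS to MGRS_REQUIRED brings the manager count in line with the existing MDSS_REQUIRED and CLIENTS_REQUIRED class attributes, which the test scanner discovers generically via getattr (see the scan_tests hunk further down). A minimal sketch of that convention, using hypothetical stand-in classes:

    # Hypothetical stand-ins; the real logic lives in MgrTestCase and
    # scan_tests().
    class SomeMgrTest(object):
        MGRS_REQUIRED = 2   # declared by a test class

    def mgrs_required(case):
        # Classes that never declare the attribute default to zero.
        return getattr(case, "MGRS_REQUIRED", 0)

    cases = [SomeMgrTest, object]
    max_required_mgr = max(mgrs_required(c) for c in cases)
    assert max_required_mgr == 2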
@@ -8,7 +8,7 @@ log = logging.getLogger(__name__)


 class TestFailover(MgrTestCase):
-    REQUIRE_MGRS = 2
+    MGRS_REQUIRED = 2

     def test_timeout(self):
         """
@@ -35,6 +35,38 @@ class TestFailover(MgrTestCase):
             timeout=10
         )

+    def test_timeout_nostandby(self):
+        """
+        That when an active mgr stops responding, and no standby is
+        available, the active mgr is removed from the map anyway.
+        """
+        # Query which mgr is active
+        original_active = self.mgr_cluster.get_active_id()
+        original_standbys = self.mgr_cluster.get_standby_ids()
+
+        for s in original_standbys:
+            self.mgr_cluster.mgr_stop(s)
+            self.mgr_cluster.mgr_fail(s)
+
+        self.assertListEqual(self.mgr_cluster.get_standby_ids(), [])
+        self.assertEqual(self.mgr_cluster.get_active_id(), original_active)
+
+        grace = int(self.mgr_cluster.get_config("mon_mgr_beacon_grace"))
+        log.info("Should time out in about {0} seconds".format(grace))
+
+        self.mgr_cluster.mgr_stop(original_active)
+
+        # Now wait for the mon to notice the mgr is gone and remove it
+        # from the map.
+        self.wait_until_equal(
+            lambda: self.mgr_cluster.get_active_id(),
+            "",
+            timeout=grace * 2
+        )
+
+        self.assertListEqual(self.mgr_cluster.get_standby_ids(), [])
+        self.assertEqual(self.mgr_cluster.get_active_id(), "")
+
     def test_explicit_fail(self):
         """
         That when a user explicitly fails a daemon, a standby immediately
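The new test leans on wait_until_equal, inherited from the test case base class, to poll until the mons notice the dead mgr. A rough sketch of what such a polling helper does, with an assumed signature (the real helper may differ):

    import time

    def wait_until_equal(get, expect, timeout, period=5):
        # Poll get() until it returns expect; raise once timeout seconds
        # have elapsed. Assumed signature, for illustration only.
        elapsed = 0
        while True:
            val = get()
            if val == expect:
                return
            if elapsed >= timeout:
                raise RuntimeError("Timed out after {0}s: {1} != {2}".format(
                    elapsed, val, expect))
            time.sleep(period)
            elapsed += period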
@@ -60,6 +92,27 @@ class TestFailover(MgrTestCase):
             timeout=10
         )

+        # We should be able to fail back over again: this exercises
+        # our re-initialization of the python runtime within
+        # a single process lifetime.
+
+        # Get rid of any bystander standbys so that the original_active
+        # will be selected as next active.
+        new_active = self.mgr_cluster.get_active_id()
+        for daemon in original_standbys:
+            if daemon != new_active:
+                self.mgr_cluster.mgr_stop(daemon)
+                self.mgr_cluster.mgr_fail(daemon)
+
+        self.assertListEqual(self.mgr_cluster.get_standby_ids(),
+                             [original_active])
+
+        self.mgr_cluster.mgr_stop(new_active)
+        self.mgr_cluster.mgr_fail(new_active)
+
+        self.assertEqual(self.mgr_cluster.get_active_id(), original_active)
+        self.assertEqual(self.mgr_cluster.get_standby_ids(), [])
+
     def test_standby_timeout(self):
         """
         That when a standby daemon stops sending beacons, it is
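Note the stop/fail pairing throughout these tests: mgr_stop kills the daemon process, while mgr_fail tells the mons to drop it from the map immediately rather than waiting out the beacon grace period. A hedged sketch of the same operations driven through the CLI (the real MgrCluster helpers go through the test framework's mon interface, and get_active_id's parsing here is an assumption):

    import json
    import subprocess

    def mgr_fail(mgr_id):
        # Mark the mgr failed on the mons, promoting a standby if one exists.
        subprocess.check_call(["ceph", "mgr", "fail", mgr_id])

    def get_active_id():
        # Read the active mgr's name out of the mgr map.
        dump = json.loads(
            subprocess.check_output(["ceph", "mgr", "dump"]).decode())
        return dump.get("active_name", "")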
@@ -672,9 +672,6 @@ class LocalMDSCluster(LocalCephCluster, MDSCluster):
         super(LocalMDSCluster, self).__init__(ctx)

         self.mds_ids = ctx.daemons.daemons['mds'].keys()
-        if not self.mds_ids:
-            raise RuntimeError("No MDSs found in ceph.conf!")
-
         self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])

     def clear_firewall(self):
@@ -690,9 +687,6 @@ class LocalMgrCluster(LocalCephCluster, MgrCluster):
         super(LocalMgrCluster, self).__init__(ctx)

         self.mgr_ids = ctx.daemons.daemons['mgr'].keys()
-        if not self.mgr_ids:
-            raise RuntimeError("No manager daemonss found in ceph.conf!")
-
         self.mgr_daemons = dict([(id_, LocalDaemon("mgr", id_)) for id_ in self.mgr_ids])

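Both LocalMDSCluster and LocalMgrCluster drop their hard RuntimeError when no daemons of that type are configured; plausibly this is because the runner now sizes the cluster from the tests' declared requirements (see scan_tests below), so an empty daemon list at construction time is no longer necessarily a misconfiguration.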
@@ -811,14 +805,17 @@ def scan_tests(modules):

     max_required_mds = 0
     max_required_clients = 0
+    max_required_mgr = 0

     for suite, case in enumerate_methods(overall_suite):
         max_required_mds = max(max_required_mds,
                                getattr(case, "MDSS_REQUIRED", 0))
         max_required_clients = max(max_required_clients,
                                    getattr(case, "CLIENTS_REQUIRED", 0))
+        max_required_mgr = max(max_required_mgr,
+                               getattr(case, "MGRS_REQUIRED", 0))

-    return max_required_mds, max_required_clients
+    return max_required_mds, max_required_clients, max_required_mgr


 class LocalCluster(object):
@@ -880,7 +877,7 @@ def exec_test():
         log.error("Some ceph binaries missing, please build them: {0}".format(" ".join(missing_binaries)))
         sys.exit(-1)

-    max_required_mds, max_required_clients = scan_tests(modules)
+    max_required_mds, max_required_clients, max_required_mgr = scan_tests(modules)

     remote = LocalRemote()

@@ -906,7 +903,7 @@ def exec_test():
         vstart_env["FS"] = "0"
         vstart_env["MDS"] = max_required_mds.__str__()
         vstart_env["OSD"] = "1"
-        vstart_env["MGR"] = "1"
+        vstart_env["MGR"] = max(max_required_mgr, 1).__str__()

         remote.run([os.path.join(SRC_PREFIX, "vstart.sh"), "-n", "-d", "--nolockdep"],
                    env=vstart_env)
@@ -31,9 +31,14 @@ MgrPyModule::MgrPyModule(const std::string &module_name_)

 MgrPyModule::~MgrPyModule()
 {
-  Py_XDECREF(pModule);
-  Py_XDECREF(pClass);
+  PyGILState_STATE gstate;
+  gstate = PyGILState_Ensure();
+
+  Py_XDECREF(pClassInstance);
+  Py_XDECREF(pClass);
+  Py_XDECREF(pModule);
+
+  PyGILState_Release(gstate);
 }

 int MgrPyModule::load()
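This destructor change is part of the teardown fix. Py_XDECREF can trigger arbitrary Python code (finalizers, deallocation hooks), and CPython requires the GIL to be held for any such reference-count manipulation, yet a C++ destructor can run on a thread that does not currently hold it; hence the PyGILState_Ensure()/PyGILState_Release() bracket. The drops also now include pClassInstance, which the old code never released, and proceed instance, then class, then module.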
@@ -446,20 +446,39 @@ void PyModules::shutdown()
 {
   Mutex::Locker locker(lock);

-  // Signal modules to drop out of serve()
-  for (auto& i : modules) {
+  // Signal modules to drop out of serve() and/or tear down resources
+  C_SaferCond shutdown_called;
+  C_GatherBuilder gather(g_ceph_context);
+  for (auto &i : modules) {
     auto module = i.second.get();
-    finisher.queue(new FunctionContext([module](int r){
+    auto shutdown_cb = gather.new_sub();
+    finisher.queue(new FunctionContext([module, shutdown_cb](int r){
       module->shutdown();
+      shutdown_cb->complete(0);
     }));
   }

+  if (gather.has_subs()) {
+    gather.set_finisher(&shutdown_called);
+    gather.activate();
+  }
+
   // For modules implementing serve(), finish the threads where we
   // were running that.
   for (auto &i : serve_threads) {
     lock.Unlock();
     i.second->join();
     lock.Lock();
   }
   serve_threads.clear();

+  // Wait for the module's shutdown() to complete before
+  // we proceed to destroy the module.
+  if (!modules.empty()) {
+    dout(4) << "waiting for module shutdown calls" << dendl;
+    shutdown_called.wait();
+  }
+
   modules.clear();

   PyGILState_Ensure();
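The sequencing problem this hunk fixes: module->shutdown() runs asynchronously on the finisher thread, so the old code could reach modules.clear() and destroy a module while its shutdown callback was still queued or executing. Each queued callback now completes a C_GatherBuilder sub, and shutdown() waits on the C_SaferCond until every module's shutdown() has actually run before clearing the map.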