mirror of https://github.com/ceph/ceph
282 lines
9.5 KiB
Python
282 lines
9.5 KiB
Python
|
from tasks.ceph_test_case import CephTestCase
|
||
|
import logging
|
||
|
import json
|
||
|
from tasks.netsplit import disconnect, reconnect, get_ip_and_ports
|
||
|
import itertools
|
||
|
import time
|
||
|
from io import StringIO
|
||
|
# Logger for this test module, named after the module itself.
log = logging.getLogger(name=__name__)
|
||
|
|
||
|
|
||
|
class TestNetSplit(CephTestCase):
    """
    Monitor netsplit test for a stretch pool.

    The test creates a stretch pool and cuts the network between two
    monitors. It checks that the cluster still answers monitor commands,
    then heals the split and waits for every PG to return to active+clean.
    """

    # Monitors whose pairwise connectivity is verified before the split.
    MON_LIST = ["mon.a", "mon.d", "mon.g"]
    # Role whose remote runs `ceph pg dump`.
    CLIENT = "client.0"
    # Cluster name (not referenced by the methods in this class).
    CLUSTER = "ceph"
    # Periods in seconds. RECOVERY_PERIOD is used for sleeps and timeouts;
    # READ_PERIOD is not referenced by the methods in this class.
    WRITE_PERIOD = 10
    READ_PERIOD = 10
    RECOVERY_PERIOD = WRITE_PERIOD * 6
    # Passed as success_hold_time to wait_until_true_and_hold()
    # (presumably seconds -- NOTE(review): confirm against CephTestCase).
    SUCCESS_HOLD_TIME = 10
    # Arguments for `ceph osd pool stretch set`.
    PEERING_CRUSH_BUCKET_COUNT = 2
    PEERING_CRUSH_BUCKET_TARGET = 3
    PEERING_CRUSH_BUCKET_BARRIER = 'datacenter'
    POOL = 'pool_stretch'
    CRUSH_RULE = 'replicated_rule_custom'
    SIZE = 6
    MIN_SIZE = 3
    # ceil(SIZE / PEERING_CRUSH_BUCKET_TARGET), via negated floor division
    # (not referenced by the methods in this class).
    BUCKET_MAX = -(-SIZE // PEERING_CRUSH_BUCKET_TARGET)

    def setUp(self):
        """
        Set up the cluster for the test.
        """
        super(TestNetSplit, self).setUp()

    def tearDown(self):
        """
        Clean up the cluster after the test.
        """
        super(TestNetSplit, self).tearDown()

    def _setup_pool(self, size=None, min_size=None, rule=None):
        """
        Create ``self.POOL`` and optionally adjust its size and CRUSH rule.

        :param size: if not None, applied via ``osd pool set <pool> size``.
        :param min_size: forwarded to ``create_pool``.
        :param rule: if not None, applied via
            ``osd pool set <pool> crush_rule``.
        """
        self.mgr_cluster.mon_manager.create_pool(self.POOL, min_size=min_size)
        if size is not None:
            self.mgr_cluster.mon_manager.raw_cluster_cmd(
                'osd', 'pool', 'set', self.POOL, 'size', str(size))
        if rule is not None:
            self.mgr_cluster.mon_manager.raw_cluster_cmd(
                'osd', 'pool', 'set', self.POOL, 'crush_rule', rule)

    def _get_pg_stats(self):
        """
        Run ``ceph pg dump --format=json`` on the client and return pg stats.

        :returns: the entries under ``pg_map.pg_stats`` or, if that key
            path is missing, under top-level ``pg_stats``.
        :raises Exception: if the command exits with a non-zero status.
        """
        (client,) = self.ctx.cluster.only(self.CLIENT).remotes.keys()
        arg = ['ceph', 'pg', 'dump', '--format=json']
        proc = client.run(args=arg, wait=True, stdout=StringIO(), timeout=30)
        if proc.exitstatus != 0:
            log.error("pg dump failed")
            raise Exception("pg dump failed")
        out = proc.stdout.getvalue()
        # The first output line is skipped before parsing; presumably it is
        # a non-JSON status line -- NOTE(review): confirm.
        j = json.loads('\n'.join(out.split('\n')[1:]))
        # Accept both layouts: stats nested under 'pg_map' or at top level.
        try:
            return j['pg_map']['pg_stats']
        except KeyError:
            return j['pg_stats']

    @staticmethod
    def _is_active(state):
        """Return True if a PG state contains 'active' but not 'stale'."""
        return 'active' in state and 'stale' not in state

    @staticmethod
    def _is_active_clean(state):
        """
        Return True if a PG state contains 'active' and 'clean' and contains
        neither 'stale' nor 'laggy'.
        """
        return ('active' in state and 'clean' in state and
                'stale' not in state and 'laggy' not in state)

    def _get_active_pg(self, pgs):
        """
        Get the number of active (and not stale) PGs in ``pgs``.
        """
        return sum(1 for pg in pgs if self._is_active(pg['state']))

    def _print_not_active_clean_pg(self, pgs):
        """
        Print the PGs that are not active+clean.

        Uses the same predicate as ``_get_active_clean_pg`` so every PG that
        is not counted as active+clean is also reported.
        """
        for pg in pgs:
            if not self._is_active_clean(pg['state']):
                log.debug(
                    "PG %s is not active+clean, but %s",
                    pg['pgid'], pg['state']
                )

    def _print_not_active_pg(self, pgs):
        """
        Print the PGs that are not active.
        """
        for pg in pgs:
            if not self._is_active(pg['state']):
                log.debug(
                    "PG %s is not active, but %s",
                    pg['pgid'], pg['state']
                )

    def _pg_all_active(self):
        """
        Check if all pgs are active; log the offenders if not.
        """
        pgs = self._get_pg_stats()
        result = self._get_active_pg(pgs) == len(pgs)
        if result:
            log.debug("All PGs are active")
        else:
            log.debug("Not all PGs are active")
            self._print_not_active_pg(pgs)
        return result

    def _get_active_clean_pg(self, pgs):
        """
        Get the number of active+clean PGs (not stale, not laggy) in ``pgs``.
        """
        return sum(1 for pg in pgs if self._is_active_clean(pg['state']))

    def _pg_all_active_clean(self):
        """
        Check if all pgs are active and clean; log the offenders if not.
        """
        pgs = self._get_pg_stats()
        result = self._get_active_clean_pg(pgs) == len(pgs)
        if result:
            log.debug("All PGs are active+clean")
        else:
            log.debug("Not all PGs are active+clean")
            self._print_not_active_clean_pg(pgs)
        return result

    def _disconnect_mons(self, config):
        """
        Disconnect the mons in the <config> list.
        """
        disconnect(self.ctx, config)

    def _reconnect_mons(self, config):
        """
        Reconnect the mons in the <config> list.
        """
        reconnect(self.ctx, config)

    def _reply_to_mon_command(self):
        """
        Check if the cluster is accessible.

        :returns: True if ``ceph status`` succeeds, False if it raises.
        """
        try:
            self.mgr_cluster.mon_manager.raw_cluster_cmd('status')
        except Exception as e:
            # Any failure means "not accessible"; keep the reason in the
            # debug log instead of discarding it silently.
            log.debug("'ceph status' failed: %s", e)
            return False
        return True

    def _resolve_mon_pair(self, config):
        """
        Return ``(host1, ip1, host2, ip2)`` for the mon roles
        ``config[0]`` and ``config[1]``.
        """
        assert config[0].startswith('mon.')
        assert config[1].startswith('mon.')
        (ip1, _) = get_ip_and_ports(self.ctx, config[0])
        (ip2, _) = get_ip_and_ports(self.ctx, config[1])
        (host1,) = self.ctx.cluster.only(config[0]).remotes.keys()
        (host2,) = self.ctx.cluster.only(config[1]).remotes.keys()
        assert host1 is not None
        assert host2 is not None
        return host1, ip1, host2, ip2

    @staticmethod
    def _ping(host, ip):
        """Run ``ping -c 1 <ip>`` on ``host`` and return its exit status."""
        return host.run(args=["ping", "-c", "1", ip]).exitstatus

    def _check_if_disconnect(self, config):
        """
        Check if the mons in the <config> list are disconnected.

        :returns: False if either mon can ping the other, True otherwise.
        """
        log.info("Checking if the {} and {} are disconnected".format(
            config[0], config[1]))
        host1, ip1, host2, ip2 = self._resolve_mon_pair(config)
        # If the mons are disconnected, the ping should fail (exitstatus = 1).
        try:
            if self._ping(host1, ip2) == 0 or self._ping(host2, ip1) == 0:
                return False
        except Exception:
            # NOTE(review): presumably remote.run() raises on a non-zero
            # exit status, so a failed ping lands here -- confirm.
            return True
        # Both pings returned non-zero without raising: disconnected.
        # (Previously this path fell through and returned None.)
        return True

    def _check_if_connect(self, config):
        """
        Check if the mons in the <config> list are connected.

        :returns: True only if each mon can ping the other, False otherwise.
        """
        log.info("Checking if {} and {} are connected".format(
            config[0], config[1]))
        host1, ip1, host2, ip2 = self._resolve_mon_pair(config)
        # If the mons are connected, the ping should succeed (exitstatus = 0).
        try:
            if self._ping(host1, ip2) == 0 and self._ping(host2, ip1) == 0:
                return True
        except Exception:
            return False
        # At least one ping returned non-zero without raising: not connected.
        # (Previously this path fell through and returned None.)
        return False

    def test_mon_netsplit(self):
        """
        Test the mon netsplit scenario, if cluster is actually accessible.

        Steps:

        - Create and stretch the pool.
        - Wait for full mon connectivity and for all PGs to be active.
        - Split mon.a from mon.d and check that the cluster still answers
          ``ceph status``.
        - Heal the split and wait for every PG to be active+clean.
        """
        log.info("Running test_mon_netsplit")
        self._setup_pool(
            self.SIZE,
            min_size=self.MIN_SIZE,
            rule=self.CRUSH_RULE
        )
        # set the pool to stretch
        self.mgr_cluster.mon_manager.raw_cluster_cmd(
            'osd', 'pool', 'stretch', 'set',
            self.POOL, str(self.PEERING_CRUSH_BUCKET_COUNT),
            str(self.PEERING_CRUSH_BUCKET_TARGET),
            self.PEERING_CRUSH_BUCKET_BARRIER,
            self.CRUSH_RULE, str(self.SIZE), str(self.MIN_SIZE))
        # check if all the mons are connected
        self.wait_until_true(
            lambda: all(
                self._check_if_connect([mon1, mon2])
                for mon1, mon2 in itertools.combinations(self.MON_LIST, 2)
            ),
            timeout=self.RECOVERY_PERIOD,
        )

        # wait for all PGs to become active
        self.wait_until_true_and_hold(
            self._pg_all_active,
            timeout=self.RECOVERY_PERIOD,
            success_hold_time=self.SUCCESS_HOLD_TIME
        )

        # Scenario 1: disconnect Site 1 and Site 2
        # Site 3 is still connected to both Site 1 and Site 2
        config = ["mon.a", "mon.d"]
        # disconnect the mons
        self._disconnect_mons(config)
        # wait for the mons to be disconnected
        time.sleep(self.RECOVERY_PERIOD)
        # check if the mons are disconnected
        self.wait_until_true(
            lambda: self._check_if_disconnect(config),
            timeout=self.RECOVERY_PERIOD,
        )
        # check the cluster is accessible
        self.wait_until_true_and_hold(
            self._reply_to_mon_command,
            timeout=self.RECOVERY_PERIOD * 5,
            success_hold_time=self.SUCCESS_HOLD_TIME
        )
        # reconnect the mons
        self._reconnect_mons(config)
        # wait for the mons to be reconnected
        time.sleep(self.RECOVERY_PERIOD)
        # check if the mons are reconnected
        self.wait_until_true(
            lambda: self._check_if_connect(config),
            timeout=self.RECOVERY_PERIOD,
        )
        # wait for the PGs to recover
        time.sleep(self.RECOVERY_PERIOD)
        # check if all PGs are active+clean
        self.wait_until_true_and_hold(
            self._pg_all_active_clean,
            timeout=self.RECOVERY_PERIOD,
            success_hold_time=self.SUCCESS_HOLD_TIME
        )
        log.info("test_mon_netsplit passed!")
|