ceph/qa/tasks/ceph_test_case.py
Sridhar Seshasayee 328271d587 qa/tasks: Enhance wait_until_true() to check & retry recovery progress
With the mclock scheduler enabled, recovery throughput is throttled based
on factors such as the type of mclock profile enabled and the OSD capacity,
among others. As a result, recovery times may vary, and the existing
timeout of 120 secs may therefore not be sufficient.

To address the above, a new method called _is_inprogress_or_complete() is
introduced in the TestProgress Class that checks if the event with the
specified 'id' is in progress by checking the 'progress' key of the
progress command response. This method also handles the corner case where
the event completes just before it's called.

The existing wait_until_true() method in the CephTestCase Class is
modified to accept another function argument called "check_fn". This is
set to the _is_inprogress_or_complete() function described earlier in the
"test_turn_off_module" test that has been observed to fail due to the
reasons already described above. A retry mechanism of a maximum of 5
attempts is introduced after the first timeout is hit. This means that
the wait can extend up to a maximum of 600 secs (120 secs * 5) as long as
there is recovery progress reported by the 'ceph progress' command result.

Signed-off-by: Sridhar Seshasayee <sseshasa@redhat.com>
2021-06-02 14:19:48 +05:30

214 lines
8.1 KiB
Python

from typing import Optional, TYPE_CHECKING
import unittest
import time
import logging
from teuthology.orchestra.run import CommandFailedError
if TYPE_CHECKING:
    # Only evaluated by static type checkers (TYPE_CHECKING is False at
    # runtime), so this module never imports mgr_test_case when it runs.
    # The name is used solely in the quoted annotation on
    # CephTestCase.mgr_cluster.
    from tasks.mgr.mgr_test_case import MgrCluster

# Module-level logger shared by every helper in this file.
log = logging.getLogger(__name__)
class TestTimeoutError(RuntimeError):
    """
    Raised by the polling helpers in this module (wait_until_equal,
    wait_until_true) when the awaited state is not reached in time.

    Subclasses RuntimeError, so callers catching RuntimeError also see it.
    """
class CephTestCase(unittest.TestCase):
    """
    For test tasks that want to define a structured set of
    tests implemented in python.  Subclass this with appropriate
    helpers for the subsystem you're testing.
    """

    # Environment references
    # NOTE(review): these are all None here; presumably the task that runs
    # the tests assigns them before setUp() is called -- confirm against the
    # runner.
    mounts = None
    fs = None
    recovery_fs = None
    backup_fs = None
    ceph_cluster = None
    mds_cluster = None
    mgr_cluster: Optional['MgrCluster'] = None
    ctx = None
    mon_manager = None

    # Declarative test requirements: subclasses should override these to indicate
    # their special needs.  If not met, tests will be skipped.
    REQUIRE_MEMSTORE = False

    def setUp(self):
        """
        Reset the per-test record of mon config overrides, write a
        "Starting test" marker to the cluster log, and skip the test when
        REQUIRE_MEMSTORE is set but the OSD objectstore is not memstore.
        """
        # (section, key) pairs set through config_set(); removed in tearDown()
        self._mon_configs_set = set()

        self.ceph_cluster.mon_manager.raw_cluster_cmd(
            "log", "Starting test {0}".format(self.id()))

        if self.REQUIRE_MEMSTORE:
            objectstore = self.ceph_cluster.get_config("osd_objectstore", "osd")
            if objectstore != "memstore":
                # You certainly *could* run this on a real OSD, but you don't want to sit
                # here for hours waiting for the test to fill up a 1TB drive!
                #
                # skipTest() raises unittest.SkipTest by itself, so no
                # explicit `raise` is needed in front of it.
                self.skipTest("Require `memstore` OSD backend (test "
                              "would take too long on full sized OSDs")

    def tearDown(self):
        """
        Remove every config option set via config_set() during the test and
        write an "Ended test" marker to the cluster log.
        """
        self.config_clear()

        self.ceph_cluster.mon_manager.raw_cluster_cmd(
            "log", "Ended test {0}".format(self.id()))

    def config_clear(self):
        """Issue `config rm` for every recorded (section, key), then forget them."""
        for section, key in self._mon_configs_set:
            self.config_rm(section, key)
        self._mon_configs_set.clear()

    def _fix_key(self, key):
        """Return `key` as a string with spaces replaced by underscores."""
        return str(key).replace(' ', '_')

    def config_get(self, section, key):
        """Return the stripped output of `config get <section> <key>`."""
        key = self._fix_key(key)
        return self.ceph_cluster.mon_manager.raw_cluster_cmd(
            "config", "get", section, key).strip()

    def config_show(self, entity, key):
        """Return the stripped output of `config show <entity> <key>`."""
        key = self._fix_key(key)
        return self.ceph_cluster.mon_manager.raw_cluster_cmd(
            "config", "show", entity, key).strip()

    def config_minimal(self):
        """Return the stripped output of `config generate-minimal-conf`."""
        return self.ceph_cluster.mon_manager.raw_cluster_cmd(
            "config", "generate-minimal-conf").strip()

    def config_rm(self, section, key):
        """Run `config rm <section> <key>` (key normalised via _fix_key)."""
        key = self._fix_key(key)
        self.ceph_cluster.mon_manager.raw_cluster_cmd("config", "rm", section, key)
        # simplification: skip removing from _mon_configs_set;
        # let tearDown clear everything again

    def config_set(self, section, key, value):
        """
        Run `config set <section> <key> <value>` and record (section, key)
        so that config_clear() removes the option again.

        `value` is passed to the command as str(value).
        """
        key = self._fix_key(key)
        self._mon_configs_set.add((section, key))
        self.ceph_cluster.mon_manager.raw_cluster_cmd(
            "config", "set", section, key, str(value))

    def assert_cluster_log(self, expected_pattern, invert_match=False,
                           timeout=10, watch_channel=None):
        """
        Context manager.  Assert that during execution, or up to
        (5 + timeout) seconds later, the Ceph cluster log emits a message
        matching the expected pattern.

        The pattern is checked with a plain substring test against the
        watcher's captured stdout; it is not a regular expression.

        :param expected_pattern: A string that you expect to see in the log output
        :type expected_pattern: str
        :param invert_match: When true the check is negated, i.e. the
                             assertion passes only if the pattern is absent.
        :type invert_match: bool
        :param timeout: Extra seconds (on top of a fixed 5) to wait on exit
                        when no match has been seen yet.
        :type timeout: int
        :param watch_channel: Specifies the channel to be watched. This can be
                              'cluster', 'audit', ...
        :type watch_channel: str
        :raises AssertionError: if the (possibly inverted) match fails after
                                the watcher has been stopped.
        """

        ceph_manager = self.ceph_cluster.mon_manager

        class ContextManager(object):
            def match(self):
                # Substring search over everything the watcher printed so far
                found = expected_pattern in self.watcher_process.stdout.getvalue()
                if invert_match:
                    return not found

                return found

            def __enter__(self):
                # XXX: For reason behind setting "shell" to False, see
                # https://tracker.ceph.com/issues/49644.
                self.watcher_process = ceph_manager.run_ceph_w(watch_channel,
                                                               shell=False)

            def __exit__(self, exc_type, exc_val, exc_tb):
                if not self.watcher_process.finished:
                    # Check if we got an early match, wait a bit if we didn't
                    if self.match():
                        # NOTE(review): this early return leaves the watcher's
                        # stdin open and, with invert_match, succeeds without
                        # waiting -- confirm that both are intended.
                        return
                    else:
                        log.debug("No log hits yet, waiting...")
                        # Default monc tick interval is 10s, so wait that long and
                        # then some grace
                        time.sleep(5 + timeout)

                # Closing stdin is how the watcher is asked to stop
                self.watcher_process.stdin.close()
                try:
                    self.watcher_process.wait()
                except CommandFailedError:
                    # Deliberately ignored: only the captured output matters
                    pass

                if not self.match():
                    log.error("Log output: \n{0}\n".format(self.watcher_process.stdout.getvalue()))
                    raise AssertionError("Expected log message not found: '{0}'".format(expected_pattern))

        return ContextManager()

    def wait_for_health(self, pattern, timeout):
        """
        Wait until 'ceph health' contains messages matching the pattern

        A check matches when `pattern` is a substring of its summary message
        or is exactly equal to its check code.  Raises TestTimeoutError (via
        wait_until_true) if nothing matches within `timeout` seconds.
        """
        def seen_health_warning():
            checks = self.ceph_cluster.mon_manager.get_mon_health()['checks']
            codes = list(checks)
            summary_strings = [check['summary']['message']
                               for check in checks.values()]
            if not summary_strings:
                log.debug("Not expected number of summary strings ({0})".format(summary_strings))
                return False

            if any(pattern in ss for ss in summary_strings) or pattern in codes:
                return True

            log.debug("Not found expected summary strings yet ({0})".format(summary_strings))
            return False

        self.wait_until_true(seen_health_warning, timeout)

    def wait_for_health_clear(self, timeout):
        """
        Wait until `ceph health` returns no messages

        Raises TestTimeoutError (via wait_until_true) if checks are still
        present after `timeout` seconds.
        """
        def is_clear():
            health = self.ceph_cluster.mon_manager.get_mon_health()
            return not health['checks']

        self.wait_until_true(is_clear, timeout)

    def wait_until_equal(self, get_fn, expect_val, timeout, reject_fn=None, period=5):
        """
        Poll get_fn() every `period` seconds until it returns `expect_val`.

        :param get_fn: zero-argument callable producing the current value
        :param expect_val: value to wait for (compared with ==)
        :param timeout: seconds of accumulated sleep after which to give up
        :param reject_fn: optional predicate; if it returns true for a polled
                          value the wait is aborted immediately
        :param period: seconds to sleep between polls
        :raises RuntimeError: if reject_fn flags a polled value
        :raises TestTimeoutError: if `timeout` elapses without a match
        """
        elapsed = 0
        while True:
            val = get_fn()
            if val == expect_val:
                # Logged here because the loop only ever exits via return or
                # raise; a statement placed after the loop would never run.
                log.debug("wait_until_equal: success")
                return

            if reject_fn and reject_fn(val):
                raise RuntimeError("wait_until_equal: forbidden value {0} seen".format(val))

            if elapsed >= timeout:
                raise TestTimeoutError("Timed out after {0} seconds waiting for {1} (currently {2})".format(
                    elapsed, expect_val, val
                ))

            log.debug("wait_until_equal: {0} != {1}, waiting (timeout={2})...".format(val, expect_val, timeout))
            time.sleep(period)
            elapsed += period

    @classmethod
    def wait_until_true(cls, condition, timeout, check_fn=None, period=5,
                        max_retries=5):
        """
        Poll condition() every `period` seconds until it returns true.

        When `timeout` seconds have accumulated and `check_fn` is given and
        returns true, the timer is reset and the wait continues, at most
        `max_retries` times.  The total wait is therefore bounded by roughly
        timeout * (max_retries + 1) seconds.

        :param condition: zero-argument callable; the wait ends when truthy
        :param timeout: seconds of accumulated sleep per attempt
        :param check_fn: optional zero-argument callable consulted at each
                         timeout; a truthy result grants another attempt
        :param period: seconds to sleep between polls
        :param max_retries: maximum number of extra attempts granted through
                            check_fn (defaults to 5)
        :raises TestTimeoutError: if the condition is still false once the
                                  timeout is hit and no retry is granted
        """
        elapsed = 0
        retry_count = 0
        while True:
            if condition():
                log.debug("wait_until_true: success in {0}s and {1} retries".format(elapsed, retry_count))
                return

            if elapsed >= timeout:
                # Test the retry budget first so that check_fn is not
                # invoked once no further retry could be granted anyway.
                if retry_count < max_retries and check_fn and check_fn():
                    elapsed = 0
                    retry_count += 1
                    log.debug("wait_until_true: making progress, waiting (timeout={0} retry_count={1})...".format(timeout, retry_count))
                else:
                    raise TestTimeoutError("Timed out after {0}s and {1} retries".format(elapsed, retry_count))
            else:
                log.debug("wait_until_true: waiting (timeout={0} retry_count={1})...".format(timeout, retry_count))

            time.sleep(period)
            elapsed += period