Merge PR #34861 into master

* refs/pull/34861/head:
	test: adjust scrub control tests for optional scrub status
	mgr: set `task_dirty_status` on reconnect
	mds: send scrub status to ceph-mgr only when scrub is running

Reviewed-by: Patrick Donnelly <pdonnell@redhat.com>
Patrick Donnelly 2020-07-09 06:28:13 -07:00
commit 957bcb4edf
6 changed files with 101 additions and 49 deletions

View File

@@ -6,6 +6,7 @@ import logging
import errno
import time
from teuthology.exceptions import CommandFailedError
from teuthology.contextutil import safe_while
import os
from tasks.cephfs.cephfs_test_case import CephFSTestCase
@@ -30,22 +31,46 @@ class TestScrubControls(CephFSTestCase):
self.assertEqual(res['return_code'], expected)
def _get_scrub_status(self):
return self.fs.rank_tell(["scrub", "status"])
def _check_task_status(self, expected_status):
task_status = self.fs.get_task_status("scrub status")
active = self.fs.get_active_names()
log.debug("current active={0}".format(active))
self.assertTrue(task_status[active[0]].startswith(expected_status))
def _check_task_status(self, expected_status, timo=120):
""" check scrub status for current active mds in ceph status """
with safe_while(sleep=1, tries=120, action='wait for task status') as proceed:
while proceed():
active = self.fs.get_active_names()
log.debug("current active={0}".format(active))
task_status = self.fs.get_task_status("scrub status")
try:
if task_status[active[0]].startswith(expected_status):
return True
except KeyError:
pass
def _check_task_status_na(self, timo=120):
""" check absence of scrub status in ceph status """
with safe_while(sleep=1, tries=120, action='wait for task status') as proceed:
while proceed():
active = self.fs.get_active_names()
log.debug("current active={0}".format(active))
task_status = self.fs.get_task_status("scrub status")
if not active[0] in task_status:
return True
def create_scrub_data(self, test_dir):
for i in range(32):
dirname = "dir.{0}".format(i)
dirpath = os.path.join(test_dir, dirname)
self.mount_a.run_shell_payload(f"""
set -e
mkdir -p {dirpath}
for ((i = 0; i < 32; i++)); do
dd if=/dev/urandom of={dirpath}/filename.$i bs=1M conv=fdatasync count=1
done
""")
def test_scrub_abort(self):
test_dir = "scrub_control_test_path"
abs_test_path = "/{0}".format(test_dir)
log.info("mountpoint: {0}".format(self.mount_a.mountpoint))
client_path = os.path.join(self.mount_a.mountpoint, test_dir)
log.info("client_path: {0}".format(client_path))
log.info("Cloning repo into place")
TestScrubChecks.clone_repo(self.mount_a, client_path)
self.create_scrub_data(test_dir)
out_json = self.fs.rank_tell(["scrub", "start", abs_test_path, "recursive"])
self.assertNotEqual(out_json, None)
@@ -56,8 +81,8 @@ class TestScrubControls(CephFSTestCase):
self.assertTrue("no active" in out_json['status'])
# sleep enough to fetch updated task status
time.sleep(10)
self._check_task_status("idle")
checked = self._check_task_status_na()
self.assertTrue(checked)
def test_scrub_pause_and_resume(self):
test_dir = "scrub_control_test_path"
@@ -67,8 +92,7 @@ class TestScrubControls(CephFSTestCase):
client_path = os.path.join(self.mount_a.mountpoint, test_dir)
log.info("client_path: {0}".format(client_path))
log.info("Cloning repo into place")
_ = TestScrubChecks.clone_repo(self.mount_a, client_path)
self.create_scrub_data(test_dir)
out_json = self.fs.rank_tell(["scrub", "start", abs_test_path, "recursive"])
self.assertNotEqual(out_json, None)
@@ -78,25 +102,22 @@ class TestScrubControls(CephFSTestCase):
out_json = self._get_scrub_status()
self.assertTrue("PAUSED" in out_json['status'])
# sleep enough to fetch updated task status
time.sleep(10)
self._check_task_status("paused")
checked = self._check_task_status("paused")
self.assertTrue(checked)
# resume and verify
self._resume_scrub(0)
out_json = self._get_scrub_status()
self.assertFalse("PAUSED" in out_json['status'])
checked = self._check_task_status_na()
self.assertTrue(checked)
def test_scrub_pause_and_resume_with_abort(self):
test_dir = "scrub_control_test_path"
abs_test_path = "/{0}".format(test_dir)
log.info("mountpoint: {0}".format(self.mount_a.mountpoint))
client_path = os.path.join(self.mount_a.mountpoint, test_dir)
log.info("client_path: {0}".format(client_path))
log.info("Cloning repo into place")
_ = TestScrubChecks.clone_repo(self.mount_a, client_path)
self.create_scrub_data(test_dir)
out_json = self.fs.rank_tell(["scrub", "start", abs_test_path, "recursive"])
self.assertNotEqual(out_json, None)
@@ -106,9 +127,8 @@ class TestScrubControls(CephFSTestCase):
out_json = self._get_scrub_status()
self.assertTrue("PAUSED" in out_json['status'])
# sleep enough to fetch updated task status
time.sleep(10)
self._check_task_status("paused")
checked = self._check_task_status("paused")
self.assertTrue(checked)
# abort and verify
self._abort_scrub(0)
@@ -116,26 +136,37 @@ class TestScrubControls(CephFSTestCase):
self.assertTrue("PAUSED" in out_json['status'])
self.assertTrue("0 inodes" in out_json['status'])
# sleep enough to fetch updated task status
time.sleep(10)
self._check_task_status("paused")
# scrub status should still be paused...
checked = self._check_task_status("paused")
self.assertTrue(checked)
# resume and verify
self._resume_scrub(0)
out_json = self._get_scrub_status()
self.assertTrue("no active" in out_json['status'])
# sleep enough to fetch updated task status
time.sleep(10)
self._check_task_status("idle")
checked = self._check_task_status_na()
self.assertTrue(checked)
def test_scrub_task_status_on_mds_failover(self):
# sleep enough to fetch updated task status
time.sleep(10)
(original_active, ) = self.fs.get_active_names()
original_standbys = self.mds_cluster.get_standby_daemons()
self._check_task_status("idle")
test_dir = "scrub_control_test_path"
abs_test_path = "/{0}".format(test_dir)
self.create_scrub_data(test_dir)
out_json = self.fs.rank_tell(["scrub", "start", abs_test_path, "recursive"])
self.assertNotEqual(out_json, None)
# pause and verify
self._pause_scrub(0)
out_json = self._get_scrub_status()
self.assertTrue("PAUSED" in out_json['status'])
checked = self._check_task_status("paused")
self.assertTrue(checked)
# Kill the rank 0
self.fs.mds_stop(original_active)
@@ -150,12 +181,7 @@ class TestScrubControls(CephFSTestCase):
original_standbys))
self.wait_until_true(promoted, timeout=grace*2)
mgr_beacon_grace = float(self.fs.get_config("mgr_service_beacon_grace", service_type="mon"))
def status_check():
task_status = self.fs.get_task_status("scrub status")
return original_active not in task_status
self.wait_until_true(status_check, timeout=mgr_beacon_grace*2)
self._check_task_status_na()
class TestScrubChecks(CephFSTestCase):
"""

View File

@@ -3676,7 +3676,10 @@ void MDSRank::get_task_status(std::map<std::string, std::string> *status) {
// scrub summary for now..
std::string_view scrub_summary = scrubstack->scrub_summary();
status->emplace(SCRUB_STATUS_KEY, std::move(scrub_summary));
if (!ScrubStack::is_idle(scrub_summary)) {
send_status = true;
status->emplace(SCRUB_STATUS_KEY, std::move(scrub_summary));
}
}
void MDSRank::schedule_update_timer_task() {
@@ -3692,13 +3695,17 @@ void MDSRank::send_task_status() {
std::map<std::string, std::string> status;
get_task_status(&status);
if (!status.empty()) {
dout(20) << __func__ << ": updating " << status.size() << " status keys" << dendl;
if (send_status) {
if (status.empty()) {
send_status = false;
}
dout(20) << __func__ << ": updating " << status.size() << " status keys" << dendl;
int r = mgrc->service_daemon_update_task_status(std::move(status));
if (r < 0) {
derr << ": failed to update service daemon status: " << cpp_strerror(r) << dendl;
}
}
schedule_update_timer_task();
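
Taken together, the two MDSRank hunks turn send_status into a latch: get_task_status() re-arms it whenever the scrub summary is not idle, and send_task_status() then pushes one final empty status map when the scrub finishes (so the mgr can drop the "scrub status" entry) before going quiet. A minimal self-contained model of that behaviour, using placeholder names rather than the actual MDSRank/MgrClient interfaces:

// Minimal model of the send_status latch (illustrative; the names below are
// placeholders, not the actual MDSRank/MgrClient interfaces).
#include <iostream>
#include <map>
#include <string>

struct TaskReporter {
  bool send_status = true;  // start "dirty" so a fresh daemon publishes at least once

  // Called on every status timer tick with the current scrub summary.
  void tick(const std::string &scrub_summary) {
    std::map<std::string, std::string> status;
    if (scrub_summary != "idle") {    // stands in for ScrubStack::is_idle()
      send_status = true;             // re-arm while a scrub is active
      status.emplace("scrub status", scrub_summary);
    }
    if (send_status) {
      if (status.empty())
        send_status = false;          // publish one final empty map, then go quiet
      publish(status);                // stands in for service_daemon_update_task_status()
    }
  }

  void publish(const std::map<std::string, std::string> &status) {
    std::cout << "publishing " << status.size() << " status key(s)\n";
  }
};

int main() {
  TaskReporter r;
  r.tick("active");  // publishes {"scrub status": "active"}
  r.tick("idle");    // publishes an empty map once; the mgr drops the entry
  r.tick("idle");    // publishes nothing
}

This is also why the tests above now wait for the task line to disappear (_check_task_status_na) instead of expecting an "idle" status.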

View File

@@ -581,6 +581,8 @@ class MDSRank {
bool standby_replaying = false; // true if current replay pass is in standby-replay mode
private:
bool send_status = true;
// "task" string that gets displayed in ceph status
inline static const std::string SCRUB_STATUS_KEY = "scrub status";

View File

@@ -105,6 +105,10 @@ public:
*/
std::string_view scrub_summary();
static bool is_idle(std::string_view state_str) {
return state_str == "idle";
}
bool is_scrubbing() const { return !inode_stack.empty(); }
MDCache *mdcache;

View File

@@ -551,9 +551,21 @@ bool DaemonServer::handle_close(const ref_t<MMgrClose>& m)
void DaemonServer::update_task_status(DaemonKey key, const ref_t<MMgrReport>& m) {
dout(10) << "got task status from " << key << dendl;
auto p = pending_service_map.get_daemon(key.type, key.name);
if (!map_compare(p.first->task_status, *m->task_status)) {
p.first->task_status = *m->task_status;
bool service_map_dirty = false;
if ((*m->task_status).empty()) {
auto removed = pending_service_map.rm_daemon(key.type, key.name);
if (removed) {
service_map_dirty = true;
}
} else {
auto p = pending_service_map.get_daemon(key.type, key.name);
if (!map_compare(p.first->task_status, *m->task_status)) {
service_map_dirty = true;
p.first->task_status = *m->task_status;
}
}
if (service_map_dirty) {
pending_service_map_dirty = pending_service_map.epoch;
}
}
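
On the mgr side, an empty task-status map from a daemon is now treated as "remove my entry", and the pending service map is only marked dirty when an entry was actually removed or its status actually changed. A rough model of that decision, with illustrative types rather than the real DaemonServer/ServiceMap classes:

// Rough model of the dirty-tracking above (illustrative types, not the real
// DaemonServer/ServiceMap classes).
#include <map>
#include <string>

using TaskStatus = std::map<std::string, std::string>;

struct PendingServiceMap {
  std::map<std::string, TaskStatus> daemons;  // daemon name -> task status
};

// Returns true when the caller should bump pending_service_map_dirty.
bool update_task_status(PendingServiceMap &pending, const std::string &name,
                        const TaskStatus &task_status) {
  if (task_status.empty())
    return pending.daemons.erase(name) > 0;   // empty report -> drop the entry
  auto &cur = pending.daemons[name];          // creates the entry on demand
  if (cur == task_status)
    return false;                             // nothing changed, keep the epoch
  cur = task_status;
  return true;
}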

View File

@@ -184,6 +184,7 @@ void MgrClient::reconnect()
if (service_daemon) {
daemon_dirty_status = true;
}
task_dirty_status = true;
// Don't send an open if we're just a client (i.e. doing
// command-sending, not stats etc)
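
Marking task_dirty_status on reconnect matters because a newly promoted active mgr starts with an empty service map: without the flag, a daemon whose task status had not changed since the failover would never resend it. A small illustrative model of the dirty-flag resend, with placeholder names rather than the real MgrClient members:

// Small illustrative model of the dirty-flag resend (placeholder names, not
// the real MgrClient members).
#include <map>
#include <optional>
#include <string>

struct StatusClient {
  std::map<std::string, std::string> task_status;  // last status set by the daemon
  bool task_dirty_status = false;

  void on_reconnect() {
    task_dirty_status = true;   // the new active mgr knows nothing about us yet
  }

  // Include task status in the periodic report only when it changed
  // (or after a reconnect forced it dirty).
  std::optional<std::map<std::string, std::string>> report_task_status() {
    if (!task_dirty_status)
      return std::nullopt;
    task_dirty_status = false;
    return task_status;
  }
};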