Merge pull request #46068 from neesingh-rh/change_perf_stats_o/p_structure
mgr/stats: change in structure of perf_stats o/p

Reviewed-by: Venky Shankar <vshankar@redhat.com>
Reviewed-by: Anthony D'Atri <anthony.datri@gmail.com>
Reviewed-by: Jos Collin <jcollin@redhat.com>
commit d8ba839bcd
@@ -29,9 +29,8 @@ metrics are for a particular MDS rank (e.g., number of subtrees handled by an MD

Once enabled, Ceph Filesystem metrics can be fetched via::

  $ ceph fs perf stats
  {"version": 1, "global_counters": ["cap_hit", "read_latency", "write_latency", "metadata_latency", "dentry_lease", "opened_files", "pinned_icaps", "opened_inodes", "avg_read_latency", "stdev_read_latency", "avg_write_latency", "stdev_write_latency", "avg_metadata_latency", "stdev_metadata_latency"], "counters": [], "client_metadata": {"client.324130": {"IP": "192.168.1.100", "hostname": "ceph-host1", "root": "/", "mount_point": "/mnt/cephfs", "valid_metrics": ["cap_hit", "read_latency", "write_latency", "metadata_latency", "dentry_lease", "opened_files", "pinned_icaps", "opened_inodes", "avg_read_latency", "stdev_read_latency", "avg_write_latency", "stdev_write_latency", "avg_metadata_latency", "stdev_metadata_latency"]}}, "global_metrics": {"client.324130": [[309785, 1280], [0, 0], [197, 519015022], [88, 279074768], [12, 70147], [0, 3], [3, 3], [0, 3], [0, 0], [0, 0], [0, 11699223], [0, 88245], [0, 6596951], [0, 9539]]}, "metrics": {"delayed_ranks": [], "mds.0": {"client.324130": []}}}

Details of the JSON command output are as follows:
The output format is JSON and contains fields as follows:

- `version`: Version of stats output
- `global_counters`: List of global performance metrics
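As a rough illustration only (the helper name is assumed, not part of this change), the command output can be fetched and decoded from Python; in the version 2 format introduced by this change, `client_metadata` and `global_metrics` are keyed by filesystem name before the `client.<id>` entries::

  # Hypothetical helper showing the version 2 layout of `ceph fs perf stats`.
  import json
  import subprocess

  def fetch_perf_stats():
      out = subprocess.check_output(["ceph", "fs", "perf", "stats"])
      stats = json.loads(out)
      assert stats["version"] == 2  # per-filesystem layout
      for fs_name, clients in stats["global_metrics"].items():
          for client_id, counters in clients.items():
              # counters is a list aligned with stats["global_counters"]
              print(fs_name, client_id, dict(zip(stats["global_counters"], counters)))
      return stats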
@@ -31,23 +31,28 @@ class TestMDSMetrics(CephFSTestCase):
        if curr_max_mds > 1:
            self.fs.shrink(1)

    def verify_mds_metrics(self, active_mds_count=1, client_count=1, ranks=[]):
    def verify_mds_metrics(self, active_mds_count=1, client_count=1, ranks=[], mul_fs=[]):
        def verify_metrics_cbk(metrics):
            mds_metrics = metrics['metrics']
            if not len(mds_metrics) == active_mds_count + 1: # n active mdss + delayed set
                return False
            fs_status = self.fs.status()
            nonlocal ranks
            nonlocal ranks, mul_fs
            if not ranks:
                ranks = set([info['rank'] for info in fs_status.get_ranks(self.fs.id)])
            if not mul_fs:
                mul_fs = [self.fs.id]
            for filesystem in mul_fs:
                ranks = set([info['rank'] for info in fs_status.get_ranks(filesystem)])
                for rank in ranks:
                    r = mds_metrics.get("mds.{}".format(rank), None)
                    if not r or not len(mds_metrics['delayed_ranks']) == 0:
                        return False
            global_metrics = metrics['global_metrics']
            client_metadata = metrics['client_metadata']
            if not len(global_metrics) >= client_count or not len(client_metadata) >= client_count:
                return False
            for item in mul_fs:
                key = fs_status.get_fsmap(item)['mdsmap']['fs_name']
                global_metrics = metrics['global_metrics'].get(key, {})
                client_metadata = metrics['client_metadata'].get(key, {})
                if not len(global_metrics) >= client_count or not len(client_metadata) >= client_count:
                    return False
            return True
        return verify_metrics_cbk
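The callback above walks the version 2 report shape; as a small illustrative literal (made-up values, filesystem name assumed)::

  # Shape the callback expects: filesystem name -> client -> data.
  report = {
      "version": 2,
      "global_metrics": {"a_fs": {"client.4305": [[0, 0], [1, 2]]}},
      "client_metadata": {"a_fs": {"client.4305": {"IP": "10.0.0.1", "hostname": "host1"}}},
      "metrics": {"delayed_ranks": [], "mds.0": {"client.4305": []}},
  }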
@@ -102,12 +107,12 @@ class TestMDSMetrics(CephFSTestCase):

    def _setup_fs(self, fs_name):
        fs_a = self.mds_cluster.newfs(name=fs_name)

        self.mds_cluster.mds_restart()

        # Wait for filesystem to go healthy
        fs_a.wait_for_daemons()

        # Reconfigure client auth caps
        for mount in self.mounts:
            self.mds_cluster.mon_manager.raw_cluster_cmd_result(
@@ -292,14 +297,29 @@ class TestMDSMetrics(CephFSTestCase):
        log.debug("metrics={0}".format(metrics))
        self.assertTrue(valid)

        filtered_mds = 1
        def verify_filtered_mds_rank_metrics(metrics):
            # checks if the metrics has only client_metadata and
            # global_metrics filtered using --mds_rank=1
            global_metrics = metrics['global_metrics'].get(self.fs.name, {})
            client_metadata = metrics['client_metadata'].get(self.fs.name, {})
            mds_metrics = metrics['metrics']
            if len(mds_metrics) != 2 or f"mds.{filtered_mds}" not in mds_metrics:
                return False
            if len(global_metrics) > TestMDSMetrics.CLIENTS_REQUIRED or\
               len(client_metadata) > TestMDSMetrics.CLIENTS_REQUIRED:
                return False
            if len(set(global_metrics) - set(mds_metrics[f"mds.{filtered_mds}"])) or\
               len(set(client_metadata) - set(mds_metrics[f"mds.{filtered_mds}"])):
                return False
            return True
        # initiate a new query with `--mds_rank` filter and validate if
        # we get metrics *only* from that mds.
        filtered_mds = 1
        valid, metrics = self._get_metrics(
            self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED,
                                    ranks=[filtered_mds]), 30, '--mds_rank={}'.format(filtered_mds))
        log.debug("metrics={0}".format(metrics))
        self.assertTrue(valid)
        valid, metrics = self._get_metrics(verify_filtered_mds_rank_metrics, 30,
                                           f'--mds_rank={filtered_mds}')
        log.debug(f"metrics={metrics}")
        self.assertTrue(valid, "Incorrect 'ceph fs perf stats' output"
                        f" with filter '--mds_rank={filtered_mds}'")

    def test_query_client_filter(self):
        # validate
@@ -326,7 +346,7 @@ class TestMDSMetrics(CephFSTestCase):
        log.debug("metrics={0}".format(metrics))
        self.assertTrue(valid)

        client_matadata = metrics['client_metadata']
        client_matadata = metrics['client_metadata'][self.fs.name]
        # pick an random client
        client = random.choice(list(client_matadata.keys()))
        # get IP of client to use in filter
@@ -338,8 +358,8 @@ class TestMDSMetrics(CephFSTestCase):
        self.assertTrue(valid)

        # verify IP from output with filter IP
        for i in metrics['client_metadata']:
            self.assertEqual(client_ip, metrics['client_metadata'][i]['IP'])
        for i in metrics['client_metadata'][self.fs.name]:
            self.assertEqual(client_ip, metrics['client_metadata'][self.fs.name][i]['IP'])

    def test_query_mds_and_client_filter(self):
        # validate
@@ -423,21 +443,20 @@ class TestMDSMetrics(CephFSTestCase):
            log.debug(f'metrics={metrics}')
            self.assertTrue(valid)

            #mount_a and mount_b are the clients mounted for TestMDSMetrics. So get their
            #entries from the global_metrics.
            # mount_a and mount_b are the clients mounted for TestMDSMetrics. So get their
            # entries from the global_metrics.
            client_a_name = f'client.{self.mount_a.get_global_id()}'
            client_b_name = f'client.{self.mount_b.get_global_id()}'

            global_metrics = metrics['global_metrics']
            client_a_metrics = global_metrics[client_a_name]
            client_b_metrics = global_metrics[client_b_name]
            client_a_metrics = global_metrics[self.fs.name][client_a_name]
            client_b_metrics = global_metrics[self.fs.name][client_b_name]

            #fail rank0 mds
            # fail rank0 mds
            self.fs.rank_fail(rank=0)

            # Wait for 10 seconds for the failover to complete and
            # the mgr to get initial metrics from the new rank0 mds.
            time.sleep(10)
            # Wait for rank0 up:active state
            self.fs.wait_for_state('up:active', rank=0, timeout=30)

            fscid = self.fs.id
@@ -457,15 +476,22 @@ class TestMDSMetrics(CephFSTestCase):
            log.debug(f'metrics={metrics_new}')
            self.assertTrue(valid)

            global_metrics = metrics_new['global_metrics']
            client_a_metrics_new = global_metrics[client_a_name]
            client_b_metrics_new = global_metrics[client_b_name]
            client_metadata = metrics_new['client_metadata']
            client_a_metadata = client_metadata.get(self.fs.name, {}).get(client_a_name, {})
            client_b_metadata = client_metadata.get(self.fs.name, {}).get(client_b_name, {})

            #the metrics should be different for the test to succeed.
            self.assertNotEqual(client_a_metrics, client_a_metrics_new)
            self.assertNotEqual(client_b_metrics, client_b_metrics_new)
            global_metrics = metrics_new['global_metrics']
            client_a_metrics_new = global_metrics.get(self.fs.name, {}).get(client_a_name, {})
            client_b_metrics_new = global_metrics.get(self.fs.name, {}).get(client_b_name, {})

            # the metrics should be different for the test to succeed.
            self.assertTrue(client_a_metadata and client_b_metadata and
                            client_a_metrics_new and client_b_metrics_new and
                            (client_a_metrics_new != client_a_metrics) and
                            (client_b_metrics_new != client_b_metrics),
                            "Invalid 'ceph fs perf stats' metrics after rank0 mds failover")
        except MaxWhileTries:
            raise RuntimeError("Failed to fetch `ceph fs perf stats` metrics")
            raise RuntimeError("Failed to fetch 'ceph fs perf stats' metrics")
        finally:
            # cleanup test directories
            self._cleanup_test_dirs()
@@ -473,13 +499,13 @@ class TestMDSMetrics(CephFSTestCase):
    def test_client_metrics_and_metadata(self):
        self.mount_a.umount_wait()
        self.mount_b.umount_wait()
        self.fs.delete_all_filesystems()

        self.mds_cluster.mon_manager.raw_cluster_cmd("fs", "flag", "set",
            "enable_multiple", "true",
            "--yes-i-really-mean-it")

        #creating filesystem
        fs_a = self._setup_fs(fs_name = "fs1")
            "enable_multiple", "true", "--yes-i-really-mean-it")

        # creating filesystem
        fs_a = self._setup_fs(fs_name="fs1")

        # Mount a client on fs_a
        self.mount_a.mount_wait(cephfs_name=fs_a.name)
@@ -488,8 +514,8 @@ class TestMDSMetrics(CephFSTestCase):
        self.mount_a.path_to_ino("test.bin")
        self.mount_a.create_files()

        #creating another filesystem
        fs_b = self._setup_fs(fs_name = "fs2")
        # creating another filesystem
        fs_b = self._setup_fs(fs_name="fs2")

        # Mount a client on fs_b
        self.mount_b.mount_wait(cephfs_name=fs_b.name)
@@ -497,19 +523,47 @@ class TestMDSMetrics(CephFSTestCase):
        self.mount_b.path_to_ino("test.bin")
        self.mount_b.create_files()

        fscid_list = [fs_a.id, fs_b.id]

        # validate
        valid, metrics = self._get_metrics(
            self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
            self.verify_mds_metrics(client_count=1, mul_fs=fscid_list), 30)
        log.debug(f"metrics={metrics}")
        self.assertTrue(valid)

        client_metadata = metrics['client_metadata']

        for i in client_metadata:
            if not (client_metadata[i]['hostname']):
                raise RuntimeError("hostname not found!")
            if not (client_metadata[i]['valid_metrics']):
                raise RuntimeError("valid_metrics not found!")
        client_metadata_a = metrics['client_metadata']['fs1']
        client_metadata_b = metrics['client_metadata']['fs2']

        for i in client_metadata_a:
            if not (client_metadata_a[i]['hostname']):
                raise RuntimeError("hostname of fs1 not found!")
            if not (client_metadata_a[i]['valid_metrics']):
                raise RuntimeError("valid_metrics of fs1 not found!")

        for i in client_metadata_b:
            if not (client_metadata_b[i]['hostname']):
                raise RuntimeError("hostname of fs2 not found!")
            if not (client_metadata_b[i]['valid_metrics']):
                raise RuntimeError("valid_metrics of fs2 not found!")

    def test_non_existing_mds_rank(self):
        def verify_filtered_metrics(metrics):
            # checks if the metrics has non empty client_metadata and global_metrics
            if metrics['client_metadata'].get(self.fs.name, {})\
               or metrics['global_metrics'].get(self.fs.name, {}):
                return True
            return False

        try:
            # validate
            filter_rank = random.randint(1, 10)
            valid, metrics = self._get_metrics(verify_filtered_metrics, 30,
                                               '--mds_rank={}'.format(filter_rank))
            log.info(f'metrics={metrics}')
            self.assertFalse(valid, "Fetched 'ceph fs perf stats' metrics using nonexistent MDS rank")
        except MaxWhileTries:
            # success
            pass

    def test_perf_stats_stale_metrics_with_multiple_filesystem(self):
        self.mount_a.umount_wait()
@@ -14,7 +14,7 @@ from datetime import datetime, timedelta
from threading import Lock, Condition, Thread, Timer
from ipaddress import ip_address

PERF_STATS_VERSION = 1
PERF_STATS_VERSION = 2

QUERY_IDS = "query_ids"
GLOBAL_QUERY_ID = "global_query_id"
@@ -23,11 +23,13 @@ QUERY_RAW_COUNTERS = "query_raw_counters"
QUERY_RAW_COUNTERS_GLOBAL = "query_raw_counters_global"

MDS_RANK_ALL = (-1,)
CLIENT_ID_ALL = "\d*"
CLIENT_ID_ALL = r"\d*"
CLIENT_IP_ALL = ".*"

fs_list = [] # type: List[str]

MDS_PERF_QUERY_REGEX_MATCH_ALL_RANKS = '^(.*)$'
MDS_PERF_QUERY_REGEX_MATCH_CLIENTS = '^(client.{0}\s+{1}):.*'
MDS_PERF_QUERY_REGEX_MATCH_CLIENTS = r'^(client.{0}\s+{1}):.*'
MDS_PERF_QUERY_COUNTERS_MAP = OrderedDict({'cap_hit': 0,
                                           'read_latency': 1,
                                           'write_latency': 2,
@@ -143,8 +145,9 @@ class FSPerfStats(object):
        self.report_processor = Thread(target=self.run)
        self.report_processor.start()

    def set_client_metadata(self, client_id, key, meta):
        result = self.client_metadata['metadata'].setdefault(client_id, {})
    def set_client_metadata(self, fs_name, client_id, key, meta):
        result = (self.client_metadata['metadata'].setdefault(
            fs_name, {})).setdefault(client_id, {})
        if not key in result or not result[key] == meta:
            result[key] = meta
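The two-level setdefault above keys metadata first by filesystem name and then by client id; a standalone sketch of that dict shape (hypothetical values)::

  # Illustration of the nested layout built by set_client_metadata():
  # metadata[fs_name][client_id][key] = meta
  metadata = {}
  entry = metadata.setdefault("a_fs", {}).setdefault("client.4305", {})
  entry["IP"] = "10.0.0.1"
  assert metadata == {"a_fs": {"client.4305": {"IP": "10.0.0.1"}}}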
@@ -156,35 +159,40 @@ class FSPerfStats(object):
            except KeyError:
                self.log.warn(f"cmdtag {cmdtag} not found in client metadata")
                return

        client_meta = result[1].wait()
        fs_name = result[0]
        client_meta = result[2].wait()
        if client_meta[0] != 0:
            self.log.warn("failed to fetch client metadata from rank {0}, err={1}".format(
                result[0], client_meta[2]))
            self.log.warn("failed to fetch client metadata from gid {0}, err={1}".format(
                result[1], client_meta[2]))
            return
        self.log.debug("notify: client metadata={0}".format(json.loads(client_meta[1])))
        for metadata in json.loads(client_meta[1]):
            client_id = "client.{0}".format(metadata['id'])
            result = self.client_metadata['metadata'].setdefault(client_id, {})
            result = (self.client_metadata['metadata'].setdefault(fs_name, {})).setdefault(client_id, {})
            for subkey in CLIENT_METADATA_SUBKEYS:
                self.set_client_metadata(client_id, subkey, metadata[CLIENT_METADATA_KEY][subkey])
                self.set_client_metadata(fs_name, client_id, subkey, metadata[CLIENT_METADATA_KEY][subkey])
            for subkey in CLIENT_METADATA_SUBKEYS_OPTIONAL:
                self.set_client_metadata(client_id, subkey,
                self.set_client_metadata(fs_name, client_id, subkey,
                                         metadata[CLIENT_METADATA_KEY].get(subkey, NON_EXISTENT_KEY_STR))
            metric_features = int(metadata[CLIENT_METADATA_KEY]["metric_spec"]["metric_flags"]["feature_bits"], 16)
            supported_metrics = [metric for metric, bit in MDS_PERF_QUERY_COUNTERS_MAP.items() if metric_features & (1 << bit)]
            self.set_client_metadata(client_id, "valid_metrics", supported_metrics)
            self.set_client_metadata(fs_name, client_id, "valid_metrics", supported_metrics)
            kver = metadata[CLIENT_METADATA_KEY].get("kernel_version", None)
            if kver:
                self.set_client_metadata(client_id, "kernel_version", kver)
                self.set_client_metadata(fs_name, client_id, "kernel_version", kver)
        # when all async requests are done, purge clients metadata if any.
        if not self.client_metadata['in_progress']:
            for client in self.client_metadata['to_purge']:
                try:
                    self.log.info("purge client metadata for {0}".format(client))
                    self.client_metadata['metadata'].remove(client)
                except:
                    pass
            global fs_list
            for fs_name in fs_list:
                for client in self.client_metadata['to_purge']:
                    try:
                        if client in self.client_metadata['metadata'][fs_name]:
                            self.log.info("purge client metadata for {0}".format(client))
                            self.client_metadata['metadata'][fs_name].pop(client)
                    except:
                        pass
                if fs_name in self.client_metadata['metadata'] and not bool(self.client_metadata['metadata'][fs_name]):
                    self.client_metadata['metadata'].pop(fs_name)
            self.client_metadata['to_purge'].clear()
        self.log.debug("client_metadata={0}, to_purge={1}".format(
            self.client_metadata['metadata'], self.client_metadata['to_purge']))
@@ -237,23 +245,28 @@ class FSPerfStats(object):
    def update_client_meta(self):
        new_updates = {}
        pending_updates = [v[0] for v in self.client_metadata['in_progress'].values()]
        global fs_list
        fs_list.clear()
        with self.meta_lock:
            fsmap = self.module.get('fs_map')
            for fs in fsmap['filesystems']:
                mdsmap = fs['mdsmap']
                gid = mdsmap['up']["mds_0"]
                if gid in pending_updates:
                    continue
                tag = str(uuid.uuid4())
                result = CommandResult(tag)
                new_updates[tag] = (gid, result)
            self.client_metadata['in_progress'].update(new_updates)
                mds_map = fs['mdsmap']
                if mds_map is not None:
                    fsname = mds_map['fs_name']
                    for mds_id, mds_status in mds_map['info'].items():
                        if mds_status['rank'] == 0:
                            fs_list.append(fsname)
                            rank0_gid = mds_status['gid']
                            tag = str(uuid.uuid4())
                            result = CommandResult(tag)
                            new_updates[tag] = (fsname, rank0_gid, result)
            self.client_metadata['in_progress'].update(new_updates)

        self.log.debug(f"updating client metadata from {new_updates}")

        cmd_dict = {'prefix': 'client ls'}
        for tag,val in new_updates.items():
            self.module.send_command(val[1], "mds", str(val[0]), json.dumps(cmd_dict), tag)
            self.module.send_command(val[2], "mds", str(val[1]), json.dumps(cmd_dict), tag)

    def run(self):
        try:
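As a reference for the fsmap walk above, a minimal standalone sketch (hypothetical fsmap data, helper name assumed) of collecting each filesystem's name and rank 0 MDS gid::

  # Minimal sketch of the traversal done in update_client_meta():
  # pick the rank 0 MDS gid for every filesystem, keyed by fs_name.
  def rank0_gids(fsmap):
      out = {}
      for fs in fsmap['filesystems']:
          mds_map = fs['mdsmap']
          for _, mds_status in mds_map['info'].items():
              if mds_status['rank'] == 0:
                  out[mds_map['fs_name']] = mds_status['gid']
      return out

  # Example with made-up data:
  fsmap = {'filesystems': [{'mdsmap': {'fs_name': 'fs1',
                                       'info': {'gid_4242': {'rank': 0, 'gid': 4242}}}}]}
  assert rank0_gids(fsmap) == {'fs1': 4242}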
@@ -285,7 +298,7 @@ class FSPerfStats(object):
    def cull_client_entries(self, raw_perf_counters, incoming_metrics, missing_clients):
        # this is a bit more involved -- for each rank figure out what clients
        # are missing in incoming report and purge them from our tracked map.
        # but, if this is invoked _after_ cull_mds_entries(), the rank set
        # but, if this is invoked after cull_mds_entries(), the rank set
        # is same, so we can loop based on that assumption.
        ranks = raw_perf_counters.keys()
        for rank in ranks:
@@ -308,13 +321,16 @@ class FSPerfStats(object):
            self.log.info("deferring client metadata purge (now {0} client(s))".format(
                len(self.client_metadata['to_purge'])))
        else:
            for client in missing_clients:
                try:
                    self.log.info("purge client metadata for {0}".format(client))
                    self.client_metadata['metadata'].pop(client)
                except KeyError:
                    pass
            self.log.debug("client_metadata={0}".format(self.client_metadata['metadata']))
            global fs_list
            for fs_name in fs_list:
                for client in missing_clients:
                    try:
                        self.log.info("purge client metadata for {0}".format(client))
                        if client in self.client_metadata['metadata'][fs_name]:
                            self.client_metadata['metadata'][fs_name].pop(client)
                    except KeyError:
                        pass
            self.log.debug("client_metadata={0}".format(self.client_metadata['metadata']))

    def cull_global_metrics(self, raw_perf_counters, incoming_metrics):
        tracked_clients = raw_perf_counters.keys()
@@ -345,21 +361,27 @@ class FSPerfStats(object):

        # iterate over metrics list and update our copy (note that we have
        # already culled the differences).
        for counter in incoming_metrics:
            mds_rank = int(counter['k'][0][0])
            client_id, client_ip = extract_client_id_and_ip(counter['k'][1][0])
            if client_id is not None or not client_ip: # client_id _could_ be 0
                with self.meta_lock:
                    self.set_client_metadata(client_id, "IP", client_ip)
            else:
                self.log.warn("client metadata for client_id={0} might be unavailable".format(client_id))
        global fs_list
        for fs_name in fs_list:
            for counter in incoming_metrics:
                mds_rank = int(counter['k'][0][0])
                client_id, client_ip = extract_client_id_and_ip(counter['k'][1][0])
                if self.client_metadata['metadata'].get(fs_name):
                    if (client_id is not None or not client_ip) and\
                       self.client_metadata["metadata"][fs_name].get(client_id): # client_id _could_ be 0
                        with self.meta_lock:
                            self.set_client_metadata(fs_name, client_id, "IP", client_ip)
                    else:
                        self.log.warn(f"client metadata for client_id={client_id} might be unavailable")
                else:
                    self.log.warn(f"client metadata for filesystem={fs_name} might be unavailable")

            raw_counters = raw_perf_counters.setdefault(mds_rank, [False, {}])
            raw_counters[0] = True if mds_rank in delayed_ranks else False
            raw_client_counters = raw_counters[1].setdefault(client_id, [])
                raw_counters = raw_perf_counters.setdefault(mds_rank, [False, {}])
                raw_counters[0] = True if mds_rank in delayed_ranks else False
                raw_client_counters = raw_counters[1].setdefault(client_id, [])

            del raw_client_counters[:]
            raw_client_counters.extend(counter['c'])
                del raw_client_counters[:]
                raw_client_counters.extend(counter['c'])
        # send an asynchronous client metadata refresh
        self.update_client_meta()
@@ -476,32 +498,42 @@ class FSPerfStats(object):

    def generate_report(self, user_query):
        result = {} # type: Dict
        global fs_list
        # start with counter info -- metrics that are global and per mds
        result["version"] = PERF_STATS_VERSION
        result["global_counters"] = MDS_GLOBAL_PERF_QUERY_COUNTERS
        result["counters"] = MDS_PERF_QUERY_COUNTERS

        # fill in client metadata
        raw_perfs = user_query.setdefault(QUERY_RAW_COUNTERS_GLOBAL, {})
        raw_perfs_global = user_query.setdefault(QUERY_RAW_COUNTERS_GLOBAL, {})
        raw_perfs = user_query.setdefault(QUERY_RAW_COUNTERS, {})
        with self.meta_lock:
            raw_counters_clients = []
            for val in raw_perfs.values():
                raw_counters_clients.extend(list(val[1]))
            result_meta = result.setdefault("client_metadata", {})
            for client_id in raw_perfs.keys():
                if client_id in self.client_metadata["metadata"]:
                    client_meta = result_meta.setdefault(client_id, {})
                    client_meta.update(self.client_metadata["metadata"][client_id])
            for fs_name in fs_list:
                meta = self.client_metadata["metadata"]
                if fs_name in meta and len(meta[fs_name]):
                    for client_id in raw_perfs_global.keys():
                        if client_id in meta[fs_name] and client_id in raw_counters_clients:
                            client_meta = (result_meta.setdefault(fs_name, {})).setdefault(client_id, {})
                            client_meta.update(meta[fs_name][client_id])

        # start populating global perf metrics w/ client metadata
        metrics = result.setdefault("global_metrics", {})
        for client_id, counters in raw_perfs.items():
            global_client_metrics = metrics.setdefault(client_id, [])
            del global_client_metrics[:]
            global_client_metrics.extend(counters)
        for fs_name in fs_list:
            if fs_name in meta and len(meta[fs_name]):
                for client_id, counters in raw_perfs_global.items():
                    if client_id in meta[fs_name] and client_id in raw_counters_clients:
                        global_client_metrics = (metrics.setdefault(fs_name, {})).setdefault(client_id, [])
                        del global_client_metrics[:]
                        global_client_metrics.extend(counters)

        # and, now per-mds metrics keyed by mds rank along with delayed ranks
        raw_perfs = user_query.setdefault(QUERY_RAW_COUNTERS, {})
        metrics = result.setdefault("metrics", {})

        metrics["delayed_ranks"] = [rank for rank,counters in raw_perfs.items() if counters[0]]
        metrics["delayed_ranks"] = [rank for rank, counters in raw_perfs.items() if counters[0]]
        for rank, counters in raw_perfs.items():
            mds_key = "mds.{}".format(rank)
            mds_metrics = metrics.setdefault(mds_key, {})
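To make the reshaping above concrete, a rough hand-written sketch (made-up counter values, variable names assumed) of how the tracked raw counters map onto the report fields::

  # QUERY_RAW_COUNTERS:        rank -> [delayed?, {client_id: per-mds counters}]
  # QUERY_RAW_COUNTERS_GLOBAL: client_id -> global counters
  raw_perfs = {0: [False, {"client.4305": []}]}
  raw_perfs_global = {"client.4305": [[309785, 1280], [0, 0]]}

  metrics = {"delayed_ranks": [rank for rank, c in raw_perfs.items() if c[0]]}
  for rank, c in raw_perfs.items():
      metrics["mds.{}".format(rank)] = c[1]

  # global metrics are additionally nested under the filesystem name ("a_fs" here)
  global_metrics = {"a_fs": raw_perfs_global}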
@@ -37,7 +37,7 @@ class MetricType(Enum):
FS_TOP_PROG_STR = 'cephfs-top'

# version match b/w fstop and stats emitted by mgr/stats
FS_TOP_SUPPORTED_VER = 1
FS_TOP_SUPPORTED_VER = 2

ITEMS_PAD_LEN = 1
ITEMS_PAD = " " * ITEMS_PAD_LEN
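Since FS_TOP_SUPPORTED_VER has to track PERF_STATS_VERSION emitted by mgr/stats, a consumer-side guard might look like the following sketch (everything apart from the constant name is assumed)::

  FS_TOP_SUPPORTED_VER = 2  # must match PERF_STATS_VERSION in mgr/stats

  def check_stats_version(stats):
      # Reject reports produced by an mgr/stats module we do not understand.
      if stats.get("version") != FS_TOP_SUPPORTED_VER:
          raise RuntimeError("perf stats version mismatch!")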