Merge pull request #20506 from jan--f/prometheus-rm-outdated-daemon-metrics

pybind/mgr/prometheus: don't export metrics for dead daemons; new metrics

Reviewed-by: John Spray <john.spray@redhat.com>
Kefu Chai 2018-03-02 08:57:42 +08:00 committed by GitHub
commit ca2fd5abb8

@@ -56,7 +56,14 @@ DF_POOL = ['max_avail', 'bytes_used', 'raw_bytes_used', 'objects', 'dirty',
OSD_FLAGS = ('noup', 'nodown', 'noout', 'noin', 'nobackfill', 'norebalance',
'norecover', 'noscrub', 'nodeep-scrub')
OSD_METADATA = ('cluster_addr', 'device_class', 'id', 'public_addr')
FS_METADATA = ('data_pools', 'id', 'metadata_pool', 'name')
MDS_METADATA = ('id', 'fs', 'hostname', 'public_addr', 'rank', 'ceph_version')
MON_METADATA = ('id', 'hostname', 'public_addr', 'rank', 'ceph_version')
OSD_METADATA = ('cluster_addr', 'device_class', 'id', 'hostname', 'public_addr',
'ceph_version')
OSD_STATUS = ['weight', 'up', 'in']
@@ -67,19 +74,172 @@ POOL_METADATA = ('pool_id', 'name')
DISK_OCCUPATION = ('instance', 'device', 'ceph_daemon')
class Metrics(object):
def __init__(self):
self.metrics = self._setup_static_metrics()
self.pending = {}
def set(self, key, value, labels=('',)):
'''
Set the value of a single metric. This should be used for static metrics,
e.g. cluster health.
'''
self.metrics[key].set(value, labels)
def append(self, key, value, labels=('',)):
'''
Append a metric to the staging area. Use this to aggregate daemon-specific
metrics that come and go as daemons are added or removed.
'''
if key not in self.pending:
self.pending[key] = []
self.pending[key].append((labels, value))
def reset(self):
'''
When metrics aggregation is done, call Metrics.reset() to apply the
aggregated metrics. This removes all label -> value mappings for a
metric and installs the new mappings (from pending), so daemon-specific
metrics of daemons that no longer exist are removed.
'''
for k, v in self.pending.items():
self.metrics[k].reset(v)
self.pending = {}
def add_metric(self, path, metric):
if path not in self.metrics:
self.metrics[path] = metric
def _setup_static_metrics(self):
metrics = {}
metrics['health_status'] = Metric(
'untyped',
'health_status',
'Cluster health status'
)
metrics['mon_quorum_status'] = Metric(
'gauge',
'mon_quorum_status',
'Monitors in quorum',
('ceph_daemon',)
)
metrics['fs_metadata'] = Metric(
'untyped',
'fs_metadata',
'FS Metadata',
FS_METADATA
)
metrics['mds_metadata'] = Metric(
'untyped',
'mds_metadata',
'MDS Metadata',
MDS_METADATA
)
metrics['mon_metadata'] = Metric(
'untyped',
'mon_metadata',
'MON Metadata',
MON_METADATA
)
metrics['osd_metadata'] = Metric(
'untyped',
'osd_metadata',
'OSD Metadata',
OSD_METADATA
)
# The reason for keeping this separate from OSD_METADATA is
# so that we can stably use the same label names that
# the Prometheus node_exporter uses
metrics['disk_occupation'] = Metric(
'untyped',
'disk_occupation',
'Associate Ceph daemon with disk used',
DISK_OCCUPATION
)
metrics['pool_metadata'] = Metric(
'untyped',
'pool_metadata',
'POOL Metadata',
POOL_METADATA
)
metrics['pg_total'] = Metric(
'gauge',
'pg_total',
'PG Total Count'
)
for flag in OSD_FLAGS:
path = 'osd_flag_{}'.format(flag)
metrics[path] = Metric(
'untyped',
path,
'OSD Flag {}'.format(flag)
)
for state in OSD_STATUS:
path = 'osd_{}'.format(state)
metrics[path] = Metric(
'untyped',
path,
'OSD status {}'.format(state),
('ceph_daemon',)
)
for stat in OSD_STATS:
path = 'osd_{}'.format(stat)
metrics[path] = Metric(
'gauge',
path,
'OSD stat {}'.format(stat),
('ceph_daemon',)
)
for state in PG_STATES:
path = 'pg_{}'.format(state)
metrics[path] = Metric(
'gauge',
path,
'PG {}'.format(state),
)
for state in DF_CLUSTER:
path = 'cluster_{}'.format(state)
metrics[path] = Metric(
'gauge',
path,
'DF {}'.format(state),
)
for state in DF_POOL:
path = 'pool_{}'.format(state)
metrics[path] = Metric(
'gauge',
path,
'DF pool {}'.format(state),
('pool_id',)
)
return metrics
class Metric(object):
def __init__(self, mtype, name, desc, labels=None):
self.mtype = mtype
self.name = name
self.desc = desc
self.labelnames = labels # tuple if present
self.value = dict() # indexed by label values
self.value = {} # indexed by label values
def set(self, value, labelvalues=None):
# labelvalues must be a tuple
labelvalues = labelvalues or ('',)
self.value[labelvalues] = value
def reset(self, values):
self.value = {}
for labelvalues, value in values:
self.value[labelvalues] = value
def str_expfmt(self):
def promethize(path):
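
To make the new staging semantics concrete, here is a minimal, self-contained sketch of how Metric.reset() drops stale label mappings; the metric name and values are hypothetical and not part of the patch:

# Standalone sketch of the Metric value-replacement semantics above.
class Metric(object):
    def __init__(self, mtype, name, desc, labels=None):
        self.mtype = mtype
        self.name = name
        self.desc = desc
        self.labelnames = labels  # tuple if present
        self.value = {}           # indexed by label values

    def set(self, value, labelvalues=None):
        self.value[labelvalues or ('',)] = value

    def reset(self, values):
        # Drop every existing label -> value mapping, then install
        # the freshly aggregated ones.
        self.value = {}
        for labelvalues, value in values:
            self.value[labelvalues] = value

m = Metric('gauge', 'osd_up', 'OSD status up', ('ceph_daemon',))
m.set(1, ('osd.0',))
m.set(1, ('osd.1',))        # first scrape sees two OSDs
m.reset([(('osd.0',), 1)])  # second scrape: osd.1 is gone from the cluster
assert ('osd.1',) not in m.value  # its stale series is no longer exported
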
@@ -143,135 +303,69 @@ class Module(MgrModule):
def __init__(self, *args, **kwargs):
super(Module, self).__init__(*args, **kwargs)
self.metrics = self._setup_static_metrics()
self.metrics = Metrics()
self.schema = OrderedDict()
_global_instance['plugin'] = self
def _setup_static_metrics(self):
metrics = {}
metrics['health_status'] = Metric(
'untyped',
'health_status',
'Cluster health status'
)
metrics['mon_quorum_count'] = Metric(
'gauge',
'mon_quorum_count',
'Monitors in quorum'
)
metrics['osd_metadata'] = Metric(
'untyped',
'osd_metadata',
'OSD Metadata',
OSD_METADATA
)
# The reason for keeping this separate from OSD_METADATA is
# so that we can stably use the same label names that
# the Prometheus node_exporter uses
metrics['disk_occupation'] = Metric(
'untyped',
'disk_occupation',
'Associate Ceph daemon with disk used',
DISK_OCCUPATION
)
metrics['pool_metadata'] = Metric(
'untyped',
'pool_metadata',
'POOL Metadata',
POOL_METADATA
)
metrics['pg_total'] = Metric(
'gauge',
'pg_total',
'PG Total Count'
)
for flag in OSD_FLAGS:
path = 'osd_flag_{}'.format(flag)
metrics[path] = Metric(
'untyped',
path,
'OSD Flag {}'.format(flag)
)
for state in OSD_STATUS:
path = 'osd_{}'.format(state)
self.log.debug("init: creating {}".format(path))
metrics[path] = Metric(
'untyped',
path,
'OSD status {}'.format(state),
('ceph_daemon',)
)
for stat in OSD_STATS:
path = 'osd_{}'.format(stat)
self.log.debug("init: creating {}".format(path))
metrics[path] = Metric(
'gauge',
path,
'OSD stat {}'.format(stat),
('ceph_daemon',)
)
for state in PG_STATES:
path = 'pg_{}'.format(state)
self.log.debug("init: creating {}".format(path))
metrics[path] = Metric(
'gauge',
path,
'PG {}'.format(state),
)
for state in DF_CLUSTER:
path = 'cluster_{}'.format(state)
self.log.debug("init: creating {}".format(path))
metrics[path] = Metric(
'gauge',
path,
'DF {}'.format(state),
)
for state in DF_POOL:
path = 'pool_{}'.format(state)
self.log.debug("init: creating {}".format(path))
metrics[path] = Metric(
'gauge',
path,
'DF pool {}'.format(state),
('pool_id',)
)
return metrics
def get_health(self):
health = json.loads(self.get('health')['json'])
self.metrics['health_status'].set(
health_status_to_number(health['status'])
self.metrics.set('health_status',
health_status_to_number(health['status'])
)
def get_df(self):
# maybe get the to-be-exported metrics from a config?
df = self.get('df')
for stat in DF_CLUSTER:
path = 'cluster_{}'.format(stat)
self.metrics[path].set(df['stats'][stat])
self.metrics.set('cluster_{}'.format(stat), df['stats'][stat])
for pool in df['pools']:
for stat in DF_POOL:
path = 'pool_{}'.format(stat)
self.metrics[path].set(pool['stats'][stat], (pool['id'],))
self.metrics.append('pool_{}'.format(stat),
pool['stats'][stat],
(pool['id'],))
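
As a data-shape aside, a hedged sketch of why get_df() uses set() for cluster totals but append() for pools: cluster totals form a fixed, label-less series, while per-pool series are keyed by pool_id and must disappear when a pool is deleted. The df dict and stat names below are illustrative stand-ins for DF_CLUSTER/DF_POOL:

# Hypothetical df report, shaped like the structure get_df() walks.
df = {
    'stats': {'total_bytes': 3000, 'total_used_bytes': 1000},  # cluster-wide
    'pools': [
        {'id': 1, 'stats': {'bytes_used': 600, 'objects': 150}},
        {'id': 2, 'stats': {'bytes_used': 400, 'objects': 90}},
    ],
}

for stat in ('total_bytes', 'total_used_bytes'):  # stand-in for DF_CLUSTER
    print('cluster_{}'.format(stat), df['stats'][stat])
for pool in df['pools']:
    for stat in ('bytes_used', 'objects'):        # subset of DF_POOL
        # (pool['id'],) is the label-value tuple matching ('pool_id',)
        print('pool_{}'.format(stat), (pool['id'],), pool['stats'][stat])
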
def get_fs(self):
fs_map = self.get('fs_map')
servers = self.get_service_list()
active_daemons = []
for fs in fs_map['filesystems']:
# collect fs metadata
data_pools = ",".join([str(pool) for pool in fs['mdsmap']['data_pools']])
self.metrics.append('fs_metadata', 1,
(data_pools,
fs['id'],
fs['mdsmap']['metadata_pool'],
fs['mdsmap']['fs_name']))
for gid, daemon in fs['mdsmap']['info'].items():
id_ = daemon['name']
host_version = servers.get((id_, 'mds'), ('',''))
self.metrics.append('mds_metadata', 1,
(id_, fs['id'], host_version[0],
daemon['addr'], daemon['rank'],
host_version[1]))
def get_quorum_status(self):
mon_status = json.loads(self.get('mon_status')['json'])
self.metrics['mon_quorum_count'].set(len(mon_status['quorum']))
servers = self.get_service_list()
for mon in mon_status['monmap']['mons']:
rank = mon['rank']
id_ = mon['name']
host_version = servers.get((id_, 'mon'), ('',''))
self.metrics.append('mon_metadata', 1,
(id_, host_version[0],
mon['public_addr'].split(':')[0], rank,
host_version[1]))
in_quorum = int(rank in mon_status['quorum'])
self.metrics.append('mon_quorum_status', in_quorum,
('mon_{}'.format(id_),))
def get_pg_status(self):
# TODO add per pool status?
pg_status = self.get('pg_status')
# Set total count of PGs, first
self.metrics['pg_total'].set(
pg_status['num_pgs'],
)
self.metrics.set('pg_total', pg_status['num_pgs'])
reported_states = {}
for pg in pg_status['pgs_by_state']:
@@ -281,15 +375,14 @@ class Module(MgrModule):
for state in reported_states:
path = 'pg_{}'.format(state)
try:
self.metrics[path].set(reported_states[state])
self.metrics.set(path, reported_states[state])
except KeyError:
self.log.warn("skipping pg in unknown state {}".format(state))
for state in PG_STATES:
path = 'pg_{}'.format(state)
if state not in reported_states:
try:
self.metrics[path].set(0)
self.metrics.set('pg_{}'.format(state), 0)
except KeyError:
self.log.warn("skipping pg in unknown state {}".format(state))
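
A small sketch of the zero-fill logic above, with hypothetical counts and a trimmed state list: states reported by pg_status get their counts, and every other known state is explicitly set to 0 so its series does not go stale between scrapes:

# Stand-in for the full PG_STATES list defined at module scope.
PG_STATES = ['active', 'clean', 'degraded', 'stale']
reported_states = {'active': 64, 'clean': 64}  # as built from pgs_by_state

values = {}
for state, count in reported_states.items():
    values['pg_{}'.format(state)] = count
for state in PG_STATES:
    if state not in reported_states:
        values['pg_{}'.format(state)] = 0      # explicit zero, not absence

# values == {'pg_active': 64, 'pg_clean': 64, 'pg_degraded': 0, 'pg_stale': 0}
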
@@ -298,21 +391,32 @@ class Module(MgrModule):
for osd in osd_stats['osd_stats']:
id_ = osd['osd']
for stat in OSD_STATS:
status = osd['perf_stat'][stat]
self.metrics['osd_{}'.format(stat)].set(
status,
('osd.{}'.format(id_),))
val = osd['perf_stat'][stat]
self.metrics.append('osd_{}'.format(stat), val,
('osd.{}'.format(id_),))
def get_service_list(self):
ret = {}
for server in self.list_servers():
version = server.get('ceph_version', '')
host = server.get('hostname', '')
for service in server.get('services', []):
ret.update({(service['id'], service['type']): (host, version)})
return ret
def get_metadata_and_osd_status(self):
osd_map = self.get('osd_map')
osd_flags = osd_map['flags'].split(',')
for flag in OSD_FLAGS:
self.metrics['osd_flag_{}'.format(flag)].set(
int(flag in osd_flags)
)
self.metrics.set('osd_flag_{}'.format(flag),
int(flag in osd_flags))
osd_devices = self.get('osd_map_crush')['devices']
servers = self.get_service_list()
for osd in osd_map['osds']:
id_ = osd['osd']
# id can be used to link osd metrics and metadata
id_ = str(osd['osd'])
# collect osd metadata
p_addr = osd['public_addr'].split(':')[0]
c_addr = osd['cluster_addr'].split(':')[0]
if p_addr == "-" or c_addr == "-":
@@ -321,20 +425,21 @@
" and metadata records for this osd".format(id_)
)
continue
dev_class = next((osd for osd in osd_devices if osd['id'] == id_))
self.metrics['osd_metadata'].set(1, (
c_addr,
dev_class['class'],
id_,
p_addr
))
dev_class = next((osd for osd in osd_devices if str(osd['id']) == id_))
host_version = servers.get((id_, 'osd'), ('',''))
self.metrics.append('osd_metadata', 1, (c_addr,
dev_class.get('class',''),
id_, host_version[0],
p_addr, host_version[1]))
# collect osd status
for state in OSD_STATUS:
status = osd[state]
self.metrics['osd_{}'.format(state)].set(
status,
('osd.{}'.format(id_),))
self.metrics.append('osd_{}'.format(state), status,
('osd.{}'.format(id_),))
osd_metadata = self.get_metadata("osd", str(id_))
# collect disk occupation metadata
osd_metadata = self.get_metadata("osd", id_)
if osd_metadata is None:
continue
dev_keys = ("backend_filestore_dev_node", "bluestore_bdev_dev_node")
@@ -348,7 +453,7 @@ class Module(MgrModule):
if osd_dev_node and osd_hostname:
self.log.debug("Got dev for osd {0}: {1}/{2}".format(
id_, osd_hostname, osd_dev_node))
self.metrics['disk_occupation'].set(1, (
self.metrics.set('disk_occupation', 1, (
osd_hostname,
osd_dev_node,
"osd.{0}".format(id_)
@@ -357,20 +462,20 @@ class Module(MgrModule):
self.log.info("Missing dev node metadata for osd {0}, skipping "
"occupation record for this osd".format(id_))
pool_meta = []
for pool in osd_map['pools']:
id_ = pool['pool']
name = pool['pool_name']
self.metrics['pool_metadata'].set(1, (id_, name))
self.metrics.append('pool_metadata', 1, (pool['pool'], pool['pool_name']))
def collect(self):
self.get_health()
self.get_df()
self.get_fs()
self.get_osd_stats()
self.get_quorum_status()
self.get_metadata_and_osd_status()
self.get_pg_status()
for daemon, counters in self.get_all_perf_counters().iteritems():
for daemon, counters in self.get_all_perf_counters().items():
for path, counter_info in counters.items():
stattype = self._stattype_to_str(counter_info['type'])
# XXX simplify first effort: no histograms
@@ -379,20 +484,18 @@
self.log.debug('ignoring %s, type %s' % (path, stattype))
continue
if path not in self.metrics:
self.metrics[path] = Metric(
self.metrics.add_metric(path, Metric(
stattype,
path,
counter_info['description'],
("ceph_daemon",),
)
))
self.metrics[path].set(
counter_info['value'],
(daemon,)
)
self.metrics.append(path, counter_info['value'], (daemon,))
# It is sufficient to reset the pending metrics once per scrape
self.metrics.reset()
return self.metrics
return self.metrics.metrics
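
Taken together, a sketch of one scrape cycle through collect(), assuming the Metrics class from this patch is importable: each collector stages daemon-specific samples with append(), and the single reset() at the end swaps them in, so daemons that vanished since the previous scrape drop out of the export (metric values hypothetical):

# One scrape cycle using the staging API defined above.
metrics = Metrics()

# Scrape 1: collectors stage samples for two OSDs, then apply them.
metrics.append('osd_up', 1, ('osd.0',))
metrics.append('osd_up', 1, ('osd.1',))
metrics.reset()

# Scrape 2: osd.1 has been removed, so nothing stages it this time.
metrics.append('osd_up', 1, ('osd.0',))
metrics.reset()
assert ('osd.1',) not in metrics.metrics['osd_up'].value  # stale series gone
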
def handle_command(self, cmd):
if cmd['prefix'] == 'prometheus self-test':