Merge pull request #22732 from b-ranto/wip-metric-reset

prometheus: Fix metric resets

Replace the Metrics wrapper and its append()/reset() staging with a plain
dict of Metric objects that is cleared and repopulated on every scrape, so
series for daemons that no longer exist are dropped. Scrapes are also
serialized behind a lock, and the formatted output is cached for a
configurable scrape_interval.

Reviewed-by: Jan Fajerski <jfajerski@suse.com>
Boris Ranto 2018-07-18 16:20:57 +02:00 committed by GitHub
commit 2e4c8896b8

@@ -5,7 +5,7 @@ import math
import os
import socket
import threading
from collections import OrderedDict
import time
from mgr_module import MgrModule, MgrStandbyModule, CommandResult, PG_STATES
# Defaults for the Prometheus HTTP server. Can also set in config-key
@@ -75,42 +75,103 @@ DISK_OCCUPATION = ( 'ceph_daemon', 'device','instance')
NUM_OBJECTS = ['degraded', 'misplaced', 'unfound']
class Metrics(object):
def __init__(self):
class Metric(object):
def __init__(self, mtype, name, desc, labels=None):
self.mtype = mtype
self.name = name
self.desc = desc
self.labelnames = labels # tuple if present
self.value = {} # indexed by label values
def clear(self):
self.value = {}
def set(self, value, labelvalues=None):
# labelvalues must be a tuple
labelvalues = labelvalues or ('',)
self.value[labelvalues] = value
def str_expfmt(self):
def promethize(path):
''' replace illegal metric name characters '''
result = path.replace('.', '_').replace('+', '_plus').replace('::', '_')
# Hyphens usually turn into underscores, unless they are
# trailing
if result.endswith("-"):
result = result[0:-1] + "_minus"
else:
result = result.replace("-", "_")
return "ceph_{0}".format(result)
def floatstr(value):
''' represent as Go-compatible float '''
if value == float('inf'):
return '+Inf'
if value == float('-inf'):
return '-Inf'
if math.isnan(value):
return 'NaN'
return repr(float(value))
name = promethize(self.name)
expfmt = '''
# HELP {name} {desc}
# TYPE {name} {mtype}'''.format(
name=name,
desc=self.desc,
mtype=self.mtype,
)
for labelvalues, value in self.value.items():
if self.labelnames:
labels = zip(self.labelnames, labelvalues)
labels = ','.join('%s="%s"' % (k, v) for k, v in labels)
else:
labels = ''
if labels:
fmtstr = '\n{name}{{{labels}}} {value}'
else:
fmtstr = '\n{name} {value}'
expfmt += fmtstr.format(
name=name,
labels=labels,
value=floatstr(value),
)
return expfmt
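# Usage sketch for the Metric class above (illustrative only, not part of
# the module; the metric name and values here are hypothetical):
#
#   m = Metric('gauge', 'osd_up', 'OSD status: up', ('ceph_daemon',))
#   m.set(1, ('osd.0',))
#   m.set(0, ('osd.1',))
#   print(m.str_expfmt())
#
# would print something like (label-row order may vary):
#
#   # HELP ceph_osd_up OSD status: up
#   # TYPE ceph_osd_up gauge
#   ceph_osd_up{ceph_daemon="osd.0"} 1.0
#   ceph_osd_up{ceph_daemon="osd.1"} 0.0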
class Module(MgrModule):
COMMANDS = [
{
"cmd": "prometheus self-test",
"desc": "Run a self test on the prometheus module",
"perm": "rw"
},
{
"cmd": "prometheus file_sd_config",
"desc": "Return file_sd compatible prometheus config for mgr cluster",
"perm": "r"
},
]
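# Expected shape of the file_sd_config output (illustrative; the target
# host and port depend on the active mgr and the configured server_port):
#
#   [{"targets": ["<mgr-host>:9283"], "labels": {}}]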
OPTIONS = [
{'name': 'server_addr'},
{'name': 'server_port'},
{'name': 'scrape_interval'},
]
def __init__(self, *args, **kwargs):
super(Module, self).__init__(*args, **kwargs)
self.metrics = self._setup_static_metrics()
self.pending = {}
def set(self, key, value, labels=('',)):
'''
Set the value of a single metric. This should be used for static metrics,
e.g. cluster health.
'''
self.metrics[key].set(value, labels)
def append(self, key, value, labels=('',)):
'''
Append a metric to the staging area. Use this to aggregate daemon-specific
metrics that can appear and go away as daemons are added or removed.
'''
if key not in self.pending:
self.pending[key] = []
self.pending[key].append((labels, value))
def reset(self):
'''
When metrics aggregation is done, call Metrics.reset() to apply the
aggregated metric. This will remove all label -> value mappings for a
metric and set the new mapping (from pending). This means daemon-specific
metrics of daemons that no longer exist are removed.
'''
for k, v in self.pending.items():
self.metrics[k].reset(v)
self.pending = {}
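# Staging flow of the aggregation above, with hypothetical values: values
# are appended per scrape, then reset() swaps them in wholesale, dropping
# series for daemons that vanished in the meantime:
#
#   metrics.append('osd_up', 1, ('osd.0',))  # osd.1 no longer reports
#   metrics.reset()                          # osd.1's old series is gone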
def add_metric(self, path, metric):
if path not in self.metrics:
self.metrics[path] = metric
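# Scrape serialization and caching state: collect() output is kept in
# collect_cache for collect_timeout seconds, and the metrics() handler
# takes collect_lock so concurrent scrapes cannot race an in-progress
# collection.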
self.shutdown_event = threading.Event()
self.collect_lock = threading.RLock()
self.collect_time = 0
self.collect_timeout = 5.0
self.collect_cache = None
_global_instance['plugin'] = self
def _setup_static_metrics(self):
metrics = {}
@@ -235,121 +296,24 @@ class Metrics(object):
return metrics
class Metric(object):
def __init__(self, mtype, name, desc, labels=None):
self.mtype = mtype
self.name = name
self.desc = desc
self.labelnames = labels # tuple if present
self.value = {} # indexed by label values
def set(self, value, labelvalues=None):
# labelvalues must be a tuple
labelvalues = labelvalues or ('',)
self.value[labelvalues] = value
def reset(self, values):
self.value = {}
for labelvalues, value in values:
self.value[labelvalues] = value
def str_expfmt(self):
def promethize(path):
''' replace illegal metric name characters '''
result = path.replace('.', '_').replace('+', '_plus').replace('::', '_')
# Hyphens usually turn into underscores, unless they are
# trailing
if result.endswith("-"):
result = result[0:-1] + "_minus"
else:
result = result.replace("-", "_")
return "ceph_{0}".format(result)
def floatstr(value):
''' represent as Go-compatible float '''
if value == float('inf'):
return '+Inf'
if value == float('-inf'):
return '-Inf'
if math.isnan(value):
return 'NaN'
return repr(float(value))
name = promethize(self.name)
expfmt = '''
# HELP {name} {desc}
# TYPE {name} {mtype}'''.format(
name=name,
desc=self.desc,
mtype=self.mtype,
)
for labelvalues, value in self.value.items():
if self.labelnames:
labels = zip(self.labelnames, labelvalues)
labels = ','.join('%s="%s"' % (k, v) for k, v in labels)
else:
labels = ''
if labels:
fmtstr = '\n{name}{{{labels}}} {value}'
else:
fmtstr = '\n{name} {value}'
expfmt += fmtstr.format(
name=name,
labels=labels,
value=floatstr(value),
)
return expfmt
class Module(MgrModule):
COMMANDS = [
{
"cmd": "prometheus self-test",
"desc": "Run a self test on the prometheus module",
"perm": "rw"
},
{
"cmd": "prometheus file_sd_config",
"desc": "Return file_sd compatible prometheus config for mgr cluster",
"perm": "r"
},
]
OPTIONS = [
{'name': 'server_addr'},
{'name': 'server_port'},
]
def __init__(self, *args, **kwargs):
super(Module, self).__init__(*args, **kwargs)
self.metrics = Metrics()
self.schema = OrderedDict()
self.shutdown_event = threading.Event()
_global_instance['plugin'] = self
def get_health(self):
health = json.loads(self.get('health')['json'])
self.metrics.set('health_status',
health_status_to_number(health['status'])
self.metrics['health_status'].set(
health_status_to_number(health['status'])
)
def get_df(self):
# maybe get the to-be-exported metrics from a config?
df = self.get('df')
for stat in DF_CLUSTER:
self.metrics.set('cluster_{}'.format(stat), df['stats'][stat])
self.metrics['cluster_{}'.format(stat)].set(df['stats'][stat])
for pool in df['pools']:
for stat in DF_POOL:
self.metrics.append('pool_{}'.format(stat),
pool['stats'][stat],
(pool['id'],))
self.metrics['pool_{}'.format(stat)].set(
pool['stats'][stat],
(pool['id'],)
)
def get_fs(self):
fs_map = self.get('fs_map')
@@ -358,19 +322,21 @@ class Module(MgrModule):
for fs in fs_map['filesystems']:
# collect fs metadata
data_pools = ",".join([str(pool) for pool in fs['mdsmap']['data_pools']])
self.metrics.append('fs_metadata', 1,
(data_pools,
fs['id'],
fs['mdsmap']['metadata_pool'],
fs['mdsmap']['fs_name']))
self.metrics['fs_metadata'].set(1, (
data_pools,
fs['id'],
fs['mdsmap']['metadata_pool'],
fs['mdsmap']['fs_name']
))
self.log.debug('mdsmap: {}'.format(fs['mdsmap']))
for gid, daemon in fs['mdsmap']['info'].items():
id_ = daemon['name']
host_version = servers.get((id_, 'mds'), ('',''))
self.metrics.append('mds_metadata', 1,
('mds.{}'.format(id_), fs['id'],
host_version[0], daemon['addr'],
daemon['rank'], host_version[1]))
self.metrics['mds_metadata'].set(1, (
'mds.{}'.format(id_), fs['id'],
host_version[0], daemon['addr'],
daemon['rank'], host_version[1]
))
def get_quorum_status(self):
mon_status = json.loads(self.get('mon_status')['json'])
@@ -379,20 +345,22 @@ class Module(MgrModule):
rank = mon['rank']
id_ = mon['name']
host_version = servers.get((id_, 'mon'), ('',''))
self.metrics.append('mon_metadata', 1,
('mon.{}'.format(id_), host_version[0],
mon['public_addr'].split(':')[0], rank,
host_version[1]))
self.metrics['mon_metadata'].set(1, (
'mon.{}'.format(id_), host_version[0],
mon['public_addr'].split(':')[0], rank,
host_version[1]
))
in_quorum = int(rank in mon_status['quorum'])
self.metrics.append('mon_quorum_status', in_quorum,
('mon.{}'.format(id_),))
self.metrics['mon_quorum_status'].set(in_quorum, (
'mon.{}'.format(id_),
))
def get_pg_status(self):
# TODO add per pool status?
pg_status = self.get('pg_status')
# Set the total count of PGs first
self.metrics.set('pg_total', pg_status['num_pgs'])
self.metrics['pg_total'].set(pg_status['num_pgs'])
reported_states = {}
for pg in pg_status['pgs_by_state']:
@@ -402,14 +370,14 @@ class Module(MgrModule):
for state in reported_states:
path = 'pg_{}'.format(state)
try:
self.metrics.set(path, reported_states[state])
self.metrics[path].set(reported_states[state])
except KeyError:
self.log.warn("skipping pg in unknown state {}".format(state))
for state in PG_STATES:
if state not in reported_states:
try:
self.metrics.set('pg_{}'.format(state), 0)
self.metrics['pg_{}'.format(state)].set(0)
except KeyError:
self.log.warn("skipping pg in unknown state {}".format(state))
@@ -419,8 +387,9 @@ class Module(MgrModule):
id_ = osd['osd']
for stat in OSD_STATS:
val = osd['perf_stat'][stat]
self.metrics.append('osd_{}'.format(stat), val,
('osd.{}'.format(id_),))
self.metrics['osd_{}'.format(stat)].set(val, (
'osd.{}'.format(id_),
))
def get_service_list(self):
ret = {}
@@ -435,8 +404,9 @@ class Module(MgrModule):
osd_map = self.get('osd_map')
osd_flags = osd_map['flags'].split(',')
for flag in OSD_FLAGS:
self.metrics.set('osd_flag_{}'.format(flag),
int(flag in osd_flags))
self.metrics['osd_flag_{}'.format(flag)].set(
int(flag in osd_flags)
)
osd_devices = self.get('osd_map_crush')['devices']
servers = self.get_service_list()
@@ -467,7 +437,7 @@ class Module(MgrModule):
host_version = servers.get((str(id_), 'osd'), ('',''))
self.metrics.append('osd_metadata', 1, (
self.metrics['osd_metadata'].set(1, (
'osd.{}'.format(id_),
c_addr,
dev_class,
@@ -478,8 +448,9 @@ class Module(MgrModule):
# collect osd status
for state in OSD_STATUS:
status = osd[state]
self.metrics.append('osd_{}'.format(state), status,
('osd.{}'.format(id_),))
self.metrics['osd_{}'.format(state)].set(status, (
'osd.{}'.format(id_),
))
# collect disk occupation metadata
osd_metadata = self.get_metadata("osd", str(id_))
@@ -496,7 +467,7 @@ class Module(MgrModule):
if osd_dev_node and osd_hostname:
self.log.debug("Got dev for osd {0}: {1}/{2}".format(
id_, osd_hostname, osd_dev_node))
self.metrics.set('disk_occupation', 1, (
self.metrics['disk_occupation'].set(1, (
"osd.{0}".format(id_),
osd_dev_node,
osd_hostname
@@ -507,7 +478,7 @@ class Module(MgrModule):
pool_meta = []
for pool in osd_map['pools']:
self.metrics.append('pool_metadata', 1, (pool['pool'], pool['pool_name']))
self.metrics['pool_metadata'].set(1, (pool['pool'], pool['pool_name']))
# Populate rgw_metadata
for key, value in servers.items():
@@ -515,8 +486,7 @@ class Module(MgrModule):
if service_type != 'rgw':
continue
hostname, version = value
self.metrics.append(
'rgw_metadata',
self.metrics['rgw_metadata'].set(
1,
('{}.{}'.format(service_type, service_id), hostname, version)
)
@@ -525,9 +495,13 @@ class Module(MgrModule):
pg_sum = self.get('pg_summary')['pg_stats_sum']['stat_sum']
for obj in NUM_OBJECTS:
stat = 'num_objects_{}'.format(obj)
self.metrics.set(stat, pg_sum[stat])
self.metrics[stat].set(pg_sum[stat])
def collect(self):
# Clear the metrics before scraping
for k in self.metrics.keys():
self.metrics[k].clear()
self.get_health()
self.get_df()
self.get_fs()
@@ -551,35 +525,40 @@ class Module(MgrModule):
# Represent the long running avgs as sum/count pairs
if counter_info['type'] & self.PERFCOUNTER_LONGRUNAVG:
_path = path + '_sum'
self.metrics.add_metric(_path, Metric(
stattype,
_path,
counter_info['description'] + ' Total',
("ceph_daemon",),
))
self.metrics.append(_path, value, (daemon,))
if _path not in self.metrics:
self.metrics[_path] = Metric(
stattype,
_path,
counter_info['description'] + ' Total',
("ceph_daemon",),
)
self.metrics[_path].set(value, (daemon,))
_path = path + '_count'
self.metrics.add_metric(_path, Metric(
'counter',
_path,
counter_info['description'] + ' Count',
("ceph_daemon",),
))
self.metrics.append(_path, counter_info['count'], (daemon,))
if _path not in self.metrics:
self.metrics[_path] = Metric(
'counter',
_path,
counter_info['description'] + ' Count',
("ceph_daemon",),
)
self.metrics[_path].set(counter_info['count'], (daemon,))
else:
self.metrics.add_metric(path, Metric(
stattype,
path,
counter_info['description'],
("ceph_daemon",),
))
self.metrics.append(path, value, (daemon,))
if path not in self.metrics:
self.metrics[path] = Metric(
stattype,
path,
counter_info['description'],
("ceph_daemon",),
)
self.metrics[path].set(value, (daemon,))
# It is sufficient to reset the pending metrics once per scrape
self.metrics.reset()
# Return the formatted metrics and clear data that is no longer used
_metrics = [m.str_expfmt() for m in self.metrics.values()]
for k in self.metrics.keys():
self.metrics[k].clear()
return self.metrics.metrics
return ''.join(_metrics) + '\n'
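# Per-scrape lifecycle of collect() above, as a sketch: every label/value
# mapping is cleared, repopulated from current cluster state, formatted,
# and cleared again so nothing stale survives to the next scrape:
#
#   for k in self.metrics.keys():
#       self.metrics[k].clear()          # drop stale label sets
#   self.get_health(); self.get_df()     # ... repopulate
#   _metrics = [m.str_expfmt() for m in self.metrics.values()]
#   return ''.join(_metrics) + '\n'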
def get_file_sd_config(self):
servers = self.list_servers()
@@ -637,12 +616,6 @@ class Module(MgrModule):
cherrypy.request.path = ''
return self
def format_metrics(self, metrics):
formatted = ''
for m in metrics.values():
formatted += m.str_expfmt()
return formatted + '\n'
@cherrypy.expose
def index(self):
return '''<!DOCTYPE html>
@@ -656,14 +629,32 @@ class Module(MgrModule):
@cherrypy.expose
def metrics(self):
if global_instance().have_mon_connection():
metrics = global_instance().collect()
instance = global_instance()
# Lock the function execution
try:
instance.collect_lock.acquire()
return self._metrics(instance)
finally:
instance.collect_lock.release()
def _metrics(self, instance):
# Return cached data if available and collected before the cache times out
if instance.collect_cache and time.time() - instance.collect_time < instance.collect_timeout:
cherrypy.response.headers['Content-Type'] = 'text/plain'
if metrics:
return self.format_metrics(metrics)
return instance.collect_cache
if instance.have_mon_connection():
instance.collect_cache = None
instance.collect_time = time.time()
instance.collect_cache = instance.collect()
cherrypy.response.headers['Content-Type'] = 'text/plain'
return instance.collect_cache
else:
raise cherrypy.HTTPError(503, 'No MON connection')
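# Cache behaviour of _metrics() above, as an illustration: scrapes arriving
# within collect_timeout seconds of the last collection are served the
# cached payload; the first scrape after the timeout re-runs collect() and
# refreshes the cache.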
# Make the cache timeout for collecting configurable
self.collect_timeout = self.get_localized_config('scrape_interval', 5.0)
server_addr = self.get_localized_config('server_addr', DEFAULT_ADDR)
server_port = self.get_localized_config('server_port', DEFAULT_PORT)
self.log.info(