Merge pull request #17735 from jcsp/wip-mgr-perf-interface

mgr: common interface for TSDB modules

Reviewed-by: My Do <mhdo@umich.edu>
Reviewed-by: Jan Fajerski <jfajerski@suse.com>
Reviewed-by: John Spray <john.spray@redhat.com>
John Spray 2017-10-02 11:12:35 +01:00 committed by GitHub
commit 47bfe6cf17
23 changed files with 854 additions and 182 deletions


@ -32,4 +32,5 @@ sensible.
Zabbix plugin <zabbix>
Prometheus plugin <prometheus>
Writing plugins <plugins>
Influx plugin <influx>

doc/mgr/influx.rst Normal file

@ -0,0 +1,162 @@
=============
Influx Plugin
=============
The influx plugin continuously collects and sends time series data to an
InfluxDB database.
The influx plugin was introduced in the 13.x *Mimic* release.
--------
Enabling
--------
To enable the module, use the following command:
::
ceph mgr module enable influx
If you wish to subsequently disable the module, you can use the equivalent
*disable* command:
::
ceph mgr module disable influx
-------------
Configuration
-------------
For the influx module to send statistics to an InfluxDB server, it
is necessary to configure the server's address and some authentication
credentials.
Set configuration values using the following command:
::
ceph config-key set mgr/influx/<key> <value>
The most important settings are ``hostname``, ``username`` and ``password``.
For example, a typical configuration might look like this:
::
ceph config-key set mgr/influx/hostname influx.mydomain.com
ceph config-key set mgr/influx/username admin123
ceph config-key set mgr/influx/password p4ssw0rd
Additional optional configuration settings are:
:interval: Time between reports to InfluxDB. Default 5 seconds.
:database: InfluxDB database name. Default "ceph".
:port: InfluxDB server port. Default 8086.
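For example, to report every 30 seconds to a database named "telemetry" on a
non-default port (the values here are purely illustrative):
::
ceph config-key set mgr/influx/interval 30
ceph config-key set mgr/influx/database telemetry
ceph config-key set mgr/influx/port 8087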
---------
Debugging
---------
By default, a few debugging statements, as well as error statements, are printed to the log files. Users can add more if necessary.
To make use of the debugging option in the module:
- Add the following to the ceph.conf file::
[mgr]
debug_mgr = 20
- Use this command ``ceph tell mgr.<mymonitor> influx self-test``.
- Check the log files. Users may find it easier to filter the log files using *mgr[influx]*.
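If the default log location is in use, the module's output can be isolated with
a command along these lines (the log path shown is the usual default and may
differ on your system):
::
grep "mgr\[influx\]" /var/log/ceph/ceph-mgr.*.log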
--------------------
Interesting counters
--------------------
The following tables describe a subset of the values output by
this module.
^^^^^
Pools
^^^^^
+---------------+-----------------------------------------------------+
|Counter | Description |
+===============+=====================================================+
|bytes_used | Bytes used in the pool not including copies |
+---------------+-----------------------------------------------------+
|max_avail | Max available number of bytes in the pool |
+---------------+-----------------------------------------------------+
|objects | Number of objects in the pool |
+---------------+-----------------------------------------------------+
|wr_bytes | Number of bytes written in the pool |
+---------------+-----------------------------------------------------+
|dirty | Number of bytes dirty in the pool |
+---------------+-----------------------------------------------------+
|rd_bytes | Number of bytes read in the pool |
+---------------+-----------------------------------------------------+
|raw_bytes_used | Bytes used in pool including copies made |
+---------------+-----------------------------------------------------+
^^^^
OSDs
^^^^
+------------+------------------------------------+
|Counter | Description |
+============+====================================+
|op_w | Client write operations |
+------------+------------------------------------+
|op_in_bytes | Client operations total write size |
+------------+------------------------------------+
|op_r | Client read operations |
+------------+------------------------------------+
|op_out_bytes| Client operations total read size |
+------------+------------------------------------+
+------------------------+--------------------------------------------------------------------------+
|Counter | Description |
+========================+==========================================================================+
|op_wip | Replication operations currently being processed (primary) |
+------------------------+--------------------------------------------------------------------------+
|op_latency | Latency of client operations (including queue time) |
+------------------------+--------------------------------------------------------------------------+
|op_process_latency | Latency of client operations (excluding queue time) |
+------------------------+--------------------------------------------------------------------------+
|op_prepare_latency | Latency of client operations (excluding queue time and wait for finished)|
+------------------------+--------------------------------------------------------------------------+
|op_r_latency | Latency of read operation (including queue time) |
+------------------------+--------------------------------------------------------------------------+
|op_r_process_latency | Latency of read operation (excluding queue time) |
+------------------------+--------------------------------------------------------------------------+
|op_w_in_bytes | Client data written |
+------------------------+--------------------------------------------------------------------------+
|op_w_latency | Latency of write operation (including queue time) |
+------------------------+--------------------------------------------------------------------------+
|op_w_process_latency | Latency of write operation (excluding queue time) |
+------------------------+--------------------------------------------------------------------------+
|op_w_prepare_latency | Latency of write operations (excluding queue time and wait for finished) |
+------------------------+--------------------------------------------------------------------------+
|op_rw | Client read-modify-write operations |
+------------------------+--------------------------------------------------------------------------+
|op_rw_in_bytes | Client read-modify-write operations write in |
+------------------------+--------------------------------------------------------------------------+
|op_rw_out_bytes | Client read-modify-write operations read out |
+------------------------+--------------------------------------------------------------------------+
|op_rw_latency | Latency of read-modify-write operation (including queue time) |
+------------------------+--------------------------------------------------------------------------+
|op_rw_process_latency | Latency of read-modify-write operation (excluding queue time) |
+------------------------+--------------------------------------------------------------------------+
|op_rw_prepare_latency | Latency of read-modify-write operations (excluding queue time |
| | and wait for finished) |
+------------------------+--------------------------------------------------------------------------+
|op_before_queue_op_lat | Latency of IO before calling queue (before really queue into ShardedOpWq)|
+------------------------+--------------------------------------------------------------------------+
|op_before_dequeue_op_lat| Latency of IO before calling dequeue_op(already dequeued and get PG lock)|
+------------------------+--------------------------------------------------------------------------+
Latency counters are measured in microseconds unless otherwise specified in the description.


@ -0,0 +1,16 @@
tasks:
- install:
- ceph:
# tests may leave mgrs broken, so don't try and call into them
# to invoke e.g. pg dump during teardown.
wait-for-scrub: false
log-whitelist:
- overall HEALTH_
- \(MGR_DOWN\)
- \(PG_
- replacing it with standby
- No standby daemons available
- cephfs_test_runner:
modules:
- tasks.mgr.test_module_selftest


@ -0,0 +1,38 @@
from tasks.mgr.mgr_test_case import MgrTestCase
class TestModuleSelftest(MgrTestCase):
"""
That modules with a self-test command can be loaded and execute it
without errors.
This is not a substitute for really testing the modules, but it
is quick and is designed to catch regressions that could occur
if data structures change in a way that breaks how the modules
touch them.
"""
MGRS_REQUIRED = 1
def _selftest_plugin(self, plugin_name):
initial_gid = self.mgr_cluster.get_mgr_map()['active_gid']
self.mgr_cluster.mon_manager.raw_cluster_cmd("mgr", "module", "enable",
plugin_name)
# Wait for the module to load
def has_restarted():
map = self.mgr_cluster.get_mgr_map()
return map['active_gid'] != initial_gid and map['available']
self.wait_until_true(has_restarted, timeout=30)
# Execute the module's self-test routine
self.mgr_cluster.mon_manager.raw_cluster_cmd(plugin_name, "self-test")
def test_zabbix(self):
self._selftest_plugin("zabbix")
def test_prometheus(self):
self._selftest_plugin("prometheus")
def test_influx(self):
self._selftest_plugin("influx")
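Any future module that registers a ``<name> self-test`` command can be wired in
the same way; a minimal, hypothetical sketch (the module name below is a
placeholder, not a real module):

    def test_mytsdbmodule(self):
        # Hypothetical: anything exposing a "<name> self-test" command
        # fits the same pattern as zabbix/prometheus/influx above.
        self._selftest_plugin("mytsdbmodule")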


@ -48,7 +48,7 @@ PRIO_USEFUL = 5
PRIO_UNINTERESTING = 2
PRIO_DEBUGONLY = 0
PRIO_DEFAULT = PRIO_USEFUL
PRIO_DEFAULT = PRIO_INTERESTING
# Make life easier on developers:
# If our parent dir contains CMakeCache.txt and bin/init-ceph,


@ -53,7 +53,7 @@ void PerfCountersCollection::add(class PerfCounters *l)
path += ".";
path += data.name;
by_path[path] = &data;
by_path[path] = {&data, l};
}
}
@ -396,12 +396,7 @@ void PerfCounters::dump_formatted_generic(Formatter *f, bool schema,
} else {
f->dump_string("nick", "");
}
if (d->prio) {
int p = std::max(std::min(d->prio + prio_adjust,
(int)PerfCountersBuilder::PRIO_CRITICAL),
0);
f->dump_int("priority", p);
}
f->dump_int("priority", get_adjusted_priority(d->prio));
f->close_section();
} else {
if (d->type & PERFCOUNTER_LONGRUNAVG) {
@ -549,7 +544,7 @@ void PerfCountersBuilder::add_impl(
assert(strlen(nick) <= 4);
}
data.nick = nick;
data.prio = prio;
data.prio = prio ? prio : prio_default;
data.type = (enum perfcounter_type_d)ty;
data.histogram = std::move(histogram);
}


@ -42,6 +42,80 @@ enum perfcounter_type_d : uint8_t
};
/* Class for constructing a PerfCounters object.
*
* This class performs some validation that the parameters we have supplied are
* correct in create_perf_counters().
*
* In the future, we will probably get rid of the first/last arguments, since
* PerfCountersBuilder can deduce them itself.
*/
class PerfCountersBuilder
{
public:
PerfCountersBuilder(CephContext *cct, const std::string &name,
int first, int last);
~PerfCountersBuilder();
// prio values: higher is better, and higher values get included in
// 'ceph daemonperf' (and similar) results.
// Use of priorities enables us to add large numbers of counters
// internally without necessarily overwhelming consumers.
enum {
PRIO_CRITICAL = 10,
// 'interesting' is the default threshold for `daemonperf` output
PRIO_INTERESTING = 8,
// `useful` is the default threshold for transmission to ceph-mgr
// and inclusion in prometheus/influxdb plugin output
PRIO_USEFUL = 5,
PRIO_UNINTERESTING = 2,
PRIO_DEBUGONLY = 0,
};
void add_u64(int key, const char *name,
const char *description=NULL, const char *nick = NULL,
int prio=0);
void add_u64_counter(int key, const char *name,
const char *description=NULL,
const char *nick = NULL,
int prio=0);
void add_u64_avg(int key, const char *name,
const char *description=NULL,
const char *nick = NULL,
int prio=0);
void add_time(int key, const char *name,
const char *description=NULL,
const char *nick = NULL,
int prio=0);
void add_time_avg(int key, const char *name,
const char *description=NULL,
const char *nick = NULL,
int prio=0);
void add_u64_counter_histogram(
int key, const char* name,
PerfHistogramCommon::axis_config_d x_axis_config,
PerfHistogramCommon::axis_config_d y_axis_config,
const char *description=NULL,
const char* nick = NULL,
int prio=0);
void set_prio_default(int prio_)
{
prio_default = prio_;
}
PerfCounters* create_perf_counters();
private:
PerfCountersBuilder(const PerfCountersBuilder &rhs);
PerfCountersBuilder& operator=(const PerfCountersBuilder &rhs);
void add_impl(int idx, const char *name,
const char *description, const char *nick, int prio, int ty,
unique_ptr<PerfHistogram<>> histogram = nullptr);
PerfCounters *m_perf_counters;
int prio_default = 0;
};
/*
* A PerfCounters object is usually associated with a single subsystem.
* It contains counters which we modify to track performance and throughput
@ -96,7 +170,7 @@ public:
const char *name;
const char *description;
const char *nick;
int prio = 0;
uint8_t prio = 0;
enum perfcounter_type_d type;
std::atomic<uint64_t> u64 = { 0 };
std::atomic<uint64_t> avgcount = { 0 };
@ -179,6 +253,12 @@ public:
prio_adjust = p;
}
int get_adjusted_priority(int p) const {
return std::max(std::min(p + prio_adjust,
(int)PerfCountersBuilder::PRIO_CRITICAL),
0);
}
private:
PerfCounters(CephContext *cct, const std::string &name,
int lower_bound, int upper_bound);
@ -240,8 +320,17 @@ public:
dump_formatted_generic(f, schema, true, logger, counter);
}
// A reference to a perf_counter_data_any_d, with an accompanying
// pointer to the enclosing PerfCounters, in order that the consumer
// can see the prio_adjust
class PerfCounterRef
{
public:
PerfCounters::perf_counter_data_any_d *data;
PerfCounters *perf_counters;
};
typedef std::map<std::string,
PerfCounters::perf_counter_data_any_d *> CounterMap;
PerfCounterRef> CounterMap;
void with_counters(std::function<void(const CounterMap &)>) const;
@ -257,72 +346,12 @@ private:
perf_counters_set_t m_loggers;
std::map<std::string, PerfCounters::perf_counter_data_any_d *> by_path;
CounterMap by_path;
friend class PerfCountersCollectionTest;
};
/* Class for constructing a PerfCounters object.
*
* This class performs some validation that the parameters we have supplied are
* correct in create_perf_counters().
*
* In the future, we will probably get rid of the first/last arguments, since
* PerfCountersBuilder can deduce them itself.
*/
class PerfCountersBuilder
{
public:
PerfCountersBuilder(CephContext *cct, const std::string &name,
int first, int last);
~PerfCountersBuilder();
// prio values: higher is better, and higher values get included in
// 'ceph daemonperf' (and similar) results.
enum {
PRIO_CRITICAL = 10,
PRIO_INTERESTING = 8,
PRIO_USEFUL = 5,
PRIO_UNINTERESTING = 2,
PRIO_DEBUGONLY = 0,
};
void add_u64(int key, const char *name,
const char *description=NULL, const char *nick = NULL,
int prio=0);
void add_u64_counter(int key, const char *name,
const char *description=NULL,
const char *nick = NULL,
int prio=0);
void add_u64_avg(int key, const char *name,
const char *description=NULL,
const char *nick = NULL,
int prio=0);
void add_time(int key, const char *name,
const char *description=NULL,
const char *nick = NULL,
int prio=0);
void add_time_avg(int key, const char *name,
const char *description=NULL,
const char *nick = NULL,
int prio=0);
void add_u64_counter_histogram(
int key, const char* name,
PerfHistogramCommon::axis_config_d x_axis_config,
PerfHistogramCommon::axis_config_d y_axis_config,
const char *description=NULL,
const char* nick = NULL,
int prio=0);
PerfCounters* create_perf_counters();
private:
PerfCountersBuilder(const PerfCountersBuilder &rhs);
PerfCountersBuilder& operator=(const PerfCountersBuilder &rhs);
void add_impl(int idx, const char *name,
const char *description, const char *nick, int prio, int ty,
unique_ptr<PerfHistogram<>> histogram = nullptr);
PerfCounters *m_perf_counters;
};
class PerfCountersDeleter {
CephContext* cct;


@ -12,6 +12,7 @@
#include <boost/scoped_ptr.hpp>
#include "include/encoding.h"
#include "common/Formatter.h"
#include "common/perf_counters.h"
using std::string;
/**
@ -350,6 +351,15 @@ public:
virtual void get_statistics(Formatter *f) {
return;
}
/**
* Return your perf counters if you have any. Subclasses are not
* required to implement this, and callers must respect a null return
* value.
*/
virtual PerfCounters *get_perf_counters() {
return nullptr;
}
protected:
/// List of matching prefixes and merge operators
std::vector<std::pair<std::string,


@ -184,6 +184,11 @@ public:
void close() override;
PerfCounters *get_perf_counters() override
{
return logger;
}
class LevelDBTransactionImpl : public KeyValueDB::TransactionImpl {
public:
leveldb::WriteBatch bat;


@ -159,6 +159,11 @@ public:
void split_stats(const std::string &s, char delim, std::vector<std::string> &elems);
void get_statistics(Formatter *f) override;
PerfCounters *get_perf_counters() override
{
return logger;
}
struct RocksWBHandler: public rocksdb::WriteBatch::Handler {
std::string seen ;
int num_seen = 0;


@ -29,27 +29,36 @@ public:
std::string nick;
enum perfcounter_type_d type;
// For older clients that did not send priority, pretend everything
// is "useful" so that mgr plugins filtering on prio will get some
// data (albeit probably more than they wanted)
uint8_t priority = PerfCountersBuilder::PRIO_USEFUL;
void encode(bufferlist &bl) const
{
// TODO: decide whether to drop the per-type
// encoding here, we could rely on the MgrReport
// versioning instead.
ENCODE_START(1, 1, bl);
ENCODE_START(2, 1, bl);
::encode(path, bl);
::encode(description, bl);
::encode(nick, bl);
static_assert(sizeof(type) == 1, "perfcounter_type_d must be one byte");
::encode((uint8_t)type, bl);
::encode(priority, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::iterator &p)
{
DECODE_START(1, p);
DECODE_START(2, p);
::decode(path, p);
::decode(description, p);
::decode(nick, p);
::decode((uint8_t&)type, p);
if (struct_v >= 2) {
::decode(priority, p);
}
DECODE_FINISH(p);
}
};


@ -228,27 +228,43 @@ void MgrClient::send_report()
pcc->with_counters([this, report](
const PerfCountersCollection::CounterMap &by_path)
{
// Helper for checking whether a counter should be included
auto include_counter = [this](
const PerfCounters::perf_counter_data_any_d &ctr)
const PerfCounters::perf_counter_data_any_d &ctr,
const PerfCounters &perf_counters)
{
return ctr.prio >= (int)stats_threshold;
return perf_counters.get_adjusted_priority(ctr.prio) >= (int)stats_threshold;
};
// Helper for cases where we want to forget a counter
auto undeclare = [report, this](const std::string &path)
{
report->undeclare_types.push_back(path);
ldout(cct,20) << " undeclare " << path << dendl;
session->declared.erase(path);
};
ENCODE_START(1, 1, report->packed);
// Find counters that no longer exist, and undeclare them
for (auto p = session->declared.begin(); p != session->declared.end(); ) {
if (by_path.count(*p) == 0 || !include_counter(*(by_path.at(*p)))) {
report->undeclare_types.push_back(*p);
ldout(cct,20) << __func__ << " undeclare " << *p << dendl;
p = session->declared.erase(p);
} else {
++p;
const auto &path = *(p++);
if (by_path.count(path) == 0) {
undeclare(path);
}
}
for (const auto &i : by_path) {
auto& path = i.first;
auto& data = *(i.second);
auto& data = *(i.second.data);
auto& perf_counters = *(i.second.perf_counters);
if (!include_counter(data)) {
// Find counters that still exist, but are no longer permitted by
// stats_threshold
if (!include_counter(data, perf_counters)) {
if (session->declared.count(path)) {
undeclare(path);
}
continue;
}
@ -263,6 +279,7 @@ void MgrClient::send_report()
type.nick = data.nick;
}
type.type = data.type;
type.priority = perf_counters.get_adjusted_priority(data.prio);
report->declare_types.push_back(std::move(type));
session->declared.insert(path);
}


@ -158,8 +158,7 @@ void MgrStandby::send_beacon()
bool available = active_mgr != nullptr && active_mgr->is_initialized();
auto addr = available ? active_mgr->get_server_addr() : entity_addr_t();
dout(10) << "sending beacon as gid " << monc.get_global_id()
<< " modules " << modules << dendl;
dout(10) << "sending beacon as gid " << monc.get_global_id() << dendl;
map<string,string> metadata;
metadata["addr"] = monc.get_my_addr().ip_only_to_str();


@ -759,8 +759,6 @@ PyObject* PyModules::get_perf_schema_python(
}
PyFormatter f;
f.open_object_section("perf_schema");
if (!daemons.empty()) {
for (auto statepair : daemons) {
auto key = statepair.first;
@ -771,16 +769,16 @@ PyObject* PyModules::get_perf_schema_python(
f.open_object_section(daemon_name.str().c_str());
Mutex::Locker l(state->lock);
for (const auto &ctr_inst_iter : state->perf_counters.instances) {
const auto &typestr = ctr_inst_iter.first;
f.open_object_section(typestr.c_str());
auto type = state->perf_counters.types[typestr];
for (auto ctr_inst_iter : state->perf_counters.instances) {
const auto &counter_name = ctr_inst_iter.first;
f.open_object_section(counter_name.c_str());
auto type = state->perf_counters.types[counter_name];
f.dump_string("description", type.description);
if (!type.nick.empty()) {
f.dump_string("nick", type.nick);
}
f.dump_unsigned("type", type.type);
f.dump_unsigned("priority", type.priority);
f.close_section();
}
f.close_section();
@ -789,7 +787,6 @@ PyObject* PyModules::get_perf_schema_python(
dout(4) << __func__ << ": No daemon state found for "
<< svc_type << "." << svc_id << ")" << dendl;
}
f.close_section();
return f.get();
}


@ -552,14 +552,22 @@ int Monitor::preinit()
assert(!logger);
{
PerfCountersBuilder pcb(g_ceph_context, "mon", l_mon_first, l_mon_last);
pcb.add_u64(l_mon_num_sessions, "num_sessions", "Open sessions", "sess");
pcb.add_u64_counter(l_mon_session_add, "session_add", "Created sessions", "sadd");
pcb.add_u64_counter(l_mon_session_rm, "session_rm", "Removed sessions", "srm");
pcb.add_u64_counter(l_mon_session_trim, "session_trim", "Trimmed sessions");
pcb.add_u64_counter(l_mon_num_elections, "num_elections", "Elections participated in");
pcb.add_u64_counter(l_mon_election_call, "election_call", "Elections started");
pcb.add_u64_counter(l_mon_election_win, "election_win", "Elections won");
pcb.add_u64_counter(l_mon_election_lose, "election_lose", "Elections lost");
pcb.add_u64(l_mon_num_sessions, "num_sessions", "Open sessions", "sess",
PerfCountersBuilder::PRIO_USEFUL);
pcb.add_u64_counter(l_mon_session_add, "session_add", "Created sessions",
"sadd", PerfCountersBuilder::PRIO_INTERESTING);
pcb.add_u64_counter(l_mon_session_rm, "session_rm", "Removed sessions",
"srm", PerfCountersBuilder::PRIO_INTERESTING);
pcb.add_u64_counter(l_mon_session_trim, "session_trim", "Trimmed sessions",
"strm", PerfCountersBuilder::PRIO_USEFUL);
pcb.add_u64_counter(l_mon_num_elections, "num_elections", "Elections participated in",
"ecnt", PerfCountersBuilder::PRIO_USEFUL);
pcb.add_u64_counter(l_mon_election_call, "election_call", "Elections started",
"estt", PerfCountersBuilder::PRIO_INTERESTING);
pcb.add_u64_counter(l_mon_election_win, "election_win", "Elections won",
"ewon", PerfCountersBuilder::PRIO_INTERESTING);
pcb.add_u64_counter(l_mon_election_lose, "election_lose", "Elections lost",
"elst", PerfCountersBuilder::PRIO_INTERESTING);
logger = pcb.create_perf_counters();
cct->get_perfcounters_collection()->add(logger);
}


@ -624,6 +624,8 @@ class MonitorDBStore
db->init(g_conf->mon_rocksdb_options);
else
db->init();
}
int open(ostream &out) {
@ -640,6 +642,16 @@ class MonitorDBStore
r = db->open(out);
if (r < 0)
return r;
// Monitors are few in number, so the resource cost of exposing
// very detailed stats is low: ramp up the priority of all the
// KV store's perf counters. Do this after open, because backend may
// not have constructed PerfCounters earlier.
if (db->get_perf_counters()) {
db->get_perf_counters()->set_prio_adjust(
PerfCountersBuilder::PRIO_USEFUL - PerfCountersBuilder::PRIO_DEBUGONLY);
}
io_work.start();
is_open = true;
return 0;


@ -86,6 +86,11 @@ void Paxos::init()
void Paxos::init_logger()
{
PerfCountersBuilder pcb(g_ceph_context, "paxos", l_paxos_first, l_paxos_last);
// Because monitors are so few in number, the resource cost of capturing
// almost all their perf counters at USEFUL is trivial.
pcb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
pcb.add_u64_counter(l_paxos_start_leader, "start_leader", "Starts in leader role");
pcb.add_u64_counter(l_paxos_start_peon, "start_peon", "Starts in peon role");
pcb.add_u64_counter(l_paxos_restart, "restart", "Restarts");


@ -2942,6 +2942,9 @@ void OSD::create_logger()
};
// All the basic OSD operation stats are to be considered useful
osd_plb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
osd_plb.add_u64(
l_osd_op_wip, "op_wip",
"Replication operations currently being processed (primary)");
@ -3029,6 +3032,10 @@ void OSD::create_logger()
l_osd_op_rw_prepare_lat, "op_rw_prepare_latency",
"Latency of read-modify-write operations (excluding queue time and wait for finished)");
// Now we move on to some more obscure stats, revert to assuming things
// are low priority unless otherwise specified.
osd_plb.set_prio_default(PerfCountersBuilder::PRIO_DEBUGONLY);
osd_plb.add_time_avg(l_osd_op_before_queue_op_lat, "op_before_queue_op_lat",
"Latency of IO before calling queue(before really queue into ShardedOpWq)"); // client io before queue op_wq latency
osd_plb.add_time_avg(l_osd_op_before_dequeue_op_lat, "op_before_dequeue_op_lat",


@ -0,0 +1 @@
from module import * # NOQA


@ -0,0 +1,158 @@
from datetime import datetime
from threading import Event
import json
import errno
from mgr_module import MgrModule
from mgr_module import PERFCOUNTER_HISTOGRAM
try:
from influxdb import InfluxDBClient
except ImportError:
InfluxDBClient = None
class Module(MgrModule):
COMMANDS = [
{
"cmd": "influx self-test",
"desc": "debug the module",
"perm": "rw"
},
]
def __init__(self, *args, **kwargs):
super(Module, self).__init__(*args, **kwargs)
self.event = Event()
self.run = True
def get_latest(self, daemon_type, daemon_name, stat):
data = self.get_counter(daemon_type, daemon_name, stat)[stat]
if data:
return data[-1][1]
else:
return 0
def get_df_stats(self):
df = self.get("df")
data = []
df_types = [
'bytes_used',
'dirty',
'rd_bytes',
'raw_bytes_used',
'wr_bytes',
'objects',
'max_avail'
]
for df_type in df_types:
for pool in df['pools']:
point = {
"measurement": "ceph_pool_stats",
"tags": {
"pool_name" : pool['name'],
"pool_id" : pool['id'],
"type_instance" : df_type,
"mgr_id" : self.get_mgr_id(),
},
"time" : datetime.utcnow().isoformat() + 'Z',
"fields": {
"value" : pool['stats'][df_type],
}
}
data.append(point)
return data
def get_daemon_stats(self):
data = []
for daemon, counters in self.get_all_perf_counters().iteritems():
svc_type, svc_id = daemon.split(".")
metadata = self.get_metadata(svc_type, svc_id)
for path, counter_info in counters.items():
if counter_info['type'] & PERFCOUNTER_HISTOGRAM:
continue
value = counter_info['value']
data.append({
"measurement": "ceph_osd_stats",
"tags": {
"ceph_daemon": daemon,
"type_instance": path,
"host": metadata['hostname']
},
"time": datetime.utcnow().isoformat() + 'Z',
"fields": {
"value": value
}
})
return data
def send_to_influx(self):
host = self.get_config("hostname")
if not host:
self.log.error("No InfluxDB server configured, please set "
"the `hostname` configuration key.")
return
port = int(self.get_config("port", default="8086"))
database = self.get_config("database", default="ceph")
# If influx server has authentication turned off then
# missing username/password is valid.
username = self.get_config("username", default="")
password = self.get_config("password", default="")
client = InfluxDBClient(host, port, username, password, database)
databases_avail = client.get_list_database()
if database not in databases_avail:
self.log.info("Creating database '{0}'".format(database))
client.create_database(database)
client.write_points(self.get_df_stats(), 'ms')
client.write_points(self.get_daemon_stats(), 'ms')
def shutdown(self):
self.log.info('Stopping influx module')
self.run = False
self.event.set()
def handle_command(self, cmd):
if cmd['prefix'] == 'influx self-test':
daemon_stats = self.get_daemon_stats()
assert len(daemon_stats)
df_stats = self.get_df_stats()
result = {
'daemon_stats': daemon_stats,
'df_stats': df_stats
}
return 0, json.dumps(result, indent=2), 'Self-test OK'
else:
return (-errno.EINVAL, '',
"Command not found '{0}'".format(cmd['prefix']))
def serve(self):
if InfluxDBClient is None:
self.log.error("Cannot transmit statistics: influxdb python "
"module not found. Did you install it?")
return
self.log.info('Starting influx module')
self.run = True
while self.run:
self.send_to_influx()
self.log.debug("Running interval loop")
interval = self.get_config("interval")
if interval is None:
interval = 5
else:
interval = int(interval)
self.log.debug("sleeping for %d seconds", interval)
self.event.wait(interval)
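As a quick, illustrative way to confirm that data is arriving (not part of the
module), the same client library can query a recent point back out. The
hostname and credentials below reuse the example values from the documentation
section above and should be replaced with real ones:

from influxdb import InfluxDBClient

# Example values from the documentation above; substitute your own.
client = InfluxDBClient('influx.mydomain.com', 8086, 'admin123', 'p4ssw0rd', 'ceph')

# The plugin tags pool statistics with type_instance and pool_name, so the
# most recent bytes_used per pool can be pulled back like this:
result = client.query(
    "SELECT last(value) FROM ceph_pool_stats "
    "WHERE type_instance = 'bytes_used' GROUP BY pool_name")
for (measurement, tags), points in result.items():
    print("{0} {1} {2}".format(measurement, tags, list(points)))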


@ -6,6 +6,25 @@ import ceph_crushmap #noqa
import json
import logging
import threading
from collections import defaultdict
# Priority definitions for perf counters
PRIO_CRITICAL = 10
PRIO_INTERESTING = 8
PRIO_USEFUL = 5
PRIO_UNINTERESTING = 2
PRIO_DEBUGONLY = 0
# counter value types
PERFCOUNTER_TIME = 1
PERFCOUNTER_U64 = 2
# counter types
PERFCOUNTER_LONGRUNAVG = 4
PERFCOUNTER_COUNTER = 8
PERFCOUNTER_HISTOGRAM = 0x10
PERFCOUNTER_TYPE_MASK = ~2
class CommandResult(object):
@ -145,6 +164,19 @@ class MgrModule(object):
self._version = ceph_state.get_version()
self._perf_schema_cache = None
def update_perf_schema(self, daemon_type, daemon_name):
"""
For plugins that use get_all_perf_counters, call this when
receiving a notification of type 'perf_schema_update', to
prompt MgrModule to update its cache of counter schemas.
:param daemon_type:
:param daemon_name:
:return:
"""
@property
def log(self):
return self._logger
@ -395,3 +427,60 @@ class MgrModule(object):
:return: OSDMap
"""
return OSDMap(ceph_state.get_osdmap())
def get_all_perf_counters(self, prio_limit=PRIO_USEFUL):
"""
Return the perf counters currently known to this ceph-mgr
instance, filtered by priority equal to or greater than `prio_limit`.
The result is a map of string to dict, associating services
(like "osd.123") with their counters. The counter
dict for each service maps counter paths to a counter
info structure, which is the information from
the schema, plus an additional "value" member with the latest
value.
"""
result = defaultdict(dict)
# TODO: improve C++->Python interface to return just
# the latest if that's all we want.
def get_latest(daemon_type, daemon_name, counter):
data = self.get_counter(daemon_type, daemon_name, counter)[counter]
if data:
return data[-1][1]
else:
return 0
for server in self.list_servers():
for service in server['services']:
if service['type'] not in ("mds", "osd", "mon"):
continue
schema = self.get_perf_schema(service['type'], service['id'])
if not schema:
self.log.warn("No perf counter schema for {0}.{1}".format(
service['type'], service['id']
))
continue
# Value is returned in a potentially-multi-service format,
# get just the service we're asking about
svc_full_name = "{0}.{1}".format(service['type'], service['id'])
schema = schema[svc_full_name]
# Populate latest values
for counter_path, counter_schema in schema.items():
# self.log.debug("{0}: {1}".format(
# counter_path, json.dumps(counter_schema)
# ))
if counter_schema['priority'] < prio_limit:
continue
counter_info = counter_schema
counter_info['value'] = get_latest(service['type'], service['id'], counter_path)
result[svc_full_name][counter_path] = counter_info
self.log.debug("returning counters for {0} services".format(len(result)))
return result
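To make the consumer side of this interface concrete, here is a minimal sketch
of a hypothetical module built on it, following the docstrings above; the class
name and the logging it does are illustrative only, not part of this change:

from mgr_module import MgrModule, PRIO_INTERESTING


class SketchModule(MgrModule):
    """Illustrative only: log high-priority counters for every daemon."""

    def notify(self, notify_type, notify_id):
        # As described in the update_perf_schema() docstring, plugins that
        # consume get_all_perf_counters() refresh their schema cache here.
        if notify_type == 'perf_schema_update':
            daemon_type, daemon_id = notify_id.split('.')
            self.update_perf_schema(daemon_type, daemon_id)

    def serve(self):
        # get_all_perf_counters() returns e.g.
        # {"osd.0": {"osd.op_r": {<schema fields>, "value": 123}, ...}, ...},
        # already filtered by counter priority.
        all_counters = self.get_all_perf_counters(prio_limit=PRIO_INTERESTING)
        for svc_name, counters in all_counters.items():
            for path, info in counters.items():
                self.log.info("%s %s = %s", svc_name, path, info['value'])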


@ -1,7 +1,8 @@
import cherrypy
import json
import errno
import math
import os
import time
from collections import OrderedDict
from mgr_module import MgrModule
@ -59,6 +60,30 @@ def stattype_to_str(stattype):
return ''
def health_status_to_number(status):
if status == 'HEALTH_OK':
return 0
elif status == 'HEALTH_WARN':
return 1
elif status == 'HEALTH_ERR':
return 2
PG_STATES = ['creating', 'active', 'clean', 'down', 'scrubbing', 'degraded',
'inconsistent', 'peering', 'repair', 'recovering', 'forced-recovery',
'backfill', 'forced-backfill', 'wait-backfill', 'backfill-toofull',
'incomplete', 'stale', 'remapped', 'undersized', 'peered']
DF_CLUSTER = ['total_bytes', 'total_used_bytes', 'total_objects']
DF_POOL = ['max_avail', 'bytes_used', 'raw_bytes_used', 'objects', 'dirty',
'quota_bytes', 'quota_objects', 'rd', 'rd_bytes', 'wr', 'wr_bytes']
OSD_METADATA = ('cluster_addr', 'device_class', 'id', 'public_addr', 'weight')
POOL_METADATA = ('pool_id', 'name')
class Metric(object):
def __init__(self, mtype, name, desc, labels=None):
self.mtype = mtype
@ -76,7 +101,7 @@ class Metric(object):
def promethize(path):
''' replace illegal metric name characters '''
result = path.replace('.', '_').replace('+', '_plus')
result = path.replace('.', '_').replace('+', '_plus').replace('::', '_')
# Hyphens usually turn into underscores, unless they are
# trailing
@ -85,7 +110,7 @@ class Metric(object):
else:
result = result.replace("-", "_")
return result
return "ceph_{0}".format(result)
def floatstr(value):
''' represent as Go-compatible float '''
@ -125,98 +150,174 @@ class Metric(object):
class Module(MgrModule):
COMMANDS = [
{
"cmd": "prometheus self-test",
"desc": "Run a self test on the prometheus module",
"perm": "rw"
},
]
def __init__(self, *args, **kwargs):
super(Module, self).__init__(*args, **kwargs)
self.notified = False
self.serving = False
self.metrics = dict()
self.metrics = self._setup_static_metrics()
self.schema = OrderedDict()
_global_instance['plugin'] = self
def _get_ordered_schema(self, **kwargs):
def _setup_static_metrics(self):
metrics = {}
metrics['health_status'] = Metric(
'undef',
'health_status',
'Cluster health status'
)
metrics['mon_quorum_count'] = Metric(
'gauge',
'mon_quorum_count',
'Monitors in quorum'
)
metrics['osd_metadata'] = Metric(
'undef',
'osd_metadata',
'OSD Metadata',
OSD_METADATA
)
metrics['pool_metadata'] = Metric(
'undef',
'pool_metadata',
'POOL Metadata',
POOL_METADATA
)
for state in PG_STATES:
path = 'pg_{}'.format(state)
self.log.debug("init: creating {}".format(path))
metrics[path] = Metric(
'gauge',
path,
'PG {}'.format(state),
)
for state in DF_CLUSTER:
path = 'cluster_{}'.format(state)
self.log.debug("init: creating {}".format(path))
metrics[path] = Metric(
'gauge',
path,
'DF {}'.format(state),
)
for state in DF_POOL:
path = 'pool_{}'.format(state)
self.log.debug("init: creating {}".format(path))
metrics[path] = Metric(
'gauge',
path,
'DF pool {}'.format(state),
('pool_id',)
)
'''
fetch an ordered-by-key performance counter schema
['perf_schema'][daemontype.id][countername] with keys
'nick' (if present)
'description'
'type' (counter type....counter/gauge/avg/histogram/etc.)
'''
daemon_type = kwargs.get('daemon_type', '')
daemon_id = kwargs.get('daemon_id', '')
schema = self.get_perf_schema(daemon_type, daemon_id)
if not schema:
self.log.warning('_get_ordered_schema: no data')
return
new_schema = dict()
for k1 in schema.keys(): # 'perf_schema', but assume only one
for k2 in sorted(schema[k1].keys()):
sorted_dict = OrderedDict(
sorted(schema[k1][k2].items(), key=lambda i: i[0])
)
new_schema[k2] = sorted_dict
for k in sorted(new_schema.keys()):
self.log.debug("updating schema for %s" % k)
self.schema[k] = new_schema[k]
return metrics
def shutdown(self):
self.serving = False
pass
# XXX duplicated from dashboard; factor out?
def get_latest(self, daemon_type, daemon_name, stat):
data = self.get_counter(daemon_type, daemon_name, stat)[stat]
if data:
return data[-1][1]
else:
return 0
def get_stat(self, daemon, path):
perfcounter = self.schema[daemon][path]
stattype = stattype_to_str(perfcounter['type'])
# XXX simplify first effort: no histograms
# averages are already collapsed to one value for us
if not stattype or stattype == 'histogram':
self.log.debug('ignoring %s, type %s' % (path, stattype))
return
if path not in self.metrics:
self.metrics[path] = Metric(
stattype,
path,
perfcounter['description'],
('daemon',),
)
daemon_type, daemon_id = daemon.split('.')
self.metrics[path].set(
self.get_latest(daemon_type, daemon_id, path),
(daemon,)
def get_health(self):
health = json.loads(self.get('health')['json'])
self.metrics['health_status'].set(
health_status_to_number(health['status'])
)
def get_df(self):
# maybe get the to-be-exported metrics from a config?
df = self.get('df')
for stat in DF_CLUSTER:
path = 'cluster_{}'.format(stat)
self.metrics[path].set(df['stats'][stat])
for pool in df['pools']:
for stat in DF_POOL:
path = 'pool_{}'.format(stat)
self.metrics[path].set(pool['stats'][stat], (pool['id'],))
def get_quorum_status(self):
mon_status = json.loads(self.get('mon_status')['json'])
self.metrics['mon_quorum_count'].set(len(mon_status['quorum']))
def get_pg_status(self):
# TODO add per pool status?
pg_s = self.get('pg_summary')['all']
reported_pg_s = [(s,v) for key, v in pg_s.items() for s in
key.split('+')]
for state, value in reported_pg_s:
path = 'pg_{}'.format(state)
self.metrics[path].set(value)
reported_states = [s[0] for s in reported_pg_s]
for state in PG_STATES:
path = 'pg_{}'.format(state)
if state not in reported_states:
self.metrics[path].set(0)
def get_metadata(self):
osd_map = self.get('osd_map')
osd_dev = self.get('osd_map_crush')['devices']
for osd in osd_map['osds']:
id_ = osd['osd']
p_addr = osd['public_addr']
c_addr = osd['cluster_addr']
w = osd['weight']
dev_class = next((osd for osd in osd_dev if osd['id'] == id_))
self.metrics['osd_metadata'].set(0, (
c_addr,
dev_class['class'],
id_,
p_addr,
w
))
for pool in osd_map['pools']:
id_ = pool['pool']
name = pool['pool_name']
self.metrics['pool_metadata'].set(0, (id_, name))
def collect(self):
for daemon in self.schema.keys():
for path in self.schema[daemon].keys():
self.get_stat(daemon, path)
self.get_health()
self.get_df()
self.get_quorum_status()
self.get_metadata()
self.get_pg_status()
for daemon, counters in self.get_all_perf_counters().iteritems():
for path, counter_info in counters.items():
stattype = stattype_to_str(counter_info['type'])
# XXX simplify first effort: no histograms
# averages are already collapsed to one value for us
if not stattype or stattype == 'histogram':
self.log.debug('ignoring %s, type %s' % (path, stattype))
continue
if path not in self.metrics:
self.metrics[path] = Metric(
stattype,
path,
counter_info['description'],
("ceph_daemon",),
)
self.metrics[path].set(
counter_info['value'],
(daemon,)
)
return self.metrics
def notify(self, ntype, nid):
''' Just try to sync and not run until we're notified once '''
if not self.notified:
self.serving = True
self.notified = True
if ntype == 'perf_schema_update':
daemon_type, daemon_id = nid.split('.')
self._get_ordered_schema(
daemon_type=daemon_type,
daemon_id=daemon_id
)
def handle_command(self, cmd):
if cmd['prefix'] == 'prometheus self-test':
self.collect()
return 0, '', 'Self-test OK'
else:
return (-errno.EINVAL, '',
"Command not found '{0}'".format(cmd['prefix']))
def serve(self):
@ -235,6 +336,17 @@ class Module(MgrModule):
@cherrypy.expose
def index(self):
return '''<!DOCTYPE html>
<html>
<head><title>Ceph Exporter</title></head>
<body>
<h1>Ceph Exporter</h1>
<p><a href='/metrics'>Metrics</a></p>
</body>
</html>'''
@cherrypy.expose
def metrics(self):
metrics = global_instance().collect()
cherrypy.response.headers['Content-Type'] = 'text/plain'
if metrics:
@ -246,9 +358,6 @@ class Module(MgrModule):
"server_addr: %s server_port: %s" %
(server_addr, server_port)
)
# wait for first notification (of any kind) to start up
while not self.serving:
time.sleep(1)
cherrypy.config.update({
'server.socket_host': server_addr,


@ -182,7 +182,7 @@ TEST(PerfCounters, MultiplePerfCounters) {
ASSERT_EQ(sd("{\"test_perfcounter_1\":{\"element1\":13,\"element2\":0.000000000,"
"\"element3\":{\"avgcount\":0,\"sum\":0.000000000,\"avgtime\":0.000000000}}}"), msg);
ASSERT_EQ("", client.do_request("{ \"prefix\": \"perf schema\", \"format\": \"json\" }", &msg));
ASSERT_EQ(sd("{\"test_perfcounter_1\":{\"element1\":{\"type\":2,\"metric_type\":\"gauge\",\"value_type\":\"integer\",\"description\":\"\",\"nick\":\"\"},\"element2\":{\"type\":1,\"metric_type\":\"gauge\",\"value_type\":\"real\",\"description\":\"\",\"nick\":\"\"},\"element3\":{\"type\":5,\"metric_type\":\"gauge\",\"value_type\":\"real-integer-pair\",\"description\":\"\",\"nick\":\"\"}}}"), msg);
ASSERT_EQ(sd("{\"test_perfcounter_1\":{\"element1\":{\"type\":2,\"metric_type\":\"gauge\",\"value_type\":\"integer\",\"description\":\"\",\"nick\":\"\",\"priority\":0},\"element2\":{\"type\":1,\"metric_type\":\"gauge\",\"value_type\":\"real\",\"description\":\"\",\"nick\":\"\",\"priority\":0},\"element3\":{\"type\":5,\"metric_type\":\"gauge\",\"value_type\":\"real-integer-pair\",\"description\":\"\",\"nick\":\"\",\"priority\":0}}}"), msg);
coll->clear();
ASSERT_EQ("", client.do_request("{ \"prefix\": \"perf dump\", \"format\": \"json\" }", &msg));
ASSERT_EQ("{}", msg);