Mirror of https://github.com/ceph/ceph (synced 2025-01-01 16:42:29 +00:00)
mgr: clean up DaemonStateIndex locking
Various things here were dangerously operating outside locks. Additionally
switch to a RWLock because this lock will be relatively read-hot when it's
taken every time a MMgrReport is handled, to look up the DaemonState for
the sender.

Fixes: http://tracker.ceph.com/issues/21158
Signed-off-by: John Spray <john.spray@redhat.com>
commit 806f10847c
parent 28c8e8953c
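The pattern the commit message describes, one index lock that is taken on every incoming MMgrReport so reads vastly outnumber writes, is the classic reader-writer-lock case. A minimal standalone sketch of that shape, using std::shared_mutex in place of Ceph's RWLock (the Key/State/Index names below are illustrative stand-ins, not the mgr's actual types):

#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <utility>

// Hypothetical stand-ins for DaemonKey / DaemonStatePtr, for illustration only.
using Key = std::pair<std::string, std::string>;   // {service type, daemon id}
struct State {};                                    // perf counters, metadata, ...
using StatePtr = std::shared_ptr<State>;

class Index {
  mutable std::shared_mutex lock;        // read-hot: shared for lookups
  std::map<Key, StatePtr> all;

public:
  // Called for every incoming report; many readers may hold the lock at once.
  StatePtr get(const Key &k) const {
    std::shared_lock l(lock);
    auto it = all.find(k);
    return it == all.end() ? nullptr : it->second;
  }

  // Structural changes are rare and take the lock exclusively.
  void insert(const Key &k, StatePtr s) {
    std::unique_lock l(lock);
    all[k] = std::move(s);
  }
};

Report handlers can then look up state concurrently, while insert and cull still serialize against all readers.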
@@ -333,7 +333,7 @@ bool DaemonServer::handle_open(MMgrOpen *m)
   if (daemon) {
     dout(20) << "updating existing DaemonState for " << m->daemon_name << dendl;
     Mutex::Locker l(daemon->lock);
-    daemon_state.get(key)->perf_counters.clear();
+    daemon->perf_counters.clear();
   }

   if (m->service_daemon) {
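In handle_open above, the handler already holds a DaemonStatePtr (daemon) and has just taken that daemon's own lock, so clearing the counters through the same pointer avoids a second index lookup and keeps the mutation under the lock that guards it. A hedged sketch of that per-object locking shape, with hypothetical stand-in types rather than the mgr's:

#include <memory>
#include <mutex>
#include <vector>

// Illustrative per-daemon state guarded by its own mutex (not the mgr's real type).
struct DaemonishState {
  std::mutex lock;
  std::vector<int> perf_counters;   // placeholder for the real counter set
};

void handle_open_like(const std::shared_ptr<DaemonishState> &daemon) {
  if (daemon) {
    std::lock_guard<std::mutex> l(daemon->lock);  // take the object's own lock...
    daemon->perf_counters.clear();                // ...and mutate via the pointer we
                                                  // already hold, not a fresh lookup.
  }
}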
@@ -20,7 +20,7 @@

 void DaemonStateIndex::insert(DaemonStatePtr dm)
 {
-  Mutex::Locker l(lock);
+  RWLock::WLocker l(lock);

   if (all.count(dm->key)) {
     _erase(dm->key);
@@ -32,7 +32,7 @@ void DaemonStateIndex::insert(DaemonStatePtr dm)

 void DaemonStateIndex::_erase(const DaemonKey& dmk)
 {
-  assert(lock.is_locked_by_me());
+  assert(lock.is_wlocked());

   const auto to_erase = all.find(dmk);
   assert(to_erase != all.end());
@@ -49,7 +49,7 @@ void DaemonStateIndex::_erase(const DaemonKey& dmk)
 DaemonStateCollection DaemonStateIndex::get_by_service(
   const std::string& svc) const
 {
-  Mutex::Locker l(lock);
+  RWLock::RLocker l(lock);

   DaemonStateCollection result;

@@ -65,7 +65,7 @@ DaemonStateCollection DaemonStateIndex::get_by_service(
 DaemonStateCollection DaemonStateIndex::get_by_server(
   const std::string &hostname) const
 {
-  Mutex::Locker l(lock);
+  RWLock::RLocker l(lock);

   if (by_server.count(hostname)) {
     return by_server.at(hostname);
@@ -76,14 +76,14 @@ DaemonStateCollection DaemonStateIndex::get_by_server(

 bool DaemonStateIndex::exists(const DaemonKey &key) const
 {
-  Mutex::Locker l(lock);
+  RWLock::RLocker l(lock);

   return all.count(key) > 0;
 }

 DaemonStatePtr DaemonStateIndex::get(const DaemonKey &key)
 {
-  Mutex::Locker l(lock);
+  RWLock::RLocker l(lock);

   auto iter = all.find(key);
   if (iter != all.end()) {
@@ -98,7 +98,7 @@ void DaemonStateIndex::cull(const std::string& svc_name,
 {
   std::vector<string> victims;

-  Mutex::Locker l(lock);
+  RWLock::WLocker l(lock);
   auto begin = all.lower_bound({svc_name, ""});
   auto end = all.end();
   for (auto &i = begin; i != end; ++i) {
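cull() above also shows the erase-by-prefix pattern: keys sort by service name first, so lower_bound({svc_name, ""}) lands on the first entry of that service, victims are collected during iteration, and erasure happens afterwards, all under the write lock. A small sketch of just that map manipulation, with illustrative types and names (Key, cull_service) and without the lock or the keep-alive filtering the real cull() does:

#include <map>
#include <string>
#include <utility>
#include <vector>

using Key = std::pair<std::string, std::string>;   // {service type, daemon name}

// Drop every entry of one service type from a key-ordered map.  Keys sort by
// service first, so lower_bound({svc, ""}) is the first entry of that service;
// victims are collected first so iteration and erasure do not interleave.
template <typename V>
void cull_service(std::map<Key, V> &all, const std::string &svc_name)
{
  std::vector<Key> victims;
  for (auto i = all.lower_bound({svc_name, ""});
       i != all.end() && i->first.first == svc_name; ++i) {
    victims.push_back(i->first);
  }
  for (const auto &k : victims) {
    all.erase(k);
  }
}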
@@ -20,7 +20,7 @@
 #include <set>
 #include <boost/circular_buffer.hpp>

-#include "common/Mutex.h"
+#include "common/RWLock.h"

 #include "msg/msg_types.h"

@@ -133,38 +133,52 @@ typedef std::map<DaemonKey, DaemonStatePtr> DaemonStateCollection;
 class DaemonStateIndex
 {
   private:
+  mutable RWLock lock = {"DaemonStateIndex", true, true, true};
+
   std::map<std::string, DaemonStateCollection> by_server;
   DaemonStateCollection all;

   std::set<DaemonKey> updating;

-  mutable Mutex lock;
+  void _erase(const DaemonKey& dmk);

   public:

-  DaemonStateIndex() : lock("DaemonState") {}
+  DaemonStateIndex() {}

   // FIXME: shouldn't really be public, maybe construct DaemonState
   // objects internally to avoid this.
   PerfCounterTypes types;

   void insert(DaemonStatePtr dm);
-  void _erase(const DaemonKey& dmk);

   bool exists(const DaemonKey &key) const;
   DaemonStatePtr get(const DaemonKey &key);

   // Note that these return by value rather than reference to avoid
   // callers needing to stay in lock while using result. Callers must
   // still take the individual DaemonState::lock on each entry though.
   DaemonStateCollection get_by_server(const std::string &hostname) const;
   DaemonStateCollection get_by_service(const std::string &svc_name) const;
+  DaemonStateCollection get_all() const {return all;}

-  const DaemonStateCollection &get_all() const {return all;}
-  const std::map<std::string, DaemonStateCollection> &get_all_servers() const
-  {
-    return by_server;
+  template<typename Callback, typename...Args>
+  auto with_daemons_by_server(Callback&& cb, Args&&... args) const ->
+    decltype(cb(by_server, std::forward<Args>(args)...)) {
+    RWLock::RLocker l(lock);
+
+    return std::forward<Callback>(cb)(by_server, std::forward<Args>(args)...);
   }

-  void notify_updating(const DaemonKey &k) { updating.insert(k); }
-  void clear_updating(const DaemonKey &k) { updating.erase(k); }
-  bool is_updating(const DaemonKey &k) { return updating.count(k) > 0; }
+  void notify_updating(const DaemonKey &k) {
+    RWLock::WLocker l(lock);
+    updating.insert(k);
+  }
+  void clear_updating(const DaemonKey &k) {
+    RWLock::WLocker l(lock);
+    updating.erase(k);
+  }
+  bool is_updating(const DaemonKey &k) {
+    RWLock::RLocker l(lock);
+    return updating.count(k) > 0;
+  }

   /**
    * Remove state for all daemons of this type whose names are
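The new with_daemons_by_server() accessor replaces get_all_servers(): instead of handing out a reference to by_server that the caller could keep using after the lock is released, it runs the caller's callback while the read lock is held. A self-contained sketch of the same idea, again substituting std::shared_mutex for Ceph's RWLock and using placeholder types (ServerIndex, with_servers are hypothetical names):

#include <cstdio>
#include <map>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <utility>

class ServerIndex {
  mutable std::shared_mutex lock;
  std::map<std::string, int> by_server;   // hostname -> daemon count (placeholder value type)

public:
  void add(const std::string &host) {
    std::unique_lock l(lock);
    ++by_server[host];
  }

  // Run `cb` on the map while the read lock is held; the reference never
  // escapes the lock scope, unlike a get_all_servers()-style accessor.
  template <typename Callback, typename... Args>
  auto with_servers(Callback &&cb, Args &&...args) const
      -> decltype(cb(by_server, std::forward<Args>(args)...)) {
    std::shared_lock l(lock);
    return std::forward<Callback>(cb)(by_server, std::forward<Args>(args)...);
  }
};

int main() {
  ServerIndex idx;
  idx.add("host-a");
  idx.add("host-a");
  idx.add("host-b");

  // Caller formats output entirely inside the locked region.
  idx.with_servers([](const std::map<std::string, int> &all) {
    for (const auto &i : all)
      std::printf("%s: %d daemon(s)\n", i.first.c_str(), i.second);
  });
  return 0;
}

This mirrors how list_servers_python() is rewritten in the next hunk: the formatting work moves into a lambda that runs entirely inside the locked region.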
@@ -105,14 +105,16 @@ PyObject *PyModules::list_servers_python()
   dout(10) << " >" << dendl;

   PyFormatter f(false, true);
-  const auto &all = daemon_state.get_all_servers();
-  for (const auto &i : all) {
-    const auto &hostname = i.first;
+  daemon_state.with_daemons_by_server([this, &f]
+      (const std::map<std::string, DaemonStateCollection> &all) {
+    for (const auto &i : all) {
+      const auto &hostname = i.first;

-    f.open_object_section("server");
-    dump_server(hostname, i.second, &f);
-    f.close_section();
-  }
+      f.open_object_section("server");
+      dump_server(hostname, i.second, &f);
+      f.close_section();
+    }
+  });

   return f.get();
 }
@@ -731,35 +733,34 @@ PyObject* PyModules::get_perf_schema_python(
   Mutex::Locker l(lock);
   PyEval_RestoreThread(tstate);

-  DaemonStateCollection states;
+  DaemonStateCollection daemons;

   if (svc_type == "") {
-    states = daemon_state.get_all();
+    daemons = std::move(daemon_state.get_all());
   } else if (svc_id.empty()) {
-    states = daemon_state.get_by_service(svc_type);
+    daemons = std::move(daemon_state.get_by_service(svc_type));
   } else {
     auto key = DaemonKey(svc_type, svc_id);
     // so that the below can be a loop in all cases
     auto got = daemon_state.get(key);
     if (got != nullptr) {
-      states[key] = got;
+      daemons[key] = got;
     }
   }

   PyFormatter f;
   f.open_object_section("perf_schema");

-  // FIXME: this is unsafe, I need to either be inside DaemonStateIndex's
-  // lock or put a lock on individual DaemonStates
-  if (!states.empty()) {
-    for (auto statepair : states) {
-      std::ostringstream daemon_name;
+  if (!daemons.empty()) {
+    for (auto statepair : daemons) {
       auto key = statepair.first;
       auto state = statepair.second;
+      Mutex::Locker l(state->lock);
+
+      std::ostringstream daemon_name;
       daemon_name << key.first << "." << key.second;
       f.open_object_section(daemon_name.str().c_str());

-      Mutex::Locker l(state->lock);
       for (auto typestr : state->perf_counters.declared_types) {
         f.open_object_section(typestr.c_str());
         auto type = state->perf_counters.types[typestr];
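get_perf_schema_python() now follows the two-level locking scheme the header comment spells out: the index methods return collections by value, so the index lock is held only while the map of shared pointers is copied, and each DaemonState's own lock is taken while its perf counters are read. A hedged sketch of that snapshot-then-per-entry-lock pattern, with illustrative types (Daemonish, Indexish) rather than the mgr's:

#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <utility>
#include <vector>

// Illustrative per-daemon state: its own mutex guards the counter data.
struct Daemonish {
  std::mutex lock;
  std::vector<std::string> declared_types;
};
using DaemonishPtr = std::shared_ptr<Daemonish>;
using Collection = std::map<std::string, DaemonishPtr>;

class Indexish {
  mutable std::shared_mutex lock;
  Collection all;

public:
  void insert(const std::string &name, DaemonishPtr d) {
    std::unique_lock l(lock);
    all[name] = std::move(d);
  }

  // Returned by value: the caller keeps a snapshot of shared_ptrs and does
  // not need to stay inside the index lock while iterating.
  Collection get_all() const {
    std::shared_lock l(lock);
    return all;
  }
};

std::vector<std::string> collect_types(const Indexish &index) {
  std::vector<std::string> out;
  Collection daemons = index.get_all();              // index lock held only inside get_all()
  for (const auto &p : daemons) {
    std::lock_guard<std::mutex> l(p.second->lock);   // per-daemon lock while reading
    for (const auto &t : p.second->declared_types)
      out.push_back(p.first + ":" + t);
  }
  return out;
}

Holding only the per-daemon mutex while formatting each entry is what lets the FIXME comment in the old code be dropped.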