mirror of
https://github.com/ceph/ceph
synced 2025-02-19 00:47:49 +00:00
mgr: forward RADOS client instances for potential blacklist
The mgr creates a per-module RADOS client connection for modules which interact with RADOS (e.g. the volumes module). These clients should also be blacklisted when the active mgr is failed; we don't want the former active mgr to continue interacting with RADOS when the new one takes over. This is particularly impactful for avoiding extraneous "unresponsive client" warnings from the MDS when the mgr switches (especially in testing). The MDS will pick up the new OSD blacklists, which include the old mgr's libcephfs instance, and blacklist/evict that session quietly. Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
This commit is contained in:
parent
f2986a4400
commit
df507cde8d
@ -3666,6 +3666,15 @@ CEPH_RADOS_API int rados_blacklist_add(rados_t cluster,
|
||||
char *client_address,
|
||||
uint32_t expire_seconds);
|
||||
|
||||
/**
|
||||
* Gets addresses of the RADOS session, suitable for blacklisting.
|
||||
*
|
||||
* @param cluster cluster handle
|
||||
* @param addrs the output string; it is allocated with strdup() and must be released by the caller with free().
|
||||
* @returns 0 on success, negative error code on failure
|
||||
*/
|
||||
CEPH_RADOS_API int rados_getaddrs(rados_t cluster, char** addrs);
|
||||
|
||||
CEPH_RADOS_API void rados_set_osdmap_full_try(rados_ioctx_t io)
|
||||
__attribute__((deprecated));
|
||||
|
||||
|
@ -228,6 +228,13 @@ public:
|
||||
return umask;
|
||||
}
|
||||
|
||||
std::string getaddrs()
|
||||
{
|
||||
CachedStackStringStream cos;
|
||||
*cos << messenger->get_myaddrs();
|
||||
return std::string(cos->strv());
|
||||
}
|
||||
|
||||
int conf_read_file(const char *path_list)
|
||||
{
|
||||
int ret = cct->_conf.parse_config_files(path_list, nullptr, 0);
|
||||
@ -427,6 +434,15 @@ extern "C" uint64_t ceph_get_instance_id(struct ceph_mount_info *cmount)
|
||||
return 0;
|
||||
}
|
||||
|
||||
extern "C" int ceph_getaddrs(struct ceph_mount_info *cmount, char** addrs)
|
||||
{
|
||||
if (!cmount->is_initialized())
|
||||
return -ENOTCONN;
|
||||
auto s = cmount->getaddrs();
|
||||
*addrs = strdup(s.c_str());
|
||||
return 0;
|
||||
}
|
||||
|
||||
extern "C" int ceph_conf_read_file(struct ceph_mount_info *cmount, const char *path)
|
||||
{
|
||||
return cmount->conf_read_file(path);
|
||||
|
@ -796,6 +796,12 @@ void librados::RadosClient::blacklist_self(bool set) {
|
||||
objecter->blacklist_self(set);
|
||||
}
|
||||
|
||||
std::string librados::RadosClient::get_addrs() const {
|
||||
CachedStackStringStream cos;
|
||||
*cos << messenger->get_myaddrs();
|
||||
return std::string(cos->strv());
|
||||
}
|
||||
|
||||
int librados::RadosClient::blacklist_add(const string& client_address,
|
||||
uint32_t expire_seconds)
|
||||
{
|
||||
|
@ -167,6 +167,8 @@ public:
|
||||
bool put();
|
||||
void blacklist_self(bool set);
|
||||
|
||||
std::string get_addrs() const;
|
||||
|
||||
int service_daemon_register(
|
||||
const std::string& service, ///< service name (e.g., 'rgw')
|
||||
const std::string& name, ///< daemon name (e.g., 'gwfoo')
|
||||
|
@ -442,6 +442,15 @@ extern "C" int _rados_blacklist_add(rados_t cluster, char *client_address,
|
||||
}
|
||||
LIBRADOS_C_API_BASE_DEFAULT(rados_blacklist_add);
|
||||
|
||||
/**
 * C entry point: copy the RADOS client's addresses into a newly allocated
 * C string.
 *
 * @param cluster cluster handle (a librados::RadosClient*)
 * @param addrs   receives a strdup()'d string; the caller must free() it
 * @returns 0 on success, -ENOMEM if the copy could not be allocated
 */
extern "C" int _rados_getaddrs(rados_t cluster, char** addrs)
{
  librados::RadosClient *radosp = (librados::RadosClient *)cluster;
  const std::string s = radosp->get_addrs();
  char *copy = strdup(s.c_str());
  if (copy == nullptr) {
    // never report success while handing back a null string
    return -ENOMEM;
  }
  *addrs = copy;
  return 0;
}
|
||||
LIBRADOS_C_API_BASE_DEFAULT(rados_getaddrs);
|
||||
|
||||
extern "C" void _rados_set_osdmap_full_try(rados_ioctx_t io)
|
||||
{
|
||||
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
|
||||
|
@ -24,7 +24,7 @@
|
||||
|
||||
class MMgrBeacon : public PaxosServiceMessage {
|
||||
private:
|
||||
static constexpr int HEAD_VERSION = 9;
|
||||
static constexpr int HEAD_VERSION = 10;
|
||||
static constexpr int COMPAT_VERSION = 8;
|
||||
|
||||
protected:
|
||||
@ -45,6 +45,8 @@ protected:
|
||||
|
||||
map<string,string> metadata; ///< misc metadata about this osd
|
||||
|
||||
std::vector<entity_addrvec_t> clients;
|
||||
|
||||
uint64_t mgr_features = 0; ///< reporting mgr's features
|
||||
|
||||
public:
|
||||
@ -57,10 +59,12 @@ public:
|
||||
entity_addrvec_t server_addrs_, bool available_,
|
||||
std::vector<MgrMap::ModuleInfo>&& modules_,
|
||||
map<string,string>&& metadata_,
|
||||
std::vector<entity_addrvec_t> clients,
|
||||
uint64_t feat)
|
||||
: PaxosServiceMessage{MSG_MGR_BEACON, 0, HEAD_VERSION, COMPAT_VERSION},
|
||||
gid(gid_), server_addrs(server_addrs_), available(available_), name(name_),
|
||||
fsid(fsid_), modules(std::move(modules_)), metadata(std::move(metadata_)),
|
||||
clients(std::move(clients)),
|
||||
mgr_features(feat)
|
||||
{
|
||||
}
|
||||
@ -98,6 +102,11 @@ public:
|
||||
return modules;
|
||||
}
|
||||
|
||||
const auto& get_clients() const
|
||||
{
|
||||
return clients;
|
||||
}
|
||||
|
||||
private:
|
||||
~MMgrBeacon() override {}
|
||||
|
||||
@ -143,6 +152,7 @@ public:
|
||||
|
||||
encode(modules, payload);
|
||||
encode(mgr_features, payload);
|
||||
encode(clients, payload, features);
|
||||
}
|
||||
void decode_payload() override {
|
||||
auto p = payload.cbegin();
|
||||
@ -182,6 +192,9 @@ public:
|
||||
if (header.version >= 9) {
|
||||
decode(mgr_features, p);
|
||||
}
|
||||
if (header.version >= 10) {
|
||||
decode(clients, p);
|
||||
}
|
||||
}
|
||||
private:
|
||||
template<class T, typename... Args>
|
||||
|
@ -1080,3 +1080,25 @@ void ActivePyModules::cluster_log(const std::string &channel, clog_type prio,
|
||||
clog->do_log(prio, message);
|
||||
}
|
||||
}
|
||||
|
||||
void ActivePyModules::register_client(std::string_view name, std::string addrs)
|
||||
{
|
||||
std::lock_guard l(lock);
|
||||
|
||||
entity_addrvec_t addrv;
|
||||
addrv.parse(addrs.data());
|
||||
|
||||
dout(7) << "registering msgr client handle " << addrv << dendl;
|
||||
py_module_registry.register_client(name, std::move(addrv));
|
||||
}
|
||||
|
||||
void ActivePyModules::unregister_client(std::string_view name, std::string addrs)
|
||||
{
|
||||
std::lock_guard l(lock);
|
||||
|
||||
entity_addrvec_t addrv;
|
||||
addrv.parse(addrs.data());
|
||||
|
||||
dout(7) << "unregistering msgr client handle " << addrv << dendl;
|
||||
py_module_registry.unregister_client(name, addrv);
|
||||
}
|
||||
|
@ -134,6 +134,9 @@ public:
|
||||
void clear_all_progress_events();
|
||||
void get_progress_events(std::map<std::string,ProgressEvent>* events);
|
||||
|
||||
void register_client(std::string_view name, std::string addrs);
|
||||
void unregister_client(std::string_view name, std::string addrs);
|
||||
|
||||
void config_notify();
|
||||
|
||||
void set_uri(const std::string& module_name, const std::string &uri);
|
||||
|
@ -1050,6 +1050,32 @@ ceph_is_authorized(BaseMgrModule *self, PyObject *args)
|
||||
Py_RETURN_FALSE;
|
||||
}
|
||||
|
||||
/**
 * Python binding: register a client address string under the calling
 * module's name.
 *
 * Expects a single string argument. Returns None, or nullptr when the
 * arguments cannot be parsed.
 */
static PyObject*
ceph_register_client(BaseMgrModule *self, PyObject *args)
{
  char *client_addrs = nullptr;
  if (PyArg_ParseTuple(args, "s:ceph_register_client", &client_addrs) == 0) {
    return nullptr;
  }

  const auto& module_name = self->this_module->get_name();
  self->py_modules->register_client(module_name, client_addrs);

  Py_RETURN_NONE;
}
|
||||
|
||||
/**
 * Python binding: unregister a client address string previously recorded
 * under the calling module's name.
 *
 * Expects a single string argument. Returns None, or nullptr when the
 * arguments cannot be parsed.
 */
static PyObject*
ceph_unregister_client(BaseMgrModule *self, PyObject *args)
{
  char *client_addrs = nullptr;
  if (PyArg_ParseTuple(args, "s:ceph_unregister_client", &client_addrs) == 0) {
    return nullptr;
  }

  const auto& module_name = self->this_module->get_name();
  self->py_modules->unregister_client(module_name, client_addrs);

  Py_RETURN_NONE;
}
|
||||
|
||||
PyMethodDef BaseMgrModule_methods[] = {
|
||||
{"_ceph_get", (PyCFunction)ceph_state_get, METH_VARARGS,
|
||||
"Get a cluster object"},
|
||||
@ -1146,6 +1172,12 @@ PyMethodDef BaseMgrModule_methods[] = {
|
||||
{"_ceph_is_authorized", (PyCFunction)ceph_is_authorized,
|
||||
METH_VARARGS, "Verify the current session caps are valid"},
|
||||
|
||||
{"_ceph_register_client", (PyCFunction)ceph_register_client,
|
||||
METH_VARARGS, "Register RADOS instance for potential blacklisting"},
|
||||
|
||||
{"_ceph_unregister_client", (PyCFunction)ceph_unregister_client,
|
||||
METH_VARARGS, "Unregister RADOS instance for potential blacklisting"},
|
||||
|
||||
{NULL, NULL, 0, NULL}
|
||||
};
|
||||
|
||||
|
@ -206,6 +206,11 @@ void MgrStandby::send_beacon()
|
||||
module_info.push_back(std::move(info));
|
||||
}
|
||||
|
||||
auto clients = py_module_registry.get_clients();
|
||||
for (const auto& client : clients) {
|
||||
dout(15) << "noting RADOS client for blacklist: " << client << dendl;
|
||||
}
|
||||
|
||||
// Whether I think I am available (request MgrMonitor to set me
|
||||
// as available in the map)
|
||||
bool available = active_mgr != nullptr && active_mgr->is_initialized();
|
||||
@ -225,6 +230,7 @@ void MgrStandby::send_beacon()
|
||||
available,
|
||||
std::move(module_info),
|
||||
std::move(metadata),
|
||||
std::move(clients),
|
||||
CEPH_FEATURES_ALL);
|
||||
|
||||
if (available) {
|
||||
|
@ -44,6 +44,7 @@ private:
|
||||
LogChannelRef clog;
|
||||
|
||||
std::map<std::string, PyModuleRef> modules;
|
||||
std::multimap<std::string, entity_addrvec_t> clients;
|
||||
|
||||
std::unique_ptr<ActivePyModules> active_modules;
|
||||
std::unique_ptr<StandbyPyModules> standby_modules;
|
||||
@ -182,5 +183,30 @@ public:
|
||||
ceph_assert(active_modules);
|
||||
return active_modules->get_services();
|
||||
}
|
||||
|
||||
void register_client(std::string_view name, entity_addrvec_t addrs)
|
||||
{
|
||||
clients.emplace(std::string(name), std::move(addrs));
|
||||
}
|
||||
void unregister_client(std::string_view name, const entity_addrvec_t& addrs)
|
||||
{
|
||||
auto itp = clients.equal_range(std::string(name));
|
||||
for (auto it = itp.first; it != itp.second; ++it) {
|
||||
if (it->second == addrs) {
|
||||
it = clients.erase(it);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
auto get_clients() const
|
||||
{
|
||||
std::scoped_lock l(lock);
|
||||
std::vector<entity_addrvec_t> v;
|
||||
for (const auto& p : clients) {
|
||||
v.push_back(p.second);
|
||||
}
|
||||
return v;
|
||||
}
|
||||
|
||||
// <<< (end of ActivePyModules cheeky call-throughs)
|
||||
};
|
||||
|
@ -238,6 +238,8 @@ public:
|
||||
/// features
|
||||
uint64_t active_mgr_features = 0;
|
||||
|
||||
std::vector<entity_addrvec_t> clients; // for blacklist
|
||||
|
||||
std::map<uint64_t, StandbyInfo> standbys;
|
||||
|
||||
// Modules which are enabled
|
||||
@ -381,7 +383,7 @@ public:
|
||||
ENCODE_FINISH(bl);
|
||||
return;
|
||||
}
|
||||
ENCODE_START(10, 6, bl);
|
||||
ENCODE_START(11, 6, bl);
|
||||
encode(epoch, bl);
|
||||
encode(active_addrs, bl, features);
|
||||
encode(active_gid, bl);
|
||||
@ -395,13 +397,14 @@ public:
|
||||
encode(always_on_modules, bl);
|
||||
encode(active_mgr_features, bl);
|
||||
encode(last_failure_osd_epoch, bl);
|
||||
encode(clients, bl, features);
|
||||
ENCODE_FINISH(bl);
|
||||
return;
|
||||
}
|
||||
|
||||
void decode(ceph::buffer::list::const_iterator& p)
|
||||
{
|
||||
DECODE_START(8, p);
|
||||
DECODE_START(11, p);
|
||||
decode(epoch, p);
|
||||
decode(active_addrs, p);
|
||||
decode(active_gid, p);
|
||||
@ -446,6 +449,9 @@ public:
|
||||
if (struct_v >= 10) {
|
||||
decode(last_failure_osd_epoch, p);
|
||||
}
|
||||
if (struct_v >= 11) {
|
||||
decode(clients, p);
|
||||
}
|
||||
DECODE_FINISH(p);
|
||||
}
|
||||
|
||||
@ -498,6 +504,11 @@ public:
|
||||
f->close_section();
|
||||
}
|
||||
f->dump_int("last_failure_osd_epoch", last_failure_osd_epoch);
|
||||
f->open_array_section("active_clients");
|
||||
for (const auto &c : clients) {
|
||||
f->dump_object("client", c);
|
||||
}
|
||||
f->close_section();
|
||||
f->close_section();
|
||||
}
|
||||
|
||||
|
@ -524,6 +524,13 @@ bool MgrMonitor::prepare_beacon(MonOpRequestRef op)
|
||||
pending_map.available_modules = m->get_available_modules();
|
||||
updated = true;
|
||||
}
|
||||
const auto& clients = m->get_clients();
|
||||
if (pending_map.clients != clients) {
|
||||
dout(4) << "active's RADOS clients " << clients
|
||||
<< " (was " << pending_map.clients << ")" << dendl;
|
||||
pending_map.clients = clients;
|
||||
updated = true;
|
||||
}
|
||||
} else if (pending_map.active_gid == 0) {
|
||||
// There is no currently active daemon, select this one.
|
||||
if (pending_map.standbys.count(m->get_gid())) {
|
||||
@ -834,6 +841,11 @@ void MgrMonitor::drop_active()
|
||||
<< pending_map.active_gid << " ("
|
||||
<< pending_map.active_addrs << ")" << dendl;
|
||||
auto blacklist_epoch = mon->osdmon()->blacklist(pending_map.active_addrs, until);
|
||||
|
||||
/* blacklist RADOS clients in use by the mgr */
|
||||
for (const auto& a : pending_map.clients) {
|
||||
mon->osdmon()->blacklist(a, until);
|
||||
}
|
||||
request_proposal(mon->osdmon());
|
||||
|
||||
pending_metadata_rm.insert(pending_map.active_name);
|
||||
@ -845,6 +857,7 @@ void MgrMonitor::drop_active()
|
||||
pending_map.available = false;
|
||||
pending_map.active_addrs = entity_addrvec_t();
|
||||
pending_map.services.clear();
|
||||
pending_map.clients.clear();
|
||||
pending_map.last_failure_osd_epoch = blacklist_epoch;
|
||||
|
||||
// So that when new active mgr subscribes to mgrdigest, it will
|
||||
|
@ -130,6 +130,7 @@ cdef extern from "cephfs/libcephfs.h" nogil:
|
||||
int ceph_init(ceph_mount_info *cmount)
|
||||
void ceph_shutdown(ceph_mount_info *cmount)
|
||||
|
||||
int ceph_getaddrs(ceph_mount_info* cmount, char** addrs)
|
||||
int ceph_conf_read_file(ceph_mount_info *cmount, const char *path_list)
|
||||
int ceph_conf_parse_argv(ceph_mount_info *cmount, int argc, const char **argv)
|
||||
int ceph_conf_get(ceph_mount_info *cmount, const char *option, char *buf, size_t len)
|
||||
@ -503,6 +504,27 @@ cdef class LibCephFS(object):
|
||||
for key, value in conf.iteritems():
|
||||
self.conf_set(key, value)
|
||||
|
||||
def get_addrs(self):
    """
    Get the client addresses associated with this CephFS mount.

    Requires the "mounted" state. Raises the exception built by make_ex()
    when ceph_getaddrs() returns a non-zero code.
    """
    self.require_state("mounted")

    cdef char* c_addrs = NULL

    try:
        with nogil:
            ret = ceph_getaddrs(self.cluster, &c_addrs)
        if ret != 0:
            raise make_ex(ret, "error calling getaddrs")
        return decode_cstr(c_addrs)
    finally:
        # the C call hands back a heap copy; free(NULL) is harmless on error
        free(c_addrs)
|
||||
|
||||
|
||||
def conf_read_file(self, conffile=None):
|
||||
"""
|
||||
Load the ceph configuration from the specified config file.
|
||||
|
@ -775,7 +775,9 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
|
||||
:return: None
|
||||
"""
|
||||
if self._rados:
|
||||
addrs = self._rados.get_addrs()
|
||||
self._rados.shutdown()
|
||||
self._ceph_unregister_client(addrs)
|
||||
|
||||
def get(self, data_name):
|
||||
"""
|
||||
@ -1376,7 +1378,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
|
||||
ctx_capsule = self.get_context()
|
||||
self._rados = rados.Rados(context=ctx_capsule)
|
||||
self._rados.connect()
|
||||
|
||||
self._ceph_register_client(self._rados.get_addrs())
|
||||
return self._rados
|
||||
|
||||
@staticmethod
|
||||
|
@ -82,11 +82,14 @@ class ConnectionPool(object):
|
||||
log.debug("CephFS mounting...")
|
||||
self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
|
||||
log.debug("Connection to cephfs '{0}' complete".format(self.fs_name))
|
||||
self.mgr._ceph_register_client(self.fs.get_addrs())
|
||||
|
||||
def disconnect(self):
    """
    Shut down the CephFS handle and unregister its client addresses
    from the mgr.

    Asserts that no operations are in progress.
    """
    assert self.ops_in_progress == 0
    log.info("disconnecting from cephfs '{0}'".format(self.fs_name))
    # capture the addresses before shutdown() so they can still be
    # unregistered afterwards
    client_addrs = self.fs.get_addrs()
    self.fs.shutdown()
    self.mgr._ceph_unregister_client(client_addrs)
    self.fs = None
|
||||
|
||||
def abort(self):
|
||||
|
@ -152,6 +152,7 @@ cdef extern from "rados/librados.h" nogil:
|
||||
int rados_cluster_stat(rados_t cluster, rados_cluster_stat_t *result)
|
||||
int rados_cluster_fsid(rados_t cluster, char *buf, size_t len)
|
||||
int rados_blacklist_add(rados_t cluster, char *client_address, uint32_t expire_seconds)
|
||||
int rados_getaddrs(rados_t cluster, char** addrs)
|
||||
int rados_application_enable(rados_ioctx_t io, const char *app_name,
|
||||
int force)
|
||||
void rados_set_osdmap_full_try(rados_ioctx_t io)
|
||||
@ -739,6 +740,26 @@ cdef class Rados(object):
|
||||
for key, value in conf.items():
|
||||
self.conf_set(key, value)
|
||||
|
||||
def get_addrs(self):
    """
    Get the client addresses associated with this RADOS session.

    Requires the "configuring" or "connected" state. Raises the exception
    built by make_ex() when rados_getaddrs() returns a non-zero code.
    """
    self.require_state("configuring", "connected")

    cdef char* c_addrs = NULL

    try:
        with nogil:
            ret = rados_getaddrs(self.cluster, &c_addrs)
        if ret != 0:
            raise make_ex(ret, "error calling getaddrs")
        return decode_cstr(c_addrs)
    finally:
        # the C call hands back a heap copy; free(NULL) is harmless on error
        free(c_addrs)
|
||||
|
||||
def require_state(self, *args):
|
||||
"""
|
||||
Checks if the Rados object is in a special state
|
||||
|
Loading…
Reference in New Issue
Block a user