
Merge PR into master

* refs/pull/26038/head:
	mds: simplify recall warnings
	mds: add extra details for cache drop output
	qa: test mds_max_caps_per_client conf
	mds: limit maximum number of caps held by session
	mds: adapt drop cache for incremental recall
	mds: recall caps incrementally
	mds: adapt drop cache for incremental trim
	mds: add throttle for trimming MDCache
	mds: cleanup SessionMap init
	mds: cleanup Session init

Reviewed-by: Zheng Yan <zyan@redhat.com>
Patrick Donnelly 2019-01-31 12:08:26 -08:00
commit 0d26266ccb
14 changed files with 443 additions and 205 deletions

View File

@@ -165,6 +165,26 @@
 respectively. This is to clarify that these warnings are related to pg scrubbing
 and are a ratio of the related interval. These options are now enabled by default.
+* MDS cache trimming is now throttled. Dropping the MDS cache via the
+  `ceph tell mds.<foo> cache drop` command, or a large reduction in the cache
+  size, will no longer cause service unavailability.
+* The CephFS MDS behavior when recalling caps has been significantly improved:
+  the MDS no longer attempts to recall too many caps at once, which previously
+  led to instability. An MDS with a large cache (64GB+) should be more stable.
+* The MDS now provides a config option "mds_max_caps_per_client" (default: 1M)
+  to limit the number of caps a client session may hold. Long-running client
+  sessions with a large number of caps have been a source of instability in the
+  MDS when all of these caps need to be processed during certain session
+  events. It is recommended not to increase this value unnecessarily.
+* The MDS config mds_recall_state_timeout has been removed. Late client recall
+  warnings are now generated based on the number of caps the MDS has recalled
+  which have not yet been released. The new configs mds_recall_warning_threshold
+  (default: 32K) and mds_recall_warning_decay_rate (default: 60s) set the
+  threshold for this warning.

 >=13.1.0
 --------
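
The following is a minimal Python sketch (not Ceph code) of how a decayed
counter of recalled-but-unreleased caps can drive the MDS_CLIENT_RECALL
warning described in the release note above. The DecayCounter here is a
simplified stand-in for Ceph's common/DecayCounter, and the half-life
semantics and the numeric values are illustrative assumptions; the hit()
calls loosely mirror Session::notify_recall_sent and notify_cap_release
from the diff below.

    import math
    import time

    class DecayCounter:
        """Exponentially decaying counter (simplified stand-in, assumed API)."""
        def __init__(self, half_life):
            self.half_life = half_life
            self.value = 0.0
            self.last = time.monotonic()

        def _decay(self):
            now = time.monotonic()
            self.value *= math.pow(0.5, (now - self.last) / self.half_life)
            self.last = now

        def hit(self, n):
            self._decay()
            self.value += n

        def get(self):
            self._decay()
            return self.value

    # Recalled caps bump the counter; released caps subtract from it.
    recall_caps = DecayCounter(half_life=60.0)   # ~ mds_recall_warning_decay_rate
    warning_threshold = 32 * 1024                # ~ mds_recall_warning_threshold

    recall_caps.hit(40000)    # caps recalled from a slow client
    recall_caps.hit(-5000)    # caps the client actually released
    if recall_caps.get() > warning_threshold:
        print("Client failing to respond to cache pressure")

Because the counter decays on its own, a client that eventually catches up
stops warning without any explicit timeout, which is why the old
mds_recall_state_timeout could be removed.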

View File

@@ -10,7 +10,6 @@ overrides:
 tasks:
 - exec:
     mon.a:
-    - "ceph tell mds.* config set mds_max_ratio_caps_per_client 1"
     - "ceph tell mds.* config set mds_min_caps_per_client 1"
 - background_exec:
     mon.a:

View File

@@ -42,12 +42,14 @@ class TestClientLimits(CephFSTestCase):
         cache_size = open_files/2
         self.set_conf('mds', 'mds cache size', cache_size)
+        self.set_conf('mds', 'mds_recall_max_caps', open_files/2)
+        self.set_conf('mds', 'mds_recall_warning_threshold', open_files)
         self.fs.mds_fail_restart()
         self.fs.wait_for_daemons()

         mds_min_caps_per_client = int(self.fs.get_config("mds_min_caps_per_client"))
+        mds_recall_warning_decay_rate = self.fs.get_config("mds_recall_warning_decay_rate")
         self.assertTrue(open_files >= mds_min_caps_per_client)
-        mds_max_ratio_caps_per_client = float(self.fs.get_config("mds_max_ratio_caps_per_client"))

         mount_a_client_id = self.mount_a.get_global_id()
         path = "subdir/mount_a" if use_subdir else "mount_a"

@@ -64,13 +66,11 @@ class TestClientLimits(CephFSTestCase):
         # MDS should not be happy about that, as the client is failing to comply
         # with the SESSION_RECALL messages it is being sent
-        mds_recall_state_timeout = float(self.fs.get_config("mds_recall_state_timeout"))
-        self.wait_for_health("MDS_CLIENT_RECALL", mds_recall_state_timeout+10)
+        self.wait_for_health("MDS_CLIENT_RECALL", mds_recall_warning_decay_rate*2)

         # We can also test that the MDS health warning for oversized
         # cache is functioning as intended.
-        self.wait_for_health("MDS_CACHE_OVERSIZED",
-                             mds_recall_state_timeout + 10)
+        self.wait_for_health("MDS_CACHE_OVERSIZED", mds_recall_warning_decay_rate*2)

         # When the client closes the files, it should retain only as many caps as allowed
         # under the SESSION_RECALL policy

@@ -84,14 +84,13 @@ class TestClientLimits(CephFSTestCase):
         # The remaining caps should comply with the numbers sent from MDS in SESSION_RECALL message,
         # which depend on the caps outstanding, cache size and overall ratio
-        recall_expected_value = int((1.0-mds_max_ratio_caps_per_client)*(open_files+2))
         def expected_caps():
             num_caps = self.get_session(mount_a_client_id)['num_caps']
             if num_caps < mds_min_caps_per_client:
                 raise RuntimeError("client caps fell below min!")
             elif num_caps == mds_min_caps_per_client:
                 return True
-            elif recall_expected_value*.95 <= num_caps <= recall_expected_value*1.05:
+            elif num_caps < cache_size:
                 return True
             else:
                 return False

@@ -237,3 +236,28 @@ class TestClientLimits(CephFSTestCase):
     def test_client_cache_size(self):
         self._test_client_cache_size(False)
         self._test_client_cache_size(True)
+
+    def test_client_max_caps(self):
+        """
+        That the MDS will not let a client sit above mds_max_caps_per_client caps.
+        """
+
+        mds_min_caps_per_client = int(self.fs.get_config("mds_min_caps_per_client"))
+        mds_max_caps_per_client = 2*mds_min_caps_per_client
+        self.set_conf('mds', 'mds_max_caps_per_client', mds_max_caps_per_client)
+        self.fs.mds_fail_restart()
+        self.fs.wait_for_daemons()
+
+        self.mount_a.create_n_files("foo/", 3*mds_max_caps_per_client, sync=True)
+
+        mount_a_client_id = self.mount_a.get_global_id()
+
+        def expected_caps():
+            num_caps = self.get_session(mount_a_client_id)['num_caps']
+            if num_caps < mds_min_caps_per_client:
+                raise RuntimeError("client caps fell below min!")
+            elif num_caps <= mds_max_caps_per_client:
+                return True
+            else:
+                return False
+
+        self.wait_until_true(expected_caps, timeout=60)

View File

@@ -410,7 +410,6 @@ OPTION(mds_session_blacklist_on_timeout, OPT_BOOL)    // whether to blacklist cl
 OPTION(mds_session_blacklist_on_evict, OPT_BOOL)  // whether to blacklist clients whose sessions are dropped via admin commands
 OPTION(mds_sessionmap_keys_per_op, OPT_U32)    // how many sessions should I try to load/store in a single OMAP operation?
-OPTION(mds_recall_state_timeout, OPT_FLOAT)    // detect clients which aren't trimming caps
 OPTION(mds_freeze_tree_timeout, OPT_FLOAT)    // detecting freeze tree deadlock
 OPTION(mds_health_summarize_threshold, OPT_INT) // collapse N-client health metrics to a single 'many'
 OPTION(mds_reconnect_timeout, OPT_FLOAT)  // seconds to wait for clients during mds restart

View File

@@ -7179,6 +7179,14 @@ std::vector<Option> get_mds_options() {
     .set_default(.7)
     .set_description("midpoint for MDS cache LRU"),

+    Option("mds_cache_trim_decay_rate", Option::TYPE_FLOAT, Option::LEVEL_ADVANCED)
+    .set_default(1)
+    .set_description("decay rate for trimming MDS cache throttle"),
+
+    Option("mds_cache_trim_threshold", Option::TYPE_SIZE, Option::LEVEL_ADVANCED)
+    .set_default(64_K)
+    .set_description("threshold for number of dentries that can be trimmed"),
+
     Option("mds_max_file_recover", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
     .set_default(32)
     .set_description("maximum number of files to recover file sizes in parallel"),

@@ -7223,9 +7231,29 @@
     .set_default(1024)
     .set_description("number of omap keys to read from the SessionMap in one operation"),

-    Option("mds_recall_state_timeout", Option::TYPE_FLOAT, Option::LEVEL_ADVANCED)
-    .set_default(60)
-    .set_description("timeout for clients late on cap recall to create health warnings"),
+    Option("mds_recall_max_caps", Option::TYPE_SIZE, Option::LEVEL_ADVANCED)
+    .set_default(5000)
+    .set_description("maximum number of caps to recall from client session in single recall"),
+
+    Option("mds_recall_max_decay_rate", Option::TYPE_FLOAT, Option::LEVEL_ADVANCED)
+    .set_default(2.5)
+    .set_description("decay rate for throttle on recalled caps on a session"),
+
+    Option("mds_recall_max_decay_threshold", Option::TYPE_SIZE, Option::LEVEL_ADVANCED)
+    .set_default(16_K)
+    .set_description("decay threshold for throttle on recalled caps on a session"),
+
+    Option("mds_recall_global_max_decay_threshold", Option::TYPE_SIZE, Option::LEVEL_ADVANCED)
+    .set_default(64_K)
+    .set_description("decay threshold for throttle on recalled caps globally"),
+
+    Option("mds_recall_warning_threshold", Option::TYPE_SIZE, Option::LEVEL_ADVANCED)
+    .set_default(32_K)
+    .set_description("decay threshold for warning on slow session cap recall"),
+
+    Option("mds_recall_warning_decay_rate", Option::TYPE_FLOAT, Option::LEVEL_ADVANCED)
+    .set_default(60.0)
+    .set_description("decay rate for warning on slow session cap recall"),

     Option("mds_freeze_tree_timeout", Option::TYPE_FLOAT, Option::LEVEL_DEV)
     .set_default(30)

@@ -7605,9 +7633,9 @@
     .set_default(100)
     .set_description("minimum number of capabilities a client may hold"),

-    Option("mds_max_ratio_caps_per_client", Option::TYPE_FLOAT, Option::LEVEL_DEV)
-    .set_default(.8)
-    .set_description("maximum ratio of current caps that may be recalled during MDS cache pressure"),
+    Option("mds_max_caps_per_client", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+    .set_default(1_M)
+    .set_description("maximum number of capabilities a client may hold"),

     Option("mds_hack_allow_loading_invalid_metadata", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
     .set_default(0)
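
For operators, the new throttles and limits defined above can be adjusted at
runtime using the same `ceph tell mds.* config set ...` form already used by
the qa override earlier in this diff; the values below are purely illustrative,
not recommendations:

    ceph tell mds.* config set mds_recall_max_caps 10000
    ceph tell mds.* config set mds_recall_max_decay_threshold 32768
    ceph tell mds.* config set mds_max_caps_per_client 500000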

View File

@@ -374,40 +374,27 @@ void Beacon::notify_health(MDSRank const *mds)
     set<Session*> sessions;
     mds->sessionmap.get_client_session_set(sessions);

-    auto mds_recall_state_timeout = g_conf()->mds_recall_state_timeout;
-    auto last_recall = mds->mdcache->last_recall_state;
-    auto last_recall_span = std::chrono::duration<double>(clock::now()-last_recall).count();
-    bool recall_state_timedout = last_recall_span > mds_recall_state_timeout;
+    const auto recall_warning_threshold = g_conf().get_val<Option::size_t>("mds_recall_warning_threshold");
+    const auto max_completed_requests = g_conf()->mds_max_completed_requests;
+    const auto max_completed_flushes = g_conf()->mds_max_completed_flushes;
     std::list<MDSHealthMetric> late_recall_metrics;
     std::list<MDSHealthMetric> large_completed_requests_metrics;
     for (auto& session : sessions) {
-      if (session->recalled_at != Session::clock::zero()) {
-        auto last_recall_sent = session->last_recall_sent;
-        auto recalled_at = session->recalled_at;
-        auto recalled_at_span = std::chrono::duration<double>(clock::now()-recalled_at).count();
-
-        dout(20) << "Session servicing RECALL " << session->info.inst
-          << ": " << recalled_at_span << "s ago " << session->recall_release_count
-          << "/" << session->recall_count << dendl;
-        if (recall_state_timedout || last_recall_sent < last_recall) {
-          dout(20) << " no longer recall" << dendl;
-          session->clear_recalled_at();
-        } else if (recalled_at_span > mds_recall_state_timeout) {
-          dout(20) << " exceeded timeout " << recalled_at_span << " vs. " << mds_recall_state_timeout << dendl;
-          std::ostringstream oss;
-          oss << "Client " << session->get_human_name() << " failing to respond to cache pressure";
-          MDSHealthMetric m(MDS_HEALTH_CLIENT_RECALL, HEALTH_WARN, oss.str());
-          m.metadata["client_id"] = stringify(session->get_client());
-          late_recall_metrics.push_back(m);
-        } else {
-          dout(20) << " within timeout " << recalled_at_span << " vs. " << mds_recall_state_timeout << dendl;
-        }
+      const uint64_t recall_caps = session->get_recall_caps();
+      if (recall_caps > recall_warning_threshold) {
+        dout(2) << "Session " << *session <<
+             " is not releasing caps fast enough. Recalled caps at " << recall_caps
+          << " > " << recall_warning_threshold << " (mds_recall_warning_threshold)." << dendl;
+        std::ostringstream oss;
+        oss << "Client " << session->get_human_name() << " failing to respond to cache pressure";
+        MDSHealthMetric m(MDS_HEALTH_CLIENT_RECALL, HEALTH_WARN, oss.str());
+        m.metadata["client_id"] = stringify(session->get_client());
+        late_recall_metrics.push_back(m);
       }
       if ((session->get_num_trim_requests_warnings() > 0 &&
-           session->get_num_completed_requests() >= g_conf()->mds_max_completed_requests) ||
+           session->get_num_completed_requests() >= max_completed_requests) ||
           (session->get_num_trim_flushes_warnings() > 0 &&
-           session->get_num_completed_flushes() >= g_conf()->mds_max_completed_flushes)) {
+           session->get_num_completed_flushes() >= max_completed_flushes)) {
        std::ostringstream oss;
        oss << "Client " << session->get_human_name() << " failing to advance its oldest client/flush tid";
        MDSHealthMetric m(MDS_HEALTH_CLIENT_OLDEST_TID, HEALTH_WARN, oss.str());

View File

@ -139,6 +139,7 @@ MDCache::MDCache(MDSRank *m, PurgeQueue &purge_queue_) :
exceeded_size_limit(false), exceeded_size_limit(false),
recovery_queue(m), recovery_queue(m),
stray_manager(m, purge_queue_), stray_manager(m, purge_queue_),
trim_counter(g_conf().get_val<double>("mds_cache_trim_decay_rate")),
open_file_table(m) open_file_table(m)
{ {
migrator.reset(new Migrator(mds, this)); migrator.reset(new Migrator(mds, this));
@ -211,6 +212,9 @@ void MDCache::handle_conf_change(const ConfigProxy& conf,
cache_health_threshold = g_conf().get_val<double>("mds_health_cache_threshold"); cache_health_threshold = g_conf().get_val<double>("mds_health_cache_threshold");
if (changed.count("mds_cache_mid")) if (changed.count("mds_cache_mid"))
lru.lru_set_midpoint(g_conf().get_val<double>("mds_cache_mid")); lru.lru_set_midpoint(g_conf().get_val<double>("mds_cache_mid"));
if (changed.count("mds_cache_trim_decay_rate")) {
trim_counter = DecayCounter(g_conf().get_val<double>("mds_cache_trim_decay_rate"));
}
migrator->handle_conf_change(conf, changed, mdsmap); migrator->handle_conf_change(conf, changed, mdsmap);
mds->balancer->handle_conf_change(conf, changed, mdsmap); mds->balancer->handle_conf_change(conf, changed, mdsmap);
@ -6528,12 +6532,14 @@ void MDCache::start_recovered_truncates()
// ================================================================================ // ================================================================================
// cache trimming // cache trimming
void MDCache::trim_lru(uint64_t count, expiremap& expiremap) std::pair<bool, uint64_t> MDCache::trim_lru(uint64_t count, expiremap& expiremap)
{ {
bool is_standby_replay = mds->is_standby_replay(); bool is_standby_replay = mds->is_standby_replay();
std::vector<CDentry *> unexpirables; std::vector<CDentry *> unexpirables;
uint64_t trimmed = 0; uint64_t trimmed = 0;
auto trim_threshold = g_conf().get_val<Option::size_t>("mds_cache_trim_threshold");
dout(7) << "trim_lru trimming " << count dout(7) << "trim_lru trimming " << count
<< " items from LRU" << " items from LRU"
<< " size=" << lru.lru_get_size() << " size=" << lru.lru_get_size()
@ -6542,7 +6548,11 @@ void MDCache::trim_lru(uint64_t count, expiremap& expiremap)
<< " pinned=" << lru.lru_get_num_pinned() << " pinned=" << lru.lru_get_num_pinned()
<< dendl; << dendl;
for (;;) { const uint64_t trim_counter_start = trim_counter.get();
bool throttled = false;
while (1) {
throttled |= trim_counter_start+trimmed >= trim_threshold;
if (throttled) break;
CDentry *dn = static_cast<CDentry*>(bottom_lru.lru_expire()); CDentry *dn = static_cast<CDentry*>(bottom_lru.lru_expire());
if (!dn) if (!dn)
break; break;
@ -6559,7 +6569,9 @@ void MDCache::trim_lru(uint64_t count, expiremap& expiremap)
unexpirables.clear(); unexpirables.clear();
// trim dentries from the LRU until count is reached // trim dentries from the LRU until count is reached
while (cache_toofull() || count > 0) { while (!throttled && (cache_toofull() || count > 0)) {
throttled |= trim_counter_start+trimmed >= trim_threshold;
if (throttled) break;
CDentry *dn = static_cast<CDentry*>(lru.lru_expire()); CDentry *dn = static_cast<CDentry*>(lru.lru_expire());
if (!dn) { if (!dn) {
break; break;
@ -6574,6 +6586,7 @@ void MDCache::trim_lru(uint64_t count, expiremap& expiremap)
if (count > 0) count--; if (count > 0) count--;
} }
} }
trim_counter.hit(trimmed);
for (auto &dn : unexpirables) { for (auto &dn : unexpirables) {
lru.lru_insert_mid(dn); lru.lru_insert_mid(dn);
@ -6581,6 +6594,7 @@ void MDCache::trim_lru(uint64_t count, expiremap& expiremap)
unexpirables.clear(); unexpirables.clear();
dout(7) << "trim_lru trimmed " << trimmed << " items" << dendl; dout(7) << "trim_lru trimmed " << trimmed << " items" << dendl;
return std::pair<bool, uint64_t>(throttled, trimmed);
} }
/* /*
@ -6589,7 +6603,7 @@ void MDCache::trim_lru(uint64_t count, expiremap& expiremap)
* *
* @param count is number of dentries to try to expire * @param count is number of dentries to try to expire
*/ */
bool MDCache::trim(uint64_t count) std::pair<bool, uint64_t> MDCache::trim(uint64_t count)
{ {
uint64_t used = cache_size(); uint64_t used = cache_size();
uint64_t limit = cache_memory_limit; uint64_t limit = cache_memory_limit;
@ -6603,7 +6617,8 @@ bool MDCache::trim(uint64_t count)
// process delayed eval_stray() // process delayed eval_stray()
stray_manager.advance_delayed(); stray_manager.advance_delayed();
trim_lru(count, expiremap); auto result = trim_lru(count, expiremap);
auto& trimmed = result.second;
// trim non-auth, non-bound subtrees // trim non-auth, non-bound subtrees
for (auto p = subtrees.begin(); p != subtrees.end();) { for (auto p = subtrees.begin(); p != subtrees.end();) {
@ -6619,6 +6634,7 @@ bool MDCache::trim(uint64_t count)
continue; continue;
migrator->export_empty_import(dir); migrator->export_empty_import(dir);
++trimmed;
} }
} else { } else {
if (!diri->is_auth()) { if (!diri->is_auth()) {
@ -6635,6 +6651,7 @@ bool MDCache::trim(uint64_t count)
rejoin_ack_gather.count(dir->get_dir_auth().first)) rejoin_ack_gather.count(dir->get_dir_auth().first))
continue; continue;
trim_dirfrag(dir, 0, expiremap); trim_dirfrag(dir, 0, expiremap);
++trimmed;
} }
} }
} }
@ -6645,11 +6662,15 @@ bool MDCache::trim(uint64_t count)
root->get_dirfrags(ls); root->get_dirfrags(ls);
for (list<CDir*>::iterator p = ls.begin(); p != ls.end(); ++p) { for (list<CDir*>::iterator p = ls.begin(); p != ls.end(); ++p) {
CDir *dir = *p; CDir *dir = *p;
if (dir->get_num_ref() == 1) // subtree pin if (dir->get_num_ref() == 1) { // subtree pin
trim_dirfrag(dir, 0, expiremap); trim_dirfrag(dir, 0, expiremap);
++trimmed;
}
} }
if (root->get_num_ref() == 0) if (root->get_num_ref() == 0) {
trim_inode(0, root, 0, expiremap); trim_inode(0, root, 0, expiremap);
++trimmed;
}
} }
std::set<mds_rank_t> stopping; std::set<mds_rank_t> stopping;
@ -6673,11 +6694,15 @@ bool MDCache::trim(uint64_t count)
list<CDir*> ls; list<CDir*> ls;
mdsdir_in->get_dirfrags(ls); mdsdir_in->get_dirfrags(ls);
for (auto dir : ls) { for (auto dir : ls) {
if (dir->get_num_ref() == 1) // subtree pin if (dir->get_num_ref() == 1) { // subtree pin
trim_dirfrag(dir, dir, expiremap); trim_dirfrag(dir, dir, expiremap);
++trimmed;
}
} }
if (mdsdir_in->get_num_ref() == 0) if (mdsdir_in->get_num_ref() == 0) {
trim_inode(NULL, mdsdir_in, NULL, expiremap); trim_inode(NULL, mdsdir_in, NULL, expiremap);
++trimmed;
}
} else { } else {
dout(20) << __func__ << ": some unexpirable contents in mdsdir" << dendl; dout(20) << __func__ << ": some unexpirable contents in mdsdir" << dendl;
} }
@ -6694,6 +6719,7 @@ bool MDCache::trim(uint64_t count)
dout(20) << __func__ << ": maybe trimming base: " << *base_in << dendl; dout(20) << __func__ << ": maybe trimming base: " << *base_in << dendl;
if (base_in->get_num_ref() == 0) { if (base_in->get_num_ref() == 0) {
trim_inode(NULL, base_in, NULL, expiremap); trim_inode(NULL, base_in, NULL, expiremap);
++trimmed;
} }
} }
} }
@ -6702,7 +6728,7 @@ bool MDCache::trim(uint64_t count)
// send any expire messages // send any expire messages
send_expire_messages(expiremap); send_expire_messages(expiremap);
return true; return result;
} }
void MDCache::send_expire_messages(expiremap& expiremap) void MDCache::send_expire_messages(expiremap& expiremap)
@ -7539,8 +7565,7 @@ void MDCache::check_memory_usage()
mds->mlogger->set(l_mdm_heap, last.get_heap()); mds->mlogger->set(l_mdm_heap, last.get_heap());
if (cache_toofull()) { if (cache_toofull()) {
last_recall_state = clock::now(); mds->server->recall_client_state(nullptr);
mds->server->recall_client_state(-1.0, false, nullptr);
} }
// If the cache size had exceeded its limit, but we're back in bounds // If the cache size had exceeded its limit, but we're back in bounds

View File

@@ -19,6 +19,7 @@
 #include <string_view>

+#include "common/DecayCounter.h"
 #include "include/types.h"
 #include "include/filepath.h"
 #include "include/elist.h"

@@ -743,9 +744,9 @@ public:
   size_t get_cache_size() { return lru.lru_get_size(); }

   // trimming
-  bool trim(uint64_t count=0);
+  std::pair<bool, uint64_t> trim(uint64_t count=0);
 private:
-  void trim_lru(uint64_t count, expiremap& expiremap);
+  std::pair<bool, uint64_t> trim_lru(uint64_t count, expiremap& expiremap);
   bool trim_dentry(CDentry *dn, expiremap& expiremap);
   void trim_dirfrag(CDir *dir, CDir *con, expiremap& expiremap);
   bool trim_inode(CDentry *dn, CInode *in, CDir *con, expiremap&);

@@ -779,8 +780,6 @@ public:
   void trim_client_leases();
   void check_memory_usage();

-  time last_recall_state;
-
   // shutdown
 private:
   set<inodeno_t> shutdown_exporting_strays;

@@ -1187,6 +1186,10 @@ private:
                     LogSegment *ls, bufferlist *rollback=NULL);
   void finish_uncommitted_fragment(dirfrag_t basedirfrag, int op);
   void rollback_uncommitted_fragment(dirfrag_t basedirfrag, frag_vec_t&& old_frags);
+
+  DecayCounter trim_counter;
+
 public:
   void wait_for_uncommitted_fragment(dirfrag_t dirfrag, MDSInternalContextBase *c) {
     ceph_assert(uncommitted_fragments.count(dirfrag));

View File

@@ -370,6 +370,7 @@ const char** MDSDaemon::get_tracked_conf_keys() const
     "mds_health_cache_threshold",
     "mds_cache_mid",
     "mds_dump_cache_threshold_formatter",
+    "mds_cache_trim_decay_rate",
     "mds_dump_cache_threshold_file",
     // MDBalancer
     "mds_bal_fragment_dirs",

@@ -383,8 +384,10 @@ const char** MDSDaemon::get_tracked_conf_keys() const
     "mds_inject_migrator_session_race",
     "host",
     "fsid",
-    "mds_request_load_average_decay_rate",
     "mds_cap_revoke_eviction_timeout",
+    // SessionMap
+    "mds_request_load_average_decay_rate",
+    "mds_recall_max_decay_rate",
     NULL
   };
   return KEYS;

View File

@ -245,7 +245,8 @@ public:
Formatter *f, Context *on_finish) Formatter *f, Context *on_finish)
: MDSInternalContext(mds), : MDSInternalContext(mds),
server(server), mdcache(mdcache), mdlog(mdlog), server(server), mdcache(mdcache), mdlog(mdlog),
recall_timeout(recall_timeout), f(f), on_finish(on_finish), recall_timeout(recall_timeout), recall_start(mono_clock::now()),
f(f), on_finish(on_finish),
whoami(mds->whoami), incarnation(mds->incarnation) { whoami(mds->whoami), incarnation(mds->incarnation) {
} }
@ -255,6 +256,7 @@ public:
assert(mds->mds_lock.is_locked()); assert(mds->mds_lock.is_locked());
dout(20) << __func__ << dendl; dout(20) << __func__ << dendl;
f->open_object_section("result");
recall_client_state(); recall_client_state();
} }
@ -313,25 +315,40 @@ private:
void recall_client_state() { void recall_client_state() {
dout(20) << __func__ << dendl; dout(20) << __func__ << dendl;
auto now = mono_clock::now();
f->open_object_section("result"); auto duration = std::chrono::duration<double>(now-recall_start).count();
MDSGatherBuilder *gather = new MDSGatherBuilder(g_ceph_context); MDSGatherBuilder *gather = new MDSGatherBuilder(g_ceph_context);
server->recall_client_state(1.0, true, gather); auto [throttled, count] = server->recall_client_state(gather, Server::RecallFlags::STEADY);
if (!gather->has_subs()) { dout(10) << __func__
handle_recall_client_state(0); << (throttled ? " (throttled)" : "")
delete gather; << " recalled " << count << " caps" << dendl;
return;
caps_recalled += count;
if ((throttled || count > 0) && (recall_timeout == 0 || duration < recall_timeout)) {
auto timer = new FunctionContext([this](int _) {
recall_client_state();
});
mds->timer.add_event_after(1.0, timer);
} else {
if (!gather->has_subs()) {
delete gather;
return handle_recall_client_state(0);
} else if (recall_timeout > 0 && duration > recall_timeout) {
delete gather;
return handle_recall_client_state(-ETIMEDOUT);
} else {
uint64_t remaining = (recall_timeout == 0 ? 0 : recall_timeout-duration);
C_ContextTimeout *ctx = new C_ContextTimeout(
mds, remaining, new FunctionContext([this](int r) {
handle_recall_client_state(r);
}));
ctx->start_timer();
gather->set_finisher(new MDSInternalContextWrapper(mds, ctx));
gather->activate();
}
} }
C_ContextTimeout *ctx = new C_ContextTimeout(
mds, recall_timeout, new FunctionContext([this](int r) {
handle_recall_client_state(r);
}));
ctx->start_timer();
gather->set_finisher(new MDSInternalContextWrapper(mds, ctx));
gather->activate();
} }
void handle_recall_client_state(int r) { void handle_recall_client_state(int r) {
@ -341,6 +358,7 @@ private:
f->open_object_section("client_recall"); f->open_object_section("client_recall");
f->dump_int("return_code", r); f->dump_int("return_code", r);
f->dump_string("message", cpp_strerror(r)); f->dump_string("message", cpp_strerror(r));
f->dump_int("recalled", caps_recalled);
f->close_section(); f->close_section();
// we can still continue after recall timeout // we can still continue after recall timeout
@ -379,21 +397,30 @@ private:
void trim_cache() { void trim_cache() {
dout(20) << __func__ << dendl; dout(20) << __func__ << dendl;
if (!mdcache->trim(UINT64_MAX)) { auto [throttled, count] = mdcache->trim(UINT64_MAX);
cmd_err(f, "failed to trim cache"); dout(10) << __func__
complete(-EINVAL); << (throttled ? " (throttled)" : "")
return; << " trimmed " << count << " caps" << dendl;
dentries_trimmed += count;
if (throttled && count > 0) {
auto timer = new FunctionContext([this](int _) {
trim_cache();
});
mds->timer.add_event_after(1.0, timer);
} else {
cache_status();
} }
cache_status();
} }
void cache_status() { void cache_status() {
dout(20) << __func__ << dendl; dout(20) << __func__ << dendl;
f->open_object_section("trim_cache");
f->dump_int("trimmed", dentries_trimmed);
f->close_section();
// cache status section // cache status section
mdcache->cache_status(f); mdcache->cache_status(f);
f->close_section();
complete(0); complete(0);
} }
@ -401,6 +428,10 @@ private:
void finish(int r) override { void finish(int r) override {
dout(20) << __func__ << ": r=" << r << dendl; dout(20) << __func__ << ": r=" << r << dendl;
auto d = std::chrono::duration<double>(mono_clock::now()-recall_start);
f->dump_float("duration", d.count());
f->close_section();
on_finish->complete(r); on_finish->complete(r);
} }
@ -408,11 +439,14 @@ private:
MDCache *mdcache; MDCache *mdcache;
MDLog *mdlog; MDLog *mdlog;
uint64_t recall_timeout; uint64_t recall_timeout;
mono_time recall_start;
Formatter *f; Formatter *f;
Context *on_finish; Context *on_finish;
int retval = 0; int retval = 0;
std::stringstream ss; std::stringstream ss;
uint64_t caps_recalled = 0;
uint64_t dentries_trimmed = 0;
// so as to use dout // so as to use dout
mds_rank_t whoami; mds_rank_t whoami;
@ -693,6 +727,7 @@ void MDSRankDispatcher::tick()
sessionmap.update_average_session_age(); sessionmap.update_average_session_age();
if (is_active() || is_stopping()) { if (is_active() || is_stopping()) {
server->recall_client_state(nullptr, Server::RecallFlags::ENFORCE_MAX);
mdcache->trim(); mdcache->trim();
mdcache->trim_client_leases(); mdcache->trim_client_leases();
mdcache->check_memory_usage(); mdcache->check_memory_usage();
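
The asynchronous, timer-driven loop added above for `cache drop` can be hard
to follow in diff form. The condensed Python sketch below (not Ceph code;
function names, parameters, and the synchronous structure are assumptions) is
only meant to show the control flow: keep issuing small, throttled recall
steps until progress stops or the timeout expires, then trim the cache in
throttled steps as well.

    import time

    def drop_cache(recall_step, trim_step, timeout):
        """recall_step/trim_step return (throttled, count); both are assumed callables."""
        start = time.monotonic()
        caps_recalled = 0
        while True:
            throttled, count = recall_step()
            caps_recalled += count
            elapsed = time.monotonic() - start
            if not (throttled or count > 0) or (timeout and elapsed >= timeout):
                break
            time.sleep(1.0)   # let the recall throttle decay, as the MDS timer does

        dentries_trimmed = 0
        while True:
            throttled, count = trim_step()
            dentries_trimmed += count
            if not (throttled and count > 0):
                break
            time.sleep(1.0)   # let the trim throttle decay before the next pass

        return caps_recalled, dentries_trimmed

    if __name__ == "__main__":
        # dummy steps that finish immediately, just to exercise the loop
        print(drop_cache(lambda: (False, 0), lambda: (False, 0), timeout=5))

Because each step is bounded by the decay throttles, the command now makes
incremental progress instead of stalling the MDS with one unbounded pass.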

View File

@ -17,6 +17,7 @@
#include <boost/config/warning_disable.hpp> #include <boost/config/warning_disable.hpp>
#include <boost/fusion/include/std_pair.hpp> #include <boost/fusion/include/std_pair.hpp>
#include <boost/range/adaptor/reversed.hpp>
#include "MDSRank.h" #include "MDSRank.h"
#include "Server.h" #include "Server.h"
@ -49,6 +50,7 @@
#include "osd/OSDMap.h" #include "osd/OSDMap.h"
#include <errno.h> #include <errno.h>
#include <math.h>
#include <list> #include <list>
#include <iostream> #include <iostream>
@ -188,7 +190,8 @@ Server::Server(MDSRank *m) :
reconnect_done(NULL), reconnect_done(NULL),
failed_reconnects(0), failed_reconnects(0),
reconnect_evicting(false), reconnect_evicting(false),
terminating_sessions(false) terminating_sessions(false),
recall_throttle(g_conf().get_val<double>("mds_recall_max_decay_rate"))
{ {
supported_features = feature_bitset_t(CEPHFS_FEATURES_MDS_SUPPORTED); supported_features = feature_bitset_t(CEPHFS_FEATURES_MDS_SUPPORTED);
} }
@ -1072,6 +1075,9 @@ void Server::handle_conf_change(const ConfigProxy& conf,
dout(20) << __func__ << " cap revoke eviction timeout changed to " dout(20) << __func__ << " cap revoke eviction timeout changed to "
<< cap_revoke_eviction_timeout << dendl; << cap_revoke_eviction_timeout << dendl;
} }
if (changed.count("mds_recall_max_decay_rate")) {
recall_throttle = DecayCounter(g_conf().get_val<double>("mds_recall_max_decay_rate"));
}
} }
/* /*
@ -1510,62 +1516,122 @@ void Server::recover_filelocks(CInode *in, bufferlist locks, int64_t client)
} }
} }
/** /**
* Call this when the MDCache is oversized, to send requests to the clients * Call this when the MDCache is oversized, to send requests to the clients
* to trim some caps, and consequently unpin some inodes in the MDCache so * to trim some caps, and consequently unpin some inodes in the MDCache so
* that it can trim too. * that it can trim too.
*/ */
void Server::recall_client_state(double ratio, bool flush_client_session, std::pair<bool, uint64_t> Server::recall_client_state(MDSGatherBuilder* gather, RecallFlags flags)
MDSGatherBuilder *gather) { {
if (flush_client_session) { const auto now = clock::now();
assert(gather != nullptr); const bool steady = flags&RecallFlags::STEADY;
} const bool enforce_max = flags&RecallFlags::ENFORCE_MAX;
/* try to recall at least 80% of all caps */ const auto max_caps_per_client = g_conf().get_val<uint64_t>("mds_max_caps_per_client");
uint64_t max_caps_per_client = Capability::count() * g_conf().get_val<double>("mds_max_ratio_caps_per_client"); const auto min_caps_per_client = g_conf().get_val<uint64_t>("mds_min_caps_per_client");
uint64_t min_caps_per_client = g_conf().get_val<uint64_t>("mds_min_caps_per_client"); const auto recall_global_max_decay_threshold = g_conf().get_val<Option::size_t>("mds_recall_global_max_decay_threshold");
if (max_caps_per_client < min_caps_per_client) { const auto recall_max_caps = g_conf().get_val<Option::size_t>("mds_recall_max_caps");
dout(0) << "max_caps_per_client " << max_caps_per_client const auto recall_max_decay_threshold = g_conf().get_val<Option::size_t>("mds_recall_max_decay_threshold");
<< " < min_caps_per_client " << min_caps_per_client << dendl;
max_caps_per_client = min_caps_per_client + 1;
}
/* unless this ratio is smaller: */ dout(7) << __func__ << ":"
/* ratio: determine the amount of caps to recall from each client. Use << " min=" << min_caps_per_client
* percentage full over the cache reservation. Cap the ratio at 80% of client << " max=" << max_caps_per_client
* caps. */ << " total=" << Capability::count()
if (ratio < 0.0) << " flags=0x" << std::hex << flags
ratio = 1.0 - fmin(0.80, mdcache->cache_toofull_ratio()); << dendl;
dout(10) << __func__ << ": ratio=" << ratio << ", caps per client " /* trim caps of sessions with the most caps first */
<< min_caps_per_client << "-" << max_caps_per_client << dendl; std::multimap<uint64_t, Session*> caps_session;
auto f = [&caps_session, enforce_max, max_caps_per_client](auto& s) {
auto num_caps = s->caps.size();
if (!enforce_max || num_caps > max_caps_per_client) {
caps_session.emplace(std::piecewise_construct, std::forward_as_tuple(num_caps), std::forward_as_tuple(s));
}
};
mds->sessionmap.get_client_sessions(std::move(f));
set<Session*> sessions; std::pair<bool, uint64_t> result = {false, 0};
mds->sessionmap.get_client_session_set(sessions); auto& [throttled, caps_recalled] = result;
last_recall_state = now;
for (auto &session : sessions) { for (const auto& [num_caps, session] : boost::adaptors::reverse(caps_session)) {
if (!session->is_open() || if (!session->is_open() ||
!session->get_connection() || !session->get_connection() ||
!session->info.inst.name.is_client()) !session->info.inst.name.is_client())
continue; continue;
dout(10) << " session " << session->info.inst dout(10) << __func__ << ":"
<< " caps " << session->caps.size() << " session " << session->info.inst
<< " caps " << num_caps
<< ", leases " << session->leases.size() << ", leases " << session->leases.size()
<< dendl; << dendl;
uint64_t newlim = std::max(std::min<uint64_t>((session->caps.size() * ratio), max_caps_per_client), min_caps_per_client); uint64_t newlim;
if (session->caps.size() > newlim) { if (num_caps < recall_max_caps || (num_caps-recall_max_caps) < min_caps_per_client) {
newlim = min_caps_per_client;
} else {
newlim = num_caps-recall_max_caps;
}
if (num_caps > newlim) {
/* now limit the number of caps we recall at a time to prevent overloading ourselves */
uint64_t recall = std::min<uint64_t>(recall_max_caps, num_caps-newlim);
newlim = num_caps-recall;
const uint64_t session_recall_throttle = session->get_recall_caps_throttle();
const uint64_t global_recall_throttle = recall_throttle.get();
if (session_recall_throttle+recall > recall_max_decay_threshold) {
dout(15) << " session recall threshold (" << recall_max_decay_threshold << ") hit at " << session_recall_throttle << "; skipping!" << dendl;
throttled = true;
continue;
} else if (global_recall_throttle+recall > recall_global_max_decay_threshold) {
dout(15) << " global recall threshold (" << recall_global_max_decay_threshold << ") hit at " << global_recall_throttle << "; skipping!" << dendl;
throttled = true;
break;
}
// now check if we've recalled caps recently and the client is unlikely to satisfy a new recall
if (steady) {
const auto session_recall = session->get_recall_caps();
const auto session_release = session->get_release_caps();
if (2*session_release < session_recall && 2*session_recall > recall_max_decay_threshold) {
/* The session has been unable to keep up with the number of caps
* recalled (by half); additionally, to prevent marking sessions
* we've just begun to recall from, the session_recall counter
* (decayed count of caps recently recalled) is **greater** than the
* session threshold for the session's cap recall throttle.
*/
dout(15) << " 2*session_release < session_recall"
" (2*" << session_release << " < " << session_recall << ");"
" Skipping because we are unlikely to get more released." << dendl;
continue;
} else if (recall < recall_max_caps && 2*recall < session_recall) {
/* The number of caps recalled is less than the number we *could*
* recall (so there isn't much left to recall?) and the number of
* caps is less than the current recall_caps counter (decayed count
* of caps recently recalled).
*/
dout(15) << " 2*recall < session_recall "
" (2*" << recall << " < " << session_recall << ") &&"
" recall < recall_max_caps (" << recall << " < " << recall_max_caps << ");"
" Skipping because we are unlikely to get more released." << dendl;
continue;
}
}
dout(7) << " recalling " << recall << " caps; session_recall_throttle = " << session_recall_throttle << "; global_recall_throttle = " << global_recall_throttle << dendl;
auto m = MClientSession::create(CEPH_SESSION_RECALL_STATE); auto m = MClientSession::create(CEPH_SESSION_RECALL_STATE);
m->head.max_caps = newlim; m->head.max_caps = newlim;
mds->send_message_client(m, session); mds->send_message_client(m, session);
if (flush_client_session) { if (gather) {
flush_session(session, gather); flush_session(session, gather);
} }
session->notify_recall_sent(newlim); caps_recalled += session->notify_recall_sent(newlim);
recall_throttle.hit(recall);
} }
} }
dout(7) << "recalled" << (throttled ? " (throttled)" : "") << " " << caps_recalled << " client caps." << dendl;
return result;
} }
void Server::force_clients_readonly() void Server::force_clients_readonly()

View File

@@ -17,6 +17,8 @@
 #include <string_view>

+#include <common/DecayCounter.h>
+
 #include "messages/MClientReconnect.h"
 #include "messages/MClientReply.h"
 #include "messages/MClientRequest.h"

@@ -130,6 +132,10 @@ public:
   bool waiting_for_reconnect(client_t c) const;
   void dump_reconnect_status(Formatter *f) const;

+  time last_recalled() const {
+    return last_recall_state;
+  }
+
   void handle_client_session(const MClientSession::const_ref &m);
   void _session_logged(Session *session, uint64_t state_seq,
                        bool open, version_t pv, interval_set<inodeno_t>& inos, version_t piv);

@@ -163,8 +169,12 @@ public:
   void reconnect_tick();
   void recover_filelocks(CInode *in, bufferlist locks, int64_t client);

-  void recall_client_state(double ratio, bool flush_client_session,
-                           MDSGatherBuilder *gather);
+  enum RecallFlags {
+    NONE = 0,
+    STEADY = (1<<0),
+    ENFORCE_MAX = (1<<1),
+  };
+  std::pair<bool, uint64_t> recall_client_state(MDSGatherBuilder* gather, enum RecallFlags=RecallFlags::NONE);
   void force_clients_readonly();

   // -- requests --

@@ -339,6 +349,9 @@ public:
 private:
   void reply_client_request(MDRequestRef& mdr, const MClientReply::ref &reply);
   void flush_session(Session *session, MDSGatherBuilder *gather);
+
+  DecayCounter recall_throttle;
+  time last_recall_state;
 };

 #endif

View File

@ -547,7 +547,7 @@ void SessionMapStore::decode_legacy(bufferlist::const_iterator& p)
while (n-- && !p.end()) { while (n-- && !p.end()) {
auto p2 = p; auto p2 = p;
Session *s = new Session(nullptr); Session *s = new Session(ConnectionRef());
s->info.decode(p); s->info.decode(p);
if (session_map.count(s->info.inst.name)) { if (session_map.count(s->info.inst.name)) {
// eager client connected too fast! aie. // eager client connected too fast! aie.
@ -855,11 +855,8 @@ size_t Session::get_request_count()
*/ */
void Session::notify_cap_release(size_t n_caps) void Session::notify_cap_release(size_t n_caps)
{ {
if (recalled_at != clock::zero()) { recall_caps.hit(-(double)n_caps);
recall_release_count += n_caps; release_caps.hit(n_caps);
if (recall_release_count >= recall_count)
clear_recalled_at();
}
} }
/** /**
@ -868,25 +865,26 @@ void Session::notify_cap_release(size_t n_caps)
* in order to generate health metrics if the session doesn't see * in order to generate health metrics if the session doesn't see
* a commensurate number of calls to ::notify_cap_release * a commensurate number of calls to ::notify_cap_release
*/ */
void Session::notify_recall_sent(const size_t new_limit) uint64_t Session::notify_recall_sent(size_t new_limit)
{ {
if (recalled_at == clock::zero()) { const auto num_caps = caps.size();
// Entering recall phase, set up counters so we can later ceph_assert(new_limit < num_caps); // Behaviour of Server::recall_client_state
// judge whether the client has respected the recall request const auto count = num_caps-new_limit;
recalled_at = last_recall_sent = clock::now(); uint64_t new_change;
assert (new_limit < caps.size()); // Behaviour of Server::recall_client_state if (recall_limit != new_limit) {
recall_count = caps.size() - new_limit; new_change = count;
recall_release_count = 0;
} else { } else {
last_recall_sent = clock::now(); new_change = 0; /* no change! */
} }
}
void Session::clear_recalled_at() /* Always hit the session counter as a RECALL message is still sent to the
{ * client and we do not want the MDS to burn its global counter tokens on a
recalled_at = last_recall_sent = clock::zero(); * session that is not releasing caps (i.e. allow the session counter to
recall_count = 0; * throttle future RECALL messages).
recall_release_count = 0; */
recall_caps_throttle.hit(count);
recall_caps.hit(count);
return new_change;
} }
/** /**
@ -980,25 +978,47 @@ void SessionMap::hit_session(Session *session) {
} }
void SessionMap::handle_conf_change(const ConfigProxy &conf, void SessionMap::handle_conf_change(const ConfigProxy &conf,
const std::set <std::string> &changed) { const std::set <std::string> &changed)
{
auto apply_to_open_sessions = [this](auto f) {
if (auto it = by_state.find(Session::STATE_OPEN); it != by_state.end()) {
for (const auto &session : *(it->second)) {
f(session);
}
}
if (auto it = by_state.find(Session::STATE_STALE); it != by_state.end()) {
for (const auto &session : *(it->second)) {
f(session);
}
}
};
if (changed.count("mds_request_load_average_decay_rate")) { if (changed.count("mds_request_load_average_decay_rate")) {
decay_rate = g_conf().get_val<double>("mds_request_load_average_decay_rate"); auto d = g_conf().get_val<double>("mds_request_load_average_decay_rate");
dout(20) << __func__ << " decay rate changed to " << decay_rate << dendl; dout(20) << __func__ << " decay rate changed to " << d << dendl;
total_load_avg = DecayCounter(decay_rate); decay_rate = d;
total_load_avg = DecayCounter(d);
auto p = by_state.find(Session::STATE_OPEN); auto mut = [d](auto s) {
if (p != by_state.end()) { s->set_load_avg_decay_rate(d);
for (const auto &session : *(p->second)) { };
session->set_load_avg_decay_rate(decay_rate); apply_to_open_sessions(mut);
} }
} if (changed.count("mds_recall_max_decay_rate")) {
p = by_state.find(Session::STATE_STALE); auto d = g_conf().get_val<double>("mds_recall_max_decay_rate");
if (p != by_state.end()) { auto mut = [d](auto s) {
for (const auto &session : *(p->second)) { s->recall_caps_throttle = DecayCounter(d);
session->set_load_avg_decay_rate(decay_rate); };
} apply_to_open_sessions(mut);
} }
if (changed.count("mds_recall_warning_decay_rate")) {
auto d = g_conf().get_val<double>("mds_recall_warning_decay_rate");
auto mut = [d](auto s) {
s->recall_caps = DecayCounter(d);
s->release_caps = DecayCounter(d);
};
apply_to_open_sessions(mut);
} }
} }

View File

@ -27,10 +27,10 @@ using std::set;
#include "mdstypes.h" #include "mdstypes.h"
#include "mds/MDSAuthCaps.h" #include "mds/MDSAuthCaps.h"
#include "common/perf_counters.h" #include "common/perf_counters.h"
#include "common/DecayCounter.h"
class CInode; class CInode;
struct MDRequestImpl; struct MDRequestImpl;
class DecayCounter;
#include "CInode.h" #include "CInode.h"
#include "Capability.h" #include "Capability.h"
@ -97,9 +97,9 @@ public:
} }
private: private:
int state; int state = STATE_CLOSED;
uint64_t state_seq; uint64_t state_seq = 0;
int importing_count; int importing_count = 0;
friend class SessionMap; friend class SessionMap;
// Human (friendly) name is soft state generated from client metadata // Human (friendly) name is soft state generated from client metadata
@ -113,6 +113,16 @@ private:
// request load average for this session // request load average for this session
DecayCounter load_avg; DecayCounter load_avg;
// Ephemeral state for tracking progress of capability recalls
// caps being recalled recently by this session; used for Beacon warnings
DecayCounter recall_caps;
// caps that have been released
DecayCounter release_caps;
// throttle on caps recalled
DecayCounter recall_caps_throttle;
// New limit in SESSION_RECALL
uint32_t recall_limit = 0;
// session start time -- used to track average session time // session start time -- used to track average session time
// note that this is initialized in the constructor rather // note that this is initialized in the constructor rather
// than at the time of adding a session to the sessionmap // than at the time of adding a session to the sessionmap
@ -153,12 +163,6 @@ public:
const std::string& get_human_name() const {return human_name;} const std::string& get_human_name() const {return human_name;}
// Ephemeral state for tracking progress of capability recalls
time recalled_at = clock::zero(); // When was I asked to SESSION_RECALL?
time last_recall_sent = clock::zero();
uint32_t recall_count; // How many caps was I asked to SESSION_RECALL?
uint32_t recall_release_count; // How many caps have I actually revoked?
session_info_t info; ///< durable bits session_info_t info; ///< durable bits
MDSAuthCaps auth_caps; MDSAuthCaps auth_caps;
@ -177,8 +181,16 @@ public:
interval_set<inodeno_t> pending_prealloc_inos; // journaling prealloc, will be added to prealloc_inos interval_set<inodeno_t> pending_prealloc_inos; // journaling prealloc, will be added to prealloc_inos
void notify_cap_release(size_t n_caps); void notify_cap_release(size_t n_caps);
void notify_recall_sent(const size_t new_limit); uint64_t notify_recall_sent(size_t new_limit);
void clear_recalled_at(); auto get_recall_caps_throttle() const {
return recall_caps_throttle.get();
}
auto get_recall_caps() const {
return recall_caps.get();
}
auto get_release_caps() const {
return release_caps.get();
}
inodeno_t next_ino() const { inodeno_t next_ino() const {
if (info.prealloc_inos.empty()) if (info.prealloc_inos.empty())
@ -249,8 +261,8 @@ public:
// -- caps -- // -- caps --
private: private:
uint32_t cap_gen; uint32_t cap_gen = 0;
version_t cap_push_seq; // cap push seq # version_t cap_push_seq = 0; // cap push seq #
map<version_t, MDSInternalContextBase::vec > waitfor_flush; // flush session messages map<version_t, MDSInternalContextBase::vec > waitfor_flush; // flush session messages
public: public:
@ -291,16 +303,16 @@ public:
} }
// -- leases -- // -- leases --
uint32_t lease_seq; uint32_t lease_seq = 0;
// -- completed requests -- // -- completed requests --
private: private:
// Has completed_requests been modified since the last time we // Has completed_requests been modified since the last time we
// wrote this session out? // wrote this session out?
bool completed_requests_dirty; bool completed_requests_dirty = false;
unsigned num_trim_flushes_warnings; unsigned num_trim_flushes_warnings = 0;
unsigned num_trim_requests_warnings; unsigned num_trim_requests_warnings = 0;
public: public:
void add_completed_request(ceph_tid_t t, inodeno_t created) { void add_completed_request(ceph_tid_t t, inodeno_t created) {
info.completed_requests[t] = created; info.completed_requests[t] = created;
@ -375,21 +387,17 @@ public:
int check_access(CInode *in, unsigned mask, int caller_uid, int caller_gid, int check_access(CInode *in, unsigned mask, int caller_uid, int caller_gid,
const vector<uint64_t> *gid_list, int new_uid, int new_gid); const vector<uint64_t> *gid_list, int new_uid, int new_gid);
Session() = delete;
Session(Connection *con) : Session(ConnectionRef con) :
state(STATE_CLOSED), state_seq(0), importing_count(0), recall_caps(g_conf().get_val<double>("mds_recall_warning_decay_rate")),
birth_time(clock::now()), recall_count(0), release_caps(g_conf().get_val<double>("mds_recall_warning_decay_rate")),
recall_release_count(0), auth_caps(g_ceph_context), recall_caps_throttle(g_conf().get_val<double>("mds_recall_max_decay_rate")),
connection(NULL), item_session_list(this), birth_time(clock::now()),
requests(0), // member_offset passed to front() manually auth_caps(g_ceph_context),
cap_gen(0), cap_push_seq(0), item_session_list(this),
lease_seq(0), requests(0) // member_offset passed to front() manually
completed_requests_dirty(false), {
num_trim_flushes_warnings(0), set_connection(std::move(con));
num_trim_requests_warnings(0) {
if (con) {
set_connection(con);
}
} }
~Session() override { ~Session() override {
if (state == STATE_CLOSED) { if (state == STATE_CLOSED) {
@ -400,9 +408,11 @@ public:
preopen_out_queue.clear(); preopen_out_queue.clear();
} }
void set_connection(Connection *con) { void set_connection(ConnectionRef con) {
connection = con; connection = std::move(con);
socket_addr = con->get_peer_socket_addr(); if (connection) {
socket_addr = connection->get_peer_socket_addr();
}
} }
const ConnectionRef& get_connection() const { const ConnectionRef& get_connection() const {
return connection; return connection;
@ -490,7 +500,7 @@ public:
if (session_map_entry != session_map.end()) { if (session_map_entry != session_map.end()) {
s = session_map_entry->second; s = session_map_entry->second;
} else { } else {
s = session_map[i.name] = new Session(nullptr); s = session_map[i.name] = new Session(ConnectionRef());
s->info.inst = i; s->info.inst = i;
s->last_cap_renew = Session::clock::now(); s->last_cap_renew = Session::clock::now();
if (logger) { if (logger) {
@ -522,17 +532,15 @@ public:
MDSRank *mds; MDSRank *mds;
protected: protected:
version_t projected, committing, committed; version_t projected = 0, committing = 0, committed = 0;
public: public:
map<int,xlist<Session*>* > by_state; map<int,xlist<Session*>* > by_state;
uint64_t set_state(Session *session, int state); uint64_t set_state(Session *session, int state);
map<version_t, MDSInternalContextBase::vec > commit_waiters; map<version_t, MDSInternalContextBase::vec > commit_waiters;
void update_average_session_age(); void update_average_session_age();
explicit SessionMap(MDSRank *m) : mds(m), SessionMap() = delete;
projected(0), committing(0), committed(0), explicit SessionMap(MDSRank *m) : mds(m) {}
loaded_legacy(false)
{ }
~SessionMap() override ~SessionMap() override
{ {
@ -626,12 +634,20 @@ public:
void dump(); void dump();
void get_client_session_set(set<Session*>& s) const { template<typename F>
for (ceph::unordered_map<entity_name_t,Session*>::const_iterator p = session_map.begin(); void get_client_sessions(F&& f) const {
p != session_map.end(); for (const auto& p : session_map) {
++p) auto& session = p.second;
if (p->second->info.inst.name.is_client()) if (session->info.inst.name.is_client())
s.insert(p->second); f(session);
}
}
template<typename C>
void get_client_session_set(C& c) const {
auto f = [&c](auto& s) {
c.insert(s);
};
get_client_sessions(f);
} }
void replay_open_sessions(map<client_t,entity_inst_t>& client_map, void replay_open_sessions(map<client_t,entity_inst_t>& client_map,
@ -695,7 +711,7 @@ public:
protected: protected:
std::set<entity_name_t> dirty_sessions; std::set<entity_name_t> dirty_sessions;
std::set<entity_name_t> null_sessions; std::set<entity_name_t> null_sessions;
bool loaded_legacy; bool loaded_legacy = false;
void _mark_dirty(Session *session); void _mark_dirty(Session *session);
public: public: