mirror of
https://github.com/ceph/ceph
synced 2025-01-02 17:12:31 +00:00
client: put MetaSession map container
Simplifies memory management and reduces fragmentation. Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
This commit is contained in:
parent
21564d264c
commit
4bf53ce082
@ -1504,9 +1504,9 @@ void Client::dump_mds_sessions(Formatter *f)
|
||||
{
|
||||
f->dump_int("id", get_nodeid().v);
|
||||
f->open_array_section("sessions");
|
||||
for (map<mds_rank_t,MetaSession*>::const_iterator p = mds_sessions.begin(); p != mds_sessions.end(); ++p) {
|
||||
for (const auto &p : mds_sessions) {
|
||||
f->open_object_section("session");
|
||||
p->second->dump(f);
|
||||
p.second.dump(f);
|
||||
f->close_section();
|
||||
}
|
||||
f->close_section();
|
||||
@ -1702,7 +1702,7 @@ int Client::make_request(MetaRequest *request,
|
||||
if (!have_open_session(mds))
|
||||
continue;
|
||||
} else {
|
||||
session = mds_sessions[mds];
|
||||
session = &mds_sessions.at(mds);
|
||||
}
|
||||
|
||||
// send request.
|
||||
@ -1903,27 +1903,26 @@ void Client::encode_cap_releases(MetaRequest *req, mds_rank_t mds)
|
||||
|
||||
bool Client::have_open_session(mds_rank_t mds)
|
||||
{
|
||||
return
|
||||
mds_sessions.count(mds) &&
|
||||
(mds_sessions[mds]->state == MetaSession::STATE_OPEN ||
|
||||
mds_sessions[mds]->state == MetaSession::STATE_STALE);
|
||||
const auto &it = mds_sessions.find(mds);
|
||||
return it != mds_sessions.end() &&
|
||||
(it->second.state == MetaSession::STATE_OPEN ||
|
||||
it->second.state == MetaSession::STATE_STALE);
|
||||
}
|
||||
|
||||
MetaSession *Client::_get_mds_session(mds_rank_t mds, Connection *con)
|
||||
{
|
||||
if (mds_sessions.count(mds) == 0)
|
||||
const auto &it = mds_sessions.find(mds);
|
||||
if (it == mds_sessions.end() || it->second.con != con) {
|
||||
return NULL;
|
||||
MetaSession *s = mds_sessions[mds];
|
||||
if (s->con != con)
|
||||
return NULL;
|
||||
return s;
|
||||
} else {
|
||||
return &it->second;
|
||||
}
|
||||
}
|
||||
|
||||
MetaSession *Client::_get_or_open_mds_session(mds_rank_t mds)
|
||||
{
|
||||
if (mds_sessions.count(mds))
|
||||
return mds_sessions[mds];
|
||||
return _open_mds_session(mds);
|
||||
auto it = mds_sessions.find(mds);
|
||||
return it == mds_sessions.end() ? _open_mds_session(mds) : &it->second;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1989,15 +1988,12 @@ void Client::update_metadata(std::string const &k, std::string const &v)
|
||||
MetaSession *Client::_open_mds_session(mds_rank_t mds)
|
||||
{
|
||||
ldout(cct, 10) << "_open_mds_session mds." << mds << dendl;
|
||||
assert(mds_sessions.count(mds) == 0);
|
||||
MetaSession *session = new MetaSession;
|
||||
session->mds_num = mds;
|
||||
session->seq = 0;
|
||||
session->inst = mdsmap->get_inst(mds);
|
||||
session->con = messenger->get_connection(session->inst);
|
||||
session->state = MetaSession::STATE_OPENING;
|
||||
session->mds_state = MDSMap::STATE_NULL;
|
||||
mds_sessions[mds] = session;
|
||||
entity_inst_t inst = mdsmap->get_inst(mds);
|
||||
auto em = mds_sessions.emplace(std::piecewise_construct,
|
||||
std::forward_as_tuple(mds),
|
||||
std::forward_as_tuple(mds, messenger->get_connection(inst), inst));
|
||||
assert(em.second); /* not already present */
|
||||
MetaSession *session = &em.first->second;
|
||||
|
||||
// Maybe skip sending a request to open if this MDS daemon
|
||||
// has previously sent us a REJECT.
|
||||
@ -2035,7 +2031,6 @@ void Client::_closed_mds_session(MetaSession *s)
|
||||
remove_session_caps(s);
|
||||
kick_requests_closed(s);
|
||||
mds_sessions.erase(s->mds_num);
|
||||
delete s;
|
||||
}
|
||||
|
||||
void Client::handle_client_session(MClientSession *m)
|
||||
@ -2106,8 +2101,8 @@ bool Client::_any_stale_sessions() const
|
||||
{
|
||||
assert(client_lock.is_locked_by_me());
|
||||
|
||||
for (const auto &i : mds_sessions) {
|
||||
if (i.second->state == MetaSession::STATE_STALE) {
|
||||
for (const auto &it : mds_sessions) {
|
||||
if (it.second.state == MetaSession::STATE_STALE) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@ -2119,12 +2114,11 @@ void Client::_kick_stale_sessions()
|
||||
{
|
||||
ldout(cct, 1) << "kick_stale_sessions" << dendl;
|
||||
|
||||
for (map<mds_rank_t,MetaSession*>::iterator p = mds_sessions.begin();
|
||||
p != mds_sessions.end(); ) {
|
||||
MetaSession *s = p->second;
|
||||
++p;
|
||||
if (s->state == MetaSession::STATE_STALE)
|
||||
_closed_mds_session(s);
|
||||
for (auto it = mds_sessions.begin(); it != mds_sessions.end(); ) {
|
||||
MetaSession &s = it->second;
|
||||
++it;
|
||||
if (s.state == MetaSession::STATE_STALE)
|
||||
_closed_mds_session(&s);
|
||||
}
|
||||
}
|
||||
|
||||
@ -2437,8 +2431,8 @@ void Client::handle_osd_map(MOSDMap *m)
|
||||
// on the MDS side because the MDS will have seen the blacklist too.
|
||||
while(!mds_sessions.empty()) {
|
||||
auto i = mds_sessions.begin();
|
||||
auto session = i->second;
|
||||
_closed_mds_session(session);
|
||||
auto &session = i->second;
|
||||
_closed_mds_session(&session);
|
||||
}
|
||||
|
||||
// Since we know all our OSD ops will fail, cancel them all preemtively,
|
||||
@ -2630,10 +2624,9 @@ void Client::handle_mds_map(MMDSMap* m)
|
||||
}
|
||||
|
||||
// reset session
|
||||
for (map<mds_rank_t,MetaSession*>::iterator p = mds_sessions.begin();
|
||||
p != mds_sessions.end(); ) {
|
||||
for (auto p = mds_sessions.begin(); p != mds_sessions.end(); ) {
|
||||
mds_rank_t mds = p->first;
|
||||
MetaSession *session = p->second;
|
||||
MetaSession *session = &p->second;
|
||||
++p;
|
||||
|
||||
int oldstate = oldmap->get_state(mds);
|
||||
@ -2803,12 +2796,10 @@ void Client::resend_unsafe_requests(MetaSession *session)
|
||||
void Client::wait_unsafe_requests()
|
||||
{
|
||||
list<MetaRequest*> last_unsafe_reqs;
|
||||
for (map<mds_rank_t,MetaSession*>::iterator p = mds_sessions.begin();
|
||||
p != mds_sessions.end();
|
||||
++p) {
|
||||
MetaSession *s = p->second;
|
||||
if (!s->unsafe_requests.empty()) {
|
||||
MetaRequest *req = s->unsafe_requests.back();
|
||||
for (const auto &p : mds_sessions) {
|
||||
const MetaSession &s = p.second;
|
||||
if (!s.unsafe_requests.empty()) {
|
||||
MetaRequest *req = s.unsafe_requests.back();
|
||||
req->get();
|
||||
last_unsafe_reqs.push_back(req);
|
||||
}
|
||||
@ -3426,8 +3417,7 @@ void Client::check_caps(Inode *in, unsigned flags)
|
||||
Cap *cap = it->second;
|
||||
++it;
|
||||
|
||||
MetaSession *session = mds_sessions[mds];
|
||||
assert(session);
|
||||
MetaSession *session = &mds_sessions.at(mds);
|
||||
|
||||
cap_used = used;
|
||||
if (in->auth_cap && cap != in->auth_cap)
|
||||
@ -4257,15 +4247,13 @@ void Client::wait_sync_caps(ceph_tid_t want)
|
||||
retry:
|
||||
ldout(cct, 10) << "wait_sync_caps want " << want << " (last is " << last_flush_tid << ", "
|
||||
<< num_flushing_caps << " total flushing)" << dendl;
|
||||
for (map<mds_rank_t,MetaSession*>::iterator p = mds_sessions.begin();
|
||||
p != mds_sessions.end();
|
||||
++p) {
|
||||
MetaSession *s = p->second;
|
||||
for (auto &p : mds_sessions) {
|
||||
MetaSession *s = &p.second;
|
||||
if (s->flushing_caps_tids.empty())
|
||||
continue;
|
||||
ceph_tid_t oldest_tid = *s->flushing_caps_tids.begin();
|
||||
if (oldest_tid <= want) {
|
||||
ldout(cct, 10) << " waiting on mds." << p->first << " tid " << oldest_tid
|
||||
ldout(cct, 10) << " waiting on mds." << p.first << " tid " << oldest_tid
|
||||
<< " (want " << want << ")" << dendl;
|
||||
sync_cond.Wait(client_lock);
|
||||
goto retry;
|
||||
@ -5786,11 +5774,9 @@ void Client::_close_sessions()
|
||||
{
|
||||
while (!mds_sessions.empty()) {
|
||||
// send session closes!
|
||||
for (map<mds_rank_t,MetaSession*>::iterator p = mds_sessions.begin();
|
||||
p != mds_sessions.end();
|
||||
++p) {
|
||||
if (p->second->state != MetaSession::STATE_CLOSING) {
|
||||
_close_mds_session(p->second);
|
||||
for (auto &p : mds_sessions) {
|
||||
if (p.second.state != MetaSession::STATE_CLOSING) {
|
||||
_close_mds_session(&p.second);
|
||||
}
|
||||
}
|
||||
|
||||
@ -5804,11 +5790,8 @@ void Client::flush_mdlog_sync()
|
||||
{
|
||||
if (mds_requests.empty())
|
||||
return;
|
||||
for (map<mds_rank_t,MetaSession*>::iterator p = mds_sessions.begin();
|
||||
p != mds_sessions.end();
|
||||
++p) {
|
||||
MetaSession *s = p->second;
|
||||
flush_mdlog(s);
|
||||
for (auto &it : mds_sessions) {
|
||||
flush_mdlog(&it.second);
|
||||
}
|
||||
}
|
||||
|
||||
@ -5950,17 +5933,16 @@ void Client::unmount()
|
||||
void Client::flush_cap_releases()
|
||||
{
|
||||
// send any cap releases
|
||||
for (map<mds_rank_t,MetaSession*>::iterator p = mds_sessions.begin();
|
||||
p != mds_sessions.end();
|
||||
++p) {
|
||||
if (p->second->release && mdsmap->is_clientreplay_or_active_or_stopping(
|
||||
p->first)) {
|
||||
for (auto &p : mds_sessions) {
|
||||
auto &session = p.second;
|
||||
if (session.release && mdsmap->is_clientreplay_or_active_or_stopping(
|
||||
p.first)) {
|
||||
if (cct->_conf->client_inject_release_failure) {
|
||||
ldout(cct, 20) << __func__ << " injecting failure to send cap release message" << dendl;
|
||||
} else {
|
||||
p->second->con->send_message(p->second->release);
|
||||
session.con->send_message(session.release);
|
||||
}
|
||||
p->second->release.reset();
|
||||
session.release.reset();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -5992,10 +5974,9 @@ void Client::tick()
|
||||
req->caller_cond->Signal();
|
||||
}
|
||||
signal_cond_list(waiting_for_mdsmap);
|
||||
for (map<mds_rank_t,MetaSession*>::iterator p = mds_sessions.begin();
|
||||
p != mds_sessions.end();
|
||||
++p)
|
||||
signal_context_list(p->second->waiting_for_open);
|
||||
for (auto &p : mds_sessions) {
|
||||
signal_context_list(p.second.waiting_for_open);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -6028,12 +6009,10 @@ void Client::renew_caps()
|
||||
ldout(cct, 10) << "renew_caps()" << dendl;
|
||||
last_cap_renew = ceph_clock_now();
|
||||
|
||||
for (map<mds_rank_t,MetaSession*>::iterator p = mds_sessions.begin();
|
||||
p != mds_sessions.end();
|
||||
++p) {
|
||||
ldout(cct, 15) << "renew_caps requesting from mds." << p->first << dendl;
|
||||
if (mdsmap->get_state(p->first) >= MDSMap::STATE_REJOIN)
|
||||
renew_caps(p->second);
|
||||
for (auto &p : mds_sessions) {
|
||||
ldout(cct, 15) << "renew_caps requesting from mds." << p.first << dendl;
|
||||
if (mdsmap->get_state(p.first) >= MDSMap::STATE_REJOIN)
|
||||
renew_caps(&p.second);
|
||||
}
|
||||
}
|
||||
|
||||
@ -6119,15 +6098,15 @@ int Client::_lookup(Inode *dir, const string& dname, int mask, InodeRef *target,
|
||||
if (dn->lease_mds >= 0 &&
|
||||
dn->lease_ttl > now &&
|
||||
mds_sessions.count(dn->lease_mds)) {
|
||||
MetaSession *s = mds_sessions[dn->lease_mds];
|
||||
if (s->cap_ttl > now &&
|
||||
s->cap_gen == dn->lease_gen) {
|
||||
MetaSession &s = mds_sessions.at(dn->lease_mds);
|
||||
if (s.cap_ttl > now &&
|
||||
s.cap_gen == dn->lease_gen) {
|
||||
// touch this mds's dir cap too, even though we don't _explicitly_ use it here, to
|
||||
// make trim_caps() behave.
|
||||
dir->try_touch_cap(dn->lease_mds);
|
||||
goto hit_dn;
|
||||
}
|
||||
ldout(cct, 20) << " bad lease, cap_ttl " << s->cap_ttl << ", cap_gen " << s->cap_gen
|
||||
ldout(cct, 20) << " bad lease, cap_ttl " << s.cap_ttl << ", cap_gen " << s.cap_gen
|
||||
<< " vs lease_gen " << dn->lease_gen << dendl;
|
||||
}
|
||||
// dir lease?
|
||||
@ -6187,9 +6166,9 @@ int Client::get_or_create(Inode *dir, const char* name,
|
||||
dn->lease_mds >= 0 &&
|
||||
dn->lease_ttl > now &&
|
||||
mds_sessions.count(dn->lease_mds)) {
|
||||
MetaSession *s = mds_sessions[dn->lease_mds];
|
||||
if (s->cap_ttl > now &&
|
||||
s->cap_gen == dn->lease_gen) {
|
||||
MetaSession &s = mds_sessions.at(dn->lease_mds);
|
||||
if (s.cap_ttl > now &&
|
||||
s.cap_gen == dn->lease_gen) {
|
||||
if (expect_null)
|
||||
return -EEXIST;
|
||||
}
|
||||
@ -13315,12 +13294,10 @@ void Client::ms_handle_remote_reset(Connection *con)
|
||||
// kludge to figure out which mds this is; fixme with a Connection* state
|
||||
mds_rank_t mds = MDS_RANK_NONE;
|
||||
MetaSession *s = NULL;
|
||||
for (map<mds_rank_t,MetaSession*>::iterator p = mds_sessions.begin();
|
||||
p != mds_sessions.end();
|
||||
++p) {
|
||||
if (mdsmap->get_addr(p->first) == con->get_peer_addr()) {
|
||||
mds = p->first;
|
||||
s = p->second;
|
||||
for (auto &p : mds_sessions) {
|
||||
if (mdsmap->get_addr(p.first) == con->get_peer_addr()) {
|
||||
mds = p.first;
|
||||
s = &p.second;
|
||||
}
|
||||
}
|
||||
if (mds >= 0) {
|
||||
|
@ -16,39 +16,37 @@
|
||||
#ifndef CEPH_CLIENT_H
|
||||
#define CEPH_CLIENT_H
|
||||
|
||||
#include "include/types.h"
|
||||
|
||||
// stl
|
||||
#include <string>
|
||||
#include <memory>
|
||||
#include <set>
|
||||
#include <map>
|
||||
#include <fstream>
|
||||
using std::set;
|
||||
using std::map;
|
||||
using std::fstream;
|
||||
|
||||
#include "include/unordered_set.h"
|
||||
#include "include/unordered_map.h"
|
||||
#include "common/CommandTable.h"
|
||||
#include "common/Finisher.h"
|
||||
#include "common/Mutex.h"
|
||||
#include "common/Timer.h"
|
||||
#include "common/cmdparse.h"
|
||||
#include "common/compiler_extensions.h"
|
||||
#include "include/cephfs/ceph_statx.h"
|
||||
#include "include/filepath.h"
|
||||
#include "include/interval_set.h"
|
||||
#include "include/lru.h"
|
||||
#include "include/types.h"
|
||||
#include "include/unordered_map.h"
|
||||
#include "include/unordered_set.h"
|
||||
#include "mds/mdstypes.h"
|
||||
#include "msg/Dispatcher.h"
|
||||
#include "msg/Messenger.h"
|
||||
|
||||
#include "common/Mutex.h"
|
||||
#include "common/Timer.h"
|
||||
#include "common/Finisher.h"
|
||||
#include "common/compiler_extensions.h"
|
||||
#include "common/cmdparse.h"
|
||||
#include "common/CommandTable.h"
|
||||
|
||||
#include "osdc/ObjectCacher.h"
|
||||
|
||||
#include "InodeRef.h"
|
||||
#include "MetaSession.h"
|
||||
#include "UserPerm.h"
|
||||
#include "include/cephfs/ceph_statx.h"
|
||||
|
||||
#include <fstream>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <set>
|
||||
#include <string>
|
||||
|
||||
using std::set;
|
||||
using std::map;
|
||||
using std::fstream;
|
||||
|
||||
class FSMap;
|
||||
class FSMapUser;
|
||||
@ -123,7 +121,6 @@ struct SnapRealm;
|
||||
struct Fh;
|
||||
struct CapSnap;
|
||||
|
||||
struct MetaSession;
|
||||
struct MetaRequest;
|
||||
class ceph_lock_state_t;
|
||||
|
||||
@ -316,7 +313,7 @@ protected:
|
||||
epoch_t cap_epoch_barrier;
|
||||
|
||||
// mds sessions
|
||||
map<mds_rank_t, MetaSession*> mds_sessions; // mds -> push seq
|
||||
map<mds_rank_t, MetaSession> mds_sessions; // mds -> push seq
|
||||
list<Cond*> waiting_for_mdsmap;
|
||||
|
||||
// FSMap, for when using mds_command
|
||||
|
@ -7,6 +7,7 @@
|
||||
#include "include/types.h"
|
||||
#include "include/utime.h"
|
||||
#include "include/xlist.h"
|
||||
#include "mds/MDSMap.h"
|
||||
#include "mds/mdstypes.h"
|
||||
#include "messages/MClientCapRelease.h"
|
||||
|
||||
@ -26,7 +27,7 @@ struct MetaSession {
|
||||
entity_inst_t inst;
|
||||
|
||||
enum {
|
||||
STATE_NEW,
|
||||
STATE_NEW, // Unused
|
||||
STATE_OPENING,
|
||||
STATE_OPEN,
|
||||
STATE_CLOSING,
|
||||
@ -47,11 +48,11 @@ struct MetaSession {
|
||||
std::set<Inode*> early_flushing_caps;
|
||||
|
||||
boost::intrusive_ptr<MClientCapRelease> release;
|
||||
|
||||
MetaSession()
|
||||
: mds_num(-1), con(NULL),
|
||||
seq(0), cap_gen(0), cap_renew_seq(0), num_caps(0),
|
||||
state(STATE_NEW), mds_state(0), readonly(false)
|
||||
|
||||
MetaSession(mds_rank_t mds_num, ConnectionRef con, entity_inst_t inst)
|
||||
: mds_num(mds_num), con(con),
|
||||
seq(0), cap_gen(0), cap_renew_seq(0), num_caps(0), inst(inst),
|
||||
state(STATE_OPENING), mds_state(MDSMap::STATE_NULL), readonly(false)
|
||||
{}
|
||||
|
||||
const char *get_state_name() const;
|
||||
|
Loading…
Reference in New Issue
Block a user