mirror of
https://github.com/ceph/ceph
synced 2025-03-11 02:39:05 +00:00
Merge branch 'caps' of ssh://ceph.newdream.net/home/sage/ceph.newdream.net/git/ceph into caps
This commit is contained in:
commit
4ea03afae5
@ -787,6 +787,12 @@ void Client::handle_client_session(MClientSession *m)
|
||||
last_cap_renew = g_clock.now();
|
||||
break;
|
||||
|
||||
case CEPH_SESSION_STALE:
|
||||
// hmm, verify caps have been revoked?
|
||||
messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_RESUME, g_clock.now()),
|
||||
m->get_source_inst());
|
||||
break;
|
||||
|
||||
default:
|
||||
assert(0);
|
||||
}
|
||||
@ -1586,8 +1592,7 @@ int Client::unmount()
|
||||
p != mds_sessions.end();
|
||||
++p) {
|
||||
dout(2) << "sending client_session close to mds" << p->first << " seq " << p->second << dendl;
|
||||
messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_CLOSE,
|
||||
p->second),
|
||||
messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_CLOSE, p->second),
|
||||
mdsmap->get_inst(p->first));
|
||||
}
|
||||
|
||||
@ -1654,7 +1659,7 @@ void Client::renew_caps()
|
||||
p++) {
|
||||
dout(15) << "renew_caps requesting from mds" << p->first << dendl;
|
||||
messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_RENEWCAPS),
|
||||
mdsmap->get_inst(p->first));
|
||||
mdsmap->get_inst(p->first));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -224,8 +224,8 @@ md_config_t g_conf = {
|
||||
mds_beacon_interval: 4, //30.0,
|
||||
mds_beacon_grace: 15, //60*60.0,
|
||||
|
||||
mds_cap_timeout: 60, // cap bits time out if client idle
|
||||
mds_session_autoclose: 300, // autoclose idle session
|
||||
mds_cap_timeout: 10, // cap bits time out if client idle
|
||||
mds_session_autoclose: 30, // autoclose idle session
|
||||
|
||||
mds_log: true,
|
||||
mds_log_max_events: -1, //MDS_CACHE_SIZE / 3,
|
||||
|
@ -285,7 +285,10 @@ enum {
|
||||
CEPH_SESSION_REQUEST_CLOSE,
|
||||
CEPH_SESSION_CLOSE,
|
||||
CEPH_SESSION_REQUEST_RENEWCAPS,
|
||||
CEPH_SESSION_RENEWCAPS
|
||||
CEPH_SESSION_RENEWCAPS,
|
||||
CEPH_SESSION_STALE, // caps not renewed.
|
||||
CEPH_SESSION_REQUEST_RESUME,
|
||||
CEPH_SESSION_RESUME
|
||||
};
|
||||
|
||||
/* client_request */
|
||||
|
@ -27,6 +27,10 @@ public:
|
||||
~item() {
|
||||
remove_myself();
|
||||
}
|
||||
// no copying!
|
||||
item(const item& other);
|
||||
const item& operator= (const item& right);
|
||||
|
||||
|
||||
xlist* get_xlist() { return _list; }
|
||||
void remove_myself() {
|
||||
|
@ -78,7 +78,7 @@ ostream& operator<<(ostream& out, CInode& in)
|
||||
// hack: spit out crap on which clients have caps
|
||||
if (!in.get_client_caps().empty()) {
|
||||
out << " caps={";
|
||||
for (map<int,Capability>::iterator it = in.get_client_caps().begin();
|
||||
for (map<int,Capability*>::iterator it = in.get_client_caps().begin();
|
||||
it != in.get_client_caps().end();
|
||||
it++) {
|
||||
if (it != in.get_client_caps().begin()) out << ",";
|
||||
|
@ -191,7 +191,7 @@ public:
|
||||
// -- distributed state --
|
||||
protected:
|
||||
// file capabilities
|
||||
map<int, Capability> client_caps; // client -> caps
|
||||
map<int, Capability*> client_caps; // client -> caps
|
||||
map<int, int> mds_caps_wanted; // [auth] mds -> caps wanted
|
||||
int replica_caps_wanted; // [replica] what i've requested from auth
|
||||
utime_t replica_caps_wanted_keep_until;
|
||||
@ -342,23 +342,24 @@ public:
|
||||
// -- caps -- (new)
|
||||
// client caps
|
||||
bool is_any_caps() { return !client_caps.empty(); }
|
||||
map<int,Capability>& get_client_caps() { return client_caps; }
|
||||
map<int,Capability*>& get_client_caps() { return client_caps; }
|
||||
Capability *get_client_cap(int client) {
|
||||
if (client_caps.count(client))
|
||||
return &client_caps[client];
|
||||
return client_caps[client];
|
||||
return 0;
|
||||
}
|
||||
Capability *add_client_cap(int client, CInode *in, xlist<Capability*>& cls) {
|
||||
if (client_caps.empty())
|
||||
get(PIN_CAPS);
|
||||
assert(client_caps.count(client) == 0);
|
||||
Capability *cap = &client_caps[client];
|
||||
Capability *cap = client_caps[client] = new Capability;
|
||||
cap->set_inode(in);
|
||||
cap->add_to_cap_list(cls);
|
||||
return cap;
|
||||
}
|
||||
void remove_client_cap(int client) {
|
||||
assert(client_caps.count(client) == 1);
|
||||
delete client_caps[client];
|
||||
client_caps.erase(client);
|
||||
if (client_caps.empty())
|
||||
put(PIN_CAPS);
|
||||
@ -376,59 +377,33 @@ public:
|
||||
inode.mtime = MAX(inode.mtime, icr.mtime);
|
||||
inode.atime = MAX(inode.atime, icr.atime);
|
||||
}
|
||||
/*
|
||||
void set_client_caps(map<int,Capability>& cl) {
|
||||
if (client_caps.empty() && !cl.empty())
|
||||
get(PIN_CAPS);
|
||||
client_caps.clear();
|
||||
client_caps = cl;
|
||||
}
|
||||
*/
|
||||
void clear_client_caps() {
|
||||
if (!client_caps.empty())
|
||||
put(PIN_CAPS);
|
||||
client_caps.clear();
|
||||
while (!client_caps.empty())
|
||||
remove_client_cap(client_caps.begin()->first);
|
||||
}
|
||||
void export_client_caps(map<int,Capability::Export>& cl) {
|
||||
for (map<int,Capability>::iterator it = client_caps.begin();
|
||||
for (map<int,Capability*>::iterator it = client_caps.begin();
|
||||
it != client_caps.end();
|
||||
it++) {
|
||||
cl[it->first] = it->second.make_export();
|
||||
cl[it->first] = it->second->make_export();
|
||||
}
|
||||
}
|
||||
void merge_client_caps(map<int,Capability::Export>& cl, set<int>& new_client_caps) {
|
||||
if (client_caps.empty() && !cl.empty())
|
||||
get(PIN_CAPS);
|
||||
|
||||
for (map<int,Capability::Export>::iterator it = cl.begin();
|
||||
it != cl.end();
|
||||
it++) {
|
||||
new_client_caps.insert(it->first);
|
||||
if (client_caps.count(it->first)) {
|
||||
// merge
|
||||
client_caps[it->first].merge(it->second);
|
||||
} else {
|
||||
// new
|
||||
client_caps[it->first] = Capability(this, it->second);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// caps issued, wanted
|
||||
int get_caps_issued() {
|
||||
int c = 0;
|
||||
for (map<int,Capability>::iterator it = client_caps.begin();
|
||||
for (map<int,Capability*>::iterator it = client_caps.begin();
|
||||
it != client_caps.end();
|
||||
it++)
|
||||
c |= it->second.issued();
|
||||
c |= it->second->issued();
|
||||
return c;
|
||||
}
|
||||
int get_caps_wanted() {
|
||||
int w = 0;
|
||||
for (map<int,Capability>::iterator it = client_caps.begin();
|
||||
for (map<int,Capability*>::iterator it = client_caps.begin();
|
||||
it != client_caps.end();
|
||||
it++) {
|
||||
w |= it->second.wanted();
|
||||
w |= it->second->wanted();
|
||||
//cout << " get_caps_wanted client " << it->first << " " << cap_string(it->second.wanted()) << endl;
|
||||
}
|
||||
if (is_auth())
|
||||
|
@ -76,17 +76,6 @@ public:
|
||||
suppress(false), stale(false),
|
||||
session_caps_item(this) {
|
||||
}
|
||||
Capability(CInode *i, Export& other) :
|
||||
inode(i),
|
||||
wanted_caps(other.wanted),
|
||||
last_sent(0), last_recv(0),
|
||||
suppress(false), stale(false),
|
||||
session_caps_item(this) {
|
||||
// issued vs pending
|
||||
if (other.issued & ~other.pending)
|
||||
issue(other.issued);
|
||||
issue(other.pending);
|
||||
}
|
||||
|
||||
bool is_suppress() { return suppress; }
|
||||
void set_suppress(bool b) { suppress = b; }
|
||||
|
@ -524,16 +524,17 @@ bool Locker::issue_caps(CInode *in)
|
||||
int nissued = 0;
|
||||
|
||||
// client caps
|
||||
for (map<int, Capability>::iterator it = in->client_caps.begin();
|
||||
for (map<int, Capability*>::iterator it = in->client_caps.begin();
|
||||
it != in->client_caps.end();
|
||||
it++) {
|
||||
if (it->second.pending() != (it->second.wanted() & allowed)) {
|
||||
Capability *cap = it->second;
|
||||
if (cap->pending() != (cap->wanted() & allowed)) {
|
||||
// issue
|
||||
nissued++;
|
||||
|
||||
int before = it->second.pending();
|
||||
long seq = it->second.issue(it->second.wanted() & allowed);
|
||||
int after = it->second.pending();
|
||||
int before = cap->pending();
|
||||
long seq = cap->issue(cap->wanted() & allowed);
|
||||
int after = cap->pending();
|
||||
|
||||
// twiddle file_data_version?
|
||||
if (!(before & CEPH_CAP_WRBUFFER) &&
|
||||
@ -543,13 +544,15 @@ bool Locker::issue_caps(CInode *in)
|
||||
}
|
||||
|
||||
if (seq > 0 &&
|
||||
!it->second.is_suppress()) {
|
||||
dout(7) << " sending MClientFileCaps to client" << it->first << " seq " << it->second.get_last_seq() << " new pending " << cap_string(it->second.pending()) << " was " << cap_string(before) << dendl;
|
||||
!cap->is_suppress()) {
|
||||
dout(7) << " sending MClientFileCaps to client" << it->first << " seq " << cap->get_last_seq()
|
||||
<< " new pending " << cap_string(cap->pending()) << " was " << cap_string(before)
|
||||
<< dendl;
|
||||
mds->send_message_client(new MClientFileCaps(MClientFileCaps::OP_GRANT,
|
||||
in->inode,
|
||||
it->second.get_last_seq(),
|
||||
it->second.pending(),
|
||||
it->second.wanted()),
|
||||
cap->get_last_seq(),
|
||||
cap->pending(),
|
||||
cap->wanted()),
|
||||
it->first);
|
||||
}
|
||||
}
|
||||
@ -560,7 +563,7 @@ bool Locker::issue_caps(CInode *in)
|
||||
|
||||
void Locker::revoke_stale_caps(Session *session)
|
||||
{
|
||||
dout(10) << "revoke_stale_caps for client " << session->inst.name << dendl;
|
||||
dout(10) << "revoke_stale_caps for " << session->inst.name << dendl;
|
||||
|
||||
for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ++p) {
|
||||
Capability *cap = *p;
|
||||
@ -574,7 +577,22 @@ void Locker::revoke_stale_caps(Session *session)
|
||||
dout(10) << " nothing issued on " << *in << dendl;
|
||||
}
|
||||
cap->set_stale(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void Locker::resume_stale_caps(Session *session)
|
||||
{
|
||||
dout(10) << "resume_stale_caps for " << session->inst.name << dendl;
|
||||
|
||||
for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ++p) {
|
||||
Capability *cap = *p;
|
||||
CInode *in = cap->get_inode();
|
||||
if (cap->is_stale()) {
|
||||
dout(10) << " clearing stale flag on " << *in << dendl;
|
||||
cap->set_stale(false);
|
||||
file_eval(&in->filelock);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class C_MDL_RequestInodeFileCaps : public Context {
|
||||
|
@ -182,6 +182,7 @@ protected:
|
||||
Capability* issue_new_caps(CInode *in, int mode, Session *session);
|
||||
bool issue_caps(CInode *in);
|
||||
void revoke_stale_caps(Session *session);
|
||||
void resume_stale_caps(Session *session);
|
||||
|
||||
protected:
|
||||
void handle_client_file_caps(class MClientFileCaps *m);
|
||||
|
@ -1082,7 +1082,7 @@ void MDCache::send_resolve_now(int who)
|
||||
}
|
||||
// [resolving]
|
||||
if (uncommitted_slave_updates.count(who)) {
|
||||
for (map<metareqid_t, MDSlaveUpdate>::iterator p = uncommitted_slave_updates[who].begin();
|
||||
for (map<metareqid_t, MDSlaveUpdate*>::iterator p = uncommitted_slave_updates[who].begin();
|
||||
p != uncommitted_slave_updates[who].end();
|
||||
++p) {
|
||||
dout(10) << " including uncommitted " << p->first << dendl;
|
||||
@ -1388,7 +1388,8 @@ void MDCache::handle_resolve_ack(MMDSResolveAck *ack)
|
||||
if (mds->is_resolve()) {
|
||||
// replay
|
||||
assert(uncommitted_slave_updates[from].count(*p));
|
||||
uncommitted_slave_updates[from][*p].commit.replay(mds);
|
||||
uncommitted_slave_updates[from][*p]->commit.replay(mds);
|
||||
delete uncommitted_slave_updates[from][*p];
|
||||
uncommitted_slave_updates[from].erase(*p);
|
||||
// log commit
|
||||
mds->mdlog->submit_entry(new ESlaveUpdate(mds->mdlog, "unknown", *p, from, ESlaveUpdate::OP_COMMIT));
|
||||
@ -1406,7 +1407,8 @@ void MDCache::handle_resolve_ack(MMDSResolveAck *ack)
|
||||
|
||||
if (mds->is_resolve()) {
|
||||
assert(uncommitted_slave_updates[from].count(*p));
|
||||
uncommitted_slave_updates[from][*p].rollback.replay(mds);
|
||||
uncommitted_slave_updates[from][*p]->rollback.replay(mds);
|
||||
delete uncommitted_slave_updates[from][*p];
|
||||
uncommitted_slave_updates[from].erase(*p);
|
||||
mds->mdlog->submit_entry(new ESlaveUpdate(mds->mdlog, "unknown", *p, from, ESlaveUpdate::OP_ROLLBACK));
|
||||
} else {
|
||||
@ -2659,11 +2661,13 @@ void MDCache::rejoin_import_cap(CInode *in, int client, inode_caps_reconnect_t&
|
||||
in->reconnect_cap(client, icr, session->caps);
|
||||
|
||||
// send REAP
|
||||
Capability *cap = in->get_client_cap(client);
|
||||
assert(cap); // ?
|
||||
MClientFileCaps *reap = new MClientFileCaps(MClientFileCaps::OP_IMPORT,
|
||||
in->inode,
|
||||
in->client_caps[client].get_last_seq(),
|
||||
in->client_caps[client].pending(),
|
||||
in->client_caps[client].wanted());
|
||||
cap->get_last_seq(),
|
||||
cap->pending(),
|
||||
cap->wanted());
|
||||
reap->set_mds(frommds); // reap from whom?
|
||||
mds->messenger->send_message(reap, session->inst);
|
||||
}
|
||||
|
@ -386,7 +386,7 @@ protected:
|
||||
// from MMDSResolves
|
||||
map<int, map<dirfrag_t, list<dirfrag_t> > > other_ambiguous_imports;
|
||||
|
||||
map<int, map<metareqid_t, MDSlaveUpdate> > uncommitted_slave_updates; // for replay.
|
||||
map<int, map<metareqid_t, MDSlaveUpdate*> > uncommitted_slave_updates; // for replay.
|
||||
map<metareqid_t, bool> ambiguous_slave_updates; // for log trimming.
|
||||
map<metareqid_t, Context*> waiting_for_slave_update_commit;
|
||||
friend class ESlaveUpdate;
|
||||
|
@ -882,7 +882,7 @@ void Migrator::encode_export_inode_caps(CInode *in, bufferlist& bl,
|
||||
in->state_set(CInode::STATE_EXPORTINGCAPS);
|
||||
|
||||
// make note of clients named by exported capabilities
|
||||
for (map<int, Capability>::iterator it = in->client_caps.begin();
|
||||
for (map<int, Capability*>::iterator it = in->client_caps.begin();
|
||||
it != in->client_caps.end();
|
||||
it++)
|
||||
exported_client_map[it->first] = mds->sessionmap.get_inst(entity_name_t::CLIENT(it->first));
|
||||
@ -893,16 +893,17 @@ void Migrator::finish_export_inode_caps(CInode *in)
|
||||
in->state_clear(CInode::STATE_EXPORTINGCAPS);
|
||||
|
||||
// tell (all) clients about migrating caps..
|
||||
for (map<int, Capability>::iterator it = in->client_caps.begin();
|
||||
for (map<int, Capability*>::iterator it = in->client_caps.begin();
|
||||
it != in->client_caps.end();
|
||||
it++) {
|
||||
Capability *cap = it->second;
|
||||
dout(7) << "finish_export_inode telling client" << it->first
|
||||
<< " exported caps on " << *in << dendl;
|
||||
MClientFileCaps *m = new MClientFileCaps(MClientFileCaps::OP_EXPORT,
|
||||
in->inode,
|
||||
it->second.get_last_seq(),
|
||||
it->second.pending(),
|
||||
it->second.wanted());
|
||||
cap->get_last_seq(),
|
||||
cap->pending(),
|
||||
cap->wanted());
|
||||
mds->send_message_client(m, it->first);
|
||||
}
|
||||
in->clear_client_caps();
|
||||
@ -2034,23 +2035,29 @@ void Migrator::finish_import_inode_caps(CInode *in, int from,
|
||||
map<int,Capability::Export> &cap_map)
|
||||
{
|
||||
assert(!cap_map.empty());
|
||||
|
||||
set<int> new_caps;
|
||||
in->merge_client_caps(cap_map, new_caps);
|
||||
in->put(CInode::PIN_IMPORTINGCAPS);
|
||||
|
||||
for (set<int>::iterator it = new_caps.begin();
|
||||
it != new_caps.end();
|
||||
for (map<int,Capability::Export>::iterator it = cap_map.begin();
|
||||
it != cap_map.end();
|
||||
it++) {
|
||||
dout(0) << "finish_import_inode_caps for client" << *it << " on " << *in << dendl;
|
||||
dout(0) << "finish_import_inode_caps for client" << it->first << " on " << *in << dendl;
|
||||
Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(it->first));
|
||||
assert(session);
|
||||
|
||||
Capability *cap = in->get_client_cap(it->first);
|
||||
if (!cap)
|
||||
cap = in->add_client_cap(it->first, in, session->caps);
|
||||
cap->merge(it->second);
|
||||
|
||||
MClientFileCaps *caps = new MClientFileCaps(MClientFileCaps::OP_IMPORT,
|
||||
in->inode,
|
||||
in->client_caps[*it].get_last_seq(),
|
||||
in->client_caps[*it].pending(),
|
||||
in->client_caps[*it].wanted());
|
||||
cap->get_last_seq(),
|
||||
cap->pending(),
|
||||
cap->wanted());
|
||||
caps->set_mds(from); // from whom?
|
||||
mds->send_message_client(caps, *it);
|
||||
mds->send_message_client(caps, session->inst);
|
||||
}
|
||||
|
||||
in->put(CInode::PIN_IMPORTINGCAPS);
|
||||
}
|
||||
|
||||
int Migrator::decode_import_dir(bufferlist::iterator& blp,
|
||||
|
@ -102,6 +102,7 @@ void Server::dispatch(Message *m)
|
||||
switch (m->get_type()) {
|
||||
case CEPH_MSG_CLIENT_SESSION:
|
||||
handle_client_session((MClientSession*)m);
|
||||
delete m;
|
||||
return;
|
||||
case CEPH_MSG_CLIENT_REQUEST:
|
||||
handle_client_request((MClientRequest*)m);
|
||||
@ -137,72 +138,79 @@ public:
|
||||
|
||||
void Server::handle_client_session(MClientSession *m)
|
||||
{
|
||||
dout(3) << "handle_client_session " << *m << " from " << m->get_source() << dendl;
|
||||
bool open = false;
|
||||
version_t pv;
|
||||
Session *session = mds->sessionmap.get_session(m->get_source());
|
||||
|
||||
dout(3) << "handle_client_session " << *m << " from " << m->get_source() << dendl;
|
||||
assert(m->get_source().is_client()); // should _not_ come from an mds!
|
||||
|
||||
switch (m->op) {
|
||||
|
||||
case CEPH_SESSION_REQUEST_RENEWCAPS:
|
||||
if (!session) {
|
||||
dout(10) << "dne, dropping this req" << dendl;
|
||||
delete m;
|
||||
return;
|
||||
}
|
||||
mds->sessionmap.touch_session(session);
|
||||
mds->messenger->send_message(new MClientSession(CEPH_SESSION_RENEWCAPS), session->inst);
|
||||
delete m;
|
||||
return;
|
||||
|
||||
case CEPH_SESSION_REQUEST_OPEN:
|
||||
open = true;
|
||||
if (session && (session->is_opening() || session->is_open())) {
|
||||
dout(10) << "already open|opening, dropping this req" << dendl;
|
||||
delete m;
|
||||
return;
|
||||
}
|
||||
assert(!session); // ?
|
||||
session = mds->sessionmap.get_or_add_session(m->get_source_inst());
|
||||
session->state = Session::STATE_OPENING;
|
||||
mds->sessionmap.touch_session(session);
|
||||
pv = ++mds->sessionmap.projected;
|
||||
mdlog->submit_entry(new ESession(m->get_source_inst(), true, pv),
|
||||
new C_MDS_session_finish(mds, session, true, pv));
|
||||
break;
|
||||
|
||||
case CEPH_SESSION_REQUEST_RENEWCAPS:
|
||||
if (!session) {
|
||||
dout(10) << "dne, dropping this req" << dendl;
|
||||
return;
|
||||
}
|
||||
mds->sessionmap.touch_session(session);
|
||||
mds->messenger->send_message(new MClientSession(CEPH_SESSION_RENEWCAPS, m->stamp), session->inst);
|
||||
break;
|
||||
|
||||
case CEPH_SESSION_REQUEST_RESUME:
|
||||
if (!session) {
|
||||
dout(10) << "dne, replying with close" << dendl;
|
||||
mds->messenger->send_message(new MClientSession(CEPH_SESSION_CLOSE), m->get_source_inst());
|
||||
return;
|
||||
}
|
||||
if (!session->is_stale()) {
|
||||
dout(10) << "hmm, got request_resume on non-stale session for " << session->inst << dendl;
|
||||
assert(0);
|
||||
return;
|
||||
}
|
||||
session->state = Session::STATE_OPEN;
|
||||
mds->sessionmap.touch_session(session);
|
||||
mds->locker->resume_stale_caps(session);
|
||||
mds->messenger->send_message(new MClientSession(CEPH_SESSION_RESUME, m->stamp), session->inst);
|
||||
break;
|
||||
|
||||
case CEPH_SESSION_REQUEST_CLOSE:
|
||||
if (!session || session->is_closing()) {
|
||||
dout(10) << "already closing|dne, dropping this req" << dendl;
|
||||
delete m;
|
||||
return;
|
||||
}
|
||||
if (m->seq < session->get_push_seq()) {
|
||||
dout(10) << "old push seq " << m->seq << " < " << session->get_push_seq()
|
||||
<< ", dropping" << dendl;
|
||||
delete m;
|
||||
return;
|
||||
}
|
||||
assert(m->seq == session->get_push_seq());
|
||||
session->state = Session::STATE_CLOSING;
|
||||
pv = ++mds->sessionmap.projected;
|
||||
mdlog->submit_entry(new ESession(m->get_source_inst(), false, pv),
|
||||
new C_MDS_session_finish(mds, session, false, pv));
|
||||
break;
|
||||
|
||||
default:
|
||||
assert(0);
|
||||
}
|
||||
|
||||
// journal it
|
||||
version_t pv = ++mds->sessionmap.projected;
|
||||
dout(10) << " sessionmap v " << mds->sessionmap.version << " pv " << pv << dendl;
|
||||
mdlog->submit_entry(new ESession(m->get_source_inst(), open, pv),
|
||||
new C_MDS_session_finish(mds, session, open, pv));
|
||||
delete m;
|
||||
|
||||
if (logger) logger->inc("hcsess");
|
||||
}
|
||||
|
||||
void Server::_session_logged(Session *session, bool open, version_t pv)
|
||||
{
|
||||
dout(10) << "_session_logged " << session->inst << " " << (open ? "open":"close")
|
||||
<< " " << pv
|
||||
<< dendl;
|
||||
<< " " << pv << dendl;
|
||||
|
||||
// apply
|
||||
if (open) {
|
||||
@ -217,11 +225,8 @@ void Server::_session_logged(Session *session, bool open, version_t pv)
|
||||
assert(!open);
|
||||
mds->sessionmap.version++; // noop
|
||||
}
|
||||
|
||||
assert(pv == mds->sessionmap.version);
|
||||
|
||||
// reply
|
||||
if (open)
|
||||
|
||||
if (open)
|
||||
mds->messenger->send_message(new MClientSession(CEPH_SESSION_OPEN), session->inst);
|
||||
else
|
||||
mds->messenger->send_message(new MClientSession(CEPH_SESSION_CLOSE), session->inst);
|
||||
@ -287,14 +292,16 @@ void Server::find_idle_sessions()
|
||||
{
|
||||
dout(10) << "find_idle_sessions" << dendl;
|
||||
|
||||
utime_t cutoff = g_clock.now();
|
||||
// stale
|
||||
utime_t now = g_clock.now();
|
||||
utime_t cutoff = now;
|
||||
cutoff -= g_conf.mds_cap_timeout;
|
||||
while (1) {
|
||||
Session *session = mds->sessionmap.get_oldest_active_session();
|
||||
if (!session) break;
|
||||
dout(20) << "laggiest session is " << session->inst << dendl;
|
||||
dout(20) << "laggiest active session is " << session->inst << dendl;
|
||||
if (session->last_cap_renew >= cutoff) {
|
||||
dout(20) << "laggiest session is " << session->inst << " and sufficiently new ("
|
||||
dout(20) << "laggiest active session is " << session->inst << " and sufficiently new ("
|
||||
<< session->last_cap_renew << ")" << dendl;
|
||||
break;
|
||||
}
|
||||
@ -302,7 +309,32 @@ void Server::find_idle_sessions()
|
||||
dout(10) << "new stale session " << session->inst << " last " << session->last_cap_renew << dendl;
|
||||
mds->sessionmap.mark_session_stale(session);
|
||||
mds->locker->revoke_stale_caps(session);
|
||||
mds->messenger->send_message(new MClientSession(CEPH_SESSION_STALE, g_clock.now()),
|
||||
session->inst);
|
||||
}
|
||||
|
||||
// dead
|
||||
cutoff = now;
|
||||
cutoff -= g_conf.mds_session_autoclose;
|
||||
while (1) {
|
||||
Session *session = mds->sessionmap.get_oldest_stale_session();
|
||||
if (!session) break;
|
||||
dout(20) << "oldest stale session is " << session->inst << dendl;
|
||||
if (session->last_cap_renew >= cutoff) {
|
||||
dout(20) << "oldest stale session is " << session->inst << " and sufficiently new ("
|
||||
<< session->last_cap_renew << ")" << dendl;
|
||||
break;
|
||||
}
|
||||
|
||||
dout(10) << "autoclosing stale session " << session->inst << " last " << session->last_cap_renew << dendl;
|
||||
|
||||
mds->sessionmap.mark_session_stale(session);
|
||||
mds->locker->revoke_stale_caps(session);
|
||||
mds->messenger->send_message(new MClientSession(CEPH_SESSION_CLOSE),
|
||||
session->inst);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
@ -56,7 +56,7 @@ public:
|
||||
set<CInode*> reconnected_caps;
|
||||
|
||||
void handle_client_session(class MClientSession *m);
|
||||
void _session_logged(Session *session, bool open, version_t cmapv);
|
||||
void _session_logged(Session *session, bool open, version_t pv);
|
||||
void prepare_force_open_sessions(map<int,entity_inst_t> &cm);
|
||||
void finish_force_open_sessions(map<int,entity_inst_t> &cm);
|
||||
void terminate_sessions();
|
||||
|
@ -36,11 +36,11 @@ class Session {
|
||||
// -- state etc --
|
||||
public:
|
||||
static const int STATE_UNDEF = 0;
|
||||
static const int STATE_OPENING = 1;
|
||||
static const int STATE_OPENING = 1; // journaling open
|
||||
static const int STATE_OPEN = 2;
|
||||
static const int STATE_CLOSING = 3;
|
||||
static const int STATE_STALE = 4; // ?
|
||||
static const int STATE_RECONNECTING = 5;
|
||||
static const int STATE_CLOSING = 3; // journaling close
|
||||
static const int STATE_STALE = 4;
|
||||
static const int STATE_RECONNECTING = 5; // ?
|
||||
|
||||
int state;
|
||||
entity_inst_t inst;
|
||||
@ -165,6 +165,10 @@ public:
|
||||
void mark_session_stale(Session *s) {
|
||||
stale_sessions.push_back(&s->session_list_item);
|
||||
}
|
||||
Session *get_oldest_stale_session() {
|
||||
if (stale_sessions.empty()) return 0;
|
||||
return stale_sessions.front();
|
||||
}
|
||||
|
||||
void get_client_set(set<int>& s) {
|
||||
for (hash_map<entity_name_t,Session*>::iterator p = session_map.begin();
|
||||
|
@ -613,14 +613,15 @@ void ESlaveUpdate::replay(MDS *mds)
|
||||
commit._segment = _segment; // may need this later
|
||||
rollback._segment = _segment; // may need this later
|
||||
mds->mdcache->uncommitted_slave_updates[master][reqid] =
|
||||
MDSlaveUpdate(commit, rollback, _segment->slave_updates);
|
||||
new MDSlaveUpdate(commit, rollback, _segment->slave_updates);
|
||||
break;
|
||||
|
||||
case ESlaveUpdate::OP_COMMIT:
|
||||
if (mds->mdcache->uncommitted_slave_updates[master].count(reqid)) {
|
||||
dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds" << master
|
||||
<< ": applying commit blob" << dendl;
|
||||
mds->mdcache->uncommitted_slave_updates[master][reqid].commit.replay(mds, _segment);
|
||||
mds->mdcache->uncommitted_slave_updates[master][reqid]->commit.replay(mds, _segment);
|
||||
delete mds->mdcache->uncommitted_slave_updates[master][reqid];
|
||||
mds->mdcache->uncommitted_slave_updates[master].erase(reqid);
|
||||
} else {
|
||||
dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds" << master
|
||||
@ -633,7 +634,8 @@ void ESlaveUpdate::replay(MDS *mds)
|
||||
dout(10) << "ESlaveUpdate.replay abort " << reqid << " for mds" << master
|
||||
<< ": applying rollback blob" << dendl;
|
||||
assert(mds->mdcache->uncommitted_slave_updates[master].count(reqid));
|
||||
mds->mdcache->uncommitted_slave_updates[master][reqid].rollback.replay(mds, _segment);
|
||||
mds->mdcache->uncommitted_slave_updates[master][reqid]->rollback.replay(mds, _segment);
|
||||
delete mds->mdcache->uncommitted_slave_updates[master][reqid];
|
||||
mds->mdcache->uncommitted_slave_updates[master].erase(reqid);
|
||||
} else {
|
||||
dout(10) << "ESlaveUpdate.replay abort " << reqid << " for mds" << master
|
||||
|
@ -27,17 +27,22 @@ public:
|
||||
case CEPH_SESSION_CLOSE: return "close";
|
||||
case CEPH_SESSION_REQUEST_RENEWCAPS: return "request_renewcaps";
|
||||
case CEPH_SESSION_RENEWCAPS: return "renewcaps";
|
||||
case CEPH_SESSION_STALE: return "stale";
|
||||
default: assert(0); return 0;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t op;
|
||||
version_t seq;
|
||||
version_t seq; // used when requesting close only
|
||||
utime_t stamp;
|
||||
|
||||
MClientSession() : Message(CEPH_MSG_CLIENT_SESSION) { }
|
||||
MClientSession(int o, version_t s=0) :
|
||||
Message(CEPH_MSG_CLIENT_SESSION),
|
||||
op(o), seq(s) { }
|
||||
MClientSession(int o, utime_t st) :
|
||||
Message(CEPH_MSG_CLIENT_SESSION),
|
||||
op(o), seq(0), stamp(st) { }
|
||||
|
||||
const char *get_type_name() { return "client_session"; }
|
||||
void print(ostream& out) {
|
||||
@ -47,13 +52,15 @@ public:
|
||||
}
|
||||
|
||||
void decode_payload() {
|
||||
int off = 0;
|
||||
::_decode(op, payload, off);
|
||||
::_decode(seq, payload, off);
|
||||
bufferlist::iterator p = payload.begin();
|
||||
::_decode_simple(op, p);
|
||||
::_decode_simple(seq, p);
|
||||
::_decode_simple(stamp, p);
|
||||
}
|
||||
void encode_payload() {
|
||||
::_encode(op, payload);
|
||||
::_encode(seq, payload);
|
||||
::_encode_simple(op, payload);
|
||||
::_encode_simple(seq, payload);
|
||||
::_encode_simple(stamp, payload);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -6,4 +6,4 @@
|
||||
./cosd --mkfs --osd 1 &
|
||||
./cosd --mkfs --osd 2 &
|
||||
./cosd --mkfs --osd 3 &
|
||||
./cmds &
|
||||
./cmds --debug_ms 1 --debug_mds 20 &
|
||||
|
Loading…
Reference in New Issue
Block a user