client: unify cap flush and snapcap flush

This patch includes following changes
- assign flush tid to snapcap flush
- remove session's flushing_capsnaps list. add inode with snapcap
  flushes to session's flushing_caps list instead.
- when reconnecting to MDS, re-send one inode's snapcap flushes and
  cap flushes at the same time.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
This commit is contained in:
Yan, Zheng 2016-06-28 20:39:08 +08:00
parent bc50e03092
commit a05e996b2a
4 changed files with 44 additions and 44 deletions

View File

@ -3450,11 +3450,9 @@ void Client::_flushed_cap_snap(Inode *in, snapid_t seq)
flush_snaps(in);
}
void Client::flush_snaps(Inode *in, bool all_again, CapSnap *again)
void Client::flush_snaps(Inode *in, bool all_again)
{
ldout(cct, 10) << "flush_snaps on " << *in
<< " all_again " << all_again
<< " again " << again << dendl;
ldout(cct, 10) << "flush_snaps on " << *in << " all_again " << all_again << dendl;
assert(in->cap_snaps.size());
// pick auth mds
@ -3464,13 +3462,9 @@ void Client::flush_snaps(Inode *in, bool all_again, CapSnap *again)
for (map<snapid_t,CapSnap*>::iterator p = in->cap_snaps.begin(); p != in->cap_snaps.end(); ++p) {
CapSnap *capsnap = p->second;
if (again) {
// only one capsnap
if (again != capsnap)
continue;
} else if (!all_again) {
if (!all_again) {
// only flush once per session
if (capsnap->flushing_item.is_on_list())
if (capsnap->flush_tid > 0)
continue;
}
@ -3484,9 +3478,13 @@ void Client::flush_snaps(Inode *in, bool all_again, CapSnap *again)
if (capsnap->dirty_data || capsnap->writing)
continue;
in->auth_cap->session->flushing_capsnaps.push_back(&capsnap->flushing_item);
if (capsnap->flush_tid == 0) {
capsnap->flush_tid = ++last_flush_tid;
if (!in->flushing_cap_item.is_on_list())
session->flushing_caps.push_back(&in->flushing_cap_item);
session->flushing_caps_tids.insert(capsnap->flush_tid);
}
capsnap->flush_tid = ++last_flush_tid;
MClientCaps *m = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP, in->ino, in->snaprealm->ino, 0, mseq,
cap_epoch_barrier);
if (user_id >= 0)
@ -3519,6 +3517,9 @@ void Client::flush_snaps(Inode *in, bool all_again, CapSnap *again)
m->inline_data = in->inline_data;
}
assert(!session->flushing_caps_tids.empty());
m->set_oldest_flush_tid(*session->flushing_caps_tids.begin());
session->con->send_message(m);
}
}
@ -4021,8 +4022,8 @@ int Client::mark_caps_flushing(Inode *in, ceph_tid_t* ptid)
in->flushing_caps |= flushing;
in->dirty_caps = 0;
session->flushing_caps.push_back(&in->flushing_cap_item);
if (!in->flushing_cap_item.is_on_list())
session->flushing_caps.push_back(&in->flushing_cap_item);
session->flushing_caps_tids.insert(flush_tid);
*ptid = flush_tid;
@ -4031,6 +4032,13 @@ int Client::mark_caps_flushing(Inode *in, ceph_tid_t* ptid)
void Client::adjust_session_flushing_caps(Inode *in, MetaSession *old_s, MetaSession *new_s)
{
for (auto p = in->cap_snaps.begin(); p != in->cap_snaps.end(); ++p) {
CapSnap *capsnap = p->second;
if (capsnap->flush_tid > 0) {
old_s->flushing_caps_tids.erase(capsnap->flush_tid);
new_s->flushing_caps_tids.insert(capsnap->flush_tid);
}
}
for (map<ceph_tid_t, int>::iterator it = in->flushing_cap_tids.begin();
it != in->flushing_cap_tids.end();
++it) {
@ -4069,8 +4077,6 @@ void Client::flush_caps(Inode *in, MetaSession *session)
for (map<ceph_tid_t,int>::iterator p = in->flushing_cap_tids.begin();
p != in->flushing_cap_tids.end();
++p) {
if (session->kicked_flush_tids.count(p->first))
continue;
send_cap(in, session, cap, (get_caps_used(in) | in->caps_dirty()),
in->caps_wanted(), (cap->issued | cap->implemented),
p->second, p->first);
@ -4117,33 +4123,27 @@ void Client::kick_flushing_caps(MetaSession *session)
mds_rank_t mds = session->mds_num;
ldout(cct, 10) << "kick_flushing_caps mds." << mds << dendl;
for (xlist<CapSnap*>::iterator p = session->flushing_capsnaps.begin(); !p.end(); ++p) {
CapSnap *capsnap = *p;
InodeRef& in = capsnap->in;
ldout(cct, 20) << " reflushing capsnap " << capsnap
<< " on " << *in << " to mds." << mds << dendl;
flush_snaps(in.get(), false, capsnap);
}
for (xlist<Inode*>::iterator p = session->flushing_caps.begin(); !p.end(); ++p) {
Inode *in = *p;
if (session->early_flushing_caps.count(in))
continue;
ldout(cct, 20) << " reflushing caps on " << *in << " to mds." << mds << dendl;
if (in->cap_snaps.size())
flush_snaps(in, true);
if (in->flushing_caps)
flush_caps(in, session);
}
session->kicked_flush_tids.clear();
session->early_flushing_caps.clear();
}
void Client::early_kick_flushing_caps(MetaSession *session)
{
session->kicked_flush_tids.clear();
session->early_flushing_caps.clear();
for (xlist<Inode*>::iterator p = session->flushing_caps.begin(); !p.end(); ++p) {
Inode *in = *p;
if (!in->flushing_caps)
continue;
assert(in->auth_cap);
Cap *cap = in->auth_cap;
// if flushing caps were revoked, we re-send the cap flush in client reconnect
// stage. This guarantees that MDS processes the cap flush message before issuing
@ -4154,14 +4154,13 @@ void Client::early_kick_flushing_caps(MetaSession *session)
ldout(cct, 20) << " reflushing caps (revoked) on " << *in
<< " to mds." << session->mds_num << dendl;
for (map<ceph_tid_t,int>::iterator q = in->flushing_cap_tids.begin();
q != in->flushing_cap_tids.end();
++q) {
send_cap(in, session, cap, (get_caps_used(in) | in->caps_dirty()),
in->caps_wanted(), (cap->issued | cap->implemented),
q->second, q->first);
session->kicked_flush_tids.insert(q->first);
}
session->early_flushing_caps.insert(in);
if (in->cap_snaps.size())
flush_snaps(in, true);
if (in->flushing_caps)
flush_caps(in, session);
}
}
@ -4699,8 +4698,9 @@ void Client::handle_cap_flush_ack(MetaSession *session, Inode *in, Cap *cap, MCl
in->flushing_caps &= ~cleaned;
if (in->flushing_caps == 0) {
ldout(cct, 10) << " " << *in << " !flushing" << dendl;
in->flushing_cap_item.remove_myself();
num_flushing_caps--;
if (in->cap_snaps.empty())
in->flushing_cap_item.remove_myself();
}
if (!in->caps_dirty())
put_inode(in);
@ -4725,7 +4725,10 @@ void Client::handle_cap_flushsnap_ack(MetaSession *session, Inode *in, MClientCa
ldout(cct, 5) << "handle_cap_flushedsnap mds." << mds << " flushed snap follows " << follows
<< " on " << *in << dendl;
in->cap_snaps.erase(follows);
capsnap->flushing_item.remove_myself();
if (in->flushing_caps == 0 && in->cap_snaps.empty())
in->flushing_cap_item.remove_myself();
session->flushing_caps_tids.erase(capsnap->flush_tid);
delete capsnap;
}
} else {

View File

@ -648,7 +648,7 @@ protected:
void check_caps(Inode *in, bool is_delayed);
void get_cap_ref(Inode *in, int cap);
void put_cap_ref(Inode *in, int cap);
void flush_snaps(Inode *in, bool all_again=false, CapSnap *again=0);
void flush_snaps(Inode *in, bool all_again=false);
void wait_sync_caps(Inode *in, ceph_tid_t want);
void wait_sync_caps(ceph_tid_t want);
void queue_cap_snap(Inode *in, SnapContext &old_snapc);

View File

@ -64,13 +64,11 @@ struct CapSnap {
bool writing, dirty_data;
uint64_t flush_tid;
xlist<CapSnap*>::item flushing_item;
explicit CapSnap(Inode *i)
: in(i), issued(0), dirty(0),
size(0), time_warp_seq(0), mode(0), uid(0), gid(0), xattr_version(0),
inline_version(0), writing(false), dirty_data(false), flush_tid(0),
flushing_item(this)
inline_version(0), writing(false), dirty_data(false), flush_tid(0)
{}
void dump(Formatter *f) const;

View File

@ -41,11 +41,10 @@ struct MetaSession {
xlist<Cap*> caps;
xlist<Inode*> flushing_caps;
xlist<CapSnap*> flushing_capsnaps;
xlist<MetaRequest*> requests;
xlist<MetaRequest*> unsafe_requests;
std::set<ceph_tid_t> flushing_caps_tids;
std::set<ceph_tid_t> kicked_flush_tids;
std::set<Inode*> early_flushing_caps;
Cap *s_cap_iterator;