Merge remote-tracking branch 'gh/jewel'

Sage Weil 2016-01-20 16:49:12 -05:00
commit 141888609a
11 changed files with 121 additions and 25 deletions


@@ -3975,6 +3975,8 @@ void Client::flush_caps(Inode *in, MetaSession *session)
for (map<ceph_tid_t,int>::iterator p = in->flushing_cap_tids.begin();
p != in->flushing_cap_tids.end();
++p) {
if (session->kicked_flush_tids.count(p->first))
continue;
send_cap(in, session, cap, (get_caps_used(in) | in->caps_dirty()),
in->caps_wanted(), (cap->issued | cap->implemented),
p->second, p->first);
@@ -4034,10 +4036,14 @@ void Client::kick_flushing_caps(MetaSession *session)
if (in->flushing_caps)
flush_caps(in, session);
}
session->kicked_flush_tids.clear();
}
void Client::early_kick_flushing_caps(MetaSession *session)
{
session->kicked_flush_tids.clear();
for (xlist<Inode*>::iterator p = session->flushing_caps.begin(); !p.end(); ++p) {
Inode *in = *p;
if (!in->flushing_caps)
@@ -4048,20 +4054,19 @@ void Client::early_kick_flushing_caps(MetaSession *session)
// if flushing caps were revoked, we re-send the cap flush in client reconnect
// stage. This guarantees that MDS processes the cap flush message before issuing
// the flushing caps to other clients.
bool send_now = (in->flushing_caps & in->auth_cap->issued) != in->flushing_caps;
if ((in->flushing_caps & in->auth_cap->issued) == in->flushing_caps)
continue;
if (send_now)
ldout(cct, 20) << " reflushing caps (revoked) on " << *in
<< " to mds." << session->mds_num << dendl;
ldout(cct, 20) << " reflushing caps (revoked) on " << *in
<< " to mds." << session->mds_num << dendl;
for (map<ceph_tid_t,int>::iterator q = in->flushing_cap_tids.begin();
q != in->flushing_cap_tids.end();
++q) {
if (send_now) {
send_cap(in, session, cap, (get_caps_used(in) | in->caps_dirty()),
in->caps_wanted(), (cap->issued | cap->implemented),
q->second, q->first);
}
send_cap(in, session, cap, (get_caps_used(in) | in->caps_dirty()),
in->caps_wanted(), (cap->issued | cap->implemented),
q->second, q->first);
session->kicked_flush_tids.insert(q->first);
}
}
}
@@ -7834,6 +7839,8 @@ int Client::_read(Fh *f, int64_t offset, uint64_t size, bufferlist *bl)
const md_config_t *conf = cct->_conf;
Inode *in = f->inode.get();
if ((f->mode & CEPH_FILE_MODE_RD) == 0)
return -EBADF;
//bool lazy = f->mode == CEPH_FILE_MODE_LAZY;
bool movepos = false;
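
The Client hunks above do two things: the cap-flush kick paths gain a per-session set of tids that have already been re-sent, so early_kick_flushing_caps() (run at reconnect) and the later kick_flushing_caps()/flush_caps() do not emit the same cap flush twice, and _read() now returns -EBADF for a handle opened without CEPH_FILE_MODE_RD (exercised by the new OpenReadWrite test further down). A minimal sketch of the dedup pattern, using simplified stand-in types rather than the real Client/MetaSession/Inode classes:

// A sketch, not Ceph's real types: the early kick records every tid it
// re-sends, and the later kick skips those tids instead of flushing them again.
#include <cstdint>
#include <map>
#include <set>

struct Session {
  std::set<uint64_t> kicked_flush_tids;       // tids already re-sent this reconnect
};

struct Inode {
  std::map<uint64_t, int> flushing_cap_tids;  // flush tid -> caps being flushed
};

// stand-in for send_cap(); a real client would build a cap message here
void resend_flush(Session&, Inode&, uint64_t /*tid*/) {}

void early_kick(Session& s, Inode& in, bool caps_revoked) {
  s.kicked_flush_tids.clear();                // fresh round for this reconnect
  if (!caps_revoked)
    return;                                   // nothing has to go out early
  for (const auto& p : in.flushing_cap_tids) {
    resend_flush(s, in, p.first);
    s.kicked_flush_tids.insert(p.first);      // remember what was already sent
  }
}

void kick(Session& s, Inode& in) {
  for (const auto& p : in.flushing_cap_tids) {
    if (s.kicked_flush_tids.count(p.first))
      continue;                               // early_kick already re-sent this tid
    resend_flush(s, in, p.first);
  }
  s.kicked_flush_tids.clear();                // round finished
}

In the actual change the skip lives inside flush_caps() itself, so any path that re-flushes an inode after the early kick benefits from the same check.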


@@ -47,6 +47,7 @@ struct MetaSession {
xlist<MetaRequest*> requests;
xlist<MetaRequest*> unsafe_requests;
std::set<ceph_tid_t> flushing_caps_tids;
std::set<ceph_tid_t> kicked_flush_tids;
Cap *s_cap_iterator;


@@ -432,6 +432,7 @@ OPTION(mds_client_prealloc_inos, OPT_INT, 1000)
OPTION(mds_early_reply, OPT_BOOL, true)
OPTION(mds_default_dir_hash, OPT_INT, CEPH_STR_HASH_RJENKINS)
OPTION(mds_log, OPT_BOOL, true)
OPTION(mds_log_pause, OPT_BOOL, false)
OPTION(mds_log_skip_corrupt_events, OPT_BOOL, false)
OPTION(mds_log_max_events, OPT_INT, -1)
OPTION(mds_log_events_per_segment, OPT_INT, 1024)


@@ -2465,7 +2465,7 @@ void Locker::handle_client_caps(MClientCaps *m)
if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
if (mds->is_reconnect() &&
m->get_dirty() && m->get_client_tid() > 0 &&
session->have_completed_flush(m->get_client_tid())) {
!session->have_completed_flush(m->get_client_tid())) {
mdcache->set_reconnect_dirty_caps(m->get_ino(), m->get_dirty());
}
mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
@@ -2488,8 +2488,14 @@ void Locker::handle_client_caps(MClientCaps *m)
ack->set_snap_follows(follows);
ack->set_client_tid(m->get_client_tid());
mds->send_message_client_counted(ack, m->get_connection());
m->put();
return;
if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
m->put();
return;
} else {
// fall-thru because the message may release some caps
m->clear_dirty();
m->set_op(CEPH_CAP_OP_UPDATE);
}
}
// "oldest flush tid" > 0 means client uses unique TID for each flush
@@ -2517,7 +2523,13 @@ void Locker::handle_client_caps(MClientCaps *m)
CInode *head_in = mdcache->get_inode(m->get_ino());
if (!head_in) {
dout(7) << "handle_client_caps on unknown ino " << m->get_ino() << ", dropping" << dendl;
if (mds->is_clientreplay()) {
dout(7) << "handle_client_caps on unknown ino " << m->get_ino()
<< ", will try again after replayed client requests" << dendl;
mdcache->wait_replay_cap_reconnect(m->get_ino(), new C_MDS_RetryMessage(mds, m));
return;
}
dout(1) << "handle_client_caps on unknown ino " << m->get_ino() << ", dropping" << dendl;
m->put();
return;
}
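
Three things change in Locker::handle_client_caps(): during reconnect, dirty caps are recorded only when the client has not already completed that flush (the negated have_completed_flush() check); a retransmitted, already-completed flush is acked but no longer dropped unconditionally, since only a FLUSHSNAP carries nothing else, while a plain flush may still release caps and is therefore downgraded to an UPDATE with its dirty bits cleared (via the new MClientCaps::clear_dirty() further down); and a cap message for an inode that has not been replayed yet is parked and retried rather than dropped (see the waiter-map sketch after the cap_reconnect_waiters declaration further down). A rough sketch of the duplicate-flush decision, with stand-in types rather than the real MClientCaps/Locker interfaces:

// A sketch with stand-in types: an ack has already been sent for the completed
// flush; decide whether the message can be dropped or must keep being processed.
#include <cstdint>

enum class CapOp { FLUSH, FLUSHSNAP, UPDATE };

struct CapMsg {
  CapOp op;
  unsigned dirty;          // caps the client says it is flushing
  uint64_t client_tid;     // per-flush tid, unique when oldest_flush_tid > 0
};

// returns true if the caller should stop handling the message
bool handle_completed_flush(CapMsg& m) {
  if (m.op == CapOp::FLUSHSNAP)
    return true;           // a snap flush carries nothing else; drop it
  m.dirty = 0;             // the flush itself was already applied earlier
  m.op = CapOp::UPDATE;    // fall through so any cap release is still honored
  return false;
}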


@@ -5540,8 +5540,14 @@ void MDCache::export_remaining_imported_caps()
}
}
for (map<inodeno_t, list<MDSInternalContextBase*> >::iterator p = cap_reconnect_waiters.begin();
p != cap_reconnect_waiters.end();
++p)
mds->queue_waiters(p->second);
cap_imports.clear();
cap_imports_dirty.clear();
cap_reconnect_waiters.clear();
if (warn_str.peek() != EOF) {
mds->clog->warn() << "failed to reconnect caps for missing inodes:" << "\n";
@@ -5571,6 +5577,13 @@ void MDCache::try_reconnect_cap(CInode *in, Session *session)
in->choose_lock_states(dirty_caps);
dout(15) << " chose lock states on " << *in << dendl;
}
map<inodeno_t, list<MDSInternalContextBase*> >::iterator it =
cap_reconnect_waiters.find(in->ino());
if (it != cap_reconnect_waiters.end()) {
mds->queue_waiters(it->second);
cap_reconnect_waiters.erase(it);
}
}
}


@@ -493,6 +493,7 @@ protected:
map<inodeno_t,map<client_t,map<mds_rank_t,ceph_mds_cap_reconnect> > > cap_imports; // ino -> client -> frommds -> capex
map<inodeno_t,int> cap_imports_dirty;
set<inodeno_t> cap_imports_missing;
map<inodeno_t, list<MDSInternalContextBase*> > cap_reconnect_waiters;
int cap_imports_num_opening;
set<CInode*> rejoin_undef_inodes;
@@ -551,6 +552,9 @@ public:
void set_reconnect_dirty_caps(inodeno_t ino, int dirty) {
cap_imports_dirty[ino] |= dirty;
}
void wait_replay_cap_reconnect(inodeno_t ino, MDSInternalContextBase *c) {
cap_reconnect_waiters[ino].push_back(c);
}
// [reconnect/rejoin caps]
map<CInode*,map<client_t, inodeno_t> > reconnected_caps; // inode -> client -> realmino

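The cap_reconnect_waiters map declared above is what the deferred retry in Locker::handle_client_caps() hangs off: while the MDS is in clientreplay, a cap message for an inode that has not been recreated yet is parked under its inode number via wait_replay_cap_reconnect(), MDCache::try_reconnect_cap() wakes the parked contexts once the inode is reconnected, and MDCache::export_remaining_imported_caps() drains whatever is still parked at the end. A minimal sketch of the waiter-map pattern, with std::function standing in for MDSInternalContextBase and uint64_t for inodeno_t:

// A sketch of the waiter-map pattern, not MDCache's real interface.
#include <cstdint>
#include <functional>
#include <list>
#include <map>

using Context = std::function<void()>;

struct CapReconnectWaiters {
  std::map<uint64_t, std::list<Context>> waiters;  // ino -> parked retries

  // cap handler: the inode is not in the cache yet, park a retry
  void wait_for(uint64_t ino, Context retry) {
    waiters[ino].push_back(std::move(retry));
  }

  // the inode has just been reconnected: re-dispatch its parked messages
  void notify(uint64_t ino) {
    auto it = waiters.find(ino);
    if (it == waiters.end())
      return;
    for (auto& c : it->second)
      c();
    waiters.erase(it);
  }

  // reconnect processing is over: wake everything still parked
  void drain() {
    for (auto& entry : waiters)
      for (auto& c : entry.second)
        c();
    waiters.clear();
  }
};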

@@ -356,6 +356,11 @@ void MDLog::_submit_thread()
submit_mutex.Lock();
while (!mds->is_daemon_stopping()) {
if (g_conf->mds_log_pause) {
submit_cond.Wait(submit_mutex);
continue;
}
map<uint64_t,list<PendingEvent> >::iterator it = pending_events.begin();
if (it == pending_events.end()) {
submit_cond.Wait(submit_mutex);
@@ -460,6 +465,12 @@ void MDLog::flush()
journaler->flush();
}
void MDLog::kick_submitter()
{
Mutex::Locker l(submit_mutex);
submit_cond.Signal();
}
void MDLog::cap()
{
dout(5) << "cap" << dendl;
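
The new mds_log_pause option gates the journal submit thread: while it is set, MDLog::_submit_thread() parks on the submit condvar instead of writing out pending events, and MDSDaemon::handle_conf_change() (further down) calls the new MDLog::kick_submitter() when the option flips back to false so the thread wakes up and resumes. A sketch of the pause/kick pattern using standard C++ primitives in place of Ceph's Mutex/Cond; the names and the event queue are simplified stand-ins:

// A sketch of the pause/kick pattern, not the real MDLog code.
#include <atomic>
#include <condition_variable>
#include <deque>
#include <mutex>

std::mutex submit_mutex;
std::condition_variable submit_cond;
std::deque<int> pending_events;       // stand-in for the real pending event map
std::atomic<bool> log_pause{false};   // mirrors the mds_log_pause option
std::atomic<bool> stopping{false};

void submit_thread() {
  std::unique_lock<std::mutex> l(submit_mutex);
  while (!stopping) {
    if (log_pause || pending_events.empty()) {
      submit_cond.wait(l);            // park until kicked or new work is queued
      continue;                       // re-check pause/stop/work after waking
    }
    int ev = pending_events.front();
    pending_events.pop_front();
    (void)ev;                         // ... write the event to the journal ...
  }
}

// config observer: mds_log_pause just flipped back to false, wake the thread
void kick_submitter() {
  std::lock_guard<std::mutex> lock(submit_mutex);
  submit_cond.notify_one();
}

The waiting thread holds the mutex between testing the flag and calling wait(), and kick_submitter() takes the same mutex before signalling, so a kick cannot be lost in between; that is why the conf-change hook only needs to signal on the transition back to false.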


@@ -254,6 +254,7 @@ public:
bool is_capped() { return capped; }
void cap();
void kick_submitter();
void shutdown();
// -- events --


@@ -346,6 +346,7 @@ const char** MDSDaemon::get_tracked_conf_keys() const
"mds_op_complaint_time", "mds_op_log_threshold",
"mds_op_history_size", "mds_op_history_duration",
"mds_enable_op_tracker",
"mds_log_pause",
// clog & admin clog
"clog_to_monitors",
"clog_to_syslog",
@@ -388,6 +389,10 @@ void MDSDaemon::handle_conf_change(const struct md_config_t *conf,
mds_rank->update_log_config();
}
}
if (!g_conf->mds_log_pause && changed.count("mds_log_pause")) {
if (mds_rank)
mds_rank->mdlog->kick_submitter();
}
}


@@ -92,6 +92,8 @@ class MClientCaps : public Message {
void set_oldest_flush_tid(ceph_tid_t tid) { oldest_flush_tid = tid; }
ceph_tid_t get_oldest_flush_tid() { return oldest_flush_tid; }
void clear_dirty() { head.dirty = 0; }
MClientCaps()
: Message(CEPH_MSG_CLIENT_CAPS, HEAD_VERSION, COMPAT_VERSION),
osd_epoch_barrier(0),


@@ -66,6 +66,40 @@ TEST(LibCephFS, OpenEmptyComponent) {
ceph_shutdown(cmount);
}
TEST(LibCephFS, OpenReadWrite) {
struct ceph_mount_info *cmount;
ASSERT_EQ(0, ceph_create(&cmount, NULL));
ASSERT_EQ(0, ceph_conf_read_file(cmount, NULL));
ASSERT_EQ(0, ceph_conf_parse_env(cmount, NULL));
ASSERT_EQ(0, ceph_mount(cmount, "/"));
char c_path[1024];
sprintf(c_path, "test_open_rdwr_%d", getpid());
int fd = ceph_open(cmount, c_path, O_WRONLY|O_CREAT, 0666);
ASSERT_LT(0, fd);
const char *out_buf = "hello world";
size_t size = strlen(out_buf);
char in_buf[100];
ASSERT_EQ(ceph_write(cmount, fd, out_buf, size, 0), size);
ASSERT_EQ(ceph_read(cmount, fd, in_buf, sizeof(in_buf), 0), -EBADF);
ASSERT_EQ(0, ceph_close(cmount, fd));
fd = ceph_open(cmount, c_path, O_RDONLY, 0);
ASSERT_LT(0, fd);
ASSERT_EQ(ceph_write(cmount, fd, out_buf, size, 0), -EBADF);
ASSERT_EQ(ceph_read(cmount, fd, in_buf, sizeof(in_buf), 0), size);
ASSERT_EQ(0, ceph_close(cmount, fd));
fd = ceph_open(cmount, c_path, O_RDWR, 0);
ASSERT_LT(0, fd);
ASSERT_EQ(ceph_write(cmount, fd, out_buf, size, 0), size);
ASSERT_EQ(ceph_read(cmount, fd, in_buf, sizeof(in_buf), 0), size);
ASSERT_EQ(0, ceph_close(cmount, fd));
ceph_shutdown(cmount);
}
TEST(LibCephFS, MountNonExist) {
struct ceph_mount_info *cmount;
@@ -301,8 +335,10 @@ TEST(LibCephFS, DirLs) {
int count = 0;
std::set<std::string> found;
while (count < r) {
while (true) {
int len = ceph_getdents(cmount, ls_dir, (char *)getdents_entries, r * sizeof(*getdents_entries));
if (len == 0)
break;
ASSERT_GT(len, 0);
ASSERT_TRUE((len % sizeof(*getdents_entries)) == 0);
int n = len / sizeof(*getdents_entries);
@@ -311,19 +347,16 @@ TEST(LibCephFS, DirLs) {
ASSERT_STREQ(getdents_entries[0].d_name, ".");
ASSERT_STREQ(getdents_entries[1].d_name, "..");
j = 2;
count += n - 2;
} else {
j = 0;
count += n;
}
count += n;
for(; j < n; ++i, ++j) {
const char *name = getdents_entries[j].d_name;
ASSERT_TRUE(found.count(name) == 0);
found.insert(name);
}
}
ASSERT_EQ(count, r);
ASSERT_EQ(found.size(), r);
free(getdents_entries);
// test readdir_r
@@ -337,12 +370,15 @@ TEST(LibCephFS, DirLs) {
ASSERT_STREQ(result->d_name, "..");
found.clear();
for(i = 0; i < r; ++i) {
while (true) {
struct dirent rdent;
ASSERT_EQ(ceph_readdir_r(cmount, ls_dir, &rdent), 1);
ASSERT_TRUE(found.count(rdent.d_name) == 0);
int len = ceph_readdir_r(cmount, ls_dir, &rdent);
if (len == 0)
break;
ASSERT_EQ(len, 1);
found.insert(rdent.d_name);
}
ASSERT_EQ(found.size(), r);
// test readdirplus
ceph_rewinddir(cmount, ls_dir);
@@ -355,13 +391,15 @@ TEST(LibCephFS, DirLs) {
ASSERT_STREQ(result->d_name, "..");
found.clear();
for(i = 0; i < r; ++i) {
while (true) {
struct dirent rdent;
struct stat st;
int stmask;
ASSERT_EQ(ceph_readdirplus_r(cmount, ls_dir, &rdent, &st, &stmask), 1);
int len = ceph_readdirplus_r(cmount, ls_dir, &rdent, &st, &stmask);
if (len == 0)
break;
ASSERT_EQ(len, 1);
const char *name = rdent.d_name;
ASSERT_TRUE(found.count(name) == 0);
found.insert(name);
int size;
sscanf(name, "dirf%d", &size);
@@ -369,6 +407,7 @@ TEST(LibCephFS, DirLs) {
ASSERT_EQ(st.st_ino, rdent.d_ino);
//ASSERT_EQ(st.st_mode, (mode_t)0666);
}
ASSERT_EQ(found.size(), r);
ASSERT_EQ(ceph_closedir(cmount, ls_dir), 0);