1
0
mirror of https://github.com/ceph/ceph synced 2025-04-04 15:36:24 +00:00

lots of cap bits in the kernel client; cap vars in ceph_fs.h, renamed

This commit is contained in:
Sage Weil 2008-01-11 17:15:03 -08:00
parent 9e1d292f9d
commit baf11742f7
19 changed files with 341 additions and 152 deletions

View File

@ -395,7 +395,7 @@ Inode* Client::insert_inode(Dir *dir, InodeStat *st, const string& dname)
dn->inode->mask = st->mask;
// or do we have newer size/mtime from writing?
if (dn->inode->file_caps() & CAP_FILE_WR) {
if (dn->inode->file_caps() & CEPH_CAP_WR) {
if (dn->inode->file_wr_size > dn->inode->inode.size)
dn->inode->inode.size = dn->inode->file_wr_size;
if (dn->inode->file_wr_mtime > dn->inode->inode.mtime)
@ -1241,8 +1241,8 @@ void Client::handle_file_caps(MClientFileCaps *m)
<< " was " << cap_string(old_caps) << dendl;
// did file size decrease?
if ((old_caps & (CAP_FILE_RD|CAP_FILE_WR)) == 0 &&
(new_caps & (CAP_FILE_RD|CAP_FILE_WR)) != 0 &&
if ((old_caps & (CEPH_CAP_RD|CEPH_CAP_WR)) == 0 &&
(new_caps & (CEPH_CAP_RD|CEPH_CAP_WR)) != 0 &&
in->inode.size > (loff_t)m->get_size()) {
dout(10) << "*** file size decreased from " << in->inode.size << " to " << m->get_size() << dendl;
@ -1284,7 +1284,7 @@ void Client::handle_file_caps(MClientFileCaps *m)
// caching off.
// wake up waiters?
if (new_caps & CAP_FILE_RD) {
if (new_caps & CEPH_CAP_RD) {
for (list<Cond*>::iterator it = in->waitfor_read.begin();
it != in->waitfor_read.end();
it++) {
@ -1293,7 +1293,7 @@ void Client::handle_file_caps(MClientFileCaps *m)
}
in->waitfor_read.clear();
}
if (new_caps & CAP_FILE_WR) {
if (new_caps & CEPH_CAP_WR) {
for (list<Cond*>::iterator it = in->waitfor_write.begin();
it != in->waitfor_write.end();
it++) {
@ -1302,7 +1302,7 @@ void Client::handle_file_caps(MClientFileCaps *m)
}
in->waitfor_write.clear();
}
if (new_caps & CAP_FILE_LAZYIO) {
if (new_caps & CEPH_CAP_LAZYIO) {
for (list<Cond*>::iterator it = in->waitfor_lazy.begin();
it != in->waitfor_lazy.end();
it++) {
@ -2668,8 +2668,8 @@ void Client::close_release(Inode *in)
in->fc.release_clean();
int retain = 0;
if (in->num_open_wr || in->fc.is_dirty()) retain |= CAP_FILE_WR | CAP_FILE_WRBUFFER | CAP_FILE_WREXTEND;
if (in->num_open_rd || in->fc.is_cached()) retain |= CAP_FILE_RD | CAP_FILE_RDCACHE;
if (in->num_open_wr || in->fc.is_dirty()) retain |= CEPH_CAP_WR | CEPH_CAP_WRBUFFER | CEPH_CAP_WREXTEND;
if (in->num_open_rd || in->fc.is_cached()) retain |= CEPH_CAP_RD | CEPH_CAP_RDCACHE;
release_caps(in, retain); // release caps now.
}
@ -2859,7 +2859,7 @@ int Client::_read(Fh *f, off_t offset, off_t size, bufferlist *bl)
// determine whether read range overlaps with file
// ...ONLY if we're doing async io
if (!lazy && (in->file_caps() & (CAP_FILE_WRBUFFER|CAP_FILE_RDCACHE))) {
if (!lazy && (in->file_caps() & (CEPH_CAP_WRBUFFER|CEPH_CAP_RDCACHE))) {
// we're doing buffered i/o. make sure we're inside the file.
// we can trust size info bc we get accurate info when buffering/caching caps are issued.
dout(10) << "file size: " << in->inode.size << dendl;
@ -2904,14 +2904,14 @@ int Client::_read(Fh *f, off_t offset, off_t size, bufferlist *bl)
// object cache OFF -- legacy inconsistent way.
// do we have read file cap?
while (!lazy && (in->file_caps() & CAP_FILE_RD) == 0) {
while (!lazy && (in->file_caps() & CEPH_CAP_RD) == 0) {
dout(7) << " don't have read cap, waiting" << dendl;
Cond cond;
in->waitfor_read.push_back(&cond);
cond.Wait(client_lock);
}
// lazy cap?
while (lazy && (in->file_caps() & CAP_FILE_LAZYIO) == 0) {
while (lazy && (in->file_caps() & CEPH_CAP_LAZYIO) == 0) {
dout(7) << " don't have lazy cap, waiting" << dendl;
Cond cond;
in->waitfor_lazy.push_back(&cond);
@ -3027,13 +3027,13 @@ int Client::_write(Fh *f, off_t offset, off_t size, const char *buf)
dout(7) << "synchronous write" << dendl;
// do we have write file cap?
while (!lazy && (in->file_caps() & CAP_FILE_WR) == 0) {
while (!lazy && (in->file_caps() & CEPH_CAP_WR) == 0) {
dout(7) << " don't have write cap, waiting" << dendl;
Cond cond;
in->waitfor_write.push_back(&cond);
cond.Wait(client_lock);
}
while (lazy && (in->file_caps() & CAP_FILE_LAZYIO) == 0) {
while (lazy && (in->file_caps() & CEPH_CAP_LAZYIO) == 0) {
dout(7) << " don't have lazy cap, waiting" << dendl;
Cond cond;
in->waitfor_lazy.push_back(&cond);
@ -3301,7 +3301,7 @@ int Client::lazyio_propogate(int fd, off_t offset, size_t count)
if (f->mode & FILE_MODE_LAZY) {
// wait for lazy cap
while ((in->file_caps() & CAP_FILE_LAZYIO) == 0) {
while ((in->file_caps() & CEPH_CAP_LAZYIO) == 0) {
dout(7) << " don't have lazy cap, waiting" << dendl;
Cond cond;
in->waitfor_lazy.push_back(&cond);
@ -3337,7 +3337,7 @@ int Client::lazyio_synchronize(int fd, off_t offset, size_t count)
if (f->mode & FILE_MODE_LAZY) {
// wait for lazy cap
while ((in->file_caps() & CAP_FILE_LAZYIO) == 0) {
while ((in->file_caps() & CEPH_CAP_LAZYIO) == 0) {
dout(7) << " don't have lazy cap, waiting" << dendl;
Cond cond;
in->waitfor_lazy.push_back(&cond);

View File

@ -228,11 +228,11 @@ class Inode {
int file_caps_wanted() {
int w = 0;
if (num_open_rd) w |= CAP_FILE_RD|CAP_FILE_RDCACHE;
if (num_open_wr) w |= CAP_FILE_WR|CAP_FILE_WRBUFFER;
if (num_open_lazy) w |= CAP_FILE_LAZYIO;
if (fc.is_dirty()) w |= CAP_FILE_WRBUFFER;
if (fc.is_cached()) w |= CAP_FILE_RDCACHE;
if (num_open_rd) w |= CEPH_CAP_RD|CEPH_CAP_RDCACHE;
if (num_open_wr) w |= CEPH_CAP_WR|CEPH_CAP_WRBUFFER;
if (num_open_lazy) w |= CEPH_CAP_LAZYIO;
if (fc.is_dirty()) w |= CEPH_CAP_WRBUFFER;
if (fc.is_cached()) w |= CEPH_CAP_RDCACHE;
return w;
}

View File

@ -123,10 +123,10 @@ void FileCache::set_caps(int caps, Context *onimplement)
int FileCache::get_used_caps()
{
int used = 0;
if (num_reading) used |= CAP_FILE_RD;
if (oc->set_is_cached(inode.ino)) used |= CAP_FILE_RDCACHE;
if (num_writing) used |= CAP_FILE_WR;
if (oc->set_is_dirty_or_committing(inode.ino)) used |= CAP_FILE_WRBUFFER;
if (num_reading) used |= CEPH_CAP_RD;
if (oc->set_is_cached(inode.ino)) used |= CEPH_CAP_RDCACHE;
if (num_writing) used |= CEPH_CAP_WR;
if (oc->set_is_dirty_or_committing(inode.ino)) used |= CEPH_CAP_WRBUFFER;
return used;
}
@ -138,11 +138,11 @@ void FileCache::check_caps()
// try to implement caps?
// BUG? latest_caps, not least caps i've seen?
if ((latest_caps & CAP_FILE_RDCACHE) == 0 &&
(used & CAP_FILE_RDCACHE))
if ((latest_caps & CEPH_CAP_RDCACHE) == 0 &&
(used & CEPH_CAP_RDCACHE))
release_clean();
if ((latest_caps & CAP_FILE_WRBUFFER) == 0 &&
(used & CAP_FILE_WRBUFFER))
if ((latest_caps & CEPH_CAP_WRBUFFER) == 0 &&
(used & CEPH_CAP_WRBUFFER))
flush_dirty(new C_FC_CheckCaps(this));
used = get_used_caps();
@ -176,7 +176,7 @@ int FileCache::read(off_t offset, size_t size, bufferlist& blist, Mutex& client_
int r = 0;
// can i read?
while ((latest_caps & CAP_FILE_RD) == 0) {
while ((latest_caps & CEPH_CAP_RD) == 0) {
dout(10) << "read doesn't have RD cap, blocking" << dendl;
Cond c;
waitfor_read.insert(&c);
@ -187,7 +187,7 @@ int FileCache::read(off_t offset, size_t size, bufferlist& blist, Mutex& client_
// inc reading counter
num_reading++;
if (latest_caps & CAP_FILE_RDCACHE) {
if (latest_caps & CEPH_CAP_RDCACHE) {
// read (and block)
Cond cond;
bool done = false;
@ -221,7 +221,7 @@ int FileCache::read(off_t offset, size_t size, bufferlist& blist, Mutex& client_
void FileCache::write(off_t offset, size_t size, bufferlist& blist, Mutex& client_lock)
{
// can i write
while ((latest_caps & CAP_FILE_WR) == 0) {
while ((latest_caps & CEPH_CAP_WR) == 0) {
dout(10) << "write doesn't have WR cap, blocking" << dendl;
Cond c;
waitfor_write.insert(&c);
@ -233,7 +233,7 @@ void FileCache::write(off_t offset, size_t size, bufferlist& blist, Mutex& clien
num_writing++;
if (size > 0) {
if (latest_caps & CAP_FILE_WRBUFFER) { // caps buffered write?
if (latest_caps & CEPH_CAP_WRBUFFER) { // caps buffered write?
// wait? (this may block!)
oc->wait_for_write(size, client_lock);

View File

@ -53,8 +53,8 @@ class FileCache {
}
// waiters/waiting
bool can_read() { return latest_caps & CAP_FILE_RD; }
bool can_write() { return latest_caps & CAP_FILE_WR; }
bool can_read() { return latest_caps & CEPH_CAP_RD; }
bool can_write() { return latest_caps & CEPH_CAP_WR; }
bool all_safe();// { return num_unsafe == 0; }
void add_safe_waiter(Context *c);

View File

@ -419,9 +419,17 @@ struct ceph_mds_reply_dirfrag {
} __attribute__ ((packed));
/* client file caps */
#define CEPH_CAP_PIN 1 /* no specific capabilities beyond the pin */
#define CEPH_CAP_RDCACHE 2 /* client can cache reads */
#define CEPH_CAP_RD 4 /* client can read */
#define CEPH_CAP_WR 8 /* client can write */
#define CEPH_CAP_WRBUFFER 16 /* client can buffer writes */
#define CEPH_CAP_WREXTEND 32 /* client can extend eof */
#define CEPH_CAP_LAZYIO 64 /* client can perform lazy io */
enum {
CEPH_CAP_OP_GRANT, /* mds->client grant */
CEPH_CAP_OP_ACK, /* client->mds ack (if prior grant was a recall) */
CEPH_CAP_OP_REQUEST, /* client->mds request (update wanted bits) */
CEPH_CAP_OP_RELEASE, /* mds->client release (*) */
CEPH_CAP_OP_EXPORT, /* mds has exported the cap */
CEPH_CAP_OP_IMPORT /* mds has imported the cap from specified mds */
@ -432,13 +440,12 @@ enum {
* if a concurrent open() would map to the same inode.
*/
struct ceph_mds_file_caps {
__le64 seq;
__le32 op;
__le32 seq;
__le32 caps, wanted;
__le64 ino;
__le64 size;
__le32 op;
__le32 migrate_mds;
__le32 migrate_seq;
__le32 migrate_mds, migrate_seq;
struct ceph_timeval mtime, atime;
} __attribute__ ((packed));

View File

@ -270,7 +270,7 @@ void ceph_dispatch(void *p, struct ceph_msg *msg)
ceph_mdsc_handle_forward(&client->mdsc, msg);
break;
case CEPH_MSG_CLIENT_FILECAPS:
ceph_handle_filecaps(&client->mdsc, msg);
ceph_mdsc_handle_filecaps(&client->mdsc, msg);
break;
/* osd client */

View File

@ -47,7 +47,9 @@ int ceph_open(struct inode *inode, struct file *file)
{
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_inode_cap *cap;
struct ceph_file_info *fi;
struct ceph_file_info *cf;
int mode;
int wanted;
dout(5, "ceph_open inode %p (%lu) file %p\n", inode, inode->i_ino, file);
cap = ceph_find_cap(inode, 0);
@ -57,27 +59,41 @@ int ceph_open(struct inode *inode, struct file *file)
return PTR_ERR(cap);
}
fi = kzalloc(sizeof(*fi), GFP_KERNEL);
if (fi == NULL)
cf = kzalloc(sizeof(*cf), GFP_KERNEL);
if (cf == NULL)
return -ENOMEM;
file->private_data = fi;
file->private_data = cf;
atomic_inc(&ci->i_cap_count);
dout(5, "ceph_open success\n");
mode = ceph_file_mode(file->f_flags);
ci->i_nr_by_mode[mode]++;
wanted = ceph_caps_wanted(ci);
ci->i_cap_wanted |= wanted; /* FIXME this isn't quite right */
dout(5, "ceph_open success, %lx %p\n", inode->i_ino, ilookup(inode->i_sb, inode->i_ino));
return 0;
}
int ceph_release(struct inode *inode, struct file *filp)
int ceph_release(struct inode *inode, struct file *file)
{
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_file_info *fi = filp->private_data;
dout(5, "ceph_release inode %p filp %p\n", inode, filp);
struct ceph_file_info *cf = file->private_data;
int mode, wanted;
dout(5, "ceph_release inode %p file %p\n", inode, file);
atomic_dec(&ci->i_cap_count);
if (cf->rinfo.reply)
ceph_mdsc_destroy_reply_info(&cf->rinfo);
kfree(cf);
if (fi->rinfo.reply)
ceph_mdsc_destroy_reply_info(&fi->rinfo);
kfree(fi);
mode = ceph_file_mode(file->f_flags);
ci->i_nr_by_mode[mode]--;
wanted = ceph_caps_wanted(ci);
dout(10, "mode %d wanted %d was %d\n", mode, wanted, ci->i_cap_wanted);
if (wanted != ci->i_cap_wanted)
ceph_mdsc_update_cap_wanted(ci, wanted);
return 0;
}

View File

@ -51,7 +51,10 @@ int ceph_fill_inode(struct inode *inode, struct ceph_mds_reply_inode *info)
ci->i_frag_map[0].mds = 0; // FIXME
ci->i_nr_caps = 0;
for (i=0; i<4; i++)
ci->i_nr_by_mode[i] = 0;
ci->i_cap_wanted = 0;
ci->i_wr_size = 0;
ci->i_wr_mtime.tv_sec = 0;
ci->i_wr_mtime.tv_nsec = 0;
@ -103,6 +106,16 @@ struct ceph_inode_cap *ceph_find_cap(struct inode *inode, int want)
return 0;
}
/*
 * Find the capability entry this inode holds for a particular mds.
 *
 * Walks the first i_nr_caps entries of the inode's i_caps[] and returns a
 * pointer to the first one whose mds field equals @mds.  Returns a null
 * pointer when no entry matches.
 */
static struct ceph_inode_cap *get_cap_for_mds(struct inode *inode, int mds)
{
	struct ceph_inode_info *info = ceph_inode(inode);
	int idx = 0;

	while (idx < info->i_nr_caps) {
		if (info->i_caps[idx].mds == mds)
			return &info->i_caps[idx];
		idx++;
	}
	return 0;	/* no cap from this mds */
}
struct ceph_inode_cap *ceph_add_cap(struct inode *inode, int mds, u32 cap, u32 seq)
{
@ -151,40 +164,59 @@ int ceph_get_caps(struct ceph_inode_info *ci)
}
/* caps */
void ceph_handle_filecaps(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
/*
* 0 - ok
* 1 - send the msg back to mds
*/
int ceph_handle_cap_grant(struct inode *inode, struct ceph_mds_file_caps *grant, struct ceph_mds_session *session)
{
struct super_block *sb = mdsc->client->sb;
struct ceph_client *client = ceph_sbinfo(sb)->sb_client;
struct inode *inode;
struct ceph_mds_file_caps *h;
int mds = msg->hdr.src.name.num;
int op;
__u64 ino;
dout(10, "handle_filecaps from mds%d\n", mds);
/* decode */
if (msg->front.iov_len != sizeof(*h))
goto bad;
h = msg->front.iov_base;
op = le32_to_cpu(h->op);
ino = le64_to_cpu(h->ino);
struct ceph_inode_cap *cap;
struct ceph_inode_info *ci = ceph_inode(inode);
int mds = session->s_mds;
int seq = le32_to_cpu(grant->seq);
int newcaps;
/* lookup ino */
inode = ilookup(sb, ino);
dout(20, "op is %d, inode is %llx %p\n", op, ino, inode);
dout(10, "handle_cap_grant inode %p ci %p mds%d seq %d\n", inode, ci, mds, seq);
switch (op) {
/* unwanted? */
if (ceph_caps_wanted(ci) == 0) {
dout(10, "wanted=0, reminding mds\n");
grant->wanted = cpu_to_le32(0);
return 1; /* ack */
}
return;
bad:
dout(10, "corrupt filecaps message\n");
}
/* new cap? */
dout(10, "1\n");
cap = get_cap_for_mds(inode, mds);
dout(10, "2\n");
if (!cap) {
dout(10, "adding new cap inode %p for mds%d\n", inode, mds);
cap = ceph_add_cap(inode, mds, le32_to_cpu(grant->caps), le32_to_cpu(grant->seq));
return 0;
}
/* revocation? */
dout(10, "3\n");
newcaps = le32_to_cpu(grant->caps);
dout(10, "4\n");
if (cap->caps & ~newcaps) {
dout(10, "revocation: %d -> %d\n", cap->caps, newcaps);
/* FIXME FIXME FIXME DO STUFF HERE */
/* blindly ack for now: */
cap->caps = newcaps;
return 1; /* ack */
}
/* grant or no-op */
dout(10, "5\n");
if (cap->caps == newcaps) {
dout(10, "no-op: %d -> %d\n", cap->caps, newcaps);
} else {
dout(10, "grant: %d -> %d\n", cap->caps, newcaps);
cap->caps = newcaps;
}
return 0;
}

View File

@ -108,6 +108,7 @@ static void register_session(struct ceph_mds_client *mdsc, int mds)
mdsc->max_sessions = mds+1;
}
s = kmalloc(sizeof(struct ceph_mds_session), GFP_KERNEL);
s->s_mds = mds;
s->s_state = CEPH_MDS_SESSION_NEW;
s->s_cap_seq = 0;
INIT_LIST_HEAD(&s->s_caps);
@ -741,8 +742,8 @@ void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
ceph_encode_32(&p, end, session->s_nr_caps);
list_for_each(cp, &session->s_caps) {
cap = list_entry(cp, struct ceph_inode_cap, session_caps);
ceph_encode_32(&p, end, cap->ci->i_cap_wanted);
ceph_encode_32(&p, end, cap->ci->i_cap_issued);
ceph_encode_32(&p, end, ceph_caps_wanted(cap->ci));
ceph_encode_32(&p, end, ceph_caps_issued(cap->ci));
ceph_encode_64(&p, end, cap->ci->i_wr_size);
ceph_encode_timespec(&p, end, &cap->ci->vfs_inode.i_mtime); //i_wr_mtime
ceph_encode_timespec(&p, end, &cap->ci->vfs_inode.i_atime); /* atime.. fixme */
@ -888,6 +889,105 @@ bad2:
return;
}
/* caps */
/*
 * Handle a CEPH_MSG_CLIENT_FILECAPS message received from an mds.
 *
 * The message front must be exactly one struct ceph_mds_file_caps; anything
 * else is logged as corrupt and ignored.  The sending mds must have a
 * session (its s_cap_seq is bumped for every cap message seen) and the
 * target inode must already be known locally, otherwise the message is
 * dropped with a debug note.
 *
 * CEPH_CAP_OP_GRANT is passed to ceph_handle_cap_grant(); when that returns
 * 1 the very same message (with an extra reference taken via ceph_msg_get)
 * is sent back to the mds.  EXPORT/IMPORT are not implemented yet; any other
 * op is silently ignored.
 */
void ceph_mdsc_handle_filecaps(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
	struct super_block *sb = mdsc->client->sb;
	struct ceph_client *client = ceph_sbinfo(sb)->sb_client;
	struct ceph_mds_session *session;
	struct inode *inode;
	struct ceph_mds_file_caps *h;
	int mds = msg->hdr.src.name.num;
	int op;
	__u64 ino;

	dout(10, "handle_filecaps from mds%d\n", mds);

	/* decode */
	if (msg->front.iov_len != sizeof(*h))
		goto bad;
	h = msg->front.iov_base;
	op = le32_to_cpu(h->op);
	ino = le64_to_cpu(h->ino);

	/* find session */
	session = get_session(&client->mdsc, mds);
	if (!session) {
		dout(10, "WTF, got filecap msg but no session for mds%d\n", mds);
		return;
	}
	session->s_cap_seq++;

	/* lookup ino */
	inode = ilookup(sb, ino);
	dout(20, "op is %d, inode is %llx %p\n", op, ino, inode);
	if (!inode) {
		dout(10, "hrm, wtf, don't have inode?\n");
		return;
	}

	switch (op) {
	case CEPH_CAP_OP_GRANT:
		if (ceph_handle_cap_grant(inode, h, session) == 1) {
			dout(10, "sending reply back to mds%d\n", mds);
			ceph_msg_get(msg);
			send_msg_mds(mdsc, msg, mds);
		}
		break;

	case CEPH_CAP_OP_EXPORT:
	case CEPH_CAP_OP_IMPORT:
		dout(10, "cap export/import -- IMPLEMENT ME\n");
		break;
	}

	/*
	 * ilookup() returned the inode with a reference held; drop it now
	 * that we are done, otherwise every cap message pins the inode a
	 * little harder and it can never be evicted.
	 */
	iput(inode);
	return;

bad:
	dout(10, "corrupt filecaps message\n");
	return;
}
/*
 * Tell every mds we hold a cap from that our wanted cap bits have changed.
 *
 * For each entry in ci->i_caps[] this trims the locally held caps down to
 * @wanted, builds a CEPH_MSG_CLIENT_FILECAPS message (op CEPH_CAP_OP_ACK)
 * carrying the cap's seq, the remaining caps, the new wanted mask, and the
 * inode number and size, and sends it to that cap's mds.  Finally
 * ci->i_cap_wanted is set to @wanted.
 *
 * Returns 0 on success.  If a message cannot be allocated the PTR_ERR of
 * ceph_msg_new() is returned immediately; caps processed before the failure
 * have already been trimmed and notified, and i_cap_wanted is left unchanged.
 *
 * NOTE(review): migrate_mds, migrate_seq, mtime and atime of the outgoing
 * header are not written here -- confirm ceph_msg_new() zeroes the front
 * buffer or that the mds ignores them for this op.
 */
int ceph_mdsc_update_cap_wanted(struct ceph_inode_info *ci, int wanted)
{
	struct ceph_client *client = ceph_inode_to_client(&ci->vfs_inode);
	struct ceph_mds_client *mdsc = &client->mdsc;
	struct ceph_inode_cap *cap;
	struct ceph_mds_session *session;
	struct ceph_mds_file_caps *fc;
	struct ceph_msg *msg;
	int i;

	dout(10, "update_cap_wanted %d -> %d\n", ci->i_cap_wanted, wanted);

	for (i=0; i<ci->i_nr_caps; i++) {
		cap = &ci->i_caps[i];
		session = get_session(mdsc, cap->mds);
		BUG_ON(!session);

		msg = ceph_msg_new(CEPH_MSG_CLIENT_FILECAPS, sizeof(*fc), 0, 0, 0);
		if (IS_ERR(msg))
			return PTR_ERR(msg);

		cap->caps &= wanted;  /* drop caps we don't want */

		/*
		 * Every field of the wire header is little-endian; convert
		 * the 32-bit fields too (they used to be stored in host
		 * order, which is wrong on big-endian machines).
		 */
		fc = msg->front.iov_base;
		fc->op = cpu_to_le32(CEPH_CAP_OP_ACK);  /* misnomer */
		fc->seq = cpu_to_le32(cap->seq);
		fc->caps = cpu_to_le32(cap->caps);
		fc->wanted = cpu_to_le32(wanted);
		fc->ino = cpu_to_le64(ci->vfs_inode.i_ino);
		fc->size = cpu_to_le64(ci->vfs_inode.i_size);

		send_msg_mds(mdsc, msg, cap->mds);
	}

	ci->i_cap_wanted = wanted;
	return 0;
}
/* eof */

View File

@ -21,6 +21,7 @@ enum {
CEPH_MDS_SESSION_CLOSING = 4
};
struct ceph_mds_session {
int s_mds;
int s_state;
__u64 s_cap_seq; /* cap message count/seq from mds */
struct list_head s_caps;
@ -98,6 +99,10 @@ extern void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc, struct ceph_m
extern void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg);
extern void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc, struct ceph_msg *msg);
extern void ceph_mdsc_handle_filecaps(struct ceph_mds_client *mdsc, struct ceph_msg *msg);
struct ceph_inode_info;
extern int ceph_mdsc_update_cap_wanted(struct ceph_inode_info *ci, int wanted);
extern struct ceph_msg *ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, ceph_ino_t ino1, const char *path1, ceph_ino_t ino2, const char *path2);
extern int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, struct ceph_msg *msg,
struct ceph_mds_reply_info *rinfo, int mds);

View File

@ -425,6 +425,7 @@ static int open_root_inode(struct super_block *sb, struct ceph_mount_args *args)
int frommds;
int err;
struct ceph_inode_cap *cap;
struct ceph_inode_info *ci;
/* open dir */
dout(30, "open_root_inode opening '%s'\n", args->path);
@ -432,7 +433,7 @@ static int open_root_inode(struct super_block *sb, struct ceph_mount_args *args)
if (IS_ERR(req))
return PTR_ERR(req);
reqhead = req->front.iov_base;
reqhead->args.open.flags = 0;
reqhead->args.open.flags = O_DIRECTORY;
reqhead->args.open.mode = 0;
if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, -1)) < 0)
return err;
@ -454,6 +455,8 @@ static int open_root_inode(struct super_block *sb, struct ceph_mount_args *args)
err = PTR_ERR(cap);
goto out;
}
ci = ceph_inode(inode);
ci->i_nr_by_mode[FILE_MODE_PIN]++;
root = d_alloc_root(inode);
if (root == NULL) {

View File

@ -70,6 +70,13 @@ struct ceph_inode_frag_map_item {
};
#define STATIC_CAPS 2
/*
 * Open-mode classes for an inode.  ceph_file_mode() maps open flags to one
 * of these, and the value is used as an index into
 * ceph_inode_info.i_nr_by_mode[], which counts users per class.
 * ceph_caps_wanted() turns the non-zero counters into a wanted-caps mask.
 */
enum {
FILE_MODE_PIN,     /* O_DIRECTORY opens; wants only CEPH_CAP_PIN */
FILE_MODE_RDONLY,  /* read-only opens */
FILE_MODE_RDWR,    /* read-write opens */
FILE_MODE_WRONLY   /* write-only opens */
};
struct ceph_inode_info {
struct ceph_file_layout i_layout;
@ -82,14 +89,42 @@ struct ceph_inode_info {
struct ceph_inode_cap i_caps_static[STATIC_CAPS];
atomic_t i_cap_count; /* ref count (e.g. from file*) */
int i_nr_by_mode[4];
int i_cap_wanted;
int i_cap_issued;
loff_t i_wr_size;
struct timespec i_wr_mtime;
struct inode vfs_inode; /* at end */
};
/*
 * Return the union (bitwise OR) of the caps fields of all i_nr_caps
 * capability entries attached to this inode; 0 if there are none.
 */
static inline int ceph_caps_issued(struct ceph_inode_info *ci)
{
	int mask = 0;
	int idx = 0;

	while (idx < ci->i_nr_caps) {
		mask |= ci->i_caps[idx].caps;
		idx++;
	}
	return mask;
}
/*
 * Compute the capability bits this inode currently wants, based on which
 * open-mode counters in i_nr_by_mode[] are non-zero.  Slots 0..3 follow the
 * order of the FILE_MODE_* enum in this header (pin, rdonly, rdwr, wronly).
 */
static inline int ceph_caps_wanted(struct ceph_inode_info *ci)
{
	const int rd_bits = CEPH_CAP_RD | CEPH_CAP_RDCACHE;
	const int wr_bits = CEPH_CAP_WR | CEPH_CAP_WRBUFFER;
	int mask = 0;

	/* slot 0: pin-only users */
	if (ci->i_nr_by_mode[0])
		mask |= CEPH_CAP_PIN;
	/* slots 1 and 2 both need the read side */
	if (ci->i_nr_by_mode[1] || ci->i_nr_by_mode[2])
		mask |= rd_bits;
	/* slots 2 and 3 both need the write side */
	if (ci->i_nr_by_mode[2] || ci->i_nr_by_mode[3])
		mask |= wr_bits;
	return mask;
}
/*
 * Classify open(2) flags into one of the FILE_MODE_* classes.
 *
 * O_DIRECTORY takes precedence and yields FILE_MODE_PIN.  Otherwise the
 * access mode decides: O_RDWR -> FILE_MODE_RDWR, O_WRONLY ->
 * FILE_MODE_WRONLY, and anything else is a read-only open.
 *
 * O_RDONLY is defined as 0 on Linux, so "(flags & O_RDONLY) == O_RDONLY" is
 * always true and cannot be used as a test; read-only is simply what remains
 * once the write modes have been ruled out.  Returning it unconditionally
 * also guarantees the function returns a value on every path.
 */
static inline int ceph_file_mode(int flags)
{
	if ((flags & O_DIRECTORY) == O_DIRECTORY)
		return FILE_MODE_PIN;
	if ((flags & O_RDWR) == O_RDWR)
		return FILE_MODE_RDWR;
	if ((flags & O_WRONLY) == O_WRONLY)
		return FILE_MODE_WRONLY;
	return FILE_MODE_RDONLY;
}
static inline struct ceph_inode_info *ceph_inode(struct inode *inode)
{
return list_entry(inode, struct ceph_inode_info, vfs_inode);
@ -139,8 +174,7 @@ extern struct ceph_inode_cap *ceph_find_cap(struct inode *inode, int want);
extern struct ceph_inode_cap *ceph_add_cap(struct inode *inode, int mds, u32 cap, u32 seq);
extern int ceph_inode_getattr(struct vfsmount *mnt, struct dentry *dentry,
struct kstat *stat);
extern void ceph_handle_filecaps(struct ceph_mds_client *mdsc, struct ceph_msg *msg);
extern int ceph_handle_cap_grant(struct inode *inode, struct ceph_mds_file_caps *grant, struct ceph_mds_session *session);
/* addr.c */
extern const struct address_space_operations ceph_aops;

View File

@ -448,7 +448,7 @@ public:
linklock.replicate_relax();
dirfragtreelock.replicate_relax();
if (get_caps_issued() & (CAP_FILE_WR|CAP_FILE_WRBUFFER) == 0)
// Relax the file lock only when no client holds WR or WRBUFFER caps.
// The mask test needs its own parentheses: `==` binds tighter than `&`,
// so without them the expression is `issued & (mask == 0)`, which is
// always 0 and the relax never happens.
if ((get_caps_issued() & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER)) == 0)
  filelock.replicate_relax();
dirlock.replicate_relax();

View File

@ -24,15 +24,6 @@ using namespace std;
#include "config.h"
// definite caps
#define CAP_FILE_RDCACHE 1 // client can safely cache reads
#define CAP_FILE_RD 2 // client can read
#define CAP_FILE_WR 4 // client can write
#define CAP_FILE_WREXTEND 8 // client can extend file
#define CAP_FILE_WRBUFFER 16 // client can safely buffer writes
#define CAP_FILE_LAZYIO 32 // client can perform lazy io
// heuristics
//#define CAP_FILE_DELAYFLUSH 32
@ -40,12 +31,13 @@ inline string cap_string(int cap)
{
string s;
s = "[";
if (cap & CAP_FILE_RDCACHE) s += " rdcache";
if (cap & CAP_FILE_RD) s += " rd";
if (cap & CAP_FILE_WR) s += " wr";
if (cap & CAP_FILE_WRBUFFER) s += " wrbuffer";
if (cap & CAP_FILE_WRBUFFER) s += " wrextend";
if (cap & CAP_FILE_LAZYIO) s += " lazyio";
// append one tag per capability bit that is set in `cap`
if (cap & CEPH_CAP_PIN) s += " pin";
if (cap & CEPH_CAP_RDCACHE) s += " rdcache";
if (cap & CEPH_CAP_RD) s += " rd";
if (cap & CEPH_CAP_WR) s += " wr";
if (cap & CEPH_CAP_WRBUFFER) s += " wrbuffer";
// "wrextend" must test its own bit; it used to test WRBUFFER a second time
if (cap & CEPH_CAP_WREXTEND) s += " wrextend";
if (cap & CEPH_CAP_LAZYIO) s += " lazyio";
s += " ]";
return s;
}
@ -126,17 +118,17 @@ public:
// needed
static int needed(int from) {
// strip out wrbuffer, rdcache
return from & (CAP_FILE_WR|CAP_FILE_RD);
return from & (CEPH_CAP_WR|CEPH_CAP_RD);
}
int needed() { return needed(wanted_caps); }
// conflicts
static int conflicts(int from) {
int c = 0;
if (from & CAP_FILE_WRBUFFER) c |= CAP_FILE_RDCACHE|CAP_FILE_RD;
if (from & CAP_FILE_WR) c |= CAP_FILE_RDCACHE;
if (from & CAP_FILE_RD) c |= CAP_FILE_WRBUFFER;
if (from & CAP_FILE_RDCACHE) c |= CAP_FILE_WRBUFFER|CAP_FILE_WR;
if (from & CEPH_CAP_WRBUFFER) c |= CEPH_CAP_RDCACHE|CEPH_CAP_RD;
if (from & CEPH_CAP_WR) c |= CEPH_CAP_RDCACHE;
if (from & CEPH_CAP_RD) c |= CEPH_CAP_WRBUFFER;
if (from & CEPH_CAP_RDCACHE) c |= CEPH_CAP_WRBUFFER|CEPH_CAP_WR;
return c;
}
int wanted_conflicts() { return conflicts(wanted()); }

View File

@ -159,52 +159,52 @@ class FileLock : public SimpleLock {
// client caps allowed
int caps_allowed_ever() {
if (parent->is_auth())
return CAP_FILE_RDCACHE | CAP_FILE_RD | CAP_FILE_WR | CAP_FILE_WREXTEND | CAP_FILE_WRBUFFER | CAP_FILE_LAZYIO;
return CEPH_CAP_RDCACHE | CEPH_CAP_RD | CEPH_CAP_WR | CEPH_CAP_WREXTEND | CEPH_CAP_WRBUFFER | CEPH_CAP_LAZYIO;
else
return CAP_FILE_RDCACHE | CAP_FILE_RD | CAP_FILE_LAZYIO;
return CEPH_CAP_RDCACHE | CEPH_CAP_RD | CEPH_CAP_LAZYIO;
}
int caps_allowed() {
if (parent->is_auth())
switch (state) {
case LOCK_SYNC:
return CAP_FILE_RDCACHE | CAP_FILE_RD | CAP_FILE_LAZYIO;
return CEPH_CAP_RDCACHE | CEPH_CAP_RD | CEPH_CAP_LAZYIO;
case LOCK_LOCK:
case LOCK_GLOCKR:
case LOCK_GLOCKL:
return CAP_FILE_RDCACHE;
return CEPH_CAP_RDCACHE;
case LOCK_GLOCKM:
return 0;
case LOCK_MIXED:
return CAP_FILE_RD | CAP_FILE_WR | CAP_FILE_WREXTEND | CAP_FILE_LAZYIO;
return CEPH_CAP_RD | CEPH_CAP_WR | CEPH_CAP_WREXTEND | CEPH_CAP_LAZYIO;
case LOCK_GMIXEDR:
return CAP_FILE_RD | CAP_FILE_LAZYIO;
return CEPH_CAP_RD | CEPH_CAP_LAZYIO;
case LOCK_GMIXEDL:
return 0;
case LOCK_LONER: // single client writer, of course.
return CAP_FILE_RDCACHE | CAP_FILE_RD | CAP_FILE_WR | CAP_FILE_WREXTEND | CAP_FILE_WRBUFFER | CAP_FILE_LAZYIO;
return CEPH_CAP_RDCACHE | CEPH_CAP_RD | CEPH_CAP_WR | CEPH_CAP_WREXTEND | CEPH_CAP_WRBUFFER | CEPH_CAP_LAZYIO;
case LOCK_GLONERR:
return CAP_FILE_RD | CAP_FILE_LAZYIO;
return CEPH_CAP_RD | CEPH_CAP_LAZYIO;
case LOCK_GLONERM:
return CAP_FILE_RD | CAP_FILE_WR | CAP_FILE_WREXTEND | CAP_FILE_LAZYIO;
return CEPH_CAP_RD | CEPH_CAP_WR | CEPH_CAP_WREXTEND | CEPH_CAP_LAZYIO;
case LOCK_GSYNCL:
return CAP_FILE_RDCACHE | CAP_FILE_LAZYIO;
return CEPH_CAP_RDCACHE | CEPH_CAP_LAZYIO;
case LOCK_GSYNCM:
return CAP_FILE_RD | CAP_FILE_LAZYIO;
return CEPH_CAP_RD | CEPH_CAP_LAZYIO;
}
else
switch (state) {
case LOCK_SYNC:
return CAP_FILE_RDCACHE | CAP_FILE_RD | CAP_FILE_LAZYIO;
return CEPH_CAP_RDCACHE | CEPH_CAP_RD | CEPH_CAP_LAZYIO;
case LOCK_LOCK:
case LOCK_GLOCKR:
return CAP_FILE_RDCACHE;
return CEPH_CAP_RDCACHE;
case LOCK_GMIXEDR:
case LOCK_MIXED:
return CAP_FILE_RD | CAP_FILE_LAZYIO;
return CEPH_CAP_RD | CEPH_CAP_LAZYIO;
}
assert(0);
return 0;

View File

@ -455,8 +455,8 @@ Capability* Locker::issue_new_caps(CInode *in,
// my needs
int my_client = req->get_client().num();
int my_want = 0;
if (mode & FILE_MODE_R) my_want |= CAP_FILE_RDCACHE | CAP_FILE_RD;
if (mode & FILE_MODE_W) my_want |= CAP_FILE_WRBUFFER | CAP_FILE_WR;
if (mode & FILE_MODE_R) my_want |= CEPH_CAP_RDCACHE | CEPH_CAP_RD;
if (mode & FILE_MODE_W) my_want |= CEPH_CAP_WRBUFFER | CEPH_CAP_WR;
// register a capability
Capability *cap = in->get_client_cap(my_client);
@ -498,14 +498,14 @@ Capability* Locker::issue_new_caps(CInode *in,
int now = cap->pending();
if (before != now &&
(before & CAP_FILE_WR) == 0 &&
(now & CAP_FILE_WR)) {
(before & CEPH_CAP_WR) == 0 &&
(now & CEPH_CAP_WR)) {
// FIXME FIXME FIXME
}
// twiddle file_data_version?
if ((before & CAP_FILE_WRBUFFER) == 0 &&
(now & CAP_FILE_WRBUFFER)) {
if ((before & CEPH_CAP_WRBUFFER) == 0 &&
(now & CEPH_CAP_WRBUFFER)) {
in->inode.file_data_version++;
dout(7) << " incrementing file_data_version, now " << in->inode.file_data_version << " for " << *in << dendl;
}
@ -538,8 +538,8 @@ bool Locker::issue_caps(CInode *in)
int after = it->second.pending();
// twiddle file_data_version?
if (!(before & CAP_FILE_WRBUFFER) &&
(after & CAP_FILE_WRBUFFER)) {
if (!(before & CEPH_CAP_WRBUFFER) &&
(after & CEPH_CAP_WRBUFFER)) {
dout(7) << " incrementing file_data_version for " << *in << dendl;
in->inode.file_data_version++;
}
@ -740,7 +740,7 @@ void Locker::handle_client_file_caps(MClientFileCaps *m)
in->inode.atime = m->get_atime();
}
if ((has|had) & CAP_FILE_WR) {
if ((has|had) & CEPH_CAP_WR) {
bool dirty = false;
// mtime
@ -2423,7 +2423,7 @@ void Locker::file_eval(FileLock *lock)
// * -> loner?
if (!lock->is_rdlocked() &&
!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
(wanted & CAP_FILE_WR) &&
(wanted & CEPH_CAP_WR) &&
loner &&
lock->get_state() != LOCK_LONER) {
dout(7) << "file_eval stable, bump to loner " << *lock << " on " << *lock->get_parent() << dendl;
@ -2433,8 +2433,8 @@ void Locker::file_eval(FileLock *lock)
// * -> mixed?
else if (!lock->is_rdlocked() &&
!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
(wanted & CAP_FILE_RD) &&
(wanted & CAP_FILE_WR) &&
(wanted & CEPH_CAP_RD) &&
(wanted & CEPH_CAP_WR) &&
!(loner && lock->get_state() == LOCK_LONER) &&
lock->get_state() != LOCK_MIXED) {
dout(7) << "file_eval stable, bump to mixed " << *lock << " on " << *lock->get_parent() << dendl;
@ -2443,8 +2443,8 @@ void Locker::file_eval(FileLock *lock)
// * -> sync?
else if (!in->filelock.is_waiter_for(SimpleLock::WAIT_WR) &&
!(wanted & (CAP_FILE_WR|CAP_FILE_WRBUFFER)) &&
((wanted & CAP_FILE_RD) ||
!(wanted & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER)) &&
((wanted & CEPH_CAP_RD) ||
in->is_replicated() ||
(!loner && lock->get_state() == LOCK_LONER)) &&
lock->get_state() != LOCK_SYNC) {
@ -2473,7 +2473,7 @@ bool Locker::file_sync(FileLock *lock)
int issued = in->get_caps_issued();
assert((in->get_caps_wanted() & CAP_FILE_WR) == 0);
assert((in->get_caps_wanted() & CEPH_CAP_WR) == 0);
if (lock->get_state() == LOCK_LOCK) {
if (in->is_replicated()) {
@ -2491,7 +2491,7 @@ bool Locker::file_sync(FileLock *lock)
else if (lock->get_state() == LOCK_MIXED) {
// writers?
if (issued & CAP_FILE_WR) {
if (issued & CEPH_CAP_WR) {
// gather client write caps
lock->set_state(LOCK_GSYNCM);
lock->get_parent()->auth_pin();
@ -2512,7 +2512,7 @@ bool Locker::file_sync(FileLock *lock)
else if (lock->get_state() == LOCK_LONER) {
// writers?
if (issued & CAP_FILE_WR) {
if (issued & CEPH_CAP_WR) {
// gather client write caps
lock->set_state(LOCK_GSYNCL);
lock->get_parent()->auth_pin();
@ -2601,7 +2601,7 @@ void Locker::file_lock(FileLock *lock)
}
else if (lock->get_state() == LOCK_LONER) {
if (issued & CAP_FILE_WR) {
if (issued & CEPH_CAP_WR) {
// change lock
lock->set_state(LOCK_GLOCKL);
lock->get_parent()->auth_pin();
@ -2664,7 +2664,7 @@ void Locker::file_mixed(FileLock *lock)
}
else if (lock->get_state() == LOCK_LONER) {
if (issued & CAP_FILE_WRBUFFER) {
if (issued & CEPH_CAP_WRBUFFER) {
// gather up WRBUFFER caps
lock->set_state(LOCK_GMIXEDL);
lock->get_parent()->auth_pin();
@ -2786,7 +2786,7 @@ void Locker::handle_file_lock(FileLock *lock, MLock *m)
lock->set_state(LOCK_GLOCKR);
// call back caps?
if (issued & CAP_FILE_RD) {
if (issued & CEPH_CAP_RD) {
dout(7) << "handle_file_lock client readers, gathering caps on " << *in << dendl;
issue_caps(in);
break;
@ -2811,7 +2811,7 @@ void Locker::handle_file_lock(FileLock *lock, MLock *m)
if (lock->get_state() == LOCK_SYNC) {
// MIXED
if (issued & CAP_FILE_RD) {
if (issued & CEPH_CAP_RD) {
// call back client caps
lock->set_state(LOCK_GMIXEDR);
issue_caps(in);

View File

@ -362,8 +362,8 @@ void Server::process_reconnected_caps()
int issued = in->get_caps_issued();
if (in->is_auth()) {
// wr?
if (issued & (CAP_FILE_WR|CAP_FILE_WRBUFFER)) {
if (issued & (CAP_FILE_RDCACHE|CAP_FILE_WRBUFFER)) {
if (issued & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER)) {
if (issued & (CEPH_CAP_RDCACHE|CEPH_CAP_WRBUFFER)) {
in->filelock.set_state(LOCK_LONER);
} else {
in->filelock.set_state(LOCK_MIXED);
@ -371,7 +371,7 @@ void Server::process_reconnected_caps()
}
} else {
// note that client should perform stale/reap cleanup during reconnect.
assert(issued & (CAP_FILE_WR|CAP_FILE_WRBUFFER) == 0); // ????
// Non-auth inodes must not have write caps issued.  Parenthesize the mask
// test: `issued & (mask) == 0` parses as `issued & (mask == 0)`, i.e.
// assert(0), which fires whenever this branch is reached.
assert((issued & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER)) == 0); // ????
if (in->filelock.is_xlocked())
in->filelock.set_state(LOCK_LOCK);
else
@ -3743,7 +3743,7 @@ void Server::handle_client_open(MDRequest *mdr)
return;
}
// can only open a dir rdonly, no flags.
if (cur->inode.is_dir() && (cmode != FILE_MODE_R || flags != 0)) {
if (cur->inode.is_dir() && (cmode != FILE_MODE_R || flags != O_DIRECTORY)) {
reply_request(mdr, -EINVAL);
return;
}

View File

@ -38,7 +38,7 @@ class MClientFileCaps : public Message {
public:
int get_caps() { return le32_to_cpu(h.caps); }
int get_wanted() { return le32_to_cpu(h.wanted); }
capseq_t get_seq() { return le64_to_cpu(h.seq); }
capseq_t get_seq() { return le32_to_cpu(h.seq); }
inodeno_t get_ino() { return le64_to_cpu(h.ino); }
__u64 get_size() { return le64_to_cpu(h.size); }
@ -70,12 +70,12 @@ class MClientFileCaps : public Message {
int mmds=0,
int mseq=0) :
Message(CEPH_MSG_CLIENT_FILECAPS) {
h.seq = cpu_to_le64(seq);
h.op = cpu_to_le32(op);
h.seq = cpu_to_le32(seq);
h.caps = cpu_to_le32(caps);
h.wanted = cpu_to_le32(wanted);
h.ino = cpu_to_le64(inode.ino);
h.size = cpu_to_le64(inode.size);
h.op = cpu_to_le32(op);
h.migrate_mds = cpu_to_le32(mmds);
h.migrate_seq = cpu_to_le32(mseq);
h.mtime = inode.mtime.tv_ref();
@ -84,8 +84,8 @@ class MClientFileCaps : public Message {
const char *get_type_name() { return "Cfcap";}
void print(ostream& out) {
out << "client_file_caps(" << le32_to_cpu(h.op)
<< " " << le64_to_cpu(h.ino)
out << "client_file_caps(" << get_opname(le32_to_cpu(h.op))
<< " ino " << inodeno_t(le64_to_cpu(h.ino))
<< " seq " << le32_to_cpu(h.seq)
<< " caps " << cap_string(le32_to_cpu(h.caps))
<< " wanted" << cap_string(le32_to_cpu(h.wanted))

View File

@ -6,4 +6,4 @@
./cosd --mkfs --osd 1 &
./cosd --mkfs --osd 2 &
./cosd --mkfs --osd 3 &
./cmds &
./cmds &