From baf11742f77e8baa288bb6c9d00fdf1c90bcdb2d Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 11 Jan 2008 17:15:03 -0800 Subject: [PATCH] lots of cap bits in the kenrel client; cap vars in ceph_fs.h, renamed --- src/client/Client.cc | 30 +++++----- src/client/Client.h | 10 ++-- src/client/FileCache.cc | 24 ++++---- src/client/FileCache.h | 4 +- src/include/ceph_fs.h | 15 +++-- src/kernel/client.c | 2 +- src/kernel/file.c | 40 +++++++++---- src/kernel/inode.c | 90 +++++++++++++++++++--------- src/kernel/mds_client.c | 104 ++++++++++++++++++++++++++++++++- src/kernel/mds_client.h | 5 ++ src/kernel/super.c | 5 +- src/kernel/super.h | 40 ++++++++++++- src/mds/CInode.h | 2 +- src/mds/Capability.h | 32 ++++------ src/mds/FileLock.h | 28 ++++----- src/mds/Locker.cc | 42 ++++++------- src/mds/Server.cc | 8 +-- src/messages/MClientFileCaps.h | 10 ++-- src/start.sh | 2 +- 19 files changed, 341 insertions(+), 152 deletions(-) diff --git a/src/client/Client.cc b/src/client/Client.cc index 5373eab245b..f7f5a4dae35 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -395,7 +395,7 @@ Inode* Client::insert_inode(Dir *dir, InodeStat *st, const string& dname) dn->inode->mask = st->mask; // or do we have newer size/mtime from writing? - if (dn->inode->file_caps() & CAP_FILE_WR) { + if (dn->inode->file_caps() & CEPH_CAP_WR) { if (dn->inode->file_wr_size > dn->inode->inode.size) dn->inode->inode.size = dn->inode->file_wr_size; if (dn->inode->file_wr_mtime > dn->inode->inode.mtime) @@ -1241,8 +1241,8 @@ void Client::handle_file_caps(MClientFileCaps *m) << " was " << cap_string(old_caps) << dendl; // did file size decrease? - if ((old_caps & (CAP_FILE_RD|CAP_FILE_WR)) == 0 && - (new_caps & (CAP_FILE_RD|CAP_FILE_WR)) != 0 && + if ((old_caps & (CEPH_CAP_RD|CEPH_CAP_WR)) == 0 && + (new_caps & (CEPH_CAP_RD|CEPH_CAP_WR)) != 0 && in->inode.size > (loff_t)m->get_size()) { dout(10) << "*** file size decreased from " << in->inode.size << " to " << m->get_size() << dendl; @@ -1284,7 +1284,7 @@ void Client::handle_file_caps(MClientFileCaps *m) // caching off. // wake up waiters? - if (new_caps & CAP_FILE_RD) { + if (new_caps & CEPH_CAP_RD) { for (list::iterator it = in->waitfor_read.begin(); it != in->waitfor_read.end(); it++) { @@ -1293,7 +1293,7 @@ void Client::handle_file_caps(MClientFileCaps *m) } in->waitfor_read.clear(); } - if (new_caps & CAP_FILE_WR) { + if (new_caps & CEPH_CAP_WR) { for (list::iterator it = in->waitfor_write.begin(); it != in->waitfor_write.end(); it++) { @@ -1302,7 +1302,7 @@ void Client::handle_file_caps(MClientFileCaps *m) } in->waitfor_write.clear(); } - if (new_caps & CAP_FILE_LAZYIO) { + if (new_caps & CEPH_CAP_LAZYIO) { for (list::iterator it = in->waitfor_lazy.begin(); it != in->waitfor_lazy.end(); it++) { @@ -2668,8 +2668,8 @@ void Client::close_release(Inode *in) in->fc.release_clean(); int retain = 0; - if (in->num_open_wr || in->fc.is_dirty()) retain |= CAP_FILE_WR | CAP_FILE_WRBUFFER | CAP_FILE_WREXTEND; - if (in->num_open_rd || in->fc.is_cached()) retain |= CAP_FILE_RD | CAP_FILE_RDCACHE; + if (in->num_open_wr || in->fc.is_dirty()) retain |= CEPH_CAP_WR | CEPH_CAP_WRBUFFER | CEPH_CAP_WREXTEND; + if (in->num_open_rd || in->fc.is_cached()) retain |= CEPH_CAP_RD | CEPH_CAP_RDCACHE; release_caps(in, retain); // release caps now. } @@ -2859,7 +2859,7 @@ int Client::_read(Fh *f, off_t offset, off_t size, bufferlist *bl) // determine whether read range overlaps with file // ...ONLY if we're doing async io - if (!lazy && (in->file_caps() & (CAP_FILE_WRBUFFER|CAP_FILE_RDCACHE))) { + if (!lazy && (in->file_caps() & (CEPH_CAP_WRBUFFER|CEPH_CAP_RDCACHE))) { // we're doing buffered i/o. make sure we're inside the file. // we can trust size info bc we get accurate info when buffering/caching caps are issued. dout(10) << "file size: " << in->inode.size << dendl; @@ -2904,14 +2904,14 @@ int Client::_read(Fh *f, off_t offset, off_t size, bufferlist *bl) // object cache OFF -- legacy inconsistent way. // do we have read file cap? - while (!lazy && (in->file_caps() & CAP_FILE_RD) == 0) { + while (!lazy && (in->file_caps() & CEPH_CAP_RD) == 0) { dout(7) << " don't have read cap, waiting" << dendl; Cond cond; in->waitfor_read.push_back(&cond); cond.Wait(client_lock); } // lazy cap? - while (lazy && (in->file_caps() & CAP_FILE_LAZYIO) == 0) { + while (lazy && (in->file_caps() & CEPH_CAP_LAZYIO) == 0) { dout(7) << " don't have lazy cap, waiting" << dendl; Cond cond; in->waitfor_lazy.push_back(&cond); @@ -3027,13 +3027,13 @@ int Client::_write(Fh *f, off_t offset, off_t size, const char *buf) dout(7) << "synchronous write" << dendl; // do we have write file cap? - while (!lazy && (in->file_caps() & CAP_FILE_WR) == 0) { + while (!lazy && (in->file_caps() & CEPH_CAP_WR) == 0) { dout(7) << " don't have write cap, waiting" << dendl; Cond cond; in->waitfor_write.push_back(&cond); cond.Wait(client_lock); } - while (lazy && (in->file_caps() & CAP_FILE_LAZYIO) == 0) { + while (lazy && (in->file_caps() & CEPH_CAP_LAZYIO) == 0) { dout(7) << " don't have lazy cap, waiting" << dendl; Cond cond; in->waitfor_lazy.push_back(&cond); @@ -3301,7 +3301,7 @@ int Client::lazyio_propogate(int fd, off_t offset, size_t count) if (f->mode & FILE_MODE_LAZY) { // wait for lazy cap - while ((in->file_caps() & CAP_FILE_LAZYIO) == 0) { + while ((in->file_caps() & CEPH_CAP_LAZYIO) == 0) { dout(7) << " don't have lazy cap, waiting" << dendl; Cond cond; in->waitfor_lazy.push_back(&cond); @@ -3337,7 +3337,7 @@ int Client::lazyio_synchronize(int fd, off_t offset, size_t count) if (f->mode & FILE_MODE_LAZY) { // wait for lazy cap - while ((in->file_caps() & CAP_FILE_LAZYIO) == 0) { + while ((in->file_caps() & CEPH_CAP_LAZYIO) == 0) { dout(7) << " don't have lazy cap, waiting" << dendl; Cond cond; in->waitfor_lazy.push_back(&cond); diff --git a/src/client/Client.h b/src/client/Client.h index ed0b38ed33d..7881c56e5ac 100644 --- a/src/client/Client.h +++ b/src/client/Client.h @@ -228,11 +228,11 @@ class Inode { int file_caps_wanted() { int w = 0; - if (num_open_rd) w |= CAP_FILE_RD|CAP_FILE_RDCACHE; - if (num_open_wr) w |= CAP_FILE_WR|CAP_FILE_WRBUFFER; - if (num_open_lazy) w |= CAP_FILE_LAZYIO; - if (fc.is_dirty()) w |= CAP_FILE_WRBUFFER; - if (fc.is_cached()) w |= CAP_FILE_RDCACHE; + if (num_open_rd) w |= CEPH_CAP_RD|CEPH_CAP_RDCACHE; + if (num_open_wr) w |= CEPH_CAP_WR|CEPH_CAP_WRBUFFER; + if (num_open_lazy) w |= CEPH_CAP_LAZYIO; + if (fc.is_dirty()) w |= CEPH_CAP_WRBUFFER; + if (fc.is_cached()) w |= CEPH_CAP_RDCACHE; return w; } diff --git a/src/client/FileCache.cc b/src/client/FileCache.cc index 1adec4aaabe..344148da2d6 100644 --- a/src/client/FileCache.cc +++ b/src/client/FileCache.cc @@ -123,10 +123,10 @@ void FileCache::set_caps(int caps, Context *onimplement) int FileCache::get_used_caps() { int used = 0; - if (num_reading) used |= CAP_FILE_RD; - if (oc->set_is_cached(inode.ino)) used |= CAP_FILE_RDCACHE; - if (num_writing) used |= CAP_FILE_WR; - if (oc->set_is_dirty_or_committing(inode.ino)) used |= CAP_FILE_WRBUFFER; + if (num_reading) used |= CEPH_CAP_RD; + if (oc->set_is_cached(inode.ino)) used |= CEPH_CAP_RDCACHE; + if (num_writing) used |= CEPH_CAP_WR; + if (oc->set_is_dirty_or_committing(inode.ino)) used |= CEPH_CAP_WRBUFFER; return used; } @@ -138,11 +138,11 @@ void FileCache::check_caps() // try to implement caps? // BUG? latest_caps, not least caps i've seen? - if ((latest_caps & CAP_FILE_RDCACHE) == 0 && - (used & CAP_FILE_RDCACHE)) + if ((latest_caps & CEPH_CAP_RDCACHE) == 0 && + (used & CEPH_CAP_RDCACHE)) release_clean(); - if ((latest_caps & CAP_FILE_WRBUFFER) == 0 && - (used & CAP_FILE_WRBUFFER)) + if ((latest_caps & CEPH_CAP_WRBUFFER) == 0 && + (used & CEPH_CAP_WRBUFFER)) flush_dirty(new C_FC_CheckCaps(this)); used = get_used_caps(); @@ -176,7 +176,7 @@ int FileCache::read(off_t offset, size_t size, bufferlist& blist, Mutex& client_ int r = 0; // can i read? - while ((latest_caps & CAP_FILE_RD) == 0) { + while ((latest_caps & CEPH_CAP_RD) == 0) { dout(10) << "read doesn't have RD cap, blocking" << dendl; Cond c; waitfor_read.insert(&c); @@ -187,7 +187,7 @@ int FileCache::read(off_t offset, size_t size, bufferlist& blist, Mutex& client_ // inc reading counter num_reading++; - if (latest_caps & CAP_FILE_RDCACHE) { + if (latest_caps & CEPH_CAP_RDCACHE) { // read (and block) Cond cond; bool done = false; @@ -221,7 +221,7 @@ int FileCache::read(off_t offset, size_t size, bufferlist& blist, Mutex& client_ void FileCache::write(off_t offset, size_t size, bufferlist& blist, Mutex& client_lock) { // can i write - while ((latest_caps & CAP_FILE_WR) == 0) { + while ((latest_caps & CEPH_CAP_WR) == 0) { dout(10) << "write doesn't have WR cap, blocking" << dendl; Cond c; waitfor_write.insert(&c); @@ -233,7 +233,7 @@ void FileCache::write(off_t offset, size_t size, bufferlist& blist, Mutex& clien num_writing++; if (size > 0) { - if (latest_caps & CAP_FILE_WRBUFFER) { // caps buffered write? + if (latest_caps & CEPH_CAP_WRBUFFER) { // caps buffered write? // wait? (this may block!) oc->wait_for_write(size, client_lock); diff --git a/src/client/FileCache.h b/src/client/FileCache.h index 8d6e08146b5..03322fb929a 100644 --- a/src/client/FileCache.h +++ b/src/client/FileCache.h @@ -53,8 +53,8 @@ class FileCache { } // waiters/waiting - bool can_read() { return latest_caps & CAP_FILE_RD; } - bool can_write() { return latest_caps & CAP_FILE_WR; } + bool can_read() { return latest_caps & CEPH_CAP_RD; } + bool can_write() { return latest_caps & CEPH_CAP_WR; } bool all_safe();// { return num_unsafe == 0; } void add_safe_waiter(Context *c); diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h index 9be58ffa401..227fa22b927 100644 --- a/src/include/ceph_fs.h +++ b/src/include/ceph_fs.h @@ -419,9 +419,17 @@ struct ceph_mds_reply_dirfrag { } __attribute__ ((packed)); /* client file caps */ +#define CEPH_CAP_PIN 1 /* no specific capabilities beyond the pin */ +#define CEPH_CAP_RDCACHE 2 /* client can cache reads */ +#define CEPH_CAP_RD 4 /* client can read */ +#define CEPH_CAP_WR 8 /* client can write */ +#define CEPH_CAP_WRBUFFER 16 /* client can buffer writes */ +#define CEPH_CAP_WREXTEND 32 /* client can extend eof */ +#define CEPH_CAP_LAZYIO 64 /* client can perform lazy io */ enum { CEPH_CAP_OP_GRANT, /* mds->client grant */ CEPH_CAP_OP_ACK, /* client->mds ack (if prior grant was a recall) */ + CEPH_CAP_OP_REQUEST, /* client->mds request (update wanted bits) */ CEPH_CAP_OP_RELEASE, /* mds->client release (*) */ CEPH_CAP_OP_EXPORT, /* mds has exported the cap */ CEPH_CAP_OP_IMPORT /* mds has imported the cap from specified mds */ @@ -432,13 +440,12 @@ enum { * if a concurrent open() would map to the same inode. */ struct ceph_mds_file_caps { - __le64 seq; + __le32 op; + __le32 seq; __le32 caps, wanted; __le64 ino; __le64 size; - __le32 op; - __le32 migrate_mds; - __le32 migrate_seq; + __le32 migrate_mds, migrate_seq; struct ceph_timeval mtime, atime; } __attribute__ ((packed)); diff --git a/src/kernel/client.c b/src/kernel/client.c index db8a0464664..0f03a88eae0 100644 --- a/src/kernel/client.c +++ b/src/kernel/client.c @@ -270,7 +270,7 @@ void ceph_dispatch(void *p, struct ceph_msg *msg) ceph_mdsc_handle_forward(&client->mdsc, msg); break; case CEPH_MSG_CLIENT_FILECAPS: - ceph_handle_filecaps(&client->mdsc, msg); + ceph_mdsc_handle_filecaps(&client->mdsc, msg); break; /* osd client */ diff --git a/src/kernel/file.c b/src/kernel/file.c index 4ee8a6735ea..fd5fee85fee 100644 --- a/src/kernel/file.c +++ b/src/kernel/file.c @@ -47,7 +47,9 @@ int ceph_open(struct inode *inode, struct file *file) { struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_cap *cap; - struct ceph_file_info *fi; + struct ceph_file_info *cf; + int mode; + int wanted; dout(5, "ceph_open inode %p (%lu) file %p\n", inode, inode->i_ino, file); cap = ceph_find_cap(inode, 0); @@ -57,27 +59,41 @@ int ceph_open(struct inode *inode, struct file *file) return PTR_ERR(cap); } - fi = kzalloc(sizeof(*fi), GFP_KERNEL); - if (fi == NULL) + cf = kzalloc(sizeof(*cf), GFP_KERNEL); + if (cf == NULL) return -ENOMEM; - file->private_data = fi; + file->private_data = cf; atomic_inc(&ci->i_cap_count); - dout(5, "ceph_open success\n"); + + mode = ceph_file_mode(file->f_flags); + ci->i_nr_by_mode[mode]++; + wanted = ceph_caps_wanted(ci); + ci->i_cap_wanted |= wanted; /* FIXME this isn't quite right */ + + dout(5, "ceph_open success, %lx %p\n", inode->i_ino, ilookup(inode->i_sb, inode->i_ino)); return 0; } -int ceph_release(struct inode *inode, struct file *filp) +int ceph_release(struct inode *inode, struct file *file) { struct ceph_inode_info *ci = ceph_inode(inode); - struct ceph_file_info *fi = filp->private_data; - - dout(5, "ceph_release inode %p filp %p\n", inode, filp); + struct ceph_file_info *cf = file->private_data; + int mode, wanted; + + dout(5, "ceph_release inode %p file %p\n", inode, file); atomic_dec(&ci->i_cap_count); + + if (cf->rinfo.reply) + ceph_mdsc_destroy_reply_info(&cf->rinfo); + kfree(cf); - if (fi->rinfo.reply) - ceph_mdsc_destroy_reply_info(&fi->rinfo); - kfree(fi); + mode = ceph_file_mode(file->f_flags); + ci->i_nr_by_mode[mode]--; + wanted = ceph_caps_wanted(ci); + dout(10, "mode %d wanted %d was %d\n", mode, wanted, ci->i_cap_wanted); + if (wanted != ci->i_cap_wanted) + ceph_mdsc_update_cap_wanted(ci, wanted); return 0; } diff --git a/src/kernel/inode.c b/src/kernel/inode.c index 3387afb9419..c6da9543f5d 100644 --- a/src/kernel/inode.c +++ b/src/kernel/inode.c @@ -51,7 +51,10 @@ int ceph_fill_inode(struct inode *inode, struct ceph_mds_reply_inode *info) ci->i_frag_map[0].mds = 0; // FIXME ci->i_nr_caps = 0; - + for (i=0; i<4; i++) + ci->i_nr_by_mode[i] = 0; + ci->i_cap_wanted = 0; + ci->i_wr_size = 0; ci->i_wr_mtime.tv_sec = 0; ci->i_wr_mtime.tv_nsec = 0; @@ -103,6 +106,16 @@ struct ceph_inode_cap *ceph_find_cap(struct inode *inode, int want) return 0; } +static struct ceph_inode_cap *get_cap_for_mds(struct inode *inode, int mds) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + int i; + for (i=0; ii_nr_caps; i++) + if (ci->i_caps[i].mds == mds) + return &ci->i_caps[i]; + return 0; +} + struct ceph_inode_cap *ceph_add_cap(struct inode *inode, int mds, u32 cap, u32 seq) { @@ -151,40 +164,59 @@ int ceph_get_caps(struct ceph_inode_info *ci) } -/* caps */ - -void ceph_handle_filecaps(struct ceph_mds_client *mdsc, struct ceph_msg *msg) +/* + * 0 - ok + * 1 - send the msg back to mds + */ +int ceph_handle_cap_grant(struct inode *inode, struct ceph_mds_file_caps *grant, struct ceph_mds_session *session) { - struct super_block *sb = mdsc->client->sb; - struct ceph_client *client = ceph_sbinfo(sb)->sb_client; - struct inode *inode; - struct ceph_mds_file_caps *h; - int mds = msg->hdr.src.name.num; - int op; - __u64 ino; - - dout(10, "handle_filecaps from mds%d\n", mds); - - /* decode */ - if (msg->front.iov_len != sizeof(*h)) - goto bad; - h = msg->front.iov_base; - op = le32_to_cpu(h->op); - ino = le64_to_cpu(h->ino); + struct ceph_inode_cap *cap; + struct ceph_inode_info *ci = ceph_inode(inode); + int mds = session->s_mds; + int seq = le32_to_cpu(grant->seq); + int newcaps; - /* lookup ino */ - inode = ilookup(sb, ino); - dout(20, "op is %d, inode is %llx %p\n", op, ino, inode); + dout(10, "handle_cap_grant inode %p ci %p mds%d seq %d\n", inode, ci, mds, seq); - switch (op) { - + /* unwanted? */ + if (ceph_caps_wanted(ci) == 0) { + dout(10, "wanted=0, reminding mds\n"); + grant->wanted = cpu_to_le32(0); + return 1; /* ack */ } - return; -bad: - dout(10, "corrupt filecaps message\n"); -} + /* new cap? */ + dout(10, "1\n"); + cap = get_cap_for_mds(inode, mds); + dout(10, "2\n"); + if (!cap) { + dout(10, "adding new cap inode %p for mds%d\n", inode, mds); + cap = ceph_add_cap(inode, mds, le32_to_cpu(grant->caps), le32_to_cpu(grant->seq)); + return 0; + } + /* revocation? */ + dout(10, "3\n"); + newcaps = le32_to_cpu(grant->caps); + dout(10, "4\n"); + if (cap->caps & ~newcaps) { + dout(10, "revocation: %d -> %d\n", cap->caps, newcaps); + /* FIXME FIXME FIXME DO STUFF HERE */ + /* blindly ack for now: */ + cap->caps = newcaps; + return 1; /* ack */ + } + + /* grant or no-op */ + dout(10, "5\n"); + if (cap->caps == newcaps) { + dout(10, "no-op: %d -> %d\n", cap->caps, newcaps); + } else { + dout(10, "grant: %d -> %d\n", cap->caps, newcaps); + cap->caps = newcaps; + } + return 0; +} diff --git a/src/kernel/mds_client.c b/src/kernel/mds_client.c index 235e31154d1..ce207d479c7 100644 --- a/src/kernel/mds_client.c +++ b/src/kernel/mds_client.c @@ -108,6 +108,7 @@ static void register_session(struct ceph_mds_client *mdsc, int mds) mdsc->max_sessions = mds+1; } s = kmalloc(sizeof(struct ceph_mds_session), GFP_KERNEL); + s->s_mds = mds; s->s_state = CEPH_MDS_SESSION_NEW; s->s_cap_seq = 0; INIT_LIST_HEAD(&s->s_caps); @@ -741,8 +742,8 @@ void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds) ceph_encode_32(&p, end, session->s_nr_caps); list_for_each(cp, &session->s_caps) { cap = list_entry(cp, struct ceph_inode_cap, session_caps); - ceph_encode_32(&p, end, cap->ci->i_cap_wanted); - ceph_encode_32(&p, end, cap->ci->i_cap_issued); + ceph_encode_32(&p, end, ceph_caps_wanted(cap->ci)); + ceph_encode_32(&p, end, ceph_caps_issued(cap->ci)); ceph_encode_64(&p, end, cap->ci->i_wr_size); ceph_encode_timespec(&p, end, &cap->ci->vfs_inode.i_mtime); //i_wr_mtime ceph_encode_timespec(&p, end, &cap->ci->vfs_inode.i_atime); /* atime.. fixme */ @@ -888,6 +889,105 @@ bad2: return; } +/* caps */ + +void ceph_mdsc_handle_filecaps(struct ceph_mds_client *mdsc, struct ceph_msg *msg) +{ + struct super_block *sb = mdsc->client->sb; + struct ceph_client *client = ceph_sbinfo(sb)->sb_client; + struct ceph_mds_session *session; + struct inode *inode; + struct ceph_mds_file_caps *h; + int mds = msg->hdr.src.name.num; + int op; + __u64 ino; + + dout(10, "handle_filecaps from mds%d\n", mds); + + /* decode */ + if (msg->front.iov_len != sizeof(*h)) + goto bad; + h = msg->front.iov_base; + op = le32_to_cpu(h->op); + ino = le64_to_cpu(h->ino); + + /* find session */ + session = get_session(&client->mdsc, mds); + if (!session) { + dout(10, "WTF, got filecap msg but no session for mds%d\n", mds); + return; + } + session->s_cap_seq++; + + /* lookup ino */ + inode = ilookup(sb, ino); + dout(20, "op is %d, inode is %llx %p\n", op, ino, inode); + if (!inode) { + dout(10, "hrm, wtf, don't have inode?\n"); + return; + } + + switch (op) { + case CEPH_CAP_OP_GRANT: + if (ceph_handle_cap_grant(inode, h, session) == 1) { + dout(10, "sending reply back to mds%d\n", mds); + ceph_msg_get(msg); + send_msg_mds(mdsc, msg, mds); + } + break; + + case CEPH_CAP_OP_EXPORT: + case CEPH_CAP_OP_IMPORT: + dout(10, "cap export/import -- IMPLEMENT ME\n"); + break; + } + + return; +bad: + dout(10, "corrupt filecaps message\n"); + return; +} + +int ceph_mdsc_update_cap_wanted(struct ceph_inode_info *ci, int wanted) +{ + struct ceph_client *client = ceph_inode_to_client(&ci->vfs_inode); + struct ceph_mds_client *mdsc = &client->mdsc; + struct ceph_inode_cap *cap; + struct ceph_mds_session *session; + struct ceph_mds_file_caps *fc; + struct ceph_msg *msg; + int i; + + dout(10, "update_cap_wanted %d -> %d\n", ci->i_cap_wanted, wanted); + + for (i=0; ii_nr_caps; i++) { + cap = &ci->i_caps[i]; + + session = get_session(mdsc, cap->mds); + BUG_ON(!session); + + msg = ceph_msg_new(CEPH_MSG_CLIENT_FILECAPS, sizeof(*fc), 0, 0, 0); + if (IS_ERR(msg)) + return PTR_ERR(msg); + + cap->caps &= wanted; /* drop caps we don't want */ + + fc = msg->front.iov_base; + fc->op = cpu_to_le32(CEPH_CAP_OP_ACK); /* misnomer */ + fc->seq = cap->seq; + fc->caps = cap->caps; + fc->wanted = wanted; + fc->ino = cpu_to_le64(ci->vfs_inode.i_ino); + fc->size = cpu_to_le64(ci->vfs_inode.i_size); + + send_msg_mds(mdsc, msg, cap->mds); + } + + ci->i_cap_wanted = wanted; + return 0; +} + + /* eof */ diff --git a/src/kernel/mds_client.h b/src/kernel/mds_client.h index 70ea1ec3c02..d4943738eb7 100644 --- a/src/kernel/mds_client.h +++ b/src/kernel/mds_client.h @@ -21,6 +21,7 @@ enum { CEPH_MDS_SESSION_CLOSING = 4 }; struct ceph_mds_session { + int s_mds; int s_state; __u64 s_cap_seq; /* cap message count/seq from mds */ struct list_head s_caps; @@ -98,6 +99,10 @@ extern void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc, struct ceph_m extern void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg); extern void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc, struct ceph_msg *msg); +extern void ceph_mdsc_handle_filecaps(struct ceph_mds_client *mdsc, struct ceph_msg *msg); +struct ceph_inode_info; +extern int ceph_mdsc_update_cap_wanted(struct ceph_inode_info *ci, int wanted); + extern struct ceph_msg *ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, ceph_ino_t ino1, const char *path1, ceph_ino_t ino2, const char *path2); extern int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, struct ceph_msg *msg, struct ceph_mds_reply_info *rinfo, int mds); diff --git a/src/kernel/super.c b/src/kernel/super.c index 481e8895f0e..6f700229704 100644 --- a/src/kernel/super.c +++ b/src/kernel/super.c @@ -425,6 +425,7 @@ static int open_root_inode(struct super_block *sb, struct ceph_mount_args *args) int frommds; int err; struct ceph_inode_cap *cap; + struct ceph_inode_info *ci; /* open dir */ dout(30, "open_root_inode opening '%s'\n", args->path); @@ -432,7 +433,7 @@ static int open_root_inode(struct super_block *sb, struct ceph_mount_args *args) if (IS_ERR(req)) return PTR_ERR(req); reqhead = req->front.iov_base; - reqhead->args.open.flags = 0; + reqhead->args.open.flags = O_DIRECTORY; reqhead->args.open.mode = 0; if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, -1)) < 0) return err; @@ -454,6 +455,8 @@ static int open_root_inode(struct super_block *sb, struct ceph_mount_args *args) err = PTR_ERR(cap); goto out; } + ci = ceph_inode(inode); + ci->i_nr_by_mode[FILE_MODE_PIN]++; root = d_alloc_root(inode); if (root == NULL) { diff --git a/src/kernel/super.h b/src/kernel/super.h index 9034b772af4..a3ac19c9a3a 100644 --- a/src/kernel/super.h +++ b/src/kernel/super.h @@ -70,6 +70,13 @@ struct ceph_inode_frag_map_item { }; #define STATIC_CAPS 2 + +enum { + FILE_MODE_PIN, + FILE_MODE_RDONLY, + FILE_MODE_RDWR, + FILE_MODE_WRONLY +}; struct ceph_inode_info { struct ceph_file_layout i_layout; @@ -82,14 +89,42 @@ struct ceph_inode_info { struct ceph_inode_cap i_caps_static[STATIC_CAPS]; atomic_t i_cap_count; /* ref count (e.g. from file*) */ + int i_nr_by_mode[4]; int i_cap_wanted; - int i_cap_issued; loff_t i_wr_size; struct timespec i_wr_mtime; struct inode vfs_inode; /* at end */ }; +static inline int ceph_caps_issued(struct ceph_inode_info *ci) { + int i, issued = 0; + for (i=0; ii_nr_caps; i++) + issued |= ci->i_caps[i].caps; + return issued; +} + +static inline int ceph_caps_wanted(struct ceph_inode_info *ci) { + int want = 0; + if (ci->i_nr_by_mode[0]) want |= CEPH_CAP_PIN; + if (ci->i_nr_by_mode[1]) want |= CEPH_CAP_RD|CEPH_CAP_RDCACHE; + if (ci->i_nr_by_mode[2]) want |= CEPH_CAP_RD|CEPH_CAP_RDCACHE|CEPH_CAP_WR|CEPH_CAP_WRBUFFER; + if (ci->i_nr_by_mode[3]) want |= CEPH_CAP_WR|CEPH_CAP_WRBUFFER; + return want; +} +static inline int ceph_file_mode(int flags) +{ + if ((flags & O_DIRECTORY) == O_DIRECTORY) + return FILE_MODE_PIN; + if ((flags & O_RDWR) == O_RDWR) + return FILE_MODE_RDWR; + if ((flags & O_WRONLY) == O_WRONLY) + return FILE_MODE_WRONLY; + if ((flags & O_RDONLY) == O_RDONLY) + return FILE_MODE_RDONLY; + BUG_ON(1); +} + static inline struct ceph_inode_info *ceph_inode(struct inode *inode) { return list_entry(inode, struct ceph_inode_info, vfs_inode); @@ -139,8 +174,7 @@ extern struct ceph_inode_cap *ceph_find_cap(struct inode *inode, int want); extern struct ceph_inode_cap *ceph_add_cap(struct inode *inode, int mds, u32 cap, u32 seq); extern int ceph_inode_getattr(struct vfsmount *mnt, struct dentry *dentry, struct kstat *stat); -extern void ceph_handle_filecaps(struct ceph_mds_client *mdsc, struct ceph_msg *msg); - +extern int ceph_handle_cap_grant(struct inode *inode, struct ceph_mds_file_caps *grant, struct ceph_mds_session *session); /* addr.c */ extern const struct address_space_operations ceph_aops; diff --git a/src/mds/CInode.h b/src/mds/CInode.h index b6da550a8d7..d55a35b360c 100644 --- a/src/mds/CInode.h +++ b/src/mds/CInode.h @@ -448,7 +448,7 @@ public: linklock.replicate_relax(); dirfragtreelock.replicate_relax(); - if (get_caps_issued() & (CAP_FILE_WR|CAP_FILE_WRBUFFER) == 0) + if (get_caps_issued() & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER) == 0) filelock.replicate_relax(); dirlock.replicate_relax(); diff --git a/src/mds/Capability.h b/src/mds/Capability.h index d7619d13ca1..25b421661b0 100644 --- a/src/mds/Capability.h +++ b/src/mds/Capability.h @@ -24,15 +24,6 @@ using namespace std; #include "config.h" -// definite caps -#define CAP_FILE_RDCACHE 1 // client can safely cache reads -#define CAP_FILE_RD 2 // client can read -#define CAP_FILE_WR 4 // client can write -#define CAP_FILE_WREXTEND 8 // client can extend file -#define CAP_FILE_WRBUFFER 16 // client can safely buffer writes -#define CAP_FILE_LAZYIO 32 // client can perform lazy io - - // heuristics //#define CAP_FILE_DELAYFLUSH 32 @@ -40,12 +31,13 @@ inline string cap_string(int cap) { string s; s = "["; - if (cap & CAP_FILE_RDCACHE) s += " rdcache"; - if (cap & CAP_FILE_RD) s += " rd"; - if (cap & CAP_FILE_WR) s += " wr"; - if (cap & CAP_FILE_WRBUFFER) s += " wrbuffer"; - if (cap & CAP_FILE_WRBUFFER) s += " wrextend"; - if (cap & CAP_FILE_LAZYIO) s += " lazyio"; + if (cap & CEPH_CAP_PIN) s += " pin"; + if (cap & CEPH_CAP_RDCACHE) s += " rdcache"; + if (cap & CEPH_CAP_RD) s += " rd"; + if (cap & CEPH_CAP_WR) s += " wr"; + if (cap & CEPH_CAP_WRBUFFER) s += " wrbuffer"; + if (cap & CEPH_CAP_WRBUFFER) s += " wrextend"; + if (cap & CEPH_CAP_LAZYIO) s += " lazyio"; s += " ]"; return s; } @@ -126,17 +118,17 @@ public: // needed static int needed(int from) { // strip out wrbuffer, rdcache - return from & (CAP_FILE_WR|CAP_FILE_RD); + return from & (CEPH_CAP_WR|CEPH_CAP_RD); } int needed() { return needed(wanted_caps); } // conflicts static int conflicts(int from) { int c = 0; - if (from & CAP_FILE_WRBUFFER) c |= CAP_FILE_RDCACHE|CAP_FILE_RD; - if (from & CAP_FILE_WR) c |= CAP_FILE_RDCACHE; - if (from & CAP_FILE_RD) c |= CAP_FILE_WRBUFFER; - if (from & CAP_FILE_RDCACHE) c |= CAP_FILE_WRBUFFER|CAP_FILE_WR; + if (from & CEPH_CAP_WRBUFFER) c |= CEPH_CAP_RDCACHE|CEPH_CAP_RD; + if (from & CEPH_CAP_WR) c |= CEPH_CAP_RDCACHE; + if (from & CEPH_CAP_RD) c |= CEPH_CAP_WRBUFFER; + if (from & CEPH_CAP_RDCACHE) c |= CEPH_CAP_WRBUFFER|CEPH_CAP_WR; return c; } int wanted_conflicts() { return conflicts(wanted()); } diff --git a/src/mds/FileLock.h b/src/mds/FileLock.h index 09868f7563f..b2e409198bf 100644 --- a/src/mds/FileLock.h +++ b/src/mds/FileLock.h @@ -159,52 +159,52 @@ class FileLock : public SimpleLock { // client caps allowed int caps_allowed_ever() { if (parent->is_auth()) - return CAP_FILE_RDCACHE | CAP_FILE_RD | CAP_FILE_WR | CAP_FILE_WREXTEND | CAP_FILE_WRBUFFER | CAP_FILE_LAZYIO; + return CEPH_CAP_RDCACHE | CEPH_CAP_RD | CEPH_CAP_WR | CEPH_CAP_WREXTEND | CEPH_CAP_WRBUFFER | CEPH_CAP_LAZYIO; else - return CAP_FILE_RDCACHE | CAP_FILE_RD | CAP_FILE_LAZYIO; + return CEPH_CAP_RDCACHE | CEPH_CAP_RD | CEPH_CAP_LAZYIO; } int caps_allowed() { if (parent->is_auth()) switch (state) { case LOCK_SYNC: - return CAP_FILE_RDCACHE | CAP_FILE_RD | CAP_FILE_LAZYIO; + return CEPH_CAP_RDCACHE | CEPH_CAP_RD | CEPH_CAP_LAZYIO; case LOCK_LOCK: case LOCK_GLOCKR: case LOCK_GLOCKL: - return CAP_FILE_RDCACHE; + return CEPH_CAP_RDCACHE; case LOCK_GLOCKM: return 0; case LOCK_MIXED: - return CAP_FILE_RD | CAP_FILE_WR | CAP_FILE_WREXTEND | CAP_FILE_LAZYIO; + return CEPH_CAP_RD | CEPH_CAP_WR | CEPH_CAP_WREXTEND | CEPH_CAP_LAZYIO; case LOCK_GMIXEDR: - return CAP_FILE_RD | CAP_FILE_LAZYIO; + return CEPH_CAP_RD | CEPH_CAP_LAZYIO; case LOCK_GMIXEDL: return 0; case LOCK_LONER: // single client writer, of course. - return CAP_FILE_RDCACHE | CAP_FILE_RD | CAP_FILE_WR | CAP_FILE_WREXTEND | CAP_FILE_WRBUFFER | CAP_FILE_LAZYIO; + return CEPH_CAP_RDCACHE | CEPH_CAP_RD | CEPH_CAP_WR | CEPH_CAP_WREXTEND | CEPH_CAP_WRBUFFER | CEPH_CAP_LAZYIO; case LOCK_GLONERR: - return CAP_FILE_RD | CAP_FILE_LAZYIO; + return CEPH_CAP_RD | CEPH_CAP_LAZYIO; case LOCK_GLONERM: - return CAP_FILE_RD | CAP_FILE_WR | CAP_FILE_WREXTEND | CAP_FILE_LAZYIO; + return CEPH_CAP_RD | CEPH_CAP_WR | CEPH_CAP_WREXTEND | CEPH_CAP_LAZYIO; case LOCK_GSYNCL: - return CAP_FILE_RDCACHE | CAP_FILE_LAZYIO; + return CEPH_CAP_RDCACHE | CEPH_CAP_LAZYIO; case LOCK_GSYNCM: - return CAP_FILE_RD | CAP_FILE_LAZYIO; + return CEPH_CAP_RD | CEPH_CAP_LAZYIO; } else switch (state) { case LOCK_SYNC: - return CAP_FILE_RDCACHE | CAP_FILE_RD | CAP_FILE_LAZYIO; + return CEPH_CAP_RDCACHE | CEPH_CAP_RD | CEPH_CAP_LAZYIO; case LOCK_LOCK: case LOCK_GLOCKR: - return CAP_FILE_RDCACHE; + return CEPH_CAP_RDCACHE; case LOCK_GMIXEDR: case LOCK_MIXED: - return CAP_FILE_RD | CAP_FILE_LAZYIO; + return CEPH_CAP_RD | CEPH_CAP_LAZYIO; } assert(0); return 0; diff --git a/src/mds/Locker.cc b/src/mds/Locker.cc index b1da8857672..54b18536387 100644 --- a/src/mds/Locker.cc +++ b/src/mds/Locker.cc @@ -455,8 +455,8 @@ Capability* Locker::issue_new_caps(CInode *in, // my needs int my_client = req->get_client().num(); int my_want = 0; - if (mode & FILE_MODE_R) my_want |= CAP_FILE_RDCACHE | CAP_FILE_RD; - if (mode & FILE_MODE_W) my_want |= CAP_FILE_WRBUFFER | CAP_FILE_WR; + if (mode & FILE_MODE_R) my_want |= CEPH_CAP_RDCACHE | CEPH_CAP_RD; + if (mode & FILE_MODE_W) my_want |= CEPH_CAP_WRBUFFER | CEPH_CAP_WR; // register a capability Capability *cap = in->get_client_cap(my_client); @@ -498,14 +498,14 @@ Capability* Locker::issue_new_caps(CInode *in, int now = cap->pending(); if (before != now && - (before & CAP_FILE_WR) == 0 && - (now & CAP_FILE_WR)) { + (before & CEPH_CAP_WR) == 0 && + (now & CEPH_CAP_WR)) { // FIXME FIXME FIXME } // twiddle file_data_version? - if ((before & CAP_FILE_WRBUFFER) == 0 && - (now & CAP_FILE_WRBUFFER)) { + if ((before & CEPH_CAP_WRBUFFER) == 0 && + (now & CEPH_CAP_WRBUFFER)) { in->inode.file_data_version++; dout(7) << " incrementing file_data_version, now " << in->inode.file_data_version << " for " << *in << dendl; } @@ -538,8 +538,8 @@ bool Locker::issue_caps(CInode *in) int after = it->second.pending(); // twiddle file_data_version? - if (!(before & CAP_FILE_WRBUFFER) && - (after & CAP_FILE_WRBUFFER)) { + if (!(before & CEPH_CAP_WRBUFFER) && + (after & CEPH_CAP_WRBUFFER)) { dout(7) << " incrementing file_data_version for " << *in << dendl; in->inode.file_data_version++; } @@ -740,7 +740,7 @@ void Locker::handle_client_file_caps(MClientFileCaps *m) in->inode.atime = m->get_atime(); } - if ((has|had) & CAP_FILE_WR) { + if ((has|had) & CEPH_CAP_WR) { bool dirty = false; // mtime @@ -2423,7 +2423,7 @@ void Locker::file_eval(FileLock *lock) // * -> loner? if (!lock->is_rdlocked() && !lock->is_waiter_for(SimpleLock::WAIT_WR) && - (wanted & CAP_FILE_WR) && + (wanted & CEPH_CAP_WR) && loner && lock->get_state() != LOCK_LONER) { dout(7) << "file_eval stable, bump to loner " << *lock << " on " << *lock->get_parent() << dendl; @@ -2433,8 +2433,8 @@ void Locker::file_eval(FileLock *lock) // * -> mixed? else if (!lock->is_rdlocked() && !lock->is_waiter_for(SimpleLock::WAIT_WR) && - (wanted & CAP_FILE_RD) && - (wanted & CAP_FILE_WR) && + (wanted & CEPH_CAP_RD) && + (wanted & CEPH_CAP_WR) && !(loner && lock->get_state() == LOCK_LONER) && lock->get_state() != LOCK_MIXED) { dout(7) << "file_eval stable, bump to mixed " << *lock << " on " << *lock->get_parent() << dendl; @@ -2443,8 +2443,8 @@ void Locker::file_eval(FileLock *lock) // * -> sync? else if (!in->filelock.is_waiter_for(SimpleLock::WAIT_WR) && - !(wanted & (CAP_FILE_WR|CAP_FILE_WRBUFFER)) && - ((wanted & CAP_FILE_RD) || + !(wanted & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER)) && + ((wanted & CEPH_CAP_RD) || in->is_replicated() || (!loner && lock->get_state() == LOCK_LONER)) && lock->get_state() != LOCK_SYNC) { @@ -2473,7 +2473,7 @@ bool Locker::file_sync(FileLock *lock) int issued = in->get_caps_issued(); - assert((in->get_caps_wanted() & CAP_FILE_WR) == 0); + assert((in->get_caps_wanted() & CEPH_CAP_WR) == 0); if (lock->get_state() == LOCK_LOCK) { if (in->is_replicated()) { @@ -2491,7 +2491,7 @@ bool Locker::file_sync(FileLock *lock) else if (lock->get_state() == LOCK_MIXED) { // writers? - if (issued & CAP_FILE_WR) { + if (issued & CEPH_CAP_WR) { // gather client write caps lock->set_state(LOCK_GSYNCM); lock->get_parent()->auth_pin(); @@ -2512,7 +2512,7 @@ bool Locker::file_sync(FileLock *lock) else if (lock->get_state() == LOCK_LONER) { // writers? - if (issued & CAP_FILE_WR) { + if (issued & CEPH_CAP_WR) { // gather client write caps lock->set_state(LOCK_GSYNCL); lock->get_parent()->auth_pin(); @@ -2601,7 +2601,7 @@ void Locker::file_lock(FileLock *lock) } else if (lock->get_state() == LOCK_LONER) { - if (issued & CAP_FILE_WR) { + if (issued & CEPH_CAP_WR) { // change lock lock->set_state(LOCK_GLOCKL); lock->get_parent()->auth_pin(); @@ -2664,7 +2664,7 @@ void Locker::file_mixed(FileLock *lock) } else if (lock->get_state() == LOCK_LONER) { - if (issued & CAP_FILE_WRBUFFER) { + if (issued & CEPH_CAP_WRBUFFER) { // gather up WRBUFFER caps lock->set_state(LOCK_GMIXEDL); lock->get_parent()->auth_pin(); @@ -2786,7 +2786,7 @@ void Locker::handle_file_lock(FileLock *lock, MLock *m) lock->set_state(LOCK_GLOCKR); // call back caps? - if (issued & CAP_FILE_RD) { + if (issued & CEPH_CAP_RD) { dout(7) << "handle_file_lock client readers, gathering caps on " << *in << dendl; issue_caps(in); break; @@ -2811,7 +2811,7 @@ void Locker::handle_file_lock(FileLock *lock, MLock *m) if (lock->get_state() == LOCK_SYNC) { // MIXED - if (issued & CAP_FILE_RD) { + if (issued & CEPH_CAP_RD) { // call back client caps lock->set_state(LOCK_GMIXEDR); issue_caps(in); diff --git a/src/mds/Server.cc b/src/mds/Server.cc index e383a87e1b5..bba24bf526d 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -362,8 +362,8 @@ void Server::process_reconnected_caps() int issued = in->get_caps_issued(); if (in->is_auth()) { // wr? - if (issued & (CAP_FILE_WR|CAP_FILE_WRBUFFER)) { - if (issued & (CAP_FILE_RDCACHE|CAP_FILE_WRBUFFER)) { + if (issued & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER)) { + if (issued & (CEPH_CAP_RDCACHE|CEPH_CAP_WRBUFFER)) { in->filelock.set_state(LOCK_LONER); } else { in->filelock.set_state(LOCK_MIXED); @@ -371,7 +371,7 @@ void Server::process_reconnected_caps() } } else { // note that client should perform stale/reap cleanup during reconnect. - assert(issued & (CAP_FILE_WR|CAP_FILE_WRBUFFER) == 0); // ???? + assert(issued & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER) == 0); // ???? if (in->filelock.is_xlocked()) in->filelock.set_state(LOCK_LOCK); else @@ -3743,7 +3743,7 @@ void Server::handle_client_open(MDRequest *mdr) return; } // can only open a dir rdonly, no flags. - if (cur->inode.is_dir() && (cmode != FILE_MODE_R || flags != 0)) { + if (cur->inode.is_dir() && (cmode != FILE_MODE_R || flags != O_DIRECTORY)) { reply_request(mdr, -EINVAL); return; } diff --git a/src/messages/MClientFileCaps.h b/src/messages/MClientFileCaps.h index b44365ee73c..78396fb773b 100644 --- a/src/messages/MClientFileCaps.h +++ b/src/messages/MClientFileCaps.h @@ -38,7 +38,7 @@ class MClientFileCaps : public Message { public: int get_caps() { return le32_to_cpu(h.caps); } int get_wanted() { return le32_to_cpu(h.wanted); } - capseq_t get_seq() { return le64_to_cpu(h.seq); } + capseq_t get_seq() { return le32_to_cpu(h.seq); } inodeno_t get_ino() { return le64_to_cpu(h.ino); } __u64 get_size() { return le64_to_cpu(h.size); } @@ -70,12 +70,12 @@ class MClientFileCaps : public Message { int mmds=0, int mseq=0) : Message(CEPH_MSG_CLIENT_FILECAPS) { - h.seq = cpu_to_le64(seq); + h.op = cpu_to_le32(op); + h.seq = cpu_to_le32(seq); h.caps = cpu_to_le32(caps); h.wanted = cpu_to_le32(wanted); h.ino = cpu_to_le64(inode.ino); h.size = cpu_to_le64(inode.size); - h.op = cpu_to_le32(op); h.migrate_mds = cpu_to_le32(mmds); h.migrate_seq = cpu_to_le32(mseq); h.mtime = inode.mtime.tv_ref(); @@ -84,8 +84,8 @@ class MClientFileCaps : public Message { const char *get_type_name() { return "Cfcap";} void print(ostream& out) { - out << "client_file_caps(" << le32_to_cpu(h.op) - << " " << le64_to_cpu(h.ino) + out << "client_file_caps(" << get_opname(le32_to_cpu(h.op)) + << " ino " << inodeno_t(le64_to_cpu(h.ino)) << " seq " << le32_to_cpu(h.seq) << " caps " << cap_string(le32_to_cpu(h.caps)) << " wanted" << cap_string(le32_to_cpu(h.wanted)) diff --git a/src/start.sh b/src/start.sh index b34b93d74b2..f77c005d50d 100755 --- a/src/start.sh +++ b/src/start.sh @@ -6,4 +6,4 @@ ./cosd --mkfs --osd 1 & ./cosd --mkfs --osd 2 & ./cosd --mkfs --osd 3 & -./cmds & +./cmds &