Merge commit '114ad5177f281b96ba6bbebe9ed85e2a4aec783f' into rados

Conflicts:

	src/include/ceph_fs.h
This commit is contained in:
Sage Weil 2009-06-16 16:34:03 -07:00
commit d01b24973d
23 changed files with 446 additions and 49 deletions

View File

@ -573,6 +573,8 @@ noinst_HEADERS = \
messages/MPGStats.h\
messages/MPGStatsAck.h\
messages/MPing.h\
messages/MPoolSnap.h\
messages/MPoolSnapReply.h\
messages/MRemoveSnaps.h\
messages/MStatfs.h\
messages/MStatfsReply.h\

View File

@ -638,6 +638,8 @@ Inode* Client::insert_trace(MetaRequest *request, utime_t from, int mds)
string dname;
LeaseStat dlease;
while (numdn) {
__u32 pos; // dentry pos within the fragment
::decode(pos, p);
::decode(dname, p);
::decode(dlease, p);
InodeStat ist(p);

View File

@ -27,7 +27,7 @@
#define CEPH_MDS_PROTOCOL 9 /* cluster internal */
#define CEPH_MON_PROTOCOL 4 /* cluster internal */
#define CEPH_OSDC_PROTOCOL 18 /* public/client */
#define CEPH_MDSC_PROTOCOL 21 /* public/client */
#define CEPH_MDSC_PROTOCOL 22 /* public/client */
#define CEPH_MONC_PROTOCOL 12 /* public/client */

View File

@ -51,8 +51,8 @@ void rados_pool_close_ctx(rados_list_ctx_t *ctx);
int rados_pool_list_next(rados_pool_t pool, const char **entry, rados_list_ctx_t *ctx);
/* snapshots */
int rados_snap_create(rados_pool_t pool, const char *snapname);
int rados_snap_remove(rados_pool_t pool, const char *snapname);
int rados_snap_create(const rados_pool_t pool, const char *snapname);
int rados_snap_remove(const rados_pool_t pool, const char *snapname);
int rados_snap_list(rados_pool_t pool, rados_snap_t *snaps, int maxlen);
int rados_snap_get_name(rados_pool_t pool, rados_snap_t id, char *name, int maxlen);
@ -114,8 +114,8 @@ public:
std::map<std::string,rados_pool_stat_t>& stats);
int get_fs_stats(rados_statfs_t& result);
int snap_create(rados_pool_t pool, const char *snapname);
int snap_remove(rados_pool_t pool, const char *snapname);
int snap_create(const rados_pool_t pool, const char *snapname);
int snap_remove(const rados_pool_t pool, const char *snapname);
int snap_list(rados_pool_t pool, vector<rados_snap_t> *snaps);
int snap_get_name(rados_pool_t pool, rados_snap_t snap, std::string *name);
int snap_get_stamp(rados_pool_t pool, rados_snap_t snap, time_t *t);

View File

@ -70,8 +70,7 @@ static int __dcache_readdir(struct file *filp,
p = parent->d_subdirs.prev;
dout(10, " initial p %p/%p\n", p->prev, p->next);
} else {
p = last->d_u.d_child.prev;
filp->f_pos++;
p = &last->d_u.d_child;
}
more:
@ -157,7 +156,7 @@ static int ceph_readdir(struct file *filp, void *dirent, filldir_t filldir)
int err;
u32 ftype;
struct ceph_mds_reply_info_parsed *rinfo;
int complete = 0, len;
int len;
const int max_entries = client->mount_args.max_readdir;
dout(5, "readdir %p filp %p frag %u off %u\n", inode, filp, frag, off);
@ -165,8 +164,8 @@ static int ceph_readdir(struct file *filp, void *dirent, filldir_t filldir)
return 0;
if (filp->f_pos == 0) {
/* set I_READDIR at start of readdir */
ceph_i_set(inode, CEPH_I_READDIR);
/* note dir version at start of readdir */
fi->dir_release_count = ci->i_release_count;
dout(10, "readdir off 0 -> '.'\n");
if (filldir(dirent, ".", 1, ceph_make_fpos(0, 0),
@ -242,8 +241,10 @@ more:
(int)req->r_reply_info.dir_end,
(int)req->r_reply_info.dir_complete);
if (req->r_reply_info.dir_complete)
complete = 1;
if (!req->r_did_prepopulate) {
dout(10, "readdir !did_prepopulate");
fi->dir_release_count--;
}
fi->off = fi->next_off;
kfree(fi->last_name);
@ -277,22 +278,25 @@ more:
dout(10, "readdir frag %x num %d off %d fragoff %d skew %d\n", frag,
rinfo->dir_nr, off, fi->off, skew);
while (off >= skew && off+skew < rinfo->dir_nr) {
dout(10, "readdir off %d -> %d / %d name '%.*s'\n",
off, off+skew,
rinfo->dir_nr, rinfo->dir_dname_len[off+skew],
u64 pos = ceph_make_fpos(frag, rinfo->dir_pos[off+skew] +
(frag_is_leftmost(frag) ? 2 : 0));
dout(10, "readdir off %d (%d/%d) -> %lld '%.*s'\n",
off, off+skew, rinfo->dir_nr, pos,
rinfo->dir_dname_len[off+skew],
rinfo->dir_dname[off+skew]);
ftype = le32_to_cpu(rinfo->dir_in[off+skew].in->mode) >> 12;
if (filldir(dirent,
rinfo->dir_dname[off+skew],
rinfo->dir_dname_len[off+skew],
ceph_make_fpos(frag, off),
pos,
le64_to_cpu(rinfo->dir_in[off+skew].in->ino),
ftype) < 0) {
dout(20, "filldir stopping us...\n");
return 0;
}
off++;
filp->f_pos++;
filp->f_pos = pos + 1;
}
if (fi->last_name) {
@ -312,15 +316,14 @@ more:
fi->at_end = 1;
/*
* if I_READDIR is still set, no dentries were released
* during the whole readdir, and we should have the complete
* dir contents in our cache.
* if dir_release_count still matches the dir, no dentries
* were released during the whole readdir, and we should have
* the complete dir contents in our cache.
*/
spin_lock(&inode->i_lock);
if (complete && (ci->i_ceph_flags & CEPH_I_READDIR)) {
if (ci->i_release_count == fi->dir_release_count) {
dout(10, " marking %p complete\n", inode);
ci->i_ceph_flags |= CEPH_I_COMPLETE;
ci->i_ceph_flags &= ~CEPH_I_READDIR;
ci->i_max_offset = filp->f_pos;
}
spin_unlock(&inode->i_lock);
@ -364,9 +367,9 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int origin)
fi->at_end = 0;
}
/* clear I_READDIR if we did a forward seek */
/* bump dir_release_count if we did a forward seek */
if (offset > old_offset)
ceph_inode(inode)->i_ceph_flags &= ~CEPH_I_READDIR;
fi->dir_release_count--;
}
mutex_unlock(&inode->i_mutex);
return retval;
@ -903,7 +906,8 @@ static void ceph_dentry_release(struct dentry *dentry)
if (ci->i_rdcache_gen == di->lease_rdcache_gen) {
dout(10, " clearing %p complete (d_release)\n",
parent_inode);
ci->i_ceph_flags &= ~(CEPH_I_COMPLETE|CEPH_I_READDIR);
ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
ci->i_release_count++;
}
spin_unlock(&parent_inode->i_lock);
}

View File

@ -251,6 +251,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
ci->i_version = 0;
ci->i_time_warp_seq = 0;
ci->i_ceph_flags = 0;
ci->i_release_count = 0;
ci->i_symlink = NULL;
ci->i_fragtree = RB_ROOT;
@ -854,7 +855,8 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
ceph_inode(req->r_locked_dir);
dout(10, " clearing %p complete (empty trace)\n",
req->r_locked_dir);
ci->i_ceph_flags &= ~(CEPH_I_READDIR | CEPH_I_COMPLETE);
ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
ci->i_release_count++;
}
return 0;
}
@ -1105,8 +1107,8 @@ retry_lookup:
}
di = dn->d_fsdata;
di->offset = ceph_make_fpos(frag,
i + (frag_is_leftmost(frag) ? 2 : 0));
di->offset = ceph_make_fpos(frag, rinfo->dir_pos[i] +
(frag_is_leftmost(frag) ? 2 : 0));
/* inode */
if (dn->d_inode) {
@ -1134,6 +1136,7 @@ retry_lookup:
req->r_session, req->r_request_started);
dput(dn);
}
req->r_did_prepopulate = true;
out:
if (snapdir) {
@ -1559,16 +1562,19 @@ int ceph_permission(struct inode *inode, int mask)
int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry,
struct kstat *stat)
{
struct inode *inode = dentry->d_inode;
int err;
err = ceph_do_getattr(dentry->d_inode, CEPH_STAT_CAP_INODE_ALL);
err = ceph_do_getattr(inode, CEPH_STAT_CAP_INODE_ALL);
if (!err) {
generic_fillattr(dentry->d_inode, stat);
stat->ino = dentry->d_inode->i_ino;
if (ceph_snap(dentry->d_inode) != CEPH_NOSNAP)
stat->dev = ceph_snap(dentry->d_inode);
generic_fillattr(inode, stat);
stat->ino = inode->i_ino;
if (ceph_snap(inode) != CEPH_NOSNAP)
stat->dev = ceph_snap(inode);
else
stat->dev = 0;
if (S_ISDIR(inode->i_mode))
stat->blksize = 65536;
}
return err;
}

View File

@ -136,19 +136,23 @@ static int parse_reply_info_dir(void **p, void *end,
info->dir_in = kmalloc(num * (sizeof(*info->dir_in) +
sizeof(*info->dir_dname) +
sizeof(*info->dir_dname_len) +
sizeof(*info->dir_pos) +
sizeof(*info->dir_dlease)),
GFP_NOFS);
if (info->dir_in == NULL) {
err = -ENOMEM;
goto out_bad;
}
info->dir_dname = (void *)(info->dir_in + num);
info->dir_pos = (void *)(info->dir_in + num);
info->dir_dname = (void *)(info->dir_pos + num);
info->dir_dname_len = (void *)(info->dir_dname + num);
info->dir_dlease = (void *)(info->dir_dname_len + num);
while (num) {
/* dentry */
ceph_decode_32_safe(p, end, info->dir_dname_len[i], bad);
ceph_decode_need(p, end, sizeof(u32)*2, bad);
ceph_decode_32(p, info->dir_pos[i]);
ceph_decode_32(p, info->dir_dname_len[i]);
ceph_decode_need(p, end, info->dir_dname_len[i], bad);
info->dir_dname[i] = *p;
*p += info->dir_dname_len[i];

View File

@ -86,6 +86,7 @@ struct ceph_mds_reply_info_parsed {
u32 *dir_dname_len;
struct ceph_mds_reply_lease **dir_dlease;
struct ceph_mds_reply_info_in *dir_in;
u32 *dir_pos;
u8 dir_complete, dir_end;
/* encoded blob describing snapshot contexts for certain
@ -220,6 +221,8 @@ struct ceph_mds_request {
struct list_head r_unsafe_item; /* per-session unsafe list item */
bool r_got_unsafe, r_got_safe;
bool r_did_prepopulate;
struct ceph_cap_reservation r_caps_reservation;
int r_num_caps;
};

View File

@ -274,7 +274,6 @@ struct ceph_inode_xattrs_info {
* Ceph inode.
*/
#define CEPH_I_COMPLETE 1 /* we have complete directory cached */
#define CEPH_I_READDIR 2 /* no dentries trimmed since readdir start */
#define CEPH_I_NODELAY 4 /* do not delay cap release */
#define CEPH_I_FLUSH 8 /* do not delay cap send */
@ -285,6 +284,7 @@ struct ceph_inode_info {
u32 i_time_warp_seq;
unsigned i_ceph_flags;
unsigned long i_release_count;
struct ceph_file_layout i_layout;
char *i_symlink;
@ -622,6 +622,7 @@ struct ceph_file_info {
unsigned next_off;
struct dentry *dentry;
int at_end;
unsigned long dir_release_count;
/* used for -o dirstat read() on directory thing */
char *dir_info;

View File

@ -89,6 +89,8 @@ public:
int snap_lookup(PoolCtx *pool, const char *name, rados_snap_t *snapid);
int snap_get_name(PoolCtx *pool, rados_snap_t snapid, std::string *s);
int snap_get_stamp(PoolCtx *pool, rados_snap_t snapid, time_t *t);
int snap_create(const rados_pool_t pool, const char* snapname);
int snap_remove(const rados_pool_t pool, const char* snapname);
// io
int write(PoolCtx& pool, const object_t& oid, off_t off, bufferlist& bl, size_t len);
@ -371,8 +373,11 @@ bool RadosClient::_dispatch(Message *m)
case CEPH_MSG_STATFS_REPLY:
objecter->handle_fs_stats_reply((MStatfsReply*)m);
break;
break;
case MSG_POOLSNAPREPLY:
objecter->handle_pool_snap_reply((MPoolSnapReply*)m);
break;
default:
return false;
}
@ -445,8 +450,51 @@ int RadosClient::get_fs_stats( rados_statfs_t& result ) {
}
// SNAPS
int RadosClient::snap_create( const rados_pool_t pool, const char *snapName) {
int reply;
int poolID = ((PoolCtx *)pool)->poolid;
string sName = string(snapName);
Mutex mylock ("RadosClient::snap_create::mylock");
Cond cond;
bool done;
lock.Lock();
objecter->create_pool_snap(&reply,
poolID,
sName,
new C_SafeCond(&mylock, &cond, &done));
lock.Unlock();
mylock.Lock();
while(!done) cond.Wait(mylock);
mylock.Unlock();
return reply;
}
int RadosClient::snap_remove( const rados_pool_t pool, const char *snapName) {
int reply;
int poolID = ((PoolCtx *)pool)->poolid;
string sName = string(snapName);
Mutex mylock ("RadosClient::snap_remove::mylock");
Cond cond;
bool done;
lock.Lock();
objecter->delete_pool_snap(&reply,
poolID,
sName,
new C_SafeCond(&mylock, &cond, &done));
lock.Unlock();
mylock.Lock();
while(!done) cond.Wait(mylock);
mylock.Unlock();
return reply;
}
int RadosClient::snap_list(PoolCtx *pool, vector<rados_snap_t> *snaps)
{
Mutex::Locker l(lock);
@ -885,6 +933,17 @@ int Rados::close_pool(rados_pool_t pool)
// SNAPS
int Rados::snap_create(const rados_pool_t pool, const char *snapname) {
if (!client) return -EINVAL;
return client->snap_create(pool, snapname);
}
int Rados::snap_remove(const rados_pool_t pool, const char *snapname) {
if (!client) return -EINVAL;
return client->snap_remove(pool, snapname);
}
void Rados::set_snap(rados_pool_t pool, snapid_t seq)
{
if (!client)

View File

@ -1486,6 +1486,15 @@ void CDir::_commit(version_t want)
inode->make_path_string(path);
m.setxattr("path", path);
CDentry *pdn = inode->get_parent_dn();
if (pdn) {
bufferlist parent(16 + pdn->name.length());
__u64 ino = pdn->get_dir()->get_inode()->ino();
::encode(ino, parent);
::encode(pdn->name, parent);
m.setxattr("parent", parent);
}
object_t oid = get_ondisk_object();
OSDMap *osdmap = cache->mds->objecter->osdmap;
ceph_object_layout ol = osdmap->make_object_layout(oid,

View File

@ -2143,13 +2143,11 @@ void Server::handle_client_readdir(MDRequest *mdr)
__u32 numfiles = 0;
__u32 pos = 0;
while (it != dir->end() && numfiles < max) {
CDentry *dn = it->second;
it++;
if (offset && strcmp(dn->get_name().c_str(), offset) <= 0)
continue;
if (dn->state_test(CDentry::STATE_PURGING))
continue;
@ -2168,6 +2166,10 @@ void Server::handle_client_readdir(MDRequest *mdr)
if (dn->last < snapid || dn->first > snapid)
continue;
__u32 dpos = pos++;
if (offset && strcmp(dn->get_name().c_str(), offset) <= 0)
continue;
CInode *in = dnl->get_inode();
// remote link?
@ -2195,6 +2197,7 @@ void Server::handle_client_readdir(MDRequest *mdr)
// dentry
dout(12) << "including dn " << *dn << dendl;
::encode(dpos, dnbl);
::encode(dn->name, dnbl);
mds->locker->issue_client_lease(dn, client, dnbl, mdr->now, mdr->session);
@ -2209,7 +2212,7 @@ void Server::handle_client_readdir(MDRequest *mdr)
}
__u8 end = (it == dir->end());
__u8 complete = (end && !offset);
__u8 complete = (end && !offset); // FIXME: what purpose does this serve
// final blob
bufferlist dirbl;

53
src/messages/MPoolSnap.h Normal file
View File

@ -0,0 +1,53 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#ifndef __MPOOLSNAP_H
#define __MPOOLSNAP_H
class MPoolSnap : public Message {
public:
ceph_fsid_t fsid;
tid_t tid;
int pool;
string name;
bool create;
MPoolSnap() : Message(MSG_POOLSNAP) {}
MPoolSnap( ceph_fsid_t& f, tid_t t, int p, string& n, bool c) :
Message(MSG_POOLSNAP), fsid(f), tid(t), pool(p), name(n), create(c) {}
const char *get_type_name() { return "poolsnap"; }
void print(ostream& out) {
out << "poolsnap(" << tid << " " << name << ")";
}
void encode_payload() {
::encode(fsid, payload);
::encode(tid, payload);
::encode(pool, payload);
::encode(name, payload);
::encode(create, payload);
}
void decode_payload() {
bufferlist::iterator p = payload.begin();
::decode(fsid, p);
::decode(tid, p);
::decode(pool, p);
::decode(name, p);
::decode(create, p);
}
};
#endif

View File

@ -0,0 +1,50 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#ifndef __MPOOLSNAPREPLY_H
#define __MPOOLSNAPREPLY_H
class MPoolSnapReply : public Message {
public:
ceph_fsid_t fsid;
tid_t tid;
int replyCode;
epoch_t epoch;
MPoolSnapReply() : Message(MSG_POOLSNAPREPLY) {}
MPoolSnapReply( ceph_fsid_t& f, tid_t t, int rc, int e) :
Message(MSG_POOLSNAPREPLY), fsid(f), tid(t), replyCode(rc), epoch(e) {}
const char *get_type_name() { return "poolsnapreply"; }
void print(ostream& out) {
out << "poolsnapreply(" << tid <<")";
}
void encode_payload() {
::encode(fsid, payload);
::encode(tid, payload);
::encode(replyCode, payload);
}
void decode_payload() {
bufferlist::iterator p = payload.begin();
::decode(fsid, p);
::decode(tid, p);
::decode(replyCode, p);
}
};
#endif

View File

@ -419,6 +419,10 @@ bool Monitor::dispatch_impl(Message *m)
paxos_service[PAXOS_PGMAP]->dispatch(m);
break;
case MSG_POOLSNAP:
paxos_service[PAXOS_OSDMAP]->dispatch(m);
break;
// log
case MSG_LOG:
paxos_service[PAXOS_LOG]->dispatch(m);

View File

@ -26,6 +26,8 @@
#include "messages/MOSDGetMap.h"
#include "messages/MOSDBoot.h"
#include "messages/MOSDAlive.h"
#include "messages/MPoolSnap.h"
#include "messages/MPoolSnapReply.h"
#include "messages/MMonCommand.h"
#include "messages/MRemoveSnaps.h"
#include "messages/MOSDScrub.h"
@ -271,6 +273,9 @@ bool OSDMonitor::preprocess_query(Message *m)
return preprocess_out((MOSDOut*)m);
*/
case MSG_POOLSNAP:
return preprocess_pool_snap((MPoolSnap*)m);
case MSG_REMOVE_SNAPS:
return preprocess_remove_snaps((MRemoveSnaps*)m);
@ -301,6 +306,8 @@ bool OSDMonitor::prepare_update(Message *m)
case MSG_OSD_OUT:
return prepare_out((MOSDOut*)m);
*/
case MSG_POOLSNAP:
return prepare_pool_snap((MPoolSnap*)m);
case MSG_REMOVE_SNAPS:
return prepare_remove_snaps((MRemoveSnaps*)m);
@ -614,7 +621,7 @@ bool OSDMonitor::preprocess_remove_snaps(MRemoveSnaps *m)
q != m->snaps.end();
q++) {
if (!osdmap.have_pg_pool(q->first)) {
dout(10) << " ignoring removed_snaps " << q->second << " on non-existant pool " << q->first << dendl;
dout(10) << " ignoring removed_snaps " << q->second << " on non-existent pool " << q->first << dendl;
continue;
}
const pg_pool_t& pi = osdmap.get_pg_pool(q->first);
@ -1264,4 +1271,55 @@ out:
return false;
}
bool OSDMonitor::preprocess_pool_snap ( MPoolSnap *m) {
if (m->pool < 0 ) {
_pool_snap(m, -ENOENT, pending_inc.epoch);
return true; //done with this message
}
bool snap_exists = false;
pg_pool_t *pp = 0;
if (pending_inc.new_pools.count(m->pool)) pp = &pending_inc.new_pools[m->pool];
//check if the snapname exists
if ((osdmap.get_pg_pool(m->pool).snap_exists(m->name.c_str())) ||
(pp && pp->snap_exists(m->name.c_str()))) snap_exists = true;
if (m->create) { //if it's a snap creation request
if(snap_exists) {
_pool_snap(m, -EEXIST, pending_inc.epoch);
return true;
}
else return false; //this message needs to go through preparation
}
//it's a snap deletion request if we make it here
if (!snap_exists) {
_pool_snap(m, -ENOENT, pending_inc.epoch);
return true; //done with this message
}
return false;
}
bool OSDMonitor::prepare_pool_snap ( MPoolSnap *m)
{
const pg_pool_t *p = &osdmap.get_pg_pool(m->pool);
pg_pool_t* pp = 0;
//if the pool isn't already in the update, add it
if (!pending_inc.new_pools.count(m->pool)) pending_inc.new_pools[m->pool] = *p;
pp = &pending_inc.new_pools[m->pool];
if (m->create) { //it's a snap creation message
pp->add_snap(m->name.c_str(), g_clock.now());
pp->set_snap_epoch(pending_inc.epoch);
}
else { //it's a snap removal message
pp->remove_snap(pp->snap_exists(m->name.c_str()));
}
paxos->wait_for_commit(new OSDMonitor::C_Snap(this, m, 0, pending_inc.epoch));
return true;
}
void OSDMonitor::_pool_snap(MPoolSnap *m, int replyCode, epoch_t epoch)
{
MPoolSnapReply *reply = new MPoolSnapReply(m->fsid, m->tid, replyCode, epoch);
mon->messenger->send_message(reply, m->get_orig_source_inst());
delete m;
}

View File

@ -32,6 +32,7 @@ using namespace std;
class Monitor;
class MOSDBoot;
class MMonCommand;
class MPoolSnap;
class OSDMonitor : public PaxosService {
public:
@ -83,6 +84,10 @@ private:
bool prepare_alive(class MOSDAlive *m);
void _alive(MOSDAlive *m);
bool preprocess_pool_snap ( class MPoolSnap *m);
bool prepare_pool_snap (MPoolSnap *m);
void _pool_snap(MPoolSnap *m, int replyCode, epoch_t epoch);
struct C_Booted : public Context {
OSDMonitor *cmon;
MOSDBoot *m;
@ -115,6 +120,17 @@ private:
cmon->dispatch((Message*)m);
}
};
struct C_Snap : public Context {
OSDMonitor *osdmon;
MPoolSnap *m;
int replyCode;
int epoch;
C_Snap(OSDMonitor * osd, MPoolSnap *m_, int rc, int e) :
osdmon(osd), m(m_), replyCode(rc), epoch(e) {}
void finish(int r) {
osdmon->_pool_snap(m, replyCode, epoch);
}
};
bool preprocess_out(class MOSDOut *m);
bool prepare_out(class MOSDOut *m);

View File

@ -19,6 +19,9 @@ using namespace std;
#include "messages/MGetPoolStats.h"
#include "messages/MGetPoolStatsReply.h"
#include "messages/MPoolSnap.h"
#include "messages/MPoolSnapReply.h"
#include "messages/MMonCommand.h"
#include "messages/MMonCommandAck.h"
#include "messages/MMonPaxos.h"
@ -168,7 +171,12 @@ Message *decode_message(ceph_msg_header& header, ceph_msg_footer& footer,
case MSG_GETPOOLSTATSREPLY:
m = new MGetPoolStatsReply;
break;
case MSG_POOLSNAP:
m = new MPoolSnap;
break;
case MSG_POOLSNAPREPLY:
m = new MPoolSnapReply;
break;
case MSG_MON_COMMAND:
m = new MMonCommand;
break;

View File

@ -35,6 +35,9 @@
#define MSG_GETPOOLSTATS 58
#define MSG_GETPOOLSTATSREPLY 59
#define MSG_POOLSNAP 49
#define MSG_POOLSNAPREPLY 48
// osd internal
#define MSG_OSD_PING 70
#define MSG_OSD_BOOT 71

View File

@ -1333,6 +1333,8 @@ void ReplicatedPG::make_writeable(OpContext *ctx)
ctx->clone_obc->obs.oi.last_reqid = oi.last_reqid;
ctx->clone_obc->obs.oi.mtime = oi.mtime;
ctx->clone_obc->obs.oi.snaps = snaps;
ctx->clone_obc->obs.exists = true;
ctx->clone_obc->get();
ctx->clone_obc->force_start_write();
if (is_primary())

View File

@ -25,6 +25,9 @@
#include "messages/MOSDMap.h"
#include "messages/MOSDGetMap.h"
#include "messages/MPoolSnap.h"
#include "messages/MPoolSnapReply.h"
#include "messages/MGetPoolStats.h"
#include "messages/MGetPoolStatsReply.h"
#include "messages/MStatfs.h"
@ -76,6 +79,10 @@ void Objecter::dispatch(Message *m)
handle_fs_stats_reply((MStatfsReply*)m);
break;
case MSG_POOLSNAPREPLY:
handle_pool_snap_reply((MPoolSnapReply*)m);
break;
default:
dout(1) << "don't know message type " << m->get_type() << dendl;
assert(0);
@ -177,7 +184,12 @@ void Objecter::handle_osd_map(MOSDMap *m)
kick_requests(changed_pgs);
}
finish_contexts(waiting_for_map);
map<epoch_t,list<Context*> >::iterator p = waiting_for_map.begin();
while (p != waiting_for_map.end() &&
p->first <= osdmap->get_epoch()) {
finish_contexts(p->second);
waiting_for_map.erase(p++);
}
delete m;
}
@ -525,6 +537,67 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
delete m;
}
//snapshots
void Objecter::create_pool_snap(int *reply, int pool, string& snapName, Context *onfinish) {
dout(10) << "create_pool_snap; pool: " << pool << "; snap: " << snapName << dendl;
SnapOp *op = new SnapOp;
op->tid = ++last_tid;
op->pool = pool;
op->name = snapName;
op->onfinish = onfinish;
op->create = true;
op->replyCode = reply;
op_snap[op->tid] = op;
pool_snap_submit(op);
}
void Objecter::delete_pool_snap(int *reply, int pool, string& snapName, Context *onfinish) {
dout(10) << "delete_pool_snap; pool: " << pool << "; snap: " << snapName << dendl;
SnapOp *op = new SnapOp;
op->tid = ++last_tid;
op->pool = pool;
op->name = snapName;
op->onfinish = onfinish;
op->create = false;
op->replyCode = reply;
op_snap[op->tid] = op;
pool_snap_submit(op);
}
void Objecter::pool_snap_submit(SnapOp *op) {
dout(10) << "pool_snap_submit " << op->tid << dendl;
MPoolSnap *m = new MPoolSnap(monmap->fsid, op->tid, op->pool, op->name, op->create);
int mon = monmap->pick_mon();
messenger->send_message(m, monmap->get_inst(mon));
}
void Objecter::handle_pool_snap_reply(MPoolSnapReply *m) {
dout(10) << "handle_pool_snap_reply " << *m << dendl;
tid_t tid = m->tid;
if (op_snap.count(tid)) {
SnapOp *op = op_snap[tid];
dout(10) << "have request " << tid << " at " << op << " Create: " << op->create << dendl;
*(op->replyCode) = m->replyCode;
if (osdmap->get_epoch() < m->epoch) {
dout(20) << "waiting for client to reach epoch " << m->epoch << " before calling back" << dendl;
wait_for_new_map(op->onfinish, m->epoch);
}
else {
op->onfinish->finish(0);
delete op->onfinish;
}
op->onfinish = NULL;
delete op;
op_snap.erase(tid);
} else {
dout(10) << "unknown request " << tid << dendl;
}
dout(10) << "done" << dendl;
delete m;
}
// pool stats

View File

@ -35,6 +35,8 @@ class OSDMap;
class MonMap;
class Message;
class MPoolSnapReply;
class MGetPoolStatsReply;
class MStatfsReply;
@ -259,13 +261,23 @@ class Objecter {
Context *onfinish;
};
struct SnapOp {
tid_t tid;
int pool;
string name;
Context *onfinish;
bool create;
int* replyCode;
};
private:
// pending ops
hash_map<tid_t,Op*> op_osd;
map<tid_t,PoolStatOp*> op_poolstat;
map<tid_t,StatfsOp*> op_statfs;
map<tid_t,SnapOp*> op_snap;
list<Context*> waiting_for_map;
map<epoch_t,list<Context*> > waiting_for_map;
/**
* track pending ops by pg
@ -338,8 +350,8 @@ private:
int get_client_incarnation() const { return client_inc; }
void set_client_incarnation(int inc) { client_inc = inc; }
void wait_for_new_map(Context *c) {
waiting_for_map.push_back(c);
void wait_for_new_map(Context *c, epoch_t epoch) {
waiting_for_map[epoch].push_back(c);
}
// mid-level helpers
@ -462,7 +474,14 @@ private:
o->snapc = snapc;
return op_submit(o);
}
// -------------------------
// snapshots
private:
void pool_snap_submit(SnapOp *op);
public:
void create_pool_snap(int *reply, int pool, string& snapName, Context *onfinish);
void delete_pool_snap(int *reply, int pool, string& snapName, Context *onfinish);
void handle_pool_snap_reply(MPoolSnapReply *m);
// --------------------------
// pool stats

View File

@ -426,7 +426,25 @@ int main(int argc, const char **argv)
}
cout << snaps.size() << " snaps" << std::endl;
}
else if (strcmp(nargs[0], "mksnap") == 0) {
if ( nargs.size() < 2) usage();
cout << "Submitting snap to backend." << std::endl;
int result = rados.snap_create(p, nargs[1]);
if (result == 0 ) cout << "Success! Created snapshot " << nargs[1] << std::endl;
else cout << "Failure. Attempt to create snapshot returned " << result << std::endl;
}
else if (strcmp(nargs[0], "rmsnap") == 0) {
if ( nargs.size() < 2) usage();
cout << "Submitting snap removal to backend." << std::endl;
int result = rados.snap_remove(p, nargs[1]);
if (result == 0 ) cout << "Success! Removed snapshot " << nargs[1] << std::endl;
else cout << "Failure. Attempt to remove snapshot returned " << result << std::endl;
}
else if (strcmp(nargs[0], "bench") == 0) {
if (nargs.size() < 2)
usage();