Merge branch 'unstable' into rados

Conflicts:

	src/client/SyntheticClient.cc
	src/kernel/bookkeeper.h
	src/osd/ReplicatedPG.cc
	src/osdc/Filer.cc
	src/osdc/Objecter.h
This commit is contained in:
Sage Weil 2009-05-19 13:20:14 -07:00
commit 49d6e404a2
27 changed files with 166 additions and 330 deletions

8
debian/changelog vendored
View File

@ -1,8 +1,12 @@
ceph (0.8-1) unstable; urgency=low
* release
* mount error reporting
* fix mds file size/mtime recovery
* crush fixes
* mds cap fixes
* support Linux 2.6.30
-- sage <sage@newdream.net> Fri, 7 May 2009 13:57:22 -0800
-- sage <sage@newdream.net> Tue, 19 May 2009 13:57:22 -0800
ceph (0.7.3-1) unstable; urgency=low

View File

@ -384,8 +384,6 @@ noinst_HEADERS = \
kernel/Kconfig\
kernel/Makefile\
kernel/addr.c\
kernel/bookkeeper.c\
kernel/bookkeeper.h\
kernel/caps.c\
kernel/ceph_debug.h\
kernel/ceph_fs.h\

View File

@ -40,6 +40,11 @@ v0.8
- backup homiebackup5
- backup backup.
- bug fixes?
- mount error reporting
- fix mds file size/mtime recovery
- changes for 2.6.30
- mds cap fixes
- crush fixes
v0.9
- librados

View File

@ -72,7 +72,7 @@ get_name_list() {
what="$what $bit"
;;
*)
if echo " $bit " | grep -v -q " $f "; then
if echo " $bit " | grep -v -q " " $f " "; then
echo "$0: $type '$f' not found ($conf defines $bit)"
exit 1
fi

View File

@ -1361,7 +1361,7 @@ void Client::send_cap(Inode *in, int mds, InodeCap *cap, int used, int want, int
int held = cap->issued | cap->implemented;
int revoking = cap->implemented & ~cap->issued;
int dropping = cap->issued & ~retain;
int op = (retain == 0) ? CEPH_CAP_OP_RELEASE : CEPH_CAP_OP_UPDATE;
int op = CEPH_CAP_OP_UPDATE;
dout(10) << "send_cap " << *in
<< " mds" << mds << " seq " << cap->seq
@ -1998,12 +1998,7 @@ void Client::handle_caps(MClientCaps *m)
if (inode_map.count(vino)) in = inode_map[vino];
if (!in) {
dout(5) << "handle_caps don't have vino " << vino << dendl;
// release.
m->set_op(CEPH_CAP_OP_RELEASE);
m->head.caps = 0;
m->head.dirty = 0;
messenger->send_message(m, m->get_source_inst());
delete m;
return;
}
@ -2014,10 +2009,8 @@ void Client::handle_caps(MClientCaps *m)
}
if (in->caps.count(mds) == 0) {
m->set_op(CEPH_CAP_OP_RELEASE);
m->head.caps = 0;
m->head.dirty = 0;
messenger->send_message(m, m->get_source_inst());
dout(5) << "handle_caps don't have " << *in << " cap on mds" << mds << dendl;
delete m;
return;
}
InodeCap *cap = in->caps[mds];

View File

@ -1347,7 +1347,8 @@ int SyntheticClient::play_trace(Trace& t, string& prefix, bool metadata_only)
lock.Lock();
ceph_object_layout layout = client->osdmap->make_object_layout(oid, CEPH_CASDATA_RULE);
__u64 size;
client->objecter->stat(oid, layout, CEPH_NOSNAP, &size, 0, new C_SafeCond(&lock, &cond, &ack));
utime_t mtime;
client->objecter->stat(oid, layout, CEPH_NOSNAP, &size, &mtime, 0, new C_SafeCond(&lock, &cond, &ack));
while (!ack) cond.Wait(lock);
lock.Unlock();
}

View File

@ -1,7 +1,6 @@
#ifdef __KERNEL__
# include <linux/slab.h>
# include "../bookkeeper.h"
#else
# include <stdlib.h>
# include <assert.h>

View File

@ -12,9 +12,3 @@ config CEPH_FS
If unsure, say N.
config CEPH_BOOKKEEPER
bool "Ceph leaks detection tool"
depends on CEPH_FS
help
Leaks detection tool for the Ceph fs.

View File

@ -12,7 +12,7 @@ ceph-objs := super.o inode.o dir.o file.o addr.o ioctl.o \
mds_client.o mdsmap.o \
mon_client.o \
osd_client.o osdmap.o crush/crush.o crush/mapper.o \
debugfs.o bookkeeper.o
debugfs.o
else
#Otherwise we were called directly from the command

View File

@ -1,122 +0,0 @@
#include <linux/spinlock.h>
#include <linux/slab.h>
#include <linux/kallsyms.h>
#define CEPH_OVERRIDE_BOOKKEEPER /* avoid kmalloc/kfree recursion */
#define CEPH_BK_MAGIC 0x140985AC
#include "ceph_debug.h"
int ceph_debug_tools __read_mostly = -1;
#define DOUT_VAR ceph_debug_tools
#define DOUT_MASK DOUT_MASK_TOOLS
#include "super.h"
static struct list_head _bk_allocs;
static DEFINE_SPINLOCK(_bk_lock);
static size_t _total_alloc;
static size_t _total_free;
struct alloc_data {
u32 prefix_magic;
struct list_head node;
size_t size;
char *fname;
int line;
u32 suffix_magic;
};
void *ceph_kmalloc(char *fname, int line, size_t size, gfp_t flags)
{
struct alloc_data *p = kmalloc(size+sizeof(struct alloc_data), flags);
if (!p)
return NULL;
p->prefix_magic = CEPH_BK_MAGIC;
p->size = size;
p->fname = fname;
p->line = line;
p->suffix_magic = CEPH_BK_MAGIC;
spin_lock(&_bk_lock);
_total_alloc += size;
list_add_tail(&p->node, &_bk_allocs);
spin_unlock(&_bk_lock);
return ((void *)p)+sizeof(struct alloc_data);
}
void ceph_kfree(const void *ptr)
{
struct alloc_data *p = (struct alloc_data *)(ptr -
sizeof(struct alloc_data));
int overrun = 0;
if (!ptr)
return;
if (p->prefix_magic != CEPH_BK_MAGIC) {
derr(0, "ERROR: memory overrun (under)!\n");
overrun = 1;
}
if (p->suffix_magic != CEPH_BK_MAGIC) {
derr(0, "ERROR: Memory overrun (over)!\n");
overrun = 1;
}
if (overrun) {
derr(0, "Memory allocated at %s(%d): p=%p (%zu bytes)\n",
p->fname, p->line, ((void *)p)+sizeof(struct alloc_data),
p->size);
}
BUG_ON(overrun);
spin_lock(&_bk_lock);
_total_free += p->size;
list_del(&p->node);
spin_unlock(&_bk_lock);
kfree(p);
return;
}
void ceph_bookkeeper_dump(void)
{
struct list_head *p;
struct alloc_data *entry;
dout(1, "bookkeeper: total bytes alloc: %zu\n", _total_alloc);
dout(1, "bookkeeper: total bytes free: %zu\n", _total_free);
if (_total_alloc != _total_free) {
list_for_each(p, &_bk_allocs) {
entry = list_entry(p, struct alloc_data, node);
dout(1, "%s(%d): p=%p (%zu bytes)\n", entry->fname,
entry->line,
((void *)entry)+sizeof(struct alloc_data),
entry->size);
}
} else {
dout(1, "No leaks found! Yay!\n");
}
}
void ceph_bookkeeper_init(void)
{
dout(10, "bookkeeper: start\n");
INIT_LIST_HEAD(&_bk_allocs);
}
void ceph_bookkeeper_finalize(void)
{
ceph_bookkeeper_dump();
}

View File

@ -1,32 +0,0 @@
#ifdef CONFIG_CEPH_BOOKKEEPER
#ifndef _FS_CEPH_BOOKKEEPER_H
#define _FS_CEPH_BOOKKEEPER_H
extern void ceph_bookkeeper_dump(void);
extern void ceph_bookkeeper_init(void);
extern void ceph_bookkeeper_finalize(void);
extern void *ceph_kmalloc(char *fname, int line, size_t size, gfp_t flags);
extern void ceph_kfree(const void *ptr);
#endif
#ifndef CEPH_OVERRIDE_BOOKKEEPER
#define CEPH_BOOKKEEPER_DEFINED
#define kmalloc(size, flags) ceph_kmalloc(__FILE__, __LINE__, size, flags)
#define kzalloc(size, flags) ceph_kmalloc(__FILE__, __LINE__, size, \
flags | __GFP_ZERO)
#define kfree ceph_kfree
#endif
#ifdef CEPH_DISABLE_BOOKKEEPER
#ifdef CEPH_BOOKKEEPER_DEFINED
#undef kmalloc
#undef kzalloc
#undef kfree
#undef CEPH_BOOKKEEPER_DEFINED
#endif
#endif
#endif

View File

@ -3,8 +3,6 @@
#include <linux/string.h>
#include "bookkeeper.h"
extern int ceph_debug __read_mostly; /* debug level. */
extern int ceph_debug_console __read_mostly; /* send debug output to console? */
extern int ceph_debug_mask __read_mostly;
@ -85,21 +83,4 @@ extern int ceph_debug_tools __read_mostly;
#define dout(x, args...) _dout(x, args, TRAIL_PARAM)
#define derr(x, args...) _derr(x, args, TRAIL_PARAM)
/* dcache d_count debugging */
#if 0
# define dput(dentry) \
do { \
dout(20, "dput %p %d -> %d\n", dentry, \
atomic_read(&dentry->d_count), \
atomic_read(&dentry->d_count)-1); \
dput(dentry); \
} while (0)
# define d_drop(dentry) \
do { \
dout(20, "d_drop %p\n", dentry); \
d_drop(dentry); \
} while (0)
#endif
#endif

View File

@ -5,7 +5,6 @@
#include "super.h"
#include "mds_client.h"
#include "bookkeeper.h"
static struct dentry *ceph_debugfs_dir;
static struct dentry *ceph_debugfs_debug;
@ -13,9 +12,6 @@ static struct dentry *ceph_debugfs_debug_msgr;
static struct dentry *ceph_debugfs_debug_console;
static struct dentry *ceph_debugfs_debug_mask;
static struct dentry *ceph_debugfs_caps_reservation;
#ifdef CONFIG_CEPH_BOOKKEEPER
static struct dentry *ceph_debugfs_bookkeeper;
#endif
/*
* ceph_debug_mask
@ -431,24 +427,6 @@ DEFINE_SHOW_FUNC(osdc_show)
DEFINE_SHOW_FUNC(caps_reservation_show)
DEFINE_SHOW_FUNC(dentry_lru_show)
#ifdef CONFIG_CEPH_BOOKKEEPER
static int debugfs_bookkeeper_set(void *data, u64 val)
{
if (val)
ceph_bookkeeper_dump();
return 0;
}
static int debugfs_bookkeeper_get(void *data, u64 *val)
{
*val = 0;
return 0;
}
DEFINE_SIMPLE_ATTRIBUTE(bookkeeper_fops, debugfs_bookkeeper_get,
debugfs_bookkeeper_set, "%llu\n");
#endif
int ceph_debugfs_init(void)
{
int ret = -ENOMEM;
@ -495,15 +473,6 @@ int ceph_debugfs_init(void)
if (!ceph_debugfs_caps_reservation)
goto out;
#ifdef CONFIG_CEPH_BOOKKEEPER
ceph_debugfs_bookkeeper = debugfs_create_file("show_bookkeeper",
0600,
ceph_debugfs_dir,
NULL,
&bookkeeper_fops);
if (!ceph_debugfs_bookkeeper)
goto out;
#endif
return 0;
out:
@ -518,9 +487,6 @@ void ceph_debugfs_cleanup(void)
debugfs_remove(ceph_debugfs_debug_mask);
debugfs_remove(ceph_debugfs_debug_msgr);
debugfs_remove(ceph_debugfs_debug);
#ifdef CONFIG_CEPH_BOOKKEEPER
debugfs_remove(ceph_debugfs_bookkeeper);
#endif
debugfs_remove(ceph_debugfs_dir);
}

View File

@ -290,14 +290,11 @@ parameters.
EOF
git add fs/ceph/ceph_debug.h
git add fs/ceph/bookkeeper.h
git add fs/ceph/bookkeeper.c
git commit -F - <<EOF
ceph: debugging
Some debugging infrastructure, including the ability to adjust the
level of debug output on a per-file basis. A memory leak detection
tool can also be enabled via .config.
level of debug output on a per-file basis.
EOF

View File

@ -14,8 +14,9 @@ diff --git a/fs/Makefile b/fs/Makefile
index 38bc735..e11fa80 100644
--- a/fs/Makefile
+++ b/fs/Makefile
@@ -122,3 +122,4 @@ obj-$(CONFIG_DEBUG_FS) += debugfs/
@@ -122,4 +122,5 @@ obj-$(CONFIG_DEBUG_FS) += debugfs/
obj-$(CONFIG_OCFS2_FS) += ocfs2/
obj-$(CONFIG_BTRFS_FS) += btrfs/
obj-$(CONFIG_GFS2_FS) += gfs2/
obj-$(CONFIG_EXOFS_FS) += exofs/
+obj-$(CONFIG_CEPH_FS) += ceph/

View File

@ -14,7 +14,6 @@
#include "ceph_debug.h"
#include "ceph_ver.h"
#include "bookkeeper.h"
#include "decode.h"
/*
@ -867,8 +866,8 @@ static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt,
/* wait */
dout(10, "mount sent to mon%d, waiting for maps\n", which);
err = wait_event_interruptible_timeout(client->mount_wq,
have_all_maps(client),
request_interval);
client->mount_err || have_all_maps(client),
request_interval);
if (err == -EINTR)
goto out;
if (client->mount_err) {
@ -1173,9 +1172,6 @@ static int __init init_ceph(void)
dout(1, "init_ceph\n");
dout(0, "ceph (%s)\n", STRINGIFY(CEPH_GIT_VER));
#ifdef CONFIG_CEPH_BOOKKEEPER
ceph_bookkeeper_init();
#endif
ret = ceph_debugfs_init();
if (ret < 0)
goto out;
@ -1213,9 +1209,6 @@ static void __exit exit_ceph(void)
destroy_caches();
ceph_msgr_exit();
ceph_debugfs_cleanup();
#ifdef CONFIG_CEPH_BOOKKEEPER
ceph_bookkeeper_finalize();
#endif
}
module_init(init_ceph);

View File

@ -1,9 +1,6 @@
#ifndef _FS_CEPH_SUPER_H
#define _FS_CEPH_SUPER_H
#define CEPH_DISABLE_BOOKKEEPER
#include "bookkeeper.h"
#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/completion.h>
@ -11,9 +8,6 @@
#include <linux/exportfs.h>
#include <linux/backing-dev.h>
#undef CEPH_DISABLE_BOOKKEEPER
#include "bookkeeper.h"
#include "types.h"
#include "ceph_debug.h"
#include "messenger.h"
@ -21,7 +15,6 @@
#include "mds_client.h"
#include "osd_client.h"
#include "ceph_fs.h"
#include "bookkeeper.h"
/* f_type in struct statfs */
#define CEPH_SUPER_MAGIC 0x00c36400

View File

@ -4331,7 +4331,7 @@ void MDCache::do_file_recover()
<< " " << *in << dendl;
file_recovering.insert(in);
mds->filer->probe(in->inode.ino, &in->inode.layout, in->last,
in->inode.max_size, &in->inode.size, false,
in->inode.max_size, &in->inode.size, &in->inode.mtime, false,
0, new C_MDC_Recover(this, in));
} else {
dout(10) << "do_file_recover skipping " << in->inode.size << "/" << in->inode.max_size
@ -4345,7 +4345,8 @@ void MDCache::do_file_recover()
void MDCache::_recovered(CInode *in, int r)
{
dout(10) << "_recovered r=" << r << " size=" << in->inode.size << " for " << *in << dendl;
dout(10) << "_recovered r=" << r << " size=" << in->inode.size << " mtime=" << in->inode.mtime
<< " for " << *in << dendl;
file_recovering.erase(in);
in->state_clear(CInode::STATE_RECOVERING);

View File

@ -133,7 +133,7 @@ void MDSTable::load_2(int r, bufferlist& bl, Context *onfinish)
assert(is_opening());
state = STATE_ACTIVE;
if (r > 0) {
if (r >= 0) {
dout(10) << "load_2 got " << bl.length() << " bytes" << dendl;
bufferlist::iterator p = bl.begin();
::decode(version, p);

View File

@ -138,6 +138,11 @@ bool ClientMonitor::check_mount(MClientMount *m)
dout(0) << "client is not authorized to mount" << dendl;
ss << "client " << addr << " is not authorized to mount";
mon->get_logclient()->log(LOG_SEC, ss);
string s;
getline(ss, s);
mon->messenger->send_message(new MClientMountAck(-EPERM, s.c_str()),
m->get_orig_source_inst());
return true;
}
if (client_map.addr_client.count(addr)) {

View File

@ -34,6 +34,8 @@
#include "messages/MMonPaxos.h"
#include "messages/MClass.h"
#include "messages/MClientMountAck.h"
#include "common/Timer.h"
#include "common/Clock.h"
@ -352,6 +354,16 @@ bool Monitor::dispatch_impl(Message *m)
if (m->get_header().monc_protocol != CEPH_MONC_PROTOCOL) {
dout(0) << "monc protocol v " << (int)m->get_header().monc_protocol << " != my " << CEPH_MONC_PROTOCOL
<< " from " << m->get_orig_source_inst() << " " << *m << dendl;
if (m->get_type() == CEPH_MSG_CLIENT_MOUNT) {
stringstream ss;
ss << "client protocol v " << (int)m->get_header().monc_protocol << " != server v " << CEPH_MONC_PROTOCOL;
string s;
getline(ss, s);
messenger->send_message(new MClientMountAck(-EINVAL, s.c_str()),
m->get_orig_source_inst());
}
delete m;
return true;
}

View File

@ -703,11 +703,12 @@ int ReplicatedPG::do_read_ops(MOSDOp *op, sobject_t& soid, object_info_t& oi,
{
struct stat st;
memset(&st, sizeof(st), 0);
int r = osd->store->stat(info.pgid.to_coll(), soid, &st);
if (r >= 0)
p->length = st.st_size;
else
result = r;
result = osd->store->stat(info.pgid.to_coll(), soid, &st);
if (result >= 0) {
__u64 size = st.st_size;
::encode(size, data);
::encode(oi.mtime, data);
}
}
break;

View File

@ -37,9 +37,10 @@ public:
Probe *probe;
object_t oid;
__u64 size;
utime_t mtime;
C_Probe(Filer *f, Probe *p, object_t o) : filer(f), probe(p), oid(o), size(0) {}
void finish(int r) {
filer->_probed(probe, oid, size);
filer->_probed(probe, oid, size, mtime);
}
};
@ -48,6 +49,7 @@ int Filer::probe(inodeno_t ino,
snapid_t snapid,
__u64 start_from,
__u64 *end, // LB, when !fwd
utime_t *pmtime,
bool fwd,
int flags,
Context *onfinish)
@ -59,7 +61,7 @@ int Filer::probe(inodeno_t ino,
assert(snapid); // (until there is a non-NOSNAP write)
Probe *probe = new Probe(ino, *layout, snapid, start_from, end, flags, fwd, onfinish);
Probe *probe = new Probe(ino, *layout, snapid, start_from, end, pmtime, flags, fwd, onfinish);
// period (bytes before we jump unto a new set of object(s))
__u64 period = ceph_file_layout_period(*layout);
@ -73,7 +75,7 @@ int Filer::probe(inodeno_t ino,
assert(start_from > *end);
if (start_from % period)
probe->probing_len -= period - (start_from % period);
probe->from -= probe->probing_len;
probe->probing_off -= probe->probing_len;
}
_probe(probe);
@ -84,26 +86,33 @@ int Filer::probe(inodeno_t ino,
void Filer::_probe(Probe *probe)
{
dout(10) << "_probe " << hex << probe->ino << dec
<< " " << probe->from << "~" << probe->probing_len
<< " " << probe->probing_off << "~" << probe->probing_len
<< dendl;
// map range onto objects
file_to_extents(probe->ino, &probe->layout, probe->from, probe->probing_len, probe->probing);
probe->known_size.clear();
probe->probing.clear();
file_to_extents(probe->ino, &probe->layout,
probe->probing_off, probe->probing_len, probe->probing);
for (vector<ObjectExtent>::iterator p = probe->probing.begin();
p != probe->probing.end();
p++) {
dout(10) << "_probe probing " << p->oid << dendl;
C_Probe *c = new C_Probe(this, probe, p->oid);
probe->ops[p->oid] = objecter->stat(p->oid, p->layout, probe->snapid, &c->size, probe->flags, c);
probe->ops[p->oid] = objecter->stat(p->oid, p->layout, probe->snapid, &c->size, &c->mtime, probe->flags, c);
}
}
void Filer::_probed(Probe *probe, object_t oid, __u64 size)
void Filer::_probed(Probe *probe, object_t oid, __u64 size, utime_t mtime)
{
dout(10) << "_probed " << probe->ino << " object " << hex << oid << dec << " has size " << size << dendl;
dout(10) << "_probed " << probe->ino << " object " << oid
<< " has size " << size << " mtime " << mtime << dendl;
probe->known_size[oid] = size;
if (mtime > probe->max_mtime)
probe->max_mtime = mtime;
probe->known[oid] = size;
assert(probe->ops.count(oid));
probe->ops.erase(oid);
@ -111,7 +120,6 @@ void Filer::_probed(Probe *probe, object_t oid, __u64 size)
return; // waiting for more!
// analyze!
bool found = false;
__u64 end = 0;
if (!probe->fwd) {
@ -130,63 +138,73 @@ void Filer::_probed(Probe *probe, object_t oid, __u64 size)
__u64 shouldbe = p->length + p->offset;
dout(10) << "_probed " << probe->ino << " object " << hex << p->oid << dec
<< " should be " << shouldbe
<< ", actual is " << probe->known[p->oid]
<< ", actual is " << probe->known_size[p->oid]
<< dendl;
if (probe->known[p->oid] < 0) { end = -1; break; } // error!
// error?
if (probe->known_size[p->oid] < 0) {
probe->err = probe->known_size[p->oid];
break;
}
assert(probe->known[p->oid] <= shouldbe);
if (shouldbe == probe->known[p->oid] && probe->fwd)
continue; // keep going
// aha, we found the end!
// calc offset into buffer_extent to get distance from probe->from.
__u64 oleft = probe->known[p->oid] - p->offset;
for (map<__u32,__u32>::iterator i = p->buffer_extents.begin();
i != p->buffer_extents.end();
i++) {
if (oleft <= (__u64)i->second) {
end = probe->from + i->first + oleft;
found = true;
dout(10) << "_probed end is in buffer_extent " << i->first << "~" << i->second << " off " << oleft
<< ", from was " << probe->from << ", end is " << end
<< dendl;
break;
if (!probe->found_size) {
assert(probe->known_size[p->oid] <= shouldbe);
if ((probe->fwd && probe->known_size[p->oid] == shouldbe) ||
(!probe->fwd && probe->known_size[p->oid] == 0))
continue; // keep going
// aha, we found the end!
// calc offset into buffer_extent to get distance from probe->from.
__u64 oleft = probe->known_size[p->oid] - p->offset;
for (map<__u32,__u32>::iterator i = p->buffer_extents.begin();
i != p->buffer_extents.end();
i++) {
if (oleft <= (__u64)i->second) {
end = probe->probing_off + i->first + oleft;
dout(10) << "_probed end is in buffer_extent " << i->first << "~" << i->second << " off " << oleft
<< ", from was " << probe->probing_off << ", end is " << end
<< dendl;
probe->found_size = true;
dout(10) << "_probed found size at " << end << dendl;
*probe->psize = end;
if (!probe->pmtime) // stop if we don't need mtime too
break;
}
oleft -= i->second;
}
oleft -= i->second;
}
break;
}
if (!found) {
if (!probe->found_size || (probe->probing_off && probe->pmtime)) {
// keep probing!
dout(10) << "_probed didn't find end, probing further" << dendl;
dout(10) << "_probed probing further" << dendl;
__u64 period = ceph_file_layout_period(probe->layout);
if (probe->fwd) {
probe->from += probe->probing_len;
assert(probe->from % period == 0);
probe->probing_off += probe->probing_len;
assert(probe->probing_off % period == 0);
probe->probing_len = period;
} else {
// previous period.
assert(probe->from % period == 0);
probe->probing_len -= period;
probe->from -= period;
assert(probe->probing_off % period == 0);
probe->probing_len = period;
probe->probing_off -= period;
}
_probe(probe);
return;
}
if (end < 0) {
dout(10) << "_probed encountered an error while probing" << dendl;
*probe->end = -1;
} else {
// hooray!
dout(10) << "_probed found end at " << end << dendl;
*probe->end = end;
if (probe->pmtime) {
dout(10) << "_probed found mtime " << probe->max_mtime << dendl;
*probe->pmtime = probe->max_mtime;
}
// done! finish and clean up.
probe->onfinish->finish(end >= 0 ? 0:-1);
probe->onfinish->finish(probe->err);
delete probe->onfinish;
delete probe;
}

View File

@ -46,8 +46,10 @@ class Filer {
inodeno_t ino;
ceph_file_layout layout;
snapid_t snapid;
__u64 from; // for !fwd, this is start of extent we are probing, thus possibly < our endpoint.
__u64 *end;
__u64 *psize;
utime_t *pmtime;
int flags;
bool fwd;
@ -55,21 +57,28 @@ class Filer {
Context *onfinish;
vector<ObjectExtent> probing;
__u64 probing_len;
__u64 probing_off, probing_len;
map<object_t, __u64> known;
map<object_t, __u64> known_size;
utime_t max_mtime;
map<object_t, tid_t> ops;
int err;
bool found_size;
Probe(inodeno_t i, ceph_file_layout &l, snapid_t sn,
__u64 f, __u64 *e, int fl, bool fw, Context *c) :
__u64 f, __u64 *e, utime_t *m, int fl, bool fw, Context *c) :
ino(i), layout(l), snapid(sn),
from(f), end(e), flags(fl), fwd(fw), onfinish(c), probing_len(0) {}
psize(e), pmtime(m), flags(fl), fwd(fw), onfinish(c),
probing_off(f), probing_len(0),
err(0), found_size(false) {}
};
class C_Probe;
void _probe(Probe *p);
void _probed(Probe *p, object_t oid, __u64 size);
void _probed(Probe *p, object_t oid, __u64 size, utime_t mtime);
public:
Filer(Objecter *o) : objecter(o) {}
@ -236,10 +245,10 @@ class Filer {
snapid_t snapid,
__u64 start_from,
__u64 *end,
utime_t *mtime,
bool fwd,
int flags,
Context *onfinish);
};

View File

@ -146,7 +146,7 @@ void Journaler::_finish_read_head(int r, bufferlist& bl)
state = STATE_PROBING;
C_ProbeEnd *fin = new C_ProbeEnd(this);
filer.probe(ino, &layout, CEPH_NOSNAP,
h.write_pos, (__u64 *)&fin->end, true, CEPH_OSD_FLAG_INCLOCK_FAIL, fin);
h.write_pos, (__u64 *)&fin->end, 0, true, CEPH_OSD_FLAG_INCLOCK_FAIL, fin);
}
void Journaler::_finish_probe_end(int r, __s64 end)

View File

@ -462,14 +462,9 @@ void Objecter::handle_osd_read_reply(MOSDOpReply *m)
dout(7) << " got reply on " << rd->ops << dendl;
int bytes_read = m->get_data().length();
if (rd->pbl)
rd->pbl->claim(m->get_data());
if (rd->psize)
for (vector<ceph_osd_op>::iterator p = m->ops.begin(); p != m->ops.end(); p++)
if (p->op == CEPH_OSD_OP_STAT)
*(rd->psize) = p->length;
// finish, clean up
Context *onfinish = rd->onfinish;
dout(7) << " " << bytes_read << " bytes " << dendl;
@ -477,7 +472,7 @@ void Objecter::handle_osd_read_reply(MOSDOpReply *m)
// done
delete rd;
if (onfinish) {
onfinish->finish(bytes_read);// > 0 ? bytes_read:m->get_result());
onfinish->finish(m->get_result());
delete onfinish;
}
delete m;

View File

@ -193,7 +193,7 @@ class Objecter {
snapid_t snap;
bufferlist bl;
bufferlist *pbl;
__u64 *psize;
int flags;
Context *onfinish;
@ -205,7 +205,7 @@ class Objecter {
ReadOp(object_t o, ceph_object_layout& ol, vector<ceph_osd_op>& op, snapid_t s, int f, Context *of) :
oid(o), layout(ol), snap(s),
pbl(0), psize(0), flags(f), onfinish(of),
pbl(0), flags(f), onfinish(of),
tid(0), attempts(0), inc_lock(-1),
paused(false) {
ops.swap(op);
@ -239,6 +239,30 @@ class Objecter {
}
};
struct C_Stat : public Context {
bufferlist bl;
__u64 *psize;
utime_t *pmtime;
Context *fin;
C_Stat(__u64 *ps, utime_t *pm, Context *c) :
psize(ps), pmtime(pm), fin(c) {}
void finish(int r) {
if (r >= 0) {
bufferlist::iterator p = bl.begin();
__u64 s;
utime_t m;
::decode(s, p);
::decode(m, p);
if (psize)
*psize = s;
if (pmtime)
*pmtime = m;
}
fin->finish(r);
delete fin;
}
};
private:
// pending ops
@ -323,12 +347,11 @@ class Objecter {
tid_t read_submit(ReadOp *rd);
tid_t modify_submit(ModifyOp *wr);
tid_t read(object_t oid, ceph_object_layout ol, vector<ceph_osd_op>& ops,
snapid_t snap, bufferlist *pbl, __u64 *psize, int flags,
tid_t read(object_t oid, ceph_object_layout ol, vector<ceph_osd_op>& ops,
snapid_t snap, bufferlist *pbl, int flags,
Context *onfinish) {
ReadOp *rd = new ReadOp(oid, ol, ops, snap, flags, onfinish);
rd->pbl = pbl;
rd->psize = psize;
return read_submit(rd);
}
tid_t read(object_t oid, ceph_object_layout ol,
@ -349,12 +372,13 @@ class Objecter {
// high-level helpers
tid_t stat(object_t oid, ceph_object_layout ol, snapid_t snap,
__u64 *psize, int flags,
__u64 *psize, utime_t *pmtime, int flags,
Context *onfinish) {
vector<ceph_osd_op> ops(1);
memset(&ops[0], 0, sizeof(ops[0]));
ops[0].op = CEPH_OSD_OP_STAT;
return read(oid, ol, ops, snap, 0, psize, flags, onfinish);
C_Stat *fin = new C_Stat(psize, pmtime, onfinish);
return read(oid, ol, ops, snap, &fin->bl, flags, fin);
}
tid_t read(object_t oid, ceph_object_layout ol,
@ -365,7 +389,7 @@ class Objecter {
ops[0].op = CEPH_OSD_OP_READ;
ops[0].offset = off;
ops[0].length = len;
return read(oid, ol, ops, snap, pbl, 0, flags, onfinish);
return read(oid, ol, ops, snap, pbl, flags, onfinish);
}
tid_t read_full(object_t oid, ceph_object_layout ol,
snapid_t snap, bufferlist *pbl, int flags,