Merge remote-tracking branch 'gh/wip-objecter-fsx'

Reviewed-by: Sage Weil <sage@inktank.com>
This commit is contained in:
Sage Weil 2013-02-22 14:16:07 -08:00
commit e03657e452
5 changed files with 86 additions and 27 deletions

View File

@ -1392,7 +1392,7 @@ void librados::IoCtxImpl::set_sync_op_version(eversion_t& ver)
int librados::IoCtxImpl::watch(const object_t& oid, uint64_t ver, int librados::IoCtxImpl::watch(const object_t& oid, uint64_t ver,
uint64_t *cookie, librados::WatchCtx *ctx) uint64_t *cookie, librados::WatchCtx *ctx)
{ {
::ObjectOperation rd; ::ObjectOperation wr;
Mutex mylock("IoCtxImpl::watch::mylock"); Mutex mylock("IoCtxImpl::watch::mylock");
Cond cond; Cond cond;
bool done; bool done;
@ -1404,13 +1404,13 @@ int librados::IoCtxImpl::watch(const object_t& oid, uint64_t ver,
WatchContext *wc = new WatchContext(this, oid, ctx); WatchContext *wc = new WatchContext(this, oid, ctx);
client->register_watcher(wc, cookie); client->register_watcher(wc, cookie);
prepare_assert_ops(&rd); prepare_assert_ops(&wr);
rd.watch(*cookie, ver, 1); wr.watch(*cookie, ver, 1);
bufferlist bl; bufferlist bl;
wc->linger_id = objecter->linger( wc->linger_id = objecter->linger_mutate(oid, oloc, wr,
oid, oloc, rd, snap_seq, bl, NULL, snapc, ceph_clock_now(NULL), bl,
CEPH_OSD_FLAG_WRITE, 0,
NULL, onfinish, &objver); NULL, onfinish, &objver);
lock->Unlock(); lock->Unlock();
mylock.Lock(); mylock.Lock();
@ -1452,16 +1452,16 @@ int librados::IoCtxImpl::unwatch(const object_t& oid, uint64_t cookie)
Cond cond; Cond cond;
bool done; bool done;
int r; int r;
Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); Context *oncommit = new C_SafeCond(&mylock, &cond, &done, &r);
eversion_t ver; eversion_t ver;
lock->Lock(); lock->Lock();
client->unregister_watcher(cookie); client->unregister_watcher(cookie);
::ObjectOperation rd; ::ObjectOperation wr;
prepare_assert_ops(&rd); prepare_assert_ops(&wr);
rd.watch(cookie, 0, 0); wr.watch(cookie, 0, 0);
objecter->read(oid, oloc, rd, snap_seq, &outbl, 0, onack, &ver); objecter->mutate(oid, oloc, wr, snapc, ceph_clock_now(client->cct), 0, NULL, oncommit, &ver);
lock->Unlock(); lock->Unlock();
mylock.Lock(); mylock.Lock();
@ -1500,8 +1500,8 @@ int librados::IoCtxImpl::notify(const object_t& oid, uint64_t ver, bufferlist& b
::encode(timeout, inbl); ::encode(timeout, inbl);
::encode(bl, inbl); ::encode(bl, inbl);
rd.notify(cookie, ver, inbl); rd.notify(cookie, ver, inbl);
wc->linger_id = objecter->linger(oid, oloc, rd, snap_seq, inbl, NULL, wc->linger_id = objecter->linger_read(oid, oloc, rd, snap_seq, inbl, NULL, 0,
0, onack, NULL, &objver); onack, &objver);
lock->Unlock(); lock->Unlock();
mylock.Lock(); mylock.Lock();

View File

@ -2974,6 +2974,10 @@ ostream& operator<<(ostream& out, const OSDOp& op)
case CEPH_OSD_OP_ROLLBACK: case CEPH_OSD_OP_ROLLBACK:
out << " " << snapid_t(op.op.snap.snapid); out << " " << snapid_t(op.op.snap.snapid);
break; break;
case CEPH_OSD_OP_WATCH:
out << (op.op.watch.flag ? " add":" remove")
<< " cookie " << op.op.watch.cookie << " ver " << op.op.watch.ver;
break;
default: default:
out << " " << op.op.extent.offset << "~" << op.op.extent.length; out << " " << op.op.extent.offset << "~" << op.op.extent.length;
if (op.op.extent.truncate_seq) if (op.op.extent.truncate_seq)

View File

@ -265,6 +265,8 @@ void Objecter::send_linger(LingerOp *info)
onack, oncommit, onack, oncommit,
info->pobjver); info->pobjver);
o->snapid = info->snap; o->snapid = info->snap;
o->snapc = info->snapc;
o->mtime = info->mtime;
// do not resend this; we will send a new op to reregister // do not resend this; we will send a new op to reregister
o->should_resend = false; o->should_resend = false;
@ -285,6 +287,9 @@ void Objecter::send_linger(LingerOp *info)
info->register_tid = _op_submit(o); info->register_tid = _op_submit(o);
} else { } else {
// first send // first send
// populate info->pgid and info->acting so we
// don't resend the linger op on the next osdmap update
recalc_linger_op_target(info);
info->register_tid = op_submit(o); info->register_tid = op_submit(o);
} }
@ -335,11 +340,43 @@ void Objecter::unregister_linger(uint64_t linger_id)
} }
} }
tid_t Objecter::linger(const object_t& oid, const object_locator_t& oloc, tid_t Objecter::linger_mutate(const object_t& oid, const object_locator_t& oloc,
ObjectOperation& op, ObjectOperation& op,
snapid_t snap, bufferlist& inbl, bufferlist *poutbl, int flags, const SnapContext& snapc, utime_t mtime,
Context *onack, Context *onfinish, bufferlist& inbl, int flags,
eversion_t *objver) Context *onack, Context *oncommit,
eversion_t *objver)
{
LingerOp *info = new LingerOp;
info->oid = oid;
info->oloc = oloc;
if (info->oloc.key == oid)
info->oloc.key.clear();
info->snapc = snapc;
info->mtime = mtime;
info->flags = flags | CEPH_OSD_FLAG_WRITE;
info->ops = op.ops;
info->inbl = inbl;
info->poutbl = NULL;
info->pobjver = objver;
info->on_reg_ack = onack;
info->on_reg_commit = oncommit;
info->linger_id = ++max_linger_id;
linger_ops[info->linger_id] = info;
logger->set(l_osdc_linger_active, linger_ops.size());
send_linger(info);
return info->linger_id;
}
tid_t Objecter::linger_read(const object_t& oid, const object_locator_t& oloc,
ObjectOperation& op,
snapid_t snap, bufferlist& inbl, bufferlist *poutbl, int flags,
Context *onfinish,
eversion_t *objver)
{ {
LingerOp *info = new LingerOp; LingerOp *info = new LingerOp;
info->oid = oid; info->oid = oid;
@ -352,7 +389,6 @@ tid_t Objecter::linger(const object_t& oid, const object_locator_t& oloc,
info->inbl = inbl; info->inbl = inbl;
info->poutbl = poutbl; info->poutbl = poutbl;
info->pobjver = objver; info->pobjver = objver;
info->on_reg_ack = onack;
info->on_reg_commit = onfinish; info->on_reg_commit = onfinish;
info->linger_id = ++max_linger_id; info->linger_id = ++max_linger_id;

View File

@ -848,6 +848,9 @@ public:
vector<int> acting; vector<int> acting;
snapid_t snap; snapid_t snap;
SnapContext snapc;
utime_t mtime;
int flags; int flags;
vector<OSDOp> ops; vector<OSDOp> ops;
bufferlist inbl; bufferlist inbl;
@ -863,7 +866,8 @@ public:
tid_t register_tid; tid_t register_tid;
epoch_t map_dne_bound; epoch_t map_dne_bound;
LingerOp() : linger_id(0), flags(0), poutbl(NULL), pobjver(NULL), LingerOp() : linger_id(0), snap(CEPH_NOSNAP), flags(0),
poutbl(NULL), pobjver(NULL),
registered(false), registered(false),
on_reg_ack(NULL), on_reg_commit(NULL), on_reg_ack(NULL), on_reg_commit(NULL),
session(NULL), session_item(this), session(NULL), session_item(this),
@ -1114,11 +1118,17 @@ private:
o->out_rval.swap(op.out_rval); o->out_rval.swap(op.out_rval);
return op_submit(o); return op_submit(o);
} }
tid_t linger(const object_t& oid, const object_locator_t& oloc, tid_t linger_mutate(const object_t& oid, const object_locator_t& oloc,
ObjectOperation& op, ObjectOperation& op,
snapid_t snap, bufferlist& inbl, bufferlist *poutbl, int flags, const SnapContext& snapc, utime_t mtime,
Context *onack, Context *onfinish, bufferlist& inbl, int flags,
eversion_t *objver); Context *onack, Context *onfinish,
eversion_t *objver);
tid_t linger_read(const object_t& oid, const object_locator_t& oloc,
ObjectOperation& op,
snapid_t snap, bufferlist& inbl, bufferlist *poutbl, int flags,
Context *onack,
eversion_t *objver);
void unregister_linger(uint64_t linger_id); void unregister_linger(uint64_t linger_id);
/** /**

View File

@ -845,7 +845,12 @@ do_clone()
simple_err("do_clone: rbd clone", ret); simple_err("do_clone: rbd clone", ret);
exit(165); exit(165);
} }
rbd_close(image);
if ((ret = rbd_close(image)) < 0) {
simple_err("do_clone: rbd close", ret);
exit(174);
}
if ((ret = rbd_open(ioctx, imagename, &image, NULL)) < 0) { if ((ret = rbd_open(ioctx, imagename, &image, NULL)) < 0) {
simple_err("do_clone: rbd open", ret); simple_err("do_clone: rbd open", ret);
exit(166); exit(166);
@ -896,6 +901,10 @@ check_clone(int clonenum)
exit(171); exit(171);
} }
close(fd); close(fd);
if ((ret = rbd_close(cur_image)) < 0) {
simple_err("check_clone: rbd close", ret);
exit(174);
}
check_buffers(good_buf, temp_buf, 0, file_info.st_size); check_buffers(good_buf, temp_buf, 0, file_info.st_size);
unlink(filename); unlink(filename);