diff --git a/src/librados.cc b/src/librados.cc index 8a8797c0fae..506d7aedb6e 100644 --- a/src/librados.cc +++ b/src/librados.cc @@ -77,43 +77,6 @@ class RadosClient : public Dispatcher Cond cond; SafeTimer timer; - // watch/notify - struct WatchContext : public Context { - librados::Rados::WatchCtx *ctx; - uint64_t cookie; - uint64_t ver; - - WatchContext(librados::Rados::WatchCtx *_ctx) : ctx(_ctx) {} - ~WatchContext() { - delete ctx; - } - void finish(int r) { - ctx->finish(r); - } - }; - - Mutex watch_lock; - uint64_t max_watch_cookie; - map watchers; - - pthread_key_t tls_key; - - RadosTlsInfo *tls_info() { - struct RadosTlsInfo *info = (RadosTlsInfo *)pthread_getspecific(tls_key); - cout << "pthread_getspecific returned " << (void *)info << std::endl; - if (!info) { - info = new RadosTlsInfo(tls_key); - pthread_setspecific(tls_key, info); - } - return info; - } - - void set_sync_op_version(eversion_t& ver) { - RadosTlsInfo *info = tls_info(); - if (info) - info->objver = ver; - } - public: RadosClient() : messenger(NULL), lock("radosclient"), timer(lock), watch_lock("watch_lock"), max_watch_cookie(0) { @@ -390,8 +353,10 @@ public: uint64_t cookie; uint64_t ver; librados::Rados::WatchCtx *ctx; + ObjectOperation *op; - WatchContext(PoolCtx& _pc, const object_t& _oc, librados::Rados::WatchCtx *_ctx) : pool_ctx(_pc), oid(_oc), ctx(_ctx) {} + WatchContext(PoolCtx& _pc, const object_t& _oc, librados::Rados::WatchCtx *_ctx, + ObjectOperation *_op) : pool_ctx(_pc), oid(_oc), ctx(_ctx), op(_op) {} ~WatchContext() { delete ctx; } @@ -426,8 +391,9 @@ public: pool.last_objver = ver; } - int register_watcher(PoolCtx& pool, const object_t& oid, librados::Rados::WatchCtx *ctx, uint64_t *cookie) { - WatchContext *wc = new WatchContext(pool, oid, ctx); + int register_watcher(PoolCtx& pool, const object_t& oid, librados::Rados::WatchCtx *ctx, + ObjectOperation *op, uint64_t *cookie) { + WatchContext *wc = new WatchContext(pool, oid, ctx, op); if (!wc) return -ENOMEM; watch_lock.Lock(); @@ -1604,9 +1570,14 @@ int RadosClient::watch(PoolCtx& pool, const object_t& oid, uint64_t ver, uint64_ utime_t ut = g_clock.now(); bufferlist inbl, outbl; - int r = register_watcher(pool, oid, ctx, cookie); - if (r < 0) + ObjectOperation *rd = new ObjectOperation(); + if (!rd) + return -ENOMEM; + int r = register_watcher(pool, oid, ctx, rd, cookie); + if (r < 0) { + delete rd; return r; + } Mutex mylock("RadosClient::watch::mylock"); Cond cond; @@ -1616,13 +1587,14 @@ int RadosClient::watch(PoolCtx& pool, const object_t& oid, uint64_t ver, uint64_ lock.Lock(); object_locator_t oloc(pool.poolid); - ObjectOperation rd; + if (pool.assert_ver) { - rd.assert_version(pool.assert_ver); + rd->assert_version(pool.assert_ver); pool.assert_ver = 0; } - rd.watch(*cookie, ver, 1); - objecter->read(oid, oloc, rd, pool.snap_seq, &outbl, 0, onack, &objver); + rd->watch(*cookie, ver, 1); + rd->set_linger(true); + objecter->read(oid, oloc, *rd, pool.snap_seq, &outbl, 0, onack, &objver); lock.Unlock(); mylock.Lock(); @@ -1632,6 +1604,10 @@ int RadosClient::watch(PoolCtx& pool, const object_t& oid, uint64_t ver, uint64_ set_sync_op_version(pool, objver); + if (r < 0) { + unregister_watcher(*cookie); + } + return r; } @@ -1713,13 +1689,13 @@ int RadosClient::notify(PoolCtx& pool, const object_t& oid, uint64_t ver) eversion_t objver; uint64_t cookie; C_NotifyComplete *ctx = new C_NotifyComplete(&mylock_all, &cond_all, &done_all); + ObjectOperation rd; - r = register_watcher(pool, oid, ctx, &cookie); + r = register_watcher(pool, oid, ctx, &rd, &cookie); if (r < 0) return r; object_locator_t oloc(pool.poolid); - ObjectOperation rd; if (pool.assert_ver) { rd.assert_version(pool.assert_ver); pool.assert_ver = 0; diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 3562d50ddae..563c48f43fb 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -1415,8 +1415,10 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, session->get(); session->watches[obc] = osd->osdmap->object_locator_to_pg(soid.oid, obc->obs.oi.oloc); obc->ref++; +#if 0 assert(obc->unconnected_watchers.count(entity)); obc->unconnected_watchers.erase(entity); +#endif } else if (iter->second == session) { // already there dout(10) << " already connected to watch " << w << " by " << entity @@ -2554,7 +2556,7 @@ int ReplicatedPG::find_object_context(const object_t& oid, const object_locator_ // want the head? sobject_t head(oid, CEPH_NOSNAP); if (snapid == CEPH_NOSNAP) { - ObjectContext *obc = get_object_context(head, OLOC_BLANK, can_create); + ObjectContext *obc = get_object_context(head, oloc, can_create); if (!obc) return -ENOENT; dout(10) << "find_object_context " << oid << " @" << snapid << dendl; diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index 4dd17fc82dd..09d080caf57 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -1389,7 +1389,7 @@ struct object_info_t { } object_info_t(const sobject_t& s, const object_locator_t& o) : - soid(s), size(0), + soid(s), oloc(o), size(0), truncate_seq(0), truncate_size(0) {} object_info_t(bufferlist& bl) { decode(bl); diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 994951cdffb..97245d5044e 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -445,9 +445,11 @@ tid_t Objecter::op_submit(Op *op) dout(20) << " note: not requesting commit" << dendl; } op_osd[op->tid] = op; + if (op->linger) { op_osd_linger[op->tid] = op; } + pg.active_tids.insert(op->tid); pg.last = g_clock.now(); diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index da7666b3bb7..512c4f7486d 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -48,8 +48,9 @@ struct ObjectOperation { vector ops; int flags; int priority; + bool linger; - ObjectOperation() : flags(0), priority(0) {} + ObjectOperation() : flags(0), priority(0), linger(false) {} void add_op(int op) { int s = ops.size(); @@ -100,6 +101,9 @@ struct ObjectOperation { ops[s].op.pgls.count = count; ops[s].op.pgls.cookie = cookie; } + void set_linger(bool l) { + linger = l; + } // ------ @@ -271,6 +275,8 @@ public: eversion_t *objver; + bool linger; + Op(const object_t& o, const object_locator_t& ol, vector& op, int f, Context *ac, Context *co, eversion_t *ov) : session_item(this), @@ -278,7 +284,7 @@ public: con(NULL), snapid(CEPH_NOSNAP), outbl(0), flags(f), priority(0), onack(ac), oncommit(co), tid(0), attempts(0), - paused(false), objver(ov) { + paused(false), objver(ov), linger(false) { ops.swap(op); } }; diff --git a/src/testradospp.cc b/src/testradospp.cc index a2c4449f004..48b0fc14a3d 100644 --- a/src/testradospp.cc +++ b/src/testradospp.cc @@ -67,7 +67,7 @@ int main(int argc, const char **argv) cout << "open pool result = " << r << " pool = " << pool << std::endl; r = rados.write(pool, oid, 0, bl, bl.length()); - uint64_t objver = rados.get_last_ver(pool); + uint64_t objver = rados.get_last_version(pool); cout << "rados.write returned " << r << " last_ver=" << objver << std::endl; uint64_t handle; @@ -82,7 +82,7 @@ int main(int argc, const char **argv) cout << "*** press enter to continue ***" << std::endl; getchar(); - rados.set_assert_ver(pool, objver); + rados.set_assert_version(pool, objver); r = rados.write(pool, oid, 0, bl, bl.length() - 1); cout << "rados.write returned " << r << std::endl;