osd, librados: misc fixes, linger related issues

This commit is contained in:
Yehuda Sadeh 2010-11-30 13:21:50 -08:00
parent adbb545903
commit 1123b5c588
6 changed files with 40 additions and 54 deletions

View File

@ -77,43 +77,6 @@ class RadosClient : public Dispatcher
Cond cond;
SafeTimer timer;
// watch/notify
struct WatchContext : public Context {
librados::Rados::WatchCtx *ctx;
uint64_t cookie;
uint64_t ver;
WatchContext(librados::Rados::WatchCtx *_ctx) : ctx(_ctx) {}
~WatchContext() {
delete ctx;
}
void finish(int r) {
ctx->finish(r);
}
};
Mutex watch_lock;
uint64_t max_watch_cookie;
map<uint64_t, WatchContext *> watchers;
pthread_key_t tls_key;
RadosTlsInfo *tls_info() {
struct RadosTlsInfo *info = (RadosTlsInfo *)pthread_getspecific(tls_key);
cout << "pthread_getspecific returned " << (void *)info << std::endl;
if (!info) {
info = new RadosTlsInfo(tls_key);
pthread_setspecific(tls_key, info);
}
return info;
}
void set_sync_op_version(eversion_t& ver) {
RadosTlsInfo *info = tls_info();
if (info)
info->objver = ver;
}
public:
RadosClient() : messenger(NULL), lock("radosclient"), timer(lock),
watch_lock("watch_lock"), max_watch_cookie(0) {
@ -390,8 +353,10 @@ public:
uint64_t cookie;
uint64_t ver;
librados::Rados::WatchCtx *ctx;
ObjectOperation *op;
WatchContext(PoolCtx& _pc, const object_t& _oc, librados::Rados::WatchCtx *_ctx) : pool_ctx(_pc), oid(_oc), ctx(_ctx) {}
WatchContext(PoolCtx& _pc, const object_t& _oc, librados::Rados::WatchCtx *_ctx,
ObjectOperation *_op) : pool_ctx(_pc), oid(_oc), ctx(_ctx), op(_op) {}
~WatchContext() {
delete ctx;
}
@ -426,8 +391,9 @@ public:
pool.last_objver = ver;
}
int register_watcher(PoolCtx& pool, const object_t& oid, librados::Rados::WatchCtx *ctx, uint64_t *cookie) {
WatchContext *wc = new WatchContext(pool, oid, ctx);
int register_watcher(PoolCtx& pool, const object_t& oid, librados::Rados::WatchCtx *ctx,
ObjectOperation *op, uint64_t *cookie) {
WatchContext *wc = new WatchContext(pool, oid, ctx, op);
if (!wc)
return -ENOMEM;
watch_lock.Lock();
@ -1604,9 +1570,14 @@ int RadosClient::watch(PoolCtx& pool, const object_t& oid, uint64_t ver, uint64_
utime_t ut = g_clock.now();
bufferlist inbl, outbl;
int r = register_watcher(pool, oid, ctx, cookie);
if (r < 0)
ObjectOperation *rd = new ObjectOperation();
if (!rd)
return -ENOMEM;
int r = register_watcher(pool, oid, ctx, rd, cookie);
if (r < 0) {
delete rd;
return r;
}
Mutex mylock("RadosClient::watch::mylock");
Cond cond;
@ -1616,13 +1587,14 @@ int RadosClient::watch(PoolCtx& pool, const object_t& oid, uint64_t ver, uint64_
lock.Lock();
object_locator_t oloc(pool.poolid);
ObjectOperation rd;
if (pool.assert_ver) {
rd.assert_version(pool.assert_ver);
rd->assert_version(pool.assert_ver);
pool.assert_ver = 0;
}
rd.watch(*cookie, ver, 1);
objecter->read(oid, oloc, rd, pool.snap_seq, &outbl, 0, onack, &objver);
rd->watch(*cookie, ver, 1);
rd->set_linger(true);
objecter->read(oid, oloc, *rd, pool.snap_seq, &outbl, 0, onack, &objver);
lock.Unlock();
mylock.Lock();
@ -1632,6 +1604,10 @@ int RadosClient::watch(PoolCtx& pool, const object_t& oid, uint64_t ver, uint64_
set_sync_op_version(pool, objver);
if (r < 0) {
unregister_watcher(*cookie);
}
return r;
}
@ -1713,13 +1689,13 @@ int RadosClient::notify(PoolCtx& pool, const object_t& oid, uint64_t ver)
eversion_t objver;
uint64_t cookie;
C_NotifyComplete *ctx = new C_NotifyComplete(&mylock_all, &cond_all, &done_all);
ObjectOperation rd;
r = register_watcher(pool, oid, ctx, &cookie);
r = register_watcher(pool, oid, ctx, &rd, &cookie);
if (r < 0)
return r;
object_locator_t oloc(pool.poolid);
ObjectOperation rd;
if (pool.assert_ver) {
rd.assert_version(pool.assert_ver);
pool.assert_ver = 0;

View File

@ -1415,8 +1415,10 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
session->get();
session->watches[obc] = osd->osdmap->object_locator_to_pg(soid.oid, obc->obs.oi.oloc);
obc->ref++;
#if 0
assert(obc->unconnected_watchers.count(entity));
obc->unconnected_watchers.erase(entity);
#endif
} else if (iter->second == session) {
// already there
dout(10) << " already connected to watch " << w << " by " << entity
@ -2554,7 +2556,7 @@ int ReplicatedPG::find_object_context(const object_t& oid, const object_locator_
// want the head?
sobject_t head(oid, CEPH_NOSNAP);
if (snapid == CEPH_NOSNAP) {
ObjectContext *obc = get_object_context(head, OLOC_BLANK, can_create);
ObjectContext *obc = get_object_context(head, oloc, can_create);
if (!obc)
return -ENOENT;
dout(10) << "find_object_context " << oid << " @" << snapid << dendl;

View File

@ -1389,7 +1389,7 @@ struct object_info_t {
}
object_info_t(const sobject_t& s, const object_locator_t& o) :
soid(s), size(0),
soid(s), oloc(o), size(0),
truncate_seq(0), truncate_size(0) {}
object_info_t(bufferlist& bl) {
decode(bl);

View File

@ -445,9 +445,11 @@ tid_t Objecter::op_submit(Op *op)
dout(20) << " note: not requesting commit" << dendl;
}
op_osd[op->tid] = op;
if (op->linger) {
op_osd_linger[op->tid] = op;
}
pg.active_tids.insert(op->tid);
pg.last = g_clock.now();

View File

@ -48,8 +48,9 @@ struct ObjectOperation {
vector<OSDOp> ops;
int flags;
int priority;
bool linger;
ObjectOperation() : flags(0), priority(0) {}
ObjectOperation() : flags(0), priority(0), linger(false) {}
void add_op(int op) {
int s = ops.size();
@ -100,6 +101,9 @@ struct ObjectOperation {
ops[s].op.pgls.count = count;
ops[s].op.pgls.cookie = cookie;
}
void set_linger(bool l) {
linger = l;
}
// ------
@ -271,6 +275,8 @@ public:
eversion_t *objver;
bool linger;
Op(const object_t& o, const object_locator_t& ol, vector<OSDOp>& op,
int f, Context *ac, Context *co, eversion_t *ov) :
session_item(this),
@ -278,7 +284,7 @@ public:
con(NULL),
snapid(CEPH_NOSNAP), outbl(0), flags(f), priority(0), onack(ac), oncommit(co),
tid(0), attempts(0),
paused(false), objver(ov) {
paused(false), objver(ov), linger(false) {
ops.swap(op);
}
};

View File

@ -67,7 +67,7 @@ int main(int argc, const char **argv)
cout << "open pool result = " << r << " pool = " << pool << std::endl;
r = rados.write(pool, oid, 0, bl, bl.length());
uint64_t objver = rados.get_last_ver(pool);
uint64_t objver = rados.get_last_version(pool);
cout << "rados.write returned " << r << " last_ver=" << objver << std::endl;
uint64_t handle;
@ -82,7 +82,7 @@ int main(int argc, const char **argv)
cout << "*** press enter to continue ***" << std::endl;
getchar();
rados.set_assert_ver(pool, objver);
rados.set_assert_version(pool, objver);
r = rados.write(pool, oid, 0, bl, bl.length() - 1);
cout << "rados.write returned " << r << std::endl;