librados: can set up object locator

This commit is contained in:
Yehuda Sadeh 2011-03-01 14:20:11 -08:00 committed by Josh Durgin
parent f45a790f6e
commit 2e3b84486c
3 changed files with 45 additions and 55 deletions

View File

@ -47,7 +47,6 @@ struct rados_statfs_t {
uint64_t num_objects;
};
void rados_version(int *major, int *minor, int *extra);
/* initialization */
@ -112,6 +111,8 @@ int rados_pool_create_with_all(rados_t cluster, const char *pool_name, uint64_t
int rados_pool_delete(rados_t cluster, const char *pool_name);
int rados_ioctx_pool_set_auid(rados_ioctx_t io, uint64_t auid);
void rados_ioctx_locator_set_key(rados_ioctx_t io, const char *key);
/* objects */
int rados_objects_list_open(rados_ioctx_t io, rados_list_ctx_t *ctx);
int rados_objects_list_next(rados_list_ctx_t ctx, const char **entry);

View File

@ -182,6 +182,8 @@ namespace librados
void set_assert_version(uint64_t ver);
const std::string& get_pool_name() const;
void locator_set_key(const std::string& key);
private:
/* You can only get IoCtx instances from Rados */
IoCtx(IoCtxImpl *io_ctx_impl_);

View File

@ -81,11 +81,12 @@ struct librados::IoCtxImpl {
uint64_t assert_ver;
eversion_t last_objver;
uint32_t notify_timeout;
object_locator_t oloc;
IoCtxImpl(RadosClient *c, int pid, const char *pool_name_, snapid_t s = CEPH_NOSNAP) :
ref_cnt(0), client(c), poolid(pid),
pool_name(pool_name_), snap_seq(s), assert_ver(0),
notify_timeout(g_conf.client_notify_timeout) {}
notify_timeout(g_conf.client_notify_timeout), oloc(pid) {}
void set_snap_read(snapid_t s) {
if (!s)
@ -796,15 +797,13 @@ selfmanaged_snap_rollback_object(rados_ioctx_t io,
int reply;
IoCtxImpl* ctx = (IoCtxImpl *) io;
object_locator_t oloc(ctx->poolid);
Mutex mylock("RadosClient::snap_rollback::mylock");
Cond cond;
bool done;
Context *onack = new C_SafeCond(&mylock, &cond, &done, &reply);
lock.Lock();
objecter->rollback_object(oid, oloc, snapc, snapid,
objecter->rollback_object(oid, ctx->oloc, snapc, snapid,
g_clock.now(), onack, NULL);
lock.Unlock();
@ -1029,8 +1028,7 @@ create(IoCtxImpl& io, const object_t& oid, bool exclusive)
Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
lock.Lock();
object_locator_t oloc(io.poolid);
objecter->create(oid, oloc,
objecter->create(oid, io.oloc,
io.snapc, ut, 0, (exclusive ? CEPH_OSD_OP_FLAG_EXCL : 0),
onack, NULL, &ver);
lock.Unlock();
@ -1070,8 +1068,7 @@ write(IoCtxImpl& io, const object_t& oid, bufferlist& bl, size_t len, off_t off)
}
lock.Lock();
object_locator_t oloc(io.poolid);
objecter->write(oid, oloc,
objecter->write(oid, io.oloc,
off, len, io.snapc, bl, ut, 0,
onack, NULL, &ver, pop);
lock.Unlock();
@ -1114,8 +1111,7 @@ write_full(IoCtxImpl& io, const object_t& oid, bufferlist& bl)
}
lock.Lock();
object_locator_t oloc(io.poolid);
objecter->write_full(oid, oloc,
objecter->write_full(oid, io.oloc,
io.snapc, bl, ut, 0,
onack, NULL, &ver, pop);
lock.Unlock();
@ -1141,8 +1137,7 @@ aio_read(IoCtxImpl& io, const object_t oid, AioCompletionImpl *c,
c->pbl = pbl;
Mutex::Locker l(lock);
object_locator_t oloc(io.poolid);
objecter->read(oid, oloc,
objecter->read(oid, io.oloc,
off, len, io.snap_seq, &c->bl, 0,
onack, &c->objver);
return 0;
@ -1158,8 +1153,7 @@ aio_read(IoCtxImpl& io, const object_t oid, AioCompletionImpl *c,
c->maxlen = len;
Mutex::Locker l(lock);
object_locator_t oloc(io.poolid);
objecter->read(oid, oloc,
objecter->read(oid, io.oloc,
off, len, io.snap_seq, &c->bl, 0,
onack, &c->objver);
@ -1180,8 +1174,7 @@ aio_sparse_read(IoCtxImpl& io, const object_t oid,
c->pbl = NULL;
Mutex::Locker l(lock);
object_locator_t oloc(io.poolid);
objecter->sparse_read(oid, oloc,
objecter->sparse_read(oid, io.oloc,
off, len, io.snap_seq, &c->bl, 0,
onack);
return 0;
@ -1197,8 +1190,7 @@ aio_write(IoCtxImpl& io, const object_t &oid, AioCompletionImpl *c,
Context *onsafe = new C_aio_Safe(c);
Mutex::Locker l(lock);
object_locator_t oloc(io.poolid);
objecter->write(oid, oloc,
objecter->write(oid, io.oloc,
off, len, io.snapc, bl, ut, 0,
onack, onsafe, &c->objver);
@ -1215,8 +1207,7 @@ aio_write_full(IoCtxImpl& io, const object_t &oid,
Context *onsafe = new C_aio_Safe(c);
Mutex::Locker l(lock);
object_locator_t oloc(io.poolid);
objecter->write_full(oid, oloc,
objecter->write_full(oid, io.oloc,
io.snapc, bl, ut, 0,
onack, onsafe, &c->objver);
@ -1244,8 +1235,7 @@ remove(IoCtxImpl& io, const object_t& oid)
}
lock.Lock();
object_locator_t oloc(io.poolid);
objecter->remove(oid, oloc,
objecter->remove(oid, io.oloc,
snapc, ut, 0,
onack, NULL, &ver, pop);
lock.Unlock();
@ -1285,8 +1275,7 @@ trunc(IoCtxImpl& io, const object_t& oid, size_t size)
}
lock.Lock();
object_locator_t oloc(io.poolid);
objecter->trunc(oid, oloc,
objecter->trunc(oid, io.oloc,
io.snapc, ut, 0,
size, 0,
onack, NULL, &ver, pop);
@ -1318,14 +1307,13 @@ tmap_update(IoCtxImpl& io, const object_t& oid, bufferlist& cmdbl)
lock.Lock();
::SnapContext snapc;
object_locator_t oloc(io.poolid);
ObjectOperation wr;
if (io.assert_ver) {
wr.assert_version(io.assert_ver);
io.assert_ver = 0;
}
wr.tmap_update(cmdbl);
objecter->mutate(oid, oloc, wr, snapc, ut, 0, onack, NULL, &ver);
objecter->mutate(oid, io.oloc, wr, snapc, ut, 0, onack, NULL, &ver);
lock.Unlock();
mylock.Lock();
@ -1354,14 +1342,13 @@ exec(IoCtxImpl& io, const object_t& oid, const char *cls, const char *method,
lock.Lock();
object_locator_t oloc(io.poolid);
ObjectOperation rd;
if (io.assert_ver) {
rd.assert_version(io.assert_ver);
io.assert_ver = 0;
}
rd.call(cls, method, inbl);
objecter->read(oid, oloc, rd, io.snap_seq, &outbl, 0, onack, &ver);
objecter->read(oid, io.oloc, rd, io.snap_seq, &outbl, 0, onack, &ver);
lock.Unlock();
mylock.Lock();
@ -1392,8 +1379,7 @@ RadosClient::read(IoCtxImpl& io, const object_t& oid,
pop = &op;
}
lock.Lock();
object_locator_t oloc(io.poolid);
objecter->read(oid, oloc,
objecter->read(oid, io.oloc,
off, len, io.snap_seq, &bl, 0,
onack, &ver, pop);
lock.Unlock();
@ -1429,8 +1415,7 @@ mapext(IoCtxImpl& io, const object_t& oid, off_t off, size_t len, std::map<off_t
Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
lock.Lock();
object_locator_t oloc(io.poolid);
objecter->mapext(oid, oloc,
objecter->mapext(oid, io.oloc,
off, len, io.snap_seq, &bl, 0,
onack);
lock.Unlock();
@ -1463,8 +1448,7 @@ sparse_read(IoCtxImpl& io, const object_t& oid,
Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
lock.Lock();
object_locator_t oloc(io.poolid);
objecter->sparse_read(oid, oloc,
objecter->sparse_read(oid, io.oloc,
off, len, io.snap_seq, &bl, 0,
onack);
lock.Unlock();
@ -1507,8 +1491,7 @@ stat(IoCtxImpl& io, const object_t& oid, uint64_t *psize, time_t *pmtime)
pop = &op;
}
lock.Lock();
object_locator_t oloc(io.poolid);
objecter->stat(oid, oloc,
objecter->stat(oid, io.oloc,
io.snap_seq, psize, &mtime, 0,
onack, &ver, pop);
lock.Unlock();
@ -1545,8 +1528,7 @@ getxattr(IoCtxImpl& io, const object_t& oid, const char *name, bufferlist& bl)
pop = &op;
}
lock.Lock();
object_locator_t oloc(io.poolid);
objecter->getxattr(oid, oloc,
objecter->getxattr(oid, io.oloc,
name, io.snap_seq, &bl, 0,
onack, &ver, pop);
lock.Unlock();
@ -1582,8 +1564,6 @@ rmxattr(IoCtxImpl& io, const object_t& oid, const char *name)
Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
eversion_t ver;
object_locator_t oloc(io.poolid);
ObjectOperation op, *pop = NULL;
if (io.assert_ver) {
op.assert_version(io.assert_ver);
@ -1591,7 +1571,7 @@ rmxattr(IoCtxImpl& io, const object_t& oid, const char *name)
pop = &op;
}
lock.Lock();
objecter->removexattr(oid, oloc, name,
objecter->removexattr(oid, io.oloc, name,
io.snapc, ut, 0,
onack, NULL, &ver, pop);
lock.Unlock();
@ -1633,8 +1613,7 @@ setxattr(IoCtxImpl& io, const object_t& oid, const char *name, bufferlist& bl)
pop = &op;
}
lock.Lock();
object_locator_t oloc(io.poolid);
objecter->setxattr(oid, oloc, name,
objecter->setxattr(oid, io.oloc, name,
io.snapc, bl, ut, 0,
onack, NULL, &ver, pop);
lock.Unlock();
@ -1676,9 +1655,8 @@ getxattrs(IoCtxImpl& io, const object_t& oid, map<std::string, bufferlist>& attr
Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
lock.Lock();
object_locator_t oloc(io.poolid);
map<string, bufferlist> aset;
objecter->getxattrs(oid, oloc, io.snap_seq,
objecter->getxattrs(oid, io.oloc, io.snap_seq,
aset,
0, onack, &ver, pop);
lock.Unlock();
@ -1738,15 +1716,13 @@ watch(IoCtxImpl& io, const object_t& oid, uint64_t ver,
WatchContext *wc;
register_watcher(io, oid, ctx, rd, cookie, &wc);
object_locator_t oloc(io.poolid);
if (io.assert_ver) {
rd->assert_version(io.assert_ver);
io.assert_ver = 0;
}
rd->watch(*cookie, ver, 1);
bufferlist bl;
wc->linger_id = objecter->linger(oid, oloc, *rd, io.snap_seq, bl, NULL, 0, onack, NULL, &objver);
wc->linger_id = objecter->linger(oid, io.oloc, *rd, io.snap_seq, bl, NULL, 0, onack, NULL, &objver);
lock.Unlock();
mylock.Lock();
@ -1776,14 +1752,13 @@ _notify_ack(IoCtxImpl& io, const object_t& oid, uint64_t notify_id, uint64_t ver
Cond cond;
eversion_t objver;
object_locator_t oloc(io.poolid);
ObjectOperation rd;
if (io.assert_ver) {
rd.assert_version(io.assert_ver);
io.assert_ver = 0;
}
rd.notify_ack(notify_id, ver);
objecter->read(oid, oloc, rd, io.snap_seq, NULL, 0, 0, 0);
objecter->read(oid, io.oloc, rd, io.snap_seq, NULL, 0, 0, 0);
return 0;
}
@ -1804,14 +1779,13 @@ unwatch(IoCtxImpl& io, const object_t& oid, uint64_t cookie)
unregister_watcher(cookie);
object_locator_t oloc(io.poolid);
ObjectOperation rd;
if (io.assert_ver) {
rd.assert_version(io.assert_ver);
io.assert_ver = 0;
}
rd.watch(cookie, 0, 0);
objecter->read(oid, oloc, rd, io.snap_seq, &outbl, 0, onack, &ver);
objecter->read(oid, io.oloc, rd, io.snap_seq, &outbl, 0, onack, &ver);
lock.Unlock();
mylock.Lock();
@ -1841,7 +1815,6 @@ notify(IoCtxImpl& io, const object_t& oid, uint64_t ver)
C_NotifyComplete *ctx = new C_NotifyComplete(&mylock_all, &cond_all, &done_all);
ObjectOperation rd;
object_locator_t oloc(io.poolid);
if (io.assert_ver) {
rd.assert_version(io.assert_ver);
io.assert_ver = 0;
@ -1853,7 +1826,7 @@ notify(IoCtxImpl& io, const object_t& oid, uint64_t ver)
::encode(prot_ver, inbl);
::encode(timeout, inbl);
rd.notify(cookie, ver, inbl);
objecter->read(oid, oloc, rd, io.snap_seq, &outbl, 0, onack, &objver);
objecter->read(oid, io.oloc, rd, io.snap_seq, &outbl, 0, onack, &objver);
lock.Unlock();
mylock_all.Lock();
@ -2320,6 +2293,12 @@ get_pool_name() const
return io_ctx_impl->pool_name;
}
void librados::IoCtx::
locator_set_key(const string& key)
{
io_ctx_impl->oloc.key = key;
}
librados::IoCtx::
IoCtx(IoCtxImpl *io_ctx_impl_)
: io_ctx_impl(io_ctx_impl_)
@ -2807,6 +2786,14 @@ extern "C" int rados_ioctx_pool_set_auid(rados_ioctx_t io, uint64_t auid)
return ctx->client->pool_change_auid(ctx, auid);
}
extern "C" void rados_ioctx_locator_set_key(rados_ioctx_t io, const char *key)
{
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
if (key)
ctx->oloc.key = key;
else
ctx->oloc.key = "";
}
// snaps
extern "C" int rados_ioctx_snap_create(rados_ioctx_t io, const char *snapname)