librados: add async pool op operations

This commit is contained in:
Yehuda Sadeh 2011-06-17 15:30:30 -07:00
parent 6fb971ffdb
commit e2150a0890
2 changed files with 179 additions and 1 deletions

View File

@ -22,6 +22,7 @@ namespace librados
class IoCtxImpl;
class ObjectOperationImpl;
class ObjListCtx;
class PoolAsyncCompletionImpl;
class RadosClient;
typedef void *list_ctx_t;
@ -90,6 +91,16 @@ namespace librados
AioCompletionImpl *pc;
};
struct PoolAsyncCompletion {
PoolAsyncCompletion(PoolAsyncCompletionImpl *pc_) : pc(pc_) {}
int set_callback(void *cb_arg, callback_t cb);
int wait();
bool is_complete();
int get_return_value();
void release();
PoolAsyncCompletionImpl *pc;
};
/*
* ObjectOperation : compount object operation
* Batch multiple object operations into a single request, to be applied
@ -150,6 +161,9 @@ namespace librados
// set pool auid
int set_auid(uint64_t auid_);
// set pool auid
int set_auid_async(uint64_t auid_, PoolAsyncCompletion *c);
// get pool auid
int get_auid(uint64_t *auid_);
@ -281,7 +295,10 @@ namespace librados
std::map<std::string,pool_stat_t>& stats);
int cluster_stat(cluster_stat_t& result);
// -- aio --
/* pool aio */
static PoolAsyncCompletion *pool_async_create_completion();
// -- aio --
static AioCompletion *aio_create_completion();
static AioCompletion *aio_create_completion(void *cb_arg, callback_t cb_complete,
callback_t cb_safe);

View File

@ -368,6 +368,71 @@ struct librados::ObjListCtx {
}
};
struct librados::PoolAsyncCompletionImpl {
Mutex lock;
Cond cond;
int ref, rval;
bool released;
bool done;
rados_callback_t callback;
void *callback_arg;
PoolAsyncCompletionImpl() : lock("PoolAsyncCompletionImpl lock"),
ref(1), rval(0), released(false), done(false),
callback(0), callback_arg(0) {}
int set_callback(void *cb_arg, rados_callback_t cb) {
lock.Lock();
callback = cb;
callback_arg = cb_arg;
lock.Unlock();
return 0;
}
int wait() {
lock.Lock();
while (!done)
cond.Wait(lock);
lock.Unlock();
return 0;
}
int is_complete() {
lock.Lock();
int r = done;
lock.Unlock();
return r;
}
int get_return_value() {
lock.Lock();
int r = rval;
lock.Unlock();
return r;
}
void get() {
lock.Lock();
assert(ref > 0);
ref++;
lock.Unlock();
}
void release() {
lock.Lock();
assert(!released);
released = true;
put_unlock();
}
void put() {
lock.Lock();
put_unlock();
}
void put_unlock() {
assert(ref > 0);
int n = --ref;
lock.Unlock();
if (!n)
delete this;
}
};
class librados::RadosClient : public Dispatcher
{
public:
@ -476,6 +541,8 @@ public:
int pool_change_auid(rados_ioctx_t io, unsigned long long auid);
int pool_get_auid(rados_ioctx_t io, unsigned long long *auid);
int pool_change_auid_async(rados_ioctx_t io, unsigned long long auid, PoolAsyncCompletionImpl *c);
int list(Objecter::ListContext *context, int max_entries);
int operate(IoCtxImpl& io, const object_t& oid, ::ObjectOperation *o, bufferlist *pbl);
@ -587,6 +654,39 @@ public:
int aio_write_full(IoCtxImpl& io, const object_t &oid, AioCompletionImpl *c,
const bufferlist& bl);
struct C_PoolAsync_Safe : public Context {
PoolAsyncCompletionImpl *c;
void finish(int r) {
c->lock.Lock();
c->rval = r;
c->done = true;
c->cond.Signal();
if (c->callback) {
rados_callback_t cb = c->callback;
void *cb_arg = c->callback_arg;
c->lock.Unlock();
cb(c, cb_arg);
c->lock.Lock();
}
c->put_unlock();
}
C_PoolAsync_Safe(PoolAsyncCompletionImpl *_c) : c(_c) {
c->get();
}
};
static PoolAsyncCompletionImpl *pool_async_create_completion() {
return new PoolAsyncCompletionImpl;
}
static PoolAsyncCompletionImpl *pool_async_create_completion(void *cb_arg, rados_callback_t cb) {
PoolAsyncCompletionImpl *c = new PoolAsyncCompletionImpl;
if (cb)
c->set_callback(cb_arg, cb);
return c;
}
static AioCompletionImpl *aio_create_completion() {
return new AioCompletionImpl;
}
@ -1123,6 +1223,17 @@ pool_change_auid(rados_ioctx_t io, unsigned long long auid)
return reply;
}
int librados::RadosClient::
pool_change_auid_async(rados_ioctx_t io, unsigned long long auid, PoolAsyncCompletionImpl *c)
{
int poolID = ((IoCtxImpl *)io)->poolid;
Mutex::Locker l(lock);
objecter->change_pool_auid(poolID,
new C_PoolAsync_Safe(c),
auid);
}
int librados::RadosClient::
pool_get_auid(rados_ioctx_t io, unsigned long long *auid)
{
@ -2287,6 +2398,43 @@ get_next()
const librados::ObjectIterator librados::ObjectIterator::
__EndObjectIterator(NULL);
///////////////////////////// PoolAsyncCompletion //////////////////////////////
int librados::PoolAsyncCompletion::PoolAsyncCompletion::
set_callback(void *cb_arg, rados_callback_t cb)
{
PoolAsyncCompletionImpl *c = (PoolAsyncCompletionImpl *)pc;
return c->set_callback(cb_arg, cb);
}
int librados::PoolAsyncCompletion::PoolAsyncCompletion::
wait()
{
PoolAsyncCompletionImpl *c = (PoolAsyncCompletionImpl *)pc;
return c->wait();
}
bool librados::PoolAsyncCompletion::PoolAsyncCompletion::
is_complete()
{
PoolAsyncCompletionImpl *c = (PoolAsyncCompletionImpl *)pc;
return c->is_complete();
}
int librados::PoolAsyncCompletion::PoolAsyncCompletion::
get_return_value()
{
PoolAsyncCompletionImpl *c = (PoolAsyncCompletionImpl *)pc;
return c->get_return_value();
}
void librados::PoolAsyncCompletion::PoolAsyncCompletion::
release()
{
PoolAsyncCompletionImpl *c = (PoolAsyncCompletionImpl *)pc;
c->release();
delete this;
}
///////////////////////////// AioCompletion //////////////////////////////
int librados::AioCompletion::AioCompletion::
set_complete_callback(void *cb_arg, rados_callback_t cb)
@ -2408,6 +2556,12 @@ set_auid(uint64_t auid_)
return io_ctx_impl->client->pool_change_auid(io_ctx_impl, auid_);
}
int librados::IoCtx::
set_auid_async(uint64_t auid_, PoolAsyncCompletion *c)
{
return io_ctx_impl->client->pool_change_auid_async(io_ctx_impl, auid_, c->pc);
}
int librados::IoCtx::
get_auid(uint64_t *auid_)
{
@ -2912,6 +3066,13 @@ cluster_stat(cluster_stat_t& result)
return r;
}
librados::PoolAsyncCompletion *librados::Rados::
pool_async_create_completion()
{
PoolAsyncCompletionImpl *c = RadosClient::pool_async_create_completion();
return new PoolAsyncCompletion(c);
}
librados::AioCompletion *librados::Rados::
aio_create_completion()
{