diff --git a/src/include/rados/librados.hpp b/src/include/rados/librados.hpp index fd7ec6468d7..31b9b2b8c81 100644 --- a/src/include/rados/librados.hpp +++ b/src/include/rados/librados.hpp @@ -22,6 +22,7 @@ namespace librados class IoCtxImpl; class ObjectOperationImpl; class ObjListCtx; + class PoolAsyncCompletionImpl; class RadosClient; typedef void *list_ctx_t; @@ -90,6 +91,16 @@ namespace librados AioCompletionImpl *pc; }; + struct PoolAsyncCompletion { + PoolAsyncCompletion(PoolAsyncCompletionImpl *pc_) : pc(pc_) {} + int set_callback(void *cb_arg, callback_t cb); + int wait(); + bool is_complete(); + int get_return_value(); + void release(); + PoolAsyncCompletionImpl *pc; + }; + /* * ObjectOperation : compount object operation * Batch multiple object operations into a single request, to be applied @@ -150,6 +161,9 @@ namespace librados // set pool auid int set_auid(uint64_t auid_); + // set pool auid + int set_auid_async(uint64_t auid_, PoolAsyncCompletion *c); + // get pool auid int get_auid(uint64_t *auid_); @@ -281,7 +295,10 @@ namespace librados std::map& stats); int cluster_stat(cluster_stat_t& result); - // -- aio -- + /* pool aio */ + static PoolAsyncCompletion *pool_async_create_completion(); + + // -- aio -- static AioCompletion *aio_create_completion(); static AioCompletion *aio_create_completion(void *cb_arg, callback_t cb_complete, callback_t cb_safe); diff --git a/src/librados.cc b/src/librados.cc index 85b9e91ef2a..767fc82b8ce 100644 --- a/src/librados.cc +++ b/src/librados.cc @@ -368,6 +368,71 @@ struct librados::ObjListCtx { } }; +struct librados::PoolAsyncCompletionImpl { + Mutex lock; + Cond cond; + int ref, rval; + bool released; + bool done; + + rados_callback_t callback; + void *callback_arg; + + PoolAsyncCompletionImpl() : lock("PoolAsyncCompletionImpl lock"), + ref(1), rval(0), released(false), done(false), + callback(0), callback_arg(0) {} + + int set_callback(void *cb_arg, rados_callback_t cb) { + lock.Lock(); + callback = cb; + callback_arg = cb_arg; + lock.Unlock(); + return 0; + } + int wait() { + lock.Lock(); + while (!done) + cond.Wait(lock); + lock.Unlock(); + return 0; + } + int is_complete() { + lock.Lock(); + int r = done; + lock.Unlock(); + return r; + } + int get_return_value() { + lock.Lock(); + int r = rval; + lock.Unlock(); + return r; + } + void get() { + lock.Lock(); + assert(ref > 0); + ref++; + lock.Unlock(); + } + void release() { + lock.Lock(); + assert(!released); + released = true; + put_unlock(); + } + void put() { + lock.Lock(); + put_unlock(); + } + void put_unlock() { + assert(ref > 0); + int n = --ref; + lock.Unlock(); + if (!n) + delete this; + } +}; + class librados::RadosClient : public Dispatcher { public: @@ -476,6 +541,8 @@ public: int pool_change_auid(rados_ioctx_t io, unsigned long long auid); int pool_get_auid(rados_ioctx_t io, unsigned long long *auid); + int pool_change_auid_async(rados_ioctx_t io, unsigned long long auid, PoolAsyncCompletionImpl *c); + int list(Objecter::ListContext *context, int max_entries); int operate(IoCtxImpl& io, const object_t& oid, ::ObjectOperation *o, bufferlist *pbl); @@ -587,6 +654,39 @@ public: int aio_write_full(IoCtxImpl& io, const object_t &oid, AioCompletionImpl *c, const bufferlist& bl); + struct C_PoolAsync_Safe : public Context { + PoolAsyncCompletionImpl *c; + void finish(int r) { + c->lock.Lock(); + c->rval = r; + c->done = true; + c->cond.Signal(); + + if (c->callback) { + rados_callback_t cb = c->callback; + void *cb_arg = c->callback_arg; + c->lock.Unlock(); + cb(c, cb_arg); + c->lock.Lock(); + } + + c->put_unlock(); + } + C_PoolAsync_Safe(PoolAsyncCompletionImpl *_c) : c(_c) { + c->get(); + } + }; + + static PoolAsyncCompletionImpl *pool_async_create_completion() { + return new PoolAsyncCompletionImpl; + } + static PoolAsyncCompletionImpl *pool_async_create_completion(void *cb_arg, rados_callback_t cb) { + PoolAsyncCompletionImpl *c = new PoolAsyncCompletionImpl; + if (cb) + c->set_callback(cb_arg, cb); + return c; + } + static AioCompletionImpl *aio_create_completion() { return new AioCompletionImpl; } @@ -1123,6 +1223,17 @@ pool_change_auid(rados_ioctx_t io, unsigned long long auid) return reply; } +int librados::RadosClient:: +pool_change_auid_async(rados_ioctx_t io, unsigned long long auid, PoolAsyncCompletionImpl *c) +{ + int poolID = ((IoCtxImpl *)io)->poolid; + + Mutex::Locker l(lock); + objecter->change_pool_auid(poolID, + new C_PoolAsync_Safe(c), + auid); +} + int librados::RadosClient:: pool_get_auid(rados_ioctx_t io, unsigned long long *auid) { @@ -2287,6 +2398,43 @@ get_next() const librados::ObjectIterator librados::ObjectIterator:: __EndObjectIterator(NULL); +///////////////////////////// PoolAsyncCompletion ////////////////////////////// +int librados::PoolAsyncCompletion::PoolAsyncCompletion:: +set_callback(void *cb_arg, rados_callback_t cb) +{ + PoolAsyncCompletionImpl *c = (PoolAsyncCompletionImpl *)pc; + return c->set_callback(cb_arg, cb); +} + +int librados::PoolAsyncCompletion::PoolAsyncCompletion:: +wait() +{ + PoolAsyncCompletionImpl *c = (PoolAsyncCompletionImpl *)pc; + return c->wait(); +} + +bool librados::PoolAsyncCompletion::PoolAsyncCompletion:: +is_complete() +{ + PoolAsyncCompletionImpl *c = (PoolAsyncCompletionImpl *)pc; + return c->is_complete(); +} + +int librados::PoolAsyncCompletion::PoolAsyncCompletion:: +get_return_value() +{ + PoolAsyncCompletionImpl *c = (PoolAsyncCompletionImpl *)pc; + return c->get_return_value(); +} + +void librados::PoolAsyncCompletion::PoolAsyncCompletion:: +release() +{ + PoolAsyncCompletionImpl *c = (PoolAsyncCompletionImpl *)pc; + c->release(); + delete this; +} + ///////////////////////////// AioCompletion ////////////////////////////// int librados::AioCompletion::AioCompletion:: set_complete_callback(void *cb_arg, rados_callback_t cb) @@ -2408,6 +2556,12 @@ set_auid(uint64_t auid_) return io_ctx_impl->client->pool_change_auid(io_ctx_impl, auid_); } +int librados::IoCtx:: +set_auid_async(uint64_t auid_, PoolAsyncCompletion *c) +{ + return io_ctx_impl->client->pool_change_auid_async(io_ctx_impl, auid_, c->pc); +} + int librados::IoCtx:: get_auid(uint64_t *auid_) { @@ -2912,6 +3066,13 @@ cluster_stat(cluster_stat_t& result) return r; } +librados::PoolAsyncCompletion *librados::Rados:: +pool_async_create_completion() +{ + PoolAsyncCompletionImpl *c = RadosClient::pool_async_create_completion(); + return new PoolAsyncCompletion(c); +} + librados::AioCompletion *librados::Rados:: aio_create_completion() {