Merge remote branch 'origin/next'

Josh Durgin 2013-03-29 12:58:01 -07:00
commit b504e444fc
25 changed files with 680 additions and 143 deletions

View File

@ -185,7 +185,8 @@ Client::Client(Messenger *m, MonClient *mc)
cct->_conf->client_oc_max_objects,
cct->_conf->client_oc_max_dirty,
cct->_conf->client_oc_target_dirty,
cct->_conf->client_oc_max_dirty_age);
cct->_conf->client_oc_max_dirty_age,
true);
filer = new Filer(objecter);
}
@ -2709,11 +2710,7 @@ bool Client::_flush(Inode *in, Context *onfinish)
if (!onfinish) {
onfinish = new C_Client_PutInode(this, in);
}
bool safe = objectcacher->flush_set(&in->oset, onfinish);
if (safe) {
onfinish->complete(0);
}
return safe;
return objectcacher->flush_set(&in->oset, onfinish);
}
void Client::_flush_range(Inode *in, int64_t offset, uint64_t size)

View File

@ -507,6 +507,7 @@ OPTION(rbd_cache_size, OPT_LONGLONG, 32<<20) // cache size in bytes
OPTION(rbd_cache_max_dirty, OPT_LONGLONG, 24<<20) // dirty limit in bytes - set to 0 for write-through caching
OPTION(rbd_cache_target_dirty, OPT_LONGLONG, 16<<20) // target dirty limit in bytes
OPTION(rbd_cache_max_dirty_age, OPT_FLOAT, 1.0) // seconds in cache before writeback starts
OPTION(rbd_cache_block_writes_upfront, OPT_BOOL, false) // whether to block writes to the cache before the aio_write call completes (true), or block before the aio completion is called (false)
OPTION(nss_db_path, OPT_STR, "") // path to nss db
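For context, a rough sketch of how a client could toggle the new option programmatically before connecting; the cluster setup is ordinary librados boilerplate rather than part of this change, and error handling is omitted.

#include <rados/librados.h>

int main(void)
{
  rados_t cluster;
  rados_create(&cluster, NULL);
  rados_conf_read_file(cluster, NULL);      /* read ceph.conf as usual */
  rados_conf_set(cluster, "rbd_cache", "true");
  /* false (the default added above) defers blocking until the aio completion
   * fires; true keeps the older behaviour of blocking the write call itself */
  rados_conf_set(cluster, "rbd_cache_block_writes_upfront", "false");
  rados_connect(cluster);
  /* ... open images, do I/O ... */
  rados_shutdown(cluster);
  return 0;
}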

View File

@ -24,7 +24,7 @@ extern "C" {
#endif
#define LIBRADOS_VER_MAJOR 0
#define LIBRADOS_VER_MINOR 49
#define LIBRADOS_VER_MINOR 51
#define LIBRADOS_VER_EXTRA 0
#define LIBRADOS_VERSION(maj, min, extra) ((maj << 16) + (min << 8) + extra)
@ -1446,6 +1446,18 @@ int rados_aio_read(rados_ioctx_t io, const char *oid,
int rados_aio_flush(rados_ioctx_t io);
/**
* Schedule a callback for when all currently pending
* aio writes are safe. This is a non-blocking version of
* rados_aio_flush().
*
* @param io the context to flush
* @param completion what to do when the writes are safe
* @returns 0 on success, negative error code on failure
*/
int rados_aio_flush_async(rados_ioctx_t io, rados_completion_t completion);
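As a rough usage sketch (mirroring the FlushAsync test added later in this commit), assuming an already-open io context and omitting error checks:

#include <rados/librados.h>
#include <string.h>

static void write_then_flush_async(rados_ioctx_t io)
{
  char buf[128];
  memset(buf, 0xee, sizeof(buf));

  rados_completion_t write_c, flush_c;
  rados_aio_create_completion(NULL, NULL, NULL, &write_c);
  rados_aio_create_completion(NULL, NULL, NULL, &flush_c);

  rados_aio_write(io, "foo", write_c, buf, sizeof(buf), 0);
  rados_aio_flush_async(io, flush_c);   /* returns immediately */
  rados_aio_wait_for_safe(flush_c);     /* everything queued so far is on disk */

  rados_aio_release(write_c);
  rados_aio_release(flush_c);
}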
/**
* Asynchronously get object stats (size/mtime)
*

View File

@ -457,9 +457,54 @@ namespace librados
int aio_read(const std::string& oid, AioCompletion *c,
bufferlist *pbl, size_t len, uint64_t off);
/**
* Asynchronously read from an object at a particular snapshot
*
* This is the same as normal aio_read, except that it takes
* the snapshot to read from as an argument instead of using the
* internal IoCtx state.
*
* The return value of the completion will be the number of bytes read
* on success, or a negative error code on failure.
*
* @param oid the name of the object to read from
* @param c what to do when the read is complete
* @param pbl where to store the results
* @param len the number of bytes to read
* @param off the offset to start reading from in the object
* @param snapid the id of the snapshot to read from
* @returns 0 on success, negative error code on failure
*/
int aio_read(const std::string& oid, AioCompletion *c,
bufferlist *pbl, size_t len, uint64_t off, uint64_t snapid);
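A brief sketch of the new overload; the object name, length and snapshot id are placeholders, and error handling is omitted:

#include <rados/librados.hpp>

// Read the first 4 KB of "foo" as it looked at the given snapshot id,
// without touching the IoCtx-wide snap_set_read() state.
void read_at_snap(librados::IoCtx& io, uint64_t snapid)
{
  librados::bufferlist bl;
  librados::AioCompletion *c = librados::Rados::aio_create_completion();
  io.aio_read("foo", c, &bl, 4096, 0, snapid);
  c->wait_for_complete();   // completion's return value: bytes read or -errno
  c->release();
}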
int aio_sparse_read(const std::string& oid, AioCompletion *c,
std::map<uint64_t,uint64_t> *m, bufferlist *data_bl,
size_t len, uint64_t off);
/**
* Asynchronously read existing extents from an object at a
* particular snapshot
*
* This is the same as normal aio_sparse_read, except that it takes
* the snapshot to read from as an argument instead of using the
* internal IoCtx state.
*
* m will be filled in with a map of extents in the object,
* mapping offsets to lengths (in bytes) within the range
* requested. The data for all of the extents is stored
* back-to-back in offset order in data_bl.
*
* @param oid the name of the object to read from
* @param c what to do when the read is complete
* @param m where to store the map of extents
* @param data_bl where to store the data
* @param len the number of bytes to read
* @param off the offset to start reading from in the object
* @param snapid the id of the snapshot to read from
* @returns 0 on success, negative error code on failure
*/
int aio_sparse_read(const std::string& oid, AioCompletion *c,
std::map<uint64_t,uint64_t> *m, bufferlist *data_bl,
size_t len, uint64_t off, uint64_t snapid);
int aio_write(const std::string& oid, AioCompletion *c, const bufferlist& bl,
size_t len, uint64_t off);
int aio_append(const std::string& oid, AioCompletion *c, const bufferlist& bl,
@ -484,6 +529,16 @@ namespace librados
int aio_flush();
/**
* Schedule a callback for when all currently pending
* aio writes are safe. This is a non-blocking version of
* aio_flush().
*
* @param c what to do when the writes are safe
* @returns 0 on success, negative error code on failure
*/
int aio_flush_async(AioCompletion *c);
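A minimal sketch, assuming an open IoCtx and no error handling:

#include <rados/librados.hpp>

// Ask for a completion once all aio writes issued so far on this IoCtx are
// safe, instead of blocking in aio_flush().
void flush_async(librados::IoCtx& io)
{
  librados::AioCompletion *c = librados::Rados::aio_create_completion();
  io.aio_flush_async(c);
  c->wait_for_safe();   // or register a safe callback and return immediately
  c->release();
}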
int aio_stat(const std::string& oid, AioCompletion *c, uint64_t *psize, time_t *pmtime);
int aio_exec(const std::string& oid, AioCompletion *c, const char *cls, const char *method,
@ -493,6 +548,23 @@ namespace librados
int operate(const std::string& oid, ObjectWriteOperation *op);
int operate(const std::string& oid, ObjectReadOperation *op, bufferlist *pbl);
int aio_operate(const std::string& oid, AioCompletion *c, ObjectWriteOperation *op);
/**
* Schedule an async write operation with explicit snapshot parameters
*
* This is the same as the first aio_operate(), except that it
* gets the snapshot context from its arguments instead of the
* IoCtx internal state.
*
* @param oid the object to operate on
* @param c what to do when the operation is complete and safe
* @param op which operations to perform
* @param seq latest selfmanaged snapshot sequence number for this object
* @param snaps currently existing selfmanaged snapshot ids for this object
* @returns 0 on success, negative error code on failure
*/
int aio_operate(const std::string& oid, AioCompletion *c,
ObjectWriteOperation *op, snap_t seq,
std::vector<snap_t>& snaps);
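A hedged sketch of driving the new overload with an explicit self-managed snap context; the object name and payload are placeholders:

#include <rados/librados.hpp>
#include <vector>

// Write through an ObjectWriteOperation using a caller-supplied self-managed
// snapshot context instead of the IoCtx's own.
void write_with_snapc(librados::IoCtx& io, librados::snap_t seq,
                      std::vector<librados::snap_t>& snaps)
{
  librados::ObjectWriteOperation op;
  librados::bufferlist bl;
  bl.append("payload", 7);
  op.write_full(bl);

  librados::AioCompletion *c = librados::Rados::aio_create_completion();
  io.aio_operate("foo", c, &op, seq, snaps);   // overload declared above
  c->wait_for_safe();
  c->release();
}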
int aio_operate(const std::string& oid, AioCompletion *c, ObjectReadOperation *op,
bufferlist *pbl);

View File

@ -31,13 +31,14 @@ extern "C" {
#define LIBRBD_VER_MAJOR 0
#define LIBRBD_VER_MINOR 1
#define LIBRBD_VER_EXTRA 7
#define LIBRBD_VER_EXTRA 8
#define LIBRBD_VERSION(maj, min, extra) ((maj << 16) + (min << 8) + extra)
#define LIBRBD_VERSION_CODE LIBRBD_VERSION(LIBRBD_VER_MAJOR, LIBRBD_VER_MINOR, LIBRBD_VER_EXTRA)
#define LIBRBD_SUPPORTS_WATCH 0
#define LIBRBD_SUPPORTS_AIO_FLUSH 1
typedef void *rbd_snap_t;
typedef void *rbd_image_t;
@ -318,10 +319,20 @@ int rbd_aio_write(rbd_image_t image, uint64_t off, size_t len, const char *buf,
int rbd_aio_read(rbd_image_t image, uint64_t off, size_t len, char *buf, rbd_completion_t c);
int rbd_aio_discard(rbd_image_t image, uint64_t off, uint64_t len, rbd_completion_t c);
int rbd_aio_create_completion(void *cb_arg, rbd_callback_t complete_cb, rbd_completion_t *c);
int rbd_aio_is_complete(rbd_completion_t c);
int rbd_aio_wait_for_complete(rbd_completion_t c);
ssize_t rbd_aio_get_return_value(rbd_completion_t c);
void rbd_aio_release(rbd_completion_t c);
int rbd_flush(rbd_image_t image);
/**
* Start a flush if caching is enabled. Get a callback when
* the currently pending writes are on disk.
*
* @param image the image to flush writes to
* @param c what to call when flushing is complete
* @returns 0 on success, negative error code on failure
*/
int rbd_aio_flush(rbd_image_t image, rbd_completion_t c);
#ifdef __cplusplus
}
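For illustration, a small sketch of the new call on an already-open image (error handling omitted):

#include <rbd/librbd.h>

static void flush_image_async(rbd_image_t image)
{
  rbd_completion_t flush_c;
  rbd_aio_create_completion(NULL, NULL, &flush_c);
  rbd_aio_flush(image, flush_c);            /* returns immediately */
  rbd_aio_wait_for_complete(flush_c);       /* or poll rbd_aio_is_complete() */
  rbd_aio_release(flush_c);
}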

View File

@ -63,6 +63,7 @@ public:
struct AioCompletion {
void *pc;
AioCompletion(void *cb_arg, callback_t complete_cb);
bool is_complete();
int wait_for_complete();
ssize_t get_return_value();
void release();
@ -180,6 +181,15 @@ public:
int aio_discard(uint64_t off, uint64_t len, RBD::AioCompletion *c);
int flush();
/**
* Start a flush if caching is enabled. Get a callback when
* the currently pending writes are on disk.
*
* @param c what to call when flushing is complete
* @returns 0 on success, negative error code on failure
*/
int aio_flush(RBD::AioCompletion *c);
private:
friend class RBD;
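The C++ counterpart, sketched after the FlushAioPP test added later in this commit (the image is assumed to be open already):

#include <rbd/librbd.hpp>

void flush_image(librbd::Image& image)
{
  librbd::RBD::AioCompletion *c = new librbd::RBD::AioCompletion(NULL, NULL);
  image.aio_flush(c);
  c->wait_for_complete();
  delete c;   // the FlushAioPP test deletes completions the same way
}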

View File

@ -196,6 +196,42 @@ struct C_AioSafe : public Context {
}
};
/**
* Fills in all completed request data, and calls both
* complete and safe callbacks if they exist.
*
* Not useful for usual I/O, but for special things like
* flush where we only want to wait for things to be safe,
* but allow users to specify any of the callbacks.
*/
struct C_AioCompleteAndSafe : public Context {
AioCompletionImpl *c;
C_AioCompleteAndSafe(AioCompletionImpl *cc) : c(cc) {
c->ref++;
}
void finish(int r) {
c->rval = r;
c->ack = true;
c->safe = true;
rados_callback_t cb_complete = c->callback_complete;
void *cb_arg = c->callback_arg;
if (cb_complete)
cb_complete(c, cb_arg);
rados_callback_t cb_safe = c->callback_safe;
if (cb_safe)
cb_safe(c, cb_arg);
c->lock.Lock();
c->callback_complete = NULL;
c->callback_safe = NULL;
c->cond.Signal();
c->put_unlock();
}
};
}
#endif

View File

@ -70,22 +70,58 @@ void librados::IoCtxImpl::queue_aio_write(AioCompletionImpl *c)
aio_write_list_lock.Lock();
assert(c->io == this);
c->aio_write_seq = ++aio_write_seq;
ldout(client->cct, 20) << "queue_aio_write " << this << " completion " << c
<< " write_seq " << aio_write_seq << dendl;
aio_write_list.push_back(&c->aio_write_list_item);
aio_write_list_lock.Unlock();
}
void librados::IoCtxImpl::complete_aio_write(AioCompletionImpl *c)
{
ldout(client->cct, 20) << "complete_aio_write " << c << dendl;
aio_write_list_lock.Lock();
assert(c->io == this);
c->aio_write_list_item.remove_myself();
// queue async flush waiters
map<tid_t, std::list<AioCompletionImpl*> >::iterator waiters =
aio_write_waiters.find(c->aio_write_seq);
if (waiters != aio_write_waiters.end()) {
ldout(client->cct, 20) << "found " << waiters->second.size()
<< " waiters" << dendl;
for (std::list<AioCompletionImpl*>::iterator it = waiters->second.begin();
it != waiters->second.end(); ++it) {
client->finisher.queue(new C_AioCompleteAndSafe(*it));
(*it)->put();
}
aio_write_waiters.erase(waiters);
} else {
ldout(client->cct, 20) << "found no waiters for tid "
<< c->aio_write_seq << dendl;
}
aio_write_cond.Signal();
aio_write_list_lock.Unlock();
put();
}
void librados::IoCtxImpl::flush_aio_writes_async(AioCompletionImpl *c)
{
ldout(client->cct, 20) << "flush_aio_writes_async " << this
<< " completion " << c << dendl;
Mutex::Locker l(aio_write_list_lock);
tid_t seq = aio_write_seq;
ldout(client->cct, 20) << "flush_aio_writes_async waiting on tid "
<< seq << dendl;
if (aio_write_list.empty()) {
client->finisher.queue(new C_AioCompleteAndSafe(c));
} else {
c->get();
aio_write_waiters[seq].push_back(c);
}
}
void librados::IoCtxImpl::flush_aio_writes()
{
ldout(client->cct, 20) << "flush_aio_writes" << dendl;
aio_write_list_lock.Lock();
tid_t seq = aio_write_seq;
while (!aio_write_list.empty() &&
@ -675,7 +711,8 @@ int librados::IoCtxImpl::aio_operate_read(const object_t &oid,
}
int librados::IoCtxImpl::aio_operate(const object_t& oid,
::ObjectOperation *o, AioCompletionImpl *c)
::ObjectOperation *o, AioCompletionImpl *c,
const SnapContext& snap_context)
{
utime_t ut = ceph_clock_now(client->cct);
/* can't write to a snapshot */
@ -689,13 +726,15 @@ int librados::IoCtxImpl::aio_operate(const object_t& oid,
queue_aio_write(c);
Mutex::Locker l(*lock);
objecter->mutate(oid, oloc, *o, snapc, ut, 0, onack, oncommit, &c->objver);
objecter->mutate(oid, oloc, *o, snap_context, ut, 0, onack, oncommit,
&c->objver);
return 0;
}
int librados::IoCtxImpl::aio_read(const object_t oid, AioCompletionImpl *c,
bufferlist *pbl, size_t len, uint64_t off)
bufferlist *pbl, size_t len, uint64_t off,
uint64_t snapid)
{
if (len > (size_t) INT_MAX)
return -EDOM;
@ -709,13 +748,14 @@ int librados::IoCtxImpl::aio_read(const object_t oid, AioCompletionImpl *c,
Mutex::Locker l(*lock);
objecter->read(oid, oloc,
off, len, snap_seq, &c->bl, 0,
off, len, snapid, &c->bl, 0,
onack, &c->objver);
return 0;
}
int librados::IoCtxImpl::aio_read(const object_t oid, AioCompletionImpl *c,
char *buf, size_t len, uint64_t off)
char *buf, size_t len, uint64_t off,
uint64_t snapid)
{
if (len > (size_t) INT_MAX)
return -EDOM;
@ -729,7 +769,7 @@ int librados::IoCtxImpl::aio_read(const object_t oid, AioCompletionImpl *c,
Mutex::Locker l(*lock);
objecter->read(oid, oloc,
off, len, snap_seq, &c->bl, 0,
off, len, snapid, &c->bl, 0,
onack, &c->objver);
return 0;
@ -739,7 +779,7 @@ int librados::IoCtxImpl::aio_sparse_read(const object_t oid,
AioCompletionImpl *c,
std::map<uint64_t,uint64_t> *m,
bufferlist *data_bl, size_t len,
uint64_t off)
uint64_t off, uint64_t snapid)
{
if (len > (size_t) INT_MAX)
return -EDOM;
@ -752,7 +792,7 @@ int librados::IoCtxImpl::aio_sparse_read(const object_t oid,
Mutex::Locker l(*lock);
objecter->sparse_read(oid, oloc,
off, len, snap_seq, &c->bl, 0,
off, len, snapid, &c->bl, 0,
onack);
return 0;
}

View File

@ -46,6 +46,7 @@ struct librados::IoCtxImpl {
tid_t aio_write_seq;
Cond aio_write_cond;
xlist<AioCompletionImpl*> aio_write_list;
map<tid_t, std::list<AioCompletionImpl*> > aio_write_waiters;
Mutex *lock;
Objecter *objecter;
@ -84,6 +85,7 @@ struct librados::IoCtxImpl {
void queue_aio_write(struct AioCompletionImpl *c);
void complete_aio_write(struct AioCompletionImpl *c);
void flush_aio_writes_async(AioCompletionImpl *c);
void flush_aio_writes();
int64_t get_id() {
@ -136,7 +138,8 @@ struct librados::IoCtxImpl {
int operate(const object_t& oid, ::ObjectOperation *o, time_t *pmtime);
int operate_read(const object_t& oid, ::ObjectOperation *o, bufferlist *pbl);
int aio_operate(const object_t& oid, ::ObjectOperation *o, AioCompletionImpl *c);
int aio_operate(const object_t& oid, ::ObjectOperation *o,
AioCompletionImpl *c, const SnapContext& snap_context);
int aio_operate_read(const object_t& oid, ::ObjectOperation *o, AioCompletionImpl *c, bufferlist *pbl);
struct C_aio_Ack : public Context {
@ -170,12 +173,12 @@ struct librados::IoCtxImpl {
};
int aio_read(const object_t oid, AioCompletionImpl *c,
bufferlist *pbl, size_t len, uint64_t off);
bufferlist *pbl, size_t len, uint64_t off, uint64_t snapid);
int aio_read(object_t oid, AioCompletionImpl *c,
char *buf, size_t len, uint64_t off);
char *buf, size_t len, uint64_t off, uint64_t snapid);
int aio_sparse_read(const object_t oid, AioCompletionImpl *c,
std::map<uint64_t,uint64_t> *m, bufferlist *data_bl,
size_t len, uint64_t off);
size_t len, uint64_t off, uint64_t snapid);
int aio_write(const object_t &oid, AioCompletionImpl *c,
const bufferlist& bl, size_t len, uint64_t off);
int aio_append(const object_t &oid, AioCompletionImpl *c,

View File

@ -855,10 +855,26 @@ int librados::IoCtx::operate(const std::string& oid, librados::ObjectReadOperati
return io_ctx_impl->operate_read(obj, (::ObjectOperation*)o->impl, pbl);
}
int librados::IoCtx::aio_operate(const std::string& oid, AioCompletion *c, librados::ObjectWriteOperation *o)
int librados::IoCtx::aio_operate(const std::string& oid, AioCompletion *c,
librados::ObjectWriteOperation *o)
{
object_t obj(oid);
return io_ctx_impl->aio_operate(obj, (::ObjectOperation*)o->impl, c->pc);
return io_ctx_impl->aio_operate(obj, (::ObjectOperation*)o->impl, c->pc,
io_ctx_impl->snapc);
}
int librados::IoCtx::aio_operate(const std::string& oid, AioCompletion *c,
librados::ObjectWriteOperation *o,
snap_t snap_seq, std::vector<snap_t>& snaps)
{
object_t obj(oid);
vector<snapid_t> snv;
snv.resize(snaps.size());
for (size_t i = 0; i < snaps.size(); ++i)
snv[i] = snaps[i];
SnapContext snapc(snap_seq, snv);
return io_ctx_impl->aio_operate(obj, (::ObjectOperation*)o->impl, c->pc,
snapc);
}
int librados::IoCtx::aio_operate(const std::string& oid, AioCompletion *c, librados::ObjectReadOperation *o, bufferlist *pbl)
@ -956,7 +972,15 @@ uint64_t librados::IoCtx::get_last_version()
int librados::IoCtx::aio_read(const std::string& oid, librados::AioCompletion *c,
bufferlist *pbl, size_t len, uint64_t off)
{
return io_ctx_impl->aio_read(oid, c->pc, pbl, len, off);
return io_ctx_impl->aio_read(oid, c->pc, pbl, len, off,
io_ctx_impl->snap_seq);
}
int librados::IoCtx::aio_read(const std::string& oid, librados::AioCompletion *c,
bufferlist *pbl, size_t len, uint64_t off,
uint64_t snapid)
{
return io_ctx_impl->aio_read(oid, c->pc, pbl, len, off, snapid);
}
int librados::IoCtx::aio_exec(const std::string& oid,
@ -973,7 +997,16 @@ int librados::IoCtx::aio_sparse_read(const std::string& oid, librados::AioComple
size_t len, uint64_t off)
{
return io_ctx_impl->aio_sparse_read(oid, c->pc,
m, data_bl, len, off);
m, data_bl, len, off,
io_ctx_impl->snap_seq);
}
int librados::IoCtx::aio_sparse_read(const std::string& oid, librados::AioCompletion *c,
std::map<uint64_t,uint64_t> *m, bufferlist *data_bl,
size_t len, uint64_t off, uint64_t snapid)
{
return io_ctx_impl->aio_sparse_read(oid, c->pc,
m, data_bl, len, off, snapid);
}
int librados::IoCtx::aio_write(const std::string& oid, librados::AioCompletion *c,
@ -1000,6 +1033,12 @@ int librados::IoCtx::aio_remove(const std::string& oid, librados::AioCompletion
return io_ctx_impl->aio_remove(oid, c->pc);
}
int librados::IoCtx::aio_flush_async(librados::AioCompletion *c)
{
io_ctx_impl->flush_aio_writes_async(c->pc);
return 0;
}
int librados::IoCtx::aio_flush()
{
io_ctx_impl->flush_aio_writes();
@ -2183,7 +2222,7 @@ extern "C" int rados_aio_read(rados_ioctx_t io, const char *o,
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
object_t oid(o);
return ctx->aio_read(oid, (librados::AioCompletionImpl*)completion,
buf, len, off);
buf, len, off, ctx->snap_seq);
}
extern "C" int rados_aio_write(rados_ioctx_t io, const char *o,
@ -2229,6 +2268,14 @@ extern "C" int rados_aio_remove(rados_ioctx_t io, const char *o,
return ctx->aio_remove(oid, (librados::AioCompletionImpl*)completion);
}
extern "C" int rados_aio_flush_async(rados_ioctx_t io,
rados_completion_t completion)
{
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
ctx->flush_aio_writes_async((librados::AioCompletionImpl*)completion);
return 0;
}
extern "C" int rados_aio_flush(rados_ioctx_t io)
{
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;

View File

@ -24,6 +24,7 @@ namespace librbd {
AIO_TYPE_READ = 0,
AIO_TYPE_WRITE,
AIO_TYPE_DISCARD,
AIO_TYPE_FLUSH,
AIO_TYPE_NONE,
} aio_type_t;
@ -104,12 +105,14 @@ namespace librbd {
complete_cb(rbd_comp, complete_arg);
}
switch (aio_type) {
case AIO_TYPE_READ:
case AIO_TYPE_READ:
ictx->perfcounter->tinc(l_librbd_aio_rd_latency, elapsed); break;
case AIO_TYPE_WRITE:
ictx->perfcounter->tinc(l_librbd_aio_wr_latency, elapsed); break;
case AIO_TYPE_DISCARD:
ictx->perfcounter->tinc(l_librbd_aio_discard_latency, elapsed); break;
case AIO_TYPE_FLUSH:
ictx->perfcounter->tinc(l_librbd_aio_flush_latency, elapsed); break;
default:
lderr(ictx->cct) << "completed invalid aio_type: " << aio_type << dendl;
break;
@ -125,6 +128,11 @@ namespace librbd {
void complete_request(CephContext *cct, ssize_t r);
bool is_complete() {
Mutex::Locker l(lock);
return done;
}
ssize_t get_return_value() {
lock.Lock();
ssize_t r = rval;

View File

@ -19,7 +19,7 @@
namespace librbd {
AioRequest::AioRequest() :
m_ictx(NULL),
m_ictx(NULL), m_ioctx(NULL),
m_object_no(0), m_object_off(0), m_object_len(0),
m_snap_id(CEPH_NOSNAP), m_completion(NULL), m_parent_completion(NULL),
m_hide_enoent(false) {}
@ -27,19 +27,11 @@ namespace librbd {
uint64_t objectno, uint64_t off, uint64_t len,
librados::snap_t snap_id,
Context *completion,
bool hide_enoent) {
m_ictx = ictx;
m_ioctx.dup(ictx->data_ctx);
m_ioctx.snap_set_read(snap_id);
m_oid = oid;
m_object_no = objectno;
m_object_off = off;
m_object_len = len;
m_snap_id = snap_id;
m_completion = completion;
m_parent_completion = NULL;
m_hide_enoent = hide_enoent;
}
bool hide_enoent) :
m_ictx(ictx), m_ioctx(&ictx->data_ctx), m_oid(oid), m_object_no(objectno),
m_object_off(off), m_object_len(len), m_snap_id(snap_id),
m_completion(completion), m_parent_completion(NULL),
m_hide_enoent(hide_enoent) {}
AioRequest::~AioRequest() {
if (m_parent_completion) {
@ -100,11 +92,13 @@ namespace librbd {
librados::Rados::aio_create_completion(this, rados_req_cb, NULL);
int r;
if (m_sparse) {
r = m_ioctx.aio_sparse_read(m_oid, rados_completion, &m_ext_map,
&m_read_data, m_object_len, m_object_off);
r = m_ioctx->aio_sparse_read(m_oid, rados_completion, &m_ext_map,
&m_read_data, m_object_len, m_object_off,
m_snap_id);
} else {
r = m_ioctx.aio_read(m_oid, rados_completion, &m_read_data,
m_object_len, m_object_off);
r = m_ioctx->aio_read(m_oid, rados_completion, &m_read_data,
m_object_len, m_object_off,
m_snap_id);
}
rados_completion->release();
return r;
@ -123,18 +117,16 @@ namespace librbd {
Context *completion,
bool hide_enoent)
: AioRequest(ictx, oid, object_no, object_off, len, snap_id, completion, hide_enoent),
m_state(LIBRBD_AIO_WRITE_FLAT)
m_state(LIBRBD_AIO_WRITE_FLAT), m_snap_seq(snapc.seq.val)
{
m_object_image_extents = objectx;
m_parent_overlap = object_overlap;
// TODO: find a way to make this less stupid
std::vector<librados::snap_t> snaps;
for (std::vector<snapid_t>::const_iterator it = snapc.snaps.begin();
it != snapc.snaps.end(); ++it) {
snaps.push_back(it->val);
m_snaps.push_back(it->val);
}
m_ioctx.selfmanaged_snap_set_write_ctx(snapc.seq.val, snaps);
}
void AbstractWrite::guard_write()
@ -235,7 +227,8 @@ namespace librbd {
librados::Rados::aio_create_completion(this, NULL, rados_req_cb);
int r;
assert(m_write.size());
r = m_ioctx.aio_operate(m_oid, rados_completion, &m_write);
r = m_ioctx->aio_operate(m_oid, rados_completion, &m_write,
m_snap_seq, m_snaps);
rados_completion->release();
return r;
}
@ -246,7 +239,8 @@ namespace librbd {
librados::AioCompletion *rados_completion =
librados::Rados::aio_create_completion(this, NULL, rados_req_cb);
m_ictx->md_ctx.aio_operate(m_oid, rados_completion, &m_copyup);
m_ictx->md_ctx.aio_operate(m_oid, rados_completion, &m_copyup,
m_snap_seq, m_snaps);
rados_completion->release();
}
}

View File

@ -49,7 +49,7 @@ namespace librbd {
void read_from_parent(vector<pair<uint64_t,uint64_t> >& image_extents);
ImageCtx *m_ictx;
librados::IoCtx m_ioctx;
librados::IoCtx *m_ioctx;
std::string m_oid;
uint64_t m_object_no, m_object_off, m_object_len;
librados::snap_t m_snap_id;
@ -69,7 +69,6 @@ namespace librbd {
: AioRequest(ictx, oid, objectno, offset, len, snap_id, completion, false),
m_buffer_extents(be),
m_tried_parent(false), m_sparse(sparse) {
m_ioctx.snap_set_read(m_snap_id);
}
virtual ~AioRead() {}
virtual bool should_complete(int r);
@ -138,6 +137,8 @@ namespace librbd {
uint64_t m_parent_overlap;
librados::ObjectWriteOperation m_write;
librados::ObjectWriteOperation m_copyup;
uint64_t m_snap_seq;
std::vector<librados::snap_t> m_snaps;
private:
void send_copyup();

View File

@ -89,7 +89,8 @@ namespace librbd {
10, /* reset this in init */
init_max_dirty,
cct->_conf->rbd_cache_target_dirty,
cct->_conf->rbd_cache_max_dirty_age);
cct->_conf->rbd_cache_max_dirty_age,
cct->_conf->rbd_cache_block_writes_upfront);
object_set = new ObjectCacher::ObjectSet(NULL, data_ctx.get_id(), 0);
object_set->return_enoent = true;
object_cacher->start();
@ -219,6 +220,8 @@ namespace librbd {
plb.add_u64_counter(l_librbd_aio_discard, "aio_discard");
plb.add_u64_counter(l_librbd_aio_discard_bytes, "aio_discard_bytes");
plb.add_time_avg(l_librbd_aio_discard_latency, "aio_discard_latency");
plb.add_u64_counter(l_librbd_aio_flush, "aio_flush");
plb.add_time_avg(l_librbd_aio_flush_latency, "aio_flush_latency");
plb.add_u64_counter(l_librbd_snap_create, "snap_create");
plb.add_u64_counter(l_librbd_snap_remove, "snap_remove");
plb.add_u64_counter(l_librbd_snap_rollback, "snap_rollback");
@ -472,7 +475,7 @@ namespace librbd {
}
void ImageCtx::write_to_cache(object_t o, bufferlist& bl, size_t len,
uint64_t off) {
uint64_t off, Context *onfinish) {
snap_lock.get_read();
ObjectCacher::OSDWrite *wr = object_cacher->prepare_write(snapc, bl,
utime_t(), 0);
@ -483,7 +486,7 @@ namespace librbd {
wr->extents.push_back(extent);
{
Mutex::Locker l(cache_lock);
object_cacher->writex(wr, object_set, cache_lock);
object_cacher->writex(wr, object_set, cache_lock, onfinish);
}
}
@ -521,24 +524,26 @@ namespace librbd {
}
}
void ImageCtx::flush_cache_aio(Context *onfinish) {
cache_lock.Lock();
object_cacher->flush_set(object_set, onfinish);
cache_lock.Unlock();
}
int ImageCtx::flush_cache() {
int r = 0;
Mutex mylock("librbd::ImageCtx::flush_cache");
Cond cond;
bool done;
Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &r);
cache_lock.Lock();
bool already_flushed = object_cacher->flush_set(object_set, onfinish);
cache_lock.Unlock();
if (!already_flushed) {
mylock.Lock();
while (!done) {
ldout(cct, 20) << "waiting for cache to be flushed" << dendl;
cond.Wait(mylock);
}
mylock.Unlock();
ldout(cct, 20) << "finished flushing cache" << dendl;
flush_cache_aio(onfinish);
mylock.Lock();
while (!done) {
ldout(cct, 20) << "waiting for cache to be flushed" << dendl;
cond.Wait(mylock);
}
mylock.Unlock();
ldout(cct, 20) << "finished flushing cache" << dendl;
return r;
}

View File

@ -128,9 +128,11 @@ namespace librbd {
uint64_t *overlap) const;
void aio_read_from_cache(object_t o, bufferlist *bl, size_t len,
uint64_t off, Context *onfinish);
void write_to_cache(object_t o, bufferlist& bl, size_t len, uint64_t off);
void write_to_cache(object_t o, bufferlist& bl, size_t len, uint64_t off,
Context *onfinish);
int read_from_cache(object_t o, bufferlist *bl, size_t len, uint64_t off);
void user_flushed();
void flush_cache_aio(Context *onfinish);
int flush_cache();
void shutdown_cache();
void invalidate_cache();

View File

@ -1898,6 +1898,12 @@ reprotect_and_return_err:
// ignore return value, since we may be set to a non-existent
// snapshot and the user is trying to fix that
ictx_check(ictx);
if (ictx->object_cacher) {
// complete pending writes before we're set to a snapshot and
// get -EROFS for writes
RWLock::WLocker l(ictx->md_lock);
ictx->flush_cache();
}
return _snap_set(ictx, snap_name);
}
@ -2435,6 +2441,12 @@ reprotect_and_return_err:
req->complete(rados_aio_get_return_value(c));
}
void rados_ctx_cb(rados_completion_t c, void *arg)
{
Context *comp = reinterpret_cast<Context *>(arg);
comp->complete(rados_aio_get_return_value(c));
}
// validate extent against image size; clip to image size if necessary
int clip_io(ImageCtx *ictx, uint64_t off, uint64_t *len)
{
@ -2463,6 +2475,36 @@ reprotect_and_return_err:
return 0;
}
int aio_flush(ImageCtx *ictx, AioCompletion *c)
{
CephContext *cct = ictx->cct;
ldout(cct, 20) << "aio_flush " << ictx << " completion " << c << dendl;
int r = ictx_check(ictx);
if (r < 0)
return r;
ictx->user_flushed();
c->get();
c->add_request();
c->init_time(ictx, AIO_TYPE_FLUSH);
C_AioWrite *req_comp = new C_AioWrite(cct, c);
if (ictx->object_cacher) {
ictx->flush_cache_aio(req_comp);
} else {
librados::AioCompletion *rados_completion =
librados::Rados::aio_create_completion(req_comp, NULL, rados_ctx_cb);
ictx->data_ctx.aio_flush_async(rados_completion);
rados_completion->release();
}
c->finish_adding_requests(cct);
c->put();
ictx->perfcounter->inc(l_librbd_aio_flush);
return 0;
}
int flush(ImageCtx *ictx)
{
CephContext *cct = ictx->cct;
@ -2544,9 +2586,10 @@ reprotect_and_return_err:
bl.append(buf + q->first, q->second);
}
C_AioWrite *req_comp = new C_AioWrite(cct, c);
if (ictx->object_cacher) {
// may block
ictx->write_to_cache(p->oid, bl, p->length, p->offset);
c->add_request();
ictx->write_to_cache(p->oid, bl, p->length, p->offset, req_comp);
} else {
// reverse map this object extent onto the parent
vector<pair<uint64_t,uint64_t> > objectx;
@ -2555,7 +2598,6 @@ reprotect_and_return_err:
objectx);
uint64_t object_overlap = ictx->prune_parent_extents(objectx, overlap);
C_AioWrite *req_comp = new C_AioWrite(cct, c);
AioWrite *req = new AioWrite(ictx, p->oid.name, p->objectno, p->offset,
objectx, object_overlap,
bl, snapc, snap_id, req_comp);

View File

@ -37,6 +37,8 @@ enum {
l_librbd_aio_discard,
l_librbd_aio_discard_bytes,
l_librbd_aio_discard_latency,
l_librbd_aio_flush,
l_librbd_aio_flush_latency,
l_librbd_snap_create,
l_librbd_snap_remove,
@ -177,6 +179,7 @@ namespace librbd {
char *buf, bufferlist *pbl, AioCompletion *c);
int aio_read(ImageCtx *ictx, const vector<pair<uint64_t,uint64_t> >& image_extents,
char *buf, bufferlist *pbl, AioCompletion *c);
int aio_flush(ImageCtx *ictx, AioCompletion *c);
int flush(ImageCtx *ictx);
int _flush(ImageCtx *ictx);

View File

@ -181,6 +181,12 @@ namespace librbd {
c->rbd_comp = this;
}
bool RBD::AioCompletion::is_complete()
{
librbd::AioCompletion *c = (librbd::AioCompletion *)pc;
return c->is_complete();
}
int RBD::AioCompletion::wait_for_complete()
{
librbd::AioCompletion *c = (librbd::AioCompletion *)pc;
@ -475,6 +481,12 @@ namespace librbd {
return librbd::flush(ictx);
}
int Image::aio_flush(RBD::AioCompletion *c)
{
ImageCtx *ictx = (ImageCtx *)ctx;
return librbd::aio_flush(ictx, (librbd::AioCompletion *)c->pc);
}
} // namespace librbd
extern "C" void rbd_version(int *major, int *minor, int *extra)
@ -1060,6 +1072,19 @@ extern "C" int rbd_flush(rbd_image_t image)
return librbd::flush(ictx);
}
extern "C" int rbd_aio_flush(rbd_image_t image, rbd_completion_t c)
{
librbd::ImageCtx *ictx = (librbd::ImageCtx *)image;
librbd::RBD::AioCompletion *comp = (librbd::RBD::AioCompletion *)c;
return librbd::aio_flush(ictx, (librbd::AioCompletion *)comp->pc);
}
extern "C" int rbd_aio_is_complete(rbd_completion_t c)
{
librbd::RBD::AioCompletion *comp = (librbd::RBD::AioCompletion *)c;
return comp->is_complete();
}
extern "C" int rbd_aio_wait_for_complete(rbd_completion_t c)
{
librbd::RBD::AioCompletion *comp = (librbd::RBD::AioCompletion *)c;

View File

@ -49,6 +49,7 @@ void PGPool::update(OSDMapRef map)
assert(pi);
info = *pi;
auid = pi->auid;
name = map->get_pool_name(id);
if (pi->get_snap_epoch() == map->get_epoch()) {
pi->build_removed_snaps(newly_removed_snaps);
newly_removed_snaps.subtract(cached_removed_snaps);

View File

@ -116,7 +116,7 @@ struct PGRecoveryStats {
};
struct PGPool {
int id;
int64_t id;
string name;
uint64_t auid;
@ -126,7 +126,7 @@ struct PGPool {
interval_set<snapid_t> cached_removed_snaps; // current removed_snaps set
interval_set<snapid_t> newly_removed_snaps; // newly removed in the last epoch
PGPool(int i, const char *_name, uint64_t au) :
PGPool(int64_t i, const char *_name, uint64_t au) :
id(i), auid(au) {
if (_name)
name = _name;

View File

@ -492,22 +492,26 @@ ObjectCacher::ObjectCacher(CephContext *cct_, string name, WritebackHandler& wb,
flush_set_callback_t flush_callback,
void *flush_callback_arg,
uint64_t max_bytes, uint64_t max_objects,
uint64_t max_dirty, uint64_t target_dirty, double max_dirty_age)
uint64_t max_dirty, uint64_t target_dirty,
double max_dirty_age, bool block_writes_upfront)
: perfcounter(NULL),
cct(cct_), writeback_handler(wb), name(name), lock(l),
max_dirty(max_dirty), target_dirty(target_dirty),
max_size(max_bytes), max_objects(max_objects),
block_writes_upfront(block_writes_upfront),
flush_set_callback(flush_callback), flush_set_callback_arg(flush_callback_arg),
flusher_stop(false), flusher_thread(this),
flusher_stop(false), flusher_thread(this), finisher(cct),
stat_clean(0), stat_zero(0), stat_dirty(0), stat_rx(0), stat_tx(0), stat_missing(0),
stat_error(0), stat_dirty_waiting(0)
{
this->max_dirty_age.set_from_double(max_dirty_age);
perf_start();
finisher.start();
}
ObjectCacher::~ObjectCacher()
{
finisher.stop();
perf_stop();
// we should be empty.
for (vector<hash_map<sobject_t, Object *> >::iterator i = objects.begin();
@ -1206,7 +1210,8 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
}
int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Mutex& wait_on_lock)
int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Mutex& wait_on_lock,
Context *onfreespace)
{
assert(lock.is_locked());
utime_t now = ceph_clock_now(cct);
@ -1273,53 +1278,83 @@ int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Mutex& wait_on_lock)
}
}
int r = _wait_for_write(wr, bytes_written, oset, wait_on_lock);
int r = _wait_for_write(wr, bytes_written, oset, wait_on_lock, onfreespace);
delete wr;
//verify_stats();
trim();
return r;
}
// blocking wait for write.
int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset, Mutex& lock)
void ObjectCacher::C_WaitForWrite::finish(int r)
{
Mutex::Locker l(m_oc->lock);
m_oc->maybe_wait_for_writeback(m_len);
m_onfinish->complete(r);
}
void ObjectCacher::maybe_wait_for_writeback(uint64_t len)
{
assert(lock.is_locked());
int blocked = 0;
utime_t start = ceph_clock_now(cct);
int blocked = 0;
// wait for writeback?
// - wait for dirty and tx bytes (relative to the max_dirty threshold)
// - do not wait for bytes other waiters are waiting on. this means that
// threads do not wait for each other. this effectively allows the cache
// size to balloon proportional to the data that is in flight.
while (get_stat_dirty() + get_stat_tx() >= max_dirty + get_stat_dirty_waiting()) {
ldout(cct, 10) << __func__ << " waiting for dirty|tx "
<< (get_stat_dirty() + get_stat_tx()) << " >= max "
<< max_dirty << " + dirty_waiting "
<< get_stat_dirty_waiting() << dendl;
flusher_cond.Signal();
stat_dirty_waiting += len;
stat_cond.Wait(lock);
stat_dirty_waiting -= len;
++blocked;
ldout(cct, 10) << __func__ << " woke up" << dendl;
}
if (blocked && perfcounter) {
perfcounter->inc(l_objectcacher_write_ops_blocked);
perfcounter->inc(l_objectcacher_write_bytes_blocked, len);
utime_t blocked = ceph_clock_now(cct) - start;
perfcounter->tinc(l_objectcacher_write_time_blocked, blocked);
}
}
// blocking wait for write.
int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset, Mutex& lock, Context *onfreespace)
{
assert(lock.is_locked());
int ret = 0;
if (max_dirty > 0) {
// wait for writeback?
// - wait for dirty and tx bytes (relative to the max_dirty threshold)
// - do not wait for bytes other waiters are waiting on. this means that
// threads do not wait for each other. this effectively allows the cache size
// to balloon proportional to the data that is in flight.
while (get_stat_dirty() + get_stat_tx() >= max_dirty + get_stat_dirty_waiting()) {
ldout(cct, 10) << "wait_for_write waiting on " << len << ", dirty|tx "
<< (get_stat_dirty() + get_stat_tx())
<< " >= max " << max_dirty << " + dirty_waiting " << get_stat_dirty_waiting()
<< dendl;
flusher_cond.Signal();
stat_dirty_waiting += len;
stat_cond.Wait(lock);
stat_dirty_waiting -= len;
blocked++;
ldout(cct, 10) << "wait_for_write woke up" << dendl;
if (block_writes_upfront) {
maybe_wait_for_writeback(len);
if (onfreespace)
onfreespace->complete(0);
} else {
assert(onfreespace);
finisher.queue(new C_WaitForWrite(this, len, onfreespace));
}
} else {
// write-thru! flush what we just wrote.
Cond cond;
bool done;
C_Cond *fin = new C_Cond(&cond, &done, &ret);
Context *fin = block_writes_upfront ?
new C_Cond(&cond, &done, &ret) : onfreespace;
assert(fin);
bool flushed = flush_set(oset, wr->extents, fin);
assert(!flushed); // we just dirtied it, and didn't drop our lock!
ldout(cct, 10) << "wait_for_write waiting on write-thru of " << len << " bytes" << dendl;
while (!done)
cond.Wait(lock);
ldout(cct, 10) << "wait_for_write woke up, ret " << ret << dendl;
if (block_writes_upfront) {
while (!done)
cond.Wait(lock);
ldout(cct, 10) << "wait_for_write woke up, ret " << ret << dendl;
if (onfreespace)
onfreespace->complete(ret);
}
}
// start writeback anyway?
@ -1328,12 +1363,6 @@ int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset, M
<< target_dirty << ", nudging flusher" << dendl;
flusher_cond.Signal();
}
if (blocked && perfcounter) {
perfcounter->inc(l_objectcacher_write_ops_blocked);
perfcounter->inc(l_objectcacher_write_bytes_blocked, len);
utime_t blocked = ceph_clock_now(cct) - start;
perfcounter->tinc(l_objectcacher_write_time_blocked, blocked);
}
return ret;
}
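To make the two modes concrete, a hedged caller-side sketch against the internal API (the ObjectCacher, ObjectSet, OSDWrite and the cacher's own lock are assumed to be set up as in the ObjectCacher stress test at the end of this commit):

#include "osdc/ObjectCacher.h"
#include "common/Cond.h"

// With block_writes_upfront=true, writex() may block in
// maybe_wait_for_writeback() and completes onfreespace before returning.
// With it false, writex() returns immediately and onfreespace is queued on
// the cacher's Finisher (via C_WaitForWrite) once the dirty+tx total drops
// below max_dirty (or, in write-through mode, once the flush commits).
// Either way the wait below terminates.
int write_and_wait(ObjectCacher *oc, ObjectCacher::ObjectSet *oset,
                   ObjectCacher::OSDWrite *wr, Mutex& cacher_lock)
{
  Mutex mylock("write_and_wait");
  Cond cond;
  bool done = false;
  int ret = 0;
  Context *onfreespace = new C_SafeCond(&mylock, &cond, &done, &ret);

  cacher_lock.Lock();
  oc->writex(wr, oset, cacher_lock, onfreespace);
  cacher_lock.Unlock();

  mylock.Lock();
  while (!done)
    cond.Wait(mylock);
  mylock.Unlock();
  return ret;
}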
@ -1471,42 +1500,41 @@ bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length)
bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
{
assert(lock.is_locked());
assert(onfinish != NULL);
if (oset->objects.empty()) {
ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
delete onfinish;
onfinish->complete(0);
return true;
}
ldout(cct, 10) << "flush_set " << oset << dendl;
// we'll need to wait for all objects to flush!
C_GatherBuilder gather(cct, onfinish);
C_GatherBuilder gather(cct);
bool safe = true;
for (xlist<Object*>::iterator i = oset->objects.begin();
!i.end(); ++i) {
Object *ob = *i;
if (!flush(ob, 0, 0)) {
// we'll need to gather...
safe = false;
ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
<< ob->last_write_tid
<< " on " << *ob
<< dendl;
if (onfinish != NULL)
ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
}
}
if (onfinish != NULL)
if (gather.has_subs()) {
gather.set_finisher(onfinish);
gather.activate();
if (safe) {
return false;
} else {
ldout(cct, 10) << "flush_set " << oset << " has no dirty|tx bhs" << dendl;
onfinish->complete(0);
return true;
}
return false;
}
// flush. non-blocking, takes callback.
@ -1514,18 +1542,18 @@ bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv, Context *onfinish)
{
assert(lock.is_locked());
assert(onfinish != NULL);
if (oset->objects.empty()) {
ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
delete onfinish;
onfinish->complete(0);
return true;
}
ldout(cct, 10) << "flush_set " << oset << " on " << exv.size() << " ObjectExtents" << dendl;
// we'll need to wait for all objects to flush!
C_GatherBuilder gather(cct, onfinish);
C_GatherBuilder gather(cct);
bool safe = true;
for (vector<ObjectExtent>::iterator p = exv.begin();
p != exv.end();
++p) {
@ -1539,22 +1567,21 @@ bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv, Context
if (!flush(ob, ex.offset, ex.length)) {
// we'll need to gather...
safe = false;
ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
<< ob->last_write_tid << " on " << *ob << dendl;
if (onfinish != NULL)
ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
}
}
if (onfinish != NULL)
if (gather.has_subs()) {
gather.set_finisher(onfinish);
gather.activate();
if (safe) {
return false;
} else {
ldout(cct, 10) << "flush_set " << oset << " has no dirty|tx bhs" << dendl;
onfinish->complete(0);
return true;
}
return false;
}
void ObjectCacher::purge_set(ObjectSet *oset)

View File

@ -9,6 +9,7 @@
#include "include/xlist.h"
#include "common/Cond.h"
#include "common/Finisher.h"
#include "common/Thread.h"
#include "Objecter.h"
@ -327,6 +328,7 @@ class ObjectCacher {
int64_t max_dirty, target_dirty, max_size, max_objects;
utime_t max_dirty_age;
bool block_writes_upfront;
flush_set_callback_t flush_set_callback;
void *flush_set_callback_arg;
@ -349,7 +351,8 @@ class ObjectCacher {
return 0;
}
} flusher_thread;
Finisher finisher;
// objects
Object *get_object_maybe(sobject_t oid, object_locator_t &l) {
@ -495,6 +498,17 @@ class ObjectCacher {
}
};
class C_WaitForWrite : public Context {
public:
C_WaitForWrite(ObjectCacher *oc, uint64_t len, Context *onfinish) :
m_oc(oc), m_len(len), m_onfinish(onfinish) {}
void finish(int r);
private:
ObjectCacher *m_oc;
uint64_t m_len;
Context *m_onfinish;
};
void perf_start();
void perf_stop();
@ -504,7 +518,8 @@ class ObjectCacher {
flush_set_callback_t flush_callback,
void *flush_callback_arg,
uint64_t max_bytes, uint64_t max_objects,
uint64_t max_dirty, uint64_t target_dirty, double max_age);
uint64_t max_dirty, uint64_t target_dirty, double max_age,
bool block_writes_upfront);
~ObjectCacher();
void start() {
@ -549,13 +564,16 @@ class ObjectCacher {
* the return value is total bytes read
*/
int readx(OSDRead *rd, ObjectSet *oset, Context *onfinish);
int writex(OSDWrite *wr, ObjectSet *oset, Mutex& wait_on_lock);
int writex(OSDWrite *wr, ObjectSet *oset, Mutex& wait_on_lock,
Context *onfreespace);
bool is_cached(ObjectSet *oset, vector<ObjectExtent>& extents, snapid_t snapid);
private:
// write blocking
int _wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset, Mutex& lock);
int _wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset, Mutex& lock,
Context *onfreespace);
void maybe_wait_for_writeback(uint64_t len);
public:
bool set_is_cached(ObjectSet *oset);
bool set_is_dirty_or_committing(ObjectSet *oset);
@ -624,7 +642,7 @@ public:
Mutex& wait_on_lock) {
OSDWrite *wr = prepare_write(snapc, bl, mtime, flags);
Striper::file_to_extents(cct, oset->ino, layout, offset, len, wr->extents);
return writex(wr, oset, wait_on_lock);
return writex(wr, oset, wait_on_lock, NULL);
}
bool file_flush(ObjectSet *oset, ceph_file_layout *layout, const SnapContext& snapc,

View File

@ -677,6 +677,86 @@ TEST(LibRadosAio, FlushPP) {
delete my_completion2;
}
TEST(LibRadosAio, FlushAsync) {
AioTestData test_data;
rados_completion_t my_completion;
ASSERT_EQ("", test_data.init());
ASSERT_EQ(0, rados_aio_create_completion((void*)&test_data,
set_completion_complete, set_completion_safe, &my_completion));
rados_completion_t flush_completion;
ASSERT_EQ(0, rados_aio_create_completion(NULL, NULL, NULL, &flush_completion));
char buf[128];
memset(buf, 0xee, sizeof(buf));
ASSERT_EQ(0, rados_aio_write(test_data.m_ioctx, "foo",
my_completion, buf, sizeof(buf), 0));
ASSERT_EQ(0, rados_aio_flush_async(test_data.m_ioctx, flush_completion));
{
TestAlarm alarm;
ASSERT_EQ(0, rados_aio_wait_for_complete(flush_completion));
ASSERT_EQ(0, rados_aio_wait_for_safe(flush_completion));
}
ASSERT_EQ(1, rados_aio_is_complete(my_completion));
ASSERT_EQ(1, rados_aio_is_safe(my_completion));
ASSERT_EQ(1, rados_aio_is_complete(flush_completion));
ASSERT_EQ(1, rados_aio_is_safe(flush_completion));
char buf2[128];
memset(buf2, 0, sizeof(buf2));
rados_completion_t my_completion2;
ASSERT_EQ(0, rados_aio_create_completion((void*)&test_data,
set_completion_complete, set_completion_safe, &my_completion2));
ASSERT_EQ(0, rados_aio_read(test_data.m_ioctx, "foo",
my_completion2, buf2, sizeof(buf2), 0));
{
TestAlarm alarm;
ASSERT_EQ(0, rados_aio_wait_for_complete(my_completion2));
}
ASSERT_EQ(0, memcmp(buf, buf2, sizeof(buf)));
rados_aio_release(my_completion);
rados_aio_release(my_completion2);
rados_aio_release(flush_completion);
}
TEST(LibRadosAio, FlushAsyncPP) {
AioTestDataPP test_data;
ASSERT_EQ("", test_data.init());
AioCompletion *my_completion = test_data.m_cluster.aio_create_completion(
(void*)&test_data, set_completion_complete, set_completion_safe);
AioCompletion *flush_completion =
test_data.m_cluster.aio_create_completion(NULL, NULL, NULL);
AioCompletion *my_completion_null = NULL;
ASSERT_NE(my_completion, my_completion_null);
char buf[128];
memset(buf, 0xee, sizeof(buf));
bufferlist bl1;
bl1.append(buf, sizeof(buf));
ASSERT_EQ(0, test_data.m_ioctx.aio_write("foo", my_completion,
bl1, sizeof(buf), 0));
ASSERT_EQ(0, test_data.m_ioctx.aio_flush_async(flush_completion));
{
TestAlarm alarm;
ASSERT_EQ(0, flush_completion->wait_for_complete());
ASSERT_EQ(0, flush_completion->wait_for_safe());
}
ASSERT_EQ(1, my_completion->is_complete());
ASSERT_EQ(1, my_completion->is_safe());
ASSERT_EQ(1, flush_completion->is_complete());
ASSERT_EQ(1, flush_completion->is_safe());
bufferlist bl2;
AioCompletion *my_completion2 = test_data.m_cluster.aio_create_completion(
(void*)&test_data, set_completion_complete, set_completion_safe);
ASSERT_NE(my_completion2, my_completion_null);
ASSERT_EQ(0, test_data.m_ioctx.aio_read("foo", my_completion2,
&bl2, sizeof(buf), 0));
{
TestAlarm alarm;
ASSERT_EQ(0, my_completion2->wait_for_complete());
}
ASSERT_EQ(0, memcmp(buf, bl2.c_str(), sizeof(buf)));
delete my_completion;
delete my_completion2;
delete flush_completion;
}
TEST(LibRadosAio, RoundTripWriteFull) {
AioTestData test_data;
rados_completion_t my_completion, my_completion2, my_completion3;

View File

@ -1391,3 +1391,106 @@ TEST(LibRBD, LockingPP)
ioctx.close();
ASSERT_EQ(0, destroy_one_pool_pp(pool_name, rados));
}
TEST(LibRBD, FlushAio)
{
rados_t cluster;
rados_ioctx_t ioctx;
string pool_name = get_temp_pool_name();
ASSERT_EQ("", create_one_pool(pool_name, &cluster));
rados_ioctx_create(cluster, pool_name.c_str(), &ioctx);
rbd_image_t image;
int order = 0;
const char *name = "testimg";
uint64_t size = 2 << 20;
size_t num_aios = 256;
ASSERT_EQ(0, create_image(ioctx, name, size, &order));
ASSERT_EQ(0, rbd_open(ioctx, name, &image, NULL));
char test_data[TEST_IO_SIZE + 1];
size_t i;
for (i = 0; i < TEST_IO_SIZE; ++i) {
test_data[i] = (char) (rand() % (126 - 33) + 33);
}
rbd_completion_t write_comps[num_aios];
for (i = 0; i < num_aios; ++i) {
ASSERT_EQ(0, rbd_aio_create_completion(NULL, NULL, &write_comps[i]));
uint64_t offset = rand() % (size - TEST_IO_SIZE);
ASSERT_EQ(0, rbd_aio_write(image, offset, TEST_IO_SIZE, test_data,
write_comps[i]));
}
rbd_completion_t flush_comp;
ASSERT_EQ(0, rbd_aio_create_completion(NULL, NULL, &flush_comp));
ASSERT_EQ(0, rbd_aio_flush(image, flush_comp));
ASSERT_EQ(0, rbd_aio_wait_for_complete(flush_comp));
ASSERT_EQ(1, rbd_aio_is_complete(flush_comp));
rbd_aio_release(flush_comp);
for (i = 0; i < num_aios; ++i) {
ASSERT_EQ(1, rbd_aio_is_complete(write_comps[i]));
rbd_aio_release(write_comps[i]);
}
ASSERT_EQ(0, rbd_close(image));
ASSERT_EQ(0, rbd_remove(ioctx, name));
rados_ioctx_destroy(ioctx);
ASSERT_EQ(0, destroy_one_pool(pool_name, &cluster));
}
TEST(LibRBD, FlushAioPP)
{
librados::Rados rados;
librados::IoCtx ioctx;
string pool_name = get_temp_pool_name();
ASSERT_EQ("", create_one_pool_pp(pool_name, rados));
ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx));
{
librbd::RBD rbd;
librbd::Image image;
int order = 0;
const char *name = "testimg";
uint64_t size = 2 << 20;
size_t num_aios = 256;
ASSERT_EQ(0, create_image_pp(rbd, ioctx, name, size, &order));
ASSERT_EQ(0, rbd.open(ioctx, image, name, NULL));
char test_data[TEST_IO_SIZE + 1];
size_t i;
for (i = 0; i < TEST_IO_SIZE; ++i) {
test_data[i] = (char) (rand() % (126 - 33) + 33);
}
librbd::RBD::AioCompletion *write_comps[num_aios];
for (i = 0; i < num_aios; ++i) {
ceph::bufferlist bl;
bl.append(test_data, strlen(test_data));
write_comps[i] = new librbd::RBD::AioCompletion(NULL, NULL);
uint64_t offset = rand() % (size - TEST_IO_SIZE);
ASSERT_EQ(0, image.aio_write(offset, TEST_IO_SIZE, bl,
write_comps[i]));
}
librbd::RBD::AioCompletion *flush_comp =
new librbd::RBD::AioCompletion(NULL, NULL);
ASSERT_EQ(0, image.aio_flush(flush_comp));
ASSERT_EQ(0, flush_comp->wait_for_complete());
ASSERT_EQ(1, flush_comp->is_complete());
delete flush_comp;
for (i = 0; i < num_aios; ++i) {
librbd::RBD::AioCompletion *comp = write_comps[i];
ASSERT_EQ(1, comp->is_complete());
delete comp;
}
}
ioctx.close();
ASSERT_EQ(0, destroy_one_pool_pp(pool_name, rados));
}

View File

@ -61,7 +61,8 @@ int stress_test(uint64_t num_ops, uint64_t num_objs,
g_conf->client_oc_max_objects,
g_conf->client_oc_max_dirty,
g_conf->client_oc_target_dirty,
g_conf->client_oc_max_dirty_age);
g_conf->client_oc_max_dirty_age,
true);
obc.start();
atomic_t outstanding_reads;
@ -110,7 +111,7 @@ int stress_test(uint64_t num_ops, uint64_t num_objs,
ObjectCacher::OSDWrite *wr = obc.prepare_write(snapc, bl, utime_t(), 0);
wr->extents.push_back(op->extent);
lock.Lock();
obc.writex(wr, &object_set, lock);
obc.writex(wr, &object_set, lock, NULL);
lock.Unlock();
}
}
@ -146,13 +147,11 @@ int stress_test(uint64_t num_ops, uint64_t num_objs,
bool already_flushed = obc.flush_set(&object_set, onfinish);
std::cout << "already flushed = " << already_flushed << std::endl;
lock.Unlock();
if (!already_flushed) {
mylock.Lock();
while (!done) {
cond.Wait(mylock);
}
mylock.Unlock();
mylock.Lock();
while (!done) {
cond.Wait(mylock);
}
mylock.Unlock();
lock.Lock();
bool unclean = obc.release_set(&object_set);