librbd: refactor synchronous I/O

Write in terms of the asynchronous functions, so all the logic
is not duplicated. Now there's only a single point where each
operation needs to change for layering.

Signed-off-by: Josh Durgin <josh.durgin@inktank.com>
This commit is contained in:
Josh Durgin 2012-07-23 17:28:49 -07:00
parent ff2a96c7c7
commit c6bc3e1030
2 changed files with 91 additions and 129 deletions

View File

@ -1583,6 +1583,12 @@ namespace librbd {
lock_holder, cookie);
}
void rbd_ctx_cb(completion_t cb, void *arg)
{
Context *ctx = reinterpret_cast<Context *>(arg);
AioCompletion *comp = reinterpret_cast<AioCompletion *>(cb);
ctx->complete(comp->get_return_value());
}
int64_t read_iterate(ImageCtx *ictx, uint64_t off, size_t len,
int (*cb)(uint64_t, size_t, const char *, void *),
@ -1601,7 +1607,6 @@ namespace librbd {
if (r < 0)
return r;
int64_t ret;
int64_t total_read = 0;
ictx->lock.Lock();
uint64_t start_block = get_block_num(ictx->order, off);
@ -1612,51 +1617,49 @@ namespace librbd {
start_time = ceph_clock_now(ictx->cct);
for (uint64_t i = start_block; i <= end_block; i++) {
bufferlist bl;
ictx->lock.Lock();
string oid = get_block_oid(ictx->object_prefix, i, ictx->old_format);
uint64_t block_ofs = get_block_ofs(ictx->order, off + total_read);
ictx->lock.Unlock();
uint64_t total_off = off + total_read;
uint64_t block_ofs = get_block_ofs(ictx->order, total_off);
uint64_t read_len = min(block_size - block_ofs, left);
uint64_t bytes_read;
buffer::ptr bp(read_len);
bufferlist bl;
bl.append(bp);
if (ictx->object_cacher) {
r = ictx->read_from_cache(oid, &bl, read_len, block_ofs);
if (r < 0 && r != -ENOENT)
return r;
Mutex mylock("IoCtxImpl::write::mylock");
Cond cond;
bool done;
int ret;
if (r == -ENOENT)
r = cb(total_read, read_len, NULL, arg);
else
r = cb(total_read, read_len, bl.c_str(), arg);
bytes_read = read_len; // ObjectCacher pads with zeroes at end of object
} else {
map<uint64_t, uint64_t> m;
r = ictx->data_ctx.sparse_read(oid, m, bl, read_len, block_ofs);
if (r < 0 && r == -ENOENT)
r = 0;
if (r < 0) {
return r;
}
r = handle_sparse_read(ictx->cct, bl, block_ofs, m, total_read,
read_len, cb, arg);
bytes_read = r;
}
Context *ctx = new C_SafeCond(&mylock, &cond, &done, &ret);
AioCompletion *c = aio_create_completion_internal(ctx, rbd_ctx_cb);
r = aio_read(ictx, total_off, read_len, bl.c_str(), c);
if (r < 0) {
c->release();
delete ctx;
return r;
}
total_read += bytes_read;
left -= bytes_read;
mylock.Lock();
while (!done)
cond.Wait(mylock);
mylock.Unlock();
c->release();
if (ret < 0)
return ret;
r = cb(total_read, ret, bl.c_str(), arg);
if (r < 0)
return r;
total_read += ret;
left -= ret;
}
ret = total_read;
elapsed = ceph_clock_now(ictx->cct) - start_time;
ictx->perfcounter->finc(l_librbd_rd_latency, elapsed);
ictx->perfcounter->inc(l_librbd_rd);
ictx->perfcounter->inc(l_librbd_rd_bytes, len);
return ret;
return total_read;
}
int simple_read_cb(uint64_t ofs, size_t len, const char *buf, void *arg)
@ -1670,7 +1673,6 @@ namespace librbd {
return 0;
}
ssize_t read(ImageCtx *ictx, uint64_t ofs, size_t len, char *buf)
{
return read_iterate(ictx, ofs, len, simple_read_cb, buf);
@ -1682,56 +1684,35 @@ namespace librbd {
ldout(ictx->cct, 20) << "write " << ictx << " off = " << off << " len = "
<< len << dendl;
if (!len)
return 0;
int r = ictx_check(ictx);
if (r < 0)
return r;
r = check_io(ictx, off, len);
if (r < 0)
return r;
size_t total_write = 0;
ictx->lock.Lock();
uint64_t start_block = get_block_num(ictx->order, off);
uint64_t end_block = get_block_num(ictx->order, off + len - 1);
uint64_t block_size = get_block_size(ictx->order);
snapid_t snap = ictx->snap_id;
ictx->lock.Unlock();
uint64_t left = len;
if (snap != CEPH_NOSNAP)
return -EROFS;
start_time = ceph_clock_now(ictx->cct);
for (uint64_t i = start_block; i <= end_block; i++) {
bufferlist bl;
ictx->lock.Lock();
string oid = get_block_oid(ictx->object_prefix, i, ictx->old_format);
uint64_t block_ofs = get_block_ofs(ictx->order, off + total_write);
ictx->lock.Unlock();
uint64_t write_len = min(block_size - block_ofs, left);
bl.append(buf + total_write, write_len);
if (ictx->object_cacher) {
ictx->write_to_cache(oid, bl, write_len, block_ofs);
} else {
r = ictx->data_ctx.write(oid, bl, write_len, block_ofs);
if (r < 0)
return r;
if ((uint64_t)r != write_len)
return -EIO;
}
total_write += write_len;
left -= write_len;
Mutex mylock("librbd::write::mylock");
Cond cond;
bool done;
int ret;
Context *ctx = new C_SafeCond(&mylock, &cond, &done, &ret);
AioCompletion *c = aio_create_completion_internal(ctx, rbd_ctx_cb);
int r = aio_write(ictx, off, len, buf, c);
if (r < 0) {
c->release();
delete ctx;
return r;
}
mylock.Lock();
while (!done)
cond.Wait(mylock);
mylock.Unlock();
c->release();
if (ret < 0)
return ret;
elapsed = ceph_clock_now(ictx->cct) - start_time;
ictx->perfcounter->finc(l_librbd_wr_latency, elapsed);
ictx->perfcounter->inc(l_librbd_wr);
ictx->perfcounter->inc(l_librbd_wr_bytes, total_write);
return total_write;
ictx->perfcounter->inc(l_librbd_wr_bytes, len);
return len;
}
int discard(ImageCtx *ictx, uint64_t off, uint64_t len)
@ -1740,64 +1721,35 @@ namespace librbd {
ldout(ictx->cct, 20) << "discard " << ictx << " off = " << off << " len = "
<< len << dendl;
if (!len)
return 0;
int r = ictx_check(ictx);
if (r < 0)
return r;
r = check_io(ictx, off, len);
if (r < 0)
return r;
size_t total_write = 0;
ictx->lock.Lock();
uint64_t start_block = get_block_num(ictx->order, off);
uint64_t end_block = get_block_num(ictx->order, off + len - 1);
uint64_t block_size = get_block_size(ictx->order);
ictx->lock.Unlock();
uint64_t left = len;
vector<ObjectExtent> v;
if (ictx->object_cacher)
v.reserve(end_block - start_block + 1);
start_time = ceph_clock_now(ictx->cct);
for (uint64_t i = start_block; i <= end_block; i++) {
ictx->lock.Lock();
string oid = get_block_oid(ictx->object_prefix, i, ictx->old_format);
uint64_t block_ofs = get_block_ofs(ictx->order, off + total_write);
ictx->lock.Unlock();
uint64_t write_len = min(block_size - block_ofs, left);
Mutex mylock("librbd::discard::mylock");
Cond cond;
bool done;
int ret;
if (ictx->object_cacher) {
v.push_back(ObjectExtent(oid, block_ofs, write_len));
v.back().oloc.pool = ictx->data_ctx.get_id();
}
librados::ObjectWriteOperation write_op;
if (block_ofs == 0 && write_len == block_size)
write_op.remove();
else if (write_len + block_ofs == block_size)
write_op.truncate(block_ofs);
else
write_op.zero(block_ofs, write_len);
r = ictx->data_ctx.operate(oid, &write_op);
if (r < 0)
return r;
total_write += write_len;
left -= write_len;
Context *ctx = new C_SafeCond(&mylock, &cond, &done, &ret);
AioCompletion *c = aio_create_completion_internal(ctx, rbd_ctx_cb);
int r = aio_discard(ictx, off, len, c);
if (r < 0) {
c->release();
delete ctx;
return r;
}
if (ictx->object_cacher)
ictx->object_cacher->discard_set(ictx->object_set, v);
mylock.Lock();
while (!done)
cond.Wait(mylock);
mylock.Unlock();
c->release();
if (ret < 0)
return ret;
elapsed = ceph_clock_now(ictx->cct) - start_time;
ictx->perfcounter->inc(l_librbd_discard_latency, elapsed);
ictx->perfcounter->inc(l_librbd_discard);
ictx->perfcounter->inc(l_librbd_discard_bytes, total_write);
return total_write;
ictx->perfcounter->inc(l_librbd_discard_bytes, len);
return len;
}
ssize_t handle_sparse_read(CephContext *cct,
@ -2005,6 +1957,7 @@ namespace librbd {
if (r < 0)
return r;
// TODO: check for snap
size_t total_write = 0;
ictx->lock.Lock();
uint64_t start_block = get_block_num(ictx->order, off);
@ -2166,4 +2119,11 @@ namespace librbd {
c->set_complete_cb(cb_arg, cb_complete);
return c;
}
AioCompletion *aio_create_completion_internal(void *cb_arg,
callback_t cb_complete) {
AioCompletion *c = aio_create_completion(cb_arg, cb_complete);
c->rbd_comp = c;
return c;
}
}

View File

@ -180,6 +180,8 @@ namespace librbd {
AioCompletion *aio_create_completion();
AioCompletion *aio_create_completion(void *cb_arg, callback_t cb_complete);
AioCompletion *aio_create_completion_internal(void *cb_arg,
callback_t cb_complete);
// raw callbacks
int simple_read_cb(uint64_t ofs, size_t len, const char *buf, void *arg);