librbd: implement read/write from parent images

Put the completion handling logic into new subclasses of
librbd::AioRequest, so the caching/non-caching paths can share
logic. These AioRequests replace AioBlockCompletion as representing
the I/O to a single object in an RBD image.

Signed-off-by: Josh Durgin <josh.durgin@inktank.com>
This commit is contained in:
Josh Durgin 2012-07-24 10:13:39 -07:00
parent c6bc3e1030
commit 90dc565022
11 changed files with 962 additions and 324 deletions

View File

@ -351,6 +351,7 @@ endif
librbd_la_SOURCES = \
librbd/librbd.cc \
librbd/AioCompletion.cc \
librbd/AioRequest.cc \
librbd/cls_rbd_client.cc \
librbd/ImageCtx.cc \
librbd/internal.cc \
@ -1377,6 +1378,7 @@ noinst_HEADERS = \
librados/PoolAsyncCompletionImpl.h\
librados/RadosClient.h\
librbd/AioCompletion.h\
librbd/AioRequest.h\
librbd/cls_rbd.h\
librbd/cls_rbd_client.h\
librbd/ImageCtx.h\

View File

@ -6,6 +6,7 @@
#include "common/ceph_context.h"
#include "common/dout.h"
#include "librbd/AioRequest.h"
#include "librbd/internal.h"
#include "librbd/AioCompletion.h"
@ -16,20 +17,9 @@
namespace librbd {
void AioBlockCompletion::finish(int r)
void AioCompletion::complete_request(CephContext *cct, ssize_t r)
{
ldout(cct, 10) << "AioBlockCompletion::finish()" << dendl;
if ((r >= 0 || r == -ENOENT) && buf) { // this was a sparse_read operation
ldout(cct, 10) << "ofs=" << ofs << " len=" << len << dendl;
r = handle_sparse_read(cct, data_bl, ofs, m, 0, len, simple_read_cb, buf);
}
completion->complete_block(this, r);
}
void AioCompletion::complete_block(AioBlockCompletion *block_completion, ssize_t r)
{
CephContext *cct = block_completion->cct;
ldout(cct, 20) << "AioCompletion::complete_block() this="
ldout(cct, 20) << "AioCompletion::complete_request() this="
<< (void *)this << " complete_cb=" << (void *)complete_cb << dendl;
lock.Lock();
if (rval >= 0) {
@ -45,4 +35,23 @@ namespace librbd {
}
put_unlock();
}
// Completion for the read of a single object on behalf of an AioCompletion.
// Both success (r >= 0) and a missing object (-ENOENT) carry usable data:
// the extent map plus zero-filled holes are flattened into the caller's
// flat output buffer via handle_sparse_read().
void C_AioRead::finish(int r)
{
  ldout(m_cct, 10) << "C_AioRead::finish() " << this << dendl;
  if (r >= 0 || r == -ENOENT) { // this was a sparse_read operation
    ldout(m_cct, 10) << "ofs=" << m_req->offset()
		     << " len=" << m_req->length() << dendl;
    // copy the sparse extents into m_out_buf, zeroing any holes
    r = handle_sparse_read(m_cct, m_req->data(), m_req->offset(),
			   m_req->ext_map(), 0, m_req->length(),
			   simple_read_cb, m_out_buf);
  }
  // account for this request in the overall I/O completion
  m_completion->complete_request(m_cct, r);
}
// Completion for a read served through the object cacher: forward the
// result to the wrapped context, then free the AioRead, which this
// context owns (the request was only used to populate the cache).
void C_CacheRead::finish(int r)
{
  m_completion->complete(r);
  delete m_req; // C_CacheRead owns the request
}
}

View File

@ -5,7 +5,9 @@
#include "common/Cond.h"
#include "common/Mutex.h"
#include "common/ceph_context.h"
#include "common/perf_counters.h"
#include "include/Context.h"
#include "include/utime.h"
#include "include/rbd/librbd.hpp"
@ -13,13 +15,27 @@
#include "librbd/internal.h"
namespace librbd {
class AioRead;
typedef enum {
AIO_TYPE_READ = 0,
AIO_TYPE_WRITE,
AIO_TYPE_DISCARD
} aio_type_t;
struct AioBlockCompletion;
/**
* AioCompletion is the overall completion for a single
* rbd I/O request. It may be composed of many AioRequests,
* which each go to a single object.
*
* The retrying of individual requests is handled at a lower level,
* so all AioCompletion cares about is the count of outstanding
* requests. Note that this starts at 1 to prevent the reference
* count from reaching 0 while more requests are being added. When
* all requests have been added, finish_adding_requests() releases
* this initial reference.
*/
struct AioCompletion {
Mutex lock;
Cond cond;
@ -51,14 +67,14 @@ namespace librbd {
return 0;
}
void add_block_completion(AioBlockCompletion *aio_completion) {
// Register one more in-flight request with this completion.
// Takes a reference so the AioCompletion stays alive until the
// request reports back through complete_request().
void add_request() {
  lock.Lock();
  pending_count++;
  lock.Unlock();
  get();
}
void finish_adding_completions() {
void finish_adding_requests() {
lock.Lock();
assert(pending_count);
int count = --pending_count;
@ -99,7 +115,7 @@ namespace librbd {
complete_arg = cb_arg;
}
void complete_block(AioBlockCompletion *block_completion, ssize_t r);
void complete_request(CephContext *cct, ssize_t r);
ssize_t get_return_value() {
lock.Lock();
@ -133,22 +149,44 @@ namespace librbd {
}
};
struct AioBlockCompletion : Context {
CephContext *cct;
AioCompletion *completion;
uint64_t ofs;
size_t len;
char *buf;
std::map<uint64_t,uint64_t> m;
ceph::bufferlist data_bl;
librados::ObjectWriteOperation write_op;
AioBlockCompletion(CephContext *cct_, AioCompletion *aio_completion,
uint64_t _ofs, size_t _len, char *_buf)
: cct(cct_), completion(aio_completion),
ofs(_ofs), len(_len), buf(_buf) {}
virtual ~AioBlockCompletion() {}
/**
 * Completion context for a single-object read that is part of a larger
 * AioCompletion. finish() flattens the sparse-read result into the
 * caller's buffer and then reports to the parent AioCompletion.
 */
class C_AioRead : public Context {
public:
  C_AioRead(CephContext *cct, AioCompletion *completion, char *out_buf)
    // initialize m_req to NULL so a finish() before set_req() fails
    // loudly instead of dereferencing an uninitialized pointer
    : m_cct(cct), m_completion(completion), m_req(NULL), m_out_buf(out_buf) {}
  virtual ~C_AioRead() {}
  virtual void finish(int r);
  // must be called before the request is sent; finish() uses m_req
  void set_req(AioRead *req) {
    m_req = req;
  }
private:
  CephContext *m_cct;
  AioCompletion *m_completion;
  AioRead *m_req;
  char *m_out_buf;
};
/**
 * Completion context for a single-object write that is part of a
 * larger AioCompletion; simply forwards the result to the parent
 * AioCompletion's per-request accounting.
 */
class C_AioWrite : public Context {
public:
  C_AioWrite(CephContext *cct, AioCompletion *completion)
    : m_cct(cct), m_completion(completion) {}
  virtual ~C_AioWrite() {}
  virtual void finish(int r) {
    m_completion->complete_request(m_cct, r);
  }
private:
  CephContext *m_cct;
  AioCompletion *m_completion;
};
/**
 * Completion context used when a read goes through the object cacher.
 * Owns the AioRead it wraps; finish() (defined in AioCompletion.cc)
 * completes the wrapped context and deletes the request.
 */
class C_CacheRead : public Context {
public:
  C_CacheRead(Context *completion, AioRead *req)
    : m_completion(completion), m_req(req) {}
  virtual ~C_CacheRead() {}
  virtual void finish(int r);
private:
  Context *m_completion;
  AioRead *m_req;
};
}

205
src/librbd/AioRequest.cc Normal file
View File

@ -0,0 +1,205 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#include "common/ceph_context.h"
#include "common/dout.h"
#include "common/Mutex.h"
#include "librbd/AioCompletion.h"
#include "librbd/ImageCtx.h"
#include "librbd/internal.h"
#include "librbd/AioRequest.h"
#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
#define dout_prefix *_dout << "librbd::AioRequest: "
namespace librbd {
AioRequest::AioRequest() {}

// Set up an I/O against one object. A private IoCtx is duplicated from
// the image's data context so this request's snapshot read context does
// not race with other requests sharing the image's IoCtx.
AioRequest::AioRequest(ImageCtx *ictx, const std::string &oid,
		       uint64_t image_ofs, size_t len,
		       librados::snap_t snap_id,
		       Context *completion) {
  m_ictx = ictx;
  m_ioctx.dup(ictx->data_ctx);
  m_ioctx.snap_set_read(snap_id);
  m_oid = oid;
  m_image_ofs = image_ofs;
  // offset of this I/O within the object, derived from the image offset
  m_block_ofs = get_block_ofs(ictx->order, image_ofs);
  m_len = len;
  m_snap_id = snap_id;
  m_completion = completion;
  m_parent_completion = NULL;
}

AioRequest::~AioRequest() {
  // release our ref on any parent-read completion still held
  if (m_parent_completion) {
    m_parent_completion->release();
    m_parent_completion = NULL;
  }
}
// Issue a read of [image_ofs, image_ofs + len) against the parent image.
// Caller must hold parent_lock and must already have sized m_read_data
// to at least len bytes — the parent read writes into that buffer.
// NOTE(review): passing m_read_data.c_str() assumes the bufferlist's
// backing memory stays contiguous and stable for the whole aio;
// confirm against aio_read()'s buffer contract.
void AioRequest::read_from_parent(uint64_t image_ofs, size_t len)
{
  ldout(m_ictx->cct, 20) << "read_from_parent this = " << this << dendl;
  assert(!m_parent_completion);
  assert(m_ictx->parent_lock.is_locked());
  // rbd_req_cb re-enters this request's completion path when the
  // parent read finishes
  m_parent_completion = aio_create_completion_internal(this, rbd_req_cb);
  aio_read(m_ictx->parent, image_ofs, len, m_read_data.c_str(),
	   m_parent_completion);
}
// Decide whether this read is finished. A read that fails with -ENOENT
// on an image with a parent may be satisfiable from the parent; in that
// case a parent read is started and the request stays in flight
// (returns false). The parent is only tried once (m_tried_parent).
bool AioRead::should_complete(int r)
{
  ldout(m_ictx->cct, 20) << "read should_complete: r = " << r << dendl;

  if (!m_tried_parent && r == -ENOENT) {
    // lock ordering: snap_lock before parent_lock
    Mutex::Locker l(m_ictx->snap_lock);
    Mutex::Locker l2(m_ictx->parent_lock);
    // portion of this request covered by the parent overlap (0 = none)
    size_t len = m_ictx->parent_io_len(m_image_ofs, m_len, m_snap_id);
    if (len) {
      m_tried_parent = true;
      // zero the buffer so we have the full requested length result,
      // even if we actually read less due to overlap
      ceph::buffer::ptr bp(len);
      bp.zero();
      m_read_data.append(bp);
      // fill in single extent for sparse read callback
      m_ext_map[m_block_ofs] = len;
      read_from_parent(m_image_ofs, len);
      return false; // still in flight, waiting on the parent read
    }
  }

  return true;
}
int AioRead::send() {
librados::AioCompletion *rados_completion =
librados::Rados::aio_create_completion(this, rados_req_cb, NULL);
int r;
if (m_sparse) {
r = m_ioctx.aio_sparse_read(m_oid, rados_completion, &m_ext_map,
&m_read_data, m_len, m_block_ofs);
} else {
r = m_ioctx.aio_read(m_oid, rados_completion, &m_read_data,
m_len, m_block_ofs);
}
rados_completion->release();
return r;
}
// Default-construct in a well-defined state: the original left
// m_state and m_has_parent uninitialized.
AbstractWrite::AbstractWrite()
  : m_state(LIBRBD_AIO_WRITE_FINAL), m_has_parent(false) {}

// Set up a modifying operation on one object. Captures the snapshot
// context so the write is applied with the correct snap history.
AbstractWrite::AbstractWrite(ImageCtx *ictx, const std::string &oid,
			     uint64_t image_ofs, size_t len,
			     librados::snap_t snap_id, Context *completion,
			     bool has_parent, const ::SnapContext &snapc)
  : AioRequest(ictx, oid, image_ofs, len, snap_id, completion)
{
  m_state = LIBRBD_AIO_WRITE_FINAL;
  m_has_parent = has_parent;

  // TODO: find a way to make this less stupid
  std::vector<librados::snap_t> snaps;
  snaps.reserve(snapc.snaps.size()); // single allocation
  for (std::vector<snapid_t>::const_iterator it = snapc.snaps.begin();
       it != snapc.snaps.end(); ++it) {
    snaps.push_back(it->val);
  }
  m_ioctx.selfmanaged_snap_set_write_ctx(snapc.seq.val, snaps);
}
// Called by subclasses before installing their final op: if the image
// has a parent, first check whether the target object exists (stat),
// switching the state machine to CHECK_EXISTS so a missing object can
// be copied up from the parent before the write is applied.
void AbstractWrite::guard_write()
{
  if (m_has_parent) {
    m_state = LIBRBD_AIO_WRITE_CHECK_EXISTS;
    m_read.stat(NULL, NULL, NULL);
  }
  ldout(m_ictx->cct, 20) << __func__ << " m_has_parent = " << m_has_parent
			 << " m_state = " << m_state << " check exists = "
			 << LIBRBD_AIO_WRITE_CHECK_EXISTS << dendl;
}
// Advance the write state machine (diagrammed in AioRequest.h).
// Returns true when the request is fully finished, false while another
// step (existence check, copy-up, or final write) is still in flight.
bool AbstractWrite::should_complete(int r)
{
  ldout(m_ictx->cct, 20) << "write " << this << " should_complete: r = "
			 << r << dendl;

  bool finished = true;
  switch (m_state) {
  case LIBRBD_AIO_WRITE_CHECK_EXISTS:
    ldout(m_ictx->cct, 20) << "WRITE_CHECK_EXISTS" << dendl;
    // any error other than "object doesn't exist" ends the request
    if (r < 0 && r != -ENOENT) {
      ldout(m_ictx->cct, 20) << "error checking for object existence" << dendl;
      break;
    }
    finished = false;
    if (r == -ENOENT) {
      // lock ordering: snap_lock before parent_lock
      Mutex::Locker l(m_ictx->snap_lock);
      Mutex::Locker l2(m_ictx->parent_lock);
      // copyup the entire object up to the overlap point
      uint64_t block_begin = m_image_ofs - m_block_ofs;
      size_t len = m_ictx->parent_io_len(block_begin,
					 get_block_size(m_ictx->order),
					 m_snap_id);
      if (len) {
	ldout(m_ictx->cct, 20) << "reading from parent" << dendl;
	m_state = LIBRBD_AIO_WRITE_COPYUP;
	ceph::buffer::ptr bp(len);
	m_read_data.append(bp);
	read_from_parent(block_begin, len);
	break;
      }
    }
    // object exists, or no parent data covers it: write directly
    ldout(m_ictx->cct, 20) << "no need to read from parent" << dendl;
    m_state = LIBRBD_AIO_WRITE_FINAL;
    send();
    break;
  case LIBRBD_AIO_WRITE_COPYUP:
    ldout(m_ictx->cct, 20) << "WRITE_COPYUP" << dendl;
    m_state = LIBRBD_AIO_WRITE_FINAL;
    // a failed parent read fails the request: re-dispatch with the
    // state already advanced so the FINAL case finishes immediately
    if (r < 0)
      return should_complete(r);
    send_copyup();
    finished = false;
    break;
  case LIBRBD_AIO_WRITE_FINAL:
    ldout(m_ictx->cct, 20) << "WRITE_FINAL" << dendl;
    // nothing to do
    break;
  default:
    lderr(m_ictx->cct) << "invalid request state: " << m_state << dendl;
    assert(0);
  }

  return finished;
}
int AbstractWrite::send() {
librados::AioCompletion *rados_completion =
librados::Rados::aio_create_completion(this, NULL, rados_req_cb);
int r;
if (m_state == LIBRBD_AIO_WRITE_CHECK_EXISTS) {
assert(m_read.size());
r = m_ioctx.aio_operate(m_oid, rados_completion, &m_read, &m_read_data);
} else {
assert(m_write.size());
r = m_ioctx.aio_operate(m_oid, rados_completion, &m_write);
}
rados_completion->release();
return r;
}
// Push the parent data read earlier into the child object via the
// "rbd" class "copyup" operation, with the subclass's own ops appended
// so copyup + write happen as one atomic object operation.
// Goes through md_ctx rather than the snap-bound per-request IoCtx.
void AbstractWrite::send_copyup() {
  m_copyup.exec("rbd", "copyup", m_read_data);
  add_copyup_ops();
  librados::AioCompletion *rados_completion =
    librados::Rados::aio_create_completion(this, NULL, rados_req_cb);
  m_ictx->md_ctx.aio_operate(m_oid, rados_completion, &m_copyup);
  rados_completion->release();
}
}

222
src/librbd/AioRequest.h Normal file
View File

@ -0,0 +1,222 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#ifndef CEPH_LIBRBD_AIOREQUEST_H
#define CEPH_LIBRBD_AIOREQUEST_H
#include <map>
#include "inttypes.h"
#include "common/snap_types.h"
#include "include/buffer.h"
#include "include/Context.h"
#include "include/rados/librados.hpp"
namespace librbd {
class AioCompletion;
class ImageCtx;
/**
* This class represents an I/O operation to a single RBD data object.
* Its subclasses encapsulate logic for dealing with special cases
* for I/O due to layering.
*/
/**
 * This class represents an I/O operation to a single RBD data object.
 * Its subclasses encapsulate logic for dealing with special cases
 * for I/O due to layering.
 */
class AioRequest
{
public:
  AioRequest();
  AioRequest(ImageCtx *ictx, const std::string &oid, uint64_t image_ofs,
	     size_t len, librados::snap_t snap_id, Context *completion);
  virtual ~AioRequest();

  /// offset of this I/O within the object
  uint64_t offset()
  {
    return m_block_ofs;
  }
  /// length of this I/O in bytes
  size_t length()
  {
    return m_len;
  }

  /// Called when the underlying rados op finishes; completes the user
  /// callback and frees this request once should_complete() reports
  /// that no further steps remain.
  void complete(int r)
  {
    if (should_complete(r)) {
      m_completion->complete(r);
      delete this;
    }
  }

  /// return true if the request is done, false if another step
  /// (e.g. a read from the parent image) was started
  virtual bool should_complete(int r) = 0;
  /// submit the rados operation; returns a rados error code
  virtual int send() = 0;

protected:
  /// issue a parent-image read; caller must hold parent_lock
  void read_from_parent(uint64_t image_ofs, size_t len);

  ImageCtx *m_ictx;
  librados::IoCtx m_ioctx;   ///< private dup of the image's data ctx
  std::string m_oid;
  uint64_t m_image_ofs;      ///< offset within the whole image
  uint64_t m_block_ofs;      ///< offset within the object
  size_t m_len;
  librados::snap_t m_snap_id;
  Context *m_completion;
  AioCompletion *m_parent_completion; ///< in-flight parent read, if any
  ceph::bufferlist m_read_data;
};
/**
 * A read from one object. If the object does not exist and the image
 * has a parent, the overlapping portion is read from the parent
 * instead (see should_complete()).
 */
class AioRead : public AioRequest {
public:
  AioRead(ImageCtx *ictx, const std::string &oid, uint64_t image_ofs,
	  size_t len, librados::snap_t snap_id, bool sparse,
	  Context *completion)
    : AioRequest(ictx, oid, image_ofs, len, snap_id, completion),
      m_tried_parent(false), m_sparse(sparse) {
    // NOTE(review): the base constructor already calls snap_set_read()
    // with the same snap id on m_ioctx; this looks redundant — confirm
    m_ioctx.snap_set_read(m_snap_id);
  }
  virtual ~AioRead() {}
  virtual bool should_complete(int r);
  virtual int send();

  /// data read, valid once the request completes
  ceph::bufferlist &data() {
    return m_read_data;
  }
  /// extent map filled in by a sparse read
  std::map<uint64_t, uint64_t> &ext_map() {
    return m_ext_map;
  }

private:
  std::map<uint64_t, uint64_t> m_ext_map;
  bool m_tried_parent; ///< true once a parent read has been issued
  bool m_sparse;       ///< use sparse-read instead of a plain read
};
/**
 * Base class for operations that modify one object. Handles the
 * copy-up sequence needed when the object may be backed by a parent
 * image: subclasses install their final op in m_write and implement
 * add_copyup_ops() to replay that op on top of copied-up parent data.
 */
class AbstractWrite : public AioRequest {
public:
  AbstractWrite();
  AbstractWrite(ImageCtx *ictx, const std::string &oid, uint64_t image_ofs,
		size_t len, librados::snap_t snap_id, Context *completion,
		bool has_parent, const ::SnapContext &snapc);
  virtual ~AbstractWrite() {}
  virtual bool should_complete(int r);
  virtual int send();
  /// switch to the existence-check state if the image has a parent
  void guard_write();

private:
  /**
   * Writes go through the following state machine to
   * deal with layering:
   *                                need copyup
   * LIBRBD_AIO_WRITE_CHECK_EXISTS -------------> LIBRBD_AIO_WRITE_COPYUP
   *           |                                           |
   *           | no overlap or object exists               | parent data read
   *           |                                           |
   *           v                                           |
   * LIBRBD_AIO_WRITE_FINAL <------------------------------/
   *
   * By default writes start in LIBRBD_AIO_WRITE_FINAL.
   * If the write may need a copyup, it will start in
   * LIBRBD_AIO_WRITE_CHECK_EXISTS instead.
   */
  enum write_state_d {
    LIBRBD_AIO_WRITE_CHECK_EXISTS,
    LIBRBD_AIO_WRITE_COPYUP,
    LIBRBD_AIO_WRITE_FINAL
  };

protected:
  virtual void add_copyup_ops() = 0;

  write_state_d m_state;
  bool m_has_parent;
  librados::ObjectReadOperation m_read;    ///< existence check (stat)
  librados::ObjectWriteOperation m_write;  ///< the final operation
  librados::ObjectWriteOperation m_copyup; ///< copyup + final op combined

private:
  void send_copyup();
};
/**
 * A plain write to one object. If the object may be backed by a
 * parent, the write is guarded by an existence check (guard_write())
 * so missing objects are copied up first.
 */
class AioWrite : public AbstractWrite {
public:
  AioWrite(ImageCtx *ictx, const std::string &oid, uint64_t image_ofs,
	   const ceph::bufferlist &data, const ::SnapContext &snapc,
	   librados::snap_t snap_id, bool has_parent, Context *completion)
    : AbstractWrite(ictx, oid, image_ofs, data.length(), snap_id, completion,
		    has_parent, snapc),
      m_write_data(data) {
    guard_write();
    m_write.write(m_block_ofs, data);
  }
  virtual ~AioWrite() {}

protected:
  virtual void add_copyup_ops() {
    // replay the write on top of the copied-up parent data
    m_copyup.write(m_block_ofs, m_write_data);
  }

private:
  ceph::bufferlist m_write_data;
};
/**
 * Removal of one object. When the image has a parent the object is
 * truncated to zero length instead of removed — presumably so the
 * object continues to mask parent data; confirm against the layering
 * design before relying on this.
 */
class AioRemove : public AbstractWrite {
public:
  AioRemove(ImageCtx *ictx, const std::string &oid, uint64_t image_ofs,
	    const ::SnapContext &snapc, librados::snap_t snap_id,
	    bool has_parent, Context *completion)
    : AbstractWrite(ictx, oid, image_ofs, 0, snap_id, completion,
		    has_parent, snapc) {
    if (has_parent)
      m_write.truncate(0);
    else
      m_write.remove();
  }
  virtual ~AioRemove() {}

protected:
  virtual void add_copyup_ops() {
    // removing an object never needs to copyup
    assert(0);
  }
};
/**
 * Truncation of one object at this request's offset within it.
 * Guarded like a write so a missing object backed by a parent is
 * copied up before being truncated.
 */
class AioTruncate : public AbstractWrite {
public:
  AioTruncate(ImageCtx *ictx, const std::string &oid, uint64_t image_ofs,
	      const ::SnapContext &snapc, librados::snap_t snap_id,
	      bool has_parent, Context *completion)
    : AbstractWrite(ictx, oid, image_ofs, 0, snap_id, completion,
		    has_parent, snapc) {
    guard_write();
    m_write.truncate(m_block_ofs);
  }
  virtual ~AioTruncate() {}

protected:
  virtual void add_copyup_ops() {
    m_copyup.truncate(m_block_ofs);
  }
};
/**
 * Zeroing of a byte range within one object. Guarded like a write so
 * a missing object backed by a parent is copied up first.
 */
class AioZero : public AbstractWrite {
public:
  AioZero(ImageCtx *ictx, const std::string &oid, uint64_t image_ofs,
	  size_t len, const ::SnapContext &snapc, librados::snap_t snap_id,
	  bool has_parent, Context *completion)
    : AbstractWrite(ictx, oid, image_ofs, len, snap_id, completion,
		    has_parent, snapc) {
    guard_write();
    m_write.zero(m_block_ofs, len);
  }
  virtual ~AioZero() {}

protected:
  virtual void add_copyup_ops() {
    m_copyup.zero(m_block_ofs, m_len);
  }
};
}
#endif

View File

@ -38,9 +38,11 @@ namespace librbd {
wctx(NULL),
refresh_seq(0),
last_refresh(0),
refresh_lock("librbd::ImageCtx::refresh_lock"),
lock("librbd::ImageCtx::lock"),
md_lock("librbd::ImageCtx::md_lock"),
cache_lock("librbd::ImageCtx::cache_lock"),
snap_lock("librbd::ImageCtx::snap_lock"),
parent_lock("librbd::ImageCtx::parent_lock"),
refresh_lock("librbd::ImageCtx::refresh_lock"),
old_format(true),
order(0), size(0), features(0), id(image_id), parent(NULL),
object_cacher(NULL), writeback_handler(NULL), object_set(NULL)
@ -60,7 +62,7 @@ namespace librbd {
if (cct->_conf->rbd_cache) {
Mutex::Locker l(cache_lock);
ldout(cct, 20) << "enabling writeback caching..." << dendl;
writeback_handler = new LibrbdWriteback(data_ctx, cache_lock);
writeback_handler = new LibrbdWriteback(this, cache_lock);
object_cacher = new ObjectCacher(cct, pname, *writeback_handler, cache_lock,
NULL, NULL,
cct->_conf->rbd_cache_size,
@ -165,6 +167,7 @@ namespace librbd {
int ImageCtx::snap_set(string in_snap_name)
{
assert(snap_lock.is_locked());
map<string, SnapInfo>::iterator it = snaps_by_name.find(in_snap_name);
if (it != snaps_by_name.end()) {
snap_name = in_snap_name;
@ -178,6 +181,7 @@ namespace librbd {
void ImageCtx::snap_unset()
{
assert(snap_lock.is_locked());
snap_id = CEPH_NOSNAP;
snap_name = "";
snap_exists = true;
@ -186,6 +190,7 @@ namespace librbd {
snap_t ImageCtx::get_snap_id(string in_snap_name) const
{
assert(snap_lock.is_locked());
map<string, SnapInfo>::const_iterator it = snaps_by_name.find(in_snap_name);
if (it != snaps_by_name.end())
return it->second.id;
@ -194,6 +199,7 @@ namespace librbd {
int ImageCtx::get_snap_name(snapid_t in_snap_id, string *out_snap_name) const
{
assert(snap_lock.is_locked());
map<string, SnapInfo>::const_iterator it;
for (it = snaps_by_name.begin(); it != snaps_by_name.end(); it++) {
@ -207,6 +213,7 @@ namespace librbd {
int ImageCtx::get_snap_size(string in_snap_name, uint64_t *out_size) const
{
assert(snap_lock.is_locked());
map<string, SnapInfo>::const_iterator it = snaps_by_name.find(in_snap_name);
if (it != snaps_by_name.end()) {
*out_size = it->second.size;
@ -219,6 +226,7 @@ namespace librbd {
uint64_t features,
cls_client::parent_info parent)
{
assert(snap_lock.is_locked());
snaps.push_back(id);
SnapInfo info(id, in_size, features, parent);
snaps_by_name.insert(pair<string, SnapInfo>(in_snap_name, info));
@ -226,6 +234,8 @@ namespace librbd {
uint64_t ImageCtx::get_image_size(snap_t in_snap_id) const
{
assert(md_lock.is_locked());
assert(snap_lock.is_locked());
if (in_snap_id == CEPH_NOSNAP) {
return size;
}
@ -241,6 +251,8 @@ namespace librbd {
int ImageCtx::get_features(snap_t in_snap_id, uint64_t *out_features) const
{
assert(md_lock.is_locked());
assert(snap_lock.is_locked());
if (in_snap_id == CEPH_NOSNAP) {
*out_features = features;
return 0;
@ -258,6 +270,8 @@ namespace librbd {
int64_t ImageCtx::get_parent_pool_id(snap_t in_snap_id) const
{
assert(snap_lock.is_locked());
assert(parent_lock.is_locked());
if (in_snap_id == CEPH_NOSNAP) {
return parent_md.pool_id;
}
@ -273,6 +287,8 @@ namespace librbd {
string ImageCtx::get_parent_image_id(snap_t in_snap_id) const
{
assert(snap_lock.is_locked());
assert(parent_lock.is_locked());
if (in_snap_id == CEPH_NOSNAP) {
return parent_md.image_id;
}
@ -288,6 +304,8 @@ namespace librbd {
uint64_t ImageCtx::get_parent_snap_id(snap_t in_snap_id) const
{
assert(snap_lock.is_locked());
assert(parent_lock.is_locked());
if (in_snap_id == CEPH_NOSNAP) {
return parent_md.snap_id;
}
@ -303,6 +321,8 @@ namespace librbd {
int ImageCtx::get_parent_overlap(snap_t in_snap_id, uint64_t *overlap) const
{
assert(snap_lock.is_locked());
assert(parent_lock.is_locked());
if (in_snap_id == CEPH_NOSNAP) {
*overlap = parent_md.overlap;
return 0;
@ -320,9 +340,9 @@ namespace librbd {
void ImageCtx::aio_read_from_cache(object_t o, bufferlist *bl, size_t len,
uint64_t off, Context *onfinish) {
lock.Lock();
snap_lock.Lock();
ObjectCacher::OSDRead *rd = object_cacher->prepare_read(snap_id, bl, 0);
lock.Unlock();
snap_lock.Unlock();
ObjectExtent extent(o, off, len);
extent.oloc.pool = data_ctx.get_id();
extent.buffer_extents[0] = len;
@ -336,10 +356,10 @@ namespace librbd {
void ImageCtx::write_to_cache(object_t o, bufferlist& bl, size_t len,
uint64_t off) {
lock.Lock();
snap_lock.Lock();
ObjectCacher::OSDWrite *wr = object_cacher->prepare_write(snapc, bl,
utime_t(), 0);
lock.Unlock();
snap_lock.Unlock();
ObjectExtent extent(o, off, len);
extent.oloc.pool = data_ctx.get_id();
extent.buffer_extents[0] = len;
@ -387,14 +407,14 @@ namespace librbd {
}
void ImageCtx::shutdown_cache() {
lock.Lock();
md_lock.Lock();
invalidate_cache();
lock.Unlock();
md_lock.Unlock();
object_cacher->stop();
}
void ImageCtx::invalidate_cache() {
assert(lock.is_locked());
assert(md_lock.is_locked());
if (!object_cacher)
return;
cache_lock.Lock();
@ -418,11 +438,27 @@ namespace librbd {
void ImageCtx::unregister_watch() {
assert(wctx);
lock.Lock();
wctx->invalidate();
md_ctx.unwatch(header_oid, wctx->cookie);
lock.Unlock();
delete wctx;
wctx = NULL;
}
// How much of [offset, offset + length) can be served by the parent
// image: 0 when there is no parent or the range starts past the
// overlap, otherwise the length clipped to the overlap boundary.
// Caller must hold snap_lock and parent_lock.
size_t ImageCtx::parent_io_len(uint64_t offset, size_t length,
			       snap_t in_snap_id)
{
  assert(snap_lock.is_locked());
  assert(parent_lock.is_locked());
  uint64_t overlap = 0;
  get_parent_overlap(in_snap_id, &overlap);

  size_t parent_len = 0;
  if (get_parent_pool_id(in_snap_id) != -1 && offset <= overlap)
    // clip the request at the end of the overlap
    parent_len = min(overlap, offset + length) - offset;

  ldout(cct, 20) << __func__ << " off = " << offset << " len = " << length
		 << " overlap = " << overlap << " parent_io_len = "
		 << parent_len << dendl;
  return parent_len;
}
}

View File

@ -47,9 +47,18 @@ namespace librbd {
WatchCtx *wctx;
int refresh_seq; ///< sequence for refresh requests
int last_refresh; ///< last completed refresh
Mutex refresh_lock;
Mutex lock; // protects access to snapshot and header information
/**
* Lock ordering:
* md_lock, cache_lock, snap_lock, parent_lock, refresh_lock
*/
Mutex md_lock; // protects access to the mutable image metadata that
// isn't guarded by other locks below
// (size, features, image locks, etc)
Mutex cache_lock; // used as client_lock for the ObjectCacher
Mutex snap_lock; // protects snapshot-related member variables:
Mutex parent_lock; // protects parent_md and parent
Mutex refresh_lock; // protects refresh_seq and last_refresh
bool old_format;
uint8_t order;
@ -102,6 +111,8 @@ namespace librbd {
void invalidate_cache();
int register_watch();
void unregister_watch();
size_t parent_io_len(uint64_t offset, size_t length,
librados::snap_t in_snap_id);
};
}

View File

@ -1,12 +1,19 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#include <errno.h>
#include "common/ceph_context.h"
#include "common/dout.h"
#include "common/Mutex.h"
#include "include/rados/librados.h"
#include "include/Context.h"
#include "include/rados/librados.hpp"
#include "include/rbd/librbd.hpp"
#include "LibrbdWriteback.h"
#include "librbd/AioRequest.h"
#include "librbd/ImageCtx.h"
#include "librbd/internal.h"
#include "librbd/LibrbdWriteback.h"
#include "include/assert.h"
@ -14,72 +21,93 @@
#undef dout_prefix
#define dout_prefix *_dout << "librbdwriteback: "
// If we change the librados api to use an overrideable class for callbacks
// (like it does with watch/notify) this will be much nicer
struct CallbackArgs {
CephContext *cct;
Context *ctx;
Mutex *lock;
CallbackArgs(CephContext *cct, Context *c, Mutex *l) :
cct(cct), ctx(c), lock(l) {}
};
namespace librbd {
static void librbd_writeback_librados_aio_cb(rados_completion_t c, void *arg)
{
CallbackArgs *args = reinterpret_cast<CallbackArgs *>(arg);
ldout(args->cct, 20) << "aio_cb completing " << dendl;
/**
 * Completes the wrapped ObjectCacher context while holding the lock
 * supplied at construction (the writeback handler's lock).
 */
class C_Request : public Context {
public:
  C_Request(CephContext *cct, Context *c, Mutex *l)
    : m_cct(cct), m_ctx(c), m_lock(l) {}
  virtual ~C_Request() {}
  // NOTE(review): declared but no definition is visible in this change;
  // confirm it is defined elsewhere or remove the declaration
  void set_req(AioRequest *req);
  virtual void finish(int r) {
    ldout(m_cct, 20) << "aio_cb completing " << dendl;
    {
      // the wrapped context must be completed under m_lock
      Mutex::Locker l(*m_lock);
      m_ctx->complete(r);
    }
    ldout(m_cct, 20) << "aio_cb finished" << dendl;
  }
private:
  CephContext *m_cct;
  Context *m_ctx;
  Mutex *m_lock;
};
/**
 * Copies the data read by an AioRead into the caller-provided
 * bufferlist before completing the real context.
 */
class C_Read : public Context {
public:
  C_Read(Context *real_context, bufferlist *pbl)
    // initialize m_req to NULL so finish() before set_req() fails
    // loudly instead of dereferencing an uninitialized pointer
    : m_ctx(real_context), m_req(NULL), m_out_bl(pbl) {}
  virtual ~C_Read() {}
  virtual void finish(int r) {
    // on success, hand the read data to the caller's bufferlist
    if (r >= 0)
      *m_out_bl = m_req->data();
    m_ctx->complete(r);
  }
  // must be called before the request is sent; finish() uses m_req
  void set_req(AioRead *req) {
    m_req = req;
  }
private:
  Context *m_ctx;
  AioRead *m_req;
  bufferlist *m_out_bl;
};
LibrbdWriteback::LibrbdWriteback(ImageCtx *ictx, Mutex& lock)
: m_tid(0), m_lock(lock), m_ictx(ictx)
{
Mutex::Locker l(*args->lock);
args->ctx->complete(rados_aio_get_return_value(c));
}
rados_aio_release(c);
ldout(args->cct, 20) << "aio_cb finished" << dendl;
delete args;
}
LibrbdWriteback::LibrbdWriteback(const librados::IoCtx& io, Mutex& lock)
: m_tid(0), m_lock(lock)
{
m_ioctx.dup(io);
}
tid_t LibrbdWriteback::read(const object_t& oid,
const object_locator_t& oloc,
uint64_t off, uint64_t len, snapid_t snapid,
bufferlist *pbl, uint64_t trunc_size,
__u32 trunc_seq, Context *onfinish)
{
CallbackArgs *args = new CallbackArgs((CephContext *)m_ioctx.cct(),
onfinish, &m_lock);
librados::AioCompletion *rados_cb =
librados::Rados::aio_create_completion(args, librbd_writeback_librados_aio_cb, NULL);
m_ioctx.snap_set_read(snapid.val);
m_ioctx.aio_read(oid.name, rados_cb, pbl, len, off);
return ++m_tid;
}
tid_t LibrbdWriteback::write(const object_t& oid,
const object_locator_t& oloc,
uint64_t off, uint64_t len,
const SnapContext& snapc,
const bufferlist &bl, utime_t mtime,
uint64_t trunc_size, __u32 trunc_seq,
Context *oncommit)
{
CallbackArgs *args = new CallbackArgs((CephContext *)m_ioctx.cct(),
oncommit, &m_lock);
librados::AioCompletion *rados_cb =
librados::Rados::aio_create_completion(args, NULL, librbd_writeback_librados_aio_cb);
// TODO: find a way to make this less stupid
vector<librados::snap_t> snaps;
for (vector<snapid_t>::const_iterator it = snapc.snaps.begin();
it != snapc.snaps.end(); ++it) {
snaps.push_back(it->val);
}
m_ioctx.snap_set_read(CEPH_NOSNAP);
m_ioctx.selfmanaged_snap_set_write_ctx(snapc.seq.val, snaps);
m_ioctx.aio_write(oid.name, rados_cb, bl, len, off);
return ++m_tid;
// Handle a cache-miss read by issuing a full AioRead for the object,
// translating the object name back into an image offset so layering
// (parent reads) also works for cached images.
// Note that oloc, trunc_size, and trunc_seq are ignored.
tid_t LibrbdWriteback::read(const object_t& oid,
			    const object_locator_t& oloc,
			    uint64_t off, uint64_t len, snapid_t snapid,
			    bufferlist *pbl, uint64_t trunc_size,
			    __u32 trunc_seq, Context *onfinish)
{
  // C_Request completes onfinish under m_lock; C_Read first copies
  // the read data into pbl
  C_Request *req_comp = new C_Request(m_ictx->cct, onfinish, &m_lock);
  C_Read *read_comp = new C_Read(req_comp, pbl);
  // image offset = object's position within the image + offset in object
  uint64_t total_off = offset_of_object(oid.name, m_ictx->object_prefix,
					m_ictx->order) + off;
  AioRead *req = new AioRead(m_ictx, oid.name, total_off, len, snapid.val,
			     false, read_comp);
  read_comp->set_req(req);
  req->send();
  return ++m_tid;
}
// Write back dirty cache data for one object. Determines up front
// whether a parent covers the written range so AioWrite can guard the
// write with an existence check (and copy up) when needed.
// Note that oloc, trunc_size, and trunc_seq are ignored.
tid_t LibrbdWriteback::write(const object_t& oid,
			     const object_locator_t& oloc,
			     uint64_t off, uint64_t len,
			     const SnapContext& snapc,
			     const bufferlist &bl, utime_t mtime,
			     uint64_t trunc_size, __u32 trunc_seq,
			     Context *oncommit)
{
  // lock ordering: snap_lock before parent_lock
  m_ictx->snap_lock.Lock();
  librados::snap_t snap_id = m_ictx->snap_id;
  m_ictx->parent_lock.Lock();
  int64_t parent_pool_id = m_ictx->get_parent_pool_id(snap_id);
  uint64_t overlap = 0;
  m_ictx->get_parent_overlap(snap_id, &overlap);
  m_ictx->parent_lock.Unlock();
  m_ictx->snap_lock.Unlock();

  uint64_t total_off = offset_of_object(oid.name, m_ictx->object_prefix,
					m_ictx->order) + off;
  // total_off - off is the image offset of the start of this object
  bool parent_exists = has_parent(parent_pool_id, total_off - off, overlap);
  C_Request *req_comp = new C_Request(m_ictx->cct, oncommit, &m_lock);
  AioWrite *req = new AioWrite(m_ictx, oid.name, total_off, bl, snapc,
			       snap_id, parent_exists, req_comp);
  req->send();
  return ++m_tid;
}
}

View File

@ -1,7 +1,7 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#ifndef CEPH_OSDC_LIBRBDWRITEBACKHANDLER_H
#define CEPH_OSDC_LIBRBDWRITEBACKHANDLER_H
#ifndef CEPH_LIBRBD_LIBRBDWRITEBACKHANDLER_H
#define CEPH_LIBRBD_LIBRBDWRITEBACKHANDLER_H
#include "include/Context.h"
#include "include/types.h"
@ -9,27 +9,34 @@
#include "osd/osd_types.h"
#include "osdc/WritebackHandler.h"
class LibrbdWriteback : public WritebackHandler {
public:
LibrbdWriteback(const librados::IoCtx& io, Mutex& lock);
virtual ~LibrbdWriteback() {}
class Mutex;
// Note that oloc, trunc_size, and trunc_seq are ignored
virtual tid_t read(const object_t& oid, const object_locator_t& oloc,
uint64_t off, uint64_t len, snapid_t snapid,
bufferlist *pbl, uint64_t trunc_size, __u32 trunc_seq,
Context *onfinish);
namespace librbd {
// Note that oloc, trunc_size, and trunc_seq are ignored
virtual tid_t write(const object_t& oid, const object_locator_t& oloc,
uint64_t off, uint64_t len, const SnapContext& snapc,
const bufferlist &bl, utime_t mtime, uint64_t trunc_size,
__u32 trunc_seq, Context *oncommit);
class ImageCtx;
private:
int m_tid;
Mutex& m_lock;
librados::IoCtx m_ioctx;
};
/**
 * Writeback handler given to the ObjectCacher: turns cache reads and
 * writebacks into AioRead/AioWrite requests so layered (cloned)
 * images behave correctly even when caching is enabled.
 */
class LibrbdWriteback : public WritebackHandler {
public:
  LibrbdWriteback(ImageCtx *ictx, Mutex& lock);
  virtual ~LibrbdWriteback() {}

  // Note that oloc, trunc_size, and trunc_seq are ignored
  virtual tid_t read(const object_t& oid, const object_locator_t& oloc,
		     uint64_t off, uint64_t len, snapid_t snapid,
		     bufferlist *pbl, uint64_t trunc_size, __u32 trunc_seq,
		     Context *onfinish);

  // Note that oloc, trunc_size, and trunc_seq are ignored
  virtual tid_t write(const object_t& oid, const object_locator_t& oloc,
		      uint64_t off, uint64_t len, const SnapContext& snapc,
		      const bufferlist &bl, utime_t mtime, uint64_t trunc_size,
		      __u32 trunc_seq, Context *oncommit);

private:
  int m_tid;     ///< monotonically increasing request id
  Mutex& m_lock; ///< held while completing cache callbacks
  librbd::ImageCtx *m_ictx;
};
}
#endif

View File

@ -8,6 +8,7 @@
#include "common/errno.h"
#include "librbd/AioCompletion.h"
#include "librbd/AioRequest.h"
#include "librbd/ImageCtx.h"
#include "librbd/internal.h"
@ -66,6 +67,11 @@ namespace librbd {
return 0;
}
// True if an image offset can be backed by parent data: a parent pool
// must be configured and the offset must lie within the overlap.
bool has_parent(int64_t parent_pool_id, uint64_t off, uint64_t overlap)
{
  // no parent configured for this image/snapshot
  if (parent_pool_id == -1)
    return false;
  // only offsets at or below the overlap are covered by the parent
  return off <= overlap;
}
void init_rbd_header(struct rbd_obj_header_ondisk& ondisk,
uint64_t size, int *order, uint64_t bid)
{
@ -93,7 +99,11 @@ namespace librbd {
void image_info(ImageCtx *ictx, image_info_t& info, size_t infosize)
{
int obj_order = ictx->order;
ictx->md_lock.Lock();
ictx->snap_lock.Lock();
info.size = ictx->get_image_size(ictx->snap_id);
ictx->snap_lock.Unlock();
ictx->md_lock.Unlock();
info.obj_size = 1 << obj_order;
info.num_objs = howmany(info.size, get_block_size(obj_order));
info.order = obj_order;
@ -114,6 +124,18 @@ namespace librbd {
return oss.str();
}
// Recover the image offset of an object from its name, which is the
// object prefix, a separator, and the object number in hex.
// Returns object_number * 2^order.
uint64_t offset_of_object(const std::string &oid, const std::string &object_prefix,
			  uint8_t order)
{
  std::istringstream iss(oid);
  // skip object prefix and separator
  iss.ignore(object_prefix.length() + 1);
  uint64_t num = 0;
  iss >> std::hex >> num;
  // shift in 64 bits: the old `num * (1 << order)` computed the shift
  // in int, overflowing (UB) for order >= 31
  return num << order;
}
uint64_t get_max_block(uint64_t size, uint8_t obj_order)
{
uint64_t block_size = 1 << obj_order;
@ -147,6 +169,7 @@ namespace librbd {
void trim_image(ImageCtx *ictx, uint64_t newsize, ProgressContext& prog_ctx)
{
assert(ictx->md_lock.is_locked());
CephContext *cct = (CephContext *)ictx->data_ctx.cct();
uint64_t bsize = get_block_size(ictx->order);
uint64_t numseg = get_max_block(ictx->size, ictx->order);
@ -247,7 +270,7 @@ namespace librbd {
uint64_t ver;
if (ictx) {
assert(ictx->lock.is_locked());
assert(ictx->md_lock.is_locked());
ictx->refresh_lock.Lock();
++ictx->refresh_seq;
ictx->refresh_lock.Unlock();
@ -308,7 +331,7 @@ namespace librbd {
int rollback_image(ImageCtx *ictx, uint64_t snap_id,
ProgressContext& prog_ctx)
{
assert(ictx->lock.is_locked());
assert(ictx->md_lock.is_locked());
uint64_t numseg = get_max_block(ictx->size, ictx->order);
uint64_t bsize = get_block_size(ictx->order);
@ -374,7 +397,7 @@ namespace librbd {
if (r < 0)
return r;
Mutex::Locker l(ictx->lock);
Mutex::Locker l(ictx->md_lock);
r = add_snap(ictx, snap_name);
if (r < 0)
@ -394,8 +417,10 @@ namespace librbd {
if (r < 0)
return r;
Mutex::Locker l(ictx->lock);
Mutex::Locker l(ictx->md_lock);
ictx->snap_lock.Lock();
snap_t snap_id = ictx->get_snap_id(snap_name);
ictx->snap_lock.Unlock();
if (snap_id == CEPH_NOSNAP)
return -ENOENT;
@ -559,7 +584,15 @@ namespace librbd {
return -EINVAL;
}
if ((p_imctx->features & RBD_FEATURE_LAYERING) != RBD_FEATURE_LAYERING) {
p_imctx->md_lock.Lock();
p_imctx->snap_lock.Lock();
uint64_t p_features;
p_imctx->get_features(p_imctx->snap_id, &p_features);
uint64_t size = p_imctx->get_image_size(p_imctx->snap_id);
p_imctx->snap_lock.Unlock();
p_imctx->md_lock.Unlock();
if ((p_features & RBD_FEATURE_LAYERING) != RBD_FEATURE_LAYERING) {
lderr(cct) << "parent image must support layering" << dendl;
return -EINVAL;
}
@ -570,7 +603,6 @@ namespace librbd {
order = p_imctx->order;
}
uint64_t size = p_imctx->get_image_size(p_imctx->snap_id);
int remove_r;
librbd::NoOpProgressContext no_op;
ImageCtx *c_imctx = NULL;
@ -733,7 +765,6 @@ namespace librbd {
if (r < 0)
return r;
Mutex::Locker l(ictx->lock);
image_info(ictx, info, infosize);
return 0;
}
@ -743,7 +774,6 @@ namespace librbd {
int r = ictx_check(ictx);
if (r < 0)
return r;
Mutex::Locker(ictx->lock);
*old = ictx->old_format;
return 0;
}
@ -753,7 +783,8 @@ namespace librbd {
int r = ictx_check(ictx);
if (r < 0)
return r;
Mutex::Locker(ictx->lock);
Mutex::Locker l(ictx->md_lock);
Mutex::Locker l2(ictx->snap_lock);
*size = ictx->get_image_size(ictx->snap_id);
return 0;
}
@ -763,7 +794,8 @@ namespace librbd {
int r = ictx_check(ictx);
if (r < 0)
return r;
Mutex::Locker(ictx->lock);
Mutex::Locker l(ictx->md_lock);
Mutex::Locker l2(ictx->snap_lock);
return ictx->get_features(ictx->snap_id, features);
}
@ -772,22 +804,25 @@ namespace librbd {
int r = ictx_check(ictx);
if (r < 0)
return r;
Mutex::Locker(ictx->lock);
Mutex::Locker l(ictx->snap_lock);
Mutex::Locker l2(ictx->parent_lock);
return ictx->get_parent_overlap(ictx->snap_id, overlap);
}
int open_parent(ImageCtx *ictx, ImageCtx **parent_ctx,
string *parent_pool_name, string *parent_image_name)
{
assert(ictx->snap_lock.is_locked());
assert(ictx->parent_lock.is_locked());
assert(!(*parent_ctx));
assert(ictx->parent_md.pool_id >= 0);
string pool_name;
Rados rados(ictx->md_ctx);
ictx->lock.Lock();
int64_t pool_id = ictx->get_parent_pool_id(ictx->snap_id);
string parent_image_id = ictx->get_parent_image_id(ictx->snap_id);
snap_t parent_snap_id = ictx->get_parent_snap_id(ictx->snap_id);
ictx->lock.Unlock();
assert(parent_snap_id != CEPH_NOSNAP);
if (pool_id < 0)
return -ENOENT;
int r = rados.pool_reverse_lookup(pool_id, &pool_name);
@ -805,7 +840,6 @@ namespace librbd {
return r;
}
if (parent_image_name) {
r = cls_client::dir_get_name(&p_ioctx, RBD_DIRECTORY,
parent_image_id, parent_image_name);
@ -826,14 +860,16 @@ namespace librbd {
close_image(parent);
return r;
}
parent->snap_lock.Lock();
r = parent->get_snap_name(parent_snap_id, &parent->snap_name);
if (r < 0) {
lderr(ictx->cct) << "parent snapshot does not exist" << dendl;
parent->snap_lock.Unlock();
close_image(parent);
return r;
}
parent->snap_set(parent->snap_name);
parent->snap_lock.Unlock();
if (parent_ctx)
*parent_ctx = parent;
if (parent_pool_name)
@ -849,12 +885,14 @@ namespace librbd {
if (r < 0)
return r;
Mutex::Locker l(ictx->lock);
Mutex::Locker l(ictx->snap_lock);
Mutex::Locker l2(ictx->parent_lock);
if (ictx->get_parent_pool_id(ictx->snap_id) < 0)
return -ENOENT;
// for parent snap_name, we need to open the parent ImageCtx, for which
// we use the same rados handle
// TODO: parent is already open!
ImageCtx *p_imctx = NULL;
r = open_parent(ictx, &p_imctx, parent_pool_name, parent_name);
if (r < 0)
@ -891,7 +929,9 @@ namespace librbd {
old_format = ictx->old_format;
unknown_format = false;
id = ictx->id;
ictx->md_lock.Lock();
trim_image(ictx, 0, prog_ctx);
ictx->md_lock.Unlock();
close_image(ictx);
ldout(cct, 2) << "removing header..." << dendl;
@ -941,7 +981,9 @@ namespace librbd {
int resize_helper(ImageCtx *ictx, uint64_t size, ProgressContext& prog_ctx)
{
assert(ictx->md_lock.is_locked());
CephContext *cct = ictx->cct;
if (size == ictx->size) {
ldout(cct, 2) << "no change in size (" << ictx->size << " -> " << size
<< ")" << dendl;
@ -970,6 +1012,7 @@ namespace librbd {
r = cls_client::set_size(&(ictx->md_ctx), ictx->header_oid, size);
}
// TODO: remove this useless check
if (r == -ERANGE)
lderr(cct) << "operation might have conflicted with another client!"
<< dendl;
@ -993,7 +1036,7 @@ namespace librbd {
if (r < 0)
return r;
Mutex::Locker l(ictx->lock);
Mutex::Locker l(ictx->md_lock);
if (size < ictx->size && ictx->object_cacher) {
// need to invalidate since we're deleting objects, and
// ObjectCacher doesn't track non-existent objects
@ -1016,7 +1059,7 @@ namespace librbd {
return r;
bufferlist bl, bl2;
Mutex::Locker l(ictx->lock);
Mutex::Locker l(ictx->snap_lock);
for (map<string, SnapInfo>::iterator it = ictx->snaps_by_name.begin();
it != ictx->snaps_by_name.end(); ++it) {
snap_info_t info;
@ -1031,7 +1074,7 @@ namespace librbd {
int add_snap(ImageCtx *ictx, const char *snap_name)
{
assert(ictx->lock.is_locked());
assert(ictx->md_lock.is_locked());
uint64_t snap_id;
@ -1061,13 +1104,14 @@ namespace librbd {
int rm_snap(ImageCtx *ictx, const char *snap_name)
{
assert(ictx->lock.is_locked());
assert(ictx->md_lock.is_locked());
int r;
if (ictx->old_format) {
r = cls_client::old_snapshot_remove(&ictx->md_ctx,
ictx->header_oid, snap_name);
} else {
Mutex::Locker l(ictx->snap_lock);
r = cls_client::snapshot_remove(&ictx->md_ctx,
ictx->header_oid,
ictx->get_snap_id(snap_name));
@ -1091,7 +1135,7 @@ namespace librbd {
ictx->refresh_lock.Unlock();
if (needs_refresh) {
Mutex::Locker l(ictx->lock);
Mutex::Locker l(ictx->md_lock);
int r = ictx_refresh(ictx);
if (r < 0) {
@ -1104,6 +1148,8 @@ namespace librbd {
}
int refresh_parent(ImageCtx *ictx) {
assert(ictx->snap_lock.is_locked());
assert(ictx->parent_lock.is_locked());
// close the parent if it changed or this image no longer needs
// to read from it
int r;
@ -1137,7 +1183,7 @@ namespace librbd {
int ictx_refresh(ImageCtx *ictx)
{
CephContext *cct = ictx->cct;
assert(ictx->lock.is_locked());
assert(ictx->md_lock.is_locked());
bufferlist bl, bl2;
ldout(cct, 20) << "ictx_refresh " << ictx << dendl;
@ -1153,99 +1199,115 @@ namespace librbd {
vector<uint64_t> snap_sizes;
vector<uint64_t> snap_features;
vector<cls_client::parent_info> snap_parents;
if (ictx->old_format) {
r = read_header(ictx->md_ctx, ictx->header_oid, &ictx->header, NULL);
if (r < 0) {
lderr(cct) << "Error reading header: " << cpp_strerror(r) << dendl;
return r;
}
r = cls_client::old_snapshot_list(&ictx->md_ctx, ictx->header_oid,
&snap_names, &snap_sizes, &new_snapc);
if (r < 0) {
lderr(cct) << "Error listing snapshots: " << cpp_strerror(r) << dendl;
return r;
}
ictx->order = ictx->header.options.order;
ictx->size = ictx->header.image_size;
ictx->object_prefix = ictx->header.block_name;
} else {
do {
uint64_t incompatible_features;
r = cls_client::get_mutable_metadata(&ictx->md_ctx, ictx->header_oid,
&ictx->size, &ictx->features,
&incompatible_features,
&ictx->locks,
&ictx->exclusive_locked,
&new_snapc,
&ictx->parent_md);
if (r < 0) {
lderr(cct) << "Error reading mutable metadata: " << cpp_strerror(r)
<< dendl;
return r;
{
Mutex::Locker l(ictx->snap_lock);
{
Mutex::Locker l2(ictx->parent_lock);
if (ictx->old_format) {
r = read_header(ictx->md_ctx, ictx->header_oid, &ictx->header, NULL);
if (r < 0) {
lderr(cct) << "Error reading header: " << cpp_strerror(r) << dendl;
return r;
}
r = cls_client::old_snapshot_list(&ictx->md_ctx, ictx->header_oid,
&snap_names, &snap_sizes, &new_snapc);
if (r < 0) {
lderr(cct) << "Error listing snapshots: " << cpp_strerror(r) << dendl;
return r;
}
ictx->order = ictx->header.options.order;
ictx->size = ictx->header.image_size;
ictx->object_prefix = ictx->header.block_name;
} else {
do {
uint64_t incompatible_features;
r = cls_client::get_mutable_metadata(&ictx->md_ctx, ictx->header_oid,
&ictx->size, &ictx->features,
&incompatible_features,
&ictx->locks,
&ictx->exclusive_locked,
&new_snapc,
&ictx->parent_md);
if (r < 0) {
lderr(cct) << "Error reading mutable metadata: " << cpp_strerror(r)
<< dendl;
return r;
}
uint64_t unsupported = incompatible_features & ~RBD_FEATURES_ALL;
if (unsupported) {
lderr(ictx->cct) << "Image uses unsupported features: "
<< unsupported << dendl;
return -ENOSYS;
}
r = cls_client::snapshot_list(&(ictx->md_ctx), ictx->header_oid,
new_snapc.snaps, &snap_names,
&snap_sizes, &snap_features,
&snap_parents);
// -ENOENT here means we raced with snapshot deletion
if (r < 0 && r != -ENOENT) {
lderr(ictx->cct) << "snapc = " << new_snapc << dendl;
lderr(ictx->cct) << "Error listing snapshots: " << cpp_strerror(r)
<< dendl;
return r;
}
} while (r == -ENOENT);
}
uint64_t unsupported = incompatible_features & ~RBD_FEATURES_ALL;
if (unsupported) {
lderr(ictx->cct) << "Image uses unsupported features: "
<< unsupported << dendl;
return -ENOSYS;
}
r = cls_client::snapshot_list(&(ictx->md_ctx), ictx->header_oid,
new_snapc.snaps, &snap_names,
&snap_sizes, &snap_features,
&snap_parents);
// -ENOENT here means we raced with snapshot deletion
if (r < 0 && r != -ENOENT) {
lderr(ictx->cct) << "snapc = " << new_snapc << dendl;
lderr(ictx->cct) << "Error listing snapshots: " << cpp_strerror(r)
for (size_t i = 0; i < new_snapc.snaps.size(); ++i) {
uint64_t features = ictx->old_format ? 0 : snap_features[i];
cls_client::parent_info parent;
if (!ictx->old_format)
parent = snap_parents[i];
vector<snap_t>::const_iterator it =
find(ictx->snaps.begin(), ictx->snaps.end(), new_snapc.snaps[i].val);
if (it == ictx->snaps.end()) {
new_snap = true;
ldout(cct, 20) << "new snapshot id=" << new_snapc.snaps[i].val
<< " name=" << snap_names[i]
<< " size=" << snap_sizes[i]
<< " features=" << features
<< dendl;
return r;
}
}
} while (r == -ENOENT);
}
ictx->snaps.clear();
ictx->snaps_by_name.clear();
for (size_t i = 0; i < new_snapc.snaps.size(); ++i) {
uint64_t features = ictx->old_format ? 0 : snap_features[i];
cls_client::parent_info parent;
if (!ictx->old_format)
parent = snap_parents[i];
ictx->add_snap(snap_names[i], new_snapc.snaps[i].val,
snap_sizes[i], features, parent);
vector<snap_t>::const_iterator it =
find(ictx->snaps.begin(), ictx->snaps.end(), new_snapc.snaps[i].val);
if (it == ictx->snaps.end()) {
new_snap = true;
ldout(cct, 20) << "new snapshot id " << *it << " size " << snap_sizes[i]
<< dendl;
ictx->snaps.clear();
ictx->snaps_by_name.clear();
for (size_t i = 0; i < new_snapc.snaps.size(); ++i) {
uint64_t features = ictx->old_format ? 0 : snap_features[i];
cls_client::parent_info parent;
if (!ictx->old_format)
parent = snap_parents[i];
ictx->add_snap(snap_names[i], new_snapc.snaps[i].val,
snap_sizes[i], features, parent);
}
r = refresh_parent(ictx);
if (r < 0)
return r;
} // release parent_lock
if (new_snap) {
_flush(ictx);
}
}
if (new_snap) {
_flush(ictx);
}
if (!ictx->snapc.is_valid()) {
lderr(cct) << "image snap context is invalid!" << dendl;
return -EIO;
}
if (!ictx->snapc.is_valid()) {
lderr(cct) << "image snap context is invalid!" << dendl;
return -EIO;
}
ictx->snapc = new_snapc;
ictx->snapc = new_snapc;
if (ictx->snap_id != CEPH_NOSNAP &&
ictx->get_snap_id(ictx->snap_name) != ictx->snap_id) {
lderr(cct) << "tried to read from a snapshot that no longer exists: "
<< ictx->snap_name << dendl;
ictx->snap_exists = false;
}
if (ictx->snap_id != CEPH_NOSNAP &&
ictx->get_snap_id(ictx->snap_name) != ictx->snap_id) {
lderr(cct) << "tried to read from a snapshot that no longer exists: "
<< ictx->snap_name << dendl;
ictx->snap_exists = false;
}
r = refresh_parent(ictx);
if (r < 0)
return r;
ictx->data_ctx.selfmanaged_snap_set_write_ctx(ictx->snapc.seq, ictx->snaps);
ictx->data_ctx.selfmanaged_snap_set_write_ctx(ictx->snapc.seq, ictx->snaps);
} // release snap_lock
ictx->refresh_lock.Lock();
ictx->last_refresh = refresh_seq;
@ -1265,13 +1327,14 @@ namespace librbd {
if (r < 0)
return r;
Mutex::Locker l(ictx->md_lock);
Mutex::Locker l2(ictx->snap_lock);
if (!ictx->snap_exists)
return -ENOENT;
if (ictx->snap_id != CEPH_NOSNAP)
return -EROFS;
Mutex::Locker l(ictx->lock);
snap_t snap_id = ictx->get_snap_id(snap_name);
if (snap_id == CEPH_NOSNAP) {
lderr(cct) << "No such snapshot found." << dendl;
@ -1285,6 +1348,7 @@ namespace librbd {
uint64_t new_size = ictx->get_image_size(ictx->snap_id);
ictx->get_snap_size(snap_name, &new_size);
ldout(cct, 2) << "resizing to snapshot size..." << dendl;
NoOpProgressContext no_op;
r = resize_helper(ictx, new_size, no_op);
@ -1300,7 +1364,6 @@ namespace librbd {
return r;
}
ictx_refresh(ictx);
snap_t new_snap_id = ictx->get_snap_id(snap_name);
ldout(cct, 20) << "snap_id is " << ictx->snap_id << " new snap_id is "
<< new_snap_id << dendl;
@ -1337,7 +1400,11 @@ namespace librbd {
{
CephContext *cct = (CephContext *)dest_md_ctx.cct();
CopyProgressCtx cp(prog_ctx);
ictx->md_lock.Lock();
ictx->snap_lock.Lock();
uint64_t src_size = ictx->get_image_size(ictx->snap_id);
ictx->snap_lock.Unlock();
ictx->md_lock.Unlock();
int64_t r;
int order = ictx->order;
@ -1375,7 +1442,7 @@ namespace librbd {
// snapshot and the user is trying to fix that
ictx_check(ictx);
Mutex::Locker l(ictx->lock);
Mutex::Locker l(ictx->snap_lock);
if (snap_name) {
int r = ictx->snap_set(snap_name);
if (r < 0) {
@ -1399,13 +1466,14 @@ namespace librbd {
if (r < 0)
return r;
ictx->lock.Lock();
ictx->md_lock.Lock();
r = ictx_refresh(ictx);
ictx->lock.Unlock();
ictx->md_lock.Unlock();
if (r < 0)
return r;
if (ictx->snap_name.length()) {
Mutex::Locker l(ictx->snap_lock);
r = ictx->snap_set(ictx->snap_name);
if (r < 0)
return r;
@ -1491,7 +1559,9 @@ namespace librbd {
return r;
}
Mutex::Locker l(ictx->lock);
Mutex::Locker l(ictx->md_lock);
Mutex::Locker l2(ictx->snap_lock);
Mutex::Locker l3(ictx->parent_lock);
// can't flatten a non-clone
if (ictx->parent_md.pool_id == -1) {
lderr(ictx->cct) << "image has no parent" << dendl;
@ -1548,7 +1618,7 @@ namespace librbd {
if (r < 0)
return r;
Mutex::Locker locker(ictx->lock);
Mutex::Locker locker(ictx->md_lock);
locks = ictx->locks;
exclusive = ictx->exclusive_locked;
return 0;
@ -1608,11 +1678,9 @@ namespace librbd {
return r;
int64_t total_read = 0;
ictx->lock.Lock();
uint64_t start_block = get_block_num(ictx->order, off);
uint64_t end_block = get_block_num(ictx->order, off + len - 1);
uint64_t block_size = get_block_size(ictx->order);
ictx->lock.Unlock();
uint64_t left = len;
start_time = ceph_clock_now(ictx->cct);
@ -1822,20 +1890,20 @@ namespace librbd {
return buf_len;
}
// NOTE(review): this span is diff residue — the removed rados_cb and the
// added rados_req_cb are interleaved without +/- markers. The lines are
// annotated below; the resulting file contains only rados_req_cb.
void rados_cb(rados_completion_t c, void *arg)        // removed: old signature
// added: librados AIO callback; @arg is the AioRequest for one object
void rados_req_cb(rados_completion_t c, void *arg)
{
// removed: old body — finished and deleted an AioBlockCompletion
AioBlockCompletion *block_completion = (AioBlockCompletion *)arg;
block_completion->finish(rados_aio_get_return_value(c));
delete block_completion;
// added: forward the rados return value to the AioRequest; complete()
// presumably owns cleanup of the request — TODO confirm in AioRequest.cc
AioRequest *req = reinterpret_cast<AioRequest *>(arg);
req->complete(rados_aio_get_return_value(c));
}
int check_io(ImageCtx *ictx, uint64_t off, uint64_t len)
{
ictx->lock.Lock();
ictx->md_lock.Lock();
ictx->snap_lock.Lock();
uint64_t image_size = ictx->get_image_size(ictx->snap_id);
bool snap_exists = ictx->snap_exists;
ictx->lock.Unlock();
ictx->snap_lock.Unlock();
ictx->md_lock.Unlock();
if (!snap_exists)
return -ENOENT;
@ -1879,7 +1947,7 @@ namespace librbd {
{
CephContext *cct = ictx->cct;
ldout(cct, 20) << "aio_write " << ictx << " off = " << off << " len = "
<< len << dendl;
<< len << " buf = " << &buf << dendl;
if (!len)
return 0;
@ -1889,44 +1957,51 @@ namespace librbd {
return r;
size_t total_write = 0;
ictx->lock.Lock();
uint64_t start_block = get_block_num(ictx->order, off);
uint64_t end_block = get_block_num(ictx->order, off + len - 1);
uint64_t block_size = get_block_size(ictx->order);
snapid_t snap = ictx->snap_id;
ictx->lock.Unlock();
ictx->snap_lock.Lock();
snapid_t snap_id = ictx->snap_id;
::SnapContext snapc = ictx->snapc;
ictx->parent_lock.Lock();
int64_t parent_pool_id = ictx->get_parent_pool_id(ictx->snap_id);
uint64_t overlap = 0;
ictx->get_parent_overlap(ictx->snap_id, &overlap);
ictx->parent_lock.Unlock();
ictx->snap_lock.Unlock();
uint64_t left = len;
r = check_io(ictx, off, len);
if (r < 0)
return r;
if (snap != CEPH_NOSNAP)
if (snap_id != CEPH_NOSNAP)
return -EROFS;
c->get();
c->init_time(ictx, AIO_TYPE_WRITE);
for (uint64_t i = start_block; i <= end_block; i++) {
ictx->lock.Lock();
string oid = get_block_oid(ictx->object_prefix, i, ictx->old_format);
uint64_t block_ofs = get_block_ofs(ictx->order, off + total_write);
ictx->lock.Unlock();
uint64_t total_off = off + total_write;
uint64_t block_ofs = get_block_ofs(ictx->order, total_off);
uint64_t write_len = min(block_size - block_ofs, left);
bufferlist bl;
bl.append(buf + total_write, write_len);
if (ictx->object_cacher) {
// may block
ictx->write_to_cache(oid, bl, write_len, block_ofs);
} else {
AioBlockCompletion *block_completion =
new AioBlockCompletion(cct, c, off, len, NULL);
c->add_block_completion(block_completion);
librados::AioCompletion *rados_completion =
Rados::aio_create_completion(block_completion, NULL, rados_cb);
r = ictx->data_ctx.aio_write(oid, rados_completion,
bl, write_len, block_ofs);
rados_completion->release();
C_AioWrite *req_comp = new C_AioWrite(cct, c);
bool parent_exists = has_parent(parent_pool_id, total_off - block_ofs, overlap);
ldout(ictx->cct, 20) << "has_parent(pool=" << parent_pool_id
<< ", off=" << total_off
<< ", overlap=" << overlap << ") = "
<< parent_exists << dendl;
AioWrite *req = new AioWrite(ictx, oid, total_off, bl, snapc, snap_id,
parent_exists, req_comp);
c->add_request();
r = req->send();
if (r < 0)
goto done;
}
@ -1934,7 +2009,7 @@ namespace librbd {
left -= write_len;
}
done:
c->finish_adding_completions();
c->finish_adding_requests();
c->put();
ictx->perfcounter->inc(l_librbd_aio_wr);
@ -1959,11 +2034,18 @@ namespace librbd {
// TODO: check for snap
size_t total_write = 0;
ictx->lock.Lock();
uint64_t start_block = get_block_num(ictx->order, off);
uint64_t end_block = get_block_num(ictx->order, off + len - 1);
uint64_t block_size = get_block_size(ictx->order);
ictx->lock.Unlock();
ictx->snap_lock.Lock();
snapid_t snap_id = ictx->snap_id;
::SnapContext snapc = ictx->snapc;
ictx->parent_lock.Lock();
int64_t parent_pool_id = ictx->get_parent_pool_id(ictx->snap_id);
uint64_t overlap = 0;
ictx->get_parent_overlap(ictx->snap_id, &overlap);
ictx->parent_lock.Unlock();
ictx->snap_lock.Unlock();
uint64_t left = len;
r = check_io(ictx, off, len);
@ -1977,14 +2059,9 @@ namespace librbd {
c->get();
c->init_time(ictx, AIO_TYPE_DISCARD);
for (uint64_t i = start_block; i <= end_block; i++) {
ictx->lock.Lock();
string oid = get_block_oid(ictx->object_prefix, i, ictx->old_format);
uint64_t block_ofs = get_block_ofs(ictx->order, off + total_write);
ictx->lock.Unlock();
AioBlockCompletion *block_completion =
new AioBlockCompletion(cct, c, off, len, NULL);
uint64_t total_off = off + total_write;
uint64_t block_ofs = get_block_ofs(ictx->order, total_off);;
uint64_t write_len = min(block_size - block_ofs, left);
if (ictx->object_cacher) {
@ -1992,20 +2069,23 @@ namespace librbd {
v.back().oloc.pool = ictx->data_ctx.get_id();
}
if (block_ofs == 0 && write_len == block_size)
block_completion->write_op.remove();
else if (block_ofs + write_len == block_size)
block_completion->write_op.truncate(block_ofs);
else
block_completion->write_op.zero(block_ofs, write_len);
C_AioWrite *req_comp = new C_AioWrite(cct, c);
AbstractWrite *req;
c->add_request();
c->add_block_completion(block_completion);
librados::AioCompletion *rados_completion =
Rados::aio_create_completion(block_completion, NULL, rados_cb);
bool parent_exists = has_parent(parent_pool_id, total_off - block_ofs, overlap);
if (block_ofs == 0 && write_len == block_size) {
req = new AioRemove(ictx, oid, total_off, snapc, snap_id,
parent_exists, req_comp);
} else if (block_ofs + write_len == block_size) {
req = new AioTruncate(ictx, oid, total_off, snapc, snap_id,
parent_exists, req_comp);
} else {
req = new AioZero(ictx, oid, total_off, write_len, snapc, snap_id,
parent_exists, req_comp);
}
r = ictx->data_ctx.aio_operate(oid, rados_completion,
&block_completion->write_op);
rados_completion->release();
r = req->send();
if (r < 0)
goto done;
total_write += write_len;
@ -2016,7 +2096,7 @@ namespace librbd {
if (ictx->object_cacher)
ictx->object_cacher->discard_set(ictx->object_set, v);
c->finish_adding_completions();
c->finish_adding_requests();
c->put();
ictx->perfcounter->inc(l_librbd_aio_discard);
@ -2026,11 +2106,11 @@ namespace librbd {
return r;
}
// NOTE(review): this span is diff residue — the removed
// rados_aio_sparse_read_cb and the added rbd_req_cb are interleaved
// without +/- markers. Lines are annotated; only rbd_req_cb survives.
void rados_aio_sparse_read_cb(rados_completion_t c, void *arg)  // removed: old signature
// added: librbd-level completion callback; @cb is an AioCompletion,
// @arg is the AioRequest for one object
void rbd_req_cb(completion_t cb, void *arg)
{
// removed: old body — finished and deleted an AioBlockCompletion
AioBlockCompletion *block_completion = (AioBlockCompletion *)arg;
block_completion->finish(rados_aio_get_return_value(c));
delete block_completion;
// added: propagate the child completion's return value into the request
AioRequest *req = reinterpret_cast<AioRequest *>(arg);
AioCompletion *comp = reinterpret_cast<AioCompletion *>(cb);
req->complete(comp->get_return_value());
}
int aio_read(ImageCtx *ictx, uint64_t off, size_t len,
@ -2050,43 +2130,39 @@ namespace librbd {
int64_t ret;
int total_read = 0;
ictx->lock.Lock();
uint64_t start_block = get_block_num(ictx->order, off);
uint64_t end_block = get_block_num(ictx->order, off + len - 1);
uint64_t block_size = get_block_size(ictx->order);
ictx->lock.Unlock();
ictx->snap_lock.Lock();
snap_t snap_id = ictx->snap_id;
ictx->snap_lock.Unlock();
uint64_t left = len;
c->get();
c->init_time(ictx, AIO_TYPE_READ);
for (uint64_t i = start_block; i <= end_block; i++) {
bufferlist bl;
ictx->lock.Lock();
string oid = get_block_oid(ictx->object_prefix, i, ictx->old_format);
uint64_t block_ofs = get_block_ofs(ictx->order, off + total_read);
ictx->lock.Unlock();
uint64_t read_len = min(block_size - block_ofs, left);
map<uint64_t,uint64_t> m;
map<uint64_t,uint64_t>::iterator iter;
AioBlockCompletion *block_completion =
new AioBlockCompletion(ictx->cct, c, block_ofs, read_len, buf + total_read);
c->add_block_completion(block_completion);
C_AioRead *req_comp = new C_AioRead(ictx->cct, c, buf + total_read);
AioRead *req = new AioRead(ictx, oid, off + total_read,
read_len, snap_id, true, req_comp);
req_comp->set_req(req);
c->add_request();
if (ictx->object_cacher) {
block_completion->m[block_ofs] = read_len;
ictx->aio_read_from_cache(oid, &block_completion->data_bl,
read_len, block_ofs, block_completion);
req->ext_map()[block_ofs] = read_len;
// cache has already handled possible reading from parent, so
// this AioRead is just used to pass data to the
// AioCompletion. The AioRead isn't being used as a
// completion, so wrap the completion in a C_CacheRead to
// delete it
C_CacheRead *cache_comp = new C_CacheRead(req_comp, req);
ictx->aio_read_from_cache(oid, &req->data(),
read_len, block_ofs, cache_comp);
} else {
librados::AioCompletion *rados_completion =
Rados::aio_create_completion(block_completion,
rados_aio_sparse_read_cb, NULL);
r = ictx->data_ctx.aio_sparse_read(oid, rados_completion,
&block_completion->m,
&block_completion->data_bl,
read_len, block_ofs);
rados_completion->release();
r = req->send();
if (r < 0 && r == -ENOENT)
r = 0;
if (r < 0) {
@ -2100,7 +2176,7 @@ namespace librbd {
}
ret = total_read;
done:
c->finish_adding_completions();
c->finish_adding_requests();
c->put();
ictx->perfcounter->inc(l_librbd_aio_rd);

View File

@ -72,6 +72,8 @@ namespace librbd {
int detect_format(librados::IoCtx &io_ctx, const std::string &name,
bool *old_format, uint64_t *size);
bool has_parent(int64_t parent_pool_id, uint64_t off, uint64_t overlap);
int snap_set(ImageCtx *ictx, const char *snap_name);
int list(librados::IoCtx& io_ctx, std::vector<std::string>& names);
int create(librados::IoCtx& io_ctx, const char *imgname, uint64_t size,
@ -146,6 +148,8 @@ namespace librbd {
void image_info(const ImageCtx *ictx, image_info_t& info, size_t info_size);
std::string get_block_oid(const std::string &object_prefix, uint64_t num,
bool old_format);
uint64_t offset_of_object(const string &oid, const string &object_prefix,
uint8_t order);
uint64_t get_max_block(uint64_t size, uint8_t obj_order);
uint64_t get_block_size(uint8_t order);
uint64_t get_block_num(uint8_t order, uint64_t ofs);
@ -185,8 +189,8 @@ namespace librbd {
// raw callbacks
int simple_read_cb(uint64_t ofs, size_t len, const char *buf, void *arg);
void rados_cb(rados_completion_t cb, void *arg);
void rados_aio_sparse_read_cb(rados_completion_t cb, void *arg);
void rados_req_cb(rados_completion_t cb, void *arg);
void rbd_req_cb(completion_t cb, void *arg);
}
#endif