Merge pull request #9023 from dillaman/wip-15791

librbd: journal IO error results in failed assertion in AioCompletion

Reviewed-by: Mykola Golub <mgolub@mirantis.com>
This commit is contained in:
Mykola Golub 2016-06-08 18:22:17 +03:00
commit 5568107d56
13 changed files with 197 additions and 152 deletions

View File

@ -38,8 +38,12 @@ namespace librbd {
return 0;
}
void AioCompletion::finalize(CephContext *cct, ssize_t rval)
void AioCompletion::finalize(ssize_t rval)
{
assert(lock.is_locked());
assert(ictx != nullptr);
CephContext *cct = ictx->cct;
ldout(cct, 20) << this << " " << __func__ << ": r=" << rval << ", "
<< "read_buf=" << reinterpret_cast<void*>(read_buf) << ", "
<< "real_bl=" << reinterpret_cast<void*>(read_bl) << dendl;
@ -67,10 +71,13 @@ namespace librbd {
}
}
void AioCompletion::complete(CephContext *cct) {
void AioCompletion::complete() {
assert(lock.is_locked());
assert(ictx != nullptr);
CephContext *cct = ictx->cct;
tracepoint(librbd, aio_complete_enter, this, rval);
utime_t elapsed;
assert(lock.is_locked());
elapsed = ceph_clock_now(cct) - start_time;
switch (aio_type) {
case AIO_TYPE_OPEN:
@ -120,47 +127,57 @@ namespace librbd {
}
void AioCompletion::init_time(ImageCtx *i, aio_type_t t) {
if (ictx == NULL) {
Mutex::Locker locker(lock);
if (ictx == nullptr) {
ictx = i;
aio_type = t;
start_time = ceph_clock_now(ictx->cct);
}
}
void AioCompletion::start_op(ImageCtx *i, aio_type_t t) {
init_time(i, t);
void AioCompletion::start_op() {
Mutex::Locker locker(lock);
if (state == STATE_PENDING && !async_op.started()) {
assert(ictx != nullptr);
assert(!async_op.started());
if (state == STATE_PENDING && aio_type != AIO_TYPE_FLUSH) {
async_op.start_op(*ictx);
}
}
void AioCompletion::fail(CephContext *cct, int r)
void AioCompletion::fail(int r)
{
lock.Lock();
assert(ictx != nullptr);
CephContext *cct = ictx->cct;
lderr(cct) << this << " " << __func__ << ": " << cpp_strerror(r)
<< dendl;
lock.Lock();
assert(pending_count == 0);
rval = r;
complete(cct);
complete();
put_unlock();
}
void AioCompletion::set_request_count(CephContext *cct, uint32_t count) {
ldout(cct, 20) << this << " " << __func__ << ": pending=" << count << dendl;
void AioCompletion::set_request_count(uint32_t count) {
lock.Lock();
assert(ictx != nullptr);
CephContext *cct = ictx->cct;
ldout(cct, 20) << this << " " << __func__ << ": pending=" << count << dendl;
assert(pending_count == 0);
pending_count = count;
lock.Unlock();
// if no pending requests, completion will fire now
unblock(cct);
unblock();
}
void AioCompletion::complete_request(CephContext *cct, ssize_t r)
void AioCompletion::complete_request(ssize_t r)
{
lock.Lock();
assert(ictx != nullptr);
CephContext *cct = ictx->cct;
if (rval >= 0) {
if (r < 0 && r != -EEXIST)
rval = r;
@ -173,8 +190,8 @@ namespace librbd {
ldout(cct, 20) << this << " " << __func__ << ": cb=" << complete_cb << ", "
<< "pending=" << pending_count << dendl;
if (!count && blockers == 0) {
finalize(cct, rval);
complete(cct);
finalize(rval);
complete();
}
put_unlock();
}
@ -207,24 +224,27 @@ namespace librbd {
void C_AioRead::finish(int r)
{
ldout(m_cct, 10) << "C_AioRead::finish() " << this << " r = " << r << dendl;
m_completion->lock.Lock();
CephContext *cct = m_completion->ictx->cct;
ldout(cct, 10) << "C_AioRead::finish() " << this << " r = " << r << dendl;
if (r >= 0 || r == -ENOENT) { // this was a sparse_read operation
ldout(m_cct, 10) << " got " << m_req->m_ext_map
<< " for " << m_req->m_buffer_extents
<< " bl " << m_req->data().length() << dendl;
ldout(cct, 10) << " got " << m_req->m_ext_map
<< " for " << m_req->m_buffer_extents
<< " bl " << m_req->data().length() << dendl;
// reads from the parent don't populate the m_ext_map and the overlap
// may not be the full buffer. compensate here by filling in m_ext_map
// with the read extent when it is empty.
if (m_req->m_ext_map.empty())
m_req->m_ext_map[m_req->m_object_off] = m_req->data().length();
m_completion->lock.Lock();
m_completion->destriper.add_partial_sparse_result(
m_cct, m_req->data(), m_req->m_ext_map, m_req->m_object_off,
cct, m_req->data(), m_req->m_ext_map, m_req->m_object_off,
m_req->m_buffer_extents);
m_completion->lock.Unlock();
r = m_req->m_object_len;
}
m_completion->lock.Unlock();
C_AioRequest::finish(r);
}

View File

@ -115,27 +115,27 @@ namespace librbd {
int wait_for_complete();
void finalize(CephContext *cct, ssize_t rval);
void finalize(ssize_t rval);
void init_time(ImageCtx *i, aio_type_t t);
void start_op(ImageCtx *i, aio_type_t t);
void fail(CephContext *cct, int r);
void start_op();
void fail(int r);
void complete(CephContext *cct);
void complete();
void set_complete_cb(void *cb_arg, callback_t cb) {
complete_cb = cb;
complete_arg = cb_arg;
}
void set_request_count(CephContext *cct, uint32_t num);
void set_request_count(uint32_t num);
void add_request() {
lock.Lock();
assert(pending_count > 0);
lock.Unlock();
get();
}
void complete_request(CephContext *cct, ssize_t r);
void complete_request(ssize_t r);
void associate_journal_event(uint64_t tid);
@ -183,13 +183,13 @@ namespace librbd {
Mutex::Locker l(lock);
++blockers;
}
void unblock(CephContext *cct) {
void unblock() {
Mutex::Locker l(lock);
assert(blockers > 0);
--blockers;
if (pending_count == 0 && blockers == 0) {
finalize(cct, rval);
complete(cct);
finalize(rval);
complete();
}
}
@ -205,23 +205,21 @@ namespace librbd {
class C_AioRequest : public Context {
public:
C_AioRequest(CephContext *cct, AioCompletion *completion)
: m_cct(cct), m_completion(completion) {
C_AioRequest(AioCompletion *completion) : m_completion(completion) {
m_completion->add_request();
}
virtual ~C_AioRequest() {}
virtual void finish(int r) {
m_completion->complete_request(m_cct, r);
m_completion->complete_request(r);
}
protected:
CephContext *m_cct;
AioCompletion *m_completion;
};
class C_AioRead : public C_AioRequest {
public:
C_AioRead(CephContext *cct, AioCompletion *completion)
: C_AioRequest(cct, completion), m_req(NULL) {
C_AioRead(AioCompletion *completion)
: C_AioRequest(completion), m_req(nullptr) {
}
virtual ~C_AioRead() {}
virtual void finish(int r);

View File

@ -48,7 +48,7 @@ struct C_DiscardJournalCommit : public Context {
Mutex::Locker cache_locker(image_ctx.cache_lock);
image_ctx.object_cacher->discard_set(image_ctx.object_set, object_extents);
aio_comp->complete_request(cct, r);
aio_comp->complete_request(r);
}
};
@ -71,7 +71,7 @@ struct C_FlushJournalCommit : public Context {
CephContext *cct = image_ctx.cct;
ldout(cct, 20) << this << " C_FlushJournalCommit: journal committed"
<< dendl;
aio_comp->complete_request(cct, r);
aio_comp->complete_request(r);
}
};
@ -82,7 +82,10 @@ void AioImageRequest<I>::aio_read(
I *ictx, AioCompletion *c,
const std::vector<std::pair<uint64_t,uint64_t> > &extents,
char *buf, bufferlist *pbl, int op_flags) {
c->init_time(ictx, librbd::AIO_TYPE_READ);
AioImageRead req(*ictx, c, extents, buf, pbl, op_flags);
req.start_op();
req.send();
}
@ -90,7 +93,10 @@ template <typename I>
void AioImageRequest<I>::aio_read(I *ictx, AioCompletion *c,
uint64_t off, size_t len, char *buf,
bufferlist *pbl, int op_flags) {
c->init_time(ictx, librbd::AIO_TYPE_READ);
AioImageRead req(*ictx, c, off, len, buf, pbl, op_flags);
req.start_op();
req.send();
}
@ -98,20 +104,29 @@ template <typename I>
void AioImageRequest<I>::aio_write(I *ictx, AioCompletion *c,
uint64_t off, size_t len, const char *buf,
int op_flags) {
c->init_time(ictx, librbd::AIO_TYPE_WRITE);
AioImageWrite req(*ictx, c, off, len, buf, op_flags);
req.start_op();
req.send();
}
template <typename I>
void AioImageRequest<I>::aio_discard(I *ictx, AioCompletion *c,
uint64_t off, uint64_t len) {
c->init_time(ictx, librbd::AIO_TYPE_DISCARD);
AioImageDiscard req(*ictx, c, off, len);
req.start_op();
req.send();
}
template <typename I>
void AioImageRequest<I>::aio_flush(I *ictx, AioCompletion *c) {
c->init_time(ictx, librbd::AIO_TYPE_FLUSH);
AioImageFlush req(*ictx, c);
req.start_op();
req.send();
}
@ -130,7 +145,7 @@ void AioImageRequest<I>::send() {
template <typename I>
void AioImageRequest<I>::fail(int r) {
m_aio_comp->get();
m_aio_comp->fail(m_image_ctx.cct, r);
m_aio_comp->fail(r);
}
void AioImageRead::send_request() {
@ -157,7 +172,7 @@ void AioImageRead::send_request() {
uint64_t len = p->second;
int r = clip_io(&m_image_ctx, p->first, &len);
if (r < 0) {
m_aio_comp->fail(cct, r);
m_aio_comp->fail(r);
return;
}
if (len == 0) {
@ -169,8 +184,6 @@ void AioImageRead::send_request() {
object_extents, buffer_ofs);
buffer_ofs += len;
}
m_aio_comp->start_op(&m_image_ctx, AIO_TYPE_READ);
}
m_aio_comp->read_buf = m_buf;
@ -182,7 +195,7 @@ void AioImageRead::send_request() {
for (auto &object_extent : object_extents) {
request_count += object_extent.second.size();
}
m_aio_comp->set_request_count(cct, request_count);
m_aio_comp->set_request_count(request_count);
// issue the requests
for (auto &object_extent : object_extents) {
@ -191,7 +204,7 @@ void AioImageRead::send_request() {
<< extent.length << " from " << extent.buffer_extents
<< dendl;
C_AioRead *req_comp = new C_AioRead(cct, m_aio_comp);
C_AioRead *req_comp = new C_AioRead(m_aio_comp);
AioObjectRead *req = new AioObjectRead(&m_image_ctx, extent.oid.name,
extent.objectno, extent.offset,
extent.length,
@ -231,18 +244,17 @@ void AbstractAioImageWrite::send_request() {
// pending async operation
RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
if (m_image_ctx.snap_id != CEPH_NOSNAP || m_image_ctx.read_only) {
m_aio_comp->fail(cct, -EROFS);
m_aio_comp->fail(-EROFS);
return;
}
int r = clip_io(&m_image_ctx, m_off, &clip_len);
if (r < 0) {
m_aio_comp->fail(cct, r);
m_aio_comp->fail(r);
return;
}
snapc = m_image_ctx.snapc;
m_aio_comp->start_op(&m_image_ctx, get_aio_type());
// map to object extents
if (clip_len > 0) {
@ -258,7 +270,7 @@ void AbstractAioImageWrite::send_request() {
if (!object_extents.empty()) {
uint64_t journal_tid = 0;
m_aio_comp->set_request_count(
cct, object_extents.size() + get_cache_request_count(journaling));
object_extents.size() + get_cache_request_count(journaling));
AioObjectRequests requests;
send_object_requests(object_extents, snapc,
@ -275,7 +287,7 @@ void AbstractAioImageWrite::send_request() {
}
} else {
// no IO to perform -- fire completion
m_aio_comp->unblock(cct);
m_aio_comp->unblock();
}
update_stats(clip_len);
@ -291,7 +303,7 @@ void AbstractAioImageWrite::send_object_requests(
p != object_extents.end(); ++p) {
ldout(cct, 20) << " oid " << p->oid << " " << p->offset << "~" << p->length
<< " from " << p->buffer_extents << dendl;
C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp);
C_AioRequest *req_comp = new C_AioRequest(m_aio_comp);
AioObjectRequest *request = create_object_request(*p, snapc, req_comp);
// if journaling, stash the request for later; otherwise send
@ -318,9 +330,8 @@ uint64_t AioImageWrite::append_journal_event(
bufferlist bl;
bl.append(m_buf, m_len);
uint64_t tid = m_image_ctx.journal->append_write_event(m_aio_comp, m_off,
m_len, bl, requests,
synchronous);
uint64_t tid = m_image_ctx.journal->append_write_event(m_off, m_len, bl,
requests, synchronous);
if (m_image_ctx.object_cacher == NULL) {
m_aio_comp->associate_journal_event(tid);
}
@ -329,7 +340,6 @@ uint64_t AioImageWrite::append_journal_event(
void AioImageWrite::send_cache_requests(const ObjectExtents &object_extents,
uint64_t journal_tid) {
CephContext *cct = m_image_ctx.cct;
for (ObjectExtents::const_iterator p = object_extents.begin();
p != object_extents.end(); ++p) {
const ObjectExtent &object_extent = *p;
@ -337,7 +347,7 @@ void AioImageWrite::send_cache_requests(const ObjectExtents &object_extents,
bufferlist bl;
assemble_extent(object_extent, &bl);
C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp);
C_AioRequest *req_comp = new C_AioRequest(m_aio_comp);
m_image_ctx.write_to_cache(object_extent.oid, bl, object_extent.length,
object_extent.offset, req_comp, m_op_flags,
journal_tid);
@ -378,8 +388,7 @@ void AioImageWrite::update_stats(size_t length) {
uint64_t AioImageDiscard::append_journal_event(
const AioObjectRequests &requests, bool synchronous) {
journal::EventEntry event_entry(journal::AioDiscardEvent(m_off, m_len));
uint64_t tid = m_image_ctx.journal->append_io_event(m_aio_comp,
std::move(event_entry),
uint64_t tid = m_image_ctx.journal->append_io_event(std::move(event_entry),
requests, m_off, m_len,
synchronous);
m_aio_comp->associate_journal_event(tid);
@ -438,8 +447,6 @@ void AioImageDiscard::update_stats(size_t length) {
}
void AioImageFlush::send_request() {
CephContext *cct = m_image_ctx.cct;
bool journaling = false;
{
RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
@ -447,12 +454,12 @@ void AioImageFlush::send_request() {
!m_image_ctx.journal->is_journal_replaying());
}
m_aio_comp->set_request_count(cct, journaling ? 2 : 1);
m_aio_comp->set_request_count(journaling ? 2 : 1);
if (journaling) {
// in-flight ops are flushed prior to closing the journal
uint64_t journal_tid = m_image_ctx.journal->append_io_event(
m_aio_comp, journal::EventEntry(journal::AioFlushEvent()),
journal::EventEntry(journal::AioFlushEvent()),
AioObjectRequests(), 0, 0, false);
C_FlushJournalCommit *ctx = new C_FlushJournalCommit(m_image_ctx,
@ -462,10 +469,9 @@ void AioImageFlush::send_request() {
m_aio_comp->associate_journal_event(journal_tid);
}
C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp);
C_AioRequest *req_comp = new C_AioRequest(m_aio_comp);
m_image_ctx.flush(req_comp);
m_aio_comp->start_op(&m_image_ctx, AIO_TYPE_FLUSH);
m_aio_comp->put();
m_image_ctx.perfcounter->inc(l_librbd_aio_flush);

View File

@ -40,6 +40,10 @@ public:
return false;
}
void start_op() {
m_aio_comp->start_op();
}
void send();
void fail(int r);
@ -106,8 +110,6 @@ protected:
m_synchronous(false) {
}
virtual aio_type_t get_aio_type() const = 0;
virtual void send_request();
virtual uint32_t get_cache_request_count(bool journaling) const {
@ -140,9 +142,6 @@ public:
}
protected:
virtual aio_type_t get_aio_type() const {
return AIO_TYPE_WRITE;
}
virtual const char *get_request_type() const {
return "aio_write";
}
@ -175,9 +174,6 @@ public:
}
protected:
virtual aio_type_t get_aio_type() const {
return AIO_TYPE_DISCARD;
}
virtual const char *get_request_type() const {
return "aio_discard";
}

View File

@ -341,6 +341,8 @@ void *AioImageRequestWQ::_void_dequeue() {
get_pool_lock().Lock();
return nullptr;
}
item->start_op();
return item;
}
@ -398,8 +400,7 @@ int AioImageRequestWQ::start_in_flight_op(AioCompletion *c) {
CephContext *cct = m_image_ctx.cct;
lderr(cct) << "IO received on closed image" << dendl;
c->get();
c->fail(cct, -ESHUTDOWN);
c->fail(-ESHUTDOWN);
return false;
}

View File

@ -173,7 +173,7 @@ namespace librbd {
// release reference to the parent read completion. this request
// might be completed after unblock is invoked.
AioCompletion *parent_completion = m_parent_completion;
parent_completion->unblock(m_ictx->cct);
parent_completion->unblock();
parent_completion->put();
}
}

View File

@ -2,7 +2,6 @@
// vim: ts=8 sw=2 smarttab
#include "librbd/Journal.h"
#include "librbd/AioCompletion.h"
#include "librbd/AioImageRequestWQ.h"
#include "librbd/AioObjectRequest.h"
#include "librbd/ExclusiveLock.h"
@ -607,6 +606,7 @@ bool Journal<I>::is_journal_replaying() const {
Mutex::Locker locker(m_lock);
return (m_state == STATE_REPLAYING ||
m_state == STATE_FLUSHING_REPLAY ||
m_state == STATE_FLUSHING_RESTART ||
m_state == STATE_RESTARTING_REPLAY);
}
@ -802,8 +802,7 @@ void Journal<I>::flush_commit_position(Context *on_finish) {
}
template <typename I>
uint64_t Journal<I>::append_write_event(AioCompletion *aio_comp,
uint64_t offset, size_t length,
uint64_t Journal<I>::append_write_event(uint64_t offset, size_t length,
const bufferlist &bl,
const AioObjectRequests &requests,
bool flush_entry) {
@ -833,13 +832,12 @@ uint64_t Journal<I>::append_write_event(AioCompletion *aio_comp,
bytes_remaining -= event_length;
} while (bytes_remaining > 0);
return append_io_events(aio_comp, journal::EVENT_TYPE_AIO_WRITE, bufferlists,
requests, offset, length, flush_entry);
return append_io_events(journal::EVENT_TYPE_AIO_WRITE, bufferlists, requests,
offset, length, flush_entry);
}
template <typename I>
uint64_t Journal<I>::append_io_event(AioCompletion *aio_comp,
journal::EventEntry &&event_entry,
uint64_t Journal<I>::append_io_event(journal::EventEntry &&event_entry,
const AioObjectRequests &requests,
uint64_t offset, size_t length,
bool flush_entry) {
@ -847,13 +845,12 @@ uint64_t Journal<I>::append_io_event(AioCompletion *aio_comp,
bufferlist bl;
::encode(event_entry, bl);
return append_io_events(aio_comp, event_entry.get_event_type(), {bl},
requests, offset, length, flush_entry);
return append_io_events(event_entry.get_event_type(), {bl}, requests, offset,
length, flush_entry);
}
template <typename I>
uint64_t Journal<I>::append_io_events(AioCompletion *aio_comp,
journal::EventType event_type,
uint64_t Journal<I>::append_io_events(journal::EventType event_type,
const Bufferlists &bufferlists,
const AioObjectRequests &requests,
uint64_t offset, size_t length,
@ -875,7 +872,7 @@ uint64_t Journal<I>::append_io_events(AioCompletion *aio_comp,
assert(bl.length() <= m_max_append_size);
futures.push_back(m_journaler->append(m_tag_tid, bl));
}
m_events[tid] = Event(futures, aio_comp, requests, offset, length);
m_events[tid] = Event(futures, requests, offset, length);
}
CephContext *cct = m_image_ctx.cct;
@ -968,6 +965,10 @@ void Journal<I>::append_op_event(uint64_t op_tid,
}
on_safe = create_async_context_callback(m_image_ctx, on_safe);
on_safe = new FunctionContext([this, on_safe](int r) {
// ensure all committed IO before this op is committed
m_journaler->flush_commit_position(on_safe);
});
future.flush(on_safe);
CephContext *cct = m_image_ctx.cct;
@ -1349,6 +1350,10 @@ void Journal<I>::handle_replay_process_safe(ReplayEntry replay_entry, int r) {
CephContext *cct = m_image_ctx.cct;
m_lock.Lock();
assert(m_state == STATE_REPLAYING ||
m_state == STATE_FLUSHING_RESTART ||
m_state == STATE_FLUSHING_REPLAY);
ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
if (r < 0) {
lderr(cct) << "failed to commit journal event to disk: " << cpp_strerror(r)
@ -1382,8 +1387,8 @@ void Journal<I>::handle_replay_process_safe(ReplayEntry replay_entry, int r) {
} else {
// only commit the entry if written successfully
m_journaler->committed(replay_entry);
m_lock.Unlock();
}
m_lock.Unlock();
}
template <typename I>
@ -1477,7 +1482,6 @@ void Journal<I>::handle_io_event_safe(int r, uint64_t tid) {
lderr(cct) << "failed to commit IO event: " << cpp_strerror(r) << dendl;
}
AioCompletion *aio_comp;
AioObjectRequests aio_object_requests;
Contexts on_safe_contexts;
{
@ -1486,7 +1490,6 @@ void Journal<I>::handle_io_event_safe(int r, uint64_t tid) {
assert(it != m_events.end());
Event &event = it->second;
aio_comp = event.aio_comp;
aio_object_requests.swap(event.aio_object_requests);
on_safe_contexts.swap(event.on_safe_contexts);
@ -1507,15 +1510,14 @@ void Journal<I>::handle_io_event_safe(int r, uint64_t tid) {
}
ldout(cct, 20) << "completing tid=" << tid << dendl;
if (r < 0) {
// don't send aio requests if the journal fails -- bubble error up
aio_comp->fail(cct, r);
} else {
// send any waiting aio requests now that journal entry is safe
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
for (AioObjectRequests::iterator it = aio_object_requests.begin();
it != aio_object_requests.end(); ++it) {
for (AioObjectRequests::iterator it = aio_object_requests.begin();
it != aio_object_requests.end(); ++it) {
if (r < 0) {
// don't send aio requests if the journal fails -- bubble error up
(*it)->complete(r);
} else {
// send any waiting aio requests now that journal entry is safe
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
(*it)->send();
}
}

View File

@ -30,7 +30,6 @@ class Journaler;
namespace librbd {
class AioCompletion;
class AioObjectRequest;
class ImageCtx;
@ -129,13 +128,11 @@ public:
void flush_commit_position(Context *on_finish);
uint64_t append_write_event(AioCompletion *aio_comp,
uint64_t offset, size_t length,
uint64_t append_write_event(uint64_t offset, size_t length,
const bufferlist &bl,
const AioObjectRequests &requests,
bool flush_entry);
uint64_t append_io_event(AioCompletion *aio_comp,
journal::EventEntry &&event_entry,
uint64_t append_io_event(journal::EventEntry &&event_entry,
const AioObjectRequests &requests,
uint64_t offset, size_t length,
bool flush_entry);
@ -176,7 +173,6 @@ private:
struct Event {
Futures futures;
AioCompletion *aio_comp = nullptr;
AioObjectRequests aio_object_requests;
Contexts on_safe_contexts;
ExtentInterval pending_extents;
@ -186,9 +182,9 @@ private:
Event() {
}
Event(const Futures &_futures, AioCompletion *_aio_comp,
const AioObjectRequests &_requests, uint64_t offset, size_t length)
: futures(_futures), aio_comp(_aio_comp), aio_object_requests(_requests) {
Event(const Futures &_futures, const AioObjectRequests &_requests,
uint64_t offset, size_t length)
: futures(_futures), aio_object_requests(_requests) {
if (length > 0) {
pending_extents.insert(offset, length);
}
@ -290,8 +286,7 @@ private:
journal::Replay<ImageCtxT> *m_journal_replay;
uint64_t append_io_events(AioCompletion *aio_comp,
journal::EventType event_type,
uint64_t append_io_events(journal::EventType event_type,
const Bufferlists &bufferlists,
const AioObjectRequests &requests,
uint64_t offset, size_t length, bool flush_entry);

View File

@ -87,6 +87,14 @@ struct ExecuteOp : public Context {
}
virtual void finish(int r) override {
CephContext *cct = image_ctx.cct;
if (r < 0) {
lderr(cct) << "ExecuteOp: " << __func__ << ": r=" << r << dendl;
on_op_complete->complete(r);
return;
}
ldout(cct, 20) << "ExecuteOp: " << __func__ << dendl;
RWLock::RLocker owner_locker(image_ctx.owner_lock);
execute(event);
}
@ -102,7 +110,17 @@ struct C_RefreshIfRequired : public Context {
}
virtual void finish(int r) override {
CephContext *cct = image_ctx.cct;
if (r < 0) {
lderr(cct) << "C_RefreshIfRequired: " << __func__ << ": r=" << r << dendl;
image_ctx.op_work_queue->queue(on_finish, r);
return;
}
if (image_ctx.state->is_refresh_required()) {
ldout(cct, 20) << "C_RefreshIfRequired: " << __func__ << ": "
<< "refresh required" << dendl;
image_ctx.state->refresh(on_finish);
return;
}
@ -156,8 +174,6 @@ void Replay<I>::shut_down(bool cancel_ops, Context *on_finish) {
ldout(cct, 20) << this << " " << __func__ << dendl;
AioCompletion *flush_comp = nullptr;
OpTids cancel_op_tids;
Contexts op_finish_events;
on_finish = util::create_async_context_callback(
m_image_ctx, on_finish);
@ -176,7 +192,9 @@ void Replay<I>::shut_down(bool cancel_ops, Context *on_finish) {
// OpFinishEvent or waiting for ready)
if (op_event.on_start_ready == nullptr &&
op_event.on_op_finish_event != nullptr) {
cancel_op_tids.push_back(op_event_pair.first);
Context *on_op_finish_event = nullptr;
std::swap(on_op_finish_event, op_event.on_op_finish_event);
m_image_ctx.op_work_queue->queue(on_op_finish_event, -ERESTART);
}
} else if (op_event.on_op_finish_event != nullptr) {
// start ops waiting for OpFinishEvent
@ -200,9 +218,6 @@ void Replay<I>::shut_down(bool cancel_ops, Context *on_finish) {
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
AioImageRequest<I>::aio_flush(&m_image_ctx, flush_comp);
}
for (auto op_tid : cancel_op_tids) {
handle_op_complete(op_tid, -ERESTART);
}
if (on_finish != nullptr) {
on_finish->complete(0);
}
@ -743,10 +758,8 @@ void Replay<I>::handle_op_complete(uint64_t op_tid, int r) {
op_event.on_finish_safe != nullptr) || shutting_down);
}
// skipped upon error -- so clean up if non-null
delete op_event.on_op_finish_event;
if (r == -ERESTART) {
delete op_event.on_op_complete;
if (op_event.on_op_finish_event != nullptr) {
op_event.on_op_finish_event->complete(r);
}
if (op_event.on_finish_ready != nullptr) {

View File

@ -87,11 +87,11 @@ struct C_OpenComplete : public Context {
}
if (r < 0) {
*ictxp = nullptr;
comp->fail(ictx->cct, r);
comp->fail(r);
} else {
*ictxp = ictx;
comp->lock.Lock();
comp->complete(ictx->cct);
comp->complete();
comp->put_unlock();
}
}
@ -123,10 +123,10 @@ struct C_CloseComplete : public Context {
virtual void finish(int r) {
ldout(cct, 20) << "C_CloseComplete::finish: r=" << r << dendl;
if (r < 0) {
comp->fail(cct, r);
comp->fail(r);
} else {
comp->lock.Lock();
comp->complete(cct);
comp->complete();
comp->put_unlock();
}
}

View File

@ -47,8 +47,8 @@ public:
librbd::Journal<>::AioObjectRequests requests;
{
RWLock::RLocker owner_locker(ictx->owner_lock);
ictx->journal->append_io_event(NULL, std::move(event_entry), requests, 0,
0, true);
ictx->journal->append_io_event(std::move(event_entry), requests, 0, 0,
true);
}
}

View File

@ -77,11 +77,11 @@ ACTION_P2(NotifyInvoke, lock, cond) {
}
ACTION_P2(CompleteAioCompletion, r, image_ctx) {
CephContext *cct = image_ctx->cct;
image_ctx->op_work_queue->queue(new FunctionContext([cct, arg0](int r) {
image_ctx->op_work_queue->queue(new FunctionContext([this, arg0](int r) {
arg0->get();
arg0->set_request_count(cct, 1);
arg0->complete_request(cct, r);
arg0->init_time(image_ctx, librbd::AIO_TYPE_NONE);
arg0->set_request_count(1);
arg0->complete_request(r);
}), r);
}
@ -217,8 +217,9 @@ public:
void when_complete(MockReplayImageCtx &mock_image_ctx, AioCompletion *aio_comp,
int r) {
aio_comp->get();
aio_comp->set_request_count(mock_image_ctx.cct, 1);
aio_comp->complete_request(mock_image_ctx.cct, r);
aio_comp->init_time(mock_image_ctx.image_ctx, librbd::AIO_TYPE_NONE);
aio_comp->set_request_count(1);
aio_comp->complete_request(r);
}
int when_flush(MockJournalReplay &mock_journal_replay) {
@ -460,7 +461,7 @@ TEST_F(TestMockJournalReplay, Flush) {
expect_op_work_queue(mock_image_ctx);
InSequence seq;
AioCompletion *aio_comp;
AioCompletion *aio_comp = nullptr;
C_SaferCond on_ready;
C_SaferCond on_safe;
expect_aio_discard(mock_aio_image_request, &aio_comp, 123, 456);
@ -614,9 +615,15 @@ TEST_F(TestMockJournalReplay, MissingOpFinishEventCancelOps) {
when_replay_op_ready(mock_journal_replay, 123, &on_resume);
ASSERT_EQ(0, on_snap_create_ready.wait());
ASSERT_EQ(0, when_shut_down(mock_journal_replay, true));
ASSERT_EQ(-ERESTART, on_snap_remove_safe.wait());
C_SaferCond on_shut_down;
mock_journal_replay.shut_down(true, &on_shut_down);
ASSERT_EQ(-ERESTART, on_resume.wait());
on_snap_create_finish->complete(-ERESTART);
ASSERT_EQ(-ERESTART, on_snap_create_safe.wait());
ASSERT_EQ(-ERESTART, on_snap_remove_safe.wait());
ASSERT_EQ(0, on_shut_down.wait());
}
TEST_F(TestMockJournalReplay, UnknownOpFinishEvent) {

View File

@ -9,6 +9,8 @@
#include "common/Mutex.h"
#include "cls/journal/cls_journal_types.h"
#include "journal/Journaler.h"
#include "librbd/AioCompletion.h"
#include "librbd/AioObjectRequest.h"
#include "librbd/Journal.h"
#include "librbd/Utils.h"
#include "librbd/journal/Replay.h"
@ -291,15 +293,20 @@ public:
bl.append_zero(length);
RWLock::RLocker owner_locker(mock_image_ctx.owner_lock);
return mock_journal.append_write_event(nullptr, 0, length, bl, {}, false);
return mock_journal.append_write_event(0, length, bl, {}, false);
}
uint64_t when_append_io_event(MockJournalImageCtx &mock_image_ctx,
MockJournal &mock_journal,
AioCompletion *aio_comp = nullptr) {
AioObjectRequest *object_request = nullptr) {
RWLock::RLocker owner_locker(mock_image_ctx.owner_lock);
MockJournal::AioObjectRequests object_requests;
if (object_request != nullptr) {
object_requests.push_back(object_request);
}
return mock_journal.append_io_event(
aio_comp, journal::EventEntry{journal::AioFlushEvent{}}, {}, 0, 0, false);
journal::EventEntry{journal::AioFlushEvent{}}, object_requests, 0, 0,
false);
}
void save_commit_context(Context *ctx) {
@ -878,21 +885,21 @@ TEST_F(TestMockJournal, EventCommitError) {
close_journal(mock_journal, mock_journaler);
};
AioCompletion *comp = new AioCompletion();
comp->get();
C_SaferCond object_request_ctx;
AioObjectRemove *object_request = new AioObjectRemove(
ictx, "oid", 0, {}, &object_request_ctx);
::journal::MockFuture mock_future;
Context *on_journal_safe;
expect_append_journaler(mock_journaler);
expect_wait_future(mock_future, &on_journal_safe);
ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal, comp));
ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal,
object_request));
// commit the event in the journal w/o waiting writeback
expect_future_committed(mock_journaler);
on_journal_safe->complete(-EINVAL);
ASSERT_EQ(0, comp->wait_for_complete());
ASSERT_EQ(-EINVAL, comp->get_return_value());
comp->put();
ASSERT_EQ(-EINVAL, object_request_ctx.wait());
// cache should receive the error after attempting writeback
expect_future_is_valid(mock_future);
@ -917,14 +924,16 @@ TEST_F(TestMockJournal, EventCommitErrorWithPendingWriteback) {
close_journal(mock_journal, mock_journaler);
};
AioCompletion *comp = new AioCompletion();
comp->get();
C_SaferCond object_request_ctx;
AioObjectRemove *object_request = new AioObjectRemove(
ictx, "oid", 0, {}, &object_request_ctx);
::journal::MockFuture mock_future;
Context *on_journal_safe;
expect_append_journaler(mock_journaler);
expect_wait_future(mock_future, &on_journal_safe);
ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal, comp));
ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal,
object_request));
expect_future_is_valid(mock_future);
C_SaferCond flush_ctx;
@ -933,9 +942,7 @@ TEST_F(TestMockJournal, EventCommitErrorWithPendingWriteback) {
// commit the event in the journal w/ waiting cache writeback
expect_future_committed(mock_journaler);
on_journal_safe->complete(-EINVAL);
ASSERT_EQ(0, comp->wait_for_complete());
ASSERT_EQ(-EINVAL, comp->get_return_value());
comp->put();
ASSERT_EQ(-EINVAL, object_request_ctx.wait());
// cache should receive the error if waiting
ASSERT_EQ(-EINVAL, flush_ctx.wait());