diff --git a/src/librbd/AioCompletion.cc b/src/librbd/AioCompletion.cc index e439aaaf2ef..db4b2ca74fe 100644 --- a/src/librbd/AioCompletion.cc +++ b/src/librbd/AioCompletion.cc @@ -38,8 +38,12 @@ namespace librbd { return 0; } - void AioCompletion::finalize(CephContext *cct, ssize_t rval) + void AioCompletion::finalize(ssize_t rval) { + assert(lock.is_locked()); + assert(ictx != nullptr); + CephContext *cct = ictx->cct; + ldout(cct, 20) << this << " " << __func__ << ": r=" << rval << ", " << "read_buf=" << reinterpret_cast(read_buf) << ", " << "real_bl=" << reinterpret_cast(read_bl) << dendl; @@ -67,10 +71,13 @@ namespace librbd { } } - void AioCompletion::complete(CephContext *cct) { + void AioCompletion::complete() { + assert(lock.is_locked()); + assert(ictx != nullptr); + CephContext *cct = ictx->cct; + tracepoint(librbd, aio_complete_enter, this, rval); utime_t elapsed; - assert(lock.is_locked()); elapsed = ceph_clock_now(cct) - start_time; switch (aio_type) { case AIO_TYPE_OPEN: @@ -120,47 +127,57 @@ namespace librbd { } void AioCompletion::init_time(ImageCtx *i, aio_type_t t) { - if (ictx == NULL) { + Mutex::Locker locker(lock); + if (ictx == nullptr) { ictx = i; aio_type = t; start_time = ceph_clock_now(ictx->cct); } } - void AioCompletion::start_op(ImageCtx *i, aio_type_t t) { - init_time(i, t); - + void AioCompletion::start_op() { Mutex::Locker locker(lock); - if (state == STATE_PENDING && !async_op.started()) { + assert(ictx != nullptr); + assert(!async_op.started()); + if (state == STATE_PENDING && aio_type != AIO_TYPE_FLUSH) { async_op.start_op(*ictx); } } - void AioCompletion::fail(CephContext *cct, int r) + void AioCompletion::fail(int r) { + lock.Lock(); + assert(ictx != nullptr); + CephContext *cct = ictx->cct; + lderr(cct) << this << " " << __func__ << ": " << cpp_strerror(r) << dendl; - lock.Lock(); assert(pending_count == 0); rval = r; - complete(cct); + complete(); put_unlock(); } - void AioCompletion::set_request_count(CephContext *cct, uint32_t count) { - ldout(cct, 20) << this << " " << __func__ << ": pending=" << count << dendl; + void AioCompletion::set_request_count(uint32_t count) { lock.Lock(); + assert(ictx != nullptr); + CephContext *cct = ictx->cct; + + ldout(cct, 20) << this << " " << __func__ << ": pending=" << count << dendl; assert(pending_count == 0); pending_count = count; lock.Unlock(); // if no pending requests, completion will fire now - unblock(cct); + unblock(); } - void AioCompletion::complete_request(CephContext *cct, ssize_t r) + void AioCompletion::complete_request(ssize_t r) { lock.Lock(); + assert(ictx != nullptr); + CephContext *cct = ictx->cct; + if (rval >= 0) { if (r < 0 && r != -EEXIST) rval = r; @@ -173,8 +190,8 @@ namespace librbd { ldout(cct, 20) << this << " " << __func__ << ": cb=" << complete_cb << ", " << "pending=" << pending_count << dendl; if (!count && blockers == 0) { - finalize(cct, rval); - complete(cct); + finalize(rval); + complete(); } put_unlock(); } @@ -207,24 +224,27 @@ namespace librbd { void C_AioRead::finish(int r) { - ldout(m_cct, 10) << "C_AioRead::finish() " << this << " r = " << r << dendl; + m_completion->lock.Lock(); + CephContext *cct = m_completion->ictx->cct; + ldout(cct, 10) << "C_AioRead::finish() " << this << " r = " << r << dendl; + if (r >= 0 || r == -ENOENT) { // this was a sparse_read operation - ldout(m_cct, 10) << " got " << m_req->m_ext_map - << " for " << m_req->m_buffer_extents - << " bl " << m_req->data().length() << dendl; + ldout(cct, 10) << " got " << m_req->m_ext_map + << " for " << m_req->m_buffer_extents + << " bl " << m_req->data().length() << dendl; // reads from the parent don't populate the m_ext_map and the overlap // may not be the full buffer. compensate here by filling in m_ext_map // with the read extent when it is empty. if (m_req->m_ext_map.empty()) m_req->m_ext_map[m_req->m_object_off] = m_req->data().length(); - m_completion->lock.Lock(); m_completion->destriper.add_partial_sparse_result( - m_cct, m_req->data(), m_req->m_ext_map, m_req->m_object_off, + cct, m_req->data(), m_req->m_ext_map, m_req->m_object_off, m_req->m_buffer_extents); - m_completion->lock.Unlock(); r = m_req->m_object_len; } + m_completion->lock.Unlock(); + C_AioRequest::finish(r); } diff --git a/src/librbd/AioCompletion.h b/src/librbd/AioCompletion.h index 6c37f91d55d..37d8977be95 100644 --- a/src/librbd/AioCompletion.h +++ b/src/librbd/AioCompletion.h @@ -115,27 +115,27 @@ namespace librbd { int wait_for_complete(); - void finalize(CephContext *cct, ssize_t rval); + void finalize(ssize_t rval); void init_time(ImageCtx *i, aio_type_t t); - void start_op(ImageCtx *i, aio_type_t t); - void fail(CephContext *cct, int r); + void start_op(); + void fail(int r); - void complete(CephContext *cct); + void complete(); void set_complete_cb(void *cb_arg, callback_t cb) { complete_cb = cb; complete_arg = cb_arg; } - void set_request_count(CephContext *cct, uint32_t num); + void set_request_count(uint32_t num); void add_request() { lock.Lock(); assert(pending_count > 0); lock.Unlock(); get(); } - void complete_request(CephContext *cct, ssize_t r); + void complete_request(ssize_t r); void associate_journal_event(uint64_t tid); @@ -183,13 +183,13 @@ namespace librbd { Mutex::Locker l(lock); ++blockers; } - void unblock(CephContext *cct) { + void unblock() { Mutex::Locker l(lock); assert(blockers > 0); --blockers; if (pending_count == 0 && blockers == 0) { - finalize(cct, rval); - complete(cct); + finalize(rval); + complete(); } } @@ -205,23 +205,21 @@ namespace librbd { class C_AioRequest : public Context { public: - C_AioRequest(CephContext *cct, AioCompletion *completion) - : m_cct(cct), m_completion(completion) { + C_AioRequest(AioCompletion *completion) : m_completion(completion) { m_completion->add_request(); } virtual ~C_AioRequest() {} virtual void finish(int r) { - m_completion->complete_request(m_cct, r); + m_completion->complete_request(r); } protected: - CephContext *m_cct; AioCompletion *m_completion; }; class C_AioRead : public C_AioRequest { public: - C_AioRead(CephContext *cct, AioCompletion *completion) - : C_AioRequest(cct, completion), m_req(NULL) { + C_AioRead(AioCompletion *completion) + : C_AioRequest(completion), m_req(nullptr) { } virtual ~C_AioRead() {} virtual void finish(int r); diff --git a/src/librbd/AioImageRequest.cc b/src/librbd/AioImageRequest.cc index 5a1904b8cb3..3b2868801da 100644 --- a/src/librbd/AioImageRequest.cc +++ b/src/librbd/AioImageRequest.cc @@ -48,7 +48,7 @@ struct C_DiscardJournalCommit : public Context { Mutex::Locker cache_locker(image_ctx.cache_lock); image_ctx.object_cacher->discard_set(image_ctx.object_set, object_extents); - aio_comp->complete_request(cct, r); + aio_comp->complete_request(r); } }; @@ -71,7 +71,7 @@ struct C_FlushJournalCommit : public Context { CephContext *cct = image_ctx.cct; ldout(cct, 20) << this << " C_FlushJournalCommit: journal committed" << dendl; - aio_comp->complete_request(cct, r); + aio_comp->complete_request(r); } }; @@ -82,7 +82,10 @@ void AioImageRequest::aio_read( I *ictx, AioCompletion *c, const std::vector > &extents, char *buf, bufferlist *pbl, int op_flags) { + c->init_time(ictx, librbd::AIO_TYPE_READ); + AioImageRead req(*ictx, c, extents, buf, pbl, op_flags); + req.start_op(); req.send(); } @@ -90,7 +93,10 @@ template void AioImageRequest::aio_read(I *ictx, AioCompletion *c, uint64_t off, size_t len, char *buf, bufferlist *pbl, int op_flags) { + c->init_time(ictx, librbd::AIO_TYPE_READ); + AioImageRead req(*ictx, c, off, len, buf, pbl, op_flags); + req.start_op(); req.send(); } @@ -98,20 +104,29 @@ template void AioImageRequest::aio_write(I *ictx, AioCompletion *c, uint64_t off, size_t len, const char *buf, int op_flags) { + c->init_time(ictx, librbd::AIO_TYPE_WRITE); + AioImageWrite req(*ictx, c, off, len, buf, op_flags); + req.start_op(); req.send(); } template void AioImageRequest::aio_discard(I *ictx, AioCompletion *c, uint64_t off, uint64_t len) { + c->init_time(ictx, librbd::AIO_TYPE_DISCARD); + AioImageDiscard req(*ictx, c, off, len); + req.start_op(); req.send(); } template void AioImageRequest::aio_flush(I *ictx, AioCompletion *c) { + c->init_time(ictx, librbd::AIO_TYPE_FLUSH); + AioImageFlush req(*ictx, c); + req.start_op(); req.send(); } @@ -130,7 +145,7 @@ void AioImageRequest::send() { template void AioImageRequest::fail(int r) { m_aio_comp->get(); - m_aio_comp->fail(m_image_ctx.cct, r); + m_aio_comp->fail(r); } void AioImageRead::send_request() { @@ -157,7 +172,7 @@ void AioImageRead::send_request() { uint64_t len = p->second; int r = clip_io(&m_image_ctx, p->first, &len); if (r < 0) { - m_aio_comp->fail(cct, r); + m_aio_comp->fail(r); return; } if (len == 0) { @@ -169,8 +184,6 @@ void AioImageRead::send_request() { object_extents, buffer_ofs); buffer_ofs += len; } - - m_aio_comp->start_op(&m_image_ctx, AIO_TYPE_READ); } m_aio_comp->read_buf = m_buf; @@ -182,7 +195,7 @@ void AioImageRead::send_request() { for (auto &object_extent : object_extents) { request_count += object_extent.second.size(); } - m_aio_comp->set_request_count(cct, request_count); + m_aio_comp->set_request_count(request_count); // issue the requests for (auto &object_extent : object_extents) { @@ -191,7 +204,7 @@ void AioImageRead::send_request() { << extent.length << " from " << extent.buffer_extents << dendl; - C_AioRead *req_comp = new C_AioRead(cct, m_aio_comp); + C_AioRead *req_comp = new C_AioRead(m_aio_comp); AioObjectRead *req = new AioObjectRead(&m_image_ctx, extent.oid.name, extent.objectno, extent.offset, extent.length, @@ -231,18 +244,17 @@ void AbstractAioImageWrite::send_request() { // pending async operation RWLock::RLocker snap_locker(m_image_ctx.snap_lock); if (m_image_ctx.snap_id != CEPH_NOSNAP || m_image_ctx.read_only) { - m_aio_comp->fail(cct, -EROFS); + m_aio_comp->fail(-EROFS); return; } int r = clip_io(&m_image_ctx, m_off, &clip_len); if (r < 0) { - m_aio_comp->fail(cct, r); + m_aio_comp->fail(r); return; } snapc = m_image_ctx.snapc; - m_aio_comp->start_op(&m_image_ctx, get_aio_type()); // map to object extents if (clip_len > 0) { @@ -258,7 +270,7 @@ void AbstractAioImageWrite::send_request() { if (!object_extents.empty()) { uint64_t journal_tid = 0; m_aio_comp->set_request_count( - cct, object_extents.size() + get_cache_request_count(journaling)); + object_extents.size() + get_cache_request_count(journaling)); AioObjectRequests requests; send_object_requests(object_extents, snapc, @@ -275,7 +287,7 @@ void AbstractAioImageWrite::send_request() { } } else { // no IO to perform -- fire completion - m_aio_comp->unblock(cct); + m_aio_comp->unblock(); } update_stats(clip_len); @@ -291,7 +303,7 @@ void AbstractAioImageWrite::send_object_requests( p != object_extents.end(); ++p) { ldout(cct, 20) << " oid " << p->oid << " " << p->offset << "~" << p->length << " from " << p->buffer_extents << dendl; - C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp); + C_AioRequest *req_comp = new C_AioRequest(m_aio_comp); AioObjectRequest *request = create_object_request(*p, snapc, req_comp); // if journaling, stash the request for later; otherwise send @@ -318,9 +330,8 @@ uint64_t AioImageWrite::append_journal_event( bufferlist bl; bl.append(m_buf, m_len); - uint64_t tid = m_image_ctx.journal->append_write_event(m_aio_comp, m_off, - m_len, bl, requests, - synchronous); + uint64_t tid = m_image_ctx.journal->append_write_event(m_off, m_len, bl, + requests, synchronous); if (m_image_ctx.object_cacher == NULL) { m_aio_comp->associate_journal_event(tid); } @@ -329,7 +340,6 @@ uint64_t AioImageWrite::append_journal_event( void AioImageWrite::send_cache_requests(const ObjectExtents &object_extents, uint64_t journal_tid) { - CephContext *cct = m_image_ctx.cct; for (ObjectExtents::const_iterator p = object_extents.begin(); p != object_extents.end(); ++p) { const ObjectExtent &object_extent = *p; @@ -337,7 +347,7 @@ void AioImageWrite::send_cache_requests(const ObjectExtents &object_extents, bufferlist bl; assemble_extent(object_extent, &bl); - C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp); + C_AioRequest *req_comp = new C_AioRequest(m_aio_comp); m_image_ctx.write_to_cache(object_extent.oid, bl, object_extent.length, object_extent.offset, req_comp, m_op_flags, journal_tid); @@ -378,8 +388,7 @@ void AioImageWrite::update_stats(size_t length) { uint64_t AioImageDiscard::append_journal_event( const AioObjectRequests &requests, bool synchronous) { journal::EventEntry event_entry(journal::AioDiscardEvent(m_off, m_len)); - uint64_t tid = m_image_ctx.journal->append_io_event(m_aio_comp, - std::move(event_entry), + uint64_t tid = m_image_ctx.journal->append_io_event(std::move(event_entry), requests, m_off, m_len, synchronous); m_aio_comp->associate_journal_event(tid); @@ -438,8 +447,6 @@ void AioImageDiscard::update_stats(size_t length) { } void AioImageFlush::send_request() { - CephContext *cct = m_image_ctx.cct; - bool journaling = false; { RWLock::RLocker snap_locker(m_image_ctx.snap_lock); @@ -447,12 +454,12 @@ void AioImageFlush::send_request() { !m_image_ctx.journal->is_journal_replaying()); } - m_aio_comp->set_request_count(cct, journaling ? 2 : 1); + m_aio_comp->set_request_count(journaling ? 2 : 1); if (journaling) { // in-flight ops are flushed prior to closing the journal uint64_t journal_tid = m_image_ctx.journal->append_io_event( - m_aio_comp, journal::EventEntry(journal::AioFlushEvent()), + journal::EventEntry(journal::AioFlushEvent()), AioObjectRequests(), 0, 0, false); C_FlushJournalCommit *ctx = new C_FlushJournalCommit(m_image_ctx, @@ -462,10 +469,9 @@ void AioImageFlush::send_request() { m_aio_comp->associate_journal_event(journal_tid); } - C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp); + C_AioRequest *req_comp = new C_AioRequest(m_aio_comp); m_image_ctx.flush(req_comp); - m_aio_comp->start_op(&m_image_ctx, AIO_TYPE_FLUSH); m_aio_comp->put(); m_image_ctx.perfcounter->inc(l_librbd_aio_flush); diff --git a/src/librbd/AioImageRequest.h b/src/librbd/AioImageRequest.h index e8a3fd5b106..b30cc30f417 100644 --- a/src/librbd/AioImageRequest.h +++ b/src/librbd/AioImageRequest.h @@ -40,6 +40,10 @@ public: return false; } + void start_op() { + m_aio_comp->start_op(); + } + void send(); void fail(int r); @@ -106,8 +110,6 @@ protected: m_synchronous(false) { } - virtual aio_type_t get_aio_type() const = 0; - virtual void send_request(); virtual uint32_t get_cache_request_count(bool journaling) const { @@ -140,9 +142,6 @@ public: } protected: - virtual aio_type_t get_aio_type() const { - return AIO_TYPE_WRITE; - } virtual const char *get_request_type() const { return "aio_write"; } @@ -175,9 +174,6 @@ public: } protected: - virtual aio_type_t get_aio_type() const { - return AIO_TYPE_DISCARD; - } virtual const char *get_request_type() const { return "aio_discard"; } diff --git a/src/librbd/AioImageRequestWQ.cc b/src/librbd/AioImageRequestWQ.cc index 5ed3e2e8428..fba79339fb7 100644 --- a/src/librbd/AioImageRequestWQ.cc +++ b/src/librbd/AioImageRequestWQ.cc @@ -341,6 +341,8 @@ void *AioImageRequestWQ::_void_dequeue() { get_pool_lock().Lock(); return nullptr; } + + item->start_op(); return item; } @@ -398,8 +400,7 @@ int AioImageRequestWQ::start_in_flight_op(AioCompletion *c) { CephContext *cct = m_image_ctx.cct; lderr(cct) << "IO received on closed image" << dendl; - c->get(); - c->fail(cct, -ESHUTDOWN); + c->fail(-ESHUTDOWN); return false; } diff --git a/src/librbd/AioObjectRequest.cc b/src/librbd/AioObjectRequest.cc index 77aaa7e1f61..faee3d2ff9e 100644 --- a/src/librbd/AioObjectRequest.cc +++ b/src/librbd/AioObjectRequest.cc @@ -173,7 +173,7 @@ namespace librbd { // release reference to the parent read completion. this request // might be completed after unblock is invoked. AioCompletion *parent_completion = m_parent_completion; - parent_completion->unblock(m_ictx->cct); + parent_completion->unblock(); parent_completion->put(); } } diff --git a/src/librbd/Journal.cc b/src/librbd/Journal.cc index ffdfe1d40da..b24e8611e0b 100644 --- a/src/librbd/Journal.cc +++ b/src/librbd/Journal.cc @@ -2,7 +2,6 @@ // vim: ts=8 sw=2 smarttab #include "librbd/Journal.h" -#include "librbd/AioCompletion.h" #include "librbd/AioImageRequestWQ.h" #include "librbd/AioObjectRequest.h" #include "librbd/ExclusiveLock.h" @@ -607,6 +606,7 @@ bool Journal::is_journal_replaying() const { Mutex::Locker locker(m_lock); return (m_state == STATE_REPLAYING || m_state == STATE_FLUSHING_REPLAY || + m_state == STATE_FLUSHING_RESTART || m_state == STATE_RESTARTING_REPLAY); } @@ -802,8 +802,7 @@ void Journal::flush_commit_position(Context *on_finish) { } template -uint64_t Journal::append_write_event(AioCompletion *aio_comp, - uint64_t offset, size_t length, +uint64_t Journal::append_write_event(uint64_t offset, size_t length, const bufferlist &bl, const AioObjectRequests &requests, bool flush_entry) { @@ -833,13 +832,12 @@ uint64_t Journal::append_write_event(AioCompletion *aio_comp, bytes_remaining -= event_length; } while (bytes_remaining > 0); - return append_io_events(aio_comp, journal::EVENT_TYPE_AIO_WRITE, bufferlists, - requests, offset, length, flush_entry); + return append_io_events(journal::EVENT_TYPE_AIO_WRITE, bufferlists, requests, + offset, length, flush_entry); } template -uint64_t Journal::append_io_event(AioCompletion *aio_comp, - journal::EventEntry &&event_entry, +uint64_t Journal::append_io_event(journal::EventEntry &&event_entry, const AioObjectRequests &requests, uint64_t offset, size_t length, bool flush_entry) { @@ -847,13 +845,12 @@ uint64_t Journal::append_io_event(AioCompletion *aio_comp, bufferlist bl; ::encode(event_entry, bl); - return append_io_events(aio_comp, event_entry.get_event_type(), {bl}, - requests, offset, length, flush_entry); + return append_io_events(event_entry.get_event_type(), {bl}, requests, offset, + length, flush_entry); } template -uint64_t Journal::append_io_events(AioCompletion *aio_comp, - journal::EventType event_type, +uint64_t Journal::append_io_events(journal::EventType event_type, const Bufferlists &bufferlists, const AioObjectRequests &requests, uint64_t offset, size_t length, @@ -875,7 +872,7 @@ uint64_t Journal::append_io_events(AioCompletion *aio_comp, assert(bl.length() <= m_max_append_size); futures.push_back(m_journaler->append(m_tag_tid, bl)); } - m_events[tid] = Event(futures, aio_comp, requests, offset, length); + m_events[tid] = Event(futures, requests, offset, length); } CephContext *cct = m_image_ctx.cct; @@ -968,6 +965,10 @@ void Journal::append_op_event(uint64_t op_tid, } on_safe = create_async_context_callback(m_image_ctx, on_safe); + on_safe = new FunctionContext([this, on_safe](int r) { + // ensure all committed IO before this op is committed + m_journaler->flush_commit_position(on_safe); + }); future.flush(on_safe); CephContext *cct = m_image_ctx.cct; @@ -1349,6 +1350,10 @@ void Journal::handle_replay_process_safe(ReplayEntry replay_entry, int r) { CephContext *cct = m_image_ctx.cct; m_lock.Lock(); + assert(m_state == STATE_REPLAYING || + m_state == STATE_FLUSHING_RESTART || + m_state == STATE_FLUSHING_REPLAY); + ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl; if (r < 0) { lderr(cct) << "failed to commit journal event to disk: " << cpp_strerror(r) @@ -1382,8 +1387,8 @@ void Journal::handle_replay_process_safe(ReplayEntry replay_entry, int r) { } else { // only commit the entry if written successfully m_journaler->committed(replay_entry); - m_lock.Unlock(); } + m_lock.Unlock(); } template @@ -1477,7 +1482,6 @@ void Journal::handle_io_event_safe(int r, uint64_t tid) { lderr(cct) << "failed to commit IO event: " << cpp_strerror(r) << dendl; } - AioCompletion *aio_comp; AioObjectRequests aio_object_requests; Contexts on_safe_contexts; { @@ -1486,7 +1490,6 @@ void Journal::handle_io_event_safe(int r, uint64_t tid) { assert(it != m_events.end()); Event &event = it->second; - aio_comp = event.aio_comp; aio_object_requests.swap(event.aio_object_requests); on_safe_contexts.swap(event.on_safe_contexts); @@ -1507,15 +1510,14 @@ void Journal::handle_io_event_safe(int r, uint64_t tid) { } ldout(cct, 20) << "completing tid=" << tid << dendl; - - if (r < 0) { - // don't send aio requests if the journal fails -- bubble error up - aio_comp->fail(cct, r); - } else { - // send any waiting aio requests now that journal entry is safe - RWLock::RLocker owner_locker(m_image_ctx.owner_lock); - for (AioObjectRequests::iterator it = aio_object_requests.begin(); - it != aio_object_requests.end(); ++it) { + for (AioObjectRequests::iterator it = aio_object_requests.begin(); + it != aio_object_requests.end(); ++it) { + if (r < 0) { + // don't send aio requests if the journal fails -- bubble error up + (*it)->complete(r); + } else { + // send any waiting aio requests now that journal entry is safe + RWLock::RLocker owner_locker(m_image_ctx.owner_lock); (*it)->send(); } } diff --git a/src/librbd/Journal.h b/src/librbd/Journal.h index 48fe9a2e007..3d36b28b402 100644 --- a/src/librbd/Journal.h +++ b/src/librbd/Journal.h @@ -30,7 +30,6 @@ class Journaler; namespace librbd { -class AioCompletion; class AioObjectRequest; class ImageCtx; @@ -129,13 +128,11 @@ public: void flush_commit_position(Context *on_finish); - uint64_t append_write_event(AioCompletion *aio_comp, - uint64_t offset, size_t length, + uint64_t append_write_event(uint64_t offset, size_t length, const bufferlist &bl, const AioObjectRequests &requests, bool flush_entry); - uint64_t append_io_event(AioCompletion *aio_comp, - journal::EventEntry &&event_entry, + uint64_t append_io_event(journal::EventEntry &&event_entry, const AioObjectRequests &requests, uint64_t offset, size_t length, bool flush_entry); @@ -176,7 +173,6 @@ private: struct Event { Futures futures; - AioCompletion *aio_comp = nullptr; AioObjectRequests aio_object_requests; Contexts on_safe_contexts; ExtentInterval pending_extents; @@ -186,9 +182,9 @@ private: Event() { } - Event(const Futures &_futures, AioCompletion *_aio_comp, - const AioObjectRequests &_requests, uint64_t offset, size_t length) - : futures(_futures), aio_comp(_aio_comp), aio_object_requests(_requests) { + Event(const Futures &_futures, const AioObjectRequests &_requests, + uint64_t offset, size_t length) + : futures(_futures), aio_object_requests(_requests) { if (length > 0) { pending_extents.insert(offset, length); } @@ -290,8 +286,7 @@ private: journal::Replay *m_journal_replay; - uint64_t append_io_events(AioCompletion *aio_comp, - journal::EventType event_type, + uint64_t append_io_events(journal::EventType event_type, const Bufferlists &bufferlists, const AioObjectRequests &requests, uint64_t offset, size_t length, bool flush_entry); diff --git a/src/librbd/journal/Replay.cc b/src/librbd/journal/Replay.cc index c6c4f55b602..c57202a0ba3 100644 --- a/src/librbd/journal/Replay.cc +++ b/src/librbd/journal/Replay.cc @@ -87,6 +87,14 @@ struct ExecuteOp : public Context { } virtual void finish(int r) override { + CephContext *cct = image_ctx.cct; + if (r < 0) { + lderr(cct) << "ExecuteOp: " << __func__ << ": r=" << r << dendl; + on_op_complete->complete(r); + return; + } + + ldout(cct, 20) << "ExecuteOp: " << __func__ << dendl; RWLock::RLocker owner_locker(image_ctx.owner_lock); execute(event); } @@ -102,7 +110,17 @@ struct C_RefreshIfRequired : public Context { } virtual void finish(int r) override { + CephContext *cct = image_ctx.cct; + + if (r < 0) { + lderr(cct) << "C_RefreshIfRequired: " << __func__ << ": r=" << r << dendl; + image_ctx.op_work_queue->queue(on_finish, r); + return; + } + if (image_ctx.state->is_refresh_required()) { + ldout(cct, 20) << "C_RefreshIfRequired: " << __func__ << ": " + << "refresh required" << dendl; image_ctx.state->refresh(on_finish); return; } @@ -156,8 +174,6 @@ void Replay::shut_down(bool cancel_ops, Context *on_finish) { ldout(cct, 20) << this << " " << __func__ << dendl; AioCompletion *flush_comp = nullptr; - OpTids cancel_op_tids; - Contexts op_finish_events; on_finish = util::create_async_context_callback( m_image_ctx, on_finish); @@ -176,7 +192,9 @@ void Replay::shut_down(bool cancel_ops, Context *on_finish) { // OpFinishEvent or waiting for ready) if (op_event.on_start_ready == nullptr && op_event.on_op_finish_event != nullptr) { - cancel_op_tids.push_back(op_event_pair.first); + Context *on_op_finish_event = nullptr; + std::swap(on_op_finish_event, op_event.on_op_finish_event); + m_image_ctx.op_work_queue->queue(on_op_finish_event, -ERESTART); } } else if (op_event.on_op_finish_event != nullptr) { // start ops waiting for OpFinishEvent @@ -200,9 +218,6 @@ void Replay::shut_down(bool cancel_ops, Context *on_finish) { RWLock::RLocker owner_locker(m_image_ctx.owner_lock); AioImageRequest::aio_flush(&m_image_ctx, flush_comp); } - for (auto op_tid : cancel_op_tids) { - handle_op_complete(op_tid, -ERESTART); - } if (on_finish != nullptr) { on_finish->complete(0); } @@ -743,10 +758,8 @@ void Replay::handle_op_complete(uint64_t op_tid, int r) { op_event.on_finish_safe != nullptr) || shutting_down); } - // skipped upon error -- so clean up if non-null - delete op_event.on_op_finish_event; - if (r == -ERESTART) { - delete op_event.on_op_complete; + if (op_event.on_op_finish_event != nullptr) { + op_event.on_op_finish_event->complete(r); } if (op_event.on_finish_ready != nullptr) { diff --git a/src/librbd/librbd.cc b/src/librbd/librbd.cc index d435c018419..6711b39db17 100644 --- a/src/librbd/librbd.cc +++ b/src/librbd/librbd.cc @@ -87,11 +87,11 @@ struct C_OpenComplete : public Context { } if (r < 0) { *ictxp = nullptr; - comp->fail(ictx->cct, r); + comp->fail(r); } else { *ictxp = ictx; comp->lock.Lock(); - comp->complete(ictx->cct); + comp->complete(); comp->put_unlock(); } } @@ -123,10 +123,10 @@ struct C_CloseComplete : public Context { virtual void finish(int r) { ldout(cct, 20) << "C_CloseComplete::finish: r=" << r << dendl; if (r < 0) { - comp->fail(cct, r); + comp->fail(r); } else { comp->lock.Lock(); - comp->complete(cct); + comp->complete(); comp->put_unlock(); } } diff --git a/src/test/librbd/journal/test_Replay.cc b/src/test/librbd/journal/test_Replay.cc index f7e20f71c5a..cbde9ae72bd 100644 --- a/src/test/librbd/journal/test_Replay.cc +++ b/src/test/librbd/journal/test_Replay.cc @@ -47,8 +47,8 @@ public: librbd::Journal<>::AioObjectRequests requests; { RWLock::RLocker owner_locker(ictx->owner_lock); - ictx->journal->append_io_event(NULL, std::move(event_entry), requests, 0, - 0, true); + ictx->journal->append_io_event(std::move(event_entry), requests, 0, 0, + true); } } diff --git a/src/test/librbd/journal/test_mock_Replay.cc b/src/test/librbd/journal/test_mock_Replay.cc index f4145e1f6b1..43f2909b2bb 100644 --- a/src/test/librbd/journal/test_mock_Replay.cc +++ b/src/test/librbd/journal/test_mock_Replay.cc @@ -77,11 +77,11 @@ ACTION_P2(NotifyInvoke, lock, cond) { } ACTION_P2(CompleteAioCompletion, r, image_ctx) { - CephContext *cct = image_ctx->cct; - image_ctx->op_work_queue->queue(new FunctionContext([cct, arg0](int r) { + image_ctx->op_work_queue->queue(new FunctionContext([this, arg0](int r) { arg0->get(); - arg0->set_request_count(cct, 1); - arg0->complete_request(cct, r); + arg0->init_time(image_ctx, librbd::AIO_TYPE_NONE); + arg0->set_request_count(1); + arg0->complete_request(r); }), r); } @@ -217,8 +217,9 @@ public: void when_complete(MockReplayImageCtx &mock_image_ctx, AioCompletion *aio_comp, int r) { aio_comp->get(); - aio_comp->set_request_count(mock_image_ctx.cct, 1); - aio_comp->complete_request(mock_image_ctx.cct, r); + aio_comp->init_time(mock_image_ctx.image_ctx, librbd::AIO_TYPE_NONE); + aio_comp->set_request_count(1); + aio_comp->complete_request(r); } int when_flush(MockJournalReplay &mock_journal_replay) { @@ -460,7 +461,7 @@ TEST_F(TestMockJournalReplay, Flush) { expect_op_work_queue(mock_image_ctx); InSequence seq; - AioCompletion *aio_comp; + AioCompletion *aio_comp = nullptr; C_SaferCond on_ready; C_SaferCond on_safe; expect_aio_discard(mock_aio_image_request, &aio_comp, 123, 456); @@ -614,9 +615,15 @@ TEST_F(TestMockJournalReplay, MissingOpFinishEventCancelOps) { when_replay_op_ready(mock_journal_replay, 123, &on_resume); ASSERT_EQ(0, on_snap_create_ready.wait()); - ASSERT_EQ(0, when_shut_down(mock_journal_replay, true)); - ASSERT_EQ(-ERESTART, on_snap_remove_safe.wait()); + C_SaferCond on_shut_down; + mock_journal_replay.shut_down(true, &on_shut_down); + + ASSERT_EQ(-ERESTART, on_resume.wait()); + on_snap_create_finish->complete(-ERESTART); ASSERT_EQ(-ERESTART, on_snap_create_safe.wait()); + + ASSERT_EQ(-ERESTART, on_snap_remove_safe.wait()); + ASSERT_EQ(0, on_shut_down.wait()); } TEST_F(TestMockJournalReplay, UnknownOpFinishEvent) { diff --git a/src/test/librbd/test_mock_Journal.cc b/src/test/librbd/test_mock_Journal.cc index 53a6fa8632d..b3ed6fd53be 100644 --- a/src/test/librbd/test_mock_Journal.cc +++ b/src/test/librbd/test_mock_Journal.cc @@ -9,6 +9,8 @@ #include "common/Mutex.h" #include "cls/journal/cls_journal_types.h" #include "journal/Journaler.h" +#include "librbd/AioCompletion.h" +#include "librbd/AioObjectRequest.h" #include "librbd/Journal.h" #include "librbd/Utils.h" #include "librbd/journal/Replay.h" @@ -291,15 +293,20 @@ public: bl.append_zero(length); RWLock::RLocker owner_locker(mock_image_ctx.owner_lock); - return mock_journal.append_write_event(nullptr, 0, length, bl, {}, false); + return mock_journal.append_write_event(0, length, bl, {}, false); } uint64_t when_append_io_event(MockJournalImageCtx &mock_image_ctx, MockJournal &mock_journal, - AioCompletion *aio_comp = nullptr) { + AioObjectRequest *object_request = nullptr) { RWLock::RLocker owner_locker(mock_image_ctx.owner_lock); + MockJournal::AioObjectRequests object_requests; + if (object_request != nullptr) { + object_requests.push_back(object_request); + } return mock_journal.append_io_event( - aio_comp, journal::EventEntry{journal::AioFlushEvent{}}, {}, 0, 0, false); + journal::EventEntry{journal::AioFlushEvent{}}, object_requests, 0, 0, + false); } void save_commit_context(Context *ctx) { @@ -878,21 +885,21 @@ TEST_F(TestMockJournal, EventCommitError) { close_journal(mock_journal, mock_journaler); }; - AioCompletion *comp = new AioCompletion(); - comp->get(); + C_SaferCond object_request_ctx; + AioObjectRemove *object_request = new AioObjectRemove( + ictx, "oid", 0, {}, &object_request_ctx); ::journal::MockFuture mock_future; Context *on_journal_safe; expect_append_journaler(mock_journaler); expect_wait_future(mock_future, &on_journal_safe); - ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal, comp)); + ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal, + object_request)); // commit the event in the journal w/o waiting writeback expect_future_committed(mock_journaler); on_journal_safe->complete(-EINVAL); - ASSERT_EQ(0, comp->wait_for_complete()); - ASSERT_EQ(-EINVAL, comp->get_return_value()); - comp->put(); + ASSERT_EQ(-EINVAL, object_request_ctx.wait()); // cache should receive the error after attempting writeback expect_future_is_valid(mock_future); @@ -917,14 +924,16 @@ TEST_F(TestMockJournal, EventCommitErrorWithPendingWriteback) { close_journal(mock_journal, mock_journaler); }; - AioCompletion *comp = new AioCompletion(); - comp->get(); + C_SaferCond object_request_ctx; + AioObjectRemove *object_request = new AioObjectRemove( + ictx, "oid", 0, {}, &object_request_ctx); ::journal::MockFuture mock_future; Context *on_journal_safe; expect_append_journaler(mock_journaler); expect_wait_future(mock_future, &on_journal_safe); - ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal, comp)); + ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal, + object_request)); expect_future_is_valid(mock_future); C_SaferCond flush_ctx; @@ -933,9 +942,7 @@ TEST_F(TestMockJournal, EventCommitErrorWithPendingWriteback) { // commit the event in the journal w/ waiting cache writeback expect_future_committed(mock_journaler); on_journal_safe->complete(-EINVAL); - ASSERT_EQ(0, comp->wait_for_complete()); - ASSERT_EQ(-EINVAL, comp->get_return_value()); - comp->put(); + ASSERT_EQ(-EINVAL, object_request_ctx.wait()); // cache should receive the error if waiting ASSERT_EQ(-EINVAL, flush_ctx.wait());