diff --git a/src/librbd/AioImageRequestWQ.cc b/src/librbd/AioImageRequestWQ.cc
index 352b7e9f38d..97c6fa4cda0 100644
--- a/src/librbd/AioImageRequestWQ.cc
+++ b/src/librbd/AioImageRequestWQ.cc
@@ -302,17 +302,24 @@ void AioImageRequestWQ::clear_require_lock_on_read() {
 
 void *AioImageRequestWQ::_void_dequeue() {
   AioImageRequest<> *peek_item = front();
-  if (peek_item == NULL || m_refresh_in_progress) {
-    return NULL;
+
+  // no IO ops available or refresh in-progress (IO stalled)
+  if (peek_item == nullptr || m_refresh_in_progress) {
+    return nullptr;
   }
 
+  bool refresh_required = m_image_ctx.state->is_refresh_required();
   {
     RWLock::RLocker locker(m_lock);
     if (peek_item->is_write_op()) {
       if (m_write_blockers > 0) {
-        return NULL;
+        return nullptr;
+      }
+
+      // refresh will requeue the op -- don't count it as in-progress
+      if (!refresh_required) {
+        m_in_progress_writes.inc();
       }
-      m_in_progress_writes.inc();
     } else if (m_require_lock_on_read) {
       return nullptr;
     }
@@ -322,15 +329,17 @@ void *AioImageRequestWQ::_void_dequeue() {
     ThreadPool::PointerWQ<AioImageRequest<> >::_void_dequeue());
   assert(peek_item == item);
 
-  if (m_image_ctx.state->is_refresh_required()) {
+  if (refresh_required) {
     ldout(m_image_ctx.cct, 15) << "image refresh required: delaying IO " << item
                                << dendl;
+
+    // stall IO until the refresh completes
     m_refresh_in_progress = true;
 
     get_pool_lock().Unlock();
     m_image_ctx.state->refresh(new C_RefreshFinish(this, item));
     get_pool_lock().Lock();
-    return NULL;
+    return nullptr;
   }
   return item;
 }
@@ -345,21 +354,34 @@ void AioImageRequestWQ::process(AioImageRequest<> *req) {
     req->send();
   }
 
+  finish_queued_op(req);
+  if (req->is_write_op()) {
+    finish_in_progress_write();
+  }
+  delete req;
+
+  finish_in_flight_op();
+}
+
+void AioImageRequestWQ::finish_queued_op(AioImageRequest<> *req) {
+  RWLock::RLocker locker(m_lock);
+  if (req->is_write_op()) {
+    assert(m_queued_writes.read() > 0);
+    m_queued_writes.dec();
+  } else {
+    assert(m_queued_reads.read() > 0);
+    m_queued_reads.dec();
+  }
+}
+
+void AioImageRequestWQ::finish_in_progress_write() {
   bool writes_blocked = false;
   {
     RWLock::RLocker locker(m_lock);
-    if (req->is_write_op()) {
-      assert(m_queued_writes.read() > 0);
-      m_queued_writes.dec();
-
-      assert(m_in_progress_writes.read() > 0);
-      if (m_in_progress_writes.dec() == 0 &&
-          !m_write_blocker_contexts.empty()) {
-        writes_blocked = true;
-      }
-    } else {
-      assert(m_queued_reads.read() > 0);
-      m_queued_reads.dec();
+    assert(m_in_progress_writes.read() > 0);
+    if (m_in_progress_writes.dec() == 0 &&
+        !m_write_blocker_contexts.empty()) {
+      writes_blocked = true;
     }
   }
 
@@ -367,9 +389,6 @@ void AioImageRequestWQ::process(AioImageRequest<> *req) {
     RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
     m_image_ctx.flush(new C_BlockedWrites(this));
   }
-  delete req;
-
-  finish_in_flight_op();
 }
 
 int AioImageRequestWQ::start_in_flight_op(AioCompletion *c) {
@@ -440,12 +459,24 @@ void AioImageRequestWQ::handle_refreshed(int r, AioImageRequest<> *req) {
                  << "req=" << req << dendl;
   if (r < 0) {
     req->fail(r);
-  } else {
-    process(req);
-    process_finish();
+    delete req;
 
-    m_refresh_in_progress = false;
-    signal();
+    finish_queued_op(req);
+    finish_in_flight_op();
+  } else {
+    // since IO was stalled for refresh -- original IO order is preserved
+    // if we requeue this op for work queue processing
+    requeue(req);
+  }
+
+  m_refresh_in_progress = false;
+  signal();
+
+  // refresh might have enabled exclusive lock -- IO stalled until
+  // we acquire the lock
+  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
+  if (is_lock_required() && is_lock_request_needed()) {
+    m_image_ctx.exclusive_lock->request_lock(nullptr);
   }
 }
 
diff --git a/src/librbd/AioImageRequestWQ.h b/src/librbd/AioImageRequestWQ.h
index fa429fc7e9e..e21aa860f67 100644
--- a/src/librbd/AioImageRequestWQ.h
+++ b/src/librbd/AioImageRequestWQ.h
@@ -104,6 +104,9 @@ private:
     return (m_queued_writes.read() == 0);
   }
 
+  void finish_queued_op(AioImageRequest<ImageCtx> *req);
+  void finish_in_progress_write();
+
   int start_in_flight_op(AioCompletion *c);
   void finish_in_flight_op();
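
Note on the refresh flow above: the stalled request is handed to ImageState::refresh() wrapped in a C_RefreshFinish completion context whose definition is not part of these hunks. As a rough, hypothetical sketch only (it assumes the usual librbd Context callback interface, and the member names are illustrative rather than taken from the patch), such a context simply routes the refresh result and the delayed request back into handle_refreshed():

struct C_RefreshFinish : public Context {
  AioImageRequestWQ *work_queue;
  AioImageRequest<> *aio_image_request;

  C_RefreshFinish(AioImageRequestWQ *work_queue,
                  AioImageRequest<> *aio_image_request)
    : work_queue(work_queue), aio_image_request(aio_image_request) {
  }

  virtual void finish(int r) override {
    // on error handle_refreshed() fails and cleans up the request;
    // on success it requeues it, preserving the original IO order
    work_queue->handle_refreshed(r, aio_image_request);
  }
};

Requeueing instead of processing the request inline is what keeps ordering intact, since the work queue stays stalled (m_refresh_in_progress) until handle_refreshed() signals it again.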