diff --git a/src/librbd/io/ImageDispatchSpec.h b/src/librbd/io/ImageDispatchSpec.h index 353cdd16f98..bb326374a84 100644 --- a/src/librbd/io/ImageDispatchSpec.h +++ b/src/librbd/io/ImageDispatchSpec.h @@ -153,6 +153,7 @@ public: } uint64_t get_tid(); + bool blocked = false; private: typedef boost::variant::aio_write(AioCompletion *c, uint64_t off, uint64_t len, if (m_image_ctx.non_blocking_aio || writes_blocked()) { queue(req); } else { - c->start_op(); process_io(req, false); finish_in_flight_io(); } @@ -388,7 +387,6 @@ void ImageRequestWQ::aio_discard(AioCompletion *c, uint64_t off, if (m_image_ctx.non_blocking_aio || writes_blocked()) { queue(req); } else { - c->start_op(); process_io(req, false); finish_in_flight_io(); } @@ -436,7 +434,6 @@ void ImageRequestWQ::aio_flush(AioCompletion *c, bool native_async) { if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty()) { queue(req); } else { - c->start_op(); process_io(req, false); finish_in_flight_io(); } @@ -483,7 +480,6 @@ void ImageRequestWQ::aio_writesame(AioCompletion *c, uint64_t off, if (m_image_ctx.non_blocking_aio || writes_blocked()) { queue(req); } else { - c->start_op(); process_io(req, false); finish_in_flight_io(); } @@ -533,7 +529,6 @@ void ImageRequestWQ::aio_compare_and_write(AioCompletion *c, if (m_image_ctx.non_blocking_aio || writes_blocked()) { queue(req); } else { - c->start_op(); process_io(req, false); finish_in_flight_io(); } @@ -572,15 +567,17 @@ void ImageRequestWQ::unblock_overlapping_io(uint64_t offset, uint64_t length, if (!m_blocked_ios.empty()) { auto it = m_blocked_ios.begin(); while (it != m_blocked_ios.end()) { - auto next_blocked_object_ios_it = it; - ++next_blocked_object_ios_it; auto blocked_io = *it; - if (block_overlapping_io(&m_in_flight_extents, offset, length)) { + const auto& extents = blocked_io->get_image_extents(); + uint64_t off = extents.size() ? extents.front().first : 0; + uint64_t len = extents.size() ? extents.front().second : 0; + + if (block_overlapping_io(&m_in_flight_extents, off, len)) { break; } - ldout(cct, 20) << "unblocking off: " << offset << ", " - << "len: " << length << dendl; + ldout(cct, 20) << "unblocking off: " << off << ", " + << "len: " << len << dendl; AioCompletion *aio_comp = blocked_io->get_aio_completion(); m_blocked_ios.erase(it); @@ -592,7 +589,7 @@ void ImageRequestWQ::unblock_overlapping_io(uint64_t offset, uint64_t length, } template -void ImageRequestWQ::unblock_flushes(uint64_t tid) { +void ImageRequestWQ::unblock_flushes() { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl; std::unique_lock locker{m_lock}; @@ -866,7 +863,7 @@ void *ImageRequestWQ::_void_dequeue() { return nullptr; } - if (!lock_required && !refresh_required) { + if (!lock_required && !refresh_required && !peek_item->blocked) { // completed ops will requeue the IO -- don't count it as in-progress m_in_flight_writes++; } @@ -922,7 +919,6 @@ void *ImageRequestWQ::_void_dequeue() { return nullptr; } - item->start_op(); return item; } @@ -938,24 +934,23 @@ void ImageRequestWQ::process_io(ImageDispatchSpec *req, const auto& extents = req->get_image_extents(); bool write_op = req->is_write_op(); uint64_t tid = req->get_tid(); - uint64_t offset = 0; - uint64_t length = 0; + uint64_t offset = extents.size() ? extents.front().first : 0; + uint64_t length = extents.size() ? extents.front().second : 0; - if (write_op) { + if (write_op && !req->blocked) { std::lock_guard locker{m_lock}; - offset = extents.size() ? extents.front().first : 0; - length = extents.size() ? extents.front().second : 0; bool blocked = block_overlapping_io(&m_in_flight_extents, offset, length); if (blocked) { ldout(cct, 20) << "blocking overlapping IO: " << "ictx=" << &m_image_ctx << ", " << "off=" << offset << ", len=" << length << dendl; + req->blocked = true; m_blocked_ios.push_back(req); - --m_in_flight_ios; return; } } + req->start_op(); req->send(); if (write_op) { @@ -963,7 +958,7 @@ void ImageRequestWQ::process_io(ImageDispatchSpec *req, finish_in_flight_write(); } unblock_overlapping_io(offset, length, tid); - unblock_flushes(tid); + unblock_flushes(); } delete req; } @@ -994,7 +989,7 @@ void ImageRequestWQ::remove_in_flight_write_ios(uint64_t offset, uint64_t len if(!m_in_flight_extents.empty()) { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << "erasing in flight extents with tid:" - << tid << dendl; + << tid << ", offset: " << offset << dendl; ImageExtentIntervals extents; extents.insert(offset, length); ImageExtentIntervals intersect; diff --git a/src/librbd/io/ImageRequestWQ.h b/src/librbd/io/ImageRequestWQ.h index ffb483bec80..ecbf33f3d20 100644 --- a/src/librbd/io/ImageRequestWQ.h +++ b/src/librbd/io/ImageRequestWQ.h @@ -141,7 +141,7 @@ private: bool write_op, uint64_t tid); void finish_in_flight_write(); - void unblock_flushes(uint64_t tid); + void unblock_flushes(); bool block_overlapping_io(ImageExtentIntervals* in_flight_image_extents, uint64_t object_off, uint64_t object_len); void unblock_overlapping_io(uint64_t offset, uint64_t length, uint64_t tid); diff --git a/src/test/librbd/io/test_mock_ImageRequestWQ.cc b/src/test/librbd/io/test_mock_ImageRequestWQ.cc index e7f0acdb3c4..8ebea73cf85 100644 --- a/src/test/librbd/io/test_mock_ImageRequestWQ.cc +++ b/src/test/librbd/io/test_mock_ImageRequestWQ.cc @@ -40,6 +40,7 @@ template <> struct ImageDispatchSpec { static ImageDispatchSpec* s_instance; AioCompletion *aio_comp = nullptr; + bool blocked = false; static ImageDispatchSpec* create_write_request( librbd::MockTestImageCtx &image_ctx, AioCompletion *aio_comp, @@ -421,7 +422,6 @@ TEST_F(TestMockIoImageRequestWQ, QosNoLimit) { expect_is_refresh_request(mock_image_ctx, false); expect_is_write_op(mock_queued_image_request, true); expect_dequeue(mock_image_request_wq, &mock_queued_image_request); - expect_start_op(mock_queued_image_request); ASSERT_TRUE(mock_image_request_wq.invoke_dequeue() == &mock_queued_image_request); } diff --git a/src/test/rbd_mirror/test_Instances.cc b/src/test/rbd_mirror/test_Instances.cc index c4e8bd30437..4b189d903d2 100644 --- a/src/test/rbd_mirror/test_Instances.cc +++ b/src/test/rbd_mirror/test_Instances.cc @@ -125,12 +125,11 @@ TEST_F(TestInstances, NotifyRemove) ASSERT_EQ(0, librbd::cls_client::mirror_instances_add(&m_local_io_ctx, instance_id1)); - C_SaferCond on_init; instances.init(&on_init); ASSERT_EQ(0, on_init.wait()); - instances.acked({instance_id1, instance_id2}); + instances.acked({instance_id2}); ASSERT_LT(0U, m_listener.add.count); instances.unblock_listener(); diff --git a/src/tools/rbd_mirror/Instances.cc b/src/tools/rbd_mirror/Instances.cc index 47c7de2a7ef..c3643af2c08 100644 --- a/src/tools/rbd_mirror/Instances.cc +++ b/src/tools/rbd_mirror/Instances.cc @@ -130,7 +130,7 @@ void Instances::handle_acked(const InstanceIds& instance_ids) { template void Instances::notify_instances_added(const InstanceIds& instance_ids) { - std::lock_guard locker{m_lock}; + std::unique_lock locker{m_lock}; InstanceIds added_instance_ids; for (auto& instance_id : instance_ids) { auto it = m_instances.find(instance_id); @@ -144,9 +144,9 @@ void Instances::notify_instances_added(const InstanceIds& instance_ids) { } dout(5) << "instance_ids=" << added_instance_ids << dendl; - m_lock.unlock(); + locker.unlock(); m_listener.handle_added(added_instance_ids); - m_lock.lock(); + locker.lock(); for (auto& instance_id : added_instance_ids) { auto it = m_instances.find(instance_id);