From 081d28ae7ca46fd1f40034cc558def77a95a9294 Mon Sep 17 00:00:00 2001
From: Mahati Chamarthy
Date: Mon, 16 Sep 2019 14:30:52 +0530
Subject: [PATCH] librbd: implement ordering for overlapping IOs

..and block flushes until previous writes are completed

Signed-off-by: Mahati Chamarthy
---
 src/librbd/io/ImageDispatchSpec.cc            |  10 +
 src/librbd/io/ImageDispatchSpec.h             |  33 ++-
 src/librbd/io/ImageRequestWQ.cc               | 266 ++++++++++++++++--
 src/librbd/io/ImageRequestWQ.h                |  23 +-
 .../librbd/io/test_mock_ImageRequestWQ.cc     |   5 +-
 5 files changed, 291 insertions(+), 46 deletions(-)

diff --git a/src/librbd/io/ImageDispatchSpec.cc b/src/librbd/io/ImageDispatchSpec.cc
index f33b8ef6aed..a787c57f49e 100644
--- a/src/librbd/io/ImageDispatchSpec.cc
+++ b/src/librbd/io/ImageDispatchSpec.cc
@@ -131,6 +131,16 @@ uint64_t ImageDispatchSpec<I>::extents_length() {
   return length;
 }

+template <typename I>
+const Extents& ImageDispatchSpec<I>::get_image_extents() const {
+  return this->m_image_extents;
+}
+
+template <typename I>
+uint64_t ImageDispatchSpec<I>::get_tid() {
+  return this->m_tid;
+}
+
 template <typename I>
 bool ImageDispatchSpec<I>::is_write_op() const {
   return boost::apply_visitor(IsWriteOpVisitor(), m_request);
diff --git a/src/librbd/io/ImageDispatchSpec.h b/src/librbd/io/ImageDispatchSpec.h
index 60f1ea5bb80..353cdd16f98 100644
--- a/src/librbd/io/ImageDispatchSpec.h
+++ b/src/librbd/io/ImageDispatchSpec.h
@@ -76,49 +76,49 @@ public:
     return new ImageDispatchSpec(image_ctx, aio_comp,
                                  std::move(image_extents),
                                  Read{std::move(read_result)},
-                                 op_flags, parent_trace);
+                                 op_flags, parent_trace, 0);
   }

   static ImageDispatchSpec* create_discard_request(
       ImageCtxT &image_ctx, AioCompletion *aio_comp, uint64_t off, uint64_t len,
-      uint32_t discard_granularity_bytes, const ZTracer::Trace &parent_trace) {
+      uint32_t discard_granularity_bytes, const ZTracer::Trace &parent_trace, uint64_t tid) {
     return new ImageDispatchSpec(image_ctx, aio_comp, {{off, len}},
                                  Discard{discard_granularity_bytes},
-                                 0, parent_trace);
+                                 0, parent_trace, tid);
   }

   static ImageDispatchSpec* create_write_request(
       ImageCtxT &image_ctx, AioCompletion *aio_comp, Extents &&image_extents,
-      bufferlist &&bl, int op_flags, const ZTracer::Trace &parent_trace) {
+      bufferlist &&bl, int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid) {
     return new ImageDispatchSpec(image_ctx, aio_comp, std::move(image_extents),
-                                 Write{std::move(bl)}, op_flags, parent_trace);
+                                 Write{std::move(bl)}, op_flags, parent_trace, tid);
   }

   static ImageDispatchSpec* create_write_same_request(
       ImageCtxT &image_ctx, AioCompletion *aio_comp, uint64_t off, uint64_t len,
-      bufferlist &&bl, int op_flags, const ZTracer::Trace &parent_trace) {
+      bufferlist &&bl, int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid) {
     return new ImageDispatchSpec(image_ctx, aio_comp, {{off, len}},
                                  WriteSame{std::move(bl)}, op_flags,
-                                 parent_trace);
+                                 parent_trace, tid);
   }

   static ImageDispatchSpec* create_compare_and_write_request(
       ImageCtxT &image_ctx, AioCompletion *aio_comp, Extents &&image_extents,
       bufferlist &&cmp_bl, bufferlist &&bl, uint64_t *mismatch_offset,
-      int op_flags, const ZTracer::Trace &parent_trace) {
+      int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid) {
     return new ImageDispatchSpec(image_ctx, aio_comp, std::move(image_extents),
                                  CompareAndWrite{std::move(cmp_bl),
                                                  std::move(bl),
                                                  mismatch_offset},
-                                 op_flags, parent_trace);
+                                 op_flags, parent_trace, tid);
   }

   static ImageDispatchSpec* create_flush_request(
       ImageCtxT &image_ctx, AioCompletion *aio_comp, FlushSource flush_source,
       const ZTracer::Trace &parent_trace) {
     return new ImageDispatchSpec(image_ctx, aio_comp, {}, Flush{flush_source},
-                                 0, parent_trace);
+                                 0, parent_trace, 0);
   }

   ~ImageDispatchSpec() {
@@ -146,6 +146,14 @@ public:
     return (m_throttled_flag & RBD_QOS_MASK) == RBD_QOS_MASK;
   }

+  const Extents& get_image_extents() const;
+
+  AioCompletion* get_aio_completion() const {
+    return m_aio_comp;
+  }
+
+  uint64_t get_tid();
+
 private:
   typedef boost::variant<Read,
                          Discard,
@@ -173,6 +181,7 @@ private:
   Request m_request;
   int m_op_flags;
   ZTracer::Trace m_parent_trace;
+  uint64_t m_tid;
   std::atomic<uint64_t> m_throttled_flag = 0;

   uint64_t extents_length();
diff --git a/src/librbd/io/ImageRequestWQ.cc b/src/librbd/io/ImageRequestWQ.cc
index a3a70be4cb9..1836eb29733 100644
--- a/src/librbd/io/ImageRequestWQ.cc
+++ b/src/librbd/io/ImageRequestWQ.cc
@@ -324,14 +324,22 @@ void ImageRequestWQ<I>::aio_write(AioCompletion *c, uint64_t off, uint64_t len,
     return;
   }

+  auto tid = ++m_last_tid;
+
+  {
+    std::lock_guard locker{m_lock};
+    m_queued_or_blocked_io_tids.insert(tid);
+  }
+
+  ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_write_request(
+    m_image_ctx, c, {{off, len}}, std::move(bl), op_flags, trace, tid);
+
   std::shared_lock owner_locker{m_image_ctx.owner_lock};
   if (m_image_ctx.non_blocking_aio || writes_blocked()) {
-    queue(ImageDispatchSpec<I>::create_write_request(
-            m_image_ctx, c, {{off, len}}, std::move(bl), op_flags, trace));
+    queue(req);
   } else {
     c->start_op();
-    ImageRequest<I>::aio_write(&m_image_ctx, c, {{off, len}},
-                               std::move(bl), op_flags, trace);
+    process_io(req, false);
     finish_in_flight_io();
   }
   trace.event("finish");
@@ -363,14 +371,22 @@ void ImageRequestWQ<I>::aio_discard(AioCompletion *c, uint64_t off,
     return;
   }

+  auto tid = ++m_last_tid;
+
+  {
+    std::lock_guard locker{m_lock};
+    m_queued_or_blocked_io_tids.insert(tid);
+  }
+
+  ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_discard_request(
+    m_image_ctx, c, off, len, discard_granularity_bytes, trace, tid);
+
   std::shared_lock owner_locker{m_image_ctx.owner_lock};
   if (m_image_ctx.non_blocking_aio || writes_blocked()) {
-    queue(ImageDispatchSpec<I>::create_discard_request(
-            m_image_ctx, c, off, len, discard_granularity_bytes, trace));
+    queue(req);
   } else {
     c->start_op();
-    ImageRequest<I>::aio_discard(&m_image_ctx, c, {{off, len}},
-                                 discard_granularity_bytes, trace);
+    process_io(req, false);
     finish_in_flight_io();
   }
   trace.event("finish");
@@ -398,13 +414,27 @@ void ImageRequestWQ<I>::aio_flush(AioCompletion *c, bool native_async) {
     return;
   }

+  auto tid = ++m_last_tid;
+
+  ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_flush_request(
+    m_image_ctx, c, FLUSH_SOURCE_USER, trace);
+
+  {
+    std::lock_guard locker{m_lock};
+    if(!m_queued_or_blocked_io_tids.empty()) {
+      ldout(cct, 20) << "queueing flush, tid: " << tid << dendl;
+      m_queued_flushes.emplace(tid, req);
+      --m_in_flight_ios;
+      return;
+    }
+  }
+
   std::shared_lock owner_locker{m_image_ctx.owner_lock};
   if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty()) {
-    queue(ImageDispatchSpec<I>::create_flush_request(
-            m_image_ctx, c, FLUSH_SOURCE_USER, trace));
+    queue(req);
   } else {
     c->start_op();
-    ImageRequest<I>::aio_flush(&m_image_ctx, c, FLUSH_SOURCE_USER, trace);
+    process_io(req, false);
     finish_in_flight_io();
   }
   trace.event("finish");
@@ -436,14 +466,22 @@ void ImageRequestWQ<I>::aio_writesame(AioCompletion *c, uint64_t off,
     return;
   }

+  auto tid = ++m_last_tid;
+
+  {
+    std::lock_guard locker{m_lock};
+    m_queued_or_blocked_io_tids.insert(tid);
+  }
+
+  ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_write_same_request(
+    m_image_ctx, c, off, len, std::move(bl), op_flags, trace, tid);
+
   std::shared_lock owner_locker{m_image_ctx.owner_lock};
   if (m_image_ctx.non_blocking_aio || writes_blocked()) {
-    queue(ImageDispatchSpec<I>::create_write_same_request(
-            m_image_ctx, c, off, len, std::move(bl), op_flags, trace));
+    queue(req);
   } else {
     c->start_op();
-    ImageRequest<I>::aio_writesame(&m_image_ctx, c, {{off, len}}, std::move(bl),
-                                   op_flags, trace);
+    process_io(req, false);
     finish_in_flight_io();
   }
   trace.event("finish");
@@ -477,21 +515,116 @@ void ImageRequestWQ<I>::aio_compare_and_write(AioCompletion *c,
     return;
   }

+  auto tid = ++m_last_tid;
+
+  {
+    std::lock_guard locker{m_lock};
+    m_queued_or_blocked_io_tids.insert(tid);
+  }
+
+  ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_compare_and_write_request(
+    m_image_ctx, c, {{off, len}}, std::move(cmp_bl), std::move(bl),
+    mismatch_off, op_flags, trace, tid);
+
   std::shared_lock owner_locker{m_image_ctx.owner_lock};
   if (m_image_ctx.non_blocking_aio || writes_blocked()) {
-    queue(ImageDispatchSpec<I>::create_compare_and_write_request(
-            m_image_ctx, c, {{off, len}}, std::move(cmp_bl), std::move(bl),
-            mismatch_off, op_flags, trace));
+    queue(req);
   } else {
     c->start_op();
-    ImageRequest<I>::aio_compare_and_write(&m_image_ctx, c, {{off, len}},
-                                           std::move(cmp_bl), std::move(bl),
-                                           mismatch_off, op_flags, trace);
+    process_io(req, false);
     finish_in_flight_io();
   }
   trace.event("finish");
 }

+template <typename I>
+bool ImageRequestWQ<I>::block_overlapping_io(
+    ImageExtentIntervals* in_flight_image_extents, uint64_t off, uint64_t len) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << "ictx=" << &m_image_ctx
+                 << "off: " << off << " len: " << len << dendl;
+
+  if (in_flight_image_extents->empty() ||
+      !in_flight_image_extents->intersects(off, len)) {
+    in_flight_image_extents->insert(off, len);
+    return false;
+  }
+
+  return true;
+}
+
+template <typename I>
+void ImageRequestWQ<I>::unblock_overlapping_io(uint64_t offset, uint64_t length,
+                                               uint64_t tid) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;
+
+  remove_in_flight_write_ios(offset, length, true, tid);
+
+  std::unique_lock locker{m_lock};
+  if (!m_blocked_ios.empty()) {
+    auto it = m_blocked_ios.begin();
+    while (it != m_blocked_ios.end()) {
+      auto next_blocked_object_ios_it = it;
+      ++next_blocked_object_ios_it;
+      auto blocked_io = *it;
+
+      if (block_overlapping_io(&m_in_flight_extents, offset, length)) {
+        break;
+      }
+      ldout(cct, 20) << "unblocking off: " << offset << ", "
+                     << "len: " << length << dendl;
+      AioCompletion *aio_comp = blocked_io->get_aio_completion();
+
+      m_blocked_ios.erase(it);
+      locker.unlock();
+      queue_unblocked_io(aio_comp, blocked_io);
+      locker.lock();
+    }
+  }
+}
+
+template <typename I>
+void ImageRequestWQ<I>::unblock_flushes(uint64_t tid) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;
+  std::unique_lock locker{m_lock};
+  auto io_tid_it = m_queued_or_blocked_io_tids.begin();
+  while (true) {
+    auto it = m_queued_flushes.begin();
+    if (it == m_queued_flushes.end() ||
+        (io_tid_it != m_queued_or_blocked_io_tids.end() &&
+         *io_tid_it < it->first)) {
+      break;
+    }
+
+    auto blocked_flush = *it;
+    ldout(cct, 20) << "unblocking flush: tid " << blocked_flush.first << dendl;
+
+    AioCompletion *aio_comp = blocked_flush.second->get_aio_completion();
+
+    m_queued_flushes.erase(it);
+    locker.unlock();
+    queue_unblocked_io(aio_comp, blocked_flush.second);
+    locker.lock();
+  }
+}
+
+template <typename I>
+void ImageRequestWQ<I>::queue_unblocked_io(AioCompletion *comp,
+                                           ImageDispatchSpec<I> *req) {
+  if (!start_in_flight_io(comp)) {
+    return;
+  }
+
+  std::shared_lock owner_locker{m_image_ctx.owner_lock};
+  queue(req);
+}
+
 template <typename I>
 void ImageRequestWQ<I>::shut_down(Context *on_shutdown) {
   ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
@@ -786,27 +919,91 @@ void *ImageRequestWQ<I>::_void_dequeue() {
   return item;
 }

+template <typename I>
+void ImageRequestWQ<I>::process_io(ImageDispatchSpec<I> *req,
+                                   bool non_blocking_io) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
+                 << "req=" << req << dendl;
+
+  //extents are invalidated after the request is sent
+  //so gather them ahead of that
+  const auto& extents = req->get_image_extents();
+  bool write_op = req->is_write_op();
+  uint64_t tid = req->get_tid();
+  uint64_t offset;
+  uint64_t length;
+
+  if (write_op) {
+    std::lock_guard locker{m_lock};
+    offset = extents.size() ? extents.front().first : 0;
+    length = extents.size() ? extents.front().second : 0;
+    bool blocked = block_overlapping_io(&m_in_flight_extents, offset, length);
+    if (blocked) {
+      ldout(cct, 20) << "blocking overlapping IO: " << "ictx="
+                     << &m_image_ctx << ", "
+                     << "off=" << offset << ", len=" << length << dendl;
+      m_blocked_ios.push_back(req);
+      --m_in_flight_ios;
+      return;
+    }
+  }
+
+  req->send();
+
+  if (write_op) {
+    if (non_blocking_io) {
+      finish_in_flight_write();
+    }
+    unblock_overlapping_io(offset, length, tid);
+    unblock_flushes(tid);
+  }
+  delete req;
+}
+
 template <typename I>
 void ImageRequestWQ<I>::process(ImageDispatchSpec<I> *req) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                  << "req=" << req << dendl;

-  req->send();
+  bool write_op = req->is_write_op();

-  finish_queued_io(req);
-  if (req->is_write_op()) {
-    finish_in_flight_write();
-  }
-  delete req;
+  process_io(req, true);

+  finish_queued_io(write_op);
   finish_in_flight_io();
 }

 template <typename I>
-void ImageRequestWQ<I>::finish_queued_io(ImageDispatchSpec<I> *req) {
+void ImageRequestWQ<I>::remove_in_flight_write_ios(uint64_t offset, uint64_t length,
+                                                   bool write_op, uint64_t tid) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;
+  {
+    std::lock_guard locker{m_lock};
+    if (write_op) {
+      if (length > 0) {
+        if(!m_in_flight_extents.empty()) {
+          CephContext *cct = m_image_ctx.cct;
+          ldout(cct, 20) << "erasing in flight extents with tid:"
+                         << tid << dendl;
+          ImageExtentIntervals extents;
+          extents.insert(offset, length);
+          ImageExtentIntervals intersect;
+          intersect.intersection_of(extents, m_in_flight_extents);
+          m_in_flight_extents.subtract(intersect);
+        }
+      }
+      m_queued_or_blocked_io_tids.erase(tid);
+    }
+  }
+}
+
+template <typename I>
+void ImageRequestWQ<I>::finish_queued_io(bool write_op) {
   std::shared_lock locker{m_lock};
-  if (req->is_write_op()) {
+  if (write_op) {
     ceph_assert(m_queued_writes > 0);
     m_queued_writes--;
   } else {
@@ -826,7 +1023,6 @@ void ImageRequestWQ<I>::finish_in_flight_write() {
       writes_blocked = true;
     }
   }
-
   if (writes_blocked) {
     flush_image(m_image_ctx, new C_BlockedWrites(this));
   }
@@ -879,7 +1075,15 @@ void ImageRequestWQ<I>::fail_in_flight_io(
     int r, ImageDispatchSpec<I> *req) {
   this->process_finish();
   req->fail(r);
-  finish_queued_io(req);
+
+  bool write_op = req->is_write_op();
+  uint64_t tid = req->get_tid();
+  const auto& extents = req->get_image_extents();
+  uint64_t offset = extents.size() ? extents.front().first : 0;
+  uint64_t length = extents.size() ? extents.front().second : 0;
+
+  finish_queued_io(write_op);
+  remove_in_flight_write_ios(offset, length, write_op, tid);
   delete req;
   finish_in_flight_io();
 }
diff --git a/src/librbd/io/ImageRequestWQ.h b/src/librbd/io/ImageRequestWQ.h
index daa596330d8..ffb483bec80 100644
--- a/src/librbd/io/ImageRequestWQ.h
+++ b/src/librbd/io/ImageRequestWQ.h
@@ -9,9 +9,10 @@
 #include "common/Throttle.h"
 #include "common/WorkQueue.h"
 #include "librbd/io/Types.h"
-
+#include "include/interval_set.h"
 #include <list>
 #include <atomic>
+#include <set>

 namespace librbd {

@@ -77,6 +78,7 @@ public:
   void apply_qos_schedule_tick_min(uint64_t tick);
   void apply_qos_limit(const uint64_t flag, uint64_t limit, uint64_t burst);

+
 protected:
   void *_void_dequeue() override;
   void process(ImageDispatchSpec<ImageCtxT> *req) override;
@@ -107,6 +109,14 @@ private:
   std::atomic<unsigned> m_io_blockers { 0 };
   std::atomic<unsigned> m_io_throttled { 0 };

+  typedef interval_set<uint64_t> ImageExtentIntervals;
+  ImageExtentIntervals m_in_flight_extents;
+
+  std::vector<ImageDispatchSpec<ImageCtxT>*> m_blocked_ios;
+  std::atomic<uint64_t> m_last_tid { 0 };
+  std::set<uint64_t> m_queued_or_blocked_io_tids;
+  std::map<uint64_t, ImageDispatchSpec<ImageCtxT>*> m_queued_flushes;
+
   std::list<std::pair<uint64_t, TokenBucketThrottle*> > m_throttles;
   uint64_t m_qos_enabled_flag = 0;

@@ -126,14 +136,23 @@ private:

   bool needs_throttle(ImageDispatchSpec<ImageCtxT> *item);

-  void finish_queued_io(ImageDispatchSpec<ImageCtxT> *req);
+  void finish_queued_io(bool write_op);
+  void remove_in_flight_write_ios(uint64_t offset, uint64_t length,
+                                  bool write_op, uint64_t tid);
   void finish_in_flight_write();
+  void unblock_flushes(uint64_t tid);
+  bool block_overlapping_io(ImageExtentIntervals* in_flight_image_extents,
+                            uint64_t object_off, uint64_t object_len);
+  void unblock_overlapping_io(uint64_t offset, uint64_t length, uint64_t tid);

   int start_in_flight_io(AioCompletion *c);
   void finish_in_flight_io();
   void fail_in_flight_io(int r, ImageDispatchSpec<ImageCtxT> *req);
+  void process_io(ImageDispatchSpec<ImageCtxT> *req, bool non_blocking_io);

   void queue(ImageDispatchSpec<ImageCtxT> *req);
+  void queue_unblocked_io(AioCompletion *comp,
+                          ImageDispatchSpec<ImageCtxT> *req);

   void handle_acquire_lock(int r, ImageDispatchSpec<ImageCtxT> *req);
   void handle_refreshed(int r, ImageDispatchSpec<ImageCtxT> *req);
diff --git a/src/test/librbd/io/test_mock_ImageRequestWQ.cc b/src/test/librbd/io/test_mock_ImageRequestWQ.cc
index c2e86b10434..c5478b6be7f 100644
--- a/src/test/librbd/io/test_mock_ImageRequestWQ.cc
+++ b/src/test/librbd/io/test_mock_ImageRequestWQ.cc
@@ -44,7 +44,7 @@ struct ImageDispatchSpec<librbd::MockTestImageCtx> {
   static ImageDispatchSpec* create_write_request(
       librbd::MockTestImageCtx &image_ctx, AioCompletion *aio_comp,
       Extents &&image_extents, bufferlist &&bl, int op_flags,
-      const ZTracer::Trace &parent_trace) {
+      const ZTracer::Trace &parent_trace, uint64_t tid) {
     ceph_assert(s_instance != nullptr);
     s_instance->aio_comp = aio_comp;
     return s_instance;
   }
@@ -66,6 +66,9 @@ struct ImageDispatchSpec<librbd::MockTestImageCtx> {
   MOCK_CONST_METHOD0(were_all_throttled, bool());
   MOCK_CONST_METHOD1(set_throttled, void(uint64_t));
   MOCK_CONST_METHOD2(tokens_requested, bool(uint64_t, uint64_t *));
+  MOCK_CONST_METHOD0(get_image_extents, Extents());
+  MOCK_CONST_METHOD0(get_aio_completion, AioCompletion*());
+  MOCK_CONST_METHOD0(get_tid, uint64_t());

   ImageDispatchSpec() {
     s_instance = this;
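
Note (not part of the patch): the following is a minimal standalone sketch of the overlap-blocking idea behind block_overlapping_io()/unblock_overlapping_io(). It uses a plain vector of (offset, length) pairs in place of Ceph's interval_set<uint64_t> (m_in_flight_extents), and the names OverlapGate and PendingWrite are invented for illustration only. Parked writes are retried in arrival order and retrying stops at the first write that still overlaps, which preserves submission order; the tid-based flush gating (m_queued_flushes / unblock_flushes) is omitted here.

#include <cstdint>
#include <deque>
#include <functional>
#include <iostream>
#include <utility>
#include <vector>

// A queued write that had to wait because it overlapped an in-flight extent.
struct PendingWrite {
  uint64_t off;
  uint64_t len;
  std::function<void()> dispatch;  // stands in for re-queueing the request
};

class OverlapGate {
public:
  // Returns true if [off, off+len) overlaps an in-flight write and must wait;
  // otherwise records the extent as in flight and returns false.
  bool block(uint64_t off, uint64_t len) {
    for (const auto& e : in_flight_) {
      if (off < e.first + e.second && e.first < off + len) {
        return true;  // overlap: caller parks the request
      }
    }
    in_flight_.emplace_back(off, len);
    return false;
  }

  // Called when a write completes: drop its extent, then retry parked writes
  // in arrival order until one still overlaps.
  void unblock(uint64_t off, uint64_t len, std::deque<PendingWrite>& parked) {
    for (auto it = in_flight_.begin(); it != in_flight_.end(); ++it) {
      if (it->first == off && it->second == len) {
        in_flight_.erase(it);
        break;
      }
    }
    while (!parked.empty() && !block(parked.front().off, parked.front().len)) {
      parked.front().dispatch();
      parked.pop_front();
    }
  }

private:
  std::vector<std::pair<uint64_t, uint64_t>> in_flight_;  // in-flight extents
};

int main() {
  OverlapGate gate;
  std::deque<PendingWrite> parked;

  gate.block(0, 512);                    // first write proceeds
  if (gate.block(256, 512)) {            // overlaps [0, 512) -> parked
    parked.push_back({256, 512, [] { std::cout << "dispatch write@256\n"; }});
  }

  gate.unblock(0, 512, parked);          // completion releases the overlap
  return 0;
}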