librbd: implement ordering for overlapping IOs

..and block flushes until previous writes are completed

Signed-off-by: Mahati Chamarthy <mahati.chamarthy@intel.com>
This commit is contained in:
Mahati Chamarthy 2019-09-16 14:30:52 +05:30 committed by Jason Dillaman
parent 7eb7857a86
commit 081d28ae7c
5 changed files with 291 additions and 46 deletions

View File

@ -131,6 +131,16 @@ uint64_t ImageDispatchSpec<I>::extents_length() {
return length;
}
template <typename I>
const Extents& ImageDispatchSpec<I>::get_image_extents() const {
return this->m_image_extents;
}
template <typename I>
uint64_t ImageDispatchSpec<I>::get_tid() {
return this->m_tid;
}
template <typename I>
bool ImageDispatchSpec<I>::is_write_op() const {
return boost::apply_visitor(IsWriteOpVisitor(), m_request);

View File

@ -76,49 +76,49 @@ public:
return new ImageDispatchSpec(image_ctx, aio_comp,
std::move(image_extents),
Read{std::move(read_result)},
op_flags, parent_trace);
op_flags, parent_trace, 0);
}
static ImageDispatchSpec* create_discard_request(
ImageCtxT &image_ctx, AioCompletion *aio_comp, uint64_t off, uint64_t len,
uint32_t discard_granularity_bytes, const ZTracer::Trace &parent_trace) {
uint32_t discard_granularity_bytes, const ZTracer::Trace &parent_trace, uint64_t tid) {
return new ImageDispatchSpec(image_ctx, aio_comp, {{off, len}},
Discard{discard_granularity_bytes},
0, parent_trace);
0, parent_trace, tid);
}
static ImageDispatchSpec* create_write_request(
ImageCtxT &image_ctx, AioCompletion *aio_comp, Extents &&image_extents,
bufferlist &&bl, int op_flags, const ZTracer::Trace &parent_trace) {
bufferlist &&bl, int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid) {
return new ImageDispatchSpec(image_ctx, aio_comp, std::move(image_extents),
Write{std::move(bl)}, op_flags, parent_trace);
Write{std::move(bl)}, op_flags, parent_trace, tid);
}
static ImageDispatchSpec* create_write_same_request(
ImageCtxT &image_ctx, AioCompletion *aio_comp, uint64_t off, uint64_t len,
bufferlist &&bl, int op_flags, const ZTracer::Trace &parent_trace) {
bufferlist &&bl, int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid) {
return new ImageDispatchSpec(image_ctx, aio_comp, {{off, len}},
WriteSame{std::move(bl)}, op_flags,
parent_trace);
parent_trace, tid);
}
static ImageDispatchSpec* create_compare_and_write_request(
ImageCtxT &image_ctx, AioCompletion *aio_comp, Extents &&image_extents,
bufferlist &&cmp_bl, bufferlist &&bl, uint64_t *mismatch_offset,
int op_flags, const ZTracer::Trace &parent_trace) {
int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid) {
return new ImageDispatchSpec(image_ctx, aio_comp,
std::move(image_extents),
CompareAndWrite{std::move(cmp_bl),
std::move(bl),
mismatch_offset},
op_flags, parent_trace);
op_flags, parent_trace, tid);
}
static ImageDispatchSpec* create_flush_request(
ImageCtxT &image_ctx, AioCompletion *aio_comp,
FlushSource flush_source, const ZTracer::Trace &parent_trace) {
return new ImageDispatchSpec(image_ctx, aio_comp, {}, Flush{flush_source},
0, parent_trace);
0, parent_trace, 0);
}
~ImageDispatchSpec() {
@ -146,6 +146,14 @@ public:
return (m_throttled_flag & RBD_QOS_MASK) == RBD_QOS_MASK;
}
const Extents& get_image_extents() const;
AioCompletion* get_aio_completion() const {
return m_aio_comp;
}
uint64_t get_tid();
private:
typedef boost::variant<Read,
Discard,
@ -160,10 +168,10 @@ private:
ImageDispatchSpec(ImageCtxT& image_ctx, AioCompletion* aio_comp,
Extents&& image_extents, Request&& request,
int op_flags, const ZTracer::Trace& parent_trace)
int op_flags, const ZTracer::Trace& parent_trace, uint64_t tid)
: m_image_ctx(image_ctx), m_aio_comp(aio_comp),
m_image_extents(std::move(image_extents)), m_request(std::move(request)),
m_op_flags(op_flags), m_parent_trace(parent_trace) {
m_op_flags(op_flags), m_parent_trace(parent_trace), m_tid(tid) {
m_aio_comp->get();
}
@ -173,6 +181,7 @@ private:
Request m_request;
int m_op_flags;
ZTracer::Trace m_parent_trace;
uint64_t m_tid;
std::atomic<uint64_t> m_throttled_flag = 0;
uint64_t extents_length();

View File

@ -324,14 +324,22 @@ void ImageRequestWQ<I>::aio_write(AioCompletion *c, uint64_t off, uint64_t len,
return;
}
auto tid = ++m_last_tid;
{
std::lock_guard locker{m_lock};
m_queued_or_blocked_io_tids.insert(tid);
}
ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_write_request(
m_image_ctx, c, {{off, len}}, std::move(bl), op_flags, trace, tid);
std::shared_lock owner_locker{m_image_ctx.owner_lock};
if (m_image_ctx.non_blocking_aio || writes_blocked()) {
queue(ImageDispatchSpec<I>::create_write_request(
m_image_ctx, c, {{off, len}}, std::move(bl), op_flags, trace));
queue(req);
} else {
c->start_op();
ImageRequest<I>::aio_write(&m_image_ctx, c, {{off, len}},
std::move(bl), op_flags, trace);
process_io(req, false);
finish_in_flight_io();
}
trace.event("finish");
@ -363,14 +371,22 @@ void ImageRequestWQ<I>::aio_discard(AioCompletion *c, uint64_t off,
return;
}
auto tid = ++m_last_tid;
{
std::lock_guard locker{m_lock};
m_queued_or_blocked_io_tids.insert(tid);
}
ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_discard_request(
m_image_ctx, c, off, len, discard_granularity_bytes, trace, tid);
std::shared_lock owner_locker{m_image_ctx.owner_lock};
if (m_image_ctx.non_blocking_aio || writes_blocked()) {
queue(ImageDispatchSpec<I>::create_discard_request(
m_image_ctx, c, off, len, discard_granularity_bytes, trace));
queue(req);
} else {
c->start_op();
ImageRequest<I>::aio_discard(&m_image_ctx, c, {{off, len}},
discard_granularity_bytes, trace);
process_io(req, false);
finish_in_flight_io();
}
trace.event("finish");
@ -398,13 +414,27 @@ void ImageRequestWQ<I>::aio_flush(AioCompletion *c, bool native_async) {
return;
}
auto tid = ++m_last_tid;
ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_flush_request(
m_image_ctx, c, FLUSH_SOURCE_USER, trace);
{
std::lock_guard locker{m_lock};
if(!m_queued_or_blocked_io_tids.empty()) {
ldout(cct, 20) << "queueing flush, tid: " << tid << dendl;
m_queued_flushes.emplace(tid, req);
--m_in_flight_ios;
return;
}
}
std::shared_lock owner_locker{m_image_ctx.owner_lock};
if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty()) {
queue(ImageDispatchSpec<I>::create_flush_request(
m_image_ctx, c, FLUSH_SOURCE_USER, trace));
queue(req);
} else {
c->start_op();
ImageRequest<I>::aio_flush(&m_image_ctx, c, FLUSH_SOURCE_USER, trace);
process_io(req, false);
finish_in_flight_io();
}
trace.event("finish");
@ -436,14 +466,22 @@ void ImageRequestWQ<I>::aio_writesame(AioCompletion *c, uint64_t off,
return;
}
auto tid = ++m_last_tid;
{
std::lock_guard locker{m_lock};
m_queued_or_blocked_io_tids.insert(tid);
}
ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_write_same_request(
m_image_ctx, c, off, len, std::move(bl), op_flags, trace, tid);
std::shared_lock owner_locker{m_image_ctx.owner_lock};
if (m_image_ctx.non_blocking_aio || writes_blocked()) {
queue(ImageDispatchSpec<I>::create_write_same_request(
m_image_ctx, c, off, len, std::move(bl), op_flags, trace));
queue(req);
} else {
c->start_op();
ImageRequest<I>::aio_writesame(&m_image_ctx, c, {{off, len}}, std::move(bl),
op_flags, trace);
process_io(req, false);
finish_in_flight_io();
}
trace.event("finish");
@ -477,21 +515,116 @@ void ImageRequestWQ<I>::aio_compare_and_write(AioCompletion *c,
return;
}
auto tid = ++m_last_tid;
{
std::lock_guard locker{m_lock};
m_queued_or_blocked_io_tids.insert(tid);
}
ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_compare_and_write_request(
m_image_ctx, c, {{off, len}}, std::move(cmp_bl), std::move(bl),
mismatch_off, op_flags, trace, tid);
std::shared_lock owner_locker{m_image_ctx.owner_lock};
if (m_image_ctx.non_blocking_aio || writes_blocked()) {
queue(ImageDispatchSpec<I>::create_compare_and_write_request(
m_image_ctx, c, {{off, len}}, std::move(cmp_bl), std::move(bl),
mismatch_off, op_flags, trace));
queue(req);
} else {
c->start_op();
ImageRequest<I>::aio_compare_and_write(&m_image_ctx, c, {{off, len}},
std::move(cmp_bl), std::move(bl),
mismatch_off, op_flags, trace);
process_io(req, false);
finish_in_flight_io();
}
trace.event("finish");
}
template <typename I>
bool ImageRequestWQ<I>::block_overlapping_io(
ImageExtentIntervals* in_flight_image_extents, uint64_t off, uint64_t len) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << "ictx=" << &m_image_ctx
<< "off: " << off << " len: " << len <<dendl;
if(len == 0) {
return false;
}
if (in_flight_image_extents->empty() ||
!in_flight_image_extents->intersects(off, len)) {
in_flight_image_extents->insert(off, len);
return false;
}
return true;
}
template <typename I>
void ImageRequestWQ<I>::unblock_overlapping_io(uint64_t offset, uint64_t length,
uint64_t tid) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;
remove_in_flight_write_ios(offset, length, true, tid);
std::unique_lock locker{m_lock};
if (!m_blocked_ios.empty()) {
auto it = m_blocked_ios.begin();
while (it != m_blocked_ios.end()) {
auto next_blocked_object_ios_it = it;
++next_blocked_object_ios_it;
auto blocked_io = *it;
if (block_overlapping_io(&m_in_flight_extents, offset, length)) {
break;
}
ldout(cct, 20) << "unblocking off: " << offset << ", "
<< "len: " << length << dendl;
AioCompletion *aio_comp = blocked_io->get_aio_completion();
m_blocked_ios.erase(it);
locker.unlock();
queue_unblocked_io(aio_comp, blocked_io);
locker.lock();
}
}
}
template <typename I>
void ImageRequestWQ<I>::unblock_flushes(uint64_t tid) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;
std::unique_lock locker{m_lock};
auto io_tid_it = m_queued_or_blocked_io_tids.begin();
while (true) {
auto it = m_queued_flushes.begin();
if (it == m_queued_flushes.end() ||
(io_tid_it != m_queued_or_blocked_io_tids.end() &&
*io_tid_it < it->first)) {
break;
}
auto blocked_flush = *it;
ldout(cct, 20) << "unblocking flush: tid " << blocked_flush.first << dendl;
AioCompletion *aio_comp = blocked_flush.second->get_aio_completion();
m_queued_flushes.erase(it);
locker.unlock();
queue_unblocked_io(aio_comp, blocked_flush.second);
locker.lock();
}
}
template <typename I>
void ImageRequestWQ<I>::queue_unblocked_io(AioCompletion *comp,
ImageDispatchSpec<I> *req) {
if (!start_in_flight_io(comp)) {
return;
}
std::shared_lock owner_locker{m_image_ctx.owner_lock};
queue(req);
}
template <typename I>
void ImageRequestWQ<I>::shut_down(Context *on_shutdown) {
ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
@ -786,27 +919,91 @@ void *ImageRequestWQ<I>::_void_dequeue() {
return item;
}
template <typename I>
void ImageRequestWQ<I>::process_io(ImageDispatchSpec<I> *req,
bool non_blocking_io) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
<< "req=" << req << dendl;
//extents are invalidated after the request is sent
//so gather them ahead of that
const auto& extents = req->get_image_extents();
bool write_op = req->is_write_op();
uint64_t tid = req->get_tid();
uint64_t offset;
uint64_t length;
if (write_op) {
std::lock_guard locker{m_lock};
offset = extents.size() ? extents.front().first : 0;
length = extents.size() ? extents.front().second : 0;
bool blocked = block_overlapping_io(&m_in_flight_extents, offset, length);
if (blocked) {
ldout(cct, 20) << "blocking overlapping IO: " << "ictx="
<< &m_image_ctx << ", "
<< "off=" << offset << ", len=" << length << dendl;
m_blocked_ios.push_back(req);
--m_in_flight_ios;
return;
}
}
req->send();
if (write_op) {
if (non_blocking_io) {
finish_in_flight_write();
}
unblock_overlapping_io(offset, length, tid);
unblock_flushes(tid);
}
delete req;
}
template <typename I>
void ImageRequestWQ<I>::process(ImageDispatchSpec<I> *req) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
<< "req=" << req << dendl;
req->send();
bool write_op = req->is_write_op();
finish_queued_io(req);
if (req->is_write_op()) {
finish_in_flight_write();
}
delete req;
process_io(req, true);
finish_queued_io(write_op);
finish_in_flight_io();
}
template <typename I>
void ImageRequestWQ<I>::finish_queued_io(ImageDispatchSpec<I> *req) {
void ImageRequestWQ<I>::remove_in_flight_write_ios(uint64_t offset, uint64_t length,
bool write_op, uint64_t tid) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;
{
std::lock_guard locker{m_lock};
if (write_op) {
if (length > 0) {
if(!m_in_flight_extents.empty()) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << "erasing in flight extents with tid:"
<< tid << dendl;
ImageExtentIntervals extents;
extents.insert(offset, length);
ImageExtentIntervals intersect;
intersect.intersection_of(extents, m_in_flight_extents);
m_in_flight_extents.subtract(intersect);
}
}
m_queued_or_blocked_io_tids.erase(tid);
}
}
}
template <typename I>
void ImageRequestWQ<I>::finish_queued_io(bool write_op) {
std::shared_lock locker{m_lock};
if (req->is_write_op()) {
if (write_op) {
ceph_assert(m_queued_writes > 0);
m_queued_writes--;
} else {
@ -826,7 +1023,6 @@ void ImageRequestWQ<I>::finish_in_flight_write() {
writes_blocked = true;
}
}
if (writes_blocked) {
flush_image(m_image_ctx, new C_BlockedWrites(this));
}
@ -879,7 +1075,15 @@ void ImageRequestWQ<I>::fail_in_flight_io(
int r, ImageDispatchSpec<I> *req) {
this->process_finish();
req->fail(r);
finish_queued_io(req);
bool write_op = req->is_write_op();
uint64_t tid = req->get_tid();
const auto& extents = req->get_image_extents();
uint64_t offset = extents.size() ? extents.front().first : 0;
uint64_t length = extents.size() ? extents.front().second : 0;
finish_queued_io(write_op);
remove_in_flight_write_ios(offset, length, write_op, tid);
delete req;
finish_in_flight_io();
}

View File

@ -9,9 +9,10 @@
#include "common/Throttle.h"
#include "common/WorkQueue.h"
#include "librbd/io/Types.h"
#include "include/interval_set.h"
#include <list>
#include <atomic>
#include <vector>
namespace librbd {
@ -77,6 +78,7 @@ public:
void apply_qos_schedule_tick_min(uint64_t tick);
void apply_qos_limit(const uint64_t flag, uint64_t limit, uint64_t burst);
protected:
void *_void_dequeue() override;
void process(ImageDispatchSpec<ImageCtxT> *req) override;
@ -107,6 +109,14 @@ private:
std::atomic<unsigned> m_io_blockers { 0 };
std::atomic<unsigned> m_io_throttled { 0 };
typedef interval_set<uint64_t> ImageExtentIntervals;
ImageExtentIntervals m_in_flight_extents;
std::vector<ImageDispatchSpec<ImageCtxT>*> m_blocked_ios;
std::atomic<unsigned> m_last_tid { 0 };
std::set<uint64_t> m_queued_or_blocked_io_tids;
std::map<uint64_t, ImageDispatchSpec<ImageCtxT>*> m_queued_flushes;
std::list<std::pair<uint64_t, TokenBucketThrottle*> > m_throttles;
uint64_t m_qos_enabled_flag = 0;
@ -126,14 +136,23 @@ private:
bool needs_throttle(ImageDispatchSpec<ImageCtxT> *item);
void finish_queued_io(ImageDispatchSpec<ImageCtxT> *req);
void finish_queued_io(bool write_op);
void remove_in_flight_write_ios(uint64_t offset, uint64_t length,
bool write_op, uint64_t tid);
void finish_in_flight_write();
void unblock_flushes(uint64_t tid);
bool block_overlapping_io(ImageExtentIntervals* in_flight_image_extents,
uint64_t object_off, uint64_t object_len);
void unblock_overlapping_io(uint64_t offset, uint64_t length, uint64_t tid);
int start_in_flight_io(AioCompletion *c);
void finish_in_flight_io();
void fail_in_flight_io(int r, ImageDispatchSpec<ImageCtxT> *req);
void process_io(ImageDispatchSpec<ImageCtxT> *req, bool non_blocking_io);
void queue(ImageDispatchSpec<ImageCtxT> *req);
void queue_unblocked_io(AioCompletion *comp,
ImageDispatchSpec<ImageCtxT> *req);
void handle_acquire_lock(int r, ImageDispatchSpec<ImageCtxT> *req);
void handle_refreshed(int r, ImageDispatchSpec<ImageCtxT> *req);

View File

@ -44,7 +44,7 @@ struct ImageDispatchSpec<librbd::MockTestImageCtx> {
static ImageDispatchSpec* create_write_request(
librbd::MockTestImageCtx &image_ctx, AioCompletion *aio_comp,
Extents &&image_extents, bufferlist &&bl, int op_flags,
const ZTracer::Trace &parent_trace) {
const ZTracer::Trace &parent_trace, uint64_t tid) {
ceph_assert(s_instance != nullptr);
s_instance->aio_comp = aio_comp;
return s_instance;
@ -66,6 +66,9 @@ struct ImageDispatchSpec<librbd::MockTestImageCtx> {
MOCK_CONST_METHOD0(were_all_throttled, bool());
MOCK_CONST_METHOD1(set_throttled, void(uint64_t));
MOCK_CONST_METHOD2(tokens_requested, bool(uint64_t, uint64_t *));
MOCK_CONST_METHOD0(get_image_extents, Extents());
MOCK_CONST_METHOD0(get_aio_completion, AioCompletion*());
MOCK_CONST_METHOD0(get_tid, uint64_t());
ImageDispatchSpec() {
s_instance = this;