mirror of
https://github.com/ceph/ceph
synced 2025-02-21 18:17:42 +00:00
Merge pull request #33563 from dillaman/wip-rbd-op-threads
librbd: fix potential race conditions Reviewed-by: Jason Dillaman <dillaman@redhat.com>
This commit is contained in:
commit
b9bf850e5d
@ -153,6 +153,7 @@ public:
|
||||
}
|
||||
|
||||
uint64_t get_tid();
|
||||
bool blocked = false;
|
||||
|
||||
private:
|
||||
typedef boost::variant<Read,
|
||||
|
@ -341,7 +341,6 @@ void ImageRequestWQ<I>::aio_write(AioCompletion *c, uint64_t off, uint64_t len,
|
||||
if (m_image_ctx.non_blocking_aio || writes_blocked()) {
|
||||
queue(req);
|
||||
} else {
|
||||
c->start_op();
|
||||
process_io(req, false);
|
||||
finish_in_flight_io();
|
||||
}
|
||||
@ -388,7 +387,6 @@ void ImageRequestWQ<I>::aio_discard(AioCompletion *c, uint64_t off,
|
||||
if (m_image_ctx.non_blocking_aio || writes_blocked()) {
|
||||
queue(req);
|
||||
} else {
|
||||
c->start_op();
|
||||
process_io(req, false);
|
||||
finish_in_flight_io();
|
||||
}
|
||||
@ -436,7 +434,6 @@ void ImageRequestWQ<I>::aio_flush(AioCompletion *c, bool native_async) {
|
||||
if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty()) {
|
||||
queue(req);
|
||||
} else {
|
||||
c->start_op();
|
||||
process_io(req, false);
|
||||
finish_in_flight_io();
|
||||
}
|
||||
@ -483,7 +480,6 @@ void ImageRequestWQ<I>::aio_writesame(AioCompletion *c, uint64_t off,
|
||||
if (m_image_ctx.non_blocking_aio || writes_blocked()) {
|
||||
queue(req);
|
||||
} else {
|
||||
c->start_op();
|
||||
process_io(req, false);
|
||||
finish_in_flight_io();
|
||||
}
|
||||
@ -533,7 +529,6 @@ void ImageRequestWQ<I>::aio_compare_and_write(AioCompletion *c,
|
||||
if (m_image_ctx.non_blocking_aio || writes_blocked()) {
|
||||
queue(req);
|
||||
} else {
|
||||
c->start_op();
|
||||
process_io(req, false);
|
||||
finish_in_flight_io();
|
||||
}
|
||||
@ -572,15 +567,17 @@ void ImageRequestWQ<I>::unblock_overlapping_io(uint64_t offset, uint64_t length,
|
||||
if (!m_blocked_ios.empty()) {
|
||||
auto it = m_blocked_ios.begin();
|
||||
while (it != m_blocked_ios.end()) {
|
||||
auto next_blocked_object_ios_it = it;
|
||||
++next_blocked_object_ios_it;
|
||||
auto blocked_io = *it;
|
||||
|
||||
if (block_overlapping_io(&m_in_flight_extents, offset, length)) {
|
||||
const auto& extents = blocked_io->get_image_extents();
|
||||
uint64_t off = extents.size() ? extents.front().first : 0;
|
||||
uint64_t len = extents.size() ? extents.front().second : 0;
|
||||
|
||||
if (block_overlapping_io(&m_in_flight_extents, off, len)) {
|
||||
break;
|
||||
}
|
||||
ldout(cct, 20) << "unblocking off: " << offset << ", "
|
||||
<< "len: " << length << dendl;
|
||||
ldout(cct, 20) << "unblocking off: " << off << ", "
|
||||
<< "len: " << len << dendl;
|
||||
AioCompletion *aio_comp = blocked_io->get_aio_completion();
|
||||
|
||||
m_blocked_ios.erase(it);
|
||||
@ -592,7 +589,7 @@ void ImageRequestWQ<I>::unblock_overlapping_io(uint64_t offset, uint64_t length,
|
||||
}
|
||||
|
||||
template <typename I>
|
||||
void ImageRequestWQ<I>::unblock_flushes(uint64_t tid) {
|
||||
void ImageRequestWQ<I>::unblock_flushes() {
|
||||
CephContext *cct = m_image_ctx.cct;
|
||||
ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;
|
||||
std::unique_lock locker{m_lock};
|
||||
@ -866,7 +863,7 @@ void *ImageRequestWQ<I>::_void_dequeue() {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
if (!lock_required && !refresh_required) {
|
||||
if (!lock_required && !refresh_required && !peek_item->blocked) {
|
||||
// completed ops will requeue the IO -- don't count it as in-progress
|
||||
m_in_flight_writes++;
|
||||
}
|
||||
@ -922,7 +919,6 @@ void *ImageRequestWQ<I>::_void_dequeue() {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
item->start_op();
|
||||
return item;
|
||||
}
|
||||
|
||||
@ -938,24 +934,23 @@ void ImageRequestWQ<I>::process_io(ImageDispatchSpec<I> *req,
|
||||
const auto& extents = req->get_image_extents();
|
||||
bool write_op = req->is_write_op();
|
||||
uint64_t tid = req->get_tid();
|
||||
uint64_t offset = 0;
|
||||
uint64_t length = 0;
|
||||
uint64_t offset = extents.size() ? extents.front().first : 0;
|
||||
uint64_t length = extents.size() ? extents.front().second : 0;
|
||||
|
||||
if (write_op) {
|
||||
if (write_op && !req->blocked) {
|
||||
std::lock_guard locker{m_lock};
|
||||
offset = extents.size() ? extents.front().first : 0;
|
||||
length = extents.size() ? extents.front().second : 0;
|
||||
bool blocked = block_overlapping_io(&m_in_flight_extents, offset, length);
|
||||
if (blocked) {
|
||||
ldout(cct, 20) << "blocking overlapping IO: " << "ictx="
|
||||
<< &m_image_ctx << ", "
|
||||
<< "off=" << offset << ", len=" << length << dendl;
|
||||
req->blocked = true;
|
||||
m_blocked_ios.push_back(req);
|
||||
--m_in_flight_ios;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
req->start_op();
|
||||
req->send();
|
||||
|
||||
if (write_op) {
|
||||
@ -963,7 +958,7 @@ void ImageRequestWQ<I>::process_io(ImageDispatchSpec<I> *req,
|
||||
finish_in_flight_write();
|
||||
}
|
||||
unblock_overlapping_io(offset, length, tid);
|
||||
unblock_flushes(tid);
|
||||
unblock_flushes();
|
||||
}
|
||||
delete req;
|
||||
}
|
||||
@ -994,7 +989,7 @@ void ImageRequestWQ<I>::remove_in_flight_write_ios(uint64_t offset, uint64_t len
|
||||
if(!m_in_flight_extents.empty()) {
|
||||
CephContext *cct = m_image_ctx.cct;
|
||||
ldout(cct, 20) << "erasing in flight extents with tid:"
|
||||
<< tid << dendl;
|
||||
<< tid << ", offset: " << offset << dendl;
|
||||
ImageExtentIntervals extents;
|
||||
extents.insert(offset, length);
|
||||
ImageExtentIntervals intersect;
|
||||
|
@ -141,7 +141,7 @@ private:
|
||||
bool write_op, uint64_t tid);
|
||||
void finish_in_flight_write();
|
||||
|
||||
void unblock_flushes(uint64_t tid);
|
||||
void unblock_flushes();
|
||||
bool block_overlapping_io(ImageExtentIntervals* in_flight_image_extents,
|
||||
uint64_t object_off, uint64_t object_len);
|
||||
void unblock_overlapping_io(uint64_t offset, uint64_t length, uint64_t tid);
|
||||
|
@ -40,6 +40,7 @@ template <>
|
||||
struct ImageDispatchSpec<librbd::MockTestImageCtx> {
|
||||
static ImageDispatchSpec* s_instance;
|
||||
AioCompletion *aio_comp = nullptr;
|
||||
bool blocked = false;
|
||||
|
||||
static ImageDispatchSpec* create_write_request(
|
||||
librbd::MockTestImageCtx &image_ctx, AioCompletion *aio_comp,
|
||||
@ -421,7 +422,6 @@ TEST_F(TestMockIoImageRequestWQ, QosNoLimit) {
|
||||
expect_is_refresh_request(mock_image_ctx, false);
|
||||
expect_is_write_op(mock_queued_image_request, true);
|
||||
expect_dequeue(mock_image_request_wq, &mock_queued_image_request);
|
||||
expect_start_op(mock_queued_image_request);
|
||||
ASSERT_TRUE(mock_image_request_wq.invoke_dequeue() == &mock_queued_image_request);
|
||||
}
|
||||
|
||||
|
@ -125,12 +125,11 @@ TEST_F(TestInstances, NotifyRemove)
|
||||
ASSERT_EQ(0, librbd::cls_client::mirror_instances_add(&m_local_io_ctx,
|
||||
instance_id1));
|
||||
|
||||
|
||||
C_SaferCond on_init;
|
||||
instances.init(&on_init);
|
||||
ASSERT_EQ(0, on_init.wait());
|
||||
|
||||
instances.acked({instance_id1, instance_id2});
|
||||
instances.acked({instance_id2});
|
||||
|
||||
ASSERT_LT(0U, m_listener.add.count);
|
||||
instances.unblock_listener();
|
||||
|
@ -130,7 +130,7 @@ void Instances<I>::handle_acked(const InstanceIds& instance_ids) {
|
||||
|
||||
template <typename I>
|
||||
void Instances<I>::notify_instances_added(const InstanceIds& instance_ids) {
|
||||
std::lock_guard locker{m_lock};
|
||||
std::unique_lock locker{m_lock};
|
||||
InstanceIds added_instance_ids;
|
||||
for (auto& instance_id : instance_ids) {
|
||||
auto it = m_instances.find(instance_id);
|
||||
@ -144,9 +144,9 @@ void Instances<I>::notify_instances_added(const InstanceIds& instance_ids) {
|
||||
}
|
||||
|
||||
dout(5) << "instance_ids=" << added_instance_ids << dendl;
|
||||
m_lock.unlock();
|
||||
locker.unlock();
|
||||
m_listener.handle_added(added_instance_ids);
|
||||
m_lock.lock();
|
||||
locker.lock();
|
||||
|
||||
for (auto& instance_id : added_instance_ids) {
|
||||
auto it = m_instances.find(instance_id);
|
||||
|
Loading…
Reference in New Issue
Block a user