librbd: do not complete AIO callbacks within caller's thread context

Avoid rare, racy issues when individual requests associated with an AIO
completion can complete prior to marking the completion as ready-to-fire.
Pre-calculate the expected number of individual requests to avoid the
potential re-entrant callback.

Fixes: #13969
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
Jason Dillaman 2015-12-04 10:25:24 -05:00
parent 1fffb25e29
commit fde9f785cd
4 changed files with 87 additions and 59 deletions
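
For orientation before the diffs: the heart of the change is that the submitter now pre-calculates the number of sub-requests and registers it on the completion with set_request_count() before issuing any of them, so the user callback can only fire from whichever I/O context finishes the last sub-request. Below is a minimal, self-contained model of that mechanism; the class and names are illustrative only and are not code from this commit.

// Simplified model of the pre-counted completion (illustrative only).
#include <cassert>
#include <cstdint>
#include <functional>
#include <mutex>

class Completion {
public:
  void set_callback(std::function<void()> cb) { m_cb = std::move(cb); }

  // Register the full sub-request count before any request is issued.
  // Because m_pending starts above zero and the submitter never decrements
  // it, completing threads cannot drive it to zero until the last
  // sub-request finishes -- so the callback cannot run re-entrantly in the
  // submitter's thread (the zero-request case is the one exception).
  void set_request_count(uint32_t count) {
    std::unique_lock<std::mutex> l(m_lock);
    assert(m_pending == 0);
    m_pending = count;
    if (m_pending == 0) {
      l.unlock();
      fire();  // nothing outstanding: fires immediately, as noted in the diff
    }
  }

  // Called from each sub-request's completion context (librados callback,
  // cache thread pool, ...). The last one to finish fires the user callback.
  void complete_request() {
    bool last;
    {
      std::lock_guard<std::mutex> l(m_lock);
      assert(m_pending > 0);
      last = (--m_pending == 0);
    }
    if (last) {
      fire();
    }
  }

private:
  void fire() {
    if (m_cb) {
      m_cb();
    }
  }

  std::mutex m_lock;
  uint32_t m_pending = 0;
  std::function<void()> m_cb;
};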


@@ -28,12 +28,6 @@
namespace librbd {
void AioCompletion::finish_adding_requests(CephContext *cct)
{
ldout(cct, 20) << "AioCompletion::finish_adding_requests " << (void*)this << " pending " << pending_count << dendl;
unblock(cct);
}
int AioCompletion::wait_for_complete() {
tracepoint(librbd, aio_wait_for_complete_enter, this);
lock.Lock();
@@ -46,8 +40,9 @@ namespace librbd {
void AioCompletion::finalize(CephContext *cct, ssize_t rval)
{
ldout(cct, 20) << "AioCompletion::finalize() " << (void*)this << " rval " << rval << " read_buf " << (void*)read_buf
<< " read_bl " << (void*)read_bl << dendl;
ldout(cct, 20) << this << " " << __func__ << ": r=" << rval << ", "
<< "read_buf=" << reinterpret_cast<void*>(read_buf) << ", "
<< "real_bl=" << reinterpret_cast<void*>(read_bl) << dendl;
if (rval >= 0 && aio_type == AIO_TYPE_READ) {
// FIXME: make the destriper write directly into a buffer so
// that we avoid shuffling pointers and copying zeros around.
@@ -57,11 +52,11 @@ namespace librbd {
if (read_buf) {
assert(bl.length() == read_buf_len);
bl.copy(0, read_buf_len, read_buf);
ldout(cct, 20) << "AioCompletion::finalize() copied resulting " << bl.length()
ldout(cct, 20) << "copied resulting " << bl.length()
<< " bytes to " << (void*)read_buf << dendl;
}
if (read_bl) {
ldout(cct, 20) << "AioCompletion::finalize() moving resulting " << bl.length()
ldout(cct, 20) << "moving resulting " << bl.length()
<< " bytes to bl " << (void*)read_bl << dendl;
read_bl->claim(bl);
}
@@ -94,6 +89,7 @@ namespace librbd {
}
// note: possible for image to be closed after op marked finished
done = true;
if (async_op.started()) {
async_op.finish_op();
}
@@ -104,7 +100,6 @@ namespace librbd {
lock.Lock();
}
done = true;
if (ictx && event_notify && ictx->event_socket.is_valid()) {
ictx->completed_reqs_lock.Lock();
ictx->completed_reqs.push_back(&m_xlist_item);
@@ -125,14 +120,16 @@ namespace librbd {
void AioCompletion::start_op(ImageCtx *i, aio_type_t t) {
init_time(i, t);
if (!async_op.started()) {
Mutex::Locker locker(lock);
if (!done && !async_op.started()) {
async_op.start_op(*ictx);
}
}
void AioCompletion::fail(CephContext *cct, int r)
{
lderr(cct) << "AioCompletion::fail() " << this << ": " << cpp_strerror(r)
lderr(cct) << this << " " << __func__ << ": " << cpp_strerror(r)
<< dendl;
lock.Lock();
assert(pending_count == 0);
@@ -141,11 +138,19 @@ namespace librbd {
put_unlock();
}
void AioCompletion::set_request_count(CephContext *cct, uint32_t count) {
ldout(cct, 20) << this << " " << __func__ << ": pending=" << count << dendl;
lock.Lock();
assert(pending_count == 0);
pending_count = count;
lock.Unlock();
// if no pending requests, completion will fire now
unblock(cct);
}
void AioCompletion::complete_request(CephContext *cct, ssize_t r)
{
ldout(cct, 20) << "AioCompletion::complete_request() "
<< (void *)this << " complete_cb=" << (void *)complete_cb
<< " pending " << pending_count << dendl;
lock.Lock();
if (rval >= 0) {
if (r < 0 && r != -EEXIST)
@@ -155,6 +160,9 @@ namespace librbd {
}
assert(pending_count);
int count = --pending_count;
ldout(cct, 20) << this << " " << __func__ << ": cb=" << complete_cb << ", "
<< "pending=" << pending_count << dendl;
if (!count && blockers == 0) {
finalize(cct, rval);
complete(cct);


@@ -35,10 +35,11 @@ namespace librbd {
*
* The retrying of individual requests is handled at a lower level,
* so all AioCompletion cares about is the count of outstanding
* requests. Note that this starts at 1 to prevent the reference
* count from reaching 0 while more requests are being added. When
* all requests have been added, finish_adding_requests() releases
* this initial reference.
* requests. The number of expected individual requests should be
* set initially using set_request_count() prior to issuing the
* requests. This ensures that the completion will not be completed
* within the caller's thread of execution (instead via a librados
* context or via a thread pool context for cache read hits).
*/
struct AioCompletion {
Mutex lock;
@@ -48,7 +49,7 @@ namespace librbd {
callback_t complete_cb;
void *complete_arg;
rbd_completion_t rbd_comp;
int pending_count; ///< number of requests
uint32_t pending_count; ///< number of requests
uint32_t blockers;
int ref;
bool released;
@@ -82,17 +83,8 @@ namespace librbd {
int wait_for_complete();
void add_request() {
lock.Lock();
pending_count++;
lock.Unlock();
get();
}
void finalize(CephContext *cct, ssize_t rval);
void finish_adding_requests(CephContext *cct);
void init_time(ImageCtx *i, aio_type_t t);
void start_op(ImageCtx *i, aio_type_t t);
void fail(CephContext *cct, int r);
@@ -104,6 +96,13 @@ namespace librbd {
complete_arg = cb_arg;
}
void set_request_count(CephContext *cct, uint32_t num);
void add_request() {
lock.Lock();
assert(pending_count > 0);
lock.Unlock();
get();
}
void complete_request(CephContext *cct, ssize_t r);
void associate_journal_event(uint64_t tid);

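The caller-side contract spelled out by the new header comment, sketched roughly below. The helper function is hypothetical, and in librbd the add_request() call actually happens inside the per-request C_AioRequest wrapper rather than directly in the caller.

// Hypothetical caller, showing the intended sequence only.
void issue_sub_requests(librbd::AioCompletion *comp,
                        std::vector<AioObjectRequest*> &requests,
                        CephContext *cct) {
  // 1. Declare the expected number of sub-requests before issuing any.
  comp->set_request_count(cct, requests.size());

  for (AioObjectRequest *req : requests) {
    // 2. add_request() now only asserts that the count was pre-set and
    //    takes a reference; it no longer grows pending_count.
    comp->add_request();
    // 3. Each request's callback ends in comp->complete_request(cct, r);
    //    the last one to finish runs the user callback in its own context.
    req->send();
  }
}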

@@ -168,33 +168,39 @@ void AioImageRead::send_request() {
m_aio_comp->read_buf_len = buffer_ofs;
m_aio_comp->read_bl = m_pbl;
for (map<object_t,vector<ObjectExtent> >::iterator p = object_extents.begin();
p != object_extents.end(); ++p) {
for (vector<ObjectExtent>::iterator q = p->second.begin();
q != p->second.end(); ++q) {
ldout(cct, 20) << " oid " << q->oid << " " << q->offset << "~"
<< q->length << " from " << q->buffer_extents
// pre-calculate the expected number of read requests
uint32_t request_count = 0;
for (auto &object_extent : object_extents) {
request_count += object_extent.second.size();
}
m_aio_comp->set_request_count(cct, request_count);
// issue the requests
for (auto &object_extent : object_extents) {
for (auto &extent : object_extent.second) {
ldout(cct, 20) << " oid " << extent.oid << " " << extent.offset << "~"
<< extent.length << " from " << extent.buffer_extents
<< dendl;
C_AioRead *req_comp = new C_AioRead(cct, m_aio_comp);
AioObjectRead *req = new AioObjectRead(&m_image_ctx, q->oid.name,
q->objectno, q->offset, q->length,
q->buffer_extents, snap_id, true,
req_comp, m_op_flags);
AioObjectRead *req = new AioObjectRead(&m_image_ctx, extent.oid.name,
extent.objectno, extent.offset,
extent.length,
extent.buffer_extents, snap_id,
true, req_comp, m_op_flags);
req_comp->set_req(req);
if (m_image_ctx.object_cacher) {
C_CacheRead *cache_comp = new C_CacheRead(&m_image_ctx, req);
m_image_ctx.aio_read_from_cache(q->oid, q->objectno, &req->data(),
q->length, q->offset,
cache_comp, m_op_flags);
m_image_ctx.aio_read_from_cache(extent.oid, extent.objectno,
&req->data(), extent.length,
extent.offset, cache_comp, m_op_flags);
} else {
req->send();
}
}
}
m_aio_comp->finish_adding_requests(cct);
m_aio_comp->put();
m_image_ctx.perfcounter->inc(l_librbd_rd);
@@ -244,6 +250,10 @@ void AbstractAioImageWrite::send_request() {
assert(!m_image_ctx.image_watcher->is_lock_supported() ||
m_image_ctx.image_watcher->is_lock_owner());
m_aio_comp->set_request_count(
m_image_ctx.cct, object_extents.size() +
get_cache_request_count(journaling));
AioObjectRequests requests;
send_object_requests(object_extents, snapc, (journaling ? &requests : NULL));
@@ -257,8 +267,6 @@ void AbstractAioImageWrite::send_request() {
send_cache_requests(object_extents, journal_tid);
}
update_stats(clip_len);
m_aio_comp->finish_adding_requests(cct);
m_aio_comp->put();
}
@@ -366,6 +374,11 @@ uint64_t AioImageDiscard::append_journal_event(
return tid;
}
uint32_t AioImageDiscard::get_cache_request_count(bool journaling) const {
// extra completion request is required for tracking journal commit
return (journaling ? 1 : 0);
}
void AioImageDiscard::send_cache_requests(const ObjectExtents &object_extents,
uint64_t journal_tid) {
if (journal_tid == 0) {
@@ -415,28 +428,32 @@ void AioImageDiscard::update_stats(size_t length) {
void AioImageFlush::send_request() {
CephContext *cct = m_image_ctx.cct;
bool journaling = false;
{
// journal the flush event
RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
if (m_image_ctx.journal != NULL &&
!m_image_ctx.journal->is_journal_replaying()) {
uint64_t journal_tid = m_image_ctx.journal->append_io_event(
m_aio_comp, journal::EventEntry(journal::AioFlushEvent()),
AioObjectRequests(), 0, 0, false);
journaling = (m_image_ctx.journal != NULL &&
!m_image_ctx.journal->is_journal_replaying());
}
C_FlushJournalCommit *ctx = new C_FlushJournalCommit(m_image_ctx,
m_aio_comp,
journal_tid);
m_image_ctx.journal->flush_event(journal_tid, ctx);
m_aio_comp->associate_journal_event(journal_tid);
}
m_aio_comp->set_request_count(cct, journaling ? 2 : 1);
if (journaling) {
// in-flight ops are flushed prior to closing the journal
uint64_t journal_tid = m_image_ctx.journal->append_io_event(
m_aio_comp, journal::EventEntry(journal::AioFlushEvent()),
AioObjectRequests(), 0, 0, false);
C_FlushJournalCommit *ctx = new C_FlushJournalCommit(m_image_ctx,
m_aio_comp,
journal_tid);
m_image_ctx.journal->flush_event(journal_tid, ctx);
m_aio_comp->associate_journal_event(journal_tid);
}
C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp);
m_image_ctx.flush(req_comp);
m_aio_comp->start_op(&m_image_ctx, AIO_TYPE_FLUSH);
m_aio_comp->finish_adding_requests(cct);
m_aio_comp->put();
m_image_ctx.perfcounter->inc(l_librbd_aio_flush);


@@ -108,6 +108,9 @@ protected:
virtual void send_request();
virtual uint32_t get_cache_request_count(bool journaling) const {
return 0;
}
virtual void send_cache_requests(const ObjectExtents &object_extents,
uint64_t journal_tid) = 0;
@@ -177,6 +180,7 @@ protected:
return "aio_discard";
}
virtual uint32_t get_cache_request_count(bool journaling) const override;
virtual void send_cache_requests(const ObjectExtents &object_extents,
uint64_t journal_tid);