Merge pull request #11326 from dillaman/wip-17416

rbd-mirror: improve resiliency of stress test case

Reviewed-by: Mykola Golub <mgolub@mirantis.com>
This commit is contained in:
Mykola Golub 2016-10-06 21:47:45 +03:00 committed by GitHub
commit 5a98a8cdef
7 changed files with 110 additions and 70 deletions

View File

@@ -671,7 +671,7 @@ stress_write_image()
local cluster=$1
local pool=$2
local image=$3
local duration=$(awk 'BEGIN {srand(); print int(35 * rand()) + 15}')
local duration=$(awk 'BEGIN {srand(); print int(10 * rand()) + 5}')
timeout ${duration}s ceph_test_rbd_mirror_random_write \
--cluster ${cluster} ${pool} ${image} \

View File

@@ -105,10 +105,7 @@ Future JournalRecorder::append(uint64_t tag_tid,
entry_bl);
assert(entry_bl.length() <= m_journal_metadata->get_object_size());
AppendBuffers append_buffers;
append_buffers.push_back(std::make_pair(future, entry_bl));
bool object_full = object_ptr->append_unlock(append_buffers);
bool object_full = object_ptr->append_unlock({{future, entry_bl}});
if (object_full) {
ldout(m_cct, 10) << "object " << object_ptr->get_oid() << " now full"
<< dendl;
@@ -284,8 +281,7 @@ void JournalRecorder::create_next_object_recorder_unlock(
new_object_recorder->get_object_number());
}
new_object_recorder->append_unlock(append_buffers);
new_object_recorder->append_unlock(std::move(append_buffers));
m_object_ptrs[splay_offset] = new_object_recorder;
}

View File

@@ -44,12 +44,12 @@ ObjectRecorder::~ObjectRecorder() {
assert(!m_aio_scheduled);
}
bool ObjectRecorder::append_unlock(const AppendBuffers &append_buffers) {
bool ObjectRecorder::append_unlock(AppendBuffers &&append_buffers) {
assert(m_lock->is_locked());
FutureImplPtr last_flushed_future;
bool schedule_append = false;
if (m_overflowed) {
m_append_buffers.insert(m_append_buffers.end(),
append_buffers.begin(), append_buffers.end());
@@ -132,12 +132,16 @@ void ObjectRecorder::flush(const FutureImplPtr &future) {
return;
}
AppendBuffers::iterator it;
for (it = m_append_buffers.begin(); it != m_append_buffers.end(); ++it) {
if (it->first == future) {
AppendBuffers::reverse_iterator r_it;
for (r_it = m_append_buffers.rbegin(); r_it != m_append_buffers.rend();
++r_it) {
if (r_it->first == future) {
break;
}
}
assert(r_it != m_append_buffers.rend());
auto it = (++r_it).base();
assert(it != m_append_buffers.end());
++it;
@@ -241,7 +245,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
AppendBuffers append_buffers;
{
Mutex::Locker locker(*m_lock);
m_lock->Lock();
auto tid_iter = m_in_flight_tids.find(tid);
assert(tid_iter != m_in_flight_tids.end());
m_in_flight_tids.erase(tid_iter);
@@ -250,7 +254,6 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
if (r == -EOVERFLOW || m_overflowed) {
if (iter != m_in_flight_appends.end()) {
m_overflowed = true;
append_overflowed(tid);
} else {
// must have seen an overflow on a previous append op
assert(r == -EOVERFLOW && m_overflowed);
@@ -258,7 +261,10 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
// notify of overflow once all in-flight ops are complete
if (m_in_flight_tids.empty() && !m_aio_scheduled) {
notify_handler();
append_overflowed();
notify_handler_unlock();
} else {
m_lock->Unlock();
}
return;
}
@@ -268,11 +274,8 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
assert(!append_buffers.empty());
m_in_flight_appends.erase(iter);
if (m_in_flight_appends.empty() && !m_aio_scheduled && m_object_closed) {
// all remaining unsent appends should be redirected to new object
notify_handler();
}
m_in_flight_flushes = true;
m_lock->Unlock();
}
// Flag the associated futures as complete.
@@ -284,18 +287,24 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
}
// wake up any flush requests that raced with a RADOS callback
Mutex::Locker locker(*m_lock);
m_lock->Lock();
m_in_flight_flushes = false;
m_in_flight_flushes_cond.Signal();
if (m_in_flight_appends.empty() && !m_aio_scheduled && m_object_closed) {
// all remaining unsent appends should be redirected to new object
notify_handler_unlock();
} else {
m_lock->Unlock();
}
}
void ObjectRecorder::append_overflowed(uint64_t tid) {
void ObjectRecorder::append_overflowed() {
ldout(m_cct, 10) << __func__ << ": " << m_oid << " append overflowed"
<< dendl;
assert(m_lock->is_locked());
assert(!m_in_flight_appends.empty());
assert(m_in_flight_appends.begin()->first == tid);
cancel_append_task();
@@ -314,6 +323,13 @@ void ObjectRecorder::append_overflowed(uint64_t tid) {
m_append_buffers.begin(),
m_append_buffers.end());
restart_append_buffers.swap(m_append_buffers);
for (AppendBuffers::const_iterator it = m_append_buffers.begin();
it != m_append_buffers.end(); ++it) {
ldout(m_cct, 20) << __func__ << ": overflowed " << *it->first
<< dendl;
it->first->detach();
}
}
void ObjectRecorder::send_appends(AppendBuffers *append_buffers) {
@@ -339,58 +355,72 @@ void ObjectRecorder::send_appends(AppendBuffers *append_buffers) {
}
void ObjectRecorder::send_appends_aio() {
Mutex::Locker locker(*m_lock);
AppendBuffers *append_buffers;
uint64_t append_tid;
{
Mutex::Locker locker(*m_lock);
append_tid = m_append_tid++;
m_in_flight_tids.insert(append_tid);
m_aio_scheduled = false;
// safe to hold pointer outside lock until op is submitted
append_buffers = &m_in_flight_appends[append_tid];
append_buffers->swap(m_pending_buffers);
}
AppendBuffers append_buffers;
m_pending_buffers.swap(append_buffers);
uint64_t append_tid = m_append_tid++;
ldout(m_cct, 10) << __func__ << ": " << m_oid << " flushing journal tid="
<< append_tid << dendl;
C_AppendFlush *append_flush = new C_AppendFlush(this, append_tid);
C_Gather *gather_ctx = new C_Gather(m_cct, append_flush);
librados::ObjectWriteOperation op;
client::guard_append(&op, m_soft_max_size);
for (AppendBuffers::iterator it = append_buffers.begin();
it != append_buffers.end(); ++it) {
for (AppendBuffers::iterator it = append_buffers->begin();
it != append_buffers->end(); ++it) {
ldout(m_cct, 20) << __func__ << ": flushing " << *it->first
<< dendl;
op.append(it->second);
op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
}
m_in_flight_tids.insert(append_tid);
m_in_flight_appends[append_tid].swap(append_buffers);
librados::AioCompletion *rados_completion =
librados::Rados::aio_create_completion(append_flush, NULL,
librados::Rados::aio_create_completion(gather_ctx->new_sub(), nullptr,
utils::rados_ctx_callback);
int r = m_ioctx.aio_operate(m_oid, rados_completion, &op);
assert(r == 0);
rados_completion->release();
}
void ObjectRecorder::notify_handler() {
assert(m_lock->is_locked());
for (AppendBuffers::const_iterator it = m_append_buffers.begin();
it != m_append_buffers.end(); ++it) {
ldout(m_cct, 20) << __func__ << ": overflowed " << *it->first
<< dendl;
it->first->detach();
{
m_lock->Lock();
if (m_pending_buffers.empty()) {
m_aio_scheduled = false;
if (m_in_flight_appends.empty() && m_object_closed) {
// all remaining unsent appends should be redirected to new object
notify_handler_unlock();
} else {
m_lock->Unlock();
}
} else {
// additional pending items -- reschedule
m_op_work_queue->queue(new FunctionContext([this] (int r) {
send_appends_aio();
}));
m_lock->Unlock();
}
}
// allow append op to complete
gather_ctx->activate();
}
void ObjectRecorder::notify_handler_unlock() {
assert(m_lock->is_locked());
if (m_object_closed) {
m_lock->Unlock();
m_handler->closed(this);
m_lock->Lock();
} else {
// TODO need to delay completion until after aio_notify completes
m_lock->Unlock();
m_handler->overflow(this);
m_lock->Lock();
}
}

View File

@@ -51,7 +51,7 @@ public:
return m_oid;
}
bool append_unlock(const AppendBuffers &append_buffers);
bool append_unlock(AppendBuffers &&append_buffers);
void flush(Context *on_safe);
void flush(const FutureImplPtr &future);
@@ -160,11 +160,11 @@ private:
bool append(const AppendBuffer &append_buffer, bool *schedule_append);
bool flush_appends(bool force);
void handle_append_flushed(uint64_t tid, int r);
void append_overflowed(uint64_t tid);
void append_overflowed();
void send_appends(AppendBuffers *append_buffers);
void send_appends_aio();
void notify_handler();
void notify_handler_unlock();
};
} // namespace journal

View File

@@ -475,11 +475,12 @@ void ImageWatcher<I>::notify_request_lock() {
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
// ExclusiveLock state machine can be dynamically disabled
if (m_image_ctx.exclusive_lock == nullptr) {
// ExclusiveLock state machine can be dynamically disabled or
// race with task cancel
if (m_image_ctx.exclusive_lock == nullptr ||
m_image_ctx.exclusive_lock->is_lock_owner()) {
return;
}
assert(!m_image_ctx.exclusive_lock->is_lock_owner());
ldout(m_image_ctx.cct, 10) << this << " notify request lock" << dendl;

View File

@@ -119,14 +119,14 @@ TEST_F(TestObjectRecorder, Append) {
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
lock->Lock();
ASSERT_FALSE(object->append_unlock(append_buffers));
ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
ASSERT_EQ(1U, object->get_pending_appends());
journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
"payload");
append_buffers = {append_buffer2};
lock->Lock();
ASSERT_FALSE(object->append_unlock(append_buffers));
ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
ASSERT_EQ(2U, object->get_pending_appends());
C_SaferCond cond;
@@ -151,14 +151,14 @@ TEST_F(TestObjectRecorder, AppendFlushByCount) {
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
lock->Lock();
ASSERT_FALSE(object->append_unlock(append_buffers));
ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
ASSERT_EQ(1U, object->get_pending_appends());
journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
"payload");
append_buffers = {append_buffer2};
lock->Lock();
ASSERT_FALSE(object->append_unlock(append_buffers));
ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
ASSERT_EQ(0U, object->get_pending_appends());
C_SaferCond cond;
@@ -182,14 +182,14 @@ TEST_F(TestObjectRecorder, AppendFlushByBytes) {
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
lock->Lock();
ASSERT_FALSE(object->append_unlock(append_buffers));
ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
ASSERT_EQ(1U, object->get_pending_appends());
journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
"payload");
append_buffers = {append_buffer2};
lock->Lock();
ASSERT_FALSE(object->append_unlock(append_buffers));
ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
ASSERT_EQ(0U, object->get_pending_appends());
C_SaferCond cond;
@@ -213,13 +213,13 @@ TEST_F(TestObjectRecorder, AppendFlushByAge) {
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
lock->Lock();
ASSERT_FALSE(object->append_unlock(append_buffers));
ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
"payload");
append_buffers = {append_buffer2};
lock->Lock();
ASSERT_FALSE(object->append_unlock(append_buffers));
ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
C_SaferCond cond;
append_buffer2.first->wait(&cond);
@@ -243,13 +243,13 @@ TEST_F(TestObjectRecorder, AppendFilledObject) {
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
lock->Lock();
ASSERT_FALSE(object->append_unlock(append_buffers));
ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
payload);
append_buffers = {append_buffer2};
lock->Lock();
ASSERT_TRUE(object->append_unlock(append_buffers));
ASSERT_TRUE(object->append_unlock(std::move(append_buffers)));
C_SaferCond cond;
append_buffer2.first->wait(&cond);
@@ -272,7 +272,7 @@ TEST_F(TestObjectRecorder, Flush) {
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
lock->Lock();
ASSERT_FALSE(object->append_unlock(append_buffers));
ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
ASSERT_EQ(1U, object->get_pending_appends());
C_SaferCond cond1;
@@ -300,7 +300,7 @@ TEST_F(TestObjectRecorder, FlushFuture) {
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer};
lock->Lock();
ASSERT_FALSE(object->append_unlock(append_buffers));
ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
ASSERT_EQ(1U, object->get_pending_appends());
C_SaferCond cond;
@@ -336,7 +336,7 @@ TEST_F(TestObjectRecorder, FlushDetachedFuture) {
lock->Unlock();
ASSERT_FALSE(append_buffer.first->is_flush_in_progress());
lock->Lock();
ASSERT_FALSE(object->append_unlock(append_buffers));
ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
// should automatically flush once its attached to the object
C_SaferCond cond;
@@ -360,7 +360,7 @@ TEST_F(TestObjectRecorder, Close) {
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
lock->Lock();
ASSERT_FALSE(object->append_unlock(append_buffers));
ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
ASSERT_EQ(1U, object->get_pending_appends());
lock->Lock();
@@ -403,7 +403,7 @@ TEST_F(TestObjectRecorder, Overflow) {
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1, append_buffer2};
lock1->Lock();
ASSERT_TRUE(object1->append_unlock(append_buffers));
ASSERT_TRUE(object1->append_unlock(std::move(append_buffers)));
C_SaferCond cond;
append_buffer2.first->wait(&cond);
@@ -415,7 +415,7 @@ TEST_F(TestObjectRecorder, Overflow) {
append_buffers = {append_buffer3};
lock2->Lock();
ASSERT_FALSE(object2->append_unlock(append_buffers));
ASSERT_FALSE(object2->append_unlock(std::move(append_buffers)));
append_buffer3.first->flush(NULL);
bool overflowed = false;

View File

@@ -726,6 +726,7 @@ void ImageReplayer<I>::handle_replay_ready()
return;
}
m_event_replay_tracker.start_op();
if (m_replay_tag_valid && m_replay_tag.tid == m_replay_tag_tid) {
preprocess_entry();
return;
@@ -878,13 +879,20 @@ template <typename I>
void ImageReplayer<I>::replay_flush() {
dout(20) << dendl;
bool interrupted = false;
{
Mutex::Locker locker(m_lock);
if (m_state != STATE_REPLAYING) {
dout(20) << "replay interrupted" << dendl;
return;
interrupted = true;
} else {
m_state = STATE_REPLAY_FLUSHING;
}
m_state = STATE_REPLAY_FLUSHING;
}
if (interrupted) {
m_event_replay_tracker.finish_op();
return;
}
// shut down the replay to flush all IO and ops and create a new
@@ -917,9 +925,11 @@ void ImageReplayer<I>::handle_replay_flush(int r) {
if (r < 0) {
derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl;
m_event_replay_tracker.finish_op();
handle_replay_complete(r, "replay flush encountered an error");
return;
} else if (on_replay_interrupted()) {
m_event_replay_tracker.finish_op();
return;
}
@@ -951,6 +961,7 @@ void ImageReplayer<I>::handle_get_remote_tag(int r) {
if (r < 0) {
derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": "
<< cpp_strerror(r) << dendl;
m_event_replay_tracker.finish_op();
handle_replay_complete(r, "failed to retrieve remote tag");
return;
}
@@ -998,6 +1009,7 @@ void ImageReplayer<I>::handle_allocate_local_tag(int r) {
if (r < 0) {
derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl;
m_event_replay_tracker.finish_op();
handle_replay_complete(r, "failed to allocate journal tag");
return;
}
@@ -1015,6 +1027,7 @@ void ImageReplayer<I>::preprocess_entry() {
int r = m_local_replay->decode(&it, &m_event_entry);
if (r < 0) {
derr << "failed to decode journal event" << dendl;
m_event_replay_tracker.finish_op();
handle_replay_complete(r, "failed to decode journal event");
return;
}
@@ -1035,6 +1048,7 @@ void ImageReplayer<I>::handle_preprocess_entry(int r) {
if (r < 0) {
derr << "failed to preprocess journal event" << dendl;
m_event_replay_tracker.finish_op();
handle_replay_complete(r, "failed to preprocess journal event");
return;
}
@@ -1051,7 +1065,6 @@ void ImageReplayer<I>::process_entry() {
ImageReplayer, &ImageReplayer<I>::handle_process_entry_ready>(this);
Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry));
m_event_replay_tracker.start_op();
m_local_replay->process(m_event_entry, on_ready, on_commit);
}