journal: flush commit positions should wait for refresh

Fixes: http://tracker.ceph.com/issues/22945
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
This commit is contained in:
Jason Dillaman 2018-02-13 10:05:01 -05:00
parent c5a5b20e08
commit 24df022e0b
2 changed files with 96 additions and 60 deletions

View File

@ -673,14 +673,9 @@ void JournalMetadata::assert_active_tag(uint64_t tag_tid, Context *on_finish) {
void JournalMetadata::flush_commit_position() {
ldout(m_cct, 20) << __func__ << dendl;
Mutex::Locker timer_locker(*m_timer_lock);
Mutex::Locker locker(m_lock);
if (m_commit_position_ctx == nullptr) {
return;
}
cancel_commit_task();
handle_commit_position_task();
C_SaferCond ctx;
flush_commit_position(&ctx);
ctx.wait();
}
void JournalMetadata::flush_commit_position(Context *on_safe) {
@ -688,7 +683,7 @@ void JournalMetadata::flush_commit_position(Context *on_safe) {
Mutex::Locker timer_locker(*m_timer_lock);
Mutex::Locker locker(m_lock);
if (m_commit_position_ctx == nullptr) {
if (m_commit_position_ctx == nullptr && m_flush_commits_in_progress == 0) {
// nothing to flush
if (on_safe != nullptr) {
m_work_queue->queue(on_safe, 0);
@ -697,9 +692,12 @@ void JournalMetadata::flush_commit_position(Context *on_safe) {
}
if (on_safe != nullptr) {
m_commit_position_ctx = new C_FlushCommitPosition(
m_commit_position_ctx, on_safe);
m_flush_commit_position_ctxs.push_back(on_safe);
}
if (m_commit_position_ctx == nullptr) {
return;
}
cancel_commit_task();
handle_commit_position_task();
}
@ -807,7 +805,6 @@ void JournalMetadata::cancel_commit_task() {
assert(m_lock.is_locked());
assert(m_commit_position_ctx != nullptr);
assert(m_commit_position_task_ctx != nullptr);
m_timer->cancel_event(m_commit_position_task_ctx);
m_commit_position_task_ctx = NULL;
}
@ -818,7 +815,7 @@ void JournalMetadata::schedule_commit_task() {
assert(m_timer_lock->is_locked());
assert(m_lock.is_locked());
assert(m_commit_position_ctx != nullptr);
if (m_commit_position_task_ctx == NULL) {
if (m_commit_position_task_ctx == nullptr) {
m_commit_position_task_ctx =
m_timer->add_event_after(m_settings.commit_interval,
new C_CommitPositionTask(this));
@ -832,22 +829,51 @@ void JournalMetadata::handle_commit_position_task() {
<< "client_id=" << m_client_id << ", "
<< "commit_position=" << m_commit_position << dendl;
m_commit_position_task_ctx = nullptr;
Context* commit_position_ctx = nullptr;
std::swap(commit_position_ctx, m_commit_position_ctx);
m_async_op_tracker.start_op();
++m_flush_commits_in_progress;
Context* ctx = new FunctionContext([this, commit_position_ctx](int r) {
Contexts flush_commit_position_ctxs;
m_lock.Lock();
assert(m_flush_commits_in_progress > 0);
--m_flush_commits_in_progress;
if (m_flush_commits_in_progress == 0) {
std::swap(flush_commit_position_ctxs, m_flush_commit_position_ctxs);
}
m_lock.Unlock();
commit_position_ctx->complete(0);
for (auto ctx : flush_commit_position_ctxs) {
ctx->complete(0);
}
m_async_op_tracker.finish_op();
});
ctx = new C_NotifyUpdate(this, ctx);
ctx = new FunctionContext([this, ctx](int r) {
// manually kick of a refresh in case the notification is missed
// and ignore the next notification that we are about to send
m_lock.Lock();
++m_ignore_watch_notifies;
m_lock.Unlock();
refresh(ctx);
});
ctx = new FunctionContext([this, ctx](int r) {
schedule_laggy_clients_disconnect(ctx);
});
librados::ObjectWriteOperation op;
client::client_commit(&op, m_client_id, m_commit_position);
Context *ctx = new C_NotifyUpdate(this, m_commit_position_ctx);
m_commit_position_ctx = NULL;
ctx = schedule_laggy_clients_disconnect(ctx);
librados::AioCompletion *comp =
librados::Rados::aio_create_completion(ctx, NULL,
utils::rados_ctx_callback);
auto comp = librados::Rados::aio_create_completion(ctx, nullptr,
utils::rados_ctx_callback);
int r = m_ioctx.aio_operate(m_oid, comp, &op);
assert(r == 0);
comp->release();
m_commit_position_task_ctx = NULL;
}
void JournalMetadata::schedule_watch_reset() {
@ -884,6 +910,14 @@ void JournalMetadata::handle_watch_notify(uint64_t notify_id, uint64_t cookie) {
bufferlist bl;
m_ioctx.notify_ack(m_oid, notify_id, cookie, bl);
{
Mutex::Locker locker(m_lock);
if (m_ignore_watch_notifies > 0) {
--m_ignore_watch_notifies;
return;
}
}
refresh(NULL);
}
@ -1060,57 +1094,55 @@ void JournalMetadata::handle_notified(int r) {
ldout(m_cct, 10) << "notified journal header update: r=" << r << dendl;
}
Context *JournalMetadata::schedule_laggy_clients_disconnect(Context *on_finish) {
assert(m_lock.is_locked());
void JournalMetadata::schedule_laggy_clients_disconnect(Context *on_finish) {
ldout(m_cct, 20) << __func__ << dendl;
if (m_settings.max_concurrent_object_sets <= 0) {
return on_finish;
on_finish->complete(0);
return;
}
Context *ctx = on_finish;
{
Mutex::Locker locker(m_lock);
for (auto &c : m_registered_clients) {
if (c.state == cls::journal::CLIENT_STATE_DISCONNECTED ||
c.id == m_client_id ||
m_settings.whitelisted_laggy_clients.count(c.id) > 0) {
continue;
}
const std::string &client_id = c.id;
uint64_t object_set = 0;
if (!c.commit_position.object_positions.empty()) {
auto &position = *(c.commit_position.object_positions.begin());
object_set = position.object_number / m_splay_width;
}
for (auto &c : m_registered_clients) {
if (c.state == cls::journal::CLIENT_STATE_DISCONNECTED ||
c.id == m_client_id ||
m_settings.whitelisted_laggy_clients.count(c.id) > 0) {
continue;
}
const std::string &client_id = c.id;
uint64_t object_set = 0;
if (!c.commit_position.object_positions.empty()) {
auto &position = *(c.commit_position.object_positions.begin());
object_set = position.object_number / m_splay_width;
}
if (m_active_set > object_set + m_settings.max_concurrent_object_sets) {
ldout(m_cct, 1) << __func__ << ": " << client_id
<< ": scheduling disconnect" << dendl;
if (m_active_set > object_set + m_settings.max_concurrent_object_sets) {
ldout(m_cct, 1) << __func__ << ": " << client_id
<< ": scheduling disconnect" << dendl;
ctx = new FunctionContext([this, client_id, ctx](int r1) {
ldout(m_cct, 10) << __func__ << ": " << client_id
<< ": flagging disconnected" << dendl;
ctx = new FunctionContext([this, client_id, ctx](int r1) {
ldout(m_cct, 10) << __func__ << ": " << client_id
<< ": flagging disconnected" << dendl;
librados::ObjectWriteOperation op;
client::client_update_state(
&op, client_id, cls::journal::CLIENT_STATE_DISCONNECTED);
librados::ObjectWriteOperation op;
client::client_update_state(&op, client_id,
cls::journal::CLIENT_STATE_DISCONNECTED);
librados::AioCompletion *comp =
librados::Rados::aio_create_completion(ctx, nullptr,
utils::rados_ctx_callback);
int r = m_ioctx.aio_operate(m_oid, comp, &op);
assert(r == 0);
comp->release();
});
auto comp = librados::Rados::aio_create_completion(
ctx, nullptr, utils::rados_ctx_callback);
int r = m_ioctx.aio_operate(m_oid, comp, &op);
assert(r == 0);
comp->release();
});
}
}
}
if (ctx == on_finish) {
ldout(m_cct, 20) << __func__ << ": no laggy clients to disconnect" << dendl;
}
return ctx;
ctx->complete(0);
}
std::ostream &operator<<(std::ostream &os,

View File

@ -333,6 +333,7 @@ private:
size_t m_update_notifications;
Cond m_update_cond;
size_t m_ignore_watch_notifies = 0;
size_t m_refreshes_in_progress = 0;
Contexts m_refresh_ctxs;
@ -341,6 +342,9 @@ private:
Context *m_commit_position_ctx;
Context *m_commit_position_task_ctx;
size_t m_flush_commits_in_progress = 0;
Contexts m_flush_commit_position_ctxs;
AsyncOpTracker m_async_op_tracker;
void handle_immutable_metadata(int r, Context *on_init);
@ -358,7 +362,7 @@ private:
void handle_watch_error(int err);
void handle_notified(int r);
Context *schedule_laggy_clients_disconnect(Context *on_finish);
void schedule_laggy_clients_disconnect(Context *on_finish);
friend std::ostream &operator<<(std::ostream &os,
const JournalMetadata &journal_metadata);