rbd-mirror: async flush for ImageReplayer

Signed-off-by: Mykola Golub <mgolub@mirantis.com>
This commit is contained in:
Mykola Golub 2016-03-20 10:02:18 +02:00
parent 5c5a50745f
commit d66e8f646a
3 changed files with 139 additions and 35 deletions

View File

@ -256,7 +256,9 @@ public:
for (int i = 0; i < 100; i++) {
printf("m_replayer->flush()\n");
m_replayer->flush();
C_SaferCond cond;
m_replayer->flush(&cond);
ASSERT_EQ(0, cond.wait());
get_commit_positions(&master_position, &mirror_position);
if (master_position == mirror_position) {
break;

View File

@ -101,7 +101,9 @@ public:
explicit FlushCommand(ImageReplayer *replayer) : replayer(replayer) {}
bool call(Formatter *f, stringstream *ss) {
int r = replayer->flush();
C_SaferCond cond;
replayer->flush(&cond);
int r = cond.wait();
if (r < 0) {
*ss << "flush: " << cpp_strerror(r);
return false;
@ -625,6 +627,21 @@ void ImageReplayer::stop(Context *on_finish)
on_finish->complete(0);
});
m_on_finish = ctx;
}
} else if (m_state == STATE_FLUSHING_REPLAY) {
dout(20) << "interrupting flush" << dendl;
if (on_finish) {
Context *on_flush_finish = m_on_finish;
FunctionContext *ctx = new FunctionContext(
[this, on_flush_finish, on_finish](int r) {
if (on_flush_finish) {
on_flush_finish->complete(r);
}
on_finish->complete(0);
});
m_on_finish = ctx;
}
} else {
@ -736,47 +753,126 @@ void ImageReplayer::handle_replay_ready()
m_local_replay->process(&it, on_ready, on_commit);
}
int ImageReplayer::flush()
void ImageReplayer::flush(Context *on_finish)
{
// TODO: provide async method
dout(20) << "enter" << dendl;
bool start_flush = false;
{
Mutex::Locker locker(m_lock);
if (m_state != STATE_REPLAYING) {
return 0;
}
if (m_state == STATE_REPLAYING) {
assert(m_on_finish == nullptr);
m_on_finish = on_finish;
m_state = STATE_FLUSHING_REPLAY;
m_state = STATE_FLUSHING_REPLAY;
start_flush = true;
}
}
C_SaferCond replay_flush_ctx;
m_local_replay->flush(&replay_flush_ctx);
int r = replay_flush_ctx.wait();
if (start_flush) {
on_flush_local_replay_flush_start();
} else if (on_finish) {
on_finish->complete(0);
}
}
void ImageReplayer::on_flush_local_replay_flush_start()
{
dout(20) << "enter" << dendl;
FunctionContext *ctx = new FunctionContext(
[this](int r) {
on_flush_local_replay_flush_finish(r);
});
m_local_replay->flush(ctx);
}
void ImageReplayer::on_flush_local_replay_flush_finish(int r)
{
dout(20) << "r=" << r << dendl;
if (r < 0) {
derr << "error flushing local replay: " << cpp_strerror(r) << dendl;
}
C_SaferCond journaler_flush_ctx;
m_remote_journaler->flush_commit_position(&journaler_flush_ctx);
int r1 = journaler_flush_ctx.wait();
if (r1 < 0) {
derr << "error flushing remote journal commit position: "
<< cpp_strerror(r1) << dendl;
if (on_flush_interrupted()) {
return;
}
on_flush_flush_commit_position_start(r);
}
void ImageReplayer::on_flush_flush_commit_position_start(int last_r)
{
FunctionContext *ctx = new FunctionContext(
[this, last_r](int r) {
on_flush_flush_commit_position_finish(last_r, r);
});
m_remote_journaler->flush_commit_position(ctx);
}
void ImageReplayer::on_flush_flush_commit_position_finish(int last_r, int r)
{
if (r < 0) {
derr << "error flushing remote journal commit position: "
<< cpp_strerror(r) << dendl;
} else {
r = last_r;
}
Context *on_finish(nullptr);
{
Mutex::Locker locker(m_lock);
assert(m_state == STATE_FLUSHING_REPLAY);
if (m_state == STATE_STOPPING) {
r = -EINTR;
} else {
assert(m_state == STATE_FLUSHING_REPLAY);
m_state = STATE_REPLAYING;
m_state = STATE_REPLAYING;
}
std::swap(m_on_finish, on_finish);
}
dout(20) << "done" << dendl;
dout(20) << "flush complete, r=" << r << dendl;
return r < 0 ? r : r1;
if (on_finish) {
dout(20) << "on finish complete, r=" << r << dendl;
on_finish->complete(r);
}
}
bool ImageReplayer::on_flush_interrupted()
{
Context *on_finish(nullptr);
{
Mutex::Locker locker(m_lock);
if (m_state == STATE_FLUSHING_REPLAY) {
return false;
}
assert(m_state == STATE_STOPPING);
std::swap(m_on_finish, on_finish);
}
dout(20) << "flush interrupted" << dendl;
if (on_finish) {
int r = -EINTR;
dout(20) << "on finish complete, r=" << r << dendl;
on_finish->complete(r);
}
return true;
}
void ImageReplayer::handle_replay_process_ready(int r)

View File

@ -84,7 +84,7 @@ public:
void start(Context *on_finish = nullptr,
const BootstrapParams *bootstrap_params = nullptr);
void stop(Context *on_finish = nullptr);
int flush();
void flush(Context *on_finish = nullptr);
virtual void handle_replay_ready();
virtual void handle_replay_process_ready(int r);
@ -107,7 +107,7 @@ protected:
* | (sync required) *
* |\-----\ *
* | | *
* | v *
* | v (error) *
* | BOOTSTRAP_IMAGE * * * * * * * * * * *
* | | *
* | v *
@ -117,21 +117,21 @@ protected:
* REMOTE_JOURNALER_INIT * * * * * * * * * * *
* | *
* v (error) *
* LOCAL_IMAGE_OPEN (skip if not *
* LOCAL_IMAGE_OPEN (skip if not * * * * * * *
* | needed *
* v (error) *
* WAIT_FOR_LOCAL_JOURNAL_READY * * * * * * * *
* |
* v
* <replaying>
* |
* v
* <stopping>
* |
* v
* JOURNAL_REPLAY_SHUT_DOWN
* |
* v
* v-----------------------------------------------\
* <replaying> --------------> <flushing_replay> |
* | | |
* v v |
* <stopping> LOCAL_REPLAY_FLUSH |
* | | |
* v v |
* JOURNAL_REPLAY_SHUT_DOWN FLUSH_COMMIT_POSITION |
* | | |
* v \-------------------/
* LOCAL_IMAGE_CLOSE
* |
* v
@ -164,6 +164,12 @@ protected:
virtual void on_stop_local_image_close_start();
virtual void on_stop_local_image_close_finish(int r);
virtual void on_flush_local_replay_flush_start();
virtual void on_flush_local_replay_flush_finish(int r);
virtual void on_flush_flush_commit_position_start(int last_r);
virtual void on_flush_flush_commit_position_finish(int last_r, int r);
virtual bool on_flush_interrupted();
void close_local_image(Context *on_finish); // for tests
private: