Merge pull request #10378 from trociny/wip-14738

librbd: optionally unregister "laggy" journal clients

Reviewed-by: Jason Dillaman <dillaman@redhat.com>
This commit is contained in:
Jason Dillaman 2016-09-06 22:50:15 -04:00 committed by GitHub
commit c2a5e70338
23 changed files with 586 additions and 30 deletions

View File

@ -256,4 +256,67 @@ wait_for_image_replay_started ${CLUSTER1} ${POOL} ${image}
test_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+replaying' 'master_position'
compare_images ${POOL} ${image}
# --- TEST: journal client disconnect -----------------------------------------
# Exercises the "laggy client" disconnect feature end-to-end: manual
# disconnect, resync-based recovery, automatic disconnect when
# rbd_journal_max_concurrent_object_sets is exceeded, and the
# rbd_mirroring_resync_after_disconnect auto-resync option.
testlog "TEST: client disconnect"
# Small journal objects so the active object set advances quickly.
image=laggy
create_image ${CLUSTER2} ${POOL} ${image} 128 --journal-object-size 64K
write_image ${CLUSTER2} ${POOL} ${image} 10

testlog " - replay stopped after disconnect"
wait_for_image_replay_started ${CLUSTER1} ${POOL} ${image}
wait_for_replay_complete ${CLUSTER1} ${CLUSTER2} ${POOL} ${image}
# get_mirror_position only reports connected clients, so a non-empty
# position here proves the mirror client is still connected.
test -n "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})"
disconnect_image ${CLUSTER2} ${POOL} ${image}
test -z "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})"
wait_for_image_replay_stopped ${CLUSTER1} ${POOL} ${image}
test_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+error' 'disconnected'

testlog " - replay started after resync requested"
request_resync_image ${CLUSTER1} ${POOL} ${image}
# Resync deletes and re-creates the local image copy.
wait_for_image_present ${CLUSTER1} ${POOL} ${image} 'deleted'
wait_for_image_replay_started ${CLUSTER1} ${POOL} ${image}
wait_for_replay_complete ${CLUSTER1} ${CLUSTER2} ${POOL} ${image}
test -n "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})"
compare_images ${POOL} ${image}

testlog " - disconnected after max_concurrent_object_sets reached"
# Stop replay so the mirror client falls behind, then force the journal to
# roll over object sets quickly; the client should be auto-disconnected.
admin_daemon ${CLUSTER1} rbd mirror stop ${POOL}/${image}
wait_for_image_replay_stopped ${CLUSTER1} ${POOL} ${image}
test -n "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})"
set_image_meta ${CLUSTER2} ${POOL} ${image} \
	       conf_rbd_journal_max_concurrent_object_sets 1
# Two write batches ensure the active set advances past the (1-set) limit.
write_image ${CLUSTER2} ${POOL} ${image} 20 16384
write_image ${CLUSTER2} ${POOL} ${image} 20 16384
test -z "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})"
set_image_meta ${CLUSTER2} ${POOL} ${image} \
	       conf_rbd_journal_max_concurrent_object_sets 0

testlog " - replay is still stopped (disconnected) after restart"
admin_daemon ${CLUSTER1} rbd mirror start ${POOL}/${image}
wait_for_image_replay_stopped ${CLUSTER1} ${POOL} ${image}
test_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+error' 'disconnected'

testlog " - replay started after resync requested"
request_resync_image ${CLUSTER1} ${POOL} ${image}
wait_for_image_present ${CLUSTER1} ${POOL} ${image} 'deleted'
wait_for_image_replay_started ${CLUSTER1} ${POOL} ${image}
wait_for_replay_complete ${CLUSTER1} ${CLUSTER2} ${POOL} ${image}
test -n "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})"
compare_images ${POOL} ${image}

testlog " - rbd_mirroring_resync_after_disconnect config option"
# With the option enabled, a disconnect should trigger resync automatically
# (image deleted and replay restarted without an explicit resync request).
set_image_meta ${CLUSTER1} ${POOL} ${image} \
	       conf_rbd_mirroring_resync_after_disconnect true
disconnect_image ${CLUSTER2} ${POOL} ${image}
wait_for_image_present ${CLUSTER1} ${POOL} ${image} 'deleted'
wait_for_image_replay_started ${CLUSTER1} ${POOL} ${image}
wait_for_replay_complete ${CLUSTER1} ${CLUSTER2} ${POOL} ${image}
test -n "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})"
compare_images ${POOL} ${image}
# With the option disabled, the replayer stays stopped in the error state.
set_image_meta ${CLUSTER1} ${POOL} ${image} \
	       conf_rbd_mirroring_resync_after_disconnect false
disconnect_image ${CLUSTER2} ${POOL} ${image}
test -z "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})"
wait_for_image_replay_stopped ${CLUSTER1} ${POOL} ${image}
test_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+error' 'disconnected'

echo OK

View File

@ -418,7 +418,7 @@ get_position()
local status_log=${TEMPDIR}/${CLUSTER2}-${pool}-${image}.status
rbd --cluster ${cluster} -p ${pool} journal status --image ${image} |
tee ${status_log} >&2
sed -nEe 's/^.*\[id='"${id_regexp}"',.*positions=\[\[([^]]*)\],.*$/\1/p' \
sed -nEe 's/^.*\[id='"${id_regexp}"',.*positions=\[\[([^]]*)\],.*state=connected.*$/\1/p' \
${status_log}
}
@ -488,13 +488,30 @@ test_status_in_pool_dir()
}
create_image()
{
    # Create a journaling-enabled test image.
    #
    # Usage: create_image <cluster> <pool> <image> [<size-MB>] [extra rbd args...]
    # Defaults to a 128 MB image; any remaining arguments are passed through
    # verbatim to "rbd create" (e.g. --journal-object-size 64K).
    local cluster=$1 ; shift
    local pool=$1 ; shift
    local image=$1 ; shift
    local size=128

    if [ -n "$1" ]; then
	size=$1
	shift
    fi

    # "$@" (quoted) preserves each extra argument as a single word; an
    # unquoted $@ would re-split and glob-expand arguments.
    rbd --cluster ${cluster} -p ${pool} create --size ${size} \
	--image-feature layering,exclusive-lock,journaling "$@" ${image}
}
set_image_meta()
{
    # Set a single image-meta key/value on an existing image.
    #
    # Usage: set_image_meta <cluster> <pool> <image> <key> <value>
    # Used by the tests to override per-image config (conf_rbd_* keys).
    # Note: this function only sets metadata -- it must not create the image.
    local cluster=$1
    local pool=$2
    local image=$3
    local key=$4
    local val=$5

    rbd --cluster ${cluster} -p ${pool} image-meta set ${image} $key $val
}
remove_image()
@ -532,6 +549,16 @@ clone_image()
${clone_pool}/${clone_image} --image-feature layering,exclusive-lock,journaling
}
disconnect_image()
{
    # Flag all of the image's journal clients as disconnected.
    #
    # Usage: disconnect_image <cluster> <pool> <image>
    local cluster=$1 pool=$2 image=$3

    rbd --cluster ${cluster} -p ${pool} journal client disconnect \
	--image ${image}
}
create_snapshot()
{
local cluster=$1
@ -614,9 +641,12 @@ write_image()
local pool=$2
local image=$3
local count=$4
local size=$5
test -n "${size}" || size=4096
rbd --cluster ${cluster} -p ${pool} bench-write ${image} \
--io-size 4096 --io-threads 1 --io-total $((4096 * count)) \
--io-size ${size} --io-threads 1 --io-total $((size * count)) \
--io-pattern rand
}

View File

@ -296,13 +296,18 @@ void client_update_data(librados::ObjectWriteOperation *op,
// Synchronously update a journal client's registration state (e.g. flag it
// CONNECTED / DISCONNECTED) on the journal header object.  Returns the
// librados operate() result (0 on success, negative errno on failure).
int client_update_state(librados::IoCtx &ioctx, const std::string &oid,
                        const std::string &id, cls::journal::ClientState state) {
  librados::ObjectWriteOperation op;
  client_update_state(&op, id, state);
  return ioctx.operate(oid, &op);
}

// Append a "client_update_state" journal class-method call to an existing
// write operation, so callers can batch it with other ops and/or issue it
// asynchronously via aio_operate().
void client_update_state(librados::ObjectWriteOperation *op,
                         const std::string &id,
                         cls::journal::ClientState state) {
  bufferlist bl;
  ::encode(id, bl);
  // ClientState is transmitted on the wire as a single byte.
  ::encode(static_cast<uint8_t>(state), bl);
  op->exec("journal", "client_update_state", bl);
}
int client_unregister(librados::IoCtx &ioctx, const std::string &oid,

View File

@ -53,6 +53,9 @@ void client_update_data(librados::ObjectWriteOperation *op,
const std::string &id, const bufferlist &data);
int client_update_state(librados::IoCtx &ioctx, const std::string &oid,
const std::string &id, cls::journal::ClientState state);
void client_update_state(librados::ObjectWriteOperation *op,
const std::string &id,
cls::journal::ClientState state);
int client_unregister(librados::IoCtx &ioctx, const std::string &oid,
const std::string &id);

View File

@ -1211,6 +1211,7 @@ OPTION(rbd_tracing, OPT_BOOL, false) // true if LTTng-UST tracepoints should be
OPTION(rbd_validate_pool, OPT_BOOL, true) // true if empty pools should be validated for RBD compatibility
OPTION(rbd_validate_names, OPT_BOOL, true) // true if image specs should be validated
OPTION(rbd_auto_exclusive_lock_until_manual_request, OPT_BOOL, true) // whether to automatically acquire/release exclusive lock until it is explicitly requested, i.e. before we know the user of librbd is properly using the lock API
OPTION(rbd_mirroring_resync_after_disconnect, OPT_BOOL, false) // automatically start image resync after mirroring is disconnected due to being laggy
/*
* The following options change the behavior for librbd's image creation methods that
@ -1251,6 +1252,7 @@ OPTION(rbd_journal_object_flush_bytes, OPT_INT, 0) // maximum number of pending
OPTION(rbd_journal_object_flush_age, OPT_DOUBLE, 0) // maximum age (in seconds) for pending commits
OPTION(rbd_journal_pool, OPT_STR, "") // pool for journal objects
OPTION(rbd_journal_max_payload_bytes, OPT_U32, 16384) // maximum journal payload size before splitting
OPTION(rbd_journal_max_concurrent_object_sets, OPT_INT, 0) // maximum number of object sets a journal client can be behind before it is automatically unregistered
/**
* RBD Mirror options

View File

@ -749,6 +749,10 @@ void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) {
Client client(m_client_id, bufferlist());
RegisteredClients::iterator it = refresh->registered_clients.find(client);
if (it != refresh->registered_clients.end()) {
if (it->state == cls::journal::CLIENT_STATE_DISCONNECTED) {
ldout(m_cct, 0) << "client flagged disconnected: " << m_client_id
<< dendl;
}
m_minimum_set = MAX(m_minimum_set, refresh->minimum_set);
m_active_set = MAX(m_active_set, refresh->active_set);
m_registered_clients = refresh->registered_clients;
@ -810,9 +814,11 @@ void JournalMetadata::handle_commit_position_task() {
librados::ObjectWriteOperation op;
client::client_commit(&op, m_client_id, m_commit_position);
C_NotifyUpdate *ctx = new C_NotifyUpdate(this, m_commit_position_ctx);
Context *ctx = new C_NotifyUpdate(this, m_commit_position_ctx);
m_commit_position_ctx = NULL;
ctx = schedule_laggy_clients_disconnect(ctx);
librados::AioCompletion *comp =
librados::Rados::aio_create_completion(ctx, NULL,
utils::rados_ctx_callback);
@ -839,7 +845,7 @@ void JournalMetadata::handle_watch_reset() {
if (r == -ENOENT) {
ldout(m_cct, 5) << __func__ << ": journal header not found" << dendl;
} else {
lderr(m_cct) << __func__ << ": failed to watch journal"
lderr(m_cct) << __func__ << ": failed to watch journal: "
<< cpp_strerror(r) << dendl;
}
schedule_watch_reset();
@ -1023,6 +1029,59 @@ void JournalMetadata::handle_notified(int r) {
ldout(m_cct, 10) << "notified journal header update: r=" << r << dendl;
}
// Build a chain of contexts that, when executed, flag every "laggy"
// registered client (one that has fallen more than max_concurrent_object_sets
// object sets behind the active set) as DISCONNECTED.  Returns on_finish
// unchanged when there is nothing to do, otherwise the head of the chain;
// each link issues an async client_update_state write and fires the next
// link from its rados completion callback.  Caller must hold m_lock.
Context *JournalMetadata::schedule_laggy_clients_disconnect(Context *on_finish) {
  assert(m_lock.is_locked());

  ldout(m_cct, 20) << __func__ << dendl;

  // Feature disabled when the limit is unset (<= 0).
  if (m_settings.max_concurrent_object_sets <= 0) {
    return on_finish;
  }

  Context *ctx = on_finish;
  for (auto &c : m_registered_clients) {
    // Skip clients already disconnected, ourselves, and whitelisted clients
    // that must never be auto-disconnected (e.g. the local image client).
    if (c.state == cls::journal::CLIENT_STATE_DISCONNECTED ||
        c.id == m_client_id ||
        m_settings.whitelisted_laggy_clients.count(c.id) > 0) {
      continue;
    }
    const std::string &client_id = c.id;
    uint64_t object_set = 0;
    if (!c.commit_position.object_positions.empty()) {
      // NOTE(review): assumes the first object position reflects the client's
      // most recently committed entry -- confirm against commit-position
      // ordering in cls_journal_types.
      auto &position = *(c.commit_position.object_positions.begin());
      object_set = position.object_number / m_splay_width;
    }

    if (m_active_set > object_set + m_settings.max_concurrent_object_sets) {
      ldout(m_cct, 1) << __func__ << ": " << client_id
                      << ": scheduling disconnect" << dendl;

      // client_id is captured by value: the registered-client set may be
      // refreshed before this deferred context runs.
      ctx = new FunctionContext([this, client_id, ctx](int r1) {
          ldout(m_cct, 10) << __func__ << ": " << client_id
                           << ": flagging disconnected" << dendl;

          librados::ObjectWriteOperation op;
          client::client_update_state(&op, client_id,
                                      cls::journal::CLIENT_STATE_DISCONNECTED);

          // Completion invokes the previously chained ctx (ultimately
          // on_finish) via utils::rados_ctx_callback.
          librados::AioCompletion *comp =
            librados::Rados::aio_create_completion(ctx, nullptr,
                                                   utils::rados_ctx_callback);
          int r = m_ioctx.aio_operate(m_oid, comp, &op);
          assert(r == 0);
          comp->release();
        });
    }
  }

  if (ctx == on_finish) {
    ldout(m_cct, 20) << __func__ << ": no laggy clients to disconnect" << dendl;
  }
  return ctx;
}
std::ostream &operator<<(std::ostream &os,
const JournalMetadata::RegisteredClients &clients) {
os << "[";

View File

@ -344,6 +344,8 @@ private:
void handle_watch_error(int err);
void handle_notified(int r);
Context *schedule_laggy_clients_disconnect(Context *on_finish);
friend std::ostream &operator<<(std::ostream &os,
const JournalMetadata &journal_metadata);
};

View File

@ -157,8 +157,11 @@ void JournalTrimmer::handle_metadata_updated() {
uint64_t minimum_commit_set = active_set;
std::string minimum_client_id;
// TODO: add support for trimming past "laggy" clients
for (auto &client : registered_clients) {
if (client.state == cls::journal::CLIENT_STATE_DISCONNECTED) {
continue;
}
if (client.commit_position.object_positions.empty()) {
// client hasn't recorded any commits
minimum_commit_set = minimum_set;

View File

@ -12,6 +12,9 @@ struct Settings {
double commit_interval = 5; ///< commit position throttle (in secs)
uint64_t max_fetch_bytes = 0; ///< 0 implies no limit
uint64_t max_payload_bytes = 0; ///< 0 implies object size limit
int max_concurrent_object_sets = 0; ///< 0 implies no limit
std::set<std::string> whitelisted_laggy_clients;
///< clients that mustn't be disconnected
};
} // namespace journal

View File

@ -944,7 +944,9 @@ struct C_InvalidateCache : public Context {
"rbd_journal_object_flush_bytes", false)(
"rbd_journal_object_flush_age", false)(
"rbd_journal_pool", false)(
"rbd_journal_max_payload_bytes", false);
"rbd_journal_max_payload_bytes", false)(
"rbd_journal_max_concurrent_object_sets", false)(
"rbd_mirroring_resync_after_disconnect", false);
md_config_t local_config_t;
std::map<std::string, bufferlist> res;
@ -1000,6 +1002,8 @@ struct C_InvalidateCache : public Context {
ASSIGN_OPTION(journal_object_flush_age);
ASSIGN_OPTION(journal_pool);
ASSIGN_OPTION(journal_max_payload_bytes);
ASSIGN_OPTION(journal_max_concurrent_object_sets);
ASSIGN_OPTION(mirroring_resync_after_disconnect);
}
ExclusiveLock<ImageCtx> *ImageCtx::create_exclusive_lock() {

View File

@ -188,6 +188,8 @@ namespace librbd {
double journal_object_flush_age;
std::string journal_pool;
uint32_t journal_max_payload_bytes;
int journal_max_concurrent_object_sets;
bool mirroring_resync_after_disconnect;
LibrbdAdminSocketHook *asok_hook;

View File

@ -1141,6 +1141,11 @@ void Journal<I>::create_journaler() {
::journal::Settings settings;
settings.commit_interval = m_image_ctx.journal_commit_age;
settings.max_payload_bytes = m_image_ctx.journal_max_payload_bytes;
settings.max_concurrent_object_sets =
m_image_ctx.journal_max_concurrent_object_sets;
// TODO: a configurable filter to exclude certain peers from being
// disconnected.
settings.whitelisted_laggy_clients = {IMAGE_CLIENT_ID};
m_journaler = new Journaler(m_work_queue, m_timer, m_timer_lock,
m_image_ctx.md_ctx, m_image_ctx.id,

View File

@ -36,6 +36,7 @@
import-diff Import an incremental diff.
info Show information about image size, striping,
etc.
journal client disconnect Flag image journal client as disconnected.
journal export Export image journal.
journal import Import image journal.
journal info Show information about image journal.
@ -653,6 +654,24 @@
--format arg output format [plain, json, or xml]
--pretty-format pretty formatting (json and xml)
rbd help journal client disconnect
usage: rbd journal client disconnect [--pool <pool>] [--image <image>]
[--journal <journal>]
[--client-id <client-id>]
<journal-spec>
Flag image journal client as disconnected.
Positional arguments
<journal-spec> journal specification
(example: [<pool-name>/]<journal-name>)
Optional arguments
-p [ --pool ] arg pool name
--image arg image name
--journal arg journal name
--client-id arg client ID (or leave unspecified to disconnect all)
rbd help journal export
usage: rbd journal export [--pool <pool>] [--image <image>]
[--journal <journal>] [--path <path>] [--verbose]

View File

@ -68,10 +68,12 @@ int RadosTestFixture::create(const std::string &oid, uint8_t order,
journal::JournalMetadataPtr RadosTestFixture::create_metadata(
const std::string &oid, const std::string &client_id,
double commit_interval, uint64_t max_fetch_bytes) {
double commit_interval, uint64_t max_fetch_bytes,
int max_concurrent_object_sets) {
journal::Settings settings;
settings.commit_interval = commit_interval;
settings.max_fetch_bytes = max_fetch_bytes;
settings.max_concurrent_object_sets = max_concurrent_object_sets;
journal::JournalMetadataPtr metadata(new journal::JournalMetadata(
m_work_queue, m_timer, &m_timer_lock, m_ioctx, oid, client_id, settings));

View File

@ -26,7 +26,8 @@ public:
journal::JournalMetadataPtr create_metadata(const std::string &oid,
const std::string &client_id = "client",
double commit_internal = 0.1,
uint64_t max_fetch_bytes = 0);
uint64_t max_fetch_bytes = 0,
int max_concurrent_object_sets = 0);
int append(const std::string &oid, const bufferlist &bl);
int client_register(const std::string &oid, const std::string &id = "client",

View File

@ -102,6 +102,7 @@ struct MockJournaler {
Context*));
MOCK_METHOD2(register_client, void(const bufferlist &, Context *));
MOCK_METHOD1(unregister_client, void(Context *));
MOCK_METHOD3(get_client, void(const std::string &, cls::journal::Client *,
Context *));
MOCK_METHOD2(get_cached_client, int(const std::string&, cls::journal::Client*));
@ -159,9 +160,6 @@ struct MockJournalerProxy {
int register_client(const bufferlist &data) {
return -EINVAL;
}
void unregister_client(Context *ctx) {
ctx->complete(-EINVAL);
}
void allocate_tag(uint64_t, const bufferlist &,
cls::journal::Tag*, Context *on_finish) {
@ -196,6 +194,10 @@ struct MockJournalerProxy {
MockJournaler::get_instance().register_client(data, on_finish);
}
void unregister_client(Context *on_finish) {
MockJournaler::get_instance().unregister_client(on_finish);
}
void get_client(const std::string &client_id, cls::journal::Client *client,
Context *on_finish) {
MockJournaler::get_instance().get_client(client_id, client, on_finish);

View File

@ -21,9 +21,12 @@ public:
journal::JournalMetadataPtr create_metadata(const std::string &oid,
const std::string &client_id,
double commit_internal = 0.1) {
double commit_interval = 0.1,
uint64_t max_fetch_bytes = 0,
int max_concurrent_object_sets = 0) {
journal::JournalMetadataPtr metadata = RadosTestFixture::create_metadata(
oid, client_id, commit_internal);
oid, client_id, commit_interval, max_fetch_bytes,
max_concurrent_object_sets);
m_metadata_list.push_back(metadata);
metadata->add_listener(&m_listener);
return metadata;
@ -116,6 +119,70 @@ TEST_F(TestJournalMetadata, UpdateActiveObject) {
ASSERT_EQ(123U, metadata1->get_active_set());
}
// Verifies that a registered client which falls more than
// max_concurrent_object_sets object sets behind the active set is
// automatically flagged DISCONNECTED when the commit position is flushed,
// while a client exactly at the limit stays CONNECTED.
TEST_F(TestJournalMetadata, DisconnectLaggyClient) {
  std::string oid = get_temp_oid();

  ASSERT_EQ(0, create(oid));
  ASSERT_EQ(0, client_register(oid, "client1", ""));
  // client2 never advances its commit position, so it becomes "laggy".
  ASSERT_EQ(0, client_register(oid, "client2", "laggy"));

  int max_concurrent_object_sets = 100;
  journal::JournalMetadataPtr metadata =
    create_metadata(oid, "client1", 0.1, 0, max_concurrent_object_sets);
  ASSERT_EQ(0, init_metadata(metadata));
  ASSERT_TRUE(wait_for_update(metadata));
  ASSERT_EQ(0U, metadata->get_active_set());

  journal::JournalMetadata::RegisteredClients clients;

// Asserts the states of client1/client2 in the most recent `clients` snapshot.
#define ASSERT_CLIENT_STATES(s1, s2) \
  ASSERT_EQ(2U, clients.size()); \
  for (auto &c : clients) { \
    if (c.id == "client1") { \
      ASSERT_EQ(c.state, s1); \
    } else if (c.id == "client2") { \
      ASSERT_EQ(c.state, s2); \
    } else { \
      ASSERT_TRUE(false); \
    } \
  }

  metadata->get_registered_clients(&clients);
  ASSERT_CLIENT_STATES(cls::journal::CLIENT_STATE_CONNECTED,
                       cls::journal::CLIENT_STATE_CONNECTED);

  // client2 is connected when active set <= max_concurrent_object_sets
  ASSERT_EQ(0, metadata->set_active_set(max_concurrent_object_sets));
  ASSERT_TRUE(wait_for_update(metadata));
  // Committing + flushing triggers the laggy-client scan.
  uint64_t commit_tid = metadata->allocate_commit_tid(0, 0, 0);
  C_SaferCond cond1;
  metadata->committed(commit_tid, [&cond1]() { return &cond1; });
  ASSERT_EQ(0, cond1.wait());
  metadata->flush_commit_position();
  ASSERT_TRUE(wait_for_update(metadata));
  ASSERT_EQ(100U, metadata->get_active_set());
  clients.clear();
  metadata->get_registered_clients(&clients);
  ASSERT_CLIENT_STATES(cls::journal::CLIENT_STATE_CONNECTED,
                       cls::journal::CLIENT_STATE_CONNECTED);

  // client2 is disconnected when active set > max_concurrent_object_sets
  ASSERT_EQ(0, metadata->set_active_set(max_concurrent_object_sets + 1));
  ASSERT_TRUE(wait_for_update(metadata));
  commit_tid = metadata->allocate_commit_tid(0, 0, 1);
  C_SaferCond cond2;
  metadata->committed(commit_tid, [&cond2]() { return &cond2; });
  ASSERT_EQ(0, cond2.wait());
  metadata->flush_commit_position();
  ASSERT_TRUE(wait_for_update(metadata));
  ASSERT_EQ(101U, metadata->get_active_set());
  clients.clear();
  metadata->get_registered_clients(&clients);
  ASSERT_CLIENT_STATES(cls::journal::CLIENT_STATE_CONNECTED,
                       cls::journal::CLIENT_STATE_DISCONNECTED);
}
TEST_F(TestJournalMetadata, AssertActiveTag) {
std::string oid = get_temp_oid();

View File

@ -92,7 +92,11 @@ struct MockImageCtx {
journal_object_flush_bytes(image_ctx.journal_object_flush_bytes),
journal_object_flush_age(image_ctx.journal_object_flush_age),
journal_pool(image_ctx.journal_pool),
journal_max_payload_bytes(image_ctx.journal_max_payload_bytes)
journal_max_payload_bytes(image_ctx.journal_max_payload_bytes),
journal_max_concurrent_object_sets(
image_ctx.journal_max_concurrent_object_sets),
mirroring_resync_after_disconnect(
image_ctx.mirroring_resync_after_disconnect)
{
md_ctx.dup(image_ctx.md_ctx);
data_ctx.dup(image_ctx.data_ctx);
@ -260,6 +264,8 @@ struct MockImageCtx {
double journal_object_flush_age;
std::string journal_pool;
uint32_t journal_max_payload_bytes;
int journal_max_concurrent_object_sets;
bool mirroring_resync_after_disconnect;
};
} // namespace librbd

View File

@ -233,6 +233,9 @@ public:
std::set<cls::journal::Client>::const_iterator c;
for (c = registered_clients.begin(); c != registered_clients.end(); c++) {
std::cout << __func__ << ": client: " << *c << std::endl;
if (c->state != cls::journal::CLIENT_STATE_CONNECTED) {
continue;
}
cls::journal::ObjectPositions object_positions =
c->commit_position.object_positions;
cls::journal::ObjectPositions::const_iterator p =
@ -822,3 +825,91 @@ TEST_F(TestImageReplayer, MultipleReplayFailures_MultiEpoch) {
close_image(ictx);
}
// End-to-end rbd-mirror test of journal-client disconnect handling: a
// disconnected mirror client must fail replay start with -ENOTCONN until the
// image is resynced, and rbd_mirroring_resync_after_disconnect must schedule
// the resync automatically.
TEST_F(TestImageReplayer, Disconnect)
{
  bootstrap();

  // Make sure rbd_mirroring_resync_after_disconnect is not set
  EXPECT_EQ(0, m_local_cluster->conf_set("rbd_mirroring_resync_after_disconnect", "false"));

  // Test start fails if disconnected
  librbd::ImageCtx *ictx;

  generate_test_data();
  open_remote_image(&ictx);
  for (int i = 0; i < TEST_IO_COUNT; ++i) {
    write_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE);
  }
  flush(ictx);
  close_image(ictx);

  // Flag the local mirror's journal client disconnected directly on the
  // remote journal header object.
  std::string oid = ::journal::Journaler::header_oid(m_remote_image_id);
  ASSERT_EQ(0, cls::journal::client::client_update_state(m_remote_ioctx, oid,
	m_local_mirror_uuid, cls::journal::CLIENT_STATE_DISCONNECTED));

  C_SaferCond cond1;
  m_replayer->start(&cond1);
  ASSERT_EQ(-ENOTCONN, cond1.wait());

  // Test start succeeds after resync
  open_local_image(&ictx);
  librbd::Journal<>::request_resync(ictx);
  close_image(ictx);
  // Start still fails until the stale local image has been deleted...
  C_SaferCond cond2;
  m_replayer->start(&cond2);
  ASSERT_EQ(-ENOTCONN, cond2.wait());
  C_SaferCond delete_cond;
  m_image_deleter->wait_for_scheduled_deletion(
    m_local_ioctx.get_id(), m_replayer->get_global_image_id(), &delete_cond);
  EXPECT_EQ(0, delete_cond.wait());

  // ...after which a fresh bootstrap/replay succeeds.
  start();
  wait_for_replay_complete();

  // Test replay stopped after disconnect
  open_remote_image(&ictx);
  for (int i = TEST_IO_COUNT; i < 2 * TEST_IO_COUNT; ++i) {
    write_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE);
  }
  flush(ictx);
  close_image(ictx);

  ASSERT_EQ(0, cls::journal::client::client_update_state(m_remote_ioctx, oid,
	m_local_mirror_uuid, cls::journal::CLIENT_STATE_DISCONNECTED));
  // Notify watchers so the running replayer observes the state change.
  bufferlist bl;
  ASSERT_EQ(0, m_remote_ioctx.notify2(oid, bl, 5000, NULL));

  wait_for_stopped();

  // Test start fails after disconnect
  C_SaferCond cond3;
  m_replayer->start(&cond3);
  ASSERT_EQ(-ENOTCONN, cond3.wait());
  C_SaferCond cond4;
  m_replayer->start(&cond4);
  ASSERT_EQ(-ENOTCONN, cond4.wait());

  // Test automatic resync if rbd_mirroring_resync_after_disconnect is set
  EXPECT_EQ(0, m_local_cluster->conf_set("rbd_mirroring_resync_after_disconnect", "true"));

  // Resync is flagged on first start attempt
  C_SaferCond cond5;
  m_replayer->start(&cond5);
  ASSERT_EQ(-ENOTCONN, cond5.wait());
  C_SaferCond delete_cond1;
  m_image_deleter->wait_for_scheduled_deletion(
    m_local_ioctx.get_id(), m_replayer->get_global_image_id(), &delete_cond1);
  EXPECT_EQ(0, delete_cond1.wait());

  // Second start attempt succeeds against the resynced image.
  C_SaferCond cond6;
  m_replayer->start(&cond6);
  ASSERT_EQ(0, cond6.wait());
  wait_for_replay_complete();
  stop();
}

View File

@ -168,6 +168,66 @@ static int do_reset_journal(librados::IoCtx& io_ctx,
return 0;
}
// Flag journal client(s) registered on the given journal as DISCONNECTED.
//
// If client_id is empty, every registered client except the master image
// client (which has the reserved empty id) is disconnected; otherwise only
// the matching client is.  After a successful state change, watchers on the
// journal header are notified so running daemons observe the update.
//
// Returns 0 on success, -ENOENT if no matching client was found, or a
// negative errno on the first failed rados operation.
static int do_disconnect_journal_client(librados::IoCtx& io_ctx,
					const std::string& journal_id,
					const std::string& client_id)
{
  int r;

  C_SaferCond cond;
  uint64_t minimum_set;
  uint64_t active_set;
  std::set<cls::journal::Client> registered_clients;
  std::string oid = ::journal::Journaler::header_oid(journal_id);

  cls::journal::client::get_mutable_metadata(io_ctx, oid, &minimum_set,
					     &active_set, &registered_clients,
					     &cond);
  r = cond.wait();
  if (r < 0) {
    std::cerr << "warning: failed to get journal metadata" << std::endl;
    return r;
  }

  // The master image client is registered under the reserved empty id and
  // must never be disconnected.
  static const std::string IMAGE_CLIENT_ID("");

  bool found = false;
  for (auto &c : registered_clients) {
    if (c.id == IMAGE_CLIENT_ID || (!client_id.empty() && client_id != c.id)) {
      continue;
    }
    r = cls::journal::client::client_update_state(io_ctx, oid, c.id,
				  cls::journal::CLIENT_STATE_DISCONNECTED);
    if (r < 0) {
      std::cerr << "warning: failed to disconnect client " << c.id << ": "
		<< cpp_strerror(r) << std::endl;
      return r;
    }
    std::cout << "client " << c.id << " disconnected" << std::endl;
    found = true;
  }

  if (!found) {
    if (!client_id.empty()) {
      std::cerr << "warning: client " << client_id << " is not registered"
		<< std::endl;
    } else {
      std::cerr << "no registered clients to disconnect" << std::endl;
    }
    return -ENOENT;
  }

  // Wake up watchers (e.g. rbd-mirror daemons) so they notice the change.
  bufferlist bl;
  r = io_ctx.notify2(oid, bl, 5000, NULL);
  if (r < 0) {
    // Fixed: message previously printed a doubled colon
    // ("...state change:" << ": ").
    std::cerr << "warning: failed to notify state change: "
	      << cpp_strerror(r) << std::endl;
    return r;
  }

  return 0;
}
class Journaler : public ::journal::Journaler {
public:
Journaler(librados::IoCtx& io_ctx, const std::string& journal_id,
@ -847,6 +907,45 @@ int execute_reset(const po::variables_map &vm) {
return 0;
}
// Register CLI arguments for "rbd journal client disconnect": the standard
// journal spec positionals/options plus an optional --client-id filter.
void get_client_disconnect_arguments(po::options_description *positional,
				     po::options_description *options) {
  at::add_journal_spec_options(positional, options, at::ARGUMENT_MODIFIER_NONE);
  options->add_options()
    ("client-id", po::value<std::string>(),
     "client ID (or leave unspecified to disconnect all)");
}
// Entry point for "rbd journal client disconnect": resolve the journal spec
// and optional --client-id from the parsed CLI options, connect to the pool,
// and flag the matching journal client(s) as disconnected.
int execute_client_disconnect(const po::variables_map &vm) {
  size_t arg_index = 0;
  std::string pool_name;
  std::string journal_name;
  int ret = utils::get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_NONE,
                                          &arg_index, &pool_name,
                                          &journal_name);
  if (ret < 0) {
    return ret;
  }

  // An absent --client-id means "disconnect all non-master clients".
  std::string client_id = vm.count("client-id") ?
    vm["client-id"].as<std::string>() : std::string();

  librados::Rados rados;
  librados::IoCtx io_ctx;
  ret = utils::init(pool_name, &rados, &io_ctx);
  if (ret < 0) {
    return ret;
  }

  ret = do_disconnect_journal_client(io_ctx, journal_name, client_id);
  if (ret < 0) {
    std::cerr << "rbd: journal client disconnect: " << cpp_strerror(ret)
              << std::endl;
  }
  return ret < 0 ? ret : 0;
}
void get_inspect_arguments(po::options_description *positional,
po::options_description *options) {
at::add_journal_spec_options(positional, options, at::ARGUMENT_MODIFIER_NONE);
@ -985,6 +1084,11 @@ Shell::Action action_import(
{"journal", "import"}, {}, "Import image journal.", "",
&get_import_arguments, &execute_import);
Shell::Action action_disconnect(
{"journal", "client", "disconnect"}, {},
"Flag image journal client as disconnected.", "",
&get_client_disconnect_arguments, &execute_client_disconnect);
} // namespace journal
} // namespace action
} // namespace rbd

View File

@ -251,6 +251,15 @@ void ImageReplayer<I>::BootstrapProgressContext::update_progress(
}
}
// Remote journal metadata changed (e.g. our client may have been flagged
// disconnected).  Defer the handling onto the replayer's work queue rather
// than running it inline in the journal's notification context.
template <typename I>
void ImageReplayer<I>::RemoteJournalerListener::handle_update(
  ::journal::JournalMetadata *) {
  FunctionContext *ctx = new FunctionContext([this](int r) {
      replayer->handle_remote_journal_metadata_updated();
    });
  replayer->m_threads->work_queue->queue(ctx, 0);
}
template <typename I>
ImageReplayer<I>::ImageReplayer(Threads *threads,
shared_ptr<ImageDeleter> image_deleter,
@ -277,7 +286,8 @@ ImageReplayer<I>::ImageReplayer(Threads *threads,
m_lock("rbd::mirror::ImageReplayer " + stringify(remote_pool_id) + " " +
remote_image_id),
m_progress_cxt(this),
m_resync_listener(new ResyncListener<I>(this))
m_resync_listener(new ResyncListener<I>(this)),
m_remote_listener(this)
{
// Register asok commands using a temporary "remote_pool_name/global_image_id"
// name. When the image name becomes known on start the asok commands will be
@ -455,7 +465,14 @@ void ImageReplayer<I>::handle_bootstrap(int r) {
if (do_resync) {
Context *on_finish = m_on_start_finish;
m_stopping_for_resync = true;
FunctionContext *ctx = new FunctionContext([this, on_finish](int r) {
if (r < 0) {
if (on_finish) {
on_finish->complete(r);
}
return;
}
resync_image(on_finish);
});
m_on_start_finish = ctx;
@ -502,6 +519,27 @@ void ImageReplayer<I>::handle_init_remote_journaler(int r) {
return;
}
m_remote_journaler->add_listener(&m_remote_listener);
cls::journal::Client client;
r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
if (r < 0) {
derr << "error retrieving remote journal client: " << cpp_strerror(r)
<< dendl;
on_start_fail(r, "error retrieving remote journal client");
return;
}
if (client.state != cls::journal::CLIENT_STATE_CONNECTED) {
dout(5) << "client flagged disconnected, stopping image replay" << dendl;
if (m_local_image_ctx->mirroring_resync_after_disconnect) {
Mutex::Locker locker(m_lock);
m_stopping_for_resync = true;
}
on_start_fail(-ENOTCONN, "disconnected");
return;
}
start_replay();
}
@ -630,15 +668,18 @@ bool ImageReplayer<I>::on_start_interrupted()
}
template <typename I>
void ImageReplayer<I>::stop(Context *on_finish, bool manual)
void ImageReplayer<I>::stop(Context *on_finish, bool manual, int r,
const std::string& desc)
{
dout(20) << "on_finish=" << on_finish << dendl;
dout(20) << "on_finish=" << on_finish << ", manual=" << manual
<< ", desc=" << desc << dendl;
image_replayer::BootstrapRequest<I> *bootstrap_request = nullptr;
bool shut_down_replay = false;
bool running = true;
{
Mutex::Locker locker(m_lock);
if (!is_running_()) {
running = false;
} else {
@ -677,14 +718,14 @@ void ImageReplayer<I>::stop(Context *on_finish, bool manual)
}
if (shut_down_replay) {
on_stop_journal_replay();
on_stop_journal_replay(r, desc);
} else if (on_finish != nullptr) {
on_finish->complete(0);
}
}
template <typename I>
void ImageReplayer<I>::on_stop_journal_replay()
void ImageReplayer<I>::on_stop_journal_replay(int r, const std::string &desc)
{
dout(20) << "enter" << dendl;
@ -698,7 +739,7 @@ void ImageReplayer<I>::on_stop_journal_replay()
m_state = STATE_STOPPING;
}
set_state_description(0, "");
set_state_description(r, desc);
update_mirror_image_status(false, boost::none);
reschedule_update_status_task(-1);
shut_down(0);
@ -1336,6 +1377,7 @@ void ImageReplayer<I>::shut_down(int r) {
ctx->complete(0);
});
ctx = new FunctionContext([this, ctx](int r) {
m_remote_journaler->remove_listener(&m_remote_listener);
m_remote_journaler->shut_down(ctx);
});
if (m_stopping_for_resync) {
@ -1436,6 +1478,30 @@ void ImageReplayer<I>::handle_shut_down(int r) {
}
}
// Invoked (via the work queue) when the remote journal's metadata changes.
// If our mirror client has been flagged disconnected on the remote journal,
// stop replay with -ENOTCONN / "disconnected".
template <typename I>
void ImageReplayer<I>::handle_remote_journal_metadata_updated() {
  dout(20) << dendl;

  cls::journal::Client client;
  {
    // Check running state and fetch the cached client under m_lock; the
    // state check is dropped before stop() to avoid lock recursion.
    Mutex::Locker locker(m_lock);
    if (!is_running_()) {
      return;
    }

    int r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
    if (r < 0) {
      derr << "failed to retrieve client: " << cpp_strerror(r) << dendl;
      return;
    }
  }

  if (client.state != cls::journal::CLIENT_STATE_CONNECTED) {
    dout(0) << "client flagged disconnected, stopping image replay" << dendl;
    // non-manual stop so the scheduler may restart/resync per policy
    stop(nullptr, false, -ENOTCONN, "disconnected");
  }
}
template <typename I>
std::string ImageReplayer<I>::to_string(const State state) {
switch (state) {

View File

@ -14,6 +14,7 @@
#include "include/rados/librados.hpp"
#include "cls/journal/cls_journal_types.h"
#include "cls/rbd/cls_rbd_types.h"
#include "journal/JournalMetadataListener.h"
#include "journal/ReplayEntry.h"
#include "librbd/ImageCtx.h"
#include "librbd/journal/Types.h"
@ -111,7 +112,8 @@ public:
}
void start(Context *on_finish = nullptr, bool manual = false);
void stop(Context *on_finish = nullptr, bool manual = false);
void stop(Context *on_finish = nullptr, bool manual = false,
int r = 0, const std::string& desc = "");
void restart(Context *on_finish = nullptr);
void flush(Context *on_finish = nullptr);
@ -190,7 +192,7 @@ protected:
virtual void on_start_fail(int r, const std::string &desc = "");
virtual bool on_start_interrupted();
virtual void on_stop_journal_replay();
virtual void on_stop_journal_replay(int r = 0, const std::string &desc = "");
virtual void on_flush_local_replay_flush_start(Context *on_flush);
virtual void on_flush_local_replay_flush_finish(Context *on_flush, int r);
@ -268,6 +270,14 @@ private:
librbd::journal::TagData m_replay_tag_data;
librbd::journal::EventEntry m_event_entry;
struct RemoteJournalerListener : public ::journal::JournalMetadataListener {
ImageReplayer *replayer;
RemoteJournalerListener(ImageReplayer *replayer) : replayer(replayer) { }
void handle_update(::journal::JournalMetadata *);
} m_remote_listener;
struct C_ReplayCommitted : public Context {
ImageReplayer *replayer;
ReplayEntry replay_entry;
@ -307,6 +317,7 @@ private:
void shut_down(int r);
void handle_shut_down(int r);
void handle_remote_journal_metadata_updated();
void bootstrap();
void handle_bootstrap(int r);

View File

@ -370,6 +370,12 @@ void BootstrapRequest<I>::handle_open_local_image(int r) {
m_ret_val = r;
close_remote_image();
return;
} if (m_client.state == cls::journal::CLIENT_STATE_DISCONNECTED) {
dout(10) << ": client flagged disconnected -- skipping bootstrap" << dendl;
// The caller is expected to detect disconnect initializing remote journal.
m_ret_val = 0;
close_remote_image();
return;
}
update_client_image();
@ -728,14 +734,14 @@ bool BootstrapRequest<I>::decode_client_meta() {
::decode(client_data, it);
} catch (const buffer::error &err) {
derr << ": failed to decode client meta data: " << err.what() << dendl;
return true;
return false;
}
librbd::journal::MirrorPeerClientMeta *client_meta =
boost::get<librbd::journal::MirrorPeerClientMeta>(&client_data.client_meta);
if (client_meta == nullptr) {
derr << ": unknown peer registration" << dendl;
return true;
return false;
} else if (!client_meta->image_id.empty()) {
// have an image id -- use that to open the image
m_local_image_id = client_meta->image_id;