Merge pull request #7884 from dillaman/wip-14663

librbd: allocate new journal tag after acquiring exclusive lock

Reviewed-by: Josh Durgin <jdurgin@redhat.com>
Josh Durgin 2016-03-09 14:02:38 -08:00
commit 55efa7ae74
30 changed files with 1294 additions and 325 deletions
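
In short, exclusive lock acquisition now verifies journal tag ownership and allocates a fresh local tag before the lock is reported as acquired; any failure along that path closes the journal (and object map) before reverting the lock. A minimal synchronous sketch of the new check, using stand-in types rather than the real asynchronous state machine in librbd/exclusive_lock/AcquireRequest.cc:

    // Illustrative condensation only; names mirror the diff below.
    #include <cerrno>
    #include <string>

    struct Journal {
      // Journal<I>::LOCAL_MIRROR_UUID is the empty string (see Journal.cc)
      std::string tag_mirror_uuid;
      bool is_tag_owner() const { return tag_mirror_uuid.empty(); }
      int allocate_tag(const std::string &mirror_uuid) {
        tag_mirror_uuid = mirror_uuid;   // real code appends an encoded TagData
        return 0;
      }
    };

    int acquire_journal_tag(Journal &journal) {
      if (!journal.is_tag_owner()) {
        return -EPERM;                   // "local image not promoted" -> close journal
      }
      return journal.allocate_tag("");   // LOCAL_MIRROR_UUID
    }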

View File

@ -111,6 +111,8 @@ cls_method_handle_t h_old_snapshots_list;
cls_method_handle_t h_old_snapshot_add;
cls_method_handle_t h_old_snapshot_remove;
cls_method_handle_t h_old_snapshot_rename;
cls_method_handle_t h_mirror_uuid_get;
cls_method_handle_t h_mirror_uuid_set;
cls_method_handle_t h_mirror_mode_get;
cls_method_handle_t h_mirror_mode_set;
cls_method_handle_t h_mirror_peer_list;
@ -204,6 +206,15 @@ static int read_key(cls_method_context_t hctx, const string &key, T *out)
return 0;
}
static int remove_key(cls_method_context_t hctx, const string &key) {
int r = cls_cxx_map_remove_key(hctx, key);
if (r < 0 && r != -ENOENT) {
CLS_ERR("failed to remove key: %s", key.c_str());
return r;
}
return 0;
}
static bool is_valid_id(const string &id) {
if (!id.size())
return false;
@ -2939,6 +2950,7 @@ int old_snapshot_rename(cls_method_context_t hctx, bufferlist *in, bufferlist *o
namespace mirror {
static const std::string UUID("mirror_uuid");
static const std::string MODE("mirror_mode");
static const std::string PEER_KEY_PREFIX("mirror_peer_");
static const std::string IMAGE_KEY_PREFIX("image_");
@ -2951,6 +2963,20 @@ std::string image_key(const string &image_id) {
return IMAGE_KEY_PREFIX + image_id;
}
int uuid_get(cls_method_context_t hctx, std::string *mirror_uuid) {
bufferlist mirror_uuid_bl;
int r = cls_cxx_map_get_val(hctx, mirror::UUID, &mirror_uuid_bl);
if (r < 0) {
if (r != -ENOENT) {
CLS_ERR("error reading mirror uuid: %s", cpp_strerror(r).c_str());
}
return r;
}
*mirror_uuid = std::string(mirror_uuid_bl.c_str(), mirror_uuid_bl.length());
return 0;
}
int read_peers(cls_method_context_t hctx,
std::vector<cls::rbd::MirrorPeer> *peers) {
std::string last_read = PEER_KEY_PREFIX;
@ -3115,6 +3141,67 @@ int image_remove(cls_method_context_t hctx, const string &image_id) {
} // namespace mirror
/**
* Input:
* none
*
* Output:
* @param uuid (std::string)
* @returns 0 on success, negative error code on failure
*/
int mirror_uuid_get(cls_method_context_t hctx, bufferlist *in,
bufferlist *out) {
std::string mirror_uuid;
int r = mirror::uuid_get(hctx, &mirror_uuid);
if (r < 0) {
return r;
}
::encode(mirror_uuid, *out);
return 0;
}
/**
* Input:
* @param mirror_uuid (std::string)
*
* Output:
* @returns 0 on success, negative error code on failure
*/
int mirror_uuid_set(cls_method_context_t hctx, bufferlist *in,
bufferlist *out) {
std::string mirror_uuid;
try {
bufferlist::iterator bl_it = in->begin();
::decode(mirror_uuid, bl_it);
} catch (const buffer::error &err) {
return -EINVAL;
}
if (mirror_uuid.empty()) {
CLS_ERR("cannot set empty mirror uuid");
return -EINVAL;
}
uint32_t mirror_mode;
int r = read_key(hctx, mirror::MODE, &mirror_mode);
if (r < 0 && r != -ENOENT) {
return r;
} else if (r == 0 && mirror_mode != cls::rbd::MIRROR_MODE_DISABLED) {
CLS_ERR("cannot set mirror uuid while mirroring enabled");
return -EINVAL;
}
bufferlist mirror_uuid_bl;
mirror_uuid_bl.append(mirror_uuid);
r = cls_cxx_map_set_val(hctx, mirror::UUID, &mirror_uuid_bl);
if (r < 0) {
CLS_ERR("failed to set mirror uuid");
return r;
}
return 0;
}
/**
* Input:
* none
@ -3168,6 +3255,14 @@ int mirror_mode_set(cls_method_context_t hctx, bufferlist *in,
int r;
if (enabled) {
std::string mirror_uuid;
r = mirror::uuid_get(hctx, &mirror_uuid);
if (r == -ENOENT) {
return -EINVAL;
} else if (r < 0) {
return r;
}
bufferlist bl;
::encode(mirror_mode_decode, bl);
@ -3188,9 +3283,13 @@ int mirror_mode_set(cls_method_context_t hctx, bufferlist *in,
return -EBUSY;
}
r = cls_cxx_map_remove_key(hctx, mirror::MODE);
if (r < 0 && r != -ENOENT) {
CLS_ERR("error disabling mirroring: %s", cpp_strerror(r).c_str());
r = remove_key(hctx, mirror::MODE);
if (r < 0) {
return r;
}
r = remove_key(hctx, mirror::UUID);
if (r < 0) {
return r;
}
}
@ -3242,6 +3341,17 @@ int mirror_peer_add(cls_method_context_t hctx, bufferlist *in,
mirror_mode_decode == cls::rbd::MIRROR_MODE_DISABLED) {
CLS_ERR("mirroring must be enabled on the pool");
return -EINVAL;
} else if (!mirror_peer.is_valid()) {
CLS_ERR("mirror peer is not valid");
return -EINVAL;
}
std::string mirror_uuid;
r = mirror::uuid_get(hctx, &mirror_uuid);
if (mirror_peer.uuid == mirror_uuid) {
CLS_ERR("peer uuid '%s' matches pool mirroring uuid",
mirror_uuid.c_str());
return -EINVAL;
}
std::vector<cls::rbd::MirrorPeer> peers;
@ -3626,6 +3736,11 @@ void __cls_init()
old_snapshot_rename, &h_old_snapshot_rename);
/* methods for the rbd_mirroring object */
cls_register_cxx_method(h_class, "mirror_uuid_get", CLS_METHOD_RD,
mirror_uuid_get, &h_mirror_uuid_get);
cls_register_cxx_method(h_class, "mirror_uuid_set",
CLS_METHOD_RD | CLS_METHOD_WR,
mirror_uuid_set, &h_mirror_uuid_set);
cls_register_cxx_method(h_class, "mirror_mode_get", CLS_METHOD_RD,
mirror_mode_get, &h_mirror_mode_get);
cls_register_cxx_method(h_class, "mirror_mode_set",

View File

@ -980,6 +980,37 @@ namespace librbd {
return 0;
}
int mirror_uuid_get(librados::IoCtx *ioctx, std::string *uuid) {
bufferlist in_bl;
bufferlist out_bl;
int r = ioctx->exec(RBD_MIRRORING, "rbd", "mirror_uuid_get", in_bl,
out_bl);
if (r < 0) {
return r;
}
try {
bufferlist::iterator bl_it = out_bl.begin();
::decode(*uuid, bl_it);
} catch (const buffer::error &err) {
return -EBADMSG;
}
return 0;
}
int mirror_uuid_set(librados::IoCtx *ioctx, const std::string &uuid) {
bufferlist in_bl;
::encode(uuid, in_bl);
bufferlist out_bl;
int r = ioctx->exec(RBD_MIRRORING, "rbd", "mirror_uuid_set", in_bl,
out_bl);
if (r < 0) {
return r;
}
return 0;
}
int mirror_mode_get(librados::IoCtx *ioctx,
cls::rbd::MirrorMode *mirror_mode) {
bufferlist in_bl;

View File

@ -207,6 +207,8 @@ namespace librbd {
::SnapContext *snapc);
// operations on the rbd_mirroring object
int mirror_uuid_get(librados::IoCtx *ioctx, std::string *uuid);
int mirror_uuid_set(librados::IoCtx *ioctx, const std::string &uuid);
int mirror_mode_get(librados::IoCtx *ioctx,
cls::rbd::MirrorMode *mirror_mode);
int mirror_mode_set(librados::IoCtx *ioctx,

View File

@ -35,6 +35,10 @@ struct MirrorPeer {
std::string client_name;
int64_t pool_id = -1;
inline bool is_valid() const {
return (!uuid.empty() && !cluster_name.empty() && !client_name.empty());
}
void encode(bufferlist &bl) const;
void decode(bufferlist::iterator &it);
void dump(Formatter *f) const;
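
The new MirrorPeer::is_valid() predicate requires a non-empty uuid, cluster name, and client name; mirror_peer_add() additionally rejects a peer whose uuid matches the pool's own mirror uuid (see the cls changes earlier in this diff). A self-contained sketch of the rule with a stand-in type:

    #include <cassert>
    #include <cstdint>
    #include <string>

    // stand-in for cls::rbd::MirrorPeer; fields taken from the hunk above
    struct MirrorPeer {
      std::string uuid, cluster_name, client_name;
      int64_t pool_id = -1;
      bool is_valid() const {
        return !uuid.empty() && !cluster_name.empty() && !client_name.empty();
      }
    };

    int main() {
      assert(!MirrorPeer{}.is_valid());  // default peer is rejected
      assert((MirrorPeer{"uuid1", "cluster1", "client"}.is_valid()));
    }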

View File

@ -591,7 +591,9 @@ void JournalMetadata::schedule_commit_task() {
void JournalMetadata::handle_commit_position_task() {
assert(m_timer_lock.is_locked());
assert(m_lock.is_locked());
ldout(m_cct, 20) << __func__ << dendl;
ldout(m_cct, 20) << __func__ << ": "
<< "client_id=" << m_client_id << ", "
<< "commit_position=" << m_commit_position << dendl;
librados::ObjectWriteOperation op;
client::client_commit(&op, m_client_id, m_commit_position);

View File

@ -68,7 +68,9 @@ JournalPlayer::JournalPlayer(librados::IoCtx &ioctx,
// start replay after the last committed entry's object
uint8_t splay_width = m_journal_metadata->get_splay_width();
m_commit_object = commit_position.object_positions.front().object_number;
auto &active_position = commit_position.object_positions.front();
m_active_tag_tid = active_position.tag_tid;
m_commit_object = active_position.object_number;
m_splay_offset = m_commit_object % splay_width;
for (auto &position : commit_position.object_positions) {
uint8_t splay_offset = position.object_number % splay_width;
@ -79,6 +81,10 @@ JournalPlayer::JournalPlayer(librados::IoCtx &ioctx,
JournalPlayer::~JournalPlayer() {
m_async_op_tracker.wait_for_ops();
{
Mutex::Locker locker(m_lock);
assert(m_fetch_object_numbers.empty());
}
m_replay_handler->put();
}
@ -134,13 +140,13 @@ void JournalPlayer::prefetch_and_watch(double interval) {
}
void JournalPlayer::unwatch() {
ldout(m_cct, 20) << __func__ << dendl;
Mutex::Locker locker(m_lock);
m_watch_enabled = false;
if (m_watch_scheduled) {
ObjectPlayerPtr object_player = get_object_player();
assert(object_player);
object_player->unwatch();
for (auto &players : m_object_players) {
players.second.begin()->second->unwatch();
}
m_watch_scheduled = false;
}
}
@ -148,56 +154,51 @@ void JournalPlayer::unwatch() {
bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
ldout(m_cct, 20) << __func__ << dendl;
Mutex::Locker locker(m_lock);
m_handler_notified = false;
if (m_state != STATE_PLAYBACK) {
return false;
}
ObjectPlayerPtr object_player = get_object_player();
assert(object_player);
if (!is_object_set_ready()) {
return false;
}
if (object_player->empty()) {
if (m_watch_enabled && !m_watch_scheduled) {
object_player->watch(
new C_Watch(this, object_player->get_object_number()),
m_watch_interval);
m_watch_scheduled = true;
} else if (!m_watch_enabled && !object_player->is_fetch_in_progress()) {
ldout(m_cct, 10) << __func__ << ": replay complete" << dendl;
m_journal_metadata->get_finisher().queue(new C_HandleComplete(
m_replay_handler), 0);
if (!verify_playback_ready()) {
if (!m_watch_enabled) {
notify_complete(0);
} else if (!m_watch_scheduled) {
schedule_watch();
}
return false;
}
ObjectPlayerPtr object_player = get_object_player();
assert(object_player && !object_player->empty());
object_player->front(entry);
object_player->pop_front();
uint64_t last_entry_tid;
if (m_journal_metadata->get_last_allocated_entry_tid(
entry->get_tag_tid(), &last_entry_tid) &&
entry->get_entry_tid() != last_entry_tid + 1) {
if (m_active_tag_tid && *m_active_tag_tid != entry->get_tag_tid()) {
lderr(m_cct) << "unexpected tag in journal entry: " << *entry << dendl;
m_state = STATE_ERROR;
notify_complete(-ENOMSG);
return false;
} else if (m_journal_metadata->get_last_allocated_entry_tid(
entry->get_tag_tid(), &last_entry_tid) &&
entry->get_entry_tid() != last_entry_tid + 1) {
lderr(m_cct) << "missing prior journal entry: " << *entry << dendl;
m_state = STATE_ERROR;
m_journal_metadata->get_finisher().queue(new C_HandleComplete(
m_replay_handler), -ENOMSG);
notify_complete(-ENOMSG);
return false;
}
// skip to next splay offset if we cannot apply the next entry in-sequence
if (!object_player->empty()) {
Entry peek_entry;
object_player->front(&peek_entry);
if (peek_entry.get_tag_tid() == entry->get_tag_tid() ||
(m_journal_metadata->get_last_allocated_entry_tid(
peek_entry.get_tag_tid(), &last_entry_tid) &&
last_entry_tid + 1 != peek_entry.get_entry_tid())) {
advance_splay_object();
}
} else {
advance_splay_object();
remove_empty_object_player(object_player);
}
m_active_tag_tid = entry->get_tag_tid();
advance_splay_object();
remove_empty_object_player(object_player);
m_journal_metadata->reserve_entry_tid(entry->get_tag_tid(),
entry->get_entry_tid());
@ -210,8 +211,9 @@ bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
void JournalPlayer::process_state(uint64_t object_number, int r) {
ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << ", "
<< "r=" << r << dendl;
assert(m_lock.is_locked());
if (r >= 0) {
Mutex::Locker locker(m_lock);
switch (m_state) {
case STATE_PREFETCH:
ldout(m_cct, 10) << "PREFETCH" << dendl;
@ -232,11 +234,8 @@ void JournalPlayer::process_state(uint64_t object_number, int r) {
}
if (r < 0) {
{
Mutex::Locker locker(m_lock);
m_state = STATE_ERROR;
}
m_replay_handler->handle_complete(r);
m_state = STATE_ERROR;
notify_complete(r);
}
}
@ -258,7 +257,8 @@ int JournalPlayer::process_prefetch(uint64_t object_number) {
ObjectPlayers &object_players = m_object_players[splay_offset];
// prefetch in-order since a newer splay object could prefetch first
while (!object_players.begin()->second->is_fetch_in_progress()) {
while (m_fetch_object_numbers.count(
object_players.begin()->second->get_object_number()) == 0) {
ObjectPlayerPtr object_player = object_players.begin()->second;
uint64_t player_object_number = object_player->get_object_number();
@ -328,21 +328,16 @@ int JournalPlayer::process_prefetch(uint64_t object_number) {
}
m_state = STATE_PLAYBACK;
ObjectPlayerPtr object_player = get_object_player();
if (!object_player->empty()) {
ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable(
m_replay_handler), 0);
if (!is_object_set_ready()) {
ldout(m_cct, 10) << __func__ << ": waiting for full object set" << dendl;
} else if (verify_playback_ready()) {
notify_entries_available();
} else if (m_watch_enabled) {
object_player->watch(
new C_Watch(this, object_player->get_object_number()),
m_watch_interval);
m_watch_scheduled = true;
schedule_watch();
} else {
ldout(m_cct, 10) << __func__ << ": no uncommitted entries available"
<< dendl;
m_journal_metadata->get_finisher().queue(new C_HandleComplete(
m_replay_handler), 0);
notify_complete(0);
}
return 0;
}
@ -351,26 +346,74 @@ int JournalPlayer::process_playback(uint64_t object_number) {
ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << dendl;
assert(m_lock.is_locked());
m_watch_scheduled = false;
if (!is_object_set_ready()) {
return 0;
}
ObjectPlayerPtr object_player = get_object_player();
if (object_player->get_object_number() == object_number) {
if (verify_playback_ready()) {
notify_entries_available();
} else if (!m_watch_enabled && is_object_set_ready()) {
uint8_t splay_width = m_journal_metadata->get_splay_width();
uint64_t active_set = m_journal_metadata->get_active_set();
uint64_t object_set = object_player->get_object_number() / splay_width;
if (!object_player->empty()) {
ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable(
m_replay_handler), 0);
} else if (object_set == active_set) {
ldout(m_cct, 10) << __func__ << ": replay complete" << dendl;
m_journal_metadata->get_finisher().queue(new C_HandleComplete(
m_replay_handler), 0);
if (object_set == active_set) {
notify_complete(0);
}
}
return 0;
}
bool JournalPlayer::is_object_set_ready() const {
assert(m_lock.is_locked());
if (m_watch_scheduled || !m_fetch_object_numbers.empty()) {
return false;
}
return true;
}
bool JournalPlayer::verify_playback_ready() {
assert(m_lock.is_locked());
assert(is_object_set_ready());
ObjectPlayerPtr object_player = get_object_player();
assert(object_player);
// Verify if the active object player has another entry available
// in the sequence
Entry entry;
bool entry_available = false;
if (!object_player->empty()) {
entry_available = true;
object_player->front(&entry);
if (!m_active_tag_tid || entry.get_tag_tid() == *m_active_tag_tid) {
return true;
}
}
// NOTE: replay currently does not check the tag class, so it cannot play back
// multiple tags from different classes (issue #14909). When a new tag is
// discovered, it is assumed that the previous tag was closed at the last
// replayable entry.
object_player = m_object_players.begin()->second.begin()->second;
if (!object_player->empty() && m_active_tag_tid) {
object_player->front(&entry);
if (entry.get_tag_tid() > *m_active_tag_tid &&
entry.get_entry_tid() == 0) {
uint8_t splay_width = m_journal_metadata->get_splay_width();
m_active_tag_tid = entry.get_tag_tid();
m_splay_offset = object_player->get_object_number() / splay_width;
ldout(m_cct, 20) << __func__ << ": new tag " << entry.get_tag_tid() << " "
<< "detected, adjusting offset to "
<< static_cast<uint32_t>(m_splay_offset) << dendl;
return true;
}
}
// if any entry is available, we can test if the sequence is corrupt
return entry_available;
}
const JournalPlayer::ObjectPlayers &JournalPlayer::get_object_players() const {
assert(m_lock.is_locked());
@ -405,6 +448,7 @@ void JournalPlayer::advance_splay_object() {
bool JournalPlayer::remove_empty_object_player(const ObjectPlayerPtr &player) {
assert(m_lock.is_locked());
assert(!m_watch_scheduled);
uint8_t splay_width = m_journal_metadata->get_splay_width();
uint64_t object_set = player->get_object_number() / splay_width;
@ -432,6 +476,9 @@ void JournalPlayer::fetch(uint64_t object_num) {
std::string oid = utils::get_object_name(m_object_oid_prefix, object_num);
assert(m_fetch_object_numbers.count(object_num) == 0);
m_fetch_object_numbers.insert(object_num);
ldout(m_cct, 10) << __func__ << ": " << oid << dendl;
C_Fetch *fetch_ctx = new C_Fetch(this, object_num);
ObjectPlayerPtr object_player(new ObjectPlayer(
@ -447,11 +494,15 @@ void JournalPlayer::handle_fetched(uint64_t object_num, int r) {
ldout(m_cct, 10) << __func__ << ": "
<< utils::get_object_name(m_object_oid_prefix, object_num)
<< ": r=" << r << dendl;
Mutex::Locker locker(m_lock);
assert(m_fetch_object_numbers.count(object_num) == 1);
m_fetch_object_numbers.erase(object_num);
if (r == -ENOENT) {
r = 0;
}
if (r == 0) {
Mutex::Locker locker(m_lock);
uint8_t splay_width = m_journal_metadata->get_splay_width();
uint8_t splay_offset = object_num % splay_width;
assert(m_object_players.count(splay_offset) == 1);
@ -461,15 +512,67 @@ void JournalPlayer::handle_fetched(uint64_t object_num, int r) {
ObjectPlayerPtr object_player = object_players[object_num];
remove_empty_object_player(object_player);
}
process_state(object_num, r);
}
void JournalPlayer::handle_watch(uint64_t object_num, int r) {
ldout(m_cct, 10) << __func__ << ": "
<< utils::get_object_name(m_object_oid_prefix, object_num)
<< ": r=" << r << dendl;
process_state(object_num, r);
void JournalPlayer::schedule_watch() {
ldout(m_cct, 10) << __func__ << dendl;
assert(m_lock.is_locked());
if (m_watch_scheduled) {
return;
}
// poll first splay offset and active splay offset since
// new records should only appear in those two objects
C_Watch *ctx = new C_Watch(this);
ObjectPlayerPtr object_player = get_object_player();
object_player->watch(ctx, m_watch_interval);
uint8_t splay_width = m_journal_metadata->get_splay_width();
if (object_player->get_object_number() % splay_width != 0) {
++ctx->pending_fetches;
object_player = m_object_players.begin()->second.begin()->second;
object_player->watch(ctx, m_watch_interval);
}
m_watch_scheduled = true;
}
void JournalPlayer::handle_watch(int r) {
ldout(m_cct, 10) << __func__ << ": r=" << r << dendl;
Mutex::Locker locker(m_lock);
m_watch_scheduled = false;
std::set<uint64_t> object_numbers;
for (auto &players : m_object_players) {
object_numbers.insert(
players.second.begin()->second->get_object_number());
}
for (auto object_num : object_numbers) {
process_state(object_num, r);
}
}
void JournalPlayer::notify_entries_available() {
assert(m_lock.is_locked());
if (m_handler_notified) {
return;
}
m_handler_notified = true;
ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable(
m_replay_handler), 0);
}
void JournalPlayer::notify_complete(int r) {
assert(m_lock.is_locked());
m_handler_notified = true;
ldout(m_cct, 10) << __func__ << ": replay complete: r=" << r << dendl;
m_journal_metadata->get_finisher().queue(new C_HandleComplete(
m_replay_handler), r);
}
} // namespace journal
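
The verify_playback_ready() test above is the heart of the new tag handling: playback proceeds when the active object holds an entry for the active tag, or when the first splay object starts a strictly newer tag at entry tid 0. A condensed synchronous sketch of that decision, with simplified types and without the splay-offset bookkeeping:

    #include <cstdint>
    #include <boost/optional.hpp>

    struct Entry { uint64_t tag_tid; uint64_t entry_tid; };

    // active_front: front of the active object player (nullptr if empty)
    // first_front:  front of the first splay-offset object player
    bool playback_ready(boost::optional<uint64_t> &active_tag_tid,
                        const Entry *active_front, const Entry *first_front) {
      if (active_front != nullptr &&
          (!active_tag_tid || active_front->tag_tid == *active_tag_tid)) {
        return true;                              // next entry in active tag
      }
      if (first_front != nullptr && active_tag_tid &&
          first_front->tag_tid > *active_tag_tid && first_front->entry_tid == 0) {
        active_tag_tid = first_front->tag_tid;    // jump to the newly opened tag
        return true;
      }
      // an out-of-sequence entry still counts as "ready" so try_pop_front()
      // can surface the corruption as -ENOMSG
      return active_front != nullptr;
    }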

View File

@ -12,6 +12,8 @@
#include "journal/JournalMetadata.h"
#include "journal/ObjectPlayer.h"
#include "cls/journal/cls_journal_types.h"
#include <boost/none.hpp>
#include <boost/optional.hpp>
#include <map>
class SafeTimer;
@ -43,6 +45,7 @@ private:
typedef std::map<uint64_t, ObjectPlayerPtr> ObjectPlayers;
typedef std::map<uint8_t, ObjectPlayers> SplayedObjectPlayers;
typedef std::map<uint8_t, ObjectPosition> SplayedObjectPositions;
typedef std::set<uint64_t> ObjectNumbers;
enum State {
STATE_INIT,
@ -51,17 +54,6 @@ private:
STATE_ERROR
};
struct C_Watch : public Context {
JournalPlayer *player;
uint64_t object_num;
C_Watch(JournalPlayer *p, uint64_t o) : player(p), object_num(o) {
}
virtual void finish(int r) {
player->handle_watch(object_num, r);
}
};
struct C_Fetch : public Context {
JournalPlayer *player;
uint64_t object_num;
@ -76,6 +68,34 @@ private:
}
};
struct C_Watch : public Context {
JournalPlayer *player;
uint8_t pending_fetches = 1;
int ret_val = 0;
C_Watch(JournalPlayer *player) : player(player) {
}
virtual void complete(int r) override {
player->m_lock.Lock();
if (ret_val == 0 && r < 0) {
ret_val = r;
}
assert(pending_fetches > 0);
if (--pending_fetches == 0) {
player->m_lock.Unlock();
Context::complete(ret_val);
} else {
player->m_lock.Unlock();
}
}
virtual void finish(int r) override {
player->handle_watch(r);
}
};
librados::IoCtx m_ioctx;
CephContext *m_cct;
std::string m_object_oid_prefix;
@ -93,13 +113,20 @@ private:
bool m_watch_scheduled;
double m_watch_interval;
bool m_handler_notified = false;
ObjectNumbers m_fetch_object_numbers;
PrefetchSplayOffsets m_prefetch_splay_offsets;
SplayedObjectPlayers m_object_players;
uint64_t m_commit_object;
SplayedObjectPositions m_commit_positions;
boost::optional<uint64_t> m_active_tag_tid = boost::none;
void advance_splay_object();
bool is_object_set_ready() const;
bool verify_playback_ready();
const ObjectPlayers &get_object_players() const;
ObjectPlayerPtr get_object_player() const;
ObjectPlayerPtr get_next_set_object_player() const;
@ -111,7 +138,12 @@ private:
void fetch(uint64_t object_num);
void handle_fetched(uint64_t object_num, int r);
void handle_watch(uint64_t object_num, int r);
void schedule_watch();
void handle_watch(int r);
void notify_entries_available();
void notify_complete(int r);
};
} // namespace journal
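
The reworked C_Watch above is a small gather context: it fires its handler only after every watched object reports in, and it preserves the first error seen. A generic sketch of the pattern with standard-library types (hypothetical names, not Ceph's Context):

    #include <cassert>
    #include <functional>
    #include <mutex>

    class GatherContext {
     public:
      GatherContext(int pending, std::function<void(int)> on_finish)
        : m_pending(pending), m_on_finish(std::move(on_finish)) {}

      void complete(int r) {
        bool done;
        {
          std::lock_guard<std::mutex> lock(m_lock);
          if (m_ret_val == 0 && r < 0) {
            m_ret_val = r;          // remember the first failure
          }
          assert(m_pending > 0);
          done = (--m_pending == 0);
        }
        if (done) {                 // invoke outside the lock, as above
          m_on_finish(m_ret_val);
          delete this;              // Context objects are self-deleting
        }
      }

     private:
      std::mutex m_lock;
      int m_pending;
      int m_ret_val = 0;
      std::function<void(int)> m_on_finish;
    };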

View File

@ -199,6 +199,20 @@ void Journaler::unregister_client(Context *on_finish) {
return m_metadata->unregister_client(on_finish);
}
int Journaler::get_cached_client(const std::string &client_id,
cls::journal::Client *client) {
RegisteredClients clients;
m_metadata->get_registered_clients(&clients);
auto it = clients.find({client_id, {}});
if (it == clients.end()) {
return -ENOENT;
}
*client = *it;
return 0;
}
void Journaler::allocate_tag(const bufferlist &data, cls::journal::Tag *tag,
Context *on_finish) {
m_metadata->allocate_tag(cls::journal::Tag::TAG_CLASS_NEW, data, tag,

View File

@ -52,11 +52,16 @@ public:
RegisteredClients *clients, Context *on_finish);
int register_client(const bufferlist &data);
int unregister_client();
void register_client(const bufferlist &data, Context *on_finish);
void update_client(const bufferlist &data, Context *on_finish);
int unregister_client();
void unregister_client(Context *on_finish);
void update_client(const bufferlist &data, Context *on_finish);
int get_cached_client(const std::string &client_id,
cls::journal::Client *client);
void flush_commit_position(Context *on_safe);
void allocate_tag(const bufferlist &data, cls::journal::Tag *tag,

View File

@ -207,11 +207,10 @@ void ObjectPlayer::handle_watch_fetched(int r) {
Mutex::Locker timer_locker(m_timer_lock);
assert(m_watch_in_progress);
if (r == -ENOENT) {
schedule_watch();
} else {
on_finish = m_watch_ctx;
m_watch_ctx = NULL;
r = 0;
}
on_finish = m_watch_ctx;
m_watch_ctx = NULL;
}
if (on_finish != NULL) {

View File

@ -46,11 +46,6 @@ public:
void watch(Context *on_fetch, double interval);
void unwatch();
inline bool is_fetch_in_progress() const {
Mutex::Locker locker(m_lock);
return m_fetch_in_progress;
}
void front(Entry *entry) const;
void pop_front();
inline bool empty() const {

View File

@ -8,8 +8,8 @@
#include "librbd/ExclusiveLock.h"
#include "librbd/ImageCtx.h"
#include "librbd/journal/Replay.h"
#include "librbd/journal/Types.h"
#include "librbd/Utils.h"
#include "cls/journal/cls_journal_types.h"
#include "journal/Journaler.h"
#include "journal/ReplayEntry.h"
#include "common/errno.h"
@ -20,9 +20,128 @@
namespace librbd {
namespace {
struct C_DecodeTag : public Context {
CephContext *cct;
Mutex *lock;
uint64_t *tag_tid;
journal::TagData *tag_data;
Context *on_finish;
cls::journal::Tag tag;
C_DecodeTag(CephContext *cct, Mutex *lock, uint64_t *tag_tid,
journal::TagData *tag_data, Context *on_finish)
: cct(cct), lock(lock), tag_tid(tag_tid), tag_data(tag_data),
on_finish(on_finish) {
}
virtual void complete(int r) override {
on_finish->complete(process(r));
Context::complete(0);
}
virtual void finish(int r) override {
}
int process(int r) {
if (r < 0) {
lderr(cct) << "failed to allocate tag: " << cpp_strerror(r) << dendl;
return r;
}
Mutex::Locker locker(*lock);
*tag_tid = tag.tid;
bufferlist::iterator data_it = tag.data.begin();
r = decode(&data_it, tag_data);
if (r < 0) {
lderr(cct) << "failed to decode allocated tag" << dendl;
return r;
}
ldout(cct, 20) << "allocated journal tag: "
<< "tid=" << tag.tid << ", "
<< "data=" << *tag_data << dendl;
return 0;
}
static int decode(bufferlist::iterator *it,
journal::TagData *tag_data) {
try {
::decode(*tag_data, *it);
} catch (const buffer::error &err) {
return -EBADMSG;
}
return 0;
}
};
struct C_DecodeTags : public Context {
CephContext *cct;
Mutex *lock;
uint64_t *tag_tid;
journal::TagData *tag_data;
Context *on_finish;
::journal::Journaler::Tags tags;
C_DecodeTags(CephContext *cct, Mutex *lock, uint64_t *tag_tid,
journal::TagData *tag_data, Context *on_finish)
: cct(cct), lock(lock), tag_tid(tag_tid), tag_data(tag_data),
on_finish(on_finish) {
}
virtual void complete(int r) {
on_finish->complete(process(r));
Context::complete(0);
}
virtual void finish(int r) override {
}
int process(int r) {
if (r < 0) {
lderr(cct) << "failed to retrieve journal tags: " << cpp_strerror(r)
<< dendl;
return r;
}
if (tags.empty()) {
lderr(cct) << "no journal tags retrieved" << dendl;
return -ENOENT;
}
Mutex::Locker locker(*lock);
*tag_tid = tags.back().tid;
bufferlist::iterator data_it = tags.back().data.begin();
r = C_DecodeTag::decode(&data_it, tag_data);
if (r < 0) {
lderr(cct) << "failed to decode journal tag" << dendl;
return r;
}
ldout(cct, 20) << "most recent journal tag: "
<< "tid=" << *tag_tid << ", "
<< "data=" << *tag_data << dendl;
return 0;
}
};
} // anonymous namespace
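
C_DecodeTag and C_DecodeTags share a subtle idiom: complete() hands process(r)'s result to the user callback and then calls Context::complete(0) purely so the object deletes itself, which is why finish() is empty. A stripped-down sketch of that idiom with a hypothetical type:

    #include <functional>

    struct DecodeContext {
      std::function<void(int)> on_finish;

      explicit DecodeContext(std::function<void(int)> cb)
        : on_finish(std::move(cb)) {}

      void complete(int r) {
        on_finish(process(r));  // propagate the possibly remapped result
        delete this;            // stands in for Context::complete(0)
      }

      int process(int r) {
        if (r < 0) {
          return r;             // allocation/listing failed upstream
        }
        // ... decode payload here; return -EBADMSG on a decode failure ...
        return 0;
      }
    };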
using util::create_async_context_callback;
using util::create_context_callback;
// client id for local image
template <typename I>
const std::string Journal<I>::IMAGE_CLIENT_ID("");
// mirror uuid to use for local images
template <typename I>
const std::string Journal<I>::LOCAL_MIRROR_UUID("");
template <typename I>
std::ostream &operator<<(std::ostream &os,
const typename Journal<I>::State &state) {
@ -111,7 +230,8 @@ int Journal<I>::create(librados::IoCtx &io_ctx, const std::string &image_id,
pool_id = data_io_ctx.get_id();
}
Journaler journaler(io_ctx, image_id, "", cct->_conf->rbd_journal_commit_age);
Journaler journaler(io_ctx, image_id, IMAGE_CLIENT_ID,
cct->_conf->rbd_journal_commit_age);
int r = journaler.create(order, splay_width, pool_id);
if (r < 0) {
@ -119,16 +239,9 @@ int Journal<I>::create(librados::IoCtx &io_ctx, const std::string &image_id,
return r;
}
std::string cluster_id;
r = rados.cluster_fsid(&cluster_id);
if (r < 0) {
lderr(cct) << "failed to retrieve cluster id: " << cpp_strerror(r) << dendl;
return r;
}
// create tag class for this image's journal events
bufferlist tag_data;
::encode(journal::TagData{cluster_id, pool_id, image_id}, tag_data);
::encode(journal::TagData(), tag_data);
C_SaferCond tag_ctx;
cls::journal::Tag tag;
@ -157,7 +270,8 @@ int Journal<I>::remove(librados::IoCtx &io_ctx, const std::string &image_id) {
CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
Journaler journaler(io_ctx, image_id, "", cct->_conf->rbd_journal_commit_age);
Journaler journaler(io_ctx, image_id, IMAGE_CLIENT_ID,
cct->_conf->rbd_journal_commit_age);
bool journal_exists;
int r = journaler.exists(&journal_exists);
@ -192,7 +306,8 @@ int Journal<I>::reset(librados::IoCtx &io_ctx, const std::string &image_id) {
CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
Journaler journaler(io_ctx, image_id, "", cct->_conf->rbd_journal_commit_age);
Journaler journaler(io_ctx, image_id, IMAGE_CLIENT_ID,
cct->_conf->rbd_journal_commit_age);
C_SaferCond cond;
journaler.init(&cond);
@ -288,6 +403,38 @@ void Journal<I>::close(Context *on_finish) {
wait_for_steady_state(on_finish);
}
template <typename I>
bool Journal<I>::is_tag_owner() const {
return (m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID);
}
template <typename I>
void Journal<I>::allocate_tag(const std::string &mirror_uuid,
Context *on_finish) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": mirror_uuid=" << mirror_uuid
<< dendl;
Mutex::Locker locker(m_lock);
assert(m_journaler != nullptr && is_tag_owner());
// NOTE: it is currently the caller's responsibility to provide the local
// mirror uuid constant or the remote peer uuid
journal::TagData tag_data;
tag_data.mirror_uuid = mirror_uuid;
// TODO: inject current commit position into tag data (need updated journaler PR)
tag_data.predecessor_mirror_uuid = m_tag_data.mirror_uuid;
bufferlist tag_bl;
::encode(tag_data, tag_bl);
C_DecodeTag *decode_tag_ctx = new C_DecodeTag(cct, &m_lock, &m_tag_tid,
&m_tag_data, on_finish);
m_journaler->allocate_tag(m_tag_class, tag_bl, &decode_tag_ctx->tag,
decode_tag_ctx);
}
template <typename I>
void Journal<I>::flush_commit_position(Context *on_finish) {
CephContext *cct = m_image_ctx.cct;
@ -319,8 +466,7 @@ uint64_t Journal<I>::append_io_event(AioCompletion *aio_comp,
tid = ++m_event_tid;
assert(tid != 0);
// TODO: use allocated tag_id
future = m_journaler->append(0, bl);
future = m_journaler->append(m_tag_tid, bl);
m_events[tid] = Event(future, aio_comp, requests, offset, length);
}
@ -405,8 +551,7 @@ void Journal<I>::append_op_event(uint64_t op_tid,
Mutex::Locker locker(m_lock);
assert(m_state == STATE_READY);
// TODO: use allocated tag_id
future = m_journaler->append(0, bl);
future = m_journaler->append(m_tag_tid, bl);
// delay committing op event to ensure consistent replay
assert(m_op_futures.count(op_tid) == 0);
@ -445,8 +590,7 @@ void Journal<I>::commit_op_event(uint64_t op_tid, int r) {
op_start_future = it->second;
m_op_futures.erase(it);
// TODO: use allocated tag_id
op_finish_future = m_journaler->append(0, bl);
op_finish_future = m_journaler->append(m_tag_tid, bl);
}
op_finish_future.flush(new C_OpEventSafe(this, op_tid, op_start_future,
@ -560,8 +704,8 @@ void Journal<I>::create_journaler() {
assert(m_journaler == NULL);
transition_state(STATE_INITIALIZING, 0);
m_journaler = new Journaler(m_image_ctx.md_ctx, m_image_ctx.id, "",
m_image_ctx.journal_commit_age);
m_journaler = new Journaler(m_image_ctx.md_ctx, m_image_ctx.id,
IMAGE_CLIENT_ID, m_image_ctx.journal_commit_age);
m_journaler->init(create_async_context_callback(
m_image_ctx, create_context_callback<
Journal<I>, &Journal<I>::handle_initialized>(this)));
@ -632,15 +776,69 @@ void Journal<I>::handle_initialized(int r) {
ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
Mutex::Locker locker(m_lock);
assert(m_state == STATE_INITIALIZING);
if (r < 0) {
lderr(cct) << this << " " << __func__
lderr(cct) << this << " " << __func__ << ": "
<< "failed to initialize journal: " << cpp_strerror(r)
<< dendl;
destroy_journaler(r);
return;
}
// locate the master image client record
cls::journal::Client client;
r = m_journaler->get_cached_client(Journal<ImageCtx>::IMAGE_CLIENT_ID,
&client);
if (r < 0) {
lderr(cct) << "failed to locate master image client" << dendl;
destroy_journaler(r);
return;
}
librbd::journal::ClientData client_data;
bufferlist::iterator bl = client.data.begin();
try {
::decode(client_data, bl);
} catch (const buffer::error &err) {
lderr(cct) << "failed to decode client meta data: " << err.what()
<< dendl;
destroy_journaler(-EINVAL);
return;
}
librbd::journal::ImageClientMeta *image_client_meta =
boost::get<librbd::journal::ImageClientMeta>(&client_data.client_meta);
if (image_client_meta == nullptr) {
lderr(cct) << "failed to extract client meta data" << dendl;
destroy_journaler(-EINVAL);
return;
}
m_tag_class = image_client_meta->tag_class;
ldout(cct, 20) << "client: " << client << ", "
<< "image meta: " << *image_client_meta << dendl;
C_DecodeTags *tags_ctx = new C_DecodeTags(
cct, &m_lock, &m_tag_tid, &m_tag_data, create_async_context_callback(
m_image_ctx, create_context_callback<
Journal<I>, &Journal<I>::handle_get_tags>(this)));
m_journaler->get_tags(m_tag_class, &tags_ctx->tags, tags_ctx);
}
template <typename I>
void Journal<I>::handle_get_tags(int r) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
Mutex::Locker locker(m_lock);
assert(m_state == STATE_INITIALIZING);
if (r < 0) {
destroy_journaler(r);
return;
}
transition_state(STATE_REPLAYING, 0);
m_journal_replay = journal::Replay<I>::create(m_image_ctx);
m_journaler->start_replay(&m_replay_handler);

View File

@ -13,6 +13,7 @@
#include "journal/Future.h"
#include "journal/ReplayEntry.h"
#include "journal/ReplayHandler.h"
#include "librbd/journal/Types.h"
#include <algorithm>
#include <iosfwd>
#include <list>
@ -32,7 +33,6 @@ class ImageCtx;
namespace journal {
class EventEntry;
template <typename> class Replay;
template <typename ImageCtxT>
@ -92,6 +92,9 @@ public:
STATE_CLOSED
};
static const std::string IMAGE_CLIENT_ID;
static const std::string LOCAL_MIRROR_UUID;
typedef std::list<AioObjectRequest *> AioObjectRequests;
Journal(ImageCtxT &image_ctx);
@ -112,6 +115,9 @@ public:
void open(Context *on_finish);
void close(Context *on_finish);
bool is_tag_owner() const;
void allocate_tag(const std::string &mirror_uuid, Context *on_finish);
void flush_commit_position(Context *on_finish);
uint64_t append_io_event(AioCompletion *aio_comp,
@ -241,6 +247,9 @@ private:
Journaler *m_journaler;
mutable Mutex m_lock;
State m_state;
uint64_t m_tag_class = 0;
uint64_t m_tag_tid = 0;
journal::TagData m_tag_data;
int m_error_result;
Contexts m_wait_for_state_contexts;
@ -268,6 +277,7 @@ private:
void complete_event(typename Events::iterator it, int r);
void handle_initialized(int r);
void handle_get_tags(int r);
void handle_replay_ready();
void handle_replay_complete(int r);

View File

@ -166,12 +166,64 @@ Context *AcquireRequest<I>::handle_open_journal(int *ret_val) {
if (*ret_val < 0) {
lderr(cct) << "failed to open journal: " << cpp_strerror(*ret_val) << dendl;
m_error_result = *ret_val;
return send_close_object_map();
send_close_journal();
return nullptr;
}
send_allocate_journal_tag();
return nullptr;
}
template <typename I>
void AcquireRequest<I>::send_allocate_journal_tag() {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 10) << __func__ << dendl;
if (!m_journal->is_tag_owner()) {
lderr(cct) << "local image not promoted" << dendl;
m_error_result = -EPERM;
send_close_journal();
return;
}
using klass = AcquireRequest<I>;
Context *ctx = create_context_callback<
klass, &klass::handle_allocate_journal_tag>(this);
m_journal->allocate_tag(Journal<I>::LOCAL_MIRROR_UUID, ctx);
}
template <typename I>
Context *AcquireRequest<I>::handle_allocate_journal_tag(int *ret_val) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 10) << __func__ << ": r=" << *ret_val << dendl;
if (*ret_val < 0) {
m_error_result = *ret_val;
send_close_journal();
return nullptr;
}
return m_on_finish;
}
template <typename I>
void AcquireRequest<I>::send_close_journal() {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 10) << __func__ << dendl;
using klass = AcquireRequest<I>;
Context *ctx = create_context_callback<klass, &klass::handle_close_journal>(
this);
m_journal->close(ctx);
}
template <typename I>
Context *AcquireRequest<I>::handle_close_journal(int *ret_val) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 10) << __func__ << ": r=" << *ret_val << dendl;
return send_close_object_map(ret_val);
}
template <typename I>
Context *AcquireRequest<I>::send_open_object_map() {
if (!m_image_ctx.test_features(RBD_FEATURE_OBJECT_MAP)) {
@ -201,9 +253,9 @@ Context *AcquireRequest<I>::handle_open_object_map(int *ret_val) {
}
template <typename I>
Context *AcquireRequest<I>::send_close_object_map() {
Context *AcquireRequest<I>::send_close_object_map(int *ret_val) {
if (m_object_map == nullptr) {
revert();
revert(ret_val);
return m_on_finish;
}
@ -224,11 +276,7 @@ Context *AcquireRequest<I>::handle_close_object_map(int *ret_val) {
// object map should never result in an error
assert(*ret_val == 0);
assert(m_error_result < 0);
*ret_val = m_error_result;
revert();
revert(ret_val);
return m_on_finish;
}
@ -439,13 +487,16 @@ void AcquireRequest<I>::apply() {
}
template <typename I>
void AcquireRequest<I>::revert() {
void AcquireRequest<I>::revert(int *ret_val) {
RWLock::WLocker snap_locker(m_image_ctx.snap_lock);
m_image_ctx.object_map = nullptr;
m_image_ctx.journal = nullptr;
delete m_object_map;
delete m_journal;
assert(m_error_result < 0);
*ret_val = m_error_result;
}
} // namespace exclusive_lock

View File

@ -39,25 +39,33 @@ private:
* v
* FLUSH_NOTIFIES
* |
* | /---------------------------------------------------------\
* | | |
* | | (no lockers) |
* | | . . . . . . . . . . . . . . . . . . . . . |
* | | . . |
* | v v (EBUSY) . |
* \--> LOCK_IMAGE * * * * * * * > GET_LOCKERS . . . . |
* . | | |
* . . . . | | |
* . v v |
* . OPEN_OBJECT_MAP GET_WATCHERS . . . |
* . | | . |
* . v v . |
* . . > OPEN_JOURNAL * * BLACKLIST . (blacklist |
* . | * | . disabled) |
* . | v v . |
* . | CLOSE_OBJECT_MAP BREAK_LOCK < . . . |
* . v | | |
* . . > <finish> <-----/ \-----------------------------/
* | /-----------------------------------------------------------\
* | | |
* | | (no lockers) |
* | | . . . . . . . . . . . . . . . . . . . . . . |
* | | . . |
* | v v (EBUSY) . |
* \--> LOCK_IMAGE * * * * * * * > GET_LOCKERS . . . . |
* . | | |
* . . . . | | |
* . v v |
* . OPEN_OBJECT_MAP GET_WATCHERS . . . |
* . | | . |
* . v v . |
* . . > OPEN_JOURNAL * * * * * * BLACKLIST . (blacklist |
* . | * | . disabled) |
* . v * v . |
* . ALLOCATE_JOURNAL_TAG * BREAK_LOCK < . . . |
* . | * * | |
* . | * * \-----------------------------/
* . | v v
* . | CLOSE_JOURNAL
* . | |
* . | v
* . | CLOSE_OBJECT_MAP
* . | |
* . v |
* . . > <finish> <----------/
*
* @endverbatim
*/
@ -94,10 +102,16 @@ private:
Context *send_open_journal();
Context *handle_open_journal(int *ret_val);
void send_allocate_journal_tag();
Context *handle_allocate_journal_tag(int *ret_val);
void send_close_journal();
Context *handle_close_journal(int *ret_val);
Context *send_open_object_map();
Context *handle_open_object_map(int *ret_val);
Context *send_close_object_map();
Context *send_close_object_map(int *ret_val);
Context *handle_close_object_map(int *ret_val);
void send_get_lockers();
@ -113,7 +127,7 @@ private:
Context *handle_break_lock(int *ret_val);
void apply();
void revert();
void revert(int *ret_val);
};
} // namespace exclusive_lock

View File

@ -2332,13 +2332,12 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
ldout(cct, 20) << __func__ << dendl;
cls::rbd::MirrorMode mirror_mode_internal;
cls::rbd::MirrorMode next_mirror_mode;
switch (mirror_mode) {
case RBD_MIRROR_MODE_DISABLED:
case RBD_MIRROR_MODE_IMAGE:
case RBD_MIRROR_MODE_POOL:
mirror_mode_internal = static_cast<cls::rbd::MirrorMode>(
mirror_mode);
next_mirror_mode = static_cast<cls::rbd::MirrorMode>(mirror_mode);
break;
default:
lderr(cct) << "Unknown mirror mode ("
@ -2346,7 +2345,28 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
return -EINVAL;
}
int r = cls_client::mirror_mode_set(&io_ctx, mirror_mode_internal);
cls::rbd::MirrorMode current_mirror_mode;
int r = cls_client::mirror_mode_get(&io_ctx, &current_mirror_mode);
if (r < 0) {
lderr(cct) << "Failed to retrieve mirror mode: " << cpp_strerror(r)
<< dendl;
return r;
}
if (current_mirror_mode == next_mirror_mode) {
return 0;
} else if (current_mirror_mode == cls::rbd::MIRROR_MODE_DISABLED) {
uuid_d uuid_gen;
uuid_gen.generate_random();
r = cls_client::mirror_uuid_set(&io_ctx, uuid_gen.to_string());
if (r < 0) {
lderr(cct) << "Failed to allocate mirroring uuid: " << cpp_strerror(r)
<< dendl;
return r;
}
}
r = cls_client::mirror_mode_set(&io_ctx, next_mirror_mode);
if (r < 0) {
lderr(cct) << "Failed to set mirror mode: " << cpp_strerror(r) << dendl;
return r;

View File

@ -333,25 +333,49 @@ void ImageClientMeta::dump(Formatter *f) const {
f->dump_unsigned("tag_class", tag_class);
}
void MirrorPeerClientMeta::encode(bufferlist& bl) const {
::encode(cluster_id, bl);
::encode(pool_id, bl);
::encode(image_id, bl);
void MirrorPeerSyncPoint::encode(bufferlist& bl) const {
::encode(snap_name, bl);
::encode(object_number, bl);
}
void MirrorPeerSyncPoint::decode(__u8 version, bufferlist::iterator& it) {
::decode(snap_name, it);
::decode(object_number, it);
}
void MirrorPeerSyncPoint::dump(Formatter *f) const {
f->dump_string("snap_name", snap_name);
if (object_number) {
f->dump_unsigned("object_number", *object_number);
}
}
void MirrorPeerClientMeta::encode(bufferlist& bl) const {
::encode(image_id, bl);
::encode(static_cast<uint32_t>(sync_points.size()), bl);
for (auto &sync_point : sync_points) {
sync_point.encode(bl);
}
}
void MirrorPeerClientMeta::decode(__u8 version, bufferlist::iterator& it) {
::decode(cluster_id, it);
::decode(pool_id, it);
::decode(image_id, it);
::decode(snap_name, it);
uint32_t sync_point_count;
::decode(sync_point_count, it);
sync_points.resize(sync_point_count);
for (auto &sync_point : sync_points) {
sync_point.decode(version, it);
}
}
void MirrorPeerClientMeta::dump(Formatter *f) const {
f->dump_string("cluster_id", cluster_id.c_str());
f->dump_int("pool_id", pool_id);
f->dump_string("image_id", image_id.c_str());
f->dump_string("snap_name", snap_name.c_str());
f->dump_string("image_id", image_id);
f->open_array_section("sync_points");
for (auto &sync_point : sync_points) {
sync_point.dump(f);
}
f->close_section();
}
void CliClientMeta::encode(bufferlist& bl) const {
@ -417,46 +441,44 @@ void ClientData::generate_test_instances(std::list<ClientData *> &o) {
o.push_back(new ClientData(ImageClientMeta()));
o.push_back(new ClientData(ImageClientMeta(123)));
o.push_back(new ClientData(MirrorPeerClientMeta()));
o.push_back(new ClientData(MirrorPeerClientMeta("cluster_id", 123, "image_id")));
o.push_back(new ClientData(MirrorPeerClientMeta("image_id", {{"snap 1", 123}})));
o.push_back(new ClientData(CliClientMeta()));
}
// Journal Tag
void TagData::encode(bufferlist& bl) const {
::encode(cluster_id, bl);
::encode(pool_id, bl);
::encode(image_id, bl);
::encode(mirror_uuid, bl);
::encode(predecessor_mirror_uuid, bl);
::encode(predecessor_commit_valid, bl);
::encode(predecessor_tag_tid, bl);
::encode(predecessor_entry_tid, bl);
}
void TagData::decode(bufferlist::iterator& it) {
::decode(cluster_id, it);
::decode(pool_id, it);
::decode(image_id, it);
::decode(mirror_uuid, it);
::decode(predecessor_mirror_uuid, it);
::decode(predecessor_commit_valid, it);
::decode(predecessor_tag_tid, it);
::decode(predecessor_entry_tid, it);
}
void TagData::dump(Formatter *f) const {
f->dump_string("cluster_id", cluster_id.c_str());
f->dump_int("pool_id", pool_id);
f->dump_string("image_id", image_id.c_str());
f->dump_string("mirror_uuid", mirror_uuid);
f->dump_string("predecessor_mirror_uuid", predecessor_mirror_uuid);
f->dump_string("predecessor_commit_valid",
predecessor_commit_valid ? "true" : "false");
f->dump_unsigned("predecessor_tag_tid", predecessor_tag_tid);
f->dump_unsigned("predecessor_entry_tid", predecessor_entry_tid);
}
void TagData::generate_test_instances(std::list<TagData *> &o) {
o.push_back(new TagData());
o.push_back(new TagData("cluster_id", 123, "image_id"));
o.push_back(new TagData("mirror-uuid"));
o.push_back(new TagData("mirror-uuid", "remote-mirror-uuid", true, 123, 234));
}
} // namespace journal
} // namespace librbd
std::ostream &operator<<(std::ostream &out,
const librbd::journal::EventType &type) {
std::ostream &operator<<(std::ostream &out, const EventType &type) {
using namespace librbd::journal;
switch (type) {
@ -506,8 +528,7 @@ std::ostream &operator<<(std::ostream &out,
return out;
}
std::ostream &operator<<(std::ostream &out,
const librbd::journal::ClientMetaType &type) {
std::ostream &operator<<(std::ostream &out, const ClientMetaType &type) {
using namespace librbd::journal;
switch (type) {
@ -525,5 +546,26 @@ std::ostream &operator<<(std::ostream &out,
break;
}
return out;
}
std::ostream &operator<<(std::ostream &out, const ImageClientMeta &meta) {
out << "[tag_class=" << meta.tag_class << "]";
return out;
}
std::ostream &operator<<(std::ostream &out, const TagData &tag_data) {
out << "["
<< "mirror_uuid=" << tag_data.mirror_uuid << ", "
<< "predecessor_mirror_uuid=" << tag_data.predecessor_mirror_uuid;
if (tag_data.predecessor_commit_valid) {
out << ", "
<< "predecessor_tag_tid=" << tag_data.predecessor_tag_tid << ", "
<< "predecessor_entry_tid=" << tag_data.predecessor_entry_tid;
}
out << "]";
return out;
}
} // namespace journal
} // namespace librbd

View File

@ -9,6 +9,9 @@
#include "include/encoding.h"
#include "include/types.h"
#include <iosfwd>
#include <list>
#include <boost/none.hpp>
#include <boost/optional.hpp>
#include <boost/variant.hpp>
namespace ceph {
@ -316,21 +319,37 @@ struct ImageClientMeta {
void dump(Formatter *f) const;
};
struct MirrorPeerSyncPoint {
typedef boost::optional<uint64_t> ObjectNumber;
std::string snap_name;
ObjectNumber object_number;
MirrorPeerSyncPoint() : object_number(boost::none) {
}
MirrorPeerSyncPoint(const std::string &snap_name,
const ObjectNumber &object_number)
: snap_name(snap_name), object_number(object_number) {
}
void encode(bufferlist& bl) const;
void decode(__u8 version, bufferlist::iterator& it);
void dump(Formatter *f) const;
};
struct MirrorPeerClientMeta {
typedef std::list<MirrorPeerSyncPoint> SyncPoints;
static const ClientMetaType TYPE = MIRROR_PEER_CLIENT_META_TYPE;
std::string cluster_id;
int64_t pool_id = 0;
std::string image_id;
std::string snap_name;
SyncPoints sync_points;
MirrorPeerClientMeta() {
}
MirrorPeerClientMeta(const std::string &cluster_id, int64_t pool_id,
const std::string &image_id,
const std::string &snap_name = "")
: cluster_id(cluster_id), pool_id(pool_id), image_id(image_id),
snap_name(snap_name) {
MirrorPeerClientMeta(const std::string &image_id,
const SyncPoints &sync_points = SyncPoints())
: image_id(image_id), sync_points(sync_points) {
}
void encode(bufferlist& bl) const;
@ -380,19 +399,27 @@ struct ClientData {
struct TagData {
// owner of the tag (exclusive lock epoch)
std::string cluster_id;
int64_t pool_id = 0;
std::string image_id;
std::string mirror_uuid; // empty if local
// mapping to last committed record of previous tag
std::string predecessor_mirror_uuid; // empty if local
bool predecessor_commit_valid = false;
uint64_t predecessor_tag_tid = 0;
uint64_t predecessor_entry_tid = 0;
TagData() {
}
TagData(const std::string &cluster_id, int64_t pool_id,
const std::string &image_id)
: cluster_id(cluster_id), pool_id(pool_id), image_id(image_id) {
TagData(const std::string &mirror_uuid) : mirror_uuid(mirror_uuid) {
}
TagData(const std::string &mirror_uuid,
const std::string &predecessor_mirror_uuid,
bool predecessor_commit_valid,
uint64_t predecessor_tag_tid, uint64_t predecessor_entry_tid)
: mirror_uuid(mirror_uuid),
predecessor_mirror_uuid(predecessor_mirror_uuid),
predecessor_commit_valid(predecessor_commit_valid),
predecessor_tag_tid(predecessor_tag_tid),
predecessor_entry_tid(predecessor_entry_tid) {
}
void encode(bufferlist& bl) const;
@ -402,14 +429,14 @@ struct TagData {
static void generate_test_instances(std::list<TagData *> &o);
};
std::ostream &operator<<(std::ostream &out, const EventType &type);
std::ostream &operator<<(std::ostream &out, const ClientMetaType &type);
std::ostream &operator<<(std::ostream &out, const ImageClientMeta &meta);
std::ostream &operator<<(std::ostream &out, const TagData &tag_data);
} // namespace journal
} // namespace librbd
std::ostream &operator<<(std::ostream &out,
const librbd::journal::EventType &type);
std::ostream &operator<<(std::ostream &out,
const librbd::journal::ClientMetaType &type);
WRITE_CLASS_ENCODER(librbd::journal::EventEntry);
WRITE_CLASS_ENCODER(librbd::journal::ClientData);
WRITE_CLASS_ENCODER(librbd::journal::TagData);
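
Taken together, the reshaped TagData records which mirror owns the tag plus a pointer at the predecessor tag's last committed position. A hedged sketch of how Journal<I>::allocate_tag() (earlier in this diff) chains tags, using a standalone stand-in for the struct:

    #include <cstdint>
    #include <string>

    // stand-in for librbd::journal::TagData; fields from the hunk above
    struct TagDataSketch {
      std::string mirror_uuid;              // empty if owned locally
      std::string predecessor_mirror_uuid;  // empty if local
      bool predecessor_commit_valid = false;
      uint64_t predecessor_tag_tid = 0;
      uint64_t predecessor_entry_tid = 0;
    };

    TagDataSketch next_tag(const TagDataSketch &current,
                           const std::string &new_owner) {
      TagDataSketch tag;
      tag.mirror_uuid = new_owner;
      tag.predecessor_mirror_uuid = current.mirror_uuid;
      // per the TODO in Journal<I>::allocate_tag(), the predecessor commit
      // position fields are not populated yet
      return tag;
    }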

View File

@ -1297,20 +1297,31 @@ TEST_F(TestClsRbd, mirror) {
std::vector<cls::rbd::MirrorPeer> peers;
ASSERT_EQ(-ENOENT, mirror_peer_list(&ioctx, &peers));
std::string uuid;
ASSERT_EQ(-ENOENT, mirror_uuid_get(&ioctx, &uuid));
ASSERT_EQ(-EINVAL, mirror_peer_add(&ioctx, "uuid1", "cluster1", "client"));
cls::rbd::MirrorMode mirror_mode;
ASSERT_EQ(0, mirror_mode_get(&ioctx, &mirror_mode));
ASSERT_EQ(cls::rbd::MIRROR_MODE_DISABLED, mirror_mode);
ASSERT_EQ(-EINVAL, mirror_mode_set(&ioctx, cls::rbd::MIRROR_MODE_IMAGE));
ASSERT_EQ(-EINVAL, mirror_uuid_set(&ioctx, ""));
ASSERT_EQ(0, mirror_uuid_set(&ioctx, "mirror-uuid"));
ASSERT_EQ(0, mirror_uuid_get(&ioctx, &uuid));
ASSERT_EQ("mirror-uuid", uuid);
ASSERT_EQ(0, mirror_mode_set(&ioctx, cls::rbd::MIRROR_MODE_IMAGE));
ASSERT_EQ(0, mirror_mode_get(&ioctx, &mirror_mode));
ASSERT_EQ(cls::rbd::MIRROR_MODE_IMAGE, mirror_mode);
ASSERT_EQ(-EINVAL, mirror_uuid_set(&ioctx, "new-mirror-uuid"));
ASSERT_EQ(0, mirror_mode_set(&ioctx, cls::rbd::MIRROR_MODE_POOL));
ASSERT_EQ(0, mirror_mode_get(&ioctx, &mirror_mode));
ASSERT_EQ(cls::rbd::MIRROR_MODE_POOL, mirror_mode);
ASSERT_EQ(-EINVAL, mirror_peer_add(&ioctx, "mirror-uuid", "cluster1", "client"));
ASSERT_EQ(0, mirror_peer_add(&ioctx, "uuid1", "cluster1", "client"));
ASSERT_EQ(0, mirror_peer_add(&ioctx, "uuid2", "cluster2", "admin"));
ASSERT_EQ(-ESTALE, mirror_peer_add(&ioctx, "uuid2", "cluster3", "foo"));
@ -1354,6 +1365,7 @@ TEST_F(TestClsRbd, mirror) {
ASSERT_EQ(0, mirror_mode_set(&ioctx, cls::rbd::MIRROR_MODE_DISABLED));
ASSERT_EQ(0, mirror_mode_get(&ioctx, &mirror_mode));
ASSERT_EQ(cls::rbd::MIRROR_MODE_DISABLED, mirror_mode);
ASSERT_EQ(-ENOENT, mirror_uuid_get(&ioctx, &uuid));
}
TEST_F(TestClsRbd, mirror_image) {

View File

@ -249,11 +249,12 @@ TEST_F(TestJournalPlayer, PrefetchMultipleTags) {
journal::JournalPlayer::ObjectPositions positions;
positions = {
cls::journal::ObjectPosition(0, 234, 122),
cls::journal::ObjectPosition(1, 345, 1)};
cls::journal::ObjectPosition(2, 234, 122),
cls::journal::ObjectPosition(1, 234, 121),
cls::journal::ObjectPosition(0, 234, 120)};
cls::journal::ObjectSetPosition commit_position(positions);
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, create(oid, 3));
ASSERT_EQ(0, client_register(oid));
ASSERT_EQ(0, client_commit(oid, commit_position));
@ -263,13 +264,11 @@ TEST_F(TestJournalPlayer, PrefetchMultipleTags) {
journal::JournalPlayer *player = create_player(oid, metadata);
ASSERT_EQ(0, write_entry(oid, 0, 234, 120));
ASSERT_EQ(0, write_entry(oid, 0, 345, 0));
ASSERT_EQ(0, write_entry(oid, 1, 234, 121));
ASSERT_EQ(0, write_entry(oid, 1, 345, 1));
ASSERT_EQ(0, write_entry(oid, 0, 234, 122));
ASSERT_EQ(0, write_entry(oid, 1, 234, 123));
ASSERT_EQ(0, write_entry(oid, 0, 234, 124));
ASSERT_EQ(0, write_entry(oid, 0, 345, 2));
ASSERT_EQ(0, write_entry(oid, 2, 234, 122));
ASSERT_EQ(0, write_entry(oid, 0, 234, 123));
ASSERT_EQ(0, write_entry(oid, 1, 234, 124));
ASSERT_EQ(0, write_entry(oid, 0, 236, 0)); // new tag allocated
player->prefetch();
@ -280,8 +279,8 @@ TEST_F(TestJournalPlayer, PrefetchMultipleTags) {
uint64_t last_tid;
ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid));
ASSERT_EQ(124U, last_tid);
ASSERT_TRUE(metadata->get_last_allocated_entry_tid(345, &last_tid));
ASSERT_EQ(2U, last_tid);
ASSERT_TRUE(metadata->get_last_allocated_entry_tid(236, &last_tid));
ASSERT_EQ(0U, last_tid);
}
TEST_F(TestJournalPlayer, PrefetchCorruptSequence) {
@ -299,13 +298,41 @@ TEST_F(TestJournalPlayer, PrefetchCorruptSequence) {
journal::JournalPlayer *player = create_player(oid, metadata);
ASSERT_EQ(0, write_entry(oid, 0, 234, 120));
ASSERT_EQ(0, write_entry(oid, 0, 345, 0));
ASSERT_EQ(0, write_entry(oid, 1, 234, 121));
ASSERT_EQ(0, write_entry(oid, 0, 234, 124));
player->prefetch();
Entries entries;
ASSERT_TRUE(wait_for_entries(player, 3, &entries));
ASSERT_TRUE(wait_for_entries(player, 2, &entries));
journal::Entry entry;
uint64_t commit_tid;
ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid));
ASSERT_TRUE(wait_for_complete(player));
ASSERT_EQ(-ENOMSG, m_replay_hander.complete_result);
}
TEST_F(TestJournalPlayer, PrefetchUnexpectedTag) {
std::string oid = get_temp_oid();
cls::journal::ObjectSetPosition commit_position;
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
ASSERT_EQ(0, client_commit(oid, commit_position));
journal::JournalMetadataPtr metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
ASSERT_EQ(0, write_entry(oid, 0, 234, 120));
ASSERT_EQ(0, write_entry(oid, 1, 235, 121));
ASSERT_EQ(0, write_entry(oid, 0, 234, 124));
player->prefetch();
Entries entries;
ASSERT_TRUE(wait_for_entries(player, 1, &entries));
journal::Entry entry;
uint64_t commit_tid;

View File

@ -267,7 +267,7 @@ TEST_F(TestObjectPlayer, Unwatch) {
bool done = false;
int rval = 0;
C_SafeCond *ctx = new C_SafeCond(&mutex, &cond, &done, &rval);
object->watch(ctx, 0.1);
object->watch(ctx, 600);
usleep(200000);
ASSERT_FALSE(done);

View File

@ -17,6 +17,7 @@
#include <list>
// template definitions
#include "librbd/Journal.cc"
#include "librbd/exclusive_lock/AcquireRequest.cc"
template class librbd::exclusive_lock::AcquireRequest<librbd::MockImageCtx>;
@ -80,6 +81,22 @@ public:
.WillOnce(CompleteContext(r, mock_image_ctx.image_ctx->op_work_queue));
}
void expect_close_journal(MockImageCtx &mock_image_ctx,
MockJournal &mock_journal) {
EXPECT_CALL(mock_journal, close(_))
.WillOnce(CompleteContext(0, mock_image_ctx.image_ctx->op_work_queue));
}
void expect_is_journal_tag_owner(MockJournal &mock_journal, bool owner) {
EXPECT_CALL(mock_journal, is_tag_owner()).WillOnce(Return(owner));
}
void expect_allocate_journal_tag(MockImageCtx &mock_image_ctx,
MockJournal &mock_journal, int r) {
EXPECT_CALL(mock_journal, allocate_tag("", _))
.WillOnce(WithArg<1>(CompleteContext(r, mock_image_ctx.image_ctx->op_work_queue)));
}
void expect_get_lock_info(MockImageCtx &mock_image_ctx, int r,
const entity_name_t &locker_entity,
const std::string &locker_address,
@ -171,6 +188,8 @@ TEST_F(TestMockExclusiveLockAcquireRequest, Success) {
expect_test_features(mock_image_ctx, RBD_FEATURE_JOURNALING, true);
expect_create_journal(mock_image_ctx, &mock_journal);
expect_open_journal(mock_image_ctx, mock_journal, 0);
expect_is_journal_tag_owner(mock_journal, true);
expect_allocate_journal_tag(mock_image_ctx, mock_journal, 0);
C_SaferCond acquire_ctx;
C_SaferCond ctx;
@ -231,6 +250,8 @@ TEST_F(TestMockExclusiveLockAcquireRequest, SuccessObjectMapDisabled) {
expect_test_features(mock_image_ctx, RBD_FEATURE_JOURNALING, true);
expect_create_journal(mock_image_ctx, &mock_journal);
expect_open_journal(mock_image_ctx, mock_journal, 0);
expect_is_journal_tag_owner(mock_journal, true);
expect_allocate_journal_tag(mock_image_ctx, mock_journal, 0);
C_SaferCond acquire_ctx;
C_SaferCond ctx;
@ -264,6 +285,7 @@ TEST_F(TestMockExclusiveLockAcquireRequest, JournalError) {
expect_test_features(mock_image_ctx, RBD_FEATURE_JOURNALING, true);
expect_create_journal(mock_image_ctx, mock_journal);
expect_open_journal(mock_image_ctx, *mock_journal, -EINVAL);
expect_close_journal(mock_image_ctx, *mock_journal);
expect_close_object_map(mock_image_ctx, *mock_object_map);
C_SaferCond acquire_ctx;
@ -275,6 +297,77 @@ TEST_F(TestMockExclusiveLockAcquireRequest, JournalError) {
ASSERT_EQ(-EINVAL, ctx.wait());
}
TEST_F(TestMockExclusiveLockAcquireRequest, NotJournalTagOwner) {
REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK);
librbd::ImageCtx *ictx;
ASSERT_EQ(0, open_image(m_image_name, &ictx));
MockImageCtx mock_image_ctx(*ictx);
expect_op_work_queue(mock_image_ctx);
InSequence seq;
expect_flush_notifies(mock_image_ctx);
expect_lock(mock_image_ctx, 0);
MockObjectMap *mock_object_map = new MockObjectMap();
expect_test_features(mock_image_ctx, RBD_FEATURE_OBJECT_MAP, true);
expect_create_object_map(mock_image_ctx, mock_object_map);
expect_open_object_map(mock_image_ctx, *mock_object_map);
MockJournal *mock_journal = new MockJournal();
expect_test_features(mock_image_ctx, RBD_FEATURE_JOURNALING, true);
expect_create_journal(mock_image_ctx, mock_journal);
expect_open_journal(mock_image_ctx, *mock_journal, 0);
expect_is_journal_tag_owner(*mock_journal, false);
expect_close_journal(mock_image_ctx, *mock_journal);
expect_close_object_map(mock_image_ctx, *mock_object_map);
C_SaferCond acquire_ctx;
C_SaferCond ctx;
MockAcquireRequest *req = MockAcquireRequest::create(mock_image_ctx,
TEST_COOKIE,
&acquire_ctx, &ctx);
req->send();
ASSERT_EQ(-EPERM, ctx.wait());
}
TEST_F(TestMockExclusiveLockAcquireRequest, AllocateJournalTagError) {
REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK);
librbd::ImageCtx *ictx;
ASSERT_EQ(0, open_image(m_image_name, &ictx));
MockImageCtx mock_image_ctx(*ictx);
expect_op_work_queue(mock_image_ctx);
InSequence seq;
expect_flush_notifies(mock_image_ctx);
expect_lock(mock_image_ctx, 0);
MockObjectMap *mock_object_map = new MockObjectMap();
expect_test_features(mock_image_ctx, RBD_FEATURE_OBJECT_MAP, true);
expect_create_object_map(mock_image_ctx, mock_object_map);
expect_open_object_map(mock_image_ctx, *mock_object_map);
MockJournal *mock_journal = new MockJournal();
expect_test_features(mock_image_ctx, RBD_FEATURE_JOURNALING, true);
expect_create_journal(mock_image_ctx, mock_journal);
expect_open_journal(mock_image_ctx, *mock_journal, 0);
expect_is_journal_tag_owner(*mock_journal, true);
expect_allocate_journal_tag(mock_image_ctx, *mock_journal, -ESTALE);
expect_close_journal(mock_image_ctx, *mock_journal);
expect_close_object_map(mock_image_ctx, *mock_object_map);
C_SaferCond acquire_ctx;
C_SaferCond ctx;
MockAcquireRequest *req = MockAcquireRequest::create(mock_image_ctx,
TEST_COOKIE,
&acquire_ctx, &ctx);
req->send();
ASSERT_EQ(-ESTALE, ctx.wait());
}
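// A failure to allocate the new tag (-ESTALE injected above) propagates to
// the acquire completion and triggers the same journal/object map cleanup.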
TEST_F(TestMockExclusiveLockAcquireRequest, LockBusy) {
REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK);

View File

@ -44,7 +44,8 @@ public:
}
}
void get_journal_commit_position(librbd::ImageCtx *ictx, int64_t *tid)
void get_journal_commit_position(librbd::ImageCtx *ictx, int64_t *tag,
int64_t *entry)
{
const std::string client_id = "";
std::string journal_id = ictx->id;
@ -69,19 +70,16 @@ public:
break;
}
}
if (c == registered_clients.end()) {
*tid = -1;
return;
if (c == registered_clients.end() ||
c->commit_position.object_positions.empty()) {
*tag = 0;
*entry = -1;
} else {
const cls::journal::ObjectPosition &object_position =
*c->commit_position.object_positions.begin();
*tag = object_position.tag_tid;
*entry = object_position.entry_tid;
}
cls::journal::ObjectPositions object_positions =
c->commit_position.object_positions;
cls::journal::ObjectPositions::const_iterator p;
for (p = object_positions.begin(); p != object_positions.end(); p++) {
if (p->tag_tid == 0) {
break;
}
}
*tid = p == object_positions.end() ? -1 : p->entry_tid;
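// Sketch of the new contract, as inferred from the call sites below: the
// helper now reports the commit position as a (tag_tid, entry_tid) pair taken
// from the first -- most recent -- object position, with (0, -1) meaning no
// position has been committed yet.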
C_SaferCond open_cond;
ictx->journal = new librbd::Journal<>(*ictx);
@ -123,8 +121,9 @@ TEST_F(TestJournalReplay, AioDiscardEvent) {
ASSERT_EQ(0, when_acquired_lock(ictx));
// get current commit position
int64_t initial;
get_journal_commit_position(ictx, &initial);
int64_t initial_tag;
int64_t initial_entry;
get_journal_commit_position(ictx, &initial_tag, &initial_entry);
// inject a discard operation into the journal
inject_into_journal(ictx,
@ -142,9 +141,11 @@ TEST_F(TestJournalReplay, AioDiscardEvent) {
ASSERT_EQ(std::string(read_payload.size(), '\0'), read_payload);
// check the commit position is properly updated
int64_t current;
get_journal_commit_position(ictx, &current);
ASSERT_EQ(current, initial + 1);
int64_t current_tag;
int64_t current_entry;
get_journal_commit_position(ictx, &current_tag, &current_entry);
ASSERT_EQ(initial_tag + 1, current_tag);
ASSERT_EQ(0, current_entry);
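// Assumed tag arithmetic behind these assertions: each image open with
// journaling allocates a new tag, so replayed events land under
// initial_tag + 1 and entry numbering restarts at 0 within that tag.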
// replay several events and check the commit position
inject_into_journal(ictx,
@ -153,8 +154,9 @@ TEST_F(TestJournalReplay, AioDiscardEvent) {
librbd::journal::AioDiscardEvent(0, payload.size()));
ASSERT_EQ(0, open_image(m_image_name, &ictx));
ASSERT_EQ(0, when_acquired_lock(ictx));
get_journal_commit_position(ictx, &current);
ASSERT_EQ(current, initial + 3);
get_journal_commit_position(ictx, &current_tag, &current_entry);
ASSERT_EQ(initial_tag + 2, current_tag);
ASSERT_EQ(1, current_entry);
// verify lock ordering constraints
aio_comp = new librbd::AioCompletion();
@ -171,8 +173,9 @@ TEST_F(TestJournalReplay, AioWriteEvent) {
ASSERT_EQ(0, when_acquired_lock(ictx));
// get current commit position
int64_t initial;
get_journal_commit_position(ictx, &initial);
int64_t initial_tag;
int64_t initial_entry;
get_journal_commit_position(ictx, &initial_tag, &initial_entry);
// inject a write operation into the journal
std::string payload(4096, '1');
@ -194,9 +197,11 @@ TEST_F(TestJournalReplay, AioWriteEvent) {
ASSERT_EQ(payload, read_payload);
// check the commit position is properly updated
int64_t current;
get_journal_commit_position(ictx, &current);
ASSERT_EQ(current, initial + 1);
int64_t current_tag;
int64_t current_entry;
get_journal_commit_position(ictx, &current_tag, &current_entry);
ASSERT_EQ(initial_tag + 1, current_tag);
ASSERT_EQ(0, current_entry);
// replay several events and check the commit position
inject_into_journal(ictx,
@ -205,8 +210,9 @@ TEST_F(TestJournalReplay, AioWriteEvent) {
librbd::journal::AioWriteEvent(0, payload.size(), payload_bl));
ASSERT_EQ(0, open_image(m_image_name, &ictx));
ASSERT_EQ(0, when_acquired_lock(ictx));
get_journal_commit_position(ictx, &current);
ASSERT_EQ(current, initial + 3);
get_journal_commit_position(ictx, &current_tag, &current_entry);
ASSERT_EQ(initial_tag + 2, current_tag);
ASSERT_EQ(1, current_entry);
// verify lock ordering constraints
aio_comp = new librbd::AioCompletion();
@ -225,8 +231,9 @@ TEST_F(TestJournalReplay, AioFlushEvent) {
ASSERT_EQ(0, when_acquired_lock(ictx));
// get current commit position
int64_t initial;
get_journal_commit_position(ictx, &initial);
int64_t initial_tag;
int64_t initial_entry;
get_journal_commit_position(ictx, &initial_tag, &initial_entry);
// inject a flush operation into the journal
inject_into_journal(ictx, librbd::journal::AioFlushEvent());
@ -261,17 +268,20 @@ TEST_F(TestJournalReplay, AioFlushEvent) {
ASSERT_EQ(payload, read_payload);
// check the commit position is properly updated
int64_t current;
get_journal_commit_position(ictx, &current);
ASSERT_EQ(current, initial + 1);
int64_t current_tag;
int64_t current_entry;
get_journal_commit_position(ictx, &current_tag, &current_entry);
ASSERT_EQ(initial_tag + 1, current_tag);
ASSERT_EQ(0, current_entry);
// replay several events and check the commit position
inject_into_journal(ictx, librbd::journal::AioFlushEvent());
inject_into_journal(ictx, librbd::journal::AioFlushEvent());
ASSERT_EQ(0, open_image(m_image_name, &ictx));
ASSERT_EQ(0, when_acquired_lock(ictx));
get_journal_commit_position(ictx, &current);
ASSERT_EQ(current, initial + 3);
get_journal_commit_position(ictx, &current_tag, &current_entry);
ASSERT_EQ(initial_tag + 2, current_tag);
ASSERT_EQ(1, current_entry);
// verify lock ordering constraints
aio_comp = new librbd::AioCompletion();
@ -289,8 +299,9 @@ TEST_F(TestJournalReplay, SnapCreate) {
ASSERT_EQ(0, when_acquired_lock(ictx));
// get current commit position
int64_t initial;
get_journal_commit_position(ictx, &initial);
int64_t initial_tag;
int64_t initial_entry;
get_journal_commit_position(ictx, &initial_tag, &initial_entry);
// inject snapshot ops into journal
inject_into_journal(ictx, librbd::journal::SnapCreateEvent(1, "snap"));
@ -300,9 +311,11 @@ TEST_F(TestJournalReplay, SnapCreate) {
ASSERT_EQ(0, open_image(m_image_name, &ictx));
ASSERT_EQ(0, when_acquired_lock(ictx));
int64_t current;
get_journal_commit_position(ictx, &current);
ASSERT_EQ(current, initial + 2);
int64_t current_tag;
int64_t current_entry;
get_journal_commit_position(ictx, &current_tag, &current_entry);
ASSERT_EQ(initial_tag + 1, current_tag);
ASSERT_EQ(1, current_entry);
{
RWLock::RLocker snap_locker(ictx->snap_lock);
@ -324,8 +337,9 @@ TEST_F(TestJournalReplay, SnapProtect) {
ASSERT_EQ(0, ictx->operations->snap_create("snap"));
// get current commit position
int64_t initial;
get_journal_commit_position(ictx, &initial);
int64_t initial_tag;
int64_t initial_entry;
get_journal_commit_position(ictx, &initial_tag, &initial_entry);
// inject snapshot ops into journal
inject_into_journal(ictx, librbd::journal::SnapProtectEvent(1, "snap"));
@ -335,9 +349,11 @@ TEST_F(TestJournalReplay, SnapProtect) {
ASSERT_EQ(0, open_image(m_image_name, &ictx));
ASSERT_EQ(0, when_acquired_lock(ictx));
int64_t current;
get_journal_commit_position(ictx, &current);
ASSERT_EQ(current, initial + 2);
int64_t current_tag;
int64_t current_entry;
get_journal_commit_position(ictx, &current_tag, &current_entry);
ASSERT_EQ(initial_tag, current_tag);
ASSERT_EQ(initial_entry + 2, current_entry);
bool is_protected;
ASSERT_EQ(0, librbd::snap_is_protected(ictx, "snap", &is_protected));
@ -366,8 +382,9 @@ TEST_F(TestJournalReplay, SnapUnprotect) {
ASSERT_EQ(0, ictx->operations->snap_protect("snap"));
// get current commit position
int64_t initial;
get_journal_commit_position(ictx, &initial);
int64_t initial_tag;
int64_t initial_entry;
get_journal_commit_position(ictx, &initial_tag, &initial_entry);
// inject snapshot ops into journal
inject_into_journal(ictx, librbd::journal::SnapUnprotectEvent(1, "snap"));
@ -377,9 +394,11 @@ TEST_F(TestJournalReplay, SnapUnprotect) {
ASSERT_EQ(0, open_image(m_image_name, &ictx));
ASSERT_EQ(0, when_acquired_lock(ictx));
int64_t current;
get_journal_commit_position(ictx, &current);
ASSERT_EQ(current, initial + 2);
int64_t current_tag;
int64_t current_entry;
get_journal_commit_position(ictx, &current_tag, &current_entry);
ASSERT_EQ(initial_tag, current_tag);
ASSERT_EQ(initial_entry + 2, current_entry);
bool is_protected;
ASSERT_EQ(0, librbd::snap_is_protected(ictx, "snap", &is_protected));
@ -408,8 +427,9 @@ TEST_F(TestJournalReplay, SnapRename) {
}
// get current commit position
int64_t initial;
get_journal_commit_position(ictx, &initial);
int64_t initial_tag;
int64_t initial_entry;
get_journal_commit_position(ictx, &initial_tag, &initial_entry);
// inject snapshot ops into journal
inject_into_journal(ictx, librbd::journal::SnapRenameEvent(1, snap_id, "snap2"));
@ -419,9 +439,11 @@ TEST_F(TestJournalReplay, SnapRename) {
ASSERT_EQ(0, open_image(m_image_name, &ictx));
ASSERT_EQ(0, when_acquired_lock(ictx));
int64_t current;
get_journal_commit_position(ictx, &current);
ASSERT_EQ(current, initial + 2);
int64_t current_tag;
int64_t current_entry;
get_journal_commit_position(ictx, &current_tag, &current_entry);
ASSERT_EQ(initial_tag, current_tag);
ASSERT_EQ(initial_entry + 2, current_entry);
{
RWLock::RLocker snap_locker(ictx->snap_lock);
@ -444,8 +466,9 @@ TEST_F(TestJournalReplay, SnapRollback) {
ASSERT_EQ(0, ictx->operations->snap_create("snap"));
// get current commit position
int64_t initial;
get_journal_commit_position(ictx, &initial);
int64_t initial_tag;
int64_t initial_entry;
get_journal_commit_position(ictx, &initial_tag, &initial_entry);
// inject snapshot ops into journal
inject_into_journal(ictx, librbd::journal::SnapRollbackEvent(1, "snap"));
@ -455,9 +478,11 @@ TEST_F(TestJournalReplay, SnapRollback) {
ASSERT_EQ(0, open_image(m_image_name, &ictx));
ASSERT_EQ(0, when_acquired_lock(ictx));
int64_t current;
get_journal_commit_position(ictx, &current);
ASSERT_EQ(current, initial + 2);
int64_t current_tag;
int64_t current_entry;
get_journal_commit_position(ictx, &current_tag, &current_entry);
ASSERT_EQ(initial_tag, current_tag);
ASSERT_EQ(initial_entry + 2, current_entry);
// verify lock ordering constraints
librbd::NoOpProgressContext no_op_progress;
@ -475,8 +500,9 @@ TEST_F(TestJournalReplay, SnapRemove) {
ASSERT_EQ(0, ictx->operations->snap_create("snap"));
// get current commit position
int64_t initial;
get_journal_commit_position(ictx, &initial);
int64_t initial_tag;
int64_t initial_entry;
get_journal_commit_position(ictx, &initial_tag, &initial_entry);
// inject snapshot ops into journal
inject_into_journal(ictx, librbd::journal::SnapRemoveEvent(1, "snap"));
@ -486,9 +512,11 @@ TEST_F(TestJournalReplay, SnapRemove) {
ASSERT_EQ(0, open_image(m_image_name, &ictx));
ASSERT_EQ(0, when_acquired_lock(ictx));
int64_t current;
get_journal_commit_position(ictx, &current);
ASSERT_EQ(current, initial + 2);
int64_t current_tag;
int64_t current_entry;
get_journal_commit_position(ictx, &current_tag, &current_entry);
ASSERT_EQ(initial_tag, current_tag);
ASSERT_EQ(initial_entry + 2, current_entry);
{
RWLock::RLocker snap_locker(ictx->snap_lock);
@ -510,8 +538,9 @@ TEST_F(TestJournalReplay, Rename) {
ASSERT_EQ(0, when_acquired_lock(ictx));
// get current commit position
int64_t initial;
get_journal_commit_position(ictx, &initial);
int64_t initial_tag;
int64_t initial_entry;
get_journal_commit_position(ictx, &initial_tag, &initial_entry);
// inject a rename operation into the journal
std::string new_image_name(get_temp_image_name());
@ -522,9 +551,11 @@ TEST_F(TestJournalReplay, Rename) {
ASSERT_EQ(0, open_image(m_image_name, &ictx));
ASSERT_EQ(0, when_acquired_lock(ictx));
int64_t current;
get_journal_commit_position(ictx, &current);
ASSERT_EQ(current, initial + 2);
int64_t current_tag;
int64_t current_entry;
get_journal_commit_position(ictx, &current_tag, &current_entry);
ASSERT_EQ(initial_tag + 1, current_tag);
ASSERT_EQ(1, current_entry);
// verify lock ordering constraints
librbd::RBD rbd;
@ -540,8 +571,9 @@ TEST_F(TestJournalReplay, Resize) {
ASSERT_EQ(0, when_acquired_lock(ictx));
// get current commit position
int64_t initial;
get_journal_commit_position(ictx, &initial);
int64_t initial_tag;
int64_t initial_entry;
get_journal_commit_position(ictx, &initial_tag, &initial_entry);
// inject a resize operation into the journal
inject_into_journal(ictx, librbd::journal::ResizeEvent(1, 16));
@ -551,9 +583,11 @@ TEST_F(TestJournalReplay, Resize) {
ASSERT_EQ(0, open_image(m_image_name, &ictx));
ASSERT_EQ(0, when_acquired_lock(ictx));
int64_t current;
get_journal_commit_position(ictx, &current);
ASSERT_EQ(current, initial + 2);
int64_t current_tag;
int64_t current_entry;
get_journal_commit_position(ictx, &current_tag, &current_entry);
ASSERT_EQ(initial_tag + 1, current_tag);
ASSERT_EQ(1, current_entry);
// verify lock ordering constraints
librbd::NoOpProgressContext no_op_progress;
@ -578,8 +612,9 @@ TEST_F(TestJournalReplay, Flatten) {
ASSERT_EQ(0, when_acquired_lock(ictx2));
// get current commit position
int64_t initial;
get_journal_commit_position(ictx2, &initial);
int64_t initial_tag;
int64_t initial_entry;
get_journal_commit_position(ictx2, &initial_tag, &initial_entry);
// inject a flatten operation into the journal
inject_into_journal(ictx2, librbd::journal::FlattenEvent(1));
@ -589,9 +624,11 @@ TEST_F(TestJournalReplay, Flatten) {
ASSERT_EQ(0, open_image(clone_name, &ictx2));
ASSERT_EQ(0, when_acquired_lock(ictx2));
int64_t current;
get_journal_commit_position(ictx2, &current);
ASSERT_EQ(current, initial + 2);
int64_t current_tag;
int64_t current_entry;
get_journal_commit_position(ictx2, &current_tag, &current_entry);
ASSERT_EQ(initial_tag + 1, current_tag);
ASSERT_EQ(1, current_entry);
ASSERT_EQ(0, ictx->operations->snap_unprotect("snap"));
// verify lock ordering constraints
@ -607,8 +644,9 @@ TEST_F(TestJournalReplay, ObjectPosition) {
ASSERT_EQ(0, when_acquired_lock(ictx));
// get current commit position
int64_t initial;
get_journal_commit_position(ictx, &initial);
int64_t initial_tag;
int64_t initial_entry;
get_journal_commit_position(ictx, &initial_tag, &initial_entry);
std::string payload(4096, '1');
librbd::AioCompletion *aio_comp = new librbd::AioCompletion();
@ -623,9 +661,11 @@ TEST_F(TestJournalReplay, ObjectPosition) {
aio_comp->release();
// check that the commit position was updated
int64_t current;
get_journal_commit_position(ictx, &current);
ASSERT_EQ(current, initial + 2);
int64_t current_tag;
int64_t current_entry;
get_journal_commit_position(ictx, &current_tag, &current_entry);
ASSERT_EQ(initial_tag + 1, current_tag);
ASSERT_EQ(1, current_entry);
// write again
@ -641,6 +681,7 @@ TEST_F(TestJournalReplay, ObjectPosition) {
aio_comp->release();
// check that the commit position was updated
get_journal_commit_position(ictx, &current);
ASSERT_EQ(current, initial + 4);
get_journal_commit_position(ictx, &current_tag, &current_entry);
ASSERT_EQ(initial_tag + 1, current_tag);
ASSERT_EQ(3, current_entry);
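// Same tag both times: the image stays open between the two rounds, so the
// tag_tid is unchanged and entry_tid keeps advancing -- presumably two
// journal entries per round (the write plus its flush), giving 1 then 3.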
}

View File

@ -16,6 +16,9 @@ struct MockJournal {
MOCK_METHOD1(wait_for_journal_ready, void(Context *));
MOCK_CONST_METHOD0(is_tag_owner, bool());
MOCK_METHOD2(allocate_tag, void(const std::string &, Context *));
MOCK_METHOD1(open, void(Context *));
MOCK_METHOD1(close, void(Context *));
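// open()/close() mocks are new here: AcquireRequest now appears to drive the
// journal open itself and to close it on error paths, so the mock has to
// expose both entry points.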

View File

@ -7,6 +7,7 @@
#include "common/Cond.h"
#include "common/Mutex.h"
#include "cls/journal/cls_journal_types.h"
#include "journal/Journaler.h"
#include "librbd/Journal.h"
#include "librbd/Utils.h"
#include "librbd/journal/Replay.h"
@ -87,6 +88,9 @@ struct MockJournaler {
MOCK_METHOD1(init, void(Context*));
MOCK_METHOD1(flush_commit_position, void(Context*));
MOCK_METHOD2(get_cached_client, int(const std::string&, cls::journal::Client*));
MOCK_METHOD3(get_tags, void(uint64_t, journal::Journaler::Tags*, Context*));
MOCK_METHOD1(start_replay, void(::journal::ReplayHandler *replay_handler));
MOCK_METHOD1(try_pop_front, bool(MockReplayEntryProxy *replay_entry));
MOCK_METHOD0(stop_replay, void());
@ -134,6 +138,16 @@ struct MockJournalerProxy {
MockJournaler::get_instance().init(on_finish);
}
int get_cached_client(const std::string& client_id,
cls::journal::Client* client) {
return MockJournaler::get_instance().get_cached_client(client_id, client);
}
void get_tags(uint64_t tag_class, journal::Journaler::Tags *tags,
Context *on_finish) {
MockJournaler::get_instance().get_tags(tag_class, tags, on_finish);
}
void flush_commit_position(Context *on_finish) {
MockJournaler::get_instance().flush_commit_position(on_finish);
}
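// The proxy methods above forward to the MockJournaler singleton accessor so
// tests can place expectations on get_cached_client()/get_tags() just as they
// already do on init() and flush_commit_position().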
@ -308,7 +322,36 @@ public:
void expect_init_journaler(::journal::MockJournaler &mock_journaler, int r) {
EXPECT_CALL(mock_journaler, init(_))
.WillOnce(CompleteContext(r, NULL));
}
void expect_get_journaler_cached_client(::journal::MockJournaler &mock_journaler, int r) {
journal::ImageClientMeta image_client_meta;
image_client_meta.tag_class = 0;
journal::ClientData client_data;
client_data.client_meta = image_client_meta;
cls::journal::Client client;
::encode(client_data, client.data);
EXPECT_CALL(mock_journaler, get_cached_client("", _))
.WillOnce(DoAll(SetArgPointee<1>(client),
Return(r)));
}
void expect_get_journaler_tags(MockImageCtx &mock_image_ctx,
::journal::MockJournaler &mock_journaler,
int r) {
journal::TagData tag_data;
bufferlist tag_data_bl;
::encode(tag_data, tag_data_bl);
::journal::Journaler::Tags tags = {{0, 0, {}}, {1, 0, tag_data_bl}};
EXPECT_CALL(mock_journaler, get_tags(0, _, _))
.WillOnce(DoAll(SetArgPointee<1>(tags),
WithArg<2>(CompleteContext(r, mock_image_ctx.image_ctx->op_work_queue))));
}
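// Assumed Tags element layout here: {tid, tag_class, data}. Returning a
// final tag whose TagData carries the default (empty) mirror uuid lets the
// opening client consider itself the tag owner during Journal::open().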
void expect_start_replay(MockJournalImageCtx &mock_image_ctx,
@ -442,6 +485,8 @@ public:
InSequence seq;
expect_construct_journaler(mock_journaler);
expect_init_journaler(mock_journaler, 0);
expect_get_journaler_cached_client(mock_journaler, 0);
expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
expect_start_replay(
mock_image_ctx, mock_journaler, {
std::bind(&invoke_replay_complete, _1, 0)
@ -485,6 +530,8 @@ TEST_F(TestMockJournal, StateTransitions) {
::journal::MockJournaler mock_journaler;
expect_construct_journaler(mock_journaler);
expect_init_journaler(mock_journaler, 0);
expect_get_journaler_cached_client(mock_journaler, 0);
expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
expect_start_replay(
mock_image_ctx, mock_journaler, {
std::bind(&invoke_replay_ready, _1),
@ -533,6 +580,45 @@ TEST_F(TestMockJournal, InitError) {
ASSERT_EQ(-EINVAL, when_open(mock_journal));
}
TEST_F(TestMockJournal, GetCachedClientError) {
REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);
librbd::ImageCtx *ictx;
ASSERT_EQ(0, open_image(m_image_name, &ictx));
MockJournalImageCtx mock_image_ctx(*ictx);
MockJournal mock_journal(mock_image_ctx);
expect_op_work_queue(mock_image_ctx);
InSequence seq;
::journal::MockJournaler mock_journaler;
expect_construct_journaler(mock_journaler);
expect_init_journaler(mock_journaler, 0);
expect_get_journaler_cached_client(mock_journaler, -ENOENT);
ASSERT_EQ(-ENOENT, when_open(mock_journal));
}
TEST_F(TestMockJournal, GetTagsError) {
REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);
librbd::ImageCtx *ictx;
ASSERT_EQ(0, open_image(m_image_name, &ictx));
MockJournalImageCtx mock_image_ctx(*ictx);
MockJournal mock_journal(mock_image_ctx);
expect_op_work_queue(mock_image_ctx);
InSequence seq;
::journal::MockJournaler mock_journaler;
expect_construct_journaler(mock_journaler);
expect_init_journaler(mock_journaler, 0);
expect_get_journaler_cached_client(mock_journaler, 0);
expect_get_journaler_tags(mock_image_ctx, mock_journaler, -EBADMSG);
ASSERT_EQ(-EBADMSG, when_open(mock_journal));
}
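// Failures while fetching the client metadata (-ENOENT above) or the tag
// list (-EBADMSG here) now surface directly as the result of Journal::open().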
TEST_F(TestMockJournal, ReplayCompleteError) {
REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);
@ -548,6 +634,8 @@ TEST_F(TestMockJournal, ReplayCompleteError) {
::journal::MockJournaler mock_journaler;
expect_construct_journaler(mock_journaler);
expect_init_journaler(mock_journaler, 0);
expect_get_journaler_cached_client(mock_journaler, 0);
expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
expect_start_replay(
mock_image_ctx, mock_journaler, {
std::bind(&invoke_replay_complete, _1, -EINVAL)
@ -560,6 +648,8 @@ TEST_F(TestMockJournal, ReplayCompleteError) {
// replay failure should result in replay-restart
expect_construct_journaler(mock_journaler);
expect_init_journaler(mock_journaler, 0);
expect_get_journaler_cached_client(mock_journaler, 0);
expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
expect_start_replay(
mock_image_ctx, mock_journaler, {
std::bind(&invoke_replay_complete, _1, 0)
@ -589,6 +679,8 @@ TEST_F(TestMockJournal, FlushReplayError) {
::journal::MockJournaler mock_journaler;
expect_construct_journaler(mock_journaler);
expect_init_journaler(mock_journaler, 0);
expect_get_journaler_cached_client(mock_journaler, 0);
expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
expect_start_replay(
mock_image_ctx, mock_journaler, {
std::bind(&invoke_replay_ready, _1),
@ -606,6 +698,8 @@ TEST_F(TestMockJournal, FlushReplayError) {
// replay flush failure should result in replay-restart
expect_construct_journaler(mock_journaler);
expect_init_journaler(mock_journaler, 0);
expect_get_journaler_cached_client(mock_journaler, 0);
expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
expect_start_replay(
mock_image_ctx, mock_journaler, {
std::bind(&invoke_replay_complete, _1, 0)
@ -635,6 +729,8 @@ TEST_F(TestMockJournal, StopError) {
::journal::MockJournaler mock_journaler;
expect_construct_journaler(mock_journaler);
expect_init_journaler(mock_journaler, 0);
expect_get_journaler_cached_client(mock_journaler, 0);
expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
expect_start_replay(
mock_image_ctx, mock_journaler, {
std::bind(&invoke_replay_complete, _1, 0)
@ -664,6 +760,8 @@ TEST_F(TestMockJournal, ReplayOnDiskPreFlushError) {
::journal::MockJournaler mock_journaler;
expect_construct_journaler(mock_journaler);
expect_init_journaler(mock_journaler, 0);
expect_get_journaler_cached_client(mock_journaler, 0);
expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
::journal::ReplayHandler *replay_handler = nullptr;
expect_start_replay(
@ -688,6 +786,8 @@ TEST_F(TestMockJournal, ReplayOnDiskPreFlushError) {
// replay write-to-disk failure should result in replay-restart
expect_construct_journaler(mock_journaler);
expect_init_journaler(mock_journaler, 0);
expect_get_journaler_cached_client(mock_journaler, 0);
expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
expect_start_replay(
mock_image_ctx, mock_journaler, {
std::bind(&invoke_replay_complete, _1, 0)
@ -738,6 +838,8 @@ TEST_F(TestMockJournal, ReplayOnDiskPostFlushError) {
::journal::MockJournaler mock_journaler;
expect_construct_journaler(mock_journaler);
expect_init_journaler(mock_journaler, 0);
expect_get_journaler_cached_client(mock_journaler, 0);
expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
expect_start_replay(
mock_image_ctx, mock_journaler, {
std::bind(&invoke_replay_ready, _1),
@ -759,6 +861,8 @@ TEST_F(TestMockJournal, ReplayOnDiskPostFlushError) {
// replay write-to-disk failure should result in replay-restart
expect_construct_journaler(mock_journaler);
expect_init_journaler(mock_journaler, 0);
expect_get_journaler_cached_client(mock_journaler, 0);
expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
expect_start_replay(
mock_image_ctx, mock_journaler, {
std::bind(&invoke_replay_complete, _1, 0)

View File

@ -104,6 +104,7 @@ int main(int argc, const char **argv)
rbd::mirror::ImageReplayer::BootstrapParams bootstap_params(local_pool_name,
image_name);
int64_t local_pool_id;
int64_t remote_pool_id;
std::string remote_image_id;
@ -141,6 +142,14 @@ int main(int argc, const char **argv)
goto cleanup;
}
r = local->pool_lookup(local_pool_name.c_str());
if (r < 0) {
derr << "error finding local pool " << local_pool_name
<< ": " << cpp_strerror(r) << dendl;
goto cleanup;
}
local_pool_id = r;
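// The local pool id is now resolved up front via pool_lookup() and handed to
// ImageReplayer, rather than being recovered later from the registered
// client metadata.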
r = remote->init_with_context(g_ceph_context);
if (r < 0) {
derr << "could not initialize rados handle" << dendl;
@ -171,7 +180,8 @@ int main(int argc, const char **argv)
dout(5) << "starting replay" << dendl;
replayer = new rbd::mirror::ImageReplayer(local, remote, client_id,
remote_pool_id, remote_image_id);
local_pool_id, remote_pool_id,
remote_image_id);
r = replayer->start(&bootstap_params);
if (r < 0) {

View File

@ -101,7 +101,7 @@ public:
m_replayer = new rbd::mirror::ImageReplayer(
rbd::mirror::RadosRef(new librados::Rados(m_local_ioctx)),
rbd::mirror::RadosRef(new librados::Rados(m_remote_ioctx)),
m_client_id, remote_pool_id, m_remote_image_id);
m_client_id, m_local_ioctx.get_id(), remote_pool_id, m_remote_image_id);
bootstrap();
}

View File

@ -160,13 +160,14 @@ private:
ImageReplayer::ImageReplayer(RadosRef local, RadosRef remote,
const std::string &client_id,
int64_t local_pool_id,
int64_t remote_pool_id,
const std::string &remote_image_id) :
m_local(local),
m_remote(remote),
m_client_id(client_id),
m_remote_pool_id(remote_pool_id),
m_local_pool_id(-1),
m_local_pool_id(local_pool_id),
m_remote_image_id(remote_image_id),
m_lock("rbd::mirror::ImageReplayer " + stringify(remote_pool_id) + " " +
remote_image_id),
@ -521,9 +522,13 @@ int ImageReplayer::get_registered_client_status(bool *registered)
}
librbd::journal::MirrorPeerClientMeta &cm =
boost::get<librbd::journal::MirrorPeerClientMeta>(client_data.client_meta);
m_local_pool_id = cm.pool_id;
m_local_image_id = cm.image_id;
m_snap_name = cm.snap_name;
// TODO: snap name should be transient
if (cm.sync_points.empty()) {
return -ENOENT;
}
m_snap_name = cm.sync_points.front().snap_name;
dout(20) << "client found, pool_id=" << m_local_pool_id << ", image_id="
<< m_local_image_id << ", snap_name=" << m_snap_name << dendl;
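// MirrorPeerClientMeta now records a list of sync points instead of a single
// snap name; per the TODO above, the snap name is taken from the front
// (in-progress) sync point, and an empty list is treated as a client that
// has not yet been bootstrapped (-ENOENT).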
@ -539,27 +544,17 @@ int ImageReplayer::get_registered_client_status(bool *registered)
int ImageReplayer::register_client()
{
int r;
std::string local_cluster_id;
r = m_local->cluster_fsid(&local_cluster_id);
if (r < 0) {
derr << "error retrieving local cluster id: " << cpp_strerror(r)
<< dendl;
return r;
}
// TODO allocate snap as part of sync process
std::string m_snap_name = ".rbd-mirror." + m_client_id;
dout(20) << "m_cluster_id=" << local_cluster_id << ", pool_id="
<< m_local_pool_id << ", image_id=" << m_local_image_id
<< ", snap_name=" << m_snap_name << dendl;
dout(20) << "mirror_uuid=" << m_client_id << ", "
<< "image_id=" << m_local_image_id << ", "
<< "snap_name=" << m_snap_name << dendl;
bufferlist client_data;
::encode(librbd::journal::ClientData{librbd::journal::MirrorPeerClientMeta{
local_cluster_id, m_local_pool_id, m_local_image_id, m_snap_name}},
client_data);
r = m_remote_journaler->register_client(client_data);
m_local_image_id, {{m_snap_name, boost::none}}}}, client_data);
int r = m_remote_journaler->register_client(client_data);
if (r < 0) {
derr << "error registering client: " << cpp_strerror(r) << dendl;
return r;

View File

@ -65,7 +65,8 @@ public:
public:
ImageReplayer(RadosRef local, RadosRef remote, const std::string &client_id,
int64_t remote_pool_id, const std::string &remote_image_id);
int64_t local_pool_id, int64_t remote_pool_id,
const std::string &remote_image_id);
virtual ~ImageReplayer();
ImageReplayer(const ImageReplayer&) = delete;
ImageReplayer& operator=(const ImageReplayer&) = delete;

View File

@ -118,6 +118,24 @@ void Replayer::set_sources(const map<int64_t, set<string> > &images)
for (const auto &kv : images) {
int64_t pool_id = kv.first;
// TODO: clean up once remote peer -> image replayer refactored
librados::IoCtx remote_ioctx;
int r = m_remote->ioctx_create2(pool_id, remote_ioctx);
if (r < 0) {
derr << "failed to lookup remote pool " << pool_id << ": "
<< cpp_strerror(r) << dendl;
continue;
}
librados::IoCtx local_ioctx;
r = m_local->ioctx_create(remote_ioctx.get_pool_name().c_str(), local_ioctx);
if (r < 0) {
derr << "failed to lookup local pool " << remote_ioctx.get_pool_name()
<< ": " << cpp_strerror(r) << dendl;
continue;
}
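// Pools appear to be paired by name across clusters: the remote pool id keys
// m_images, while the local pool id -- looked up via the remote pool's name
// -- is what ImageReplayer needs for its local ioctx.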
// create entry for pool if it doesn't exist
auto &pool_replayers = m_images[pool_id];
for (const auto &image_id : kv.second) {
@ -125,6 +143,7 @@ void Replayer::set_sources(const map<int64_t, set<string> > &images)
unique_ptr<ImageReplayer> image_replayer(new ImageReplayer(m_local,
m_remote,
m_client_id,
local_ioctx.get_id(),
pool_id,
image_id));
int r = image_replayer->start();