diff --git a/src/cls/rbd/cls_rbd.cc b/src/cls/rbd/cls_rbd.cc index ee54c3b7969..af1e740d915 100644 --- a/src/cls/rbd/cls_rbd.cc +++ b/src/cls/rbd/cls_rbd.cc @@ -111,6 +111,8 @@ cls_method_handle_t h_old_snapshots_list; cls_method_handle_t h_old_snapshot_add; cls_method_handle_t h_old_snapshot_remove; cls_method_handle_t h_old_snapshot_rename; +cls_method_handle_t h_mirror_uuid_get; +cls_method_handle_t h_mirror_uuid_set; cls_method_handle_t h_mirror_mode_get; cls_method_handle_t h_mirror_mode_set; cls_method_handle_t h_mirror_peer_list; @@ -204,6 +206,15 @@ static int read_key(cls_method_context_t hctx, const string &key, T *out) return 0; } +static int remove_key(cls_method_context_t hctx, const string &key) { + int r = cls_cxx_map_remove_key(hctx, key); + if (r < 0 && r != -ENOENT) { + CLS_ERR("failed to remove key: %s", key.c_str()); + return r; + } + return 0; +} + static bool is_valid_id(const string &id) { if (!id.size()) return false; @@ -2939,6 +2950,7 @@ int old_snapshot_rename(cls_method_context_t hctx, bufferlist *in, bufferlist *o namespace mirror { +static const std::string UUID("mirror_uuid"); static const std::string MODE("mirror_mode"); static const std::string PEER_KEY_PREFIX("mirror_peer_"); static const std::string IMAGE_KEY_PREFIX("image_"); @@ -2951,6 +2963,20 @@ std::string image_key(const string &image_id) { return IMAGE_KEY_PREFIX + image_id; } +int uuid_get(cls_method_context_t hctx, std::string *mirror_uuid) { + bufferlist mirror_uuid_bl; + int r = cls_cxx_map_get_val(hctx, mirror::UUID, &mirror_uuid_bl); + if (r < 0) { + if (r != -ENOENT) { + CLS_ERR("error reading mirror uuid: %s", cpp_strerror(r).c_str()); + } + return r; + } + + *mirror_uuid = std::string(mirror_uuid_bl.c_str(), mirror_uuid_bl.length()); + return 0; +} + int read_peers(cls_method_context_t hctx, std::vector *peers) { std::string last_read = PEER_KEY_PREFIX; @@ -3115,6 +3141,67 @@ int image_remove(cls_method_context_t hctx, const string &image_id) { } // namespace 
mirror +/** + * Input: + * none + * + * Output: + * @param uuid (std::string) + * @returns 0 on success, negative error code on failure + */ +int mirror_uuid_get(cls_method_context_t hctx, bufferlist *in, + bufferlist *out) { + std::string mirror_uuid; + int r = mirror::uuid_get(hctx, &mirror_uuid); + if (r < 0) { + return r; + } + + ::encode(mirror_uuid, *out); + return 0; +} + +/** + * Input: + * @param mirror_uuid (std::string) + * + * Output: + * @returns 0 on success, negative error code on failure + */ +int mirror_uuid_set(cls_method_context_t hctx, bufferlist *in, + bufferlist *out) { + std::string mirror_uuid; + try { + bufferlist::iterator bl_it = in->begin(); + ::decode(mirror_uuid, bl_it); + } catch (const buffer::error &err) { + return -EINVAL; + } + + if (mirror_uuid.empty()) { + CLS_ERR("cannot set empty mirror uuid"); + return -EINVAL; + } + + uint32_t mirror_mode; + int r = read_key(hctx, mirror::MODE, &mirror_mode); + if (r < 0 && r != -ENOENT) { + return r; + } else if (r == 0 && mirror_mode != cls::rbd::MIRROR_MODE_DISABLED) { + CLS_ERR("cannot set mirror uuid while mirroring enabled"); + return -EINVAL; + } + + bufferlist mirror_uuid_bl; + mirror_uuid_bl.append(mirror_uuid); + r = cls_cxx_map_set_val(hctx, mirror::UUID, &mirror_uuid_bl); + if (r < 0) { + CLS_ERR("failed to set mirror uuid"); + return r; + } + return 0; +} + /** * Input: * none @@ -3168,6 +3255,14 @@ int mirror_mode_set(cls_method_context_t hctx, bufferlist *in, int r; if (enabled) { + std::string mirror_uuid; + r = mirror::uuid_get(hctx, &mirror_uuid); + if (r == -ENOENT) { + return -EINVAL; + } else if (r < 0) { + return r; + } + bufferlist bl; ::encode(mirror_mode_decode, bl); @@ -3188,9 +3283,13 @@ int mirror_mode_set(cls_method_context_t hctx, bufferlist *in, return -EBUSY; } - r = cls_cxx_map_remove_key(hctx, mirror::MODE); - if (r < 0 && r != -ENOENT) { - CLS_ERR("error disabling mirroring: %s", cpp_strerror(r).c_str()); + r = remove_key(hctx, mirror::MODE); + if (r < 0) { 
+ return r; + } + + r = remove_key(hctx, mirror::UUID); + if (r < 0) { return r; } } @@ -3242,6 +3341,17 @@ int mirror_peer_add(cls_method_context_t hctx, bufferlist *in, mirror_mode_decode == cls::rbd::MIRROR_MODE_DISABLED) { CLS_ERR("mirroring must be enabled on the pool"); return -EINVAL; + } else if (!mirror_peer.is_valid()) { + CLS_ERR("mirror peer is not valid"); + return -EINVAL; + } + + std::string mirror_uuid; + r = mirror::uuid_get(hctx, &mirror_uuid); + if (mirror_peer.uuid == mirror_uuid) { + CLS_ERR("peer uuid '%s' matches pool mirroring uuid", + mirror_uuid.c_str()); + return -EINVAL; } std::vector peers; @@ -3626,6 +3736,11 @@ void __cls_init() old_snapshot_rename, &h_old_snapshot_rename); /* methods for the rbd_mirroring object */ + cls_register_cxx_method(h_class, "mirror_uuid_get", CLS_METHOD_RD, + mirror_uuid_get, &h_mirror_uuid_get); + cls_register_cxx_method(h_class, "mirror_uuid_set", + CLS_METHOD_RD | CLS_METHOD_WR, + mirror_uuid_set, &h_mirror_uuid_set); cls_register_cxx_method(h_class, "mirror_mode_get", CLS_METHOD_RD, mirror_mode_get, &h_mirror_mode_get); cls_register_cxx_method(h_class, "mirror_mode_set", diff --git a/src/cls/rbd/cls_rbd_client.cc b/src/cls/rbd/cls_rbd_client.cc index d88e4e4628f..30b872b4d8c 100644 --- a/src/cls/rbd/cls_rbd_client.cc +++ b/src/cls/rbd/cls_rbd_client.cc @@ -980,6 +980,37 @@ namespace librbd { return 0; } + int mirror_uuid_get(librados::IoCtx *ioctx, std::string *uuid) { + bufferlist in_bl; + bufferlist out_bl; + int r = ioctx->exec(RBD_MIRRORING, "rbd", "mirror_uuid_get", in_bl, + out_bl); + if (r < 0) { + return r; + } + + try { + bufferlist::iterator bl_it = out_bl.begin(); + ::decode(*uuid, bl_it); + } catch (const buffer::error &err) { + return -EBADMSG; + } + return 0; + } + + int mirror_uuid_set(librados::IoCtx *ioctx, const std::string &uuid) { + bufferlist in_bl; + ::encode(uuid, in_bl); + + bufferlist out_bl; + int r = ioctx->exec(RBD_MIRRORING, "rbd", "mirror_uuid_set", in_bl, + out_bl); + if (r 
< 0) { + return r; + } + return 0; + } + int mirror_mode_get(librados::IoCtx *ioctx, cls::rbd::MirrorMode *mirror_mode) { bufferlist in_bl; diff --git a/src/cls/rbd/cls_rbd_client.h b/src/cls/rbd/cls_rbd_client.h index c502c82bd60..3248f78998c 100644 --- a/src/cls/rbd/cls_rbd_client.h +++ b/src/cls/rbd/cls_rbd_client.h @@ -207,6 +207,8 @@ namespace librbd { ::SnapContext *snapc); // operations on the rbd_mirroring object + int mirror_uuid_get(librados::IoCtx *ioctx, std::string *uuid); + int mirror_uuid_set(librados::IoCtx *ioctx, const std::string &uuid); int mirror_mode_get(librados::IoCtx *ioctx, cls::rbd::MirrorMode *mirror_mode); int mirror_mode_set(librados::IoCtx *ioctx, diff --git a/src/cls/rbd/cls_rbd_types.h b/src/cls/rbd/cls_rbd_types.h index b0d5c459c9c..e3189146bd9 100644 --- a/src/cls/rbd/cls_rbd_types.h +++ b/src/cls/rbd/cls_rbd_types.h @@ -35,6 +35,10 @@ struct MirrorPeer { std::string client_name; int64_t pool_id = -1; + inline bool is_valid() const { + return (!uuid.empty() && !cluster_name.empty() && !client_name.empty()); + } + void encode(bufferlist &bl) const; void decode(bufferlist::iterator &it); void dump(Formatter *f) const; diff --git a/src/journal/JournalMetadata.cc b/src/journal/JournalMetadata.cc index 5652ba6f5ce..4d81122c1c4 100644 --- a/src/journal/JournalMetadata.cc +++ b/src/journal/JournalMetadata.cc @@ -591,7 +591,9 @@ void JournalMetadata::schedule_commit_task() { void JournalMetadata::handle_commit_position_task() { assert(m_timer_lock.is_locked()); assert(m_lock.is_locked()); - ldout(m_cct, 20) << __func__ << dendl; + ldout(m_cct, 20) << __func__ << ": " + << "client_id=" << m_client_id << ", " + << "commit_position=" << m_commit_position << dendl; librados::ObjectWriteOperation op; client::client_commit(&op, m_client_id, m_commit_position); diff --git a/src/journal/JournalPlayer.cc b/src/journal/JournalPlayer.cc index 0478cec91be..b05fead55a8 100644 --- a/src/journal/JournalPlayer.cc +++ b/src/journal/JournalPlayer.cc @@ 
-68,7 +68,9 @@ JournalPlayer::JournalPlayer(librados::IoCtx &ioctx, // start replay after the last committed entry's object uint8_t splay_width = m_journal_metadata->get_splay_width(); - m_commit_object = commit_position.object_positions.front().object_number; + auto &active_position = commit_position.object_positions.front(); + m_active_tag_tid = active_position.tag_tid; + m_commit_object = active_position.object_number; m_splay_offset = m_commit_object % splay_width; for (auto &position : commit_position.object_positions) { uint8_t splay_offset = position.object_number % splay_width; @@ -79,6 +81,10 @@ JournalPlayer::JournalPlayer(librados::IoCtx &ioctx, JournalPlayer::~JournalPlayer() { m_async_op_tracker.wait_for_ops(); + { + Mutex::Locker locker(m_lock); + assert(m_fetch_object_numbers.empty()); + } m_replay_handler->put(); } @@ -134,13 +140,13 @@ void JournalPlayer::prefetch_and_watch(double interval) { } void JournalPlayer::unwatch() { + ldout(m_cct, 20) << __func__ << dendl; Mutex::Locker locker(m_lock); m_watch_enabled = false; if (m_watch_scheduled) { - ObjectPlayerPtr object_player = get_object_player(); - assert(object_player); - - object_player->unwatch(); + for (auto &players : m_object_players) { + players.second.begin()->second->unwatch(); + } m_watch_scheduled = false; } } @@ -148,56 +154,51 @@ void JournalPlayer::unwatch() { bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) { ldout(m_cct, 20) << __func__ << dendl; Mutex::Locker locker(m_lock); + + m_handler_notified = false; if (m_state != STATE_PLAYBACK) { return false; } - ObjectPlayerPtr object_player = get_object_player(); - assert(object_player); + if (!is_object_set_ready()) { + return false; + } - if (object_player->empty()) { - if (m_watch_enabled && !m_watch_scheduled) { - object_player->watch( - new C_Watch(this, object_player->get_object_number()), - m_watch_interval); - m_watch_scheduled = true; - } else if (!m_watch_enabled && 
!object_player->is_fetch_in_progress()) { - ldout(m_cct, 10) << __func__ << ": replay complete" << dendl; - m_journal_metadata->get_finisher().queue(new C_HandleComplete( - m_replay_handler), 0); + if (!verify_playback_ready()) { + if (!m_watch_enabled) { + notify_complete(0); + } else if (!m_watch_scheduled) { + schedule_watch(); } return false; } + ObjectPlayerPtr object_player = get_object_player(); + assert(object_player && !object_player->empty()); + object_player->front(entry); object_player->pop_front(); uint64_t last_entry_tid; - if (m_journal_metadata->get_last_allocated_entry_tid( - entry->get_tag_tid(), &last_entry_tid) && - entry->get_entry_tid() != last_entry_tid + 1) { + if (m_active_tag_tid && *m_active_tag_tid != entry->get_tag_tid()) { + lderr(m_cct) << "unexpected tag in journal entry: " << *entry << dendl; + + m_state = STATE_ERROR; + notify_complete(-ENOMSG); + return false; + } else if (m_journal_metadata->get_last_allocated_entry_tid( + entry->get_tag_tid(), &last_entry_tid) && + entry->get_entry_tid() != last_entry_tid + 1) { lderr(m_cct) << "missing prior journal entry: " << *entry << dendl; m_state = STATE_ERROR; - m_journal_metadata->get_finisher().queue(new C_HandleComplete( - m_replay_handler), -ENOMSG); + notify_complete(-ENOMSG); return false; } - // skip to next splay offset if we cannot apply the next entry in-sequence - if (!object_player->empty()) { - Entry peek_entry; - object_player->front(&peek_entry); - if (peek_entry.get_tag_tid() == entry->get_tag_tid() || - (m_journal_metadata->get_last_allocated_entry_tid( - peek_entry.get_tag_tid(), &last_entry_tid) && - last_entry_tid + 1 != peek_entry.get_entry_tid())) { - advance_splay_object(); - } - } else { - advance_splay_object(); - remove_empty_object_player(object_player); - } + m_active_tag_tid = entry->get_tag_tid(); + advance_splay_object(); + remove_empty_object_player(object_player); m_journal_metadata->reserve_entry_tid(entry->get_tag_tid(), entry->get_entry_tid()); @@ 
-210,8 +211,9 @@ bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) { void JournalPlayer::process_state(uint64_t object_number, int r) { ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << ", " << "r=" << r << dendl; + + assert(m_lock.is_locked()); if (r >= 0) { - Mutex::Locker locker(m_lock); switch (m_state) { case STATE_PREFETCH: ldout(m_cct, 10) << "PREFETCH" << dendl; @@ -232,11 +234,8 @@ void JournalPlayer::process_state(uint64_t object_number, int r) { } if (r < 0) { - { - Mutex::Locker locker(m_lock); - m_state = STATE_ERROR; - } - m_replay_handler->handle_complete(r); + m_state = STATE_ERROR; + notify_complete(r); } } @@ -258,7 +257,8 @@ int JournalPlayer::process_prefetch(uint64_t object_number) { ObjectPlayers &object_players = m_object_players[splay_offset]; // prefetch in-order since a newer splay object could prefetch first - while (!object_players.begin()->second->is_fetch_in_progress()) { + while (m_fetch_object_numbers.count( + object_players.begin()->second->get_object_number()) == 0) { ObjectPlayerPtr object_player = object_players.begin()->second; uint64_t player_object_number = object_player->get_object_number(); @@ -328,21 +328,16 @@ int JournalPlayer::process_prefetch(uint64_t object_number) { } m_state = STATE_PLAYBACK; - ObjectPlayerPtr object_player = get_object_player(); - if (!object_player->empty()) { - ldout(m_cct, 10) << __func__ << ": entries available" << dendl; - m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable( - m_replay_handler), 0); + if (!is_object_set_ready()) { + ldout(m_cct, 10) << __func__ << ": waiting for full object set" << dendl; + } else if (verify_playback_ready()) { + notify_entries_available(); } else if (m_watch_enabled) { - object_player->watch( - new C_Watch(this, object_player->get_object_number()), - m_watch_interval); - m_watch_scheduled = true; + schedule_watch(); } else { ldout(m_cct, 10) << __func__ << ": no uncommitted entries available" << dendl; - 
m_journal_metadata->get_finisher().queue(new C_HandleComplete( - m_replay_handler), 0); + notify_complete(0); } return 0; } @@ -351,26 +346,74 @@ int JournalPlayer::process_playback(uint64_t object_number) { ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << dendl; assert(m_lock.is_locked()); - m_watch_scheduled = false; + if (!is_object_set_ready()) { + return 0; + } ObjectPlayerPtr object_player = get_object_player(); - if (object_player->get_object_number() == object_number) { + if (verify_playback_ready()) { + notify_entries_available(); + } else if (!m_watch_enabled && is_object_set_ready()) { uint8_t splay_width = m_journal_metadata->get_splay_width(); uint64_t active_set = m_journal_metadata->get_active_set(); uint64_t object_set = object_player->get_object_number() / splay_width; - if (!object_player->empty()) { - ldout(m_cct, 10) << __func__ << ": entries available" << dendl; - m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable( - m_replay_handler), 0); - } else if (object_set == active_set) { - ldout(m_cct, 10) << __func__ << ": replay complete" << dendl; - m_journal_metadata->get_finisher().queue(new C_HandleComplete( - m_replay_handler), 0); + if (object_set == active_set) { + notify_complete(0); } } return 0; } +bool JournalPlayer::is_object_set_ready() const { + assert(m_lock.is_locked()); + if (m_watch_scheduled || !m_fetch_object_numbers.empty()) { + return false; + } + return true; +} + +bool JournalPlayer::verify_playback_ready() { + assert(m_lock.is_locked()); + assert(is_object_set_ready()); + + ObjectPlayerPtr object_player = get_object_player(); + assert(object_player); + + // Verify is the active object player has another entry available + // in the sequence + Entry entry; + bool entry_available = false; + if (!object_player->empty()) { + entry_available = true; + object_player->front(&entry); + if (!m_active_tag_tid || entry.get_tag_tid() == *m_active_tag_tid) { + return true; + } + } + + // NOTE: replay 
currently does not check tag class to playback multiple tags + // from different classes (issue #14909). When a new tag is discovered, it + // is assumed that the previous tag was closed at the last replayable entry. + object_player = m_object_players.begin()->second.begin()->second; + if (!object_player->empty() && m_active_tag_tid) { + object_player->front(&entry); + if (entry.get_tag_tid() > *m_active_tag_tid && + entry.get_entry_tid() == 0) { + uint8_t splay_width = m_journal_metadata->get_splay_width(); + m_active_tag_tid = entry.get_tag_tid(); + m_splay_offset = object_player->get_object_number() / splay_width; + + ldout(m_cct, 20) << __func__ << ": new tag " << entry.get_tag_tid() << " " + << "detected, adjusting offset to " + << static_cast(m_splay_offset) << dendl; + return true; + } + } + + // if any entry is available, we can test if the sequence is corrupt + return entry_available; +} + const JournalPlayer::ObjectPlayers &JournalPlayer::get_object_players() const { assert(m_lock.is_locked()); @@ -405,6 +448,7 @@ void JournalPlayer::advance_splay_object() { bool JournalPlayer::remove_empty_object_player(const ObjectPlayerPtr &player) { assert(m_lock.is_locked()); + assert(!m_watch_scheduled); uint8_t splay_width = m_journal_metadata->get_splay_width(); uint64_t object_set = player->get_object_number() / splay_width; @@ -432,6 +476,9 @@ void JournalPlayer::fetch(uint64_t object_num) { std::string oid = utils::get_object_name(m_object_oid_prefix, object_num); + assert(m_fetch_object_numbers.count(object_num) == 0); + m_fetch_object_numbers.insert(object_num); + ldout(m_cct, 10) << __func__ << ": " << oid << dendl; C_Fetch *fetch_ctx = new C_Fetch(this, object_num); ObjectPlayerPtr object_player(new ObjectPlayer( @@ -447,11 +494,15 @@ void JournalPlayer::handle_fetched(uint64_t object_num, int r) { ldout(m_cct, 10) << __func__ << ": " << utils::get_object_name(m_object_oid_prefix, object_num) << ": r=" << r << dendl; + + Mutex::Locker locker(m_lock); + 
assert(m_fetch_object_numbers.count(object_num) == 1); + m_fetch_object_numbers.erase(object_num); + if (r == -ENOENT) { r = 0; } if (r == 0) { - Mutex::Locker locker(m_lock); uint8_t splay_width = m_journal_metadata->get_splay_width(); uint8_t splay_offset = object_num % splay_width; assert(m_object_players.count(splay_offset) == 1); @@ -461,15 +512,67 @@ void JournalPlayer::handle_fetched(uint64_t object_num, int r) { ObjectPlayerPtr object_player = object_players[object_num]; remove_empty_object_player(object_player); } - process_state(object_num, r); } -void JournalPlayer::handle_watch(uint64_t object_num, int r) { - ldout(m_cct, 10) << __func__ << ": " - << utils::get_object_name(m_object_oid_prefix, object_num) - << ": r=" << r << dendl; - process_state(object_num, r); +void JournalPlayer::schedule_watch() { + ldout(m_cct, 10) << __func__ << dendl; + assert(m_lock.is_locked()); + if (m_watch_scheduled) { + return; + } + + // poll first splay offset and active splay offset since + // new records should only appear in those two objects + C_Watch *ctx = new C_Watch(this); + ObjectPlayerPtr object_player = get_object_player(); + object_player->watch(ctx, m_watch_interval); + + uint8_t splay_width = m_journal_metadata->get_splay_width(); + if (object_player->get_object_number() % splay_width != 0) { + ++ctx->pending_fetches; + + object_player = m_object_players.begin()->second.begin()->second; + object_player->watch(ctx, m_watch_interval); + } + m_watch_scheduled = true; +} + +void JournalPlayer::handle_watch(int r) { + ldout(m_cct, 10) << __func__ << ": r=" << r << dendl; + + Mutex::Locker locker(m_lock); + m_watch_scheduled = false; + std::set object_numbers; + for (auto &players : m_object_players) { + object_numbers.insert( + players.second.begin()->second->get_object_number()); + } + + for (auto object_num : object_numbers) { + process_state(object_num, r); + } +} + +void JournalPlayer::notify_entries_available() { + assert(m_lock.is_locked()); + if 
(m_handler_notified) { + return; + } + m_handler_notified = true; + + ldout(m_cct, 10) << __func__ << ": entries available" << dendl; + m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable( + m_replay_handler), 0); +} + +void JournalPlayer::notify_complete(int r) { + assert(m_lock.is_locked()); + m_handler_notified = true; + + ldout(m_cct, 10) << __func__ << ": replay complete: r=" << r << dendl; + m_journal_metadata->get_finisher().queue(new C_HandleComplete( + m_replay_handler), r); } } // namespace journal diff --git a/src/journal/JournalPlayer.h b/src/journal/JournalPlayer.h index a07728ec0a2..902a1959d95 100644 --- a/src/journal/JournalPlayer.h +++ b/src/journal/JournalPlayer.h @@ -12,6 +12,8 @@ #include "journal/JournalMetadata.h" #include "journal/ObjectPlayer.h" #include "cls/journal/cls_journal_types.h" +#include +#include #include class SafeTimer; @@ -43,6 +45,7 @@ private: typedef std::map ObjectPlayers; typedef std::map SplayedObjectPlayers; typedef std::map SplayedObjectPositions; + typedef std::set ObjectNumbers; enum State { STATE_INIT, @@ -51,17 +54,6 @@ private: STATE_ERROR }; - struct C_Watch : public Context { - JournalPlayer *player; - uint64_t object_num; - - C_Watch(JournalPlayer *p, uint64_t o) : player(p), object_num(o) { - } - virtual void finish(int r) { - player->handle_watch(object_num, r); - } - }; - struct C_Fetch : public Context { JournalPlayer *player; uint64_t object_num; @@ -76,6 +68,34 @@ private: } }; + struct C_Watch : public Context { + JournalPlayer *player; + uint8_t pending_fetches = 1; + int ret_val = 0; + + C_Watch(JournalPlayer *player) : player(player) { + } + + virtual void complete(int r) override { + player->m_lock.Lock(); + if (ret_val == 0 && r < 0) { + ret_val = r; + } + + assert(pending_fetches > 0); + if (--pending_fetches == 0) { + player->m_lock.Unlock(); + Context::complete(ret_val); + } else { + player->m_lock.Unlock(); + } + } + + virtual void finish(int r) override { + 
player->handle_watch(r); + } + }; + librados::IoCtx m_ioctx; CephContext *m_cct; std::string m_object_oid_prefix; @@ -93,13 +113,20 @@ private: bool m_watch_scheduled; double m_watch_interval; + bool m_handler_notified = false; + + ObjectNumbers m_fetch_object_numbers; + PrefetchSplayOffsets m_prefetch_splay_offsets; SplayedObjectPlayers m_object_players; uint64_t m_commit_object; SplayedObjectPositions m_commit_positions; + boost::optional m_active_tag_tid = boost::none; void advance_splay_object(); + bool is_object_set_ready() const; + bool verify_playback_ready(); const ObjectPlayers &get_object_players() const; ObjectPlayerPtr get_object_player() const; ObjectPlayerPtr get_next_set_object_player() const; @@ -111,7 +138,12 @@ private: void fetch(uint64_t object_num); void handle_fetched(uint64_t object_num, int r); - void handle_watch(uint64_t object_num, int r); + + void schedule_watch(); + void handle_watch(int r); + + void notify_entries_available(); + void notify_complete(int r); }; } // namespace journal diff --git a/src/journal/Journaler.cc b/src/journal/Journaler.cc index a6220ef1012..f81e6f61162 100644 --- a/src/journal/Journaler.cc +++ b/src/journal/Journaler.cc @@ -199,6 +199,20 @@ void Journaler::unregister_client(Context *on_finish) { return m_metadata->unregister_client(on_finish); } +int Journaler::get_cached_client(const std::string &client_id, + cls::journal::Client *client) { + RegisteredClients clients; + m_metadata->get_registered_clients(&clients); + + auto it = clients.find({client_id, {}}); + if (it == clients.end()) { + return -ENOENT; + } + + *client = *it; + return 0; +} + void Journaler::allocate_tag(const bufferlist &data, cls::journal::Tag *tag, Context *on_finish) { m_metadata->allocate_tag(cls::journal::Tag::TAG_CLASS_NEW, data, tag, diff --git a/src/journal/Journaler.h b/src/journal/Journaler.h index 2e9ba85bb20..2ce8acd7692 100644 --- a/src/journal/Journaler.h +++ b/src/journal/Journaler.h @@ -52,11 +52,16 @@ public: 
RegisteredClients *clients, Context *on_finish); int register_client(const bufferlist &data); - int unregister_client(); void register_client(const bufferlist &data, Context *on_finish); - void update_client(const bufferlist &data, Context *on_finish); + + int unregister_client(); void unregister_client(Context *on_finish); + void update_client(const bufferlist &data, Context *on_finish); + + int get_cached_client(const std::string &client_id, + cls::journal::Client *client); + void flush_commit_position(Context *on_safe); void allocate_tag(const bufferlist &data, cls::journal::Tag *tag, diff --git a/src/journal/ObjectPlayer.cc b/src/journal/ObjectPlayer.cc index 2aca0d73b47..e890dfa2d90 100644 --- a/src/journal/ObjectPlayer.cc +++ b/src/journal/ObjectPlayer.cc @@ -207,11 +207,10 @@ void ObjectPlayer::handle_watch_fetched(int r) { Mutex::Locker timer_locker(m_timer_lock); assert(m_watch_in_progress); if (r == -ENOENT) { - schedule_watch(); - } else { - on_finish = m_watch_ctx; - m_watch_ctx = NULL; + r = 0; } + on_finish = m_watch_ctx; + m_watch_ctx = NULL; } if (on_finish != NULL) { diff --git a/src/journal/ObjectPlayer.h b/src/journal/ObjectPlayer.h index c63e2f7a252..f68ee3741d8 100644 --- a/src/journal/ObjectPlayer.h +++ b/src/journal/ObjectPlayer.h @@ -46,11 +46,6 @@ public: void watch(Context *on_fetch, double interval); void unwatch(); - inline bool is_fetch_in_progress() const { - Mutex::Locker locker(m_lock); - return m_fetch_in_progress; - } - void front(Entry *entry) const; void pop_front(); inline bool empty() const { diff --git a/src/librbd/Journal.cc b/src/librbd/Journal.cc index c07ba02b9f7..871abcf0a24 100644 --- a/src/librbd/Journal.cc +++ b/src/librbd/Journal.cc @@ -8,8 +8,8 @@ #include "librbd/ExclusiveLock.h" #include "librbd/ImageCtx.h" #include "librbd/journal/Replay.h" -#include "librbd/journal/Types.h" #include "librbd/Utils.h" +#include "cls/journal/cls_journal_types.h" #include "journal/Journaler.h" #include "journal/ReplayEntry.h" 
#include "common/errno.h" @@ -20,9 +20,128 @@ namespace librbd { +namespace { + +struct C_DecodeTag : public Context { + CephContext *cct; + Mutex *lock; + uint64_t *tag_tid; + journal::TagData *tag_data; + Context *on_finish; + + cls::journal::Tag tag; + + C_DecodeTag(CephContext *cct, Mutex *lock, uint64_t *tag_tid, + journal::TagData *tag_data, Context *on_finish) + : cct(cct), lock(lock), tag_tid(tag_tid), tag_data(tag_data), + on_finish(on_finish) { + } + + virtual void complete(int r) override { + on_finish->complete(process(r)); + Context::complete(0); + } + virtual void finish(int r) override { + } + + int process(int r) { + if (r < 0) { + lderr(cct) << "failed to allocate tag: " << cpp_strerror(r) << dendl; + return r; + } + + Mutex::Locker locker(*lock); + *tag_tid = tag.tid; + + bufferlist::iterator data_it = tag.data.begin(); + r = decode(&data_it, tag_data); + if (r < 0) { + lderr(cct) << "failed to decode allocated tag" << dendl; + return r; + } + + ldout(cct, 20) << "allocated journal tag: " + << "tid=" << tag.tid << ", " + << "data=" << *tag_data << dendl; + return 0; + } + + static int decode(bufferlist::iterator *it, + journal::TagData *tag_data) { + try { + ::decode(*tag_data, *it); + } catch (const buffer::error &err) { + return -EBADMSG; + } + return 0; + } + +}; + +struct C_DecodeTags : public Context { + CephContext *cct; + Mutex *lock; + uint64_t *tag_tid; + journal::TagData *tag_data; + Context *on_finish; + + ::journal::Journaler::Tags tags; + + C_DecodeTags(CephContext *cct, Mutex *lock, uint64_t *tag_tid, + journal::TagData *tag_data, Context *on_finish) + : cct(cct), lock(lock), tag_tid(tag_tid), tag_data(tag_data), + on_finish(on_finish) { + } + + virtual void complete(int r) { + on_finish->complete(process(r)); + Context::complete(0); + } + virtual void finish(int r) override { + } + + int process(int r) { + if (r < 0) { + lderr(cct) << "failed to retrieve journal tags: " << cpp_strerror(r) + << dendl; + return r; + } + + if 
(tags.empty()) { + lderr(cct) << "no journal tags retrieved" << dendl; + return -ENOENT; + } + + Mutex::Locker locker(*lock); + *tag_tid = tags.back().tid; + + bufferlist::iterator data_it = tags.back().data.begin(); + r = C_DecodeTag::decode(&data_it, tag_data); + if (r < 0) { + lderr(cct) << "failed to decode journal tag" << dendl; + return r; + } + + ldout(cct, 20) << "most recent journal tag: " + << "tid=" << *tag_tid << ", " + << "data=" << *tag_data << dendl; + return 0; + } +}; + +} // anonymous namespace + using util::create_async_context_callback; using util::create_context_callback; +// client id for local image +template +const std::string Journal::IMAGE_CLIENT_ID(""); + +// mirror uuid to use for local images +template +const std::string Journal::LOCAL_MIRROR_UUID(""); + template std::ostream &operator<<(std::ostream &os, const typename Journal::State &state) { @@ -111,7 +230,8 @@ int Journal::create(librados::IoCtx &io_ctx, const std::string &image_id, pool_id = data_io_ctx.get_id(); } - Journaler journaler(io_ctx, image_id, "", cct->_conf->rbd_journal_commit_age); + Journaler journaler(io_ctx, image_id, IMAGE_CLIENT_ID, + cct->_conf->rbd_journal_commit_age); int r = journaler.create(order, splay_width, pool_id); if (r < 0) { @@ -119,16 +239,9 @@ int Journal::create(librados::IoCtx &io_ctx, const std::string &image_id, return r; } - std::string cluster_id; - r = rados.cluster_fsid(&cluster_id); - if (r < 0) { - lderr(cct) << "failed to retrieve cluster id: " << cpp_strerror(r) << dendl; - return r; - } - // create tag class for this image's journal events bufferlist tag_data; - ::encode(journal::TagData{cluster_id, pool_id, image_id}, tag_data); + ::encode(journal::TagData(), tag_data); C_SaferCond tag_ctx; cls::journal::Tag tag; @@ -157,7 +270,8 @@ int Journal::remove(librados::IoCtx &io_ctx, const std::string &image_id) { CephContext *cct = reinterpret_cast(io_ctx.cct()); ldout(cct, 5) << __func__ << ": image=" << image_id << dendl; - Journaler 
journaler(io_ctx, image_id, "", cct->_conf->rbd_journal_commit_age); + Journaler journaler(io_ctx, image_id, IMAGE_CLIENT_ID, + cct->_conf->rbd_journal_commit_age); bool journal_exists; int r = journaler.exists(&journal_exists); @@ -192,7 +306,8 @@ int Journal::reset(librados::IoCtx &io_ctx, const std::string &image_id) { CephContext *cct = reinterpret_cast(io_ctx.cct()); ldout(cct, 5) << __func__ << ": image=" << image_id << dendl; - Journaler journaler(io_ctx, image_id, "", cct->_conf->rbd_journal_commit_age); + Journaler journaler(io_ctx, image_id, IMAGE_CLIENT_ID, + cct->_conf->rbd_journal_commit_age); C_SaferCond cond; journaler.init(&cond); @@ -288,6 +403,38 @@ void Journal::close(Context *on_finish) { wait_for_steady_state(on_finish); } +template +bool Journal::is_tag_owner() const { + return (m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID); +} + +template +void Journal::allocate_tag(const std::string &mirror_uuid, + Context *on_finish) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << ": mirror_uuid=" << mirror_uuid + << dendl; + + Mutex::Locker locker(m_lock); + assert(m_journaler != nullptr && is_tag_owner()); + + // NOTE: currently responsibility of caller to provide local mirror + // uuid constant or remote peer uuid + journal::TagData tag_data; + tag_data.mirror_uuid = mirror_uuid; + + // TODO: inject current commit position into tag data (need updated journaler PR) + tag_data.predecessor_mirror_uuid = m_tag_data.mirror_uuid; + + bufferlist tag_bl; + ::encode(tag_data, tag_bl); + + C_DecodeTag *decode_tag_ctx = new C_DecodeTag(cct, &m_lock, &m_tag_tid, + &m_tag_data, on_finish); + m_journaler->allocate_tag(m_tag_class, tag_bl, &decode_tag_ctx->tag, + decode_tag_ctx); +} + template void Journal::flush_commit_position(Context *on_finish) { CephContext *cct = m_image_ctx.cct; @@ -319,8 +466,7 @@ uint64_t Journal::append_io_event(AioCompletion *aio_comp, tid = ++m_event_tid; assert(tid != 0); - // TODO: use allocated 
tag_id - future = m_journaler->append(0, bl); + future = m_journaler->append(m_tag_tid, bl); m_events[tid] = Event(future, aio_comp, requests, offset, length); } @@ -405,8 +551,7 @@ void Journal::append_op_event(uint64_t op_tid, Mutex::Locker locker(m_lock); assert(m_state == STATE_READY); - // TODO: use allocated tag_id - future = m_journaler->append(0, bl); + future = m_journaler->append(m_tag_tid, bl); // delay committing op event to ensure consistent replay assert(m_op_futures.count(op_tid) == 0); @@ -445,8 +590,7 @@ void Journal::commit_op_event(uint64_t op_tid, int r) { op_start_future = it->second; m_op_futures.erase(it); - // TODO: use allocated tag_id - op_finish_future = m_journaler->append(0, bl); + op_finish_future = m_journaler->append(m_tag_tid, bl); } op_finish_future.flush(new C_OpEventSafe(this, op_tid, op_start_future, @@ -560,8 +704,8 @@ void Journal::create_journaler() { assert(m_journaler == NULL); transition_state(STATE_INITIALIZING, 0); - m_journaler = new Journaler(m_image_ctx.md_ctx, m_image_ctx.id, "", - m_image_ctx.journal_commit_age); + m_journaler = new Journaler(m_image_ctx.md_ctx, m_image_ctx.id, + IMAGE_CLIENT_ID, m_image_ctx.journal_commit_age); m_journaler->init(create_async_context_callback( m_image_ctx, create_context_callback< Journal, &Journal::handle_initialized>(this))); @@ -632,15 +776,69 @@ void Journal::handle_initialized(int r) { ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl; Mutex::Locker locker(m_lock); + assert(m_state == STATE_INITIALIZING); if (r < 0) { - lderr(cct) << this << " " << __func__ + lderr(cct) << this << " " << __func__ << ": " << "failed to initialize journal: " << cpp_strerror(r) << dendl; destroy_journaler(r); return; } + // locate the master image client record + cls::journal::Client client; + r = m_journaler->get_cached_client(Journal::IMAGE_CLIENT_ID, + &client); + if (r < 0) { + lderr(cct) << "failed to locate master image client" << dendl; + destroy_journaler(r); + return; + } 
+ + librbd::journal::ClientData client_data; + bufferlist::iterator bl = client.data.begin(); + try { + ::decode(client_data, bl); + } catch (const buffer::error &err) { + lderr(cct) << "failed to decode client meta data: " << err.what() + << dendl; + destroy_journaler(-EINVAL); + return; + } + + librbd::journal::ImageClientMeta *image_client_meta = + boost::get(&client_data.client_meta); + if (image_client_meta == nullptr) { + lderr(cct) << "failed to extract client meta data" << dendl; + destroy_journaler(-EINVAL); + return; + } + + m_tag_class = image_client_meta->tag_class; + ldout(cct, 20) << "client: " << client << ", " + << "image meta: " << *image_client_meta << dendl; + + C_DecodeTags *tags_ctx = new C_DecodeTags( + cct, &m_lock, &m_tag_tid, &m_tag_data, create_async_context_callback( + m_image_ctx, create_context_callback< + Journal, &Journal::handle_get_tags>(this))); + m_journaler->get_tags(m_tag_class, &tags_ctx->tags, tags_ctx); +} + +template +void Journal::handle_get_tags(int r) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl; + + Mutex::Locker locker(m_lock); + assert(m_state == STATE_INITIALIZING); + + if (r < 0) { + destroy_journaler(r); + return; + } + transition_state(STATE_REPLAYING, 0); m_journal_replay = journal::Replay::create(m_image_ctx); m_journaler->start_replay(&m_replay_handler); diff --git a/src/librbd/Journal.h b/src/librbd/Journal.h index a9f376763f0..127569ea740 100644 --- a/src/librbd/Journal.h +++ b/src/librbd/Journal.h @@ -13,6 +13,7 @@ #include "journal/Future.h" #include "journal/ReplayEntry.h" #include "journal/ReplayHandler.h" +#include "librbd/journal/Types.h" #include #include #include @@ -32,7 +33,6 @@ class ImageCtx; namespace journal { -class EventEntry; template class Replay; template @@ -92,6 +92,9 @@ public: STATE_CLOSED }; + static const std::string IMAGE_CLIENT_ID; + static const std::string LOCAL_MIRROR_UUID; + typedef std::list AioObjectRequests; 
Journal(ImageCtxT &image_ctx); @@ -112,6 +115,9 @@ public: void open(Context *on_finish); void close(Context *on_finish); + bool is_tag_owner() const; + void allocate_tag(const std::string &mirror_uuid, Context *on_finish); + void flush_commit_position(Context *on_finish); uint64_t append_io_event(AioCompletion *aio_comp, @@ -241,6 +247,9 @@ private: Journaler *m_journaler; mutable Mutex m_lock; State m_state; + uint64_t m_tag_class = 0; + uint64_t m_tag_tid = 0; + journal::TagData m_tag_data; int m_error_result; Contexts m_wait_for_state_contexts; @@ -268,6 +277,7 @@ private: void complete_event(typename Events::iterator it, int r); void handle_initialized(int r); + void handle_get_tags(int r); void handle_replay_ready(); void handle_replay_complete(int r); diff --git a/src/librbd/exclusive_lock/AcquireRequest.cc b/src/librbd/exclusive_lock/AcquireRequest.cc index e46714778a3..d973bf21fc6 100644 --- a/src/librbd/exclusive_lock/AcquireRequest.cc +++ b/src/librbd/exclusive_lock/AcquireRequest.cc @@ -166,12 +166,64 @@ Context *AcquireRequest::handle_open_journal(int *ret_val) { if (*ret_val < 0) { lderr(cct) << "failed to open journal: " << cpp_strerror(*ret_val) << dendl; m_error_result = *ret_val; - return send_close_object_map(); + send_close_journal(); + return nullptr; } + send_allocate_journal_tag(); + return nullptr; +} + +template +void AcquireRequest::send_allocate_journal_tag() { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 10) << __func__ << dendl; + + if (!m_journal->is_tag_owner()) { + lderr(cct) << "local image not promoted" << dendl; + m_error_result = -EPERM; + send_close_journal(); + return; + } + + using klass = AcquireRequest; + Context *ctx = create_context_callback< + klass, &klass::handle_allocate_journal_tag>(this); + m_journal->allocate_tag(Journal::LOCAL_MIRROR_UUID, ctx); +} + +template +Context *AcquireRequest::handle_allocate_journal_tag(int *ret_val) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 10) << __func__ << ": r=" << 
*ret_val << dendl; + + if (*ret_val < 0) { + m_error_result = *ret_val; + send_close_journal(); + return nullptr; + } return m_on_finish; } +template +void AcquireRequest::send_close_journal() { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 10) << __func__ << dendl; + + using klass = AcquireRequest; + Context *ctx = create_context_callback( + this); + m_journal->close(ctx); +} + +template +Context *AcquireRequest::handle_close_journal(int *ret_val) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 10) << __func__ << ": r=" << *ret_val << dendl; + + return send_close_object_map(ret_val); +} + template Context *AcquireRequest::send_open_object_map() { if (!m_image_ctx.test_features(RBD_FEATURE_OBJECT_MAP)) { @@ -201,9 +253,9 @@ Context *AcquireRequest::handle_open_object_map(int *ret_val) { } template -Context *AcquireRequest::send_close_object_map() { +Context *AcquireRequest::send_close_object_map(int *ret_val) { if (m_object_map == nullptr) { - revert(); + revert(ret_val); return m_on_finish; } @@ -224,11 +276,7 @@ Context *AcquireRequest::handle_close_object_map(int *ret_val) { // object map should never result in an error assert(*ret_val == 0); - - assert(m_error_result < 0); - *ret_val = m_error_result; - - revert(); + revert(ret_val); return m_on_finish; } @@ -439,13 +487,16 @@ void AcquireRequest::apply() { } template -void AcquireRequest::revert() { +void AcquireRequest::revert(int *ret_val) { RWLock::WLocker snap_locker(m_image_ctx.snap_lock); m_image_ctx.object_map = nullptr; m_image_ctx.journal = nullptr; delete m_object_map; delete m_journal; + + assert(m_error_result < 0); + *ret_val = m_error_result; } } // namespace exclusive_lock diff --git a/src/librbd/exclusive_lock/AcquireRequest.h b/src/librbd/exclusive_lock/AcquireRequest.h index dc03829af4f..2c4e05b86ce 100644 --- a/src/librbd/exclusive_lock/AcquireRequest.h +++ b/src/librbd/exclusive_lock/AcquireRequest.h @@ -39,25 +39,33 @@ private: * v * FLUSH_NOTIFIES * | - * | 
/---------------------------------------------------------\ - * | | | - * | | (no lockers) | - * | | . . . . . . . . . . . . . . . . . . . . . | - * | | . . | - * | v v (EBUSY) . | - * \--> LOCK_IMAGE * * * * * * * > GET_LOCKERS . . . . | - * . | | | - * . . . . | | | - * . v v | - * . OPEN_OBJECT_MAP GET_WATCHERS . . . | - * . | | . | - * . v v . | - * . . > OPEN_JOURNAL * * BLACKLIST . (blacklist | - * . | * | . disabled) | - * . | v v . | - * . | CLOSE_OBJECT_MAP BREAK_LOCK < . . . | - * . v | | | - * . . > <-----/ \-----------------------------/ + * | /-----------------------------------------------------------\ + * | | | + * | | (no lockers) | + * | | . . . . . . . . . . . . . . . . . . . . . . | + * | | . . | + * | v v (EBUSY) . | + * \--> LOCK_IMAGE * * * * * * * > GET_LOCKERS . . . . | + * . | | | + * . . . . | | | + * . v v | + * . OPEN_OBJECT_MAP GET_WATCHERS . . . | + * . | | . | + * . v v . | + * . . > OPEN_JOURNAL * * * * * * BLACKLIST . (blacklist | + * . | * | . disabled) | + * . v * v . | + * . ALLOCATE_JOURNAL_TAG * BREAK_LOCK < . . . | + * . | * * | | + * . | * * \-----------------------------/ + * . | v v + * . | CLOSE_JOURNAL + * . | | + * . | v + * . | CLOSE_OBJECT_MAP + * . | | + * . v | + * . . 
> <----------/ * * @endverbatim */ @@ -94,10 +102,16 @@ private: Context *send_open_journal(); Context *handle_open_journal(int *ret_val); + void send_allocate_journal_tag(); + Context *handle_allocate_journal_tag(int *ret_val); + + void send_close_journal(); + Context *handle_close_journal(int *ret_val); + Context *send_open_object_map(); Context *handle_open_object_map(int *ret_val); - Context *send_close_object_map(); + Context *send_close_object_map(int *ret_val); Context *handle_close_object_map(int *ret_val); void send_get_lockers(); @@ -113,7 +127,7 @@ private: Context *handle_break_lock(int *ret_val); void apply(); - void revert(); + void revert(int *ret_val); }; } // namespace exclusive_lock diff --git a/src/librbd/internal.cc b/src/librbd/internal.cc index 06d84833c26..1a2e26b6bc1 100644 --- a/src/librbd/internal.cc +++ b/src/librbd/internal.cc @@ -2332,13 +2332,12 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) { CephContext *cct = reinterpret_cast(io_ctx.cct()); ldout(cct, 20) << __func__ << dendl; - cls::rbd::MirrorMode mirror_mode_internal; + cls::rbd::MirrorMode next_mirror_mode; switch (mirror_mode) { case RBD_MIRROR_MODE_DISABLED: case RBD_MIRROR_MODE_IMAGE: case RBD_MIRROR_MODE_POOL: - mirror_mode_internal = static_cast( - mirror_mode); + next_mirror_mode = static_cast(mirror_mode); break; default: lderr(cct) << "Unknown mirror mode (" @@ -2346,7 +2345,28 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) { return -EINVAL; } - int r = cls_client::mirror_mode_set(&io_ctx, mirror_mode_internal); + cls::rbd::MirrorMode current_mirror_mode; + int r = cls_client::mirror_mode_get(&io_ctx, ¤t_mirror_mode); + if (r < 0) { + lderr(cct) << "Failed to retrieve mirror mode: " << cpp_strerror(r) + << dendl; + return r; + } + + if (current_mirror_mode == next_mirror_mode) { + return 0; + } else if (current_mirror_mode == cls::rbd::MIRROR_MODE_DISABLED) { + uuid_d uuid_gen; + uuid_gen.generate_random(); + r = cls_client::mirror_uuid_set(&io_ctx, 
uuid_gen.to_string()); + if (r < 0) { + lderr(cct) << "Failed to allocate mirroring uuid: " << cpp_strerror(r) + << dendl; + return r; + } + } + + r = cls_client::mirror_mode_set(&io_ctx, next_mirror_mode); if (r < 0) { lderr(cct) << "Failed to set mirror mode: " << cpp_strerror(r) << dendl; return r; diff --git a/src/librbd/journal/Types.cc b/src/librbd/journal/Types.cc index f082e0268c8..8da835eaf5f 100644 --- a/src/librbd/journal/Types.cc +++ b/src/librbd/journal/Types.cc @@ -333,25 +333,49 @@ void ImageClientMeta::dump(Formatter *f) const { f->dump_unsigned("tag_class", tag_class); } -void MirrorPeerClientMeta::encode(bufferlist& bl) const { - ::encode(cluster_id, bl); - ::encode(pool_id, bl); - ::encode(image_id, bl); +void MirrorPeerSyncPoint::encode(bufferlist& bl) const { ::encode(snap_name, bl); + ::encode(object_number, bl); +} + +void MirrorPeerSyncPoint::decode(__u8 version, bufferlist::iterator& it) { + ::decode(snap_name, it); + ::decode(object_number, it); +} + +void MirrorPeerSyncPoint::dump(Formatter *f) const { + f->dump_string("snap_name", snap_name); + if (object_number) { + f->dump_unsigned("object_number", *object_number); + } +} + +void MirrorPeerClientMeta::encode(bufferlist& bl) const { + ::encode(image_id, bl); + ::encode(static_cast(sync_points.size()), bl); + for (auto &sync_point : sync_points) { + sync_point.encode(bl); + } } void MirrorPeerClientMeta::decode(__u8 version, bufferlist::iterator& it) { - ::decode(cluster_id, it); - ::decode(pool_id, it); ::decode(image_id, it); - ::decode(snap_name, it); + + uint32_t sync_point_count; + ::decode(sync_point_count, it); + sync_points.resize(sync_point_count); + for (auto &sync_point : sync_points) { + sync_point.decode(version, it); + } } void MirrorPeerClientMeta::dump(Formatter *f) const { - f->dump_string("cluster_id", cluster_id.c_str()); - f->dump_int("pool_id", pool_id); - f->dump_string("image_id", image_id.c_str()); - f->dump_string("snap_name", snap_name.c_str()); + 
f->dump_string("image_id", image_id); + f->open_array_section("sync_points"); + for (auto &sync_point : sync_points) { + sync_point.dump(f); + } + f->close_section(); } void CliClientMeta::encode(bufferlist& bl) const { @@ -417,46 +441,44 @@ void ClientData::generate_test_instances(std::list &o) { o.push_back(new ClientData(ImageClientMeta())); o.push_back(new ClientData(ImageClientMeta(123))); o.push_back(new ClientData(MirrorPeerClientMeta())); - o.push_back(new ClientData(MirrorPeerClientMeta("cluster_id", 123, "image_id"))); + o.push_back(new ClientData(MirrorPeerClientMeta("image_id", {{"snap 1", 123}}))); o.push_back(new ClientData(CliClientMeta())); } // Journal Tag void TagData::encode(bufferlist& bl) const { - ::encode(cluster_id, bl); - ::encode(pool_id, bl); - ::encode(image_id, bl); + ::encode(mirror_uuid, bl); + ::encode(predecessor_mirror_uuid, bl); + ::encode(predecessor_commit_valid, bl); ::encode(predecessor_tag_tid, bl); ::encode(predecessor_entry_tid, bl); } void TagData::decode(bufferlist::iterator& it) { - ::decode(cluster_id, it); - ::decode(pool_id, it); - ::decode(image_id, it); + ::decode(mirror_uuid, it); + ::decode(predecessor_mirror_uuid, it); + ::decode(predecessor_commit_valid, it); ::decode(predecessor_tag_tid, it); ::decode(predecessor_entry_tid, it); } void TagData::dump(Formatter *f) const { - f->dump_string("cluster_id", cluster_id.c_str()); - f->dump_int("pool_id", pool_id); - f->dump_string("image_id", image_id.c_str()); + f->dump_string("mirror_uuid", mirror_uuid); + f->dump_string("predecessor_mirror_uuid", predecessor_mirror_uuid); + f->dump_string("predecessor_commit_valid", + predecessor_commit_valid ? 
"true" : "false"); f->dump_unsigned("predecessor_tag_tid", predecessor_tag_tid); f->dump_unsigned("predecessor_entry_tid", predecessor_entry_tid); } void TagData::generate_test_instances(std::list &o) { o.push_back(new TagData()); - o.push_back(new TagData("cluster_id", 123, "image_id")); + o.push_back(new TagData("mirror-uuid")); + o.push_back(new TagData("mirror-uuid", "remote-mirror-uuid", true, 123, 234)); } -} // namespace journal -} // namespace librbd - -std::ostream &operator<<(std::ostream &out, - const librbd::journal::EventType &type) { +std::ostream &operator<<(std::ostream &out, const EventType &type) { using namespace librbd::journal; switch (type) { @@ -506,8 +528,7 @@ std::ostream &operator<<(std::ostream &out, return out; } -std::ostream &operator<<(std::ostream &out, - const librbd::journal::ClientMetaType &type) { +std::ostream &operator<<(std::ostream &out, const ClientMetaType &type) { using namespace librbd::journal; switch (type) { @@ -525,5 +546,26 @@ std::ostream &operator<<(std::ostream &out, break; } return out; - } + +std::ostream &operator<<(std::ostream &out, const ImageClientMeta &meta) { + out << "[tag_class=" << meta.tag_class << "]"; + return out; +} + +std::ostream &operator<<(std::ostream &out, const TagData &tag_data) { + out << "[" + << "mirror_uuid=" << tag_data.mirror_uuid << ", " + << "predecessor_mirror_uuid=" << tag_data.predecessor_mirror_uuid; + if (tag_data.predecessor_commit_valid) { + out << ", " + << "predecessor_tag_tid=" << tag_data.predecessor_tag_tid << ", " + << "predecessor_entry_tid=" << tag_data.predecessor_entry_tid; + } + out << "]"; + return out; +} + +} // namespace journal +} // namespace librbd + diff --git a/src/librbd/journal/Types.h b/src/librbd/journal/Types.h index 5e3a5c0e161..252ab776759 100644 --- a/src/librbd/journal/Types.h +++ b/src/librbd/journal/Types.h @@ -9,6 +9,9 @@ #include "include/encoding.h" #include "include/types.h" #include +#include +#include +#include #include namespace ceph { 
@@ -316,21 +319,37 @@ struct ImageClientMeta { void dump(Formatter *f) const; }; +struct MirrorPeerSyncPoint { + typedef boost::optional ObjectNumber; + + std::string snap_name; + ObjectNumber object_number; + + MirrorPeerSyncPoint() : object_number(boost::none) { + } + MirrorPeerSyncPoint(const std::string &snap_name, + const ObjectNumber &object_number) + : snap_name(snap_name), object_number(object_number) { + } + + void encode(bufferlist& bl) const; + void decode(__u8 version, bufferlist::iterator& it); + void dump(Formatter *f) const; +}; + struct MirrorPeerClientMeta { + typedef std::list SyncPoints; + static const ClientMetaType TYPE = MIRROR_PEER_CLIENT_META_TYPE; - std::string cluster_id; - int64_t pool_id = 0; std::string image_id; - std::string snap_name; + SyncPoints sync_points; MirrorPeerClientMeta() { } - MirrorPeerClientMeta(const std::string &cluster_id, int64_t pool_id, - const std::string &image_id, - const std::string &snap_name = "") - : cluster_id(cluster_id), pool_id(pool_id), image_id(image_id), - snap_name(snap_name) { + MirrorPeerClientMeta(const std::string &image_id, + const SyncPoints &sync_points = SyncPoints()) + : image_id(image_id), sync_points(sync_points) { } void encode(bufferlist& bl) const; @@ -380,19 +399,27 @@ struct ClientData { struct TagData { // owner of the tag (exclusive lock epoch) - std::string cluster_id; - int64_t pool_id = 0; - std::string image_id; + std::string mirror_uuid; // empty if local // mapping to last committed record of previous tag + std::string predecessor_mirror_uuid; // empty if local + bool predecessor_commit_valid = false; uint64_t predecessor_tag_tid = 0; uint64_t predecessor_entry_tid = 0; TagData() { } - TagData(const std::string &cluster_id, int64_t pool_id, - const std::string &image_id) - : cluster_id(cluster_id), pool_id(pool_id), image_id(image_id) { + TagData(const std::string &mirror_uuid) : mirror_uuid(mirror_uuid) { + } + TagData(const std::string &mirror_uuid, + const std::string 
&predecessor_mirror_uuid, + bool predecessor_commit_valid, + uint64_t predecessor_tag_tid, uint64_t predecessor_entry_tid) + : mirror_uuid(mirror_uuid), + predecessor_mirror_uuid(predecessor_mirror_uuid), + predecessor_commit_valid(predecessor_commit_valid), + predecessor_tag_tid(predecessor_tag_tid), + predecessor_entry_tid(predecessor_entry_tid) { } void encode(bufferlist& bl) const; @@ -402,14 +429,14 @@ struct TagData { static void generate_test_instances(std::list &o); }; +std::ostream &operator<<(std::ostream &out, const EventType &type); +std::ostream &operator<<(std::ostream &out, const ClientMetaType &type); +std::ostream &operator<<(std::ostream &out, const ImageClientMeta &meta); +std::ostream &operator<<(std::ostream &out, const TagData &tag_data); + } // namespace journal } // namespace librbd -std::ostream &operator<<(std::ostream &out, - const librbd::journal::EventType &type); -std::ostream &operator<<(std::ostream &out, - const librbd::journal::ClientMetaType &type); - WRITE_CLASS_ENCODER(librbd::journal::EventEntry); WRITE_CLASS_ENCODER(librbd::journal::ClientData); WRITE_CLASS_ENCODER(librbd::journal::TagData); diff --git a/src/test/cls_rbd/test_cls_rbd.cc b/src/test/cls_rbd/test_cls_rbd.cc index 8c405685d82..cd8283e5410 100644 --- a/src/test/cls_rbd/test_cls_rbd.cc +++ b/src/test/cls_rbd/test_cls_rbd.cc @@ -1297,20 +1297,31 @@ TEST_F(TestClsRbd, mirror) { std::vector peers; ASSERT_EQ(-ENOENT, mirror_peer_list(&ioctx, &peers)); + std::string uuid; + ASSERT_EQ(-ENOENT, mirror_uuid_get(&ioctx, &uuid)); ASSERT_EQ(-EINVAL, mirror_peer_add(&ioctx, "uuid1", "cluster1", "client")); cls::rbd::MirrorMode mirror_mode; ASSERT_EQ(0, mirror_mode_get(&ioctx, &mirror_mode)); ASSERT_EQ(cls::rbd::MIRROR_MODE_DISABLED, mirror_mode); + ASSERT_EQ(-EINVAL, mirror_mode_set(&ioctx, cls::rbd::MIRROR_MODE_IMAGE)); + ASSERT_EQ(-EINVAL, mirror_uuid_set(&ioctx, "")); + ASSERT_EQ(0, mirror_uuid_set(&ioctx, "mirror-uuid")); + ASSERT_EQ(0, mirror_uuid_get(&ioctx, &uuid)); + 
ASSERT_EQ("mirror-uuid", uuid); + ASSERT_EQ(0, mirror_mode_set(&ioctx, cls::rbd::MIRROR_MODE_IMAGE)); ASSERT_EQ(0, mirror_mode_get(&ioctx, &mirror_mode)); ASSERT_EQ(cls::rbd::MIRROR_MODE_IMAGE, mirror_mode); + ASSERT_EQ(-EINVAL, mirror_uuid_set(&ioctx, "new-mirror-uuid")); + ASSERT_EQ(0, mirror_mode_set(&ioctx, cls::rbd::MIRROR_MODE_POOL)); ASSERT_EQ(0, mirror_mode_get(&ioctx, &mirror_mode)); ASSERT_EQ(cls::rbd::MIRROR_MODE_POOL, mirror_mode); + ASSERT_EQ(-EINVAL, mirror_peer_add(&ioctx, "mirror-uuid", "cluster1", "client")); ASSERT_EQ(0, mirror_peer_add(&ioctx, "uuid1", "cluster1", "client")); ASSERT_EQ(0, mirror_peer_add(&ioctx, "uuid2", "cluster2", "admin")); ASSERT_EQ(-ESTALE, mirror_peer_add(&ioctx, "uuid2", "cluster3", "foo")); @@ -1354,6 +1365,7 @@ TEST_F(TestClsRbd, mirror) { ASSERT_EQ(0, mirror_mode_set(&ioctx, cls::rbd::MIRROR_MODE_DISABLED)); ASSERT_EQ(0, mirror_mode_get(&ioctx, &mirror_mode)); ASSERT_EQ(cls::rbd::MIRROR_MODE_DISABLED, mirror_mode); + ASSERT_EQ(-ENOENT, mirror_uuid_get(&ioctx, &uuid)); } TEST_F(TestClsRbd, mirror_image) { diff --git a/src/test/journal/test_JournalPlayer.cc b/src/test/journal/test_JournalPlayer.cc index 7612b4982f5..4eb5628e59b 100644 --- a/src/test/journal/test_JournalPlayer.cc +++ b/src/test/journal/test_JournalPlayer.cc @@ -249,11 +249,12 @@ TEST_F(TestJournalPlayer, PrefetchMultipleTags) { journal::JournalPlayer::ObjectPositions positions; positions = { - cls::journal::ObjectPosition(0, 234, 122), - cls::journal::ObjectPosition(1, 345, 1)}; + cls::journal::ObjectPosition(2, 234, 122), + cls::journal::ObjectPosition(1, 234, 121), + cls::journal::ObjectPosition(0, 234, 120)}; cls::journal::ObjectSetPosition commit_position(positions); - ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, create(oid, 3)); ASSERT_EQ(0, client_register(oid)); ASSERT_EQ(0, client_commit(oid, commit_position)); @@ -263,13 +264,11 @@ TEST_F(TestJournalPlayer, PrefetchMultipleTags) { journal::JournalPlayer *player = create_player(oid, metadata); 
ASSERT_EQ(0, write_entry(oid, 0, 234, 120)); - ASSERT_EQ(0, write_entry(oid, 0, 345, 0)); ASSERT_EQ(0, write_entry(oid, 1, 234, 121)); - ASSERT_EQ(0, write_entry(oid, 1, 345, 1)); - ASSERT_EQ(0, write_entry(oid, 0, 234, 122)); - ASSERT_EQ(0, write_entry(oid, 1, 234, 123)); - ASSERT_EQ(0, write_entry(oid, 0, 234, 124)); - ASSERT_EQ(0, write_entry(oid, 0, 345, 2)); + ASSERT_EQ(0, write_entry(oid, 2, 234, 122)); + ASSERT_EQ(0, write_entry(oid, 0, 234, 123)); + ASSERT_EQ(0, write_entry(oid, 1, 234, 124)); + ASSERT_EQ(0, write_entry(oid, 0, 236, 0)); // new tag allocated player->prefetch(); @@ -280,8 +279,8 @@ TEST_F(TestJournalPlayer, PrefetchMultipleTags) { uint64_t last_tid; ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid)); ASSERT_EQ(124U, last_tid); - ASSERT_TRUE(metadata->get_last_allocated_entry_tid(345, &last_tid)); - ASSERT_EQ(2U, last_tid); + ASSERT_TRUE(metadata->get_last_allocated_entry_tid(236, &last_tid)); + ASSERT_EQ(0U, last_tid); } TEST_F(TestJournalPlayer, PrefetchCorruptSequence) { @@ -299,13 +298,41 @@ TEST_F(TestJournalPlayer, PrefetchCorruptSequence) { journal::JournalPlayer *player = create_player(oid, metadata); ASSERT_EQ(0, write_entry(oid, 0, 234, 120)); - ASSERT_EQ(0, write_entry(oid, 0, 345, 0)); ASSERT_EQ(0, write_entry(oid, 1, 234, 121)); ASSERT_EQ(0, write_entry(oid, 0, 234, 124)); player->prefetch(); Entries entries; - ASSERT_TRUE(wait_for_entries(player, 3, &entries)); + ASSERT_TRUE(wait_for_entries(player, 2, &entries)); + + journal::Entry entry; + uint64_t commit_tid; + ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid)); + ASSERT_TRUE(wait_for_complete(player)); + ASSERT_EQ(-ENOMSG, m_replay_hander.complete_result); +} + +TEST_F(TestJournalPlayer, PrefetchUnexpectedTag) { + std::string oid = get_temp_oid(); + + cls::journal::ObjectSetPosition commit_position; + + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + ASSERT_EQ(0, client_commit(oid, commit_position)); + + journal::JournalMetadataPtr 
metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + journal::JournalPlayer *player = create_player(oid, metadata); + + ASSERT_EQ(0, write_entry(oid, 0, 234, 120)); + ASSERT_EQ(0, write_entry(oid, 1, 235, 121)); + ASSERT_EQ(0, write_entry(oid, 0, 234, 124)); + + player->prefetch(); + Entries entries; + ASSERT_TRUE(wait_for_entries(player, 1, &entries)); journal::Entry entry; uint64_t commit_tid; diff --git a/src/test/journal/test_ObjectPlayer.cc b/src/test/journal/test_ObjectPlayer.cc index 586f97094a6..6103ee6b67f 100644 --- a/src/test/journal/test_ObjectPlayer.cc +++ b/src/test/journal/test_ObjectPlayer.cc @@ -267,7 +267,7 @@ TEST_F(TestObjectPlayer, Unwatch) { bool done = false; int rval = 0; C_SafeCond *ctx = new C_SafeCond(&mutex, &cond, &done, &rval); - object->watch(ctx, 0.1); + object->watch(ctx, 600); usleep(200000); ASSERT_FALSE(done); diff --git a/src/test/librbd/exclusive_lock/test_mock_AcquireRequest.cc b/src/test/librbd/exclusive_lock/test_mock_AcquireRequest.cc index 60b4c5a5e6a..9d67982f8c9 100644 --- a/src/test/librbd/exclusive_lock/test_mock_AcquireRequest.cc +++ b/src/test/librbd/exclusive_lock/test_mock_AcquireRequest.cc @@ -17,6 +17,7 @@ #include // template definitions +#include "librbd/Journal.cc" #include "librbd/exclusive_lock/AcquireRequest.cc" template class librbd::exclusive_lock::AcquireRequest; @@ -80,6 +81,22 @@ public: .WillOnce(CompleteContext(r, mock_image_ctx.image_ctx->op_work_queue)); } + void expect_close_journal(MockImageCtx &mock_image_ctx, + MockJournal &mock_journal) { + EXPECT_CALL(mock_journal, close(_)) + .WillOnce(CompleteContext(0, mock_image_ctx.image_ctx->op_work_queue)); + } + + void expect_is_journal_tag_owner(MockJournal &mock_journal, bool owner) { + EXPECT_CALL(mock_journal, is_tag_owner()).WillOnce(Return(owner)); + } + + void expect_allocate_journal_tag(MockImageCtx &mock_image_ctx, + MockJournal &mock_journal, int r) { + EXPECT_CALL(mock_journal, allocate_tag("", _)) + 
.WillOnce(WithArg<1>(CompleteContext(r, mock_image_ctx.image_ctx->op_work_queue))); + } + void expect_get_lock_info(MockImageCtx &mock_image_ctx, int r, const entity_name_t &locker_entity, const std::string &locker_address, @@ -171,6 +188,8 @@ TEST_F(TestMockExclusiveLockAcquireRequest, Success) { expect_test_features(mock_image_ctx, RBD_FEATURE_JOURNALING, true); expect_create_journal(mock_image_ctx, &mock_journal); expect_open_journal(mock_image_ctx, mock_journal, 0); + expect_is_journal_tag_owner(mock_journal, true); + expect_allocate_journal_tag(mock_image_ctx, mock_journal, 0); C_SaferCond acquire_ctx; C_SaferCond ctx; @@ -231,6 +250,8 @@ TEST_F(TestMockExclusiveLockAcquireRequest, SuccessObjectMapDisabled) { expect_test_features(mock_image_ctx, RBD_FEATURE_JOURNALING, true); expect_create_journal(mock_image_ctx, &mock_journal); expect_open_journal(mock_image_ctx, mock_journal, 0); + expect_is_journal_tag_owner(mock_journal, true); + expect_allocate_journal_tag(mock_image_ctx, mock_journal, 0); C_SaferCond acquire_ctx; C_SaferCond ctx; @@ -264,6 +285,7 @@ TEST_F(TestMockExclusiveLockAcquireRequest, JournalError) { expect_test_features(mock_image_ctx, RBD_FEATURE_JOURNALING, true); expect_create_journal(mock_image_ctx, mock_journal); expect_open_journal(mock_image_ctx, *mock_journal, -EINVAL); + expect_close_journal(mock_image_ctx, *mock_journal); expect_close_object_map(mock_image_ctx, *mock_object_map); C_SaferCond acquire_ctx; @@ -275,6 +297,77 @@ TEST_F(TestMockExclusiveLockAcquireRequest, JournalError) { ASSERT_EQ(-EINVAL, ctx.wait()); } +TEST_F(TestMockExclusiveLockAcquireRequest, NotJournalTagOwner) { + REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK); + + librbd::ImageCtx *ictx; + ASSERT_EQ(0, open_image(m_image_name, &ictx)); + + MockImageCtx mock_image_ctx(*ictx); + expect_op_work_queue(mock_image_ctx); + + InSequence seq; + expect_flush_notifies(mock_image_ctx); + expect_lock(mock_image_ctx, 0); + + MockObjectMap *mock_object_map = new MockObjectMap(); + 
expect_test_features(mock_image_ctx, RBD_FEATURE_OBJECT_MAP, true); + expect_create_object_map(mock_image_ctx, mock_object_map); + expect_open_object_map(mock_image_ctx, *mock_object_map); + + MockJournal *mock_journal = new MockJournal(); + expect_test_features(mock_image_ctx, RBD_FEATURE_JOURNALING, true); + expect_create_journal(mock_image_ctx, mock_journal); + expect_open_journal(mock_image_ctx, *mock_journal, 0); + expect_is_journal_tag_owner(*mock_journal, false); + expect_close_journal(mock_image_ctx, *mock_journal); + expect_close_object_map(mock_image_ctx, *mock_object_map); + + C_SaferCond acquire_ctx; + C_SaferCond ctx; + MockAcquireRequest *req = MockAcquireRequest::create(mock_image_ctx, + TEST_COOKIE, + &acquire_ctx, &ctx); + req->send(); + ASSERT_EQ(-EPERM, ctx.wait()); +} + +TEST_F(TestMockExclusiveLockAcquireRequest, AllocateJournalTagError) { + REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK); + + librbd::ImageCtx *ictx; + ASSERT_EQ(0, open_image(m_image_name, &ictx)); + + MockImageCtx mock_image_ctx(*ictx); + expect_op_work_queue(mock_image_ctx); + + InSequence seq; + expect_flush_notifies(mock_image_ctx); + expect_lock(mock_image_ctx, 0); + + MockObjectMap *mock_object_map = new MockObjectMap(); + expect_test_features(mock_image_ctx, RBD_FEATURE_OBJECT_MAP, true); + expect_create_object_map(mock_image_ctx, mock_object_map); + expect_open_object_map(mock_image_ctx, *mock_object_map); + + MockJournal *mock_journal = new MockJournal(); + expect_test_features(mock_image_ctx, RBD_FEATURE_JOURNALING, true); + expect_create_journal(mock_image_ctx, mock_journal); + expect_open_journal(mock_image_ctx, *mock_journal, 0); + expect_is_journal_tag_owner(*mock_journal, true); + expect_allocate_journal_tag(mock_image_ctx, *mock_journal, -ESTALE); + expect_close_journal(mock_image_ctx, *mock_journal); + expect_close_object_map(mock_image_ctx, *mock_object_map); + + C_SaferCond acquire_ctx; + C_SaferCond ctx; + MockAcquireRequest *req = 
MockAcquireRequest::create(mock_image_ctx, + TEST_COOKIE, + &acquire_ctx, &ctx); + req->send(); + ASSERT_EQ(-ESTALE, ctx.wait()); +} + TEST_F(TestMockExclusiveLockAcquireRequest, LockBusy) { REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK); diff --git a/src/test/librbd/journal/test_Replay.cc b/src/test/librbd/journal/test_Replay.cc index 7e048a04e55..49fd479a250 100644 --- a/src/test/librbd/journal/test_Replay.cc +++ b/src/test/librbd/journal/test_Replay.cc @@ -44,7 +44,8 @@ public: } } - void get_journal_commit_position(librbd::ImageCtx *ictx, int64_t *tid) + void get_journal_commit_position(librbd::ImageCtx *ictx, int64_t *tag, + int64_t *entry) { const std::string client_id = ""; std::string journal_id = ictx->id; @@ -69,19 +70,16 @@ public: break; } } - if (c == registered_clients.end()) { - *tid = -1; - return; + if (c == registered_clients.end() || + c->commit_position.object_positions.empty()) { + *tag = 0; + *entry = -1; + } else { + const cls::journal::ObjectPosition &object_position = + *c->commit_position.object_positions.begin(); + *tag = object_position.tag_tid; + *entry = object_position.entry_tid; } - cls::journal::ObjectPositions object_positions = - c->commit_position.object_positions; - cls::journal::ObjectPositions::const_iterator p; - for (p = object_positions.begin(); p != object_positions.end(); p++) { - if (p->tag_tid == 0) { - break; - } - } - *tid = p == object_positions.end() ? 
-1 : p->entry_tid; C_SaferCond open_cond; ictx->journal = new librbd::Journal<>(*ictx); @@ -123,8 +121,9 @@ TEST_F(TestJournalReplay, AioDiscardEvent) { ASSERT_EQ(0, when_acquired_lock(ictx)); // get current commit position - int64_t initial; - get_journal_commit_position(ictx, &initial); + int64_t initial_tag; + int64_t initial_entry; + get_journal_commit_position(ictx, &initial_tag, &initial_entry); // inject a discard operation into the journal inject_into_journal(ictx, @@ -142,9 +141,11 @@ TEST_F(TestJournalReplay, AioDiscardEvent) { ASSERT_EQ(std::string(read_payload.size(), '\0'), read_payload); // check the commit position is properly updated - int64_t current; - get_journal_commit_position(ictx, ¤t); - ASSERT_EQ(current, initial + 1); + int64_t current_tag; + int64_t current_entry; + get_journal_commit_position(ictx, ¤t_tag, ¤t_entry); + ASSERT_EQ(initial_tag + 1, current_tag); + ASSERT_EQ(0, current_entry); // replay several envents and check the commit position inject_into_journal(ictx, @@ -153,8 +154,9 @@ TEST_F(TestJournalReplay, AioDiscardEvent) { librbd::journal::AioDiscardEvent(0, payload.size())); ASSERT_EQ(0, open_image(m_image_name, &ictx)); ASSERT_EQ(0, when_acquired_lock(ictx)); - get_journal_commit_position(ictx, ¤t); - ASSERT_EQ(current, initial + 3); + get_journal_commit_position(ictx, ¤t_tag, ¤t_entry); + ASSERT_EQ(initial_tag + 2, current_tag); + ASSERT_EQ(1, current_entry); // verify lock ordering constraints aio_comp = new librbd::AioCompletion(); @@ -171,8 +173,9 @@ TEST_F(TestJournalReplay, AioWriteEvent) { ASSERT_EQ(0, when_acquired_lock(ictx)); // get current commit position - int64_t initial; - get_journal_commit_position(ictx, &initial); + int64_t initial_tag; + int64_t initial_entry; + get_journal_commit_position(ictx, &initial_tag, &initial_entry); // inject a write operation into the journal std::string payload(4096, '1'); @@ -194,9 +197,11 @@ TEST_F(TestJournalReplay, AioWriteEvent) { ASSERT_EQ(payload, read_payload); // check 
the commit position is properly updated - int64_t current; - get_journal_commit_position(ictx, &current); - ASSERT_EQ(current, initial + 1); + int64_t current_tag; + int64_t current_entry; + get_journal_commit_position(ictx, &current_tag, &current_entry); + ASSERT_EQ(initial_tag + 1, current_tag); + ASSERT_EQ(0, current_entry); // replay several events and check the commit position inject_into_journal(ictx, @@ -205,8 +210,9 @@ TEST_F(TestJournalReplay, AioWriteEvent) { librbd::journal::AioWriteEvent(0, payload.size(), payload_bl)); ASSERT_EQ(0, open_image(m_image_name, &ictx)); ASSERT_EQ(0, when_acquired_lock(ictx)); - get_journal_commit_position(ictx, &current); - ASSERT_EQ(current, initial + 3); + get_journal_commit_position(ictx, &current_tag, &current_entry); + ASSERT_EQ(initial_tag + 2, current_tag); + ASSERT_EQ(1, current_entry); // verify lock ordering constraints aio_comp = new librbd::AioCompletion(); @@ -225,8 +231,9 @@ TEST_F(TestJournalReplay, AioFlushEvent) { ASSERT_EQ(0, when_acquired_lock(ictx)); // get current commit position - int64_t initial; - get_journal_commit_position(ictx, &initial); + int64_t initial_tag; + int64_t initial_entry; + get_journal_commit_position(ictx, &initial_tag, &initial_entry); // inject a flush operation into the journal inject_into_journal(ictx, librbd::journal::AioFlushEvent()); @@ -261,17 +268,20 @@ TEST_F(TestJournalReplay, AioFlushEvent) { ASSERT_EQ(payload, read_payload); // check the commit position is properly updated - int64_t current; - get_journal_commit_position(ictx, &current); - ASSERT_EQ(current, initial + 1); + int64_t current_tag; + int64_t current_entry; + get_journal_commit_position(ictx, &current_tag, &current_entry); + ASSERT_EQ(initial_tag + 1, current_tag); + ASSERT_EQ(0, current_entry); // replay several events and check the commit position inject_into_journal(ictx, librbd::journal::AioFlushEvent()); inject_into_journal(ictx, librbd::journal::AioFlushEvent()); ASSERT_EQ(0, open_image(m_image_name, &ictx)); ASSERT_EQ(0, when_acquired_lock(ictx)); -
get_journal_commit_position(ictx, &current); - ASSERT_EQ(current, initial + 3); + get_journal_commit_position(ictx, &current_tag, &current_entry); + ASSERT_EQ(initial_tag + 2, current_tag); + ASSERT_EQ(1, current_entry); // verify lock ordering constraints aio_comp = new librbd::AioCompletion(); @@ -289,8 +299,9 @@ TEST_F(TestJournalReplay, SnapCreate) { ASSERT_EQ(0, when_acquired_lock(ictx)); // get current commit position - int64_t initial; - get_journal_commit_position(ictx, &initial); + int64_t initial_tag; + int64_t initial_entry; + get_journal_commit_position(ictx, &initial_tag, &initial_entry); // inject snapshot ops into journal inject_into_journal(ictx, librbd::journal::SnapCreateEvent(1, "snap")); @@ -300,9 +311,11 @@ TEST_F(TestJournalReplay, SnapCreate) { ASSERT_EQ(0, open_image(m_image_name, &ictx)); ASSERT_EQ(0, when_acquired_lock(ictx)); - int64_t current; - get_journal_commit_position(ictx, &current); - ASSERT_EQ(current, initial + 2); + int64_t current_tag; + int64_t current_entry; + get_journal_commit_position(ictx, &current_tag, &current_entry); + ASSERT_EQ(initial_tag + 1, current_tag); + ASSERT_EQ(1, current_entry); { RWLock::RLocker snap_locker(ictx->snap_lock); @@ -324,8 +337,9 @@ TEST_F(TestJournalReplay, SnapProtect) { ASSERT_EQ(0, ictx->operations->snap_create("snap")); // get current commit position - int64_t initial; - get_journal_commit_position(ictx, &initial); + int64_t initial_tag; + int64_t initial_entry; + get_journal_commit_position(ictx, &initial_tag, &initial_entry); // inject snapshot ops into journal inject_into_journal(ictx, librbd::journal::SnapProtectEvent(1, "snap")); @@ -335,9 +349,11 @@ TEST_F(TestJournalReplay, SnapProtect) { ASSERT_EQ(0, open_image(m_image_name, &ictx)); ASSERT_EQ(0, when_acquired_lock(ictx)); - int64_t current; - get_journal_commit_position(ictx, &current); - ASSERT_EQ(current, initial + 2); + int64_t current_tag; + int64_t current_entry; + get_journal_commit_position(ictx, &current_tag, &current_entry); + ASSERT_EQ(initial_tag, current_tag); +
ASSERT_EQ(initial_entry + 2, current_entry); bool is_protected; ASSERT_EQ(0, librbd::snap_is_protected(ictx, "snap", &is_protected)); @@ -366,8 +382,9 @@ TEST_F(TestJournalReplay, SnapUnprotect) { ASSERT_EQ(0, ictx->operations->snap_protect("snap")); // get current commit position - int64_t initial; - get_journal_commit_position(ictx, &initial); + int64_t initial_tag; + int64_t initial_entry; + get_journal_commit_position(ictx, &initial_tag, &initial_entry); // inject snapshot ops into journal inject_into_journal(ictx, librbd::journal::SnapUnprotectEvent(1, "snap")); @@ -377,9 +394,11 @@ TEST_F(TestJournalReplay, SnapUnprotect) { ASSERT_EQ(0, open_image(m_image_name, &ictx)); ASSERT_EQ(0, when_acquired_lock(ictx)); - int64_t current; - get_journal_commit_position(ictx, &current); - ASSERT_EQ(current, initial + 2); + int64_t current_tag; + int64_t current_entry; + get_journal_commit_position(ictx, &current_tag, &current_entry); + ASSERT_EQ(initial_tag, current_tag); + ASSERT_EQ(initial_entry + 2, current_entry); bool is_protected; ASSERT_EQ(0, librbd::snap_is_protected(ictx, "snap", &is_protected)); @@ -408,8 +427,9 @@ TEST_F(TestJournalReplay, SnapRename) { } // get current commit position - int64_t initial; - get_journal_commit_position(ictx, &initial); + int64_t initial_tag; + int64_t initial_entry; + get_journal_commit_position(ictx, &initial_tag, &initial_entry); // inject snapshot ops into journal inject_into_journal(ictx, librbd::journal::SnapRenameEvent(1, snap_id, "snap2")); @@ -419,9 +439,11 @@ TEST_F(TestJournalReplay, SnapRename) { ASSERT_EQ(0, open_image(m_image_name, &ictx)); ASSERT_EQ(0, when_acquired_lock(ictx)); - int64_t current; - get_journal_commit_position(ictx, &current); - ASSERT_EQ(current, initial + 2); + int64_t current_tag; + int64_t current_entry; + get_journal_commit_position(ictx, &current_tag, &current_entry); + ASSERT_EQ(initial_tag, current_tag); + ASSERT_EQ(initial_entry + 2, current_entry); { RWLock::RLocker snap_locker(ictx->snap_lock); @@ -444,8 +466,9 @@
TEST_F(TestJournalReplay, SnapRollback) { ASSERT_EQ(0, ictx->operations->snap_create("snap")); // get current commit position - int64_t initial; - get_journal_commit_position(ictx, &initial); + int64_t initial_tag; + int64_t initial_entry; + get_journal_commit_position(ictx, &initial_tag, &initial_entry); // inject snapshot ops into journal inject_into_journal(ictx, librbd::journal::SnapRollbackEvent(1, "snap")); @@ -455,9 +478,11 @@ TEST_F(TestJournalReplay, SnapRollback) { ASSERT_EQ(0, open_image(m_image_name, &ictx)); ASSERT_EQ(0, when_acquired_lock(ictx)); - int64_t current; - get_journal_commit_position(ictx, &current); - ASSERT_EQ(current, initial + 2); + int64_t current_tag; + int64_t current_entry; + get_journal_commit_position(ictx, &current_tag, &current_entry); + ASSERT_EQ(initial_tag, current_tag); + ASSERT_EQ(initial_entry + 2, current_entry); // verify lock ordering constraints librbd::NoOpProgressContext no_op_progress; @@ -475,8 +500,9 @@ TEST_F(TestJournalReplay, SnapRemove) { ASSERT_EQ(0, ictx->operations->snap_create("snap")); // get current commit position - int64_t initial; - get_journal_commit_position(ictx, &initial); + int64_t initial_tag; + int64_t initial_entry; + get_journal_commit_position(ictx, &initial_tag, &initial_entry); // inject snapshot ops into journal inject_into_journal(ictx, librbd::journal::SnapRemoveEvent(1, "snap")); @@ -486,9 +512,11 @@ TEST_F(TestJournalReplay, SnapRemove) { ASSERT_EQ(0, open_image(m_image_name, &ictx)); ASSERT_EQ(0, when_acquired_lock(ictx)); - int64_t current; - get_journal_commit_position(ictx, &current); - ASSERT_EQ(current, initial + 2); + int64_t current_tag; + int64_t current_entry; + get_journal_commit_position(ictx, &current_tag, &current_entry); + ASSERT_EQ(initial_tag, current_tag); + ASSERT_EQ(initial_entry + 2, current_entry); { RWLock::RLocker snap_locker(ictx->snap_lock); @@ -510,8 +538,9 @@ TEST_F(TestJournalReplay, Rename) { ASSERT_EQ(0, when_acquired_lock(ictx)); // get current commit position - int64_t initial; -
get_journal_commit_position(ictx, &initial); + int64_t initial_tag; + int64_t initial_entry; + get_journal_commit_position(ictx, &initial_tag, &initial_entry); // inject snapshot ops into journal std::string new_image_name(get_temp_image_name()); @@ -522,9 +551,11 @@ TEST_F(TestJournalReplay, Rename) { ASSERT_EQ(0, open_image(m_image_name, &ictx)); ASSERT_EQ(0, when_acquired_lock(ictx)); - int64_t current; - get_journal_commit_position(ictx, &current); - ASSERT_EQ(current, initial + 2); + int64_t current_tag; + int64_t current_entry; + get_journal_commit_position(ictx, &current_tag, &current_entry); + ASSERT_EQ(initial_tag + 1, current_tag); + ASSERT_EQ(1, current_entry); // verify lock ordering constraints librbd::RBD rbd; @@ -540,8 +571,9 @@ TEST_F(TestJournalReplay, Resize) { ASSERT_EQ(0, when_acquired_lock(ictx)); // get current commit position - int64_t initial; - get_journal_commit_position(ictx, &initial); + int64_t initial_tag; + int64_t initial_entry; + get_journal_commit_position(ictx, &initial_tag, &initial_entry); // inject snapshot ops into journal inject_into_journal(ictx, librbd::journal::ResizeEvent(1, 16)); @@ -551,9 +583,11 @@ TEST_F(TestJournalReplay, Resize) { ASSERT_EQ(0, open_image(m_image_name, &ictx)); ASSERT_EQ(0, when_acquired_lock(ictx)); - int64_t current; - get_journal_commit_position(ictx, &current); - ASSERT_EQ(current, initial + 2); + int64_t current_tag; + int64_t current_entry; + get_journal_commit_position(ictx, &current_tag, &current_entry); + ASSERT_EQ(initial_tag + 1, current_tag); + ASSERT_EQ(1, current_entry); // verify lock ordering constraints librbd::NoOpProgressContext no_op_progress; @@ -578,8 +612,9 @@ TEST_F(TestJournalReplay, Flatten) { ASSERT_EQ(0, when_acquired_lock(ictx2)); // get current commit position - int64_t initial; - get_journal_commit_position(ictx2, &initial); + int64_t initial_tag; + int64_t initial_entry; + get_journal_commit_position(ictx2, &initial_tag, &initial_entry); // inject snapshot ops into journal inject_into_journal(ictx2,
librbd::journal::FlattenEvent(1)); @@ -589,9 +624,11 @@ TEST_F(TestJournalReplay, Flatten) { ASSERT_EQ(0, open_image(clone_name, &ictx2)); ASSERT_EQ(0, when_acquired_lock(ictx2)); - int64_t current; - get_journal_commit_position(ictx2, &current); - ASSERT_EQ(current, initial + 2); + int64_t current_tag; + int64_t current_entry; + get_journal_commit_position(ictx2, &current_tag, &current_entry); + ASSERT_EQ(initial_tag + 1, current_tag); + ASSERT_EQ(1, current_entry); ASSERT_EQ(0, ictx->operations->snap_unprotect("snap")); // verify lock ordering constraints @@ -607,8 +644,9 @@ TEST_F(TestJournalReplay, ObjectPosition) { ASSERT_EQ(0, when_acquired_lock(ictx)); // get current commit position - int64_t initial; - get_journal_commit_position(ictx, &initial); + int64_t initial_tag; + int64_t initial_entry; + get_journal_commit_position(ictx, &initial_tag, &initial_entry); std::string payload(4096, '1'); librbd::AioCompletion *aio_comp = new librbd::AioCompletion(); @@ -623,9 +661,11 @@ TEST_F(TestJournalReplay, ObjectPosition) { aio_comp->release(); // check the commit position updated - int64_t current; - get_journal_commit_position(ictx, &current); - ASSERT_EQ(current, initial + 2); + int64_t current_tag; + int64_t current_entry; + get_journal_commit_position(ictx, &current_tag, &current_entry); + ASSERT_EQ(initial_tag + 1, current_tag); + ASSERT_EQ(1, current_entry); // write again @@ -641,6 +681,7 @@ TEST_F(TestJournalReplay, ObjectPosition) { aio_comp->release(); // check the commit position updated - get_journal_commit_position(ictx, &current); - ASSERT_EQ(current, initial + 4); + get_journal_commit_position(ictx, &current_tag, &current_entry); + ASSERT_EQ(initial_tag + 1, current_tag); + ASSERT_EQ(3, current_entry); } diff --git a/src/test/librbd/mock/MockJournal.h b/src/test/librbd/mock/MockJournal.h index 9fe35185060..1393399d555 100644 --- a/src/test/librbd/mock/MockJournal.h +++ b/src/test/librbd/mock/MockJournal.h @@ -16,6 +16,9 @@ struct MockJournal { MOCK_METHOD1(wait_for_journal_ready, void(Context *)); +
MOCK_CONST_METHOD0(is_tag_owner, bool()); + MOCK_METHOD2(allocate_tag, void(const std::string &, Context *)); + MOCK_METHOD1(open, void(Context *)); MOCK_METHOD1(close, void(Context *)); diff --git a/src/test/librbd/test_mock_Journal.cc b/src/test/librbd/test_mock_Journal.cc index 5c506199d57..c2c6a73b931 100644 --- a/src/test/librbd/test_mock_Journal.cc +++ b/src/test/librbd/test_mock_Journal.cc @@ -7,6 +7,7 @@ #include "common/Cond.h" #include "common/Mutex.h" #include "cls/journal/cls_journal_types.h" +#include "journal/Journaler.h" #include "librbd/Journal.h" #include "librbd/Utils.h" #include "librbd/journal/Replay.h" @@ -87,6 +88,9 @@ struct MockJournaler { MOCK_METHOD1(init, void(Context*)); MOCK_METHOD1(flush_commit_position, void(Context*)); + MOCK_METHOD2(get_cached_client, int(const std::string&, cls::journal::Client*)); + MOCK_METHOD3(get_tags, void(uint64_t, journal::Journaler::Tags*, Context*)); + MOCK_METHOD1(start_replay, void(::journal::ReplayHandler *replay_handler)); MOCK_METHOD1(try_pop_front, bool(MockReplayEntryProxy *replay_entry)); MOCK_METHOD0(stop_replay, void()); @@ -134,6 +138,16 @@ struct MockJournalerProxy { MockJournaler::get_instance().init(on_finish); } + int get_cached_client(const std::string& client_id, + cls::journal::Client* client) { + return MockJournaler::get_instance().get_cached_client(client_id, client); + } + + void get_tags(uint64_t tag_class, journal::Journaler::Tags *tags, + Context *on_finish) { + MockJournaler::get_instance().get_tags(tag_class, tags, on_finish); + } + void flush_commit_position(Context *on_finish) { MockJournaler::get_instance().flush_commit_position(on_finish); } @@ -308,7 +322,36 @@ public: void expect_init_journaler(::journal::MockJournaler &mock_journaler, int r) { EXPECT_CALL(mock_journaler, init(_)) .WillOnce(CompleteContext(r, NULL)); + } + void expect_get_journaler_cached_client(::journal::MockJournaler &mock_journaler, int r) { + + journal::ImageClientMeta image_client_meta; + 
image_client_meta.tag_class = 0; + + journal::ClientData client_data; + client_data.client_meta = image_client_meta; + + cls::journal::Client client; + ::encode(client_data, client.data); + + EXPECT_CALL(mock_journaler, get_cached_client("", _)) + .WillOnce(DoAll(SetArgPointee<1>(client), + Return(r))); + } + + void expect_get_journaler_tags(MockImageCtx &mock_image_ctx, + ::journal::MockJournaler &mock_journaler, + int r) { + journal::TagData tag_data; + + bufferlist tag_data_bl; + ::encode(tag_data, tag_data_bl); + + ::journal::Journaler::Tags tags = {{0, 0, {}}, {1, 0, tag_data_bl}}; + EXPECT_CALL(mock_journaler, get_tags(0, _, _)) + .WillOnce(DoAll(SetArgPointee<1>(tags), + WithArg<2>(CompleteContext(r, mock_image_ctx.image_ctx->op_work_queue)))); } void expect_start_replay(MockJournalImageCtx &mock_image_ctx, @@ -442,6 +485,8 @@ public: InSequence seq; expect_construct_journaler(mock_journaler); expect_init_journaler(mock_journaler, 0); + expect_get_journaler_cached_client(mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); expect_start_replay( mock_image_ctx, mock_journaler, { std::bind(&invoke_replay_complete, _1, 0) @@ -485,6 +530,8 @@ TEST_F(TestMockJournal, StateTransitions) { ::journal::MockJournaler mock_journaler; expect_construct_journaler(mock_journaler); expect_init_journaler(mock_journaler, 0); + expect_get_journaler_cached_client(mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); expect_start_replay( mock_image_ctx, mock_journaler, { std::bind(&invoke_replay_ready, _1), @@ -533,6 +580,45 @@ TEST_F(TestMockJournal, InitError) { ASSERT_EQ(-EINVAL, when_open(mock_journal)); } +TEST_F(TestMockJournal, GetCachedClientError) { + REQUIRE_FEATURE(RBD_FEATURE_JOURNALING); + + librbd::ImageCtx *ictx; + ASSERT_EQ(0, open_image(m_image_name, &ictx)); + + MockJournalImageCtx mock_image_ctx(*ictx); + MockJournal mock_journal(mock_image_ctx); + expect_op_work_queue(mock_image_ctx); + + InSequence 
seq; + + ::journal::MockJournaler mock_journaler; + expect_construct_journaler(mock_journaler); + expect_init_journaler(mock_journaler, 0); + expect_get_journaler_cached_client(mock_journaler, -ENOENT); + ASSERT_EQ(-ENOENT, when_open(mock_journal)); +} + +TEST_F(TestMockJournal, GetTagsError) { + REQUIRE_FEATURE(RBD_FEATURE_JOURNALING); + + librbd::ImageCtx *ictx; + ASSERT_EQ(0, open_image(m_image_name, &ictx)); + + MockJournalImageCtx mock_image_ctx(*ictx); + MockJournal mock_journal(mock_image_ctx); + expect_op_work_queue(mock_image_ctx); + + InSequence seq; + + ::journal::MockJournaler mock_journaler; + expect_construct_journaler(mock_journaler); + expect_init_journaler(mock_journaler, 0); + expect_get_journaler_cached_client(mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, -EBADMSG); + ASSERT_EQ(-EBADMSG, when_open(mock_journal)); +} + TEST_F(TestMockJournal, ReplayCompleteError) { REQUIRE_FEATURE(RBD_FEATURE_JOURNALING); @@ -548,6 +634,8 @@ TEST_F(TestMockJournal, ReplayCompleteError) { ::journal::MockJournaler mock_journaler; expect_construct_journaler(mock_journaler); expect_init_journaler(mock_journaler, 0); + expect_get_journaler_cached_client(mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); expect_start_replay( mock_image_ctx, mock_journaler, { std::bind(&invoke_replay_complete, _1, -EINVAL) @@ -560,6 +648,8 @@ TEST_F(TestMockJournal, ReplayCompleteError) { // replay failure should result in replay-restart expect_construct_journaler(mock_journaler); expect_init_journaler(mock_journaler, 0); + expect_get_journaler_cached_client(mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); expect_start_replay( mock_image_ctx, mock_journaler, { std::bind(&invoke_replay_complete, _1, 0) @@ -589,6 +679,8 @@ TEST_F(TestMockJournal, FlushReplayError) { ::journal::MockJournaler mock_journaler; expect_construct_journaler(mock_journaler); expect_init_journaler(mock_journaler, 
0); + expect_get_journaler_cached_client(mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); expect_start_replay( mock_image_ctx, mock_journaler, { std::bind(&invoke_replay_ready, _1), @@ -606,6 +698,8 @@ TEST_F(TestMockJournal, FlushReplayError) { // replay flush failure should result in replay-restart expect_construct_journaler(mock_journaler); expect_init_journaler(mock_journaler, 0); + expect_get_journaler_cached_client(mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); expect_start_replay( mock_image_ctx, mock_journaler, { std::bind(&invoke_replay_complete, _1, 0) @@ -635,6 +729,8 @@ TEST_F(TestMockJournal, StopError) { ::journal::MockJournaler mock_journaler; expect_construct_journaler(mock_journaler); expect_init_journaler(mock_journaler, 0); + expect_get_journaler_cached_client(mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); expect_start_replay( mock_image_ctx, mock_journaler, { std::bind(&invoke_replay_complete, _1, 0) @@ -664,6 +760,8 @@ TEST_F(TestMockJournal, ReplayOnDiskPreFlushError) { ::journal::MockJournaler mock_journaler; expect_construct_journaler(mock_journaler); expect_init_journaler(mock_journaler, 0); + expect_get_journaler_cached_client(mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); ::journal::ReplayHandler *replay_handler = nullptr; expect_start_replay( @@ -688,6 +786,8 @@ TEST_F(TestMockJournal, ReplayOnDiskPreFlushError) { // replay write-to-disk failure should result in replay-restart expect_construct_journaler(mock_journaler); expect_init_journaler(mock_journaler, 0); + expect_get_journaler_cached_client(mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); expect_start_replay( mock_image_ctx, mock_journaler, { std::bind(&invoke_replay_complete, _1, 0) @@ -738,6 +838,8 @@ TEST_F(TestMockJournal, ReplayOnDiskPostFlushError) { ::journal::MockJournaler mock_journaler; 
expect_construct_journaler(mock_journaler); expect_init_journaler(mock_journaler, 0); + expect_get_journaler_cached_client(mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); expect_start_replay( mock_image_ctx, mock_journaler, { std::bind(&invoke_replay_ready, _1), @@ -759,6 +861,8 @@ TEST_F(TestMockJournal, ReplayOnDiskPostFlushError) { // replay write-to-disk failure should result in replay-restart expect_construct_journaler(mock_journaler); expect_init_journaler(mock_journaler, 0); + expect_get_journaler_cached_client(mock_journaler, 0); + expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0); expect_start_replay( mock_image_ctx, mock_journaler, { std::bind(&invoke_replay_complete, _1, 0) diff --git a/src/test/rbd_mirror/image_replay.cc b/src/test/rbd_mirror/image_replay.cc index cb175f8394b..e23f057996a 100644 --- a/src/test/rbd_mirror/image_replay.cc +++ b/src/test/rbd_mirror/image_replay.cc @@ -104,6 +104,7 @@ int main(int argc, const char **argv) rbd::mirror::ImageReplayer::BootstrapParams bootstap_params(local_pool_name, image_name); + int64_t local_pool_id; int64_t remote_pool_id; std::string remote_image_id; @@ -141,6 +142,14 @@ int main(int argc, const char **argv) goto cleanup; } + r = local->pool_lookup(local_pool_name.c_str()); + if (r < 0) { + derr << "error finding local pool " << local_pool_name + << ": " << cpp_strerror(r) << dendl; + goto cleanup; + } + local_pool_id = r; + r = remote->init_with_context(g_ceph_context); if (r < 0) { derr << "could not initialize rados handle" << dendl; @@ -171,7 +180,8 @@ int main(int argc, const char **argv) dout(5) << "starting replay" << dendl; replayer = new rbd::mirror::ImageReplayer(local, remote, client_id, - remote_pool_id, remote_image_id); + local_pool_id, remote_pool_id, + remote_image_id); r = replayer->start(&bootstap_params); if (r < 0) { diff --git a/src/test/rbd_mirror/test_ImageReplayer.cc b/src/test/rbd_mirror/test_ImageReplayer.cc index 
80f9505e294..56172992e0b 100644 --- a/src/test/rbd_mirror/test_ImageReplayer.cc +++ b/src/test/rbd_mirror/test_ImageReplayer.cc @@ -101,7 +101,7 @@ public: m_replayer = new rbd::mirror::ImageReplayer( rbd::mirror::RadosRef(new librados::Rados(m_local_ioctx)), rbd::mirror::RadosRef(new librados::Rados(m_remote_ioctx)), - m_client_id, remote_pool_id, m_remote_image_id); + m_client_id, m_local_ioctx.get_id(), remote_pool_id, m_remote_image_id); bootstrap(); } diff --git a/src/tools/rbd_mirror/ImageReplayer.cc b/src/tools/rbd_mirror/ImageReplayer.cc index 04e135f098c..d3cd665f091 100644 --- a/src/tools/rbd_mirror/ImageReplayer.cc +++ b/src/tools/rbd_mirror/ImageReplayer.cc @@ -160,13 +160,14 @@ private: ImageReplayer::ImageReplayer(RadosRef local, RadosRef remote, const std::string &client_id, + int64_t local_pool_id, int64_t remote_pool_id, const std::string &remote_image_id) : m_local(local), m_remote(remote), m_client_id(client_id), m_remote_pool_id(remote_pool_id), - m_local_pool_id(-1), + m_local_pool_id(local_pool_id), m_remote_image_id(remote_image_id), m_lock("rbd::mirror::ImageReplayer " + stringify(remote_pool_id) + " " + remote_image_id), @@ -521,9 +522,13 @@ int ImageReplayer::get_registered_client_status(bool *registered) } librbd::journal::MirrorPeerClientMeta &cm = boost::get<librbd::journal::MirrorPeerClientMeta>(client_data.client_meta); - m_local_pool_id = cm.pool_id; m_local_image_id = cm.image_id; - m_snap_name = cm.snap_name; + + // TODO: snap name should be transient + if (cm.sync_points.empty()) { + return -ENOENT; + } + m_snap_name = cm.sync_points.front().snap_name; dout(20) << "client found, pool_id=" << m_local_pool_id << ", image_id=" << m_local_image_id << ", snap_name=" << m_snap_name << dendl; @@ -539,27 +544,17 @@ int ImageReplayer::get_registered_client_status(bool *registered) int ImageReplayer::register_client() { - int r; - - std::string local_cluster_id; - r = m_local->cluster_fsid(&local_cluster_id); - if (r < 0) { - derr << "error retrieving local cluster id: " <<
cpp_strerror(r) - << dendl; - return r; - } + // TODO allocate snap as part of sync process std::string m_snap_name = ".rbd-mirror." + m_client_id; - dout(20) << "m_cluster_id=" << local_cluster_id << ", pool_id=" - << m_local_pool_id << ", image_id=" << m_local_image_id - << ", snap_name=" << m_snap_name << dendl; + dout(20) << "mirror_uuid=" << m_client_id << ", " + << "image_id=" << m_local_image_id << ", " + << "snap_name=" << m_snap_name << dendl; bufferlist client_data; ::encode(librbd::journal::ClientData{librbd::journal::MirrorPeerClientMeta{ - local_cluster_id, m_local_pool_id, m_local_image_id, m_snap_name}}, - client_data); - - r = m_remote_journaler->register_client(client_data); + m_local_image_id, {{m_snap_name, boost::none}}}}, client_data); + int r = m_remote_journaler->register_client(client_data); if (r < 0) { derr << "error registering client: " << cpp_strerror(r) << dendl; return r; diff --git a/src/tools/rbd_mirror/ImageReplayer.h b/src/tools/rbd_mirror/ImageReplayer.h index 305a1762100..4e94de8ef2e 100644 --- a/src/tools/rbd_mirror/ImageReplayer.h +++ b/src/tools/rbd_mirror/ImageReplayer.h @@ -65,7 +65,8 @@ public: public: ImageReplayer(RadosRef local, RadosRef remote, const std::string &client_id, - int64_t remote_pool_id, const std::string &remote_image_id); + int64_t local_pool_id, int64_t remote_pool_id, + const std::string &remote_image_id); virtual ~ImageReplayer(); ImageReplayer(const ImageReplayer&) = delete; ImageReplayer& operator=(const ImageReplayer&) = delete; diff --git a/src/tools/rbd_mirror/Replayer.cc b/src/tools/rbd_mirror/Replayer.cc index 0d8e3f329fe..28b15ecee04 100644 --- a/src/tools/rbd_mirror/Replayer.cc +++ b/src/tools/rbd_mirror/Replayer.cc @@ -118,6 +118,24 @@ void Replayer::set_sources(const map > &images) for (const auto &kv : images) { int64_t pool_id = kv.first; + + // TODO: clean up once remote peer -> image replayer refactored + librados::IoCtx remote_ioctx; + int r = m_remote->ioctx_create2(pool_id, 
remote_ioctx); + if (r < 0) { + derr << "failed to lookup remote pool " << pool_id << ": " + << cpp_strerror(r) << dendl; + continue; + } + + librados::IoCtx local_ioctx; + r = m_local->ioctx_create(remote_ioctx.get_pool_name().c_str(), local_ioctx); + if (r < 0) { + derr << "failed to lookup local pool " << remote_ioctx.get_pool_name() + << ": " << cpp_strerror(r) << dendl; + continue; + } + // create entry for pool if it doesn't exist auto &pool_replayers = m_images[pool_id]; for (const auto &image_id : kv.second) { @@ -125,6 +143,7 @@ void Replayer::set_sources(const map<int64_t, set<string> > &images) unique_ptr<ImageReplayer> image_replayer(new ImageReplayer(m_local, m_remote, m_client_id, + local_ioctx.get_id(), pool_id, image_id)); int r = image_replayer->start();