journal: player shutdown is now handled asynchronously

Fixes: http://tracker.ceph.com/issues/15949
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
This commit is contained in:
Jason Dillaman 2016-05-24 12:06:26 -04:00
parent b70151129b
commit 79b4134667
10 changed files with 282 additions and 68 deletions

View File

@ -22,10 +22,18 @@ void AsyncOpTracker::start_op() {
}
void AsyncOpTracker::finish_op() {
Mutex::Locker locker(m_lock);
assert(m_pending_ops > 0);
if (--m_pending_ops == 0) {
m_cond.Signal();
Context *on_finish = nullptr;
{
Mutex::Locker locker(m_lock);
assert(m_pending_ops > 0);
if (--m_pending_ops == 0) {
m_cond.Signal();
std::swap(on_finish, m_on_finish);
}
}
if (on_finish != nullptr) {
on_finish->complete(0);
}
}
@ -36,4 +44,21 @@ void AsyncOpTracker::wait_for_ops() {
}
}
void AsyncOpTracker::wait_for_ops(Context *on_finish) {
{
Mutex::Locker locker(m_lock);
assert(m_on_finish == nullptr);
if (m_pending_ops > 0) {
m_on_finish = on_finish;
return;
}
}
on_finish->complete(0);
}
bool AsyncOpTracker::empty() {
Mutex::Locker locker(m_lock);
return (m_pending_ops == 0);
}
} // namespace journal

View File

@ -8,6 +8,8 @@
#include "common/Cond.h"
#include "common/Mutex.h"
struct Context;
namespace journal {
class AsyncOpTracker {
@ -19,11 +21,15 @@ public:
void finish_op();
void wait_for_ops();
void wait_for_ops(Context *on_finish);
bool empty();
private:
Mutex m_lock;
Cond m_cond;
uint32_t m_pending_ops;
Context *m_on_finish = nullptr;
};

View File

@ -79,9 +79,10 @@ JournalPlayer::JournalPlayer(librados::IoCtx &ioctx,
}
JournalPlayer::~JournalPlayer() {
m_async_op_tracker.wait_for_ops();
assert(m_async_op_tracker.empty());
{
Mutex::Locker locker(m_lock);
assert(m_shut_down);
assert(m_fetch_object_numbers.empty());
assert(!m_watch_scheduled);
}
@ -140,16 +141,32 @@ void JournalPlayer::prefetch_and_watch(double interval) {
prefetch();
}
void JournalPlayer::unwatch() {
void JournalPlayer::shut_down(Context *on_finish) {
ldout(m_cct, 20) << __func__ << dendl;
Mutex::Locker locker(m_lock);
assert(!m_shut_down);
m_shut_down = true;
m_watch_enabled = false;
on_finish = utils::create_async_context_callback(
m_journal_metadata, on_finish);
if (m_watch_scheduled) {
for (auto &players : m_object_players) {
players.second.begin()->second->unwatch();
ObjectPlayerPtr object_player = get_object_player();
switch (m_watch_step) {
case WATCH_STEP_FETCH_FIRST:
object_player = m_object_players.begin()->second.begin()->second;
// fallthrough
case WATCH_STEP_FETCH_CURRENT:
object_player->unwatch();
break;
case WATCH_STEP_ASSERT_ACTIVE:
break;
}
m_watch_scheduled = false;
}
m_async_op_tracker.wait_for_ops(on_finish);
}
bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
@ -623,6 +640,10 @@ void JournalPlayer::handle_fetched(uint64_t object_num, int r) {
assert(m_fetch_object_numbers.count(object_num) == 1);
m_fetch_object_numbers.erase(object_num);
if (m_shut_down) {
return;
}
if (r == -ENOENT) {
r = 0;
}
@ -647,6 +668,8 @@ void JournalPlayer::schedule_watch() {
// by an incomplete tag sequence
ldout(m_cct, 20) << __func__ << ": asserting active tag="
<< *m_active_tag_tid << dendl;
m_async_op_tracker.start_op();
FunctionContext *ctx = new FunctionContext([this](int r) {
handle_watch_assert_active(r);
});
@ -654,9 +677,9 @@ void JournalPlayer::schedule_watch() {
return;
}
ObjectPlayerPtr object_player;
double watch_interval = m_watch_interval;
ObjectPlayerPtr object_player = get_object_player();
switch (m_watch_step) {
case WATCH_STEP_FETCH_CURRENT:
{
@ -684,21 +707,22 @@ void JournalPlayer::schedule_watch() {
ldout(m_cct, 20) << __func__ << ": scheduling watch on "
<< object_player->get_oid() << dendl;
C_Watch *ctx = new C_Watch(this, object_player->get_object_number());
Context *ctx = utils::create_async_context_callback(
m_journal_metadata, new C_Watch(this, object_player->get_object_number()));
object_player->watch(ctx, watch_interval);
}
void JournalPlayer::handle_watch(uint64_t object_num, int r) {
ldout(m_cct, 10) << __func__ << ": r=" << r << dendl;
if (r == -ECANCELED) {
// unwatch of object player(s)
return;
}
Mutex::Locker locker(m_lock);
assert(m_watch_scheduled);
m_watch_scheduled = false;
if (m_shut_down || r == -ECANCELED) {
// unwatch of object player(s)
return;
}
ObjectPlayerPtr object_player = get_object_player(object_num);
if (r == 0 && object_player->empty()) {
// possibly need to prune this empty object player if we've
@ -737,7 +761,10 @@ void JournalPlayer::handle_watch_assert_active(int r) {
}
m_watch_step = WATCH_STEP_FETCH_CURRENT;
schedule_watch();
if (!m_shut_down && m_watch_enabled) {
schedule_watch();
}
m_async_op_tracker.finish_op();
}
void JournalPlayer::notify_entries_available() {

View File

@ -36,7 +36,7 @@ public:
void prefetch();
void prefetch_and_watch(double interval);
void unwatch();
void shut_down(Context *on_finish);
bool try_pop_front(Entry *entry, uint64_t *commit_tid);
@ -79,6 +79,10 @@ private:
uint64_t object_num;
C_Watch(JournalPlayer *player, uint64_t object_num)
: player(player), object_num(object_num) {
player->m_async_op_tracker.start_op();
}
virtual ~C_Watch() {
player->m_async_op_tracker.finish_op();
}
virtual void finish(int r) override {
@ -105,6 +109,7 @@ private:
WatchStep m_watch_step = WATCH_STEP_FETCH_CURRENT;
bool m_watch_prune_active_tag = false;
bool m_shut_down = false;
bool m_handler_notified = false;
ObjectNumbers m_fetch_object_numbers;

View File

@ -317,7 +317,12 @@ bool Journaler::try_pop_front(ReplayEntry *replay_entry,
void Journaler::stop_replay() {
assert(m_player != NULL);
m_player->unwatch();
// TODO
C_SaferCond ctx;
m_player->shut_down(&ctx);
ctx.wait();
delete m_player;
m_player = NULL;
}

View File

@ -21,8 +21,7 @@ ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx,
m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock), m_order(order),
m_watch_interval(0), m_watch_task(NULL),
m_lock(utils::unique_lock_name("ObjectPlayer::m_lock", this)),
m_fetch_in_progress(false), m_read_off(0), m_watch_ctx(NULL),
m_watch_in_progress(false) {
m_fetch_in_progress(false), m_read_off(0) {
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
}
@ -32,8 +31,7 @@ ObjectPlayer::~ObjectPlayer() {
Mutex::Locker timer_locker(m_timer_lock);
Mutex::Locker locker(m_lock);
assert(!m_fetch_in_progress);
assert(!m_watch_in_progress);
assert(m_watch_ctx == NULL);
assert(m_watch_ctx == nullptr);
}
}
@ -62,13 +60,10 @@ void ObjectPlayer::watch(Context *on_fetch, double interval) {
Mutex::Locker timer_locker(m_timer_lock);
m_watch_interval = interval;
assert(m_watch_ctx == NULL);
assert(m_watch_ctx == nullptr);
m_watch_ctx = on_fetch;
// watch callback might lead to re-scheduled watch
if (!m_watch_in_progress) {
schedule_watch();
}
schedule_watch();
}
void ObjectPlayer::unwatch() {
@ -76,13 +71,14 @@ void ObjectPlayer::unwatch() {
Context *watch_ctx = nullptr;
{
Mutex::Locker timer_locker(m_timer_lock);
assert(!m_unwatched);
m_unwatched = true;
cancel_watch();
if (!cancel_watch()) {
return;
}
std::swap(watch_ctx, m_watch_ctx);
while (m_watch_in_progress) {
m_watch_in_progress_cond.Wait(m_timer_lock);
}
}
if (watch_ctx != nullptr) {
@ -190,24 +186,27 @@ void ObjectPlayer::schedule_watch() {
m_timer.add_event_after(m_watch_interval, m_watch_task);
}
void ObjectPlayer::cancel_watch() {
bool ObjectPlayer::cancel_watch() {
assert(m_timer_lock.is_locked());
ldout(m_cct, 20) << __func__ << ": " << m_oid << " cancelling watch" << dendl;
if (m_watch_task != NULL) {
m_timer.cancel_event(m_watch_task);
m_watch_task = NULL;
if (m_watch_task != nullptr) {
bool canceled = m_timer.cancel_event(m_watch_task);
assert(canceled);
m_watch_task = nullptr;
return true;
}
return false;
}
void ObjectPlayer::handle_watch_task() {
assert(m_timer_lock.is_locked());
ldout(m_cct, 10) << __func__ << ": " << m_oid << " polling" << dendl;
assert(m_watch_ctx != NULL);
assert(m_watch_ctx != nullptr);
assert(m_watch_task != nullptr);
assert(!m_watch_in_progress);
m_watch_in_progress = true;
m_watch_task = NULL;
m_watch_task = nullptr;
fetch(new C_WatchFetch(this));
}
@ -215,38 +214,31 @@ void ObjectPlayer::handle_watch_fetched(int r) {
ldout(m_cct, 10) << __func__ << ": " << m_oid << " poll complete, r=" << r
<< dendl;
Context *on_finish = nullptr;
Context *watch_ctx = nullptr;
{
Mutex::Locker timer_locker(m_timer_lock);
assert(m_watch_in_progress);
if (r == -ENOENT) {
r = 0;
} else {
m_refetch_required = true;
}
std::swap(on_finish, m_watch_ctx);
}
std::swap(watch_ctx, m_watch_ctx);
if (on_finish != nullptr) {
on_finish->complete(r);
}
{
Mutex::Locker locker(m_timer_lock);
assert(m_watch_in_progress);
// callback might have attempted to re-schedule the watch -- complete now
if (m_watch_ctx != nullptr) {
schedule_watch();
if (m_unwatched) {
m_unwatched = false;
r = -ECANCELED;
}
}
m_watch_in_progress = false;
m_watch_in_progress_cond.Signal();
if (watch_ctx != nullptr) {
watch_ctx->complete(r);
}
}
void ObjectPlayer::C_Fetch::finish(int r) {
r = object_player->handle_fetch_complete(r, read_bl);
object_player.reset();
on_finish->complete(r);
}

View File

@ -117,16 +117,15 @@ private:
EntryKeys m_entry_keys;
InvalidRanges m_invalid_ranges;
Context *m_watch_ctx;
Cond m_watch_in_progress_cond;
bool m_watch_in_progress;
Context *m_watch_ctx = nullptr;
bool m_unwatched = false;
bool m_refetch_required = true;
int handle_fetch_complete(int r, const bufferlist &bl);
void schedule_watch();
void cancel_watch();
bool cancel_watch();
void handle_watch_task();
void handle_watch_fetched(int r);
};

View File

@ -5,12 +5,30 @@
#define CEPH_JOURNAL_UTILS_H
#include "include/int_types.h"
#include "include/Context.h"
#include "include/rados/librados.hpp"
#include <string>
namespace journal {
namespace utils {
namespace detail {
template <typename M>
struct C_AsyncCallback : public Context {
M journal_metadata;
Context *on_finish;
C_AsyncCallback(M journal_metadata, Context *on_finish)
: journal_metadata(journal_metadata), on_finish(on_finish) {
}
virtual void finish(int r) {
journal_metadata->queue(on_finish, r);
}
};
} // namespace detail
template <typename T, void(T::*MF)(int)>
void rados_state_callback(rados_completion_t c, void *arg) {
T *obj = reinterpret_cast<T*>(arg);
@ -24,6 +42,12 @@ std::string unique_lock_name(const std::string &name, void *address);
void rados_ctx_callback(rados_completion_t c, void *arg);
template <typename M>
Context *create_async_context_callback(M journal_metadata, Context *on_finish) {
// use async callback to acquire a clean lock context
return new detail::C_AsyncCallback<M>(journal_metadata, on_finish);
}
} // namespace utils
} // namespace journal

View File

@ -11,6 +11,7 @@
#include "gtest/gtest.h"
#include "test/journal/RadosTestFixture.h"
#include <list>
#include <boost/scope_exit.hpp>
class TestJournalPlayer : public RadosTestFixture {
public:
@ -142,6 +143,11 @@ TEST_F(TestJournalPlayer, Prefetch) {
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
BOOST_SCOPE_EXIT_ALL( (player) ) {
C_SaferCond unwatch_ctx;
player->shut_down(&unwatch_ctx);
ASSERT_EQ(0, unwatch_ctx.wait());
};
ASSERT_EQ(0, write_entry(oid, 0, 234, 122));
ASSERT_EQ(0, write_entry(oid, 1, 234, 123));
@ -183,6 +189,11 @@ TEST_F(TestJournalPlayer, PrefetchSkip) {
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
BOOST_SCOPE_EXIT_ALL( (player) ) {
C_SaferCond unwatch_ctx;
player->shut_down(&unwatch_ctx);
ASSERT_EQ(0, unwatch_ctx.wait());
};
ASSERT_EQ(0, write_entry(oid, 0, 234, 122));
ASSERT_EQ(0, write_entry(oid, 1, 234, 123));
@ -213,6 +224,11 @@ TEST_F(TestJournalPlayer, PrefetchWithoutCommit) {
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
BOOST_SCOPE_EXIT_ALL( (player) ) {
C_SaferCond unwatch_ctx;
player->shut_down(&unwatch_ctx);
ASSERT_EQ(0, unwatch_ctx.wait());
};
ASSERT_EQ(0, write_entry(oid, 0, 234, 122));
ASSERT_EQ(0, write_entry(oid, 1, 234, 123));
@ -248,6 +264,11 @@ TEST_F(TestJournalPlayer, PrefetchMultipleTags) {
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
BOOST_SCOPE_EXIT_ALL( (player) ) {
C_SaferCond unwatch_ctx;
player->shut_down(&unwatch_ctx);
ASSERT_EQ(0, unwatch_ctx.wait());
};
ASSERT_EQ(0, write_entry(oid, 0, 234, 120));
ASSERT_EQ(0, write_entry(oid, 1, 234, 121));
@ -282,6 +303,11 @@ TEST_F(TestJournalPlayer, PrefetchCorruptSequence) {
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
BOOST_SCOPE_EXIT_ALL( (player) ) {
C_SaferCond unwatch_ctx;
player->shut_down(&unwatch_ctx);
ASSERT_EQ(0, unwatch_ctx.wait());
};
ASSERT_EQ(0, write_entry(oid, 0, 234, 120));
ASSERT_EQ(0, write_entry(oid, 1, 234, 121));
@ -311,6 +337,11 @@ TEST_F(TestJournalPlayer, PrefetchMissingSequence) {
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
BOOST_SCOPE_EXIT_ALL( (player) ) {
C_SaferCond unwatch_ctx;
player->shut_down(&unwatch_ctx);
ASSERT_EQ(0, unwatch_ctx.wait());
};
ASSERT_EQ(0, metadata->set_active_set(1));
ASSERT_EQ(0, write_entry(oid, 0, 2, 852));
@ -356,6 +387,11 @@ TEST_F(TestJournalPlayer, PrefetchLargeMissingSequence) {
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
BOOST_SCOPE_EXIT_ALL( (player) ) {
C_SaferCond unwatch_ctx;
player->shut_down(&unwatch_ctx);
ASSERT_EQ(0, unwatch_ctx.wait());
};
ASSERT_EQ(0, metadata->set_active_set(2));
ASSERT_EQ(0, write_entry(oid, 0, 0, 0));
@ -387,6 +423,11 @@ TEST_F(TestJournalPlayer, PrefetchBlockedNewTag) {
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
BOOST_SCOPE_EXIT_ALL( (player) ) {
C_SaferCond unwatch_ctx;
player->shut_down(&unwatch_ctx);
ASSERT_EQ(0, unwatch_ctx.wait());
};
ASSERT_EQ(0, write_entry(oid, 0, 0, 0));
ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
@ -421,6 +462,11 @@ TEST_F(TestJournalPlayer, PrefetchStaleEntries) {
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
BOOST_SCOPE_EXIT_ALL( (player) ) {
C_SaferCond unwatch_ctx;
player->shut_down(&unwatch_ctx);
ASSERT_EQ(0, unwatch_ctx.wait());
};
ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
ASSERT_EQ(0, write_entry(oid, 1, 0, 3));
@ -452,6 +498,11 @@ TEST_F(TestJournalPlayer, PrefetchUnexpectedTag) {
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
BOOST_SCOPE_EXIT_ALL( (player) ) {
C_SaferCond unwatch_ctx;
player->shut_down(&unwatch_ctx);
ASSERT_EQ(0, unwatch_ctx.wait());
};
ASSERT_EQ(0, write_entry(oid, 0, 234, 120));
ASSERT_EQ(0, write_entry(oid, 1, 235, 121));
@ -484,6 +535,11 @@ TEST_F(TestJournalPlayer, PrefetchAndWatch) {
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
BOOST_SCOPE_EXIT_ALL( (player) ) {
C_SaferCond unwatch_ctx;
player->shut_down(&unwatch_ctx);
ASSERT_EQ(0, unwatch_ctx.wait());
};
ASSERT_EQ(0, write_entry(oid, 0, 234, 122));
@ -518,6 +574,11 @@ TEST_F(TestJournalPlayer, PrefetchSkippedObject) {
ASSERT_EQ(0, metadata->set_active_set(2));
journal::JournalPlayer *player = create_player(oid, metadata);
BOOST_SCOPE_EXIT_ALL( (player) ) {
C_SaferCond unwatch_ctx;
player->shut_down(&unwatch_ctx);
ASSERT_EQ(0, unwatch_ctx.wait());
};
ASSERT_EQ(0, write_entry(oid, 0, 234, 122));
ASSERT_EQ(0, write_entry(oid, 1, 234, 123));
@ -565,6 +626,11 @@ TEST_F(TestJournalPlayer, ImbalancedJournal) {
metadata->set_minimum_set(2);
journal::JournalPlayer *player = create_player(oid, metadata);
BOOST_SCOPE_EXIT_ALL( (player) ) {
C_SaferCond unwatch_ctx;
player->shut_down(&unwatch_ctx);
ASSERT_EQ(0, unwatch_ctx.wait());
};
ASSERT_EQ(0, write_entry(oid, 8, 300, 0));
ASSERT_EQ(0, write_entry(oid, 8, 301, 0));
@ -607,6 +673,11 @@ TEST_F(TestJournalPlayer, LiveReplayLaggyAppend) {
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
BOOST_SCOPE_EXIT_ALL( (player) ) {
C_SaferCond unwatch_ctx;
player->shut_down(&unwatch_ctx);
ASSERT_EQ(0, unwatch_ctx.wait());
};
ASSERT_EQ(0, write_entry(oid, 0, 0, 0));
ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
@ -652,6 +723,11 @@ TEST_F(TestJournalPlayer, LiveReplayMissingSequence) {
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
BOOST_SCOPE_EXIT_ALL( (player) ) {
C_SaferCond unwatch_ctx;
player->shut_down(&unwatch_ctx);
ASSERT_EQ(0, unwatch_ctx.wait());
};
ASSERT_EQ(0, write_entry(oid, 0, 2, 852));
ASSERT_EQ(0, write_entry(oid, 0, 2, 856));
@ -702,6 +778,11 @@ TEST_F(TestJournalPlayer, LiveReplayLargeMissingSequence) {
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
BOOST_SCOPE_EXIT_ALL( (player) ) {
C_SaferCond unwatch_ctx;
player->shut_down(&unwatch_ctx);
ASSERT_EQ(0, unwatch_ctx.wait());
};
ASSERT_EQ(0, metadata->set_active_set(2));
ASSERT_EQ(0, write_entry(oid, 0, 0, 0));
@ -733,6 +814,11 @@ TEST_F(TestJournalPlayer, LiveReplayBlockedNewTag) {
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
BOOST_SCOPE_EXIT_ALL( (player) ) {
C_SaferCond unwatch_ctx;
player->shut_down(&unwatch_ctx);
ASSERT_EQ(0, unwatch_ctx.wait());
};
C_SaferCond ctx1;
cls::journal::Tag tag1;
@ -787,6 +873,11 @@ TEST_F(TestJournalPlayer, LiveReplayStaleEntries) {
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
BOOST_SCOPE_EXIT_ALL( (player) ) {
C_SaferCond unwatch_ctx;
player->shut_down(&unwatch_ctx);
ASSERT_EQ(0, unwatch_ctx.wait());
};
ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
ASSERT_EQ(0, write_entry(oid, 1, 0, 3));
@ -818,6 +909,11 @@ TEST_F(TestJournalPlayer, LiveReplayRefetchRemoveEmpty) {
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
BOOST_SCOPE_EXIT_ALL( (player) ) {
C_SaferCond unwatch_ctx;
player->shut_down(&unwatch_ctx);
ASSERT_EQ(0, unwatch_ctx.wait());
};
ASSERT_EQ(0, metadata->set_active_set(1));
ASSERT_EQ(0, write_entry(oid, 0, 0, 0));
@ -844,3 +940,41 @@ TEST_F(TestJournalPlayer, LiveReplayRefetchRemoveEmpty) {
ASSERT_EQ(expected_entries, entries);
}
TEST_F(TestJournalPlayer, PrefechShutDown) {
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
ASSERT_EQ(0, client_commit(oid, {}));
journal::JournalMetadataPtr metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
BOOST_SCOPE_EXIT_ALL( (player) ) {
C_SaferCond unwatch_ctx;
player->shut_down(&unwatch_ctx);
ASSERT_EQ(0, unwatch_ctx.wait());
};
player->prefetch();
}
TEST_F(TestJournalPlayer, LiveReplayShutDown) {
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
ASSERT_EQ(0, client_commit(oid, {}));
journal::JournalMetadataPtr metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
BOOST_SCOPE_EXIT_ALL( (player) ) {
C_SaferCond unwatch_ctx;
player->shut_down(&unwatch_ctx);
ASSERT_EQ(0, unwatch_ctx.wait());
};
player->prefetch_and_watch(0.25);
}

View File

@ -262,14 +262,11 @@ TEST_F(TestObjectPlayer, Unwatch) {
std::string oid = get_temp_oid();
journal::ObjectPlayerPtr object = create_object(oid, 14);
Mutex mutex("lock");
Cond cond;
bool done = false;
int rval = 0;
C_SafeCond *ctx = new C_SafeCond(&mutex, &cond, &done, &rval);
object->watch(ctx, 600);
C_SaferCond watch_ctx;
object->watch(&watch_ctx, 600);
usleep(200000);
ASSERT_FALSE(done);
object->unwatch();
ASSERT_EQ(-ECANCELED, watch_ctx.wait());
}