Merge remote-tracking branch 'gh/wip-journal-aio-rebased'

This commit is contained in:
Sage Weil 2012-02-13 14:31:17 -08:00
commit 12035cd4e3
11 changed files with 440 additions and 27 deletions

View File

@ -37,6 +37,7 @@ BuildRequires: perl
BuildRequires: gdbm
BuildRequires: pkgconfig
BuildRequires: python
BuildRequires: libaio-devel
#################################################################################
# specific

View File

@ -294,11 +294,25 @@ AC_ARG_WITH([ocf],
[with_ocf=no])
AM_CONDITIONAL(WITH_OCF, [ test "$with_ocf" = "yes" ])
# use libaio?
AC_ARG_WITH([libaio],
[AS_HELP_STRING([--without-libaio], [disable libaio use by journal])],
,
[with_libaio=yes])
AS_IF([test "x$with_libaio" != xno],
[AC_CHECK_LIB([aio], [io_submit], [true], AC_MSG_FAILURE([libaio not found]))])
AS_IF([test "x$with_libaio" != xno],
[AC_CHECK_HEADER([libaio.h])])
AS_IF([test "$with_libaio" = "yes"],
[AC_DEFINE([HAVE_LIBAIO], [1], [Defined if you don't have atomic_ops])])
AM_CONDITIONAL(WITH_LIBAIO, [ test "$with_libaio" = "yes" ])
# Checks for header files.
AC_HEADER_DIRENT
AC_HEADER_STDC
AC_HEADER_SYS_WAIT
# spirit?
AC_LANG([C++])

2
debian/control vendored
View File

@ -6,7 +6,7 @@ Vcs-Git: git://github.com/NewDreamNetwork/ceph.git
Vcs-Browser: https://github.com/NewDreamNetwork/ceph
Maintainer: Laszlo Boszormenyi (GCS) <gcs@debian.hu>
Uploaders: Sage Weil <sage@newdream.net>
Build-Depends: debhelper (>= 6.0.7~), autotools-dev, autoconf, automake, libfuse-dev, libboost-dev (>= 1.34), libedit-dev, libcrypto++-dev, libtool, libexpat1-dev, libfcgi-dev, libatomic-ops-dev, libgoogle-perftools-dev [i386 amd64], pkg-config, libgtkmm-2.4-dev, python, python-support, libcurl4-gnutls-dev, libkeyutils-dev, uuid-dev
Build-Depends: debhelper (>= 6.0.7~), autotools-dev, autoconf, automake, libfuse-dev, libboost-dev (>= 1.34), libedit-dev, libcrypto++-dev, libtool, libexpat1-dev, libfcgi-dev, libatomic-ops-dev, libgoogle-perftools-dev [i386 amd64], pkg-config, libgtkmm-2.4-dev, python, python-support, libcurl4-gnutls-dev, libkeyutils-dev, uuid-dev, libaio-dev
Standards-Version: 3.9.1
Package: ceph

View File

@ -1005,6 +1005,9 @@ libos_la_SOURCES = \
os/FlatIndex.cc
libos_la_CXXFLAGS= ${CRYPTO_CXXFLAGS} ${AM_CXXFLAGS}
libos_la_LIBADD = libglobal.la
if WITH_LIBAIO
libos_la_LIBADD += -laio
endif
noinst_LTLIBRARIES += libos.la
libosd_la_SOURCES = \

View File

@ -328,6 +328,7 @@ OPTION(filestore_split_multiple, OPT_INT, 2)
OPTION(filestore_update_collections, OPT_BOOL, false)
OPTION(filestore_blackhole, OPT_BOOL, false) // drop any new transactions on the floor
OPTION(journal_dio, OPT_BOOL, true)
OPTION(journal_aio, OPT_BOOL, true)
OPTION(journal_block_align, OPT_BOOL, true)
OPTION(journal_max_write_bytes, OPT_INT, 10 << 20)
OPTION(journal_max_write_entries, OPT_INT, 100)

View File

@ -44,6 +44,17 @@ int FileJournal::_open(bool forwrite, bool create)
{
int flags, ret;
if (aio && !directio) {
derr << "FileJournal::_open: aio not supported without directio; disabling aio" << dendl;
aio = false;
}
#ifndef HAVE_LIBAIO
if (aio) {
derr << "FileJournal::_open: libaio not compiled in; disabling aio" << dendl;
aio = false;
}
#endif
if (forwrite) {
flags = O_RDWR;
if (directio)
@ -64,18 +75,17 @@ int FileJournal::_open(bool forwrite, bool create)
fd = TEMP_FAILURE_RETRY(::open(fn.c_str(), flags, 0644));
if (fd < 0) {
int err = errno;
derr << "FileJournal::_open : unable to open journal: open() "
<< "failed: " << cpp_strerror(err) << dendl;
derr << "FileJournal::_open: unable to open journal: open() failed: "
<< cpp_strerror(err) << dendl;
return -err;
}
struct stat st;
ret = ::fstat(fd, &st);
if (ret) {
int err = errno;
derr << "FileJournal::_open: unable to fstat journal: "
<< cpp_strerror(err) << dendl;
return -err;
ret = errno;
derr << "FileJournal::_open: unable to fstat journal: " << cpp_strerror(ret) << dendl;
goto out_fd;
}
if (S_ISBLK(st.st_mode))
@ -84,7 +94,17 @@ int FileJournal::_open(bool forwrite, bool create)
ret = _open_file(st.st_size, st.st_blksize, create);
if (ret)
return ret;
goto out_fd;
#ifdef HAVE_LIBAIO
aio_ctx = 0;
ret = io_setup(128, &aio_ctx);
if (ret < 0) {
ret = errno;
derr << "FileJournal::_open: unable to setup io_context " << cpp_strerror(ret) << dendl;
goto out_fd;
}
#endif
/* We really want max_size to be a multiple of block_size. */
max_size -= max_size % block_size;
@ -92,8 +112,14 @@ int FileJournal::_open(bool forwrite, bool create)
dout(1) << "_open " << fn << " fd " << fd
<< ": " << max_size
<< " bytes, block size " << block_size
<< " bytes, directio = " << directio << dendl;
<< " bytes, directio = " << directio
<< ", aio = " << aio
<< dendl;
return 0;
out_fd:
TEMP_FAILURE_RETRY(::close(fd));
return ret;
}
int FileJournal::_open_block_device()
@ -550,6 +576,9 @@ void FileJournal::start_writer()
{
write_stop = false;
write_thread.create();
#ifdef HAVE_LIBAIO
write_finish_thread.create();
#endif
}
void FileJournal::stop_writer()
@ -558,9 +587,15 @@ void FileJournal::stop_writer()
{
write_stop = true;
write_cond.Signal();
#ifdef HAVE_LIBAIO
write_finish_cond.Signal();
#endif
}
write_lock.Unlock();
write_thread.join();
#ifdef HAVE_LIBAIO
write_finish_thread.join();
#endif
}
@ -832,7 +867,7 @@ int FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos, uint64
return 0;
}
int FileJournal::write_bl(off64_t& pos, bufferlist& bl)
void FileJournal::align_bl(off64_t pos, bufferlist& bl)
{
// make sure list segments are page aligned
if (directio && (!bl.is_page_aligned() ||
@ -844,6 +879,11 @@ int FileJournal::write_bl(off64_t& pos, bufferlist& bl)
assert((bl.length() & ~CEPH_PAGE_MASK) == 0);
assert((pos & ~CEPH_PAGE_MASK) == 0);
}
}
int FileJournal::write_bl(off64_t& pos, bufferlist& bl)
{
align_bl(pos, bl);
::lseek64(fd, pos, SEEK_SET);
int ret = bl.write_fd(fd);
@ -958,6 +998,7 @@ void FileJournal::do_write(bufferlist& bl)
write_lock.Lock();
writing = false;
write_empty_cond.Signal();
// wrap if we hit the end of the journal
if (pos == header.max_size)
@ -986,7 +1027,7 @@ void FileJournal::do_write(bufferlist& bl)
void FileJournal::flush()
{
write_lock.Lock();
while ((!writeq.empty() || writing) && !write_stop) {
while ((!writeq.empty() || writing)) {
dout(5) << "flush waiting for writeq to empty and writes to complete" << dendl;
write_empty_cond.Wait(write_lock);
}
@ -1013,6 +1054,28 @@ void FileJournal::write_thread_entry()
continue;
}
#ifdef HAVE_LIBAIO
if (aio) {
// should we back off to limit aios in flight? try to do this
// adaptively so that we submit larger aios once we have lots of
// them in flight.
int exp = MIN(aio_num * 2, 24);
long unsigned min_new = 1ull << exp;
long unsigned cur = throttle_bytes.get_current();
dout(20) << "write_thread_entry aio throttle: aio num " << aio_num << " bytes " << aio_bytes
<< " ... exp " << exp << " min_new " << min_new
<< " ... pending " << cur << dendl;
if (cur < min_new) {
dout(20) << "write_thread_entry deferring until more aios complete: "
<< aio_num << " aios with " << aio_bytes << " bytes needs " << min_new
<< " bytes to start a new aio (currently " << cur << " pending)" << dendl;
write_cond.Wait(write_lock);
dout(20) << "write_thread_entry woke up" << dendl;
continue;
}
}
#endif
uint64_t orig_ops = 0;
uint64_t orig_bytes = 0;
@ -1025,7 +1088,15 @@ void FileJournal::write_thread_entry()
continue;
}
assert(r == 0);
#ifdef HAVE_LIBAIO
if (aio)
do_aio_write(bl);
else
do_write(bl);
#else
do_write(bl);
#endif
put_throttle(orig_ops, orig_bytes);
}
@ -1034,6 +1105,238 @@ void FileJournal::write_thread_entry()
dout(10) << "write_thread_entry finish" << dendl;
}
#ifdef HAVE_LIBAIO
void FileJournal::do_aio_write(bufferlist& bl)
{
// nothing to do?
if (bl.length() == 0 && !must_write_header)
return;
buffer::ptr hbp;
if (must_write_header) {
must_write_header = false;
hbp = prepare_header();
}
if (!writing) {
writing = true;
write_finish_cond.Signal();
}
// entry
off64_t pos = write_pos;
dout(15) << "do_aio_write writing " << pos << "~" << bl.length()
<< (hbp.length() ? " + header":"")
<< dendl;
utime_t from = ceph_clock_now(g_ceph_context);
// split?
off64_t split = 0;
if (pos + bl.length() > header.max_size) {
bufferlist first, second;
split = header.max_size - pos;
first.substr_of(bl, 0, split);
second.substr_of(bl, split, bl.length() - split);
assert(first.length() + second.length() == bl.length());
dout(10) << "do_aio_write wrapping, first bit at " << pos << "~" << first.length() << dendl;
if (write_aio_bl(pos, first, 0)) {
derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos
<< ") failed" << dendl;
ceph_abort();
}
assert(pos == header.max_size);
if (hbp.length()) {
// be sneaky: include the header in the second fragment
second.push_front(hbp);
pos = 0; // we included the header
} else
pos = get_top(); // no header, start after that
if (write_aio_bl(pos, second, writing_seq)) {
derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos
<< ") failed" << dendl;
ceph_abort();
}
} else {
// header too?
if (hbp.length()) {
bufferlist hbl;
hbl.push_back(hbp);
loff_t pos = 0;
if (write_aio_bl(pos, hbl, 0)) {
derr << "FileJournal::do_aio_write: write_aio_bl(header) failed" << dendl;
ceph_abort();
}
}
if (write_aio_bl(pos, bl, writing_seq)) {
derr << "FileJournal::do_aio_write: write_aio_bl(pos=" << pos
<< ") failed" << dendl;
ceph_abort();
}
}
write_pos = pos;
if (write_pos == header.max_size)
write_pos = get_top();
assert(write_pos % header.alignment == 0);
}
/**
* write a buffer using aio
*
* @param seq seq to trigger when this aio completes. if 0, do not update any state
* on completion.
*/
int FileJournal::write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq)
{
align_bl(pos, bl);
dout(20) << "write_aio_bl " << pos << "~" << bl.length() << " seq " << seq << dendl;
aio_queue.push_back(aio_info(bl, pos, seq));
aio_info& aio = aio_queue.back();
aio.iov = new iovec[aio.bl.buffers().size()];
int n = 0;
for (std::list<buffer::ptr>::const_iterator p = aio.bl.buffers().begin();
p != aio.bl.buffers().end();
++p, ++n) {
aio.iov[n].iov_base = (void *)p->c_str();
aio.iov[n].iov_len = p->length();
}
io_prep_pwritev(&aio.iocb, fd, aio.iov, n, pos);
dout(20) << "write_aio_bl .. " << aio.off << "~" << aio.len
<< " in " << n << dendl;
aio_num++;
aio_bytes += aio.len;
iocb *piocb = &aio.iocb;
int attempts = 10;
do {
int r = io_submit(aio_ctx, 1, &piocb);
if (r < 0) {
derr << "io_submit to " << aio.off << "~" << aio.len
<< " got " << cpp_strerror(r) << dendl;
if (r == -EAGAIN && attempts-- > 0) {
usleep(500);
continue;
}
assert(0 == "io_submit got unexpected error");
}
} while (false);
pos += aio.len;
return 0;
}
#endif
void FileJournal::write_finish_thread_entry()
{
#ifdef HAVE_LIBAIO
dout(10) << "write_finish_thread_entry enter" << dendl;
write_lock.Lock();
while (true) {
if (aio_queue.empty()) {
if (write_stop)
break;
dout(20) << "write_finish_thread_entry sleeping" << dendl;
write_finish_cond.Wait(write_lock);
continue;
}
write_lock.Unlock();
dout(20) << "write_finish_thread_entry waiting for aio(s)" << dendl;
io_event event[16];
int r = io_getevents(aio_ctx, 1, 16, event, NULL);
if (r < 0) {
if (r == -EINTR) {
dout(0) << "io_getevents got " << cpp_strerror(r) << dendl;
write_lock.Lock();
continue;
}
derr << "io_getevents got " << cpp_strerror(r) << dendl;
assert(0 == "got unexpected error from io_getevents");
}
write_lock.Lock();
for (int i=0; i<r; i++) {
aio_info *ai = (aio_info *)event[i].obj;
if (event[i].res != ai->len) {
derr << "aio to " << ai->off << "~" << ai->len
<< " got " << cpp_strerror(event[i].res) << dendl;
assert(0 == "unexpected aio error");
}
dout(10) << "write_finish_thread_entry aio " << ai->off << "~" << ai->len << " done" << dendl;
ai->done = true;
}
check_aio_completion();
}
write_lock.Unlock();
dout(10) << "write_finish_thread_entry exit" << dendl;
#endif
}
#ifdef HAVE_LIBAIO
/**
* check aio_wait for completed aio, and update state appropriately.
*/
void FileJournal::check_aio_completion()
{
assert(write_lock.is_locked());
dout(20) << "check_aio_completion" << dendl;
bool completed_something = false;
list<aio_info>::iterator p = aio_queue.begin();
while (p != aio_queue.end() && p->done) {
dout(20) << "check_aio_completion completed seq " << p->seq << " "
<< p->off << "~" << p->len << dendl;
if (p->seq) {
journaled_seq = p->seq;
completed_something = true;
}
aio_num--;
aio_bytes -= p->len;
aio_queue.erase(p++);
}
if (completed_something) {
// kick finisher?
// only if we haven't filled up recently!
if (full_state != FULL_NOTFULL) {
dout(10) << "check_aio_completion NOT queueing finisher seq " << journaled_seq
<< ", full_commit_seq|full_restart_seq" << dendl;
} else {
if (plug_journal_completions) {
dout(20) << "check_aio_completion NOT queueing finishers through seq " << journaled_seq
<< " due to completion plug" << dendl;
} else {
dout(20) << "check_aio_completion queueing finishers through seq " << journaled_seq << dendl;
queue_completions_thru(journaled_seq);
}
}
// maybe write queue was waiting for aio count to drop?
if (!writeq.empty())
write_cond.Signal();
// wake up flush?
if (aio_queue.empty()) {
writing = false;
write_empty_cond.Signal();
assert(aio_num == 0);
assert(aio_bytes == 0);
}
}
}
#endif
void FileJournal::submit_entry(uint64_t seq, bufferlist& e, int alignment, Context *oncommit)
{

View File

@ -25,6 +25,10 @@ using std::deque;
#include "common/Thread.h"
#include "common/Throttle.h"
#ifdef HAVE_LIBAIO
# include <libaio.h>
#endif
class FileJournal : public Journal {
public:
/*
@ -123,11 +127,35 @@ private:
off64_t max_size;
size_t block_size;
bool is_bdev;
bool directio;
bool directio, aio;
bool writing, must_write_header;
off64_t write_pos; // byte where the next entry to be written will go
off64_t read_pos; //
#ifdef HAVE_LIBAIO
/// state associated with an in-flight aio request
struct aio_info {
struct iocb iocb;
bufferlist bl;
struct iovec *iov;
bool done;
uint64_t off, len; ///< these are for debug only
uint64_t seq; ///< seq number to complete on aio completion, if non-zero
aio_info(bufferlist& b, uint64_t o, uint64_t s)
: iov(NULL), done(false), off(o), len(b.length()), seq(s) {
bl.claim(b);
}
~aio_info() {
delete[] iov;
}
};
io_context_t aio_ctx;
list<aio_info> aio_queue;
int aio_num, aio_bytes;
Cond write_finish_cond;
#endif
uint64_t last_committed_seq;
/*
@ -203,6 +231,13 @@ private:
int prepare_single_write(bufferlist& bl, off64_t& queue_pos, uint64_t& orig_ops, uint64_t& orig_bytes);
void do_write(bufferlist& bl);
void write_finish_thread_entry();
void check_aio_completion();
void do_aio_write(bufferlist& bl);
int write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq);
void align_bl(off64_t pos, bufferlist& bl);
int write_bl(off64_t& pos, bufferlist& bl);
void wrap_read_bl(off64_t& pos, int64_t len, bufferlist& bl);
@ -216,18 +251,31 @@ private:
}
} write_thread;
class WriteFinisher : public Thread {
FileJournal *journal;
public:
WriteFinisher(FileJournal *fj) : journal(fj) {}
void *entry() {
journal->write_finish_thread_entry();
return 0;
}
} write_finish_thread;
off64_t get_top() {
return ROUND_UP_TO(sizeof(header), block_size);
}
public:
FileJournal(uuid_d fsid, Finisher *fin, Cond *sync_cond, const char *f, bool dio=false) :
FileJournal(uuid_d fsid, Finisher *fin, Cond *sync_cond, const char *f, bool dio=false, bool ai=true) :
Journal(fsid, fin, sync_cond), fn(f),
zero_buf(NULL),
max_size(0), block_size(0),
is_bdev(false),directio(dio),
is_bdev(false), directio(dio), aio(ai),
writing(false), must_write_header(false),
write_pos(0), read_pos(0),
#ifdef HAVE_LIBAIO
aio_num(0), aio_bytes(0),
#endif
last_committed_seq(0),
full_state(FULL_NOTFULL),
fd(-1),
@ -235,7 +283,8 @@ private:
plug_journal_completions(false),
write_lock("FileJournal::write_lock"),
write_stop(false),
write_thread(this) { }
write_thread(this),
write_finish_thread(this) { }
~FileJournal() {
delete[] zero_buf;
}

View File

@ -640,6 +640,7 @@ FileStore::FileStore(const std::string &base, const std::string &jdev) :
m_filestore_min_sync_interval(g_conf->filestore_min_sync_interval),
m_filestore_update_collections(g_conf->filestore_update_collections),
m_journal_dio(g_conf->journal_dio),
m_journal_aio(g_conf->journal_aio),
m_osd_rollback_to_cluster_snap(g_conf->osd_rollback_to_cluster_snap),
m_osd_use_stale_snap(g_conf->osd_use_stale_snap),
m_filestore_queue_max_ops(g_conf->filestore_queue_max_ops),
@ -763,7 +764,8 @@ int FileStore::open_journal()
{
if (journalpath.length()) {
dout(10) << "open_journal at " << journalpath << dendl;
journal = new FileJournal(fsid, &finisher, &sync_cond, journalpath.c_str(), m_journal_dio);
journal = new FileJournal(fsid, &finisher, &sync_cond, journalpath.c_str(),
m_journal_dio, m_journal_aio);
if (journal)
journal->logger = logger;
}

View File

@ -398,7 +398,7 @@ private:
double m_filestore_max_sync_interval;
double m_filestore_min_sync_interval;
bool m_filestore_update_collections;
bool m_journal_dio;
bool m_journal_dio, m_journal_aio;
std::string m_osd_rollback_to_cluster_snap;
bool m_osd_use_stale_snap;
int m_filestore_queue_max_ops;

View File

@ -40,12 +40,20 @@ void throttle()
cond.Wait(lock);
}
}
double total_ack = 0;
double total_commit = 0;
int total_num = 0;
void pr(off_t off)
{
io &i = writes[off];
cout << off << "\t"
if (false) cout << off << "\t"
<< (i.ack - i.start) << "\t"
<< (i.commit - i.start) << std::endl;
total_num++;
total_ack += (i.ack - i.start);
total_commit += (i.commit - i.start);
writes.erase(off);
cond.Signal();
}
@ -140,6 +148,7 @@ int main(int argc, const char **argv)
fs->apply_transaction(ft);
utime_t now = ceph_clock_now(g_ceph_context);
utime_t start = now;
utime_t end = now;
end += seconds;
off_t pos = 0;
@ -156,6 +165,8 @@ int main(int argc, const char **argv)
throttle();
now = ceph_clock_now(g_ceph_context);
// wait?
/*
utime_t next = start;
@ -169,6 +180,11 @@ int main(int argc, const char **argv)
*/
}
cout << "total num " << total_num << std::endl;
cout << "avg ack\t" << (total_ack / (double)total_num) << std::endl;
cout << "avg commit\t" << (total_commit / (double)total_num) << std::endl;
cout << "tput\t" << prettybyte_t((double)(total_num * bytes) / (double)(end-start)) << "/sec" << std::endl;
fs->umount();
}

View File

@ -16,6 +16,7 @@ Cond sync_cond;
char path[200];
uuid_d fsid;
bool directio = false;
bool aio = false;
// ----
Cond cond;
@ -75,13 +76,20 @@ int main(int argc, char **argv) {
finisher->start();
cout << "DIRECTIO OFF" << std::endl;
cout << "DIRECTIO OFF AIO OFF" << std::endl;
directio = false;
aio = false;
int r = RUN_ALL_TESTS();
if (r >= 0) {
cout << "DIRECTIO ON" << std::endl;
cout << "DIRECTIO ON AIO OFF" << std::endl;
directio = true;
r = RUN_ALL_TESTS();
if (r >= 0) {
cout << "DIRECTIO ON AIO ON" << std::endl;
aio = true;
r = RUN_ALL_TESTS();
}
}
finisher->stop();
@ -93,13 +101,13 @@ int main(int argc, char **argv) {
TEST(TestFileJournal, Create) {
fsid.generate_random();
FileJournal j(fsid, finisher, &sync_cond, path, directio);
FileJournal j(fsid, finisher, &sync_cond, path, directio, aio);
ASSERT_EQ(0, j.create());
}
TEST(TestFileJournal, WriteSmall) {
fsid.generate_random();
FileJournal j(fsid, finisher, &sync_cond, path, directio);
FileJournal j(fsid, finisher, &sync_cond, path, directio, aio);
ASSERT_EQ(0, j.create());
j.make_writeable();
@ -113,7 +121,7 @@ TEST(TestFileJournal, WriteSmall) {
TEST(TestFileJournal, WriteBig) {
fsid.generate_random();
FileJournal j(fsid, finisher, &sync_cond, path, directio);
FileJournal j(fsid, finisher, &sync_cond, path, directio, aio);
ASSERT_EQ(0, j.create());
j.make_writeable();
@ -131,7 +139,7 @@ TEST(TestFileJournal, WriteBig) {
TEST(TestFileJournal, WriteMany) {
fsid.generate_random();
FileJournal j(fsid, finisher, &sync_cond, path, directio);
FileJournal j(fsid, finisher, &sync_cond, path, directio, aio);
ASSERT_EQ(0, j.create());
j.make_writeable();
@ -154,7 +162,7 @@ TEST(TestFileJournal, WriteMany) {
TEST(TestFileJournal, ReplaySmall) {
fsid.generate_random();
FileJournal j(fsid, finisher, &sync_cond, path, directio);
FileJournal j(fsid, finisher, &sync_cond, path, directio, aio);
ASSERT_EQ(0, j.create());
j.make_writeable();
@ -175,11 +183,22 @@ TEST(TestFileJournal, ReplaySmall) {
j.open(1);
bufferlist inbl;
string v;
uint64_t seq = 0;
ASSERT_EQ(true, j.read_entry(inbl, seq));
ASSERT_EQ(seq, 2ull);
inbl.copy(0, inbl.length(), v);
ASSERT_EQ("small", v);
inbl.clear();
v.clear();
ASSERT_EQ(true, j.read_entry(inbl, seq));
ASSERT_EQ(seq, 3ull);
inbl.copy(0, inbl.length(), v);
ASSERT_EQ("small", v);
inbl.clear();
v.clear();
ASSERT_TRUE(!j.read_entry(inbl, seq));
j.make_writeable();
@ -188,7 +207,7 @@ TEST(TestFileJournal, ReplaySmall) {
TEST(TestFileJournal, ReplayCorrupt) {
fsid.generate_random();
FileJournal j(fsid, finisher, &sync_cond, path, directio);
FileJournal j(fsid, finisher, &sync_cond, path, directio, aio);
ASSERT_EQ(0, j.create());
j.make_writeable();
@ -239,9 +258,14 @@ TEST(TestFileJournal, ReplayCorrupt) {
j.open(1);
bufferlist inbl;
string v;
uint64_t seq = 0;
ASSERT_EQ(true, j.read_entry(inbl, seq));
ASSERT_EQ(seq, 2ull);
inbl.copy(0, inbl.length(), v);
ASSERT_EQ(needle, v);
inbl.clear();
v.clear();
ASSERT_TRUE(!j.read_entry(inbl, seq));
j.make_writeable();
@ -250,7 +274,7 @@ TEST(TestFileJournal, ReplayCorrupt) {
TEST(TestFileJournal, WriteTrim) {
fsid.generate_random();
FileJournal j(fsid, finisher, &sync_cond, path, directio);
FileJournal j(fsid, finisher, &sync_cond, path, directio, aio);
ASSERT_EQ(0, j.create());
j.make_writeable();