Merge remote-tracking branch 'gh/next'

This commit is contained in:
Sage Weil 2013-01-02 18:13:25 -08:00
commit 6b5a89d237
4 changed files with 97 additions and 40 deletions

View File

@ -155,14 +155,15 @@ int main(int argc, const char **argv, const char *envp[]) {
}
r = cfuse.init(newargc, newargv);
if (r < 0) {
if (r != 0) {
cerr << "ceph-fuse[" << getpid() << "]: fuse failed to initialize" << std::endl;
goto out_shutdown;
goto out_client_unmount;
}
cerr << "ceph-fuse[" << getpid() << "]: starting fuse" << std::endl;
r = cfuse.loop();
cerr << "ceph-fuse[" << getpid() << "]: fuse finished with error " << r << std::endl;
out_client_unmount:
client->unmount();
//cout << "unmounted" << std::endl;

View File

@ -252,7 +252,7 @@ void Log::_log_message(const char *s, bool crash)
void Log::dump_recent()
{
pthread_mutex_unlock(&m_flush_mutex);
pthread_mutex_lock(&m_flush_mutex);
pthread_mutex_lock(&m_queue_mutex);
EntryQueue t;

View File

@ -1147,8 +1147,10 @@ void FileJournal::write_thread_entry()
}
assert(r == 0);
logger->inc(l_os_j_wr);
logger->inc(l_os_j_wr_bytes, bl.length());
if (logger) {
logger->inc(l_os_j_wr);
logger->inc(l_os_j_wr_bytes, bl.length());
}
#ifdef HAVE_LIBAIO
if (aio)
@ -1249,40 +1251,51 @@ int FileJournal::write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq)
dout(20) << "write_aio_bl " << pos << "~" << bl.length() << " seq " << seq << dendl;
aio_queue.push_back(aio_info(bl, pos, seq));
aio_info& aio = aio_queue.back();
aio.iov = new iovec[aio.bl.buffers().size()];
int n = 0;
for (std::list<buffer::ptr>::const_iterator p = aio.bl.buffers().begin();
p != aio.bl.buffers().end();
++p, ++n) {
aio.iov[n].iov_base = (void *)p->c_str();
aio.iov[n].iov_len = p->length();
}
io_prep_pwritev(&aio.iocb, fd, aio.iov, n, pos);
dout(20) << "write_aio_bl .. " << aio.off << "~" << aio.len
<< " in " << n << dendl;
aio_num++;
aio_bytes += aio.len;
iocb *piocb = &aio.iocb;
int attempts = 10;
do {
int r = io_submit(aio_ctx, 1, &piocb);
if (r < 0) {
derr << "io_submit to " << aio.off << "~" << aio.len
<< " got " << cpp_strerror(r) << dendl;
if (r == -EAGAIN && attempts-- > 0) {
usleep(500);
continue;
}
assert(0 == "io_submit got unexpected error");
while (bl.length() > 0) {
int max = MIN(bl.buffers().size(), IOV_MAX-1);
iovec *iov = new iovec[max];
int n = 0;
unsigned len = 0;
for (std::list<buffer::ptr>::const_iterator p = bl.buffers().begin();
n < max;
++p, ++n) {
assert(p != bl.buffers().end());
iov[n].iov_base = (void *)p->c_str();
iov[n].iov_len = p->length();
len += p->length();
}
} while (false);
pos += aio.len;
bufferlist tbl;
bl.splice(0, len, &tbl); // move bytes from bl -> tbl
aio_queue.push_back(aio_info(tbl, pos, bl.length() > 0 ? 0 : seq));
aio_info& aio = aio_queue.back();
aio.iov = iov;
io_prep_pwritev(&aio.iocb, fd, aio.iov, n, pos);
dout(20) << "write_aio_bl .. " << aio.off << "~" << aio.len
<< " in " << n << dendl;
aio_num++;
aio_bytes += aio.len;
iocb *piocb = &aio.iocb;
int attempts = 10;
do {
int r = io_submit(aio_ctx, 1, &piocb);
if (r < 0) {
derr << "io_submit to " << aio.off << "~" << aio.len
<< " got " << cpp_strerror(r) << dendl;
if (r == -EAGAIN && attempts-- > 0) {
usleep(500);
continue;
}
assert(0 == "io_submit got unexpected error");
}
} while (false);
pos += aio.len;
}
write_finish_cond.Signal();
return 0;
}

View File

@ -1,5 +1,6 @@
#include <gtest/gtest.h>
#include <stdlib.h>
#include <limits.h>
#include "common/ceph_argparse.h"
#include "common/common_init.h"
@ -69,8 +70,13 @@ int main(int argc, char **argv) {
finisher = new Finisher(g_ceph_context);
srand(getpid()+time(0));
snprintf(path, sizeof(path), "/tmp/test_filejournal.tmp.%d", rand());
if (args.size()) {
strcpy(path, args[0]);
} else {
srand(getpid()+time(0));
snprintf(path, sizeof(path), "/tmp/test_filejournal.tmp.%d", rand());
}
cout << "path " << path << std::endl;
::testing::InitGoogleTest(&argc, argv);
@ -160,6 +166,43 @@ TEST(TestFileJournal, WriteMany) {
j.close();
}
TEST(TestFileJournal, WriteManyVecs) {
fsid.generate_random();
FileJournal j(fsid, finisher, &sync_cond, path, directio, aio);
ASSERT_EQ(0, j.create());
j.make_writeable();
C_GatherBuilder gb(g_ceph_context, new C_SafeCond(&lock, &cond, &done));
bufferlist first;
first.append("small");
j.submit_entry(1, first, 0, gb.new_sub());
bufferlist bl;
for (int i=0; i<IOV_MAX * 2; i++) {
bufferptr bp = buffer::create_page_aligned(4096);
memset(bp.c_str(), (char)i, 4096);
bl.append(bp);
}
bufferlist origbl = bl;
j.submit_entry(2, bl, 0, gb.new_sub());
gb.activate();
wait();
j.close();
j.open(1);
bufferlist inbl;
string v;
uint64_t seq = 0;
ASSERT_EQ(true, j.read_entry(inbl, seq));
ASSERT_EQ(seq, 2ull);
ASSERT_TRUE(inbl.contents_equal(origbl));
j.make_writeable();
j.close();
}
TEST(TestFileJournal, ReplaySmall) {
fsid.generate_random();
FileJournal j(fsid, finisher, &sync_cond, path, directio, aio);