mirror of
https://github.com/ceph/ceph
synced 2024-12-14 23:46:28 +00:00
ebofs: journal cleanup, tuning params in g_conf
This commit is contained in:
parent
219710e1c0
commit
0e77815a00
@ -349,6 +349,9 @@ md_config_t g_conf = {
|
||||
ebofs_max_prefetch: 1000, // 4k blocks
|
||||
ebofs_realloc: false, // hrm, this can cause bad fragmentation, don't use!
|
||||
ebofs_verify_csum_on_read: true,
|
||||
ebofs_journal_dio: false,
|
||||
ebofs_journal_max_write_bytes: 0,
|
||||
ebofs_journal_max_write_entries: 10,
|
||||
|
||||
// --- block device ---
|
||||
bdev_lock: true,
|
||||
@ -804,7 +807,12 @@ void parse_config_options(std::vector<const char*>& args)
|
||||
g_conf.ebofs_max_prefetch = atoi(args[++i]);
|
||||
else if (strcmp(args[i], "--ebofs_realloc") == 0)
|
||||
g_conf.ebofs_realloc = atoi(args[++i]);
|
||||
|
||||
else if (strcmp(args[i], "--ebofs_journal_dio") == 0)
|
||||
g_conf.ebofs_journal_dio = atoi(args[++i]);
|
||||
else if (strcmp(args[i], "--ebofs_journal_max_write_entries") == 0)
|
||||
g_conf.ebofs_journal_max_write_entries = atoi(args[++i]);
|
||||
else if (strcmp(args[i], "--ebofs_journal_max_write_bytes") == 0)
|
||||
g_conf.ebofs_journal_max_write_bytes = atoi(args[++i]);
|
||||
|
||||
else if (strcmp(args[i], "--fakestore") == 0) {
|
||||
g_conf.ebofs = 0;
|
||||
|
@ -287,7 +287,10 @@ struct md_config_t {
|
||||
unsigned ebofs_max_prefetch;
|
||||
bool ebofs_realloc;
|
||||
bool ebofs_verify_csum_on_read;
|
||||
|
||||
bool ebofs_journal_dio;
|
||||
bool ebofs_journal_max_write_bytes;
|
||||
bool ebofs_journal_max_write_entries;
|
||||
|
||||
// block device
|
||||
bool bdev_lock;
|
||||
int bdev_iothreads;
|
||||
|
@ -124,8 +124,7 @@ int Ebofs::mount()
|
||||
|
||||
// open journal?
|
||||
if (journalfn) {
|
||||
journal = new FileJournal(this, journalfn);
|
||||
//journal = new DioJournal(this, journalfn);
|
||||
journal = new FileJournal(this, journalfn, g_conf.ebofs_journal_dio);
|
||||
if (journal->open() < 0) {
|
||||
dout(3) << "mount journal " << journalfn << " open failed" << dendl;
|
||||
delete journal;
|
||||
@ -270,7 +269,7 @@ int Ebofs::mkfs()
|
||||
|
||||
// create journal?
|
||||
if (journalfn) {
|
||||
Journal *journal = new FileJournal(this, journalfn);
|
||||
Journal *journal = new FileJournal(this, journalfn, g_conf.ebofs_journal_dio);
|
||||
if (journal->create() < 0) {
|
||||
dout(3) << "mount journal " << journalfn << " created failed" << dendl;
|
||||
} else {
|
||||
@ -559,7 +558,7 @@ int Ebofs::commit_thread_entry()
|
||||
}
|
||||
|
||||
// signal journal
|
||||
if (journal) journal->commit_epoch_finish();
|
||||
if (journal) journal->commit_epoch_finish(super_epoch);
|
||||
|
||||
// kick waiters
|
||||
dout(10) << "commit_thread queueing commit + kicking sync waiters" << dendl;
|
||||
|
@ -27,32 +27,41 @@
|
||||
#define derr(x) if (x <= g_conf.debug_ebofs) *_derr << dbeginl << g_clock.now() << " ebofs(" << ebofs->dev.get_device_name() << ").journal "
|
||||
|
||||
|
||||
int FileJournal::create()
|
||||
int FileJournal::_open()
|
||||
{
|
||||
dout(2) << "create " << fn << dendl;
|
||||
|
||||
// open/create
|
||||
fd = ::open(fn.c_str(), O_RDWR|O_SYNC);
|
||||
int flags = O_RDWR;
|
||||
if (directio) flags |= O_DIRECT;
|
||||
|
||||
fd = ::open(fn.c_str(), flags);
|
||||
if (fd < 0) {
|
||||
dout(2) << "create failed " << errno << " " << strerror(errno) << dendl;
|
||||
dout(2) << "_open failed " << errno << " " << strerror(errno) << dendl;
|
||||
return -errno;
|
||||
}
|
||||
assert(fd > 0);
|
||||
|
||||
//::ftruncate(fd, 0);
|
||||
//::fchmod(fd, 0644);
|
||||
|
||||
// get size
|
||||
struct stat st;
|
||||
::fstat(fd, &st);
|
||||
max_size = st.st_size;
|
||||
block_size = st.st_blksize;
|
||||
dout(2) << "create " << fn << " " << st.st_size << " bytes, block size " << block_size << dendl;
|
||||
dout(2) << "_open " << fn << " " << st.st_size << " bytes, block size " << block_size << dendl;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int FileJournal::create()
|
||||
{
|
||||
dout(2) << "create " << fn << dendl;
|
||||
|
||||
int err = _open();
|
||||
if (err < 0) return err;
|
||||
|
||||
// write empty header
|
||||
memset(&header, 0, sizeof(header));
|
||||
header.clear();
|
||||
header.fsid = ebofs->get_fsid();
|
||||
header.max_size = st.st_size;
|
||||
header.max_size = max_size;
|
||||
header.block_size = block_size;
|
||||
write_header();
|
||||
|
||||
// writeable.
|
||||
@ -60,22 +69,16 @@ int FileJournal::create()
|
||||
write_pos = get_top();
|
||||
|
||||
::close(fd);
|
||||
|
||||
dout(2) << "create done" << dendl;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int FileJournal::open()
|
||||
{
|
||||
//dout(1) << "open " << fn << dendl;
|
||||
dout(2) << "open " << fn << dendl;
|
||||
|
||||
// open and file
|
||||
assert(fd == 0);
|
||||
fd = ::open(fn.c_str(), O_RDWR|O_SYNC);
|
||||
if (fd < 0) {
|
||||
dout(2) << "open failed " << errno << " " << strerror(errno) << dendl;
|
||||
return -errno;
|
||||
}
|
||||
assert(fd > 0);
|
||||
int err = _open();
|
||||
if (err < 0) return err;
|
||||
|
||||
// assume writeable, unless...
|
||||
read_pos = 0;
|
||||
@ -86,8 +89,16 @@ int FileJournal::open()
|
||||
if (header.fsid != ebofs->get_fsid()) {
|
||||
dout(2) << "open journal fsid doesn't match, invalid (someone else's?) journal" << dendl;
|
||||
}
|
||||
else if (header.max_size > max_size) {
|
||||
dout(2) << "open old header has mismatched max size, discarding" << dendl;
|
||||
}
|
||||
else if (header.block_size != block_size) {
|
||||
dout(2) << "open old header has mismatched block size, discarding" << dendl;
|
||||
}
|
||||
else if (header.num > 0) {
|
||||
// valid header, pick an offset
|
||||
// valid header.
|
||||
|
||||
// pick an offset
|
||||
for (int i=0; i<header.num; i++) {
|
||||
if (header.epoch[i] == ebofs->get_super_epoch()) {
|
||||
dout(2) << "using read_pos header pointer "
|
||||
@ -110,7 +121,7 @@ int FileJournal::open()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
write_header();
|
||||
start_writer();
|
||||
|
||||
return 0;
|
||||
@ -160,43 +171,222 @@ void FileJournal::print_header()
|
||||
}
|
||||
//if (header.wrap) dout(10) << "header: wrap at " << header.wrap << dendl;
|
||||
}
|
||||
|
||||
void FileJournal::read_header()
|
||||
{
|
||||
int r;
|
||||
dout(10) << "read_header" << dendl;
|
||||
::lseek(fd, 0, SEEK_SET);
|
||||
if (directio) {
|
||||
buffer::ptr bp = buffer::create_page_aligned(block_size);
|
||||
bp.zero();
|
||||
r = ::read(fd, bp.c_str(), bp.length());
|
||||
r = ::pread(fd, bp.c_str(), bp.length(), 0);
|
||||
memcpy(&header, bp.c_str(), sizeof(header));
|
||||
} else {
|
||||
memset(&header, 0, sizeof(header)); // zero out (read may fail)
|
||||
r = ::read(fd, &header, sizeof(header));
|
||||
r = ::pread(fd, &header, sizeof(header), 0);
|
||||
}
|
||||
if (r < 0)
|
||||
dout(0) << "read_header error " << errno << " " << strerror(errno) << dendl;
|
||||
print_header();
|
||||
}
|
||||
void FileJournal::write_header()
|
||||
|
||||
bufferptr FileJournal::prepare_header()
|
||||
{
|
||||
int r;
|
||||
dout(10) << "write_header " << dendl;
|
||||
print_header();
|
||||
::lseek(fd, 0, SEEK_SET);
|
||||
bufferptr bp;
|
||||
if (directio) {
|
||||
buffer::ptr bp = buffer::create_page_aligned(block_size);
|
||||
bp = buffer::create_page_aligned(block_size);
|
||||
bp.zero();
|
||||
memcpy(bp.c_str(), &header, sizeof(header));
|
||||
r = ::write(fd, bp.c_str(), bp.length());
|
||||
} else {
|
||||
r = ::write(fd, &header, sizeof(header));
|
||||
bp = buffer::create(sizeof(header));
|
||||
memcpy(bp.c_str(), &header, sizeof(header));
|
||||
}
|
||||
return bp;
|
||||
}
|
||||
|
||||
void FileJournal::write_header()
|
||||
{
|
||||
buffer::ptr bp = prepare_header();
|
||||
dout(10) << "write_header writing " << bp.length() << dendl;
|
||||
print_header();
|
||||
|
||||
int r = ::pwrite(fd, bp.c_str(), bp.length(), 0);
|
||||
if (r < 0)
|
||||
dout(0) << "write_header error " << errno << " " << strerror(errno) << dendl;
|
||||
}
|
||||
|
||||
|
||||
|
||||
void FileJournal::check_for_wrap(epoch_t epoch, off_t pos, off_t size)
|
||||
{
|
||||
// epoch boundary?
|
||||
dout(10) << "check_for_wrap epoch " << epoch << " last " << header.last_epoch() << " of " << header.num << dendl;
|
||||
if (epoch > header.last_epoch()) {
|
||||
dout(10) << "saw an epoch boundary " << header.last_epoch() << " -> " << epoch << dendl;
|
||||
header.push(epoch, pos);
|
||||
must_write_header = true;
|
||||
}
|
||||
|
||||
// does it fit?
|
||||
if (header.wrap) {
|
||||
// we're wrapped. don't overwrite ourselves.
|
||||
if (pos + size >= header.offset[0]) {
|
||||
dout(10) << "JOURNAL FULL (and wrapped), " << pos << "+" << size
|
||||
<< " >= " << header.offset[0]
|
||||
<< dendl;
|
||||
full = true;
|
||||
writeq.clear();
|
||||
print_header();
|
||||
}
|
||||
} else {
|
||||
// we haven't wrapped.
|
||||
if (pos + size >= header.max_size) {
|
||||
// is there room if we wrap?
|
||||
if (get_top() + size < header.offset[0]) {
|
||||
// yes!
|
||||
dout(10) << "wrapped from " << pos << " to " << get_top() << dendl;
|
||||
header.wrap = pos;
|
||||
pos = get_top();
|
||||
header.push(ebofs->get_super_epoch(), pos);
|
||||
must_write_header = true;
|
||||
} else {
|
||||
// no room.
|
||||
dout(10) << "submit_entry JOURNAL FULL (and can't wrap), " << pos << "+" << size
|
||||
<< " >= " << header.max_size
|
||||
<< dendl;
|
||||
full = true;
|
||||
writeq.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void FileJournal::prepare_multi_write(bufferlist& bl)
|
||||
{
|
||||
// gather queued writes
|
||||
off_t queue_pos = write_pos;
|
||||
|
||||
int eleft = g_conf.ebofs_journal_max_write_entries;
|
||||
int bleft = g_conf.ebofs_journal_max_write_bytes;
|
||||
|
||||
while (!writeq.empty()) {
|
||||
// grab next item
|
||||
epoch_t epoch = writeq.front().first;
|
||||
bufferlist &ebl = writeq.front().second;
|
||||
off_t size = 2*sizeof(entry_header_t) + ebl.length();
|
||||
|
||||
if (bl.length() > 0 && bleft > 0 && bleft < size) break;
|
||||
|
||||
check_for_wrap(epoch, queue_pos, size);
|
||||
if (full) break;
|
||||
if (bl.length() && must_write_header)
|
||||
break;
|
||||
|
||||
// add to write buffer
|
||||
dout(15) << "prepare_multi_write will write " << queue_pos << " : "
|
||||
<< ebl.length()
|
||||
<< " epoch " << epoch
|
||||
<< dendl;
|
||||
|
||||
// add it this entry
|
||||
entry_header_t h;
|
||||
h.epoch = epoch;
|
||||
h.len = ebl.length();
|
||||
h.make_magic(write_pos, header.fsid);
|
||||
bl.append((const char*)&h, sizeof(h));
|
||||
bl.claim_append(ebl);
|
||||
bl.append((const char*)&h, sizeof(h));
|
||||
|
||||
Context *oncommit = commitq.front();
|
||||
if (oncommit)
|
||||
writingq.push_back(oncommit);
|
||||
|
||||
// pop from writeq
|
||||
writeq.pop_front();
|
||||
commitq.pop_front();
|
||||
if (--eleft == 0) break;
|
||||
bleft -= size;
|
||||
if (bleft == 0) break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
bool FileJournal::prepare_single_dio_write(bufferlist& bl)
|
||||
{
|
||||
// grab next item
|
||||
epoch_t epoch = writeq.front().first;
|
||||
bufferlist &ebl = writeq.front().second;
|
||||
|
||||
off_t size = 2*sizeof(entry_header_t) + ebl.length();
|
||||
size = DIV_ROUND_UP(size, 4096) * 4096;
|
||||
|
||||
check_for_wrap(epoch, write_pos, size);
|
||||
if (full) return false;
|
||||
|
||||
// build it
|
||||
dout(15) << "prepare_single_dio_write will write " << write_pos << " : "
|
||||
<< size << " epoch " << epoch << dendl;
|
||||
|
||||
bufferptr bp = buffer::create_page_aligned(size);
|
||||
entry_header_t *h = (entry_header_t*)bp.c_str();
|
||||
h->epoch = epoch;
|
||||
h->len = ebl.length();
|
||||
h->make_magic(write_pos, header.fsid);
|
||||
ebl.copy(0, ebl.length(), bp.c_str()+sizeof(*h));
|
||||
memcpy(bp.c_str() + sizeof(*h) + ebl.length(), h, sizeof(*h));
|
||||
bl.push_back(bp);
|
||||
|
||||
Context *oncommit = commitq.front();
|
||||
if (oncommit)
|
||||
writingq.push_back(oncommit);
|
||||
|
||||
// pop from writeq
|
||||
writeq.pop_front();
|
||||
commitq.pop_front();
|
||||
return true;
|
||||
}
|
||||
|
||||
void FileJournal::do_write(bufferlist& bl)
|
||||
{
|
||||
// nothing to do?
|
||||
if (bl.length() == 0 && !must_write_header)
|
||||
return;
|
||||
|
||||
buffer::ptr hbp;
|
||||
if (must_write_header)
|
||||
hbp = prepare_header();
|
||||
|
||||
writing = true;
|
||||
|
||||
write_lock.Unlock();
|
||||
|
||||
dout(15) << "do_write writing " << write_pos << "~" << bl.length()
|
||||
<< (must_write_header ? " + header":"")
|
||||
<< dendl;
|
||||
|
||||
// header
|
||||
if (hbp.length())
|
||||
::pwrite(fd, hbp.c_str(), hbp.length(), 0);
|
||||
|
||||
// entry
|
||||
off_t pos = write_pos;
|
||||
for (list<bufferptr>::const_iterator it = bl.buffers().begin();
|
||||
it != bl.buffers().end();
|
||||
it++) {
|
||||
if ((*it).length() == 0) continue; // blank buffer.
|
||||
::pwrite(fd, (char*)(*it).c_str(), (*it).length(), pos);
|
||||
pos += (*it).length();
|
||||
}
|
||||
|
||||
write_lock.Lock();
|
||||
|
||||
writing = false;
|
||||
write_pos += bl.length();
|
||||
ebofs->queue_finishers(writingq);
|
||||
}
|
||||
|
||||
|
||||
void FileJournal::write_thread_entry()
|
||||
{
|
||||
dout(10) << "write_thread_entry start" << dendl;
|
||||
@ -211,210 +401,19 @@ void FileJournal::write_thread_entry()
|
||||
continue;
|
||||
}
|
||||
|
||||
// gather queued writes
|
||||
off_t queue_pos = write_pos;
|
||||
bufferlist bl;
|
||||
|
||||
while (!writeq.empty()) {
|
||||
// grab next item
|
||||
epoch_t epoch = writeq.front().first;
|
||||
bufferlist &ebl = writeq.front().second;
|
||||
off_t size = 2*sizeof(entry_header_t) + ebl.length();
|
||||
|
||||
// epoch boundary?
|
||||
if (epoch > header.last_epoch()) {
|
||||
dout(10) << "saw an epoch boundary " << header.last_epoch() << " -> " << epoch << dendl;
|
||||
header.push(epoch, queue_pos);
|
||||
}
|
||||
|
||||
// does it fit?
|
||||
if (header.wrap) {
|
||||
// we're wrapped. don't overwrite ourselves.
|
||||
if (queue_pos + size >= header.offset[0]) {
|
||||
if (queue_pos != write_pos) break; // do what we have, first
|
||||
dout(10) << "JOURNAL FULL (and wrapped), " << queue_pos << "+" << size
|
||||
<< " >= " << header.offset[0]
|
||||
<< dendl;
|
||||
full = true;
|
||||
writeq.clear();
|
||||
print_header();
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
// we haven't wrapped.
|
||||
if (queue_pos + size >= header.max_size) {
|
||||
if (queue_pos != write_pos) break; // do what we have, first
|
||||
// is there room if we wrap?
|
||||
if ((off_t)sizeof(header_t) + size < header.offset[0]) {
|
||||
// yes!
|
||||
dout(10) << "wrapped from " << queue_pos << " to " << get_top() << dendl;
|
||||
header.wrap = queue_pos;
|
||||
queue_pos = get_top();
|
||||
header.push(ebofs->get_super_epoch(), queue_pos);
|
||||
write_header();
|
||||
} else {
|
||||
// no room.
|
||||
dout(10) << "submit_entry JOURNAL FULL (and can't wrap), " << queue_pos << "+" << size
|
||||
<< " >= " << header.max_size
|
||||
<< dendl;
|
||||
full = true;
|
||||
writeq.clear();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// add to write buffer
|
||||
dout(15) << "write_thread_entry will write " << queue_pos << " : "
|
||||
<< ebl.length()
|
||||
<< " epoch " << epoch
|
||||
<< dendl;
|
||||
|
||||
// add it this entry
|
||||
entry_header_t h;
|
||||
h.epoch = epoch;
|
||||
h.len = ebl.length();
|
||||
h.make_magic(write_pos, header.fsid);
|
||||
bl.append((const char*)&h, sizeof(h));
|
||||
bl.claim_append(ebl);
|
||||
bl.append((const char*)&h, sizeof(h));
|
||||
|
||||
Context *oncommit = commitq.front();
|
||||
if (oncommit)
|
||||
writingq.push_back(oncommit);
|
||||
|
||||
// pop from writeq
|
||||
writeq.pop_front();
|
||||
commitq.pop_front();
|
||||
break;
|
||||
}
|
||||
|
||||
// write anything?
|
||||
if (bl.length() > 0) {
|
||||
writing = true;
|
||||
//write_lock.Unlock();
|
||||
dout(15) << "write_thread_entry writing " << write_pos << "~" << bl.length() << dendl;
|
||||
|
||||
::lseek(fd, write_pos, SEEK_SET);
|
||||
for (list<bufferptr>::const_iterator it = bl.buffers().begin();
|
||||
it != bl.buffers().end();
|
||||
it++) {
|
||||
if ((*it).length() == 0) continue; // blank buffer.
|
||||
::write(fd, (char*)(*it).c_str(), (*it).length() );
|
||||
}
|
||||
|
||||
//write_lock.Lock();
|
||||
writing = false;
|
||||
write_pos = queue_pos;
|
||||
ebofs->queue_finishers(writingq);
|
||||
}
|
||||
must_write_header = false;
|
||||
if (directio)
|
||||
prepare_single_dio_write(bl);
|
||||
else
|
||||
prepare_multi_write(bl);
|
||||
do_write(bl);
|
||||
}
|
||||
|
||||
write_lock.Unlock();
|
||||
dout(10) << "write_thread_entry finish" << dendl;
|
||||
}
|
||||
|
||||
void FileJournal::write_thread_entry_dio()
|
||||
{
|
||||
dout(10) << "write_thread_entry start" << dendl;
|
||||
write_lock.Lock();
|
||||
|
||||
while (!write_stop) {
|
||||
if (writeq.empty()) {
|
||||
// sleep
|
||||
dout(20) << "write_thread_entry going to sleep" << dendl;
|
||||
write_cond.Wait(write_lock);
|
||||
dout(20) << "write_thread_entry woke up" << dendl;
|
||||
continue;
|
||||
}
|
||||
|
||||
// grab next item
|
||||
epoch_t epoch = writeq.front().first;
|
||||
bufferlist &ebl = writeq.front().second;
|
||||
|
||||
// epoch boundary?
|
||||
if (epoch > header.last_epoch()) {
|
||||
dout(10) << "saw an epoch boundary " << header.last_epoch() << " -> " << epoch << dendl;
|
||||
header.push(epoch, write_pos);
|
||||
}
|
||||
|
||||
off_t size = 2*sizeof(entry_header_t) + ebl.length();
|
||||
size = DIV_ROUND_UP(size, 4096) * 4096;
|
||||
|
||||
// does it fit?
|
||||
if (header.wrap) {
|
||||
// we're wrapped. don't overwrite ourselves.
|
||||
if (write_pos + size >= header.offset[0]) {
|
||||
dout(10) << "JOURNAL FULL (and wrapped), " << write_pos << "+" << size
|
||||
<< " >= " << header.offset[0]
|
||||
<< dendl;
|
||||
full = true;
|
||||
writeq.clear();
|
||||
print_header();
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
// we haven't wrapped.
|
||||
if (write_pos + size >= header.max_size) {
|
||||
// is there room if we wrap?
|
||||
if ((off_t)sizeof(header_t) + size < header.offset[0]) {
|
||||
// yes!
|
||||
dout(10) << "wrapped from " << write_pos << " to " << sizeof(header_t) << dendl;
|
||||
header.wrap = write_pos;
|
||||
write_pos = sizeof(header_t);
|
||||
header.push(ebofs->get_super_epoch(), write_pos);
|
||||
write_header();
|
||||
} else {
|
||||
// no room.
|
||||
dout(10) << "submit_entry JOURNAL FULL (and can't wrap), " << write_pos << "+" << size
|
||||
<< " >= " << header.max_size
|
||||
<< dendl;
|
||||
full = true;
|
||||
writeq.clear();
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// build it
|
||||
bufferptr bp = buffer::create_page_aligned(size);
|
||||
entry_header_t *h = (entry_header_t*)bp.c_str();
|
||||
ebl.copy(0, ebl.length(), bp.c_str()+sizeof(entry_header_t));
|
||||
|
||||
// add to write buffer
|
||||
dout(15) << "write_thread_entry will write " << write_pos << " : "
|
||||
<< bp.length()
|
||||
<< " epoch " << epoch
|
||||
<< dendl;
|
||||
|
||||
// add it this entry
|
||||
h->epoch = epoch;
|
||||
h->len = ebl.length();
|
||||
h->make_magic(write_pos, header.fsid);
|
||||
memcpy(bp.c_str() + sizeof(*h) + ebl.length(), h, sizeof(*h));
|
||||
|
||||
Context *oncommit = commitq.front();
|
||||
if (oncommit)
|
||||
writingq.push_back(oncommit);
|
||||
|
||||
// pop from writeq
|
||||
writeq.pop_front();
|
||||
commitq.pop_front();
|
||||
|
||||
writing = true;
|
||||
write_lock.Unlock();
|
||||
dout(15) << "write_thread_entry writing " << write_pos << "~" << bp.length() << dendl;
|
||||
|
||||
::lseek(fd, write_pos, SEEK_SET);
|
||||
::write(fd, bp.c_str(), size);
|
||||
|
||||
write_lock.Lock();
|
||||
writing = false;
|
||||
write_pos += bp.length();
|
||||
ebofs->queue_finishers(writingq);
|
||||
}
|
||||
write_lock.Unlock();
|
||||
dout(10) << "write_thread_entry finish" << dendl;
|
||||
}
|
||||
|
||||
bool FileJournal::is_full()
|
||||
{
|
||||
@ -458,42 +457,50 @@ void FileJournal::commit_epoch_start()
|
||||
}
|
||||
}
|
||||
|
||||
void FileJournal::commit_epoch_finish()
|
||||
void FileJournal::commit_epoch_finish(epoch_t new_epoch)
|
||||
{
|
||||
dout(10) << "commit_epoch_finish committed " << ebofs->get_super_epoch()-1 << dendl;
|
||||
dout(10) << "commit_epoch_finish committed " << (new_epoch-1) << dendl;
|
||||
|
||||
Mutex::Locker locker(write_lock);
|
||||
|
||||
if (full) {
|
||||
// full journal damage control.
|
||||
dout(15) << " journal was FULL, contents now committed, clearing header. journal still not usable until next epoch." << dendl;
|
||||
header.clear();
|
||||
write_pos = get_top();
|
||||
} else {
|
||||
// update header -- trim/discard old (committed) epochs
|
||||
while (header.num && header.epoch[0] < ebofs->get_super_epoch())
|
||||
header.pop();
|
||||
}
|
||||
write_header();
|
||||
|
||||
// discard any unwritten items in previous epoch
|
||||
epoch_t epoch = ebofs->get_super_epoch();
|
||||
while (!writeq.empty() && writeq.front().first < epoch) {
|
||||
dout(15) << " dropping unwritten and committed "
|
||||
<< write_pos << " : " << writeq.front().second.length()
|
||||
<< " epoch " << writeq.front().first
|
||||
<< dendl;
|
||||
// finisher?
|
||||
Context *oncommit = commitq.front();
|
||||
if (oncommit) writingq.push_back(oncommit);
|
||||
|
||||
// discard.
|
||||
writeq.pop_front();
|
||||
commitq.pop_front();
|
||||
}
|
||||
|
||||
// queue the finishers
|
||||
ebofs->queue_finishers(writingq);
|
||||
Mutex::Locker locker(write_lock);
|
||||
|
||||
if (full) {
|
||||
// full journal damage control.
|
||||
dout(15) << " journal was FULL, contents now committed, clearing header. journal still not usable until next epoch." << dendl;
|
||||
header.clear();
|
||||
write_pos = get_top();
|
||||
} else {
|
||||
// update header -- trim/discard old (committed) epochs
|
||||
print_header();
|
||||
while (header.num && header.epoch[0] < new_epoch) {
|
||||
dout(10) << " popping epoch " << header.epoch[0] << " < " << new_epoch << dendl;
|
||||
header.pop();
|
||||
}
|
||||
if (header.num == 0) {
|
||||
dout(10) << " starting fresh" << dendl;
|
||||
write_pos = get_top();
|
||||
header.push(new_epoch, write_pos);
|
||||
}
|
||||
}
|
||||
must_write_header = true;
|
||||
|
||||
// discard any unwritten items in previous epoch
|
||||
while (!writeq.empty() && writeq.front().first < new_epoch) {
|
||||
dout(15) << " dropping unwritten and committed "
|
||||
<< write_pos << " : " << writeq.front().second.length()
|
||||
<< " epoch " << writeq.front().first
|
||||
<< dendl;
|
||||
// finisher?
|
||||
Context *oncommit = commitq.front();
|
||||
if (oncommit) writingq.push_back(oncommit);
|
||||
|
||||
// discard.
|
||||
writeq.pop_front();
|
||||
commitq.pop_front();
|
||||
}
|
||||
|
||||
// queue the finishers
|
||||
ebofs->queue_finishers(writingq);
|
||||
dout(10) << "commit_epoch_finish done" << dendl;
|
||||
}
|
||||
|
||||
|
||||
|
@ -38,10 +38,11 @@ public:
|
||||
int num;
|
||||
off_t wrap;
|
||||
off_t max_size;
|
||||
size_t block_size;
|
||||
epoch_t epoch[4];
|
||||
off_t offset[4];
|
||||
|
||||
header_t() : fsid(0), num(0), wrap(0), max_size(0) {}
|
||||
header_t() : fsid(0), num(0), wrap(0), max_size(0), block_size(0) {}
|
||||
|
||||
void clear() {
|
||||
num = 0;
|
||||
@ -67,7 +68,10 @@ public:
|
||||
num++;
|
||||
}
|
||||
epoch_t last_epoch() {
|
||||
return epoch[num-1];
|
||||
if (num)
|
||||
return epoch[num-1];
|
||||
else
|
||||
return 0;
|
||||
}
|
||||
} header;
|
||||
|
||||
@ -91,9 +95,10 @@ public:
|
||||
private:
|
||||
string fn;
|
||||
|
||||
off_t max_size;
|
||||
size_t block_size;
|
||||
bool directio;
|
||||
bool full, writing;
|
||||
bool full, writing, must_write_header;
|
||||
off_t write_pos; // byte where next entry written goes
|
||||
off_t read_pos; //
|
||||
|
||||
@ -111,23 +116,26 @@ private:
|
||||
Cond write_cond;
|
||||
bool write_stop;
|
||||
|
||||
int _open();
|
||||
void print_header();
|
||||
void read_header();
|
||||
bufferptr prepare_header();
|
||||
void write_header();
|
||||
void start_writer();
|
||||
void stop_writer();
|
||||
void write_thread_entry();
|
||||
void write_thread_entry_dio();
|
||||
|
||||
void check_for_wrap(epoch_t epoch, off_t pos, off_t size);
|
||||
bool prepare_single_dio_write(bufferlist& bl);
|
||||
void prepare_multi_write(bufferlist& bl);
|
||||
void do_write(bufferlist& bl);
|
||||
|
||||
class Writer : public Thread {
|
||||
FileJournal *journal;
|
||||
public:
|
||||
Writer(FileJournal *fj) : journal(fj) {}
|
||||
void *entry() {
|
||||
if (journal->directio)
|
||||
journal->write_thread_entry_dio();
|
||||
else
|
||||
journal->write_thread_entry();
|
||||
journal->write_thread_entry();
|
||||
return 0;
|
||||
}
|
||||
} write_thread;
|
||||
@ -142,8 +150,9 @@ private:
|
||||
public:
|
||||
FileJournal(Ebofs *e, const char *f, bool dio=false) :
|
||||
Journal(e), fn(f),
|
||||
max_size(0), block_size(0),
|
||||
directio(dio),
|
||||
full(false), writing(false),
|
||||
full(false), writing(false), must_write_header(false),
|
||||
write_pos(0), read_pos(0),
|
||||
fd(0),
|
||||
write_stop(false), write_thread(this) { }
|
||||
@ -158,7 +167,7 @@ private:
|
||||
// writes
|
||||
void submit_entry(bufferlist& e, Context *oncommit); // submit an item
|
||||
void commit_epoch_start(); // mark epoch boundary
|
||||
void commit_epoch_finish(); // mark prior epoch as committed (we can expire)
|
||||
void commit_epoch_finish(epoch_t); // mark prior epoch as committed (we can expire)
|
||||
|
||||
bool read_entry(bufferlist& bl, epoch_t& e);
|
||||
|
||||
|
@ -37,7 +37,7 @@ public:
|
||||
virtual void make_writeable() = 0;
|
||||
virtual void submit_entry(bufferlist& e, Context *oncommit) = 0;
|
||||
virtual void commit_epoch_start() = 0; // mark epoch boundary
|
||||
virtual void commit_epoch_finish() = 0; // mark prior epoch as committed (we can expire)
|
||||
virtual void commit_epoch_finish(epoch_t) = 0; // mark prior epoch as committed (we can expire)
|
||||
virtual bool read_entry(bufferlist& bl, epoch_t &e) = 0;
|
||||
virtual bool is_full() = 0;
|
||||
|
||||
|
@ -19,10 +19,13 @@
|
||||
|
||||
map<off_t, pair<utime_t,utime_t> > writes;
|
||||
|
||||
Mutex lock;
|
||||
|
||||
struct C_Commit : public Context {
|
||||
off_t off;
|
||||
C_Commit(off_t o) : off(o) {}
|
||||
void finish(int r) {
|
||||
Mutex::Locker l(lock);
|
||||
utime_t now = g_clock.now();
|
||||
dout(0) << off << "\t"
|
||||
<< (writes[off].second-writes[off].first) << "\t"
|
||||
@ -51,8 +54,9 @@ int main(int argc, const char **argv)
|
||||
bp.zero();
|
||||
bufferlist bl;
|
||||
bl.push_back(bp);
|
||||
|
||||
|
||||
float interval = 1.0 / 1000;
|
||||
|
||||
cout << "#dev " << filename
|
||||
<< seconds << " seconds, " << bytes << " bytes per write" << std::endl;
|
||||
|
||||
@ -74,11 +78,28 @@ int main(int argc, const char **argv)
|
||||
cout << "# offset\tack\tcommit" << std::endl;
|
||||
while (now < end) {
|
||||
object_t oid(1,1);
|
||||
writes[pos].first = now;
|
||||
utime_t start;
|
||||
{
|
||||
Mutex::Locker l(lock);
|
||||
start = writes[pos].first = now;
|
||||
}
|
||||
fs.write(oid, pos, bytes, bl, new C_Commit(pos));
|
||||
now = g_clock.now();
|
||||
writes[pos].second = now;
|
||||
{
|
||||
Mutex::Locker l(lock);
|
||||
writes[pos].second = now;
|
||||
}
|
||||
pos += bytes;
|
||||
|
||||
// wait?
|
||||
utime_t next = start;
|
||||
next += interval;
|
||||
if (now < next) {
|
||||
float s = next - now;
|
||||
s *= 1000 * 1000; // s -> us
|
||||
//cout << "sleeping for " << s << std::endl;
|
||||
usleep(s);
|
||||
}
|
||||
}
|
||||
|
||||
fs.umount();
|
||||
|
Loading…
Reference in New Issue
Block a user