* simple ebofs journaling, yay!

git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1432 29311d96-e01e-0410-9327-a35deaab8ce9
This commit is contained in:
sageweil 2007-06-20 23:04:27 +00:00
parent b719c69e6f
commit da3a11dd83
11 changed files with 746 additions and 165 deletions

View File

@ -38,7 +38,8 @@ EBOFS_OBJS= \
ebofs/BlockDevice.o\
ebofs/BufferCache.o\
ebofs/Ebofs.o\
ebofs/Allocator.o
ebofs/Allocator.o\
ebofs/FileJournal.o
MDS_OBJS= \
mds/MDS.o\

View File

@ -16,6 +16,8 @@
#include "Ebofs.h"
#include "FileJournal.h"
#include <errno.h>
#ifndef DARWIN
@ -50,6 +52,7 @@ int Ebofs::mount()
ebofs_lock.Lock();
assert(!mounted);
// open dev
int r = dev.open(&idle_kicker);
if (r < 0) {
ebofs_lock.Unlock();
@ -79,6 +82,8 @@ int Ebofs::mount()
dout(3) << "mount epoch " << super_epoch << endl;
assert(super_epoch == sb->epoch);
super_fsid = sb->fsid;
free_blocks = sb->free_blocks;
limbo_blocks = sb->limbo_blocks;
@ -101,6 +106,43 @@ int Ebofs::mount()
allocator.release_limbo();
// open journal?
if (journalfn) {
journal = new FileJournal(this, journalfn);
if (journal->open() < 0) {
dout(-3) << "mount journal " << journalfn << " open failed" << endl;
delete journal;
journal = 0;
} else {
dout(-3) << "mount journal " << journalfn << " opened, replaying" << endl;
while (1) {
bufferlist bl;
epoch_t e;
if (!journal->read_entry(bl, e)) {
dout(-3) << "mount replay: end of journal, done." << endl;
break;
}
if (e < super_epoch) {
dout(-3) << "mount replay: skipping old entry in epoch " << e << " < " << super_epoch << endl;
}
if (e == super_epoch+1) {
super_epoch++;
dout(-3) << "mount replay: jumped to next epoch " << super_epoch << endl;
}
assert(e == super_epoch);
dout(-3) << "mount replay: applying transaction in epoch " << e << endl;
Transaction t;
int off = 0;
t._decode(bl, off);
_apply_transaction(t);
}
}
}
dout(3) << "mount starting commit+finisher threads" << endl;
commit_thread.create();
finisher_thread.create();
@ -108,6 +150,7 @@ int Ebofs::mount()
dout(1) << "mounted " << dev.get_device_name() << " " << dev.get_num_blocks() << " blocks, " << nice_blocks(dev.get_num_blocks()) << endl;
mounted = true;
ebofs_lock.Unlock();
return 0;
}
@ -126,6 +169,10 @@ int Ebofs::mkfs()
block_t num_blocks = dev.get_num_blocks();
// make a super-random fsid
srand(time(0) ^ getpid());
super_fsid = (lrand48() << 32) ^ mrand48();
free_blocks = 0;
limbo_blocks = 0;
@ -197,6 +244,18 @@ int Ebofs::mkfs()
dev.close();
// create journal?
if (journalfn) {
journal = new FileJournal(this, journalfn);
if (journal->create() < 0) {
dout(3) << "mount journal " << journalfn << " created failed" << endl;
delete journal;
} else {
dout(3) << "mount journal " << journalfn << " created" << endl;
}
}
dout(2) << "mkfs: " << dev.get_device_name() << " " << dev.get_num_blocks() << " blocks, " << nice_blocks(dev.get_num_blocks()) << endl;
ebofs_lock.Unlock();
return 0;
@ -272,6 +331,7 @@ void Ebofs::prepare_super(version_t epoch, bufferptr& bp)
// fill in super
memset(&sb, 0, sizeof(sb));
sb.s_magic = EBOFS_MAGIC;
sb.fsid = super_fsid;
sb.epoch = epoch;
sb.num_blocks = dev.get_num_blocks();
@ -409,6 +469,7 @@ int Ebofs::commit_thread_entry()
<< ", max dirty " << g_conf.ebofs_bc_max_dirty
<< endl;
if (journal) journal->commit_epoch_start();
// (async) write onodes+condes (do this first; it currently involves inode reallocation)
commit_inodes_start();
@ -453,14 +514,14 @@ int Ebofs::commit_thread_entry()
alloc_more_node_space();
}
// signal journal
if (journal) journal->commit_epoch_finish();
// kick waiters
dout(10) << "commit_thread queueing commit + kicking sync waiters" << endl;
finisher_lock.Lock();
finisher_queue.splice(finisher_queue.end(), commit_waiters[super_epoch-1]);
queue_finishers(commit_waiters[super_epoch-1]);
commit_waiters.erase(super_epoch-1);
finisher_cond.Signal();
finisher_lock.Unlock();
sync_cond.Signal();
@ -1222,7 +1283,18 @@ void Ebofs::sync(Context *onsafe)
ebofs_lock.Lock();
if (onsafe) {
dirty = true;
commit_waiters[super_epoch].push_back(onsafe);
while (1) {
if (journal) {
// journal empty transaction
Transaction t;
bufferlist bl;
t._encode(bl);
if (journal->submit_entry(bl, onsafe)) break;
}
commit_waiters[super_epoch].push_back(onsafe);
break;
}
}
ebofs_lock.Unlock();
}
@ -1994,6 +2066,25 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe)
ebofs_lock.Lock();
dout(7) << "apply_transaction start (" << t.ops.size() << " ops)" << endl;
unsigned r = _apply_transaction(t);
// set up commit waiter
while (1) {
if (journal) {
bufferlist bl;
t._encode(bl);
if (journal->submit_entry(bl, onsafe)) break;
}
if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
break;
}
ebofs_lock.Unlock();
return r;
}
unsigned Ebofs::_apply_transaction(Transaction& t)
{
// do ops
unsigned r = 0; // bit fields indicate which ops failed.
int bit = 1;
@ -2028,7 +2119,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe)
case Transaction::OP_GETATTR:
{
object_t oid = t.oids.front(); t.oids.pop_front();
const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
const char *attrname = t.get_attrname(); t.pop_attrname();
pair<void*,int*> pattrval = t.pattrvals.front(); t.pattrvals.pop_front();
if ((*(pattrval.second) = _getattr(oid, attrname, pattrval.first, *(pattrval.second))) < 0) {
dout(7) << "apply_transaction fail on _getattr" << endl;
@ -2095,7 +2186,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe)
case Transaction::OP_SETATTR:
{
object_t oid = t.oids.front(); t.oids.pop_front();
const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
const char *attrname = t.get_attrname(); t.pop_attrname();
//pair<const void*,int> attrval = t.attrvals.front(); t.attrvals.pop_front();
bufferlist bl;
bl.claim( t.attrbls.front() );
@ -2121,7 +2212,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe)
case Transaction::OP_RMATTR:
{
object_t oid = t.oids.front(); t.oids.pop_front();
const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
const char *attrname = t.get_attrname(); t.pop_attrname();
if (_rmattr(oid, attrname) < 0) {
dout(7) << "apply_transaction fail on _rmattr" << endl;
r &= bit;
@ -2185,7 +2276,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe)
case Transaction::OP_COLL_SETATTR:
{
coll_t cid = t.cids.front(); t.cids.pop_front();
const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
const char *attrname = t.get_attrname(); t.pop_attrname();
//pair<const void*,int> attrval = t.attrvals.front(); t.attrvals.pop_front();
bufferlist bl;
bl.claim( t.attrbls.front() );
@ -2201,7 +2292,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe)
case Transaction::OP_COLL_RMATTR:
{
coll_t cid = t.cids.front(); t.cids.pop_front();
const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
const char *attrname = t.get_attrname(); t.pop_attrname();
if (_collection_rmattr(cid, attrname) < 0) {
dout(7) << "apply_transaction fail on _collection_rmattr" << endl;
r &= bit;
@ -2217,16 +2308,7 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe)
bit = bit << 1;
}
dout(7) << "apply_transaction finish (r = " << r << ")" << endl;
// set up commit waiter
//if (r == 0) {
if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
//} else {
//if (onsafe) delete onsafe;
//}
ebofs_lock.Unlock();
dout(7) << "_apply_transaction finish (r = " << r << ")" << endl;
return r;
}
@ -2295,36 +2377,6 @@ int Ebofs::_write(object_t oid, off_t offset, size_t length, bufferlist& bl)
}
/*int Ebofs::write(object_t oid,
off_t off, size_t len,
bufferlist& bl, bool fsync)
{
// wait?
if (fsync) {
// wait for flush.
Cond cond;
bool done;
int flush = 1; // write never returns positive
Context *c = new C_Cond(&cond, &done, &flush);
int r = write(oid, off, len, bl, c);
if (r < 0) return r;
ebofs_lock.Lock();
{
while (!done)
cond.Wait(ebofs_lock);
assert(flush <= 0);
}
ebofs_lock.Unlock();
if (flush < 0) return flush;
return r;
} else {
// don't wait for flush.
return write(oid, off, len, bl, (Context*)0);
}
}
*/
int Ebofs::write(object_t oid,
off_t off, size_t len,
bufferlist& bl, Context *onsafe)
@ -2338,7 +2390,17 @@ int Ebofs::write(object_t oid,
// commit waiter
if (r > 0) {
assert((size_t)r == len);
if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
while (1) {
if (journal) {
Transaction t;
t.write(oid, off, len, bl);
bufferlist bl;
t._encode(bl);
if (journal->submit_entry(bl, onsafe)) break;
}
if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
break;
}
} else {
if (onsafe) delete onsafe;
}
@ -2372,7 +2434,17 @@ int Ebofs::remove(object_t oid, Context *onsafe)
// set up commit waiter
if (r >= 0) {
if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
while (1) {
if (journal) {
Transaction t;
t.remove(oid);
bufferlist bl;
t._encode(bl);
if (journal->submit_entry(bl, onsafe)) break;
}
if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
break;
}
} else {
if (onsafe) delete onsafe;
}
@ -2447,7 +2519,17 @@ int Ebofs::truncate(object_t oid, off_t size, Context *onsafe)
// set up commit waiter
if (r >= 0) {
if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
while (1) {
if (journal) {
Transaction t;
t.truncate(oid, size);
bufferlist bl;
t._encode(bl);
if (journal->submit_entry(bl, onsafe)) break;
}
if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
break;
}
} else {
if (onsafe) delete onsafe;
}
@ -2466,7 +2548,17 @@ int Ebofs::clone(object_t from, object_t to, Context *onsafe)
// set up commit waiter
if (r >= 0) {
if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
while (1) {
if (journal) {
Transaction t;
t.clone(from, to);
bufferlist bl;
t._encode(bl);
if (journal->submit_entry(bl, onsafe)) break;
}
if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
break;
}
} else {
if (onsafe) delete onsafe;
}
@ -2642,7 +2734,17 @@ int Ebofs::setattr(object_t oid, const char *name, const void *value, size_t siz
// set up commit waiter
if (r >= 0) {
if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
while (1) {
if (journal) {
Transaction t;
t.setattr(oid, name, value, size);
bufferlist bl;
t._encode(bl);
if (journal->submit_entry(bl, onsafe)) break;
}
if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
break;
}
} else {
if (onsafe) delete onsafe;
}
@ -2675,7 +2777,17 @@ int Ebofs::setattrs(object_t oid, map<string,bufferptr>& attrset, Context *onsaf
// set up commit waiter
if (r >= 0) {
if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
while (1) {
if (journal) {
Transaction t;
t.setattrs(oid, attrset);
bufferlist bl;
t._encode(bl);
if (journal->submit_entry(bl, onsafe)) break;
}
if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
break;
}
} else {
if (onsafe) delete onsafe;
}
@ -2760,7 +2872,17 @@ int Ebofs::rmattr(object_t oid, const char *name, Context *onsafe)
// set up commit waiter
if (r >= 0) {
if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
while (1) {
if (journal) {
Transaction t;
t.rmattr(oid, name);
bufferlist bl;
t._encode(bl);
if (journal->submit_entry(bl, onsafe)) break;
}
if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
break;
}
} else {
if (onsafe) delete onsafe;
}
@ -2837,7 +2959,17 @@ int Ebofs::create_collection(coll_t cid, Context *onsafe)
// set up commit waiter
if (r >= 0) {
if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
while (1) {
if (journal) {
Transaction t;
t.create_collection(cid);
bufferlist bl;
t._encode(bl);
if (journal->submit_entry(bl, onsafe)) break;
}
if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
break;
}
} else {
if (onsafe) delete onsafe;
}
@ -2884,7 +3016,17 @@ int Ebofs::destroy_collection(coll_t cid, Context *onsafe)
// set up commit waiter
if (r >= 0) {
if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
while (1) {
if (journal) {
Transaction t;
t.remove_collection(cid);
bufferlist bl;
t._encode(bl);
if (journal->submit_entry(bl, onsafe)) break;
}
if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
break;
}
} else {
if (onsafe) delete onsafe;
}
@ -2938,7 +3080,17 @@ int Ebofs::collection_add(coll_t cid, object_t oid, Context *onsafe)
// set up commit waiter
if (r >= 0) {
if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
while (1) {
if (journal) {
Transaction t;
t.collection_add(cid, oid);
bufferlist bl;
t._encode(bl);
if (journal->submit_entry(bl, onsafe)) break;
}
if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
break;
}
} else {
if (onsafe) delete onsafe;
}
@ -2979,7 +3131,17 @@ int Ebofs::collection_remove(coll_t cid, object_t oid, Context *onsafe)
// set up commit waiter
if (r >= 0) {
if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
while (1) {
if (journal) {
Transaction t;
t.collection_remove(cid, oid);
bufferlist bl;
t._encode(bl);
if (journal->submit_entry(bl, onsafe)) break;
}
if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
break;
}
} else {
if (onsafe) delete onsafe;
}
@ -3042,7 +3204,17 @@ int Ebofs::collection_setattr(coll_t cid, const char *name, const void *value, s
// set up commit waiter
if (r >= 0) {
if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
while (1) {
if (journal) {
Transaction t;
t.collection_setattr(cid, name, value, size);
bufferlist bl;
t._encode(bl);
if (journal->submit_entry(bl, onsafe)) break;
}
if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
break;
}
} else {
if (onsafe) delete onsafe;
}
@ -3100,7 +3272,17 @@ int Ebofs::collection_rmattr(coll_t cid, const char *name, Context *onsafe)
// set up commit waiter
if (r >= 0) {
if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
while (1) {
if (journal) {
Transaction t;
t.collection_rmattr(cid, name);
bufferlist bl;
t._encode(bl);
if (journal->submit_entry(bl, onsafe)) break;
}
if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
break;
}
} else {
if (onsafe) delete onsafe;
}

View File

@ -29,6 +29,7 @@ using namespace __gnu_cxx;
#include "nodes.h"
#include "Allocator.h"
#include "Table.h"
#include "Journal.h"
#include "common/Mutex.h"
#include "common/Cond.h"
@ -40,20 +41,23 @@ typedef pair<coll_t,object_t> coll_object_t;
class Ebofs : public ObjectStore {
protected:
protected:
Mutex ebofs_lock; // a beautiful global lock
// ** debuggy **
bool fake_writes;
// ** super **
public:
BlockDevice dev;
protected:
bool mounted, unmounting, dirty;
bool readonly;
version_t super_epoch;
bool commit_thread_started, mid_commit;
Cond commit_cond; // to wake up the commit thread
Cond sync_cond;
uint64_t super_fsid;
map<version_t, list<Context*> > commit_waiters;
@ -71,9 +75,16 @@ class Ebofs : public ObjectStore {
}
} commit_thread;
public:
uint64_t get_fsid() { return super_fsid; }
epoch_t get_super_epoch() { return super_epoch; }
protected:
// ** journal **
char *journalfn;
Journal *journal;
// ** allocator **
block_t free_blocks, limbo_blocks;
Allocator allocator;
@ -188,6 +199,21 @@ class Ebofs : public ObjectStore {
bool finisher_stop;
list<Context*> finisher_queue;
public:
void queue_finisher(Context *c) {
finisher_lock.Lock();
finisher_queue.push_back(c);
finisher_cond.Signal();
finisher_lock.Unlock();
}
void queue_finishers(list<Context*>& ls) {
finisher_lock.Lock();
finisher_queue.splice(finisher_queue.end(), ls);
finisher_cond.Signal();
finisher_lock.Unlock();
}
protected:
void *finisher_thread_entry();
class FinisherThread : public Thread {
Ebofs *ebofs;
@ -204,12 +230,13 @@ class Ebofs : public ObjectStore {
public:
Ebofs(char *devfn) :
Ebofs(char *devfn, char *jfn=0) :
fake_writes(false),
dev(devfn),
mounted(false), unmounting(false), dirty(false), readonly(false),
super_epoch(0), commit_thread_started(false), mid_commit(false),
commit_thread(this),
journalfn(jfn), journal(0),
free_blocks(0), limbo_blocks(0),
allocator(this),
nodepool(ebofs_lock),
@ -222,6 +249,11 @@ class Ebofs : public ObjectStore {
finisher_stop(false), finisher_thread(this) {
for (int i=0; i<EBOFS_NUM_FREE_BUCKETS; i++)
free_tab[i] = 0;
if (!journalfn) {
journalfn = new char[strlen(devfn) + 100];
strcpy(journalfn, devfn);
strcat(journalfn, ".journal");
}
}
~Ebofs() {
}
@ -298,6 +330,8 @@ class Ebofs : public ObjectStore {
private:
// private interface -- use if caller already holds lock
unsigned _apply_transaction(Transaction& t);
int _read(object_t oid, off_t off, size_t len, bufferlist& bl);
int _is_cached(object_t oid, off_t off, size_t len);
int _stat(object_t oid, struct stat *st);

View File

@ -1,44 +1,115 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include "FileJournal.h"
#include "Ebofs.h"
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "config.h"
#define dout(x) if (x <= g_conf.debug_ebofs) cout << "ebofs(" << dev.get_device_name() << ").journal "
#define derr(x) if (x <= g_conf.debug_ebofs) cerr << "ebofs(" << dev.get_device_name() << ").journal "
#undef dout
#define dout(x) if (true || x <= g_conf.debug_ebofs) cout << "ebofs(" << ebofs->dev.get_device_name() << ").journal "
#define derr(x) if (x <= g_conf.debug_ebofs) cerr << "ebofs(" << ebofs->dev.get_device_name() << ").journal "
void FileJournal::create()
int FileJournal::create()
{
dout(1) << "create " << fn << endl;
// open/create
fd = ::open(fn.c_str(), O_CREAT|O_WRONLY);
fd = ::open(fn.c_str(), O_RDWR|O_SYNC);
if (fd < 0) {
dout(1) << "create failed " << errno << " " << strerror(errno) << endl;
return -errno;
}
assert(fd > 0);
::ftruncate(fd);
::fchmod(fd, 0644);
//::ftruncate(fd, 0);
//::fchmod(fd, 0644);
// get size
struct stat st;
::fstat(fd, &st);
dout(1) << "open " << fn << " " << st.st_size << " bytes" << endl;
// write empty header
header.clear();
header.fsid = ebofs->get_fsid();
header.max_size = st.st_size;
write_header();
read_pos = write_pos = queue_pos = sizeof(header);
::close(fd);
return 0;
}
void FileJournal::open()
int FileJournal::open()
{
dout(1) << "open " << fn << endl;
//dout(1) << "open " << fn << endl;
// open and file
assert(fd == 0);
fd = ::open(fn.c_str(), O_RDWR);
fd = ::open(fn.c_str(), O_RDWR|O_SYNC);
if (fd < 0) {
dout(1) << "open failed " << errno << " " << strerror(errno) << endl;
return -errno;
}
assert(fd > 0);
// read header?
// ***
read_header();
if (header.num == 0 ||
header.fsid != ebofs->get_fsid()) {
// empty.
read_pos = 0;
write_pos = queue_pos = sizeof(header);
} else {
// pick an offset
read_pos = write_pos = queue_pos = 0;
for (int i=0; i<header.num; i++) {
if (header.epoch[i] == ebofs->get_super_epoch()) {
dout(2) << "using read_pos header pointer "
<< header.epoch[i] << " at " << header.offset[i]
<< endl;
read_pos = header.offset[i];
break;
}
if (header.epoch[i] < ebofs->get_super_epoch()) {
dout(2) << "super_epoch is " << ebofs->get_super_epoch()
<< ", skipping " << header.epoch[i] << " at " << header.offset[i]
<< endl;
continue;
}
if (header.epoch[i] > ebofs->get_super_epoch()) {
dout(2) << "super_epoch is " << ebofs->get_super_epoch()
<< ", but wtf, journal is later " << header.epoch[i] << " at " << header.offset[i]
<< endl;
break;
}
assert(0);
}
}
start_writer();
return 0;
}
void FileJournal::close()
@ -49,7 +120,8 @@ void FileJournal::close()
stop_writer();
// close
assert(q.empty());
assert(writeq.empty());
assert(commitq.empty());
assert(fd > 0);
::close(fd);
fd = 0;
@ -73,12 +145,36 @@ void FileJournal::stop_writer()
}
void FileJournal::print_header()
{
for (int i=0; i<header.num; i++) {
if (i && header.offset[i] < header.offset[i-1]) {
assert(header.wrap);
dout(10) << "header: wrap at " << header.wrap << endl;
}
dout(10) << "header: epoch " << header.epoch[i] << " at " << header.offset[i] << endl;
}
//if (header.wrap) dout(10) << "header: wrap at " << header.wrap << endl;
}
void FileJournal::read_header()
{
dout(10) << "read_header" << endl;
memset(&header, 0, sizeof(header)); // zero out (read may fail)
::lseek(fd, 0, SEEK_SET);
int r = ::read(fd, &header, sizeof(header));
if (r < 0)
dout(0) << "read_header error " << errno << " " << strerror(errno) << endl;
print_header();
}
void FileJournal::write_header()
{
dout(10) << "write_header" << endl;
dout(10) << "write_header " << endl;
print_header();
::lseek(fd, 0, SEEK_SET);
::write(fd, &header, sizeof(header));
int r = ::write(fd, &header, sizeof(header));
if (r < 0)
dout(0) << "write_header error " << errno << " " << strerror(errno) << endl;
}
@ -91,6 +187,7 @@ void FileJournal::write_thread_entry()
if (writeq.empty()) {
// sleep
dout(20) << "write_thread_entry going to sleep" << endl;
assert(write_pos == queue_pos);
write_cond.Wait(write_lock);
dout(20) << "write_thread_entry woke up" << endl;
continue;
@ -99,24 +196,35 @@ void FileJournal::write_thread_entry()
// do queued writes
while (!writeq.empty()) {
// grab next item
epoch_t e = writeq.front().first;
epoch_t epoch = writeq.front().first;
bufferlist bl;
bl.claim(writeq.front().second);
writeq.pop_front();
Context *oncommit = commitq.front();
commitq.pop_front();
dout(15) << "write_thread_entry writing " << bottom << " : "
// wrap?
if (write_pos == header.wrap) {
dout(15) << "write_thread_entry wrapped write_pos at " << write_pos << " to " << sizeof(header_t) << endl;
assert(header.wrap == write_pos);
write_header();
write_pos = sizeof(header_t);
}
// write!
dout(15) << "write_thread_entry writing " << write_pos << " : "
<< bl.length()
<< " epoch " << e
<< " epoch " << epoch
<< endl;
// write epoch, len, data.
::fseek(fd, bottom, SEEK_SET);
::write(fd, &e, sizeof(e));
uint32_t len = bl.length();
::write(fd, &len, sizeof(len));
// write entry header
entry_header_t h;
h.epoch = epoch;
h.len = bl.length();
h.make_magic(write_pos, header.fsid);
::lseek(fd, write_pos, SEEK_SET);
::write(fd, &h, sizeof(h));
for (list<bufferptr>::const_iterator it = bl.buffers().begin();
it != bl.buffers().end();
@ -124,14 +232,21 @@ void FileJournal::write_thread_entry()
if ((*it).length() == 0) continue; // blank buffer.
::write(fd, (char*)(*it).c_str(), (*it).length() );
}
::write(fd, &h, sizeof(h));
// move position pointer
bottom += sizeof(epoch_t) + sizeof(uint32_t) + e.length();
write_pos += 2*sizeof(entry_header_t) + bl.length();
// do commit callback
if (oncommit) {
oncommit->finish(0);
delete oncommit;
if (1) {
// queue callback
ebofs->queue_finisher(oncommit);
} else {
// callback now
oncommit->finish(0);
delete oncommit;
}
}
}
}
@ -140,61 +255,202 @@ void FileJournal::write_thread_entry()
dout(10) << "write_thread_entry finish" << endl;
}
void FileJournal::submit_entry(bufferlist& e, Context *oncommit)
bool FileJournal::submit_entry(bufferlist& e, Context *oncommit)
{
dout(10) << "submit_entry " << bottom << " : " << e.length()
<< " epoch " << ebofs->super_epoch
assert(queue_pos != 0); // bad create(), or journal didn't replay to completion.
// ** lock **
Mutex::Locker locker(write_lock);
// wrap? full?
off_t size = 2*sizeof(entry_header_t) + e.length();
if (full) return false; // already marked full.
if (header.wrap) {
// we're wrapped. don't overwrite ourselves.
if (queue_pos + size >= header.offset[0]) {
dout(10) << "submit_entry JOURNAL FULL (and wrapped), " << queue_pos << "+" << size
<< " >= " << header.offset[0]
<< endl;
full = true;
print_header();
return false;
}
} else {
// we haven't wrapped.
if (queue_pos + size >= header.max_size) {
// is there room if we wrap?
if ((off_t)sizeof(header_t) + size < header.offset[0]) {
// yes!
dout(10) << "submit_entry wrapped from " << queue_pos << " to " << sizeof(header_t) << endl;
header.wrap = queue_pos;
queue_pos = sizeof(header_t);
header.push(ebofs->get_super_epoch(), queue_pos);
} else {
// no room.
dout(10) << "submit_entry JOURNAL FULL (and can't wrap), " << queue_pos << "+" << size
<< " >= " << header.max_size
<< endl;
full = true;
return false;
}
}
}
dout(10) << "submit_entry " << queue_pos << " : " << e.length()
<< " epoch " << ebofs->get_super_epoch()
<< " " << oncommit << endl;
// dump on queue
writeq.push_back(pair<epoch_t,bufferlist>(ebofs->super_epoch, e));
writeq.push_back(pair<epoch_t,bufferlist>(ebofs->get_super_epoch(), e));
commitq.push_back(oncommit);
queue_pos += size;
// kick writer thread
write_cond.Signal();
return true;
}
void FileJournal::commit_epoch_start()
{
dout(10) << "commit_epoch_start" << endl;
dout(10) << "commit_epoch_start on " << ebofs->get_super_epoch()-1
<< " -- new epoch " << ebofs->get_super_epoch()
<< endl;
write_lock.Lock();
{
header.epoch2 = ebofs->super_epoch;
header.top2 = bottom;
write_header();
}
write_lock.Unlock();
Mutex::Locker locker(write_lock);
// was full -> empty -> now usable?
if (full) {
if (header.num != 0) {
dout(1) << " journal FULL, ignoring this epoch" << endl;
return;
}
dout(1) << " clearing FULL flag, journal now usable" << endl;
full = false;
}
// note epoch boundary
header.push(ebofs->get_super_epoch(), queue_pos); // note: these entries may not yet be written.
//write_header(); // no need to write it now, though...
}
void FileJournal::commit_epoch_finish()
{
dout(10) << "commit_epoch_finish" << endl;
dout(10) << "commit_epoch_finish committed " << ebofs->get_super_epoch()-1 << endl;
write_lock.Lock();
{
// update header
header.epoch1 = ebofs->super_epoch;
header.top1 = header.top2;
header.epoch2 = 0;
header.top2 = 0;
if (full) {
// full journal damage control.
dout(15) << " journal was FULL, contents now committed, clearing header. journal still not usable until next epoch." << endl;
header.clear();
write_pos = queue_pos = sizeof(header_t);
} else {
// update header -- trim/discard old (committed) epochs
while (header.epoch[0] < ebofs->get_super_epoch())
header.pop();
}
write_header();
// flush any unwritten items in previous epoch
while (!writeq.empty() &&
writeq.front().first < ebofs->super_epoch) {
dout(15) << " dropping uncommitted journal item from prior epoch" << endl;
writeq.pop_front();
// discard any unwritten items in previous epoch, and do callbacks
epoch_t epoch = ebofs->get_super_epoch();
list<Context*> callbacks;
while (!writeq.empty() && writeq.front().first < epoch) {
dout(15) << " dropping unwritten and committed "
<< write_pos << " : " << writeq.front().second.length()
<< " epoch " << writeq.front().first
<< endl;
// finisher?
Context *oncommit = commitq.front();
if (oncommit) callbacks.push_back(oncommit);
write_pos += 2*sizeof(entry_header_t) + writeq.front().second.length();
// discard.
writeq.pop_front();
commitq.pop_front();
if (oncommit) {
oncommit->finish(0);
delete oncommit;
}
}
// queue the finishers
ebofs->queue_finishers(callbacks);
}
write_lock.Unlock();
}
void FileJournal::make_writeable()
{
if (read_pos)
write_pos = queue_pos = read_pos;
else
write_pos = queue_pos = sizeof(header_t);
read_pos = 0;
}
bool FileJournal::read_entry(bufferlist& bl, epoch_t& epoch)
{
if (!read_pos) {
dout(1) << "read_entry -- not readable" << endl;
make_writeable();
return false;
}
if (read_pos == header.wrap) {
// find wrap point
for (int i=1; i<header.num; i++) {
if (header.offset[i] < read_pos) {
assert(header.offset[i-1] < read_pos);
read_pos = header.offset[i];
break;
}
}
assert(read_pos != header.wrap);
dout(10) << "read_entry wrapped from " << header.wrap << " to " << read_pos << endl;
}
// header
entry_header_t h;
::lseek(fd, read_pos, SEEK_SET);
::read(fd, &h, sizeof(h));
if (!h.check_magic(read_pos, header.fsid)) {
dout(1) << "read_entry " << read_pos << " : bad header magic, end of journal" << endl;
make_writeable();
return false;
}
// body
bufferptr bp(h.len);
::read(fd, bp.c_str(), h.len);
// footer
entry_header_t f;
::read(fd, &f, sizeof(h));
if (!f.check_magic(read_pos, header.fsid) ||
h.epoch != f.epoch ||
h.len != f.len) {
dout(1) << "read_entry " << read_pos << " : bad footer magic, partially entry, end of journal" << endl;
make_writeable();
return false;
}
// yay!
dout(1) << "read_entry " << read_pos << " : "
<< " " << h.len << " bytes"
<< " epoch " << h.epoch
<< endl;
bl.push_back(bp);
epoch = h.epoch;
read_pos += 2*sizeof(entry_header_t) + h.len;
return true;
}

View File

@ -18,24 +18,76 @@
#include "Journal.h"
#include "common/Cond.h"
#include "common/Mutex.h"
#include "common/Thread.h"
class FileJournal : public Journal {
public:
/** log header
* we allow 3 pointers:
* top/initial,
* one for an epoch boundary,
* and one for a wrap in the ring buffer/journal file.
* the epoch boundary one is useful only for speedier recovery in certain cases
* (i.e. when ebofs committed, but the journal didn't rollover ... very small window!)
*/
struct header_t {
epoch_t epoch1;
off_t top1;
epoch_t epoch2;
off_t top2;
uint64_t fsid;
int num;
off_t wrap;
off_t max_size;
epoch_t epoch[3];
off_t offset[3];
header_t() : fsid(0), num(0), wrap(0), max_size(0) {}
void clear() {
num = 0;
wrap = 0;
}
void pop() {
if (num >= 2 && offset[0] > offset[1])
wrap = 0; // we're eliminating a wrap
num--;
for (int i=0; i<num; i++) {
epoch[i] = epoch[i+1];
offset[i] = offset[i+1];
}
}
void push(epoch_t e, off_t o) {
assert(num < 3);
epoch[num] = e;
offset[num] = o;
num++;
}
} header;
struct entry_header_t {
uint64_t epoch;
uint64_t len;
uint64_t magic1;
uint64_t magic2;
void make_magic(off_t pos, uint64_t fsid) {
magic1 = pos;
magic2 = fsid ^ epoch ^ len;
}
bool check_magic(off_t pos, uint64_t fsid) {
return
magic1 == (uint64_t)pos &&
magic2 == (fsid ^ epoch ^ len);
}
};
private:
string fn;
off_t max_size;
off_t top; // byte of first entry chronologically
off_t bottom; // byte where next entry goes
off_t committing_to; // offset of epoch boundary, if we are committing
bool full;
off_t write_pos; // byte where next entry written goes
off_t queue_pos; // byte where next entry queued for write goes
off_t read_pos; //
int fd;
@ -47,39 +99,44 @@ private:
Cond write_cond;
bool write_stop;
void print_header();
void read_header();
void write_header();
void start_writer();
void stop_writer();
void write_thread_entry();
void make_writeable();
class Writer : public Thread {
FileJournal *journal;
public:
Writer(FileJournal *fj) : journal(fj) {}
void *entry() {
journal->write_thread();
journal->write_thread_entry();
return 0;
}
} write_thread;
public:
FileJournal(Ebofs *e, char *f, off_t sz) :
Journal(e),
fn(f), max_size(sz),
top(0), bottom(0), committing_to(0),
FileJournal(Ebofs *e, char *f) :
Journal(e), fn(f),
full(false),
write_pos(0), queue_pos(0), read_pos(0),
fd(0),
write_stop(false), write_thread(this)
{ }
write_stop(false), write_thread(this) { }
~FileJournal() {}
void create();
void open();
int create();
int open();
void close();
// writes
void submit_entry(bufferlist& e, Context *oncommit); // submit an item
void commit_epoch_start(); // mark epoch boundary
void commit_epoch_finish(); // mark prior epoch as committed (we can expire)
bool submit_entry(bufferlist& e, Context *oncommit); // submit an item
void commit_epoch_start(); // mark epoch boundary
void commit_epoch_finish(); // mark prior epoch as committed (we can expire)
bool read_entry(bufferlist& bl, epoch_t& e);
// reads
};

View File

@ -16,22 +16,28 @@
#ifndef __EBOFS_JOURNAL_H
#define __EBOFS_JOURNAL_H
class Ebofs;
#include "include/buffer.h"
#include "include/Context.h"
class Journal {
protected:
Ebofs *ebofs;
public:
public:
Journal(Ebofs *e) : ebofs(e) { }
virtual ~Journal() { }
virtual void create() = 0;
virtual void open() = 0;
virtual int create() = 0;
virtual int open() = 0;
virtual void close() = 0;
// writes
virtual void submit_entry(bufferlist& e, Context *oncommit) = 0;// submit an item
virtual bool submit_entry(bufferlist& e, Context *oncommit) = 0;// submit an item
virtual void commit_epoch_start() = 0; // mark epoch boundary
virtual void commit_epoch_finish(list<Context*>& ls) = 0; // mark prior epoch as committed (we can expire)
virtual void commit_epoch_finish() = 0; // mark prior epoch as committed (we can expire)
virtual bool read_entry(bufferlist& bl, epoch_t &e) = 0;
// reads/recovery

View File

@ -145,6 +145,7 @@ int main(int argc, char **argv)
char *filename = args[0];
int seconds = atoi(args[1]);
int threads = atoi(args[2]);
if (!threads) threads = 1;
cout << "dev " << filename << " .. " << threads << " threads .. " << seconds << " seconds" << endl;
@ -153,7 +154,7 @@ int main(int argc, char **argv)
// explicit tests
if (1) {
if (0) {
// verify that clone() plays nice with partial writes
object_t oid(1,1);
bufferptr bp(10000);

View File

@ -142,15 +142,16 @@ static const int EBOFS_FREE_BUCKET_BITS = 2;
struct ebofs_super {
unsigned s_magic;
unsigned epoch; // version of this superblock.
uint64_t s_magic;
uint64_t fsid;
unsigned num_blocks; /* # blocks in filesystem */
epoch_t epoch; // version of this superblock.
uint64_t num_blocks; /* # blocks in filesystem */
// some basic stats, for kicks
unsigned free_blocks; /* unused blocks */
unsigned limbo_blocks; /* limbo blocks */
uint64_t free_blocks; /* unused blocks */
uint64_t limbo_blocks; /* limbo blocks */
//unsigned num_objects;
//unsigned num_fragmented;

View File

@ -919,6 +919,14 @@ inline void _decode(std::string& s, bufferlist& bl, int& off)
off += len+1;
}
// const char* (encode only, string compatible)
inline void _encode(const char *s, bufferlist& bl)
{
uint32_t len = strlen(s);
_encoderaw(len, bl);
bl.append(s, len+1);
}
// bufferptr (encapsulated)
inline void _encode(bufferptr& bp, bufferlist& bl)
{

View File

@ -3402,7 +3402,6 @@ void OSD::prepare_op_transaction(ObjectStore::Transaction& t,
switch (op->get_op()) {
case OSD_OP_WRLOCK:
{ // lock object
//r = store->setattr(oid, "wrlock", &op->get_asker(), sizeof(msg_addr_t), oncommit);
t.setattr(oid, "wrlock", &op->get_client(), sizeof(entity_name_t));
}
break;

View File

@ -102,14 +102,29 @@ public:
list<off_t> offsets;
list<size_t> lengths;
list<const char*> attrnames;
list<string> attrnames2;
//list< pair<const void*,int> > attrvals;
list<bufferlist> attrbls;
// for reads only (not encoded)
list<bufferlist*> pbls;
list<struct stat*> psts;
list< pair<void*,int*> > pattrvals;
list< map<string,bufferptr>* > pattrsets;
const char *get_attrname() {
if (attrnames.empty())
return attrnames2.front().c_str();
else
return attrnames.front();
}
void pop_attrname() {
if (attrnames.empty())
attrnames2.pop_front();
else
attrnames.pop_front();
}
void read(object_t oid, off_t off, size_t len, bufferlist *pbl) {
int op = OP_READ;
ops.push_back(op);
@ -232,6 +247,27 @@ public:
}
// etc.
void _encode(bufferlist& bl) {
::_encode(ops, bl);
::_encode(bls, bl);
::_encode(oids, bl);
::_encode(cids, bl);
::_encode(offsets, bl);
::_encode(lengths, bl);
::_encode(attrnames, bl);
::_encode(attrbls, bl);
}
void _decode(bufferlist& bl, int& off) {
::_decode(ops, bl, off);
::_decode(bls, bl, off);
::_decode(oids, bl, off);
::_decode(cids, bl, off);
::_decode(offsets, bl, off);
::_decode(lengths, bl, off);
::_decode(attrnames2, bl, off);
::_decode(attrbls, bl, off);
}
};
@ -264,7 +300,7 @@ public:
case Transaction::OP_GETATTR:
{
object_t oid = t.oids.front(); t.oids.pop_front();
const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
const char *attrname = t.get_attrname(); t.pop_attrname();
pair<void*,int*> pattrval = t.pattrvals.front(); t.pattrvals.pop_front();
*pattrval.second = getattr(oid, attrname, pattrval.first, *pattrval.second);
}
@ -314,7 +350,7 @@ public:
case Transaction::OP_SETATTR:
{
object_t oid = t.oids.front(); t.oids.pop_front();
const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
const char *attrname = t.get_attrname(); t.pop_attrname();
//pair<const void*,int> attrval = t.attrvals.front(); t.attrvals.pop_front();
bufferlist bl;
bl.claim( t.attrbls.front() );
@ -333,7 +369,7 @@ public:
case Transaction::OP_RMATTR:
{
object_t oid = t.oids.front(); t.oids.pop_front();
const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
const char *attrname = t.get_attrname(); t.pop_attrname();
rmattr(oid, attrname, 0);
}
break;
@ -379,7 +415,7 @@ public:
case Transaction::OP_COLL_SETATTR:
{
coll_t cid = t.cids.front(); t.cids.pop_front();
const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
const char *attrname = t.get_attrname(); t.pop_attrname();
//pair<const void*,int> attrval = t.attrvals.front(); t.attrvals.pop_front();
bufferlist bl;
bl.claim( t.attrbls.front() );
@ -391,7 +427,7 @@ public:
case Transaction::OP_COLL_RMATTR:
{
coll_t cid = t.cids.front(); t.cids.pop_front();
const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
const char *attrname = t.get_attrname(); t.pop_attrname();
collection_rmattr(cid, attrname, 0);
}
break;