import mars-50.tgz

Thomas Schoebel-Theuer 2010-12-15 12:58:22 +01:00
parent 4b59c62d92
commit f8aabf8426
15 changed files with 380 additions and 4267 deletions

View File

@ -579,6 +579,11 @@ GENERIC_OBJECT_FUNCTIONS(generic);
// some helpers
#undef spin_lock_irqsave
#define spin_lock_irqsave(l,f) spin_lock(l)
#undef spin_unlock_irqrestore
#define spin_unlock_irqrestore(l,f) spin_unlock(l)
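/* Note: these wrappers deliberately map the irqsave variants to plain
* spin_lock()/spin_unlock(); interrupts stay enabled and the flags
* argument is retained only for source compatibility (see the
* (void)flags below).
*/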
#ifdef CONFIG_DEBUG_SPINLOCK
# define LOCK_CHECK(OP) \
@ -595,13 +600,11 @@ GENERIC_OBJECT_FUNCTIONS(generic);
} \
atomic_inc(&current->lock_count); \
(void)flags; \
/*spin_lock(spinlock);*/ \
spin_lock_irqsave(spinlock, flags); \
} while (0)
# define traced_unlock(spinlock,flags) \
do { \
/*spin_unlock(spinlock);*/ \
spin_unlock_irqrestore(spinlock, flags); \
atomic_dec(&current->lock_count); \
} while (0)

log_format.h (new file, 218 additions)
View File

@ -0,0 +1,218 @@
// (c) 2010 Thomas Schoebel-Theuer / 1&1 Internet AG
#ifndef LOG_FORMAT_H
#define LOG_FORMAT_H
#include "mars.h"
/* The following structure is memory-only.
* Transfers to disk are done indirectly via the
* format conversion functions below.
* The advantage is that even newer disk formats can be parsed
* by old code (of course, not all information / features will be
* available then).
*/
struct log_header {
struct timespec l_stamp;
loff_t l_pos;
int l_len;
int l_code;
};
/* Bookkeeping status between calls
*/
struct log_status {
struct mars_input *input;
struct mars_output hidden_output;
struct generic_object_layout ref_object_layout;
struct mars_info info;
loff_t log_pos;
int validflag_offset;
int reallen_offset;
int payload_offset;
int payload_len;
struct mars_ref_object *log_mref;
};
#define FORMAT_VERSION 1 // version of the disk format; currently there is no other one
#define CODE_UNKNOWN 0
#define CODE_WRITE_NEW 1
#define CODE_WRITE_OLD 2
#define START_MAGIC 0xa8f7e908d9177957ll
#define END_MAGIC 0x74941fb74ab5726dll
#define OVERHEAD \
( \
sizeof(START_MAGIC) + \
sizeof(char) * 2 + \
sizeof(short) + \
sizeof(int) + \
sizeof(struct log_header) + \
sizeof(END_MAGIC) + \
sizeof(char) * 2 + \
sizeof(short) + \
sizeof(int) + \
sizeof(struct timespec) + \
0 \
)
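/* The OVERHEAD terms correspond to the on-disk record layout written
* by log_reserve() and log_finalize() below:
* START_MAGIC, version (char), valid_flag (char), spare (short),
* total_len (int), the struct log_header fields,
* <payload of l_len bytes>,
* END_MAGIC, valid_flag copy (char), spare (char), spare (short),
* spare (int), completion timestamp (struct timespec).
*/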
// TODO: make this bytesex-aware.
#define DATA_PUT(data,offset,val) \
do { \
*((typeof(val)*)(data+offset)) = val; \
offset += sizeof(val); \
} while (0)
#define DATA_GET(data,offset,val) \
do { \
val = *((typeof(val)*)(data+offset)); \
offset += sizeof(val); \
} while (0)
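/* A minimal sketch (not part of this commit) of what a byte-order-aware
* variant could look like, assuming little-endian as the on-disk format;
* the generic typeof()-based macros above would need one such pair per
* field width (helpers from <asm/byteorder.h>).
*/
#define DATA_PUT64(data,offset,val) \
do { \
*(__le64*)((data)+(offset)) = cpu_to_le64(val); \
(offset) += sizeof(u64); \
} while (0)
#define DATA_GET64(data,offset,val) \
do { \
(val) = le64_to_cpu(*(__le64*)((data)+(offset))); \
(offset) += sizeof(u64); \
} while (0)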
static inline
void log_skip(struct log_status *logst)
{
int bits;
if (!logst->info.transfer_size) {
int status = GENERIC_INPUT_CALL(logst->input, mars_get_info, &logst->info);
if (status < 0) {
MARS_FAT("cannot get transfer log info (code=%d)\n", status);
}
}
bits = logst->info.transfer_order + PAGE_SHIFT;
logst->log_pos = ((logst->log_pos >> bits) + 1) << bits;
}
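/* Example: with transfer_order 2 and PAGE_SHIFT 12, bits = 14, so
* log_skip() advances a log_pos of 0x5123 to 0x8000, the start of the
* next 16 KiB transfer unit.
*/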
static inline
void *log_reserve(struct log_status *logst, struct log_header *l)
{
struct mars_ref_object *mref;
void *data;
int total_len;
int status;
int offset;
MARS_DBG("reserving %d bytes at %lld\n", l->l_len, logst->log_pos);
if (unlikely(logst->log_mref)) {
MARS_ERR("mref already existing\n");
goto err;
}
mref = mars_alloc_mars_ref(&logst->hidden_output, &logst->ref_object_layout);
if (unlikely(!mref))
goto err;
mref->ref_pos = logst->log_pos;
total_len = l->l_len + OVERHEAD;
mref->ref_len = total_len;
mref->ref_may_write = WRITE;
status = GENERIC_INPUT_CALL(logst->input, mars_ref_get, mref);
if (unlikely(status < 0)) {
goto err_free;
}
if (unlikely(mref->ref_len < total_len)) {
goto put;
}
logst->log_mref = mref;
data = mref->ref_data;
offset = 0;
DATA_PUT(data, offset, START_MAGIC);
DATA_PUT(data, offset, (char)FORMAT_VERSION);
logst->validflag_offset = offset;
DATA_PUT(data, offset, (char)0); // valid_flag
DATA_PUT(data, offset, (short)0); // spare
DATA_PUT(data, offset, total_len); // start of next header
DATA_PUT(data, offset, l->l_stamp.tv_sec);
DATA_PUT(data, offset, l->l_stamp.tv_nsec);
DATA_PUT(data, offset, l->l_pos);
logst->reallen_offset = offset;
DATA_PUT(data, offset, l->l_len);
DATA_PUT(data, offset, l->l_code);
logst->payload_offset = offset;
logst->payload_len = l->l_len;
return data + offset;
put:
GENERIC_INPUT_CALL(logst->input, mars_ref_put, mref);
return NULL;
err_free:
mars_free_mars_ref(mref);
err:
return NULL;
}
static inline
bool log_finalize(struct log_status *logst, int len, void (*endio)(struct generic_callback *cb), void *private)
{
struct mars_ref_object *mref = logst->log_mref;
struct generic_callback *cb;
struct timespec now;
void *data;
int offset;
bool ok = false;
CHECK_PTR(mref, err);
logst->log_mref = NULL;
if (unlikely(len > logst->payload_len)) {
MARS_ERR("trying to write more than reserved (%d > %d)\n", len, logst->payload_len);
goto put;
}
data = mref->ref_data;
/* Correct the length in the header.
*/
offset = logst->reallen_offset;
DATA_PUT(data, offset, len);
/* Write the trailer.
*/
offset = logst->payload_offset + len;
DATA_PUT(data, offset, END_MAGIC);
DATA_PUT(data, offset, (char)1); // valid_flag copy
DATA_PUT(data, offset, (char)0); // spare
DATA_PUT(data, offset, (short)0); // spare
DATA_PUT(data, offset, (int)0); // spare
now = CURRENT_TIME; // when the log entry was ready.
DATA_PUT(data, offset, now.tv_sec);
DATA_PUT(data, offset, now.tv_nsec);
logst->log_pos += offset;
/* This must come last. In case of incomplete
* or even overlapping disk transfers, this indicates
* the completeness / integrity of the payload at
* the time of starting the transfer.
*/
offset = logst->validflag_offset;
DATA_PUT(data, offset, (char)1);
cb = &mref->_ref_cb;
cb->cb_fn = endio;
cb->cb_error = 0;
cb->cb_prev = NULL;
cb->cb_private = private;
mref->ref_cb = cb;
mref->ref_rw = 1;
GENERIC_INPUT_CALL(logst->input, mars_ref_io, mref);
ok = true;
put:
GENERIC_INPUT_CALL(logst->input, mars_ref_put, mref);
err:
return ok;
}
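/* Typical usage, as in phase1_startio() of the trans_logger: reserve
* room for the payload, copy it in, then finalize (which corrects the
* length in the header, writes the trailer, sets the valid flag and
* submits the IO):
*
* data = log_reserve(&brick->logst, &l);
* if (data) {
* memcpy(data, orig_mref->ref_data, l.l_len);
* ok = log_finalize(&brick->logst, l.l_len, phase1_endio, orig_mref_a);
* }
*/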
#endif

mars.h (11 changes)
View File

@ -71,6 +71,7 @@ struct mars_ref_object_layout {
atomic_t ref_count; \
/* callback part */ \
struct generic_callback *ref_cb; \
struct generic_callback _ref_cb; \
struct mars_ref_object {
MARS_REF_OBJECT(mars_ref);
@ -168,9 +169,15 @@ GENERIC_ASPECT_FUNCTIONS(BRICK,mars_ref); \
GENERIC_OBJECT_FUNCTIONS(mars_ref);
// instantiate a pseudo base-class "mars"
_MARS_TYPES(mars);
GENERIC_OBJECT_LAYOUT_FUNCTIONS(mars);
GENERIC_ASPECT_FUNCTIONS(mars,mars_ref);
/////////////////////////////////////////////////////////////////////////
// MARS-specific helper functions
// MARS-specific helpers
#define MARS_MAKE_STATICS(BRICK) \
\
@ -194,7 +201,7 @@ static const struct generic_aspect_type *BRICK##_aspect_types[BRICK_OBJ_NR] = {
int test = atomic_read(atom); \
if (test OP (minval)) { \
atomic_set(atom, minval); \
MARS_ERR("%d: atomic " #atom " " #OP " " #minval " (instead %d)\n", __LINE__, test); \
MARS_ERR("%d: atomic " #atom " " #OP " " #minval " (%d)\n", __LINE__, test); \
} \
} while (0)

File diff suppressed because it is too large

View File

@ -1,86 +0,0 @@
// (c) 2010 Thomas Schoebel-Theuer / 1&1 Internet AG
#ifndef MARS_BUF_H
#define MARS_BUF_H
#include <linux/list.h>
#include <asm/atomic.h>
//#define MARS_BUF_HASH_MAX 512
#define MARS_BUF_HASH_MAX 2048
struct buf_mars_ref_aspect {
GENERIC_ASPECT(mars_ref);
struct buf_head *rfa_bf;
struct list_head rfa_pending_head;
struct list_head tmp_head;
struct generic_callback cb;
};
struct cache_anchor {
spinlock_t hash_lock;
struct list_head hash_anchor;
};
struct buf_brick {
MARS_BRICK(buf);
/* brick parameters */
int backing_order;
int backing_size;
int max_count;
/* internals */
spinlock_t brick_lock;
atomic_t alloc_count;
atomic_t hashed_count;
atomic_t lru_count;
atomic_t nr_io_pending;
atomic_t nr_collisions;
struct generic_object_layout mref_object_layout;
// lists for caching
struct list_head free_anchor; // members are not hashed
struct list_head lru_anchor; // members are hashed and not in use
struct cache_anchor cache_anchors[MARS_BUF_HASH_MAX]; // hash table
// for creation of bios
struct mars_info base_info;
struct block_device *bdev;
int bvec_max;
// statistics
unsigned long last_jiffies;
atomic_t hit_count;
atomic_t miss_count;
atomic_t io_count;
};
struct buf_input {
MARS_INPUT(buf);
};
struct buf_output {
MARS_OUTPUT(buf);
};
MARS_TYPES(buf);
struct buf_head {
void *bf_data;
spinlock_t bf_lock;
struct buf_brick *bf_brick;
loff_t bf_pos;
loff_t bf_base_index;
int bf_flags;
atomic_t bf_count;
int bf_bio_status;
atomic_t bf_bio_count;
// lists for caching
//struct list_head bf_mref_anchor; // all current mref members
struct list_head bf_lru_head;
struct list_head bf_hash_head;
// lists for IO
struct list_head bf_io_pending_anchor;
struct list_head bf_postpone_anchor;
};
#endif

File diff suppressed because it is too large

View File

@ -70,7 +70,7 @@ static void device_aio_ref_put(struct device_aio_output *output, struct mars_ref
device_aio_free_mars_ref(mref);
}
static void device_aio_ref_io(struct device_aio_output *output, struct mars_ref_object *mref, int rw)
static void device_aio_ref_io(struct device_aio_output *output, struct mars_ref_object *mref)
{
struct aio_threadinfo *tinfo = &output->tinfo[0];
struct generic_callback *cb = mref->ref_cb;

View File

@ -1,576 +0,0 @@
// (c) 2010 Thomas Schoebel-Theuer / 1&1 Internet AG
//#define BRICK_DEBUGGING
//#define MARS_DEBUGGING
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/string.h>
#include <linux/list.h>
#include <linux/types.h>
#include <linux/blkdev.h>
#include <linux/highmem.h>
#include <linux/kthread.h>
#include <linux/spinlock.h>
#include <linux/wait.h>
#include <linux/splice.h>
#include "mars.h"
///////////////////////// own type definitions ////////////////////////
#include "mars_device_sio.h"
////////////////// own brick / input / output operations //////////////////
// some code borrowed from the loopback driver
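// transfer_none() below copies one page fragment between the pagecache
// page and the bio page; the two distinct atomic kmap slots (KM_USER0 /
// KM_USER1) allow both pages to be mapped at the same time.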
static int transfer_none(int cmd,
struct page *raw_page, unsigned raw_off,
struct page *loop_page, unsigned loop_off,
int size)
{
char *raw_buf = kmap_atomic(raw_page, KM_USER0) + raw_off;
char *loop_buf = kmap_atomic(loop_page, KM_USER1) + loop_off;
if (unlikely(!raw_buf || !loop_buf)) {
MARS_ERR("transfer NULL: %p %p\n", raw_buf, loop_buf);
return -EFAULT;
}
#if 1
if (cmd == READ)
memcpy(loop_buf, raw_buf, size);
else
memcpy(raw_buf, loop_buf, size);
#endif
kunmap_atomic(raw_buf, KM_USER0);
kunmap_atomic(loop_buf, KM_USER1);
cond_resched();
return 0;
}
static void write_aops(struct device_sio_output *output, struct mars_ref_object *mref)
{
struct bio *bio = mref->orig_bio;
loff_t pos = ((loff_t)bio->bi_sector << 9);
struct file *file = output->filp;
struct address_space *mapping;
struct bio_vec *bvec;
int i;
int ret = 0;
if (unlikely(!file)) {
MARS_FAT("No FILE\n");
return;
}
mapping = file->f_mapping;
MARS_DBG("write_aops pos=%llu len=%d\n", pos, bio->bi_size);
mutex_lock(&mapping->host->i_mutex);
bio_for_each_segment(bvec, bio, i) {
//pgoff_t index;
unsigned offset, bv_offs;
int len;
//index = pos >> PAGE_CACHE_SHIFT;
offset = pos & ((pgoff_t)PAGE_CACHE_SIZE - 1);
bv_offs = bvec->bv_offset;
len = bvec->bv_len;
while (len > 0) {
int transfer_result;
unsigned size, copied;
struct page *page;
void *fsdata;
size = PAGE_CACHE_SIZE - offset;
if (size > len)
size = len;
ret = pagecache_write_begin(file, mapping, pos, size, 0,
&page, &fsdata);
if (ret) {
MARS_ERR("cannot start pagecache_write_begin() error=%d\n", ret);
if (ret >= 0)
ret = -EIO;
goto fail;
}
//file_update_time(file);
transfer_result = transfer_none(WRITE, page, offset, bvec->bv_page, bv_offs, size);
copied = size;
if (transfer_result) {
MARS_ERR("transfer error %d\n", transfer_result);
copied = 0;
}
ret = pagecache_write_end(file, mapping, pos, size, copied,
page, fsdata);
if (ret < 0 || ret != copied || transfer_result) {
MARS_ERR("write error %d\n", ret);
if (ret >= 0)
ret = -EIO;
goto fail;
}
bv_offs += copied;
len -= copied;
offset = 0;
//index++;
pos += copied;
}
ret = 0;
}
fail:
mutex_unlock(&mapping->host->i_mutex);
mref->ref_cb->cb_error = ret;
#if 1
blk_run_address_space(mapping);
#endif
}
struct cookie_data {
struct device_sio_output *output;
struct mars_ref_object *mref;
struct bio_vec *bvec;
unsigned int offset;
};
static int
device_sio_splice_actor(struct pipe_inode_info *pipe,
struct pipe_buffer *buf,
struct splice_desc *sd)
{
struct cookie_data *p = sd->u.data;
struct page *page = buf->page;
sector_t IV;
int size, ret;
ret = buf->ops->confirm(pipe, buf);
if (unlikely(ret))
return ret;
IV = ((sector_t) page->index << (PAGE_CACHE_SHIFT - 9)) +
(buf->offset >> 9);
size = sd->len;
if (size > p->bvec->bv_len)
size = p->bvec->bv_len;
if (transfer_none(READ, page, buf->offset, p->bvec->bv_page, p->offset, size)) {
MARS_ERR("transfer error block %ld\n", p->bvec->bv_page->index);
size = -EINVAL;
}
flush_dcache_page(p->bvec->bv_page);
if (size > 0)
p->offset += size;
return size;
}
static int
device_sio_direct_splice_actor(struct pipe_inode_info *pipe, struct splice_desc *sd)
{
return __splice_from_pipe(pipe, sd, device_sio_splice_actor);
}
static void read_aops(struct device_sio_output *output, struct mars_ref_object *mref)
{
struct bio *bio = mref->orig_bio;
loff_t pos = ((loff_t)bio->bi_sector << 9); // TODO: make dynamic
struct bio_vec *bvec;
int i;
int ret = -EIO;
bio_for_each_segment(bvec, bio, i) {
struct cookie_data cookie = {
.output = output,
.mref = mref,
.bvec = bvec,
.offset = bvec->bv_offset,
};
struct splice_desc sd = {
.len = 0,
.total_len = bvec->bv_len,
.flags = 0,
.pos = pos,
.u.data = &cookie,
};
MARS_DBG("start splice %p %p %p %p\n", output, mref, bio, bvec);
ret = 0;
ret = splice_direct_to_actor(output->filp, &sd, device_sio_direct_splice_actor);
if (unlikely(ret < 0)) {
MARS_ERR("splice %p %p %p %p status=%d\n", output, mref, bio, bvec, ret);
break;
}
pos += bvec->bv_len;
bio->bi_size -= bvec->bv_len;
}
if (unlikely(bio->bi_size)) {
MARS_ERR("unhandled rest size %d on bio %p\n", bio->bi_size, bio);
}
mref->ref_cb->cb_error = ret;
}
static void sync_file(struct device_sio_output *output)
{
struct file *file = output->filp;
int ret;
#if 1
ret = vfs_fsync(file, file->f_path.dentry, 1);
if (unlikely(ret)) {
MARS_ERR("syncing pages failed: %d\n", ret);
}
return;
#endif
}
static void device_sio_ref_io(struct device_sio_output *output, struct mars_ref_object *mref, int rw)
{
struct bio *bio = mref->orig_bio;
struct generic_callback *cb = mref->ref_cb;
bool barrier = (rw != READ && bio_rw_flagged(bio, BIO_RW_BARRIER));
int test;
if (unlikely(!output->filp)) {
cb->cb_error = -EINVAL;
goto done;
}
#if 1
MARS_INF("got BIO %2lu %12ld %4d\n", bio->bi_rw, bio->bi_sector, bio->bi_size);
#endif
/* Shortcut when possible
*/
if (output->allow_bio && S_ISBLK(output->filp->f_mapping->host->i_mode)) {
struct block_device *bdev = output->filp->f_mapping->host->i_bdev;
#if 0
static int count = 10;
if (count-- > 0)
MARS_INF("AHA: %p\n", bdev);
#endif
bio->bi_bdev = bdev;
submit_bio(bio->bi_rw, bio);
return;
}
if (barrier) {
MARS_INF("got barrier request\n");
sync_file(output);
}
if (rw == READ) {
read_aops(output, mref);
} else {
write_aops(output, mref);
if (barrier || output->o_fdsync)
sync_file(output);
}
done:
#if 1
if (cb->cb_error < 0)
MARS_ERR("IO error %d\n", cb->cb_error);
#endif
cb->cb_fn(cb);
test = atomic_read(&mref->ref_count);
if (test <= 0) {
MARS_ERR("ref_count UNDERRUN %d\n", test);
atomic_set(&mref->ref_count, 1);
}
if (!atomic_dec_and_test(&mref->ref_count))
return;
device_sio_free_mars_ref(mref);
}
static void device_sio_mars_queue(struct device_sio_output *output, struct mars_ref_object *mref, int rw)
{
int index = 0;
struct sio_threadinfo *tinfo;
struct device_sio_mars_ref_aspect *mref_a;
struct generic_callback *cb = mref->ref_cb;
unsigned long flags;
if (rw == READ) {
traced_lock(&output->g_lock, flags);
index = output->index++;
traced_unlock(&output->g_lock, flags);
index = (index % WITH_THREAD) + 1;
}
mref_a = device_sio_mars_ref_get_aspect(output, mref);
if (unlikely(!mref_a)) {
MARS_FAT("cannot get aspect\n");
cb->cb_error = -EINVAL;
cb->cb_fn(cb);
return;
}
tinfo = &output->tinfo[index];
MARS_DBG("queueing %p on %d\n", mref, index);
traced_lock(&tinfo->lock, flags);
mref->ref_rw = rw;
list_add_tail(&mref_a->io_head, &tinfo->mref_list);
traced_unlock(&tinfo->lock, flags);
wake_up(&tinfo->event);
}
static int device_sio_thread(void *data)
{
struct sio_threadinfo *tinfo = data;
struct device_sio_output *output = tinfo->output;
MARS_INF("kthread has started.\n");
//set_user_nice(current, -20);
while (!kthread_should_stop()) {
struct list_head *tmp = NULL;
struct mars_ref_object *mref;
struct device_sio_mars_ref_aspect *mref_a;
unsigned long flags;
wait_event_interruptible_timeout(
tinfo->event,
!list_empty(&tinfo->mref_list) || kthread_should_stop(),
HZ);
tinfo->last_jiffies = jiffies;
traced_lock(&tinfo->lock, flags);
if (!list_empty(&tinfo->mref_list)) {
tmp = tinfo->mref_list.next;
list_del_init(tmp);
}
traced_unlock(&tinfo->lock, flags);
if (!tmp)
continue;
mref_a = container_of(tmp, struct device_sio_mars_ref_aspect, io_head);
mref = mref_a->object;
MARS_DBG("got %p %p\n", mref_a, mref);
device_sio_ref_io(output, mref, mref->ref_rw);
}
MARS_INF("kthread has stopped.\n");
return 0;
}
static int device_sio_watchdog(void *data)
{
struct device_sio_output *output = data;
MARS_INF("watchdog has started.\n");
while (!kthread_should_stop()) {
int i;
msleep_interruptible(5000);
for (i = 0; i <= WITH_THREAD; i++) {
struct sio_threadinfo *tinfo = &output->tinfo[i];
unsigned long now = jiffies;
unsigned long elapsed = now - tinfo->last_jiffies;
if (elapsed > 10 * HZ) {
tinfo->last_jiffies = now;
MARS_ERR("thread %d is dead for more than 10 seconds.\n", i);
}
}
}
return 0;
}
static int device_sio_get_info(struct device_sio_output *output, struct mars_info *info)
{
struct file *file = output->filp;
info->current_size = i_size_read(file->f_mapping->host);
info->backing_file = file;
return 0;
}
//////////////// object / aspect constructors / destructors ///////////////
static int device_sio_mars_ref_aspect_init_fn(struct generic_aspect *_ini, void *_init_data)
{
struct device_sio_mars_ref_aspect *ini = (void*)_ini;
INIT_LIST_HEAD(&ini->io_head);
return 0;
}
static void device_sio_mars_ref_aspect_exit_fn(struct generic_aspect *_ini, void *_init_data)
{
struct device_sio_mars_ref_aspect *ini = (void*)_ini;
(void)ini;
#if 1
CHECK_HEAD_EMPTY(&ini->io_head);
#endif
}
MARS_MAKE_STATICS(device_sio);
////////////////////// brick constructors / destructors ////////////////////
static int device_sio_brick_construct(struct device_sio_brick *brick)
{
return 0;
}
static int device_sio_switch(struct device_sio_brick *brick, bool state)
{
struct device_sio_output *output = brick->outputs[0];
char *path = output->output_name;
int flags = O_CREAT | O_RDWR | O_LARGEFILE;
int prot = 0600;
mm_segment_t oldfs;
if (output->o_direct) {
flags |= O_DIRECT;
MARS_INF("using O_DIRECT on %s\n", path);
}
if (state) {
oldfs = get_fs();
set_fs(get_ds());
output->filp = filp_open(path, flags, prot);
set_fs(oldfs);
if (IS_ERR(output->filp)) {
int err = PTR_ERR(output->filp);
MARS_ERR("can't open file '%s' status=%d\n", path, err);
output->filp = NULL;
return err;
}
#if 0
{
struct address_space *mapping = output->filp->f_mapping;
int old_gfp_mask = mapping_gfp_mask(mapping);
mapping_set_gfp_mask(mapping, old_gfp_mask & ~(__GFP_IO|__GFP_FS));
}
#endif
MARS_INF("opened file '%s'\n", path);
} else {
// TODO: close etc...
}
return 0;
}
static int device_sio_output_construct(struct device_sio_output *output)
{
struct task_struct *watchdog;
int index;
spin_lock_init(&output->g_lock);
output->index = 0;
for (index = 0; index <= WITH_THREAD; index++) {
struct sio_threadinfo *tinfo = &output->tinfo[index];
tinfo->output = output;
spin_lock_init(&tinfo->lock);
init_waitqueue_head(&tinfo->event);
INIT_LIST_HEAD(&tinfo->mref_list);
tinfo->last_jiffies = jiffies;
tinfo->thread = kthread_create(device_sio_thread, tinfo, "mars_sio%d", index);
if (IS_ERR(tinfo->thread)) {
int error = PTR_ERR(tinfo->thread);
MARS_ERR("cannot create thread, status=%d\n", error);
filp_close(output->filp, NULL);
return error;
}
wake_up_process(tinfo->thread);
}
watchdog = kthread_create(device_sio_watchdog, output, "mars_watchdog%d", 0);
if (!IS_ERR(watchdog)) {
wake_up_process(watchdog);
}
return 0;
}
static int device_sio_output_destruct(struct device_sio_output *output)
{
int index;
for (index = 0; index <= WITH_THREAD; index++) {
kthread_stop(output->tinfo[index].thread);
output->tinfo[index].thread = NULL;
}
if (output->filp) {
filp_close(output->filp, NULL);
output->filp = NULL;
}
return 0;
}
///////////////////////// static structs ////////////////////////
static struct device_sio_brick_ops device_sio_brick_ops = {
.brick_switch = device_sio_switch,
};
static struct device_sio_output_ops device_sio_output_ops = {
.make_object_layout = device_sio_make_object_layout,
.mars_ref_io = device_sio_mars_queue,
.mars_get_info = device_sio_get_info,
};
const struct device_sio_output_type device_sio_output_type = {
.type_name = "device_sio_output",
.output_size = sizeof(struct device_sio_output),
.master_ops = &device_sio_output_ops,
.output_construct = &device_sio_output_construct,
.output_destruct = &device_sio_output_destruct,
.aspect_types = device_sio_aspect_types,
.layout_code = {
[BRICK_OBJ_MARS_REF] = LAYOUT_NONE,
}
};
static const struct device_sio_output_type *device_sio_output_types[] = {
&device_sio_output_type,
};
const struct device_sio_brick_type device_sio_brick_type = {
.type_name = "device_sio_brick",
.brick_size = sizeof(struct device_sio_brick),
.max_inputs = 0,
.max_outputs = 1,
.master_ops = &device_sio_brick_ops,
.default_output_types = device_sio_output_types,
.brick_construct = &device_sio_brick_construct,
};
EXPORT_SYMBOL_GPL(device_sio_brick_type);
////////////////// module init stuff /////////////////////////
static int __init init_device_sio(void)
{
MARS_INF("init_device_sio()\n");
return device_sio_register_brick_type();
}
static void __exit exit_device_sio(void)
{
MARS_INF("exit_device_sio()\n");
device_sio_unregister_brick_type();
}
MODULE_DESCRIPTION("MARS device_sio brick");
MODULE_AUTHOR("Thomas Schoebel-Theuer <tst@1und1.de>");
MODULE_LICENSE("GPL");
module_init(init_device_sio);
module_exit(exit_device_sio);

View File

@ -1,472 +0,0 @@
// (c) 2010 Thomas Schoebel-Theuer / 1&1 Internet AG
/* Interface to a Linux device.
* 1 Input, 0 Outputs.
*/
//#define BRICK_DEBUGGING
//#define MARS_DEBUGGING
#define LOG
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/string.h>
#include <linux/major.h>
#include <linux/genhd.h>
#include <linux/blkdev.h>
#include "mars.h"
///////////////////////// own type definitions ////////////////////////
#include "mars_if_device.h"
///////////////////////// own static definitions ////////////////////////
static int device_minor = 0;
//////////////// object / aspect constructors / destructors ///////////////
///////////////////////// linux operations ////////////////////////
/* callback
*/
static void _if_device_endio(struct generic_callback *cb)
{
struct if_device_mars_ref_aspect *mref_a = cb->cb_private;
struct if_device_mars_ref_aspect *master_mref_a;
struct bio *bio;
struct bio_vec *bvec;
int i;
int error;
if (unlikely(!mref_a)) {
MARS_FAT("callback with no mref_a called. something is very wrong here!\n");
return;
}
master_mref_a = mref_a->master;
if (unlikely(!master_mref_a)) {
MARS_FAT("master is missing. something is very wrong here!\n");
return;
}
if (cb->cb_error < 0)
master_mref_a->cb.cb_error = cb->cb_error;
if (!atomic_dec_and_test(&master_mref_a->split_count))
goto done;
bio = master_mref_a->orig_bio;
if (unlikely(!bio)) {
MARS_FAT("callback with no bio called. something is very wrong here!\n");
return;
}
bio_for_each_segment(bvec, bio, i) {
kunmap(bvec->bv_page);
}
error = master_mref_a->cb.cb_error;
if (unlikely(error < 0)) {
MARS_ERR("NYI: error=%d RETRY LOGIC %u\n", error, bio->bi_size);
} else { // bio conventions are slightly different...
error = 0;
bio->bi_size = 0;
}
bio_endio(bio, error);
done:
// paired with X1
GENERIC_INPUT_CALL(master_mref_a->input, mars_ref_put, master_mref_a->object);
}
/* accept a linux bio, convert to mref and call buf_io() on it.
*/
static int if_device_make_request(struct request_queue *q, struct bio *bio)
{
LIST_HEAD(tmp_list);
struct if_device_input *input;
struct if_device_brick *brick;
struct mars_ref_object *mref = NULL;
struct if_device_mars_ref_aspect *mref_a;
struct if_device_mars_ref_aspect *master;
struct generic_callback *cb;
struct bio_vec *bvec;
int i;
//bool barrier = ((bio->bi_rw & 1) != READ && bio_rw_flagged(bio, BIO_RW_BARRIER));
loff_t pos = ((loff_t)bio->bi_sector) << 9; // TODO: make dynamic
int rw = bio_data_dir(bio);
int maxlen = 0;
int error = -ENOSYS;
MARS_DBG("make_request(%d)\n", bio->bi_size);
input = q->queuedata;
if (unlikely(!input))
goto err;
brick = input->brick;
if (unlikely(!brick))
goto err;
/* THIS IS PROVISIONAL
*/
while (unlikely(!brick->is_active)) {
msleep(100);
}
#ifdef LOG
{
const unsigned short prio = bio_prio(bio);
const bool sync = bio_rw_flagged(bio, BIO_RW_SYNCIO);
const bool unplug = bio_rw_flagged(bio, BIO_RW_UNPLUG);
const unsigned int ff = bio->bi_rw & REQ_FAILFAST_MASK;
MARS_INF("BIO rw = %lx len = %d prio = %d sync = %d unplug = %d ff = %d\n", bio->bi_rw, bio->bi_size, prio, sync, unplug, ff);
}
#endif
bio_for_each_segment(bvec, bio, i) {
int bv_len = bvec->bv_len;
void *data = kmap(bvec->bv_page);
data += bvec->bv_offset;
while (bv_len > 0) {
int len = bv_len;
#ifdef LOG
MARS_INF("rw = %d i = %d pos = %lld bv_page = %p bv_offset = %d bv_len = %d maxlen = %d mref=%p\n", rw, i, pos, bvec->bv_page, bvec->bv_offset, bv_len, maxlen, mref);
#endif
#if 1 // optimizing
if (mref) { // try to merge with previous bvec
if (len > maxlen) {
len = maxlen;
}
if (mref->ref_data + mref->ref_len == data && len > 0) {
mref->ref_len += len;
#ifdef LOG
MARS_INF("merge %d new ref_len = %d\n", len, mref->ref_len);
#endif
} else {
mref = NULL;
}
}
#else
mref = NULL;
#endif
if (!mref) {
error = -ENOMEM;
mref = if_device_alloc_mars_ref(&brick->hidden_output, &input->mref_object_layout);
if (unlikely(!mref))
goto err;
mref_a = if_device_mars_ref_get_aspect(&brick->hidden_output, mref);
if (unlikely(!mref_a))
goto err;
cb = &mref_a->cb;
cb->cb_fn = _if_device_endio;
cb->cb_private = mref_a;
cb->cb_error = 0;
cb->cb_prev = NULL;
mref->ref_cb = cb;
mref_a->input = input;
mref_a->orig_bio = bio;
mref->ref_rw = mref->ref_may_write = rw;
mref->ref_pos = pos;
mref->ref_len = bv_len;
mref->ref_data = data;
error = GENERIC_INPUT_CALL(input, mars_ref_get, mref);
if (unlikely(error < 0))
goto err;
maxlen = mref->ref_len;
if (len > maxlen)
len = maxlen;
mref->ref_len = len;
list_add_tail(&mref_a->tmp_head, &tmp_list);
// The first mref is called "master". It carries the split_count
mref_a->master = container_of(tmp_list.next, struct if_device_mars_ref_aspect, tmp_head);
atomic_inc(&mref_a->master->split_count);
}
pos += len;
data += len;
bv_len -= len;
maxlen -= len;
} // while bv_len > 0
} // foreach bvec
error = 0;
err:
master = NULL;
if (error < 0) {
MARS_ERR("cannot submit request, status=%d\n", error);
bio_endio(bio, error);
} else {
master = container_of(tmp_list.next, struct if_device_mars_ref_aspect, tmp_head);
// grab one extra reference X2
atomic_inc(&master->object->ref_count);
}
while (!list_empty(&tmp_list)) {
mref_a = container_of(tmp_list.next, struct if_device_mars_ref_aspect, tmp_head);
list_del_init(&mref_a->tmp_head);
mref = mref_a->object;
if (error >= 0) {
// paired with X1
atomic_inc(&mref_a->master->object->ref_count);
GENERIC_INPUT_CALL(input, mars_ref_io, mref, rw);
}
GENERIC_INPUT_CALL(input, mars_ref_put, mref);
}
// drop extra reference X2
if (master)
GENERIC_INPUT_CALL(input, mars_ref_put, master->object);
return error;
}
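/* Reference counting in the above: every mref created in the loop takes
* one reference on the master (paired with X1 in _if_device_endio(),
* where the last completing sub-request ends the original bio); the
* extra reference X2 keeps the master alive until all sub-requests have
* been submitted, so the bio cannot be completed early.
*/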
static int if_device_open(struct block_device *bdev, fmode_t mode)
{
struct if_device_input *input = bdev->bd_disk->private_data;
(void)input;
MARS_DBG("if_device_open()\n");
return 0;
}
static int if_device_release(struct gendisk *gd, fmode_t mode)
{
MARS_DBG("if_device_close()\n");
return 0;
}
static const struct block_device_operations if_device_blkdev_ops = {
.owner = THIS_MODULE,
.open = if_device_open,
.release = if_device_release,
};
////////////////// own brick / input / output operations //////////////////
static void if_device_unplug(struct request_queue *q)
{
//struct if_device_input *input = q->queuedata;
MARS_DBG("UNPLUG\n");
#ifdef LOG
MARS_INF("UNPLUG\n");
#endif
queue_flag_clear_unlocked(QUEUE_FLAG_PLUGGED, q);
//blk_run_address_space(lo->lo_backing_file->f_mapping);
}
//////////////// object / aspect constructors / destructors ///////////////
static int if_device_mars_ref_aspect_init_fn(struct generic_aspect *_ini, void *_init_data)
{
struct if_device_mars_ref_aspect *ini = (void*)_ini;
INIT_LIST_HEAD(&ini->tmp_head);
atomic_set(&ini->split_count, 0);
return 0;
}
static void if_device_mars_ref_aspect_exit_fn(struct generic_aspect *_ini, void *_init_data)
{
struct if_device_mars_ref_aspect *ini = (void*)_ini;
CHECK_HEAD_EMPTY(&ini->tmp_head);
}
MARS_MAKE_STATICS(if_device);
//////////////////////// constructors / destructors ////////////////////////
static int if_device_brick_construct(struct if_device_brick *brick)
{
struct if_device_output *hidden = &brick->hidden_output;
_if_device_output_init(brick, hidden, "internal");
return 0;
}
static int if_device_brick_destruct(struct if_device_brick *brick)
{
return 0;
}
static int if_device_switch(struct if_device_brick *brick, bool state)
{
struct if_device_input *input = brick->inputs[0];
struct request_queue *q;
struct gendisk *disk;
int minor;
struct mars_info info = {};
unsigned long capacity;
int status;
//MARS_DBG("1\n");
status = GENERIC_INPUT_CALL(input, mars_get_info, &info);
if (status < 0) {
MARS_ERR("cannot get device info, status=%d\n", status);
return status;
}
capacity = info.current_size >> 9; // TODO: make this dynamic
q = blk_alloc_queue(GFP_MARS);
if (!q) {
MARS_ERR("cannot allocate device request queue\n");
return -ENOMEM;
}
q->queuedata = input;
input->q = q;
//MARS_DBG("2\n");
disk = alloc_disk(1);
if (!disk) {
MARS_ERR("cannot allocate gendisk\n");
return -ENOMEM;
}
//MARS_DBG("3\n");
minor = device_minor++; //TODO: protect against races (e.g. atomic_t)
disk->queue = q;
disk->major = MARS_MAJOR; //TODO: make this dynamic for >256 devices
disk->first_minor = minor;
disk->fops = &if_device_blkdev_ops;
sprintf(disk->disk_name, "mars%d", minor);
MARS_DBG("created device name %s\n", disk->disk_name);
disk->private_data = input;
set_capacity(disk, capacity);
blk_queue_make_request(q, if_device_make_request);
blk_queue_max_segment_size(q, MARS_MAX_SEGMENT_SIZE);
blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
q->unplug_fn = if_device_unplug;
spin_lock_init(&input->req_lock);
q->queue_lock = &input->req_lock; // needed!
//blk_queue_ordered(q, QUEUE_ORDERED_DRAIN, NULL);//???
//MARS_DBG("4\n");
input->bdev = bdget(MKDEV(disk->major, minor));
/* we have no partitions. we contain only ourselves. */
input->bdev->bd_contains = input->bdev;
#if 0 // ???
q->backing_dev_info.congested_fn = mars_congested;
q->backing_dev_info.congested_data = input;
#endif
#if 0 // ???
blk_queue_merge_bvec(q, mars_merge_bvec);
#endif
// point of no return
//MARS_DBG("99999\n");
add_disk(disk);
input->disk = disk;
//set_device_ro(input->bdev, 0); // TODO: implement modes
brick->is_active = true;
return 0;
}
static int if_device_input_construct(struct if_device_input *input)
{
return 0;
}
static int if_device_input_destruct(struct if_device_input *input)
{
if (input->bdev)
bdput(input->bdev);
if (input->disk) {
del_gendisk(input->disk);
//put_disk(input->disk);
}
if (input->q)
blk_cleanup_queue(input->q);
return 0;
}
static int if_device_output_construct(struct if_device_output *output)
{
return 0;
}
///////////////////////// static structs ////////////////////////
static struct if_device_brick_ops if_device_brick_ops = {
.brick_switch = if_device_switch,
};
static struct if_device_output_ops if_device_output_ops = {
.make_object_layout = if_device_make_object_layout,
};
const struct if_device_input_type if_device_input_type = {
.type_name = "if_device_input",
.input_size = sizeof(struct if_device_input),
.input_construct = &if_device_input_construct,
.input_destruct = &if_device_input_destruct,
};
static const struct if_device_input_type *if_device_input_types[] = {
&if_device_input_type,
};
const struct if_device_output_type if_device_output_type = {
.type_name = "if_device_output",
.output_size = sizeof(struct if_device_output),
.master_ops = &if_device_output_ops,
.output_construct = &if_device_output_construct,
.aspect_types = if_device_aspect_types,
.layout_code = {
[BRICK_OBJ_MARS_REF] = LAYOUT_ALL,
}
};
const struct if_device_brick_type if_device_brick_type = {
.type_name = "if_device_brick",
.brick_size = sizeof(struct if_device_brick),
.max_inputs = 1,
.max_outputs = 0,
.master_ops = &if_device_brick_ops,
.default_input_types = if_device_input_types,
.brick_construct = &if_device_brick_construct,
.brick_destruct = &if_device_brick_destruct,
};
EXPORT_SYMBOL_GPL(if_device_brick_type);
////////////////// module init stuff /////////////////////////
static void __exit exit_if_device(void)
{
int status;
printk(MARS_INFO "exit_if_device()\n");
status = if_device_unregister_brick_type();
unregister_blkdev(DRBD_MAJOR, "mars");
}
static int __init init_if_device(void)
{
int status;
(void)if_device_aspect_types; // not used, shut up gcc
printk(MARS_INFO "init_if_device()\n");
status = register_blkdev(DRBD_MAJOR, "mars");
if (status)
return status;
status = if_device_register_brick_type();
if (status)
goto err_device;
return status;
err_device:
MARS_ERR("init_if_device() status=%d\n", status);
exit_if_device();
return status;
}
MODULE_DESCRIPTION("MARS if_device");
MODULE_AUTHOR("Thomas Schoebel-Theuer <tst@1und1.de>");
MODULE_LICENSE("GPL");
module_init(init_if_device);
module_exit(exit_if_device);

View File

@ -1,39 +0,0 @@
// (c) 2010 Thomas Schoebel-Theuer / 1&1 Internet AG
#ifndef MARS_IF_DEVICE_H
#define MARS_IF_DEVICE_H
#define HT_SHIFT 6 //????
#define MARS_MAX_SEGMENT_SIZE (1U << (9+HT_SHIFT))
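/* 1U << (9 + 6) = 32768, i.e. segments of up to 32 KiB */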
struct if_device_mars_ref_aspect {
GENERIC_ASPECT(mars_ref);
struct list_head tmp_head;
struct if_device_mars_ref_aspect *master;
atomic_t split_count;
struct generic_callback cb;
struct bio *orig_bio;
struct if_device_input *input;
};
struct if_device_input {
MARS_INPUT(if_device);
struct request_queue *q;
struct gendisk *disk;
struct block_device *bdev;
spinlock_t req_lock;
struct generic_object_layout mref_object_layout;
};
struct if_device_output {
MARS_OUTPUT(if_device);
};
struct if_device_brick {
MARS_BRICK(if_device);
bool is_active;
struct if_device_output hidden_output;
};
MARS_TYPES(if_device);
#endif

View File

@ -15,12 +15,18 @@
//#define CONF_TEST // use intermediate mars_check bricks
#define CONF_AIO // use device_aio instead of device_sio
#define CONF_BUF
//#define CONF_BUF
#define CONF_BUF_AHEAD // readahead optimization
#define CONF_USEBUF
//#define CONF_USEBUF
//#define CONF_TRANS
//#define CONF_TRANS_FLYING 1
#define CONF_TRANS_LOG_READS false
//#define CONF_TRANS_LOG_READS true
#define CONF_TRANS_FLYING 32
#define CONF_TRANS_MAX_QUEUE 1000
#define CONF_TRANS_MAX_JIFFIES (5 * HZ)
#define CONF_TRANS_SORT
//#define CONF_TBUF
//#define CONF_DIRECT // use O_DIRECT
#define CONF_FDSYNC // use additional aio_fdsync
@ -69,14 +75,11 @@ static struct buf_brick *_buf_brick = NULL;
static struct generic_brick *device_brick = NULL;
static struct device_sio_brick *_device_brick = NULL;
static void test_endio(struct generic_callback *cb)
{
MARS_DBG("test_endio() called! error=%d\n", cb->cb_error);
}
void make_test_instance(void)
{
static char *names[] = { "brick" };
struct generic_output *first = NULL;
struct generic_output *inter = NULL;
struct generic_input *last = NULL;
void *brick(const void *_brick_type)
@ -141,6 +144,7 @@ void make_test_instance(void)
MARS_DBG("starting....\n");
// first
device_brick = brick(&device_sio_brick_type);
_device_brick = (void*)device_brick;
device_brick->outputs[0]->output_name = "/tmp/testfile.img";
@ -151,23 +155,21 @@ void make_test_instance(void)
_device_brick->outputs[0]->o_fdsync = true;
#endif
device_brick->ops->brick_switch(device_brick, true);
first = device_brick->outputs[0];
// last
if_brick = brick(&if_device_brick_type);
last = if_brick->inputs[0];
#ifdef CONF_USEBUF // interpose usebuf
usebuf_brick = brick(&usebuf_brick_type);
connect(if_brick->inputs[0], usebuf_brick->outputs[0]);
connect(last, usebuf_brick->outputs[0]);
last = usebuf_brick->inputs[0];
#else
(void)usebuf_brick;
(void)tdevice_brick;
(void)_tdevice_brick;
last = if_brick->inputs[0];
#endif
#ifdef CONF_BUF // interpose the standard buf
buf_brick = brick(&buf_brick_type);
_buf_brick = (void*)buf_brick;
_buf_brick->outputs[0]->output_name = "/tmp/testfile.img";
@ -182,7 +184,14 @@ void make_test_instance(void)
_buf_brick->optimize_chains = true;
#endif
connect(buf_brick->inputs[0], device_brick->outputs[0]);
connect(buf_brick->inputs[0], first);
first = buf_brick->outputs[0];
#else // CONF_BUF
(void)buf_brick;
(void)_buf_brick;
#endif // CONF_BUF
#ifdef CONF_TRANS // interpose trans_logger plus infrastructure
@ -196,7 +205,9 @@ void make_test_instance(void)
_tdevice_brick->outputs[0]->o_fdsync = true;
#endif
tdevice_brick->ops->brick_switch(tdevice_brick, true);
inter = tdevice_brick->outputs[0];
#ifdef CONF_TBUF
tbuf_brick = brick(&buf_brick_type);
_tbuf_brick = (void*)tbuf_brick;
_tbuf_brick->outputs[0]->output_name = "/tmp/testfile.log";
@ -208,79 +219,42 @@ void make_test_instance(void)
_tbuf_brick->max_count = TRANS_MEM >> _tbuf_brick->backing_order;
#endif
connect(tbuf_brick->inputs[0], tdevice_brick->outputs[0]);
connect(tbuf_brick->inputs[0], inter);
inter = tbuf_brick->outputs[0];
#else
(void)tbuf_brick;
(void)_tbuf_brick;
#endif // CONF_TBUF
trans_brick = brick(&trans_logger_brick_type);
_trans_brick = (void*)trans_brick;
//_trans_brick->log_reads = true;
_trans_brick->log_reads = false;
_trans_brick->allow_reads_after = HZ;
_trans_brick->max_queue = 1000;
#ifdef CONF_TRANS_FLYING
_trans_brick->log_reads = CONF_TRANS_LOG_READS;
_trans_brick->outputs[0]->q_phase2.q_max_queued = CONF_TRANS_MAX_QUEUE;
_trans_brick->outputs[0]->q_phase4.q_max_queued = CONF_TRANS_MAX_QUEUE;
_trans_brick->outputs[0]->q_phase2.q_max_jiffies = CONF_TRANS_MAX_JIFFIES;
_trans_brick->outputs[0]->q_phase4.q_max_jiffies = CONF_TRANS_MAX_JIFFIES;
_trans_brick->outputs[0]->q_phase2.q_max_flying = CONF_TRANS_FLYING;
_trans_brick->outputs[0]->q_phase4.q_max_flying = CONF_TRANS_FLYING;
#endif
#ifdef CONF_TRANS_SORT
_trans_brick->outputs[0]->q_phase2.q_ordering = true;
_trans_brick->outputs[0]->q_phase4.q_ordering = true;
#endif
connect(trans_brick->inputs[0], buf_brick->outputs[0]);
connect(trans_brick->inputs[1], tbuf_brick->outputs[0]);
connect(trans_brick->inputs[0], first);
connect(trans_brick->inputs[1], inter);
first = trans_brick->outputs[0];
connect(last, trans_brick->outputs[0]);
#else // CONF_TRANS
(void)trans_brick;
(void)_trans_brick;
(void)tbuf_brick;
(void)_tbuf_brick;
(void)tdevice_brick;
connect(last, buf_brick->outputs[0]);
#endif // CONF_TRANS
if (false) { // ref-counting no longer valid
struct buf_output *output = _buf_brick->outputs[0];
struct mars_ref_object *mref = NULL;
struct generic_object_layout ol = {};
mref = generic_alloc_mars_ref((struct generic_output*)output, &ol);
if (mref) {
int status;
mref->ref_pos = 0;
mref->ref_len = PAGE_SIZE;
mref->ref_may_write = READ;
status = GENERIC_OUTPUT_CALL(output, mars_ref_get, mref);
MARS_DBG("buf_get (status=%d)\n", status);
if (true) {
struct generic_callback cb = {
.cb_fn = test_endio,
};
mref->ref_cb = &cb;
GENERIC_OUTPUT_CALL(output, mars_ref_io, mref);
status = cb.cb_error;
MARS_DBG("buf_io (status=%d)\n", status);
}
GENERIC_OUTPUT_CALL(output, mars_ref_put, mref);
}
}
#else // CONF_BUF
(void)trans_brick;
(void)_trans_brick;
(void)buf_brick;
(void)_buf_brick;
(void)tbuf_brick;
(void)_tbuf_brick;
(void)tdevice_brick;
(void)_tdevice_brick;
(void)test_endio;
connect(last, device_brick->outputs[0]);
(void)inter;
(void)tbuf_brick;
(void)_tbuf_brick;
#endif // CONF_TRANS
#endif // CONF_BUF
connect(last, first);
msleep(200);

View File

@ -24,185 +24,6 @@
////////////////////////////////////////////////////////////////////
#define CODE_UNKNOWN 0
#define CODE_WRITE_NEW 1
#define CODE_WRITE_OLD 2
#define START_MAGIC 0xa8f7e908d9177957ll
#define END_MAGIC 0x74941fb74ab5726dll
#define OVERHEAD \
( \
sizeof(START_MAGIC) + \
sizeof(char) * 2 + \
sizeof(short) + \
sizeof(int) + \
sizeof(struct log_header) + \
sizeof(END_MAGIC) + \
sizeof(char) * 2 + \
sizeof(short) + \
sizeof(int) + \
sizeof(struct timespec) + \
0 \
)
// TODO: make this bytesex-aware.
#define DATA_PUT(data,offset,val) \
do { \
*((typeof(val)*)(data+offset)) = val; \
offset += sizeof(val); \
} while (0)
#define DATA_GET(data,offset,val) \
do { \
val = *((typeof(val)*)(data+offset)); \
offset += sizeof(val); \
} while (0)
static inline void log_skip(struct trans_logger_input *input) _noinline
{
int bits;
if (!input->info.transfer_size) {
int status = GENERIC_INPUT_CALL(input, mars_get_info, &input->info);
if (status < 0) {
MARS_FAT("cannot get transfer log info (code=%d)\n", status);
}
}
bits = input->info.transfer_order + PAGE_SHIFT;
input->log_pos = ((input->log_pos >> bits) + 1) << bits;
}
static void *log_reserve(struct trans_logger_input *input, struct log_header *l)
{
struct mars_ref_object *mref;
void *data;
int total_len;
int status;
int offset;
//MARS_INF("reserving %d at %lld\n", l->l_len, input->log_pos);
if (unlikely(input->log_mref)) {
MARS_ERR("mref already existing\n");
goto err;
}
mref = trans_logger_alloc_mars_ref(&input->hidden_output, &input->ref_object_layout);
if (unlikely(!mref))
goto err;
mref->ref_pos = input->log_pos;
total_len = l->l_len + OVERHEAD;
mref->ref_len = total_len;
mref->ref_may_write = WRITE;
status = GENERIC_INPUT_CALL(input, mars_ref_get, mref);
if (unlikely(status < 0)) {
goto err_free;
}
if (unlikely(status < total_len)) {
goto put;
}
input->log_mref = mref;
data = mref->ref_data;
offset = 0;
DATA_PUT(data, offset, START_MAGIC);
DATA_PUT(data, offset, (char)1); // version of format, currently there is no other one
input->validflag_offset = offset;
DATA_PUT(data, offset, (char)0); // valid_flag
DATA_PUT(data, offset, (short)0); // spare
DATA_PUT(data, offset, total_len); // start of next header
DATA_PUT(data, offset, l->l_stamp.tv_sec);
DATA_PUT(data, offset, l->l_stamp.tv_nsec);
DATA_PUT(data, offset, l->l_pos);
input->reallen_offset = offset;
DATA_PUT(data, offset, l->l_len);
DATA_PUT(data, offset, l->l_code);
input->payload_offset = offset;
input->payload_len = l->l_len;
return data + offset;
put:
GENERIC_INPUT_CALL(input, mars_ref_put, mref);
return NULL;
err_free:
trans_logger_free_mars_ref(mref);
err:
return NULL;
}
bool log_finalize(struct trans_logger_input *input, int len, void (*endio)(struct generic_callback *cb), struct trans_logger_mars_ref_aspect *orig_mref_a)
{
struct mars_ref_object *mref = input->log_mref;
struct trans_logger_mars_ref_aspect *mref_a;
struct generic_callback *cb;
struct timespec now;
void *data;
int offset;
bool ok = false;
CHECK_PTR(mref, err);
input->log_mref = NULL;
if (unlikely(len > input->payload_len)) {
MARS_ERR("trying to write more than reserved\n");
goto put;
}
mref_a = trans_logger_mars_ref_get_aspect(&input->hidden_output, mref);
CHECK_PTR(mref_a, put);
data = mref->ref_data;
/* Correct the length in the header.
*/
offset = input->reallen_offset;
DATA_PUT(data, offset, len);
/* Write the trailer.
*/
offset = input->payload_offset + len;
DATA_PUT(data, offset, END_MAGIC);
DATA_PUT(data, offset, (char)1); // valid_flag copy
DATA_PUT(data, offset, (char)0); // spare
DATA_PUT(data, offset, (short)0); // spare
DATA_PUT(data, offset, (int)0); // spare
now = CURRENT_TIME; // when the log entry was ready.
DATA_PUT(data, offset, now.tv_sec);
DATA_PUT(data, offset, now.tv_nsec);
input->log_pos += offset;
/* This must come last. In case of incomplete
* or even overlapping disk transfers, this indicates
* the completeness / integrity of the payload at
* the time of starting the transfer.
*/
offset = input->validflag_offset;
DATA_PUT(data, offset, (char)1);
cb = &mref_a->cb;
cb->cb_fn = endio;
cb->cb_error = 0;
cb->cb_prev = NULL;
cb->cb_private = orig_mref_a;
mref->ref_cb = cb;
GENERIC_INPUT_CALL(input, mars_ref_io, mref, WRITE);
ok = true;
put:
GENERIC_INPUT_CALL(input, mars_ref_put, mref);
err:
return ok;
}
////////////////////////////////////////////////////////////////////
static inline bool q_cmp(struct pairing_heap_mref *_a, struct pairing_heap_mref *_b)
{
struct trans_logger_mars_ref_aspect *mref_a = container_of(_a, struct trans_logger_mars_ref_aspect, ph);
@ -224,6 +45,30 @@ static inline void q_init(struct logger_queue *q) _noinline
atomic_set(&q->q_flying, 0);
}
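/* A queue deserves service when something is queued and either it has
* grown to q_max_queued entries or q_max_jiffies have elapsed since the
* last action; even then it is held back while q_max_flying requests
* are already in flight.
*/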
static
bool q_is_ready(struct logger_queue *q)
{
int queued = atomic_read(&q->q_queued);
int flying;
bool res = false;
if (queued <= 0)
goto always_done;
res = true;
if (queued >= q->q_max_queued)
goto done;
if (q->q_max_jiffies > 0 &&
(long long)jiffies - q->q_last_action >= q->q_max_jiffies)
goto done;
res = false;
goto always_done;
done:
flying = atomic_read(&q->q_flying);
if (q->q_max_flying > 0 && flying >= q->q_max_flying)
res = false;
always_done:
return res;
}
static inline void q_insert(struct logger_queue *q, struct trans_logger_mars_ref_aspect *mref_a) _noinline
{
unsigned long flags;
@ -282,7 +127,7 @@ static inline struct trans_logger_mars_ref_aspect *q_fetch(struct logger_queue *
q->heap_border = mref_a->object->ref_pos;
ph_delete_min_mref(minpos);
atomic_dec(&q->q_queued);
q->q_last_action = jiffies;
//q->q_last_action = jiffies;
}
#else
if (!q->heap_high) {
@ -294,14 +139,14 @@ static inline struct trans_logger_mars_ref_aspect *q_fetch(struct logger_queue *
q->heap_border = mref_a->object->ref_pos;
ph_delete_min_mref(&q->heap_high);
atomic_dec(&q->q_queued);
q->q_last_action = jiffies;
//q->q_last_action = jiffies;
}
#endif
} else if (!list_empty(&q->q_anchor)) {
struct list_head *next = q->q_anchor.next;
list_del_init(next);
atomic_dec(&q->q_queued);
q->q_last_action = jiffies;
//q->q_last_action = jiffies;
mref_a = container_of(next, struct trans_logger_mars_ref_aspect, q_head);
}
@ -318,9 +163,7 @@ static inline int hash_fn(loff_t base_index) _noinline
// simple and stupid
loff_t tmp;
tmp = base_index ^ (base_index / TRANS_HASH_MAX);
tmp += tmp / 13;
tmp ^= tmp / (TRANS_HASH_MAX * TRANS_HASH_MAX);
return tmp % TRANS_HASH_MAX;
return ((unsigned)tmp) % TRANS_HASH_MAX;
}
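/* Example: with TRANS_HASH_MAX 128, base_index 291 hashes to
* (291 ^ (291 / 128)) % 128 = (291 ^ 2) % 128 = 289 % 128 = 33.
*/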
static struct trans_logger_mars_ref_aspect *hash_find(struct hash_anchor *table, loff_t pos, int len)
@ -341,7 +184,7 @@ static struct trans_logger_mars_ref_aspect *hash_find(struct hash_anchor *table,
/* The lists are always sorted according to age.
* Caution: there may be duplicates in the list, some of them
* overlapping with the search area in many different ways.
* Always find the both _newest_ and _lowest_ overlapping element.
* Always find both the _newest_ and _lowest_ overlapping element.
*/
for (tmp = start->hash_anchor.next; tmp != &start->hash_anchor; tmp = tmp->next) {
#if 1
@ -465,6 +308,7 @@ static int _read_ref_get(struct trans_logger_output *output, struct trans_logger
mref->ref_data = shadow->ref_data - diff;
mref->ref_flags = shadow->ref_flags;
mref_a->shadow_ref = shadow_a;
atomic_inc(&mref->ref_count);
return mref->ref_len;
}
@ -501,6 +345,8 @@ static int trans_logger_ref_get(struct trans_logger_output *output, struct mars_
CHECK_PTR(mref_a, err);
CHECK_PTR(mref_a->object, err);
mref_a->orig_data = mref->ref_data;
base_offset = mref->ref_pos & (loff_t)(REGION_SIZE - 1);
if (base_offset + mref->ref_len > REGION_SIZE)
mref->ref_len = REGION_SIZE - base_offset;
@ -520,6 +366,8 @@ static void trans_logger_ref_put(struct trans_logger_output *output, struct mars
struct trans_logger_mars_ref_aspect *shadow_a;
struct trans_logger_input *input;
CHECK_ATOMIC(&mref->ref_count, 1);
CHECK_PTR(output, err);
mref_a = trans_logger_mars_ref_get_aspect(output, mref);
@ -557,6 +405,7 @@ static void _trans_logger_endio(struct generic_callback *cb)
struct trans_logger_output *output;
struct mars_ref_object *mref;
struct generic_callback *prev_cb;
mref_a = cb->cb_private;
CHECK_PTR(mref_a, err);
if (unlikely(&mref_a->cb != cb)) {
@ -579,7 +428,7 @@ static void _trans_logger_endio(struct generic_callback *cb)
err: ;
}
static void trans_logger_ref_io(struct trans_logger_output *output, struct mars_ref_object *mref, int rw)
static void trans_logger_ref_io(struct trans_logger_output *output, struct mars_ref_object *mref)
{
struct trans_logger_mars_ref_aspect *mref_a;
struct trans_logger_input *input = output->brick->inputs[0];
@ -592,8 +441,7 @@ static void trans_logger_ref_io(struct trans_logger_output *output, struct mars_
// is this a shadow buffer?
if (mref_a->shadow_ref) {
mref->ref_rw = rw;
if (rw == READ) {
if (mref->ref_rw == READ) {
// nothing to do: directly signal success.
struct generic_callback *cb = mref->ref_cb;
cb->cb_error = 0;
@ -612,7 +460,7 @@ static void trans_logger_ref_io(struct trans_logger_output *output, struct mars_
}
#endif
mref->ref_flags |= MARS_REF_WRITING;
//MARS_INF("hashing %d at %lld\n", mref->ref_len, mref->ref_pos);
MARS_DBG("hashing %d at %lld\n", mref->ref_len, mref->ref_pos);
hash_insert(output->hash_table, mref_a, &output->hash_count);
q_insert(&output->q_phase1, mref_a);
wake_up(&output->event);
@ -621,8 +469,8 @@ static void trans_logger_ref_io(struct trans_logger_output *output, struct mars_
}
// only READ is allowed on non-shadow buffers
if (unlikely(rw != READ)) {
MARS_FAT("bad operation %d without shadow\n", rw);
if (unlikely(mref->ref_rw != READ)) {
MARS_FAT("bad operation %d without shadow\n", mref->ref_rw);
}
atomic_inc(&output->fly_count);
@ -634,7 +482,7 @@ static void trans_logger_ref_io(struct trans_logger_output *output, struct mars_
cb->cb_prev = mref->ref_cb;
mref->ref_cb = cb;
GENERIC_INPUT_CALL(input, mars_ref_io, mref, rw);
GENERIC_INPUT_CALL(input, mars_ref_io, mref);
err: ;
}
@ -686,7 +534,7 @@ static bool phase1_startio(struct trans_logger_mars_ref_aspect *orig_mref_a)
{
struct mars_ref_object *orig_mref;
struct trans_logger_output *output;
struct trans_logger_input *input;
struct trans_logger_brick *brick;
void *data;
bool ok;
@ -696,8 +544,8 @@ static bool phase1_startio(struct trans_logger_mars_ref_aspect *orig_mref_a)
CHECK_PTR(orig_mref->ref_cb, err);
output = orig_mref_a->output;
CHECK_PTR(output, err);
input = output->brick->inputs[1];
CHECK_PTR(input, err);
brick = output->brick;
CHECK_PTR(brick, err);
{
struct log_header l = {
@ -706,7 +554,7 @@ static bool phase1_startio(struct trans_logger_mars_ref_aspect *orig_mref_a)
.l_len = orig_mref->ref_len,
.l_code = CODE_WRITE_NEW,
};
data = log_reserve(input, &l);
data = log_reserve(&brick->logst, &l);
}
if (unlikely(!data)) {
goto err;
@ -714,7 +562,7 @@ static bool phase1_startio(struct trans_logger_mars_ref_aspect *orig_mref_a)
memcpy(data, orig_mref->ref_data, orig_mref->ref_len);
ok = log_finalize(input, orig_mref->ref_len, phase1_endio, orig_mref_a);
ok = log_finalize(&brick->logst, orig_mref->ref_len, phase1_endio, orig_mref_a);
if (unlikely(!ok)) {
goto err;
}
@ -765,7 +613,7 @@ static bool phase2_startio(struct trans_logger_mars_ref_aspect *orig_mref_a)
{
struct mars_ref_object *orig_mref;
struct trans_logger_output *output;
struct trans_logger_input *input;
struct trans_logger_brick *brick;
struct mars_ref_object *sub_mref;
struct trans_logger_mars_ref_aspect *sub_mref_a;
struct generic_callback *cb;
@ -778,8 +626,8 @@ static bool phase2_startio(struct trans_logger_mars_ref_aspect *orig_mref_a)
CHECK_PTR(orig_mref, err);
output = orig_mref_a->output;
CHECK_PTR(output, err);
input = output->brick->inputs[0];
CHECK_PTR(input, err);
brick = output->brick;
CHECK_PTR(brick, err);
pos = orig_mref->ref_pos;
len = orig_mref->ref_len;
@ -787,7 +635,7 @@ static bool phase2_startio(struct trans_logger_mars_ref_aspect *orig_mref_a)
/* allocate internal sub_mref for further work
*/
while (len > 0) {
sub_mref = trans_logger_alloc_mars_ref(&input->hidden_output, &input->ref_object_layout);
sub_mref = mars_alloc_mars_ref(&brick->logst.hidden_output, &brick->logst.ref_object_layout);
if (unlikely(!sub_mref)) {
MARS_FAT("cannot alloc sub_mref\n");
goto err;
@ -797,19 +645,19 @@ static bool phase2_startio(struct trans_logger_mars_ref_aspect *orig_mref_a)
sub_mref->ref_len = len;
sub_mref->ref_may_write = WRITE;
sub_mref_a = trans_logger_mars_ref_get_aspect(&input->hidden_output, sub_mref);
sub_mref_a = trans_logger_mars_ref_get_aspect((struct trans_logger_output*)&brick->logst.hidden_output, sub_mref);
CHECK_PTR(sub_mref_a, err);
sub_mref_a->stamp = orig_mref_a->stamp;
sub_mref_a->orig_mref_a = orig_mref_a;
sub_mref_a->output = output;
status = GENERIC_INPUT_CALL(input, mars_ref_get, sub_mref);
if (unlikely(status <= 0)) {
MARS_FAT("cannot get sub_ref\n");
status = GENERIC_INPUT_CALL(brick->logst.input, mars_ref_get, sub_mref);
if (unlikely(status < 0)) {
MARS_FAT("cannot get sub_ref, status = %d\n", status);
goto err;
}
pos += status;
len -= status;
pos += sub_mref->ref_len;
len -= sub_mref->ref_len;
/* Get a reference count for each sub_mref.
* Paired with trans_logger_ref_put() in phase4_endio().
@ -823,10 +671,14 @@ static bool phase2_startio(struct trans_logger_mars_ref_aspect *orig_mref_a)
cb->cb_error = 0;
cb->cb_prev = NULL;
sub_mref->ref_cb = cb;
sub_mref->ref_rw = 0;
atomic_inc(&output->q_phase2.q_flying);
GENERIC_INPUT_CALL(input, mars_ref_io, sub_mref, READ);
if (output->brick->log_reads) {
GENERIC_INPUT_CALL(brick->logst.input, mars_ref_io, sub_mref);
} else { // shortcut
phase2_endio(cb);
}
}
/* Finally, put the original reference (i.e. in essence
@ -871,7 +723,7 @@ static bool phase3_startio(struct trans_logger_mars_ref_aspect *sub_mref_a)
{
struct mars_ref_object *sub_mref;
struct trans_logger_output *output;
struct trans_logger_input *input;
struct trans_logger_brick *brick;
void *data;
bool ok;
@ -880,8 +732,8 @@ static bool phase3_startio(struct trans_logger_mars_ref_aspect *sub_mref_a)
CHECK_PTR(sub_mref, err);
output = sub_mref_a->output;
CHECK_PTR(output, err);
input = output->brick->inputs[1];
CHECK_PTR(input, err);
brick = output->brick;
CHECK_PTR(brick, err);
{
struct log_header l = {
@ -890,7 +742,7 @@ static bool phase3_startio(struct trans_logger_mars_ref_aspect *sub_mref_a)
.l_len = sub_mref->ref_len,
.l_code = CODE_WRITE_OLD,
};
data = log_reserve(input, &l);
data = log_reserve(&brick->logst, &l);
}
if (unlikely(!data)) {
@ -899,7 +751,7 @@ static bool phase3_startio(struct trans_logger_mars_ref_aspect *sub_mref_a)
memcpy(data, sub_mref->ref_data, sub_mref->ref_len);
ok = log_finalize(input, sub_mref->ref_len, phase3_endio, sub_mref_a);
ok = log_finalize(&brick->logst, sub_mref->ref_len, phase3_endio, sub_mref_a);
if (unlikely(!ok)) {
goto err;
}
@ -937,18 +789,19 @@ static void phase4_endio(struct generic_callback *cb)
goto put;
}
// TODO: signal final completion.
// TODO: save final completion status into the status input
put:
//MARS_INF("put ORIGREF.\n");
CHECK_ATOMIC(&orig_mref->ref_count, 1);
trans_logger_ref_put(orig_mref_a->output, orig_mref);
wake_up(&output->event);
err: ;
}
static bool phase4_startio(struct trans_logger_mars_ref_aspect *sub_mref_a)
{
struct mars_ref_object *sub_mref;
struct mars_ref_object *sub_mref = NULL;
struct generic_callback *cb;
struct trans_logger_output *output;
struct trans_logger_input *input;
@ -975,15 +828,17 @@ static bool phase4_startio(struct trans_logger_mars_ref_aspect *sub_mref_a)
cb->cb_error = 0;
cb->cb_prev = NULL;
sub_mref->ref_cb = cb;
sub_mref->ref_rw = 1;
atomic_inc(&output->q_phase4.q_flying);
GENERIC_INPUT_CALL(input, mars_ref_io, sub_mref, WRITE);
GENERIC_INPUT_CALL(input, mars_ref_io, sub_mref);
//MARS_INF("put SUBREF.\n");
GENERIC_INPUT_CALL(input, mars_ref_put, sub_mref);
return true;
err:
MARS_ERR("cannot start phase 4 IO %p\n", sub_mref);
return false;
}
@ -1000,9 +855,6 @@ static int run_queue(struct logger_queue *q, bool (*startio)(struct trans_logger
bool ok;
while (max-- > 0) {
if (q->q_max_flying > 0 && atomic_read(&q->q_flying) >= q->q_max_flying)
break;
mref_a = q_fetch(q);
if (!mref_a)
return -1;
@ -1020,27 +872,21 @@ static int trans_logger_thread(void *data)
{
struct trans_logger_output *output = data;
struct trans_logger_brick *brick;
struct trans_logger_input *input;
int wait_jiffies = HZ;
int last_jiffies = jiffies;
bool check_q = true;
brick = output->brick;
input = brick->inputs[1];
MARS_INF("logger has started.\n");
while (!kthread_should_stop()) {
int status;
if (wait_jiffies < 5)
wait_jiffies = 5; // prohibit high CPU load
wait_event_interruptible_timeout(
output->event,
!list_empty(&output->q_phase1.q_anchor) ||
(check_q &&
(!list_empty(&output->q_phase2.q_anchor) ||
!list_empty(&output->q_phase3.q_anchor) ||
!list_empty(&output->q_phase4.q_anchor))),
q_is_ready(&output->q_phase1) ||
q_is_ready(&output->q_phase2) ||
q_is_ready(&output->q_phase3) ||
q_is_ready(&output->q_phase4),
wait_jiffies);
#if 1
if (((int)jiffies) - last_jiffies >= HZ * 10 && atomic_read(&output->hash_count) > 0) {
@ -1048,44 +894,34 @@ static int trans_logger_thread(void *data)
MARS_INF("LOGGER: hash_count=%d fly=%d phase1=%d/%d phase2=%d/%d phase3=%d/%d phase4=%d/%d\n", atomic_read(&output->hash_count), atomic_read(&output->fly_count), atomic_read(&output->q_phase1.q_queued), atomic_read(&output->q_phase1.q_flying), atomic_read(&output->q_phase2.q_queued), atomic_read(&output->q_phase2.q_flying), atomic_read(&output->q_phase3.q_queued), atomic_read(&output->q_phase3.q_flying), atomic_read(&output->q_phase4.q_queued), atomic_read(&output->q_phase4.q_flying));
}
#endif
wait_jiffies = HZ;
status = run_queue(&output->q_phase1, phase1_startio, 1000);
if (unlikely(status > 0)) {
(void)run_queue(&output->q_phase3, phase3_startio, 1);
log_skip(input);
check_q = true;
log_skip(&brick->logst);
wait_jiffies = 5;
continue;
}
/* Strategy / performance:
* run higher phases only when IO contention is "low".
*/
if (brick->max_queue <= 0 ||
atomic_read(&output->q_phase2.q_queued) + atomic_read(&output->q_phase4.q_queued) < brick->max_queue) {
int rest = brick->allow_reads_after - (jiffies - output->q_phase1.q_last_action);
if (brick->allow_reads_after > 0 && rest > 0) {
wait_jiffies = rest;
check_q = false;
continue;
}
if (brick->limit_congest > 0 &&
atomic_read(&output->q_phase1.q_flying) + atomic_read(&output->fly_count) >= brick->limit_congest) {
wait_jiffies = HZ / 100;
check_q = false;
continue;
}
if (q_is_ready(&output->q_phase2)) {
(void)run_queue(&output->q_phase2, phase2_startio, 64);
}
wait_jiffies = HZ;
check_q = true;
status = run_queue(&output->q_phase2, phase2_startio, 8);
status = run_queue(&output->q_phase3, phase3_startio, 16);
if (unlikely(status > 0)) {
log_skip(input);
continue;
if (q_is_ready(&output->q_phase3)) {
status = run_queue(&output->q_phase3, phase3_startio, 64);
if (unlikely(status > 0)) {
log_skip(&brick->logst);
wait_jiffies = 5;
continue;
}
}
if (q_is_ready(&output->q_phase4)) {
(void)run_queue(&output->q_phase4, phase4_startio, 64);
}
status = run_queue(&output->q_phase4, phase4_startio, 8);
}
return 0;
}
@ -1113,6 +949,8 @@ MARS_MAKE_STATICS(trans_logger);
static int trans_logger_brick_construct(struct trans_logger_brick *brick)
{
_generic_output_init((struct generic_brick*)brick, (struct generic_output_type*)&trans_logger_output_type, (struct generic_output*)&brick->logst.hidden_output, "internal");
brick->logst.input = (void*)brick->inputs[1];
return 0;
}
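The construct hook now also wires the log bookkeeping to the second input once, replacing the per-call inputs[1] lookups removed from phase3_startio and the logger thread above:
/* before: each call site resolved the log input itself */
input = output->brick->inputs[1];
/* after: resolved once at brick construction */
brick->logst.input = (void*)brick->inputs[1];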
@ -1143,8 +981,6 @@ static int trans_logger_output_construct(struct trans_logger_output *output)
static int trans_logger_input_construct(struct trans_logger_input *input)
{
struct trans_logger_output *hidden = &input->hidden_output;
_trans_logger_output_init(input->brick, hidden, "internal");
return 0;
}

View File
mars_trans_logger.h

@ -4,18 +4,12 @@
#define REGION_SIZE_BITS 22
#define REGION_SIZE (1 << REGION_SIZE_BITS)
#define TRANS_HASH_MAX 32
#define TRANS_HASH_MAX 128
#include <linux/time.h>
#include "log_format.h"
#include "pairing_heap.h"
struct log_header {
struct timespec l_stamp;
loff_t l_pos;
int l_len;
int l_code;
};
////////////////////////////////////////////////////////////////////
_PAIRING_HEAP_TYPEDEF(mref,)
@ -28,9 +22,11 @@ struct logger_queue {
spinlock_t q_lock;
atomic_t q_queued;
atomic_t q_flying;
int q_last_action; // jiffies
long long q_last_action; // jiffies
// tunables
int q_max_queued;
int q_max_flying;
int q_max_jiffies;
bool q_ordering;
};
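q_is_ready() is called throughout the new thread loop, but its body is not contained in the hunks shown here. Given the tunables introduced above, one plausible reading (an assumption for illustration, not the committed implementation) is:
/* Hypothetical sketch only: a queue counts as "ready" when work is
 * queued and either the batch threshold is reached or the oldest
 * work has aged beyond q_max_jiffies.
 */
static inline bool q_is_ready(struct logger_queue *q)
{
        int queued = atomic_read(&q->q_queued);

        if (queued <= 0)
                return false;
        if (q->q_max_queued > 0 && queued >= q->q_max_queued)
                return true;
        if (q->q_max_jiffies > 0 &&
            (long long)jiffies - q->q_last_action >= q->q_max_jiffies)
                return true;
        /* no thresholds configured: anything queued is ready */
        return q->q_max_queued <= 0 && q->q_max_jiffies <= 0;
}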
@ -43,12 +39,12 @@ struct hash_anchor {
struct trans_logger_mars_ref_aspect {
GENERIC_ASPECT(mars_ref);
struct trans_logger_output *output;
struct list_head hash_head;
struct list_head q_head;
struct pairing_heap_mref ph;
struct trans_logger_mars_ref_aspect *shadow_ref;
void *orig_data;
struct trans_logger_output *output;
void *orig_data;
struct timespec stamp;
struct generic_callback cb;
struct trans_logger_mars_ref_aspect *orig_mref_a;
@ -56,11 +52,10 @@ struct trans_logger_mars_ref_aspect {
struct trans_logger_brick {
MARS_BRICK(trans_logger);
struct log_status logst;
// parameters
bool log_reads;
int allow_reads_after; // phase2 and later is only started after this time (in jiffies)
int limit_congest; // limit phase1 congestion.
int max_queue; // delay phase2 & later only if this number of waiting requests is not exceeded
};
struct trans_logger_output {
@ -79,15 +74,6 @@ struct trans_logger_output {
struct trans_logger_input {
MARS_INPUT(trans_logger);
struct mars_info info;
loff_t log_pos;
struct mars_ref_object *log_mref;
int validflag_offset;
int reallen_offset;
int payload_offset;
int payload_len;
struct trans_logger_output hidden_output;
struct generic_object_layout ref_object_layout;
};
MARS_TYPES(trans_logger);

View File
mars_usebuf.c

@ -1,436 +0,0 @@
// (c) 2010 Thomas Schoebel-Theuer / 1&1 Internet AG
/* Usebuf brick.
* translates from unbuffered IO to buffered IO (mars_{get,put}_buf)
*/
//#define BRICK_DEBUGGING
//#define MARS_DEBUGGING
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/string.h>
#include <linux/bio.h>
#include "mars.h"
///////////////////////// own type definitions ////////////////////////
#include "mars_usebuf.h"
///////////////////////// own helper functions ////////////////////////
/* currently we have copy semantics :(
*/
static void _usebuf_copy(struct usebuf_mars_ref_aspect *mref_a, int rw)
{
void *ref_data = mref_a->object->ref_data;
void *bio_base = kmap_atomic(mref_a->bvec->bv_page, KM_USER0);
void *bio_data = bio_base + mref_a->bvec_offset;
int len = mref_a->bvec_len;
#if 1
if (rw == READ) {
memcpy(bio_data, ref_data, len);
} else {
memcpy(ref_data, bio_data, len);
}
#endif
kunmap_atomic(bio_base, KM_USER0);
}
static void usebuf_ref_put(struct usebuf_output *output, struct mars_ref_object *origmref);
static void _usebuf_origmref_endio(struct usebuf_output *output, struct mars_ref_object *origmref)
{
struct usebuf_mars_ref_aspect *origmref_a;
struct generic_callback *cb;
origmref_a = usebuf_mars_ref_get_aspect(output, origmref);
if (unlikely(!origmref_a)) {
MARS_FAT("cannot get origmref_a from origmref %p\n", origmref);
goto out;
}
MARS_DBG("origmref=%p subref_count=%d error=%d\n", origmref, atomic_read(&origmref_a->subref_count), origmref->cb_error);
CHECK_ATOMIC(&origmref_a->subref_count, 1);
if (!atomic_dec_and_test(&origmref_a->subref_count)) {
goto out;
}
cb = origmref->ref_cb;
MARS_DBG("DONE error=%d\n", cb->cb_error);
cb->cb_fn(cb);
usebuf_ref_put(output, origmref);
out:
return;
}
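_usebuf_origmref_endio(), together with the atomic_set(..., 1) and atomic_inc() calls in usebuf_ref_io() below, implements the classic 1+N completion pattern: the parent request holds one guard reference, each sub-IO adds one, and whichever decrement reaches zero fires the user callback exactly once. A self-contained userspace analogue (illustrative names, not MARS API):
#include <stdatomic.h>
#include <stdio.h>

struct parent {
        atomic_int subref_count;
        void (*cb_fn)(struct parent *);
};

static void complete_one(struct parent *p)
{
        /* the last decrement fires the callback exactly once */
        if (atomic_fetch_sub(&p->subref_count, 1) == 1)
                p->cb_fn(p);
}

static void all_done(struct parent *p)
{
        (void)p;
        puts("all sub-IOs finished");
}

int main(void)
{
        struct parent p = { .cb_fn = all_done };
        atomic_init(&p.subref_count, 1);                 /* initial guard */
        for (int i = 0; i < 3; i++)
                atomic_fetch_add(&p.subref_count, 1);    /* one per sub-IO */
        for (int i = 0; i < 3; i++)
                complete_one(&p);                        /* sub-IO completions */
        complete_one(&p);                                /* drop the guard */
        return 0;
}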
static void _usebuf_mref_endio(struct generic_callback *cb)
{
struct usebuf_mars_ref_aspect *mref_a;
struct mars_ref_object *mref;
struct usebuf_output *output;
struct mars_ref_object *origmref;
struct usebuf_mars_ref_aspect *origmref_a;
int status;
mref_a = cb->cb_private;
if (unlikely(!mref_a)) {
MARS_FAT("cannot get aspect\n");
goto out_fatal;
}
mref = mref_a->object;
if (unlikely(!mref)) {
MARS_FAT("cannot get mref\n");
goto out_fatal;
}
output = mref_a->output;
if (unlikely(!output)) {
MARS_FAT("bad argument output\n");
goto out_fatal;
}
origmref = mref_a->origmref;
if (unlikely(!origmref)) {
MARS_FAT("cannot get origmref\n");
goto out_fatal;
}
MARS_DBG("origmref=%p\n", origmref);
status = -EINVAL;
origmref_a = usebuf_mars_ref_get_aspect(output, origmref);
if (unlikely(!origmref_a)) {
MARS_ERR("cannot get origmref_a\n");
goto out_err;
}
// check if we have an initial read => now start the final write
if (mref->ref_may_write != READ && mref->ref_rw == READ && cb->cb_error >= 0) {
struct usebuf_input *input = output->brick->inputs[0];
status = -EIO;
if (unlikely(!(mref->ref_flags & MARS_REF_UPTODATE))) {
MARS_ERR("not UPTODATE after initial read\n");
goto out_err;
}
_usebuf_copy(mref_a, WRITE);
// grab extra reference
CHECK_ATOMIC(&origmref_a->subref_count, 1);
atomic_inc(&origmref_a->subref_count);
GENERIC_INPUT_CALL(input, mars_ref_io, mref, WRITE);
} else {
// finalize the read or final write
if (likely(!cb->cb_error)) {
struct bio *bio = origmref->orig_bio;
int direction;
status = -EINVAL;
if (unlikely(!bio)) {
MARS_ERR("bad bio setup on origmref %p", origmref);
goto out_err;
}
direction = bio->bi_rw & 1;
if (direction == READ) {
_usebuf_copy(mref_a, READ);
}
}
}
CHECK_ATOMIC(&origmref_a->subref_count, 1);
CHECK_ATOMIC(&mref->ref_count, 1);
status = cb->cb_error;
out_err:
if (status < 0) {
origmref->ref_cb->cb_error = status;
MARS_ERR("error %d\n", status);
}
_usebuf_origmref_endio(output, origmref);
out_fatal: // no chance to call callback; this will result in mem leak :(
;
}
////////////////// own brick / input / output operations //////////////////
static int usebuf_get_info(struct usebuf_output *output, struct mars_info *info)
{
struct usebuf_input *input = output->brick->inputs[0];
return GENERIC_INPUT_CALL(input, mars_get_info, info);
}
static int usebuf_ref_get(struct usebuf_output *output, struct mars_ref_object *origmref)
{
MARS_FAT("not callable!\n");
return -ENOSYS;
#if 0
struct usebuf_input *input = output->brick->inputs[0];
return GENERIC_INPUT_CALL(input, mars_ref_get, mref);
#endif
}
static void usebuf_ref_put(struct usebuf_output *output, struct mars_ref_object *origmref)
{
CHECK_ATOMIC(&origmref->ref_count, 1);
if (!atomic_dec_and_test(&origmref->ref_count)) {
return;
}
usebuf_free_mars_ref(origmref);
#if 0 // NYI
struct usebuf_input *input = output->brick->inputs[0];
GENERIC_INPUT_CALL(input, mars_ref_put, mref);
#endif
}
static void usebuf_ref_io(struct usebuf_output *output, struct mars_ref_object *origmref, int rw)
{
struct usebuf_input *input = output->brick->inputs[0];
struct bio *bio = origmref->orig_bio;
struct bio_vec *bvec;
struct usebuf_mars_ref_aspect *origmref_a;
loff_t start_pos;
int start_len;
int status;
int i;
#if 1
static int last_jiffies = 0;
if (!last_jiffies || ((int)jiffies) - last_jiffies >= HZ * 30) {
last_jiffies = jiffies;
MARS_INF("USEBUF: reads=%d writes=%d prereads=%d\n", atomic_read(&output->io_count) - atomic_read(&output->write_count), atomic_read(&output->write_count), atomic_read(&output->preread_count));
}
#endif
MARS_DBG("START origmref=%p\n", origmref);
status = -EINVAL;
if (unlikely(!bio)) {
MARS_ERR("cannot get bio\n");
goto done;
}
origmref_a = usebuf_mars_ref_get_aspect(output, origmref);
if (unlikely(!origmref_a)) {
MARS_ERR("cannot get origmref_a\n");
goto done;
}
origmref->ref_cb->cb_error = 0;
// initial refcount: prevent intermediate drops
_CHECK_ATOMIC(&origmref->ref_count, !=, 1);
atomic_inc(&origmref->ref_count);
_CHECK_ATOMIC(&origmref_a->subref_count, !=, 0);
atomic_set(&origmref_a->subref_count, 1);
start_pos = ((loff_t)bio->bi_sector) << 9; // TODO: make dynamic
start_len = bio->bi_size;
bio_for_each_segment(bvec, bio, i) {
int this_len = bvec->bv_len;
int my_offset = 0;
while (this_len > 0) {
struct mars_ref_object *mref;
struct usebuf_mars_ref_aspect *mref_a;
struct generic_callback *cb;
int my_len;
int my_rw;
mref = usebuf_alloc_mars_ref(output, &output->ref_object_layout);
status = -ENOMEM;
if (unlikely(!mref)) {
MARS_ERR("cannot alloc buffer, status=%d\n", status);
goto done_drop;
}
mref->ref_pos = start_pos;
mref->ref_len = this_len;
mref->ref_may_write = rw;
status = GENERIC_INPUT_CALL(input, mars_ref_get, mref);
if (unlikely(status < 0)) {
MARS_ERR("cannot get buffer, status=%d\n", status);
goto done_drop;
}
my_len = status;
MARS_DBG("origmref=%p got mref=%p pos=%lld len=%d mode=%d flags=%d status=%d\n", origmref, mref, start_pos, this_len, mref->ref_may_write, mref->ref_flags, status);
status = -ENOMEM;
mref_a = usebuf_mars_ref_get_aspect(output, mref);
if (unlikely(!mref_a)) {
MARS_ERR("cannot get my own mref aspect\n");
goto put;
}
mref_a->origmref = origmref;
mref_a->bvec = bvec;
mref_a->bvec_offset = bvec->bv_offset + my_offset;
mref_a->bvec_len = my_len;
status = 0;
if ((mref->ref_flags & MARS_REF_UPTODATE) && rw == READ) {
// cache hit: immediately signal success
_usebuf_copy(mref_a, READ);
goto put;
}
cb = &mref_a->cb;
cb->cb_fn = _usebuf_mref_endio;
cb->cb_private = mref_a;
cb->cb_error = 0;
cb->cb_prev = NULL;
mref_a->output = output;
mref->ref_cb = cb;
my_rw = rw;
atomic_inc(&output->io_count);
if (my_rw != READ) {
atomic_inc(&output->write_count);
if (mref->ref_flags & MARS_REF_UPTODATE) {
// buffer uptodate: start writing.
_usebuf_copy(mref_a, WRITE);
} else {
// first start initial read, to get the whole buffer UPTODATE
my_rw = READ;
atomic_inc(&output->preread_count);
}
}
// grab reference for each sub-IO
CHECK_ATOMIC(&origmref_a->subref_count, 1);
atomic_inc(&origmref_a->subref_count);
GENERIC_INPUT_CALL(input, mars_ref_io, mref, my_rw);
put:
GENERIC_INPUT_CALL(input, mars_ref_put, mref);
if (unlikely(status < 0))
break;
start_len -= my_len;
start_pos += my_len;
this_len -= my_len;
my_offset += my_len;
}
if (unlikely(this_len != 0)) {
MARS_ERR("bad internal length %d (status=%d)\n", this_len, status);
}
}
if (unlikely(start_len != 0 && !status)) {
MARS_ERR("length mismatch %d (status=%d)\n", start_len, status);
}
done_drop:
// drop initial refcount
if (status < 0)
origmref->ref_cb->cb_error = status;
_usebuf_origmref_endio(output, origmref);
done:
MARS_DBG("status=%d\n", status);
}
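usebuf_ref_io() splits each bio segment into one or more sub-mrefs because mars_ref_get() may grant less than the requested length (my_len = status). All four counters (start_pos, start_len, this_len, my_offset) advance by exactly my_len per iteration, which is what the two length-mismatch checks at the end verify. A standalone simulation of that arithmetic (illustrative 4 KiB grant limit, not MARS code):
#include <stdio.h>

int main(void)
{
        long long start_pos = 8192;  /* byte position of the segment */
        int this_len = 10240;        /* bytes remaining in this bvec */
        int my_offset = 0;           /* offset inside the bvec */

        while (this_len > 0) {
                /* pretend the buffer layer grants at most 4 KiB */
                int my_len = this_len < 4096 ? this_len : 4096;
                printf("sub-mref: pos=%lld len=%d bvec_off=%d\n",
                       start_pos, my_len, my_offset);
                start_pos += my_len;
                this_len  -= my_len;
                my_offset += my_len;
        }
        return 0;
}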
//////////////// object / aspect constructors / destructors ///////////////
static int usebuf_mars_ref_aspect_init_fn(struct generic_aspect *_ini, void *_init_data)
{
struct usebuf_mars_ref_aspect *ini = (void*)_ini;
ini->origmref = NULL;
ini->bvec = NULL;
return 0;
}
static void usebuf_mars_ref_aspect_exit_fn(struct generic_aspect *_ini, void *_init_data)
{
struct usebuf_mars_ref_aspect *ini = (void*)_ini;
(void)ini;
}
MARS_MAKE_STATICS(usebuf);
////////////////////// brick constructors / destructors ////////////////////
static int usebuf_brick_construct(struct usebuf_brick *brick)
{
return 0;
}
static int usebuf_output_construct(struct usebuf_output *output)
{
return 0;
}
///////////////////////// static structs ////////////////////////
static struct usebuf_brick_ops usebuf_brick_ops = {
};
static struct usebuf_output_ops usebuf_output_ops = {
.make_object_layout = usebuf_make_object_layout,
.mars_get_info = usebuf_get_info,
.mars_ref_get = usebuf_ref_get,
.mars_ref_put = usebuf_ref_put,
.mars_ref_io = usebuf_ref_io,
};
const struct usebuf_input_type usebuf_input_type = {
.type_name = "usebuf_input",
.input_size = sizeof(struct usebuf_input),
};
static const struct usebuf_input_type *usebuf_input_types[] = {
&usebuf_input_type,
};
const struct usebuf_output_type usebuf_output_type = {
.type_name = "usebuf_output",
.output_size = sizeof(struct usebuf_output),
.master_ops = &usebuf_output_ops,
.output_construct = &usebuf_output_construct,
.aspect_types = usebuf_aspect_types,
.layout_code = {
[BRICK_OBJ_MARS_REF] = LAYOUT_ALL,
}
};
static const struct usebuf_output_type *usebuf_output_types[] = {
&usebuf_output_type,
};
const struct usebuf_brick_type usebuf_brick_type = {
.type_name = "usebuf_brick",
.brick_size = sizeof(struct usebuf_brick),
.max_inputs = 1,
.max_outputs = 1,
.master_ops = &usebuf_brick_ops,
.default_input_types = usebuf_input_types,
.default_output_types = usebuf_output_types,
.brick_construct = &usebuf_brick_construct,
};
EXPORT_SYMBOL_GPL(usebuf_brick_type);
////////////////// module init stuff /////////////////////////
static int __init init_usebuf(void)
{
printk(MARS_INFO "init_usebuf()\n");
return usebuf_register_brick_type();
}
static void __exit exit_usebuf(void)
{
printk(MARS_INFO "exit_usebuf()\n");
usebuf_unregister_brick_type();
}
MODULE_DESCRIPTION("MARS usebuf brick");
MODULE_AUTHOR("Thomas Schoebel-Theuer <tst@1und1.de>");
MODULE_LICENSE("GPL");
module_init(init_usebuf);
module_exit(exit_usebuf);

View File
mars_usebuf.h

@ -1,34 +0,0 @@
// (c) 2010 Thomas Schoebel-Theuer / 1&1 Internet AG
#ifndef MARS_USEBUF_H
#define MARS_USEBUF_H
struct usebuf_mars_ref_aspect {
GENERIC_ASPECT(mars_ref);
struct mars_ref_object *origmref;
struct generic_callback cb;
struct usebuf_output *output;
struct bio_vec *bvec;
int bvec_offset;
int bvec_len;
atomic_t subref_count;
};
struct usebuf_brick {
MARS_BRICK(usebuf);
};
struct usebuf_input {
MARS_INPUT(usebuf);
};
struct usebuf_output {
MARS_OUTPUT(usebuf);
struct generic_object_layout ref_object_layout;
atomic_t io_count;
atomic_t write_count;
atomic_t preread_count;
};
MARS_TYPES(usebuf);
#endif