import mars-38.tgz

This commit is contained in:
Thomas Schoebel-Theuer 2010-08-20 11:58:24 +01:00
parent b121a44a1f
commit a180d8f83a
8 changed files with 621 additions and 156 deletions

View File

@ -175,8 +175,7 @@ struct generic_output {
)
#define GENERIC_BRICK_OPS(BRICK) \
/*int (*brick_start)(struct BRICK##_brick *brick);*/ \
/*int (*brick_stop)(struct BRICK##_brick *brick);*/ \
int (*brick_switch)(struct BRICK##_brick *brick, bool state); \
struct generic_brick_ops {
GENERIC_BRICK_OPS(generic);

14
mars.h
View File

@ -6,16 +6,18 @@
#include <asm/spinlock.h>
#include <asm/atomic.h>
#define MARS_DELAY msleep(30000)
#define MARS_FATAL "MARS_FATAL " __BASE_FILE__ ": "
#define MARS_ERROR "MARS_ERROR " __BASE_FILE__ ": "
#define MARS_INFO "MARS_INFO " __BASE_FILE__ ": "
#define MARS_DEBUG "MARS_DEBUG " __BASE_FILE__ ": "
#define MARS_FAT(fmt, args...) printk(MARS_FATAL "%s(): " fmt, __FUNCTION__, ##args)
#define MARS_ERR(fmt, args...) printk(MARS_ERROR "%s(): " fmt, __FUNCTION__, ##args)
#define MARS_INF(fmt, args...) printk(MARS_INFO "%s(): " fmt, __FUNCTION__, ##args)
#define MARS_FAT(fmt, args...) do { printk(MARS_FATAL "%s(): " fmt, __FUNCTION__, ##args); MARS_DELAY; } while (0)
#define MARS_ERR(fmt, args...) do { printk(MARS_ERROR "%s(): " fmt, __FUNCTION__, ##args); MARS_DELAY; } while (0)
#define MARS_INF(fmt, args...) do { printk(MARS_INFO "%s(): " fmt, __FUNCTION__, ##args); } while (0)
#ifdef MARS_DEBUGGING
#define MARS_DBG(fmt, args...) printk(MARS_DEBUG "%s(): " fmt, __FUNCTION__, ##args)
#define MARS_DBG(fmt, args...) do { printk(MARS_DEBUG "%s(): " fmt, __FUNCTION__, ##args); } while (0)
#else
#define MARS_DBG(args...) /**/
#endif
@ -190,7 +192,7 @@ static const struct generic_aspect_type *BRICK##_aspect_types[BRICK_OBJ_NR] = {
int test = atomic_read(atom); \
if (test OP (minval)) { \
atomic_set(atom, minval); \
MARS_ERR("line %d atom " #atom " " #OP " " #minval "\n", __LINE__); \
MARS_ERR("line %d atomic " #atom " " #OP " " #minval "\n", __LINE__); \
} \
} while (0)
@ -213,7 +215,7 @@ static inline void mars_ref_attach_bio(struct mars_ref_object *mref, struct bio
}
#define CHECK_HEAD_EMPTY(head) \
if (!list_empty(head)) { \
if (unlikely(!list_empty(head))) { \
INIT_LIST_HEAD(head); \
MARS_ERR("list_head " #head " (%p) not empty\n", head); \
} \

View File

@ -15,6 +15,7 @@
#include "mars.h"
//#define USE_VMALLOC
//#define FAKE_IO
///////////////////////// own type definitions ////////////////////////
@ -184,9 +185,11 @@ static int make_bio(struct buf_brick *brick, struct bio **_bio, void *data, loff
{
unsigned long long sector;
int sector_offset;
int data_offset;
int page_offset;
int page_len;
int bvec_count;
int ilen = len;
int status;
int i;
struct page *page;
@ -205,26 +208,31 @@ static int make_bio(struct buf_brick *brick, struct bio **_bio, void *data, loff
bdev = brick->base_info.backing_file->f_mapping->host->i_sb->s_bdev;
}
if (unlikely(len <= 0)) {
MARS_ERR("bad bio len %d\n", len);
if (unlikely(ilen <= 0)) {
MARS_ERR("bad bio len %d\n", ilen);
status = -EINVAL;
goto out;
}
sector = pos >> 9; // TODO: make dynamic
sector_offset = pos & ((1 << 9) - 1); // TODO: make dynamic
data_offset = ((unsigned long)data) & ((1 << 9) - 1); // TODO: make dynamic
if (unlikely(sector_offset != data_offset)) {
MARS_ERR("bad alignment: offset %d != %d\n", sector_offset, data_offset);
}
// round down to start of first sector
data -= sector_offset;
len += sector_offset;
ilen += sector_offset;
pos -= sector_offset;
// round up to full sector
len = (((len - 1) >> 9) + 1) << 9; // TODO: make dynamic
ilen = (((ilen - 1) >> 9) + 1) << 9; // TODO: make dynamic
// map onto pages. TODO: allow higher-order pages (performance!)
page_offset = pos & (PAGE_SIZE - 1);
page_len = len + page_offset;
page_len = ilen + page_offset;
bvec_count = (page_len - 1) / PAGE_SIZE + 1;
if (bvec_count > brick->bvec_max)
bvec_count = brick->bvec_max;
@ -235,9 +243,9 @@ static int make_bio(struct buf_brick *brick, struct bio **_bio, void *data, loff
goto out;
status = 0;
for (i = 0; i < bvec_count && len > 0; i++) {
for (i = 0; i < bvec_count && ilen > 0; i++) {
int myrest = PAGE_SIZE - page_offset;
int mylen = len;
int mylen = ilen;
if (mylen > myrest)
mylen = myrest;
@ -249,15 +257,16 @@ static int make_bio(struct buf_brick *brick, struct bio **_bio, void *data, loff
bio->bi_io_vec[i].bv_offset = page_offset;
data += mylen;
len -= mylen;
ilen -= mylen;
status += mylen;
page_offset = 0;
//MARS_INF("page_offset=%d mylen=%d (new len=%d, new status=%d)\n", page_offset, mylen, ilen, status);
}
if (unlikely(len)) {
if (unlikely(ilen != 0)) {
bio_put(bio);
bio = NULL;
MARS_ERR("computation of bvec_count %d was wrong, diff=%d\n", bvec_count, len);
MARS_ERR("computation of bvec_count %d was wrong, diff=%d\n", bvec_count, ilen);
status = -EIO;
goto out;
}
@ -270,12 +279,14 @@ static int make_bio(struct buf_brick *brick, struct bio **_bio, void *data, loff
bio->bi_private = NULL; // must be filled in later
bio->bi_end_io = NULL; // must be filled in later
bio->bi_rw = 0; // must be filled in later
// ignore rounding-down on return
if (status >= sector_offset)
status -= sector_offset;
// ignore rounding on return
if (status > len)
status = len;
out:
*_bio = bio;
if (status < 0)
MARS_ERR("error %d\n", status);
return status;
}
@ -432,6 +443,14 @@ again:
new->bf_pos = base_pos;
new->bf_base_index = ((unsigned int)base_pos) >> brick->backing_order;
new->bf_flags = 0;
/* Important optimization: treat whole buffers as uptodate
* upon first write.
*/
if (mref->ref_may_write != READ &&
((!base_offset && mref->ref_len == brick->backing_size) ||
(mref->ref_pos >= brick->base_info.current_size))) {
new->bf_flags |= MARS_REF_UPTODATE;
}
atomic_set(&new->bf_count, 1);
new->bf_bio_status = 0;
atomic_set(&new->bf_bio_count, 0);
@ -630,8 +649,13 @@ static int _buf_make_bios(struct buf_brick *brick, struct buf_head *bf, void *st
mref_a->cb.cb_prev = NULL;
len = make_bio(brick, &bio, start_data, start_pos, start_len);
if (unlikely(len <= 0 || !bio)) {
buf_free_mars_ref(mref);
if (unlikely(len < 0)) {
status = len;
break;
}
if (unlikely(len == 0 || !bio)) {
status = -EIO;
//buf_free_mars_ref(mref);
break;
}
@ -647,9 +671,12 @@ static int _buf_make_bios(struct buf_brick *brick, struct buf_head *bf, void *st
start_len -= len;
iters++;
}
if (!start_len)
if (likely(!start_len))
status = 0;
#if 1
else {
MARS_ERR("start_len %d != 0 (error %d)\n", start_len, status);
}
if (iters != 1) {
MARS_INF("start_pos=%lld start_len=%d iters=%d, status=%d\n", start_pos, start_len, iters, status);
}
@ -669,6 +696,7 @@ static int _buf_make_bios(struct buf_brick *brick, struct buf_head *bf, void *st
cb = mref->ref_cb;
if (status < 0) { // clean up
MARS_ERR("reporting error %d\n", status);
cb->cb_error = status;
cb->cb_fn(cb);
#if 0
@ -685,7 +713,7 @@ static int _buf_make_bios(struct buf_brick *brick, struct buf_head *bf, void *st
atomic_inc(&bf->bf_bio_count);
MARS_DBG("starting buf IO mref=%p bio=%p bf=%p bf_count=%d bf_bio_count=%d\n", mref, mref->orig_bio, bf, atomic_read(&bf->bf_count), atomic_read(&bf->bf_bio_count));
#if 0
#ifndef FAKE_IO
GENERIC_INPUT_CALL(input, mars_ref_io, mref, rw);
#else
// fake IO for testing
@ -890,6 +918,7 @@ static void buf_ref_io(struct buf_output *output, struct mars_ref_object *mref,
CHECK_ATOMIC(&bf->bf_count, 1);
if (rw != READ) {
loff_t end;
if (unlikely(mref->ref_may_write == READ)) {
MARS_ERR("sorry, forgotten to set ref_may_write\n");
goto callback;
@ -898,6 +927,11 @@ static void buf_ref_io(struct buf_output *output, struct mars_ref_object *mref,
MARS_ERR("sorry, writing is only allowed on UPTODATE buffers\n");
goto callback;
}
end = mref->ref_pos + mref->ref_len;
//FIXME: race condition :(
if (end > brick->base_info.current_size) {
brick->base_info.current_size = end;
}
}
mref->ref_rw = rw;

View File

@ -20,6 +20,23 @@
///////////////////////// own helper functions ////////////////////////
#define CHECK_ERR(output,fmt,args...) \
do { \
struct check_input *input = (output)->brick->inputs[0]; \
struct generic_output *other = (void*)input->connect; \
if (other) { \
MARS_ERR("instance %d/%s/%s: " fmt, \
(output)->instance_nr, \
other->type->type_name, \
other->output_name, \
##args); \
} else { \
MARS_ERR("instance %d: " fmt, \
(output)->instance_nr, \
##args); \
} \
} while (0)
static void check_buf_endio(struct generic_callback *cb)
{
struct check_mars_ref_aspect *mref_a;
@ -59,14 +76,14 @@ static void check_buf_endio(struct generic_callback *cb)
if (atomic_dec_and_test(&mref_a->callback_count)) {
atomic_set(&mref_a->callback_count, 1);
MARS_ERR("instance %d/%s: too many callbacks on %p\n", output->instance_nr, input->connect->type->type_name, mref);
CHECK_ERR(output, "too many callbacks on %p\n", mref);
}
#ifdef CHECK_LOCK
traced_lock(&output->check_lock, flags);
if (list_empty(&mref_a->mref_head)) {
MARS_ERR("instance %d/%s: list entry missing on %p\n", output->instance_nr, input->connect->type->type_name, mref);
CHECK_ERR(output, "list entry missing on %p\n", mref);
}
list_del_init(&mref_a->mref_head);
@ -82,6 +99,10 @@ static void check_buf_endio(struct generic_callback *cb)
MARS_FAT("cannot get chain callback\n");
return;
}
#if 1
mref->ref_cb = prev_cb;
mref_a->installed = false;
#endif
prev_cb->cb_fn(prev_cb);
return;
fatal:
@ -138,8 +159,8 @@ static int check_watchdog(void *data)
struct generic_object_layout *object_layout;
int i;
mref_a->last_jiffies = now + 600 * HZ;
MARS_ERR("================================\n");
MARS_ERR("instance %d: mref %p callback is missing for more than %d seconds.\n", output->instance_nr, mref, timeout);
MARS_INF("================================\n");
CHECK_ERR(output, "mref %p callback is missing for more than %d seconds.\n", mref, timeout);
object_layout = (void*)mref->object_layout;
//dump_mem(mref, object_layout->object_size);
for (i = 0; i < object_layout->aspect_count; i++) {
@ -154,7 +175,7 @@ static int check_watchdog(void *data)
MARS_INF("--- aspect %s ---:\n", aspect_layout->aspect_type->aspect_type_name);
dump_mem(((void*)mref + pos), aspect_layout->aspect_type->aspect_size);
}
MARS_ERR("================================\n");
MARS_INF("================================\n");
}
}
@ -198,7 +219,7 @@ static void check_ref_io(struct check_output *output, struct mars_ref_object *mr
if (atomic_dec_and_test(&mref_a->call_count)) {
atomic_set(&mref_a->call_count, 1);
MARS_ERR("instance %d/%s: multiple parallel calls on %p\n", output->instance_nr, input->connect->type->type_name, mref);
CHECK_ERR(output, "multiple parallel calls on %p\n", mref);
}
atomic_set(&mref_a->callback_count, 2);
@ -206,7 +227,7 @@ static void check_ref_io(struct check_output *output, struct mars_ref_object *mr
traced_lock(&output->check_lock, flags);
if (!list_empty(&mref_a->mref_head)) {
MARS_ERR("instance %d/%s: list head not empty on %p\n", output->instance_nr, input->connect->type->type_name, mref);
CHECK_ERR(output, "list head not empty on %p\n", mref);
list_del(&mref_a->mref_head);
}
list_add_tail(&mref_a->mref_head, &output->mref_anchor);
@ -253,7 +274,15 @@ static void check_mars_ref_aspect_exit_fn(struct generic_aspect *_ini, void *_in
struct check_mars_ref_aspect *ini = (void*)_ini;
(void)ini;
#ifdef CHECK_LOCK
CHECK_HEAD_EMPTY(&ini->mref_head);
if (!list_empty(&ini->mref_head)) {
struct check_output *output = ini->output;
if (output) {
CHECK_ERR(output, "list head not empty on %p\n", ini->object);
INIT_LIST_HEAD(&ini->mref_head);
} else {
CHECK_HEAD_EMPTY(&ini->mref_head);
}
}
#endif
}

View File

@ -54,11 +54,17 @@ static void write_aops(struct device_sio_output *output, struct mars_ref_object
struct bio *bio = mref->orig_bio;
loff_t pos = ((loff_t)bio->bi_sector << 9);
struct file *file = output->filp;
struct address_space *mapping = file->f_mapping;
struct address_space *mapping;
struct bio_vec *bvec;
int i;
int ret = 0;
if (unlikely(!file)) {
MARS_FAT("No FILE\n");
return;
}
mapping = file->f_mapping;
MARS_DBG("write_aops pos=%llu len=%d\n", pos, bio->bi_size);
mutex_lock(&mapping->host->i_mutex);
@ -236,7 +242,10 @@ static void device_sio_ref_io(struct device_sio_output *output, struct mars_ref_
struct generic_callback *cb = mref->ref_cb;
bool barrier = (rw != READ && bio_rw_flagged(bio, BIO_RW_BARRIER));
int test;
if (unlikely(!output->filp)) {
}
if (barrier) {
MARS_INF("got barrier request\n");
sync_file(output);
@ -406,35 +415,45 @@ static int device_sio_brick_construct(struct device_sio_brick *brick)
return 0;
}
static int device_sio_output_construct(struct device_sio_output *output)
static int device_sio_switch(struct device_sio_brick *brick, bool state)
{
mm_segment_t oldfs;
struct device_sio_output *output = brick->outputs[0];
char *path = output->output_name;
int flags = O_CREAT | O_RDWR | O_LARGEFILE;
int prot = 0600;
char *path = "/tmp/testfile.img";
mm_segment_t oldfs;
if (state) {
oldfs = get_fs();
set_fs(get_ds());
output->filp = filp_open(path, flags, prot);
set_fs(oldfs);
if (IS_ERR(output->filp)) {
int err = PTR_ERR(output->filp);
MARS_ERR("can't open file '%s' status=%d\n", path, err);
output->filp = NULL;
return err;
}
#if 0
{
struct address_space *mapping = output->filp->f_mapping;
int old_gfp_mask = mapping_gfp_mask(mapping);
mapping_set_gfp_mask(mapping, old_gfp_mask & ~(__GFP_IO|__GFP_FS));
}
#endif
MARS_INF("opened file '%s'\n", path);
} else {
// TODO: close etc...
}
return 0;
}
static int device_sio_output_construct(struct device_sio_output *output)
{
struct task_struct *watchdog;
int index;
oldfs = get_fs();
set_fs(get_ds());
output->filp = filp_open(path, flags, prot);
set_fs(oldfs);
if (IS_ERR(output->filp)) {
int err = PTR_ERR(output->filp);
MARS_ERR("can't open file '%s' status=%d\n", path, err);
output->filp = NULL;
return err;
}
#if 0
{
struct address_space *mapping = output->filp->f_mapping;
int old_gfp_mask = mapping_gfp_mask(mapping);
mapping_set_gfp_mask(mapping, old_gfp_mask & ~(__GFP_IO|__GFP_FS));
}
#endif
spin_lock_init(&output->g_lock);
output->index = 0;
for (index = 0; index <= WITH_THREAD; index++) {
@ -480,6 +499,7 @@ static int device_sio_output_destruct(struct device_sio_output *output)
///////////////////////// static structs ////////////////////////
static struct device_sio_brick_ops device_sio_brick_ops = {
.brick_switch = device_sio_switch,
};
static struct device_sio_output_ops device_sio_output_ops = {

View File

@ -3,6 +3,20 @@
//#define BRICK_DEBUGGING
//#define MARS_DEBUGGING
#define DEFAULT_ORDER 0
#define DEFAULT_BUFFERS (32768 / 2)
#define DEFAULT_MEM (1024 / 4 * 256)
#define TRANS_ORDER 4
#define TRANS_BUFFERS (8)
#define TRANS_MEM (1024 / 4)
#define CONF_TEST
#define CONF_BUF
#define CONF_USEBUF
#define CONF_TRANS
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/string.h>
@ -19,12 +33,19 @@
#include "mars_device_sio.h"
#include "mars_buf.h"
#include "mars_usebuf.h"
#include "mars_trans_logger.h"
GENERIC_ASPECT_FUNCTIONS(generic,mars_ref);
static struct generic_brick *if_brick = NULL;
static struct if_device_brick *_if_brick = NULL;
static struct generic_brick *usebuf_brick = NULL;
static struct generic_brick *trans_brick = NULL;
static struct generic_brick *tbuf_brick = NULL;
static struct buf_brick *_tbuf_brick = NULL;
static struct generic_brick *tdevice_brick = NULL;
static struct generic_brick *buf_brick = NULL;
static struct buf_brick *_buf_brick = NULL;
static struct generic_brick *device_brick = NULL;
@ -69,7 +90,7 @@ void make_test_instance(void)
return NULL;
}
status = generic_brick_init_full(mem, size, brick_type, NULL, NULL, names);
MARS_DBG("done (status=%d)\n", status);
MARS_INF("init '%s' (status=%d)\n", brick_type->type_name, status);
if (status) {
MARS_ERR("cannot init brick %s\n", brick_type->type_name);
msleep(60000);
@ -81,7 +102,7 @@ void make_test_instance(void)
void connect(struct generic_input *a, struct generic_output *b)
{
int status;
#if 0
#ifdef CONF_TEST
struct generic_brick *tmp = brick(&check_brick_type);
status = generic_connect(a, tmp->outputs[0]);
@ -102,10 +123,12 @@ void make_test_instance(void)
MARS_DBG("starting....\n");
device_brick = brick(&device_sio_brick_type);
device_brick->outputs[0]->output_name = "/tmp/testfile.img";
device_brick->ops->brick_switch(device_brick, true);
if_brick = brick(&if_device_brick_type);
#if 1 // usebuf zwischenschalten
#if defined(CONF_USEBUF) && defined(CONF_BUF) // usebuf zwischenschalten
usebuf_brick = brick(&usebuf_brick_type);
connect(if_brick->inputs[0], usebuf_brick->outputs[0]);
@ -116,26 +139,54 @@ void make_test_instance(void)
last = if_brick->inputs[0];
#endif
#if 1 // buf zwischenschalten
#define MEM (1024 / 4 * 256)
#ifdef CONF_BUF // Standard-buf zwischenschalten
buf_brick = brick(&buf_brick_type);
_buf_brick = (void*)buf_brick;
_buf_brick->backing_order = 0;
//_buf_brick->backing_order = 2;
//_buf_brick->backing_order = 4;
//_buf_brick->backing_order = 7;
_buf_brick->outputs[0]->output_name = "/tmp/testfile.img";
_buf_brick->backing_order = DEFAULT_ORDER;
_buf_brick->backing_size = PAGE_SIZE << _buf_brick->backing_order;
#if 0
_buf_brick->max_count = MEM >> _buf_brick->backing_order;
#ifdef DEFAULT_BUFFERS
_buf_brick->max_count = DEFAULT_BUFFERS;
#else
_buf_brick->max_count = 32768 / 2;
_buf_brick->max_count = DEFAULT_MEM >> _buf_brick->backing_order;
#endif
connect(last, buf_brick->outputs[0]);
connect(buf_brick->inputs[0], device_brick->outputs[0]);
#ifdef CONF_TRANS // trans_logger plus Infrastruktur zwischenschalten
tdevice_brick = brick(&device_sio_brick_type);
tdevice_brick->outputs[0]->output_name = "/tmp/testfile.log";
tdevice_brick->ops->brick_switch(tdevice_brick, true);
tbuf_brick = brick(&buf_brick_type);
_tbuf_brick = (void*)tbuf_brick;
_tbuf_brick->outputs[0]->output_name = "/tmp/testfile.log";
_tbuf_brick->backing_order = TRANS_ORDER;
_tbuf_brick->backing_size = PAGE_SIZE << _tbuf_brick->backing_order;
#ifdef TRANS_BUFFERS
_tbuf_brick->max_count = TRANS_BUFFERS;
#else
_tbuf_brick->max_count = TRANS_MEM >> _tbuf_brick->backing_order;
#endif
connect(tbuf_brick->inputs[0], tdevice_brick->outputs[0]);
trans_brick = brick(&trans_logger_brick_type);
connect(trans_brick->inputs[0], buf_brick->outputs[0]);
connect(trans_brick->inputs[1], tbuf_brick->outputs[0]);
connect(last, trans_brick->outputs[0]);
#else
(void)trans_brick;
(void)tbuf_brick;
(void)_tbuf_brick;
(void)tdevice_brick;
connect(last, buf_brick->outputs[0]);
#endif
if (false) { // ref-counting no longer valid
struct buf_output *output = _buf_brick->outputs[0];
struct mars_ref_object *mref = NULL;

View File

@ -9,6 +9,7 @@
#include <linux/module.h>
#include <linux/string.h>
#include <linux/bio.h>
#include <linux/kthread.h>
#include "mars.h"
@ -20,6 +21,7 @@
#define CODE_UNKNOWN 0
#define CODE_WRITE_NEW 1
#define CODE_WRITE_OLD 2
#define START_MAGIC 0xa8f7e908d9177957ll
#define END_MAGIC 0x74941fb74ab5726dll
@ -60,6 +62,8 @@ void *log_reserve(struct trans_logger_input *input, struct log_header *l)
int status;
int offset;
//MARS_INF("reserving %d at %lld\n", l->l_len, input->log_pos);
if (unlikely(input->log_mref)) {
MARS_ERR("mref already existing\n");
goto err;
@ -112,7 +116,7 @@ err:
return NULL;
}
bool log_finalize(struct trans_logger_input *input, int len, void (*endio)(struct generic_callback *cb), void *private)
bool log_finalize(struct trans_logger_input *input, int len, void (*endio)(struct generic_callback *cb), struct trans_logger_mars_ref_aspect *orig_mref_a)
{
struct mars_ref_object *mref = input->log_mref;
struct trans_logger_mars_ref_aspect *mref_a;
@ -122,10 +126,11 @@ bool log_finalize(struct trans_logger_input *input, int len, void (*endio)(struc
int offset;
bool ok = false;
if (unlikely(!input->log_mref)) {
if (unlikely(!mref)) {
MARS_ERR("mref is missing\n");
goto err;
}
input->log_mref = NULL;
if (unlikely(len > input->payload_len)) {
MARS_ERR("trying to write more than reserved\n");
goto put;
@ -167,9 +172,9 @@ bool log_finalize(struct trans_logger_input *input, int len, void (*endio)(struc
cb = &mref_a->cb;
cb->cb_fn = endio;
cb->cb_private = private;
cb->cb_error = 0;
cb->cb_prev = NULL;
cb->cb_private = orig_mref_a;
mref->ref_cb = cb;
GENERIC_INPUT_CALL(input, mars_ref_io, mref, WRITE);
@ -201,6 +206,17 @@ static inline void q_insert(struct logger_queue *q, struct trans_logger_mars_ref
traced_unlock(&q->q_lock, flags);
}
static inline void q_pushback(struct logger_queue *q, struct trans_logger_mars_ref_aspect *mref_a)
{
unsigned long flags;
traced_lock(&q->q_lock, flags);
list_add(&mref_a->q_head, &q->q_anchor);
traced_unlock(&q->q_lock, flags);
}
static inline struct trans_logger_mars_ref_aspect *q_fetch(struct logger_queue *q)
{
struct trans_logger_mars_ref_aspect *mref_a = NULL;
@ -209,8 +225,9 @@ static inline struct trans_logger_mars_ref_aspect *q_fetch(struct logger_queue *
traced_lock(&q->q_lock, flags);
if (likely(!list_empty(&q->q_anchor))) {
mref_a = container_of(q->q_anchor.next, struct trans_logger_mars_ref_aspect, q_head);
list_del_init(q->q_anchor.next);
struct list_head *next = q->q_anchor.next;
list_del_init(next);
mref_a = container_of(next, struct trans_logger_mars_ref_aspect, q_head);
}
traced_unlock(&q->q_lock, flags);
@ -279,7 +296,7 @@ static struct trans_logger_mars_ref_aspect *hash_find(struct hash_anchor *table,
}
if (res) {
atomic_inc(&res->hash_count);
atomic_inc(&res->object->ref_count);
}
traced_readunlock(&start->hash_lock, flags);
@ -287,35 +304,44 @@ static struct trans_logger_mars_ref_aspect *hash_find(struct hash_anchor *table,
return res;
}
static inline void hash_insert(struct hash_anchor *table, struct trans_logger_mars_ref_aspect *elem)
static inline void hash_insert(struct hash_anchor *table, struct trans_logger_mars_ref_aspect *elem_a)
{
unsigned int base_index = ((unsigned int)elem->object->ref_pos) >> REGION_SIZE_BITS;
unsigned int base_index = ((unsigned int)elem_a->object->ref_pos) >> REGION_SIZE_BITS;
int hash = hash_fn(base_index);
struct hash_anchor *start = &table[hash];
unsigned int flags;
traced_writelock(&start->hash_lock, flags);
list_add(&elem->hash_head, &start->hash_anchor);
#if 1
CHECK_HEAD_EMPTY(&elem_a->hash_head);
#endif
list_add(&elem_a->hash_head, &start->hash_anchor);
traced_writeunlock(&start->hash_lock, flags);
}
static inline void hash_put(struct hash_anchor *table, struct trans_logger_mars_ref_aspect *elem)
static inline bool hash_put(struct hash_anchor *table, struct trans_logger_mars_ref_aspect *elem_a)
{
unsigned int base_index = ((unsigned int)elem->object->ref_pos) >> REGION_SIZE_BITS;
struct mars_ref_object *elem = elem_a->object;
unsigned int base_index = ((unsigned int)elem->ref_pos) >> REGION_SIZE_BITS;
int hash = hash_fn(base_index);
struct hash_anchor *start = &table[hash];
unsigned int flags;
bool res;
traced_writelock(&start->hash_lock, flags);
CHECK_ATOMIC(&elem->hash_count, 1);
if (atomic_dec_and_test(&elem->hash_count)) {
list_del_init(&elem->hash_head);
CHECK_ATOMIC(&elem->ref_count, 1);
res = atomic_dec_and_test(&elem->ref_count);
if (res) {
list_del_init(&elem_a->hash_head);
}
traced_writeunlock(&start->hash_lock, flags);
return res;
}
////////////////// own brick / input / output operations //////////////////
@ -326,6 +352,8 @@ static int trans_logger_get_info(struct trans_logger_output *output, struct mars
return GENERIC_INPUT_CALL(input, mars_get_info, info);
}
static void trans_logger_ref_put(struct trans_logger_output *output, struct mars_ref_object *mref);
static int _read_ref_get(struct trans_logger_output *output, struct trans_logger_mars_ref_aspect *mref_a)
{
struct mars_ref_object *mref = mref_a->object;
@ -344,22 +372,21 @@ static int _read_ref_get(struct trans_logger_output *output, struct trans_logger
if (diff > 0) {
/* Although the shadow is overlapping, the
* region before its start is _not_ shadowed.
* Thus we return this (smaller) unshadowed
* Thus we must return that (smaller) unshadowed
* region.
*/
mref->ref_len = diff;
hash_put(output->hash_table, shadow_a);
trans_logger_ref_put(output, shadow);
goto call_through;
}
/* Attach mref to the existing shadow.
/* Attach mref to the existing shadow ("slave shadow").
*/
restlen = shadow->ref_len + diff;
if (mref->ref_len > restlen)
mref->ref_len = restlen;
mref->ref_data = shadow->ref_data - diff;
mref_a->orig_data = shadow->ref_data;
mref->ref_flags = shadow->ref_flags;
mref_a->is_shadow = true;
mref_a->shadow_ref = shadow_a;
return mref->ref_len;
}
@ -377,11 +404,11 @@ static int _write_ref_get(struct trans_logger_output *output, struct trans_logge
return -ENOMEM;
}
mref_a->orig_data = mref->ref_data;
mref_a->output = output;
mref_a->stamp = CURRENT_TIME;
mref->ref_flags = 0;
mref_a->is_shadow = true;
mref->ref_flags = MARS_REF_UPTODATE;
mref_a->shadow_ref = mref_a; // cyclic self-reference
atomic_set(&mref->ref_count, 1);
return mref->ref_len;
}
@ -408,7 +435,8 @@ static int trans_logger_ref_get(struct trans_logger_output *output, struct mars_
static void trans_logger_ref_put(struct trans_logger_output *output, struct mars_ref_object *mref)
{
struct trans_logger_mars_ref_aspect *mref_a;
struct trans_logger_input *input = output->brick->inputs[0];
struct trans_logger_mars_ref_aspect *shadow_a;
struct trans_logger_input *input;
mref_a = trans_logger_mars_ref_get_aspect(output, mref);
if (unlikely(!mref_a)) {
@ -416,18 +444,27 @@ static void trans_logger_ref_put(struct trans_logger_output *output, struct mars
return;
}
if (mref_a->is_shadow) {
hash_put(output->hash_table, mref_a);
CHECK_ATOMIC(&mref->ref_count, 1);
if (!atomic_dec_and_test(&mref->ref_count))
return;
kfree(mref_a->orig_data);
trans_logger_free_mars_ref(mref);
// are we a shadow?
shadow_a = mref_a->shadow_ref;
if (shadow_a) {
if (shadow_a != mref_a) { // we are a slave shadow
MARS_INF("slave\n");
CHECK_HEAD_EMPTY(&mref_a->hash_head);
if (atomic_dec_and_test(&mref->ref_count)) {
trans_logger_free_mars_ref(mref);
}
}
// now put the master shadow
if (hash_put(output->hash_table, shadow_a)) {
struct mars_ref_object *shadow = shadow_a->object;
kfree(shadow->ref_data);
MARS_INF("hm?\n");
trans_logger_free_mars_ref(shadow);
}
return;
}
input = output->brick->inputs[0];
GENERIC_INPUT_CALL(input, mars_ref_put, mref);
}
@ -436,6 +473,8 @@ static void trans_logger_ref_io(struct trans_logger_output *output, struct mars_
struct trans_logger_mars_ref_aspect *mref_a;
struct trans_logger_input *input = output->brick->inputs[0];
CHECK_ATOMIC(&mref->ref_count, 1);
mref_a = trans_logger_mars_ref_get_aspect(output, mref);
if (unlikely(!mref_a)) {
MARS_FAT("cannot get mref_a on %p\n", mref);
@ -443,15 +482,31 @@ static void trans_logger_ref_io(struct trans_logger_output *output, struct mars_
}
// is this a shadow buffer?
if (mref_a->is_shadow) {
if (mref_a->shadow_ref) {
mref->ref_rw = rw;
if (rw == READ) {
// nothing to do: directly signal success.
struct generic_callback *cb = mref->ref_cb;
cb->cb_error = 0;
cb->cb_fn(cb);
trans_logger_ref_put(output, mref);
// no touch of ref_count necessary
} else {
#if 1
if (unlikely(mref_a->shadow_ref != mref_a)) {
MARS_ERR("something is wrong: %p != %p\n", mref_a->shadow_ref, mref_a);
}
CHECK_HEAD_EMPTY(&mref_a->hash_head);
CHECK_HEAD_EMPTY(&mref_a->q_head);
if (unlikely(mref->ref_flags & (MARS_REF_READING | MARS_REF_WRITING))) {
MARS_ERR("bad flags %d\n", mref->ref_flags);
}
#endif
mref->ref_flags |= MARS_REF_WRITING;
atomic_inc(&mref->ref_count); // paired with trans_logger_ref_put() in phase4_endio()
//MARS_INF("hashing %d at %lld\n", mref->ref_len, mref->ref_pos);
hash_insert(output->hash_table, mref_a);
q_insert(&output->q_phase1, mref_a);
wake_up(&output->event);
}
return;
}
@ -466,80 +521,343 @@ static void trans_logger_ref_io(struct trans_logger_output *output, struct mars_
////////////////////////////// worker thread //////////////////////////////
static void phase2_endio(struct generic_callback *cb)
{
//struct trans_logger_mars_ref_aspect *mref_a = cb->cb_private;
//struct mars_ref_object *mref = mref_a->object;
struct generic_callback *cb_prev = cb->cb_prev;
if (unlikely(!cb_prev)) {
MARS_FAT("callback chain is corrupted\n");
return;
}
cb_prev->cb_error = cb->cb_error;
cb_prev->cb_fn(cb_prev);
//trans_logger_ref_put(mref_a->output, mref);
}
static void phase2_startio(struct trans_logger_mars_ref_aspect *mref_a)
{
struct mars_ref_object *mref = mref_a->object;
struct generic_callback *cb = &mref_a->cb;
struct trans_logger_output *output = mref_a->output;
struct trans_logger_input *input = output->brick->inputs[0];
cb->cb_fn = phase2_endio;
cb->cb_private = mref_a;
cb->cb_error = 0;
cb->cb_prev = mref->ref_cb;
mref->ref_cb = cb;
GENERIC_INPUT_CALL(input, mars_ref_io, mref, READ);
}
/*********************************************************************
* Phase 1: write transaction log entry for the original write request.
*/
static void phase1_endio(struct generic_callback *cb)
{
struct trans_logger_mars_ref_aspect *mref_a = cb->cb_private;
//struct mars_ref_object *mref = mref_a->object;
struct trans_logger_output *output = mref_a->output;
struct generic_callback *cb_prev = &mref_a->cb;
struct trans_logger_mars_ref_aspect *orig_mref_a;
struct mars_ref_object *orig_mref;
struct trans_logger_output *output;
struct generic_callback *orig_cb;
cb_prev->cb_error = cb->cb_error;
cb_prev->cb_fn(cb_prev);
q_insert(&output->q_phase2, mref_a);
if (unlikely(!cb)) {
MARS_FAT("invalid cb\n");
return;
}
orig_mref_a = cb->cb_private;
if (unlikely(!orig_mref_a)) {
MARS_FAT("invalid orig_mref_a\n");
return;
}
output = orig_mref_a->output;
if (unlikely(!output)) {
MARS_FAT("invalid output\n");
return;
}
orig_mref = orig_mref_a->object;
orig_cb = orig_mref->ref_cb;
if (unlikely(!orig_cb)) {
MARS_FAT("invalid orig_cb\n");
return;
}
// signal completion to the upper layer, as early as possible
orig_cb->cb_error = cb->cb_error;
if (likely(cb->cb_error >= 0)) {
orig_mref->ref_flags &= ~MARS_REF_WRITING;
orig_mref->ref_flags |= MARS_REF_UPTODATE;
}
orig_cb->cb_fn(orig_cb);
// queue up for the next phase
q_insert(&output->q_phase2, orig_mref_a);
wake_up(&output->event);
}
static bool phase1_startio(struct trans_logger_mars_ref_aspect *mref_a)
static bool phase1_startio(struct trans_logger_mars_ref_aspect *orig_mref_a)
{
struct mars_ref_object *mref = mref_a->object;
struct trans_logger_output *output = mref_a->output;
struct mars_ref_object *orig_mref = orig_mref_a->object;
struct trans_logger_output *output = orig_mref_a->output;
struct trans_logger_input *input = output->brick->inputs[1];
struct log_header l = {
.l_stamp = mref_a->stamp,
.l_pos = mref->ref_pos,
.l_len = mref->ref_len,
.l_stamp = orig_mref_a->stamp,
.l_pos = orig_mref->ref_pos,
.l_len = orig_mref->ref_len,
.l_code = CODE_WRITE_NEW,
};
void *data;
bool ok;
#if 1
if (!orig_mref->ref_cb)
MARS_ERR("missing ref_cb\n");
#endif
data = log_reserve(input, &l);
if (unlikely(!data)) {
return false;
}
memcpy(data, orig_mref->ref_data, orig_mref->ref_len);
ok = log_finalize(input, orig_mref->ref_len, phase1_endio, orig_mref_a);
if (unlikely(!ok)) {
return false;
}
return true;
}
/*********************************************************************
* Phase 2: read original version of data.
* This happens _after_ phase 1, deliberately.
* We are explicitly dealing with old and new versions.
* The new version is hashed in memory all the time (such that parallel
* READs will see them), so we hvae plenty of time for getting the
* old version from disk somewhen later, e.g. when IO contention is low.
*/
static void phase2_endio(struct generic_callback *cb)
{
struct trans_logger_mars_ref_aspect *sub_mref_a = cb->cb_private;
struct trans_logger_output *output = sub_mref_a->output;
if (unlikely(cb->cb_error < 0)) {
MARS_FAT("IO error %d\n", cb->cb_error);
return;
}
// queue up for the next phase
#if 0
q_insert(&output->q_phase3, sub_mref_a);
#else
q_insert(&output->q_phase4, sub_mref_a);
#endif
wake_up(&output->event);
}
static bool phase2_startio(struct trans_logger_mars_ref_aspect *orig_mref_a)
{
struct mars_ref_object *orig_mref = orig_mref_a->object;
struct trans_logger_output *output = orig_mref_a->output;
struct trans_logger_input *input = output->brick->inputs[0];
struct mars_ref_object *sub_mref;
struct trans_logger_mars_ref_aspect *sub_mref_a;
struct generic_callback *cb;
int status;
/* allocate internal sub_mref for further work
*/
sub_mref = trans_logger_alloc_mars_ref(&input->hidden_output, &input->ref_object_layout);
if (unlikely(!sub_mref)) {
MARS_FAT("cannot alloc sub_mref\n");
return false;
}
sub_mref->ref_pos = orig_mref->ref_pos;
sub_mref->ref_len = orig_mref->ref_len;
sub_mref->ref_may_write = WRITE;
sub_mref_a = trans_logger_mars_ref_get_aspect(&input->hidden_output, sub_mref);
if (unlikely(!sub_mref_a)) {
MARS_FAT("cannot get my own mref_a\n");
return false;
}
sub_mref_a->orig_mref_a = orig_mref_a;
sub_mref_a->output = output;
CHECK_ATOMIC(&orig_mref->ref_count, 1);
status = GENERIC_INPUT_CALL(input, mars_ref_get, sub_mref);
if (unlikely(status < 0)) {
MARS_FAT("cannot get my own mref_a\n");
return false;
}
if (unlikely(sub_mref->ref_len < orig_mref->ref_len)) {
MARS_ERR("NYI: multiple sub-IOs\n");
}
cb = &sub_mref_a->cb;
cb->cb_fn = phase2_endio;
cb->cb_private = sub_mref_a;
cb->cb_error = 0;
cb->cb_prev = NULL;
sub_mref->ref_cb = cb;
GENERIC_INPUT_CALL(input, mars_ref_io, sub_mref, READ);
return true;
}
/*********************************************************************
* Phase 3: log the old disk version.
*/
/* Completion callback for the phase 3 log write of the old data:
 * on success, hand the request over to phase 4 and wake the logger.
 */
static void phase3_endio(struct generic_callback *cb)
{
	struct trans_logger_mars_ref_aspect *a = cb->cb_private;

	if (likely(cb->cb_error >= 0)) {
		/* queue up for the next phase */
		q_insert(&a->output->q_phase4, a);
		wake_up(&a->output->event);
	} else {
		MARS_FAT("IO error %d\n", cb->cb_error);
	}
}
/* Phase 3: append the OLD data version (read in phase 2) to the
 * transaction log (input 1), tagged CODE_WRITE_OLD.
 * Returns false when log space cannot be reserved or finalized, so the
 * logger thread can push the request back for a retry.
 *
 * NOTE: a botched patch import had left stale duplicates of the
 * memcpy/log_finalize lines referencing the undefined 'mref'/'mref_a'
 * (and the wrong phase1_endio callback); only the sub_mref-based lines
 * are kept here.
 */
static bool phase3_startio(struct trans_logger_mars_ref_aspect *sub_mref_a)
{
	struct mars_ref_object *sub_mref = sub_mref_a->object;
	struct trans_logger_output *output = sub_mref_a->output;
	struct trans_logger_input *input = output->brick->inputs[1];
	struct log_header l = {
		.l_stamp = sub_mref_a->stamp,
		.l_pos = sub_mref->ref_pos,
		.l_len = sub_mref->ref_len,
		.l_code = CODE_WRITE_OLD,
	};
	void *data;
	bool ok;

	data = log_reserve(input, &l);
	if (unlikely(!data)) {
		return false;
	}
	memcpy(data, sub_mref->ref_data, sub_mref->ref_len);
	ok = log_finalize(input, sub_mref->ref_len, phase3_endio, sub_mref_a);
	if (unlikely(!ok)) {
		return false;
	}
	return true;
}
/*********************************************************************
* Phase 4: overwrite old disk version with new version.
*/
/* Completion callback for the phase 4 writeback of the new data.
 * Drops the reference held on the original mref; the sub_mref itself
 * was already put in phase4_startio right after submission.
 */
static void phase4_endio(struct generic_callback *cb)
{
	struct trans_logger_mars_ref_aspect *sub_mref_a = cb->cb_private;
	struct trans_logger_mars_ref_aspect *orig_mref_a;
	struct mars_ref_object *orig_mref;
	if (unlikely(cb->cb_error < 0)) {
		/* NOTE(review): on error the original reference is never
		 * dropped — the request leaks. TODO confirm intended. */
		MARS_FAT("IO error %d\n", cb->cb_error);
		return;
	}
	//MARS_INF("DONE.\n");
	orig_mref_a = sub_mref_a->orig_mref_a;
	if (unlikely(!orig_mref_a)) {
		MARS_FAT("bad orig_mref_a\n");
		return;
	}
	orig_mref = orig_mref_a->object;
	if (unlikely(!orig_mref->ref_pos)) {
		/* NOTE(review): treats ref_pos == 0 as invalid; a legitimate
		 * write at position 0 would trip this check — TODO confirm. */
		MARS_FAT("bad ref_pos\n");
		return;
	}
#if 1	/* paranoia: sanity check on the refcount before the put below */
	_CHECK_ATOMIC(&orig_mref->ref_count, >, 1);
#endif
	//MARS_INF("put ORIGREF.\n");
	trans_logger_ref_put(orig_mref_a->output, orig_mref);
}
/* Phase 4: overwrite the old disk version with the new data.
 * Copies the new data from the original request into the sub_mref
 * buffer and submits it as a WRITE to the underlying device (input 0).
 * Returns false (for retry by the logger thread) on missing backlink
 * or bad position.
 */
static bool phase4_startio(struct trans_logger_mars_ref_aspect *sub_mref_a)
{
	struct mars_ref_object *sub_mref = sub_mref_a->object;
	struct generic_callback *cb = &sub_mref_a->cb;
	struct trans_logger_output *output = sub_mref_a->output;
	struct trans_logger_input *input = output->brick->inputs[0];
	struct trans_logger_mars_ref_aspect *orig_mref_a = sub_mref_a->orig_mref_a;
	struct mars_ref_object *orig_mref;
	if (unlikely(!orig_mref_a)) {
		MARS_FAT("bad orig_mref_a\n");
		return false;
	}
	orig_mref = orig_mref_a->object;
	if (unlikely(!orig_mref->ref_pos)) {
		/* NOTE(review): position 0 treated as invalid — TODO confirm */
		MARS_FAT("bad ref_pos\n");
		return false;
	}
	/* place the new data into the buffer that will hit the disk */
	memcpy(sub_mref->ref_data, orig_mref->ref_data, sub_mref->ref_len);
	cb->cb_fn = phase4_endio;
	cb->cb_private = sub_mref_a;
	cb->cb_error = 0;
	cb->cb_prev = NULL;
	sub_mref->ref_cb = cb;
	GENERIC_INPUT_CALL(input, mars_ref_io, sub_mref, WRITE);
	/* drop our own reference on sub_mref right after submission;
	 * presumably mars_ref_io holds its own reference while the IO is
	 * in flight — TODO confirm, otherwise this is a use-after-put. */
	//MARS_INF("put SUBREF.\n");
	GENERIC_INPUT_CALL(input, mars_ref_put, sub_mref);
	return true;
}
/*********************************************************************
* The logger thread.
* There is only a single instance, dealing with all requests in parallel.
* So there is less need for locking (concept stolen from microkernel
* architectures).
*/
/* The logger main loop: drains the four phase queues in order.
 * A failed startio is pushed back onto its queue and retried on a
 * later iteration. Endio callbacks refill the queues and signal
 * output->event; the HZ timeout is a safety net against lost wakeups.
 */
static int trans_logger_thread(void *data)
{
	struct trans_logger_output *output = data;
	MARS_INF("logger has started.\n");
	while (!kthread_should_stop()) {
		struct trans_logger_mars_ref_aspect *mref_a;
		/* The wakeup condition must cover every queue drained below.
		 * q_phase4 was missing before, so work queued there (while
		 * all other queues were empty) had to wait for the HZ
		 * timeout instead of being serviced immediately.
		 */
		wait_event_interruptible_timeout(
			output->event,
			!list_empty(&output->q_phase1.q_anchor) ||
			!list_empty(&output->q_phase2.q_anchor) ||
			!list_empty(&output->q_phase3.q_anchor) ||
			!list_empty(&output->q_phase4.q_anchor),
			HZ);
		mref_a = q_fetch(&output->q_phase1);
		if (mref_a) {
			bool ok;
			//MARS_INF("got phase1 %p\n", mref_a);
			ok = phase1_startio(mref_a);
			if (!ok) {
				q_pushback(&output->q_phase1, mref_a);
			}
		}
		mref_a = q_fetch(&output->q_phase2);
		if (mref_a) {
			bool ok;
			//MARS_INF("got phase2 %p\n", mref_a);
			ok = phase2_startio(mref_a);
			if (!ok) {
				q_pushback(&output->q_phase2, mref_a);
			}
		}
		mref_a = q_fetch(&output->q_phase3);
		if (mref_a) {
			bool ok;
			//MARS_INF("got phase3 %p\n", mref_a);
			ok = phase3_startio(mref_a);
			if (!ok) {
				q_pushback(&output->q_phase3, mref_a);
			}
		}
		mref_a = q_fetch(&output->q_phase4);
		if (mref_a) {
			bool ok;
			//MARS_INF("got phase4 %p\n", mref_a);
			ok = phase4_startio(mref_a);
			if (!ok) {
				q_pushback(&output->q_phase4, mref_a);
			}
		}
	}
	return 0;
}
//////////////// object / aspect constructors / destructors ///////////////
static int trans_logger_mars_ref_aspect_init_fn(struct generic_aspect *_ini, void *_init_data)
@ -547,7 +865,6 @@ static int trans_logger_mars_ref_aspect_init_fn(struct generic_aspect *_ini, voi
struct trans_logger_mars_ref_aspect *ini = (void*)_ini;
INIT_LIST_HEAD(&ini->hash_head);
INIT_LIST_HEAD(&ini->q_head);
atomic_set(&ini->hash_count, 1);
return 0;
}
/* @@ -569,15 +886,25 @@ static int trans_logger_brick_construct(struct trans_logger_brick *brick) */
/* Initialize one trans_logger output: the hash table for shadowed
 * writes, the event waitqueue, the four phase queues, and the single
 * logger thread that drains them.
 * Returns 0 on success or the kthread_create() error code.
 */
static int trans_logger_output_construct(struct trans_logger_output *output)
{
	static int index = 0;	/* NOTE(review): unlocked; assumes outputs are constructed serially */
	int i;
	for (i = 0; i < TRANS_HASH_MAX; i++) {
		struct hash_anchor *start = &output->hash_table[i];
		rwlock_init(&start->hash_lock);
		INIT_LIST_HEAD(&start->hash_anchor);
	}
	init_waitqueue_head(&output->event);
	q_init(&output->q_phase1);
	q_init(&output->q_phase2);
	q_init(&output->q_phase3);
	q_init(&output->q_phase4);
	/* create (but do not start) the thread only after all data
	 * structures it uses are ready */
	output->thread = kthread_create(trans_logger_thread, output, "mars_logger%d", index++);
	if (IS_ERR(output->thread)) {
		int error = PTR_ERR(output->thread);
		MARS_ERR("cannot create thread, status=%d\n", error);
		return error;
	}
	wake_up_process(output->thread);
	return 0;
}
/* ---- beginning of the trans_logger.h fragment (separate file in the
 * original commit; "View File" marker and hunk header from the patch
 * rendering commented out below) ---- */
/* @@ -33,12 +33,12 @@ */
struct trans_logger_mars_ref_aspect {
GENERIC_ASPECT(mars_ref);
struct list_head hash_head;
struct list_head q_head;
atomic_t hash_count;
bool is_shadow;
struct trans_logger_mars_ref_aspect *shadow_ref;
void *orig_data;
struct trans_logger_output *output;
struct timespec stamp;
struct generic_callback cb;
struct trans_logger_mars_ref_aspect *orig_mref_a;
};
struct trans_logger_brick {
/* @@ -48,10 +48,13 @@ struct trans_logger_brick { -- NOTE(review): the
 * trans_logger_brick body above is truncated by this hunk marker */
struct trans_logger_output {
MARS_OUTPUT(trans_logger);
struct hash_anchor hash_table[TRANS_HASH_MAX];
struct task_struct *thread;
wait_queue_head_t event;
// queues
struct logger_queue q_phase1;
struct logger_queue q_phase2;
struct logger_queue q_phase3;
struct logger_queue q_phase4;
};
struct trans_logger_input {