import mars-48.tgz

Thomas Schoebel-Theuer 2010-11-26 14:45:10 +01:00
parent 95e2e2d391
commit 8d307464da
21 changed files with 3486 additions and 797 deletions

Kconfig (21 changes)

@ -14,6 +14,13 @@ config MARS_DUMMY
---help---
Experimental storage system.
config MARS_CHECK
tristate "MARS check brick"
depends on MARS
default m
---help---
Experimental storage system.
config MARS_IF_DEVICE
tristate "interface to a linux device"
depends on MARS
@ -21,6 +28,13 @@ config MARS_IF_DEVICE
---help---
Experimental storage system.
config MARS_DEVICE_AIO
tristate "interface to a linux file"
depends on MARS && AIO
default m
---help---
Experimental storage system.
config MARS_DEVICE_SIO
tristate "interface to a linux file, synchronous"
depends on MARS
@ -35,6 +49,13 @@ config MARS_BUF
---help---
Experimental storage system.
config MARS_USEBUF
tristate "usebuf brick"
depends on MARS
default m
---help---
Experimental storage system.
config MARS_TRANS_LOGGER
tristate "transaction logger"
depends on MARS

Makefile

@ -2,11 +2,14 @@
# Makefile for MARS
#
obj-$(CONFIG_MARS) += brick.o mars_generic.o mars_check.o
obj-$(CONFIG_MARS) += brick.o mars_generic.o
obj-$(CONFIG_MARS_DUMMY) += mars_dummy.o
obj-$(CONFIG_MARS_CHECK) += mars_check.o
obj-$(CONFIG_MARS_IF_DEVICE) += mars_if_device.o
obj-$(CONFIG_MARS_DEVICE_AIO) += mars_device_aio.o
obj-$(CONFIG_MARS_DEVICE_SIO) += mars_device_sio.o
obj-$(CONFIG_MARS_BUF) += mars_buf.o mars_usebuf.o
obj-$(CONFIG_MARS_BUF) += mars_buf.o
obj-$(CONFIG_MARS_USEBUF) += mars_usebuf.o
obj-$(CONFIG_MARS_TRANS_LOGGER) += mars_trans_logger.o
obj-$(CONFIG_MARS_TEST) += mars_test.o

brick.c (10 changes)

@ -308,7 +308,7 @@ int generic_add_aspect(struct generic_output *output, struct generic_object_layo
return -EBADF;
}
min_offset = aspect_layout->aspect_offset + aspect_type->aspect_size;
if (unlikely(object_layout->object_size > min_offset)) {
BRICK_ERR("overlapping aspects %d > %d (aspect_type=%s)\n", object_layout->object_size, min_offset, aspect_type->aspect_type_name);
return -ENOMEM;
}
@ -316,11 +316,11 @@ int generic_add_aspect(struct generic_output *output, struct generic_object_layo
object_layout->object_size = min_offset;
} else {
/* first call: initialize aspect_layout. */
BRICK_DBG("initializing aspect_type %s on object_layout %p\n", aspect_type->aspect_type_name, object_layout);
aspect_layout->aspect_type = aspect_type;
aspect_layout->init_data = output;
aspect_layout->aspect_offset = object_layout->object_size;
object_layout->object_size += aspect_type->aspect_size;
BRICK_DBG("initializing aspect_type %s on object_layout %p, object_size=%d\n", aspect_type->aspect_type_name, object_layout, object_layout->object_size);
}
nr = object_layout->aspect_count++;
object_layout->aspect_layouts[nr] = aspect_layout;
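generic_add_aspect() packs per-output private state ("aspects") behind each generic object: the first call for an aspect type reserves a slot at the current object_size, later calls merely re-validate that the slot still fits. A standalone sketch of the offset arithmetic, with simplified types (not code from this commit):
struct layout { int object_size; };
struct aspect_slot { int aspect_offset; int aspect_size; };
/* Sketch: reserve an aspect slot at the end of the object.
 * The object grows by aspect_size; the slot records where it starts. */
static void reserve_aspect(struct layout *ol, struct aspect_slot *slot, int size)
{
        slot->aspect_offset = ol->object_size;
        slot->aspect_size = size;
        ol->object_size += size;
}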
@ -382,7 +382,8 @@ int default_init_object_layout(struct generic_output *output, struct generic_obj
goto done;
}
BRICK_INF("OK, object_layout %s init succeeded.\n", object_type->object_type_name);
BRICK_INF("OK, object_layout %s init succeeded (size = %d).\n", object_type->object_type_name, object_layout->object_size);
done:
return status;
}
@ -482,8 +483,9 @@ ok:
#if 1
{
int count = atomic_read(&object_layout->alloc_count);
if (count >= object_layout->last_count + 1000) {
if (count >= object_layout->last_count + 1000 || ((int)jiffies - object_layout->last_jiffies) >= 30 * HZ) {
object_layout->last_count = count;
object_layout->last_jiffies = jiffies;
BRICK_INF("pool %s/%p/%s alloc=%d free=%d\n", object_layout->object_type->object_type_name, object_layout, object_layout->module_name, count, atomic_read(&object_layout->free_count));
}
}
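The new condition above reports at most once per 1000 allocations or once every 30 seconds. Computing the age as a signed difference, (int)jiffies - last_jiffies, keeps the comparison correct across jiffies wraparound. A minimal standalone sketch of the same throttling pattern (hypothetical names, not from this commit):
#include <linux/types.h>
#include <linux/jiffies.h>
static int last_report_jiffies;         /* int on purpose, mirroring the diff */
/* Sketch: allow a report at most every 30 seconds. */
static bool may_report(void)
{
        if ((int)jiffies - last_report_jiffies >= 30 * HZ) {
                last_report_jiffies = jiffies;  /* restart the interval */
                return true;
        }
        return false;
}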

brick.h

@ -74,6 +74,7 @@ struct generic_object_type {
int aspect_max; \
int object_size; \
int last_count; \
int last_jiffies; \
atomic_t alloc_count; \
atomic_t free_count; \
spinlock_t free_lock; \

mars.h (21 changes)

@ -6,7 +6,8 @@
#include <asm/spinlock.h>
#include <asm/atomic.h>
#define MARS_DELAY msleep(30000)
//#define MARS_DELAY /**/
#define MARS_DELAY msleep(20000)
#define MARS_FATAL "MARS_FATAL " __BASE_FILE__ ": "
#define MARS_ERROR "MARS_ERROR " __BASE_FILE__ ": "
@ -61,9 +62,8 @@ struct mars_ref_object_layout {
loff_t ref_pos; \
int ref_len; \
int ref_may_write; \
void *ref_data; /* preset to NULL for buffered IO */ \
/* maintained by the ref implementation, readable for callers */ \
struct bio *orig_bio; \
void *ref_data; \
int ref_flags; \
int ref_rw; \
/* maintained by the ref implementation, incrementable for \
@ -201,21 +201,6 @@ static const struct generic_aspect_type *BRICK##_aspect_types[BRICK_OBJ_NR] = {
#define CHECK_ATOMIC(atom,minval) \
_CHECK_ATOMIC(atom, <, minval)
static inline void mars_ref_attach_bio(struct mars_ref_object *mref, struct bio *bio)
{
int test;
if (unlikely(mref->orig_bio)) {
MARS_ERR("attaching a bio twice!\n");
}
test = atomic_read(&mref->ref_count);
if (unlikely(test != 0)) {
MARS_ERR("bad ref_count %d\n", test);
}
mref->orig_bio = bio;
mref->ref_pos = -1;
atomic_set(&mref->ref_count, 1);
}
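With mars_ref_attach_bio() removed above, the generic mref no longer carries a bio; callers select unbuffered IO by presetting ref_data before the get call. A hedged fragment showing the new convention (GENERIC_INPUT_CALL and the ref_* fields are from the diff; input, my_buffer and the error handling are assumptions):
/* Sketch: issue unbuffered IO through an mref. A non-NULL ref_data
 * makes bricks like mars_buf take their unbuffered shortcut and
 * forward the request downstream. */
mref->ref_pos = pos;
mref->ref_len = len;
mref->ref_may_write = rw;
mref->ref_data = my_buffer;     /* preset => unbuffered IO */
status = GENERIC_INPUT_CALL(input, mars_ref_get, mref);
if (status >= 0)
        GENERIC_INPUT_CALL(input, mars_ref_io, mref, rw);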
#define CHECK_HEAD_EMPTY(head) \
if (unlikely(!list_empty(head))) { \
INIT_LIST_HEAD(head); \

mars_buf.c

@ -8,7 +8,6 @@
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/string.h>
#include <linux/bio.h>
#include <linux/blkdev.h>
#include <linux/delay.h>
@ -204,133 +203,6 @@ static inline int get_info(struct buf_brick *brick)
return status;
}
/* Convert from arbitrary/odd kernel address/length to struct page,
* create bio from it, round up/down to full sectors.
* return the length (may be smaller or even larger than requested)
*/
static int make_bio(struct buf_brick *brick, struct bio **_bio, void *data, loff_t pos, int len)
{
unsigned long long sector;
int sector_offset;
int data_offset;
int page_offset;
int page_len;
int bvec_count;
int ilen = len;
int status;
int i;
struct page *page;
struct bio *bio = NULL;
struct block_device *bdev;
status = -EINVAL;
CHECK_PTR(brick, out);
bdev = brick->bdev;
if (unlikely(!bdev)) {
struct request_queue *q;
status = get_info(brick);
if (status < 0)
goto out;
status = -EINVAL;
CHECK_PTR(brick->base_info.backing_file, out);
CHECK_PTR(brick->base_info.backing_file->f_mapping, out);
CHECK_PTR(brick->base_info.backing_file->f_mapping->host, out);
CHECK_PTR(brick->base_info.backing_file->f_mapping->host->i_sb, out);
bdev = brick->base_info.backing_file->f_mapping->host->i_sb->s_bdev;
if (!bdev && S_ISBLK(brick->base_info.backing_file->f_mapping->host->i_mode)) {
bdev = brick->base_info.backing_file->f_mapping->host->i_bdev;
}
CHECK_PTR(bdev, out);
brick->bdev = bdev;
q = bdev_get_queue(bdev);
CHECK_PTR(q, out);
brick->bvec_max = queue_max_hw_sectors(q) >> (PAGE_SHIFT - 9);
}
if (unlikely(ilen <= 0)) {
MARS_ERR("bad bio len %d\n", ilen);
status = -EINVAL;
goto out;
}
sector = pos >> 9; // TODO: make dynamic
sector_offset = pos & ((1 << 9) - 1); // TODO: make dynamic
data_offset = ((unsigned long)data) & ((1 << 9) - 1); // TODO: make dynamic
if (unlikely(sector_offset != data_offset)) {
MARS_ERR("bad alignment: offset %d != %d\n", sector_offset, data_offset);
}
// round down to start of first sector
data -= sector_offset;
ilen += sector_offset;
pos -= sector_offset;
// round up to full sector
ilen = (((ilen - 1) >> 9) + 1) << 9; // TODO: make dynamic
// map onto pages. TODO: allow higher-order pages (performance!)
page_offset = pos & (PAGE_SIZE - 1);
page_len = ilen + page_offset;
bvec_count = (page_len - 1) / PAGE_SIZE + 1;
if (bvec_count > brick->bvec_max)
bvec_count = brick->bvec_max;
bio = bio_alloc(GFP_MARS, bvec_count);
status = -ENOMEM;
if (!bio)
goto out;
status = 0;
for (i = 0; i < bvec_count && ilen > 0; i++) {
int myrest = PAGE_SIZE - page_offset;
int mylen = ilen;
if (mylen > myrest)
mylen = myrest;
page = virt_to_page(data);
if (!page)
goto out;
bio->bi_io_vec[i].bv_page = page;
bio->bi_io_vec[i].bv_len = mylen;
bio->bi_io_vec[i].bv_offset = page_offset;
data += mylen;
ilen -= mylen;
status += mylen;
page_offset = 0;
//MARS_INF("page_offset=%d mylen=%d (new len=%d, new status=%d)\n", page_offset, mylen, ilen, status);
}
if (unlikely(ilen != 0)) {
bio_put(bio);
bio = NULL;
MARS_ERR("computation of bvec_count %d was wrong, diff=%d\n", bvec_count, ilen);
status = -EIO;
goto out;
}
bio->bi_vcnt = i;
bio->bi_idx = 0;
bio->bi_size = status;
bio->bi_sector = sector;
bio->bi_bdev = bdev;
bio->bi_private = NULL; // must be filled in later
bio->bi_end_io = NULL; // must be filled in later
bio->bi_rw = 0; // must be filled in later
// ignore rounding on return
if (status > len)
status = len;
out:
*_bio = bio;
if (status < 0)
MARS_ERR("error %d\n", status);
return status;
}
static inline struct buf_head *_alloc_bf(struct buf_brick *brick)
{
struct buf_head *bf = kzalloc(sizeof(struct buf_head), GFP_MARS);
@ -394,10 +266,6 @@ static int buf_ref_get(struct buf_output *output, struct mars_ref_object *mref)
might_sleep();
if (unlikely(mref->orig_bio)) {
MARS_ERR("illegal: mref has a bio assigned\n");
}
#ifdef PRE_ALLOC
if (unlikely(atomic_read(&brick->alloc_count) < brick->max_count)) {
// grab all memory in one go => avoid memory fragmentation
@ -409,6 +277,14 @@ static int buf_ref_get(struct buf_output *output, struct mars_ref_object *mref)
_CHECK_ATOMIC(&mref->ref_count, !=, 0);
atomic_inc(&mref->ref_count);
/* shortcut in case of unbuffered IO
*/
if (mref->ref_data) {
/* Note: unbuffered IO is later indicated by rfa_bf == NULL
*/
return 0;
}
mref_a = buf_mars_ref_get_aspect(output, mref);
if (unlikely(!mref_a))
goto done;
@ -429,7 +305,7 @@ again:
#if 1
loff_t end_pos = bf->bf_pos + brick->backing_size;
if (mref->ref_pos < bf->bf_pos || mref->ref_pos >= end_pos) {
MARS_ERR("hash value corruption. %lld not in (%lld ... %lld)\n", mref->ref_pos, bf->bf_pos, end_pos);
MARS_ERR("hash corruption. %lld not in (%lld ... %lld)\n", mref->ref_pos, bf->bf_pos, end_pos);
}
#endif
atomic_inc(&brick->hit_count);
@ -495,8 +371,7 @@ again:
new->bf_flags |= MARS_REF_UPTODATE;
}
atomic_set(&new->bf_count, 1);
new->bf_bio_status = 0;
atomic_set(&new->bf_bio_count, 0);
new->bf_error = 0;
//INIT_LIST_HEAD(&new->bf_mref_anchor);
//INIT_LIST_HEAD(&new->bf_lru_head);
INIT_LIST_HEAD(&new->bf_hash_head);
@ -518,7 +393,8 @@ again:
CHECK_ATOMIC(&mref->ref_count, 1);
return mref->ref_len;
//return mref->ref_len;
status = 0;
done:
return status;
@ -561,21 +437,27 @@ static void __bf_put(struct buf_head *bf)
traced_unlock(&brick->brick_lock, flags);
}
static void _buf_ref_put(struct buf_mars_ref_aspect *mref_a)
static void _buf_ref_put(struct buf_output *output, struct buf_mars_ref_aspect *mref_a)
{
struct mars_ref_object *mref = mref_a->object;
struct buf_head *bf;
/* shortcut in case of unbuffered IO
*/
bf = mref_a->rfa_bf;
if (!bf) {
struct buf_brick *brick = output->brick;
GENERIC_INPUT_CALL(brick->inputs[0], mars_ref_put, mref);
return;
}
CHECK_ATOMIC(&mref->ref_count, 1);
if (!atomic_dec_and_test(&mref->ref_count))
return;
bf = mref_a->rfa_bf;
if (bf) {
MARS_DBG("buf_ref_put() mref=%p mref_a=%p bf=%p\n", mref, mref_a, bf);
__bf_put(bf);
}
MARS_DBG("buf_ref_put() mref=%p mref_a=%p bf=%p\n", mref, mref_a, bf);
__bf_put(bf);
buf_free_mars_ref(mref);
}
@ -588,39 +470,15 @@ static void buf_ref_put(struct buf_output *output, struct mars_ref_object *mref)
MARS_FAT("cannot get aspect\n");
return;
}
_buf_ref_put(mref_a);
_buf_ref_put(output, mref_a);
}
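Unbuffered mrefs (rfa_bf == NULL) are now forwarded straight to the sub-brick, while buffered ones keep the two-level accounting: ref_count pins the mref, bf_count pins the cache buffer. Condensed, the put path reads (simplified from the code above):
/* Sketch of _buf_ref_put(), condensed. */
if (!mref_a->rfa_bf) {          /* unbuffered: pass through */
        GENERIC_INPUT_CALL(brick->inputs[0], mars_ref_put, mref);
        return;
}
if (!atomic_dec_and_test(&mref->ref_count))
        return;                 /* other holders remain */
__bf_put(mref_a->rfa_bf);       /* release the buffer pin */
buf_free_mars_ref(mref);        /* last holder frees the mref */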
static void _buf_endio(struct generic_callback *cb)
{
struct buf_mars_ref_aspect *mref_a = cb->cb_private;
struct mars_ref_object *mref = mref_a->object;
int error = cb->cb_error;
struct bio *bio = mref->orig_bio;
static void _buf_endio(struct generic_callback *cb);
MARS_DBG("_buf_endio() mref=%p bio=%p\n", mref, bio);
if (bio) {
if (error < 0) {
MARS_ERR("_buf_endio() error=%d bi_size=%d\n", error, bio->bi_size);
}
if (error > 0)
error = 0;
bio_endio(bio, error);
bio_put(bio);
} else {
//...
}
}
static void _buf_bio_callback(struct bio *bio, int code);
static int _buf_make_bios(struct buf_brick *brick, struct buf_head *bf, void *start_data, loff_t start_pos, int start_len, int rw)
static int _buf_make_io(struct buf_brick *brick, struct buf_head *bf, void *start_data, loff_t start_pos, int start_len, int rw)
{
struct buf_input *input;
LIST_HEAD(tmp);
int status = EINVAL;
int iters = 0;
#if 1
loff_t bf_end = bf->bf_pos + brick->backing_size;
loff_t end_pos;
@ -642,11 +500,14 @@ static int _buf_make_bios(struct buf_brick *brick, struct buf_head *bf, void *st
goto done;
}
#endif
atomic_set(&bf->bf_io_count, 0);
status = -ENOMEM;
input = brick->inputs[0];
while (start_len > 0) {
struct mars_ref_object *mref;
struct buf_mars_ref_aspect *mref_a;
struct bio *bio = NULL;
int len;
mref = buf_alloc_mars_ref(brick->outputs[0], &brick->mref_object_layout);
@ -659,136 +520,83 @@ static int _buf_make_bios(struct buf_brick *brick, struct buf_head *bf, void *st
break;
}
list_add(&mref_a->tmp_head, &tmp);
mref_a->rfa_bf = bf;
mref_a->cb.cb_fn = _buf_endio;
mref_a->cb.cb_private = mref_a;
mref_a->cb.cb_error = 0;
mref_a->cb.cb_prev = NULL;
len = make_bio(brick, &bio, start_data, start_pos, start_len);
if (unlikely(len < 0)) {
status = len;
break;
}
if (unlikely(len == 0 || !bio)) {
status = -EIO;
//buf_free_mars_ref(mref);
break;
}
bio->bi_private = mref_a;
bio->bi_end_io = _buf_bio_callback;
bio->bi_rw = rw;
mref->ref_cb = &mref_a->cb;
mars_ref_attach_bio(mref, bio);
mref->ref_pos = start_pos;
mref->ref_len = start_len;
mref->ref_may_write = rw;
mref->ref_data = start_data;
start_data += len;
start_pos += len;
start_len -= len;
iters++;
}
if (likely(!start_len))
status = 0;
#if 1
else {
MARS_ERR("start_len %d != 0 (error %d)\n", start_len, status);
}
if (iters != 1) {
MARS_INF("start_pos=%lld start_len=%d iters=%d, status=%d\n", start_pos, start_len, iters, status);
}
iters = 0;
#endif
input = brick->inputs[0];
while (!list_empty(&tmp)) {
struct mars_ref_object *mref;
struct buf_mars_ref_aspect *mref_a;
struct generic_callback *cb;
mref_a = container_of(tmp.next, struct buf_mars_ref_aspect, tmp_head);
mref = mref_a->object;
list_del_init(&mref_a->tmp_head);
iters++;
cb = mref->ref_cb;
if (status < 0) { // clean up
MARS_ERR("reporting error %d\n", status);
cb->cb_error = status;
cb->cb_fn(cb);
#if 0
if (mref->orig_bio)
bio_put(mref->orig_bio);
#endif
buf_free_mars_ref(mref);
continue;
status = GENERIC_INPUT_CALL(input, mars_ref_get, mref);
if (status < 0) {
MARS_ERR();
goto done;
}
/* Remember the number of bios we are submitting.
/* Remember number of fired-off mrefs
*/
CHECK_ATOMIC(&bf->bf_bio_count, 0);
atomic_inc(&bf->bf_bio_count);
MARS_DBG("starting buf IO mref=%p bio=%p bf=%p bf_count=%d bf_bio_count=%d\n", mref, mref->orig_bio, bf, atomic_read(&bf->bf_count), atomic_read(&bf->bf_bio_count));
atomic_inc(&bf->bf_io_count);
len = mref->ref_len;
#ifndef FAKE_IO
GENERIC_INPUT_CALL(input, mars_ref_io, mref, rw);
#else
// fake IO for testing
cb->cb_error = status;
cb->cb_fn(cb);
#if 0
if (mref->orig_bio)
bio_put(mref->orig_bio);
mref_a->cb.cb_error = status;
mref_a->cb.cb_fn(&mref_a->cb);
#endif
buf_free_mars_ref(mref);
#endif
}
GENERIC_INPUT_CALL(input, mars_ref_put, mref);
start_data += len;
start_pos += len;
start_len -= len;
#if 1
if (iters != 1) {
MARS_INF("start_pos=%lld start_len=%d iters=%d, status=%d\n", start_pos, start_len, iters, status);
}
iters = 0;
if (start_len > 0)
MARS_ERR("cannot submit request in one go, rest=%d\n", start_len);
#endif
}
done:
return status;
}
/* This is called from the bio layer.
*/
static void _buf_bio_callback(struct bio *bio, int code)
static void _buf_endio(struct generic_callback *cb)
{
struct buf_mars_ref_aspect *mref_a;
struct buf_mars_ref_aspect *bf_mref_a = cb->cb_private;
struct mars_ref_object *bf_mref;
struct buf_head *bf;
struct buf_brick *brick;
LIST_HEAD(tmp);
int old_flags;
unsigned long flags;
void *start_data = NULL;
loff_t start_pos = 0;
int start_len = 0;
int old_flags;
unsigned long flags;
LIST_HEAD(tmp);
int error = cb->cb_error;
#if 1
int count = 0;
#endif
mref_a = bio->bi_private;
bf = mref_a->rfa_bf;
MARS_DBG("_buf_bio_callback() mref=%p bio=%p bf=%p bf_count=%d bf_bio_count=%d code=%d\n", mref_a->object, bio, bf, atomic_read(&bf->bf_count), atomic_read(&bf->bf_bio_count), code);
if (unlikely(code < 0)) {
MARS_ERR("BIO ERROR %d (old=%d)\n", code, bf->bf_bio_status);
// this can race, but we don't worry about the exact error code
bf->bf_bio_status = code;
}
CHECK_ATOMIC(&bf->bf_bio_count, 1);
if (!atomic_dec_and_test(&bf->bf_bio_count))
return;
MARS_DBG("_buf_bio_callback() ZERO_COUNT mref=%p bio=%p bf=%p code=%d\n", mref_a->object, bio, bf, code);
CHECK_PTR(bf_mref_a, err);
bf_mref = bf_mref_a->object;
MARS_DBG("_buf_endio() bf_mref_a=%p bf_mref=%p\n", bf_mref_a, bf_mref);
CHECK_PTR(bf_mref, err);
bf = bf_mref_a->rfa_bf;
CHECK_PTR(bf, err);
brick = bf->bf_brick;
CHECK_PTR(brick, err);
if (error < 0)
bf->bf_error = error;
// wait until all IO on this bf is completed.
if (!atomic_dec_and_test(&bf->bf_io_count))
return;
// get an extra reference, to avoid freeing bf underneath during callbacks
CHECK_ATOMIC(&bf->bf_count, 1);
@ -798,7 +606,7 @@ static void _buf_bio_callback(struct bio *bio, int code)
// update flags. this must be done before the callbacks.
old_flags = bf->bf_flags;
if (!bf->bf_bio_status && (old_flags & MARS_REF_READING)) {
if (bf->bf_error >= 0 && (old_flags & MARS_REF_READING)) {
bf->bf_flags |= MARS_REF_UPTODATE;
}
// clear the flags, callbacks must not see them. may be re-enabled later.
@ -823,6 +631,7 @@ static void _buf_bio_callback(struct bio *bio, int code)
while (!list_empty(&bf->bf_postpone_anchor)) {
struct buf_mars_ref_aspect *mref_a = container_of(bf->bf_postpone_anchor.next, struct buf_mars_ref_aspect, rfa_pending_head);
struct mars_ref_object *mref = mref_a->object;
if (mref_a->rfa_bf != bf) {
MARS_ERR("bad pointers %p != %p\n", mref_a->rfa_bf, bf);
}
@ -836,7 +645,7 @@ static void _buf_bio_callback(struct bio *bio, int code)
// re-enable flags
bf->bf_flags |= MARS_REF_WRITING;
bf->bf_bio_status = 0;
bf->bf_error = 0;
if (!start_len) {
// first time: only flush the affected area
@ -888,21 +697,23 @@ static void _buf_bio_callback(struct bio *bio, int code)
// update infos for callbacks, they may inspect it.
mref->ref_flags = bf->bf_flags;
cb->cb_error = bf->bf_bio_status;
cb->cb_error = bf->bf_error;
atomic_dec(&brick->nr_io_pending);
cb->cb_fn(cb);
_buf_ref_put(mref_a);
_buf_ref_put(brick->outputs[0], mref_a);
}
if (start_len) {
MARS_DBG("ATTENTION %d\n", start_len);
_buf_make_bios(brick, bf, start_data, start_pos, start_len, WRITE);
MARS_DBG("ATTENTION restart %d\n", start_len);
_buf_make_io(brick, bf, start_data, start_pos, start_len, WRITE);
}
// drop the extra reference from above
__bf_put(bf);
err:;
}
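Submission and completion now meet in bf_io_count: _buf_make_io() increments it once per fired-off sub-mref, and _buf_endio() returns early until the final completion arrives, so flag updates and callbacks run exactly once per buffer round-trip. The bare pattern looks like this (fire_sub_request() and finish_buffer() are hypothetical helpers):
/* Sketch: fan out N sub-requests, finish once. */
int i;
atomic_set(&bf->bf_io_count, 0);
for (i = 0; i < n; i++) {
        atomic_inc(&bf->bf_io_count);   /* one per sub-request */
        fire_sub_request(bf, i);        /* hypothetical submit helper */
}
/* in each sub-request's completion: */
if (!atomic_dec_and_test(&bf->bf_io_count))
        return;                         /* siblings still in flight */
finish_buffer(bf);                      /* runs exactly once */
Note that if a completion can overtake the next increment, the counter may hit zero early; a bias reference (increment once before the loop, drop it afterwards) would close that window.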
static void buf_ref_io(struct buf_output *output, struct mars_ref_object *mref, int rw)
@ -927,26 +738,26 @@ static void buf_ref_io(struct buf_output *output, struct mars_ref_object *mref,
MARS_ERR("internal problem: mref aspect does not work\n");
goto fatal;
}
/* shortcut in case of unbuffered IO
*/
bf = mref_a->rfa_bf;
if (!bf) {
GENERIC_INPUT_CALL(brick->inputs[0], mars_ref_io, mref, rw);
return;
}
/* Grab an extra reference.
* This will be released later in _buf_bio_callback() after
* This will be released later in _buf_endio() after
* calling the callbacks.
*/
CHECK_ATOMIC(&mref->ref_count, 1);
atomic_inc(&mref->ref_count);
bf = mref_a->rfa_bf;
if (unlikely(!bf)) {
MARS_ERR("internal problem: forgotten bf\n");
goto callback;
}
CHECK_ATOMIC(&bf->bf_count, 1);
if (rw != READ) {
loff_t end;
if (unlikely(mref->ref_may_write == READ)) {
MARS_ERR("sorry, forgotten to set ref_may_write\n");
MARS_ERR("sorry, you have forgotten to set ref_may_write\n");
goto callback;
}
if (unlikely(!(bf->bf_flags & MARS_REF_UPTODATE))) {
@ -986,7 +797,7 @@ static void buf_ref_io(struct buf_output *output, struct mars_ref_object *mref,
if (!(bf->bf_flags & MARS_REF_WRITING)) {
// by definition, a writeout buffer is always uptodate
bf->bf_flags |= (MARS_REF_WRITING | MARS_REF_UPTODATE);
bf->bf_bio_status = 0;
bf->bf_error = 0;
#if 1
start_data = mref->ref_data;
start_pos = mref->ref_pos;
@ -1009,7 +820,7 @@ static void buf_ref_io(struct buf_output *output, struct mars_ref_object *mref,
}
if (!(bf->bf_flags & MARS_REF_READING)) {
bf->bf_flags |= MARS_REF_READING;
bf->bf_bio_status = 0;
bf->bf_error = 0;
// always read the whole buffer.
start_data = (void*)((unsigned long)mref->ref_data & ~(unsigned long)(brick->backing_size - 1));
@ -1021,7 +832,7 @@ static void buf_ref_io(struct buf_output *output, struct mars_ref_object *mref,
}
mref->ref_flags = bf->bf_flags;
mref->ref_cb->cb_error = bf->bf_bio_status;
mref->ref_cb->cb_error = bf->bf_error;
if (likely(delay)) {
atomic_inc(&brick->nr_io_pending);
@ -1035,10 +846,10 @@ static void buf_ref_io(struct buf_output *output, struct mars_ref_object *mref,
goto no_callback;
}
status = _buf_make_bios(brick, bf, start_data, start_pos, start_len, rw);
status = _buf_make_io(brick, bf, start_data, start_pos, start_len, rw);
if (likely(status >= 0)) {
/* No immediate callback, this time.
* Callbacks will be called later from _buf_bio_callback().
* Callbacks will be called later from _buf_endio().
*/
goto no_callback;
}
@ -1049,7 +860,7 @@ static void buf_ref_io(struct buf_output *output, struct mars_ref_object *mref,
already_done:
mref->ref_flags = bf->bf_flags;
status = bf->bf_bio_status;
status = bf->bf_error;
traced_unlock(&bf->bf_lock, flags);
@ -1096,9 +907,9 @@ MARS_MAKE_STATICS(buf);
static int buf_brick_construct(struct buf_brick *brick)
{
int i;
brick->backing_order = 5; // TODO: make this configurable
brick->backing_order = 0;
brick->backing_size = PAGE_SIZE << brick->backing_order;
brick->max_count = 32; // TODO: make this configurable
brick->max_count = 32;
atomic_set(&brick->alloc_count, 0);
atomic_set(&brick->hashed_count, 0);
atomic_set(&brick->lru_count, 0);

mars_buf.h

@ -36,17 +36,13 @@ struct buf_brick {
atomic_t nr_io_pending;
atomic_t nr_collisions;
struct generic_object_layout mref_object_layout;
struct mars_info base_info;
// lists for caching
struct list_head free_anchor; // members are not hashed
struct list_head lru_anchor; // members are hashed and not in use
struct cache_anchor cache_anchors[MARS_BUF_HASH_MAX]; // hash table
// for creation of bios
struct mars_info base_info;
struct block_device *bdev;
int bvec_max;
// statistics
unsigned long last_jiffies;
atomic_t hit_count;
@ -71,9 +67,9 @@ struct buf_head {
loff_t bf_pos;
loff_t bf_base_index;
int bf_flags;
int bf_error;
atomic_t bf_count;
int bf_bio_status;
atomic_t bf_bio_count;
atomic_t bf_io_count;
// lists for caching
//struct list_head bf_mref_anchor; // all current mref members
struct list_head bf_lru_head;

mars_buf_old.c (new file, 1215 lines; diff suppressed because it is too large)

mars_buf_old.h (new file, 86 lines)

@ -0,0 +1,86 @@
// (c) 2010 Thomas Schoebel-Theuer / 1&1 Internet AG
#ifndef MARS_BUF_H
#define MARS_BUF_H
#include <linux/list.h>
#include <asm/atomic.h>
//#define MARS_BUF_HASH_MAX 512
#define MARS_BUF_HASH_MAX 2048
struct buf_mars_ref_aspect {
GENERIC_ASPECT(mars_ref);
struct buf_head *rfa_bf;
struct list_head rfa_pending_head;
struct list_head tmp_head;
struct generic_callback cb;
};
struct cache_anchor {
spinlock_t hash_lock;
struct list_head hash_anchor;
};
struct buf_brick {
MARS_BRICK(buf);
/* brick parameters */
int backing_order;
int backing_size;
int max_count;
/* internals */
spinlock_t brick_lock;
atomic_t alloc_count;
atomic_t hashed_count;
atomic_t lru_count;
atomic_t nr_io_pending;
atomic_t nr_collisions;
struct generic_object_layout mref_object_layout;
// lists for caching
struct list_head free_anchor; // members are not hashed
struct list_head lru_anchor; // members are hashed and not in use
struct cache_anchor cache_anchors[MARS_BUF_HASH_MAX]; // hash table
// for creation of bios
struct mars_info base_info;
struct block_device *bdev;
int bvec_max;
// statistics
unsigned long last_jiffies;
atomic_t hit_count;
atomic_t miss_count;
atomic_t io_count;
};
struct buf_input {
MARS_INPUT(buf);
};
struct buf_output {
MARS_OUTPUT(buf);
};
MARS_TYPES(buf);
struct buf_head {
void *bf_data;
spinlock_t bf_lock;
struct buf_brick *bf_brick;
loff_t bf_pos;
loff_t bf_base_index;
int bf_flags;
atomic_t bf_count;
int bf_bio_status;
atomic_t bf_bio_count;
// lists for caching
//struct list_head bf_mref_anchor; // all current mref members
struct list_head bf_lru_head;
struct list_head bf_hash_head;
// lists for IO
struct list_head bf_io_pending_anchor;
struct list_head bf_postpone_anchor;
};
#endif

mars_device_aio.c (new file, 498 lines)

@ -0,0 +1,498 @@
// (c) 2010 Thomas Schoebel-Theuer / 1&1 Internet AG
//#define BRICK_DEBUGGING
//#define MARS_DEBUGGING
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/string.h>
#include <linux/list.h>
#include <linux/types.h>
#include <linux/blkdev.h>
#include <linux/kthread.h>
#include <linux/spinlock.h>
#include <linux/wait.h>
#include <linux/mmu_context.h>
#include <linux/file.h>
#include "mars.h"
#define MARS_MAX_AIO 1024
#define MARS_MAX_AIO_READ 32
///////////////////////// own type definitions ////////////////////////
#include "mars_device_aio.h"
////////////////// own brick / input / output operations //////////////////
static int device_aio_ref_get(struct device_aio_output *output, struct mars_ref_object *mref)
{
_CHECK_ATOMIC(&mref->ref_count, !=, 0);
/* Buffered IO is not implemented.
* Use an intermediate buf instance if you need it.
*/
if (!mref->ref_data)
return -ENOSYS;
atomic_inc(&mref->ref_count);
return 0;
}
static void device_aio_ref_put(struct device_aio_output *output, struct mars_ref_object *mref)
{
CHECK_ATOMIC(&mref->ref_count, 1);
if (!atomic_dec_and_test(&mref->ref_count))
return;
device_aio_free_mars_ref(mref);
}
static void device_aio_ref_io(struct device_aio_output *output, struct mars_ref_object *mref, int rw)
{
struct aio_threadinfo *tinfo = &output->tinfo[0];
struct generic_callback *cb = mref->ref_cb;
struct device_aio_mars_ref_aspect *mref_a;
unsigned long flags;
atomic_inc(&mref->ref_count);
if (unlikely(!output->filp)) {
cb->cb_error = -EINVAL;
goto done;
}
MARS_INF("IO rw=%d pos=%lld len=%d data=%p\n", mref->ref_rw, mref->ref_pos, mref->ref_len, mref->ref_data);
mref_a = device_aio_mars_ref_get_aspect(output, mref);
traced_lock(&tinfo->lock, flags);
list_add_tail(&mref_a->io_head, &tinfo->mref_list);
traced_unlock(&tinfo->lock, flags);
wake_up(&tinfo->event);
return;
done:
if (cb->cb_error < 0)
MARS_ERR("IO error %d\n", cb->cb_error);
cb->cb_fn(cb);
device_aio_ref_put(output, mref);
}
static int device_aio_submit(struct device_aio_output *output, struct device_aio_mars_ref_aspect *mref_a, bool use_fdsync)
{
struct mars_ref_object *mref = mref_a->object;
mm_segment_t oldfs;
int res;
struct iocb iocb = {
.aio_data = (__u64)mref_a,
.aio_lio_opcode = use_fdsync ? IOCB_CMD_FDSYNC : (mref->ref_rw == WRITE ? IOCB_CMD_PWRITE : IOCB_CMD_PREAD),
.aio_fildes = output->fd,
.aio_buf = (unsigned long)mref->ref_data,
.aio_nbytes = mref->ref_len,
.aio_offset = mref->ref_pos,
};
struct iocb *iocbp = &iocb;
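/* sys_io_submit() checks its arguments against the current address
 * limit as if they came from user space; set_fs(get_ds()) widens the
 * limit so these kernel-space iocb pointers pass that check. */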
oldfs = get_fs();
set_fs(get_ds());
res = sys_io_submit(output->ctxp, 1, &iocbp);
set_fs(oldfs);
if (res < 0 && res != -EAGAIN)
MARS_ERR("error = %d\n", res);
return res;
}
static int device_aio_submit_thread(void *data)
{
struct aio_threadinfo *tinfo = data;
struct device_aio_output *output = tinfo->output;
int err;
/* TODO: this is provisional. We only need it for sys_io_submit().
* The latter should be accompanied by a future vfs_submit() or
* do_submit() which currently does not exist :(
* FIXME: corresponding cleanup NYI
*/
err = get_unused_fd();
MARS_INF("fd = %d\n", err);
if (unlikely(err < 0))
return err;
output->fd = err;
fd_install(err, output->filp);
MARS_INF("kthread has started.\n");
//set_user_nice(current, -20);
MARS_INF("old mm = %p\n", current->mm);
use_mm(tinfo->mm);
MARS_INF("new mm = %p\n", current->mm);
if (!current->mm)
return 0;
while (!kthread_should_stop()) {
struct list_head *tmp = NULL;
struct device_aio_mars_ref_aspect *mref_a;
unsigned long flags;
int err;
wait_event_interruptible_timeout(
tinfo->event,
!list_empty(&tinfo->mref_list) || kthread_should_stop(),
HZ);
traced_lock(&tinfo->lock, flags);
if (!list_empty(&tinfo->mref_list)) {
tmp = tinfo->mref_list.next;
list_del_init(tmp);
}
traced_unlock(&tinfo->lock, flags);
if (!tmp)
continue;
mref_a = container_of(tmp, struct device_aio_mars_ref_aspect, io_head);
err = device_aio_submit(output, mref_a, false);
if (err == -EAGAIN) {
traced_lock(&tinfo->lock, flags);
list_add(&mref_a->io_head, &tinfo->mref_list);
traced_unlock(&tinfo->lock, flags);
msleep(10); // PROVISIONAL
continue;
}
#if 0
if (false) {
struct generic_callback *cb = mref_a->object->ref_cb;
cb->cb_error = err;
if (err < 0)
MARS_ERR("IO error %d\n", err);
cb->cb_fn(cb);
device_aio_ref_put(output, mref_a->object);
}
#endif
}
unuse_mm(tinfo->mm);
MARS_INF("kthread has stopped.\n");
return 0;
}
static int device_aio_event_thread(void *data)
{
struct aio_threadinfo *tinfo = data;
struct device_aio_output *output = tinfo->output;
struct aio_threadinfo *other = &output->tinfo[2];
MARS_INF("kthread has started.\n");
//set_user_nice(current, -20);
MARS_INF("old mm = %p\n", current->mm);
use_mm(tinfo->mm);
MARS_INF("new mm = %p\n", current->mm);
if (!current->mm)
return 0;
while (!kthread_should_stop()) {
mm_segment_t oldfs;
int count;
int bounced;
int i;
struct timespec timeout = {
.tv_sec = 30,
};
struct io_event events[MARS_MAX_AIO_READ];
oldfs = get_fs();
set_fs(get_ds());
count = sys_io_getevents(output->ctxp, 1, MARS_MAX_AIO_READ, events, &timeout);
set_fs(oldfs);
//MARS_INF("count = %d\n", count);
bounced = 0;
for (i = 0; i < count; i++) {
struct device_aio_mars_ref_aspect *mref_a = (void*)events[i].data;
struct generic_callback *cb = mref_a->object->ref_cb;
int err = events[i].res;
if (output->o_fdsync
&& err >= 0
&& mref_a->object->ref_rw == WRITE
&& !mref_a->resubmit++) {
if (!output->filp->f_op->aio_fsync) {
unsigned long flags;
traced_lock(&other->lock, flags);
list_add(&mref_a->io_head, &other->mref_list);
traced_unlock(&other->lock, flags);
bounced++;
continue;
}
err = device_aio_submit(output, mref_a, true);
if (likely(err >= 0))
continue;
}
cb->cb_error = err;
if (err < 0)
MARS_ERR("IO error %d\n", err);
cb->cb_fn(cb);
device_aio_ref_put(output, mref_a->object);
}
if (bounced)
wake_up(&other->event);
}
unuse_mm(tinfo->mm);
MARS_INF("kthread has stopped.\n");
return 0;
}
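In outline, the o_fdsync handling above gives every completed write a second pass: resubmit it once as IOCB_CMD_FDSYNC, or bounce it to the synchronous fsync thread (tinfo[2]) when the file lacks an aio_fsync op; the resubmit counter guards against looping. A condensed sketch (queue_for_sync_thread() and complete_mref() are hypothetical helpers):
/* Sketch: completion-side routing of o_fdsync writes. */
if (output->o_fdsync && err >= 0 &&
    mref->ref_rw == WRITE && !mref_a->resubmit++) {
        if (output->filp->f_op->aio_fsync) {
                device_aio_submit(output, mref_a, true); /* async FDSYNC pass */
                return;
        }
        queue_for_sync_thread(mref_a);  /* hypothetical: hand to tinfo[2] */
        return;
}
complete_mref(mref_a, err);             /* hypothetical: fire the callback */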
/* Workaround for non-implemented aio_fsync()
*/
static int device_aio_sync_thread(void *data)
{
struct aio_threadinfo *tinfo = data;
struct device_aio_output *output = tinfo->output;
struct file *file = output->filp;
MARS_INF("kthread has started.\n");
//set_user_nice(current, -20);
while (!kthread_should_stop()) {
LIST_HEAD(tmp_list);
unsigned long flags;
int err;
wait_event_interruptible_timeout(
tinfo->event,
!list_empty(&tinfo->mref_list) || kthread_should_stop(),
HZ);
traced_lock(&tinfo->lock, flags);
if (!list_empty(&tinfo->mref_list)) {
// move over the whole list
list_replace_init(&tinfo->mref_list, &tmp_list);
}
traced_unlock(&tinfo->lock, flags);
if (list_empty(&tmp_list))
continue;
err = vfs_fsync(file, file->f_path.dentry, 1);
if (err < 0)
MARS_ERR("FDSYNC error %d\n", err);
/* Signal completion for the whole list.
* No locking needed, it's on the stack.
*/
while (!list_empty(&tmp_list)) {
struct list_head *tmp = tmp_list.next;
struct device_aio_mars_ref_aspect *mref_a = container_of(tmp, struct device_aio_mars_ref_aspect, io_head);
struct generic_callback *cb = mref_a->object->ref_cb;
list_del_init(tmp);
cb->cb_error = err;
cb->cb_fn(cb);
device_aio_ref_put(output, mref_a->object);
}
}
MARS_INF("kthread has stopped.\n");
return 0;
}
static int device_aio_get_info(struct device_aio_output *output, struct mars_info *info)
{
struct file *file = output->filp;
info->current_size = i_size_read(file->f_mapping->host);
info->backing_file = file;
return 0;
}
//////////////// object / aspect constructors / destructors ///////////////
static int device_aio_mars_ref_aspect_init_fn(struct generic_aspect *_ini, void *_init_data)
{
struct device_aio_mars_ref_aspect *ini = (void*)_ini;
INIT_LIST_HEAD(&ini->io_head);
return 0;
}
static void device_aio_mars_ref_aspect_exit_fn(struct generic_aspect *_ini, void *_init_data)
{
struct device_aio_mars_ref_aspect *ini = (void*)_ini;
(void)ini;
}
MARS_MAKE_STATICS(device_aio);
////////////////////// brick constructors / destructors ////////////////////
static int device_aio_brick_construct(struct device_aio_brick *brick)
{
return 0;
}
static int device_aio_switch(struct device_aio_brick *brick, bool state)
{
static int index = 0;
struct device_aio_output *output = brick->outputs[0];
char *path = output->output_name;
int flags = O_CREAT | O_RDWR | O_LARGEFILE;
int prot = 0600;
mm_segment_t oldfs;
int i;
int err = 0;
if (output->o_direct) {
flags |= O_DIRECT;
MARS_INF("using O_DIRECT on %s\n", path);
}
if (!state)
goto cleanup;
oldfs = get_fs();
set_fs(get_ds());
output->filp = filp_open(path, flags, prot);
set_fs(oldfs);
if (unlikely(IS_ERR(output->filp))) {
err = PTR_ERR(output->filp);
MARS_ERR("can't open file '%s' status=%d\n", path, err);
output->filp = NULL;
return err;
}
if (!output->ctxp) {
MARS_INF("mm = %p\n", current->mm);
oldfs = get_fs();
set_fs(get_ds());
err = sys_io_setup(MARS_MAX_AIO, &output->ctxp);
set_fs(oldfs);
if (unlikely(err))
goto err;
}
for (i = 0; i < 3; i++) {
static int (*fn[])(void*) = {
device_aio_submit_thread,
device_aio_event_thread,
device_aio_sync_thread,
};
struct aio_threadinfo *tinfo = &output->tinfo[i];
INIT_LIST_HEAD(&tinfo->mref_list);
tinfo->output = output;
tinfo->mm = current->mm;
spin_lock_init(&tinfo->lock);
init_waitqueue_head(&tinfo->event);
tinfo->thread = kthread_create(fn[i], tinfo, "mars_aio%d", index++);
if (IS_ERR(tinfo->thread)) {
err = PTR_ERR(tinfo->thread);
MARS_ERR("cannot create thread\n");
tinfo->thread = NULL;
goto err;
}
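/* kthread_create() returns the new thread in a stopped state;
 * wake_up_process() actually lets it run. */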
wake_up_process(tinfo->thread);
}
MARS_INF("opened file '%s'\n", path);
return 0;
err:
MARS_ERR("status = %d\n", err);
cleanup:
for (i = 0; i < 2; i++) {
struct aio_threadinfo *tinfo = &output->tinfo[i];
if (tinfo->thread) {
kthread_stop(tinfo->thread);
// FIXME: wait for termination
tinfo->thread = NULL;
}
}
if (output->ctxp) {
//...
}
if (output->filp) {
filp_close(output->filp, NULL);
output->filp = NULL;
}
return err;
}
static int device_aio_output_construct(struct device_aio_output *output)
{
return 0;
}
static int device_aio_output_destruct(struct device_aio_output *output)
{
return device_aio_switch(output->brick, false);
}
///////////////////////// static structs ////////////////////////
static struct device_aio_brick_ops device_aio_brick_ops = {
.brick_switch = device_aio_switch,
};
static struct device_aio_output_ops device_aio_output_ops = {
.make_object_layout = device_aio_make_object_layout,
.mars_ref_get = device_aio_ref_get,
.mars_ref_put = device_aio_ref_put,
.mars_ref_io = device_aio_ref_io,
.mars_get_info = device_aio_get_info,
};
const struct device_aio_output_type device_aio_output_type = {
.type_name = "device_aio_output",
.output_size = sizeof(struct device_aio_output),
.master_ops = &device_aio_output_ops,
.output_construct = &device_aio_output_construct,
.output_destruct = &device_aio_output_destruct,
.aspect_types = device_aio_aspect_types,
.layout_code = {
[BRICK_OBJ_MARS_REF] = LAYOUT_NONE,
}
};
static const struct device_aio_output_type *device_aio_output_types[] = {
&device_aio_output_type,
};
const struct device_aio_brick_type device_aio_brick_type = {
.type_name = "device_aio_brick",
.brick_size = sizeof(struct device_aio_brick),
.max_inputs = 0,
.max_outputs = 1,
.master_ops = &device_aio_brick_ops,
.default_output_types = device_aio_output_types,
.brick_construct = &device_aio_brick_construct,
};
EXPORT_SYMBOL_GPL(device_aio_brick_type);
////////////////// module init stuff /////////////////////////
static int __init init_device_aio(void)
{
MARS_INF("init_device_aio()\n");
return device_aio_register_brick_type();
}
static void __exit exit_device_aio(void)
{
MARS_INF("exit_device_aio()\n");
device_aio_unregister_brick_type();
}
MODULE_DESCRIPTION("MARS device_aio brick");
MODULE_AUTHOR("Thomas Schoebel-Theuer <tst@1und1.de>");
MODULE_LICENSE("GPL");
module_init(init_device_aio);
module_exit(exit_device_aio);

mars_device_aio.h (new file, 45 lines)

@ -0,0 +1,45 @@
// (c) 2010 Thomas Schoebel-Theuer / 1&1 Internet AG
#ifndef MARS_DEVICE_AIO_H
#define MARS_DEVICE_AIO_H
#include <linux/aio.h>
#include <linux/syscalls.h>
struct device_aio_mars_ref_aspect {
GENERIC_ASPECT(mars_ref);
struct list_head io_head;
int resubmit;
};
struct device_aio_brick {
MARS_BRICK(device_aio);
};
struct device_aio_input {
MARS_INPUT(device_aio);
};
struct aio_threadinfo {
struct list_head mref_list;
struct device_aio_output *output;
struct task_struct *thread;
struct mm_struct *mm;
wait_queue_head_t event;
spinlock_t lock;
};
struct device_aio_output {
MARS_OUTPUT(device_aio);
// parameters
bool o_direct;
bool o_fdsync;
// private
struct file *filp;
int fd; // FIXME: remove this!
struct aio_threadinfo tinfo[3];
aio_context_t ctxp;
};
MARS_TYPES(device_aio);
#endif

mars_device_sio.c

@ -23,40 +23,64 @@
////////////////// own brick / input / output operations //////////////////
static int device_sio_ref_get(struct device_sio_output *output, struct mars_ref_object *mref)
{
_CHECK_ATOMIC(&mref->ref_count, !=, 0);
/* Buffered IO is not implemented.
* Use an intermediate buf instance if you need it.
*/
if (!mref->ref_data)
return -ENOSYS;
atomic_inc(&mref->ref_count);
return 0;
}
static void device_sio_ref_put(struct device_sio_output *output, struct mars_ref_object *mref)
{
CHECK_ATOMIC(&mref->ref_count, 1);
if (!atomic_dec_and_test(&mref->ref_count))
return;
device_sio_free_mars_ref(mref);
}
// some code borrowed from the loopback driver
static int transfer_none(int cmd,
struct page *raw_page, unsigned raw_off,
struct page *loop_page, unsigned loop_off,
//struct page *loop_page, unsigned loop_off,
void *loop_buf,
int size)
{
char *raw_buf = kmap_atomic(raw_page, KM_USER0) + raw_off;
char *loop_buf = kmap_atomic(loop_page, KM_USER1) + loop_off;
#if 1
void *raw_buf = kmap_atomic(raw_page, KM_USER0) + raw_off;
//void *loop_buf = kmap_atomic(loop_page, KM_USER1) + loop_off;
if (unlikely(!raw_buf || !loop_buf)) {
MARS_ERR("transfer NULL: %p %p\n", raw_buf, loop_buf);
return -EFAULT;
}
#if 1
if (cmd == READ)
memcpy(loop_buf, raw_buf, size);
else
memcpy(raw_buf, loop_buf, size);
#endif
kunmap_atomic(raw_buf, KM_USER0);
kunmap_atomic(loop_buf, KM_USER1);
//kunmap_atomic(loop_buf, KM_USER1);
cond_resched();
#endif
return 0;
}
static void write_aops(struct device_sio_output *output, struct mars_ref_object *mref)
{
struct bio *bio = mref->orig_bio;
loff_t pos = ((loff_t)bio->bi_sector << 9);
struct file *file = output->filp;
loff_t pos = mref->ref_pos;
void *data = mref->ref_data;
unsigned offset;
int len;
struct address_space *mapping;
struct bio_vec *bvec;
int i;
int ret = 0;
if (unlikely(!file)) {
@ -65,66 +89,55 @@ static void write_aops(struct device_sio_output *output, struct mars_ref_object
}
mapping = file->f_mapping;
MARS_DBG("write_aops pos=%llu len=%d\n", pos, bio->bi_size);
mutex_lock(&mapping->host->i_mutex);
bio_for_each_segment(bvec, bio, i) {
//pgoff_t index;
unsigned offset, bv_offs;
int len;
offset = pos & ((pgoff_t)PAGE_CACHE_SIZE - 1);
len = mref->ref_len;
while (len > 0) {
int transfer_result;
unsigned size, copied;
struct page *page = NULL;
void *fsdata;
//index = pos >> PAGE_CACHE_SHIFT;
offset = pos & ((pgoff_t)PAGE_CACHE_SIZE - 1);
bv_offs = bvec->bv_offset;
len = bvec->bv_len;
size = PAGE_CACHE_SIZE - offset;
if (size > len)
size = len;
while (len > 0) {
int transfer_result;
unsigned size, copied;
struct page *page;
void *fsdata;
size = PAGE_CACHE_SIZE - offset;
if (size > len)
size = len;
ret = pagecache_write_begin(file, mapping, pos, size, 0,
&page, &fsdata);
if (ret) {
MARS_ERR("cannot start pagecache_write_begin() error=%d\n", ret);
if (ret >= 0)
ret = -EIO;
goto fail;
}
//file_update_time(file);
transfer_result = transfer_none(WRITE, page, offset, bvec->bv_page, bv_offs, size);
copied = size;
if (transfer_result) {
MARS_ERR("transfer error %d\n", transfer_result);
copied = 0;
}
ret = pagecache_write_end(file, mapping, pos, size, copied,
page, fsdata);
if (ret < 0 || ret != copied || transfer_result) {
MARS_ERR("write error %d\n", ret);
if (ret >= 0)
ret = -EIO;
goto fail;
}
bv_offs += copied;
len -= copied;
offset = 0;
//index++;
pos += copied;
ret = pagecache_write_begin(file, mapping, pos, size, 0,
&page, &fsdata);
if (ret) {
MARS_ERR("cannot start pagecache_write_begin() error=%d\n", ret);
if (ret >= 0)
ret = -EIO;
goto fail;
}
ret = 0;
//file_update_time(file);
transfer_result = transfer_none(WRITE, page, offset, data, size);
copied = size;
if (transfer_result) {
MARS_ERR("transfer error %d\n", transfer_result);
copied = 0;
}
ret = pagecache_write_end(file, mapping, pos, size, copied,
page, fsdata);
if (ret < 0 || ret != copied || transfer_result) {
MARS_ERR("write error %d\n", ret);
if (ret >= 0)
ret = -EIO;
goto fail;
}
len -= copied;
offset = 0;
pos += copied;
data += copied;
}
ret = 0;
fail:
mutex_unlock(&mapping->host->i_mutex);
@ -139,8 +152,8 @@ fail:
struct cookie_data {
struct device_sio_output *output;
struct mars_ref_object *mref;
struct bio_vec *bvec;
unsigned int offset;
void *data;
int len;
};
static int
@ -160,18 +173,15 @@ device_sio_splice_actor(struct pipe_inode_info *pipe,
IV = ((sector_t) page->index << (PAGE_CACHE_SHIFT - 9)) +
(buf->offset >> 9);
size = sd->len;
if (size > p->bvec->bv_len)
size = p->bvec->bv_len;
if (size > p->len)
size = p->len;
if (transfer_none(READ, page, buf->offset, p->bvec->bv_page, p->offset, size)) {
MARS_ERR("transfer error block %ld\n", p->bvec->bv_page->index);
if (transfer_none(READ, page, buf->offset, p->data, size)) {
MARS_ERR("transfer error\n");
size = -EINVAL;
}
flush_dcache_page(p->bvec->bv_page);
if (size > 0)
p->offset += size;
//flush_dcache_page(p->bvec->bv_page);
return size;
}
@ -184,41 +194,26 @@ device_sio_direct_splice_actor(struct pipe_inode_info *pipe, struct splice_desc
static void read_aops(struct device_sio_output *output, struct mars_ref_object *mref)
{
struct bio *bio = mref->orig_bio;
loff_t pos = ((loff_t)bio->bi_sector << 9); // TODO: make dynamic
struct bio_vec *bvec;
int i;
loff_t pos = mref->ref_pos;
int ret = -EIO;
bio_for_each_segment(bvec, bio, i) {
struct cookie_data cookie = {
.output = output,
.mref = mref,
.bvec = bvec,
.offset = bvec->bv_offset,
};
struct splice_desc sd = {
.len = 0,
.total_len = bvec->bv_len,
.flags = 0,
.pos = pos,
.u.data = &cookie,
};
struct cookie_data cookie = {
.output = output,
.mref = mref,
.data = mref->ref_data,
.len = mref->ref_len,
};
struct splice_desc sd = {
.len = 0,
.total_len = mref->ref_len,
.flags = 0,
.pos = pos,
.u.data = &cookie,
};
MARS_DBG("start splice %p %p %p %p\n", output, mref, bio, bvec);
ret = 0;
ret = splice_direct_to_actor(output->filp, &sd, device_sio_direct_splice_actor);
if (unlikely(ret < 0)) {
MARS_ERR("splice %p %p %p %p status=%d\n", output, mref, bio, bvec, ret);
break;
}
pos += bvec->bv_len;
bio->bi_size -= bvec->bv_len;
}
if (unlikely(bio->bi_size)) {
MARS_ERR("unhandled rest size %d on bio %p\n", bio->bi_size, bio);
ret = splice_direct_to_actor(output->filp, &sd, device_sio_direct_splice_actor);
if (unlikely(ret < 0)) {
MARS_ERR("splice %p %p status=%d\n", output, mref, ret);
}
mref->ref_cb->cb_error = ret;
}
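read_aops() now splices once over the whole request instead of once per bio segment. The call chain, condensed into a comment (names from the diff):
/* Read path after this change:
 *
 *   read_aops(output, mref)
 *     -> splice_direct_to_actor(output->filp, &sd,
 *                               device_sio_direct_splice_actor)
 *       -> __splice_from_pipe(pipe, sd, device_sio_splice_actor)
 *         -> transfer_none(READ, page, buf->offset, p->data, size)
 *            (memcpy from the page cache into the flat ref_data buffer)
 */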
@ -228,7 +223,7 @@ static void sync_file(struct device_sio_output *output)
struct file *file = output->filp;
int ret;
#if 1
ret = vfs_fsync(file, file->f_path.dentry, 0);
ret = vfs_fsync(file, file->f_path.dentry, 1);
if (unlikely(ret)) {
MARS_ERR("syncing pages failed: %d\n", ret);
}
@ -238,9 +233,8 @@ static void sync_file(struct device_sio_output *output)
static void device_sio_ref_io(struct device_sio_output *output, struct mars_ref_object *mref, int rw)
{
struct bio *bio = mref->orig_bio;
struct generic_callback *cb = mref->ref_cb;
bool barrier = (rw != READ && bio_rw_flagged(bio, BIO_RW_BARRIER));
bool barrier = false;
int test;
if (unlikely(!output->filp)) {
@ -248,20 +242,6 @@ static void device_sio_ref_io(struct device_sio_output *output, struct mars_ref_
goto done;
}
/* Shortcut when possible
*/
if (S_ISBLK(output->filp->f_mapping->host->i_mode) && output->allow_bio) {
struct block_device *bdev = output->filp->f_mapping->host->i_bdev;
#if 0
static int count = 10;
if (count-- > 0)
MARS_INF("AHA: %p\n", bdev);
#endif
bio->bi_bdev = bdev;
submit_bio(bio->bi_rw, bio);
return;
}
if (barrier) {
MARS_INF("got barrier request\n");
sync_file(output);
@ -271,7 +251,7 @@ static void device_sio_ref_io(struct device_sio_output *output, struct mars_ref_
read_aops(output, mref);
} else {
write_aops(output, mref);
if (barrier || output->o_sync)
if (barrier || output->o_fdsync)
sync_file(output);
}
@ -302,6 +282,8 @@ static void device_sio_mars_queue(struct device_sio_output *output, struct mars_
struct generic_callback *cb = mref->ref_cb;
unsigned long flags;
atomic_inc(&mref->ref_count);
if (rw == READ) {
traced_lock(&output->g_lock, flags);
index = output->index++;
@ -519,6 +501,8 @@ static struct device_sio_brick_ops device_sio_brick_ops = {
static struct device_sio_output_ops device_sio_output_ops = {
.make_object_layout = device_sio_make_object_layout,
.mars_ref_get = device_sio_ref_get,
.mars_ref_put = device_sio_ref_put,
.mars_ref_io = device_sio_mars_queue,
.mars_get_info = device_sio_get_info,
};

mars_device_sio.h

@ -30,8 +30,7 @@ struct device_sio_output {
MARS_OUTPUT(device_sio);
// parameters
bool o_direct;
bool o_sync;
bool allow_bio;
bool o_fdsync;
// private
struct file *filp;
struct sio_threadinfo tinfo[WITH_THREAD+1];

mars_device_sio_old.c (new file, 576 lines)

@ -0,0 +1,576 @@
// (c) 2010 Thomas Schoebel-Theuer / 1&1 Internet AG
//#define BRICK_DEBUGGING
//#define MARS_DEBUGGING
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/string.h>
#include <linux/list.h>
#include <linux/types.h>
#include <linux/blkdev.h>
#include <linux/highmem.h>
#include <linux/kthread.h>
#include <linux/spinlock.h>
#include <linux/wait.h>
#include <linux/splice.h>
#include "mars.h"
///////////////////////// own type definitions ////////////////////////
#include "mars_device_sio.h"
////////////////// own brick / input / output operations //////////////////
// some code borrowed from the loopback driver
static int transfer_none(int cmd,
struct page *raw_page, unsigned raw_off,
struct page *loop_page, unsigned loop_off,
int size)
{
char *raw_buf = kmap_atomic(raw_page, KM_USER0) + raw_off;
char *loop_buf = kmap_atomic(loop_page, KM_USER1) + loop_off;
if (unlikely(!raw_buf || !loop_buf)) {
MARS_ERR("transfer NULL: %p %p\n", raw_buf, loop_buf);
return -EFAULT;
}
#if 1
if (cmd == READ)
memcpy(loop_buf, raw_buf, size);
else
memcpy(raw_buf, loop_buf, size);
#endif
kunmap_atomic(raw_buf, KM_USER0);
kunmap_atomic(loop_buf, KM_USER1);
cond_resched();
return 0;
}
static void write_aops(struct device_sio_output *output, struct mars_ref_object *mref)
{
struct bio *bio = mref->orig_bio;
loff_t pos = ((loff_t)bio->bi_sector << 9);
struct file *file = output->filp;
struct address_space *mapping;
struct bio_vec *bvec;
int i;
int ret = 0;
if (unlikely(!file)) {
MARS_FAT("No FILE\n");
return;
}
mapping = file->f_mapping;
MARS_DBG("write_aops pos=%llu len=%d\n", pos, bio->bi_size);
mutex_lock(&mapping->host->i_mutex);
bio_for_each_segment(bvec, bio, i) {
//pgoff_t index;
unsigned offset, bv_offs;
int len;
//index = pos >> PAGE_CACHE_SHIFT;
offset = pos & ((pgoff_t)PAGE_CACHE_SIZE - 1);
bv_offs = bvec->bv_offset;
len = bvec->bv_len;
while (len > 0) {
int transfer_result;
unsigned size, copied;
struct page *page;
void *fsdata;
size = PAGE_CACHE_SIZE - offset;
if (size > len)
size = len;
ret = pagecache_write_begin(file, mapping, pos, size, 0,
&page, &fsdata);
if (ret) {
MARS_ERR("cannot start pagecache_write_begin() error=%d\n", ret);
if (ret >= 0)
ret = -EIO;
goto fail;
}
//file_update_time(file);
transfer_result = transfer_none(WRITE, page, offset, bvec->bv_page, bv_offs, size);
copied = size;
if (transfer_result) {
MARS_ERR("transfer error %d\n", transfer_result);
copied = 0;
}
ret = pagecache_write_end(file, mapping, pos, size, copied,
page, fsdata);
if (ret < 0 || ret != copied || transfer_result) {
MARS_ERR("write error %d\n", ret);
if (ret >= 0)
ret = -EIO;
goto fail;
}
bv_offs += copied;
len -= copied;
offset = 0;
//index++;
pos += copied;
}
ret = 0;
}
fail:
mutex_unlock(&mapping->host->i_mutex);
mref->ref_cb->cb_error = ret;
#if 1
blk_run_address_space(mapping);
#endif
}
struct cookie_data {
struct device_sio_output *output;
struct mars_ref_object *mref;
struct bio_vec *bvec;
unsigned int offset;
};
static int
device_sio_splice_actor(struct pipe_inode_info *pipe,
struct pipe_buffer *buf,
struct splice_desc *sd)
{
struct cookie_data *p = sd->u.data;
struct page *page = buf->page;
sector_t IV;
int size, ret;
ret = buf->ops->confirm(pipe, buf);
if (unlikely(ret))
return ret;
IV = ((sector_t) page->index << (PAGE_CACHE_SHIFT - 9)) +
(buf->offset >> 9);
size = sd->len;
if (size > p->bvec->bv_len)
size = p->bvec->bv_len;
if (transfer_none(READ, page, buf->offset, p->bvec->bv_page, p->offset, size)) {
MARS_ERR("transfer error block %ld\n", p->bvec->bv_page->index);
size = -EINVAL;
}
flush_dcache_page(p->bvec->bv_page);
if (size > 0)
p->offset += size;
return size;
}
static int
device_sio_direct_splice_actor(struct pipe_inode_info *pipe, struct splice_desc *sd)
{
return __splice_from_pipe(pipe, sd, device_sio_splice_actor);
}
static void read_aops(struct device_sio_output *output, struct mars_ref_object *mref)
{
struct bio *bio = mref->orig_bio;
loff_t pos = ((loff_t)bio->bi_sector << 9); // TODO: make dynamic
struct bio_vec *bvec;
int i;
int ret = -EIO;
bio_for_each_segment(bvec, bio, i) {
struct cookie_data cookie = {
.output = output,
.mref = mref,
.bvec = bvec,
.offset = bvec->bv_offset,
};
struct splice_desc sd = {
.len = 0,
.total_len = bvec->bv_len,
.flags = 0,
.pos = pos,
.u.data = &cookie,
};
MARS_DBG("start splice %p %p %p %p\n", output, mref, bio, bvec);
ret = 0;
ret = splice_direct_to_actor(output->filp, &sd, device_sio_direct_splice_actor);
if (unlikely(ret < 0)) {
MARS_ERR("splice %p %p %p %p status=%d\n", output, mref, bio, bvec, ret);
break;
}
pos += bvec->bv_len;
bio->bi_size -= bvec->bv_len;
}
if (unlikely(bio->bi_size)) {
MARS_ERR("unhandled rest size %d on bio %p\n", bio->bi_size, bio);
}
mref->ref_cb->cb_error = ret;
}
static void sync_file(struct device_sio_output *output)
{
struct file *file = output->filp;
int ret;
#if 1
ret = vfs_fsync(file, file->f_path.dentry, 1);
if (unlikely(ret)) {
MARS_ERR("syncing pages failed: %d\n", ret);
}
return;
#endif
}
static void device_sio_ref_io(struct device_sio_output *output, struct mars_ref_object *mref, int rw)
{
struct bio *bio = mref->orig_bio;
struct generic_callback *cb = mref->ref_cb;
bool barrier = (rw != READ && bio_rw_flagged(bio, BIO_RW_BARRIER));
int test;
if (unlikely(!output->filp)) {
cb->cb_error = -EINVAL;
goto done;
}
#if 1
MARS_INF("got BIO %2lu %12ld %4d\n", bio->bi_rw, bio->bi_sector, bio->bi_size);
#endif
/* Shortcut when possible
*/
if (output->allow_bio && S_ISBLK(output->filp->f_mapping->host->i_mode)) {
struct block_device *bdev = output->filp->f_mapping->host->i_bdev;
#if 0
static int count = 10;
if (count-- > 0)
MARS_INF("AHA: %p\n", bdev);
#endif
bio->bi_bdev = bdev;
submit_bio(bio->bi_rw, bio);
return;
}
if (barrier) {
MARS_INF("got barrier request\n");
sync_file(output);
}
if (rw == READ) {
read_aops(output, mref);
} else {
write_aops(output, mref);
if (barrier || output->o_fdsync)
sync_file(output);
}
done:
#if 1
if (cb->cb_error < 0)
MARS_ERR("IO error %d\n", cb->cb_error);
#endif
cb->cb_fn(cb);
test = atomic_read(&mref->ref_count);
if (test <= 0) {
MARS_ERR("ref_count UNDERRUN %d\n", test);
atomic_set(&mref->ref_count, 1);
}
if (!atomic_dec_and_test(&mref->ref_count))
return;
device_sio_free_mars_ref(mref);
}
static void device_sio_mars_queue(struct device_sio_output *output, struct mars_ref_object *mref, int rw)
{
int index = 0;
struct sio_threadinfo *tinfo;
struct device_sio_mars_ref_aspect *mref_a;
struct generic_callback *cb = mref->ref_cb;
unsigned long flags;
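/* Reads are spread round-robin over worker threads 1..WITH_THREAD;
 * writes keep index 0 and stay serialized on the first thread. */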
if (rw == READ) {
traced_lock(&output->g_lock, flags);
index = output->index++;
traced_unlock(&output->g_lock, flags);
index = (index % WITH_THREAD) + 1;
}
mref_a = device_sio_mars_ref_get_aspect(output, mref);
if (unlikely(!mref_a)) {
MARS_FAT("cannot get aspect\n");
cb->cb_error = -EINVAL;
cb->cb_fn(cb);
return;
}
tinfo = &output->tinfo[index];
MARS_DBG("queueing %p on %d\n", mref, index);
traced_lock(&tinfo->lock, flags);
mref->ref_rw = rw;
list_add_tail(&mref_a->io_head, &tinfo->mref_list);
traced_unlock(&tinfo->lock, flags);
wake_up(&tinfo->event);
}
static int device_sio_thread(void *data)
{
struct sio_threadinfo *tinfo = data;
struct device_sio_output *output = tinfo->output;
MARS_INF("kthread has started.\n");
//set_user_nice(current, -20);
while (!kthread_should_stop()) {
struct list_head *tmp = NULL;
struct mars_ref_object *mref;
struct device_sio_mars_ref_aspect *mref_a;
unsigned long flags;
wait_event_interruptible_timeout(
tinfo->event,
!list_empty(&tinfo->mref_list) || kthread_should_stop(),
HZ);
tinfo->last_jiffies = jiffies;
traced_lock(&tinfo->lock, flags);
if (!list_empty(&tinfo->mref_list)) {
tmp = tinfo->mref_list.next;
list_del_init(tmp);
}
traced_unlock(&tinfo->lock, flags);
if (!tmp)
continue;
mref_a = container_of(tmp, struct device_sio_mars_ref_aspect, io_head);
mref = mref_a->object;
MARS_DBG("got %p %p\n", mref_a, mref);
device_sio_ref_io(output, mref, mref->ref_rw);
}
MARS_INF("kthread has stopped.\n");
return 0;
}
static int device_sio_watchdog(void *data)
{
struct device_sio_output *output = data;
MARS_INF("watchdog has started.\n");
while (!kthread_should_stop()) {
int i;
msleep_interruptible(5000);
for (i = 0; i <= WITH_THREAD; i++) {
struct sio_threadinfo *tinfo = &output->tinfo[i];
unsigned long now = jiffies;
unsigned long elapsed = now - tinfo->last_jiffies;
if (elapsed > 10 * HZ) {
tinfo->last_jiffies = now;
MARS_ERR("thread %d is dead for more than 10 seconds.\n", i);
}
}
}
return 0;
}
static int device_sio_get_info(struct device_sio_output *output, struct mars_info *info)
{
struct file *file = output->filp;
info->current_size = i_size_read(file->f_mapping->host);
info->backing_file = file;
return 0;
}
//////////////// object / aspect constructors / destructors ///////////////
static int device_sio_mars_ref_aspect_init_fn(struct generic_aspect *_ini, void *_init_data)
{
struct device_sio_mars_ref_aspect *ini = (void*)_ini;
INIT_LIST_HEAD(&ini->io_head);
return 0;
}
static void device_sio_mars_ref_aspect_exit_fn(struct generic_aspect *_ini, void *_init_data)
{
struct device_sio_mars_ref_aspect *ini = (void*)_ini;
(void)ini;
#if 1
CHECK_HEAD_EMPTY(&ini->io_head);
#endif
}
MARS_MAKE_STATICS(device_sio);
////////////////////// brick constructors / destructors ////////////////////
static int device_sio_brick_construct(struct device_sio_brick *brick)
{
return 0;
}
static int device_sio_switch(struct device_sio_brick *brick, bool state)
{
struct device_sio_output *output = brick->outputs[0];
char *path = output->output_name;
int flags = O_CREAT | O_RDWR | O_LARGEFILE;
int prot = 0600;
mm_segment_t oldfs;
if (output->o_direct) {
flags |= O_DIRECT;
MARS_INF("using O_DIRECT on %s\n", path);
}
if (state) {
oldfs = get_fs();
set_fs(get_ds());
output->filp = filp_open(path, flags, prot);
set_fs(oldfs);
if (IS_ERR(output->filp)) {
int err = PTR_ERR(output->filp);
MARS_ERR("can't open file '%s' status=%d\n", path, err);
output->filp = NULL;
return err;
}
#if 0
{
struct address_space *mapping = output->filp->f_mapping;
int old_gfp_mask = mapping_gfp_mask(mapping);
mapping_set_gfp_mask(mapping, old_gfp_mask & ~(__GFP_IO|__GFP_FS));
}
#endif
MARS_INF("opened file '%s'\n", path);
} else {
// TODO: close etc...
}
return 0;
}
static int device_sio_output_construct(struct device_sio_output *output)
{
struct task_struct *watchdog;
int index;
spin_lock_init(&output->g_lock);
output->index = 0;
for (index = 0; index <= WITH_THREAD; index++) {
struct sio_threadinfo *tinfo = &output->tinfo[index];
tinfo->output = output;
spin_lock_init(&tinfo->lock);
init_waitqueue_head(&tinfo->event);
INIT_LIST_HEAD(&tinfo->mref_list);
tinfo->last_jiffies = jiffies;
tinfo->thread = kthread_create(device_sio_thread, tinfo, "mars_sio%d", index);
if (IS_ERR(tinfo->thread)) {
int error = PTR_ERR(tinfo->thread);
MARS_ERR("cannot create thread, status=%d\n", error);
filp_close(output->filp, NULL);
return error;
}
wake_up_process(tinfo->thread);
}
watchdog = kthread_create(device_sio_watchdog, output, "mars_watchdog%d", 0);
if (!IS_ERR(watchdog)) {
wake_up_process(watchdog);
}
return 0;
}
static int device_sio_output_destruct(struct device_sio_output *output)
{
int index;
for (index = 0; index <= WITH_THREAD; index++) {
kthread_stop(output->tinfo[index].thread);
output->tinfo[index].thread = NULL;
}
if (output->filp) {
filp_close(output->filp, NULL);
output->filp = NULL;
}
return 0;
}
///////////////////////// static structs ////////////////////////
static struct device_sio_brick_ops device_sio_brick_ops = {
.brick_switch = device_sio_switch,
};
static struct device_sio_output_ops device_sio_output_ops = {
.make_object_layout = device_sio_make_object_layout,
.mars_ref_io = device_sio_mars_queue,
.mars_get_info = device_sio_get_info,
};
const struct device_sio_output_type device_sio_output_type = {
.type_name = "device_sio_output",
.output_size = sizeof(struct device_sio_output),
.master_ops = &device_sio_output_ops,
.output_construct = &device_sio_output_construct,
.output_destruct = &device_sio_output_destruct,
.aspect_types = device_sio_aspect_types,
.layout_code = {
[BRICK_OBJ_MARS_REF] = LAYOUT_NONE,
}
};
static const struct device_sio_output_type *device_sio_output_types[] = {
&device_sio_output_type,
};
const struct device_sio_brick_type device_sio_brick_type = {
.type_name = "device_sio_brick",
.brick_size = sizeof(struct device_sio_brick),
.max_inputs = 0,
.max_outputs = 1,
.master_ops = &device_sio_brick_ops,
.default_output_types = device_sio_output_types,
.brick_construct = &device_sio_brick_construct,
};
EXPORT_SYMBOL_GPL(device_sio_brick_type);
////////////////// module init stuff /////////////////////////
static int __init init_device_sio(void)
{
MARS_INF("init_device_sio()\n");
return device_sio_register_brick_type();
}
static void __exit exit_device_sio(void)
{
MARS_INF("exit_device_sio()\n");
device_sio_unregister_brick_type();
}
MODULE_DESCRIPTION("MARS device_sio brick");
MODULE_AUTHOR("Thomas Schoebel-Theuer <tst@1und1.de>");
MODULE_LICENSE("GPL");
module_init(init_device_sio);
module_exit(exit_device_sio);

View File

@ -33,34 +33,71 @@ static int device_minor = 0;
*/
static void _if_device_endio(struct generic_callback *cb)
{
struct mars_ref_object *mref = cb->cb_private;
struct bio *bio = mref->orig_bio;
struct if_device_mars_ref_aspect *mref_a = cb->cb_private;
struct if_device_mars_ref_aspect *master_mref_a;
struct bio *bio;
struct bio_vec *bvec;
int i;
int error;
if (unlikely(!mref_a)) {
MARS_FAT("callback with no mref_a called. something is very wrong here!\n");
return;
}
master_mref_a = mref_a->master;
if (unlikely(!master_mref_a)) {
MARS_FAT("master is missing. something is very wrong here!\n");
return;
}
if (cb->cb_error < 0)
master_mref_a->cb.cb_error = cb->cb_error;
if (!atomic_dec_and_test(&master_mref_a->split_count))
goto done;
bio = master_mref_a->orig_bio;
if (unlikely(!bio)) {
MARS_FAT("callback with no bio called. something is very wrong here!\n");
return;
}
error = cb->cb_error;
bio_for_each_segment(bvec, bio, i) {
kunmap(bvec->bv_page);
}
error = master_mref_a->cb.cb_error;
if (unlikely(error < 0)) {
MARS_ERR("NYI: error=%d RETRY LOGIC %u\n", error, bio->bi_size);
}
if (likely(error > 0)) { // bio conventions are slightly different...
} else { // bio conventions are slightly different...
error = 0;
bio->bi_size = 0;
}
bio_endio(bio, error);
done:
// paired with X1
GENERIC_INPUT_CALL(master_mref_a->input, mars_ref_put, master_mref_a->object);
}
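/* Completion counting in a nutshell (illustrative model, not part of the
 * build): every sub-mref holds one count on its master, and the bio is
 * completed exactly once, by whichever callback drops the count to zero.
 *
 *	struct master { atomic_t split_count; int error; };
 *
 *	static void sub_endio(struct master *m, int error)
 *	{
 *		if (error < 0)
 *			m->error = error;         // remember a failing fragment
 *		if (!atomic_dec_and_test(&m->split_count))
 *			return;                   // not the last fragment
 *		finish_whole_bio(m->error);       // hypothetical finalizer
 *	}
 *
 * Since cb_error is only ever overwritten with negative values, a single
 * failing fragment fails the whole bio.
 */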
/* accept a linux bio, wrap it into mref and call buf_io() on it.
/* accept a linux bio, convert to mref and call buf_io() on it.
*/
static int if_device_make_request(struct request_queue *q, struct bio *bio)
{
LIST_HEAD(tmp_list);
struct if_device_input *input;
struct if_device_brick *brick;
struct mars_ref_object *mref = NULL;
struct if_device_mars_ref_aspect *mref_a;
struct if_device_mars_ref_aspect *master;
struct generic_callback *cb;
struct bio_vec *bvec;
int i;
//bool barrier = ((bio->bi_rw & 1) != READ && bio_rw_flagged(bio, BIO_RW_BARRIER));
loff_t pos = ((loff_t)bio->bi_sector) << 9; // TODO: make dynamic
int rw = bio->bi_rw & 1;
int maxlen = 0;
int error = -ENOSYS;
MARS_DBG("make_request(%d)\n", bio->bi_size);
@ -79,33 +116,100 @@ static int if_device_make_request(struct request_queue *q, struct bio *bio)
msleep(100);
}
error = -ENOMEM;
mref = if_device_alloc_mars_ref(&brick->hidden_output, &input->mref_object_layout);
if (unlikely(!mref))
goto err;
MARS_INF("BIO rw=%d len = %d\n", rw, bio->bi_size);
bio_for_each_segment(bvec, bio, i) {
int bv_len = bvec->bv_len;
void *data = kmap(bvec->bv_page);
data += bvec->bv_offset;
mref_a = if_device_mars_ref_get_aspect(&brick->hidden_output, mref);
if (unlikely(!mref_a))
goto err;
cb = &mref_a->cb;
cb->cb_fn = _if_device_endio;
cb->cb_private = mref;
cb->cb_error = 0;
cb->cb_prev = NULL;
mref->ref_cb = cb;
while (bv_len > 0) {
int len = bv_len;
MARS_INF("rw = %d i = %d pos = %lld bv_len = %d maxlen = %d mref=%p\n", rw, i, pos, bv_len, maxlen, mref);
#if 1 // optimizing
if (mref) { // try to merge with previous bvec
if (len > maxlen) {
len = maxlen;
}
if (mref->ref_data + mref->ref_len == data && len > 0) {
mref->ref_len += len;
MARS_INF("merge %d new ref_len = %d\n", len, mref->ref_len);
} else {
mref = NULL;
}
}
#else
mref = NULL;
#endif
if (!mref) {
error = -ENOMEM;
mref = if_device_alloc_mars_ref(&brick->hidden_output, &input->mref_object_layout);
if (unlikely(!mref))
goto err;
mref_a = if_device_mars_ref_get_aspect(&brick->hidden_output, mref);
if (unlikely(!mref_a))
goto err;
cb = &mref_a->cb;
cb->cb_fn = _if_device_endio;
cb->cb_private = mref_a;
cb->cb_error = 0;
cb->cb_prev = NULL;
mref->ref_cb = cb;
mref_a->input = input;
mref_a->orig_bio = bio;
mref->ref_rw = mref->ref_may_write = rw;
mref->ref_pos = pos;
mref->ref_len = bv_len;
mref->ref_data = data;
error = GENERIC_INPUT_CALL(input, mars_ref_get, mref);
if (unlikely(error < 0))
goto err;
maxlen = mref->ref_len;
if (len > maxlen)
len = maxlen;
mref->ref_len = len;
list_add_tail(&mref_a->tmp_head, &tmp_list);
// The first mref is called "master". It carries the split_count
mref_a->master = container_of(tmp_list.next, struct if_device_mars_ref_aspect, tmp_head);
atomic_inc(&mref_a->master->split_count);
}
mars_ref_attach_bio(mref, bio);
pos += len;
data += len;
bv_len -= len;
maxlen -= len;
} // while bv_len > 0
} // foreach bvec
GENERIC_INPUT_CALL(input, mars_ref_io, mref, rw);
GENERIC_INPUT_CALL(input, mars_ref_put, mref);
return 0;
error = 0;
err:
MARS_ERR("cannot submit request, status=%d\n", error);
master = NULL;
if (error < 0) {
MARS_ERR("cannot submit request, status=%d\n", error);
bio_endio(bio, error);
} else if (!list_empty(&tmp_list)) {
// grab one extra reference X2
master = container_of(tmp_list.next, struct if_device_mars_ref_aspect, tmp_head);
atomic_inc(&master->object->ref_count);
}
while (!list_empty(&tmp_list)) {
mref_a = container_of(tmp_list.next, struct if_device_mars_ref_aspect, tmp_head);
list_del_init(&mref_a->tmp_head);
mref = mref_a->object;
if (error >= 0) {
// paired with X1
atomic_inc(&mref_a->master->object->ref_count);
GENERIC_INPUT_CALL(input, mars_ref_io, mref, rw);
}
GENERIC_INPUT_CALL(input, mars_ref_put, mref);
}
// drop extra reference X2
if (master)
GENERIC_INPUT_CALL(input, mars_ref_put, master->object);
return error;
}
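/* Reference discipline used above (summary): X2 pins the master while
 * tmp_list is drained, so a fast sub-completion cannot free it under our
 * feet; one X1 reference is taken per submitted sub-mref and dropped again
 * in _if_device_endio() when that fragment completes. Dropping X2 at the
 * end releases a master whose fragments all completed synchronously.
 */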
@ -136,6 +240,7 @@ static void if_device_unplug(struct request_queue *q)
{
//struct if_device_input *input = q->queuedata;
MARS_DBG("UNPLUG\n");
MARS_INF("UNPLUG\n");
queue_flag_clear_unlocked(QUEUE_FLAG_PLUGGED, q);
//blk_run_address_space(lo->lo_backing_file->f_mapping);
}
@ -145,11 +250,16 @@ static void if_device_unplug(struct request_queue *q)
static int if_device_mars_ref_aspect_init_fn(struct generic_aspect *_ini, void *_init_data)
{
struct if_device_mars_ref_aspect *ini = (void*)_ini;
INIT_LIST_HEAD(&ini->tmp_head);
atomic_set(&ini->split_count, 0);
return 0;
}
static void if_device_mars_ref_aspect_exit_fn(struct generic_aspect *_ini, void *_init_data)
{
struct if_device_mars_ref_aspect *ini = (void*)_ini;
CHECK_HEAD_EMPTY(&ini->tmp_head);
}
MARS_MAKE_STATICS(if_device);

View File

@ -7,7 +7,12 @@
struct if_device_mars_ref_aspect {
GENERIC_ASPECT(mars_ref);
struct list_head tmp_head;
struct if_device_mars_ref_aspect *master;
atomic_t split_count;
struct generic_callback cb;
struct bio *orig_bio;
struct if_device_input *input;
};
struct if_device_input {

View File

@ -5,21 +5,25 @@
#define DEFAULT_ORDER 0
//#define DEFAULT_BUFFERS (32768 / 2)
#define DEFAULT_MEM (1024 / 4 * 1024)
//#define DEFAULT_MEM (1024 / 4 * 1024)
#define DEFAULT_MEM (1024 / 4 * 1024 / 4)
#define TRANS_ORDER 4
#define TRANS_BUFFERS (32)
#define TRANS_MEM (1024 / 4)
//#define CONF_TEST // use intermediate mars_check bricks
#define CONF_BUF
#define CONF_USEBUF
#define CONF_TRANS
#define CONF_AIO // use device_aio instead of device_sio
//#define CONF_BUF
//#define CONF_USEBUF
//#define CONF_TRANS
//#define CONF_TRANS_FLYING 1
#define CONF_TRANS_SORT
#define CONF_DIRECT // use O_DIRECT
#define CONF_SNYC // use O_SYNC
//#define CONF_BIO // submit bios directly to device when possible
//#define CONF_DIRECT // use O_DIRECT
#define CONF_FDSYNC // use additional aio_fdsync
#define DIRECT
#include <linux/kernel.h>
#include <linux/module.h>
@ -35,12 +39,18 @@
#include "mars_if_device.h"
#include "mars_check.h"
#include "mars_device_sio.h"
#include "mars_device_aio.h"
#include "mars_buf.h"
#include "mars_usebuf.h"
#include "mars_trans_logger.h"
GENERIC_ASPECT_FUNCTIONS(generic,mars_ref);
#ifdef CONF_AIO
#define device_sio_brick device_aio_brick
#define device_sio_brick_type device_aio_brick_type
#endif
static struct generic_brick *if_brick = NULL;
static struct if_device_brick *_if_brick = NULL;
static struct generic_brick *usebuf_brick = NULL;
@ -135,11 +145,8 @@ void make_test_instance(void)
#ifdef CONF_DIRECT
_device_brick->outputs[0]->o_direct = true;
#endif
#ifdef CONF_SYNC
_device_brick->outputs[0]->o_sync = true;
#endif
#ifdef CONF_BIO
_device_brick->outputs[0]->allow_bio = true;
#ifdef CONF_FDSYNC
_device_brick->outputs[0]->o_fdsync = true;
#endif
device_brick->ops->brick_switch(device_brick, true);
@ -149,10 +156,11 @@ void make_test_instance(void)
usebuf_brick = brick(&usebuf_brick_type);
connect(if_brick->inputs[0], usebuf_brick->outputs[0]);
last = usebuf_brick->inputs[0];
#else
(void)usebuf_brick;
(void)tdevice_brick;
(void)_tdevice_brick;
last = if_brick->inputs[0];
#endif
@ -179,11 +187,8 @@ void make_test_instance(void)
#ifdef CONF_DIRECT
_tdevice_brick->outputs[0]->o_direct = true;
#endif
#ifdef CONF_SYNC
_tdevice_brick->outputs[0]->o_sync = true;
#endif
#ifdef CONF_BIO
_tdevice_brick->outputs[0]->allow_bio = true;
#ifdef CONF_FDSYNC
_tdevice_brick->outputs[0]->o_fdsync = true;
#endif
tdevice_brick->ops->brick_switch(tdevice_brick, true);
@ -219,14 +224,14 @@ void make_test_instance(void)
connect(trans_brick->inputs[1], tbuf_brick->outputs[0]);
connect(last, trans_brick->outputs[0]);
#else
#else // CONF_TRANS
(void)trans_brick;
(void)_trans_brick;
(void)tbuf_brick;
(void)_tbuf_brick;
(void)tdevice_brick;
connect(last, buf_brick->outputs[0]);
#endif
#endif // CONF_TRANS
if (false) { // ref-counting no longer valid
struct buf_output *output = _buf_brick->outputs[0];
@ -256,10 +261,21 @@ void make_test_instance(void)
GENERIC_OUTPUT_CALL(output, mars_ref_put, mref);
}
}
#else
#else // CONF_BUF
(void)trans_brick;
(void)_trans_brick;
(void)buf_brick;
(void)_buf_brick;
(void)tbuf_brick;
(void)_tbuf_brick;
(void)tdevice_brick;
(void)_tdevice_brick;
(void)test_endio;
connect(last, device_brick->outputs[0]);
#endif
#endif // CONF_BUF
msleep(200);
@ -279,6 +295,9 @@ void make_test_instance(void)
msleep(2000);
MARS_INF("------------- DONE --------------\n");
//msleep(1000 * 92);
// FIXME: this is never released!
atomic_inc(&current->mm->mm_users);
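/* Note: bumping mm_users pins current->mm, presumably for the benefit of
 * the kernel threads spawned from this context; the missing release would
 * be a matching mmput().
 */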
}
void destroy_test_instance(void)

View File

@ -1,8 +1,6 @@
// (c) 2010 Thomas Schoebel-Theuer / 1&1 Internet AG
/* Usebuf brick.
* translates from unbuffered IO to buffered IO (mars_{get,put}_buf)
*/
// Usebuf brick (just for demonstration)
//#define BRICK_DEBUGGING
//#define MARS_DEBUGGING
@ -10,7 +8,6 @@
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/string.h>
#include <linux/bio.h>
#include "mars.h"
@ -18,142 +15,10 @@
#include "mars_usebuf.h"
#define SHORTCUT
///////////////////////// own helper functions ////////////////////////
/* currently we have copy semantics :(
*/
static void _usebuf_copy(struct usebuf_mars_ref_aspect *mref_a, int rw)
{
void *ref_data = mref_a->object->ref_data;
void *bio_base = kmap_atomic(mref_a->bvec->bv_page, KM_USER0);
void *bio_data = bio_base + mref_a->bvec_offset;
int len = mref_a->bvec_len;
#if 1
if (rw == READ) {
memcpy(bio_data, ref_data, len);
} else {
memcpy(ref_data, bio_data, len);
}
#endif
kunmap_atomic(bio_base, KM_USER0);
}
static void usebuf_ref_put(struct usebuf_output *output, struct mars_ref_object *origmref);
static void _usebuf_origmref_endio(struct usebuf_output *output, struct mars_ref_object *origmref)
{
struct usebuf_mars_ref_aspect *origmref_a;
struct generic_callback *cb;
origmref_a = usebuf_mars_ref_get_aspect(output, origmref);
if (unlikely(!origmref_a)) {
MARS_FAT("cannot get origmref_a from origmref %p\n", origmref);
goto out;
}
MARS_DBG("origmref=%p subref_count=%d error=%d\n", origmref, atomic_read(&origmref_a->subref_count), origmref->cb_error);
CHECK_ATOMIC(&origmref_a->subref_count, 1);
if (!atomic_dec_and_test(&origmref_a->subref_count)) {
goto out;
}
cb = origmref->ref_cb;
MARS_DBG("DONE error=%d\n", cb->cb_error);
cb->cb_fn(cb);
usebuf_ref_put(output, origmref);
out:
return;
}
static void _usebuf_mref_endio(struct generic_callback *cb)
{
struct usebuf_mars_ref_aspect *mref_a;
struct mars_ref_object *mref;
struct usebuf_output *output;
struct mars_ref_object *origmref;
struct usebuf_mars_ref_aspect *origmref_a;
int status;
mref_a = cb->cb_private;
if (unlikely(!mref_a)) {
MARS_FAT("cannot get aspect\n");
goto out_fatal;
}
mref = mref_a->object;
if (unlikely(!mref)) {
MARS_FAT("cannot get mref\n");
goto out_fatal;
}
output = mref_a->output;
if (unlikely(!output)) {
MARS_FAT("bad argument output\n");
goto out_fatal;
}
origmref = mref_a->origmref;
if (unlikely(!origmref)) {
MARS_FAT("cannot get origmref\n");
goto out_fatal;
}
MARS_DBG("origmref=%p\n", origmref);
status = -EINVAL;
origmref_a = usebuf_mars_ref_get_aspect(output, origmref);
if (unlikely(!origmref_a)) {
MARS_ERR("cannot get origmref_a\n");
goto out_err;
}
// check if we have an initial read => now start the final write
if (mref->ref_may_write != READ && mref->ref_rw == READ && cb->cb_error >= 0) {
struct usebuf_input *input = output->brick->inputs[0];
status = -EIO;
if (unlikely(!(mref->ref_flags & MARS_REF_UPTODATE))) {
MARS_ERR("not UPTODATE after initial read\n");
goto out_err;
}
_usebuf_copy(mref_a, WRITE);
// grab extra reference
CHECK_ATOMIC(&origmref_a->subref_count, 1);
atomic_inc(&origmref_a->subref_count);
GENERIC_INPUT_CALL(input, mars_ref_io, mref, WRITE);
} else {
// finalize the read or final write
if (likely(!cb->cb_error)) {
struct bio *bio = origmref->orig_bio;
int direction;
status = -EINVAL;
if (unlikely(!bio)) {
MARS_ERR("bad bio setup on origmref %p", origmref);
goto out_err;
}
direction = bio->bi_rw & 1;
if (direction == READ) {
_usebuf_copy(mref_a, READ);
}
}
}
CHECK_ATOMIC(&origmref_a->subref_count, 1);
CHECK_ATOMIC(&mref->ref_count, 1);
status = cb->cb_error;
out_err:
if (status < 0) {
origmref->ref_cb->cb_error = status;
MARS_ERR("error %d\n", status);
}
_usebuf_origmref_endio(output, origmref);
out_fatal: // no chance to call callback; this will result in mem leak :(
;
}
////////////////// own brick / input / output operations //////////////////
static int usebuf_get_info(struct usebuf_output *output, struct mars_info *info)
@ -162,177 +27,184 @@ static int usebuf_get_info(struct usebuf_output *output, struct mars_info *info)
return GENERIC_INPUT_CALL(input, mars_get_info, info);
}
static int usebuf_ref_get(struct usebuf_output *output, struct mars_ref_object *origmref)
static inline void _copy_mref(struct mars_ref_object *b, struct mars_ref_object *a)
{
MARS_FAT("not callable!\n");
return -ENOSYS;
#if 0
struct usebuf_input *input = output->brick->inputs[0];
return GENERIC_INPUT_CALL(input, mars_ref_get, mref);
#endif
b->ref_pos = a->ref_pos;
b->ref_len = a->ref_len;
b->ref_may_write = a->ref_may_write;
// ref_data is NOT copied!
b->ref_flags = a->ref_flags;
b->ref_rw = a->ref_rw;
}
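/* _copy_mref() deliberately leaves ref_data alone: the sub_mref may use
 * its own buffer (buffered case) or share the caller's (SHORTCUT case);
 * which one applies is decided in usebuf_ref_get() below.
 */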
static void usebuf_ref_put(struct usebuf_output *output, struct mars_ref_object *origmref)
static void _usebuf_endio(struct generic_callback *cb)
{
CHECK_ATOMIC(&origmref->ref_count, 1);
if (!atomic_dec_and_test(&origmref->ref_count)) {
struct usebuf_mars_ref_aspect *mref_a = cb->cb_private;
struct mars_ref_object *mref;
struct mars_ref_object *sub_mref;
CHECK_PTR(mref_a, done);
mref = mref_a->object;
CHECK_PTR(mref, done);
CHECK_PTR(mref->ref_cb, done);
sub_mref = mref_a->sub_mref;
CHECK_PTR(sub_mref, done);
_copy_mref(mref, sub_mref);
if (mref->ref_data != sub_mref->ref_data) {
if (mref->ref_rw == 0) {
memcpy(mref->ref_data, sub_mref->ref_data, mref->ref_len);
}
}
#if 1
mref->ref_cb->cb_error = cb->cb_error;
mref->ref_cb->cb_fn(mref->ref_cb);
#endif
CHECK_ATOMIC(&mref->ref_count, 1);
if (!atomic_dec_and_test(&mref->ref_count))
return;
#if 1
CHECK_ATOMIC(&sub_mref->ref_count, 2);
atomic_dec(&sub_mref->ref_count);
{
int test = atomic_read(&sub_mref->ref_count);
if (test > 1) {
MARS_INF("ref_count = %d\n", test);
}
}
#endif
usebuf_free_mars_ref(mref);
done:;
}
static int usebuf_ref_get(struct usebuf_output *output, struct mars_ref_object *mref)
{
struct usebuf_input *input = output->brick->inputs[0];
struct usebuf_mars_ref_aspect *mref_a;
struct mars_ref_object *sub_mref;
int status = 0;
mref_a = usebuf_mars_ref_get_aspect(output, mref);
if (unlikely(!mref_a)) {
MARS_FAT("cannot get aspect\n");
return -EILSEQ;
}
sub_mref = mref_a->sub_mref;
if (!sub_mref) {
sub_mref = usebuf_alloc_mars_ref(output, &output->mref_object_layout);
if (unlikely(!sub_mref)) {
MARS_FAT("cannot get sub_mref\n");
return -ENOMEM;
}
mref_a->sub_mref = sub_mref;
_copy_mref(sub_mref, mref);
#if 1 // shortcut: do direct IO
if (!mref->ref_data)
MARS_ERR("NULL.......\n");
sub_mref->ref_data = mref->ref_data;
#else // normal case: buffered IO
sub_mref->ref_data = NULL;
#endif
sub_mref->ref_cb = &mref_a->cb;
mref_a->cb.cb_private = mref_a;
mref_a->cb.cb_fn = _usebuf_endio;
}
status = GENERIC_INPUT_CALL(input, mars_ref_get, sub_mref);
if (status < 0) {
return status;
}
_copy_mref(mref, sub_mref);
if (!mref->ref_data) {
MARS_INF("uiiiiiiiiiii\n");
mref->ref_data = sub_mref->ref_data;
}
if ((sub_mref->ref_flags & MARS_REF_UPTODATE) && mref->ref_data != sub_mref->ref_data) {
memcpy(mref->ref_data, sub_mref->ref_data, mref->ref_len);
}
atomic_inc(&mref->ref_count);
return status;
}
static void usebuf_ref_put(struct usebuf_output *output, struct mars_ref_object *mref)
{
struct usebuf_input *input = output->brick->inputs[0];
struct usebuf_mars_ref_aspect *mref_a;
struct mars_ref_object *sub_mref;
mref_a = usebuf_mars_ref_get_aspect(output, mref);
if (unlikely(!mref_a)) {
MARS_FAT("cannot get aspect\n");
return;
}
usebuf_free_mars_ref(origmref);
#if 0 // NYI
struct usebuf_input *input = output->brick->inputs[0];
GENERIC_INPUT_CALL(input, mars_ref_put, mref);
#endif
sub_mref = mref_a->sub_mref;
if (!sub_mref) {
MARS_FAT("sub_mref is missing\n");
return;
}
CHECK_ATOMIC(&mref->ref_count, 1);
if (!atomic_dec_and_test(&mref->ref_count))
return;
GENERIC_INPUT_CALL(input, mars_ref_put, sub_mref);
usebuf_free_mars_ref(mref);
}
static void usebuf_ref_io(struct usebuf_output *output, struct mars_ref_object *origmref, int rw)
static void usebuf_ref_io(struct usebuf_output *output, struct mars_ref_object *mref, int rw)
{
struct usebuf_input *input = output->brick->inputs[0];
struct bio *bio = origmref->orig_bio;
struct bio_vec *bvec;
struct usebuf_mars_ref_aspect *origmref_a;
loff_t start_pos;
int start_len;
int status;
int i;
#if 1
static int last_jiffies = 0;
if (!last_jiffies || ((int)jiffies) - last_jiffies >= HZ * 30) {
last_jiffies = jiffies;
MARS_INF("USEBUF: reads=%d writes=%d prereads=%d\n", atomic_read(&output->io_count) - atomic_read(&output->write_count), atomic_read(&output->write_count), atomic_read(&output->preread_count));
}
#endif
struct usebuf_mars_ref_aspect *mref_a;
struct mars_ref_object *sub_mref;
struct generic_callback *cb;
int error = -EILSEQ;
MARS_DBG("START origmref=%p\n", origmref);
status = -EINVAL;
if (unlikely(!bio)) {
MARS_ERR("cannot get bio\n");
goto done;
}
origmref_a = usebuf_mars_ref_get_aspect(output, origmref);
if (unlikely(!origmref_a)) {
MARS_ERR("cannot get origmref_a\n");
goto done;
mref_a = usebuf_mars_ref_get_aspect(output, mref);
if (unlikely(!mref_a)) {
MARS_FAT("cannot get aspect\n");
goto err;
}
origmref->ref_cb->cb_error = 0;
sub_mref = mref_a->sub_mref;
if (!sub_mref) {
MARS_FAT("sub_mref is missing\n");
goto err;
}
// initial refcount: prevent intermediate drops
_CHECK_ATOMIC(&origmref->ref_count, !=, 1);
atomic_inc(&origmref->ref_count);
_CHECK_ATOMIC(&origmref_a->subref_count, !=, 0);
atomic_set(&origmref_a->subref_count, 1);
start_pos = ((loff_t)bio->bi_sector) << 9; // TODO: make dynamic
start_len = bio->bi_size;
bio_for_each_segment(bvec, bio, i) {
int this_len = bvec->bv_len;
int my_offset = 0;
while (this_len > 0) {
struct mars_ref_object *mref;
struct usebuf_mars_ref_aspect *mref_a;
struct generic_callback *cb;
int my_len;
int my_rw;
mref = usebuf_alloc_mars_ref(output, &output->ref_object_layout);
status = -ENOMEM;
if (unlikely(!mref)) {
MARS_ERR("cannot alloc buffer, status=%d\n", status);
goto done_drop;
}
mref->ref_pos = start_pos;
mref->ref_len = this_len;
mref->ref_may_write = rw;
status = GENERIC_INPUT_CALL(input, mars_ref_get, mref);
if (unlikely(status < 0)) {
MARS_ERR("cannot get buffer, status=%d\n", status);
goto done_drop;
}
my_len = status;
MARS_DBG("origmref=%p got mref=%p pos=%lld len=%d mode=%d flags=%d status=%d\n", origmref, mref, start_pos, this_len, mref->ref_may_write, mref->ref_flags, status);
status = -ENOMEM;
mref_a = usebuf_mars_ref_get_aspect(output, mref);
if (unlikely(!mref_a)) {
MARS_ERR("cannot get my own mref aspect\n");
goto put;
}
mref_a->origmref = origmref;
mref_a->bvec = bvec;
mref_a->bvec_offset = bvec->bv_offset + my_offset;
mref_a->bvec_len = my_len;
status = 0;
if ((mref->ref_flags & MARS_REF_UPTODATE) && rw == READ) {
// cache hit: immediately signal success
_usebuf_copy(mref_a, READ);
goto put;
}
cb = &mref_a->cb;
cb->cb_fn = _usebuf_mref_endio;
cb->cb_private = mref_a;
cb->cb_error = 0;
cb->cb_prev = NULL;
mref_a->output = output;
mref->ref_cb = cb;
my_rw = rw;
atomic_inc(&output->io_count);
if (!(my_rw == READ)) {
atomic_inc(&output->write_count);
if (mref->ref_flags & MARS_REF_UPTODATE) {
// buffer uptodate: start writing.
_usebuf_copy(mref_a, WRITE);
} else {
// first start initial read, to get the whole buffer UPTODATE
my_rw = READ;
atomic_inc(&output->preread_count);
}
}
// grab reference for each sub-IO
CHECK_ATOMIC(&origmref_a->subref_count, 1);
atomic_inc(&origmref_a->subref_count);
GENERIC_INPUT_CALL(input, mars_ref_io, mref, my_rw);
put:
GENERIC_INPUT_CALL(input, mars_ref_put, mref);
if (unlikely(status < 0))
break;
start_len -= my_len;
start_pos += my_len;
this_len -= my_len;
my_offset += my_len;
}
if (unlikely(this_len != 0)) {
MARS_ERR("bad internal length %d (status=%d)\n", this_len, status);
if (mref->ref_data != sub_mref->ref_data) {
if (rw != 0) {
memcpy(sub_mref->ref_data, mref->ref_data, mref->ref_len);
}
}
if (unlikely(start_len != 0 && !status)) {
MARS_ERR("length mismatch %d (status=%d)\n", start_len, status);
atomic_inc(&mref->ref_count);
/* Optimization: when buffered IO is used and buffer is already
* uptodate, skip real IO operation.
*/
if (rw != 0 || !(sub_mref->ref_flags & MARS_REF_UPTODATE)) {
GENERIC_INPUT_CALL(input, mars_ref_io, sub_mref, rw);
_copy_mref(mref, sub_mref);
} else {
_usebuf_endio(sub_mref->ref_cb);
}
done_drop:
// drop initial refcount
if (status < 0)
origmref->ref_cb->cb_error = status;
_usebuf_origmref_endio(output, origmref);
return;
done:
MARS_DBG("status=%d\n", status);
err:
cb = mref->ref_cb;
cb->cb_error = error;
cb->cb_fn(cb);
return;
}
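/* Data flow of the rewritten brick (summary): every mref is shadowed by
 * one sub_mref on the input side. With the SHORTCUT both share ref_data
 * and nothing is copied; in the buffered case writes are copied into the
 * sub_mref before mars_ref_io(), and _usebuf_endio() copies read results
 * back. A read whose buffer is already UPTODATE skips the real IO and
 * invokes the callback chain directly.
 */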
//////////////// object / aspect constructors / destructors ///////////////
@ -340,8 +212,7 @@ done:
static int usebuf_mars_ref_aspect_init_fn(struct generic_aspect *_ini, void *_init_data)
{
struct usebuf_mars_ref_aspect *ini = (void*)_ini;
ini->origmref = NULL;
ini->bvec = NULL;
(void)ini;
return 0;
}

View File

@ -4,13 +4,8 @@
struct usebuf_mars_ref_aspect {
GENERIC_ASPECT(mars_ref);
struct mars_ref_object *origmref;
struct mars_ref_object *sub_mref;
struct generic_callback cb;
struct usebuf_output *output;
struct bio_vec *bvec;
int bvec_offset;
int bvec_len;
atomic_t subref_count;
};
struct usebuf_brick {
@ -23,10 +18,7 @@ struct usebuf_input {
struct usebuf_output {
MARS_OUTPUT(usebuf);
struct generic_object_layout ref_object_layout;
atomic_t io_count;
atomic_t write_count;
atomic_t preread_count;
struct generic_object_layout mref_object_layout;
};
MARS_TYPES(usebuf);

436
mars_usebuf_old.c Normal file
View File

@ -0,0 +1,436 @@
// (c) 2010 Thomas Schoebel-Theuer / 1&1 Internet AG
/* Usebuf brick.
* translates from unbuffered IO to buffered IO (mars_{get,put}_buf)
*/
//#define BRICK_DEBUGGING
//#define MARS_DEBUGGING
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/string.h>
#include <linux/bio.h>
#include "mars.h"
///////////////////////// own type definitions ////////////////////////
#include "mars_usebuf.h"
///////////////////////// own helper functions ////////////////////////
/* currently we have copy semantics :(
*/
static void _usebuf_copy(struct usebuf_mars_ref_aspect *mref_a, int rw)
{
void *ref_data = mref_a->object->ref_data;
void *bio_base = kmap_atomic(mref_a->bvec->bv_page, KM_USER0);
void *bio_data = bio_base + mref_a->bvec_offset;
int len = mref_a->bvec_len;
#if 1
if (rw == READ) {
memcpy(bio_data, ref_data, len);
} else {
memcpy(ref_data, bio_data, len);
}
#endif
kunmap_atomic(bio_base, KM_USER0);
}
static void usebuf_ref_put(struct usebuf_output *output, struct mars_ref_object *origmref);
static void _usebuf_origmref_endio(struct usebuf_output *output, struct mars_ref_object *origmref)
{
struct usebuf_mars_ref_aspect *origmref_a;
struct generic_callback *cb;
origmref_a = usebuf_mars_ref_get_aspect(output, origmref);
if (unlikely(!origmref_a)) {
MARS_FAT("cannot get origmref_a from origmref %p\n", origmref);
goto out;
}
MARS_DBG("origmref=%p subref_count=%d error=%d\n", origmref, atomic_read(&origmref_a->subref_count), origmref->cb_error);
CHECK_ATOMIC(&origmref_a->subref_count, 1);
if (!atomic_dec_and_test(&origmref_a->subref_count)) {
goto out;
}
cb = origmref->ref_cb;
MARS_DBG("DONE error=%d\n", cb->cb_error);
cb->cb_fn(cb);
usebuf_ref_put(output, origmref);
out:
return;
}
static void _usebuf_mref_endio(struct generic_callback *cb)
{
struct usebuf_mars_ref_aspect *mref_a;
struct mars_ref_object *mref;
struct usebuf_output *output;
struct mars_ref_object *origmref;
struct usebuf_mars_ref_aspect *origmref_a;
int status;
mref_a = cb->cb_private;
if (unlikely(!mref_a)) {
MARS_FAT("cannot get aspect\n");
goto out_fatal;
}
mref = mref_a->object;
if (unlikely(!mref)) {
MARS_FAT("cannot get mref\n");
goto out_fatal;
}
output = mref_a->output;
if (unlikely(!output)) {
MARS_FAT("bad argument output\n");
goto out_fatal;
}
origmref = mref_a->origmref;
if (unlikely(!origmref)) {
MARS_FAT("cannot get origmref\n");
goto out_fatal;
}
MARS_DBG("origmref=%p\n", origmref);
status = -EINVAL;
origmref_a = usebuf_mars_ref_get_aspect(output, origmref);
if (unlikely(!origmref_a)) {
MARS_ERR("cannot get origmref_a\n");
goto out_err;
}
// check if we have an initial read => now start the final write
if (mref->ref_may_write != READ && mref->ref_rw == READ && cb->cb_error >= 0) {
struct usebuf_input *input = output->brick->inputs[0];
status = -EIO;
if (unlikely(!(mref->ref_flags & MARS_REF_UPTODATE))) {
MARS_ERR("not UPTODATE after initial read\n");
goto out_err;
}
_usebuf_copy(mref_a, WRITE);
// grab extra reference
CHECK_ATOMIC(&origmref_a->subref_count, 1);
atomic_inc(&origmref_a->subref_count);
GENERIC_INPUT_CALL(input, mars_ref_io, mref, WRITE);
} else {
// finalize the read or final write
if (likely(!cb->cb_error)) {
struct bio *bio = origmref->orig_bio;
int direction;
status = -EINVAL;
if (unlikely(!bio)) {
MARS_ERR("bad bio setup on origmref %p", origmref);
goto out_err;
}
direction = bio->bi_rw & 1;
if (direction == READ) {
_usebuf_copy(mref_a, READ);
}
}
}
CHECK_ATOMIC(&origmref_a->subref_count, 1);
CHECK_ATOMIC(&mref->ref_count, 1);
status = cb->cb_error;
out_err:
if (status < 0) {
origmref->ref_cb->cb_error = status;
MARS_ERR("error %d\n", status);
}
_usebuf_origmref_endio(output, origmref);
out_fatal: // no chance to call callback; this will result in mem leak :(
;
}
////////////////// own brick / input / output operations //////////////////
static int usebuf_get_info(struct usebuf_output *output, struct mars_info *info)
{
struct usebuf_input *input = output->brick->inputs[0];
return GENERIC_INPUT_CALL(input, mars_get_info, info);
}
static int usebuf_ref_get(struct usebuf_output *output, struct mars_ref_object *origmref)
{
MARS_FAT("not callable!\n");
return -ENOSYS;
#if 0
struct usebuf_input *input = output->brick->inputs[0];
return GENERIC_INPUT_CALL(input, mars_ref_get, mref);
#endif
}
static void usebuf_ref_put(struct usebuf_output *output, struct mars_ref_object *origmref)
{
CHECK_ATOMIC(&origmref->ref_count, 1);
if (!atomic_dec_and_test(&origmref->ref_count)) {
return;
}
usebuf_free_mars_ref(origmref);
#if 0 // NYI
struct usebuf_input *input = output->brick->inputs[0];
GENERIC_INPUT_CALL(input, mars_ref_put, mref);
#endif
}
static void usebuf_ref_io(struct usebuf_output *output, struct mars_ref_object *origmref, int rw)
{
struct usebuf_input *input = output->brick->inputs[0];
struct bio *bio = origmref->orig_bio;
struct bio_vec *bvec;
struct usebuf_mars_ref_aspect *origmref_a;
loff_t start_pos;
int start_len;
int status;
int i;
#if 1
static int last_jiffies = 0;
if (!last_jiffies || ((int)jiffies) - last_jiffies >= HZ * 30) {
last_jiffies = jiffies;
MARS_INF("USEBUF: reads=%d writes=%d prereads=%d\n", atomic_read(&output->io_count) - atomic_read(&output->write_count), atomic_read(&output->write_count), atomic_read(&output->preread_count));
}
#endif
MARS_DBG("START origmref=%p\n", origmref);
status = -EINVAL;
if (unlikely(!bio)) {
MARS_ERR("cannot get bio\n");
goto done;
}
origmref_a = usebuf_mars_ref_get_aspect(output, origmref);
if (unlikely(!origmref_a)) {
MARS_ERR("cannot get origmref_a\n");
goto done;
}
origmref->ref_cb->cb_error = 0;
// initial refcount: prevent intermediate drops
_CHECK_ATOMIC(&origmref->ref_count, !=, 1);
atomic_inc(&origmref->ref_count);
_CHECK_ATOMIC(&origmref_a->subref_count, !=, 0);
atomic_set(&origmref_a->subref_count, 1);
start_pos = ((loff_t)bio->bi_sector) << 9; // TODO: make dynamic
start_len = bio->bi_size;
bio_for_each_segment(bvec, bio, i) {
int this_len = bvec->bv_len;
int my_offset = 0;
while (this_len > 0) {
struct mars_ref_object *mref;
struct usebuf_mars_ref_aspect *mref_a;
struct generic_callback *cb;
int my_len;
int my_rw;
mref = usebuf_alloc_mars_ref(output, &output->ref_object_layout);
status = -ENOMEM;
if (unlikely(!mref)) {
MARS_ERR("cannot alloc buffer, status=%d\n", status);
goto done_drop;
}
mref->ref_pos = start_pos;
mref->ref_len = this_len;
mref->ref_may_write = rw;
status = GENERIC_INPUT_CALL(input, mars_ref_get, mref);
if (unlikely(status < 0)) {
MARS_ERR("cannot get buffer, status=%d\n", status);
goto done_drop;
}
my_len = status;
MARS_DBG("origmref=%p got mref=%p pos=%lld len=%d mode=%d flags=%d status=%d\n", origmref, mref, start_pos, this_len, mref->ref_may_write, mref->ref_flags, status);
status = -ENOMEM;
mref_a = usebuf_mars_ref_get_aspect(output, mref);
if (unlikely(!mref_a)) {
MARS_ERR("cannot get my own mref aspect\n");
goto put;
}
mref_a->origmref = origmref;
mref_a->bvec = bvec;
mref_a->bvec_offset = bvec->bv_offset + my_offset;
mref_a->bvec_len = my_len;
status = 0;
if ((mref->ref_flags & MARS_REF_UPTODATE) && rw == READ) {
// cache hit: immediately signal success
_usebuf_copy(mref_a, READ);
goto put;
}
cb = &mref_a->cb;
cb->cb_fn = _usebuf_mref_endio;
cb->cb_private = mref_a;
cb->cb_error = 0;
cb->cb_prev = NULL;
mref_a->output = output;
mref->ref_cb = cb;
my_rw = rw;
atomic_inc(&output->io_count);
if (!(my_rw == READ)) {
atomic_inc(&output->write_count);
if (mref->ref_flags & MARS_REF_UPTODATE) {
// buffer uptodate: start writing.
_usebuf_copy(mref_a, WRITE);
} else {
// first start initial read, to get the whole buffer UPTODATE
my_rw = READ;
atomic_inc(&output->preread_count);
}
}
// grab reference for each sub-IO
CHECK_ATOMIC(&origmref_a->subref_count, 1);
atomic_inc(&origmref_a->subref_count);
GENERIC_INPUT_CALL(input, mars_ref_io, mref, my_rw);
put:
GENERIC_INPUT_CALL(input, mars_ref_put, mref);
if (unlikely(status < 0))
break;
start_len -= my_len;
start_pos += my_len;
this_len -= my_len;
my_offset += my_len;
}
if (unlikely(this_len != 0)) {
MARS_ERR("bad internal length %d (status=%d)\n", this_len, status);
}
}
if (unlikely(start_len != 0 && !status)) {
MARS_ERR("length mismatch %d (status=%d)\n", start_len, status);
}
done_drop:
// drop initial refcount
if (status < 0)
origmref->ref_cb->cb_error = status;
_usebuf_origmref_endio(output, origmref);
done:
MARS_DBG("status=%d\n", status);
}
//////////////// object / aspect constructors / destructors ///////////////
static int usebuf_mars_ref_aspect_init_fn(struct generic_aspect *_ini, void *_init_data)
{
struct usebuf_mars_ref_aspect *ini = (void*)_ini;
ini->origmref = NULL;
ini->bvec = NULL;
return 0;
}
static void usebuf_mars_ref_aspect_exit_fn(struct generic_aspect *_ini, void *_init_data)
{
struct usebuf_mars_ref_aspect *ini = (void*)_ini;
(void)ini;
}
MARS_MAKE_STATICS(usebuf);
////////////////////// brick constructors / destructors ////////////////////
static int usebuf_brick_construct(struct usebuf_brick *brick)
{
return 0;
}
static int usebuf_output_construct(struct usebuf_output *output)
{
return 0;
}
///////////////////////// static structs ////////////////////////
static struct usebuf_brick_ops usebuf_brick_ops = {
};
static struct usebuf_output_ops usebuf_output_ops = {
.make_object_layout = usebuf_make_object_layout,
.mars_get_info = usebuf_get_info,
.mars_ref_get = usebuf_ref_get,
.mars_ref_put = usebuf_ref_put,
.mars_ref_io = usebuf_ref_io,
};
const struct usebuf_input_type usebuf_input_type = {
.type_name = "usebuf_input",
.input_size = sizeof(struct usebuf_input),
};
static const struct usebuf_input_type *usebuf_input_types[] = {
&usebuf_input_type,
};
const struct usebuf_output_type usebuf_output_type = {
.type_name = "usebuf_output",
.output_size = sizeof(struct usebuf_output),
.master_ops = &usebuf_output_ops,
.output_construct = &usebuf_output_construct,
.aspect_types = usebuf_aspect_types,
.layout_code = {
[BRICK_OBJ_MARS_REF] = LAYOUT_ALL,
}
};
static const struct usebuf_output_type *usebuf_output_types[] = {
&usebuf_output_type,
};
const struct usebuf_brick_type usebuf_brick_type = {
.type_name = "usebuf_brick",
.brick_size = sizeof(struct usebuf_brick),
.max_inputs = 1,
.max_outputs = 1,
.master_ops = &usebuf_brick_ops,
.default_input_types = usebuf_input_types,
.default_output_types = usebuf_output_types,
.brick_construct = &usebuf_brick_construct,
};
EXPORT_SYMBOL_GPL(usebuf_brick_type);
////////////////// module init stuff /////////////////////////
static int __init init_usebuf(void)
{
printk(MARS_INFO "init_usebuf()\n");
return usebuf_register_brick_type();
}
static void __exit exit_usebuf(void)
{
printk(MARS_INFO "exit_usebuf()\n");
usebuf_unregister_brick_type();
}
MODULE_DESCRIPTION("MARS usebuf brick");
MODULE_AUTHOR("Thomas Schoebel-Theuer <tst@1und1.de>");
MODULE_LICENSE("GPL");
module_init(init_usebuf);
module_exit(exit_usebuf);

34
mars_usebuf_old.h Normal file
View File

@ -0,0 +1,34 @@
// (c) 2010 Thomas Schoebel-Theuer / 1&1 Internet AG
#ifndef MARS_USEBUF_H
#define MARS_USEBUF_H
struct usebuf_mars_ref_aspect {
GENERIC_ASPECT(mars_ref);
struct mars_ref_object *origmref;
struct generic_callback cb;
struct usebuf_output *output;
struct bio_vec *bvec;
int bvec_offset;
int bvec_len;
atomic_t subref_count;
};
struct usebuf_brick {
MARS_BRICK(usebuf);
};
struct usebuf_input {
MARS_INPUT(usebuf);
};
struct usebuf_output {
MARS_OUTPUT(usebuf);
struct generic_object_layout ref_object_layout;
atomic_t io_count;
atomic_t write_count;
atomic_t preread_count;
};
MARS_TYPES(usebuf);
#endif