import mars-87.tgz

This commit is contained in:
Thomas Schoebel-Theuer 2011-03-30 13:02:50 +01:00
parent 57da6d4b37
commit 9375986860
9 changed files with 123 additions and 39 deletions

2
mars.h
View File

@ -8,7 +8,7 @@
#include <asm/atomic.h>
#define MEMLEAK // FIXME: remove this
#define MARS_TRACING // write runtime trace data to /mars/trace.csv
//#define MARS_TRACING // write runtime trace data to /mars/trace.csv
/////////////////////////////////////////////////////////////////////////

View File

@ -86,36 +86,35 @@ int make_bio(struct bio_brick *brick, struct page *page, void *data, int len, lo
sector_offset = pos & ((1 << 9) - 1); // TODO: make dynamic
data_offset = ((unsigned long)data) & ((1 << 9) - 1); // TODO: make dynamic
if (unlikely(sector_offset != data_offset)) {
MARS_ERR("bad alignment: sector_offset %d != data_offet %d\n", sector_offset, data_offset);
goto out;
}
#if 1
if (unlikely(sector_offset > 0)) {
MARS_ERR("odd sector offset %d\n", sector_offset);
goto out;
}
if (unlikely(sector_offset != data_offset)) {
MARS_ERR("bad alignment: sector_offset %d != data_offet %d\n", sector_offset, data_offset);
goto out;
}
if (unlikely(ilen & ((1 << 9) - 1))) {
MARS_ERR("odd length %d\n", ilen);
goto out;
}
#else
// round down to start of first sector
data -= sector_offset;
ilen += sector_offset;
pos -= sector_offset;
// round up to full sector
ilen = (((ilen - 1) >> 9) + 1) << 9; // TODO: make dynamic
// map onto pages.
#if 1
if (page) {
struct page *test_page = virt_to_page(data);
if (test_page != page) {
MARS_ERR("PAGEMOVE %p != %p (addr = %p)\n", test_page, page, data);
}
}
#endif
// map onto pages. TODO: allow higher-order pages (possibly better performance!)
page_offset = pos & (PAGE_SIZE - 1);
page_len = ilen + page_offset;
bvec_count = (page_len - 1) / PAGE_SIZE + 1;
if (bvec_count > brick->bvec_max)
if (bvec_count > brick->bvec_max) {
bvec_count = brick->bvec_max;
}
MARS_IO("sector_offset = %d data = %p pos = %lld ilen = %d page_offset = %d page_len = %d bvec_count = %d\n", sector_offset, data, pos, ilen, page_offset, page_len, bvec_count);

View File

@ -186,6 +186,7 @@ static void client_ref_io(struct client_output *output, struct mref_object *mref
{
struct generic_callback *cb;
struct client_mref_aspect *mref_a;
int hash_index;
unsigned long flags;
int error = -EINVAL;
@ -195,17 +196,29 @@ static void client_ref_io(struct client_output *output, struct mref_object *mref
}
while (output->brick->max_flying > 0 && atomic_read(&output->fly_count) > output->brick->max_flying) {
MARS_IO("sleeping request pos = %lld len = %d rw = %d (flying = %d)\n", mref->ref_pos, mref->ref_len, mref->ref_rw, atomic_read(&output->fly_count));
#ifdef IO_DEBUGGING
msleep(3000);
#else
msleep(1000 * 2 / HZ);
#endif
}
atomic_inc(&output->fly_count);
atomic_inc(&mref->ref_count);
traced_lock(&output->lock, flags);
mref_a->object->ref_id = ++output->last_id;
mref->ref_id = ++output->last_id;
list_add_tail(&mref_a->io_head, &output->mref_list);
traced_unlock(&output->lock, flags);
hash_index = mref->ref_id % CLIENT_HASH_MAX;
traced_lock(&output->hash_lock[hash_index], flags);
list_add_tail(&mref_a->hash_head, &output->hash_table[hash_index]);
traced_unlock(&output->hash_lock[hash_index], flags);
MARS_IO("added request id = %d pos = %lld len = %d rw = %d (flying = %d)\n", mref->ref_id, mref->ref_pos, mref->ref_len, mref->ref_rw, atomic_read(&output->fly_count));
wake_up_interruptible(&output->event);
return;
@ -232,6 +245,7 @@ static int receiver_thread(void *data)
unsigned long flags;
status = mars_recv_struct(&output->socket, &cmd, mars_cmd_meta);
MARS_IO("got cmd = %d status = %d\n", cmd.cmd_code, status);
if (status < 0)
goto done;
@ -244,15 +258,18 @@ static int receiver_thread(void *data)
}
break;
case CMD_CB:
traced_lock(&output->lock, flags);
for (tmp = output->wait_list.next; tmp != &output->wait_list; tmp = tmp->next) {
mref_a = container_of(tmp, struct client_mref_aspect, io_head);
{
int hash_index = mref->ref_id % CLIENT_HASH_MAX;
traced_lock(&output->hash_lock[hash_index], flags);
for (tmp = output->hash_table[hash_index].next; tmp != &output->hash_table[hash_index]; tmp = tmp->next) {
mref_a = container_of(tmp, struct client_mref_aspect, hash_head);
if (mref_a->object->ref_id == cmd.cmd_int1) {
list_del_init(&mref_a->hash_head);
mref = mref_a->object;
break;
}
}
traced_unlock(&output->lock, flags);
traced_unlock(&output->hash_lock[hash_index], flags);
if (!mref) {
MARS_ERR("got unknown id = %d for callback\n", cmd.cmd_int1);
@ -260,9 +277,15 @@ static int receiver_thread(void *data)
goto done;
}
MARS_IO("got callback id = %d, old pos = %lld len = %d rw = %d\n", mref->ref_id, mref->ref_pos, mref->ref_len, mref->ref_rw);
status = mars_recv_cb(&output->socket, mref);
MARS_IO("new status = %d, pos = %lld len = %d rw = %d\n", status, mref->ref_pos, mref->ref_len, mref->ref_rw);
if (status < 0) {
MARS_ERR("interrupted data transfer during callback, status = %d\n", status);
traced_lock(&output->hash_lock[hash_index], flags);
list_add_tail(&mref_a->hash_head, &output->hash_table[hash_index]);
traced_unlock(&output->hash_lock[hash_index], flags);
goto done;
}
@ -276,6 +299,7 @@ static int receiver_thread(void *data)
cb->cb_fn(cb);
client_ref_put(output, mref);
break;
}
case CMD_GETINFO:
status = mars_recv_struct(&output->socket, &output->info, mars_info_meta);
if (status < 0) {
@ -318,11 +342,13 @@ static int sender_thread(void *data)
while (!kthread_should_stop()) {
struct list_head *tmp;
struct client_mref_aspect *mref_a;
struct mref_object *mref;
unsigned long flags;
bool do_resubmit = false;
if (unlikely(!output->socket)) {
status = _connect(output, brick->brick_name);
MARS_IO("connect status = %d\n", status);
if (unlikely(status < 0)) {
msleep(5000);
continue;
@ -333,6 +359,7 @@ static int sender_thread(void *data)
if (do_resubmit) {
/* Re-Submit any waiting requests
*/
MARS_IO("re-submit\n");
traced_lock(&output->lock, flags);
if (!list_empty(&output->wait_list)) {
struct list_head *first = output->wait_list.next;
@ -342,6 +369,7 @@ static int sender_thread(void *data)
list_connect(&output->mref_list, first);
list_connect(last, old_start);
INIT_LIST_HEAD(&output->wait_list);
MARS_IO("done re-submit %p %p\n", first, last);
}
traced_unlock(&output->lock, flags);
}
@ -378,8 +406,12 @@ static int sender_thread(void *data)
traced_unlock(&output->lock, flags);
mref_a = container_of(tmp, struct client_mref_aspect, io_head);
mref = mref_a->object;
status = mars_send_mref(&output->socket, mref_a->object);
MARS_IO("sending mref, id = %d pos = %lld len = %d rw = %d\n", mref->ref_id, mref->ref_pos, mref->ref_len, mref->ref_rw);
status = mars_send_mref(&output->socket, mref);
MARS_IO("status = %d\n", status);
if (unlikely(status < 0)) {
// retry submission on next occasion..
traced_lock(&output->lock, flags);
@ -450,13 +482,15 @@ static int client_mref_aspect_init_fn(struct generic_aspect *_ini, void *_init_d
{
struct client_mref_aspect *ini = (void*)_ini;
INIT_LIST_HEAD(&ini->io_head);
INIT_LIST_HEAD(&ini->hash_head);
return 0;
}
static void client_mref_aspect_exit_fn(struct generic_aspect *_ini, void *_init_data)
{
struct client_mref_aspect *ini = (void*)_ini;
(void)ini;
CHECK_HEAD_EMPTY(&ini->io_head);
CHECK_HEAD_EMPTY(&ini->hash_head);
}
MARS_MAKE_STATICS(client);
@ -470,6 +504,11 @@ static int client_brick_construct(struct client_brick *brick)
static int client_output_construct(struct client_output *output)
{
int i;
for (i = 0; i < CLIENT_HASH_MAX; i++) {
spin_lock_init(&output->hash_lock[i]);
INIT_LIST_HEAD(&output->hash_table[i]);
}
spin_lock_init(&output->lock);
INIT_LIST_HEAD(&output->mref_list);
INIT_LIST_HEAD(&output->wait_list);

View File

@ -4,9 +4,12 @@
#include "mars_net.h"
#define CLIENT_HASH_MAX 256
struct client_mref_aspect {
GENERIC_ASPECT(mref);
struct list_head io_head;
struct list_head hash_head;
bool do_dealloc;
};
@ -43,6 +46,8 @@ struct client_output {
wait_queue_head_t info_event;
bool get_info;
bool got_info;
spinlock_t hash_lock[CLIENT_HASH_MAX];
struct list_head hash_table[CLIENT_HASH_MAX];
};
MARS_TYPES(client);

View File

@ -1215,7 +1215,7 @@ struct mars_brick *make_brick_all(
brick = mars_make_brick(global, belongs, _client_brick_type, new_path, new_name);
if (brick) {
struct client_brick *_brick = (void*)brick;
_brick->max_flying = 1000;
_brick->max_flying = 10000;
}
}
}

View File

@ -114,6 +114,7 @@ static void _if_unplug(struct if_input *input)
if (!list_empty(&input->plug_anchor)) {
// move over the whole list
list_replace_init(&input->plug_anchor, &tmp_list);
atomic_set(&input->plugged_count, 0);
}
traced_unlock(&input->req_lock, flags);
up(&input->kick_sem);
@ -121,8 +122,16 @@ static void _if_unplug(struct if_input *input)
while (!list_empty(&tmp_list)) {
struct if_mref_aspect *mref_a;
struct mref_object *mref;
int hash_index;
unsigned long flags;
mref_a = container_of(tmp_list.next, struct if_mref_aspect, plug_head);
list_del_init(&mref_a->plug_head);
hash_index = mref_a->hash_index;
traced_lock(&input->hash_lock[hash_index], flags);
list_del_init(&mref_a->hash_head);
traced_unlock(&input->hash_lock[hash_index], flags);
mref = mref_a->object;
mars_trace(mref, "if_unplug");
@ -137,7 +146,7 @@ static void _if_unplug(struct if_input *input)
static int if_make_request(struct request_queue *q, struct bio *bio)
{
struct if_input *input;
struct if_brick *brick;
struct if_brick *brick = NULL;
struct mref_object *mref = NULL;
struct if_mref_aspect *mref_a;
struct generic_callback *cb;
@ -203,6 +212,7 @@ static int if_make_request(struct request_queue *q, struct bio *bio)
while (bv_len > 0) {
struct list_head *tmp;
int hash_index;
unsigned long flags;
int len = 0;
@ -212,10 +222,13 @@ static int if_make_request(struct request_queue *q, struct bio *bio)
MARS_IO("rw = %d i = %d pos = %lld bv_page = %p bv_offset = %d data = %p bv_len = %d\n", rw, i, pos, bvec->bv_page, bvec->bv_offset, data, bv_len);
#ifdef REQUEST_MERGING
traced_lock(&input->req_lock, flags);
for (tmp = input->plug_anchor.next; tmp != &input->plug_anchor; tmp = tmp->next) {
//traced_lock(&input->req_lock, flags);
//for(tmp = input->plug_anchor.next; tmp != &input->plug_anchor; tmp = tmp->next) {
hash_index = (pos / IF_HASH_CHUNK) % IF_HASH_MAX;
traced_lock(&input->hash_lock[hash_index], flags);
for (tmp = input->hash_table[hash_index].next; tmp != &input->hash_table[hash_index]; tmp = tmp->next) {
struct if_mref_aspect *tmp_a;
tmp_a = container_of(tmp, struct if_mref_aspect, plug_head);
tmp_a = container_of(tmp, struct if_mref_aspect, hash_head);
len = bv_len;
MARS_IO("bio = %p mref = %p len = %d maxlen = %d\n", bio, mref, len, tmp_a->maxlen);
@ -244,9 +257,11 @@ static int if_make_request(struct request_queue *q, struct bio *bio)
break;
}
}
traced_unlock(&input->req_lock, flags);
//traced_unlock(&input->req_lock, flags);
traced_unlock(&input->hash_lock[hash_index], flags);
#endif
if (!mref) {
int hash_index;
error = -ENOMEM;
mref = if_alloc_mref(&brick->hidden_output, &input->mref_object_layout);
if (unlikely(!mref)) {
@ -297,12 +312,17 @@ static int if_make_request(struct request_queue *q, struct bio *bio)
mref_a->maxlen = mref->ref_len - len;
mref->ref_len = len;
atomic_inc(&input->io_count);
if (brick->skip_sync && !barrier) {
mref->ref_skip_sync = true;
}
atomic_inc(&input->plugged_count);
hash_index = (mref->ref_pos / IF_HASH_CHUNK) % IF_HASH_MAX;
mref_a->hash_index = hash_index;
traced_lock(&input->hash_lock[hash_index], flags);
list_add_tail(&mref_a->hash_head, &input->hash_table[hash_index]);
traced_unlock(&input->hash_lock[hash_index], flags);
traced_lock(&input->req_lock, flags);
list_add_tail(&mref_a->plug_head, &input->plug_anchor);
@ -317,6 +337,7 @@ static int if_make_request(struct request_queue *q, struct bio *bio)
up(&input->kick_sem);
atomic_inc(&input->io_count);
error = 0;
err:
@ -330,7 +351,8 @@ err:
}
}
if (unplug) {
if (unplug ||
(brick && brick->max_plugged > 0 && atomic_read(&input->plugged_count) > brick->max_plugged)) {
_if_unplug(input);
}
@ -535,16 +557,16 @@ static int if_switch(struct if_brick *brick)
static int if_mref_aspect_init_fn(struct generic_aspect *_ini, void *_init_data)
{
struct if_mref_aspect *ini = (void*)_ini;
//INIT_LIST_HEAD(&ini->tmp_head);
INIT_LIST_HEAD(&ini->plug_head);
INIT_LIST_HEAD(&ini->hash_head);
return 0;
}
static void if_mref_aspect_exit_fn(struct generic_aspect *_ini, void *_init_data)
{
struct if_mref_aspect *ini = (void*)_ini;
//CHECK_HEAD_EMPTY(&ini->tmp_head);
CHECK_HEAD_EMPTY(&ini->plug_head);
CHECK_HEAD_EMPTY(&ini->hash_head);
}
MARS_MAKE_STATICS(if);
@ -565,10 +587,17 @@ static int if_brick_destruct(struct if_brick *brick)
static int if_input_construct(struct if_input *input)
{
int i;
for (i = 0; i < IF_HASH_MAX; i++) {
spin_lock_init(&input->hash_lock[i]);
INIT_LIST_HEAD(&input->hash_table[i]);
}
INIT_LIST_HEAD(&input->plug_anchor);
sema_init(&input->kick_sem, 1);
spin_lock_init(&input->req_lock);
atomic_set(&input->open_count, 0);
atomic_set(&input->io_count, 0);
atomic_set(&input->plugged_count, 0);
return 0;
}

View File

@ -9,10 +9,14 @@
#define MAX_BIO 8
#define IF_HASH_MAX 256
#define IF_HASH_CHUNK (1024 * 128)
struct if_mref_aspect {
GENERIC_ASPECT(mref);
//struct list_head tmp_head;
struct list_head plug_head;
struct list_head hash_head;
int hash_index;
int maxlen;
int bio_count;
struct page *orig_page;
@ -30,9 +34,12 @@ struct if_input {
struct block_device *bdev;
atomic_t open_count;
atomic_t io_count;
atomic_t plugged_count;
spinlock_t req_lock;
struct semaphore kick_sem;
struct generic_object_layout mref_object_layout;
spinlock_t hash_lock[IF_HASH_MAX];
struct list_head hash_table[IF_HASH_MAX];
};
struct if_output {
@ -42,6 +49,7 @@ struct if_output {
struct if_brick {
MARS_BRICK(if);
// parameters
int max_plugged;
int readahead;
bool skip_sync;
// inspectable

View File

@ -75,6 +75,7 @@ struct light_class {
#define IF_SKIP_SYNC true
#define IF_MAX_PLUGGED 10000
#define IF_READAHEAD 1
//#define IF_READAHEAD 0
#define BIO_READAHEAD 1
@ -190,8 +191,9 @@ void _set_if_params(struct mars_brick *_brick, void *private)
MARS_ERR("bad brick type\n");
return;
}
if_brick->skip_sync = IF_SKIP_SYNC;
if_brick->max_plugged = IF_MAX_PLUGGED;
if_brick->readahead = IF_READAHEAD;
if_brick->skip_sync = IF_SKIP_SYNC;
}
///////////////////////////////////////////////////////////////////////

View File

@ -1112,8 +1112,10 @@ void trans_logger_log(struct trans_logger_output *output)
{
struct trans_logger_brick *brick = output->brick;
int wait_timeout = HZ;
long long last_jiffies = jiffies;
long long log_jiffies = jiffies;
#ifdef MARS_TRACING
long long last_jiffies = jiffies;
#endif
mars_power_led_on((void*)brick, true);
@ -1134,7 +1136,7 @@ void trans_logger_log(struct trans_logger_output *output)
wait_timeout);
//MARS_INF("AHA %d\n", atomic_read(&output->q_phase1.q_queued));
#if 1
#ifdef MARS_TRACING
{
static int old_mshadow_count = 0;
int cnt;