import mars-75.tgz

This commit is contained in:
Thomas Schoebel-Theuer 2011-03-08 17:45:52 +01:00
parent 09fcbebc4e
commit 5ede8cd9ce
8 changed files with 170 additions and 81 deletions

View File

@ -56,17 +56,14 @@ void _delay(struct aio_threadinfo *tinfo, struct aio_mref_aspect *mref_a, int de
}
static
struct aio_mref_aspect *_get_delayed(struct aio_threadinfo *tinfo, bool remove)
struct aio_mref_aspect *__get_delayed(struct aio_threadinfo *tinfo, bool remove)
{
struct list_head *tmp;
struct aio_mref_aspect *mref_a;
unsigned long flags;
if (list_empty(&tinfo->delay_list))
return NULL;
traced_lock(&tinfo->lock, flags);
tmp = tinfo->delay_list.next;
mref_a = container_of(tmp, struct aio_mref_aspect, io_head);
if (mref_a->timeout < (long long)jiffies) {
@ -75,6 +72,19 @@ struct aio_mref_aspect *_get_delayed(struct aio_threadinfo *tinfo, bool remove)
list_del_init(tmp);
}
return mref_a;
}
static
struct aio_mref_aspect *_get_delayed(struct aio_threadinfo *tinfo, bool remove)
{
struct aio_mref_aspect *mref_a;
unsigned long flags;
traced_lock(&tinfo->lock, flags);
mref_a = __get_delayed(tinfo, remove);
traced_unlock(&tinfo->lock, flags);
return mref_a;
@ -273,7 +283,7 @@ static int aio_submit_thread(void *data)
list_del_init(tmp);
mref_a = container_of(tmp, struct aio_mref_aspect, io_head);
} else {
mref_a = _get_delayed(tinfo, true);
mref_a = __get_delayed(tinfo, true);
}
traced_unlock(&tinfo->lock, flags);

111
mars_if.c
View File

@ -299,26 +299,23 @@ err:
static int if_open(struct block_device *bdev, fmode_t mode)
{
struct if_input *input = bdev->bd_disk->private_data;
(void)input;
MARS_DBG("if_open()\n");
atomic_inc(&input->open_count);
MARS_INF("----------------------- OPEN %d ------------------------------\n", atomic_read(&input->open_count));
return 0;
}
static int if_release(struct gendisk *gd, fmode_t mode)
{
MARS_DBG("if_close()\n");
struct if_input *input = gd->private_data;
MARS_INF("----------------------- CLOSE %d ------------------------------\n", atomic_read(&input->open_count));
if (atomic_dec_and_test(&input->open_count)) {
struct if_brick *brick = input->brick;
brick->has_closed = true;
mars_trigger();
}
return 0;
}
static const struct block_device_operations if_blkdev_ops = {
.owner = THIS_MODULE,
.open = if_open,
.release = if_release,
};
////////////////// own brick / input / output operations //////////////////
static void if_unplug(struct request_queue *q)
{
struct if_input *input = q->queuedata;
@ -330,39 +327,12 @@ static void if_unplug(struct request_queue *q)
_if_unplug(input);
}
static const struct block_device_operations if_blkdev_ops = {
.owner = THIS_MODULE,
.open = if_open,
.release = if_release,
//////////////// object / aspect constructors / destructors ///////////////
static int if_mref_aspect_init_fn(struct generic_aspect *_ini, void *_init_data)
{
struct if_mref_aspect *ini = (void*)_ini;
//INIT_LIST_HEAD(&ini->tmp_head);
INIT_LIST_HEAD(&ini->plug_head);
return 0;
}
static void if_mref_aspect_exit_fn(struct generic_aspect *_ini, void *_init_data)
{
struct if_mref_aspect *ini = (void*)_ini;
//CHECK_HEAD_EMPTY(&ini->tmp_head);
CHECK_HEAD_EMPTY(&ini->plug_head);
}
MARS_MAKE_STATICS(if);
//////////////////////// contructors / destructors ////////////////////////
static int if_brick_construct(struct if_brick *brick)
{
struct if_output *hidden = &brick->hidden_output;
_if_output_init(brick, hidden, "internal");
return 0;
}
static int if_brick_destruct(struct if_brick *brick)
{
return 0;
}
};
static int if_switch(struct if_brick *brick)
{
@ -414,8 +384,6 @@ static int if_switch(struct if_brick *brick)
blk_queue_max_segment_size(q, MARS_MAX_SEGMENT_SIZE);
blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
q->unplug_fn = if_unplug;
sema_init(&input->kick_sem, 1);
spin_lock_init(&input->req_lock);
q->queue_lock = &input->req_lock; // needed!
//blk_queue_ordered(q, QUEUE_ORDERED_DRAIN, NULL);//???
@ -432,7 +400,6 @@ static int if_switch(struct if_brick *brick)
#if 0 // ???
blk_queue_merge_bvec(q, mars_merge_bvec);
#endif
INIT_LIST_HEAD(&input->plug_anchor);
// point of no return
//MARS_DBG("99999\n");
@ -442,11 +409,17 @@ static int if_switch(struct if_brick *brick)
mars_power_led_on((void*)brick, true);
} else {
mars_power_led_on((void*)brick, false);
disk = input->disk;
if (!disk)
goto down;
if (atomic_read(&input->open_count) > 0) {
MARS_INF("device '%s' is open %d times, cannot shutdown\n", disk->disk_name, atomic_read(&input->open_count));
return -EBUSY;
}
if (input->bdev) {
bdput(input->bdev);
input->bdev = NULL;
}
disk = input->disk;
if (disk) {
q = disk->queue;
del_gendisk(input->disk);
@ -457,13 +430,55 @@ static int if_switch(struct if_brick *brick)
}
}
//........
down:
mars_power_led_off((void*)brick, true);
}
return 0;
}
////////////////// own brick / input / output operations //////////////////
// none
//////////////// object / aspect constructors / destructors ///////////////
static int if_mref_aspect_init_fn(struct generic_aspect *_ini, void *_init_data)
{
struct if_mref_aspect *ini = (void*)_ini;
//INIT_LIST_HEAD(&ini->tmp_head);
INIT_LIST_HEAD(&ini->plug_head);
return 0;
}
static void if_mref_aspect_exit_fn(struct generic_aspect *_ini, void *_init_data)
{
struct if_mref_aspect *ini = (void*)_ini;
//CHECK_HEAD_EMPTY(&ini->tmp_head);
CHECK_HEAD_EMPTY(&ini->plug_head);
}
MARS_MAKE_STATICS(if);
//////////////////////// contructors / destructors ////////////////////////
static int if_brick_construct(struct if_brick *brick)
{
struct if_output *hidden = &brick->hidden_output;
_if_output_init(brick, hidden, "internal");
return 0;
}
static int if_brick_destruct(struct if_brick *brick)
{
return 0;
}
static int if_input_construct(struct if_input *input)
{
INIT_LIST_HEAD(&input->plug_anchor);
sema_init(&input->kick_sem, 1);
spin_lock_init(&input->req_lock);
atomic_set(&input->open_count, 0);
return 0;
}

View File

@ -31,6 +31,7 @@ struct if_input {
struct request_queue *q;
struct gendisk *disk;
struct block_device *bdev;
atomic_t open_count;
spinlock_t req_lock;
struct semaphore kick_sem;
struct generic_object_layout mref_object_layout;
@ -42,6 +43,7 @@ struct if_output {
struct if_brick {
MARS_BRICK(if);
bool has_closed;
struct if_output hidden_output;
};

View File

@ -655,7 +655,6 @@ struct mars_rotate {
bool has_error;
bool do_replay;
bool is_primary;
bool create_once;
};
static
@ -1054,7 +1053,7 @@ int _start_trans(struct mars_rotate *rot)
struct trans_logger_brick *trans_brick = rot->trans_brick;
int status = 0;
if (trans_brick->power.button) {
if (trans_brick->power.button || !trans_brick->power.led_off) {
goto done;
}
@ -1178,16 +1177,15 @@ int make_log_finalize(struct mars_global *global, struct mars_dent *parent)
}
goto done;
}
/* Special case: after a fresh start, when no logfile exists,
* create one. This is a thin exception from the rule that
/* Special case: when no logfile exists,
* create one. This is an exception from the rule that
* normally userspace should control what happens in MARS.
*/
if (!rot->relevant_log && rot->is_primary && !rot->has_error && rot->max_sequence > 0 && !rot->create_once) { // try to create an empty logfile
if (!rot->relevant_log && rot->is_primary && !rot->has_error && rot->max_sequence > 0) { // try to create an empty logfile
char *tmp = path_make("%s/log-%09d-%s", parent->d_path, rot->max_sequence + 1, my_id());
if (likely(tmp)) {
_create_new_logfile(tmp);
kfree(tmp);
rot->create_once = true;
msleep(1000);
goto done;
}
@ -1286,6 +1284,7 @@ static int make_dev(void *buf, struct mars_dent *dent)
struct mars_dent *parent = dent->d_parent;
struct mars_rotate *rot = parent->d_private;
struct mars_brick *dev_brick;
struct if_brick *_dev_brick;
int status = 0;
if (!global->global_power.button || !dent->d_parent || !dent->new_link) {
@ -1331,6 +1330,15 @@ static int make_dev(void *buf, struct mars_dent *dent)
return -1;
}
dev_brick->status_level = 1;
_dev_brick = (void*)dev_brick;
#if 0
if (_dev_brick->has_closed) {
_dev_brick->has_closed = false;
MARS_INF("rotating logfile for '%s'\n", parent->d_name);
status = mars_power_button((void*)rot->trans_brick, false);
rot->relevant_log = NULL;
}
#endif
done:
return status;

View File

@ -144,7 +144,9 @@ int mars_create_socket(struct socket **sock, struct sockaddr_storage *addr, bool
_check(status);
status = kernel_setsockopt(*sock, SOL_IP, SO_PRIORITY, (char*)&default_tcp_params.tos, sizeof(default_tcp_params.tos));
_check(status);
#if 0
status = kernel_setsockopt(*sock, IPPROTO_TCP, TCP_NODELAY, (char*)&x_true, sizeof(x_true));
#endif
_check(status);
status = kernel_setsockopt(*sock, SOL_SOCKET, SO_KEEPALIVE, (char*)&x_true, sizeof(x_true));
_check(status);

View File

@ -5,6 +5,8 @@
//#define BRICK_DEBUGGING
//#define MARS_DEBUGGING
//#define USE_MEMCPY
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/string.h>
@ -221,7 +223,8 @@ static struct trans_logger_mref_aspect *hash_find(struct hash_anchor *table, lof
return res;
}
static inline void hash_insert(struct hash_anchor *table, struct trans_logger_mref_aspect *elem_a, atomic_t *cnt)
static inline
void hash_insert(struct hash_anchor *table, struct trans_logger_mref_aspect *elem_a, atomic_t *cnt)
{
loff_t base_index = elem_a->object->ref_pos >> REGION_SIZE_BITS;
int hash = hash_fn(base_index);
@ -289,6 +292,11 @@ static int _read_ref_get(struct trans_logger_output *output, struct trans_logger
struct mref_object *shadow = shadow_a->object;
int diff = shadow->ref_pos - mref->ref_pos;
int restlen;
#if 1 // xxx
if (shadow_a == mref_a) {
MARS_ERR("oops %p == %p\n", shadow_a, mref_a);
}
#endif
if (diff > 0) {
/* Although the shadow is overlapping, the
* region before its start is _not_ shadowed.
@ -307,7 +315,13 @@ static int _read_ref_get(struct trans_logger_output *output, struct trans_logger
mref->ref_data = shadow->ref_data - diff;
mref->ref_flags = shadow->ref_flags;
mref_a->shadow_ref = shadow_a;
atomic_inc(&mref->ref_count);
atomic_set(&mref->ref_count, 1);
atomic_inc(&output->sshadow_count);
#ifdef USE_MEMCPY
if (mref_a->orig_data) {
memcpy(mref_a->orig_data, mref->ref_data, mref->ref_len);
}
#endif
return mref->ref_len;
}
@ -324,12 +338,17 @@ static int _write_ref_get(struct trans_logger_output *output, struct trans_logge
if (unlikely(!mref->ref_data)) {
return -ENOMEM;
}
atomic_inc(&output->mshadow_count);
#ifdef USE_MEMCPY
if (mref_a->orig_data) {
memcpy(mref->ref_data, mref_a->orig_data, mref->ref_len);
}
#endif
mref_a->output = output;
get_lamport(&mref_a->stamp);
mref->ref_flags = MREF_UPTODATE;
mref_a->shadow_ref = mref_a; // cyclic self-reference
mref_a->shadow_ref = mref_a; // cyclic self-reference => indicates master shadow
atomic_set(&mref->ref_count, 1);
get_lamport(&mref_a->stamp);
return mref->ref_len;
}
@ -340,6 +359,14 @@ static int trans_logger_ref_get(struct trans_logger_output *output, struct mref_
CHECK_PTR(output, err);
#if 1 // xxx
if (atomic_read(&mref->ref_count) > 0) { // setup already performed
MARS_INF("aha %d\n", atomic_read(&mref->ref_count));
atomic_inc(&mref->ref_count);
return mref->ref_len;
}
#endif
mref_a = trans_logger_mref_get_aspect(output, mref);
CHECK_PTR(mref_a, err);
CHECK_PTR(mref_a->object, err);
@ -357,7 +384,7 @@ static int trans_logger_ref_get(struct trans_logger_output *output, struct mref_
/* FIXME: THIS IS PROVISIONARY (use event instead)
*/
while (unlikely(!output->brick->power.led_on)) {
msleep(2 * HZ);
msleep(HZ);
}
return _write_ref_get(output, mref_a);
@ -372,6 +399,7 @@ static void trans_logger_ref_put(struct trans_logger_output *output, struct mref
struct trans_logger_mref_aspect *shadow_a;
struct trans_logger_input *input;
restart:
CHECK_ATOMIC(&mref->ref_count, 1);
CHECK_PTR(output, err);
@ -383,26 +411,38 @@ static void trans_logger_ref_put(struct trans_logger_output *output, struct mref
// are we a shadow?
shadow_a = mref_a->shadow_ref;
if (shadow_a) {
bool finished;
if (mref_a->is_hashed) {
finished = hash_put(output->hash_table, mref_a, &output->hash_count);
} else {
finished = atomic_dec_and_test(&mref->ref_count);
}
if (!finished) {
return;
}
if (shadow_a != mref_a) { // we are a slave shadow
//MARS_INF("slave\n");
atomic_dec(&output->sshadow_count);
CHECK_HEAD_EMPTY(&mref_a->hash_head);
if (atomic_dec_and_test(&mref->ref_count)) {
trans_logger_free_mref(mref);
}
}
// now put the master shadow
if (hash_put(output->hash_table, shadow_a, &output->hash_count)) {
struct mref_object *shadow = shadow_a->object;
kfree(shadow->ref_data);
//MARS_INF("hm?\n");
trans_logger_free_mref(shadow);
trans_logger_free_mref(mref);
// now put the master shadow
mref = shadow_a->object;
goto restart;
}
// we are a master shadow
CHECK_PTR(mref->ref_data, err);
kfree(mref->ref_data);
mref->ref_data = NULL;
atomic_dec(&output->mshadow_count);
trans_logger_free_mref(mref);
return;
}
input = output->brick->inputs[0];
GENERIC_INPUT_CALL(input, mref_put, mref);
err: ;
return;
err:
MARS_FAT("oops\n");
}
static void _trans_logger_endio(struct generic_callback *cb)
@ -450,11 +490,16 @@ static void trans_logger_ref_io(struct trans_logger_output *output, struct mref_
if (mref->ref_rw == READ) {
// nothing to do: directly signal success.
struct generic_callback *cb = mref->ref_cb;
#ifdef USE_MEMCPY
if (mref_a->orig_data) {
memcpy(mref_a->orig_data, mref->ref_data, mref->ref_len);
}
#endif
cb->cb_error = 0;
mref->ref_flags |= MREF_UPTODATE;
cb->cb_fn(cb);
// no touch of ref_count necessary
} else {
} else { // WRITE
#if 1
if (unlikely(mref_a->shadow_ref != mref_a)) {
MARS_ERR("something is wrong: %p != %p\n", mref_a->shadow_ref, mref_a);
@ -466,8 +511,11 @@ static void trans_logger_ref_io(struct trans_logger_output *output, struct mref_
}
#endif
mref->ref_flags |= MREF_WRITING;
MARS_DBG("hashing %d at %lld\n", mref->ref_len, mref->ref_pos);
hash_insert(output->hash_table, mref_a, &output->hash_count);
if (!mref_a->is_hashed) {
mref_a->is_hashed = true;
MARS_DBG("hashing %d at %lld\n", mref->ref_len, mref->ref_pos);
hash_insert(output->hash_table, mref_a, &output->hash_count);
}
q_insert(&output->q_phase1, mref_a);
wake_up_interruptible(&output->event);
}
@ -910,9 +958,9 @@ void trans_logger_log(struct trans_logger_output *output)
(kthread_should_stop() && !_congested(output)),
wait_jiffies);
#if 1
if (((int)jiffies) - last_jiffies >= HZ * 10 && atomic_read(&output->hash_count) > 0) {
if (((int)jiffies) - last_jiffies >= HZ * 10 && brick->power.button) {
last_jiffies = jiffies;
MARS_INF("LOGGER: hash_count=%d fly=%d phase1=%d/%d phase2=%d/%d phase3=%d/%d phase4=%d/%d\n", atomic_read(&output->hash_count), atomic_read(&output->fly_count), atomic_read(&output->q_phase1.q_queued), atomic_read(&output->q_phase1.q_flying), atomic_read(&output->q_phase2.q_queued), atomic_read(&output->q_phase2.q_flying), atomic_read(&output->q_phase3.q_queued), atomic_read(&output->q_phase3.q_flying), atomic_read(&output->q_phase4.q_queued), atomic_read(&output->q_phase4.q_flying));
MARS_INF("LOGGER: mshadow=%d sshadow=%d hash_count=%d fly=%d phase1=%d/%d phase2=%d/%d phase3=%d/%d phase4=%d/%d\n", atomic_read(&output->mshadow_count), atomic_read(&output->sshadow_count), atomic_read(&output->hash_count), atomic_read(&output->fly_count), atomic_read(&output->q_phase1.q_queued), atomic_read(&output->q_phase1.q_flying), atomic_read(&output->q_phase2.q_queued), atomic_read(&output->q_phase2.q_flying), atomic_read(&output->q_phase3.q_queued), atomic_read(&output->q_phase3.q_flying), atomic_read(&output->q_phase4.q_queued), atomic_read(&output->q_phase4.q_flying));
}
#endif
wait_jiffies = HZ;

View File

@ -45,6 +45,7 @@ struct trans_logger_mref_aspect {
struct pairing_heap_mref ph;
struct trans_logger_mref_aspect *shadow_ref;
void *orig_data;
bool is_hashed;
struct timespec stamp;
struct generic_callback cb;
struct trans_logger_mref_aspect *orig_mref_a;
@ -69,6 +70,8 @@ struct trans_logger_output {
struct hash_anchor hash_table[TRANS_HASH_MAX];
atomic_t hash_count;
atomic_t fly_count;
atomic_t mshadow_count;
atomic_t sshadow_count;
struct task_struct *thread;
wait_queue_head_t event;
// queues

View File

@ -10,6 +10,7 @@ my $host = `uname -n` or die "cannot determine my network node name\n";
chomp $host;
my $ip = `ip a` or die "cannot determine my IP address\n";
$ip =~ s/\A.*inet +(?!127\.0\.)([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+).*?\Z/$1/ms or die "cannot parse my IP address\n";
chomp $ip;
print "my IP is $ip\n";
umask 0177;