import mars-126.tgz

This commit is contained in:
Thomas Schoebel-Theuer 2011-08-26 11:31:56 +01:00
parent 3091f75f67
commit b54dbfb492
3 changed files with 301 additions and 262 deletions

View File

@ -20,8 +20,6 @@
#define MARS_MAX_AIO 1024
#define MARS_MAX_AIO_READ 32
#define STRONG_MM
#define MEMLEAK // FIXME: remove this
#define MEASURE_SYNC 8
///////////////////////// own type definitions ////////////////////////
@ -330,219 +328,63 @@ static int aio_submit_dummy(struct aio_output *output)
return res;
}
static int aio_submit_thread(void *data)
static
int aio_start_thread(struct aio_output *output, int i, int(*fn)(void*))
{
struct aio_threadinfo *tinfo = data;
struct aio_output *output = tinfo->output;
struct file *file = output->filp;
int err;
/* TODO: this is provisionary. We only need it for sys_io_submit().
* The latter should be accompanied by a future vfs_submit() or
* do_submit() which currently does not exist :(
* FIXME: corresponding cleanup NYI
*/
err = get_unused_fd();
MARS_INF("fd = %d\n", err);
if (unlikely(err < 0))
return err;
output->fd = err;
fd_install(err, output->filp);
static int index = 0;
struct aio_threadinfo *tinfo = &output->tinfo[i];
int j;
MARS_INF("kthread has started.\n");
//set_user_nice(current, -20);
use_fake_mm();
if (!current->mm)
return -ENOMEM;
while (!kthread_should_stop()) {
struct aio_mref_aspect *mref_a;
struct mref_object *mref;
int sleeptime;
int err;
wait_event_interruptible_timeout(
tinfo->event,
kthread_should_stop() ||
_dequeue(tinfo, false),
HZ);
mref_a = _dequeue(tinfo, true);
if (!mref_a) {
continue;
}
// check for reads exactly at EOF (special case)
mref = mref_a->object;
if (mref->ref_pos == mref->ref_total_size &&
!mref->ref_rw &&
mref->ref_timeout > 0) {
loff_t total_size = i_size_read(file->f_mapping->host);
loff_t len = total_size - mref->ref_pos;
if (len > 0) {
mref->ref_total_size = total_size;
mref->ref_len = len;
} else {
if (!mref_a->start_jiffies) {
mref_a->start_jiffies = jiffies;
}
if ((long long)jiffies - mref_a->start_jiffies <= mref->ref_timeout) {
if (!_dequeue(tinfo, false)) {
atomic_inc(&output->total_msleep_count);
msleep(1000 * 4 / HZ);
}
_enqueue(tinfo, mref_a, MARS_PRIO_LOW, true);
continue;
}
MARS_DBG("ENODATA %lld\n", len);
_complete(output, mref, -ENODATA);
continue;
}
}
sleeptime = 1000 / HZ;
for (;;) {
/* This is just a test. Don't use it for performance reasons.
*/
if (output->brick->wait_during_fdsync && mref->ref_rw != READ) {
if (output->fdsync_active) {
long long delay = 60 * HZ;
atomic_inc(&output->total_fdsync_wait_count);
__wait_event_interruptible_timeout(
output->fdsync_event,
!output->fdsync_active || kthread_should_stop(),
delay);
}
}
/* Now really do the work
*/
err = aio_submit(output, mref_a, false);
if (likely(err != -EAGAIN)) {
break;
}
atomic_inc(&output->total_delay_count);
msleep(sleeptime);
if (sleeptime < 100) {
sleeptime += 1000 / HZ;
}
}
if (unlikely(err < 0)) {
_complete(output, mref, err);
}
for (j = 0; j < MARS_PRIO_NR; j++) {
INIT_LIST_HEAD(&tinfo->mref_list[j]);
}
#if 1 // workaround for waking up the receiver thread. TODO: check whether signal handlong could do better.
aio_submit_dummy(output);
#endif
tinfo->terminated = true;
MARS_INF("kthread has stopped.\n");
unuse_fake_mm();
tinfo->output = output;
spin_lock_init(&tinfo->lock);
init_waitqueue_head(&tinfo->event);
tinfo->terminated = false;
tinfo->thread = kthread_create(fn, tinfo, "mars_aio%d", index++);
if (IS_ERR(tinfo->thread)) {
int err = PTR_ERR(tinfo->thread);
MARS_ERR("cannot create thread\n");
tinfo->thread = NULL;
return err;
}
get_task_struct(tinfo->thread);
wake_up_process(tinfo->thread);
return 0;
}
static int aio_event_thread(void *data)
static
void aio_stop_thread(struct aio_output *output, int i, bool do_submit_dummy)
{
struct aio_threadinfo *tinfo = data;
struct aio_output *output = tinfo->output;
struct aio_threadinfo *other = &output->tinfo[2];
int err = -ENOMEM;
MARS_INF("kthread has started.\n");
//set_user_nice(current, -20);
struct aio_threadinfo *tinfo = &output->tinfo[i];
use_fake_mm();
if (!current->mm)
goto err;
if (tinfo->thread) {
MARS_INF("stopping thread %d ...\n", i);
kthread_stop_nowait(tinfo->thread);
#if 1
if (!output->ctxp) {
mm_segment_t oldfs;
if (!current->mm) {
MARS_ERR("mm = %p\n", current->mm);
err = -EINVAL;
goto err;
// workaround for waking up the receiver thread. TODO: check whether signal handlong could do better.
if (do_submit_dummy) {
MARS_INF("submitting dummy for wakeup...\n", i);
aio_submit_dummy(output);
}
oldfs = get_fs();
set_fs(get_ds());
err = sys_io_setup(MARS_MAX_AIO, &output->ctxp);
set_fs(oldfs);
if (unlikely(err))
goto err;
}
#endif
while (!kthread_should_stop()) {
mm_segment_t oldfs;
int count;
int bounced;
int i;
struct timespec timeout = {
.tv_sec = 10,
};
struct io_event events[MARS_MAX_AIO_READ];
oldfs = get_fs();
set_fs(get_ds());
/* TODO: don't timeout upon termination.
* Probably we should submit a dummy request.
*/
count = sys_io_getevents(output->ctxp, 1, MARS_MAX_AIO_READ, events, &timeout);
set_fs(oldfs);
//MARS_INF("count = %d\n", count);
bounced = 0;
for (i = 0; i < count; i++) {
struct aio_mref_aspect *mref_a = (void*)events[i].data;
struct mref_object *mref;
int err = events[i].res;
if (!mref_a) {
continue; // this was a dummy request
}
mref = mref_a->object;
MARS_IO("AIO done %p pos = %lld len = %d rw = %d\n", mref, mref->ref_pos, mref->ref_len, mref->ref_rw);
if (output->brick->o_fdsync
&& err >= 0
&& mref->ref_rw != READ
&& !mref->ref_skip_sync
&& !mref_a->resubmit++) {
// workaround for non-implemented AIO FSYNC operation
if (!output->filp->f_op->aio_fsync) {
mars_trace(mref, "aio_fsync");
_enqueue(other, mref_a, mref->ref_prio, true);
bounced++;
continue;
}
err = aio_submit(output, mref_a, true);
if (likely(err >= 0))
continue;
}
_complete(output, mref, err);
// wait for termination
MARS_INF("waiting for thread %d ...\n", i);
wait_event_interruptible_timeout(
tinfo->event,
tinfo->terminated,
(60 - i * 2) * HZ);
if (likely(tinfo->terminated)) {
//MARS_INF("finalizing thread %d ...\n", i);
//kthread_stop(tinfo->thread);
MARS_INF("thread %d finished.\n", i);
put_task_struct(tinfo->thread);
tinfo->thread = NULL;
} else {
MARS_ERR("thread %d did not terminate - leaving a zombie\n", i);
}
if (bounced)
wake_up_interruptible_all(&other->event);
}
err = 0;
err:
tinfo->terminated = true;
MARS_INF("kthread has stopped, err = %d\n", err);
unuse_fake_mm();
return err;
}
static
@ -683,9 +525,251 @@ int aio_sync_thread(void *data)
MARS_INF("kthread has stopped.\n");
tinfo->terminated = true;
wake_up_interruptible_all(&tinfo->event);
return 0;
}
static int aio_event_thread(void *data)
{
struct aio_threadinfo *tinfo = data;
struct aio_output *output = tinfo->output;
struct aio_threadinfo *other = &output->tinfo[2];
int err = -ENOMEM;
MARS_INF("kthread has started.\n");
//set_user_nice(current, -20);
use_fake_mm();
if (!current->mm)
goto err;
err = aio_start_thread(output, 2, aio_sync_thread);
if (unlikely(err < 0))
goto err;
while (!kthread_should_stop()) {
mm_segment_t oldfs;
int count;
int bounced;
int i;
struct timespec timeout = {
.tv_sec = 10,
};
struct io_event events[MARS_MAX_AIO_READ];
oldfs = get_fs();
set_fs(get_ds());
/* TODO: don't timeout upon termination.
* Probably we should submit a dummy request.
*/
count = sys_io_getevents(output->ctxp, 1, MARS_MAX_AIO_READ, events, &timeout);
set_fs(oldfs);
//MARS_INF("count = %d\n", count);
bounced = 0;
for (i = 0; i < count; i++) {
struct aio_mref_aspect *mref_a = (void*)events[i].data;
struct mref_object *mref;
int err = events[i].res;
if (!mref_a) {
continue; // this was a dummy request
}
mref = mref_a->object;
MARS_IO("AIO done %p pos = %lld len = %d rw = %d\n", mref, mref->ref_pos, mref->ref_len, mref->ref_rw);
if (output->brick->o_fdsync
&& err >= 0
&& mref->ref_rw != READ
&& !mref->ref_skip_sync
&& !mref_a->resubmit++) {
// workaround for non-implemented AIO FSYNC operation
if (!output->filp->f_op->aio_fsync) {
mars_trace(mref, "aio_fsync");
_enqueue(other, mref_a, mref->ref_prio, true);
bounced++;
continue;
}
err = aio_submit(output, mref_a, true);
if (likely(err >= 0))
continue;
}
_complete(output, mref, err);
}
if (bounced)
wake_up_interruptible_all(&other->event);
}
err = 0;
err:
MARS_INF("kthread has stopped, err = %d\n", err);
aio_stop_thread(output, 2, false);
unuse_fake_mm();
tinfo->terminated = true;
wake_up_interruptible_all(&tinfo->event);
return err;
}
static int aio_submit_thread(void *data)
{
struct aio_threadinfo *tinfo = data;
struct aio_output *output = tinfo->output;
struct file *file = output->filp;
int err;
/* TODO: this is provisionary. We only need it for sys_io_submit().
* The latter should be accompanied by a future vfs_submit() or
* do_submit() which currently does not exist :(
* FIXME: corresponding cleanup NYI
*/
err = get_unused_fd();
MARS_INF("fd = %d\n", err);
if (unlikely(err < 0))
goto done;
output->fd = err;
fd_install(err, output->filp);
MARS_INF("kthread has started.\n");
//set_user_nice(current, -20);
use_fake_mm();
err = -ENOMEM;
if (unlikely(!current->mm))
goto done;
#if 1
if (true) {
mm_segment_t oldfs;
if (!current->mm) {
MARS_ERR("mm = %p\n", current->mm);
err = -EINVAL;
goto done;
}
oldfs = get_fs();
set_fs(get_ds());
err = sys_io_setup(MARS_MAX_AIO, &output->ctxp);
set_fs(oldfs);
if (unlikely(err < 0))
goto done;
}
#endif
err = aio_start_thread(output, 1, aio_event_thread);
if (unlikely(err < 0))
goto done;
while (!kthread_should_stop()) {
struct aio_mref_aspect *mref_a;
struct mref_object *mref;
int sleeptime;
int err;
wait_event_interruptible_timeout(
tinfo->event,
kthread_should_stop() ||
_dequeue(tinfo, false),
HZ);
mref_a = _dequeue(tinfo, true);
if (!mref_a) {
continue;
}
// check for reads exactly at EOF (special case)
mref = mref_a->object;
if (mref->ref_pos == mref->ref_total_size &&
!mref->ref_rw &&
mref->ref_timeout > 0) {
loff_t total_size = i_size_read(file->f_mapping->host);
loff_t len = total_size - mref->ref_pos;
if (len > 0) {
mref->ref_total_size = total_size;
mref->ref_len = len;
} else {
if (!mref_a->start_jiffies) {
mref_a->start_jiffies = jiffies;
}
if ((long long)jiffies - mref_a->start_jiffies <= mref->ref_timeout) {
if (!_dequeue(tinfo, false)) {
atomic_inc(&output->total_msleep_count);
msleep(1000 * 4 / HZ);
}
_enqueue(tinfo, mref_a, MARS_PRIO_LOW, true);
continue;
}
MARS_DBG("ENODATA %lld\n", len);
_complete(output, mref, -ENODATA);
continue;
}
}
sleeptime = 1000 / HZ;
for (;;) {
/* This is just a test. Don't use it for performance reasons.
*/
if (output->brick->wait_during_fdsync && mref->ref_rw != READ) {
if (output->fdsync_active) {
long long delay = 60 * HZ;
atomic_inc(&output->total_fdsync_wait_count);
__wait_event_interruptible_timeout(
output->fdsync_event,
!output->fdsync_active || kthread_should_stop(),
delay);
}
}
/* Now really do the work
*/
err = aio_submit(output, mref_a, false);
if (likely(err != -EAGAIN)) {
break;
}
atomic_inc(&output->total_delay_count);
msleep(sleeptime);
if (sleeptime < 100) {
sleeptime += 1000 / HZ;
}
}
if (unlikely(err < 0)) {
_complete(output, mref, err);
}
}
MARS_INF("kthread has stopped.\n");
aio_stop_thread(output, 1, true);
#if 1
if (true) {
mm_segment_t oldfs;
MARS_INF("destroying ioctx.....\n");
oldfs = get_fs();
set_fs(get_ds());
sys_io_destroy(output->ctxp);
set_fs(oldfs);
output->ctxp = 0;
}
#endif
MARS_INF("destroying fd.....\n");
put_unused_fd(output->fd);
unuse_fake_mm();
err = 0;
done:
tinfo->terminated = true;
wake_up_interruptible_all(&tinfo->event);
return err;
}
static int aio_get_info(struct aio_output *output, struct mars_info *info)
{
struct file *file = output->filp;
@ -775,13 +859,11 @@ static int aio_brick_construct(struct aio_brick *brick)
static int aio_switch(struct aio_brick *brick)
{
static int index = 0;
struct aio_output *output = brick->outputs[0];
const char *path = output->brick->brick_name;
int flags = O_CREAT | O_RDWR | O_LARGEFILE;
int prot = 0600;
mm_segment_t oldfs;
int i;
int err = 0;
MARS_DBG("power.button = %d\n", brick->power.button);
@ -820,30 +902,9 @@ static int aio_switch(struct aio_brick *brick)
}
#endif
for (i = 0; i < 3; i++) {
static int (*fn[])(void*) = {
aio_submit_thread,
aio_event_thread,
aio_sync_thread,
};
struct aio_threadinfo *tinfo = &output->tinfo[i];
int j;
for (j = 0; j < MARS_PRIO_NR; j++) {
INIT_LIST_HEAD(&tinfo->mref_list[j]);
}
tinfo->output = output;
spin_lock_init(&tinfo->lock);
init_waitqueue_head(&tinfo->event);
tinfo->terminated = false;
tinfo->thread = kthread_create(fn[i], tinfo, "mars_aio%d", index++);
if (IS_ERR(tinfo->thread)) {
err = PTR_ERR(tinfo->thread);
MARS_ERR("cannot create thread\n");
tinfo->thread = NULL;
goto err;
}
wake_up_process(tinfo->thread);
}
err = aio_start_thread(output, 0, aio_submit_thread);
if (err < 0)
goto err;
MARS_INF("opened file '%s'\n", path);
mars_power_led_on((void*)brick, true);
@ -859,31 +920,8 @@ cleanup:
}
mars_power_led_on((void*)brick, false);
for (i = 2; i >= 0; i--) {
struct aio_threadinfo *tinfo = &output->tinfo[i];
if (tinfo->thread) {
MARS_INF("stopping thread %d ...\n", i);
kthread_stop_nowait(tinfo->thread);
}
}
for (i = 0; i < 3; i++) {
struct aio_threadinfo *tinfo = &output->tinfo[i];
if (tinfo->thread) {
// wait for termination
MARS_INF("waiting for thread %d ...\n", i);
wait_event_interruptible_timeout(
tinfo->event,
tinfo->terminated, 60 * HZ);
if (likely(tinfo->terminated)) {
MARS_INF("finalizing thread %d ...\n", i);
kthread_stop(tinfo->thread);
put_task_struct(tinfo->thread);
tinfo->thread = NULL;
} else {
MARS_ERR("thread %d did not terminate - leaving a zombie\n", i);
}
}
}
aio_stop_thread(output, 0, false);
mars_power_led_off((void*)brick,
(output->tinfo[0].thread == NULL &&
@ -895,12 +933,6 @@ cleanup:
filp_close(output->filp, NULL);
output->filp = NULL;
}
if (output->ctxp) {
#ifndef MEMLEAK // FIXME this crashes
sys_io_destroy(output->ctxp);
#endif
output->ctxp = 0;
}
}
MARS_DBG("switch off status = %d\n", err);
return err;

View File

@ -584,7 +584,8 @@ int __init init_mars_server(void)
int status;
MARS_INF("init_server()\n");
#if 0
status = mars_create_sockaddr(&sockaddr, "");
if (status < 0)
return status;
@ -602,8 +603,10 @@ int __init init_mars_server(void)
return PTR_ERR(thread);
}
get_task_struct(thread);
server_thread = thread;
wake_up_process(thread);
#endif
return server_register_brick_type();
}
@ -618,10 +621,12 @@ void __exit exit_mars_server(void)
}
MARS_INF("stopping thread...\n");
kthread_stop(server_thread);
if (server_socket && !server_thread) {
if (server_socket) {
//sock_release(server_socket);
server_socket = NULL;
}
put_task_struct(server_thread);
server_thread = NULL;
}
}

View File

@ -98,8 +98,10 @@ int __init init_mars_proc(void)
{
MARS_INF("init_proc()\n");
#if 0
header = register_sysctl_table(mars_table);
#endif
return 0;
}