diff --git a/mars_aio.c b/mars_aio.c index 43c96be7..e4ff24ff 100644 --- a/mars_aio.c +++ b/mars_aio.c @@ -20,8 +20,6 @@ #define MARS_MAX_AIO 1024 #define MARS_MAX_AIO_READ 32 -#define STRONG_MM -#define MEMLEAK // FIXME: remove this #define MEASURE_SYNC 8 ///////////////////////// own type definitions //////////////////////// @@ -330,219 +328,63 @@ static int aio_submit_dummy(struct aio_output *output) return res; } -static int aio_submit_thread(void *data) +static +int aio_start_thread(struct aio_output *output, int i, int(*fn)(void*)) { - struct aio_threadinfo *tinfo = data; - struct aio_output *output = tinfo->output; - struct file *file = output->filp; - int err; - - /* TODO: this is provisionary. We only need it for sys_io_submit(). - * The latter should be accompanied by a future vfs_submit() or - * do_submit() which currently does not exist :( - * FIXME: corresponding cleanup NYI - */ - err = get_unused_fd(); - MARS_INF("fd = %d\n", err); - if (unlikely(err < 0)) - return err; - output->fd = err; - fd_install(err, output->filp); + static int index = 0; + struct aio_threadinfo *tinfo = &output->tinfo[i]; + int j; - MARS_INF("kthread has started.\n"); - //set_user_nice(current, -20); - - use_fake_mm(); - - if (!current->mm) - return -ENOMEM; - - while (!kthread_should_stop()) { - struct aio_mref_aspect *mref_a; - struct mref_object *mref; - int sleeptime; - int err; - - wait_event_interruptible_timeout( - tinfo->event, - kthread_should_stop() || - _dequeue(tinfo, false), - HZ); - - mref_a = _dequeue(tinfo, true); - if (!mref_a) { - continue; - } - - // check for reads exactly at EOF (special case) - mref = mref_a->object; - if (mref->ref_pos == mref->ref_total_size && - !mref->ref_rw && - mref->ref_timeout > 0) { - loff_t total_size = i_size_read(file->f_mapping->host); - loff_t len = total_size - mref->ref_pos; - if (len > 0) { - mref->ref_total_size = total_size; - mref->ref_len = len; - } else { - if (!mref_a->start_jiffies) { - mref_a->start_jiffies = jiffies; - } - if ((long long)jiffies - mref_a->start_jiffies <= mref->ref_timeout) { - if (!_dequeue(tinfo, false)) { - atomic_inc(&output->total_msleep_count); - msleep(1000 * 4 / HZ); - } - _enqueue(tinfo, mref_a, MARS_PRIO_LOW, true); - continue; - } - MARS_DBG("ENODATA %lld\n", len); - _complete(output, mref, -ENODATA); - continue; - } - } - - sleeptime = 1000 / HZ; - for (;;) { - /* This is just a test. Don't use it for performance reasons. - */ - if (output->brick->wait_during_fdsync && mref->ref_rw != READ) { - if (output->fdsync_active) { - long long delay = 60 * HZ; - atomic_inc(&output->total_fdsync_wait_count); - __wait_event_interruptible_timeout( - output->fdsync_event, - !output->fdsync_active || kthread_should_stop(), - delay); - } - - } - - /* Now really do the work - */ - err = aio_submit(output, mref_a, false); - - if (likely(err != -EAGAIN)) { - break; - } - atomic_inc(&output->total_delay_count); - msleep(sleeptime); - if (sleeptime < 100) { - sleeptime += 1000 / HZ; - } - } - if (unlikely(err < 0)) { - _complete(output, mref, err); - } + for (j = 0; j < MARS_PRIO_NR; j++) { + INIT_LIST_HEAD(&tinfo->mref_list[j]); } - -#if 1 // workaround for waking up the receiver thread. TODO: check whether signal handlong could do better. - aio_submit_dummy(output); -#endif - - tinfo->terminated = true; - MARS_INF("kthread has stopped.\n"); - - unuse_fake_mm(); - + tinfo->output = output; + spin_lock_init(&tinfo->lock); + init_waitqueue_head(&tinfo->event); + tinfo->terminated = false; + tinfo->thread = kthread_create(fn, tinfo, "mars_aio%d", index++); + if (IS_ERR(tinfo->thread)) { + int err = PTR_ERR(tinfo->thread); + MARS_ERR("cannot create thread\n"); + tinfo->thread = NULL; + return err; + } + get_task_struct(tinfo->thread); + wake_up_process(tinfo->thread); return 0; } -static int aio_event_thread(void *data) +static +void aio_stop_thread(struct aio_output *output, int i, bool do_submit_dummy) { - struct aio_threadinfo *tinfo = data; - struct aio_output *output = tinfo->output; - struct aio_threadinfo *other = &output->tinfo[2]; - int err = -ENOMEM; - - MARS_INF("kthread has started.\n"); - //set_user_nice(current, -20); + struct aio_threadinfo *tinfo = &output->tinfo[i]; - use_fake_mm(); - if (!current->mm) - goto err; + if (tinfo->thread) { + MARS_INF("stopping thread %d ...\n", i); + kthread_stop_nowait(tinfo->thread); -#if 1 - if (!output->ctxp) { - mm_segment_t oldfs; - if (!current->mm) { - MARS_ERR("mm = %p\n", current->mm); - err = -EINVAL; - goto err; + // workaround for waking up the receiver thread. TODO: check whether signal handlong could do better. + if (do_submit_dummy) { + MARS_INF("submitting dummy for wakeup...\n", i); + aio_submit_dummy(output); } - oldfs = get_fs(); - set_fs(get_ds()); - err = sys_io_setup(MARS_MAX_AIO, &output->ctxp); - set_fs(oldfs); - if (unlikely(err)) - goto err; - } -#endif - - while (!kthread_should_stop()) { - mm_segment_t oldfs; - int count; - int bounced; - int i; - struct timespec timeout = { - .tv_sec = 10, - }; - struct io_event events[MARS_MAX_AIO_READ]; - - oldfs = get_fs(); - set_fs(get_ds()); - /* TODO: don't timeout upon termination. - * Probably we should submit a dummy request. - */ - count = sys_io_getevents(output->ctxp, 1, MARS_MAX_AIO_READ, events, &timeout); - set_fs(oldfs); - - //MARS_INF("count = %d\n", count); - bounced = 0; - for (i = 0; i < count; i++) { - struct aio_mref_aspect *mref_a = (void*)events[i].data; - struct mref_object *mref; - int err = events[i].res; - - if (!mref_a) { - continue; // this was a dummy request - } - mref = mref_a->object; - - MARS_IO("AIO done %p pos = %lld len = %d rw = %d\n", mref, mref->ref_pos, mref->ref_len, mref->ref_rw); - - if (output->brick->o_fdsync - && err >= 0 - && mref->ref_rw != READ - && !mref->ref_skip_sync - && !mref_a->resubmit++) { - // workaround for non-implemented AIO FSYNC operation - if (!output->filp->f_op->aio_fsync) { - mars_trace(mref, "aio_fsync"); - _enqueue(other, mref_a, mref->ref_prio, true); - bounced++; - continue; - } - err = aio_submit(output, mref_a, true); - if (likely(err >= 0)) - continue; - } - - _complete(output, mref, err); + // wait for termination + MARS_INF("waiting for thread %d ...\n", i); + wait_event_interruptible_timeout( + tinfo->event, + tinfo->terminated, + (60 - i * 2) * HZ); + if (likely(tinfo->terminated)) { + //MARS_INF("finalizing thread %d ...\n", i); + //kthread_stop(tinfo->thread); + MARS_INF("thread %d finished.\n", i); + put_task_struct(tinfo->thread); + tinfo->thread = NULL; + } else { + MARS_ERR("thread %d did not terminate - leaving a zombie\n", i); } - if (bounced) - wake_up_interruptible_all(&other->event); } - err = 0; - - err: - tinfo->terminated = true; - MARS_INF("kthread has stopped, err = %d\n", err); - - unuse_fake_mm(); - - return err; } static @@ -683,9 +525,251 @@ int aio_sync_thread(void *data) MARS_INF("kthread has stopped.\n"); tinfo->terminated = true; + wake_up_interruptible_all(&tinfo->event); return 0; } +static int aio_event_thread(void *data) +{ + struct aio_threadinfo *tinfo = data; + struct aio_output *output = tinfo->output; + struct aio_threadinfo *other = &output->tinfo[2]; + int err = -ENOMEM; + + MARS_INF("kthread has started.\n"); + //set_user_nice(current, -20); + + use_fake_mm(); + if (!current->mm) + goto err; + + err = aio_start_thread(output, 2, aio_sync_thread); + if (unlikely(err < 0)) + goto err; + + while (!kthread_should_stop()) { + mm_segment_t oldfs; + int count; + int bounced; + int i; + struct timespec timeout = { + .tv_sec = 10, + }; + struct io_event events[MARS_MAX_AIO_READ]; + + oldfs = get_fs(); + set_fs(get_ds()); + /* TODO: don't timeout upon termination. + * Probably we should submit a dummy request. + */ + count = sys_io_getevents(output->ctxp, 1, MARS_MAX_AIO_READ, events, &timeout); + set_fs(oldfs); + + //MARS_INF("count = %d\n", count); + bounced = 0; + for (i = 0; i < count; i++) { + struct aio_mref_aspect *mref_a = (void*)events[i].data; + struct mref_object *mref; + int err = events[i].res; + + if (!mref_a) { + continue; // this was a dummy request + } + mref = mref_a->object; + + MARS_IO("AIO done %p pos = %lld len = %d rw = %d\n", mref, mref->ref_pos, mref->ref_len, mref->ref_rw); + + if (output->brick->o_fdsync + && err >= 0 + && mref->ref_rw != READ + && !mref->ref_skip_sync + && !mref_a->resubmit++) { + // workaround for non-implemented AIO FSYNC operation + if (!output->filp->f_op->aio_fsync) { + mars_trace(mref, "aio_fsync"); + _enqueue(other, mref_a, mref->ref_prio, true); + bounced++; + continue; + } + err = aio_submit(output, mref_a, true); + if (likely(err >= 0)) + continue; + } + + _complete(output, mref, err); + + } + if (bounced) + wake_up_interruptible_all(&other->event); + } + err = 0; + + err: + MARS_INF("kthread has stopped, err = %d\n", err); + + aio_stop_thread(output, 2, false); + + unuse_fake_mm(); + + tinfo->terminated = true; + wake_up_interruptible_all(&tinfo->event); + return err; +} + +static int aio_submit_thread(void *data) +{ + struct aio_threadinfo *tinfo = data; + struct aio_output *output = tinfo->output; + struct file *file = output->filp; + int err; + + /* TODO: this is provisionary. We only need it for sys_io_submit(). + * The latter should be accompanied by a future vfs_submit() or + * do_submit() which currently does not exist :( + * FIXME: corresponding cleanup NYI + */ + err = get_unused_fd(); + MARS_INF("fd = %d\n", err); + if (unlikely(err < 0)) + goto done; + output->fd = err; + fd_install(err, output->filp); + + MARS_INF("kthread has started.\n"); + //set_user_nice(current, -20); + + use_fake_mm(); + + err = -ENOMEM; + if (unlikely(!current->mm)) + goto done; + +#if 1 + if (true) { + mm_segment_t oldfs; + if (!current->mm) { + MARS_ERR("mm = %p\n", current->mm); + err = -EINVAL; + goto done; + } + oldfs = get_fs(); + set_fs(get_ds()); + err = sys_io_setup(MARS_MAX_AIO, &output->ctxp); + set_fs(oldfs); + if (unlikely(err < 0)) + goto done; + } +#endif + + err = aio_start_thread(output, 1, aio_event_thread); + if (unlikely(err < 0)) + goto done; + + while (!kthread_should_stop()) { + struct aio_mref_aspect *mref_a; + struct mref_object *mref; + int sleeptime; + int err; + + wait_event_interruptible_timeout( + tinfo->event, + kthread_should_stop() || + _dequeue(tinfo, false), + HZ); + + mref_a = _dequeue(tinfo, true); + if (!mref_a) { + continue; + } + + // check for reads exactly at EOF (special case) + mref = mref_a->object; + if (mref->ref_pos == mref->ref_total_size && + !mref->ref_rw && + mref->ref_timeout > 0) { + loff_t total_size = i_size_read(file->f_mapping->host); + loff_t len = total_size - mref->ref_pos; + if (len > 0) { + mref->ref_total_size = total_size; + mref->ref_len = len; + } else { + if (!mref_a->start_jiffies) { + mref_a->start_jiffies = jiffies; + } + if ((long long)jiffies - mref_a->start_jiffies <= mref->ref_timeout) { + if (!_dequeue(tinfo, false)) { + atomic_inc(&output->total_msleep_count); + msleep(1000 * 4 / HZ); + } + _enqueue(tinfo, mref_a, MARS_PRIO_LOW, true); + continue; + } + MARS_DBG("ENODATA %lld\n", len); + _complete(output, mref, -ENODATA); + continue; + } + } + + sleeptime = 1000 / HZ; + for (;;) { + /* This is just a test. Don't use it for performance reasons. + */ + if (output->brick->wait_during_fdsync && mref->ref_rw != READ) { + if (output->fdsync_active) { + long long delay = 60 * HZ; + atomic_inc(&output->total_fdsync_wait_count); + __wait_event_interruptible_timeout( + output->fdsync_event, + !output->fdsync_active || kthread_should_stop(), + delay); + } + + } + + /* Now really do the work + */ + err = aio_submit(output, mref_a, false); + + if (likely(err != -EAGAIN)) { + break; + } + atomic_inc(&output->total_delay_count); + msleep(sleeptime); + if (sleeptime < 100) { + sleeptime += 1000 / HZ; + } + } + if (unlikely(err < 0)) { + _complete(output, mref, err); + } + } + + MARS_INF("kthread has stopped.\n"); + + aio_stop_thread(output, 1, true); + +#if 1 + if (true) { + mm_segment_t oldfs; + MARS_INF("destroying ioctx.....\n"); + oldfs = get_fs(); + set_fs(get_ds()); + sys_io_destroy(output->ctxp); + set_fs(oldfs); + output->ctxp = 0; + } +#endif + MARS_INF("destroying fd.....\n"); + put_unused_fd(output->fd); + unuse_fake_mm(); + err = 0; + +done: + tinfo->terminated = true; + wake_up_interruptible_all(&tinfo->event); + return err; +} + static int aio_get_info(struct aio_output *output, struct mars_info *info) { struct file *file = output->filp; @@ -775,13 +859,11 @@ static int aio_brick_construct(struct aio_brick *brick) static int aio_switch(struct aio_brick *brick) { - static int index = 0; struct aio_output *output = brick->outputs[0]; const char *path = output->brick->brick_name; int flags = O_CREAT | O_RDWR | O_LARGEFILE; int prot = 0600; mm_segment_t oldfs; - int i; int err = 0; MARS_DBG("power.button = %d\n", brick->power.button); @@ -820,30 +902,9 @@ static int aio_switch(struct aio_brick *brick) } #endif - for (i = 0; i < 3; i++) { - static int (*fn[])(void*) = { - aio_submit_thread, - aio_event_thread, - aio_sync_thread, - }; - struct aio_threadinfo *tinfo = &output->tinfo[i]; - int j; - for (j = 0; j < MARS_PRIO_NR; j++) { - INIT_LIST_HEAD(&tinfo->mref_list[j]); - } - tinfo->output = output; - spin_lock_init(&tinfo->lock); - init_waitqueue_head(&tinfo->event); - tinfo->terminated = false; - tinfo->thread = kthread_create(fn[i], tinfo, "mars_aio%d", index++); - if (IS_ERR(tinfo->thread)) { - err = PTR_ERR(tinfo->thread); - MARS_ERR("cannot create thread\n"); - tinfo->thread = NULL; - goto err; - } - wake_up_process(tinfo->thread); - } + err = aio_start_thread(output, 0, aio_submit_thread); + if (err < 0) + goto err; MARS_INF("opened file '%s'\n", path); mars_power_led_on((void*)brick, true); @@ -859,31 +920,8 @@ cleanup: } mars_power_led_on((void*)brick, false); - for (i = 2; i >= 0; i--) { - struct aio_threadinfo *tinfo = &output->tinfo[i]; - if (tinfo->thread) { - MARS_INF("stopping thread %d ...\n", i); - kthread_stop_nowait(tinfo->thread); - } - } - for (i = 0; i < 3; i++) { - struct aio_threadinfo *tinfo = &output->tinfo[i]; - if (tinfo->thread) { - // wait for termination - MARS_INF("waiting for thread %d ...\n", i); - wait_event_interruptible_timeout( - tinfo->event, - tinfo->terminated, 60 * HZ); - if (likely(tinfo->terminated)) { - MARS_INF("finalizing thread %d ...\n", i); - kthread_stop(tinfo->thread); - put_task_struct(tinfo->thread); - tinfo->thread = NULL; - } else { - MARS_ERR("thread %d did not terminate - leaving a zombie\n", i); - } - } - } + + aio_stop_thread(output, 0, false); mars_power_led_off((void*)brick, (output->tinfo[0].thread == NULL && @@ -895,12 +933,6 @@ cleanup: filp_close(output->filp, NULL); output->filp = NULL; } - if (output->ctxp) { -#ifndef MEMLEAK // FIXME this crashes - sys_io_destroy(output->ctxp); -#endif - output->ctxp = 0; - } } MARS_DBG("switch off status = %d\n", err); return err; diff --git a/mars_server.c b/mars_server.c index 08d32e0c..c63f5e58 100644 --- a/mars_server.c +++ b/mars_server.c @@ -584,7 +584,8 @@ int __init init_mars_server(void) int status; MARS_INF("init_server()\n"); - + +#if 0 status = mars_create_sockaddr(&sockaddr, ""); if (status < 0) return status; @@ -602,8 +603,10 @@ int __init init_mars_server(void) return PTR_ERR(thread); } + get_task_struct(thread); server_thread = thread; wake_up_process(thread); +#endif return server_register_brick_type(); } @@ -618,10 +621,12 @@ void __exit exit_mars_server(void) } MARS_INF("stopping thread...\n"); kthread_stop(server_thread); - if (server_socket && !server_thread) { + if (server_socket) { //sock_release(server_socket); server_socket = NULL; } + put_task_struct(server_thread); + server_thread = NULL; } } diff --git a/sy_old/mars_proc.c b/sy_old/mars_proc.c index 9b65e8c6..4c786d68 100644 --- a/sy_old/mars_proc.c +++ b/sy_old/mars_proc.c @@ -98,8 +98,10 @@ int __init init_mars_proc(void) { MARS_INF("init_proc()\n"); - + +#if 0 header = register_sysctl_table(mars_table); +#endif return 0; }