diff --git a/mars_client.c b/mars_client.c
index a5af1954..247b5e51 100644
--- a/mars_client.c
+++ b/mars_client.c
@@ -357,10 +357,26 @@ int receiver_thread(void *data)
     return status;
 }
 
+static
+void _do_resubmit(struct client_output *output)
+{
+    if (!list_empty(&output->wait_list)) {
+        struct list_head *first = output->wait_list.next;
+        struct list_head *last = output->wait_list.prev;
+        struct list_head *old_start = output->mref_list.next;
+#define list_connect __list_del // the original routine has a misleading name: in reality it is more general
+        list_connect(&output->mref_list, first);
+        list_connect(last, old_start);
+        INIT_LIST_HEAD(&output->wait_list);
+        MARS_IO("done re-submit %p %p\n", first, last);
+    }
+}
+
 static int sender_thread(void *data)
 {
     struct client_output *output = data;
     struct client_brick *brick = output->brick;
+    unsigned long flags;
     int status = 0;
 
     output->receiver.restart_count = 0;
@@ -369,7 +385,6 @@ static int sender_thread(void *data)
         struct list_head *tmp;
         struct client_mref_aspect *mref_a;
         struct mref_object *mref;
-        unsigned long flags;
         bool do_resubmit = false;
 
         if (unlikely(!output->socket)) {
@@ -387,20 +402,11 @@ static int sender_thread(void *data)
              */
            MARS_IO("re-submit\n");
            traced_lock(&output->lock, flags);
-           if (!list_empty(&output->wait_list)) {
-               struct list_head *first = output->wait_list.next;
-               struct list_head *last = output->wait_list.prev;
-               struct list_head *old_start = output->mref_list.next;
-#define list_connect __list_del // the original routine has a misleading name: in reality it is more general
-               list_connect(&output->mref_list, first);
-               list_connect(last, old_start);
-               INIT_LIST_HEAD(&output->wait_list);
-               MARS_IO("done re-submit %p %p\n", first, last);
-           }
+           _do_resubmit(output);
            traced_unlock(&output->lock, flags);
        }
 
-       wait_event_interruptible_timeout(output->event, !list_empty(&output->mref_list) || output->get_info, 10 * HZ);
+       wait_event_interruptible_timeout(output->event, !list_empty(&output->mref_list) || output->get_info || kthread_should_stop(), 1 * HZ);
 
        if (output->get_info) {
            status = _request_info(output);
@@ -446,8 +452,33 @@ static int sender_thread(void *data)
     _kill_socket(output);
 
+    /* Signal error on all pending IO requests.
+     * We have no other chance (except probably delaying
+     * this until destruction which mostly is not what
+     * we want).
+     */
+    traced_lock(&output->lock, flags);
+    _do_resubmit(output);
+    while (!list_empty(&output->mref_list)) {
+        struct list_head *tmp = output->mref_list.next;
+        struct client_mref_aspect *mref_a;
+        struct mref_object *mref;
+
+        list_del_init(tmp);
+        traced_unlock(&output->lock, flags);
+        mref_a = container_of(tmp, struct client_mref_aspect, io_head);
+        mref = mref_a->object;
+        MARS_DBG("signalling IO error at pos = %lld len = %d\n", mref->ref_pos, mref->ref_len);
+        atomic_dec(&output->fly_count);
+        SIMPLE_CALLBACK(mref, -ENOTCONN);
+        client_ref_put(output, mref);
+        traced_lock(&output->lock, flags);
+    }
+    traced_unlock(&output->lock, flags);
+
     output->sender.terminated = true;
     wake_up_interruptible(&output->sender.run_event);
+    MARS_DBG("sender terminated\n");
 
     return status;
 }
diff --git a/mars_copy.c b/mars_copy.c
index 14b824e9..f76f2f08 100644
--- a/mars_copy.c
+++ b/mars_copy.c
@@ -372,13 +372,13 @@ done:
 }
 
 static
-void _run_copy(struct copy_brick *brick)
+int _run_copy(struct copy_brick *brick)
 {
     int max;
     loff_t pos;
     loff_t limit = 0;
     short prev;
-    int status;
+    int res_status = 0;
 
     if (unlikely(_clear_clash(brick))) {
         int i;
@@ -389,7 +389,7 @@ void _run_copy(struct copy_brick *brick)
            _clash(brick);
            MARS_DBG("re-clash\n");
            msleep(100);
-           return;
+           return 0;
        }
        for (i = 0; i < MAX_COPY_PARA; i++) {
            _clear_mref(brick, i, 0);
@@ -416,7 +416,9 @@ void _run_copy(struct copy_brick *brick)
        prev = index;
        // call the finite state automaton
        if (!st->active[0] && !st->active[1]) {
+           int status;
            status = _next_state(brick, index, pos);
+           MARS_IO("index = %d pos = %lld status 0 %d\n", index, pos, status);
            limit = pos;
        }
    }
@@ -432,6 +434,7 @@ void _run_copy(struct copy_brick *brick)
        }
        st->state = COPY_STATE_START;
        if (unlikely(st->error < 0)) {
+           res_status = st->error;
            break;
        }
        count += st->len;
@@ -446,6 +449,7 @@ void _run_copy(struct copy_brick *brick)
            _update_percent(brick);
        }
    }
+   return res_status;
 }
 
 static int _copy_thread(void *data)
@@ -453,6 +457,7 @@ static int _copy_thread(void *data)
     struct copy_brick *brick = data;
 
     MARS_DBG("--------------- copy_thread %p starting\n", brick);
+    brick->copy_error = 0;
     mars_power_led_on((void*)brick, true);
     brick->trigger = true;
 
@@ -460,18 +465,25 @@ static int _copy_thread(void *data)
        loff_t old_start = brick->copy_start;
        loff_t old_end = brick->copy_end;
        if (old_end > 0) {
-           _run_copy(brick);
+           int status = _run_copy(brick);
+           if (unlikely(status < 0)) {
+               brick->copy_error = status;
+               if (brick->abort_mode) {
+                   MARS_INF("IO error, terminating prematurely, status = %d\n", status);
+                   break;
+               }
+           }
            msleep(10); // yield FIXME: remove this, use event handling for over/underflow
        }
        wait_event_interruptible_timeout(brick->event,
                         brick->trigger ||
                         brick->copy_start != old_start ||
                         brick->copy_end != old_end ||
                         kthread_should_stop(),
-                        5 * HZ);
+                        1 * HZ);
        brick->trigger = false;
    }
-   MARS_DBG("--------------- copy_thread terminating\n");
+   MARS_DBG("--------------- copy_thread terminating (%d requests flying)\n", atomic_read(&brick->copy_flight));
    wait_event_interruptible_timeout(brick->event, !atomic_read(&brick->copy_flight), 300 * HZ);
    mars_power_led_off((void*)brick, true);
    MARS_DBG("--------------- copy_thread done.\n");
@@ -545,7 +557,7 @@ static int copy_switch(struct copy_brick *brick)
        mars_power_led_on((void*)brick, false);
        if (brick->thread) {
            MARS_INF("stopping thread...\n");
-           kthread_stop_nowait(brick->thread);
+           kthread_stop(brick->thread);
            put_task_struct(brick->thread);
            brick->thread = NULL;
            wake_up_interruptible(&brick->event);
diff --git a/mars_copy.h b/mars_copy.h
index f66e7d09..c5e45fad 100644
--- a/mars_copy.h
+++ b/mars_copy.h
@@ -48,8 +48,10 @@ struct copy_brick {
     int append_mode; // 1 = passively, 2 = actively
     bool verify_mode;
     bool utilize_mode; // utilize already copied data
+    bool abort_mode;  // abort on IO error (default is retry forever)
     // readonly from outside
     loff_t copy_last; // current working position
+    int copy_error;
     bool low_dirty;
     // internal
     volatile bool trigger;
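
Note on the list handling factored out into _do_resubmit(): the open-coded __list_del() pair (aliased as list_connect) moves every entry of output->wait_list to the head of output->mref_list and then reinitialises the wait list. A minimal sketch of the same operation, assuming only the stock <linux/list.h> helper list_splice_init() and the client_output fields named in the patch, would look as follows; the patch itself keeps the open-coded variant, and the caller's traced_lock()/traced_unlock() is assumed unchanged:

    #include <linux/list.h>

    /* Sketch only: equivalent of _do_resubmit() using list_splice_init().
     * Moves all waiting requests back to the head of the submission queue
     * and reinitialises wait_list, which is what the __list_del() pair
     * in the patch achieves by hand. Caller must hold output->lock. */
    static void _do_resubmit_sketch(struct client_output *output)
    {
        if (!list_empty(&output->wait_list))
            list_splice_init(&output->wait_list, &output->mref_list);
    }

On the mars_copy side, the error path introduced here is: _run_copy() now returns the first negative st->error it encounters, _copy_thread() records it in the new readonly copy_error field, and, when the new abort_mode switch is set, the copy terminates prematurely instead of retrying forever.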