diff --git a/Kconfig b/Kconfig index 7c7ca837..df3719a4 100644 --- a/Kconfig +++ b/Kconfig @@ -61,6 +61,16 @@ config MARS_FAST_TRIGGER ---help--- Normally ON. Switch off in case of endless trigger loops +config MARS_NETIO_TIMEOUT + int "timeout for remote IO operations (in seconds)" + depends on MARS + default 30 + ---help--- + In case of network hangs, don't wait forever, but rather + abort with -ENOTCONN + when == 0, wait forever (may lead to hanging operations + similar to NFS hard mounts) + config MARS_MEM_PREALLOC bool "avoid memory fragmentation by preallocation" depends on MARS diff --git a/mars_client.c b/mars_client.c index 05f7ff19..39d5d497 100644 --- a/mars_client.c +++ b/mars_client.c @@ -222,6 +222,7 @@ static void client_ref_io(struct client_output *output, struct mref_object *mref atomic_inc(&mref->ref_count); traced_lock(&output->lock, flags); + mref_a->submit_jiffies = jiffies; mref->ref_id = ++output->last_id; list_add_tail(&mref_a->io_head, &output->mref_list); traced_unlock(&output->lock, flags); @@ -289,7 +290,6 @@ int receiver_thread(void *data) } if (tmp_mref->ref_id == cmd.cmd_int1) { mref = tmp_mref; - list_del_init(&mref_a->hash_head); break; } } @@ -307,12 +307,13 @@ int receiver_thread(void *data) MARS_IO("new status = %d, pos = %lld len = %d rw = %d\n", status, mref->ref_pos, mref->ref_len, mref->ref_rw); if (status < 0) { MARS_WRN("interrupted data transfer during callback, status = %d\n", status); - traced_lock(&output->hash_lock[hash_index], flags); - list_add_tail(&mref_a->hash_head, &output->hash_table[hash_index]); - traced_unlock(&output->hash_lock[hash_index], flags); goto done; } + traced_lock(&output->hash_lock[hash_index], flags); + list_del_init(&mref_a->hash_head); + traced_unlock(&output->hash_lock[hash_index], flags); + traced_lock(&output->lock, flags); list_del_init(&mref_a->io_head); traced_unlock(&output->lock, flags); @@ -365,6 +366,49 @@ void _do_resubmit(struct client_output *output) } } +static +void _do_timeout(struct client_output *output, struct list_head *anchor, bool force) +{ + struct client_brick *brick = output->brick; + + while (!list_empty(anchor)) { + struct list_head *tmp; + struct client_mref_aspect *mref_a; + struct mref_object *mref; + int hash_index; + unsigned long flags; + + traced_lock(&output->lock, flags); + tmp = anchor->next; + traced_unlock(&output->lock, flags); + + mref_a = container_of(tmp, struct client_mref_aspect, io_head); + mref = mref_a->object; + + if (!force && + (brick->io_timeout <= 0 || !time_is_before_jiffies(mref_a->submit_jiffies + brick->io_timeout * HZ))) { + break; + } + + MARS_DBG("signalling IO error at pos = %lld len = %d\n", mref->ref_pos, mref->ref_len); + atomic_inc(&output->timeout_count); + + hash_index = mref->ref_id % CLIENT_HASH_MAX; + + traced_lock(&output->hash_lock[hash_index], flags); + list_del_init(&mref_a->hash_head); + traced_unlock(&output->hash_lock[hash_index], flags); + + traced_lock(&output->lock, flags); + list_del_init(&mref_a->io_head); + traced_unlock(&output->lock, flags); + + SIMPLE_CALLBACK(mref, -ENOTCONN); + client_ref_put(output, mref); + atomic_dec(&output->fly_count); + } +} + static int sender_thread(void *data) { struct client_output *output = data; @@ -376,7 +420,7 @@ static int sender_thread(void *data) output->receiver.restart_count = 0; while (!kthread_should_stop()) { - struct list_head *tmp; + struct list_head *tmp = NULL; struct client_mref_aspect *mref_a; struct mref_object *mref; bool do_resubmit = false; @@ -386,10 +430,13 @@ static int sender_thread(void *data) do_kill = false; _kill_socket(output); } + status = _connect(output, brick->brick_name); MARS_IO("connect status = %d\n", status); if (unlikely(status < 0)) { - msleep(5000); + _do_timeout(output, &output->wait_list, false); + _do_timeout(output, &output->mref_list, false); + msleep(3000); continue; } do_kill = true; @@ -411,6 +458,9 @@ static int sender_thread(void *data) status = _request_info(output); if (status >= 0) { output->get_info = false; + } else { + MARS_WRN("cannot get info, status = %d\n", status); + msleep(1000); } } @@ -419,8 +469,6 @@ static int sender_thread(void *data) traced_lock(&output->lock, flags); tmp = output->mref_list.next; - list_del(tmp); - list_add(tmp, &output->wait_list); traced_unlock(&output->lock, flags); mref_a = container_of(tmp, struct client_mref_aspect, io_head); @@ -432,11 +480,6 @@ static int sender_thread(void *data) MARS_IO("status = %d\n", status); if (unlikely(status < 0)) { // retry submission on next occasion.. - traced_lock(&output->lock, flags); - list_del(&mref_a->io_head); - list_add(&mref_a->io_head, &output->mref_list); - traced_unlock(&output->lock, flags); - MARS_WRN("sending failed, status = %d\n", status); if (do_kill) { @@ -445,6 +488,12 @@ static int sender_thread(void *data) } continue; } + + // all ok, remember in-flight mrefs + traced_lock(&output->lock, flags); + list_del(tmp); + list_add(tmp, &output->wait_list); + traced_unlock(&output->lock, flags); } //done: if (status < 0) { @@ -457,27 +506,11 @@ static int sender_thread(void *data) /* Signal error on all pending IO requests. * We have no other chance (except probably delaying - * this until destruction which mostly is not what + * this until destruction which is probably not what * we want). */ - traced_lock(&output->lock, flags); - _do_resubmit(output); - while (!list_empty(&output->mref_list)) { - struct list_head *tmp = output->mref_list.next; - struct client_mref_aspect *mref_a; - struct mref_object *mref; - - list_del_init(tmp); - traced_unlock(&output->lock, flags); - mref_a = container_of(tmp, struct client_mref_aspect, io_head); - mref = mref_a->object; - MARS_DBG("signalling IO error at pos = %lld len = %d\n", mref->ref_pos, mref->ref_len); - atomic_dec(&output->fly_count); - SIMPLE_CALLBACK(mref, -ENOTCONN); - client_ref_put(output, mref); - traced_lock(&output->lock, flags); - } - traced_unlock(&output->lock, flags); + _do_timeout(output, &output->wait_list, true); + _do_timeout(output, &output->mref_list, true); output->sender.terminated = true; wake_up_interruptible(&output->sender.run_event); @@ -527,20 +560,23 @@ static char *client_statistics(struct client_brick *brick, int verbose) { struct client_output *output = brick->outputs[0]; - char *res = brick_string_alloc(0); + char *res = brick_string_alloc(1024); if (!res) return NULL; - snprintf(res, 512, "#%d socket fly_count = %d\n", - output->socket.s_debug_nr, - atomic_read(&output->fly_count)); - + snprintf(res, 1024, + "#%d socket max_flying = %d io_timeout = %d | timeout_count = %d fly_count = %d\n", + output->socket.s_debug_nr, brick->max_flying, brick->io_timeout, + atomic_read(&output->timeout_count), atomic_read(&output->fly_count)); + return res; } static void client_reset_statistics(struct client_brick *brick) { + struct client_output *output = brick->outputs[0]; + atomic_set(&output->timeout_count, 0); } //////////////// object / aspect constructors / destructors /////////////// diff --git a/mars_client.h b/mars_client.h index 98eefdfb..d22c44bb 100644 --- a/mars_client.h +++ b/mars_client.h @@ -10,6 +10,7 @@ struct client_mref_aspect { GENERIC_ASPECT(mref); struct list_head io_head; struct list_head hash_head; + unsigned long submit_jiffies; int alloc_len; bool do_dealloc; }; @@ -18,6 +19,7 @@ struct client_brick { MARS_BRICK(client); // tunables int max_flying; // limit on parallelism + int io_timeout; // > 0: report IO errors after timeout (in seconds) }; struct client_input { @@ -34,6 +36,7 @@ struct client_threadinfo { struct client_output { MARS_OUTPUT(client); atomic_t fly_count; + atomic_t timeout_count; spinlock_t lock; struct list_head mref_list; struct list_head wait_list; diff --git a/mars_copy.c b/mars_copy.c index 110cad74..28a3a7cf 100644 --- a/mars_copy.c +++ b/mars_copy.c @@ -96,6 +96,34 @@ int _determine_input(struct copy_brick *brick, struct mref_object *mref) #define GET_INDEX(pos) (((pos) / COPY_CHUNK) % MAX_COPY_PARA) #define GET_OFFSET(pos) ((pos) % COPY_CHUNK) +static +void __clear_mref(struct copy_brick *brick, struct mref_object *mref, int queue) +{ + struct copy_input *input; + input = queue ? brick->inputs[INPUT_B_COPY] : brick->inputs[INPUT_A_COPY]; + GENERIC_INPUT_CALL(input, mref_put, mref); +} + +static +void _clear_mref(struct copy_brick *brick, int index, int queue) +{ + struct mref_object *mref = brick->st[index].table[queue]; + if (mref) { + __clear_mref(brick, mref, queue); + brick->st[index].table[queue] = NULL; + } +} + +static +void _clear_all_mref(struct copy_brick *brick) +{ + int i; + for (i = 0; i < MAX_COPY_PARA; i++) { + _clear_mref(brick, i, 0); + _clear_mref(brick, i, 1); + } +} + static void copy_endio(struct generic_callback *cb) { @@ -130,7 +158,7 @@ void copy_endio(struct generic_callback *cb) goto exit; } if (unlikely(cb->cb_error < 0)) { - MARS_DBG("IO error %d on index %d, old state = %d\n", cb->cb_error, index, st->state); + MARS_WRN("IO error %d on index %d, old state = %d\n", cb->cb_error, index, st->state); error = cb->cb_error; } else if (likely(!st->error)) { st->table[queue] = mref; @@ -140,6 +168,7 @@ exit: if (unlikely(error < 0)) { st->error = error; _clash(brick); + __clear_mref(brick, mref, queue); } st->active[queue] = false; atomic_dec(&brick->copy_flight); @@ -215,28 +244,6 @@ done: return status; } -static -void _clear_mref(struct copy_brick *brick, int index, int queue) -{ - struct mref_object *mref = brick->st[index].table[queue]; - if (mref) { - struct copy_input *input; - input = queue ? brick->inputs[INPUT_B_COPY] : brick->inputs[INPUT_A_COPY]; - GENERIC_INPUT_CALL(input, mref_put, mref); - brick->st[index].table[queue] = NULL; - } -} - -static -void _clear_all_mref(struct copy_brick *brick) -{ - int i; - for (i = 0; i < MAX_COPY_PARA; i++) { - _clear_mref(brick, i, 0); - _clear_mref(brick, i, 1); - } -} - static void _update_percent(struct copy_brick *brick) { @@ -281,6 +288,10 @@ int _next_state(struct copy_brick *brick, int index, loff_t pos) st->error = 0; if (brick->is_aborting || kthread_should_stop()) goto done; + + _clear_mref(brick, index, 1); + _clear_mref(brick, index, 0); + i = 0; next_state = COPY_STATE_READ1; if (brick->verify_mode) { @@ -377,7 +388,7 @@ int _next_state(struct copy_brick *brick, int index, loff_t pos) st->state = next_state; if (status < 0) { st->error = status; - MARS_ERR("status = %d\n", status); + MARS_WRN("status = %d\n", status); _clash(brick); } @@ -486,9 +497,10 @@ static int _copy_thread(void *data) if (unlikely(status < 0)) { brick->copy_error = status; if (brick->abort_mode && !brick->is_aborting) { - MARS_INF("IO error, terminating prematurely, status = %d\n", status); + MARS_WRN("IO error, terminating prematurely, status = %d\n", status); brick->is_aborting = true; } + MARS_WRN("IO error, status = %d\n", status); } msleep(10); // yield FIXME: remove this, use event handling for over/underflow } diff --git a/sy_old/mars_light.c b/sy_old/mars_light.c index b54e7566..35aba003 100644 --- a/sy_old/mars_light.c +++ b/sy_old/mars_light.c @@ -195,7 +195,8 @@ int _set_trans_params(struct mars_brick *_brick, void *private) static int _set_client_params(struct mars_brick *_brick, void *private) { - // currently no params + struct client_brick *client_brick = (void*)_brick; + client_brick->io_timeout = CONFIG_MARS_NETIO_TIMEOUT; MARS_INF("name = '%s' path = '%s'\n", _brick->brick_name, _brick->brick_path); return 1; }