mirror of
https://github.com/schoebel/mars
synced 2024-12-22 14:42:58 +00:00
fix network hang by timeout in client
This commit is contained in:
parent
ae12c25267
commit
18cf152941
10
Kconfig
10
Kconfig
@ -61,6 +61,16 @@ config MARS_FAST_TRIGGER
|
||||
---help---
|
||||
Normally ON. Switch off in case of endless trigger loops
|
||||
|
||||
config MARS_NETIO_TIMEOUT
|
||||
int "timeout for remote IO operations (in seconds)"
|
||||
depends on MARS
|
||||
default 30
|
||||
---help---
|
||||
In case of network hangs, don't wait forever, but rather
|
||||
abort with -ENOTCONN.
|
||||
When set to 0, wait forever (may lead to hanging operations
|
||||
similar to NFS hard mounts)
|
||||
|
||||
config MARS_MEM_PREALLOC
|
||||
bool "avoid memory fragmentation by preallocation"
|
||||
depends on MARS
|
||||
|
110
mars_client.c
110
mars_client.c
@ -222,6 +222,7 @@ static void client_ref_io(struct client_output *output, struct mref_object *mref
|
||||
atomic_inc(&mref->ref_count);
|
||||
|
||||
traced_lock(&output->lock, flags);
|
||||
mref_a->submit_jiffies = jiffies;
|
||||
mref->ref_id = ++output->last_id;
|
||||
list_add_tail(&mref_a->io_head, &output->mref_list);
|
||||
traced_unlock(&output->lock, flags);
|
||||
@ -289,7 +290,6 @@ int receiver_thread(void *data)
|
||||
}
|
||||
if (tmp_mref->ref_id == cmd.cmd_int1) {
|
||||
mref = tmp_mref;
|
||||
list_del_init(&mref_a->hash_head);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -307,12 +307,13 @@ int receiver_thread(void *data)
|
||||
MARS_IO("new status = %d, pos = %lld len = %d rw = %d\n", status, mref->ref_pos, mref->ref_len, mref->ref_rw);
|
||||
if (status < 0) {
|
||||
MARS_WRN("interrupted data transfer during callback, status = %d\n", status);
|
||||
traced_lock(&output->hash_lock[hash_index], flags);
|
||||
list_add_tail(&mref_a->hash_head, &output->hash_table[hash_index]);
|
||||
traced_unlock(&output->hash_lock[hash_index], flags);
|
||||
goto done;
|
||||
}
|
||||
|
||||
traced_lock(&output->hash_lock[hash_index], flags);
|
||||
list_del_init(&mref_a->hash_head);
|
||||
traced_unlock(&output->hash_lock[hash_index], flags);
|
||||
|
||||
traced_lock(&output->lock, flags);
|
||||
list_del_init(&mref_a->io_head);
|
||||
traced_unlock(&output->lock, flags);
|
||||
@ -365,6 +366,49 @@ void _do_resubmit(struct client_output *output)
|
||||
}
|
||||
}
|
||||
|
||||
static
|
||||
void _do_timeout(struct client_output *output, struct list_head *anchor, bool force)
|
||||
{
|
||||
struct client_brick *brick = output->brick;
|
||||
|
||||
while (!list_empty(anchor)) {
|
||||
struct list_head *tmp;
|
||||
struct client_mref_aspect *mref_a;
|
||||
struct mref_object *mref;
|
||||
int hash_index;
|
||||
unsigned long flags;
|
||||
|
||||
traced_lock(&output->lock, flags);
|
||||
tmp = anchor->next;
|
||||
traced_unlock(&output->lock, flags);
|
||||
|
||||
mref_a = container_of(tmp, struct client_mref_aspect, io_head);
|
||||
mref = mref_a->object;
|
||||
|
||||
if (!force &&
|
||||
(brick->io_timeout <= 0 || !time_is_before_jiffies(mref_a->submit_jiffies + brick->io_timeout * HZ))) {
|
||||
break;
|
||||
}
|
||||
|
||||
MARS_DBG("signalling IO error at pos = %lld len = %d\n", mref->ref_pos, mref->ref_len);
|
||||
atomic_inc(&output->timeout_count);
|
||||
|
||||
hash_index = mref->ref_id % CLIENT_HASH_MAX;
|
||||
|
||||
traced_lock(&output->hash_lock[hash_index], flags);
|
||||
list_del_init(&mref_a->hash_head);
|
||||
traced_unlock(&output->hash_lock[hash_index], flags);
|
||||
|
||||
traced_lock(&output->lock, flags);
|
||||
list_del_init(&mref_a->io_head);
|
||||
traced_unlock(&output->lock, flags);
|
||||
|
||||
SIMPLE_CALLBACK(mref, -ENOTCONN);
|
||||
client_ref_put(output, mref);
|
||||
atomic_dec(&output->fly_count);
|
||||
}
|
||||
}
|
||||
|
||||
static int sender_thread(void *data)
|
||||
{
|
||||
struct client_output *output = data;
|
||||
@ -376,7 +420,7 @@ static int sender_thread(void *data)
|
||||
output->receiver.restart_count = 0;
|
||||
|
||||
while (!kthread_should_stop()) {
|
||||
struct list_head *tmp;
|
||||
struct list_head *tmp = NULL;
|
||||
struct client_mref_aspect *mref_a;
|
||||
struct mref_object *mref;
|
||||
bool do_resubmit = false;
|
||||
@ -386,10 +430,13 @@ static int sender_thread(void *data)
|
||||
do_kill = false;
|
||||
_kill_socket(output);
|
||||
}
|
||||
|
||||
status = _connect(output, brick->brick_name);
|
||||
MARS_IO("connect status = %d\n", status);
|
||||
if (unlikely(status < 0)) {
|
||||
msleep(5000);
|
||||
_do_timeout(output, &output->wait_list, false);
|
||||
_do_timeout(output, &output->mref_list, false);
|
||||
msleep(3000);
|
||||
continue;
|
||||
}
|
||||
do_kill = true;
|
||||
@ -411,6 +458,9 @@ static int sender_thread(void *data)
|
||||
status = _request_info(output);
|
||||
if (status >= 0) {
|
||||
output->get_info = false;
|
||||
} else {
|
||||
MARS_WRN("cannot get info, status = %d\n", status);
|
||||
msleep(1000);
|
||||
}
|
||||
}
|
||||
|
||||
@ -419,8 +469,6 @@ static int sender_thread(void *data)
|
||||
|
||||
traced_lock(&output->lock, flags);
|
||||
tmp = output->mref_list.next;
|
||||
list_del(tmp);
|
||||
list_add(tmp, &output->wait_list);
|
||||
traced_unlock(&output->lock, flags);
|
||||
|
||||
mref_a = container_of(tmp, struct client_mref_aspect, io_head);
|
||||
@ -432,11 +480,6 @@ static int sender_thread(void *data)
|
||||
MARS_IO("status = %d\n", status);
|
||||
if (unlikely(status < 0)) {
|
||||
// retry submission on next occasion..
|
||||
traced_lock(&output->lock, flags);
|
||||
list_del(&mref_a->io_head);
|
||||
list_add(&mref_a->io_head, &output->mref_list);
|
||||
traced_unlock(&output->lock, flags);
|
||||
|
||||
MARS_WRN("sending failed, status = %d\n", status);
|
||||
|
||||
if (do_kill) {
|
||||
@ -445,6 +488,12 @@ static int sender_thread(void *data)
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
// all ok, remember in-flight mrefs
|
||||
traced_lock(&output->lock, flags);
|
||||
list_del(tmp);
|
||||
list_add(tmp, &output->wait_list);
|
||||
traced_unlock(&output->lock, flags);
|
||||
}
|
||||
//done:
|
||||
if (status < 0) {
|
||||
@ -457,27 +506,11 @@ static int sender_thread(void *data)
|
||||
|
||||
/* Signal error on all pending IO requests.
|
||||
* We have no other chance (except probably delaying
|
||||
* this until destruction which mostly is not what
|
||||
* this until destruction which is probably not what
|
||||
* we want).
|
||||
*/
|
||||
traced_lock(&output->lock, flags);
|
||||
_do_resubmit(output);
|
||||
while (!list_empty(&output->mref_list)) {
|
||||
struct list_head *tmp = output->mref_list.next;
|
||||
struct client_mref_aspect *mref_a;
|
||||
struct mref_object *mref;
|
||||
|
||||
list_del_init(tmp);
|
||||
traced_unlock(&output->lock, flags);
|
||||
mref_a = container_of(tmp, struct client_mref_aspect, io_head);
|
||||
mref = mref_a->object;
|
||||
MARS_DBG("signalling IO error at pos = %lld len = %d\n", mref->ref_pos, mref->ref_len);
|
||||
atomic_dec(&output->fly_count);
|
||||
SIMPLE_CALLBACK(mref, -ENOTCONN);
|
||||
client_ref_put(output, mref);
|
||||
traced_lock(&output->lock, flags);
|
||||
}
|
||||
traced_unlock(&output->lock, flags);
|
||||
_do_timeout(output, &output->wait_list, true);
|
||||
_do_timeout(output, &output->mref_list, true);
|
||||
|
||||
output->sender.terminated = true;
|
||||
wake_up_interruptible(&output->sender.run_event);
|
||||
@ -527,20 +560,23 @@ static
|
||||
char *client_statistics(struct client_brick *brick, int verbose)
|
||||
{
|
||||
struct client_output *output = brick->outputs[0];
|
||||
char *res = brick_string_alloc(0);
|
||||
char *res = brick_string_alloc(1024);
|
||||
if (!res)
|
||||
return NULL;
|
||||
|
||||
snprintf(res, 512, "#%d socket fly_count = %d\n",
|
||||
output->socket.s_debug_nr,
|
||||
atomic_read(&output->fly_count));
|
||||
|
||||
snprintf(res, 1024,
|
||||
"#%d socket max_flying = %d io_timeout = %d | timeout_count = %d fly_count = %d\n",
|
||||
output->socket.s_debug_nr, brick->max_flying, brick->io_timeout,
|
||||
atomic_read(&output->timeout_count), atomic_read(&output->fly_count));
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
static
|
||||
void client_reset_statistics(struct client_brick *brick)
|
||||
{
|
||||
struct client_output *output = brick->outputs[0];
|
||||
atomic_set(&output->timeout_count, 0);
|
||||
}
|
||||
|
||||
//////////////// object / aspect constructors / destructors ///////////////
|
||||
|
@ -10,6 +10,7 @@ struct client_mref_aspect {
|
||||
GENERIC_ASPECT(mref);
|
||||
struct list_head io_head;
|
||||
struct list_head hash_head;
|
||||
unsigned long submit_jiffies;
|
||||
int alloc_len;
|
||||
bool do_dealloc;
|
||||
};
|
||||
@ -18,6 +19,7 @@ struct client_brick {
|
||||
MARS_BRICK(client);
|
||||
// tunables
|
||||
int max_flying; // limit on parallelism
|
||||
int io_timeout; // > 0: report IO errors after timeout (in seconds)
|
||||
};
|
||||
|
||||
struct client_input {
|
||||
@ -34,6 +36,7 @@ struct client_threadinfo {
|
||||
struct client_output {
|
||||
MARS_OUTPUT(client);
|
||||
atomic_t fly_count;
|
||||
atomic_t timeout_count;
|
||||
spinlock_t lock;
|
||||
struct list_head mref_list;
|
||||
struct list_head wait_list;
|
||||
|
62
mars_copy.c
62
mars_copy.c
@ -96,6 +96,34 @@ int _determine_input(struct copy_brick *brick, struct mref_object *mref)
|
||||
#define GET_INDEX(pos) (((pos) / COPY_CHUNK) % MAX_COPY_PARA)
|
||||
#define GET_OFFSET(pos) ((pos) % COPY_CHUNK)
|
||||
|
||||
static
|
||||
void __clear_mref(struct copy_brick *brick, struct mref_object *mref, int queue)
|
||||
{
|
||||
struct copy_input *input;
|
||||
input = queue ? brick->inputs[INPUT_B_COPY] : brick->inputs[INPUT_A_COPY];
|
||||
GENERIC_INPUT_CALL(input, mref_put, mref);
|
||||
}
|
||||
|
||||
static
|
||||
void _clear_mref(struct copy_brick *brick, int index, int queue)
|
||||
{
|
||||
struct mref_object *mref = brick->st[index].table[queue];
|
||||
if (mref) {
|
||||
__clear_mref(brick, mref, queue);
|
||||
brick->st[index].table[queue] = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
static
|
||||
void _clear_all_mref(struct copy_brick *brick)
|
||||
{
|
||||
int i;
|
||||
for (i = 0; i < MAX_COPY_PARA; i++) {
|
||||
_clear_mref(brick, i, 0);
|
||||
_clear_mref(brick, i, 1);
|
||||
}
|
||||
}
|
||||
|
||||
static
|
||||
void copy_endio(struct generic_callback *cb)
|
||||
{
|
||||
@ -130,7 +158,7 @@ void copy_endio(struct generic_callback *cb)
|
||||
goto exit;
|
||||
}
|
||||
if (unlikely(cb->cb_error < 0)) {
|
||||
MARS_DBG("IO error %d on index %d, old state = %d\n", cb->cb_error, index, st->state);
|
||||
MARS_WRN("IO error %d on index %d, old state = %d\n", cb->cb_error, index, st->state);
|
||||
error = cb->cb_error;
|
||||
} else if (likely(!st->error)) {
|
||||
st->table[queue] = mref;
|
||||
@ -140,6 +168,7 @@ exit:
|
||||
if (unlikely(error < 0)) {
|
||||
st->error = error;
|
||||
_clash(brick);
|
||||
__clear_mref(brick, mref, queue);
|
||||
}
|
||||
st->active[queue] = false;
|
||||
atomic_dec(&brick->copy_flight);
|
||||
@ -215,28 +244,6 @@ done:
|
||||
return status;
|
||||
}
|
||||
|
||||
static
|
||||
void _clear_mref(struct copy_brick *brick, int index, int queue)
|
||||
{
|
||||
struct mref_object *mref = brick->st[index].table[queue];
|
||||
if (mref) {
|
||||
struct copy_input *input;
|
||||
input = queue ? brick->inputs[INPUT_B_COPY] : brick->inputs[INPUT_A_COPY];
|
||||
GENERIC_INPUT_CALL(input, mref_put, mref);
|
||||
brick->st[index].table[queue] = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
static
|
||||
void _clear_all_mref(struct copy_brick *brick)
|
||||
{
|
||||
int i;
|
||||
for (i = 0; i < MAX_COPY_PARA; i++) {
|
||||
_clear_mref(brick, i, 0);
|
||||
_clear_mref(brick, i, 1);
|
||||
}
|
||||
}
|
||||
|
||||
static
|
||||
void _update_percent(struct copy_brick *brick)
|
||||
{
|
||||
@ -281,6 +288,10 @@ int _next_state(struct copy_brick *brick, int index, loff_t pos)
|
||||
st->error = 0;
|
||||
if (brick->is_aborting || kthread_should_stop())
|
||||
goto done;
|
||||
|
||||
_clear_mref(brick, index, 1);
|
||||
_clear_mref(brick, index, 0);
|
||||
|
||||
i = 0;
|
||||
next_state = COPY_STATE_READ1;
|
||||
if (brick->verify_mode) {
|
||||
@ -377,7 +388,7 @@ int _next_state(struct copy_brick *brick, int index, loff_t pos)
|
||||
st->state = next_state;
|
||||
if (status < 0) {
|
||||
st->error = status;
|
||||
MARS_ERR("status = %d\n", status);
|
||||
MARS_WRN("status = %d\n", status);
|
||||
_clash(brick);
|
||||
}
|
||||
|
||||
@ -486,9 +497,10 @@ static int _copy_thread(void *data)
|
||||
if (unlikely(status < 0)) {
|
||||
brick->copy_error = status;
|
||||
if (brick->abort_mode && !brick->is_aborting) {
|
||||
MARS_INF("IO error, terminating prematurely, status = %d\n", status);
|
||||
MARS_WRN("IO error, terminating prematurely, status = %d\n", status);
|
||||
brick->is_aborting = true;
|
||||
}
|
||||
MARS_WRN("IO error, status = %d\n", status);
|
||||
}
|
||||
msleep(10); // yield FIXME: remove this, use event handling for over/underflow
|
||||
}
|
||||
|
@ -195,7 +195,8 @@ int _set_trans_params(struct mars_brick *_brick, void *private)
|
||||
static
|
||||
int _set_client_params(struct mars_brick *_brick, void *private)
|
||||
{
|
||||
// currently no params
|
||||
struct client_brick *client_brick = (void*)_brick;
|
||||
client_brick->io_timeout = CONFIG_MARS_NETIO_TIMEOUT;
|
||||
MARS_INF("name = '%s' path = '%s'\n", _brick->brick_name, _brick->brick_path);
|
||||
return 1;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user