From b45c66b7e45f47d9586ea160bfd60985f3f7a5d7 Mon Sep 17 00:00:00 2001 From: Thomas Schoebel-Theuer Date: Thu, 22 Apr 2021 10:55:03 +0200 Subject: [PATCH] client: fix completion fix --- kernel/mars_client.c | 43 ++++++++++++++----------------------------- 1 file changed, 14 insertions(+), 29 deletions(-) diff --git a/kernel/mars_client.c b/kernel/mars_client.c index 6801da49..3ebca37d 100644 --- a/kernel/mars_client.c +++ b/kernel/mars_client.c @@ -475,13 +475,17 @@ static void client_ref_put(struct client_output *output, struct mref_object *mre } static -void _hash_insert(struct client_output *output, struct client_mref_aspect *mref_a) +void _hash_insert(struct client_output *output, + struct client_mref_aspect *mref_a, + bool refresh_completed) { struct mref_object *mref = mref_a->object; int ref_id; unsigned int hash_index; mutex_lock(&output->mutex); + if (refresh_completed) + mref_a->has_completed = false; list_del(&mref_a->io_head); list_add_tail(&mref_a->io_head, &output->mref_list); list_del(&mref_a->hash_head); @@ -526,10 +530,9 @@ static void client_ref_io(struct client_output *output, struct mref_object *mref atomic_inc(&brick->fly_count); _mref_get(mref); - mref_a->has_completed = false; mref_a->submit_jiffies = jiffies; - _hash_insert(output, mref_a); + _hash_insert(output, mref_a, true); MARS_IO("added request id = %d pos = %lld len = %d flags = %ux (flying = %d)\n", mref->ref_id, mref->ref_pos, mref->ref_len, mref->ref_flags, @@ -671,7 +674,7 @@ int receiver_thread(void *data) status); if (had_completed) goto has_finished; - _hash_insert(output, mref_a); + _hash_insert(output, mref_a, true); goto done; } @@ -743,7 +746,6 @@ void _do_timeout(struct client_output *output, struct list_head *anchor, int *ro struct list_head *tmp; struct list_head *prev; LIST_HEAD(tmp_list); - LIST_HEAD(completed_list); long io_timeout = _compute_timeout(brick, false); int i; @@ -782,15 +784,14 @@ void _do_timeout(struct client_output *output, struct list_head *anchor, int *ro !time_is_before_jiffies(mref_a->submit_jiffies + io_timeout)) { break; } + /* Race compensation: skip already completed requests */ + if (mref_a->has_completed) + continue; list_del_init(&mref_a->hash_head); list_del_init(&mref_a->io_head); - if (mref_a->has_completed) { - list_add_tail(&mref_a->tmp_head, &completed_list); - } else { - mref_a->has_completed = true; - list_add_tail(&mref_a->tmp_head, &tmp_list); - } + mref_a->has_completed = true; + list_add_tail(&mref_a->tmp_head, &tmp_list); } mutex_unlock(&output->mutex); @@ -821,22 +822,6 @@ void _do_timeout(struct client_output *output, struct list_head *anchor, int *ro atomic_dec(&brick->fly_count); atomic_dec(&mars_global_io_flying); } - - while (!list_empty(&completed_list)) { - struct client_mref_aspect *mref_a; - struct mref_object *mref; - - tmp = completed_list.next; - list_del_init(tmp); - mref_a = container_of(tmp, struct client_mref_aspect, tmp_head); - mref = mref_a->object; - - client_ref_put(output, mref); - - atomic_dec(&brick->fly_count); - atomic_dec(&mars_global_io_flying); - } - } static @@ -969,7 +954,7 @@ static int sender_thread(void *data) ch = _get_channel(bundle, min_nr, max_nr); if (unlikely(!ch)) { // notice: this will re-assign hash_head without harm - _hash_insert(output, mref_a); + _hash_insert(output, mref_a, false); brick_msleep(1000); continue; } @@ -990,7 +975,7 @@ static int sender_thread(void *data) status = mars_send_mref(&ch->socket, mref, cork); old_cork = cork; if (unlikely(status < 0)) { - _hash_insert(output, mref_a); + _hash_insert(output, mref_a, false); do_timeout = true; ch = NULL; // retry submission on next occasion..