From f98dd17aa42f25b0314bab300341c0a4f23f1474 Mon Sep 17 00:00:00 2001 From: Thomas Schoebel-Theuer Date: Wed, 21 Apr 2021 10:04:14 +0200 Subject: [PATCH] client: ensure that completion occurs exactly once --- kernel/mars_client.c | 40 ++++++++++++++++++++++++++++++++++++++-- kernel/mars_client.h | 1 + 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/kernel/mars_client.c b/kernel/mars_client.c index 1b74e581..6801da49 100644 --- a/kernel/mars_client.c +++ b/kernel/mars_client.c @@ -526,7 +526,9 @@ static void client_ref_io(struct client_output *output, struct mref_object *mref atomic_inc(&brick->fly_count); _mref_get(mref); + mref_a->has_completed = false; mref_a->submit_jiffies = jiffies; + _hash_insert(output, mref_a); MARS_IO("added request id = %d pos = %lld len = %d flags = %ux (flying = %d)\n", @@ -539,6 +541,7 @@ static void client_ref_io(struct client_output *output, struct mref_object *mref error: MARS_ERR("IO submission on dead instance\n"); + mref_a->has_completed = true; error = -ESHUTDOWN; SIMPLE_CALLBACK(mref, error); return; @@ -604,6 +607,7 @@ int receiver_thread(void *data) struct client_mref_aspect *mref_a = NULL; unsigned long id = READ_ONCE(cmd.cmd_int1); unsigned int hash_index = CLIENT_HASH_FN(id); + bool had_completed = false; mutex_lock(&output->mutex); for (tmp = output->hash_table[hash_index].next; tmp != &output->hash_table[hash_index]; tmp = tmp->next) { @@ -626,6 +630,11 @@ int receiver_thread(void *data) mref = tmp_mref; list_del_init(&mref_a->hash_head); list_del_init(&mref_a->io_head); + /* Networking produces inherent races between re-submission and + * completion. Compensate them here. + */ + had_completed = mref_a->has_completed; + mref_a->has_completed = true; break; err: @@ -660,6 +669,8 @@ int receiver_thread(void *data) output->bundle.path, output->bundle.host, status); + if (had_completed) + goto has_finished; _hash_insert(output, mref_a); goto done; } @@ -667,8 +678,11 @@ int receiver_thread(void *data) if (mref->_object_cb.cb_error < 0) { MARS_DBG("ERROR %d\n", mref->_object_cb.cb_error); } - SIMPLE_CALLBACK(mref, mref->_object_cb.cb_error); + if (!had_completed) { + SIMPLE_CALLBACK(mref, mref->_object_cb.cb_error); + } + has_finished: client_ref_put(output, mref); atomic_dec(&output->brick->fly_count); @@ -729,6 +743,7 @@ void _do_timeout(struct client_output *output, struct list_head *anchor, int *ro struct list_head *tmp; struct list_head *prev; LIST_HEAD(tmp_list); + LIST_HEAD(completed_list); long io_timeout = _compute_timeout(brick, false); int i; @@ -770,7 +785,12 @@ void _do_timeout(struct client_output *output, struct list_head *anchor, int *ro list_del_init(&mref_a->hash_head); list_del_init(&mref_a->io_head); - list_add_tail(&mref_a->tmp_head, &tmp_list); + if (mref_a->has_completed) { + list_add_tail(&mref_a->tmp_head, &completed_list); + } else { + mref_a->has_completed = true; + list_add_tail(&mref_a->tmp_head, &tmp_list); + } } mutex_unlock(&output->mutex); @@ -801,6 +821,22 @@ void _do_timeout(struct client_output *output, struct list_head *anchor, int *ro atomic_dec(&brick->fly_count); atomic_dec(&mars_global_io_flying); } + + while (!list_empty(&completed_list)) { + struct client_mref_aspect *mref_a; + struct mref_object *mref; + + tmp = completed_list.next; + list_del_init(tmp); + mref_a = container_of(tmp, struct client_mref_aspect, tmp_head); + mref = mref_a->object; + + client_ref_put(output, mref); + + atomic_dec(&brick->fly_count); + atomic_dec(&mars_global_io_flying); + } + } static diff --git a/kernel/mars_client.h b/kernel/mars_client.h index 46dba237..2d0994ac 100644 --- a/kernel/mars_client.h +++ b/kernel/mars_client.h @@ -44,6 +44,7 @@ struct client_mref_aspect { unsigned long submit_jiffies; int alloc_len; bool do_dealloc; + bool has_completed; }; struct client_brick {