client: simplify timeout code

This commit is contained in:
Thomas Schoebel-Theuer 2023-05-16 09:20:52 +02:00
parent 63a16833a3
commit ada1544100

View File

@ -109,6 +109,7 @@ void _do_resubmit(struct client_channel *ch)
{
struct client_output *output = ch->output;
mb();
mutex_lock(&output->mutex);
if (!list_empty(&ch->wait_list)) {
struct list_head *first = READ_ONCE(ch->wait_list.next);
@ -121,6 +122,7 @@ void _do_resubmit(struct client_channel *ch)
list_connect(last, old_start);
INIT_LIST_HEAD(&ch->wait_list);
}
mb();
mutex_unlock(&output->mutex);
}
@ -586,6 +588,7 @@ void _hash_insert(struct client_output *output,
mref_a->has_completed = false;
__io_insert(output, mref_a);
__hash_insert(output, mref_a);
mb();
mutex_unlock(&output->mutex);
}
@ -739,6 +742,7 @@ int receiver_thread(void *data)
status = -EBADR;
goto done;
}
mb();
mutex_unlock(&output->mutex);
if (unlikely(!mref || !mref_a)) {
@ -840,7 +844,6 @@ void _do_timeout(struct client_output *output, struct list_head *anchor, int *ro
struct client_brick *brick = output->brick;
struct list_head *tmp;
struct list_head *prev;
LIST_HEAD(tmp_list);
long io_timeout = _compute_timeout(brick, false);
int i;
@ -874,6 +877,7 @@ void _do_timeout(struct client_output *output, struct list_head *anchor, int *ro
tmp != anchor;
tmp = prev, prev = READ_ONCE(tmp->prev)) {
struct client_mref_aspect *mref_a;
struct mref_object *mref;
mref_a = container_of(tmp, struct client_mref_aspect, io_head);
@ -888,17 +892,7 @@ void _do_timeout(struct client_output *output, struct list_head *anchor, int *ro
list_del_init(&mref_a->hash_head);
list_del_init(&mref_a->io_head);
mref_a->has_completed = true;
list_add_tail(&mref_a->tmp_head, &tmp_list);
}
mutex_unlock(&output->mutex);
while (!list_empty(&tmp_list)) {
struct client_mref_aspect *mref_a;
struct mref_object *mref;
tmp = READ_ONCE(tmp_list.next);
list_del_init(tmp);
mref_a = container_of(tmp, struct client_mref_aspect, tmp_head);
mref = mref_a->object;
if (unlikely(!(*rounds)++)) {
@ -919,6 +913,8 @@ void _do_timeout(struct client_output *output, struct list_head *anchor, int *ro
atomic_dec(&brick->fly_count);
atomic_dec(&mars_global_io_flying);
}
mb();
mutex_unlock(&output->mutex);
}
static
@ -1006,6 +1002,7 @@ static int sender_thread(void *data)
/* Grab the next mref from the queue
*/
mb();
mutex_lock(&output->mutex);
tmp = READ_ONCE(output->mref_list.next);
if (tmp == &output->mref_list) {
@ -1017,6 +1014,7 @@ static int sender_thread(void *data)
list_del_init(tmp);
// notice: hash_head remains in its list!
cork = !list_empty(&output->mref_list);
mb();
mutex_unlock(&output->mutex);
mref_a = container_of(tmp, struct client_mref_aspect, io_head);
@ -1062,9 +1060,11 @@ static int sender_thread(void *data)
ch_skip = max_client_bulk;
}
mb();
mutex_lock(&output->mutex);
list_add_tail(tmp, &ch->wait_list);
// notice: hash_head is already there!
mb();
mutex_unlock(&output->mutex);
mref->ref_flags |= enabled_net_compressions;
@ -1211,7 +1211,6 @@ static int client_mref_aspect_init_fn(struct generic_aspect *_ini)
INIT_LIST_HEAD(&ini->io_head);
INIT_LIST_HEAD(&ini->hash_head);
INIT_LIST_HEAD(&ini->tmp_head);
return 0;
}
@ -1221,7 +1220,6 @@ static void client_mref_aspect_exit_fn(struct generic_aspect *_ini)
CHECK_HEAD_EMPTY(&ini->io_head);
CHECK_HEAD_EMPTY(&ini->hash_head);
CHECK_HEAD_EMPTY(&ini->tmp_head);
}
MARS_MAKE_STATICS(client);