mirror of https://github.com/schoebel/mars
client: fix completion fix
This commit is contained in:
parent
128f94714c
commit
b45c66b7e4
|
@ -475,13 +475,17 @@ static void client_ref_put(struct client_output *output, struct mref_object *mre
|
|||
}
|
||||
|
||||
static
|
||||
void _hash_insert(struct client_output *output, struct client_mref_aspect *mref_a)
|
||||
void _hash_insert(struct client_output *output,
|
||||
struct client_mref_aspect *mref_a,
|
||||
bool refresh_completed)
|
||||
{
|
||||
struct mref_object *mref = mref_a->object;
|
||||
int ref_id;
|
||||
unsigned int hash_index;
|
||||
|
||||
mutex_lock(&output->mutex);
|
||||
if (refresh_completed)
|
||||
mref_a->has_completed = false;
|
||||
list_del(&mref_a->io_head);
|
||||
list_add_tail(&mref_a->io_head, &output->mref_list);
|
||||
list_del(&mref_a->hash_head);
|
||||
|
@ -526,10 +530,9 @@ static void client_ref_io(struct client_output *output, struct mref_object *mref
|
|||
atomic_inc(&brick->fly_count);
|
||||
_mref_get(mref);
|
||||
|
||||
mref_a->has_completed = false;
|
||||
mref_a->submit_jiffies = jiffies;
|
||||
|
||||
_hash_insert(output, mref_a);
|
||||
_hash_insert(output, mref_a, true);
|
||||
|
||||
MARS_IO("added request id = %d pos = %lld len = %d flags = %ux (flying = %d)\n",
|
||||
mref->ref_id, mref->ref_pos, mref->ref_len, mref->ref_flags,
|
||||
|
@ -671,7 +674,7 @@ int receiver_thread(void *data)
|
|||
status);
|
||||
if (had_completed)
|
||||
goto has_finished;
|
||||
_hash_insert(output, mref_a);
|
||||
_hash_insert(output, mref_a, true);
|
||||
goto done;
|
||||
}
|
||||
|
||||
|
@ -743,7 +746,6 @@ void _do_timeout(struct client_output *output, struct list_head *anchor, int *ro
|
|||
struct list_head *tmp;
|
||||
struct list_head *prev;
|
||||
LIST_HEAD(tmp_list);
|
||||
LIST_HEAD(completed_list);
|
||||
long io_timeout = _compute_timeout(brick, false);
|
||||
int i;
|
||||
|
||||
|
@ -782,15 +784,14 @@ void _do_timeout(struct client_output *output, struct list_head *anchor, int *ro
|
|||
!time_is_before_jiffies(mref_a->submit_jiffies + io_timeout)) {
|
||||
break;
|
||||
}
|
||||
/* Race compensation: skip already completed requests */
|
||||
if (mref_a->has_completed)
|
||||
continue;
|
||||
|
||||
list_del_init(&mref_a->hash_head);
|
||||
list_del_init(&mref_a->io_head);
|
||||
if (mref_a->has_completed) {
|
||||
list_add_tail(&mref_a->tmp_head, &completed_list);
|
||||
} else {
|
||||
mref_a->has_completed = true;
|
||||
list_add_tail(&mref_a->tmp_head, &tmp_list);
|
||||
}
|
||||
mref_a->has_completed = true;
|
||||
list_add_tail(&mref_a->tmp_head, &tmp_list);
|
||||
}
|
||||
mutex_unlock(&output->mutex);
|
||||
|
||||
|
@ -821,22 +822,6 @@ void _do_timeout(struct client_output *output, struct list_head *anchor, int *ro
|
|||
atomic_dec(&brick->fly_count);
|
||||
atomic_dec(&mars_global_io_flying);
|
||||
}
|
||||
|
||||
while (!list_empty(&completed_list)) {
|
||||
struct client_mref_aspect *mref_a;
|
||||
struct mref_object *mref;
|
||||
|
||||
tmp = completed_list.next;
|
||||
list_del_init(tmp);
|
||||
mref_a = container_of(tmp, struct client_mref_aspect, tmp_head);
|
||||
mref = mref_a->object;
|
||||
|
||||
client_ref_put(output, mref);
|
||||
|
||||
atomic_dec(&brick->fly_count);
|
||||
atomic_dec(&mars_global_io_flying);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static
|
||||
|
@ -969,7 +954,7 @@ static int sender_thread(void *data)
|
|||
ch = _get_channel(bundle, min_nr, max_nr);
|
||||
if (unlikely(!ch)) {
|
||||
// notice: this will re-assign hash_head without harm
|
||||
_hash_insert(output, mref_a);
|
||||
_hash_insert(output, mref_a, false);
|
||||
brick_msleep(1000);
|
||||
continue;
|
||||
}
|
||||
|
@ -990,7 +975,7 @@ static int sender_thread(void *data)
|
|||
status = mars_send_mref(&ch->socket, mref, cork);
|
||||
old_cork = cork;
|
||||
if (unlikely(status < 0)) {
|
||||
_hash_insert(output, mref_a);
|
||||
_hash_insert(output, mref_a, false);
|
||||
do_timeout = true;
|
||||
ch = NULL;
|
||||
// retry submission on next occasion..
|
||||
|
|
Loading…
Reference in New Issue