mirror of
https://github.com/schoebel/mars
synced 2025-02-03 13:51:45 +00:00
client: ensure that completion occurs exactly once
This commit is contained in:
parent
aef14d7011
commit
f98dd17aa4
@ -526,7 +526,9 @@ static void client_ref_io(struct client_output *output, struct mref_object *mref
|
|||||||
atomic_inc(&brick->fly_count);
|
atomic_inc(&brick->fly_count);
|
||||||
_mref_get(mref);
|
_mref_get(mref);
|
||||||
|
|
||||||
|
mref_a->has_completed = false;
|
||||||
mref_a->submit_jiffies = jiffies;
|
mref_a->submit_jiffies = jiffies;
|
||||||
|
|
||||||
_hash_insert(output, mref_a);
|
_hash_insert(output, mref_a);
|
||||||
|
|
||||||
MARS_IO("added request id = %d pos = %lld len = %d flags = %ux (flying = %d)\n",
|
MARS_IO("added request id = %d pos = %lld len = %d flags = %ux (flying = %d)\n",
|
||||||
@ -539,6 +541,7 @@ static void client_ref_io(struct client_output *output, struct mref_object *mref
|
|||||||
|
|
||||||
error:
|
error:
|
||||||
MARS_ERR("IO submission on dead instance\n");
|
MARS_ERR("IO submission on dead instance\n");
|
||||||
|
mref_a->has_completed = true;
|
||||||
error = -ESHUTDOWN;
|
error = -ESHUTDOWN;
|
||||||
SIMPLE_CALLBACK(mref, error);
|
SIMPLE_CALLBACK(mref, error);
|
||||||
return;
|
return;
|
||||||
@ -604,6 +607,7 @@ int receiver_thread(void *data)
|
|||||||
struct client_mref_aspect *mref_a = NULL;
|
struct client_mref_aspect *mref_a = NULL;
|
||||||
unsigned long id = READ_ONCE(cmd.cmd_int1);
|
unsigned long id = READ_ONCE(cmd.cmd_int1);
|
||||||
unsigned int hash_index = CLIENT_HASH_FN(id);
|
unsigned int hash_index = CLIENT_HASH_FN(id);
|
||||||
|
bool had_completed = false;
|
||||||
|
|
||||||
mutex_lock(&output->mutex);
|
mutex_lock(&output->mutex);
|
||||||
for (tmp = output->hash_table[hash_index].next; tmp != &output->hash_table[hash_index]; tmp = tmp->next) {
|
for (tmp = output->hash_table[hash_index].next; tmp != &output->hash_table[hash_index]; tmp = tmp->next) {
|
||||||
@ -626,6 +630,11 @@ int receiver_thread(void *data)
|
|||||||
mref = tmp_mref;
|
mref = tmp_mref;
|
||||||
list_del_init(&mref_a->hash_head);
|
list_del_init(&mref_a->hash_head);
|
||||||
list_del_init(&mref_a->io_head);
|
list_del_init(&mref_a->io_head);
|
||||||
|
/* Networking produces inherent races between re-submission and
|
||||||
|
* completion. Compensate them here.
|
||||||
|
*/
|
||||||
|
had_completed = mref_a->has_completed;
|
||||||
|
mref_a->has_completed = true;
|
||||||
break;
|
break;
|
||||||
|
|
||||||
err:
|
err:
|
||||||
@ -660,6 +669,8 @@ int receiver_thread(void *data)
|
|||||||
output->bundle.path,
|
output->bundle.path,
|
||||||
output->bundle.host,
|
output->bundle.host,
|
||||||
status);
|
status);
|
||||||
|
if (had_completed)
|
||||||
|
goto has_finished;
|
||||||
_hash_insert(output, mref_a);
|
_hash_insert(output, mref_a);
|
||||||
goto done;
|
goto done;
|
||||||
}
|
}
|
||||||
@ -667,8 +678,11 @@ int receiver_thread(void *data)
|
|||||||
if (mref->_object_cb.cb_error < 0) {
|
if (mref->_object_cb.cb_error < 0) {
|
||||||
MARS_DBG("ERROR %d\n", mref->_object_cb.cb_error);
|
MARS_DBG("ERROR %d\n", mref->_object_cb.cb_error);
|
||||||
}
|
}
|
||||||
SIMPLE_CALLBACK(mref, mref->_object_cb.cb_error);
|
if (!had_completed) {
|
||||||
|
SIMPLE_CALLBACK(mref, mref->_object_cb.cb_error);
|
||||||
|
}
|
||||||
|
|
||||||
|
has_finished:
|
||||||
client_ref_put(output, mref);
|
client_ref_put(output, mref);
|
||||||
|
|
||||||
atomic_dec(&output->brick->fly_count);
|
atomic_dec(&output->brick->fly_count);
|
||||||
@ -729,6 +743,7 @@ void _do_timeout(struct client_output *output, struct list_head *anchor, int *ro
|
|||||||
struct list_head *tmp;
|
struct list_head *tmp;
|
||||||
struct list_head *prev;
|
struct list_head *prev;
|
||||||
LIST_HEAD(tmp_list);
|
LIST_HEAD(tmp_list);
|
||||||
|
LIST_HEAD(completed_list);
|
||||||
long io_timeout = _compute_timeout(brick, false);
|
long io_timeout = _compute_timeout(brick, false);
|
||||||
int i;
|
int i;
|
||||||
|
|
||||||
@ -770,7 +785,12 @@ void _do_timeout(struct client_output *output, struct list_head *anchor, int *ro
|
|||||||
|
|
||||||
list_del_init(&mref_a->hash_head);
|
list_del_init(&mref_a->hash_head);
|
||||||
list_del_init(&mref_a->io_head);
|
list_del_init(&mref_a->io_head);
|
||||||
list_add_tail(&mref_a->tmp_head, &tmp_list);
|
if (mref_a->has_completed) {
|
||||||
|
list_add_tail(&mref_a->tmp_head, &completed_list);
|
||||||
|
} else {
|
||||||
|
mref_a->has_completed = true;
|
||||||
|
list_add_tail(&mref_a->tmp_head, &tmp_list);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
mutex_unlock(&output->mutex);
|
mutex_unlock(&output->mutex);
|
||||||
|
|
||||||
@ -801,6 +821,22 @@ void _do_timeout(struct client_output *output, struct list_head *anchor, int *ro
|
|||||||
atomic_dec(&brick->fly_count);
|
atomic_dec(&brick->fly_count);
|
||||||
atomic_dec(&mars_global_io_flying);
|
atomic_dec(&mars_global_io_flying);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
while (!list_empty(&completed_list)) {
|
||||||
|
struct client_mref_aspect *mref_a;
|
||||||
|
struct mref_object *mref;
|
||||||
|
|
||||||
|
tmp = completed_list.next;
|
||||||
|
list_del_init(tmp);
|
||||||
|
mref_a = container_of(tmp, struct client_mref_aspect, tmp_head);
|
||||||
|
mref = mref_a->object;
|
||||||
|
|
||||||
|
client_ref_put(output, mref);
|
||||||
|
|
||||||
|
atomic_dec(&brick->fly_count);
|
||||||
|
atomic_dec(&mars_global_io_flying);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static
|
static
|
||||||
|
@ -44,6 +44,7 @@ struct client_mref_aspect {
|
|||||||
unsigned long submit_jiffies;
|
unsigned long submit_jiffies;
|
||||||
int alloc_len;
|
int alloc_len;
|
||||||
bool do_dealloc;
|
bool do_dealloc;
|
||||||
|
bool has_completed;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct client_brick {
|
struct client_brick {
|
||||||
|
Loading…
Reference in New Issue
Block a user