mirror of https://github.com/schoebel/mars
client: fix races, simplify locking
This commit is contained in:
parent
bdd4a975f2
commit
14f35502b8
156
mars_client.c
156
mars_client.c
|
@ -195,11 +195,26 @@ static void client_ref_put(struct client_output *output, struct mref_object *mre
|
|||
client_free_mref(mref);
|
||||
}
|
||||
|
||||
static
|
||||
void _hash_insert(struct client_output *output, struct client_mref_aspect *mref_a)
|
||||
{
|
||||
struct mref_object *mref = mref_a->object;
|
||||
unsigned long flags;
|
||||
int hash_index;
|
||||
|
||||
traced_lock(&output->lock, flags);
|
||||
list_del(&mref_a->io_head);
|
||||
list_add_tail(&mref_a->io_head, &output->mref_list);
|
||||
list_del(&mref_a->hash_head);
|
||||
mref->ref_id = ++output->last_id;
|
||||
hash_index = mref->ref_id % CLIENT_HASH_MAX;
|
||||
list_add_tail(&mref_a->hash_head, &output->hash_table[hash_index]);
|
||||
traced_unlock(&output->lock, flags);
|
||||
}
|
||||
|
||||
static void client_ref_io(struct client_output *output, struct mref_object *mref)
|
||||
{
|
||||
struct client_mref_aspect *mref_a;
|
||||
int hash_index;
|
||||
unsigned long flags;
|
||||
int error = -EINVAL;
|
||||
|
||||
mref_a = client_mref_get_aspect(output->brick, mref);
|
||||
|
@ -220,16 +235,8 @@ static void client_ref_io(struct client_output *output, struct mref_object *mref
|
|||
atomic_inc(&output->fly_count);
|
||||
_mref_get(mref);
|
||||
|
||||
traced_lock(&output->lock, flags);
|
||||
mref_a->submit_jiffies = jiffies;
|
||||
mref->ref_id = ++output->last_id;
|
||||
list_add_tail(&mref_a->io_head, &output->mref_list);
|
||||
traced_unlock(&output->lock, flags);
|
||||
|
||||
hash_index = mref->ref_id % CLIENT_HASH_MAX;
|
||||
traced_lock(&output->hash_lock[hash_index], flags);
|
||||
list_add_tail(&mref_a->hash_head, &output->hash_table[hash_index]);
|
||||
traced_unlock(&output->hash_lock[hash_index], flags);
|
||||
_hash_insert(output, mref_a);
|
||||
|
||||
MARS_IO("added request id = %d pos = %lld len = %d rw = %d (flying = %d)\n", mref->ref_id, mref->ref_pos, mref->ref_len, mref->ref_rw, atomic_read(&output->fly_count));
|
||||
|
||||
|
@ -276,26 +283,28 @@ int receiver_thread(void *data)
|
|||
{
|
||||
int hash_index = cmd.cmd_int1 % CLIENT_HASH_MAX;
|
||||
|
||||
traced_lock(&output->hash_lock[hash_index], flags);
|
||||
traced_lock(&output->lock, flags);
|
||||
for (tmp = output->hash_table[hash_index].next; tmp != &output->hash_table[hash_index]; tmp = tmp->next) {
|
||||
struct mref_object *tmp_mref;
|
||||
mref_a = container_of(tmp, struct client_mref_aspect, hash_head);
|
||||
tmp_mref = mref_a->object;
|
||||
if (unlikely(!tmp_mref)) {
|
||||
traced_unlock(&output->hash_lock[hash_index], flags);
|
||||
traced_unlock(&output->lock, flags);
|
||||
MARS_ERR("bad internal mref pointer\n");
|
||||
status = -EBADR;
|
||||
goto done;
|
||||
}
|
||||
if (tmp_mref->ref_id == cmd.cmd_int1) {
|
||||
mref = tmp_mref;
|
||||
list_del_init(&mref_a->hash_head);
|
||||
list_del_init(&mref_a->io_head);
|
||||
break;
|
||||
}
|
||||
}
|
||||
traced_unlock(&output->hash_lock[hash_index], flags);
|
||||
traced_unlock(&output->lock, flags);
|
||||
|
||||
if (!mref) {
|
||||
MARS_ERR("got unknown id = %d for callback\n", cmd.cmd_int1);
|
||||
if (unlikely(!mref)) {
|
||||
MARS_WRN("got unknown id = %d for callback\n", cmd.cmd_int1);
|
||||
status = -EBADR;
|
||||
goto done;
|
||||
}
|
||||
|
@ -304,19 +313,12 @@ int receiver_thread(void *data)
|
|||
|
||||
status = mars_recv_cb(&output->socket, mref, &cmd);
|
||||
MARS_IO("new status = %d, pos = %lld len = %d rw = %d\n", status, mref->ref_pos, mref->ref_len, mref->ref_rw);
|
||||
if (status < 0) {
|
||||
if (unlikely(status < 0)) {
|
||||
MARS_WRN("interrupted data transfer during callback, status = %d\n", status);
|
||||
_hash_insert(output, mref_a);
|
||||
goto done;
|
||||
}
|
||||
|
||||
traced_lock(&output->hash_lock[hash_index], flags);
|
||||
list_del_init(&mref_a->hash_head);
|
||||
traced_unlock(&output->hash_lock[hash_index], flags);
|
||||
|
||||
traced_lock(&output->lock, flags);
|
||||
list_del_init(&mref_a->io_head);
|
||||
traced_unlock(&output->lock, flags);
|
||||
|
||||
SIMPLE_CALLBACK(mref, 0);
|
||||
|
||||
client_ref_put(output, mref);
|
||||
|
@ -356,6 +358,9 @@ int receiver_thread(void *data)
|
|||
static
|
||||
void _do_resubmit(struct client_output *output)
|
||||
{
|
||||
unsigned long flags;
|
||||
|
||||
traced_lock(&output->lock, flags);
|
||||
if (!list_empty(&output->wait_list)) {
|
||||
struct list_head *first = output->wait_list.next;
|
||||
struct list_head *last = output->wait_list.prev;
|
||||
|
@ -366,55 +371,66 @@ void _do_resubmit(struct client_output *output)
|
|||
INIT_LIST_HEAD(&output->wait_list);
|
||||
MARS_IO("done re-submit %p %p\n", first, last);
|
||||
}
|
||||
traced_unlock(&output->lock, flags);
|
||||
}
|
||||
|
||||
static
|
||||
void _do_timeout(struct client_output *output, struct list_head *anchor, bool force)
|
||||
{
|
||||
struct client_brick *brick = output->brick;
|
||||
struct list_head *tmp;
|
||||
struct list_head *next;
|
||||
LIST_HEAD(tmp_list);
|
||||
int rounds = 0;
|
||||
int io_timeout = brick->io_timeout;
|
||||
long io_timeout = brick->io_timeout;
|
||||
unsigned long flags;
|
||||
|
||||
if (io_timeout <= 0)
|
||||
io_timeout = global_net_io_timeout;
|
||||
|
||||
if (!mars_net_is_alive)
|
||||
force = true;
|
||||
|
||||
if (!force && io_timeout <= 0)
|
||||
return;
|
||||
|
||||
io_timeout *= HZ;
|
||||
|
||||
traced_lock(&output->lock, flags);
|
||||
for (tmp = anchor->next, next = tmp->next; tmp != anchor; tmp = next, next = tmp->next) {
|
||||
struct client_mref_aspect *mref_a;
|
||||
|
||||
while (!list_empty(anchor)) {
|
||||
struct list_head *tmp;
|
||||
mref_a = container_of(tmp, struct client_mref_aspect, io_head);
|
||||
|
||||
if (!force &&
|
||||
!time_is_before_jiffies(mref_a->submit_jiffies + io_timeout)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
list_del_init(&mref_a->hash_head);
|
||||
list_del_init(&mref_a->io_head);
|
||||
list_add_tail(&mref_a->tmp_head, &tmp_list);
|
||||
}
|
||||
traced_unlock(&output->lock, flags);
|
||||
|
||||
while (!list_empty(&tmp_list)) {
|
||||
struct client_mref_aspect *mref_a;
|
||||
struct mref_object *mref;
|
||||
int hash_index;
|
||||
unsigned long flags;
|
||||
|
||||
traced_lock(&output->lock, flags);
|
||||
tmp = anchor->next;
|
||||
traced_unlock(&output->lock, flags);
|
||||
|
||||
mref_a = container_of(tmp, struct client_mref_aspect, io_head);
|
||||
tmp = tmp_list.next;
|
||||
list_del_init(tmp);
|
||||
mref_a = container_of(tmp, struct client_mref_aspect, tmp_head);
|
||||
mref = mref_a->object;
|
||||
|
||||
if (!force &&
|
||||
mars_net_is_alive &&
|
||||
(io_timeout <= 0 || !time_is_before_jiffies(mref_a->submit_jiffies + io_timeout * HZ))) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (!rounds++) {
|
||||
MARS_WRN("timeout after %d: signalling IO error at pos = %lld len = %d\n",
|
||||
MARS_WRN("timeout after %ld: signalling IO error at pos = %lld len = %d\n",
|
||||
io_timeout,
|
||||
mref->ref_pos,
|
||||
mref->ref_len);
|
||||
}
|
||||
|
||||
atomic_inc(&output->timeout_count);
|
||||
|
||||
hash_index = mref->ref_id % CLIENT_HASH_MAX;
|
||||
|
||||
traced_lock(&output->hash_lock[hash_index], flags);
|
||||
list_del_init(&mref_a->hash_head);
|
||||
traced_unlock(&output->hash_lock[hash_index], flags);
|
||||
|
||||
traced_lock(&output->lock, flags);
|
||||
list_del_init(&mref_a->io_head);
|
||||
traced_unlock(&output->lock, flags);
|
||||
|
||||
SIMPLE_CALLBACK(mref, -ENOTCONN);
|
||||
|
||||
client_ref_put(output, mref);
|
||||
|
@ -438,7 +454,6 @@ static int sender_thread(void *data)
|
|||
struct list_head *tmp = NULL;
|
||||
struct client_mref_aspect *mref_a;
|
||||
struct mref_object *mref;
|
||||
bool do_resubmit = false;
|
||||
|
||||
if (unlikely(!mars_socket_is_alive(&output->socket))) {
|
||||
if (do_kill) {
|
||||
|
@ -456,19 +471,12 @@ static int sender_thread(void *data)
|
|||
continue;
|
||||
}
|
||||
do_kill = true;
|
||||
do_resubmit = true;
|
||||
}
|
||||
|
||||
if (do_resubmit) {
|
||||
/* Re-Submit any waiting requests
|
||||
*/
|
||||
MARS_IO("re-submit\n");
|
||||
traced_lock(&output->lock, flags);
|
||||
_do_resubmit(output);
|
||||
traced_unlock(&output->lock, flags);
|
||||
_do_timeout(output, &output->mref_list, false);
|
||||
}
|
||||
|
||||
|
||||
wait_event_interruptible_timeout(output->event, !list_empty(&output->mref_list) || output->get_info || brick_thread_should_stop(), 1 * HZ);
|
||||
|
||||
if (output->get_info) {
|
||||
|
@ -481,14 +489,19 @@ static int sender_thread(void *data)
|
|||
}
|
||||
}
|
||||
|
||||
if (list_empty(&output->mref_list))
|
||||
continue;
|
||||
|
||||
/* Grab the next mref from the queue
|
||||
*/
|
||||
traced_lock(&output->lock, flags);
|
||||
if (list_empty(&output->mref_list)) {
|
||||
traced_unlock(&output->lock, flags);
|
||||
continue;
|
||||
}
|
||||
tmp = output->mref_list.next;
|
||||
traced_unlock(&output->lock, flags);
|
||||
|
||||
list_del(tmp);
|
||||
list_add(tmp, &output->wait_list);
|
||||
mref_a = container_of(tmp, struct client_mref_aspect, io_head);
|
||||
traced_unlock(&output->lock, flags);
|
||||
|
||||
mref = mref_a->object;
|
||||
|
||||
if (brick->limit_mode) {
|
||||
|
@ -510,15 +523,10 @@ static int sender_thread(void *data)
|
|||
do_kill = false;
|
||||
_kill_socket(output);
|
||||
}
|
||||
brick_msleep(3000);
|
||||
_hash_insert(output, mref_a);
|
||||
brick_msleep(1000);
|
||||
continue;
|
||||
}
|
||||
|
||||
// all ok, remember in-flight mrefs
|
||||
traced_lock(&output->lock, flags);
|
||||
list_del(tmp);
|
||||
list_add(tmp, &output->wait_list);
|
||||
traced_unlock(&output->lock, flags);
|
||||
}
|
||||
//done:
|
||||
if (status < 0) {
|
||||
|
@ -615,6 +623,7 @@ static int client_mref_aspect_init_fn(struct generic_aspect *_ini)
|
|||
struct client_mref_aspect *ini = (void*)_ini;
|
||||
INIT_LIST_HEAD(&ini->io_head);
|
||||
INIT_LIST_HEAD(&ini->hash_head);
|
||||
INIT_LIST_HEAD(&ini->tmp_head);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -638,7 +647,6 @@ static int client_output_construct(struct client_output *output)
|
|||
{
|
||||
int i;
|
||||
for (i = 0; i < CLIENT_HASH_MAX; i++) {
|
||||
spin_lock_init(&output->hash_lock[i]);
|
||||
INIT_LIST_HEAD(&output->hash_table[i]);
|
||||
}
|
||||
spin_lock_init(&output->lock);
|
||||
|
|
|
@ -14,6 +14,7 @@ struct client_mref_aspect {
|
|||
GENERIC_ASPECT(mref);
|
||||
struct list_head io_head;
|
||||
struct list_head hash_head;
|
||||
struct list_head tmp_head;
|
||||
unsigned long submit_jiffies;
|
||||
int alloc_len;
|
||||
bool do_dealloc;
|
||||
|
@ -56,7 +57,6 @@ struct client_output {
|
|||
wait_queue_head_t info_event;
|
||||
bool get_info;
|
||||
bool got_info;
|
||||
spinlock_t hash_lock[CLIENT_HASH_MAX];
|
||||
struct list_head hash_table[CLIENT_HASH_MAX];
|
||||
};
|
||||
|
||||
|
|
Loading…
Reference in New Issue