add abort mode to mars_copy (not yet activated)

This commit is contained in:
schoebel 2011-11-04 17:23:52 +01:00 committed by Thomas Schoebel-Theuer
parent 4525d28aed
commit cbd156f307
3 changed files with 64 additions and 19 deletions

View File

@ -357,10 +357,26 @@ int receiver_thread(void *data)
return status;
}
static
void _do_resubmit(struct client_output *output)
{
if (!list_empty(&output->wait_list)) {
struct list_head *first = output->wait_list.next;
struct list_head *last = output->wait_list.prev;
struct list_head *old_start = output->mref_list.next;
#define list_connect __list_del // the original routine has a misleading name: in reality it is more general
list_connect(&output->mref_list, first);
list_connect(last, old_start);
INIT_LIST_HEAD(&output->wait_list);
MARS_IO("done re-submit %p %p\n", first, last);
}
}
static int sender_thread(void *data)
{
struct client_output *output = data;
struct client_brick *brick = output->brick;
unsigned long flags;
int status = 0;
output->receiver.restart_count = 0;
@ -369,7 +385,6 @@ static int sender_thread(void *data)
struct list_head *tmp;
struct client_mref_aspect *mref_a;
struct mref_object *mref;
unsigned long flags;
bool do_resubmit = false;
if (unlikely(!output->socket)) {
@ -387,20 +402,11 @@ static int sender_thread(void *data)
*/
MARS_IO("re-submit\n");
traced_lock(&output->lock, flags);
if (!list_empty(&output->wait_list)) {
struct list_head *first = output->wait_list.next;
struct list_head *last = output->wait_list.prev;
struct list_head *old_start = output->mref_list.next;
#define list_connect __list_del // the original routine has a misleading name: in reality it is more general
list_connect(&output->mref_list, first);
list_connect(last, old_start);
INIT_LIST_HEAD(&output->wait_list);
MARS_IO("done re-submit %p %p\n", first, last);
}
_do_resubmit(output);
traced_unlock(&output->lock, flags);
}
wait_event_interruptible_timeout(output->event, !list_empty(&output->mref_list) || output->get_info, 10 * HZ);
wait_event_interruptible_timeout(output->event, !list_empty(&output->mref_list) || output->get_info || kthread_should_stop(), 1 * HZ);
if (output->get_info) {
status = _request_info(output);
@ -446,8 +452,33 @@ static int sender_thread(void *data)
_kill_socket(output);
/* Signal error on all pending IO requests.
* We have no other chance (except probably delaying
* this until destruction which mostly is not what
* we want).
*/
traced_lock(&output->lock, flags);
_do_resubmit(output);
while (!list_empty(&output->mref_list)) {
struct list_head *tmp = output->mref_list.next;
struct client_mref_aspect *mref_a;
struct mref_object *mref;
list_del_init(tmp);
traced_unlock(&output->lock, flags);
mref_a = container_of(tmp, struct client_mref_aspect, io_head);
mref = mref_a->object;
MARS_DBG("signalling IO error at pos = %lld len = %d\n", mref->ref_pos, mref->ref_len);
atomic_dec(&output->fly_count);
SIMPLE_CALLBACK(mref, -ENOTCONN);
client_ref_put(output, mref);
traced_lock(&output->lock, flags);
}
traced_unlock(&output->lock, flags);
output->sender.terminated = true;
wake_up_interruptible(&output->sender.run_event);
MARS_DBG("sender terminated\n");
return status;
}

View File

@ -372,13 +372,13 @@ done:
}
static
void _run_copy(struct copy_brick *brick)
int _run_copy(struct copy_brick *brick)
{
int max;
loff_t pos;
loff_t limit = 0;
short prev;
int status;
int res_status = 0;
if (unlikely(_clear_clash(brick))) {
int i;
@ -389,7 +389,7 @@ void _run_copy(struct copy_brick *brick)
_clash(brick);
MARS_DBG("re-clash\n");
msleep(100);
return;
return 0;
}
for (i = 0; i < MAX_COPY_PARA; i++) {
_clear_mref(brick, i, 0);
@ -416,7 +416,9 @@ void _run_copy(struct copy_brick *brick)
prev = index;
// call the finite state automaton
if (!st->active[0] && !st->active[1]) {
int status;
status = _next_state(brick, index, pos);
MARS_IO("index = %d pos = %lld status 0 %d\n", index, pos, status);
limit = pos;
}
}
@ -432,6 +434,7 @@ void _run_copy(struct copy_brick *brick)
}
st->state = COPY_STATE_START;
if (unlikely(st->error < 0)) {
res_status = st->error;
break;
}
count += st->len;
@ -446,6 +449,7 @@ void _run_copy(struct copy_brick *brick)
_update_percent(brick);
}
}
return res_status;
}
static int _copy_thread(void *data)
@ -453,6 +457,7 @@ static int _copy_thread(void *data)
struct copy_brick *brick = data;
MARS_DBG("--------------- copy_thread %p starting\n", brick);
brick->copy_error = 0;
mars_power_led_on((void*)brick, true);
brick->trigger = true;
@ -460,18 +465,25 @@ static int _copy_thread(void *data)
loff_t old_start = brick->copy_start;
loff_t old_end = brick->copy_end;
if (old_end > 0) {
_run_copy(brick);
int status = _run_copy(brick);
if (unlikely(status < 0)) {
brick->copy_error = status;
if (brick->abort_mode) {
MARS_INF("IO error, terminating prematurely, status = %d\n", status);
break;
}
}
msleep(10); // yield FIXME: remove this, use event handling for over/underflow
}
wait_event_interruptible_timeout(brick->event,
brick->trigger || brick->copy_start != old_start || brick->copy_end != old_end || kthread_should_stop(),
5 * HZ);
1 * HZ);
brick->trigger = false;
}
MARS_DBG("--------------- copy_thread terminating\n");
MARS_DBG("--------------- copy_thread terminating (%d requests flying)\n", atomic_read(&brick->copy_flight));
wait_event_interruptible_timeout(brick->event, !atomic_read(&brick->copy_flight), 300 * HZ);
mars_power_led_off((void*)brick, true);
MARS_DBG("--------------- copy_thread done.\n");
@ -545,7 +557,7 @@ static int copy_switch(struct copy_brick *brick)
mars_power_led_on((void*)brick, false);
if (brick->thread) {
MARS_INF("stopping thread...\n");
kthread_stop_nowait(brick->thread);
kthread_stop(brick->thread);
put_task_struct(brick->thread);
brick->thread = NULL;
wake_up_interruptible(&brick->event);

View File

@ -48,8 +48,10 @@ struct copy_brick {
int append_mode; // 1 = passively, 2 = actively
bool verify_mode;
bool utilize_mode; // utilize already copied data
bool abort_mode; // abort on IO error (default is retry forever)
// readonly from outside
loff_t copy_last; // current working position
int copy_error;
bool low_dirty;
// internal
volatile bool trigger;