copy: smp safeguard state table

The following variables are used by copy_endio(), which may be called
both synchronously and asynchronously, and therefore need SMP-safe access
via READ_ONCE()/WRITE_ONCE() (a minimal sketch follows the list):

st->error
st->active[]
st->table[]
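
As a rough illustration only (not part of the patch, and not the actual
mars_copy.c code): the small userspace sketch below mimics the pattern this
commit applies. The simplified READ_ONCE()/WRITE_ONCE() macros, struct
copy_state_demo, and endio_thread() are stand-ins invented for the example;
the kernel code uses the real macros from <linux/compiler.h>.

/* Build with: gcc -O2 -pthread sketch.c
 * One thread plays the role of copy_endio() (asynchronous completion),
 * the other the role of the copy state machine polling shared state.
 */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

/* simplified stand-ins for the kernel macros: force one real load/store */
#define READ_ONCE(x)     (*(volatile typeof(x) *)&(x))
#define WRITE_ONCE(x, v) (*(volatile typeof(x) *)&(x) = (v))

struct copy_state_demo {        /* stands in for struct copy_state */
	int  error;
	bool active;
};

static struct copy_state_demo st;

/* completion side, analogous to copy_endio() */
static void *endio_thread(void *arg)
{
	(void)arg;
	WRITE_ONCE(st.error, -5);       /* e.g. -EIO */
	WRITE_ONCE(st.active, false);   /* mark the slot inactive last */
	return NULL;
}

int main(void)
{
	pthread_t t;

	WRITE_ONCE(st.active, true);
	pthread_create(&t, NULL, endio_thread, NULL);

	/* Without READ_ONCE() the compiler may legally cache st.active in a
	 * register and let this loop spin forever at -O2.
	 */
	while (READ_ONCE(st.active))
		;

	printf("completed, error = %d\n", READ_ONCE(st.error));
	pthread_join(t, NULL);
	return 0;
}

READ_ONCE()/WRITE_ONCE() only keep the compiler from tearing, fusing or
re-reading the accesses; cross-CPU ordering is a separate concern, handled in
the real code by the existing wait_event_interruptible_timeout() /
wake_up_interruptible() machinery.
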
Thomas Schoebel-Theuer 2021-03-01 10:51:20 +01:00
parent 667cab1ebb
commit 09ca8cce5c
1 changed file with 47 additions and 37 deletions

@@ -96,9 +96,9 @@ atomic_t global_copy_write_flight;
static inline
void _clash(struct copy_brick *brick)
{
brick->trigger = true;
set_bit(0, &brick->clash);
atomic_inc(&brick->total_clash_count);
WRITE_ONCE(brick->trigger, true);
wake_up_interruptible(&brick->event);
}
@@ -173,14 +173,16 @@ static
void _clear_mref(struct copy_brick *brick, int index, int queue)
{
struct copy_state *st = &GET_STATE(brick, index);
struct mref_object *mref = st->table[queue];
struct mref_object *mref = READ_ONCE(st->table[queue]);
if (mref) {
if (unlikely(st->active[queue])) {
/* This should never happen */
if (unlikely(READ_ONCE(st->active[queue]))) {
WRITE_ONCE(st->active[queue], false);
MARS_ERR("clearing active mref, index = %d queue = %d\n", index, queue);
st->active[queue] = false;
}
__clear_mref(brick, mref, queue);
st->table[queue] = NULL;
WRITE_ONCE(st->table[queue], NULL);
}
}
@@ -243,7 +245,7 @@ void copy_endio(struct generic_callback *cb)
error = -EINVAL;
goto exit;
}
if (unlikely(st->table[queue] != mref)) {
if (unlikely(READ_ONCE(st->table[queue]) != mref)) {
MARS_ERR("table corruption at %d %d (%p => %p)\n", index, queue, st->table[queue], mref);
error = -EEXIST;
goto exit;
@@ -260,10 +262,10 @@ void copy_endio(struct generic_callback *cb)
exit:
if (unlikely(error < 0)) {
st->error = error;
WRITE_ONCE(st->error, error);
_clash(brick);
}
st->active[queue] = false;
WRITE_ONCE(st->active[queue], false);
if (mref->ref_flags & MREF_WRITE) {
atomic_dec(&brick->copy_write_flight);
atomic_dec(&global_copy_write_flight);
@@ -271,7 +273,7 @@ exit:
atomic_dec(&brick->copy_read_flight);
atomic_dec(&global_copy_read_flight);
}
brick->trigger = true;
WRITE_ONCE(brick->trigger, true);
wake_up_interruptible(&brick->event);
return;
@@ -344,8 +346,8 @@ int _make_mref(struct copy_brick *brick, int index, int queue, void *data,
}
st = &GET_STATE(brick, index);
st->table[queue] = mref;
st->active[queue] = true;
WRITE_ONCE(st->table[queue], mref);
WRITE_ONCE(st->active[queue], true);
if (flags & MREF_WRITE) {
atomic_inc(&brick->copy_write_flight);
@@ -441,14 +443,15 @@ restart:
/* This is the regular starting state.
* It must be zero, automatically entered via memset()
*/
if (st->table[0] || st->table[1]) {
if ((unsigned long)READ_ONCE(st->table[0]) |
(unsigned long)READ_ONCE(st->table[1])) {
MARS_ERR("index %d not startable\n", index);
progress = -EPROTO;
goto idle;
}
st->writeout = false;
st->error = 0;
WRITE_ONCE(st->error, 0);
if (brick->is_aborting ||
is_read_limited(brick))
@@ -482,17 +485,19 @@ restart:
next_state = COPY_STATE_READ2;
/* fallthrough */
case COPY_STATE_READ2:
if (st->active[1]) { // idempotence: wait by unchanged state
if (READ_ONCE(st->active[1])) {
/* idempotence: wait by unchanged state */
goto idle;
}
/* wait for both mrefs to appear */
/* fallthrough */
case COPY_STATE_READ1:
case COPY_STATE_READ3:
if (st->active[0]) { // idempotence: wait by unchanged state
if (READ_ONCE(st->active[0])) {
/* idempotence: wait by unchanged state */
goto idle;
}
mref0 = st->table[0];
mref0 = READ_ONCE(st->table[0]);
if (brick->copy_limiter) {
int amount = (mref0->ref_len - 1) / 1024 + 1;
mars_limit_sleep(brick->copy_limiter, amount);
@@ -502,7 +507,7 @@ restart:
brick->copy_end = mref0->ref_total_size;
}
// do verify (when applicable)
mref1 = st->table[1];
mref1 = READ_ONCE(st->table[1]);
if (mref1 && state != COPY_STATE_READ3) {
int len = mref0->ref_len;
bool ok;
@@ -569,13 +574,13 @@ restart:
!GET_STATE(brick, st->prev).writeout) {
goto idle;
}
mref0 = st->table[0];
mref0 = READ_ONCE(st->table[0]);
if (unlikely(!mref0 || !mref0->ref_data)) {
MARS_ERR("src buffer for write does not exist, state %d at index %d\n", state, index);
progress = -EILSEQ;
break;
}
if (unlikely(st->active[0])) {
if (unlikely(READ_ONCE(st->active[0]))) {
MARS_ERR("src buffer for write is active, state %d at index %d\n", state, index);
progress = -EILSEQ;
break;
@@ -604,7 +609,8 @@ restart:
next_state = COPY_STATE_WRITTEN;
/* fallthrough */
case COPY_STATE_WRITTEN:
if (st->active[1]) { // idempotence: wait by unchanged state
if (READ_ONCE(st->active[1])) {
/* idempotence: wait by unchanged state */
MARS_IO("irrelevant\n");
goto idle;
}
@@ -640,8 +646,8 @@ restart:
idle:
if (unlikely(progress < 0)) {
if (st->error >= 0)
st->error = progress;
if (READ_ONCE(st->error) >= 0)
WRITE_ONCE(st->error, progress);
MARS_DBG("progress = %d\n", progress);
progress = 0;
_clash(brick);
@@ -725,7 +731,7 @@ int _run_copy(struct copy_brick *brick, loff_t this_start)
}
st->prev = prev;
prev = index;
if (st->active[0] & st->active[1])
if (READ_ONCE(st->active[0]) & READ_ONCE(st->active[1]))
break;
// call the finite state automaton
@@ -741,6 +747,7 @@ int _run_copy(struct copy_brick *brick, loff_t this_start)
// check the resulting state: can we advance the copy_last pointer?
if (this_start == brick->copy_last && progress && !brick->clash) {
int count = 0;
int error;
max = all_max;
for (pos = brick->copy_last;
@@ -755,16 +762,17 @@ int _run_copy(struct copy_brick *brick, loff_t this_start)
if (max-- <= 0) {
break;
}
if (unlikely(st->error < 0)) {
error = READ_ONCE(st->error);
if (unlikely(error < 0)) {
/* check for fatal consistency errors */
if (st->error == -EMEDIUMTYPE) {
brick->copy_error = st->error;
if (error == -EMEDIUMTYPE) {
brick->copy_error = error;
brick->abort_mode = true;
MARS_WRN("Consistency is violated\n");
}
if (!brick->copy_error) {
brick->copy_error = st->error;
MARS_WRN("IO error = %d\n", st->error);
brick->copy_error = error;
MARS_WRN("IO error = %d\n", error);
}
if (brick->abort_mode) {
brick->is_aborting = true;
@@ -821,7 +829,7 @@ static int _copy_thread(void *data)
mars_limit_reset(brick->copy_limiter);
_update_percent(brick, true);
brick->trigger = true;
WRITE_ONCE(brick->trigger, true);
while (!_is_done(brick)) {
loff_t old_start = brick->copy_start;
@@ -863,12 +871,12 @@ static int _copy_thread(void *data)
wait_event_interruptible_timeout(brick->event,
progress > 0 ||
brick->trigger ||
READ_ONCE(brick->trigger) ||
brick->copy_start != old_start ||
brick->copy_end != old_end ||
_is_done(brick),
1 * HZ);
brick->trigger = false;
WRITE_ONCE(brick->trigger, false);
}
if (brick->copy_limiter)
@@ -921,14 +929,16 @@ static int copy_ref_get(struct copy_output *output, struct mref_object *mref)
static void copy_ref_put(struct copy_output *output, struct mref_object *mref)
{
struct copy_brick *brick = output->brick;
struct copy_input *input;
int index;
index = _determine_input(output->brick, mref);
input = output->brick->inputs[index];
index = _determine_input(brick, mref);
input = brick->inputs[index];
GENERIC_INPUT_CALL(input, mref_put, mref);
if (atomic_dec_and_test(&output->brick->io_flight)) {
output->brick->trigger = true;
wake_up_interruptible(&output->brick->event);
if (atomic_dec_and_test(&brick->io_flight)) {
WRITE_ONCE(brick->trigger, true);
wake_up_interruptible(&brick->event);
}
}
@@ -959,7 +969,7 @@ static int copy_switch(struct copy_brick *brick)
get_lamport(NULL, &brick->copy_last_stamp);
brick->thread = brick_thread_create(_copy_thread, brick, "mars_copy%d", version++);
if (brick->thread) {
brick->trigger = true;
WRITE_ONCE(brick->trigger, true);
} else {
mars_power_led_on((void*)brick, false);
mars_power_led_off((void*)brick, true);