copy: stabilize table for race prevention

This commit is contained in:
Thomas Schoebel-Theuer 2018-09-23 12:50:13 +02:00
parent e6a1197432
commit 28ceff2388

View File

@ -230,27 +230,19 @@ void copy_endio(struct generic_callback *cb)
error = -EINVAL;
goto exit;
}
st->active[queue] = false;
if (unlikely(st->table[queue])) {
if (unlikely(st->table[queue] != mref)) {
MARS_ERR("table corruption at %d %d (%p => %p)\n", index, queue, st->table[queue], mref);
error = -EEXIST;
goto exit;
}
if (unlikely(cb->cb_error < 0)) {
error = cb->cb_error;
__clear_mref(brick, mref, queue);
/* This is racy, but does no harm.
* Worst case just produces more error output.
*/
if (!brick->copy_error_count++) {
MARS_WRN("IO error %d on index %d, old state = %d\n", cb->cb_error, index, st->state);
}
} else {
if (unlikely(st->table[queue])) {
MARS_ERR("overwriting index %d, state = %d\n", index, st->state);
_clear_mref(brick, index, queue);
}
st->table[queue] = mref;
}
exit:
@ -258,6 +250,7 @@ exit:
st->error = error;
_clash(brick);
}
st->active[queue] = false;
if (mref->ref_rw) {
atomic_dec(&brick->copy_write_flight);
} else {
@ -277,6 +270,7 @@ int _make_mref(struct copy_brick *brick, int index, int queue, void *data, loff_
struct mref_object *mref;
struct copy_mref_aspect *mref_a;
struct copy_input *input;
struct copy_state *st;
int offset;
int len;
int status = -EAGAIN;
@ -335,7 +329,10 @@ int _make_mref(struct copy_brick *brick, int index, int queue, void *data, loff_
//MARS_IO("queue = %d index = %d pos = %lld len = %d rw = %d\n", queue, index, mref->ref_pos, mref->ref_len, rw);
GET_STATE(brick, index).active[queue] = true;
st = &GET_STATE(brick, index);
st->table[queue] = mref;
st->active[queue] = true;
if (rw) {
atomic_inc(&brick->copy_write_flight);
} else {
@ -422,8 +419,6 @@ restart:
goto idle;
}
_clear_mref(brick, index, 1);
_clear_mref(brick, index, 0);
st->writeout = false;
st->error = 0;
@ -455,17 +450,16 @@ restart:
next_state = COPY_STATE_READ2;
/* fallthrough */
case COPY_STATE_READ2:
mref1 = st->table[1];
if (!mref1) { // idempotence: wait by unchanged state
if (st->active[1]) { // idempotence: wait by unchanged state
goto idle;
}
/* fallthrough => wait for both mrefs to appear */
case COPY_STATE_READ1:
case COPY_STATE_READ3:
mref0 = st->table[0];
if (!mref0) { // idempotence: wait by unchanged state
if (st->active[0]) { // idempotence: wait by unchanged state
goto idle;
}
mref0 = st->table[0];
if (brick->copy_limiter) {
int amount = (mref0->ref_len - 1) / 1024 + 1;
mars_limit_sleep(brick->copy_limiter, amount);
@ -543,6 +537,11 @@ restart:
progress = -EILSEQ;
break;
}
if (unlikely(st->active[0])) {
MARS_ERR("src buffer for write is active, state %d at index %d\n", state, index);
progress = -EILSEQ;
break;
}
if (unlikely(brick->is_aborting)) {
progress = -EINTR;
break;
@ -565,8 +564,7 @@ restart:
next_state = COPY_STATE_WRITTEN;
/* fallthrough */
case COPY_STATE_WRITTEN:
mref1 = st->table[1];
if (!mref1) { // idempotence: wait by unchanged state
if (st->active[1]) { // idempotence: wait by unchanged state
MARS_IO("irrelevant\n");
goto idle;
}