mirror of https://github.com/schoebel/mars
trans_logger: fix potential race on log_input
During logrotate, there may exist writeback clusters with mixed log_inputs. Each sub_mref now gets its individual log_input inherited from the orig_mref. Add ref counting to ensure that cleanup can never occur on an active log_input.
This commit is contained in:
parent
51fe58aeac
commit
28d433cc81
|
@ -974,9 +974,12 @@ void pos_complete(struct trans_logger_mref_aspect *orig_mref_a)
|
||||||
// am I the first member? (means "youngest" list entry)
|
// am I the first member? (means "youngest" list entry)
|
||||||
if (tmp == log_input->pos_list.next) {
|
if (tmp == log_input->pos_list.next) {
|
||||||
MARS_IO("first_finished = %lld\n", finished);
|
MARS_IO("first_finished = %lld\n", finished);
|
||||||
if (finished <= log_input->inf.inf_min_pos) {
|
if (unlikely(finished <= log_input->inf.inf_min_pos)) {
|
||||||
MARS_ERR("backskip in log writeback: %lld -> %lld\n", log_input->inf.inf_min_pos, finished);
|
MARS_ERR("backskip in log writeback: %lld -> %lld\n", log_input->inf.inf_min_pos, finished);
|
||||||
}
|
}
|
||||||
|
if (unlikely(finished > log_input->inf.inf_max_pos)) {
|
||||||
|
MARS_ERR("min_pos > max_pos: %lld > %lld\n", finished, log_input->inf.inf_max_pos);
|
||||||
|
}
|
||||||
log_input->inf.inf_min_pos = finished;
|
log_input->inf.inf_min_pos = finished;
|
||||||
get_lamport(&log_input->inf.inf_min_pos_stamp);
|
get_lamport(&log_input->inf.inf_min_pos_stamp);
|
||||||
_inf_callback(log_input, false);
|
_inf_callback(log_input, false);
|
||||||
|
@ -1078,6 +1081,7 @@ void wb_endio(struct generic_callback *cb)
|
||||||
} else {
|
} else {
|
||||||
MARS_ERR("internal: no endio defined\n");
|
MARS_ERR("internal: no endio defined\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
done:
|
done:
|
||||||
wake_up_interruptible_all(&brick->worker_event);
|
wake_up_interruptible_all(&brick->worker_event);
|
||||||
return;
|
return;
|
||||||
|
@ -1093,7 +1097,7 @@ err:
|
||||||
* point in time.
|
* point in time.
|
||||||
*/
|
*/
|
||||||
static noinline
|
static noinline
|
||||||
struct writeback_info *make_writeback(struct trans_logger_brick *brick, loff_t pos, int len, struct timespec *elder, struct trans_logger_input *log_input)
|
struct writeback_info *make_writeback(struct trans_logger_brick *brick, loff_t pos, int len, struct timespec *elder)
|
||||||
{
|
{
|
||||||
struct writeback_info *wb;
|
struct writeback_info *wb;
|
||||||
struct trans_logger_input *read_input;
|
struct trans_logger_input *read_input;
|
||||||
|
@ -1147,6 +1151,7 @@ struct writeback_info *make_writeback(struct trans_logger_brick *brick, loff_t p
|
||||||
while (len > 0) {
|
while (len > 0) {
|
||||||
struct trans_logger_mref_aspect *sub_mref_a;
|
struct trans_logger_mref_aspect *sub_mref_a;
|
||||||
struct mref_object *sub_mref;
|
struct mref_object *sub_mref;
|
||||||
|
struct trans_logger_input *log_input;
|
||||||
int this_len;
|
int this_len;
|
||||||
int status;
|
int status;
|
||||||
|
|
||||||
|
@ -1167,7 +1172,9 @@ struct writeback_info *make_writeback(struct trans_logger_brick *brick, loff_t p
|
||||||
CHECK_ASPECT(sub_mref_a, sub_mref, err);
|
CHECK_ASPECT(sub_mref_a, sub_mref, err);
|
||||||
|
|
||||||
sub_mref_a->my_input = read_input;
|
sub_mref_a->my_input = read_input;
|
||||||
|
log_input = brick->inputs[brick->log_input_nr];
|
||||||
sub_mref_a->log_input = log_input;
|
sub_mref_a->log_input = log_input;
|
||||||
|
atomic_inc(&log_input->log_ref_count);
|
||||||
sub_mref_a->my_brick = brick;
|
sub_mref_a->my_brick = brick;
|
||||||
sub_mref_a->orig_rw = READ;
|
sub_mref_a->orig_rw = READ;
|
||||||
sub_mref_a->wb = wb;
|
sub_mref_a->wb = wb;
|
||||||
|
@ -1199,6 +1206,7 @@ struct writeback_info *make_writeback(struct trans_logger_brick *brick, loff_t p
|
||||||
struct mref_object *sub_mref;
|
struct mref_object *sub_mref;
|
||||||
struct trans_logger_mref_aspect *orig_mref_a;
|
struct trans_logger_mref_aspect *orig_mref_a;
|
||||||
struct mref_object *orig_mref;
|
struct mref_object *orig_mref;
|
||||||
|
struct trans_logger_input *log_input;
|
||||||
void *data;
|
void *data;
|
||||||
int this_len = len;
|
int this_len = len;
|
||||||
int diff;
|
int diff;
|
||||||
|
@ -1238,7 +1246,9 @@ struct writeback_info *make_writeback(struct trans_logger_brick *brick, loff_t p
|
||||||
|
|
||||||
sub_mref_a->orig_mref_a = orig_mref_a;
|
sub_mref_a->orig_mref_a = orig_mref_a;
|
||||||
sub_mref_a->my_input = write_input;
|
sub_mref_a->my_input = write_input;
|
||||||
|
log_input = orig_mref_a->log_input;
|
||||||
sub_mref_a->log_input = log_input;
|
sub_mref_a->log_input = log_input;
|
||||||
|
atomic_inc(&log_input->log_ref_count);
|
||||||
sub_mref_a->my_brick = brick;
|
sub_mref_a->my_brick = brick;
|
||||||
sub_mref_a->orig_rw = WRITE;
|
sub_mref_a->orig_rw = WRITE;
|
||||||
sub_mref_a->wb = wb;
|
sub_mref_a->wb = wb;
|
||||||
|
@ -1275,7 +1285,6 @@ void _fire_one(struct list_head *tmp, bool do_update)
|
||||||
struct trans_logger_mref_aspect *sub_mref_a;
|
struct trans_logger_mref_aspect *sub_mref_a;
|
||||||
struct mref_object *sub_mref;
|
struct mref_object *sub_mref;
|
||||||
struct trans_logger_input *sub_input;
|
struct trans_logger_input *sub_input;
|
||||||
struct trans_logger_input *log_input;
|
|
||||||
|
|
||||||
sub_mref_a = container_of(tmp, struct trans_logger_mref_aspect, sub_head);
|
sub_mref_a = container_of(tmp, struct trans_logger_mref_aspect, sub_head);
|
||||||
sub_mref = sub_mref_a->object;
|
sub_mref = sub_mref_a->object;
|
||||||
|
@ -1288,16 +1297,21 @@ void _fire_one(struct list_head *tmp, bool do_update)
|
||||||
|
|
||||||
SETUP_CALLBACK(sub_mref, wb_endio, sub_mref_a);
|
SETUP_CALLBACK(sub_mref, wb_endio, sub_mref_a);
|
||||||
|
|
||||||
sub_input = sub_mref_a->my_input;
|
|
||||||
log_input = sub_mref_a->log_input;
|
|
||||||
|
|
||||||
if (do_update) {
|
if (do_update) {
|
||||||
struct trans_logger_mref_aspect *orig_mref_a = sub_mref_a->orig_mref_a;
|
struct trans_logger_mref_aspect *orig_mref_a = sub_mref_a->orig_mref_a;
|
||||||
if (unlikely(!orig_mref_a)) {
|
if (unlikely(!orig_mref_a)) {
|
||||||
MARS_ERR("internal problem\n");
|
MARS_ERR("internal problem\n");
|
||||||
} else {
|
} else {
|
||||||
loff_t max_pos = orig_mref_a->log_pos;
|
loff_t max_pos = orig_mref_a->log_pos;
|
||||||
|
struct trans_logger_input *log_input;
|
||||||
|
log_input = orig_mref_a->log_input;
|
||||||
|
if (unlikely(!log_input)) {
|
||||||
|
MARS_ERR("internal problem\n");
|
||||||
|
} else {
|
||||||
down(&log_input->inf_mutex);
|
down(&log_input->inf_mutex);
|
||||||
|
if (unlikely(max_pos < log_input->inf.inf_min_pos)) {
|
||||||
|
MARS_ERR("new max_pos < min_pos: %lld < %lld\n", max_pos, log_input->inf.inf_min_pos);
|
||||||
|
}
|
||||||
if (log_input->inf.inf_max_pos < max_pos) {
|
if (log_input->inf.inf_max_pos < max_pos) {
|
||||||
log_input->inf.inf_max_pos = max_pos;
|
log_input->inf.inf_max_pos = max_pos;
|
||||||
get_lamport(&log_input->inf.inf_max_pos_stamp);
|
get_lamport(&log_input->inf.inf_max_pos_stamp);
|
||||||
|
@ -1306,6 +1320,9 @@ void _fire_one(struct list_head *tmp, bool do_update)
|
||||||
up(&log_input->inf_mutex);
|
up(&log_input->inf_mutex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sub_input = sub_mref_a->my_input;
|
||||||
|
|
||||||
#ifdef DO_WRITEBACK
|
#ifdef DO_WRITEBACK
|
||||||
GENERIC_INPUT_CALL(sub_input, mref_io, sub_mref);
|
GENERIC_INPUT_CALL(sub_input, mref_io, sub_mref);
|
||||||
|
@ -1453,9 +1470,8 @@ bool phase0_startio(struct trans_logger_mref_aspect *orig_mref_a)
|
||||||
CHECK_PTR(orig_mref, err);
|
CHECK_PTR(orig_mref, err);
|
||||||
brick = orig_mref_a->my_brick;
|
brick = orig_mref_a->my_brick;
|
||||||
CHECK_PTR(brick, err);
|
CHECK_PTR(brick, err);
|
||||||
input = brick->inputs[brick->log_input_nr];
|
input = orig_mref_a->log_input;
|
||||||
CHECK_PTR(input, err);
|
CHECK_PTR(input, err);
|
||||||
orig_mref_a->log_input = input;
|
|
||||||
logst = &input->logst;
|
logst = &input->logst;
|
||||||
logst->do_crc = trans_logger_do_crc;
|
logst->do_crc = trans_logger_do_crc;
|
||||||
|
|
||||||
|
@ -1576,7 +1592,11 @@ bool prep_phase_startio(struct trans_logger_mref_aspect *mref_a)
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
if (likely(!mref_a->is_hashed)) {
|
if (likely(!mref_a->is_hashed)) {
|
||||||
|
struct trans_logger_input *log_input;
|
||||||
|
log_input = brick->inputs[brick->log_input_nr];
|
||||||
MARS_IO("hashing %d at %lld\n", mref->ref_len, mref->ref_pos);
|
MARS_IO("hashing %d at %lld\n", mref->ref_len, mref->ref_pos);
|
||||||
|
mref_a->log_input = log_input;
|
||||||
|
atomic_inc(&log_input->log_ref_count);
|
||||||
hash_insert(brick, mref_a);
|
hash_insert(brick, mref_a);
|
||||||
} else {
|
} else {
|
||||||
MARS_ERR("tried to hash twice\n");
|
MARS_ERR("tried to hash twice\n");
|
||||||
|
@ -1656,7 +1676,7 @@ bool phase1_startio(struct trans_logger_mref_aspect *orig_mref_a)
|
||||||
goto done;
|
goto done;
|
||||||
}
|
}
|
||||||
|
|
||||||
wb = make_writeback(brick, orig_mref->ref_pos, orig_mref->ref_len, &orig_mref_a->stamp, orig_mref_a->log_input);
|
wb = make_writeback(brick, orig_mref->ref_pos, orig_mref->ref_len, &orig_mref_a->stamp);
|
||||||
if (unlikely(!wb)) {
|
if (unlikely(!wb)) {
|
||||||
MARS_ERR("no mem\n");
|
MARS_ERR("no mem\n");
|
||||||
goto err;
|
goto err;
|
||||||
|
@ -2872,6 +2892,8 @@ char *trans_logger_statistics(struct trans_logger_brick *brick, int verbose)
|
||||||
"hash_count=%d "
|
"hash_count=%d "
|
||||||
"pos_count=%d "
|
"pos_count=%d "
|
||||||
"balance=%d/%d/%d/%d "
|
"balance=%d/%d/%d/%d "
|
||||||
|
"log_refs1=%d "
|
||||||
|
"log_refs2=%d "
|
||||||
"fly=%d "
|
"fly=%d "
|
||||||
"mref_flying1=%d "
|
"mref_flying1=%d "
|
||||||
"mref_flying2=%d "
|
"mref_flying2=%d "
|
||||||
|
@ -2932,6 +2954,8 @@ char *trans_logger_statistics(struct trans_logger_brick *brick, int verbose)
|
||||||
atomic_read(&brick->inner_balance_count),
|
atomic_read(&brick->inner_balance_count),
|
||||||
atomic_read(&brick->outer_balance_count),
|
atomic_read(&brick->outer_balance_count),
|
||||||
atomic_read(&brick->wb_balance_count),
|
atomic_read(&brick->wb_balance_count),
|
||||||
|
atomic_read(&brick->inputs[TL_INPUT_LOG1]->log_ref_count),
|
||||||
|
atomic_read(&brick->inputs[TL_INPUT_LOG2]->log_ref_count),
|
||||||
atomic_read(&brick->fly_count),
|
atomic_read(&brick->fly_count),
|
||||||
atomic_read(&brick->inputs[TL_INPUT_LOG1]->logst.mref_flying),
|
atomic_read(&brick->inputs[TL_INPUT_LOG1]->logst.mref_flying),
|
||||||
atomic_read(&brick->inputs[TL_INPUT_LOG2]->logst.mref_flying),
|
atomic_read(&brick->inputs[TL_INPUT_LOG2]->logst.mref_flying),
|
||||||
|
@ -3007,6 +3031,9 @@ void trans_logger_mref_aspect_exit_fn(struct generic_aspect *_ini)
|
||||||
CHECK_HEAD_EMPTY(&ini->collect_head);
|
CHECK_HEAD_EMPTY(&ini->collect_head);
|
||||||
CHECK_HEAD_EMPTY(&ini->sub_list);
|
CHECK_HEAD_EMPTY(&ini->sub_list);
|
||||||
CHECK_HEAD_EMPTY(&ini->sub_head);
|
CHECK_HEAD_EMPTY(&ini->sub_head);
|
||||||
|
if (ini->log_input) {
|
||||||
|
atomic_dec(&ini->log_input->log_ref_count);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
MARS_MAKE_STATICS(trans_logger);
|
MARS_MAKE_STATICS(trans_logger);
|
||||||
|
|
|
@ -233,6 +233,7 @@ struct trans_logger_input {
|
||||||
// informational
|
// informational
|
||||||
struct trans_logger_info inf;
|
struct trans_logger_info inf;
|
||||||
// readonly from outside
|
// readonly from outside
|
||||||
|
atomic_t log_ref_count;
|
||||||
bool is_operating;
|
bool is_operating;
|
||||||
long long last_jiffies;
|
long long last_jiffies;
|
||||||
|
|
||||||
|
|
|
@ -2175,11 +2175,15 @@ void _rotate_trans(struct mars_rotate *rot)
|
||||||
// try to cleanup old log
|
// try to cleanup old log
|
||||||
if (log_nr != old_nr) {
|
if (log_nr != old_nr) {
|
||||||
struct trans_logger_input *trans_input = trans_brick->inputs[old_nr];
|
struct trans_logger_input *trans_input = trans_brick->inputs[old_nr];
|
||||||
|
struct trans_logger_input *new_input = trans_brick->inputs[log_nr];
|
||||||
if (!trans_input->connect) {
|
if (!trans_input->connect) {
|
||||||
MARS_DBG("ignoring unused input %d\n", old_nr);
|
MARS_DBG("ignoring unused old input %d\n", old_nr);
|
||||||
|
} else if (!new_input->is_operating) {
|
||||||
|
MARS_DBG("ignoring uninitialized new input %d\n", log_nr);
|
||||||
} else if (trans_input->is_operating &&
|
} else if (trans_input->is_operating &&
|
||||||
trans_input->inf.inf_min_pos == trans_input->inf.inf_max_pos &&
|
trans_input->inf.inf_min_pos == trans_input->inf.inf_max_pos &&
|
||||||
list_empty(&trans_input->pos_list)) {
|
list_empty(&trans_input->pos_list) &&
|
||||||
|
atomic_read(&trans_input->log_ref_count) <= 0) {
|
||||||
int status;
|
int status;
|
||||||
MARS_INF("cleanup old transaction log (%d -> %d)\n", old_nr, log_nr);
|
MARS_INF("cleanup old transaction log (%d -> %d)\n", old_nr, log_nr);
|
||||||
status = generic_disconnect((void*)trans_input);
|
status = generic_disconnect((void*)trans_input);
|
||||||
|
|
Loading…
Reference in New Issue