trans_logger: fix races on _inf_callback()

This commit is contained in:
Thomas Schoebel-Theuer 2012-12-30 23:44:48 +01:00
parent ec69356a14
commit 51fe58aeac
2 changed files with 22 additions and 14 deletions

View File

@ -960,8 +960,6 @@ void pos_complete(struct trans_logger_mref_aspect *orig_mref_a)
struct trans_logger_input *log_input = orig_mref_a->log_input;
loff_t finished;
struct list_head *tmp;
unsigned long flags;
bool do_callback = false;
CHECK_PTR(brick, err);
CHECK_PTR(log_input, err);
@ -970,7 +968,7 @@ void pos_complete(struct trans_logger_mref_aspect *orig_mref_a)
tmp = &orig_mref_a->pos_head;
traced_lock(&log_input->pos_lock, flags);
down(&log_input->inf_mutex);
finished = orig_mref_a->log_pos;
// am I the first member? (means "youngest" list entry)
@ -981,7 +979,7 @@ void pos_complete(struct trans_logger_mref_aspect *orig_mref_a)
}
log_input->inf.inf_min_pos = finished;
get_lamport(&log_input->inf.inf_min_pos_stamp);
do_callback = true;
_inf_callback(log_input, false);
} else {
struct trans_logger_mref_aspect *prev_mref_a;
prev_mref_a = container_of(tmp->prev, struct trans_logger_mref_aspect, pos_head);
@ -994,13 +992,11 @@ void pos_complete(struct trans_logger_mref_aspect *orig_mref_a)
prev_mref_a->log_pos = finished;
}
}
list_del_init(tmp);
atomic_dec(&brick->pos_count);
traced_unlock(&log_input->pos_lock, flags);
if (do_callback) {
_inf_callback(log_input, false);
}
up(&log_input->inf_mutex);
err:;
}
@ -1301,11 +1297,13 @@ void _fire_one(struct list_head *tmp, bool do_update)
MARS_ERR("internal problem\n");
} else {
loff_t max_pos = orig_mref_a->log_pos;
down(&log_input->inf_mutex);
if (log_input->inf.inf_max_pos < max_pos) {
log_input->inf.inf_max_pos = max_pos;
get_lamport(&log_input->inf.inf_max_pos_stamp);
_inf_callback(log_input, false);
}
up(&log_input->inf_mutex);
}
}
@ -1448,7 +1446,6 @@ bool phase0_startio(struct trans_logger_mref_aspect *orig_mref_a)
struct log_status *logst;
loff_t log_pos;
void *data;
unsigned long flags;
bool ok;
CHECK_PTR(orig_mref_a, err);
@ -1485,11 +1482,11 @@ bool phase0_startio(struct trans_logger_mref_aspect *orig_mref_a)
orig_mref_a->log_pos = log_pos;
// update new log_pos in the symlinks
down(&input->inf_mutex);
input->inf.inf_log_pos = log_pos;
memcpy(&input->inf.inf_log_pos_stamp, &logst->log_pos_stamp, sizeof(input->inf.inf_log_pos_stamp));
_inf_callback(input, false);
traced_lock(&input->pos_lock, flags);
#ifdef CONFIG_MARS_DEBUG
if (!list_empty(&input->pos_list)) {
struct trans_logger_mref_aspect *last_mref_a;
@ -1501,7 +1498,7 @@ bool phase0_startio(struct trans_logger_mref_aspect *orig_mref_a)
#endif
list_add_tail(&orig_mref_a->pos_head, &input->pos_list);
atomic_inc(&brick->pos_count);
traced_unlock(&input->pos_lock, flags);
up(&input->inf_mutex);
qq_inc_flying(&brick->q_phase[0]);
return true;
@ -2294,6 +2291,8 @@ void _init_inputs(struct trans_logger_brick *brick, bool is_first)
goto done;
}
down(&input->inf_mutex);
_init_input(input);
input->inf.inf_is_writeback = is_first;
input->inf.inf_is_applying = false;
@ -2302,6 +2301,8 @@ void _init_inputs(struct trans_logger_brick *brick, bool is_first)
brick->log_input_nr = nr;
MARS_INF("switching over to new logfile %d (old = %d) startpos = %lld\n", nr, brick->old_input_nr, input->log_start_pos);
_inf_callback(input, true);
up(&input->inf_mutex);
done: ;
}
@ -2349,6 +2350,7 @@ void _exit_inputs(struct trans_logger_brick *brick, bool force)
MARS_DBG("cleaning up input %d (log = %d old = %d), old_writeback = %d old_applying = %d old_logging = %d\n", i, brick->log_input_nr, brick->old_input_nr, old_writeback, old_applying, old_writeback);
exit_logst(logst);
// no locking here: we should be the only thread doing this.
_inf_callback(input, true);
brick_string_free(input->inf.inf_host);
input->inf.inf_host = NULL;
@ -2358,11 +2360,13 @@ void _exit_inputs(struct trans_logger_brick *brick, bool force)
input->is_operating = false;
if (i == brick->old_input_nr && i != brick->log_input_nr) {
struct trans_logger_input *other_input = brick->inputs[brick->log_input_nr];
down(&other_input->inf_mutex);
brick->old_input_nr = brick->log_input_nr;
other_input->inf.inf_is_writeback = old_writeback;
other_input->inf.inf_is_applying = old_applying;
other_input->inf.inf_is_logging = old_logging;
_inf_callback(other_input, true);
up(&other_input->inf_mutex);
}
}
}
@ -2441,7 +2445,9 @@ void trans_logger_log(struct trans_logger_brick *brick)
old_jiffies = jiffies;
for (i = TL_INPUT_LOG1; i <= TL_INPUT_LOG2; i++) {
struct trans_logger_input *input = brick->inputs[i];
down(&input->inf_mutex);
_inf_callback(input, false);
up(&input->inf_mutex);
}
}
@ -2718,10 +2724,12 @@ void trans_logger_replay(struct trans_logger_brick *brick)
// do this _after_ any opportunities for errors...
if (atomic_read(&brick->replay_count) <= 0 ||
((long long)jiffies) - old_jiffies >= HZ) {
down(&input->inf_mutex);
input->inf.inf_min_pos = finished_pos;
get_lamport(&input->inf.inf_min_pos_stamp);
old_jiffies = jiffies;
_inf_callback(input, false);
up(&input->inf_mutex);
}
_exit_inputs(brick, false);
}
@ -3065,8 +3073,8 @@ int trans_logger_output_construct(struct trans_logger_output *output)
static noinline
int trans_logger_input_construct(struct trans_logger_input *input)
{
spin_lock_init(&input->pos_lock);
INIT_LIST_HEAD(&input->pos_list);
sema_init(&input->inf_mutex, 1);
return 0;
}

View File

@ -208,7 +208,7 @@ struct trans_logger_output {
};
struct trans_logger_info {
// to be maintained from outside
// to be maintained / initialized from outside
void (*inf_callback)(struct trans_logger_info *inf);
void *inf_private;
char *inf_host;
@ -238,9 +238,9 @@ struct trans_logger_input {
// private
struct log_status logst;
spinlock_t pos_lock;
struct list_head pos_list;
long long inf_last_jiffies;
struct semaphore inf_mutex;
};
MARS_TYPES(trans_logger);