light: offload trans_logger from symlink update (deadlock prevention)

This commit is contained in:
Thomas Schoebel-Theuer 2013-01-07 17:43:14 +01:00
parent a036a01188
commit 7cc5c12436
3 changed files with 81 additions and 23 deletions

View File

@ -2448,8 +2448,6 @@ void _exit_inputs(struct trans_logger_brick *brick, bool force)
// no locking here: we should be the only thread doing this.
_inf_callback(input, true);
input->inf_last_jiffies = 0;
brick_string_free(input->inf.inf_host);
input->inf.inf_host = NULL;
input->inf.inf_is_applying = false;
input->inf.inf_is_logging = false;
input->is_operating = false;
@ -3199,8 +3197,6 @@ static noinline
int trans_logger_input_destruct(struct trans_logger_input *input)
{
CHECK_HEAD_EMPTY(&input->pos_list);
brick_string_free(input->inf.inf_host);
input->inf.inf_host = NULL;
return 0;
}

View File

@ -206,11 +206,13 @@ struct trans_logger_output {
MARS_OUTPUT(trans_logger);
};
#define MAX_HOST_LEN 32
struct trans_logger_info {
// to be maintained / initialized from outside
void (*inf_callback)(struct trans_logger_info *inf);
void *inf_private;
char *inf_host;
char inf_host[MAX_HOST_LEN];
int inf_sequence; // logfile sequence number
// maintained by trans_logger

View File

@ -71,6 +71,8 @@ struct light_class {
// needed for logfile rotation
#define MAX_INFOS 4
struct mars_rotate {
struct mars_global *global;
struct copy_brick *sync_brick;
@ -95,7 +97,6 @@ struct mars_rotate {
struct mars_limiter replay_limiter;
struct mars_limiter sync_limiter;
struct mars_limiter file_limiter;
struct semaphore inf_mutex;
int inf_prev_sequence;
long long flip_start;
loff_t dev_size;
@ -116,6 +117,9 @@ struct mars_rotate {
bool is_primary;
bool old_is_primary;
bool copy_is_done;
spinlock_t inf_lock;
bool infs_is_dirty[MAX_INFOS];
struct trans_logger_info infs[MAX_INFOS];
};
///////////////////////////////////////////////////////////////////////
@ -603,13 +607,14 @@ static
void _update_info(struct trans_logger_info *inf)
{
struct mars_rotate *rot = inf->inf_private;
int hash;
unsigned long flags;
if (unlikely(!rot)) {
MARS_ERR("rot is NULL\n");
goto done;
}
down(&rot->inf_mutex);
MARS_DBG("inf = %p '%s' seq = %d min_pos = %lld max_pos = %lld log_pos = %lld is_applying = %d is_logging = %d\n",
inf,
SAFE_STR(inf->inf_host),
@ -620,40 +625,93 @@ void _update_info(struct trans_logger_info *inf)
inf->inf_is_applying,
inf->inf_is_logging);
if (inf->inf_is_logging || inf->inf_is_applying) {
_update_replay_link(rot, inf);
}
if (inf->inf_is_logging || inf->inf_is_applying) {
_update_version_link(rot, inf);
hash = inf->inf_sequence % MAX_INFOS;
if (unlikely(rot->infs_is_dirty[hash])) {
if (unlikely(rot->infs[hash].inf_sequence != inf->inf_sequence)) {
MARS_ERR("buffer %d: sequence trash %d -> %d. is the mar_light thread hanging?\n", hash, rot->infs[hash].inf_sequence, inf->inf_sequence);
} else {
MARS_DBG("buffer %d is overwritten (sequence=%d)\n", hash, inf->inf_sequence);
}
}
up(&rot->inf_mutex);
traced_lock(&rot->inf_lock, flags);
memcpy(&rot->infs[hash], inf, sizeof(struct trans_logger_info));
rot->infs_is_dirty[hash] = true;
traced_unlock(&rot->inf_lock, flags);
mars_trigger();
done:;
}
static
void write_info_links(struct mars_rotate *rot)
{
struct trans_logger_info inf;
int count = 0;
for (;;) {
unsigned long flags;
int hash = -1;
int min = 0;
int i;
traced_lock(&rot->inf_lock, flags);
for (i = 0; i < MAX_INFOS; i++) {
if (!rot->infs_is_dirty[i])
continue;
if (!min || min > rot->infs[i].inf_sequence) {
min = rot->infs[i].inf_sequence;
hash = i;
}
}
if (hash < 0) {
traced_unlock(&rot->inf_lock, flags);
break;
}
rot->infs_is_dirty[hash] = false;
memcpy(&inf, &rot->infs[hash], sizeof(struct trans_logger_info));
traced_unlock(&rot->inf_lock, flags);
MARS_DBG("seq = %d min_pos = %lld max_pos = %lld log_pos = %lld is_applying = %d is_logging = %d\n",
inf.inf_sequence,
inf.inf_min_pos,
inf.inf_max_pos,
inf.inf_log_pos,
inf.inf_is_applying,
inf.inf_is_logging);
if (inf.inf_is_logging || inf.inf_is_applying) {
_update_replay_link(rot, &inf);
count++;
}
if (inf.inf_is_logging || inf.inf_is_applying) {
_update_version_link(rot, &inf);
count++;
}
}
if (count)
mars_remote_trigger();
}
static
void _make_new_replaylink(struct mars_rotate *rot, char *new_host, int new_sequence, loff_t end_pos)
{
struct trans_logger_info inf = {
.inf_private = rot,
.inf_host = new_host,
.inf_sequence = new_sequence,
.inf_min_pos = 0,
.inf_max_pos = 0,
.inf_log_pos = end_pos,
.inf_is_applying = true,
};
down(&rot->inf_mutex);
strncpy(inf.inf_host, new_host, sizeof(inf.inf_host));
MARS_DBG("new_host = '%s' new_sequence = %d end_pos = %lld\n", new_host, new_sequence, end_pos);
_update_replay_link(rot, &inf);
_update_version_link(rot, &inf);
up(&rot->inf_mutex);
#ifdef CONFIG_MARS_FAST_TRIGGER
mars_trigger();
mars_remote_trigger();
@ -1692,6 +1750,7 @@ void rot_destruct(void *_rot)
{
struct mars_rotate *rot = _rot;
if (likely(rot)) {
write_info_links(rot);
brick_string_free(rot->copy_path);
brick_string_free(rot->parent_path);
rot->copy_path = NULL;
@ -1734,7 +1793,7 @@ int make_log_init(void *buf, struct mars_dent *dent)
status = -ENOMEM;
goto done;
}
sema_init(&rot->inf_mutex, 1);
spin_lock_init(&rot->inf_lock);
copy_path = path_make("%s/logfile-update", parent_path);
if (unlikely(!copy_path)) {
MARS_ERR("cannot create copy_path\n");
@ -1766,6 +1825,8 @@ int make_log_init(void *buf, struct mars_dent *dent)
if (!rot->parent_path)
rot->parent_path =brick_strdup(parent_path);
write_info_links(rot);
mars_remaining_space(parent_path, &rot->total_space, &rot->remaining_space);
/* Fetch the replay status symlink.
@ -1796,7 +1857,7 @@ int make_log_init(void *buf, struct mars_dent *dent)
*/
if (rot->trans_brick) {
struct trans_logger_input *trans_input = rot->trans_brick->inputs[rot->trans_brick->old_input_nr];
if (trans_input && trans_input->is_operating && trans_input->inf.inf_host) {
if (trans_input && trans_input->is_operating) {
aio_path = path_make("%s/log-%09d-%s", parent_path, trans_input->inf.inf_sequence, trans_input->inf.inf_host);
MARS_DBG("using logfile '%s' from trans_input %d (new=%d)\n", SAFE_STR(aio_path), rot->trans_brick->old_input_nr, rot->trans_brick->log_input_nr);
}
@ -2180,11 +2241,10 @@ void _init_trans_input(struct trans_logger_input *trans_input, struct mars_dent
MARS_ERR("this should not happen\n");
return;
}
brick_string_free(trans_input->inf.inf_host); // just for safety
memset(&trans_input->inf, 0, sizeof(trans_input->inf));
trans_input->inf.inf_host = brick_strdup(log_dent->d_rest);
strncpy(trans_input->inf.inf_host, log_dent->d_rest, sizeof(trans_input->inf.inf_host));
trans_input->inf.inf_sequence = log_dent->d_serial;
trans_input->inf.inf_private = rot;
trans_input->inf.inf_callback = _update_info;