main: fix logrotate link creation races

This commit is contained in:
Thomas Schoebel-Theuer 2020-06-28 18:00:44 +02:00
parent 94fbcb1c8f
commit 34f3dec11d
2 changed files with 85 additions and 47 deletions

View File

@ -260,6 +260,7 @@ struct trans_logger_info {
void (*inf_callback)(struct trans_logger_info *inf);
void *inf_private;
char inf_host[MAX_HOST_LEN];
int inf_index;
int inf_sequence; // logfile sequence number
// maintained by trans_logger

View File

@ -614,7 +614,7 @@ enum {
// needed for logfile rotation
#define MAX_INFOS 4
#define INFS_MAX (TL_INPUT_LOG2 - TL_INPUT_LOG1 + 1)
struct mars_rotate {
struct list_head rot_head;
@ -688,8 +688,8 @@ struct mars_rotate {
bool has_emergency;
bool log_is_really_damaged;
struct mutex inf_mutex;
bool infs_is_dirty[MAX_INFOS];
struct trans_logger_info infs[MAX_INFOS];
bool infs_is_dirty[INFS_MAX];
struct trans_logger_info infs[INFS_MAX];
struct trans_logger_info current_inf;
struct key_value_pair msgs[sizeof(rot_keys) / sizeof(char*)];
};
@ -1404,11 +1404,22 @@ void _update_info(struct trans_logger_info *inf)
inf->inf_is_replaying,
inf->inf_is_logging);
hash = inf->inf_sequence % MAX_INFOS;
if (unlikely(rot->infs_is_dirty[hash])) {
if (unlikely(rot->infs[hash].inf_sequence != inf->inf_sequence)) {
MARS_ERR_TO(rot->log_say, "buffer %d: sequence trash %d -> %d. is the mars_main thread hanging?\n", hash, rot->infs[hash].inf_sequence, inf->inf_sequence);
make_rot_msg(rot, "err-sequence-trash", "buffer %d: sequence trash %d -> %d", hash, rot->infs[hash].inf_sequence, inf->inf_sequence);
hash = inf->inf_index - TL_INPUT_LOG1;
if (unlikely(hash < 0 || hash >= INFS_MAX)) {
MARS_ERR_TO(rot->log_say,
"bad inf_index=%d hash=%d\n",
inf->inf_index, hash);
goto done;
}
if (rot->infs_is_dirty[hash]) {
/* The logger thread is updating faster than the main thread
* can deal with. This may happen.
*/
if (rot->infs[hash].inf_sequence != inf->inf_sequence) {
MARS_DBG("buffer %d: sequence trash %d -> %d\n",
hash,
rot->infs[hash].inf_sequence,
inf->inf_sequence);
} else {
MARS_DBG("buffer %d is overwritten (sequence=%d)\n", hash, inf->inf_sequence);
}
@ -1427,47 +1438,59 @@ static
void write_info_links(struct mars_rotate *rot)
{
int count = 0;
int hash = -1;
int min = 0;
int i;
for (;;) {
int hash = -1;
int min = 0;
int i;
if (unlikely(!rot->trans_brick))
return;
mutex_lock(&rot->inf_mutex);
for (i = 0; i < MAX_INFOS; i++) {
if (!rot->infs_is_dirty[i])
continue;
if (!min || min > rot->infs[i].inf_sequence) {
min = rot->infs[i].inf_sequence;
hash = i;
}
mutex_lock(&rot->inf_mutex);
/* Only update the lowest log number, even if multiple
* logfiles are written in parallel during logrotate.
* Otherwise we would get nasty races, since their update
* speed may be different.
*/
for (i = TL_INPUT_LOG1; i <= TL_INPUT_LOG2; i++) {
struct trans_logger_input *trans_input;
int inf_nr;
trans_input = rot->trans_brick->inputs[i];
if (!trans_input ||
!trans_input->connect)
continue;
inf_nr = i - TL_INPUT_LOG1;
if (!min || rot->infs[inf_nr].inf_sequence < min) {
min = rot->infs[inf_nr].inf_sequence;
hash = inf_nr;
}
}
if (hash < 0) {
mutex_unlock(&rot->inf_mutex);
break;
}
rot->infs_is_dirty[hash] = false;
memcpy(&rot->current_inf, &rot->infs[hash],
sizeof(struct trans_logger_info));
if (hash < 0 || !rot->infs_is_dirty[hash]) {
mutex_unlock(&rot->inf_mutex);
MARS_DBG("seq = %d min_pos = %lld max_pos = %lld log_pos = %lld is_replaying = %d is_logging = %d\n",
rot->current_inf.inf_sequence,
rot->current_inf.inf_min_pos,
rot->current_inf.inf_max_pos,
rot->current_inf.inf_log_pos,
rot->current_inf.inf_is_replaying,
rot->current_inf.inf_is_logging);
if (rot->current_inf.inf_is_logging | rot->current_inf.inf_is_replaying) {
count += _update_replay_link(rot, &rot->current_inf);
count += _update_version_link(rot, &rot->current_inf);
if (min > rot->inf_old_sequence) {
mars_sync();
rot->inf_old_sequence = min;
}
MARS_DBG("hash=%d\n", hash);
return;
}
rot->infs_is_dirty[hash] = false;
memcpy(&rot->current_inf, &rot->infs[hash],
sizeof(struct trans_logger_info));
mutex_unlock(&rot->inf_mutex);
MARS_DBG("seq=%d min_pos=%lld max_pos=%lld log_pos=%lld is_replaying=%d is_logging=%d\n",
rot->current_inf.inf_sequence,
rot->current_inf.inf_min_pos,
rot->current_inf.inf_max_pos,
rot->current_inf.inf_log_pos,
rot->current_inf.inf_is_replaying,
rot->current_inf.inf_is_logging);
if (rot->current_inf.inf_is_logging | rot->current_inf.inf_is_replaying) {
count += _update_replay_link(rot, &rot->current_inf);
count += _update_version_link(rot, &rot->current_inf);
if (min > rot->inf_old_sequence) {
mars_sync();
rot->inf_old_sequence = min;
}
}
if (count) {
@ -4030,7 +4053,10 @@ done:
}
static
void _init_trans_input(struct trans_logger_input *trans_input, struct mars_dent *log_dent, struct mars_rotate *rot)
void _init_trans_input(struct trans_logger_input *trans_input,
struct mars_dent *log_dent,
int input_nr,
struct mars_rotate *rot)
{
if (unlikely(trans_input->connect || trans_input->is_operating)) {
MARS_ERR("this should not happen\n");
@ -4040,6 +4066,7 @@ void _init_trans_input(struct trans_logger_input *trans_input, struct mars_dent
memset(&trans_input->inf, 0, sizeof(trans_input->inf));
strncpy(trans_input->inf.inf_host, log_dent->d_rest, sizeof(trans_input->inf.inf_host));
trans_input->inf.inf_index = input_nr;
trans_input->inf.inf_sequence = log_dent->d_serial;
trans_input->inf.inf_private = rot;
trans_input->inf.inf_callback = _update_info;
@ -4088,11 +4115,16 @@ void _rotate_trans(struct mars_rotate *rot)
list_empty(&trans_input->pos_list) &&
atomic_read(&trans_input->log_ref_count) <= 0) {
int status;
write_info_links(rot);
MARS_INF("cleanup old transaction log (%d -> %d)\n", old_nr, log_nr);
status = mars_disconnect((void*)trans_input);
if (unlikely(status < 0)) {
MARS_ERR("disconnect failed\n");
} else {
/* Once again: now the other input should be active */
write_info_links(rot);
mars_trigger();
mars_remote_trigger();
}
} else {
@ -4136,7 +4168,10 @@ void _rotate_trans(struct mars_rotate *rot)
goto done;
}
_init_trans_input(trans_input, rot->next_relevant_log, rot);
_init_trans_input(trans_input,
rot->next_relevant_log,
next_nr,
rot);
status = mars_connect((void *)trans_input, rot->next_relevant_brick->outputs[0]);
if (unlikely(status < 0)) {
@ -4258,7 +4293,7 @@ int _start_trans(struct mars_rotate *rot)
*/
trans_brick->replay_mode = rot->replay_mode;
trans_brick->replay_tolerance = REPLAY_TOLERANCE;
_init_trans_input(trans_input, rot->relevant_log, rot);
_init_trans_input(trans_input, rot->relevant_log, nr, rot);
rot->replay_code = TL_REPLAY_RUNNING;
/* Connect to new transaction log
@ -4303,6 +4338,8 @@ int _stop_trans(struct mars_rotate *rot)
*/
if (trans_brick->power.led_off) {
int i;
write_info_links(rot);
for (i = TL_INPUT_LOG1; i <= TL_INPUT_LOG2; i++) {
struct trans_logger_input *trans_input;
trans_input = trans_brick->inputs[i];