main: allow versionlink recovery for secondaries

This commit is contained in:
Thomas Schoebel-Theuer 2020-04-10 14:07:51 +02:00
parent 86d9774f1f
commit 00474b1307

View File

@ -690,6 +690,7 @@ struct mars_rotate {
struct mutex inf_mutex;
bool infs_is_dirty[MAX_INFOS];
struct trans_logger_info infs[MAX_INFOS];
struct trans_logger_info current_inf;
struct key_value_pair msgs[sizeof(rot_keys) / sizeof(char*)];
};
@ -1425,8 +1426,8 @@ done:;
static
void write_info_links(struct mars_rotate *rot)
{
struct trans_logger_info inf;
int count = 0;
for (;;) {
int hash = -1;
int min = 0;
@ -1448,20 +1449,21 @@ void write_info_links(struct mars_rotate *rot)
}
rot->infs_is_dirty[hash] = false;
memcpy(&inf, &rot->infs[hash], sizeof(struct trans_logger_info));
memcpy(&rot->current_inf, &rot->infs[hash],
sizeof(struct trans_logger_info));
mutex_unlock(&rot->inf_mutex);
MARS_DBG("seq = %d min_pos = %lld max_pos = %lld log_pos = %lld is_replaying = %d is_logging = %d\n",
inf.inf_sequence,
inf.inf_min_pos,
inf.inf_max_pos,
inf.inf_log_pos,
inf.inf_is_replaying,
inf.inf_is_logging);
rot->current_inf.inf_sequence,
rot->current_inf.inf_min_pos,
rot->current_inf.inf_max_pos,
rot->current_inf.inf_log_pos,
rot->current_inf.inf_is_replaying,
rot->current_inf.inf_is_logging);
if (inf.inf_is_logging || inf.inf_is_replaying) {
count += _update_replay_link(rot, &inf);
count += _update_version_link(rot, &inf);
if (rot->current_inf.inf_is_logging | rot->current_inf.inf_is_replaying) {
count += _update_replay_link(rot, &rot->current_inf);
count += _update_version_link(rot, &rot->current_inf);
if (min > rot->inf_old_sequence) {
mars_sync();
rot->inf_old_sequence = min;
@ -1469,7 +1471,7 @@ void write_info_links(struct mars_rotate *rot)
}
}
if (count) {
if (inf.inf_min_pos == inf.inf_max_pos)
if (rot->current_inf.inf_min_pos == rot->current_inf.inf_max_pos)
mars_trigger();
if (rot->todo_primary | rot->is_primary | rot->old_is_primary)
mars_remote_trigger();
@ -1477,7 +1479,9 @@ void write_info_links(struct mars_rotate *rot)
}
static
void _recover_versionlink(struct mars_rotate *rot, int sequence, loff_t end_pos)
void _recover_versionlink(struct mars_rotate *rot,
const char *host,
int sequence, loff_t end_pos)
{
struct trans_logger_info inf = {
.inf_private = rot,
@ -1487,7 +1491,7 @@ void _recover_versionlink(struct mars_rotate *rot, int sequence, loff_t end_pos)
.inf_log_pos = end_pos,
.inf_is_replaying = false,
};
strncpy(inf.inf_host, my_id(), sizeof(inf.inf_host));
strncpy(inf.inf_host, host, sizeof(inf.inf_host));
MARS_DBG("sequence = %d end_pos = %lld\n",
sequence, end_pos);
@ -3983,7 +3987,7 @@ int _make_logging_status(struct mars_rotate *rot)
rot->recover_versionlink = false;
if (rot->aio_dent && rot->aio_dent->d_rest &&
!strcmp(rot->aio_dent->d_rest, my_id()))
_recover_versionlink(rot, log_nr, end_pos);
_recover_versionlink(rot, my_id(), log_nr, end_pos);
}
_make_new_replaylink(rot, my_id(), log_nr + 1, 0);
} else {