main: use the logger epoch

This commit is contained in:
Thomas Schoebel-Theuer 2020-09-07 11:40:14 +02:00
parent e0a046e93f
commit 5dd5ea462c

View File

@ -852,6 +852,7 @@ struct mars_rotate {
struct mars_dent *next_log;
struct mars_dent *syncstatus_dent;
struct lamport_time sync_finish_stamp;
struct lamport_time prosumer_epoch;
struct client_brick *prosumer_brick;
struct client_brick *prosumer_new;
struct client_brick *prosumer_old;
@ -4695,8 +4696,30 @@ int make_log_init(struct mars_dent *dent)
parent_path,
my_id());
mutex_unlock(&server_connect_lock);
if (!rot->trans_brick && trans_brick)
/* Recovery after a crash:
* upon first instantiation, read the old persisted epoch stamp
*/
if (!rot->trans_brick && trans_brick) {
struct trans_logger_brick *tra = (void *)trans_brick;
const char *epoch_path;
const char *epoch_str;
epoch_path = path_make("%s/actual-%s/state-logger",
parent_path,
my_id());
epoch_str = ordered_readlink(epoch_path, NULL);
sscanf(epoch_str,
#ifdef CURRENT_TIME
"%lld.%ld",
#else
"%lld.%ld",
#endif
&tra->logger_epoch.tv_sec,
&tra->logger_epoch.tv_nsec);
brick_string_free(epoch_path);
brick_string_free(epoch_str);
clear_vals(rot->msgs);
}
rot->trans_brick = (void*)trans_brick;
status = -ENOENT;
if (!trans_brick) {
@ -5487,6 +5510,14 @@ int _start_trans(struct mars_rotate *rot)
MARS_ERR("initial connect failed\n");
goto done;
}
/* Reset the epoch stamp when necessary */
if (!trans_brick->logger_epoch.tv_sec ||
(trans_brick->replay_mode &&
(!rot->relevant_log->d_rest ||
strcmp(rot->relevant_log->d_rest, my_id()) != 0))) {
MARS_DBG("reset logger_epoch '%s'\n", rot->parent_path);
get_lamport(NULL, &trans_brick->logger_epoch);
}
_change_trans(rot);
@ -5562,6 +5593,30 @@ bool _is_secondary_fixing_safe(struct mars_rotate *rot)
return true;
}
static
void __reset_all_epoch(struct mars_rotate *rot)
{
const char *prosumer_epoch_path;
struct kstat old_epoch = {};
get_lamport(NULL, &rot->trans_brick->logger_epoch);
/* prevent conflicts with ongoing prosumer handover */
if (rot->old_inhibit_mask ||
(rot->gate_brick && rot->gate_brick->inhibit_mask))
return;
prosumer_epoch_path = path_make("%s/actual-%s/prosumer-epoch",
rot->parent_path,
my_id());
/* race avoidance: only add small epsilon */
mars_stat(prosumer_epoch_path, &old_epoch, true);
if (old_epoch.mtime.tv_sec)
lamport_time_add_ns(&old_epoch.mtime, 1);
ordered_symlink("(none)", prosumer_epoch_path, &old_epoch.mtime);
brick_string_free(prosumer_epoch_path);
}
static
int make_log_finalize(struct mars_dent *dent)
{
@ -5704,6 +5759,8 @@ int make_log_finalize(struct mars_dent *dent)
trans_brick->replay_end_pos - trans_brick->replay_current_pos);
rot->replay_code = trans_brick->replay_code;
rot->log_is_really_damaged = true;
/* For safety, reset both epoch stamps */
__reset_all_epoch(rot);
/* Exception: set actual position for recovery */
_recover_versionlink(rot,
rot->current_inf.inf_host,
@ -6703,6 +6760,7 @@ int make_dev(struct mars_dent *dent)
goto done;
}
rot->if_brick->shutdown = rot->kill_device;
rot->if_brick->open_epoch = &rot->prosumer_epoch;
dev_brick->killme = true;
dev_brick->kill_ptr = (void**)&rot->if_brick;
dev_brick->rewire = true;