light: report timestamp of replay in actual-*

This commit is contained in:
Thomas Schoebel-Theuer 2012-09-27 14:46:07 +02:00 committed by Thomas Schoebel-Theuer
parent fadaacef15
commit 9016c81057
3 changed files with 35 additions and 0 deletions

View File

@ -875,6 +875,7 @@ void pos_complete(struct trans_logger_mref_aspect *orig_mref_a)
MARS_ERR("backskip in log replay: %lld -> %lld\n", log_input->replay_min_pos, orig_mref_a->log_pos); MARS_ERR("backskip in log replay: %lld -> %lld\n", log_input->replay_min_pos, orig_mref_a->log_pos);
} }
log_input->replay_min_pos = finished; log_input->replay_min_pos = finished;
memcpy(&log_input->last_stamp, &orig_mref_a->stamp, sizeof(log_input->last_stamp));
} }
} else { } else {
struct trans_logger_mref_aspect *prev_mref_a; struct trans_logger_mref_aspect *prev_mref_a;
@ -2024,6 +2025,7 @@ void _init_input(struct trans_logger_input *input)
input->replay_min_pos = start_pos; input->replay_min_pos = start_pos;
input->replay_max_pos = start_pos; // FIXME: Theoretically, this could be wrong when starting on an interrupted replay / inconsistent system. However, we normally never start ordinary logging in such a case (possibly except some desperate emergency cases when there really is no other chance, such as physical loss of transaction logs). Nevertheless, better use old consistenty information from the FS here. input->replay_max_pos = start_pos; // FIXME: Theoretically, this could be wrong when starting on an interrupted replay / inconsistent system. However, we normally never start ordinary logging in such a case (possibly except some desperate emergency cases when there really is no other chance, such as physical loss of transaction logs). Nevertheless, better use old consistenty information from the FS here.
logst->log_pos = start_pos; logst->log_pos = start_pos;
memset(&input->last_stamp, 0, sizeof(input->last_stamp));
input->is_operating = true; input->is_operating = true;
} }
@ -2356,6 +2358,7 @@ void trans_logger_replay(struct trans_logger_brick *brick)
input->replay_min_pos = start_pos; input->replay_min_pos = start_pos;
input->replay_max_pos = start_pos; // FIXME: this is wrong. input->replay_max_pos = start_pos; // FIXME: this is wrong.
memset(&input->last_stamp, 0, sizeof(input->last_stamp));
mars_power_led_on((void*)brick, true); mars_power_led_on((void*)brick, true);
@ -2424,6 +2427,7 @@ void trans_logger_replay(struct trans_logger_brick *brick)
if (atomic_read(&brick->replay_count) <= 0 || ((long long)jiffies) - old_jiffies >= HZ * 5) { if (atomic_read(&brick->replay_count) <= 0 || ((long long)jiffies) - old_jiffies >= HZ * 5) {
input->replay_min_pos = finished_pos; input->replay_min_pos = finished_pos;
input->replay_max_pos = finished_pos; // FIXME input->replay_max_pos = finished_pos; // FIXME
memcpy(&input->last_stamp, &lh.l_stamp, sizeof(input->last_stamp));
old_jiffies = jiffies; old_jiffies = jiffies;
} }
_exit_inputs(brick, false); _exit_inputs(brick, false);

View File

@ -194,6 +194,7 @@ struct trans_logger_input {
long long last_jiffies; long long last_jiffies;
loff_t replay_min_pos; // current replay position (both in replay mode and in logging mode) loff_t replay_min_pos; // current replay position (both in replay mode and in logging mode)
loff_t replay_max_pos; // dito, indicating the "dirty" area which could be potentially "inconsistent" loff_t replay_max_pos; // dito, indicating the "dirty" area which could be potentially "inconsistent"
struct timespec last_stamp;
// private // private
struct log_status logst; struct log_status logst;

View File

@ -1505,6 +1505,34 @@ out:
return status; return status;
} }
static
int _update_timelink(struct mars_global *global, const char *parent_path, const char *host, int sequence, loff_t start_pos, loff_t end_pos, struct timespec *stamp)
{
struct timespec now = {};
char *new = NULL;
char *old = NULL;
int status = -ENOMEM;
old = path_make("%s,%09d,%lld,%lld,%lu.%09lu", host, sequence, start_pos, end_pos - start_pos, stamp->tv_sec, stamp->tv_nsec);
new = path_make("%s/actual-%s/timestamp", parent_path, my_id());
if (unlikely(!old || !new)) {
goto out;
}
get_lamport(&now);
status = mars_symlink(old, new, &now, 0);
if (status < 0) {
MARS_ERR("cannot create symlink '%s' -> '%s' status = %d\n", old, new, status);
} else {
MARS_DBG("make version symlink '%s' -> '%s' status = %d\n", old, new, status);
}
out:
brick_string_free(new);
brick_string_free(old);
return status;
}
static static
int __update_all_links(struct mars_global *global, const char *parent_path, struct trans_logger_brick *trans_brick, const char *override_host, int override_sequence, bool check_exist, bool force, bool reset_pos, int nr) int __update_all_links(struct mars_global *global, const char *parent_path, struct trans_logger_brick *trans_brick, const char *override_host, int override_sequence, bool check_exist, bool force, bool reset_pos, int nr)
{ {
@ -1552,6 +1580,7 @@ int __update_all_links(struct mars_global *global, const char *parent_path, stru
status = _update_replaylink(global, parent_path, host, sequence, min_pos, max_pos, check_exist); status = _update_replaylink(global, parent_path, host, sequence, min_pos, max_pos, check_exist);
status |= _update_versionlink(global, parent_path, host, sequence, max_pos, max_pos); status |= _update_versionlink(global, parent_path, host, sequence, max_pos, max_pos);
status |= _update_timelink(global, parent_path, host, sequence, min_pos, max_pos, &trans_input->last_stamp);
if (!status) if (!status)
trans_input->last_jiffies = jiffies; trans_input->last_jiffies = jiffies;
done: done:
@ -2133,6 +2162,7 @@ void _init_trans_input(struct trans_logger_input *trans_input, struct mars_dent
trans_input->replay_min_pos = 0; trans_input->replay_min_pos = 0;
trans_input->replay_max_pos = 0; trans_input->replay_max_pos = 0;
trans_input->log_start_pos = 0; trans_input->log_start_pos = 0;
memset(&trans_input->last_stamp, 0, sizeof(trans_input->last_stamp));
trans_input->is_prepared = true; trans_input->is_prepared = true;
MARS_DBG("initialized '%s' %d\n", trans_input->inf_host, trans_input->inf_sequence); MARS_DBG("initialized '%s' %d\n", trans_input->inf_host, trans_input->inf_sequence);
} }