light: use replay_tolerance only after failed replay attempt

This commit is contained in:
Thomas Schoebel-Theuer 2013-07-08 07:23:03 +02:00
parent f38c56d5ab
commit ad08afe074
3 changed files with 22 additions and 3 deletions

View File

@ -2951,6 +2951,7 @@ void trans_logger_replay(struct trans_logger_brick *brick)
if (finished_pos >= 0) {
input->inf.inf_min_pos = finished_pos;
brick->replay_current_pos = finished_pos;
}
get_lamport(&input->inf.inf_min_pos_stamp);
@ -2958,6 +2959,9 @@ void trans_logger_replay(struct trans_logger_brick *brick)
if (status >= 0 && finished_pos == brick->replay_end_pos) {
MARS_INF("replay finished at %lld\n", finished_pos);
brick->replay_code = 1;
} else if (status == -EAGAIN && finished_pos + brick->replay_tolerance > brick->replay_end_pos) {
MARS_INF("TOLERANCE: logfile is DAMAGED at %lld (of %lld)\n", finished_pos, brick->replay_end_pos);
brick->replay_code = status;
} else {
MARS_INF("replay stopped prematurely at %lld (of %lld)\n", finished_pos, brick->replay_end_pos);
brick->replay_code = 2;

View File

@ -152,6 +152,7 @@ struct trans_logger_brick {
int new_input_nr; // whereto we should switchover ASAP
int replay_tolerance; // how many bytes to ignore at truncated logfiles
// readonly from outside
loff_t replay_current_pos; // end of replay
int log_input_nr; // where we are currently logging to
int old_input_nr; // where old IO requests may be on the fly
int replay_code; // replay errors (if any)

View File

@ -274,6 +274,7 @@ struct mars_rotate {
bool old_is_primary;
bool copy_is_done;
bool created_hole;
bool is_log_damaged;
spinlock_t inf_lock;
bool infs_is_dirty[MAX_INFOS];
struct trans_logger_info infs[MAX_INFOS];
@ -1960,6 +1961,14 @@ const char *get_versionlink(const char *parent_path, int seq, const char *host,
return mars_readlink(_linkpath);
}
static inline
int _get_tolerance(struct mars_rotate *rot)
{
if (rot->is_log_damaged)
return REPLAY_TOLERANCE;
return 0;
}
static
bool is_switchover_possible(struct mars_rotate *rot, const char *old_log_path, const char *new_log_path, int replay_tolerance, bool skip_new)
{
@ -2493,7 +2502,7 @@ int _check_logging_status(struct mars_rotate *rot, int *log_nr, long long *oldpo
(rot->todo_primary ||
(rot->relevant_log &&
rot->next_relevant_log &&
is_switchover_possible(rot, rot->relevant_log->d_path, rot->next_relevant_log->d_path, REPLAY_TOLERANCE, false)))) {
is_switchover_possible(rot, rot->relevant_log->d_path, rot->next_relevant_log->d_path, _get_tolerance(rot), false)))) {
MARS_INF_TO(rot->log_say, "TOLERANCE: transaction log '%s' is treated as fully applied\n", rot->aio_dent->d_path);
status = 1;
} else {
@ -2569,9 +2578,9 @@ int _make_logging_status(struct mars_rotate *rot)
*/
if (!trans_brick->power.button && !trans_brick->power.led_on && trans_brick->power.led_off) {
if (rot->next_relevant_log) {
int replay_tolerance = REPLAY_TOLERANCE;
int replay_tolerance = _get_tolerance(rot);
bool skip_new = !rot->next_next_relevant_log && rot->todo_primary;
MARS_DBG("check switchover from '%s' to '%s' (size = %lld, next_next = %p, skip_new = %d)\n", dent->d_path, rot->next_relevant_log->d_path, rot->next_relevant_log->new_stat.size, rot->next_next_relevant_log, skip_new);
MARS_DBG("check switchover from '%s' to '%s' (size = %lld, next_next = %p, skip_new = %d, replay_tolerance = %d)\n", dent->d_path, rot->next_relevant_log->d_path, rot->next_relevant_log->new_stat.size, rot->next_next_relevant_log, skip_new, replay_tolerance);
if (is_switchover_possible(rot, dent->d_path, rot->next_relevant_log->d_path, replay_tolerance, skip_new)) {
MARS_INF_TO(rot->log_say, "start switchover from transaction log '%s' to '%s'\n", dent->d_path, rot->next_relevant_log->d_path);
_make_new_replaylink(rot, rot->next_relevant_log->d_rest, rot->next_relevant_log->d_serial, rot->next_relevant_log->new_stat.size);
@ -2976,6 +2985,9 @@ int make_log_finalize(struct mars_global *global, struct mars_dent *dent)
if (trans_brick->power.button && trans_brick->power.led_on && !trans_brick->power.led_off) {
bool do_stop = true;
if (trans_brick->replay_mode) {
rot->is_log_damaged =
trans_brick->replay_code == -EAGAIN &&
trans_brick->replay_end_pos - trans_brick->replay_current_pos < trans_brick->replay_tolerance;
do_stop = trans_brick->replay_code != 0 ||
!global->global_power.button ||
!_check_allow(global, parent, "allow-replay") ||
@ -3015,6 +3027,8 @@ int make_log_finalize(struct mars_global *global, struct mars_dent *dent)
goto done;
}
rot->is_log_damaged = false;
do_start = (!rot->replay_mode ||
(rot->start_pos != rot->end_pos &&
_check_allow(global, parent, "allow-replay")));