light: when jammed, create a hole in logfile sequence numbers

The hole will indicate the disruption, and the secondaries will
stuble over it.
This commit is contained in:
Thomas Schoebel-Theuer 2013-04-08 08:46:13 +02:00
parent c275bec28d
commit 7a6c24c0c1

View File

@ -787,10 +787,6 @@ int _update_version_link(struct mars_rotate *rot, struct trans_logger_info *inf)
goto out;
}
prev_link = mars_readlink(prev);
if (unlikely(!prev_link)) {
MARS_ERR("cannot find previous version symlink '%s'\n", SAFE_STR(prev));
goto out;
}
rot->inf_prev_sequence = inf->inf_sequence;
}
@ -2215,8 +2211,11 @@ int make_log_step(void *buf, struct mars_dent *dent)
prev_log = rot->next_log;
if (prev_log && prev_log->d_serial + 1 != dent->d_serial) {
MARS_WRN("transaction logs are not consecutive at '%s' (%d ~> %d)\n", dent->d_path, prev_log->d_serial, dent->d_serial);
status = -EINVAL;
goto done;
// allow the primary to create a hole in the logfile sequence numbers
if (!rot->todo_primary || prev_log->d_serial + 2 != dent->d_serial) {
status = -EINVAL;
goto done;
}
}
if (dent->d_serial > rot->max_sequence) {
@ -2280,7 +2279,7 @@ err:
* ret == 3 : relevant for appending
*/
static
int _check_logging_status(struct mars_rotate *rot, long long *oldpos_start, long long *oldpos_end, long long *newpos)
int _check_logging_status(struct mars_rotate *rot, int *log_nr, long long *oldpos_start, long long *oldpos_end, long long *newpos)
{
struct mars_dent *dent = rot->relevant_log;
struct mars_dent *parent;
@ -2298,6 +2297,10 @@ int _check_logging_status(struct mars_rotate *rot, long long *oldpos_start, long
CHECK_PTR(rot->replay_link, done);
CHECK_PTR(rot->aio_brick, done);
if (sscanf(rot->replay_link->d_argv[0], "log-%d", log_nr) != 1) {
MARS_ERR("malformed logfile number '%s'\n", rot->replay_link->d_argv[0]);
goto done;
}
if (sscanf(rot->replay_link->d_argv[1], "%lld", oldpos_start) != 1) {
MARS_ERR("bad start position argument '%s'\n", rot->replay_link->d_argv[1]);
goto done;
@ -2355,6 +2358,7 @@ int _make_logging_status(struct mars_rotate *rot)
struct mars_dent *parent;
struct mars_global *global = NULL;
struct trans_logger_brick *trans_brick;
int log_nr = 0;
loff_t start_pos = 0;
loff_t dirty_pos = 0;
loff_t end_pos = 0;
@ -2378,7 +2382,7 @@ int _make_logging_status(struct mars_rotate *rot)
/* Find current logging status.
*/
status = _check_logging_status(rot, &start_pos, &dirty_pos, &end_pos);
status = _check_logging_status(rot, &log_nr, &start_pos, &dirty_pos, &end_pos);
if (status < 0) {
goto done;
}
@ -2402,8 +2406,10 @@ int _make_logging_status(struct mars_rotate *rot)
_make_new_replaylink(rot, rot->next_relevant_log->d_rest, rot->next_relevant_log->d_serial, rot->next_relevant_log->new_stat.size);
}
} else if (rot->todo_primary) {
MARS_DBG("preparing new transaction log '%s' from version %d to %d\n", dent->d_path, dent->d_serial, dent->d_serial + 1);
_make_new_replaylink(rot, my_id(), dent->d_serial + 1, 0);
if (dent->d_serial > log_nr)
log_nr = dent->d_serial;
MARS_DBG("preparing new transaction log '%s' from version %d to %d\n", dent->d_path, dent->d_serial, log_nr + 1);
_make_new_replaylink(rot, my_id(), log_nr + 1, 0);
} else {
MARS_DBG("nothing to do on last transaction log '%s'\n", dent->d_path);
}
@ -2517,7 +2523,8 @@ void _rotate_trans(struct mars_rotate *rot)
// try to setup new log
if (log_nr == trans_brick->new_input_nr &&
rot->next_relevant_log &&
rot->next_relevant_log->d_serial == trans_brick->inputs[log_nr]->inf.inf_sequence + 1 &&
(rot->next_relevant_log->d_serial == trans_brick->inputs[log_nr]->inf.inf_sequence + 1 ||
trans_brick->cease_logging) &&
(next_nr = _get_free_input(trans_brick)) >= 0) {
struct trans_logger_input *trans_input;
int status;
@ -2719,40 +2726,6 @@ done:
return status;
}
static
void override_all_syncstatus(struct mars_global *global, struct mars_rotate *rot, char *parent_path)
{
char *prefix = NULL;
struct mars_dent **table = NULL;
int count;
int i;
prefix = path_make("%s/syncstatus-", parent_path);
if (!prefix)
goto done;
count = mars_find_dent_all(global, prefix, &table);
MARS_DBG("prefix='%s' count=%d\n", prefix, count);
for (i = 0; i < count; i++) {
struct mars_dent *dent = table[i];
int status;
if (!dent || !dent->d_path || dent == rot->syncstatus_dent)
continue;
status = mars_symlink("0", dent->d_path, NULL, 0);
MARS_DBG("clearing syncstatus link '%s' status=%d\n", dent->d_path, status);
}
done:
if (table)
brick_mem_free(table);
if (prefix)
brick_string_free(prefix);
}
static
int make_log_finalize(struct mars_global *global, struct mars_dent *dent)
{
@ -2780,12 +2753,23 @@ int make_log_finalize(struct mars_global *global, struct mars_dent *dent)
brick_say_logging = 0;
if (rot->todo_primary || rot->is_primary) {
trans_brick->cease_logging = true;
rot->inf_prev_sequence = 0; // disable checking
}
} else if (!rot->todo_primary && !rot->is_primary) {
trans_brick->cease_logging = false;
}
if (trans_brick->cease_logging)
override_all_syncstatus(global, rot, parent->d_path);
if (trans_brick->cease_logging) {
/* Create a hole in the sequence of logfile numbers.
* The secondaries will later stumble over it.
*/
if (trans_brick->inputs[trans_brick->log_input_nr]->inf.inf_max_pos > 0) {
char *new_path = path_make("%s/log-%09d-%s", rot->parent_path, rot->max_sequence + 2, my_id());
if (likely(new_path && !mars_find_dent(global, new_path))) {
_create_new_logfile(new_path);
}
brick_string_free(new_path);
}
}
// check whether some copy has finished
copy_brick = (struct copy_brick*)mars_find_brick(global, &copy_brick_type, rot->copy_path);