diff --git a/kernel/sy_old/mars_light.c b/kernel/sy_old/mars_light.c index 93459809..65f6e811 100644 --- a/kernel/sy_old/mars_light.c +++ b/kernel/sy_old/mars_light.c @@ -787,10 +787,6 @@ int _update_version_link(struct mars_rotate *rot, struct trans_logger_info *inf) goto out; } prev_link = mars_readlink(prev); - if (unlikely(!prev_link)) { - MARS_ERR("cannot find previous version symlink '%s'\n", SAFE_STR(prev)); - goto out; - } rot->inf_prev_sequence = inf->inf_sequence; } @@ -2215,8 +2211,11 @@ int make_log_step(void *buf, struct mars_dent *dent) prev_log = rot->next_log; if (prev_log && prev_log->d_serial + 1 != dent->d_serial) { MARS_WRN("transaction logs are not consecutive at '%s' (%d ~> %d)\n", dent->d_path, prev_log->d_serial, dent->d_serial); - status = -EINVAL; - goto done; + // allow the primary to create a hole in the logfile sequence numbers + if (!rot->todo_primary || prev_log->d_serial + 2 != dent->d_serial) { + status = -EINVAL; + goto done; + } } if (dent->d_serial > rot->max_sequence) { @@ -2280,7 +2279,7 @@ err: * ret == 3 : relevant for appending */ static -int _check_logging_status(struct mars_rotate *rot, long long *oldpos_start, long long *oldpos_end, long long *newpos) +int _check_logging_status(struct mars_rotate *rot, int *log_nr, long long *oldpos_start, long long *oldpos_end, long long *newpos) { struct mars_dent *dent = rot->relevant_log; struct mars_dent *parent; @@ -2298,6 +2297,10 @@ int _check_logging_status(struct mars_rotate *rot, long long *oldpos_start, long CHECK_PTR(rot->replay_link, done); CHECK_PTR(rot->aio_brick, done); + if (sscanf(rot->replay_link->d_argv[0], "log-%d", log_nr) != 1) { + MARS_ERR("malformed logfile number '%s'\n", rot->replay_link->d_argv[0]); + goto done; + } if (sscanf(rot->replay_link->d_argv[1], "%lld", oldpos_start) != 1) { MARS_ERR("bad start position argument '%s'\n", rot->replay_link->d_argv[1]); goto done; @@ -2355,6 +2358,7 @@ int _make_logging_status(struct mars_rotate *rot) struct mars_dent *parent; struct mars_global *global = NULL; struct trans_logger_brick *trans_brick; + int log_nr = 0; loff_t start_pos = 0; loff_t dirty_pos = 0; loff_t end_pos = 0; @@ -2378,7 +2382,7 @@ int _make_logging_status(struct mars_rotate *rot) /* Find current logging status. */ - status = _check_logging_status(rot, &start_pos, &dirty_pos, &end_pos); + status = _check_logging_status(rot, &log_nr, &start_pos, &dirty_pos, &end_pos); if (status < 0) { goto done; } @@ -2402,8 +2406,10 @@ int _make_logging_status(struct mars_rotate *rot) _make_new_replaylink(rot, rot->next_relevant_log->d_rest, rot->next_relevant_log->d_serial, rot->next_relevant_log->new_stat.size); } } else if (rot->todo_primary) { - MARS_DBG("preparing new transaction log '%s' from version %d to %d\n", dent->d_path, dent->d_serial, dent->d_serial + 1); - _make_new_replaylink(rot, my_id(), dent->d_serial + 1, 0); + if (dent->d_serial > log_nr) + log_nr = dent->d_serial; + MARS_DBG("preparing new transaction log '%s' from version %d to %d\n", dent->d_path, dent->d_serial, log_nr + 1); + _make_new_replaylink(rot, my_id(), log_nr + 1, 0); } else { MARS_DBG("nothing to do on last transaction log '%s'\n", dent->d_path); } @@ -2517,7 +2523,8 @@ void _rotate_trans(struct mars_rotate *rot) // try to setup new log if (log_nr == trans_brick->new_input_nr && rot->next_relevant_log && - rot->next_relevant_log->d_serial == trans_brick->inputs[log_nr]->inf.inf_sequence + 1 && + (rot->next_relevant_log->d_serial == trans_brick->inputs[log_nr]->inf.inf_sequence + 1 || + trans_brick->cease_logging) && (next_nr = _get_free_input(trans_brick)) >= 0) { struct trans_logger_input *trans_input; int status; @@ -2719,40 +2726,6 @@ done: return status; } -static -void override_all_syncstatus(struct mars_global *global, struct mars_rotate *rot, char *parent_path) -{ - char *prefix = NULL; - struct mars_dent **table = NULL; - int count; - int i; - - - prefix = path_make("%s/syncstatus-", parent_path); - if (!prefix) - goto done; - - count = mars_find_dent_all(global, prefix, &table); - MARS_DBG("prefix='%s' count=%d\n", prefix, count); - - for (i = 0; i < count; i++) { - struct mars_dent *dent = table[i]; - int status; - - if (!dent || !dent->d_path || dent == rot->syncstatus_dent) - continue; - - status = mars_symlink("0", dent->d_path, NULL, 0); - MARS_DBG("clearing syncstatus link '%s' status=%d\n", dent->d_path, status); - } - -done: - if (table) - brick_mem_free(table); - if (prefix) - brick_string_free(prefix); -} - static int make_log_finalize(struct mars_global *global, struct mars_dent *dent) { @@ -2780,12 +2753,23 @@ int make_log_finalize(struct mars_global *global, struct mars_dent *dent) brick_say_logging = 0; if (rot->todo_primary || rot->is_primary) { trans_brick->cease_logging = true; + rot->inf_prev_sequence = 0; // disable checking } } else if (!rot->todo_primary && !rot->is_primary) { trans_brick->cease_logging = false; } - if (trans_brick->cease_logging) - override_all_syncstatus(global, rot, parent->d_path); + if (trans_brick->cease_logging) { + /* Create a hole in the sequence of logfile numbers. + * The secondaries will later stumble over it. + */ + if (trans_brick->inputs[trans_brick->log_input_nr]->inf.inf_max_pos > 0) { + char *new_path = path_make("%s/log-%09d-%s", rot->parent_path, rot->max_sequence + 2, my_id()); + if (likely(new_path && !mars_find_dent(global, new_path))) { + _create_new_logfile(new_path); + } + brick_string_free(new_path); + } + } // check whether some copy has finished copy_brick = (struct copy_brick*)mars_find_brick(global, ©_brick_type, rot->copy_path);