main: dont replay beyond primary during sync

This commit is contained in:
Thomas Schoebel-Theuer 2021-02-07 18:41:43 +01:00
parent fac0bd5a47
commit c1820e0783
1 changed files with 46 additions and 4 deletions

View File

@ -896,6 +896,7 @@ struct mars_rotate {
bool allow_update;
bool rot_activated;
bool old_fetch_on;
bool want_sync;
bool forbid_replay;
bool replay_mode;
bool todo_primary;
@ -1351,7 +1352,11 @@ int parse_logfile_name(const char *str, int *seq, const char **host)
}
static
int compare_replaylinks(struct mars_rotate *rot, const char *hosta, const char *hostb)
int compare_replaylinks(struct mars_rotate *rot,
const char *hosta,
const char *hostb,
loff_t *save_a,
loff_t *save_b)
{
const char *linka = path_make("%s/replay-%s", rot->parent_path, hosta);
const char *linkb = path_make("%s/replay-%s", rot->parent_path, hostb);
@ -1399,6 +1404,10 @@ int compare_replaylinks(struct mars_rotate *rot, const char *hosta, const char *
goto done;
}
/* The seqence number is equal.
* Now the base offsets are comparable.
*/
posa += skip_part(a + posa);
posb += skip_part(b + posb);
if (unlikely(!a[posa++])) {
@ -1411,11 +1420,13 @@ int compare_replaylinks(struct mars_rotate *rot, const char *hosta, const char *
count = sscanf(a + posa, "%lld", &offa);
if (unlikely(count != 1)) {
MARS_ERR_TO(rot->log_say, "replay link '%s' -> '%s' is malformed\n", linka, a);
}
} else if (save_a)
*save_a = offa;
count = sscanf(b + posb, "%lld", &offb);
if (unlikely(count != 1)) {
MARS_ERR_TO(rot->log_say, "replay link '%s' -> '%s' is malformed\n", linkb, b);
}
} else if (save_b)
*save_b = offb;
if (offa < offb) {
res = -1;
@ -4700,6 +4711,35 @@ int _check_logging_status(struct mars_rotate *rot, int *log_nr, long long *oldpo
check_pos:
*newpos = rot->aio_info.current_size;
/* Shorten the end position when sync wants to run.
* Avoids unnecessary delay when the sync is starting again.
*/
if (rot->want_sync && !rot->todo_primary && rot->parent_path) {
loff_t min_pos = -1;
char *tmp = path_make("%s/primary", rot->parent_path);
const char *primary = ordered_readlink(tmp, NULL);
if (likely(primary && primary[0])) {
int cmp = compare_replaylinks(rot, primary, my_id(), &min_pos, NULL);
MARS_DBG("want_sync compare=%d min_pos=%lld\n",
cmp, min_pos);
}
if (min_pos >= 0 &&
min_pos < *newpos) {
MARS_INF("shortening replay from %lld to %lld due to sync\n",
*newpos, min_pos);
*newpos = min_pos;
}
if (unlikely(*oldpos_end > *newpos)) {
MARS_INF("also shortening from %lld to %lld\n",
*oldpos_end, *newpos);
*oldpos_end = min_pos;
}
brick_string_free(tmp);
brick_string_free(primary);
}
if (unlikely(rot->aio_info.current_size < *oldpos_start)) {
status = -EBADF;
/* Allow primary --force even when logfiles are truncated / damaged.
@ -6291,6 +6331,7 @@ static int make_sync(struct mars_dent *dent)
if (_global_sync_nr > global_sync_limit && global_sync_limit > 0)
do_start = false;
}
rot->want_sync = do_start;
/* Disallow contemporary sync & logfile_replay
*/
@ -6307,7 +6348,8 @@ static int make_sync(struct mars_dent *dent)
MARS_DBG("update info links\n");
write_info_links(rot);
}
rot->forbid_replay = (do_start && compare_replaylinks(rot, peer, my_id()) < 0);
rot->forbid_replay = (do_start &&
compare_replaylinks(rot, peer, my_id(), NULL, NULL) < 0);
if (rot->forbid_replay) {
MARS_INF("cannot start sync because my data is newer than the remote one at '%s'!\n", peer);
do_start = false;