server: obey prosumer epoch timestamp

This commit is contained in:
Thomas Schoebel-Theuer 2020-09-22 11:13:27 +02:00
parent 14ddf5cd03
commit 0115feeeb8
1 changed files with 66 additions and 6 deletions

View File

@ -448,18 +448,24 @@ int check_export_permissions(char *reduce_path, char **peer)
/* This must be called under server_connect_lock */
static
bool check_multi_prosumer(struct trans_logger_brick *logger,
const char *peer)
const char *peer,
struct lamport_time *check_prosumer_epoch)
{
struct trans_logger_output *output;
struct list_head *tmp;
char *resource_path = brick_strdup(logger->resource_name);
char *resource_path;
char *pos;
const char *multi_path = NULL;
const char *multi = NULL;
int others_allowed = 0;
int nr_others = 0;
struct lamport_time old_prosumer_epoch = {};
const char *prosumer_epoch_path = NULL;
const char *prosumer_epoch = NULL;
int status;
bool fail = true;
resource_path = brick_strdup(logger->brick_path);
pos = strstr(resource_path, LOGGER_MARKER);
if (pos)
*pos = '\0';
@ -468,14 +474,16 @@ bool check_multi_prosumer(struct trans_logger_brick *logger,
resource_path, my_id());
multi = ordered_readlink(multi_path, NULL);
if (!multi)
goto done;
goto fail;
sscanf(multi, "%d", &others_allowed);
MARS_DBG("resource='%s' multi_path='%s' multi='%s' others_allowed=%d\n",
resource_path, multi_path, multi,
others_allowed);
fail = false;
if (others_allowed)
if (others_allowed) {
check_prosumer_epoch = NULL;
goto done;
}
/* Check dynamic connections, disallow parallel ones.
*/
@ -507,13 +515,65 @@ bool check_multi_prosumer(struct trans_logger_brick *logger,
strcmp(peer, my_id())))
nr_others++;
}
MARS_DBG("nr_others=%d\n", nr_others);
fail = (nr_others > 0);
MARS_DBG("nr_others=%d fail=%d\n", nr_others, fail);
if (fail)
goto fail;
done:
/* Prosumer epoch: check persistent connection permissions.
*/
prosumer_epoch_path = path_make("%s/actual-%s/prosumer-epoch",
resource_path, my_id());
prosumer_epoch = ordered_readlink(prosumer_epoch_path,
&old_prosumer_epoch);
MARS_DBG("prosumer '%s' -> '%s' peer '%s' epoch old=%lld.%09ld new=%lld.%09ld\n",
prosumer_epoch_path,
prosumer_epoch,
peer,
(s64)old_prosumer_epoch.tv_sec,
old_prosumer_epoch.tv_nsec,
check_prosumer_epoch ? (s64)check_prosumer_epoch->tv_sec : 0,
check_prosumer_epoch ? check_prosumer_epoch->tv_nsec : 0);
/* Always allow when no old prosumer epoch timestamp exists,
*/
if (!old_prosumer_epoch.tv_sec) {
MARS_DBG("ok, no old prosumer epoch '%s'\n", peer);
goto update;
}
/* check for violations of prosumer epoch timestamp */
if (check_prosumer_epoch &&
lamport_time_compare(check_prosumer_epoch, &old_prosumer_epoch) < 0) {
MARS_ERR("prosumer '%s' -> '%s' peer '%s' epoch violation %lld.%09ld < %lld.%09ld\n",
prosumer_epoch_path,
prosumer_epoch,
peer,
(s64)check_prosumer_epoch->tv_sec,
check_prosumer_epoch->tv_nsec,
(s64)old_prosumer_epoch.tv_sec,
old_prosumer_epoch.tv_nsec);
fail = true;
goto fail;
}
update:
/* Update prosumer epoch */
MARS_DBG("update prosumer '%s' peer '%s' new epoch %ld.%09ld\n",
prosumer_epoch_path, peer,
check_prosumer_epoch ? check_prosumer_epoch->tv_sec : 0,
check_prosumer_epoch ? check_prosumer_epoch->tv_nsec : 0);
status = ordered_symlink(peer, prosumer_epoch_path, check_prosumer_epoch);
fail = (status < 0);
if (unlikely(fail))
MARS_ERR("prosumer '%s' epoch update failed, status=%d\n",
peer, status);
fail:
brick_string_free(resource_path);
brick_string_free(multi_path);
brick_string_free(multi);
brick_string_free(prosumer_epoch_path);
brick_string_free(prosumer_epoch);
return fail;
}
@ -857,7 +917,7 @@ int handler_thread(void *data)
}
status = -EISCONN;
if (check_multi_prosumer(prev, peer)) {
if (check_multi_prosumer(prev, peer, &cmd.cmd_stamp)) {
MARS_WRN("already in use '%s'\n", path);
goto unlock;
}