rework logfile replication

This commit is contained in:
Thomas Schoebel-Theuer 2012-02-03 11:25:17 +01:00 committed by Thomas Schoebel-Theuer
parent 05e1051686
commit a5646eeac4
2 changed files with 53 additions and 65 deletions

View File

@ -725,11 +725,13 @@ struct mars_peerinfo {
struct mars_global *global;
char *peer;
char *path;
const char *copy_path;
struct mars_socket socket;
struct task_struct *peer_thread;
spinlock_t lock;
struct list_head remote_dent_list;
//wait_queue_head_t event;
struct copy_brick *copy_brick;
int copy_serial;
int maxdepth;
};
@ -799,7 +801,6 @@ int check_logfile(struct mars_peerinfo *peer, struct mars_dent *remote_dent, str
loff_t src_size = remote_dent->new_stat.size;
struct mars_rotate *rot;
const char *switch_path = NULL;
const char *copy_path = NULL;
struct copy_brick *copy_brick;
int status = 0;
@ -818,67 +819,27 @@ int check_logfile(struct mars_peerinfo *peer, struct mars_dent *remote_dent, str
goto done;
}
// check whether (some/another) copy is already running
copy_path = path_make("%s/logfile-update", parent->d_path);
if (unlikely(!copy_path)) {
status = -ENOMEM;
goto done;
}
copy_brick = (struct copy_brick*)mars_find_brick(peer->global, &copy_brick_type, copy_path);
MARS_DBG("copy_path = '%s' copy_brick = %p dent = '%s'\n", copy_path, copy_brick, remote_dent->d_path);
if (copy_brick) {
bool is_my_copy = (remote_dent->d_serial == parent->d_logfile_serial);
bool copy_is_done = (is_my_copy && copy_brick->copy_last == copy_brick->copy_end && local_dent != NULL);
bool is_next_copy = (remote_dent->d_serial == parent->d_logfile_serial + 1);
MARS_DBG("current copy brick '%s' copy_last = %lld copy_end = %lld dent '%s' serial = %d/%d local_dent = '%s' | is_done = %d is_my_copy = %d is_next_copy = %d\n",
copy_brick->brick_path, copy_brick->copy_last, copy_brick->copy_end, remote_dent->d_path, remote_dent->d_serial, parent->d_logfile_serial, local_dent ? local_dent->d_path : "",
copy_is_done, is_my_copy, is_next_copy);
if (is_my_copy) {
rot->copy_is_done = copy_is_done;
goto treat;
}
if (peer->global->global_power.button && !rot->copy_is_done) {
goto done;
}
MARS_DBG("killing old copy brick '%s'\n", copy_brick->brick_path);
status = mars_kill_brick((void*)copy_brick);
if (status < 0)
goto done;
rot->copy_is_done = false;
// ensure consecutiveness of logfiles
if (!is_next_copy) {
goto done;
}
// fallthrough: take the next logfile
}
treat:
// (new) copy necessary?
status = 0;
if (!rot->allow_update) {
MARS_DBG("logfiles are not for me.\n");
goto done;
}
if (dst_size >= src_size && local_dent != NULL) { // nothing to do
goto ok;
}
// check whether connection is allowed
switch_path = path_make("%s/todo-%s/connect", parent->d_path, my_id());
// start / treat copy brick instance
status = _update_file(rot, switch_path, copy_path, remote_dent->d_path, peer->peer, src_size);
MARS_DBG("update '%s' from peer '%s' status = %d\n", remote_dent->d_path, peer->peer, status);
if (status < 0) {
goto done;
// check whether copy is necessary
copy_brick = peer->copy_brick;
MARS_DBG("copy_brick = %p (remote '%s' %d) copy_serial = %d\n", copy_brick, remote_dent->d_path, remote_dent->d_serial, peer->copy_serial);
if (copy_brick) {
if (remote_dent->d_serial == peer->copy_serial) {
// treat copy brick instance underway
status = _update_file(rot, switch_path, peer->copy_path, remote_dent->d_path, peer->peer, src_size);
MARS_DBG("re-update '%s' from peer '%s' status = %d\n", remote_dent->d_path, peer->peer, status);
}
} else if (!peer->copy_serial && rot->allow_update &&
(dst_size < src_size || !local_dent)) {
// start copy brick instance
status = _update_file(rot, switch_path, peer->copy_path, remote_dent->d_path, peer->peer, src_size);
MARS_DBG("update '%s' from peer '%s' status = %d\n", remote_dent->d_path, peer->peer, status);
peer->copy_serial = remote_dent->d_serial;
}
ok:
parent->d_logfile_serial = remote_dent->d_serial;
done:
brick_string_free(copy_path);
brick_string_free(switch_path);
return status;
}
@ -1175,20 +1136,26 @@ static int _kill_peer(void *buf, struct mars_dent *dent)
mars_free_dent_all(NULL, &tmp_list);
brick_string_free(peer->peer);
brick_string_free(peer->path);
brick_string_free(peer->copy_path);
dent->d_private = NULL;
brick_mem_free(peer);
return 0;
}
static int _make_peer(struct mars_global *global, struct mars_dent *dent, char *mypeer, char *path)
static int _make_peer(struct mars_global *global, struct mars_dent *dent, char *path)
{
static int serial = 0;
struct mars_peerinfo *peer;
char *mypeer;
char *parent_path;
struct copy_brick *copy_brick;
int status = 0;
if (!global->global_power.button || !dent->new_link) {
if (!global || !global->global_power.button || !dent || !dent->new_link || !dent->d_parent || !(parent_path = dent->d_parent->d_path)) {
MARS_DBG("cannot work\n");
return 0;
}
mypeer = dent->d_rest;
if (!mypeer) {
status = _parse_args(dent, dent->new_link, 1);
if (status < 0)
@ -1198,20 +1165,29 @@ static int _make_peer(struct mars_global *global, struct mars_dent *dent, char *
MARS_DBG("peer '%s'\n", mypeer);
if (!dent->d_private) {
const char *copy_path = NULL;
dent->d_private = brick_zmem_alloc(sizeof(struct mars_peerinfo));
if (!dent->d_private) {
MARS_ERR("no memory for peer structure\n");
return -1;
status = -ENOMEM;
goto done;
}
peer = dent->d_private;
copy_path = path_make("%s/logfile-update", parent_path);
if (unlikely(!copy_path)) {
MARS_ERR("cannot create copy_path\n");
brick_mem_free(peer);
status = -ENOMEM;
goto done;
}
peer->copy_path = copy_path;
peer->global = global;
peer->peer = brick_strdup(mypeer);
peer->path = brick_strdup(path);
peer->maxdepth = 2;
spin_lock_init(&peer->lock);
INIT_LIST_HEAD(&peer->remote_dent_list);
//init_waitqueue_head(&peer->event);
}
peer = dent->d_private;
@ -1227,10 +1203,23 @@ static int _make_peer(struct mars_global *global, struct mars_dent *dent, char *
wake_up_process(peer->peer_thread);
}
// check whether some copy has finished
copy_brick = (struct copy_brick*)mars_find_brick(global, &copy_brick_type, peer->copy_path);
MARS_DBG("copy_path = '%s' copy_brick = %p\n", peer->copy_path, copy_brick);
if (copy_brick && (copy_brick->copy_last == copy_brick->copy_end || copy_brick->power.led_off)) {
status = mars_kill_brick((void*)copy_brick);
if (status < 0)
goto done;
copy_brick = NULL;
}
if (!copy_brick)
peer->copy_serial = 0;
/* This must be called by the main thread in order to
* avoid nasty races.
* The peer thread does nothing but fetching the dent list.
*/
peer->copy_brick = copy_brick;
status = run_bones(peer);
done:
@ -1249,7 +1238,7 @@ static int make_scan(void *buf, struct mars_dent *dent)
if (!strcmp(dent->d_rest, my_id())) {
return 0;
}
return _make_peer(buf, dent, dent->d_rest, "/mars");
return _make_peer(buf, dent, "/mars");
}

View File

@ -35,7 +35,6 @@ extern char *my_id(void);
char *new_link; \
char *old_link; \
struct mars_global *d_global; \
int d_logfile_serial; \
void *d_private;
struct mars_dent {