fix logfile update from multiple peers

This commit is contained in:
Thomas Schoebel-Theuer 2012-02-21 12:23:38 +01:00 committed by Thomas Schoebel-Theuer
parent d008e20fcd
commit cb517fb34c
1 changed files with 65 additions and 42 deletions

View File

@ -529,12 +529,15 @@ struct mars_rotate {
struct mars_dent *prev_log;
struct mars_dent *next_log;
struct if_brick *if_brick;
const char *copy_path;
struct copy_brick *copy_brick;
long long switchover_timeout;
loff_t total_space;
loff_t remaining_space;
loff_t start_pos;
loff_t end_pos;
int max_sequence;
int copy_serial;
bool has_error;
bool allow_update;
bool allow_sync;
@ -747,13 +750,10 @@ struct mars_peerinfo {
struct mars_global *global;
char *peer;
char *path;
const char *copy_path;
struct mars_socket socket;
struct task_struct *peer_thread;
spinlock_t lock;
struct list_head remote_dent_list;
struct copy_brick *copy_brick;
int copy_serial;
int maxdepth;
};
@ -818,7 +818,7 @@ done:
}
static
int check_logfile(struct mars_peerinfo *peer, struct mars_dent *remote_dent, struct mars_dent *local_dent, struct mars_dent *parent, loff_t dst_size)
int check_logfile(const char *peer, struct mars_dent *remote_dent, struct mars_dent *local_dent, struct mars_dent *parent, loff_t dst_size)
{
loff_t src_size = remote_dent->new_stat.size;
struct mars_rotate *rot;
@ -840,25 +840,30 @@ int check_logfile(struct mars_peerinfo *peer, struct mars_dent *remote_dent, str
status = -EINVAL;
goto done;
}
if (!rot->copy_path) {
MARS_WRN("parent has no copy_path\n");
status = -EINVAL;
goto done;
}
// check whether connection is allowed
switch_path = path_make("%s/todo-%s/connect", parent->d_path, my_id());
// check whether copy is necessary
copy_brick = peer->copy_brick;
MARS_DBG("copy_brick = %p (remote '%s' %d) copy_serial = %d\n", copy_brick, remote_dent->d_path, remote_dent->d_serial, peer->copy_serial);
copy_brick = rot->copy_brick;
MARS_DBG("copy_brick = %p (remote '%s' %d) copy_serial = %d\n", copy_brick, remote_dent->d_path, remote_dent->d_serial, rot->copy_serial);
if (copy_brick) {
if (remote_dent->d_serial == peer->copy_serial) {
if (remote_dent->d_serial == rot->copy_serial) {
// treat copy brick instance underway
status = _update_file(rot, switch_path, peer->copy_path, remote_dent->d_path, peer->peer, src_size);
MARS_DBG("re-update '%s' from peer '%s' status = %d\n", remote_dent->d_path, peer->peer, status);
status = _update_file(rot, switch_path, rot->copy_path, remote_dent->d_path, peer, src_size);
MARS_DBG("re-update '%s' from peer '%s' status = %d\n", remote_dent->d_path, peer, status);
}
} else if (!peer->copy_serial && rot->allow_update &&
} else if (!rot->copy_serial && rot->allow_update &&
(dst_size < src_size || !local_dent)) {
// start copy brick instance
status = _update_file(rot, switch_path, peer->copy_path, remote_dent->d_path, peer->peer, src_size);
MARS_DBG("update '%s' from peer '%s' status = %d\n", remote_dent->d_path, peer->peer, status);
peer->copy_serial = remote_dent->d_serial;
status = _update_file(rot, switch_path, rot->copy_path, remote_dent->d_path, peer, src_size);
MARS_DBG("update '%s' from peer '%s' status = %d\n", remote_dent->d_path, peer, status);
rot->copy_serial = remote_dent->d_serial;
}
done:
@ -937,7 +942,7 @@ int run_bone(struct mars_peerinfo *peer, struct mars_dent *remote_dent)
if (unlikely(!parent)) {
MARS_IO("ignoring non-existing local resource '%s'\n", parent_path);
} else {
status = check_logfile(peer, remote_dent, local_dent, parent, local_stat.size);
status = check_logfile(peer->peer, remote_dent, local_dent, parent, local_stat.size);
}
brick_string_free(parent_path);
}
@ -1158,7 +1163,6 @@ static int _kill_peer(void *buf, struct mars_dent *dent)
mars_free_dent_all(NULL, &tmp_list);
brick_string_free(peer->peer);
brick_string_free(peer->path);
brick_string_free(peer->copy_path);
dent->d_private = NULL;
brick_mem_free(peer);
return 0;
@ -1170,7 +1174,6 @@ static int _make_peer(struct mars_global *global, struct mars_dent *dent, char *
struct mars_peerinfo *peer;
char *mypeer;
char *parent_path;
struct copy_brick *copy_brick;
int status = 0;
if (!global || !global->global_power.button || !dent || !dent->new_link || !dent->d_parent || !(parent_path = dent->d_parent->d_path)) {
@ -1187,7 +1190,6 @@ static int _make_peer(struct mars_global *global, struct mars_dent *dent, char *
MARS_DBG("peer '%s'\n", mypeer);
if (!dent->d_private) {
const char *copy_path = NULL;
dent->d_private = brick_zmem_alloc(sizeof(struct mars_peerinfo));
if (!dent->d_private) {
MARS_ERR("no memory for peer structure\n");
@ -1195,15 +1197,6 @@ static int _make_peer(struct mars_global *global, struct mars_dent *dent, char *
goto done;
}
peer = dent->d_private;
copy_path = path_make("%s/logfile-update", parent_path);
if (unlikely(!copy_path)) {
MARS_ERR("cannot create copy_path\n");
brick_mem_free(peer);
status = -ENOMEM;
goto done;
}
peer->copy_path = copy_path;
peer->global = global;
peer->peer = brick_strdup(mypeer);
peer->path = brick_strdup(path);
@ -1225,23 +1218,10 @@ static int _make_peer(struct mars_global *global, struct mars_dent *dent, char *
wake_up_process(peer->peer_thread);
}
// check whether some copy has finished
copy_brick = (struct copy_brick*)mars_find_brick(global, &copy_brick_type, peer->copy_path);
MARS_DBG("copy_path = '%s' copy_brick = %p\n", peer->copy_path, copy_brick);
if (copy_brick && (copy_brick->copy_last == copy_brick->copy_end || copy_brick->power.led_off)) {
status = mars_kill_brick((void*)copy_brick);
if (status < 0)
goto done;
copy_brick = NULL;
}
if (!copy_brick)
peer->copy_serial = 0;
/* This must be called by the main thread in order to
* avoid nasty races.
* The peer thread does nothing but fetching the dent list.
*/
peer->copy_brick = copy_brick;
status = run_bones(peer);
done:
@ -1287,6 +1267,24 @@ int kill_any(void *buf, struct mars_dent *dent)
return 1;
}
static
int kill_log(void *buf, struct mars_dent *dent)
{
struct mars_global *global = buf;
struct mars_rotate *rot = dent->d_private;
if (global->global_power.button) {
return 0;
}
if (likely(rot)) {
brick_string_free(rot->copy_path);
rot->copy_path = NULL;
}
return kill_any(buf, dent);
}
///////////////////////////////////////////////////////////////////////
// handlers / helpers for logfile rotation
@ -1626,14 +1624,23 @@ int make_log_init(void *buf, struct mars_dent *dent)
CHECK_PTR(parent_path, done);
if (!rot) {
const char *copy_path;
rot = brick_zmem_alloc(sizeof(struct mars_rotate));
parent->d_private = rot;
if (!rot) {
if (unlikely(!rot)) {
MARS_ERR("cannot allocate rot structure\n");
status = -ENOMEM;
goto done;
}
copy_path = path_make("%s/logfile-update", parent_path);
if (unlikely(!copy_path)) {
MARS_ERR("cannot create copy_path\n");
brick_mem_free(rot);
status = -ENOMEM;
goto done;
}
rot->copy_path = copy_path;
rot->global = global;
parent->d_private = rot;
}
rot->replay_link = NULL;
@ -2332,6 +2339,7 @@ int make_log_finalize(struct mars_global *global, struct mars_dent *dent)
struct mars_dent *parent = dent->d_parent;
struct mars_rotate *rot;
struct trans_logger_brick *trans_brick;
struct copy_brick *copy_brick;
int status = -EINVAL;
CHECK_PTR(parent, err);
@ -2344,6 +2352,21 @@ int make_log_finalize(struct mars_global *global, struct mars_dent *dent)
goto done;
}
// check whether some copy has finished
copy_brick = (struct copy_brick*)mars_find_brick(global, &copy_brick_type, rot->copy_path);
MARS_DBG("copy_path = '%s' copy_brick = %p\n", rot->copy_path, copy_brick);
if (copy_brick && (copy_brick->copy_last == copy_brick->copy_end || copy_brick->power.led_off)) {
status = mars_kill_brick((void*)copy_brick);
if (status < 0) {
MARS_ERR("could not kill copy_brick, status = %d\n", status);
goto done;
}
copy_brick = NULL;
}
rot->copy_brick = copy_brick;
if (!copy_brick)
rot->copy_serial = 0;
/* Stopping is also possible in case of errors
*/
if (trans_brick->power.button && trans_brick->power.led_on && !trans_brick->power.led_off) {
@ -3145,7 +3168,7 @@ static const struct light_class light_classes[] = {
#ifdef RUN_LOGINIT
.cl_forward = make_log_init,
#endif
.cl_backward = kill_any,
.cl_backward = kill_log,
},
/* Symlink pointing to the name of the primary node
*/