main: run some additional peer threads

This commit is contained in:
Thomas Schoebel-Theuer 2017-01-03 16:02:33 +01:00 committed by Thomas Schoebel-Theuer
parent 08bf6cb8b5
commit a41c0f8f98
3 changed files with 92 additions and 6 deletions

View File

@ -78,6 +78,12 @@
#include "../mars_usebuf.h"
#endif
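/* Portability: can we use get_random_int() in a module?
 * The presence of UNESCAPE_SPACE in <linux/string_helpers.h> is used
 * below as the feature test for defining HAS_GET_RANDOM_INT.
 */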
#include <linux/string_helpers.h>
#ifdef UNESCAPE_SPACE /* Commit 16c7fa05829e8b91db48e3539c5d6ff3c2b18a23 */
#define HAS_GET_RANDOM_INT
#endif
#define REPLAY_TOLERANCE (PAGE_SIZE + OVERHEAD)
#if 0
@ -152,6 +158,9 @@ EXPORT_SYMBOL_GPL(mars_sync_flip_interval);
int mars_peer_abort = 7;
EXPORT_SYMBOL_GPL(mars_peer_abort);
/* Number of peers currently flagged as doing_additional.
 * Incremented when a peer sets doing_additional, decremented wherever
 * that flag is cleared again.
 * NOTE(review): this is a plain int updated from several code paths
 * without visible locking -- confirm that this is tolerable.
 */
int mars_running_additional_peers = 0;
/* Desired number of additional peers; the main thread requests the
 * difference between this value and mars_running_additional_peers.
 */
int mars_run_additional_peers = 3;
int mars_fast_fullsync =
#ifdef CONFIG_MARS_FAST_FULLSYNC
1
@ -1659,7 +1668,7 @@ done:
// remote workers
static DECLARE_RWSEM(peer_lock);
static int peer_count = 0;
static
struct list_head peer_anchor = LIST_HEAD_INIT(peer_anchor);
@ -1677,6 +1686,8 @@ struct mars_peerinfo {
bool to_remote_trigger;
bool from_remote_trigger;
bool do_communicate;
bool do_additional;
bool doing_additional;
};
static
@ -1699,6 +1710,40 @@ struct mars_peerinfo *find_peer(const char *peer_name)
return res;
}
/*
 * Flag up to @add peers from the global peer list as "additional".
 *
 * @add: number of selection rounds; values <= 0 are a no-op.
 *
 * Each round picks a starting offset (random when get_random_int() is
 * available, otherwise derived from the current time) and walks the
 * list, skipping that many eligible peers before setting
 * do_additional on the next one.  Peers which already have
 * do_communicate or do_additional set are never eligible.
 *
 * A round may select nothing when the offset exceeds the number of
 * eligible peers, so fewer than @add peers may end up flagged.
 * The list is traversed under peer_lock held for reading.
 */
static
void additional_peers(int add)
{
	if (add <= 0)
		return;

	down_read(&peer_lock);
	for (; add > 0; add--) {
		struct list_head *pos;
		int skip = 0;

		/* Approximate equal distribution */
		if (peer_count > 1) {
#ifdef HAS_GET_RANDOM_INT
			skip = get_random_int() % peer_count;
#else
			struct timespec now = CURRENT_TIME;

			skip = (now.tv_sec + now.tv_nsec) % peer_count;
#endif
		}

		for (pos = peer_anchor.next; pos != &peer_anchor; pos = pos->next) {
			struct mars_peerinfo *candidate =
				container_of(pos, struct mars_peerinfo, peer_head);

			/* already in use, either regularly or additionally */
			if (candidate->do_communicate || candidate->do_additional)
				continue;
			/* pass over eligible peers until the offset is used up */
			if (skip) {
				skip--;
				continue;
			}
			candidate->do_additional = true;
			break;
		}
	}
	up_read(&peer_lock);
}
static
void show_peers(void)
{
@ -2072,6 +2117,11 @@ int run_bones(struct mars_peerinfo *peer)
MARS_DBG("remote_dent_list list_empty = %d\n", list_empty(&tmp_list));
if (peer->do_additional && !peer->doing_additional &&
!peer->do_communicate && !list_empty(&tmp_list)) {
peer->doing_additional = true;
mars_running_additional_peers++;
}
for (tmp = tmp_list.next; tmp != &tmp_list; tmp = tmp->next) {
struct mars_dent *remote_dent = container_of(tmp, struct mars_dent, dent_link);
if (!remote_dent->d_path || !remote_dent->d_name) {
@ -2183,6 +2233,9 @@ int peer_thread(void *data)
if (unlikely(status < 0)) {
MARS_INF("no connection to mars module on '%s' (%s) status = %d\n", peer->peer, real_peer, status);
make_msg(peer_pairs, "connection to '%s' (%s) could not be established: status = %d", peer->peer, real_peer, status);
/* additional threads should give up immediately */
if (peer->do_additional)
break;
brick_msleep(2000);
continue;
}
@ -2293,6 +2346,13 @@ int peer_thread(void *data)
brick_msleep(100);
if (!brick_thread_should_stop()) {
if (peer->do_additional && !peer->do_communicate) {
if (mars_running_additional_peers > mars_run_additional_peers)
break;
pause_time += 30;
if (pause_time > 600)
pause_time = 600;
}
if (pause_time < mars_propagate_interval)
pause_time++;
wait_event_interruptible_timeout(remote_event,
@ -2304,6 +2364,9 @@ int peer_thread(void *data)
free_and_restart:
mars_free_dent_all(NULL, &tmp_global.dent_anchor);
/* additional threads should give up immediately */
if (peer->do_additional && !peer->do_communicate)
break;
brick_msleep(2000);
}
@ -2312,6 +2375,11 @@ int peer_thread(void *data)
make_msg(peer_pairs, "NOT connected %s(%s)", peer->peer, real_peer);
show_vals(peer_pairs, "/mars", "connection-from-");
peer->do_additional = false;
if (peer->doing_additional) {
peer->doing_additional = false;
mars_running_additional_peers--;
}
if (do_kill) {
_peer_cleanup(peer);
}
@ -2409,13 +2477,18 @@ void activate_peer(struct mars_rotate *rot, const char *peer_name)
{
struct mars_peerinfo *peer;
if (rot->peer_activated)
if (rot->peer_activated || !peer_name)
return;
peer = find_peer(peer_name);
if (peer) {
peer->do_communicate = true;
rot->peer_activated = true;
peer->do_additional = false;
if (peer->doing_additional) {
peer->doing_additional = false;
mars_running_additional_peers--;
}
}
}
@ -2429,6 +2502,8 @@ static int _kill_peer(struct mars_global *global, struct mars_peerinfo *peer)
}
down_write(&peer_lock);
if (!list_empty(&peer->peer_head))
peer_count--;
list_del_init(&peer->peer_head);
up_write(&peer_lock);
@ -2436,11 +2511,17 @@ static int _kill_peer(struct mars_global *global, struct mars_peerinfo *peer)
if (peer->peer_thread) {
brick_thread_stop(peer->peer_thread);
peer->peer_thread = NULL;
peer->do_communicate = false;
peer->do_additional = false;
}
traced_lock(&peer->lock, flags);
list_replace_init(&peer->remote_dent_list, &tmp_list);
traced_unlock(&peer->lock, flags);
mars_free_dent_all(NULL, &tmp_list);
if (peer->doing_additional) {
peer->doing_additional = false;
mars_running_additional_peers--;
}
brick_string_free(peer->peer);
brick_string_free(peer->path);
return 0;
@ -2460,7 +2541,6 @@ static int _make_peer(struct mars_global *global, struct mars_dent *dent, char *
struct mars_peerinfo *peer;
char *mypeer;
char *parent_path;
bool do_start;
int status = 0;
if (!global || !dent || !dent->new_link || !dent->d_parent || !(parent_path = dent->d_parent->d_path)) {
@ -2496,14 +2576,13 @@ static int _make_peer(struct mars_global *global, struct mars_dent *dent, char *
down_write(&peer_lock);
list_add_tail(&peer->peer_head, &peer_anchor);
peer_count++;
up_write(&peer_lock);
}
peer = dent->d_private;
// check whether we are participating at some resource
do_start = peer->do_communicate;
// create communication thread when necessary
if (!peer->peer_thread && do_start) {
if (!peer->peer_thread && (peer->do_communicate | peer->do_additional)) {
peer->peer_thread = brick_thread_create(peer_thread, peer, "mars_peer%d", serial++);
if (unlikely(!peer->peer_thread)) {
MARS_ERR("cannot start peer thread\n");
@ -2911,6 +2990,8 @@ int make_log_init(void *buf, struct mars_dent *dent)
brick_string_free(rot->preferred_peer);
rot->preferred_peer = NULL;
activate_peer(rot, dent->d_rest);
if (dent->new_link)
sscanf(dent->new_link, "%lld", &rot->dev_size);
if (!rot->parent_path) {
@ -5699,6 +5780,7 @@ static int _main_thread(void *data)
wait_event_interruptible_timeout(_global.main_event, _global.main_trigger, mars_scan_interval * HZ);
_global.main_trigger = false;
additional_peers(mars_run_additional_peers - mars_running_additional_peers);
}
done:

View File

@ -334,6 +334,8 @@ struct ctl_table mars_table[] = {
INT_ENTRY("scan_interval_sec", mars_scan_interval, 0600),
INT_ENTRY("propagate_interval_sec", mars_propagate_interval, 0600),
INT_ENTRY("sync_flip_interval_sec", mars_sync_flip_interval, 0600),
INT_ENTRY("additional_peers_running", mars_running_additional_peers, 0400),
INT_ENTRY("additional_peers_to_run", mars_run_additional_peers, 0600),
INT_ENTRY("peer_abort", mars_peer_abort, 0600),
INT_ENTRY("client_abort", mars_client_abort, 0600),
INT_ENTRY("do_fast_fullsync", mars_fast_fullsync, 0600),

View File

@ -55,6 +55,8 @@ extern int mars_rollover_interval;
extern int mars_scan_interval;
extern int mars_propagate_interval;
extern int mars_sync_flip_interval;
extern int mars_running_additional_peers;
extern int mars_run_additional_peers;
extern int mars_peer_abort;
extern int mars_emergency_mode;
extern int mars_reset_emergency;