server: fix races, completely separate server bricks from main bricks

This commit is contained in:
Thomas Schoebel-Theuer 2013-01-22 12:13:09 +01:00
parent 34dac0689b
commit 03803eeea4
5 changed files with 114 additions and 161 deletions

View File

@ -22,8 +22,6 @@
static struct mars_socket server_socket = {};
static struct task_struct *server_thread = NULL;
static LIST_HEAD(server_list);
static spinlock_t server_lock = __SPIN_LOCK_UNLOCKED(server_lock);
atomic_t server_handler_count = ATOMIC_INIT(0);
EXPORT_SYMBOL_GPL(server_handler_count);
@ -40,13 +38,13 @@ int cb_thread(void *data)
bool ok = mars_get_socket(sock);
int status = -EINVAL;
brick->cb_running = ok;
wake_up_interruptible(&brick->startup_event);
MARS_DBG("--------------- cb_thread starting on socket #%d, ok = %d\n", sock->s_debug_nr, ok);
if (!ok)
goto done;
brick->cb_running = true;
wake_up_interruptible(&brick->startup_event);
while (!brick_thread_should_stop() || !list_empty(&brick->cb_read_list) || !list_empty(&brick->cb_write_list) || atomic_read(&brick->in_flight) > 0) {
struct server_mref_aspect *mref_a;
struct mref_object *mref;
@ -96,11 +94,12 @@ int cb_thread(void *data)
GENERIC_INPUT_CALL(brick->inputs[0], mref_put, mref);
atomic_dec(&brick->in_flight);
}
mars_shutdown_socket(sock);
mars_put_socket(sock);
done:
brick->cb_running = false;
MARS_DBG("---------- cb_thread terminating, status = %d\n", status);
if (ok)
mars_put_socket(sock);
wake_up_interruptible(&brick->startup_event);
return status;
}
@ -148,7 +147,7 @@ int server_io(struct server_brick *brick, struct mars_socket *sock, struct mars_
int amount;
int status = -ENOTRECOVERABLE;
if (!brick->cb_running || !mars_socket_is_alive(sock))
if (!brick->cb_running || !brick->handler_running || !mars_socket_is_alive(sock))
goto done;
mref = server_alloc_mref(brick);
@ -268,18 +267,6 @@ int _set_server_bio_params(struct mars_brick *_brick, void *private)
return 1;
}
static
struct task_struct *_grab_handler(struct server_brick *brick)
{
struct task_struct *res;
spin_lock(&server_lock);
list_del_init(&brick->server_link);
res = brick->handler_thread;
brick->handler_thread = NULL;
spin_unlock(&server_lock);
return res;
}
static
int dummy_worker(struct mars_global *global, struct mars_dent *dent, bool prepare, bool direction)
{
@ -291,18 +278,21 @@ int handler_thread(void *data)
{
struct server_brick *brick = data;
struct mars_socket *sock = &brick->handler_socket;
struct task_struct *cb_thread = brick->cb_thread;
bool ok = mars_get_socket(sock);
int debug_nr;
int status = 0;
brick->cb_thread = NULL;
brick->self_shutdown = true;
wake_up_interruptible(&brick->startup_event);
int status = -EINVAL;
MARS_DBG("#%d --------------- handler_thread starting on socket %p\n", sock->s_debug_nr, sock);
while (brick->cb_running && !brick_thread_should_stop() && mars_socket_is_alive(sock)) {
if (!ok)
goto done;
brick->handler_running = true;
wake_up_interruptible(&brick->startup_event);
while (!brick_thread_should_stop() && mars_socket_is_alive(sock)) {
struct mars_cmd cmd = {};
status = -EINTR;
if (unlikely(!mars_global || !mars_global->global_power.button)) {
MARS_DBG("system is not alive\n");
break;
@ -311,11 +301,12 @@ int handler_thread(void *data)
status = mars_recv_struct(sock, &cmd, mars_cmd_meta);
if (unlikely(status < 0)) {
MARS_WRN("#%d recv cmd status = %d\n", sock->s_debug_nr, status);
break;
goto clean;
}
if (unlikely(!mars_socket_is_alive(sock))) {
MARS_WRN("#%d is dead\n", sock->s_debug_nr);
break;
status = -EINTR;
goto clean;
}
MARS_IO("#%d cmd = %d\n", sock->s_debug_nr, cmd.cmd_code);
@ -349,7 +340,6 @@ int handler_thread(void *data)
struct mars_global local = {
.dent_anchor = LIST_HEAD_INIT(local.dent_anchor),
.brick_anchor = LIST_HEAD_INIT(local.brick_anchor),
.server_anchor = LIST_HEAD_INIT(local.server_anchor),
.global_power = {
.button = true,
},
@ -383,16 +373,15 @@ int handler_thread(void *data)
status = -EINVAL;
CHECK_PTR(path, err);
CHECK_PTR_NULL(mars_global, err);
CHECK_PTR_NULL(_bio_brick_type, err);
if (!mars_global->global_power.button) {
if (!brick->global || !mars_global || !mars_global->global_power.button) {
MARS_WRN("#%d system is not alive\n", sock->s_debug_nr);
goto err;
}
prev = make_brick_all(
mars_global,
brick->global,
NULL,
true,
_set_server_bio_params,
@ -442,50 +431,26 @@ int handler_thread(void *data)
default:
MARS_ERR("#%d unknown command %d\n", sock->s_debug_nr, cmd.cmd_code);
}
clean:
brick_string_free(cmd.cmd_str1);
if (status < 0)
break;
}
mars_shutdown_socket(sock);
mars_put_socket(sock);
done:
MARS_DBG("#%d handler_thread terminating, status = %d\n", sock->s_debug_nr, status);
if (cb_thread) {
MARS_INF("#%d stopping cb thread...\n", sock->s_debug_nr);
brick_thread_stop(cb_thread);
}
_clean_list(brick, &brick->cb_read_list);
_clean_list(brick, &brick->cb_write_list);
debug_nr = sock->s_debug_nr;
/* Normally, the brick should be shut down from outside.
* In case the handler thread stops abnormally (e.g.
* shutdown of socket etc), it has to cleanup itself.
* This is an exception to the basic rule of instance orientation
* that execution logic should be cleanly separated from strategy
* logic.
* So be careful, avoid races by use of _grab_handler().
*/
if (brick->self_shutdown) {
struct task_struct *h_thread;
MARS_DBG("#%d self-shutdown\n", debug_nr);
h_thread = _grab_handler(brick);
mars_put_socket(sock);
if (h_thread) {
int status;
MARS_DBG("#%d self cleanup...\n", debug_nr);
status = mars_kill_brick((void*)brick);
if (status < 0) {
MARS_ERR("#%d kill status = %d, giving up\n", debug_nr, status);
}
put_task_struct(h_thread);
}
}
MARS_DBG("#%d done.\n", debug_nr);
atomic_dec(&server_handler_count);
brick->killme = true;
return status;
}
@ -517,18 +482,20 @@ static void server_ref_io(struct server_output *output, struct mref_object *mref
static int server_switch(struct server_brick *brick)
{
struct mars_socket *sock = &brick->handler_socket;
int status = 0;
if (brick->power.button) {
static int version = 0;
bool ok = mars_get_socket(sock);
if (unlikely(!ok)) {
status = -ENOENT;
goto err;
}
mars_power_led_off((void*)brick, false);
MARS_INF("starting.....\n");
spin_lock(&server_lock);
list_add(&brick->server_link, &server_list);
spin_unlock(&server_lock);
brick->cb_thread = brick_thread_create(cb_thread, brick, "mars_cb%d", version);
if (unlikely(!brick->cb_thread)) {
MARS_ERR("cannot create cb thread\n");
@ -540,29 +507,36 @@ static int server_switch(struct server_brick *brick)
if (unlikely(!brick->handler_thread)) {
MARS_ERR("cannot create handler thread\n");
brick_thread_stop(brick->cb_thread);
brick->cb_thread = NULL;
status = -ENOENT;
goto err;
}
wait_event_interruptible(brick->startup_event, brick->cb_thread == NULL);
err:
if (status >= 0) {
mars_power_led_on((void*)brick, true);
}
} else {
mars_power_led_on((void*)brick, true);
} else if (brick->power.led_on) {
struct task_struct *thread;
struct mars_socket *sock = &brick->handler_socket;
mars_power_led_on((void*)brick, false);
mars_shutdown_socket(sock);
thread = _grab_handler(brick);
thread = brick->handler_thread;
if (thread) {
brick->handler_thread = NULL;
MARS_INF("stopping handler thread....\n");
brick->handler_running = false;
MARS_DBG("#%d stopping handler thread....\n", sock->s_debug_nr);
brick_thread_stop(thread);
if (sock->s_socket)
mars_put_socket(sock);
}
thread = brick->cb_thread;
if (thread) {
brick->cb_thread = NULL;
brick->cb_running = false;
MARS_DBG("#%d stopping callback thread....\n", sock->s_debug_nr);
brick_thread_stop(thread);
}
mars_put_socket(sock);
MARS_DBG("#%d socket s_count = %d\n", sock->s_debug_nr, atomic_read(&sock->s_count));
mars_power_led_off((void*)brick, true);
}
err:
if (unlikely(status < 0)) {
mars_power_led_off((void*)brick, true);
}
return status;
@ -579,10 +553,10 @@ char *server_statistics(struct server_brick *brick, int verbose)
snprintf(res, 1024,
"cb_running = %d "
"self_shutdown = %d "
"handler_running = %d "
"in_flight = %d\n",
brick->cb_running,
brick->self_shutdown,
brick->handler_running,
atomic_read(&brick->in_flight));
return res;
@ -614,7 +588,6 @@ MARS_MAKE_STATICS(server);
static int server_brick_construct(struct server_brick *brick)
{
INIT_LIST_HEAD(&brick->server_link);
init_waitqueue_head(&brick->startup_event);
init_waitqueue_head(&brick->cb_event);
sema_init(&brick->socket_sem, 1);
@ -626,9 +599,6 @@ static int server_brick_construct(struct server_brick *brick)
static int server_brick_destruct(struct server_brick *brick)
{
spin_lock(&server_lock);
list_del_init(&brick->server_link);
spin_unlock(&server_lock);
CHECK_HEAD_EMPTY(&brick->cb_read_list);
CHECK_HEAD_EMPTY(&brick->cb_write_list);
return 0;
@ -694,10 +664,20 @@ EXPORT_SYMBOL_GPL(server_brick_type);
static int _server_thread(void *data)
{
struct mars_global server_global = {
.dent_anchor = LIST_HEAD_INIT(server_global.dent_anchor),
.brick_anchor = LIST_HEAD_INIT(server_global.brick_anchor),
.global_power = {
.button = true,
},
.main_event = __WAIT_QUEUE_HEAD_INITIALIZER(server_global.main_event),
};
char *id = my_id();
struct server_brick *brick = NULL;
int status = 0;
init_rwsem(&server_global.dent_mutex);
init_rwsem(&server_global.brick_mutex);
MARS_INF("-------- server starting on host '%s' ----------\n", id);
while (!brick_thread_should_stop() &&
@ -708,19 +688,30 @@ static int _server_thread(void *data)
MARS_INF("-------- server now working on host '%s' ----------\n", id);
while (!brick_thread_should_stop() && mars_global && mars_global->global_power.button) {
while (!brick_thread_should_stop() || !list_empty(&server_global.brick_anchor)) {
struct server_brick *brick = NULL;
struct mars_socket handler_socket = {};
if (!brick) {
brick = (void*)mars_make_brick(mars_global, NULL, true, &server_brick_type, "server", "server");
if (!brick) {
MARS_ERR("cannot create server instance\n");
brick_msleep(5000);
continue;
}
server_global.global_version++;
show_statistics(&server_global, "server");
status = mars_kill_brick_when_possible(&server_global, &server_global.brick_anchor, false, (void*)&server_brick_type, false);
MARS_DBG("kill server bricks (when possible) = %d\n", status);
status = mars_kill_brick_when_possible(&server_global, &server_global.brick_anchor, false, (void*)&bio_brick_type, false);
MARS_DBG("kill server bio bricks (when possible) = %d\n", status);
status = mars_kill_brick_when_possible(&server_global, &server_global.brick_anchor, false, (void*)&aio_brick_type, false);
MARS_DBG("kill server aio bricks (when possible) = %d\n", status);
status = mars_kill_brick_when_possible(&server_global, &server_global.brick_anchor, false, (void*)&sio_brick_type, false);
MARS_DBG("kill server sio bricks (when possible) = %d\n", status);
if (!mars_global || !mars_global->global_power.button) {
brick_msleep(2000);
continue;
}
status = mars_accept_socket(&brick->handler_socket, &server_socket);
if (unlikely(status < 0)) {
status = mars_accept_socket(&handler_socket, &server_socket);
if (unlikely(status < 0 || !mars_socket_is_alive(&handler_socket))) {
brick_msleep(500);
if (status == -EAGAIN)
continue; // without error message
@ -728,20 +719,25 @@ static int _server_thread(void *data)
brick_msleep(2000);
continue;
}
brick->handler_socket.s_shutdown_on_err = true;
handler_socket.s_shutdown_on_err = true;
MARS_DBG("got new connection #%d\n", brick->handler_socket.s_debug_nr);
MARS_DBG("got new connection #%d\n", handler_socket.s_debug_nr);
brick = (void*)mars_make_brick(&server_global, NULL, true, &server_brick_type, "handler", "handler");
if (!brick) {
MARS_ERR("cannot create server instance\n");
mars_shutdown_socket(&handler_socket);
mars_put_socket(&handler_socket);
brick_msleep(2000);
continue;
}
memcpy(&brick->handler_socket, &handler_socket, sizeof(struct mars_socket));
atomic_inc(&server_handler_count);
/* TODO: check authorization.
*/
if (!mars_global || !mars_global->global_power.button || brick_thread_should_stop()) {
MARS_DBG("system is not alive\n");
goto err;
}
brick->power.button = true;
status = server_switch(brick);
if (unlikely(status < 0)) {
@ -749,12 +745,19 @@ static int _server_thread(void *data)
goto err;
}
// further references are usually held by the threads
mars_put_socket(&brick->handler_socket);
/* fire and forget....
* the new instance is now responsible for itself.
*/
brick = NULL;
brick_msleep(1000);
brick_msleep(100);
continue;
err:
if (brick) {
mars_shutdown_socket(&brick->handler_socket);
mars_put_socket(&brick->handler_socket);
status = mars_kill_brick((void*)brick);
if (status < 0) {
@ -768,40 +771,8 @@ static int _server_thread(void *data)
MARS_INF("-------- cleaning up ----------\n");
if (brick) {
MARS_DBG("cleaning up unused brick\n");
#if 0 // FIXME
if (brick->handler_socket.s_socket) {
MARS_DBG("put socket\n");
mars_put_socket(&brick->handler_socket);
}
status = mars_kill_brick((void*)brick);
if (unlikely(status < 0)) {
BRICK_WRN("kill status = %d, giving up\n", status);
}
#endif
brick = NULL;
}
mars_kill_brick_all(&server_global, &server_global.brick_anchor, false);
spin_lock(&server_lock);
while (!list_empty(&server_list)) {
struct list_head *tmp = server_list.next;
struct server_brick *brick = container_of(tmp, struct server_brick, server_link);
list_del_init(tmp);
brick->self_shutdown = false;
spin_unlock(&server_lock);
MARS_INF("cleanup ....\n");
status = mars_kill_brick((void*)brick);
if (status < 0) {
BRICK_ERR("kill status = %d, giving up\n", status);
}
spin_lock(&server_lock);
}
spin_unlock(&server_lock);
//cleanup_mm();
MARS_INF("-------- done status = %d ----------\n", status);

View File

@ -23,7 +23,6 @@ struct server_output {
struct server_brick {
MARS_BRICK(server);
struct list_head server_link;
atomic_t in_flight;
struct semaphore socket_sem;
struct mars_socket handler_socket;
@ -35,7 +34,7 @@ struct server_brick {
struct list_head cb_read_list;
struct list_head cb_write_list;
bool cb_running;
bool self_shutdown;
bool handler_running;
};
struct server_input {

View File

@ -3964,7 +3964,6 @@ err:
static struct mars_global _global = {
.dent_anchor = LIST_HEAD_INIT(_global.dent_anchor),
.brick_anchor = LIST_HEAD_INIT(_global.brick_anchor),
.server_anchor = LIST_HEAD_INIT(_global.server_anchor),
.global_power = {
.button = true,
},
@ -3988,7 +3987,7 @@ static int light_thread(void *data)
MARS_INF("-------- starting as host '%s' ----------\n", id);
while (_global.global_power.button || !list_empty(&_global.brick_anchor) || atomic_read(&server_handler_count) > 0) {
while (_global.global_power.button || !list_empty(&_global.brick_anchor)) {
int status;
loff_t rest_space;
bool exhausted;
@ -4027,12 +4026,6 @@ static int light_thread(void *data)
rest_space = _global.remaining_space - EXHAUSTED_LIMIT(_global.total_space);
_make_alivelink("rest-space", rest_space);
if (!_global.global_power.button && is_shutdown()) {
MARS_INF("global shutdown of all bricks...\n");
brick_msleep(500);
mars_kill_brick_all(&_global, &_global.server_anchor, false);
}
MARS_DBG("-------- start worker ---------\n");
_global.deleted_min = 0;
status = mars_dent_work(&_global, "/mars", sizeof(struct mars_dent), light_checker, light_worker, &_global, 3);
@ -4049,10 +4042,6 @@ static int light_thread(void *data)
MARS_DBG("kill aio bricks (when possible) = %d\n", status);
status = mars_kill_brick_when_possible(&_global, &_global.brick_anchor, false, (void*)&sio_brick_type, false);
MARS_DBG("kill sio bricks (when possible) = %d\n", status);
status = mars_kill_brick_when_possible(&_global, &_global.server_anchor, false, (void*)&aio_brick_type, false);
MARS_DBG("kill server aio bricks (when possible) = %d\n", status);
status = mars_kill_brick_when_possible(&_global, &_global.server_anchor, false, (void*)&sio_brick_type, false);
MARS_DBG("kill server sio bricks (when possible) = %d\n", status);
if ((long long)jiffies + rollover_time * HZ >= last_rollover) {
last_rollover = jiffies;
@ -4075,7 +4064,6 @@ done:
mars_remote_trigger();
brick_msleep(2000);
mars_kill_brick_all(&_global, &_global.server_anchor, false);
mars_free_dent_all(&_global, &_global.dent_anchor);
mars_kill_brick_all(&_global, &_global.brick_anchor, false);

View File

@ -57,7 +57,6 @@ struct mars_global {
struct generic_switch global_power;
struct list_head dent_anchor;
struct list_head brick_anchor;
struct list_head server_anchor;
wait_queue_head_t main_event;
loff_t total_space;
loff_t remaining_space;

View File

@ -1109,13 +1109,9 @@ struct mars_brick *mars_make_brick(struct mars_global *global, struct mars_dent
* Switching on / etc must be done separately.
*/
down_write(&global->brick_mutex);
if (!is_server) {
list_add(&res->global_brick_link, &global->brick_anchor);
if (belongs) {
list_add_tail(&res->dent_brick_link, &belongs->brick_list);
}
} else {
list_add_tail(&res->global_brick_link, &global->server_anchor);
list_add(&res->global_brick_link, &global->brick_anchor);
if (belongs) {
list_add_tail(&res->dent_brick_link, &belongs->brick_list);
}
up_write(&global->brick_mutex);