diff --git a/brick.c b/brick.c index 9817a5e0..7d4f9d75 100644 --- a/brick.c +++ b/brick.c @@ -857,13 +857,13 @@ int set_recursive_button(struct generic_brick *orig_brick, brick_switch_t mode, /* eliminate duplicates */ \ for (j = 0; j < stack; j++) { \ if (table[j] == (next)) { \ - BRICK_DBG(" double entry %d '%s' stack = %d\n", i, (next)->brick_name, stack); \ + BRICK_DBG(" double entry %d '%s' stack = %d\n", i, SAFE_STR((next)->brick_name), stack); \ found = true; \ break; \ } \ } \ if (!found) { \ - BRICK_DBG(" push '%s' stack = %d\n", (next)->brick_name, stack); \ + BRICK_DBG(" push '%s' stack = %d\n", SAFE_STR((next)->brick_name), stack); \ table[stack++] = (next); \ if (unlikely(stack > max)) { \ BRICK_ERR("---- max = %d overflow, restarting...\n", max); \ @@ -873,7 +873,7 @@ int set_recursive_button(struct generic_brick *orig_brick, brick_switch_t mode, } restart: - BRICK_DBG("-> orig_brick = '%s'\n", orig_brick->brick_name); + BRICK_DBG("-> orig_brick = '%s'\n", SAFE_STR(orig_brick->brick_name)); brick_mem_free(table); max <<= 1; table = brick_mem_alloc(max * sizeof(void*)); @@ -888,7 +888,7 @@ int set_recursive_button(struct generic_brick *orig_brick, brick_switch_t mode, for (pos = 0; pos < stack; pos++) { struct generic_brick *brick = table[pos]; - BRICK_DBG("--> pos = %d stack = %d brick = '%s' inputs = %d/%d outputs = %d/%d\n", pos, stack, brick->brick_name, brick->nr_inputs, brick->type->max_inputs, brick->nr_outputs, brick->type->max_outputs); + BRICK_DBG("--> pos = %d stack = %d brick = '%s' inputs = %d/%d outputs = %d/%d\n", pos, stack, SAFE_STR(brick->brick_name), brick->nr_inputs, brick->type->max_inputs, brick->nr_outputs, brick->type->max_outputs); if (val) { force = false; @@ -945,19 +945,19 @@ int set_recursive_button(struct generic_brick *orig_brick, brick_switch_t mode, while (stack > 0) { struct generic_brick *brick = table[--stack]; - BRICK_DBG("--> switch '%s' stack = %d\n", brick->brick_name, stack); + BRICK_DBG("--> switch '%s' stack = %d\n", SAFE_STR(brick->brick_name), stack); set_button_wait(brick, val, force, timeout); if (val ? !brick->power.led_on : !brick->power.led_off) { - BRICK_DBG("switching to %d: brick '%s' not ready (%s)\n", val, brick->brick_name, orig_brick->brick_name); + BRICK_DBG("switching to %d: brick '%s' not ready (%s)\n", val, SAFE_STR(brick->brick_name), SAFE_STR(orig_brick->brick_name)); goto done; } if (force && !val && (mode == BR_FREE_ONE || mode == BR_FREE_ALL) && brick->free) { - BRICK_DBG("---> freeing '%s'\n", brick->brick_name); + BRICK_DBG("---> freeing '%s'\n", SAFE_STR(brick->brick_name)); status = brick->free(brick); - BRICK_DBG("---> freeing '%s' status = %d\n", brick->brick_name, status); + BRICK_DBG("---> freeing '%s' status = %d\n", SAFE_STR(brick->brick_name), status); if (status < 0) { - BRICK_DBG("freeing brick '%s' (%s) failed, status = %d\n", brick->brick_name, orig_brick->brick_name, status); + BRICK_DBG("freeing brick '%s' (%s) failed, status = %d\n", SAFE_STR(brick->brick_name), SAFE_STR(orig_brick->brick_name), status); goto done; } } @@ -966,7 +966,7 @@ int set_recursive_button(struct generic_brick *orig_brick, brick_switch_t mode, status = 0; done: - BRICK_DBG("-> done '%s' status = %d\n", orig_brick->brick_name, status); + BRICK_DBG("-> done '%s' status = %d\n", SAFE_STR(orig_brick->brick_name), status); brick_mem_free(table); return status; } diff --git a/brick.h b/brick.h index 6a78c240..774c6b2e 100644 --- a/brick.h +++ b/brick.h @@ -11,6 +11,26 @@ #include "meta.h" +#define MAX_BRICK_TYPES 64 +#ifndef BRICK_OBJ_MAX +#define BRICK_OBJ_MAX /*empty => leads to an open array */ +#endif + +extern int brick_layout_generation; +extern int brick_obj_max; + +#ifdef _STRATEGY +#define _STRATEGY_CODE(X) X +#define _NORMAL_CODE(X) /**/ +#else +#define _STRATEGY_CODE(X) /**/ +#define _NORMAL_CODE(X) X +#endif + +///////////////////////////////////////////////////////////////////////// + +// printk() replacements + #ifdef CONFIG_DEBUG_KERNEL #define INLINE static inline //#define INLINE __attribute__((__noinline__)) @@ -27,13 +47,7 @@ extern void brick_dump_stack(void); #endif // CONFIG_DEBUG_KERNEL -#ifdef _STRATEGY -#define _STRATEGY_CODE(X) X -#define _NORMAL_CODE(X) /**/ -#else -#define _STRATEGY_CODE(X) /**/ -#define _NORMAL_CODE(X) X -#endif +#define SAFE_STR(str) ((str) ? (str) : "NULL") #define BRICK_FATAL "BRICK_FATAL " #define BRICK_ERROR "BRICK_ERROR " @@ -62,14 +76,6 @@ extern void brick_dump_stack(void); #define BRICK_IO(_args...) /*empty*/ #endif -#define MAX_BRICK_TYPES 64 -#ifndef BRICK_OBJ_MAX -#define BRICK_OBJ_MAX /*empty => leads to an open array */ -#endif - -extern int brick_layout_generation; -extern int brick_obj_max; - ///////////////////////////////////////////////////////////////////////// // number management helpers diff --git a/brick_mem.c b/brick_mem.c index fd5b5916..38af939d 100644 --- a/brick_mem.c +++ b/brick_mem.c @@ -3,6 +3,7 @@ #include #include #include +#include #include @@ -27,6 +28,18 @@ ///////////////////////////////////////////////////////////////////////// +// limit handling + +#define LIMIT_MEM +#ifdef LIMIT_MEM +#include +#include +#endif +long long brick_global_memlimit = 0; +EXPORT_SYMBOL_GPL(brick_global_memlimit); + +///////////////////////////////////////////////////////////////////////// + // small memory allocation (use this only for len < PAGE_SIZE) #ifdef BRICK_DEBUG_MEM @@ -40,7 +53,11 @@ static int mem_len[BRICK_DEBUG_MEM] = {}; void *_brick_mem_alloc(int len, int line) { - void *res = kmalloc(len + PLUS_SIZE + sizeof(int), GFP_BRICK); + void *res; +#ifdef CONFIG_DEBUG_KERNEL + might_sleep(); +#endif + res = kmalloc(len + PLUS_SIZE + sizeof(int), GFP_BRICK); #ifdef BRICK_DEBUG_MEM if (likely(res)) { if (unlikely(line < 0)) @@ -96,6 +113,9 @@ static atomic_t string_free[BRICK_DEBUG_MEM] = {}; char *_brick_string_alloc(int len, int line) { char *res; +#ifdef CONFIG_DEBUG_KERNEL + might_sleep(); +#endif #ifdef FIXME if (len <= 0) len = BRICK_STRING_LEN; @@ -167,6 +187,9 @@ void *_brick_block_alloc(loff_t pos, int len, int line) return NULL; } #endif +#ifdef CONFIG_DEBUG_KERNEL + might_sleep(); +#endif #ifdef USE_OFFSET offset = pos & (PAGE_SIZE-1); #endif @@ -291,6 +314,11 @@ EXPORT_SYMBOL_GPL(brick_mem_statistics); int __init init_brick_mem(void) { +#ifdef LIMIT_MEM // provisionary + brick_global_memlimit = total_swapcache_pages * (PAGE_SIZE / 4); + BRICK_INF("brick_global_memlimit = %lld\n", brick_global_memlimit); +#endif + return 0; } diff --git a/brick_mem.h b/brick_mem.h index c29cb80f..579ac2ef 100644 --- a/brick_mem.h +++ b/brick_mem.h @@ -7,6 +7,8 @@ #define GFP_BRICK GFP_NOIO //#define GFP_BRICK GFP_KERNEL // can lead to deadlocks! +extern long long brick_global_memlimit; + ///////////////////////////////////////////////////////////////////////// // small memory allocation (use this only for len < PAGE_SIZE) diff --git a/mars.h b/mars.h index 50ce04bf..c131e138 100644 --- a/mars.h +++ b/mars.h @@ -9,8 +9,6 @@ #define msleep msleep_interruptible -extern long long mars_global_memlimit; - ///////////////////////////////////////////////////////////////////////// // include the generic brick infrastructure @@ -309,11 +307,14 @@ extern const struct meta mars_timespec_meta[]; MARS_ERR("%d: list_head " #head " (%p) not empty\n", __LINE__, head); \ } \ -#define CHECK_PTR(ptr,label) \ +#define CHECK_PTR_NULL(ptr,label) \ if (CHECKING && unlikely(!(ptr))) { \ MARS_FAT("%d: ptr '" #ptr "' is NULL\n", __LINE__); \ goto label; \ - } \ + } + +#define CHECK_PTR(ptr,label) \ + CHECK_PTR_NULL(ptr, label); \ if (CHECKING && unlikely(!virt_addr_valid(ptr))) { \ MARS_FAT("%d: ptr '" #ptr "' is no valid virtual KERNEL address\n", __LINE__); \ goto label; \ diff --git a/mars_aio.c b/mars_aio.c index e4ff24ff..c62a8258 100644 --- a/mars_aio.c +++ b/mars_aio.c @@ -616,6 +616,22 @@ static int aio_event_thread(void *data) return err; } +#if 1 +/* This should go to fs/open.c (as long as vfs_submit() is not implemented) + */ +#include +void fd_uninstall(unsigned int fd) +{ + struct files_struct *files = current->files; + struct fdtable *fdt; + spin_lock(&files->file_lock); + fdt = files_fdtable(files); + rcu_assign_pointer(fdt->fd[fd], NULL); + spin_unlock(&files->file_lock); +} +EXPORT_SYMBOL(fd_uninstall); +#endif + static int aio_submit_thread(void *data) { struct aio_threadinfo *tinfo = data; @@ -623,8 +639,9 @@ static int aio_submit_thread(void *data) struct file *file = output->filp; int err; - /* TODO: this is provisionary. We only need it for sys_io_submit(). - * The latter should be accompanied by a future vfs_submit() or + /* TODO: this is provisionary. We only need it for sys_io_submit() + * which uses userspace concepts like file handles. + * This should be accompanied by a future kernelsapce vfs_submit() or * do_submit() which currently does not exist :( * FIXME: corresponding cleanup NYI */ @@ -759,7 +776,8 @@ static int aio_submit_thread(void *data) output->ctxp = 0; } #endif - MARS_INF("destroying fd.....\n"); + MARS_INF("destroying fd %d\n", output->fd); + fd_uninstall(output->fd); put_unused_fd(output->fd); unuse_fake_mm(); err = 0; diff --git a/mars_client.c b/mars_client.c index 1e5e5254..3d716a56 100644 --- a/mars_client.c +++ b/mars_client.c @@ -21,22 +21,26 @@ static int thread_count = 0; +static void _kill_thread(struct client_threadinfo *ti) +{ + if (ti->thread) { + MARS_INF("stopping thread...\n"); + kthread_stop(ti->thread); + ti->thread = NULL; + } +} + static void _kill_socket(struct client_output *output) { if (output->socket) { MARS_DBG("shutdown socket\n"); - kernel_sock_shutdown(output->socket, SHUT_WR); - //sock_release(output->socket); - output->socket = NULL; + mars_shutdown_socket(output->socket); } -} - -static void _kill_thread(struct client_threadinfo *ti) -{ - if (ti->thread && !ti->terminated) { - MARS_INF("stopping thread...\n"); - kthread_stop(ti->thread); - ti->thread = NULL; + _kill_thread(&output->receiver); + if (output->socket) { + MARS_DBG("close socket\n"); + mars_put_socket(output->socket); + output->socket = NULL; } } @@ -48,19 +52,21 @@ static int _request_info(struct client_output *output) int status; MARS_DBG("\n"); - status = mars_send_struct(&output->socket, &cmd, mars_cmd_meta); + status = mars_send_struct(output->socket, &cmd, mars_cmd_meta); if (unlikely(status < 0)) { MARS_DBG("send of getinfo failed, status = %d\n", status); } return status; } +static int receiver_thread(void *data); + static int _connect(struct client_output *output, const char *str) { struct sockaddr_storage sockaddr = {}; int status; - if (!output->path) { + if (unlikely(!output->path)) { output->path = brick_strdup(str); status = -ENOMEM; if (!output->path) { @@ -78,14 +84,18 @@ static int _connect(struct client_output *output, const char *str) *output->host++ = '\0'; } + _kill_socket(output); + status = mars_create_sockaddr(&sockaddr, output->host); if (unlikely(status < 0)) { MARS_DBG("no sockaddr, status = %d\n", status); goto done; } - status = mars_create_socket(&output->socket, &sockaddr, false); - if (unlikely(status < 0)) { + output->socket = mars_create_socket(&sockaddr, false); + if (unlikely(IS_ERR(output->socket))) { + status = PTR_ERR(output->socket); + output->socket = NULL; if (status == -EINPROGRESS) { MARS_DBG("operation is in progress....\n"); goto really_done; // give it a chance next time @@ -94,13 +104,24 @@ static int _connect(struct client_output *output, const char *str) goto done; } + output->receiver.thread = kthread_create(receiver_thread, output, "mars_receiver%d", thread_count++); + if (unlikely(IS_ERR(output->receiver.thread))) { + status = PTR_ERR(output->receiver.thread); + MARS_ERR("cannot start receiver thread, status = %d\n", status); + output->receiver.thread = NULL; + output->receiver.terminated = true; + goto done; + } + wake_up_process(output->receiver.thread); + + { struct mars_cmd cmd = { .cmd_code = CMD_CONNECT, .cmd_str1 = output->path, }; - status = mars_send_struct(&output->socket, &cmd, mars_cmd_meta); + status = mars_send_struct(output->socket, &cmd, mars_cmd_meta); if (unlikely(status < 0)) { MARS_DBG("send of connect failed, status = %d\n", status); goto done; @@ -239,7 +260,7 @@ int receiver_thread(void *data) struct client_output *output = data; int status = 0; - while (status >= 0 && output->socket && !kthread_should_stop()) { + while (status >= 0 && mars_socket_is_alive(output->socket) && !kthread_should_stop()) { struct mars_cmd cmd = {}; struct list_head *tmp; struct client_mref_aspect *mref_a = NULL; @@ -247,7 +268,7 @@ int receiver_thread(void *data) struct generic_callback *cb; unsigned long flags; - status = mars_recv_struct(&output->socket, &cmd, mars_cmd_meta); + status = mars_recv_struct(output->socket, &cmd, mars_cmd_meta); MARS_IO("got cmd = %d status = %d\n", cmd.cmd_code, status); if (status < 0) goto done; @@ -291,7 +312,7 @@ int receiver_thread(void *data) MARS_IO("got callback id = %d, old pos = %lld len = %d rw = %d\n", mref->ref_id, mref->ref_pos, mref->ref_len, mref->ref_rw); - status = mars_recv_cb(&output->socket, mref); + status = mars_recv_cb(output->socket, mref); MARS_IO("new status = %d, pos = %lld len = %d rw = %d\n", status, mref->ref_pos, mref->ref_len, mref->ref_rw); if (status < 0) { MARS_ERR("interrupted data transfer during callback, status = %d\n", status); @@ -313,7 +334,7 @@ int receiver_thread(void *data) break; } case CMD_GETINFO: - status = mars_recv_struct(&output->socket, &output->info, mars_info_meta); + status = mars_recv_struct(output->socket, &output->info, mars_info_meta); if (status < 0) { MARS_ERR("got bad info from remote side, status = %d\n", status); goto done; @@ -333,14 +354,8 @@ int receiver_thread(void *data) if (status < 0) { MARS_ERR("receiver thread terminated with status = %d\n", status); } -#if 0 - if (output->socket) { - MARS_INF("shutting down socket\n"); - kernel_sock_shutdown(output->socket, SHUT_WR); - msleep(1000); - output->socket = NULL; - } -#endif + + mars_shutdown_socket(output->socket); output->receiver.terminated = true; wake_up_interruptible(&output->receiver.run_event); return status; @@ -391,26 +406,6 @@ static int sender_thread(void *data) wait_event_interruptible_timeout(output->event, !list_empty(&output->mref_list) || output->get_info, 10 * HZ); - if (unlikely(output->receiver.terminated)) { -#if 1 - if (unlikely(output->receiver.restart_count++ > 3)) { // don't restart too often - MARS_ERR("receiver failed too often, giving up\n"); - status = -ECOMM; - break; - } -#endif - output->receiver.terminated = false; - output->receiver.thread = kthread_create(receiver_thread, output, "mars_receiver%d", thread_count++); - if (unlikely(IS_ERR(output->receiver.thread))) { - MARS_ERR("cannot start receiver thread, status = %d\n", (int)PTR_ERR(output->receiver.thread)); - output->receiver.thread = NULL; - output->receiver.terminated = true; - msleep(5000); - continue; - } - wake_up_process(output->receiver.thread); - } - if (output->get_info) { status = _request_info(output); if (status >= 0) { @@ -432,7 +427,7 @@ static int sender_thread(void *data) MARS_IO("sending mref, id = %d pos = %lld len = %d rw = %d\n", mref->ref_id, mref->ref_pos, mref->ref_len, mref->ref_rw); - status = mars_send_mref(&output->socket, mref); + status = mars_send_mref(output->socket, mref); MARS_IO("status = %d\n", status); if (unlikely(status < 0)) { // retry submission on next occasion.. @@ -444,19 +439,16 @@ static int sender_thread(void *data) MARS_ERR("sending failed, status = %d\n", status); _kill_socket(output); - _kill_thread(&output->receiver); - - wait_event_interruptible_timeout(output->receiver.run_event, output->receiver.terminated, 10 * HZ); continue; } } //done: - if (status < 0) + if (status < 0) { MARS_ERR("sender thread terminated with status = %d\n", status); + } _kill_socket(output); - _kill_thread(&output->receiver); output->sender.terminated = true; wake_up_interruptible(&output->sender.run_event); diff --git a/mars_client.h b/mars_client.h index 6b19907f..2e9cca05 100644 --- a/mars_client.h +++ b/mars_client.h @@ -38,7 +38,7 @@ struct client_output { struct list_head wait_list; wait_queue_head_t event; int last_id; - struct socket *socket; + struct mars_socket *socket; char *host; char *path; struct client_threadinfo sender; diff --git a/mars_generic.c b/mars_generic.c index c5bd5097..aefbde73 100644 --- a/mars_generic.c +++ b/mars_generic.c @@ -226,15 +226,6 @@ EXPORT_SYMBOL_GPL(mars_power_led_off); void (*_mars_trigger)(void) = NULL; EXPORT_SYMBOL_GPL(_mars_trigger); -#define LIMIT_MEM -#ifdef LIMIT_MEM -#include -#include -#endif -long long mars_global_memlimit = 0; -EXPORT_SYMBOL_GPL(mars_global_memlimit); - - struct mm_struct *mm_fake = NULL; EXPORT_SYMBOL_GPL(mm_fake); @@ -242,11 +233,6 @@ int __init init_mars(void) { MARS_INF("init_mars()\n"); -#ifdef LIMIT_MEM // provisionary - mars_global_memlimit = total_swapcache_pages * (PAGE_SIZE / 4); - MARS_INF("mars_global_memlimit = %lld\n", mars_global_memlimit); -#endif - set_fake(); #ifdef MARS_TRACING diff --git a/mars_net.c b/mars_net.c index 7d982b0b..7f675c87 100644 --- a/mars_net.c +++ b/mars_net.c @@ -93,68 +93,164 @@ int mars_create_sockaddr(struct sockaddr_storage *addr, const char *spec) } EXPORT_SYMBOL_GPL(mars_create_sockaddr); -int mars_create_socket(struct socket **sock, struct sockaddr_storage *addr, bool is_server) +/* The original struct socket has no refcount. This leads to problems + * during long-lasting system calls when racing with socket shutdown. + * This is just a small wrapper adding a refcount. + */ +struct mars_socket { + atomic_t s_count; + struct socket *s_socket; + bool s_dead; +}; + +struct mars_socket *mars_create_socket(struct sockaddr_storage *addr, bool is_server) { + struct mars_socket *msock; + struct socket *sock; struct sockaddr *sockaddr = (void*)addr; int x_true = 1; - int status = 0; + int status = -ENOMEM; - if (!*sock) { - status = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, sock); - if (unlikely(status < 0)) { - *sock = NULL; - MARS_WRN("cannot create socket, status = %d\n", status); - goto done; - } + msock = brick_zmem_alloc(sizeof(struct mars_socket)); + if (!msock) + goto done; - /* TODO: improve this by a table-driven approach - */ - (*sock)->sk->sk_rcvtimeo = (*sock)->sk->sk_sndtimeo = default_tcp_params.tcp_timeout * HZ; - status = kernel_setsockopt(*sock, SOL_SOCKET, SO_SNDBUF, (char*)&default_tcp_params.window_size, sizeof(default_tcp_params.window_size)); - _check(status); - status = kernel_setsockopt(*sock, SOL_SOCKET, SO_RCVBUF, (char*)&default_tcp_params.window_size, sizeof(default_tcp_params.window_size)); - _check(status); - status = kernel_setsockopt(*sock, SOL_IP, SO_PRIORITY, (char*)&default_tcp_params.tos, sizeof(default_tcp_params.tos)); - _check(status); -#if 0 - status = kernel_setsockopt(*sock, IPPROTO_TCP, TCP_NODELAY, (char*)&x_true, sizeof(x_true)); -#endif - _check(status); - status = kernel_setsockopt(*sock, SOL_SOCKET, SO_KEEPALIVE, (char*)&x_true, sizeof(x_true)); - _check(status); - status = kernel_setsockopt(*sock, IPPROTO_TCP, TCP_KEEPCNT, (char*)&default_tcp_params.tcp_keepcnt, sizeof(default_tcp_params.tcp_keepcnt)); - _check(status); - status = kernel_setsockopt(*sock, IPPROTO_TCP, TCP_KEEPINTVL, (char*)&default_tcp_params.tcp_keepintvl, sizeof(default_tcp_params.tcp_keepintvl)); - _check(status); - status = kernel_setsockopt(*sock, IPPROTO_TCP, TCP_KEEPIDLE, (char*)&default_tcp_params.tcp_keepidle, sizeof(default_tcp_params.tcp_keepidle)); - _check(status); + status = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &msock->s_socket); + if (unlikely(status < 0)) { + MARS_WRN("cannot create socket, status = %d\n", status); + goto done; } + atomic_set(&msock->s_count, 1); + sock = msock->s_socket; + status = -EINVAL; + CHECK_PTR(sock, done); + + /* TODO: improve this by a table-driven approach + */ + sock->sk->sk_rcvtimeo = sock->sk->sk_sndtimeo = default_tcp_params.tcp_timeout * HZ; + status = kernel_setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char*)&default_tcp_params.window_size, sizeof(default_tcp_params.window_size)); + _check(status); + status = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char*)&default_tcp_params.window_size, sizeof(default_tcp_params.window_size)); + _check(status); + status = kernel_setsockopt(sock, SOL_IP, SO_PRIORITY, (char*)&default_tcp_params.tos, sizeof(default_tcp_params.tos)); + _check(status); +#if 0 + status = kernel_setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char*)&x_true, sizeof(x_true)); +#endif + _check(status); + status = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char*)&x_true, sizeof(x_true)); + _check(status); + status = kernel_setsockopt(sock, IPPROTO_TCP, TCP_KEEPCNT, (char*)&default_tcp_params.tcp_keepcnt, sizeof(default_tcp_params.tcp_keepcnt)); + _check(status); + status = kernel_setsockopt(sock, IPPROTO_TCP, TCP_KEEPINTVL, (char*)&default_tcp_params.tcp_keepintvl, sizeof(default_tcp_params.tcp_keepintvl)); + _check(status); + status = kernel_setsockopt(sock, IPPROTO_TCP, TCP_KEEPIDLE, (char*)&default_tcp_params.tcp_keepidle, sizeof(default_tcp_params.tcp_keepidle)); + _check(status); if (is_server) { - status = kernel_bind(*sock, sockaddr, sizeof(*sockaddr)); + status = kernel_bind(sock, sockaddr, sizeof(*sockaddr)); if (unlikely(status < 0)) { MARS_WRN("bind failed, status = %d\n", status); - sock_release(*sock); - *sock = NULL; goto done; } - status = kernel_listen(*sock, 16); + status = kernel_listen(sock, 16); if (status < 0) { MARS_WRN("listen failed, status = %d\n", status); } } else { - status = kernel_connect(*sock, sockaddr, sizeof(*sockaddr), 0); + status = kernel_connect(sock, sockaddr, sizeof(*sockaddr), 0); if (status < 0) { MARS_DBG("connect failed, status = %d\n", status); } } done: - return status; + if (status < 0) { + mars_put_socket(msock); + msock = ERR_PTR(status); + } + return msock; } EXPORT_SYMBOL_GPL(mars_create_socket); -int mars_send(struct socket **sock, void *buf, int len) +struct mars_socket *mars_accept_socket(struct mars_socket *msock, bool do_block) +{ + int status = -ENOENT; + if (likely(msock)) { + struct mars_socket *new_msock; + struct socket *new_socket = NULL; + + mars_get_socket(msock); + status = kernel_accept(msock->s_socket, &new_socket, do_block ? 0 : O_NONBLOCK); + mars_put_socket(msock); + if (unlikely(status < 0)) { + goto err; + } + + status = -ENOMEM; + new_msock = brick_zmem_alloc(sizeof(struct mars_socket)); + if (!new_msock) { + kernel_sock_shutdown(new_socket, SHUT_WR); + sock_release(new_socket); + goto err; + } + atomic_set(&new_msock->s_count, 1); + new_msock->s_socket = new_socket; + return new_msock; + } +err: + return ERR_PTR(status); +} +EXPORT_SYMBOL_GPL(mars_accept_socket); + +struct mars_socket *mars_get_socket(struct mars_socket *msock) +{ + if (likely(msock)) { + atomic_inc(&msock->s_count); + } + return msock; +} +EXPORT_SYMBOL_GPL(mars_get_socket); + +void mars_put_socket(struct mars_socket *msock) +{ + if (likely(msock)) { + if (atomic_dec_and_test(&msock->s_count)) { + struct socket *sock = msock->s_socket; + if (sock) { + if (!msock->s_dead) { + msock->s_dead = true; + kernel_sock_shutdown(sock, SHUT_WR); + } + sock_release(sock); + } + brick_mem_free(msock); + } + } +} +EXPORT_SYMBOL_GPL(mars_put_socket); + +void mars_shutdown_socket(struct mars_socket *msock) +{ + if (likely(msock)) { + struct socket *sock = msock->s_socket; + if (sock && !msock->s_dead) { + msock->s_dead = true; + kernel_sock_shutdown(sock, SHUT_WR); + } + } +} +EXPORT_SYMBOL_GPL(mars_shutdown_socket); + +bool mars_socket_is_alive(struct mars_socket *msock) +{ + if (!msock || msock->s_dead) + return false; + return true; +} +EXPORT_SYMBOL_GPL(mars_socket_is_alive); + +int mars_send(struct mars_socket *msock, void *buf, int len) { struct kvec iov = { .iov_base = buf, @@ -167,15 +263,18 @@ int mars_send(struct socket **sock, void *buf, int len) int status = -EIDRM; int sent = 0; + if (!mars_get_socket(msock)) + goto done; + //MARS_IO("buf = %p, len = %d\n", buf, len); while (sent < len) { - if (unlikely(!*sock)) { + if (unlikely(msock->s_dead)) { MARS_WRN("socket has disappeared\n"); status = -EIDRM; goto done; } - status = kernel_sendmsg(*sock, &msg, &iov, 1, len); + status = kernel_sendmsg(msock->s_socket, &msg, &iov, 1, len); if (status == -EAGAIN) { msleep(50); @@ -204,12 +303,14 @@ int mars_send(struct socket **sock, void *buf, int len) sent += status; } status = sent; + done: + mars_put_socket(msock); return status; } EXPORT_SYMBOL_GPL(mars_send); -int mars_recv(struct socket **sock, void *buf, int minlen, int maxlen) +int mars_recv(struct mars_socket *msock, void *buf, int minlen, int maxlen) { int status = -EIDRM; int done = 0; @@ -219,6 +320,9 @@ int mars_recv(struct socket **sock, void *buf, int minlen, int maxlen) return -EINVAL; } + if (!mars_get_socket(msock)) + goto err; + while (done < minlen) { struct kvec iov = { .iov_base = buf + done, @@ -230,7 +334,7 @@ int mars_recv(struct socket **sock, void *buf, int minlen, int maxlen) .msg_flags = 0 | MSG_WAITALL | MSG_NOSIGNAL, }; - if (unlikely(!*sock)) { + if (unlikely(msock->s_dead)) { MARS_WRN("socket has disappeared\n"); status = -EIDRM; goto err; @@ -238,7 +342,7 @@ int mars_recv(struct socket **sock, void *buf, int minlen, int maxlen) MARS_IO("done %d, fetching %d bytes\n", done, maxlen-done); - status = kernel_recvmsg(*sock, &msg, &iov, 1, maxlen-done, msg.msg_flags); + status = kernel_recvmsg(msock->s_socket, &msg, &iov, 1, maxlen-done, msg.msg_flags); if (status == -EAGAIN) { #if 0 @@ -249,7 +353,7 @@ int mars_recv(struct socket **sock, void *buf, int minlen, int maxlen) continue; } if (!status) { // EOF - MARS_WRN("got EOF (done=%d, req_size=%d)\n", done, maxlen-done); + MARS_WRN("got EOF from socket (done=%d, req_size=%d)\n", done, maxlen - done); status = -EPIPE; goto err; } @@ -262,6 +366,7 @@ int mars_recv(struct socket **sock, void *buf, int minlen, int maxlen) status = done; err: + mars_put_socket(msock); return status; } EXPORT_SYMBOL_GPL(mars_recv); @@ -284,16 +389,16 @@ struct mars_net_header { u16 h_len; }; -int _mars_send_struct(struct socket **sock, void *data, const struct meta *meta, int *seq) +int _mars_send_struct(struct mars_socket *msock, void *data, const struct meta *meta, int *seq) { int count = 0; int status = 0; - if (!data) { // send EOF + if (!data) { // send EOR struct mars_net_header header = { .h_magic = MARS_NET_MAGIC, .h_seq = -1, }; - return mars_send(sock, &header, sizeof(header)); + return mars_send(msock, &header, sizeof(header)); } for (; ; meta++) { struct mars_net_header header = { @@ -357,7 +462,7 @@ int _mars_send_struct(struct socket **sock, void *data, const struct meta *meta, } MARS_IO("sending header %d '%s' len = %d\n", header.h_seq, header.h_name, len); - status = mars_send(sock, &header, sizeof(header)); + status = mars_send(msock, &header, sizeof(header)); if (status < 0 || !meta->field_name) { // EOR break; } @@ -365,14 +470,14 @@ int _mars_send_struct(struct socket **sock, void *data, const struct meta *meta, switch (meta->field_type) { case FIELD_REF: case FIELD_SUB: - status = _mars_send_struct(sock, item, meta->field_ref, seq); + status = _mars_send_struct(msock, item, meta->field_ref, seq); if (status > 0) count += status; break; default: if (len > 0) { MARS_IO("sending extra %d\n", len); - status = mars_send(sock, item, len); + status = mars_send(msock, item, len); if (status > 0) count++; } @@ -387,14 +492,14 @@ int _mars_send_struct(struct socket **sock, void *data, const struct meta *meta, return status; } -int mars_send_struct(struct socket **sock, void *data, const struct meta *meta) +int mars_send_struct(struct mars_socket *msock, void *data, const struct meta *meta) { int seq = 0; - return _mars_send_struct(sock, data, meta, &seq); + return _mars_send_struct(msock, data, meta, &seq); } EXPORT_SYMBOL_GPL(mars_send_struct); -int _mars_recv_struct(struct socket **sock, void *data, const struct meta *meta, int *seq, int line) +int _mars_recv_struct(struct mars_socket *msock, void *data, const struct meta *meta, int *seq, int line) { int count = 0; int status = -EINVAL; @@ -408,7 +513,7 @@ int _mars_recv_struct(struct socket **sock, void *data, const struct meta *meta, const struct meta *tmp; void *item; void *mem; - status = mars_recv(sock, &header, sizeof(header), sizeof(header)); + status = mars_recv(msock, &header, sizeof(header), sizeof(header)); if (status == -EAGAIN) { msleep(50); continue; @@ -423,7 +528,7 @@ int _mars_recv_struct(struct socket **sock, void *data, const struct meta *meta, status = -ENOMSG; break; } - if (header.h_seq == -1) { // got EOF + if (header.h_seq == -1) { // got EOR status = 0; break; }; @@ -447,7 +552,7 @@ int _mars_recv_struct(struct socket **sock, void *data, const struct meta *meta, status = -ENOMEM; if (!dummy) break; - status = mars_recv(sock, dummy, header.h_len, header.h_len); + status = mars_recv(msock, dummy, header.h_len, header.h_len); brick_mem_free(dummy); if (status < 0) break; @@ -488,7 +593,7 @@ int _mars_recv_struct(struct socket **sock, void *data, const struct meta *meta, } MARS_IO("starting recursive structure\n"); - status = _mars_recv_struct(sock, item, tmp->field_ref, seq, line); + status = _mars_recv_struct(msock, item, tmp->field_ref, seq, line); MARS_IO("ending recursive structure, status = %d\n", status); if (status > 0) @@ -502,10 +607,10 @@ int _mars_recv_struct(struct socket **sock, void *data, const struct meta *meta, break; } MARS_IO("reading extra %d\n", header.h_len); - status = mars_recv(sock, item, header.h_len, header.h_len); + status = mars_recv(msock, item, header.h_len, header.h_len); while (status == -EAGAIN) { msleep(50); - status = mars_recv(sock, item, header.h_len, header.h_len); + status = mars_recv(msock, item, header.h_len, header.h_len); } if (status >= 0) { //MARS_IO("got data len = %d status = %d\n", header.h_len, status); @@ -558,7 +663,7 @@ void mars_check_meta(const struct meta *meta, void *data) } -int mars_send_mref(struct socket **sock, struct mref_object *mref) +int mars_send_mref(struct mars_socket *msock, struct mref_object *mref) { struct mars_cmd cmd = { .cmd_code = CMD_MREF, @@ -566,26 +671,26 @@ int mars_send_mref(struct socket **sock, struct mref_object *mref) }; int status; - status = mars_send_struct(sock, &cmd, mars_cmd_meta); + status = mars_send_struct(msock, &cmd, mars_cmd_meta); if (status < 0) goto done; - status = mars_send_struct(sock, mref, mars_mref_meta); + status = mars_send_struct(msock, mref, mars_mref_meta); if (status < 0) goto done; if (mref->ref_rw) { - status = mars_send(sock, mref->ref_data, mref->ref_len); + status = mars_send(msock, mref->ref_data, mref->ref_len); } done: return status; } EXPORT_SYMBOL_GPL(mars_send_mref); -int mars_recv_mref(struct socket **sock, struct mref_object *mref) +int mars_recv_mref(struct mars_socket *msock, struct mref_object *mref) { int status; - status = mars_recv_struct(sock, mref, mars_mref_meta); + status = mars_recv_struct(msock, mref, mars_mref_meta); if (status < 0) goto done; if (mref->ref_rw) { @@ -595,7 +700,7 @@ int mars_recv_mref(struct socket **sock, struct mref_object *mref) status = -ENOMEM; goto done; } - status = mars_recv(sock, mref->ref_data, mref->ref_len, mref->ref_len); + status = mars_recv(msock, mref->ref_data, mref->ref_len, mref->ref_len); if (status < 0) MARS_WRN("mref_len = %d, status = %d\n", mref->ref_len, status); } @@ -604,32 +709,32 @@ done: } EXPORT_SYMBOL_GPL(mars_recv_mref); -int mars_send_cb(struct socket **sock, struct mref_object *mref) +int mars_send_cb(struct mars_socket *msock, struct mref_object *mref) { struct mars_cmd cmd = { .cmd_code = CMD_CB, .cmd_int1 = mref->ref_id, }; int status; - status = mars_send_struct(sock, &cmd, mars_cmd_meta); + status = mars_send_struct(msock, &cmd, mars_cmd_meta); if (status < 0) goto done; - status = mars_send_struct(sock, mref, mars_mref_meta); + status = mars_send_struct(msock, mref, mars_mref_meta); if (status < 0) goto done; if (!mref->ref_rw) { MARS_IO("sending blocklen = %d\n", mref->ref_len); - status = mars_send(sock, mref->ref_data, mref->ref_len); + status = mars_send(msock, mref->ref_data, mref->ref_len); } done: return status; } EXPORT_SYMBOL_GPL(mars_send_cb); -int mars_recv_cb(struct socket **sock, struct mref_object *mref) +int mars_recv_cb(struct mars_socket *msock, struct mref_object *mref) { int status; - status = mars_recv_struct(sock, mref, mars_mref_meta); + status = mars_recv_struct(msock, mref, mars_mref_meta); if (status < 0) goto done; if (!mref->ref_rw) { @@ -639,7 +744,7 @@ int mars_recv_cb(struct socket **sock, struct mref_object *mref) goto done; } MARS_IO("receiving blocklen = %d\n", mref->ref_len); - status = mars_recv(sock, mref->ref_data, mref->ref_len, mref->ref_len); + status = mars_recv(msock, mref->ref_data, mref->ref_len, mref->ref_len); } done: return status; diff --git a/mars_net.h b/mars_net.h index 70e3d5fa..d837d4f4 100644 --- a/mars_net.h +++ b/mars_net.h @@ -10,6 +10,8 @@ #define MARS_DEFAULT_PORT 7777 +struct mars_socket; // opaque + struct mars_tcp_params { int tcp_timeout; int window_size; @@ -49,29 +51,36 @@ extern char *(*mars_translate_hostname)(const char *name); /* Low-level network traffic */ extern int mars_create_sockaddr(struct sockaddr_storage *addr, const char *spec); -extern int mars_create_socket(struct socket **sock, struct sockaddr_storage *addr, bool is_server); -extern int mars_send(struct socket **sock, void *buf, int len); -extern int mars_recv(struct socket **sock, void *buf, int minlen, int maxlen); + +extern struct mars_socket *mars_create_socket(struct sockaddr_storage *addr, bool is_server); +extern struct mars_socket *mars_accept_socket(struct mars_socket *msock, bool do_block); +extern struct mars_socket *mars_get_socket(struct mars_socket *msock); +extern void mars_put_socket(struct mars_socket *msock); +extern void mars_shutdown_socket(struct mars_socket *msock); +extern bool mars_socket_is_alive(struct mars_socket *msock); + +extern int mars_send(struct mars_socket *msock, void *buf, int len); +extern int mars_recv(struct mars_socket *msock, void *buf, int minlen, int maxlen); /* Mid-level generic field data exchange */ -extern int mars_send_struct(struct socket **sock, void *data, const struct meta *meta); +extern int mars_send_struct(struct mars_socket *msock, void *data, const struct meta *meta); #define mars_recv_struct(_sock_,_data_,_meta_) \ ({ \ int seq = 0; \ _mars_recv_struct(_sock_, _data_, _meta_, &seq, __LINE__); \ }) -extern int _mars_recv_struct(struct socket **sock, void *data, const struct meta *meta, int *seq, int line); +extern int _mars_recv_struct(struct mars_socket *msock, void *data, const struct meta *meta, int *seq, int line); /* High-level transport of mars structures */ -extern int mars_send_dent_list(struct socket **sock, struct list_head *anchor); -extern int mars_recv_dent_list(struct socket **sock, struct list_head *anchor); +extern int mars_send_dent_list(struct mars_socket *msock, struct list_head *anchor); +extern int mars_recv_dent_list(struct mars_socket *msock, struct list_head *anchor); -extern int mars_send_mref(struct socket **sock, struct mref_object *mref); -extern int mars_recv_mref(struct socket **sock, struct mref_object *mref); -extern int mars_send_cb(struct socket **sock, struct mref_object *mref); -extern int mars_recv_cb(struct socket **sock, struct mref_object *mref); +extern int mars_send_mref(struct mars_socket *msock, struct mref_object *mref); +extern int mars_recv_mref(struct mars_socket *msock, struct mref_object *mref); +extern int mars_send_cb(struct mars_socket *msock, struct mref_object *mref); +extern int mars_recv_cb(struct mars_socket *msock, struct mref_object *mref); ///////////////////////////////////////////////////////////////////////// diff --git a/mars_server.c b/mars_server.c index c63f5e58..40c325e8 100644 --- a/mars_server.c +++ b/mars_server.c @@ -18,8 +18,10 @@ #include "mars_server.h" -static struct socket *server_socket = NULL; +static struct mars_socket *server_socket = NULL; static struct task_struct *server_thread = NULL; +static LIST_HEAD(server_list); +static spinlock_t server_lock = SPIN_LOCK_UNLOCKED; ///////////////////////// own helper functions //////////////////////// @@ -28,6 +30,7 @@ static int cb_thread(void *data) { struct server_brick *brick = data; + struct mars_socket *sock = brick->handler_socket; int status = -EINVAL; brick->cb_running = true; @@ -35,17 +38,12 @@ int cb_thread(void *data) MARS_DBG("--------------- cb_thread starting on socket %p\n", brick->handler_socket); - while (!kthread_should_stop()) { + while (!kthread_should_stop() || !list_empty(&brick->cb_read_list) || !list_empty(&brick->cb_write_list)) { struct server_mref_aspect *mref_a; struct mref_object *mref; struct list_head *tmp; - struct socket **sock; unsigned long flags; - - status = -EINVAL; - if (!brick->handler_socket) - break; - + wait_event_interruptible_timeout( brick->cb_event, !list_empty(&brick->cb_read_list) || @@ -53,9 +51,6 @@ int cb_thread(void *data) kthread_should_stop(), 3 * HZ); - if (!brick->handler_socket) - break; - traced_lock(&brick->cb_lock, flags); tmp = brick->cb_write_list.next; if (tmp == &brick->cb_write_list) { @@ -70,27 +65,34 @@ int cb_thread(void *data) mref_a = container_of(tmp, struct server_mref_aspect, cb_head); mref = mref_a->object; + status = -EINVAL; CHECK_PTR(mref, err); - status = -ENOTSOCK; - sock = mref_a->sock; - CHECK_PTR(sock, err); - CHECK_PTR(*sock, err); down(&brick->socket_sem); status = mars_send_cb(sock, mref); up(&brick->socket_sem); + err: + if (unlikely(status < 0)) { + MARS_WRN("cannot send response, status = %d\n", status); +#if 0 // THINK: not sure whether we need this at all. The _client_ should be responsible for resending any lost operations. Disable this for the next future. + traced_lock(&brick->cb_lock, flags); + if (mref->ref_rw) { + list_add(tmp, &brick->cb_write_list); + } else { + list_add(tmp, &brick->cb_read_list); + } + traced_unlock(&brick->cb_lock, flags); + continue; +#else + mars_shutdown_socket(sock); +#endif + } + atomic_dec(&brick->in_flight); GENERIC_INPUT_CALL(brick->inputs[0], mref_put, mref); - - if (unlikely(status < 0)) { - MARS_ERR("cannot send response, status = %d\n", status); - kernel_sock_shutdown(*sock, SHUT_WR); - break; - } } -err: brick->cb_running = false; MARS_DBG("---------- cb_thread terminating, status = %d\n", status); wake_up_interruptible(&brick->startup_event); @@ -108,11 +110,15 @@ void server_endio(struct generic_callback *cb) mref_a = cb->cb_private; CHECK_PTR(mref_a, err); - brick = mref_a->brick; - CHECK_PTR(brick, err); mref = mref_a->object; CHECK_PTR(mref, err); + brick = mref_a->brick; + if (!brick) { + MARS_WRN("late IO callback -- cannot do anything\n"); + return; + } + rw = mref->ref_rw; traced_lock(&brick->cb_lock, flags); @@ -129,13 +135,13 @@ err: MARS_FAT("cannot handle callback - giving up\n"); } -int server_io(struct server_brick *brick, struct socket **sock) +int server_io(struct server_brick *brick, struct mars_socket *sock) { struct mref_object *mref; struct server_mref_aspect *mref_a; int status = -ENOTRECOVERABLE; - if (!brick->cb_running) + if (!brick->cb_running || !mars_socket_is_alive(sock)) goto done; mref = server_alloc_mref(&brick->hidden_output, &brick->mref_object_layout); @@ -156,7 +162,6 @@ int server_io(struct server_brick *brick, struct socket **sock) } mref_a->brick = brick; - mref_a->sock = sock; mref->_ref_cb.cb_private = mref_a; mref->_ref_cb.cb_fn = server_endio; mref->ref_cb = &mref->_ref_cb; @@ -190,6 +195,7 @@ void _clean_list(struct server_brick *brick, struct list_head *start) list_del_init(tmp); mref_a = container_of(tmp, struct server_mref_aspect, cb_head); + mref_a->brick = NULL; mref = mref_a->object; if (!mref) continue; @@ -198,22 +204,31 @@ void _clean_list(struct server_brick *brick, struct list_head *start) } } +static +struct task_struct *_grab_handler(struct server_brick *brick) +{ + struct task_struct *res; + spin_lock(&server_lock); + list_del_init(&brick->server_link); + res = brick->handler_thread; + brick->handler_thread = NULL; + spin_unlock(&server_lock); + return res; +} + static int handler_thread(void *data) { struct server_brick *brick = data; - struct socket **sock = &brick->handler_socket; + struct mars_socket *sock = brick->handler_socket; struct task_struct *cb_thread = brick->cb_thread; - int max_round = 300; + struct task_struct *h_thread; int status = 0; brick->cb_thread = NULL; - brick->handler_thread = NULL; wake_up_interruptible(&brick->startup_event); - MARS_DBG("--------------- handler_thread starting on socket %p\n", *sock); - if (!*sock) - goto done; + MARS_DBG("--------------- handler_thread starting on socket %p\n", sock); //fake_mm(); while (brick->cb_running && !kthread_should_stop()) { @@ -221,7 +236,7 @@ int handler_thread(void *data) status = mars_recv_struct(sock, &cmd, mars_cmd_meta); if (status < 0) { - MARS_ERR("bad command status = %d\n", status); + MARS_WRN("bad command status = %d\n", status); break; } @@ -266,7 +281,7 @@ int handler_thread(void *data) up(&brick->socket_sem); if (status < 0) { - MARS_ERR("could not send dentry information, status = %d\n", status); + MARS_WRN("could not send dentry information, status = %d\n", status); } break; } @@ -277,10 +292,14 @@ int handler_thread(void *data) status = -EINVAL; CHECK_PTR(path, err); - CHECK_PTR(mars_global, err); + CHECK_PTR_NULL(mars_global, err); CHECK_PTR(_bio_brick_type, err); - //prev = mars_find_brick(mars_global, NULL, cmd.cmd_str1); + if (!mars_global->global_power.button) { + MARS_WRN("system is not alive\n"); + goto err; + } + prev = make_brick_all( mars_global, NULL, @@ -321,38 +340,48 @@ int handler_thread(void *data) break; } - kernel_sock_shutdown(*sock, SHUT_WR); - //sock_release(*sock); - //cleanup_mm(); + mars_shutdown_socket(sock); -done: MARS_DBG("handler_thread terminating, status = %d\n", status); - MARS_INF("stopping thread...\n"); - kthread_stop(cb_thread); - wait_event_interruptible_timeout( - brick->startup_event, - !brick->cb_running, - 10 * HZ); + if (cb_thread) { + MARS_INF("stopping cb thread...\n"); + kthread_stop(cb_thread); + wait_event_interruptible_timeout( + brick->startup_event, + !brick->cb_running, + 10 * HZ); + put_task_struct(cb_thread); + } _clean_list(brick, &brick->cb_read_list); _clean_list(brick, &brick->cb_write_list); - do { - int status = mars_kill_brick((void*)brick); - if (status >= 0) { - //if(*sock) - //sock_release(*sock); - break; - } - if (status >= 0 || max_round-- < 0) { - MARS_INF("not dead, giving up....\n"); - break; - } - msleep(1000); - } while (brick->cb_running && !brick->power.led_off); + MARS_DBG("cleaning up...\n"); - MARS_DBG("done\n"); - return 0; + h_thread = _grab_handler(brick); + + mars_put_socket(sock); + + /* Normally, the brick should be shut down from outside. + * In case the handler thread stops abnormally (e.g. + * shutdown of socket etc), it has to cleanup itself. + * This is an exception to the basic rule of instance orientation + * that execution logic should be cleanly separated from strategy + * logic. + * So be careful, avoid races by use of _grab_handler(). + */ + if (h_thread) { + int status; + MARS_DBG("self cleanup...\n"); + status = mars_kill_brick((void*)brick); + if (status < 0) { + BRICK_ERR("kill status = %d, giving up\n", status); + } + put_task_struct(h_thread); + } + + MARS_DBG("done.\n"); + return status; } ////////////////// own brick / input / output operations ////////////////// @@ -383,17 +412,61 @@ static void server_ref_io(struct server_output *output, struct mref_object *mref static int server_switch(struct server_brick *brick) { + int status = 0; if (brick->power.button) { + static int version = 0; + struct task_struct *thread; + mars_power_led_off((void*)brick, false); MARS_INF("starting....."); - mars_power_led_on((void*)brick, true); + spin_lock(&server_lock); + list_add(&brick->server_link, &server_list); + spin_unlock(&server_lock); + + thread = kthread_create(cb_thread, brick, "mars_cb%d", version); + if (IS_ERR(thread)) { + status = PTR_ERR(thread); + MARS_ERR("cannot create cb thread, status = %ld\n", status); + goto err; + } + get_task_struct(thread); + brick->cb_thread = thread; + wake_up_process(thread); + + thread = kthread_create(handler_thread, brick, "mars_handler%d", version++); + if (IS_ERR(thread)) { + status = PTR_ERR(thread); + MARS_ERR("cannot create handler thread, status = %ld\n", status); + kthread_stop(brick->cb_thread); + goto err; + } + get_task_struct(thread); + brick->handler_thread = thread; + wake_up_process(thread); + + wait_event_interruptible(brick->startup_event, brick->cb_thread == NULL); + + err: + if (status >= 0) { + mars_power_led_on((void*)brick, true); + } } else { + struct task_struct *thread; mars_power_led_on((void*)brick, false); - mars_power_led_off((void*)brick, true); + thread = _grab_handler(brick); + if (thread) { + brick->handler_thread = NULL; + MARS_INF("stopping handler thread....\n"); + kthread_stop(thread); + put_task_struct(thread); + } else { + MARS_WRN("handler thread does not exist\n"); + mars_power_led_off((void*)brick, true); + } } - return 0; + return status; } //////////////// object / aspect constructors / destructors /////////////// @@ -419,6 +492,7 @@ static int server_brick_construct(struct server_brick *brick) { struct server_output *hidden = &brick->hidden_output; _server_output_init(brick, hidden, "internal"); + INIT_LIST_HEAD(&brick->server_link); init_waitqueue_head(&brick->startup_event); init_waitqueue_head(&brick->cb_event); sema_init(&brick->socket_sem, 1); @@ -490,7 +564,6 @@ EXPORT_SYMBOL_GPL(server_brick_type); static int _server_thread(void *data) { char *id = my_id(); - int version = 0; int status = 0; //fake_mm(); @@ -498,80 +571,91 @@ static int _server_thread(void *data) MARS_INF("-------- server starting on host '%s' ----------\n", id); while (!kthread_should_stop()) { - int size; struct server_brick *brick; - struct task_struct *thread; - struct socket *new_socket = NULL; - int status; - status = kernel_accept(server_socket, &new_socket, O_NONBLOCK); - if (status < 0) { + struct mars_socket *new_socket; + + new_socket = mars_accept_socket(server_socket, false); + if (IS_ERR(new_socket)) { + status = PTR_ERR(new_socket); + new_socket = NULL; msleep(500); if (status == -EAGAIN) continue; // without error message - MARS_ERR("accept status = %d\n", status); - continue; - } - if (!new_socket) { - MARS_ERR("got no socket\n"); - msleep(3000); + MARS_WRN("accept status = %d\n", status); + msleep(4000); continue; } + MARS_DBG("got new connection %p\n", new_socket); /* TODO: check authorization. */ - size = server_brick_type.brick_size + - (server_brick_type.max_inputs + server_brick_type.max_outputs) * sizeof(void*) + - sizeof(struct server_input), - - brick = brick_zmem_alloc(size); + if (!mars_global || !mars_global->global_power.button) { + MARS_WRN("system is not alive\n"); + goto err; + } +#if 1 + brick = (void*)mars_make_brick(mars_global, NULL, &server_brick_type, "test", "test"); + if (!brick) { + MARS_ERR("cannot create server instance\n"); + goto err; + } +#else // old code, remove ASAP + { + int size = server_brick_type.brick_size + + (server_brick_type.max_inputs + server_brick_type.max_outputs) * sizeof(void*) + + sizeof(struct server_input), + brick = brick_zmem_alloc(size); + } if (!brick) { MARS_ERR("cannot allocate server instance\n"); goto err; } status = generic_brick_init_full(brick, size, (void*)&server_brick_type, NULL, NULL, NULL); - if (status) { + if (status < 0) { MARS_ERR("cannot init server brick, status = %d\n", status); goto err; } - +#endif brick->handler_socket = new_socket; - thread = kthread_create(cb_thread, brick, "mars_cb%d", version); - if (IS_ERR(thread)) { - MARS_ERR("cannot create cb thread, status = %ld\n", PTR_ERR(thread)); + brick->power.button = true; + status = server_switch(brick); + if (status < 0) { + MARS_ERR("cannot switch on server brick, status = %d\n", status); goto err; } - brick->cb_thread = thread; - wake_up_process(thread); - thread = kthread_create(handler_thread, brick, "mars_handler%d", version++); - if (IS_ERR(thread)) { - MARS_ERR("cannot create handler thread, status = %ld\n", PTR_ERR(thread)); - goto err; - } - brick->handler_thread = thread; - wake_up_process(thread); - - wait_event_interruptible(brick->startup_event, brick->handler_thread == NULL && brick->cb_thread == NULL); continue; err: if (new_socket) { - kernel_sock_shutdown(new_socket, SHUT_WR); - //sock_release(new_socket); + mars_put_socket(new_socket); } msleep(3000); } MARS_INF("-------- cleaning up ----------\n"); + spin_lock(&server_lock); + while (!list_empty(&server_list)) { + struct list_head *tmp = server_list.next; + struct server_brick *brick = container_of(tmp, struct server_brick, server_link); + list_del_init(tmp); + spin_unlock(&server_lock); + + brick->power.button = false; + status = server_switch(brick); + + spin_lock(&server_lock); + } + spin_unlock(&server_lock); + //cleanup_mm(); MARS_INF("-------- done status = %d ----------\n", status); - server_thread = NULL; return status; } @@ -585,22 +669,22 @@ int __init init_mars_server(void) MARS_INF("init_server()\n"); -#if 0 +#if 1 status = mars_create_sockaddr(&sockaddr, ""); if (status < 0) return status; - status = mars_create_socket(&server_socket, &sockaddr, true); - if (status < 0) - return status; - - status = kernel_listen(server_socket, 100); - if (status < 0) + server_socket = mars_create_socket(&sockaddr, true); + if (unlikely(IS_ERR(server_socket))) { + status = PTR_ERR(server_socket); + server_socket = NULL; return status; + } thread = kthread_create(_server_thread, NULL, "mars_server"); if (IS_ERR(thread)) { - return PTR_ERR(thread); + status = PTR_ERR(thread); + return status; } get_task_struct(thread); @@ -617,12 +701,12 @@ void __exit exit_mars_server(void) server_unregister_brick_type(); if (server_thread) { if (server_socket) { - kernel_sock_shutdown(server_socket, SHUT_WR); + mars_shutdown_socket(server_socket); } - MARS_INF("stopping thread...\n"); + MARS_INF("stopping server thread...\n"); kthread_stop(server_thread); if (server_socket) { - //sock_release(server_socket); + mars_put_socket(server_socket); server_socket = NULL; } put_task_struct(server_thread); diff --git a/mars_server.h b/mars_server.h index d4ef3933..2f85ce0f 100644 --- a/mars_server.h +++ b/mars_server.h @@ -6,14 +6,9 @@ #include "mars_net.h" -//extern struct socket *server_socket; -//extern struct task_struct *server_thread; -//extern wait_queue_head_t server_event; - struct server_mref_aspect { GENERIC_ASPECT(mref); struct server_brick *brick; - struct socket **sock; struct list_head cb_head; }; @@ -23,9 +18,10 @@ struct server_output { struct server_brick { MARS_BRICK(server); + struct list_head server_link; atomic_t in_flight; - struct socket *handler_socket; struct semaphore socket_sem; + struct mars_socket *handler_socket; struct task_struct *handler_thread; struct task_struct *cb_thread; wait_queue_head_t startup_event; diff --git a/mars_trans_logger.c b/mars_trans_logger.c index 72eac96d..443c6e3c 100644 --- a/mars_trans_logger.c +++ b/mars_trans_logger.c @@ -2134,7 +2134,7 @@ void trans_logger_log(struct trans_logger_output *output) #if 1 // provisionary flood handling FIXME: do better delay_callers = (atomic_read(&brick->mshadow_count) > brick->shadow_mem_limit && brick->shadow_mem_limit > 1) || - (atomic64_read(&brick->shadow_mem_used) > mars_global_memlimit && mars_global_memlimit > 1); + (atomic64_read(&brick->shadow_mem_used) > brick_global_memlimit && brick_global_memlimit > 1); if (delay_callers != brick->delay_callers) { //MARS_INF("stalling %d -> %d\n", brick->delay_callers, delay_callers); brick->delay_callers = delay_callers; @@ -2143,7 +2143,7 @@ void trans_logger_log(struct trans_logger_output *output) if (unlimited) { unlimited = (atomic_read(&brick->mshadow_count) > brick->shadow_mem_limit * 3 / 8 && brick->shadow_mem_limit > 1) || - (atomic64_read(&brick->shadow_mem_used) > mars_global_memlimit * 3 / 8 && mars_global_memlimit > 1); + (atomic64_read(&brick->shadow_mem_used) > brick_global_memlimit * 3 / 8 && brick_global_memlimit > 1); if (!unlimited) { brick->q_phase2.q_unlimited = false; brick->q_phase3.q_unlimited = false; @@ -2155,7 +2155,7 @@ void trans_logger_log(struct trans_logger_output *output) } else { unlimited = (atomic_read(&brick->mshadow_count) > brick->shadow_mem_limit / 2 && brick->shadow_mem_limit > 1) || - (atomic64_read(&brick->shadow_mem_used) > mars_global_memlimit / 2 && mars_global_memlimit > 1); + (atomic64_read(&brick->shadow_mem_used) > brick_global_memlimit / 2 && brick_global_memlimit > 1); if (unlimited) { brick->q_phase2.q_unlimited = unlimited; brick->q_phase3.q_unlimited = unlimited; @@ -2519,7 +2519,7 @@ char *trans_logger_statistics(struct trans_logger_brick *brick, int verbose) brick->do_replay, brick->do_continuous_replay, brick->replay_code, brick->log_reads, brick->log_start_pos, brick->replay_start_pos, brick->replay_end_pos, brick->current_pos, atomic_read(&brick->total_replay_count), atomic_read(&brick->total_cb_count), atomic_read(&brick->total_read_count), atomic_read(&brick->total_write_count), atomic_read(&brick->total_flush_count), atomic_read(&brick->total_write_count) ? atomic_read(&brick->total_flush_count) * 100 / atomic_read(&brick->total_write_count) : 0, atomic_read(&brick->total_writeback_cluster_count), atomic_read(&brick->total_writeback_count), atomic_read(&brick->total_writeback_cluster_count) ? atomic_read(&brick->total_writeback_count) * 100 / atomic_read(&brick->total_writeback_cluster_count) : 0, atomic_read(&brick->total_shortcut_count), atomic_read(&brick->total_writeback_count) ? atomic_read(&brick->total_shortcut_count) * 100 / atomic_read(&brick->total_writeback_count) : 0, atomic_read(&brick->total_mshadow_count), atomic_read(&brick->total_sshadow_count), atomic_read(&brick->total_round_count), atomic_read(&brick->total_restart_count), atomic_read(&brick->q_phase1.q_total), atomic_read(&brick->q_phase2.q_total), atomic_read(&brick->q_phase3.q_total), atomic_read(&brick->q_phase4.q_total), - atomic64_read(&brick->shadow_mem_used), mars_global_memlimit, atomic_read(&brick->replay_count), atomic_read(&brick->mshadow_count), brick->shadow_mem_limit, atomic_read(&brick->sshadow_count), atomic_read(&brick->hash_count), atomic_read(&brick->pos_count), atomic_read(&brick->sub_balance_count), atomic_read(&brick->inner_balance_count), atomic_read(&brick->outer_balance_count), atomic_read(&brick->wb_balance_count), atomic_read(&brick->fly_count), atomic_read(&brick->q_phase1.q_queued), atomic_read(&brick->q_phase1.q_flying), atomic_read(&brick->q_phase2.q_queued), atomic_read(&brick->q_phase2.q_flying), atomic_read(&brick->q_phase3.q_queued), atomic_read(&brick->q_phase3.q_flying), atomic_read(&brick->q_phase4.q_queued), atomic_read(&brick->q_phase4.q_flying)); + atomic64_read(&brick->shadow_mem_used), brick_global_memlimit, atomic_read(&brick->replay_count), atomic_read(&brick->mshadow_count), brick->shadow_mem_limit, atomic_read(&brick->sshadow_count), atomic_read(&brick->hash_count), atomic_read(&brick->pos_count), atomic_read(&brick->sub_balance_count), atomic_read(&brick->inner_balance_count), atomic_read(&brick->outer_balance_count), atomic_read(&brick->wb_balance_count), atomic_read(&brick->fly_count), atomic_read(&brick->q_phase1.q_queued), atomic_read(&brick->q_phase1.q_flying), atomic_read(&brick->q_phase2.q_queued), atomic_read(&brick->q_phase2.q_flying), atomic_read(&brick->q_phase3.q_queued), atomic_read(&brick->q_phase3.q_flying), atomic_read(&brick->q_phase4.q_queued), atomic_read(&brick->q_phase4.q_flying)); return res; } diff --git a/sy_old/mars_light.c b/sy_old/mars_light.c index bca8707f..41ddac92 100644 --- a/sy_old/mars_light.c +++ b/sy_old/mars_light.c @@ -5,6 +5,16 @@ //#define IO_DEBUGGING //#define STAT_DEBUGGING // here means: display full statistics +// disable this only for debugging! +#define RUN_PEERS +#define RUN_DATA +#define RUN_LOGINIT +#define RUN_PRIMARY +#define RUN_SYNCSTATUS +#define RUN_LOGFILES +#define RUN_REPLAY +#define RUN_DEVICE + #include #include #include @@ -491,7 +501,7 @@ struct mars_peerinfo { struct mars_global *global; char *peer; char *path; - struct socket *socket; + struct mars_socket *socket; struct task_struct *peer_thread; spinlock_t lock; struct list_head remote_dent_list; @@ -789,13 +799,30 @@ int run_bones(struct mars_peerinfo *peer) // remote working infrastructure static -void _peer_cleanup(struct mars_peerinfo *peer) +void _peer_cleanup(struct mars_peerinfo *peer, bool do_stop) { + unsigned long flags; + MARS_DBG("\n"); if (peer->socket) { - kernel_sock_shutdown(peer->socket, SHUT_WR); + mars_shutdown_socket(peer->socket); + } + if (do_stop && peer->peer_thread) { + kthread_stop(peer->peer_thread); + put_task_struct(peer->peer_thread); + peer->peer_thread = NULL; + } + if (peer->socket) { + mars_put_socket(peer->socket); peer->socket = NULL; } - //... + + if (do_stop) { + traced_lock(&peer->lock, flags); + mars_free_dent_all(&peer->remote_dent_list); + traced_unlock(&peer->lock, flags); + brick_string_free(peer->peer); + brick_string_free(peer->path); + } } static @@ -831,8 +858,9 @@ int remote_thread(void *data) }; if (!peer->socket) { - status = mars_create_socket(&peer->socket, &sockaddr, false); - if (unlikely(status < 0)) { + peer->socket = mars_create_socket(&sockaddr, false); + if (unlikely(IS_ERR(peer->socket))) { + status = PTR_ERR(peer->socket); peer->socket = NULL; MARS_INF("no connection to '%s'\n", real_peer); msleep(5000); @@ -842,18 +870,18 @@ int remote_thread(void *data) continue; } - status = mars_send_struct(&peer->socket, &cmd, mars_cmd_meta); + status = mars_send_struct(peer->socket, &cmd, mars_cmd_meta); if (unlikely(status < 0)) { - MARS_ERR("communication error on send, status = %d\n", status); - _peer_cleanup(peer); + MARS_WRN("communication error on send, status = %d\n", status); + _peer_cleanup(peer, false); msleep(2000); continue; } - status = mars_recv_dent_list(&peer->socket, &tmp_list); + status = mars_recv_dent_list(peer->socket, &tmp_list); if (unlikely(status < 0)) { - MARS_ERR("communication error on receive, status = %d\n", status); - _peer_cleanup(peer); + MARS_WRN("communication error on receive, status = %d\n", status); + _peer_cleanup(peer, false); msleep(5000); continue; } @@ -879,11 +907,10 @@ int remote_thread(void *data) MARS_INF("-------- remote thread terminating\n"); - _peer_cleanup(peer); + _peer_cleanup(peer, false); done: //cleanup_mm(); - peer->peer_thread = NULL; brick_string_free(real_peer); return 0; } @@ -903,17 +930,11 @@ static int _kill_peer(void *buf, struct mars_dent *dent) if (!peer) { return 0; } - MARS_DBG("killing peer thread....\n"); - if (!peer->peer_thread) { - MARS_ERR("oops, remote thread is not running - doing cleanup myself\n"); - _peer_cleanup(peer); - dent->d_private = NULL; - return -1; - } - MARS_INF("stopping thread...\n"); - kthread_stop(peer->peer_thread); + MARS_INF("stopping peer thread...\n"); + _peer_cleanup(peer, true); dent->d_private = NULL; + brick_mem_free(peer); return 0; } @@ -943,8 +964,8 @@ static int _make_peer(struct mars_global *global, struct mars_dent *dent, char * peer = dent->d_private; peer->global = global; - peer->peer = mypeer; - peer->path = path; + peer->peer = brick_strdup(mypeer); + peer->path = brick_strdup(path); peer->maxdepth = 2; spin_lock_init(&peer->lock); INIT_LIST_HEAD(&peer->remote_dent_list); @@ -960,6 +981,7 @@ static int _make_peer(struct mars_global *global, struct mars_dent *dent, char * return -1; } MARS_DBG("starting peer thread\n"); + get_task_struct(peer->peer_thread); wake_up_process(peer->peer_thread); } @@ -1542,7 +1564,7 @@ int _check_logging_status(struct mars_rotate *rot, long long *oldpos_start, long parent = dent->d_parent; CHECK_PTR(parent, done); global = rot->global; - CHECK_PTR(global, done); + CHECK_PTR_NULL(global, done); CHECK_PTR(rot->replay_link, done); CHECK_PTR(rot->aio_brick, done); @@ -1616,7 +1638,7 @@ int _make_logging_status(struct mars_rotate *rot) parent = dent->d_parent; CHECK_PTR(parent, done); global = rot->global; - CHECK_PTR(global, done); + CHECK_PTR_NULL(global, done); status = 0; trans_brick = rot->trans_brick; @@ -2402,10 +2424,10 @@ static const struct light_class light_classes[] = { .cl_len = 3, .cl_type = 'l', .cl_father = CL_IPS, -#if 1 +#ifdef RUN_PEERS .cl_forward = make_scan, - .cl_backward = kill_scan, #endif + .cl_backward = kill_scan, }, /* Directory containing all items of a resource @@ -2415,8 +2437,6 @@ static const struct light_class light_classes[] = { .cl_len = 9, .cl_type = 'd', .cl_father = CL_ROOT, - .cl_forward = NULL, - .cl_backward = NULL, }, /* Subdirectory for defaults... @@ -2427,8 +2447,6 @@ static const struct light_class light_classes[] = { .cl_type = 'd', .cl_hostcontext = false, .cl_father = CL_RESOURCE, - .cl_forward = NULL, - .cl_backward = NULL, }, [CL_DEFAULTS] = { .cl_name = "defaults-", @@ -2436,8 +2454,6 @@ static const struct light_class light_classes[] = { .cl_type = 'd', .cl_hostcontext = false, .cl_father = CL_RESOURCE, - .cl_forward = NULL, - .cl_backward = NULL, }, /* ... and its contents */ @@ -2446,16 +2462,12 @@ static const struct light_class light_classes[] = { .cl_len = 0, // catch any .cl_type = 'l', .cl_father = CL_DEFAULTS0, - .cl_forward = NULL, - .cl_backward = NULL, }, [CL_DEFAULTS_ITEMS] = { .cl_name = "", .cl_len = 0, // catch any .cl_type = 'l', .cl_father = CL_DEFAULTS, - .cl_forward = NULL, - .cl_backward = NULL, }, /* Subdirectory for controlling items... @@ -2466,8 +2478,6 @@ static const struct light_class light_classes[] = { .cl_type = 'd', .cl_hostcontext = false, .cl_father = CL_RESOURCE, - .cl_forward = NULL, - .cl_backward = NULL, }, /* ... and its contents */ @@ -2476,8 +2486,6 @@ static const struct light_class light_classes[] = { .cl_len = 0, // catch any .cl_type = 'l', .cl_father = CL_SWITCH, - .cl_forward = NULL, - .cl_backward = NULL, }, /* Subdirectory for actual state @@ -2488,8 +2496,6 @@ static const struct light_class light_classes[] = { .cl_type = 'd', .cl_hostcontext = false, .cl_father = CL_RESOURCE, - .cl_forward = NULL, - .cl_backward = NULL, }, /* ... and its contents */ @@ -2498,8 +2504,6 @@ static const struct light_class light_classes[] = { .cl_len = 0, // catch any .cl_type = 'l', .cl_father = CL_ACTUAL, - .cl_forward = NULL, - .cl_backward = NULL, }, @@ -2511,8 +2515,6 @@ static const struct light_class light_classes[] = { .cl_type = 'l', .cl_hostcontext = false, // not used here .cl_father = CL_RESOURCE, - .cl_forward = NULL, - .cl_backward = NULL, }, /* Symlink indicating the necessary length of logfile replay */ @@ -2522,8 +2524,6 @@ static const struct light_class light_classes[] = { .cl_type = 'l', .cl_hostcontext = false, // not used here .cl_father = CL_RESOURCE, - .cl_forward = NULL, - .cl_backward = NULL, }, /* File or symlink to the real device / real (sparse) file * when hostcontext is missing, the corresponding peer will @@ -2535,7 +2535,9 @@ static const struct light_class light_classes[] = { .cl_type = 'F', .cl_hostcontext = true, .cl_father = CL_RESOURCE, +#ifdef RUN_DATA .cl_forward = make_bio, +#endif .cl_backward = kill_any, }, /* Symlink indicating the (common) size of the resource @@ -2546,7 +2548,9 @@ static const struct light_class light_classes[] = { .cl_type = 'l', .cl_hostcontext = false, .cl_father = CL_RESOURCE, +#ifdef RUN_LOGINIT .cl_forward = make_log_init, +#endif .cl_backward = kill_any, }, /* Symlink pointing to the name of the primary node @@ -2557,7 +2561,9 @@ static const struct light_class light_classes[] = { .cl_type = 'l', .cl_hostcontext = false, .cl_father = CL_RESOURCE, +#ifdef RUN_PRIMARY .cl_forward = make_primary, +#endif .cl_backward = NULL, }, /* Only for testing: open local file @@ -2581,7 +2587,9 @@ static const struct light_class light_classes[] = { .cl_type = 'l', .cl_hostcontext = true, .cl_father = CL_RESOURCE, +#ifdef RUN_SYNCSTATUS .cl_forward = make_sync, +#endif .cl_backward = kill_any, }, /* Only for testing: make a copy instance @@ -2618,8 +2626,6 @@ static const struct light_class light_classes[] = { .cl_serial = true, .cl_hostcontext = false, .cl_father = CL_RESOURCE, - .cl_forward = NULL, - .cl_backward = NULL, }, /* Logfiles for transaction logger */ @@ -2630,10 +2636,10 @@ static const struct light_class light_classes[] = { .cl_serial = true, .cl_hostcontext = true, .cl_father = CL_RESOURCE, -#if 1 +#ifdef RUN_LOGFILES .cl_forward = make_log_step, - .cl_backward = kill_any, #endif + .cl_backward = kill_any, }, /* Symlink indicating the last state of * transaction log replay. @@ -2644,7 +2650,9 @@ static const struct light_class light_classes[] = { .cl_type = 'l', .cl_hostcontext = true, .cl_father = CL_RESOURCE, +#ifdef RUN_REPLAY .cl_forward = make_replay, +#endif .cl_backward = kill_any, }, @@ -2656,7 +2664,9 @@ static const struct light_class light_classes[] = { .cl_type = 'l', .cl_hostcontext = true, .cl_father = CL_RESOURCE, +#ifdef RUN_DEVICE .cl_forward = make_dev, +#endif .cl_backward = kill_any, }, {} @@ -2859,6 +2869,37 @@ void _show_status(struct mars_global *global) } #ifdef STAT_DEBUGGING +static +void _show_one(struct mars_brick *test, int *brick_count) +{ + int i; + if (*brick_count) { + MARS_STAT("---------\n"); + } + MARS_STAT("BRICK type = %s path = '%s' name = '%s' level = %d button = %d off = %d on = %d\n", SAFE_STR(test->type->type_name), SAFE_STR(test->brick_path), SAFE_STR(test->brick_name), test->status_level, test->power.button, test->power.led_off, test->power.led_on); + (*brick_count)++; + if (test->ops && test->ops->brick_statistics) { + char *info = test->ops->brick_statistics(test, 0); + if (info) { + MARS_STAT(" %s", info); + brick_string_free(info); + } + } + for (i = 0; i < test->nr_inputs; i++) { + struct mars_input *input = test->inputs[i]; + struct mars_output *output = input ? input->connect : NULL; + if (output) { + MARS_STAT(" input %d connected with %s path = '%s' name = '%s'\n", i, SAFE_STR(output->brick->type->type_name), SAFE_STR(output->brick->brick_path), SAFE_STR(output->brick->brick_name)); + } else { + MARS_STAT(" input %d not connected\n", i); + } + } + for (i = 0; i < test->nr_outputs; i++) { + struct mars_output *output = test->outputs[i]; + MARS_STAT(" output %d nr_connected = %d\n", i, output->nr_connected); + } +} + static void _show_statist(struct mars_global *global) { @@ -2868,37 +2909,18 @@ void _show_statist(struct mars_global *global) brick_mem_statistics(); - MARS_STAT("================================== bricks:\n"); down_read(&global->brick_mutex); + MARS_STAT("================================== ordinary bricks:\n"); for (tmp = global->brick_anchor.next; tmp != &global->brick_anchor; tmp = tmp->next) { struct mars_brick *test; - int i; test = container_of(tmp, struct mars_brick, global_brick_link); - if (brick_count) { - MARS_STAT("---------\n"); - } - MARS_STAT("BRICK type = %s path = '%s' name = '%s' level = %d button = %d off = %d on = %d\n", test->type->type_name, test->brick_path, test->brick_name, test->status_level, test->power.button, test->power.led_off, test->power.led_on); - brick_count++; - if (test->ops && test->ops->brick_statistics) { - char *info = test->ops->brick_statistics(test, 0); - if (info) { - MARS_STAT(" %s", info); - brick_string_free(info); - } - } - for (i = 0; i < test->nr_inputs; i++) { - struct mars_input *input = test->inputs[i]; - struct mars_output *output = input ? input->connect : NULL; - if (output) { - MARS_STAT(" input %d connected with %s path = '%s' name = '%s'\n", i, output->brick->type->type_name, output->brick->brick_path, output->brick->brick_name); - } else { - MARS_STAT(" input %d not connected\n", i); - } - } - for (i = 0; i < test->nr_outputs; i++) { - struct mars_output *output = test->outputs[i]; - MARS_STAT(" output %d nr_connected = %d\n", i, output->nr_connected); - } + _show_one(test, &brick_count); + } + MARS_STAT("================================== server bricks:\n"); + for (tmp = global->server_anchor.next; tmp != &global->server_anchor; tmp = tmp->next) { + struct mars_brick *test; + test = container_of(tmp, struct mars_brick, global_brick_link); + _show_one(test, &brick_count); } up_read(&global->brick_mutex); @@ -2908,12 +2930,12 @@ void _show_statist(struct mars_global *global) struct mars_dent *dent; struct list_head *sub; dent = container_of(tmp, struct mars_dent, dent_link); - MARS_STAT("dent %d '%s' '%s' stamp=%ld.%09ld\n", dent->d_class, dent->d_path, dent->new_link ? dent->new_link : "", dent->new_stat.mtime.tv_sec, dent->new_stat.mtime.tv_nsec); + MARS_STAT("dent %d '%s' '%s' stamp=%ld.%09ld\n", dent->d_class, SAFE_STR(dent->d_path), SAFE_STR(dent->new_link), dent->new_stat.mtime.tv_sec, dent->new_stat.mtime.tv_nsec); dent_count++; for (sub = dent->brick_list.next; sub != &dent->brick_list; sub = sub->next) { struct mars_brick *test; test = container_of(sub, struct mars_brick, dent_brick_link); - MARS_STAT(" owner of brick '%s'\n", test->brick_path); + MARS_STAT(" owner of brick '%s'\n", SAFE_STR(test->brick_path)); } } up_read(&global->dent_mutex); @@ -2922,21 +2944,23 @@ void _show_statist(struct mars_global *global) } #endif +static struct mars_global _global = { + .dent_anchor = LIST_HEAD_INIT(_global.dent_anchor), + .brick_anchor = LIST_HEAD_INIT(_global.brick_anchor), + .server_anchor = LIST_HEAD_INIT(_global.server_anchor), + .global_power = { + .button = true, + }, + .dent_mutex = __RWSEM_INITIALIZER(_global.dent_mutex), + .brick_mutex = __RWSEM_INITIALIZER(_global.brick_mutex), + .main_event = __WAIT_QUEUE_HEAD_INITIALIZER(_global.main_event), +}; + static int light_thread(void *data) { char *id = my_id(); int status = 0; - struct mars_global global = { - .dent_anchor = LIST_HEAD_INIT(global.dent_anchor), - .brick_anchor = LIST_HEAD_INIT(global.brick_anchor), - .global_power = { - .button = true, - }, - .dent_mutex = __RWSEM_INITIALIZER(global.dent_mutex), - .brick_mutex = __RWSEM_INITIALIZER(global.brick_mutex), - .main_event = __WAIT_QUEUE_HEAD_INITIALIZER(global.main_event), - }; - mars_global = &global; // TODO: cleanup, avoid stack + mars_global = &_global; if (!id || strlen(id) < 2) { MARS_ERR("invalid hostname\n"); @@ -2948,32 +2972,38 @@ static int light_thread(void *data) MARS_INF("-------- starting as host '%s' ----------\n", id); - while (global.global_power.button || !list_empty(&global.brick_anchor)) { + while (_global.global_power.button || !list_empty(&_global.brick_anchor)) { int status; - global.global_power.button = !kthread_should_stop(); + _global.global_power.button = !kthread_should_stop(); - status = mars_dent_work(&global, "/mars", sizeof(struct mars_dent), light_checker, light_worker, &global, 3); + if (!_global.global_power.button) { + mars_kill_brick_all(&_global, &_global.server_anchor, false); + } + + status = mars_dent_work(&_global, "/mars", sizeof(struct mars_dent), light_checker, light_worker, &_global, 3); MARS_DBG("worker status = %d\n", status); - _show_status(&global); + _show_status(&_global); #ifdef STAT_DEBUGGING - _show_statist(&global); + _show_statist(&_global); #endif msleep(500); - wait_event_interruptible_timeout(global.main_event, global.main_trigger, 10 * HZ); - global.main_trigger = false; + wait_event_interruptible_timeout(_global.main_event, _global.main_trigger, 10 * HZ); + _global.main_trigger = false; } done: MARS_INF("-------- cleaning up ----------\n"); - mars_free_dent_all(&global.dent_anchor); + mars_kill_brick_all(&_global, &_global.server_anchor, false); + mars_free_dent_all(&_global.dent_anchor); + mars_kill_brick_all(&_global, &_global.brick_anchor, false); - _show_status(&global); + _show_status(&_global); #ifdef STAT_DEBUGGING - _show_statist(&global); + _show_statist(&_global); #endif mars_global = NULL; diff --git a/sy_old/mars_proc.c b/sy_old/mars_proc.c index 4c786d68..5f45672d 100644 --- a/sy_old/mars_proc.c +++ b/sy_old/mars_proc.c @@ -99,7 +99,7 @@ int __init init_mars_proc(void) MARS_INF("init_proc()\n"); -#if 0 +#if 1 header = register_sysctl_table(mars_table); #endif diff --git a/sy_old/strategy.h b/sy_old/strategy.h index 55290c7a..f32ecc49 100644 --- a/sy_old/strategy.h +++ b/sy_old/strategy.h @@ -51,6 +51,7 @@ struct mars_global { struct generic_switch global_power; struct list_head dent_anchor; struct list_head brick_anchor; + struct list_head server_anchor; volatile bool main_trigger; wait_queue_head_t main_event; //void *private; @@ -71,6 +72,7 @@ extern struct mars_brick *mars_find_brick(struct mars_global *global, const void extern struct mars_brick *mars_make_brick(struct mars_global *global, struct mars_dent *belongs, const void *_brick_type, const char *path, const char *name); extern int mars_free_brick(struct mars_brick *brick); extern int mars_kill_brick(struct mars_brick *brick); +extern int mars_kill_brick_all(struct mars_global *global, struct list_head *anchor, bool use_dent_link); // mid-level brick instantiation (identity is based on path strings) diff --git a/sy_old/sy_generic.c b/sy_old/sy_generic.c index 106debe4..8b7b4c88 100644 --- a/sy_old/sy_generic.c +++ b/sy_old/sy_generic.c @@ -636,13 +636,7 @@ EXPORT_SYMBOL_GPL(mars_find_dent); void mars_kill_dent(struct mars_dent *dent) { dent->d_killme = true; - while (!list_empty(&dent->brick_list)) { - struct list_head *tmp = dent->brick_list.next; - struct mars_brick *brick = container_of(tmp, struct mars_brick, dent_brick_link); - list_del_init(tmp); - // note: locking is now done there.... - mars_kill_brick(brick); - } + mars_kill_brick_all(NULL, &dent->brick_list, true); } EXPORT_SYMBOL_GPL(mars_kill_dent); @@ -664,7 +658,7 @@ void mars_free_dent(struct mars_dent *dent) brick_string_free(dent->d_path); brick_string_free(dent->old_link); brick_string_free(dent->new_link); - //brick_mem_free(dent->d_private); + brick_mem_free(dent->d_private); brick_mem_free(dent); } EXPORT_SYMBOL_GPL(mars_free_dent); @@ -757,12 +751,16 @@ int mars_free_brick(struct mars_brick *brick) } } +#ifndef MEMLEAK // TODO: check whether crash remains possible + MARS_DBG("deallocate name = '%s' path = '%s'\n", SAFE_STR(brick->brick_name), SAFE_STR(brick->brick_path)); + brick_string_free(brick->brick_name); + brick_string_free(brick->brick_path); +#endif + status = generic_brick_exit_full((void*)brick); if (status >= 0) { #ifndef MEMLEAK // TODO: check whether crash remains possible - brick_string_free(brick->brick_name); - brick_string_free(brick->brick_path); brick_mem_free(brick); #endif mars_trigger(); @@ -778,7 +776,7 @@ EXPORT_SYMBOL_GPL(mars_free_brick); struct mars_brick *mars_make_brick(struct mars_global *global, struct mars_dent *belongs, const void *_brick_type, const char *path, const char *_name) { const char *name = brick_strdup(_name); - const char *names[] = { name }; + const char *names[] = { name, NULL }; const struct generic_brick_type *brick_type = _brick_type; const struct generic_input_type **input_types; const struct generic_output_type **output_types; @@ -846,9 +844,11 @@ struct mars_brick *mars_make_brick(struct mars_global *global, struct mars_dent * Switching on / etc must be done separately. */ down_write(&global->brick_mutex); - list_add(&res->global_brick_link, &global->brick_anchor); if (belongs) { + list_add(&res->global_brick_link, &global->brick_anchor); list_add(&res->dent_brick_link, &belongs->brick_list); + } else { + list_add(&res->global_brick_link, &global->server_anchor); } up_write(&global->brick_mutex); @@ -871,17 +871,18 @@ int mars_kill_brick(struct mars_brick *brick) CHECK_PTR(brick, done); global = brick->global; - CHECK_PTR(global, done); - MARS_DBG("===> killing brick path = '%s' name = '%s'\n", brick->brick_path, brick->brick_name); + MARS_DBG("===> killing brick path = '%s' name = '%s'\n", SAFE_STR(brick->brick_path), SAFE_STR(brick->brick_name)); - down_write(&global->brick_mutex); - list_del_init(&brick->global_brick_link); - list_del_init(&brick->dent_brick_link); - up_write(&global->brick_mutex); + if (global) { + down_write(&global->brick_mutex); + list_del_init(&brick->global_brick_link); + list_del_init(&brick->dent_brick_link); + up_write(&global->brick_mutex); + } if (unlikely(brick->nr_outputs > 0 && brick->outputs[0] && brick->outputs[0]->nr_connected)) { - MARS_ERR("sorry, output is in use '%s'\n", brick->brick_path); + MARS_ERR("sorry, output is in use '%s'\n", SAFE_STR(brick->brick_path)); goto done; } @@ -894,6 +895,36 @@ done: } EXPORT_SYMBOL_GPL(mars_kill_brick); +int mars_kill_brick_all(struct mars_global *global, struct list_head *anchor, bool use_dent_link) +{ + int status = 0; + if (global) { + down_write(&global->brick_mutex); + } + while (!list_empty(anchor)) { + struct list_head *tmp = anchor->next; + struct mars_brick *brick; + if (use_dent_link) { + brick = container_of(tmp, struct mars_brick, dent_brick_link); + } else { + brick = container_of(tmp, struct mars_brick, global_brick_link); + } + list_del_init(tmp); + if (global) { + up_write(&global->brick_mutex); + } + status |= mars_kill_brick(brick); + if (global) { + down_write(&global->brick_mutex); + } + } + if (global) { + up_write(&global->brick_mutex); + } + return status; +} +EXPORT_SYMBOL_GPL(mars_kill_brick_all); + ///////////////////////////////////////////////////////////////////// @@ -1064,7 +1095,7 @@ struct mars_brick *make_brick_all( prev[i] = mars_find_brick(global, prev_brick_type[i], path); if (!prev[i]) { - MARS_ERR("prev brick '%s' does not exist\n", path); + MARS_WRN("prev brick '%s' does not exist\n", path); goto err; } MARS_DBG("------> predecessor %d path = '%s'\n", i, path); diff --git a/sy_old/sy_net.c b/sy_old/sy_net.c index 701c39a1..2766ec9b 100644 --- a/sy_old/sy_net.c +++ b/sy_old/sy_net.c @@ -38,7 +38,7 @@ done: return brick_strdup(res); } -int mars_send_dent_list(struct socket **sock, struct list_head *anchor) +int mars_send_dent_list(struct mars_socket *sock, struct list_head *anchor) { struct list_head *tmp; struct mars_dent *dent; @@ -49,14 +49,14 @@ int mars_send_dent_list(struct socket **sock, struct list_head *anchor) if (status < 0) break; } - if (status >= 0) { // send EOF + if (status >= 0) { // send EOR status = mars_send_struct(sock, NULL, mars_dent_meta); } return status; } EXPORT_SYMBOL_GPL(mars_send_dent_list); -int mars_recv_dent_list(struct socket **sock, struct list_head *anchor) +int mars_recv_dent_list(struct mars_socket *sock, struct list_head *anchor) { int status; for (;;) {