import mars-127.tgz

This commit is contained in:
Thomas Schoebel-Theuer 2011-08-31 12:42:04 +01:00
parent b54dbfb492
commit 188636c6af
19 changed files with 727 additions and 437 deletions

20
brick.c
View File

@ -857,13 +857,13 @@ int set_recursive_button(struct generic_brick *orig_brick, brick_switch_t mode,
/* eliminate duplicates */ \
for (j = 0; j < stack; j++) { \
if (table[j] == (next)) { \
BRICK_DBG(" double entry %d '%s' stack = %d\n", i, (next)->brick_name, stack); \
BRICK_DBG(" double entry %d '%s' stack = %d\n", i, SAFE_STR((next)->brick_name), stack); \
found = true; \
break; \
} \
} \
if (!found) { \
BRICK_DBG(" push '%s' stack = %d\n", (next)->brick_name, stack); \
BRICK_DBG(" push '%s' stack = %d\n", SAFE_STR((next)->brick_name), stack); \
table[stack++] = (next); \
if (unlikely(stack > max)) { \
BRICK_ERR("---- max = %d overflow, restarting...\n", max); \
@ -873,7 +873,7 @@ int set_recursive_button(struct generic_brick *orig_brick, brick_switch_t mode,
}
restart:
BRICK_DBG("-> orig_brick = '%s'\n", orig_brick->brick_name);
BRICK_DBG("-> orig_brick = '%s'\n", SAFE_STR(orig_brick->brick_name));
brick_mem_free(table);
max <<= 1;
table = brick_mem_alloc(max * sizeof(void*));
@ -888,7 +888,7 @@ int set_recursive_button(struct generic_brick *orig_brick, brick_switch_t mode,
for (pos = 0; pos < stack; pos++) {
struct generic_brick *brick = table[pos];
BRICK_DBG("--> pos = %d stack = %d brick = '%s' inputs = %d/%d outputs = %d/%d\n", pos, stack, brick->brick_name, brick->nr_inputs, brick->type->max_inputs, brick->nr_outputs, brick->type->max_outputs);
BRICK_DBG("--> pos = %d stack = %d brick = '%s' inputs = %d/%d outputs = %d/%d\n", pos, stack, SAFE_STR(brick->brick_name), brick->nr_inputs, brick->type->max_inputs, brick->nr_outputs, brick->type->max_outputs);
if (val) {
force = false;
@ -945,19 +945,19 @@ int set_recursive_button(struct generic_brick *orig_brick, brick_switch_t mode,
while (stack > 0) {
struct generic_brick *brick = table[--stack];
BRICK_DBG("--> switch '%s' stack = %d\n", brick->brick_name, stack);
BRICK_DBG("--> switch '%s' stack = %d\n", SAFE_STR(brick->brick_name), stack);
set_button_wait(brick, val, force, timeout);
if (val ? !brick->power.led_on : !brick->power.led_off) {
BRICK_DBG("switching to %d: brick '%s' not ready (%s)\n", val, brick->brick_name, orig_brick->brick_name);
BRICK_DBG("switching to %d: brick '%s' not ready (%s)\n", val, SAFE_STR(brick->brick_name), SAFE_STR(orig_brick->brick_name));
goto done;
}
if (force && !val && (mode == BR_FREE_ONE || mode == BR_FREE_ALL) && brick->free) {
BRICK_DBG("---> freeing '%s'\n", brick->brick_name);
BRICK_DBG("---> freeing '%s'\n", SAFE_STR(brick->brick_name));
status = brick->free(brick);
BRICK_DBG("---> freeing '%s' status = %d\n", brick->brick_name, status);
BRICK_DBG("---> freeing '%s' status = %d\n", SAFE_STR(brick->brick_name), status);
if (status < 0) {
BRICK_DBG("freeing brick '%s' (%s) failed, status = %d\n", brick->brick_name, orig_brick->brick_name, status);
BRICK_DBG("freeing brick '%s' (%s) failed, status = %d\n", SAFE_STR(brick->brick_name), SAFE_STR(orig_brick->brick_name), status);
goto done;
}
}
@ -966,7 +966,7 @@ int set_recursive_button(struct generic_brick *orig_brick, brick_switch_t mode,
status = 0;
done:
BRICK_DBG("-> done '%s' status = %d\n", orig_brick->brick_name, status);
BRICK_DBG("-> done '%s' status = %d\n", SAFE_STR(orig_brick->brick_name), status);
brick_mem_free(table);
return status;
}

36
brick.h
View File

@ -11,6 +11,26 @@
#include "meta.h"
#define MAX_BRICK_TYPES 64
#ifndef BRICK_OBJ_MAX
#define BRICK_OBJ_MAX /*empty => leads to an open array */
#endif
extern int brick_layout_generation;
extern int brick_obj_max;
#ifdef _STRATEGY
#define _STRATEGY_CODE(X) X
#define _NORMAL_CODE(X) /**/
#else
#define _STRATEGY_CODE(X) /**/
#define _NORMAL_CODE(X) X
#endif
/////////////////////////////////////////////////////////////////////////
// printk() replacements
#ifdef CONFIG_DEBUG_KERNEL
#define INLINE static inline
//#define INLINE __attribute__((__noinline__))
@ -27,13 +47,7 @@ extern void brick_dump_stack(void);
#endif // CONFIG_DEBUG_KERNEL
#ifdef _STRATEGY
#define _STRATEGY_CODE(X) X
#define _NORMAL_CODE(X) /**/
#else
#define _STRATEGY_CODE(X) /**/
#define _NORMAL_CODE(X) X
#endif
#define SAFE_STR(str) ((str) ? (str) : "NULL")
#define BRICK_FATAL "BRICK_FATAL "
#define BRICK_ERROR "BRICK_ERROR "
@ -62,14 +76,6 @@ extern void brick_dump_stack(void);
#define BRICK_IO(_args...) /*empty*/
#endif
#define MAX_BRICK_TYPES 64
#ifndef BRICK_OBJ_MAX
#define BRICK_OBJ_MAX /*empty => leads to an open array */
#endif
extern int brick_layout_generation;
extern int brick_obj_max;
/////////////////////////////////////////////////////////////////////////
// number management helpers

View File

@ -3,6 +3,7 @@
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/mm.h>
#include <linux/fs.h>
#include <asm/atomic.h>
@ -27,6 +28,18 @@
/////////////////////////////////////////////////////////////////////////
// limit handling
#define LIMIT_MEM
#ifdef LIMIT_MEM
#include <linux/swap.h>
#include <linux/mm.h>
#endif
long long brick_global_memlimit = 0;
EXPORT_SYMBOL_GPL(brick_global_memlimit);
/////////////////////////////////////////////////////////////////////////
// small memory allocation (use this only for len < PAGE_SIZE)
#ifdef BRICK_DEBUG_MEM
@ -40,7 +53,11 @@ static int mem_len[BRICK_DEBUG_MEM] = {};
void *_brick_mem_alloc(int len, int line)
{
void *res = kmalloc(len + PLUS_SIZE + sizeof(int), GFP_BRICK);
void *res;
#ifdef CONFIG_DEBUG_KERNEL
might_sleep();
#endif
res = kmalloc(len + PLUS_SIZE + sizeof(int), GFP_BRICK);
#ifdef BRICK_DEBUG_MEM
if (likely(res)) {
if (unlikely(line < 0))
@ -96,6 +113,9 @@ static atomic_t string_free[BRICK_DEBUG_MEM] = {};
char *_brick_string_alloc(int len, int line)
{
char *res;
#ifdef CONFIG_DEBUG_KERNEL
might_sleep();
#endif
#ifdef FIXME
if (len <= 0)
len = BRICK_STRING_LEN;
@ -167,6 +187,9 @@ void *_brick_block_alloc(loff_t pos, int len, int line)
return NULL;
}
#endif
#ifdef CONFIG_DEBUG_KERNEL
might_sleep();
#endif
#ifdef USE_OFFSET
offset = pos & (PAGE_SIZE-1);
#endif
@ -291,6 +314,11 @@ EXPORT_SYMBOL_GPL(brick_mem_statistics);
int __init init_brick_mem(void)
{
#ifdef LIMIT_MEM // provisionary
brick_global_memlimit = total_swapcache_pages * (PAGE_SIZE / 4);
BRICK_INF("brick_global_memlimit = %lld\n", brick_global_memlimit);
#endif
return 0;
}

View File

@ -7,6 +7,8 @@
#define GFP_BRICK GFP_NOIO
//#define GFP_BRICK GFP_KERNEL // can lead to deadlocks!
extern long long brick_global_memlimit;
/////////////////////////////////////////////////////////////////////////
// small memory allocation (use this only for len < PAGE_SIZE)

9
mars.h
View File

@ -9,8 +9,6 @@
#define msleep msleep_interruptible
extern long long mars_global_memlimit;
/////////////////////////////////////////////////////////////////////////
// include the generic brick infrastructure
@ -309,11 +307,14 @@ extern const struct meta mars_timespec_meta[];
MARS_ERR("%d: list_head " #head " (%p) not empty\n", __LINE__, head); \
} \
#define CHECK_PTR(ptr,label) \
#define CHECK_PTR_NULL(ptr,label) \
if (CHECKING && unlikely(!(ptr))) { \
MARS_FAT("%d: ptr '" #ptr "' is NULL\n", __LINE__); \
goto label; \
} \
}
#define CHECK_PTR(ptr,label) \
CHECK_PTR_NULL(ptr, label); \
if (CHECKING && unlikely(!virt_addr_valid(ptr))) { \
MARS_FAT("%d: ptr '" #ptr "' is no valid virtual KERNEL address\n", __LINE__); \
goto label; \

View File

@ -616,6 +616,22 @@ static int aio_event_thread(void *data)
return err;
}
#if 1
/* This should go to fs/open.c (as long as vfs_submit() is not implemented)
*/
#include <linux/fdtable.h>
void fd_uninstall(unsigned int fd)
{
struct files_struct *files = current->files;
struct fdtable *fdt;
spin_lock(&files->file_lock);
fdt = files_fdtable(files);
rcu_assign_pointer(fdt->fd[fd], NULL);
spin_unlock(&files->file_lock);
}
EXPORT_SYMBOL(fd_uninstall);
#endif
static int aio_submit_thread(void *data)
{
struct aio_threadinfo *tinfo = data;
@ -623,8 +639,9 @@ static int aio_submit_thread(void *data)
struct file *file = output->filp;
int err;
/* TODO: this is provisionary. We only need it for sys_io_submit().
* The latter should be accompanied by a future vfs_submit() or
/* TODO: this is provisionary. We only need it for sys_io_submit()
* which uses userspace concepts like file handles.
* This should be accompanied by a future kernelsapce vfs_submit() or
* do_submit() which currently does not exist :(
* FIXME: corresponding cleanup NYI
*/
@ -759,7 +776,8 @@ static int aio_submit_thread(void *data)
output->ctxp = 0;
}
#endif
MARS_INF("destroying fd.....\n");
MARS_INF("destroying fd %d\n", output->fd);
fd_uninstall(output->fd);
put_unused_fd(output->fd);
unuse_fake_mm();
err = 0;

View File

@ -21,22 +21,26 @@
static int thread_count = 0;
static void _kill_thread(struct client_threadinfo *ti)
{
if (ti->thread) {
MARS_INF("stopping thread...\n");
kthread_stop(ti->thread);
ti->thread = NULL;
}
}
static void _kill_socket(struct client_output *output)
{
if (output->socket) {
MARS_DBG("shutdown socket\n");
kernel_sock_shutdown(output->socket, SHUT_WR);
//sock_release(output->socket);
output->socket = NULL;
mars_shutdown_socket(output->socket);
}
}
static void _kill_thread(struct client_threadinfo *ti)
{
if (ti->thread && !ti->terminated) {
MARS_INF("stopping thread...\n");
kthread_stop(ti->thread);
ti->thread = NULL;
_kill_thread(&output->receiver);
if (output->socket) {
MARS_DBG("close socket\n");
mars_put_socket(output->socket);
output->socket = NULL;
}
}
@ -48,19 +52,21 @@ static int _request_info(struct client_output *output)
int status;
MARS_DBG("\n");
status = mars_send_struct(&output->socket, &cmd, mars_cmd_meta);
status = mars_send_struct(output->socket, &cmd, mars_cmd_meta);
if (unlikely(status < 0)) {
MARS_DBG("send of getinfo failed, status = %d\n", status);
}
return status;
}
static int receiver_thread(void *data);
static int _connect(struct client_output *output, const char *str)
{
struct sockaddr_storage sockaddr = {};
int status;
if (!output->path) {
if (unlikely(!output->path)) {
output->path = brick_strdup(str);
status = -ENOMEM;
if (!output->path) {
@ -78,14 +84,18 @@ static int _connect(struct client_output *output, const char *str)
*output->host++ = '\0';
}
_kill_socket(output);
status = mars_create_sockaddr(&sockaddr, output->host);
if (unlikely(status < 0)) {
MARS_DBG("no sockaddr, status = %d\n", status);
goto done;
}
status = mars_create_socket(&output->socket, &sockaddr, false);
if (unlikely(status < 0)) {
output->socket = mars_create_socket(&sockaddr, false);
if (unlikely(IS_ERR(output->socket))) {
status = PTR_ERR(output->socket);
output->socket = NULL;
if (status == -EINPROGRESS) {
MARS_DBG("operation is in progress....\n");
goto really_done; // give it a chance next time
@ -94,13 +104,24 @@ static int _connect(struct client_output *output, const char *str)
goto done;
}
output->receiver.thread = kthread_create(receiver_thread, output, "mars_receiver%d", thread_count++);
if (unlikely(IS_ERR(output->receiver.thread))) {
status = PTR_ERR(output->receiver.thread);
MARS_ERR("cannot start receiver thread, status = %d\n", status);
output->receiver.thread = NULL;
output->receiver.terminated = true;
goto done;
}
wake_up_process(output->receiver.thread);
{
struct mars_cmd cmd = {
.cmd_code = CMD_CONNECT,
.cmd_str1 = output->path,
};
status = mars_send_struct(&output->socket, &cmd, mars_cmd_meta);
status = mars_send_struct(output->socket, &cmd, mars_cmd_meta);
if (unlikely(status < 0)) {
MARS_DBG("send of connect failed, status = %d\n", status);
goto done;
@ -239,7 +260,7 @@ int receiver_thread(void *data)
struct client_output *output = data;
int status = 0;
while (status >= 0 && output->socket && !kthread_should_stop()) {
while (status >= 0 && mars_socket_is_alive(output->socket) && !kthread_should_stop()) {
struct mars_cmd cmd = {};
struct list_head *tmp;
struct client_mref_aspect *mref_a = NULL;
@ -247,7 +268,7 @@ int receiver_thread(void *data)
struct generic_callback *cb;
unsigned long flags;
status = mars_recv_struct(&output->socket, &cmd, mars_cmd_meta);
status = mars_recv_struct(output->socket, &cmd, mars_cmd_meta);
MARS_IO("got cmd = %d status = %d\n", cmd.cmd_code, status);
if (status < 0)
goto done;
@ -291,7 +312,7 @@ int receiver_thread(void *data)
MARS_IO("got callback id = %d, old pos = %lld len = %d rw = %d\n", mref->ref_id, mref->ref_pos, mref->ref_len, mref->ref_rw);
status = mars_recv_cb(&output->socket, mref);
status = mars_recv_cb(output->socket, mref);
MARS_IO("new status = %d, pos = %lld len = %d rw = %d\n", status, mref->ref_pos, mref->ref_len, mref->ref_rw);
if (status < 0) {
MARS_ERR("interrupted data transfer during callback, status = %d\n", status);
@ -313,7 +334,7 @@ int receiver_thread(void *data)
break;
}
case CMD_GETINFO:
status = mars_recv_struct(&output->socket, &output->info, mars_info_meta);
status = mars_recv_struct(output->socket, &output->info, mars_info_meta);
if (status < 0) {
MARS_ERR("got bad info from remote side, status = %d\n", status);
goto done;
@ -333,14 +354,8 @@ int receiver_thread(void *data)
if (status < 0) {
MARS_ERR("receiver thread terminated with status = %d\n", status);
}
#if 0
if (output->socket) {
MARS_INF("shutting down socket\n");
kernel_sock_shutdown(output->socket, SHUT_WR);
msleep(1000);
output->socket = NULL;
}
#endif
mars_shutdown_socket(output->socket);
output->receiver.terminated = true;
wake_up_interruptible(&output->receiver.run_event);
return status;
@ -391,26 +406,6 @@ static int sender_thread(void *data)
wait_event_interruptible_timeout(output->event, !list_empty(&output->mref_list) || output->get_info, 10 * HZ);
if (unlikely(output->receiver.terminated)) {
#if 1
if (unlikely(output->receiver.restart_count++ > 3)) { // don't restart too often
MARS_ERR("receiver failed too often, giving up\n");
status = -ECOMM;
break;
}
#endif
output->receiver.terminated = false;
output->receiver.thread = kthread_create(receiver_thread, output, "mars_receiver%d", thread_count++);
if (unlikely(IS_ERR(output->receiver.thread))) {
MARS_ERR("cannot start receiver thread, status = %d\n", (int)PTR_ERR(output->receiver.thread));
output->receiver.thread = NULL;
output->receiver.terminated = true;
msleep(5000);
continue;
}
wake_up_process(output->receiver.thread);
}
if (output->get_info) {
status = _request_info(output);
if (status >= 0) {
@ -432,7 +427,7 @@ static int sender_thread(void *data)
MARS_IO("sending mref, id = %d pos = %lld len = %d rw = %d\n", mref->ref_id, mref->ref_pos, mref->ref_len, mref->ref_rw);
status = mars_send_mref(&output->socket, mref);
status = mars_send_mref(output->socket, mref);
MARS_IO("status = %d\n", status);
if (unlikely(status < 0)) {
// retry submission on next occasion..
@ -444,19 +439,16 @@ static int sender_thread(void *data)
MARS_ERR("sending failed, status = %d\n", status);
_kill_socket(output);
_kill_thread(&output->receiver);
wait_event_interruptible_timeout(output->receiver.run_event, output->receiver.terminated, 10 * HZ);
continue;
}
}
//done:
if (status < 0)
if (status < 0) {
MARS_ERR("sender thread terminated with status = %d\n", status);
}
_kill_socket(output);
_kill_thread(&output->receiver);
output->sender.terminated = true;
wake_up_interruptible(&output->sender.run_event);

View File

@ -38,7 +38,7 @@ struct client_output {
struct list_head wait_list;
wait_queue_head_t event;
int last_id;
struct socket *socket;
struct mars_socket *socket;
char *host;
char *path;
struct client_threadinfo sender;

View File

@ -226,15 +226,6 @@ EXPORT_SYMBOL_GPL(mars_power_led_off);
void (*_mars_trigger)(void) = NULL;
EXPORT_SYMBOL_GPL(_mars_trigger);
#define LIMIT_MEM
#ifdef LIMIT_MEM
#include <linux/swap.h>
#include <linux/mm.h>
#endif
long long mars_global_memlimit = 0;
EXPORT_SYMBOL_GPL(mars_global_memlimit);
struct mm_struct *mm_fake = NULL;
EXPORT_SYMBOL_GPL(mm_fake);
@ -242,11 +233,6 @@ int __init init_mars(void)
{
MARS_INF("init_mars()\n");
#ifdef LIMIT_MEM // provisionary
mars_global_memlimit = total_swapcache_pages * (PAGE_SIZE / 4);
MARS_INF("mars_global_memlimit = %lld\n", mars_global_memlimit);
#endif
set_fake();
#ifdef MARS_TRACING

View File

@ -93,68 +93,164 @@ int mars_create_sockaddr(struct sockaddr_storage *addr, const char *spec)
}
EXPORT_SYMBOL_GPL(mars_create_sockaddr);
int mars_create_socket(struct socket **sock, struct sockaddr_storage *addr, bool is_server)
/* The original struct socket has no refcount. This leads to problems
* during long-lasting system calls when racing with socket shutdown.
* This is just a small wrapper adding a refcount.
*/
struct mars_socket {
atomic_t s_count;
struct socket *s_socket;
bool s_dead;
};
struct mars_socket *mars_create_socket(struct sockaddr_storage *addr, bool is_server)
{
struct mars_socket *msock;
struct socket *sock;
struct sockaddr *sockaddr = (void*)addr;
int x_true = 1;
int status = 0;
int status = -ENOMEM;
if (!*sock) {
status = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, sock);
if (unlikely(status < 0)) {
*sock = NULL;
MARS_WRN("cannot create socket, status = %d\n", status);
goto done;
}
msock = brick_zmem_alloc(sizeof(struct mars_socket));
if (!msock)
goto done;
/* TODO: improve this by a table-driven approach
*/
(*sock)->sk->sk_rcvtimeo = (*sock)->sk->sk_sndtimeo = default_tcp_params.tcp_timeout * HZ;
status = kernel_setsockopt(*sock, SOL_SOCKET, SO_SNDBUF, (char*)&default_tcp_params.window_size, sizeof(default_tcp_params.window_size));
_check(status);
status = kernel_setsockopt(*sock, SOL_SOCKET, SO_RCVBUF, (char*)&default_tcp_params.window_size, sizeof(default_tcp_params.window_size));
_check(status);
status = kernel_setsockopt(*sock, SOL_IP, SO_PRIORITY, (char*)&default_tcp_params.tos, sizeof(default_tcp_params.tos));
_check(status);
#if 0
status = kernel_setsockopt(*sock, IPPROTO_TCP, TCP_NODELAY, (char*)&x_true, sizeof(x_true));
#endif
_check(status);
status = kernel_setsockopt(*sock, SOL_SOCKET, SO_KEEPALIVE, (char*)&x_true, sizeof(x_true));
_check(status);
status = kernel_setsockopt(*sock, IPPROTO_TCP, TCP_KEEPCNT, (char*)&default_tcp_params.tcp_keepcnt, sizeof(default_tcp_params.tcp_keepcnt));
_check(status);
status = kernel_setsockopt(*sock, IPPROTO_TCP, TCP_KEEPINTVL, (char*)&default_tcp_params.tcp_keepintvl, sizeof(default_tcp_params.tcp_keepintvl));
_check(status);
status = kernel_setsockopt(*sock, IPPROTO_TCP, TCP_KEEPIDLE, (char*)&default_tcp_params.tcp_keepidle, sizeof(default_tcp_params.tcp_keepidle));
_check(status);
status = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &msock->s_socket);
if (unlikely(status < 0)) {
MARS_WRN("cannot create socket, status = %d\n", status);
goto done;
}
atomic_set(&msock->s_count, 1);
sock = msock->s_socket;
status = -EINVAL;
CHECK_PTR(sock, done);
/* TODO: improve this by a table-driven approach
*/
sock->sk->sk_rcvtimeo = sock->sk->sk_sndtimeo = default_tcp_params.tcp_timeout * HZ;
status = kernel_setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char*)&default_tcp_params.window_size, sizeof(default_tcp_params.window_size));
_check(status);
status = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char*)&default_tcp_params.window_size, sizeof(default_tcp_params.window_size));
_check(status);
status = kernel_setsockopt(sock, SOL_IP, SO_PRIORITY, (char*)&default_tcp_params.tos, sizeof(default_tcp_params.tos));
_check(status);
#if 0
status = kernel_setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char*)&x_true, sizeof(x_true));
#endif
_check(status);
status = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char*)&x_true, sizeof(x_true));
_check(status);
status = kernel_setsockopt(sock, IPPROTO_TCP, TCP_KEEPCNT, (char*)&default_tcp_params.tcp_keepcnt, sizeof(default_tcp_params.tcp_keepcnt));
_check(status);
status = kernel_setsockopt(sock, IPPROTO_TCP, TCP_KEEPINTVL, (char*)&default_tcp_params.tcp_keepintvl, sizeof(default_tcp_params.tcp_keepintvl));
_check(status);
status = kernel_setsockopt(sock, IPPROTO_TCP, TCP_KEEPIDLE, (char*)&default_tcp_params.tcp_keepidle, sizeof(default_tcp_params.tcp_keepidle));
_check(status);
if (is_server) {
status = kernel_bind(*sock, sockaddr, sizeof(*sockaddr));
status = kernel_bind(sock, sockaddr, sizeof(*sockaddr));
if (unlikely(status < 0)) {
MARS_WRN("bind failed, status = %d\n", status);
sock_release(*sock);
*sock = NULL;
goto done;
}
status = kernel_listen(*sock, 16);
status = kernel_listen(sock, 16);
if (status < 0) {
MARS_WRN("listen failed, status = %d\n", status);
}
} else {
status = kernel_connect(*sock, sockaddr, sizeof(*sockaddr), 0);
status = kernel_connect(sock, sockaddr, sizeof(*sockaddr), 0);
if (status < 0) {
MARS_DBG("connect failed, status = %d\n", status);
}
}
done:
return status;
if (status < 0) {
mars_put_socket(msock);
msock = ERR_PTR(status);
}
return msock;
}
EXPORT_SYMBOL_GPL(mars_create_socket);
int mars_send(struct socket **sock, void *buf, int len)
struct mars_socket *mars_accept_socket(struct mars_socket *msock, bool do_block)
{
int status = -ENOENT;
if (likely(msock)) {
struct mars_socket *new_msock;
struct socket *new_socket = NULL;
mars_get_socket(msock);
status = kernel_accept(msock->s_socket, &new_socket, do_block ? 0 : O_NONBLOCK);
mars_put_socket(msock);
if (unlikely(status < 0)) {
goto err;
}
status = -ENOMEM;
new_msock = brick_zmem_alloc(sizeof(struct mars_socket));
if (!new_msock) {
kernel_sock_shutdown(new_socket, SHUT_WR);
sock_release(new_socket);
goto err;
}
atomic_set(&new_msock->s_count, 1);
new_msock->s_socket = new_socket;
return new_msock;
}
err:
return ERR_PTR(status);
}
EXPORT_SYMBOL_GPL(mars_accept_socket);
struct mars_socket *mars_get_socket(struct mars_socket *msock)
{
if (likely(msock)) {
atomic_inc(&msock->s_count);
}
return msock;
}
EXPORT_SYMBOL_GPL(mars_get_socket);
void mars_put_socket(struct mars_socket *msock)
{
if (likely(msock)) {
if (atomic_dec_and_test(&msock->s_count)) {
struct socket *sock = msock->s_socket;
if (sock) {
if (!msock->s_dead) {
msock->s_dead = true;
kernel_sock_shutdown(sock, SHUT_WR);
}
sock_release(sock);
}
brick_mem_free(msock);
}
}
}
EXPORT_SYMBOL_GPL(mars_put_socket);
void mars_shutdown_socket(struct mars_socket *msock)
{
if (likely(msock)) {
struct socket *sock = msock->s_socket;
if (sock && !msock->s_dead) {
msock->s_dead = true;
kernel_sock_shutdown(sock, SHUT_WR);
}
}
}
EXPORT_SYMBOL_GPL(mars_shutdown_socket);
bool mars_socket_is_alive(struct mars_socket *msock)
{
if (!msock || msock->s_dead)
return false;
return true;
}
EXPORT_SYMBOL_GPL(mars_socket_is_alive);
int mars_send(struct mars_socket *msock, void *buf, int len)
{
struct kvec iov = {
.iov_base = buf,
@ -167,15 +263,18 @@ int mars_send(struct socket **sock, void *buf, int len)
int status = -EIDRM;
int sent = 0;
if (!mars_get_socket(msock))
goto done;
//MARS_IO("buf = %p, len = %d\n", buf, len);
while (sent < len) {
if (unlikely(!*sock)) {
if (unlikely(msock->s_dead)) {
MARS_WRN("socket has disappeared\n");
status = -EIDRM;
goto done;
}
status = kernel_sendmsg(*sock, &msg, &iov, 1, len);
status = kernel_sendmsg(msock->s_socket, &msg, &iov, 1, len);
if (status == -EAGAIN) {
msleep(50);
@ -204,12 +303,14 @@ int mars_send(struct socket **sock, void *buf, int len)
sent += status;
}
status = sent;
done:
mars_put_socket(msock);
return status;
}
EXPORT_SYMBOL_GPL(mars_send);
int mars_recv(struct socket **sock, void *buf, int minlen, int maxlen)
int mars_recv(struct mars_socket *msock, void *buf, int minlen, int maxlen)
{
int status = -EIDRM;
int done = 0;
@ -219,6 +320,9 @@ int mars_recv(struct socket **sock, void *buf, int minlen, int maxlen)
return -EINVAL;
}
if (!mars_get_socket(msock))
goto err;
while (done < minlen) {
struct kvec iov = {
.iov_base = buf + done,
@ -230,7 +334,7 @@ int mars_recv(struct socket **sock, void *buf, int minlen, int maxlen)
.msg_flags = 0 | MSG_WAITALL | MSG_NOSIGNAL,
};
if (unlikely(!*sock)) {
if (unlikely(msock->s_dead)) {
MARS_WRN("socket has disappeared\n");
status = -EIDRM;
goto err;
@ -238,7 +342,7 @@ int mars_recv(struct socket **sock, void *buf, int minlen, int maxlen)
MARS_IO("done %d, fetching %d bytes\n", done, maxlen-done);
status = kernel_recvmsg(*sock, &msg, &iov, 1, maxlen-done, msg.msg_flags);
status = kernel_recvmsg(msock->s_socket, &msg, &iov, 1, maxlen-done, msg.msg_flags);
if (status == -EAGAIN) {
#if 0
@ -249,7 +353,7 @@ int mars_recv(struct socket **sock, void *buf, int minlen, int maxlen)
continue;
}
if (!status) { // EOF
MARS_WRN("got EOF (done=%d, req_size=%d)\n", done, maxlen-done);
MARS_WRN("got EOF from socket (done=%d, req_size=%d)\n", done, maxlen - done);
status = -EPIPE;
goto err;
}
@ -262,6 +366,7 @@ int mars_recv(struct socket **sock, void *buf, int minlen, int maxlen)
status = done;
err:
mars_put_socket(msock);
return status;
}
EXPORT_SYMBOL_GPL(mars_recv);
@ -284,16 +389,16 @@ struct mars_net_header {
u16 h_len;
};
int _mars_send_struct(struct socket **sock, void *data, const struct meta *meta, int *seq)
int _mars_send_struct(struct mars_socket *msock, void *data, const struct meta *meta, int *seq)
{
int count = 0;
int status = 0;
if (!data) { // send EOF
if (!data) { // send EOR
struct mars_net_header header = {
.h_magic = MARS_NET_MAGIC,
.h_seq = -1,
};
return mars_send(sock, &header, sizeof(header));
return mars_send(msock, &header, sizeof(header));
}
for (; ; meta++) {
struct mars_net_header header = {
@ -357,7 +462,7 @@ int _mars_send_struct(struct socket **sock, void *data, const struct meta *meta,
}
MARS_IO("sending header %d '%s' len = %d\n", header.h_seq, header.h_name, len);
status = mars_send(sock, &header, sizeof(header));
status = mars_send(msock, &header, sizeof(header));
if (status < 0 || !meta->field_name) { // EOR
break;
}
@ -365,14 +470,14 @@ int _mars_send_struct(struct socket **sock, void *data, const struct meta *meta,
switch (meta->field_type) {
case FIELD_REF:
case FIELD_SUB:
status = _mars_send_struct(sock, item, meta->field_ref, seq);
status = _mars_send_struct(msock, item, meta->field_ref, seq);
if (status > 0)
count += status;
break;
default:
if (len > 0) {
MARS_IO("sending extra %d\n", len);
status = mars_send(sock, item, len);
status = mars_send(msock, item, len);
if (status > 0)
count++;
}
@ -387,14 +492,14 @@ int _mars_send_struct(struct socket **sock, void *data, const struct meta *meta,
return status;
}
int mars_send_struct(struct socket **sock, void *data, const struct meta *meta)
int mars_send_struct(struct mars_socket *msock, void *data, const struct meta *meta)
{
int seq = 0;
return _mars_send_struct(sock, data, meta, &seq);
return _mars_send_struct(msock, data, meta, &seq);
}
EXPORT_SYMBOL_GPL(mars_send_struct);
int _mars_recv_struct(struct socket **sock, void *data, const struct meta *meta, int *seq, int line)
int _mars_recv_struct(struct mars_socket *msock, void *data, const struct meta *meta, int *seq, int line)
{
int count = 0;
int status = -EINVAL;
@ -408,7 +513,7 @@ int _mars_recv_struct(struct socket **sock, void *data, const struct meta *meta,
const struct meta *tmp;
void *item;
void *mem;
status = mars_recv(sock, &header, sizeof(header), sizeof(header));
status = mars_recv(msock, &header, sizeof(header), sizeof(header));
if (status == -EAGAIN) {
msleep(50);
continue;
@ -423,7 +528,7 @@ int _mars_recv_struct(struct socket **sock, void *data, const struct meta *meta,
status = -ENOMSG;
break;
}
if (header.h_seq == -1) { // got EOF
if (header.h_seq == -1) { // got EOR
status = 0;
break;
};
@ -447,7 +552,7 @@ int _mars_recv_struct(struct socket **sock, void *data, const struct meta *meta,
status = -ENOMEM;
if (!dummy)
break;
status = mars_recv(sock, dummy, header.h_len, header.h_len);
status = mars_recv(msock, dummy, header.h_len, header.h_len);
brick_mem_free(dummy);
if (status < 0)
break;
@ -488,7 +593,7 @@ int _mars_recv_struct(struct socket **sock, void *data, const struct meta *meta,
}
MARS_IO("starting recursive structure\n");
status = _mars_recv_struct(sock, item, tmp->field_ref, seq, line);
status = _mars_recv_struct(msock, item, tmp->field_ref, seq, line);
MARS_IO("ending recursive structure, status = %d\n", status);
if (status > 0)
@ -502,10 +607,10 @@ int _mars_recv_struct(struct socket **sock, void *data, const struct meta *meta,
break;
}
MARS_IO("reading extra %d\n", header.h_len);
status = mars_recv(sock, item, header.h_len, header.h_len);
status = mars_recv(msock, item, header.h_len, header.h_len);
while (status == -EAGAIN) {
msleep(50);
status = mars_recv(sock, item, header.h_len, header.h_len);
status = mars_recv(msock, item, header.h_len, header.h_len);
}
if (status >= 0) {
//MARS_IO("got data len = %d status = %d\n", header.h_len, status);
@ -558,7 +663,7 @@ void mars_check_meta(const struct meta *meta, void *data)
}
int mars_send_mref(struct socket **sock, struct mref_object *mref)
int mars_send_mref(struct mars_socket *msock, struct mref_object *mref)
{
struct mars_cmd cmd = {
.cmd_code = CMD_MREF,
@ -566,26 +671,26 @@ int mars_send_mref(struct socket **sock, struct mref_object *mref)
};
int status;
status = mars_send_struct(sock, &cmd, mars_cmd_meta);
status = mars_send_struct(msock, &cmd, mars_cmd_meta);
if (status < 0)
goto done;
status = mars_send_struct(sock, mref, mars_mref_meta);
status = mars_send_struct(msock, mref, mars_mref_meta);
if (status < 0)
goto done;
if (mref->ref_rw) {
status = mars_send(sock, mref->ref_data, mref->ref_len);
status = mars_send(msock, mref->ref_data, mref->ref_len);
}
done:
return status;
}
EXPORT_SYMBOL_GPL(mars_send_mref);
int mars_recv_mref(struct socket **sock, struct mref_object *mref)
int mars_recv_mref(struct mars_socket *msock, struct mref_object *mref)
{
int status;
status = mars_recv_struct(sock, mref, mars_mref_meta);
status = mars_recv_struct(msock, mref, mars_mref_meta);
if (status < 0)
goto done;
if (mref->ref_rw) {
@ -595,7 +700,7 @@ int mars_recv_mref(struct socket **sock, struct mref_object *mref)
status = -ENOMEM;
goto done;
}
status = mars_recv(sock, mref->ref_data, mref->ref_len, mref->ref_len);
status = mars_recv(msock, mref->ref_data, mref->ref_len, mref->ref_len);
if (status < 0)
MARS_WRN("mref_len = %d, status = %d\n", mref->ref_len, status);
}
@ -604,32 +709,32 @@ done:
}
EXPORT_SYMBOL_GPL(mars_recv_mref);
int mars_send_cb(struct socket **sock, struct mref_object *mref)
int mars_send_cb(struct mars_socket *msock, struct mref_object *mref)
{
struct mars_cmd cmd = {
.cmd_code = CMD_CB,
.cmd_int1 = mref->ref_id,
};
int status;
status = mars_send_struct(sock, &cmd, mars_cmd_meta);
status = mars_send_struct(msock, &cmd, mars_cmd_meta);
if (status < 0)
goto done;
status = mars_send_struct(sock, mref, mars_mref_meta);
status = mars_send_struct(msock, mref, mars_mref_meta);
if (status < 0)
goto done;
if (!mref->ref_rw) {
MARS_IO("sending blocklen = %d\n", mref->ref_len);
status = mars_send(sock, mref->ref_data, mref->ref_len);
status = mars_send(msock, mref->ref_data, mref->ref_len);
}
done:
return status;
}
EXPORT_SYMBOL_GPL(mars_send_cb);
int mars_recv_cb(struct socket **sock, struct mref_object *mref)
int mars_recv_cb(struct mars_socket *msock, struct mref_object *mref)
{
int status;
status = mars_recv_struct(sock, mref, mars_mref_meta);
status = mars_recv_struct(msock, mref, mars_mref_meta);
if (status < 0)
goto done;
if (!mref->ref_rw) {
@ -639,7 +744,7 @@ int mars_recv_cb(struct socket **sock, struct mref_object *mref)
goto done;
}
MARS_IO("receiving blocklen = %d\n", mref->ref_len);
status = mars_recv(sock, mref->ref_data, mref->ref_len, mref->ref_len);
status = mars_recv(msock, mref->ref_data, mref->ref_len, mref->ref_len);
}
done:
return status;

View File

@ -10,6 +10,8 @@
#define MARS_DEFAULT_PORT 7777
struct mars_socket; // opaque
struct mars_tcp_params {
int tcp_timeout;
int window_size;
@ -49,29 +51,36 @@ extern char *(*mars_translate_hostname)(const char *name);
/* Low-level network traffic
*/
extern int mars_create_sockaddr(struct sockaddr_storage *addr, const char *spec);
extern int mars_create_socket(struct socket **sock, struct sockaddr_storage *addr, bool is_server);
extern int mars_send(struct socket **sock, void *buf, int len);
extern int mars_recv(struct socket **sock, void *buf, int minlen, int maxlen);
extern struct mars_socket *mars_create_socket(struct sockaddr_storage *addr, bool is_server);
extern struct mars_socket *mars_accept_socket(struct mars_socket *msock, bool do_block);
extern struct mars_socket *mars_get_socket(struct mars_socket *msock);
extern void mars_put_socket(struct mars_socket *msock);
extern void mars_shutdown_socket(struct mars_socket *msock);
extern bool mars_socket_is_alive(struct mars_socket *msock);
extern int mars_send(struct mars_socket *msock, void *buf, int len);
extern int mars_recv(struct mars_socket *msock, void *buf, int minlen, int maxlen);
/* Mid-level generic field data exchange
*/
extern int mars_send_struct(struct socket **sock, void *data, const struct meta *meta);
extern int mars_send_struct(struct mars_socket *msock, void *data, const struct meta *meta);
#define mars_recv_struct(_sock_,_data_,_meta_) \
({ \
int seq = 0; \
_mars_recv_struct(_sock_, _data_, _meta_, &seq, __LINE__); \
})
extern int _mars_recv_struct(struct socket **sock, void *data, const struct meta *meta, int *seq, int line);
extern int _mars_recv_struct(struct mars_socket *msock, void *data, const struct meta *meta, int *seq, int line);
/* High-level transport of mars structures
*/
extern int mars_send_dent_list(struct socket **sock, struct list_head *anchor);
extern int mars_recv_dent_list(struct socket **sock, struct list_head *anchor);
extern int mars_send_dent_list(struct mars_socket *msock, struct list_head *anchor);
extern int mars_recv_dent_list(struct mars_socket *msock, struct list_head *anchor);
extern int mars_send_mref(struct socket **sock, struct mref_object *mref);
extern int mars_recv_mref(struct socket **sock, struct mref_object *mref);
extern int mars_send_cb(struct socket **sock, struct mref_object *mref);
extern int mars_recv_cb(struct socket **sock, struct mref_object *mref);
extern int mars_send_mref(struct mars_socket *msock, struct mref_object *mref);
extern int mars_recv_mref(struct mars_socket *msock, struct mref_object *mref);
extern int mars_send_cb(struct mars_socket *msock, struct mref_object *mref);
extern int mars_recv_cb(struct mars_socket *msock, struct mref_object *mref);
/////////////////////////////////////////////////////////////////////////

View File

@ -18,8 +18,10 @@
#include "mars_server.h"
static struct socket *server_socket = NULL;
static struct mars_socket *server_socket = NULL;
static struct task_struct *server_thread = NULL;
static LIST_HEAD(server_list);
static spinlock_t server_lock = SPIN_LOCK_UNLOCKED;
///////////////////////// own helper functions ////////////////////////
@ -28,6 +30,7 @@ static
int cb_thread(void *data)
{
struct server_brick *brick = data;
struct mars_socket *sock = brick->handler_socket;
int status = -EINVAL;
brick->cb_running = true;
@ -35,17 +38,12 @@ int cb_thread(void *data)
MARS_DBG("--------------- cb_thread starting on socket %p\n", brick->handler_socket);
while (!kthread_should_stop()) {
while (!kthread_should_stop() || !list_empty(&brick->cb_read_list) || !list_empty(&brick->cb_write_list)) {
struct server_mref_aspect *mref_a;
struct mref_object *mref;
struct list_head *tmp;
struct socket **sock;
unsigned long flags;
status = -EINVAL;
if (!brick->handler_socket)
break;
wait_event_interruptible_timeout(
brick->cb_event,
!list_empty(&brick->cb_read_list) ||
@ -53,9 +51,6 @@ int cb_thread(void *data)
kthread_should_stop(),
3 * HZ);
if (!brick->handler_socket)
break;
traced_lock(&brick->cb_lock, flags);
tmp = brick->cb_write_list.next;
if (tmp == &brick->cb_write_list) {
@ -70,27 +65,34 @@ int cb_thread(void *data)
mref_a = container_of(tmp, struct server_mref_aspect, cb_head);
mref = mref_a->object;
status = -EINVAL;
CHECK_PTR(mref, err);
status = -ENOTSOCK;
sock = mref_a->sock;
CHECK_PTR(sock, err);
CHECK_PTR(*sock, err);
down(&brick->socket_sem);
status = mars_send_cb(sock, mref);
up(&brick->socket_sem);
err:
if (unlikely(status < 0)) {
MARS_WRN("cannot send response, status = %d\n", status);
#if 0 // THINK: not sure whether we need this at all. The _client_ should be responsible for resending any lost operations. Disable this for the next future.
traced_lock(&brick->cb_lock, flags);
if (mref->ref_rw) {
list_add(tmp, &brick->cb_write_list);
} else {
list_add(tmp, &brick->cb_read_list);
}
traced_unlock(&brick->cb_lock, flags);
continue;
#else
mars_shutdown_socket(sock);
#endif
}
atomic_dec(&brick->in_flight);
GENERIC_INPUT_CALL(brick->inputs[0], mref_put, mref);
if (unlikely(status < 0)) {
MARS_ERR("cannot send response, status = %d\n", status);
kernel_sock_shutdown(*sock, SHUT_WR);
break;
}
}
err:
brick->cb_running = false;
MARS_DBG("---------- cb_thread terminating, status = %d\n", status);
wake_up_interruptible(&brick->startup_event);
@ -108,11 +110,15 @@ void server_endio(struct generic_callback *cb)
mref_a = cb->cb_private;
CHECK_PTR(mref_a, err);
brick = mref_a->brick;
CHECK_PTR(brick, err);
mref = mref_a->object;
CHECK_PTR(mref, err);
brick = mref_a->brick;
if (!brick) {
MARS_WRN("late IO callback -- cannot do anything\n");
return;
}
rw = mref->ref_rw;
traced_lock(&brick->cb_lock, flags);
@ -129,13 +135,13 @@ err:
MARS_FAT("cannot handle callback - giving up\n");
}
int server_io(struct server_brick *brick, struct socket **sock)
int server_io(struct server_brick *brick, struct mars_socket *sock)
{
struct mref_object *mref;
struct server_mref_aspect *mref_a;
int status = -ENOTRECOVERABLE;
if (!brick->cb_running)
if (!brick->cb_running || !mars_socket_is_alive(sock))
goto done;
mref = server_alloc_mref(&brick->hidden_output, &brick->mref_object_layout);
@ -156,7 +162,6 @@ int server_io(struct server_brick *brick, struct socket **sock)
}
mref_a->brick = brick;
mref_a->sock = sock;
mref->_ref_cb.cb_private = mref_a;
mref->_ref_cb.cb_fn = server_endio;
mref->ref_cb = &mref->_ref_cb;
@ -190,6 +195,7 @@ void _clean_list(struct server_brick *brick, struct list_head *start)
list_del_init(tmp);
mref_a = container_of(tmp, struct server_mref_aspect, cb_head);
mref_a->brick = NULL;
mref = mref_a->object;
if (!mref)
continue;
@ -198,22 +204,31 @@ void _clean_list(struct server_brick *brick, struct list_head *start)
}
}
static
struct task_struct *_grab_handler(struct server_brick *brick)
{
struct task_struct *res;
spin_lock(&server_lock);
list_del_init(&brick->server_link);
res = brick->handler_thread;
brick->handler_thread = NULL;
spin_unlock(&server_lock);
return res;
}
static
int handler_thread(void *data)
{
struct server_brick *brick = data;
struct socket **sock = &brick->handler_socket;
struct mars_socket *sock = brick->handler_socket;
struct task_struct *cb_thread = brick->cb_thread;
int max_round = 300;
struct task_struct *h_thread;
int status = 0;
brick->cb_thread = NULL;
brick->handler_thread = NULL;
wake_up_interruptible(&brick->startup_event);
MARS_DBG("--------------- handler_thread starting on socket %p\n", *sock);
if (!*sock)
goto done;
MARS_DBG("--------------- handler_thread starting on socket %p\n", sock);
//fake_mm();
while (brick->cb_running && !kthread_should_stop()) {
@ -221,7 +236,7 @@ int handler_thread(void *data)
status = mars_recv_struct(sock, &cmd, mars_cmd_meta);
if (status < 0) {
MARS_ERR("bad command status = %d\n", status);
MARS_WRN("bad command status = %d\n", status);
break;
}
@ -266,7 +281,7 @@ int handler_thread(void *data)
up(&brick->socket_sem);
if (status < 0) {
MARS_ERR("could not send dentry information, status = %d\n", status);
MARS_WRN("could not send dentry information, status = %d\n", status);
}
break;
}
@ -277,10 +292,14 @@ int handler_thread(void *data)
status = -EINVAL;
CHECK_PTR(path, err);
CHECK_PTR(mars_global, err);
CHECK_PTR_NULL(mars_global, err);
CHECK_PTR(_bio_brick_type, err);
//prev = mars_find_brick(mars_global, NULL, cmd.cmd_str1);
if (!mars_global->global_power.button) {
MARS_WRN("system is not alive\n");
goto err;
}
prev = make_brick_all(
mars_global,
NULL,
@ -321,38 +340,48 @@ int handler_thread(void *data)
break;
}
kernel_sock_shutdown(*sock, SHUT_WR);
//sock_release(*sock);
//cleanup_mm();
mars_shutdown_socket(sock);
done:
MARS_DBG("handler_thread terminating, status = %d\n", status);
MARS_INF("stopping thread...\n");
kthread_stop(cb_thread);
wait_event_interruptible_timeout(
brick->startup_event,
!brick->cb_running,
10 * HZ);
if (cb_thread) {
MARS_INF("stopping cb thread...\n");
kthread_stop(cb_thread);
wait_event_interruptible_timeout(
brick->startup_event,
!brick->cb_running,
10 * HZ);
put_task_struct(cb_thread);
}
_clean_list(brick, &brick->cb_read_list);
_clean_list(brick, &brick->cb_write_list);
do {
int status = mars_kill_brick((void*)brick);
if (status >= 0) {
//if(*sock)
//sock_release(*sock);
break;
}
if (status >= 0 || max_round-- < 0) {
MARS_INF("not dead, giving up....\n");
break;
}
msleep(1000);
} while (brick->cb_running && !brick->power.led_off);
MARS_DBG("cleaning up...\n");
MARS_DBG("done\n");
return 0;
h_thread = _grab_handler(brick);
mars_put_socket(sock);
/* Normally, the brick should be shut down from outside.
* In case the handler thread stops abnormally (e.g.
* shutdown of socket etc), it has to cleanup itself.
* This is an exception to the basic rule of instance orientation
* that execution logic should be cleanly separated from strategy
* logic.
* So be careful, avoid races by use of _grab_handler().
*/
if (h_thread) {
int status;
MARS_DBG("self cleanup...\n");
status = mars_kill_brick((void*)brick);
if (status < 0) {
BRICK_ERR("kill status = %d, giving up\n", status);
}
put_task_struct(h_thread);
}
MARS_DBG("done.\n");
return status;
}
////////////////// own brick / input / output operations //////////////////
@ -383,17 +412,61 @@ static void server_ref_io(struct server_output *output, struct mref_object *mref
static int server_switch(struct server_brick *brick)
{
int status = 0;
if (brick->power.button) {
static int version = 0;
struct task_struct *thread;
mars_power_led_off((void*)brick, false);
MARS_INF("starting.....");
mars_power_led_on((void*)brick, true);
spin_lock(&server_lock);
list_add(&brick->server_link, &server_list);
spin_unlock(&server_lock);
thread = kthread_create(cb_thread, brick, "mars_cb%d", version);
if (IS_ERR(thread)) {
status = PTR_ERR(thread);
MARS_ERR("cannot create cb thread, status = %ld\n", status);
goto err;
}
get_task_struct(thread);
brick->cb_thread = thread;
wake_up_process(thread);
thread = kthread_create(handler_thread, brick, "mars_handler%d", version++);
if (IS_ERR(thread)) {
status = PTR_ERR(thread);
MARS_ERR("cannot create handler thread, status = %ld\n", status);
kthread_stop(brick->cb_thread);
goto err;
}
get_task_struct(thread);
brick->handler_thread = thread;
wake_up_process(thread);
wait_event_interruptible(brick->startup_event, brick->cb_thread == NULL);
err:
if (status >= 0) {
mars_power_led_on((void*)brick, true);
}
} else {
struct task_struct *thread;
mars_power_led_on((void*)brick, false);
mars_power_led_off((void*)brick, true);
thread = _grab_handler(brick);
if (thread) {
brick->handler_thread = NULL;
MARS_INF("stopping handler thread....\n");
kthread_stop(thread);
put_task_struct(thread);
} else {
MARS_WRN("handler thread does not exist\n");
mars_power_led_off((void*)brick, true);
}
}
return 0;
return status;
}
//////////////// object / aspect constructors / destructors ///////////////
@ -419,6 +492,7 @@ static int server_brick_construct(struct server_brick *brick)
{
struct server_output *hidden = &brick->hidden_output;
_server_output_init(brick, hidden, "internal");
INIT_LIST_HEAD(&brick->server_link);
init_waitqueue_head(&brick->startup_event);
init_waitqueue_head(&brick->cb_event);
sema_init(&brick->socket_sem, 1);
@ -490,7 +564,6 @@ EXPORT_SYMBOL_GPL(server_brick_type);
static int _server_thread(void *data)
{
char *id = my_id();
int version = 0;
int status = 0;
//fake_mm();
@ -498,80 +571,91 @@ static int _server_thread(void *data)
MARS_INF("-------- server starting on host '%s' ----------\n", id);
while (!kthread_should_stop()) {
int size;
struct server_brick *brick;
struct task_struct *thread;
struct socket *new_socket = NULL;
int status;
status = kernel_accept(server_socket, &new_socket, O_NONBLOCK);
if (status < 0) {
struct mars_socket *new_socket;
new_socket = mars_accept_socket(server_socket, false);
if (IS_ERR(new_socket)) {
status = PTR_ERR(new_socket);
new_socket = NULL;
msleep(500);
if (status == -EAGAIN)
continue; // without error message
MARS_ERR("accept status = %d\n", status);
continue;
}
if (!new_socket) {
MARS_ERR("got no socket\n");
msleep(3000);
MARS_WRN("accept status = %d\n", status);
msleep(4000);
continue;
}
MARS_DBG("got new connection %p\n", new_socket);
/* TODO: check authorization.
*/
size = server_brick_type.brick_size +
(server_brick_type.max_inputs + server_brick_type.max_outputs) * sizeof(void*) +
sizeof(struct server_input),
brick = brick_zmem_alloc(size);
if (!mars_global || !mars_global->global_power.button) {
MARS_WRN("system is not alive\n");
goto err;
}
#if 1
brick = (void*)mars_make_brick(mars_global, NULL, &server_brick_type, "test", "test");
if (!brick) {
MARS_ERR("cannot create server instance\n");
goto err;
}
#else // old code, remove ASAP
{
int size = server_brick_type.brick_size +
(server_brick_type.max_inputs + server_brick_type.max_outputs) * sizeof(void*) +
sizeof(struct server_input),
brick = brick_zmem_alloc(size);
}
if (!brick) {
MARS_ERR("cannot allocate server instance\n");
goto err;
}
status = generic_brick_init_full(brick, size, (void*)&server_brick_type, NULL, NULL, NULL);
if (status) {
if (status < 0) {
MARS_ERR("cannot init server brick, status = %d\n", status);
goto err;
}
#endif
brick->handler_socket = new_socket;
thread = kthread_create(cb_thread, brick, "mars_cb%d", version);
if (IS_ERR(thread)) {
MARS_ERR("cannot create cb thread, status = %ld\n", PTR_ERR(thread));
brick->power.button = true;
status = server_switch(brick);
if (status < 0) {
MARS_ERR("cannot switch on server brick, status = %d\n", status);
goto err;
}
brick->cb_thread = thread;
wake_up_process(thread);
thread = kthread_create(handler_thread, brick, "mars_handler%d", version++);
if (IS_ERR(thread)) {
MARS_ERR("cannot create handler thread, status = %ld\n", PTR_ERR(thread));
goto err;
}
brick->handler_thread = thread;
wake_up_process(thread);
wait_event_interruptible(brick->startup_event, brick->handler_thread == NULL && brick->cb_thread == NULL);
continue;
err:
if (new_socket) {
kernel_sock_shutdown(new_socket, SHUT_WR);
//sock_release(new_socket);
mars_put_socket(new_socket);
}
msleep(3000);
}
MARS_INF("-------- cleaning up ----------\n");
spin_lock(&server_lock);
while (!list_empty(&server_list)) {
struct list_head *tmp = server_list.next;
struct server_brick *brick = container_of(tmp, struct server_brick, server_link);
list_del_init(tmp);
spin_unlock(&server_lock);
brick->power.button = false;
status = server_switch(brick);
spin_lock(&server_lock);
}
spin_unlock(&server_lock);
//cleanup_mm();
MARS_INF("-------- done status = %d ----------\n", status);
server_thread = NULL;
return status;
}
@ -585,22 +669,22 @@ int __init init_mars_server(void)
MARS_INF("init_server()\n");
#if 0
#if 1
status = mars_create_sockaddr(&sockaddr, "");
if (status < 0)
return status;
status = mars_create_socket(&server_socket, &sockaddr, true);
if (status < 0)
return status;
status = kernel_listen(server_socket, 100);
if (status < 0)
server_socket = mars_create_socket(&sockaddr, true);
if (unlikely(IS_ERR(server_socket))) {
status = PTR_ERR(server_socket);
server_socket = NULL;
return status;
}
thread = kthread_create(_server_thread, NULL, "mars_server");
if (IS_ERR(thread)) {
return PTR_ERR(thread);
status = PTR_ERR(thread);
return status;
}
get_task_struct(thread);
@ -617,12 +701,12 @@ void __exit exit_mars_server(void)
server_unregister_brick_type();
if (server_thread) {
if (server_socket) {
kernel_sock_shutdown(server_socket, SHUT_WR);
mars_shutdown_socket(server_socket);
}
MARS_INF("stopping thread...\n");
MARS_INF("stopping server thread...\n");
kthread_stop(server_thread);
if (server_socket) {
//sock_release(server_socket);
mars_put_socket(server_socket);
server_socket = NULL;
}
put_task_struct(server_thread);

View File

@ -6,14 +6,9 @@
#include "mars_net.h"
//extern struct socket *server_socket;
//extern struct task_struct *server_thread;
//extern wait_queue_head_t server_event;
struct server_mref_aspect {
GENERIC_ASPECT(mref);
struct server_brick *brick;
struct socket **sock;
struct list_head cb_head;
};
@ -23,9 +18,10 @@ struct server_output {
struct server_brick {
MARS_BRICK(server);
struct list_head server_link;
atomic_t in_flight;
struct socket *handler_socket;
struct semaphore socket_sem;
struct mars_socket *handler_socket;
struct task_struct *handler_thread;
struct task_struct *cb_thread;
wait_queue_head_t startup_event;

View File

@ -2134,7 +2134,7 @@ void trans_logger_log(struct trans_logger_output *output)
#if 1 // provisionary flood handling FIXME: do better
delay_callers =
(atomic_read(&brick->mshadow_count) > brick->shadow_mem_limit && brick->shadow_mem_limit > 1) ||
(atomic64_read(&brick->shadow_mem_used) > mars_global_memlimit && mars_global_memlimit > 1);
(atomic64_read(&brick->shadow_mem_used) > brick_global_memlimit && brick_global_memlimit > 1);
if (delay_callers != brick->delay_callers) {
//MARS_INF("stalling %d -> %d\n", brick->delay_callers, delay_callers);
brick->delay_callers = delay_callers;
@ -2143,7 +2143,7 @@ void trans_logger_log(struct trans_logger_output *output)
if (unlimited) {
unlimited =
(atomic_read(&brick->mshadow_count) > brick->shadow_mem_limit * 3 / 8 && brick->shadow_mem_limit > 1) ||
(atomic64_read(&brick->shadow_mem_used) > mars_global_memlimit * 3 / 8 && mars_global_memlimit > 1);
(atomic64_read(&brick->shadow_mem_used) > brick_global_memlimit * 3 / 8 && brick_global_memlimit > 1);
if (!unlimited) {
brick->q_phase2.q_unlimited = false;
brick->q_phase3.q_unlimited = false;
@ -2155,7 +2155,7 @@ void trans_logger_log(struct trans_logger_output *output)
} else {
unlimited =
(atomic_read(&brick->mshadow_count) > brick->shadow_mem_limit / 2 && brick->shadow_mem_limit > 1) ||
(atomic64_read(&brick->shadow_mem_used) > mars_global_memlimit / 2 && mars_global_memlimit > 1);
(atomic64_read(&brick->shadow_mem_used) > brick_global_memlimit / 2 && brick_global_memlimit > 1);
if (unlimited) {
brick->q_phase2.q_unlimited = unlimited;
brick->q_phase3.q_unlimited = unlimited;
@ -2519,7 +2519,7 @@ char *trans_logger_statistics(struct trans_logger_brick *brick, int verbose)
brick->do_replay, brick->do_continuous_replay, brick->replay_code, brick->log_reads,
brick->log_start_pos, brick->replay_start_pos, brick->replay_end_pos, brick->current_pos,
atomic_read(&brick->total_replay_count), atomic_read(&brick->total_cb_count), atomic_read(&brick->total_read_count), atomic_read(&brick->total_write_count), atomic_read(&brick->total_flush_count), atomic_read(&brick->total_write_count) ? atomic_read(&brick->total_flush_count) * 100 / atomic_read(&brick->total_write_count) : 0, atomic_read(&brick->total_writeback_cluster_count), atomic_read(&brick->total_writeback_count), atomic_read(&brick->total_writeback_cluster_count) ? atomic_read(&brick->total_writeback_count) * 100 / atomic_read(&brick->total_writeback_cluster_count) : 0, atomic_read(&brick->total_shortcut_count), atomic_read(&brick->total_writeback_count) ? atomic_read(&brick->total_shortcut_count) * 100 / atomic_read(&brick->total_writeback_count) : 0, atomic_read(&brick->total_mshadow_count), atomic_read(&brick->total_sshadow_count), atomic_read(&brick->total_round_count), atomic_read(&brick->total_restart_count), atomic_read(&brick->q_phase1.q_total), atomic_read(&brick->q_phase2.q_total), atomic_read(&brick->q_phase3.q_total), atomic_read(&brick->q_phase4.q_total),
atomic64_read(&brick->shadow_mem_used), mars_global_memlimit, atomic_read(&brick->replay_count), atomic_read(&brick->mshadow_count), brick->shadow_mem_limit, atomic_read(&brick->sshadow_count), atomic_read(&brick->hash_count), atomic_read(&brick->pos_count), atomic_read(&brick->sub_balance_count), atomic_read(&brick->inner_balance_count), atomic_read(&brick->outer_balance_count), atomic_read(&brick->wb_balance_count), atomic_read(&brick->fly_count), atomic_read(&brick->q_phase1.q_queued), atomic_read(&brick->q_phase1.q_flying), atomic_read(&brick->q_phase2.q_queued), atomic_read(&brick->q_phase2.q_flying), atomic_read(&brick->q_phase3.q_queued), atomic_read(&brick->q_phase3.q_flying), atomic_read(&brick->q_phase4.q_queued), atomic_read(&brick->q_phase4.q_flying));
atomic64_read(&brick->shadow_mem_used), brick_global_memlimit, atomic_read(&brick->replay_count), atomic_read(&brick->mshadow_count), brick->shadow_mem_limit, atomic_read(&brick->sshadow_count), atomic_read(&brick->hash_count), atomic_read(&brick->pos_count), atomic_read(&brick->sub_balance_count), atomic_read(&brick->inner_balance_count), atomic_read(&brick->outer_balance_count), atomic_read(&brick->wb_balance_count), atomic_read(&brick->fly_count), atomic_read(&brick->q_phase1.q_queued), atomic_read(&brick->q_phase1.q_flying), atomic_read(&brick->q_phase2.q_queued), atomic_read(&brick->q_phase2.q_flying), atomic_read(&brick->q_phase3.q_queued), atomic_read(&brick->q_phase3.q_flying), atomic_read(&brick->q_phase4.q_queued), atomic_read(&brick->q_phase4.q_flying));
return res;
}

View File

@ -5,6 +5,16 @@
//#define IO_DEBUGGING
//#define STAT_DEBUGGING // here means: display full statistics
// disable this only for debugging!
#define RUN_PEERS
#define RUN_DATA
#define RUN_LOGINIT
#define RUN_PRIMARY
#define RUN_SYNCSTATUS
#define RUN_LOGFILES
#define RUN_REPLAY
#define RUN_DEVICE
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/string.h>
@ -491,7 +501,7 @@ struct mars_peerinfo {
struct mars_global *global;
char *peer;
char *path;
struct socket *socket;
struct mars_socket *socket;
struct task_struct *peer_thread;
spinlock_t lock;
struct list_head remote_dent_list;
@ -789,13 +799,30 @@ int run_bones(struct mars_peerinfo *peer)
// remote working infrastructure
static
void _peer_cleanup(struct mars_peerinfo *peer)
void _peer_cleanup(struct mars_peerinfo *peer, bool do_stop)
{
unsigned long flags;
MARS_DBG("\n");
if (peer->socket) {
kernel_sock_shutdown(peer->socket, SHUT_WR);
mars_shutdown_socket(peer->socket);
}
if (do_stop && peer->peer_thread) {
kthread_stop(peer->peer_thread);
put_task_struct(peer->peer_thread);
peer->peer_thread = NULL;
}
if (peer->socket) {
mars_put_socket(peer->socket);
peer->socket = NULL;
}
//...
if (do_stop) {
traced_lock(&peer->lock, flags);
mars_free_dent_all(&peer->remote_dent_list);
traced_unlock(&peer->lock, flags);
brick_string_free(peer->peer);
brick_string_free(peer->path);
}
}
static
@ -831,8 +858,9 @@ int remote_thread(void *data)
};
if (!peer->socket) {
status = mars_create_socket(&peer->socket, &sockaddr, false);
if (unlikely(status < 0)) {
peer->socket = mars_create_socket(&sockaddr, false);
if (unlikely(IS_ERR(peer->socket))) {
status = PTR_ERR(peer->socket);
peer->socket = NULL;
MARS_INF("no connection to '%s'\n", real_peer);
msleep(5000);
@ -842,18 +870,18 @@ int remote_thread(void *data)
continue;
}
status = mars_send_struct(&peer->socket, &cmd, mars_cmd_meta);
status = mars_send_struct(peer->socket, &cmd, mars_cmd_meta);
if (unlikely(status < 0)) {
MARS_ERR("communication error on send, status = %d\n", status);
_peer_cleanup(peer);
MARS_WRN("communication error on send, status = %d\n", status);
_peer_cleanup(peer, false);
msleep(2000);
continue;
}
status = mars_recv_dent_list(&peer->socket, &tmp_list);
status = mars_recv_dent_list(peer->socket, &tmp_list);
if (unlikely(status < 0)) {
MARS_ERR("communication error on receive, status = %d\n", status);
_peer_cleanup(peer);
MARS_WRN("communication error on receive, status = %d\n", status);
_peer_cleanup(peer, false);
msleep(5000);
continue;
}
@ -879,11 +907,10 @@ int remote_thread(void *data)
MARS_INF("-------- remote thread terminating\n");
_peer_cleanup(peer);
_peer_cleanup(peer, false);
done:
//cleanup_mm();
peer->peer_thread = NULL;
brick_string_free(real_peer);
return 0;
}
@ -903,17 +930,11 @@ static int _kill_peer(void *buf, struct mars_dent *dent)
if (!peer) {
return 0;
}
MARS_DBG("killing peer thread....\n");
if (!peer->peer_thread) {
MARS_ERR("oops, remote thread is not running - doing cleanup myself\n");
_peer_cleanup(peer);
dent->d_private = NULL;
return -1;
}
MARS_INF("stopping thread...\n");
kthread_stop(peer->peer_thread);
MARS_INF("stopping peer thread...\n");
_peer_cleanup(peer, true);
dent->d_private = NULL;
brick_mem_free(peer);
return 0;
}
@ -943,8 +964,8 @@ static int _make_peer(struct mars_global *global, struct mars_dent *dent, char *
peer = dent->d_private;
peer->global = global;
peer->peer = mypeer;
peer->path = path;
peer->peer = brick_strdup(mypeer);
peer->path = brick_strdup(path);
peer->maxdepth = 2;
spin_lock_init(&peer->lock);
INIT_LIST_HEAD(&peer->remote_dent_list);
@ -960,6 +981,7 @@ static int _make_peer(struct mars_global *global, struct mars_dent *dent, char *
return -1;
}
MARS_DBG("starting peer thread\n");
get_task_struct(peer->peer_thread);
wake_up_process(peer->peer_thread);
}
@ -1542,7 +1564,7 @@ int _check_logging_status(struct mars_rotate *rot, long long *oldpos_start, long
parent = dent->d_parent;
CHECK_PTR(parent, done);
global = rot->global;
CHECK_PTR(global, done);
CHECK_PTR_NULL(global, done);
CHECK_PTR(rot->replay_link, done);
CHECK_PTR(rot->aio_brick, done);
@ -1616,7 +1638,7 @@ int _make_logging_status(struct mars_rotate *rot)
parent = dent->d_parent;
CHECK_PTR(parent, done);
global = rot->global;
CHECK_PTR(global, done);
CHECK_PTR_NULL(global, done);
status = 0;
trans_brick = rot->trans_brick;
@ -2402,10 +2424,10 @@ static const struct light_class light_classes[] = {
.cl_len = 3,
.cl_type = 'l',
.cl_father = CL_IPS,
#if 1
#ifdef RUN_PEERS
.cl_forward = make_scan,
.cl_backward = kill_scan,
#endif
.cl_backward = kill_scan,
},
/* Directory containing all items of a resource
@ -2415,8 +2437,6 @@ static const struct light_class light_classes[] = {
.cl_len = 9,
.cl_type = 'd',
.cl_father = CL_ROOT,
.cl_forward = NULL,
.cl_backward = NULL,
},
/* Subdirectory for defaults...
@ -2427,8 +2447,6 @@ static const struct light_class light_classes[] = {
.cl_type = 'd',
.cl_hostcontext = false,
.cl_father = CL_RESOURCE,
.cl_forward = NULL,
.cl_backward = NULL,
},
[CL_DEFAULTS] = {
.cl_name = "defaults-",
@ -2436,8 +2454,6 @@ static const struct light_class light_classes[] = {
.cl_type = 'd',
.cl_hostcontext = false,
.cl_father = CL_RESOURCE,
.cl_forward = NULL,
.cl_backward = NULL,
},
/* ... and its contents
*/
@ -2446,16 +2462,12 @@ static const struct light_class light_classes[] = {
.cl_len = 0, // catch any
.cl_type = 'l',
.cl_father = CL_DEFAULTS0,
.cl_forward = NULL,
.cl_backward = NULL,
},
[CL_DEFAULTS_ITEMS] = {
.cl_name = "",
.cl_len = 0, // catch any
.cl_type = 'l',
.cl_father = CL_DEFAULTS,
.cl_forward = NULL,
.cl_backward = NULL,
},
/* Subdirectory for controlling items...
@ -2466,8 +2478,6 @@ static const struct light_class light_classes[] = {
.cl_type = 'd',
.cl_hostcontext = false,
.cl_father = CL_RESOURCE,
.cl_forward = NULL,
.cl_backward = NULL,
},
/* ... and its contents
*/
@ -2476,8 +2486,6 @@ static const struct light_class light_classes[] = {
.cl_len = 0, // catch any
.cl_type = 'l',
.cl_father = CL_SWITCH,
.cl_forward = NULL,
.cl_backward = NULL,
},
/* Subdirectory for actual state
@ -2488,8 +2496,6 @@ static const struct light_class light_classes[] = {
.cl_type = 'd',
.cl_hostcontext = false,
.cl_father = CL_RESOURCE,
.cl_forward = NULL,
.cl_backward = NULL,
},
/* ... and its contents
*/
@ -2498,8 +2504,6 @@ static const struct light_class light_classes[] = {
.cl_len = 0, // catch any
.cl_type = 'l',
.cl_father = CL_ACTUAL,
.cl_forward = NULL,
.cl_backward = NULL,
},
@ -2511,8 +2515,6 @@ static const struct light_class light_classes[] = {
.cl_type = 'l',
.cl_hostcontext = false, // not used here
.cl_father = CL_RESOURCE,
.cl_forward = NULL,
.cl_backward = NULL,
},
/* Symlink indicating the necessary length of logfile replay
*/
@ -2522,8 +2524,6 @@ static const struct light_class light_classes[] = {
.cl_type = 'l',
.cl_hostcontext = false, // not used here
.cl_father = CL_RESOURCE,
.cl_forward = NULL,
.cl_backward = NULL,
},
/* File or symlink to the real device / real (sparse) file
* when hostcontext is missing, the corresponding peer will
@ -2535,7 +2535,9 @@ static const struct light_class light_classes[] = {
.cl_type = 'F',
.cl_hostcontext = true,
.cl_father = CL_RESOURCE,
#ifdef RUN_DATA
.cl_forward = make_bio,
#endif
.cl_backward = kill_any,
},
/* Symlink indicating the (common) size of the resource
@ -2546,7 +2548,9 @@ static const struct light_class light_classes[] = {
.cl_type = 'l',
.cl_hostcontext = false,
.cl_father = CL_RESOURCE,
#ifdef RUN_LOGINIT
.cl_forward = make_log_init,
#endif
.cl_backward = kill_any,
},
/* Symlink pointing to the name of the primary node
@ -2557,7 +2561,9 @@ static const struct light_class light_classes[] = {
.cl_type = 'l',
.cl_hostcontext = false,
.cl_father = CL_RESOURCE,
#ifdef RUN_PRIMARY
.cl_forward = make_primary,
#endif
.cl_backward = NULL,
},
/* Only for testing: open local file
@ -2581,7 +2587,9 @@ static const struct light_class light_classes[] = {
.cl_type = 'l',
.cl_hostcontext = true,
.cl_father = CL_RESOURCE,
#ifdef RUN_SYNCSTATUS
.cl_forward = make_sync,
#endif
.cl_backward = kill_any,
},
/* Only for testing: make a copy instance
@ -2618,8 +2626,6 @@ static const struct light_class light_classes[] = {
.cl_serial = true,
.cl_hostcontext = false,
.cl_father = CL_RESOURCE,
.cl_forward = NULL,
.cl_backward = NULL,
},
/* Logfiles for transaction logger
*/
@ -2630,10 +2636,10 @@ static const struct light_class light_classes[] = {
.cl_serial = true,
.cl_hostcontext = true,
.cl_father = CL_RESOURCE,
#if 1
#ifdef RUN_LOGFILES
.cl_forward = make_log_step,
.cl_backward = kill_any,
#endif
.cl_backward = kill_any,
},
/* Symlink indicating the last state of
* transaction log replay.
@ -2644,7 +2650,9 @@ static const struct light_class light_classes[] = {
.cl_type = 'l',
.cl_hostcontext = true,
.cl_father = CL_RESOURCE,
#ifdef RUN_REPLAY
.cl_forward = make_replay,
#endif
.cl_backward = kill_any,
},
@ -2656,7 +2664,9 @@ static const struct light_class light_classes[] = {
.cl_type = 'l',
.cl_hostcontext = true,
.cl_father = CL_RESOURCE,
#ifdef RUN_DEVICE
.cl_forward = make_dev,
#endif
.cl_backward = kill_any,
},
{}
@ -2859,6 +2869,37 @@ void _show_status(struct mars_global *global)
}
#ifdef STAT_DEBUGGING
static
void _show_one(struct mars_brick *test, int *brick_count)
{
int i;
if (*brick_count) {
MARS_STAT("---------\n");
}
MARS_STAT("BRICK type = %s path = '%s' name = '%s' level = %d button = %d off = %d on = %d\n", SAFE_STR(test->type->type_name), SAFE_STR(test->brick_path), SAFE_STR(test->brick_name), test->status_level, test->power.button, test->power.led_off, test->power.led_on);
(*brick_count)++;
if (test->ops && test->ops->brick_statistics) {
char *info = test->ops->brick_statistics(test, 0);
if (info) {
MARS_STAT(" %s", info);
brick_string_free(info);
}
}
for (i = 0; i < test->nr_inputs; i++) {
struct mars_input *input = test->inputs[i];
struct mars_output *output = input ? input->connect : NULL;
if (output) {
MARS_STAT(" input %d connected with %s path = '%s' name = '%s'\n", i, SAFE_STR(output->brick->type->type_name), SAFE_STR(output->brick->brick_path), SAFE_STR(output->brick->brick_name));
} else {
MARS_STAT(" input %d not connected\n", i);
}
}
for (i = 0; i < test->nr_outputs; i++) {
struct mars_output *output = test->outputs[i];
MARS_STAT(" output %d nr_connected = %d\n", i, output->nr_connected);
}
}
static
void _show_statist(struct mars_global *global)
{
@ -2868,37 +2909,18 @@ void _show_statist(struct mars_global *global)
brick_mem_statistics();
MARS_STAT("================================== bricks:\n");
down_read(&global->brick_mutex);
MARS_STAT("================================== ordinary bricks:\n");
for (tmp = global->brick_anchor.next; tmp != &global->brick_anchor; tmp = tmp->next) {
struct mars_brick *test;
int i;
test = container_of(tmp, struct mars_brick, global_brick_link);
if (brick_count) {
MARS_STAT("---------\n");
}
MARS_STAT("BRICK type = %s path = '%s' name = '%s' level = %d button = %d off = %d on = %d\n", test->type->type_name, test->brick_path, test->brick_name, test->status_level, test->power.button, test->power.led_off, test->power.led_on);
brick_count++;
if (test->ops && test->ops->brick_statistics) {
char *info = test->ops->brick_statistics(test, 0);
if (info) {
MARS_STAT(" %s", info);
brick_string_free(info);
}
}
for (i = 0; i < test->nr_inputs; i++) {
struct mars_input *input = test->inputs[i];
struct mars_output *output = input ? input->connect : NULL;
if (output) {
MARS_STAT(" input %d connected with %s path = '%s' name = '%s'\n", i, output->brick->type->type_name, output->brick->brick_path, output->brick->brick_name);
} else {
MARS_STAT(" input %d not connected\n", i);
}
}
for (i = 0; i < test->nr_outputs; i++) {
struct mars_output *output = test->outputs[i];
MARS_STAT(" output %d nr_connected = %d\n", i, output->nr_connected);
}
_show_one(test, &brick_count);
}
MARS_STAT("================================== server bricks:\n");
for (tmp = global->server_anchor.next; tmp != &global->server_anchor; tmp = tmp->next) {
struct mars_brick *test;
test = container_of(tmp, struct mars_brick, global_brick_link);
_show_one(test, &brick_count);
}
up_read(&global->brick_mutex);
@ -2908,12 +2930,12 @@ void _show_statist(struct mars_global *global)
struct mars_dent *dent;
struct list_head *sub;
dent = container_of(tmp, struct mars_dent, dent_link);
MARS_STAT("dent %d '%s' '%s' stamp=%ld.%09ld\n", dent->d_class, dent->d_path, dent->new_link ? dent->new_link : "", dent->new_stat.mtime.tv_sec, dent->new_stat.mtime.tv_nsec);
MARS_STAT("dent %d '%s' '%s' stamp=%ld.%09ld\n", dent->d_class, SAFE_STR(dent->d_path), SAFE_STR(dent->new_link), dent->new_stat.mtime.tv_sec, dent->new_stat.mtime.tv_nsec);
dent_count++;
for (sub = dent->brick_list.next; sub != &dent->brick_list; sub = sub->next) {
struct mars_brick *test;
test = container_of(sub, struct mars_brick, dent_brick_link);
MARS_STAT(" owner of brick '%s'\n", test->brick_path);
MARS_STAT(" owner of brick '%s'\n", SAFE_STR(test->brick_path));
}
}
up_read(&global->dent_mutex);
@ -2922,21 +2944,23 @@ void _show_statist(struct mars_global *global)
}
#endif
static struct mars_global _global = {
.dent_anchor = LIST_HEAD_INIT(_global.dent_anchor),
.brick_anchor = LIST_HEAD_INIT(_global.brick_anchor),
.server_anchor = LIST_HEAD_INIT(_global.server_anchor),
.global_power = {
.button = true,
},
.dent_mutex = __RWSEM_INITIALIZER(_global.dent_mutex),
.brick_mutex = __RWSEM_INITIALIZER(_global.brick_mutex),
.main_event = __WAIT_QUEUE_HEAD_INITIALIZER(_global.main_event),
};
static int light_thread(void *data)
{
char *id = my_id();
int status = 0;
struct mars_global global = {
.dent_anchor = LIST_HEAD_INIT(global.dent_anchor),
.brick_anchor = LIST_HEAD_INIT(global.brick_anchor),
.global_power = {
.button = true,
},
.dent_mutex = __RWSEM_INITIALIZER(global.dent_mutex),
.brick_mutex = __RWSEM_INITIALIZER(global.brick_mutex),
.main_event = __WAIT_QUEUE_HEAD_INITIALIZER(global.main_event),
};
mars_global = &global; // TODO: cleanup, avoid stack
mars_global = &_global;
if (!id || strlen(id) < 2) {
MARS_ERR("invalid hostname\n");
@ -2948,32 +2972,38 @@ static int light_thread(void *data)
MARS_INF("-------- starting as host '%s' ----------\n", id);
while (global.global_power.button || !list_empty(&global.brick_anchor)) {
while (_global.global_power.button || !list_empty(&_global.brick_anchor)) {
int status;
global.global_power.button = !kthread_should_stop();
_global.global_power.button = !kthread_should_stop();
status = mars_dent_work(&global, "/mars", sizeof(struct mars_dent), light_checker, light_worker, &global, 3);
if (!_global.global_power.button) {
mars_kill_brick_all(&_global, &_global.server_anchor, false);
}
status = mars_dent_work(&_global, "/mars", sizeof(struct mars_dent), light_checker, light_worker, &_global, 3);
MARS_DBG("worker status = %d\n", status);
_show_status(&global);
_show_status(&_global);
#ifdef STAT_DEBUGGING
_show_statist(&global);
_show_statist(&_global);
#endif
msleep(500);
wait_event_interruptible_timeout(global.main_event, global.main_trigger, 10 * HZ);
global.main_trigger = false;
wait_event_interruptible_timeout(_global.main_event, _global.main_trigger, 10 * HZ);
_global.main_trigger = false;
}
done:
MARS_INF("-------- cleaning up ----------\n");
mars_free_dent_all(&global.dent_anchor);
mars_kill_brick_all(&_global, &_global.server_anchor, false);
mars_free_dent_all(&_global.dent_anchor);
mars_kill_brick_all(&_global, &_global.brick_anchor, false);
_show_status(&global);
_show_status(&_global);
#ifdef STAT_DEBUGGING
_show_statist(&global);
_show_statist(&_global);
#endif
mars_global = NULL;

View File

@ -99,7 +99,7 @@ int __init init_mars_proc(void)
MARS_INF("init_proc()\n");
#if 0
#if 1
header = register_sysctl_table(mars_table);
#endif

View File

@ -51,6 +51,7 @@ struct mars_global {
struct generic_switch global_power;
struct list_head dent_anchor;
struct list_head brick_anchor;
struct list_head server_anchor;
volatile bool main_trigger;
wait_queue_head_t main_event;
//void *private;
@ -71,6 +72,7 @@ extern struct mars_brick *mars_find_brick(struct mars_global *global, const void
extern struct mars_brick *mars_make_brick(struct mars_global *global, struct mars_dent *belongs, const void *_brick_type, const char *path, const char *name);
extern int mars_free_brick(struct mars_brick *brick);
extern int mars_kill_brick(struct mars_brick *brick);
extern int mars_kill_brick_all(struct mars_global *global, struct list_head *anchor, bool use_dent_link);
// mid-level brick instantiation (identity is based on path strings)

View File

@ -636,13 +636,7 @@ EXPORT_SYMBOL_GPL(mars_find_dent);
void mars_kill_dent(struct mars_dent *dent)
{
dent->d_killme = true;
while (!list_empty(&dent->brick_list)) {
struct list_head *tmp = dent->brick_list.next;
struct mars_brick *brick = container_of(tmp, struct mars_brick, dent_brick_link);
list_del_init(tmp);
// note: locking is now done there....
mars_kill_brick(brick);
}
mars_kill_brick_all(NULL, &dent->brick_list, true);
}
EXPORT_SYMBOL_GPL(mars_kill_dent);
@ -664,7 +658,7 @@ void mars_free_dent(struct mars_dent *dent)
brick_string_free(dent->d_path);
brick_string_free(dent->old_link);
brick_string_free(dent->new_link);
//brick_mem_free(dent->d_private);
brick_mem_free(dent->d_private);
brick_mem_free(dent);
}
EXPORT_SYMBOL_GPL(mars_free_dent);
@ -757,12 +751,16 @@ int mars_free_brick(struct mars_brick *brick)
}
}
#ifndef MEMLEAK // TODO: check whether crash remains possible
MARS_DBG("deallocate name = '%s' path = '%s'\n", SAFE_STR(brick->brick_name), SAFE_STR(brick->brick_path));
brick_string_free(brick->brick_name);
brick_string_free(brick->brick_path);
#endif
status = generic_brick_exit_full((void*)brick);
if (status >= 0) {
#ifndef MEMLEAK // TODO: check whether crash remains possible
brick_string_free(brick->brick_name);
brick_string_free(brick->brick_path);
brick_mem_free(brick);
#endif
mars_trigger();
@ -778,7 +776,7 @@ EXPORT_SYMBOL_GPL(mars_free_brick);
struct mars_brick *mars_make_brick(struct mars_global *global, struct mars_dent *belongs, const void *_brick_type, const char *path, const char *_name)
{
const char *name = brick_strdup(_name);
const char *names[] = { name };
const char *names[] = { name, NULL };
const struct generic_brick_type *brick_type = _brick_type;
const struct generic_input_type **input_types;
const struct generic_output_type **output_types;
@ -846,9 +844,11 @@ struct mars_brick *mars_make_brick(struct mars_global *global, struct mars_dent
* Switching on / etc must be done separately.
*/
down_write(&global->brick_mutex);
list_add(&res->global_brick_link, &global->brick_anchor);
if (belongs) {
list_add(&res->global_brick_link, &global->brick_anchor);
list_add(&res->dent_brick_link, &belongs->brick_list);
} else {
list_add(&res->global_brick_link, &global->server_anchor);
}
up_write(&global->brick_mutex);
@ -871,17 +871,18 @@ int mars_kill_brick(struct mars_brick *brick)
CHECK_PTR(brick, done);
global = brick->global;
CHECK_PTR(global, done);
MARS_DBG("===> killing brick path = '%s' name = '%s'\n", brick->brick_path, brick->brick_name);
MARS_DBG("===> killing brick path = '%s' name = '%s'\n", SAFE_STR(brick->brick_path), SAFE_STR(brick->brick_name));
down_write(&global->brick_mutex);
list_del_init(&brick->global_brick_link);
list_del_init(&brick->dent_brick_link);
up_write(&global->brick_mutex);
if (global) {
down_write(&global->brick_mutex);
list_del_init(&brick->global_brick_link);
list_del_init(&brick->dent_brick_link);
up_write(&global->brick_mutex);
}
if (unlikely(brick->nr_outputs > 0 && brick->outputs[0] && brick->outputs[0]->nr_connected)) {
MARS_ERR("sorry, output is in use '%s'\n", brick->brick_path);
MARS_ERR("sorry, output is in use '%s'\n", SAFE_STR(brick->brick_path));
goto done;
}
@ -894,6 +895,36 @@ done:
}
EXPORT_SYMBOL_GPL(mars_kill_brick);
int mars_kill_brick_all(struct mars_global *global, struct list_head *anchor, bool use_dent_link)
{
int status = 0;
if (global) {
down_write(&global->brick_mutex);
}
while (!list_empty(anchor)) {
struct list_head *tmp = anchor->next;
struct mars_brick *brick;
if (use_dent_link) {
brick = container_of(tmp, struct mars_brick, dent_brick_link);
} else {
brick = container_of(tmp, struct mars_brick, global_brick_link);
}
list_del_init(tmp);
if (global) {
up_write(&global->brick_mutex);
}
status |= mars_kill_brick(brick);
if (global) {
down_write(&global->brick_mutex);
}
}
if (global) {
up_write(&global->brick_mutex);
}
return status;
}
EXPORT_SYMBOL_GPL(mars_kill_brick_all);
/////////////////////////////////////////////////////////////////////
@ -1064,7 +1095,7 @@ struct mars_brick *make_brick_all(
prev[i] = mars_find_brick(global, prev_brick_type[i], path);
if (!prev[i]) {
MARS_ERR("prev brick '%s' does not exist\n", path);
MARS_WRN("prev brick '%s' does not exist\n", path);
goto err;
}
MARS_DBG("------> predecessor %d path = '%s'\n", i, path);

View File

@ -38,7 +38,7 @@ done:
return brick_strdup(res);
}
int mars_send_dent_list(struct socket **sock, struct list_head *anchor)
int mars_send_dent_list(struct mars_socket *sock, struct list_head *anchor)
{
struct list_head *tmp;
struct mars_dent *dent;
@ -49,14 +49,14 @@ int mars_send_dent_list(struct socket **sock, struct list_head *anchor)
if (status < 0)
break;
}
if (status >= 0) { // send EOF
if (status >= 0) { // send EOR
status = mars_send_struct(sock, NULL, mars_dent_meta);
}
return status;
}
EXPORT_SYMBOL_GPL(mars_send_dent_list);
int mars_recv_dent_list(struct socket **sock, struct list_head *anchor)
int mars_recv_dent_list(struct mars_socket *sock, struct list_head *anchor)
{
int status;
for (;;) {