fixed network problems / races

reworked struct mars_socket for inplace usage instead of
dynmaic memory allocation (races were a nightmare)
This commit is contained in:
Thomas Schoebel-Theuer 2012-01-11 12:35:09 +01:00 committed by Thomas Schoebel-Theuer
parent cc3f4e9ac5
commit f6b1419176
7 changed files with 326 additions and 263 deletions

View File

@ -33,16 +33,13 @@ static void _kill_thread(struct client_threadinfo *ti)
static void _kill_socket(struct client_output *output)
{
if (output->socket) {
if (mars_socket_is_alive(&output->socket)) {
MARS_DBG("shutdown socket\n");
mars_shutdown_socket(output->socket);
mars_shutdown_socket(&output->socket);
}
_kill_thread(&output->receiver);
if (output->socket) {
MARS_DBG("close socket\n");
mars_put_socket(output->socket);
output->socket = NULL;
}
MARS_DBG("close socket\n");
mars_put_socket(&output->socket);
}
static int _request_info(struct client_output *output)
@ -53,7 +50,7 @@ static int _request_info(struct client_output *output)
int status;
MARS_DBG("\n");
status = mars_send_struct(output->socket, &cmd, mars_cmd_meta);
status = mars_send_struct(&output->socket, &cmd, mars_cmd_meta);
if (unlikely(status < 0)) {
MARS_DBG("send of getinfo failed, status = %d\n", status);
}
@ -85,24 +82,16 @@ static int _connect(struct client_output *output, const char *str)
*output->host++ = '\0';
}
_kill_socket(output);
status = mars_create_sockaddr(&sockaddr, output->host);
if (unlikely(status < 0)) {
MARS_DBG("no sockaddr, status = %d\n", status);
goto done;
}
output->socket = mars_create_socket(&sockaddr, false);
if (unlikely(IS_ERR(output->socket))) {
status = PTR_ERR(output->socket);
output->socket = NULL;
if (status == -EINPROGRESS) {
MARS_DBG("operation is in progress....\n");
goto really_done; // give it a chance next time
}
status = mars_create_socket(&output->socket, &sockaddr, false);
if (unlikely(status < 0)) {
MARS_DBG("no socket, status = %d\n", status);
goto done;
goto really_done;
}
output->receiver.thread = kthread_create(receiver_thread, output, "mars_receiver%d", thread_count++);
@ -123,7 +112,7 @@ static int _connect(struct client_output *output, const char *str)
.cmd_str1 = output->path,
};
status = mars_send_struct(output->socket, &cmd, mars_cmd_meta);
status = mars_send_struct(&output->socket, &cmd, mars_cmd_meta);
if (unlikely(status < 0)) {
MARS_DBG("send of connect failed, status = %d\n", status);
goto done;
@ -259,14 +248,14 @@ int receiver_thread(void *data)
struct client_output *output = data;
int status = 0;
while (status >= 0 && mars_socket_is_alive(output->socket) && !kthread_should_stop()) {
while (status >= 0 && mars_socket_is_alive(&output->socket) && !kthread_should_stop()) {
struct mars_cmd cmd = {};
struct list_head *tmp;
struct client_mref_aspect *mref_a = NULL;
struct mref_object *mref = NULL;
unsigned long flags;
status = mars_recv_struct(output->socket, &cmd, mars_cmd_meta);
status = mars_recv_struct(&output->socket, &cmd, mars_cmd_meta);
MARS_IO("got cmd = %d status = %d\n", cmd.cmd_code, status);
if (status < 0)
goto done;
@ -310,7 +299,7 @@ int receiver_thread(void *data)
MARS_IO("got callback id = %d, old pos = %lld len = %d rw = %d\n", mref->ref_id, mref->ref_pos, mref->ref_len, mref->ref_rw);
status = mars_recv_cb(output->socket, mref);
status = mars_recv_cb(&output->socket, mref);
MARS_IO("new status = %d, pos = %lld len = %d rw = %d\n", status, mref->ref_pos, mref->ref_len, mref->ref_rw);
if (status < 0) {
MARS_WRN("interrupted data transfer during callback, status = %d\n", status);
@ -330,7 +319,7 @@ int receiver_thread(void *data)
break;
}
case CMD_GETINFO:
status = mars_recv_struct(output->socket, &output->info, mars_info_meta);
status = mars_recv_struct(&output->socket, &output->info, mars_info_meta);
if (status < 0) {
MARS_WRN("got bad info from remote side, status = %d\n", status);
goto done;
@ -351,7 +340,7 @@ int receiver_thread(void *data)
MARS_WRN("receiver thread terminated with status = %d\n", status);
}
mars_shutdown_socket(output->socket);
mars_shutdown_socket(&output->socket);
output->receiver.terminated = true;
wake_up_interruptible(&output->receiver.run_event);
return status;
@ -377,6 +366,7 @@ static int sender_thread(void *data)
struct client_output *output = data;
struct client_brick *brick = output->brick;
unsigned long flags;
bool do_kill = false;
int status = 0;
output->receiver.restart_count = 0;
@ -387,13 +377,18 @@ static int sender_thread(void *data)
struct mref_object *mref;
bool do_resubmit = false;
if (unlikely(!output->socket)) {
if (unlikely(!mars_socket_is_alive(&output->socket))) {
if (do_kill) {
do_kill = false;
_kill_socket(output);
}
status = _connect(output, brick->brick_name);
MARS_IO("connect status = %d\n", status);
if (unlikely(status < 0)) {
msleep(5000);
continue;
}
do_kill = true;
do_resubmit = true;
}
@ -429,7 +424,7 @@ static int sender_thread(void *data)
MARS_IO("sending mref, id = %d pos = %lld len = %d rw = %d\n", mref->ref_id, mref->ref_pos, mref->ref_len, mref->ref_rw);
status = mars_send_mref(output->socket, mref);
status = mars_send_mref(&output->socket, mref);
MARS_IO("status = %d\n", status);
if (unlikely(status < 0)) {
// retry submission on next occasion..
@ -440,8 +435,10 @@ static int sender_thread(void *data)
MARS_WRN("sending failed, status = %d\n", status);
_kill_socket(output);
if (do_kill) {
do_kill = false;
_kill_socket(output);
}
continue;
}
}
@ -450,7 +447,9 @@ static int sender_thread(void *data)
MARS_WRN("sender thread terminated with status = %d\n", status);
}
_kill_socket(output);
if (do_kill) {
_kill_socket(output);
}
/* Signal error on all pending IO requests.
* We have no other chance (except probably delaying
@ -528,8 +527,8 @@ char *client_statistics(struct client_brick *brick, int verbose)
if (!res)
return NULL;
snprintf(res, 512, "socket = %p fly_count = %d\n",
output->socket,
snprintf(res, 512, "#%d socket fly_count = %d\n",
output->socket.s_debug_nr,
atomic_read(&output->fly_count));
return res;

View File

@ -39,7 +39,7 @@ struct client_output {
struct list_head wait_list;
wait_queue_head_t event;
int last_id;
struct mars_socket *socket;
struct mars_socket socket;
char *host;
char *path;
struct client_threadinfo sender;

View File

@ -34,7 +34,8 @@ struct mars_tcp_params default_tcp_params = {
};
EXPORT_SYMBOL(default_tcp_params);
static void _check(int status)
static
void _check(int status)
{
if (status < 0) {
MARS_WRN("cannot set socket option, status = %d\n", status);
@ -93,34 +94,31 @@ int mars_create_sockaddr(struct sockaddr_storage *addr, const char *spec)
}
EXPORT_SYMBOL_GPL(mars_create_sockaddr);
/* The original struct socket has no refcount. This leads to problems
* during long-lasting system calls when racing with socket shutdown.
* This is just a small wrapper adding a refcount.
*/
struct mars_socket {
atomic_t s_count;
struct socket *s_socket;
bool s_dead;
};
static int current_debug_nr = 0; // no locking, just for debugging
struct mars_socket *mars_create_socket(struct sockaddr_storage *addr, bool is_server)
int mars_create_socket(struct mars_socket *msock, struct sockaddr_storage *addr, bool is_server)
{
struct mars_socket *msock;
struct socket *sock;
struct sockaddr *sockaddr = (void*)addr;
int x_true = 1;
int status = -ENOMEM;
msock = brick_zmem_alloc(sizeof(struct mars_socket));
if (!msock)
goto done;
int status = -EEXIST;
if (unlikely(atomic_read(&msock->s_count))) {
MARS_WRN("#%d socket already in use\n", msock->s_debug_nr);
goto final;
}
if (unlikely(msock->s_socket)) {
MARS_WRN("#%d socket already open\n", msock->s_debug_nr);
goto final;
}
status = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &msock->s_socket);
if (unlikely(status < 0)) {
msock->s_socket = NULL;
MARS_WRN("cannot create socket, status = %d\n", status);
goto done;
goto final;
}
atomic_set(&msock->s_count, 1);
msock->s_debug_nr = ++current_debug_nr;
sock = msock->s_socket;
status = -EINVAL;
CHECK_PTR(sock, done);
@ -128,6 +126,7 @@ struct mars_socket *mars_create_socket(struct sockaddr_storage *addr, bool is_se
/* TODO: improve this by a table-driven approach
*/
sock->sk->sk_rcvtimeo = sock->sk->sk_sndtimeo = default_tcp_params.tcp_timeout * HZ;
sock->sk->sk_reuse = 1;
status = kernel_setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char*)&default_tcp_params.window_size, sizeof(default_tcp_params.window_size));
_check(status);
status = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char*)&default_tcp_params.window_size, sizeof(default_tcp_params.window_size));
@ -150,39 +149,40 @@ struct mars_socket *mars_create_socket(struct sockaddr_storage *addr, bool is_se
if (is_server) {
status = kernel_bind(sock, sockaddr, sizeof(*sockaddr));
if (unlikely(status < 0)) {
MARS_WRN("bind failed, status = %d\n", status);
MARS_WRN("#%d bind failed, status = %d\n", msock->s_debug_nr, status);
goto done;
}
status = kernel_listen(sock, 16);
if (status < 0) {
MARS_WRN("listen failed, status = %d\n", status);
MARS_WRN("#%d listen failed, status = %d\n", msock->s_debug_nr, status);
}
} else {
status = kernel_connect(sock, sockaddr, sizeof(*sockaddr), 0);
if (status < 0) {
MARS_DBG("connect failed, status = %d\n", status);
MARS_DBG("#%d connect failed, status = %d\n", msock->s_debug_nr, status);
}
}
done:
if (status < 0) {
mars_put_socket(msock);
msock = ERR_PTR(status);
} else {
MARS_DBG("successfully created socket #%d\n", msock->s_debug_nr);
}
return msock;
final:
return status;
}
EXPORT_SYMBOL_GPL(mars_create_socket);
struct mars_socket *mars_accept_socket(struct mars_socket *msock, bool do_block)
int mars_accept_socket(struct mars_socket *new_msock, struct mars_socket *old_msock, bool do_block)
{
int status = -ENOENT;
if (likely(msock)) {
struct mars_socket *new_msock;
struct socket *new_socket = NULL;
struct socket *new_socket = NULL;
bool ok;
mars_get_socket(msock);
status = kernel_accept(msock->s_socket, &new_socket, do_block ? 0 : O_NONBLOCK);
mars_put_socket(msock);
ok = mars_get_socket(old_msock);
if (likely(ok)) {
status = kernel_accept(old_msock->s_socket, &new_socket, do_block ? 0 : O_NONBLOCK);
if (unlikely(status < 0)) {
goto err;
}
@ -191,61 +191,70 @@ struct mars_socket *mars_accept_socket(struct mars_socket *msock, bool do_block)
goto err;
}
MARS_IO("old#%d status = %d file = %p flags = 0x%x\n", old_msock->s_debug_nr, status, new_socket->file, new_socket->file ? new_socket->file->f_flags : 0);
#if 0 // do not use for now
if (!do_block && new_socket->file) { // switch back to blocking mode
new_socket->file->f_flags &= ~O_NONBLOCK;
}
#endif
status = -ENOMEM;
new_msock = brick_zmem_alloc(sizeof(struct mars_socket));
if (!new_msock) {
kernel_sock_shutdown(new_socket, SHUT_WR);
sock_release(new_socket);
goto err;
}
atomic_set(&new_msock->s_count, 1);
memset(new_msock, 0, sizeof(struct mars_socket));
new_msock->s_socket = new_socket;
return new_msock;
}
atomic_set(&new_msock->s_count, 1);
new_msock->s_debug_nr = ++current_debug_nr;
MARS_DBG("#%d successfully accepted socket #%d\n", old_msock->s_debug_nr, new_msock->s_debug_nr);
status = 0;
err:
return ERR_PTR(status);
mars_put_socket(old_msock);
}
return status;
}
EXPORT_SYMBOL_GPL(mars_accept_socket);
struct mars_socket *mars_get_socket(struct mars_socket *msock)
bool mars_get_socket(struct mars_socket *msock)
{
if (likely(msock)) {
atomic_inc(&msock->s_count);
MARS_IO("try socket #%d %p s_dead = %d s_count=%d\n", msock->s_debug_nr, msock->s_socket, msock->s_dead, atomic_read(&msock->s_count));
if (unlikely(atomic_read(&msock->s_count) <= 0))
return false;
atomic_inc(&msock->s_count);
if (unlikely(!msock->s_socket || msock->s_dead)) {
mars_put_socket(msock);
return false;
}
return msock;
MARS_IO("got socket #%d\n", msock->s_debug_nr);
return true;
}
EXPORT_SYMBOL_GPL(mars_get_socket);
void mars_put_socket(struct mars_socket *msock)
{
if (likely(msock)) {
if (atomic_dec_and_test(&msock->s_count)) {
struct socket *sock = msock->s_socket;
if (sock) {
if (!msock->s_dead) {
msock->s_dead = true;
kernel_sock_shutdown(sock, SHUT_WR);
}
sock_release(sock);
}
brick_mem_free(msock);
MARS_IO("try socket #%d %p s_dead = %d s_count=%d\n", msock->s_debug_nr, msock->s_socket, msock->s_dead, atomic_read(&msock->s_count));
if (unlikely(atomic_read(&msock->s_count) <= 0)) {
MARS_ERR("bad nesting on msock = %p\n", msock);
} else if (atomic_dec_and_test(&msock->s_count)) {
struct socket *sock = msock->s_socket;
MARS_DBG("closing socket #%d %p\n", msock->s_debug_nr, sock);
if (likely(sock)) {
kernel_sock_shutdown(sock, SHUT_WR);
sock_release(sock);
}
memset(msock, 0, sizeof(struct mars_socket));
}
}
EXPORT_SYMBOL_GPL(mars_put_socket);
void mars_shutdown_socket(struct mars_socket *msock)
{
if (likely(msock)) {
struct socket *sock = msock->s_socket;
if (sock && !msock->s_dead) {
struct socket *sock = msock->s_socket;
MARS_IO("try socket #%d %p s_dead = %d s_count=%d\n", msock->s_debug_nr, msock->s_socket, msock->s_dead, atomic_read(&msock->s_count));
if (likely(sock)) {
if (unlikely(atomic_read(&msock->s_count) <= 0)) {
MARS_ERR("bad nesting on msock = %p sock = %p\n", msock, sock);
}
if (!msock->s_dead) {
msock->s_dead = true;
MARS_DBG("shutdown socket #%d %p\n", msock->s_debug_nr, sock);
kernel_sock_shutdown(sock, SHUT_WR);
}
}
@ -254,13 +263,23 @@ EXPORT_SYMBOL_GPL(mars_shutdown_socket);
bool mars_socket_is_alive(struct mars_socket *msock)
{
if (!msock || msock->s_dead)
return false;
return true;
bool res = false;
if (!msock->s_socket)
goto done;
if (unlikely(atomic_read(&msock->s_count) <= 0)) {
MARS_ERR("bad nesting on msock = %p\n", msock);
goto done;
}
if (msock->s_dead)
goto done;
res = true;
done:
MARS_IO("#%d %p s_count = %d s_dead = %d\n", msock->s_debug_nr, msock->s_socket, atomic_read(&msock->s_count), msock->s_dead);
return res;
}
EXPORT_SYMBOL_GPL(mars_socket_is_alive);
int mars_send(struct mars_socket *msock, void *buf, int len)
int mars_send_raw(struct mars_socket *msock, void *buf, int len, bool cork)
{
struct kvec iov = {
.iov_base = buf,
@ -274,17 +293,24 @@ int mars_send(struct mars_socket *msock, void *buf, int len)
int sent = 0;
if (!mars_get_socket(msock))
goto done;
goto final;
//MARS_IO("buf = %p, len = %d\n", buf, len);
#if 0 // leads to obscure effects (short reads at other end)
if (cork)
msg.msg_flags |= TCP_CORK;
#endif
MARS_IO("#%d buf = %p, len = %d, cork = %d\n", msock->s_debug_nr, buf, len, cork);
while (sent < len) {
if (unlikely(msock->s_dead)) {
MARS_WRN("socket has disappeared\n");
MARS_WRN("#%d socket has disappeared\n", msock->s_debug_nr);
msleep(50);
status = -EIDRM;
goto done;
}
status = kernel_sendmsg(msock->s_socket, &msg, &iov, 1, len);
MARS_IO("#%d sendmsg status = %d\n", msock->s_debug_nr, status);
if (status == -EAGAIN) {
msleep(50);
@ -293,17 +319,20 @@ int mars_send(struct mars_socket *msock, void *buf, int len)
if (status == -EINTR) { // ignore it
flush_signals(current);
MARS_IO("#%d got signal\n", msock->s_debug_nr);
msleep(50);
continue;
}
if (status < 0) {
MARS_WRN("bad socket sendmsg, len=%d, iov_len=%d, sent=%d, status = %d\n", len, (int)iov.iov_len, sent, status);
MARS_WRN("#%d bad socket sendmsg, len=%d, iov_len=%d, sent=%d, status = %d\n", msock->s_debug_nr, len, (int)iov.iov_len, sent, status);
msleep(50);
goto done;
}
if (!status) {
MARS_WRN("EOF from socket upon sendmsg\n");
MARS_WRN("#%d EOF from socket upon sendmsg\n", msock->s_debug_nr);
msleep(50);
status = -ECOMM;
goto done;
}
@ -313,26 +342,28 @@ int mars_send(struct mars_socket *msock, void *buf, int len)
sent += status;
}
status = sent;
MARS_IO("#%d sent %d\n", msock->s_debug_nr, sent);
done:
mars_put_socket(msock);
final:
return status;
}
EXPORT_SYMBOL_GPL(mars_send);
EXPORT_SYMBOL_GPL(mars_send_raw);
int mars_recv(struct mars_socket *msock, void *buf, int minlen, int maxlen)
int mars_recv_raw(struct mars_socket *msock, void *buf, int minlen, int maxlen)
{
int sleeptime = 1000 / HZ;
int status = -EIDRM;
int done = 0;
if (!buf) {
MARS_WRN("bad receive buffer\n");
MARS_WRN("#%d bad receive buffer\n", msock->s_debug_nr);
return -EINVAL;
}
if (!mars_get_socket(msock))
goto err;
goto final;
while (done < minlen) {
struct kvec iov = {
@ -346,15 +377,17 @@ int mars_recv(struct mars_socket *msock, void *buf, int minlen, int maxlen)
};
if (unlikely(msock->s_dead)) {
MARS_WRN("socket has disappeared\n");
MARS_WRN("#%d socket has disappeared\n", msock->s_debug_nr);
status = -EIDRM;
goto err;
}
MARS_IO("done %d, fetching %d bytes\n", done, maxlen-done);
MARS_IO("#%d done %d, fetching %d bytes\n", msock->s_debug_nr, done, maxlen-done);
status = kernel_recvmsg(msock->s_socket, &msg, &iov, 1, maxlen-done, msg.msg_flags);
MARS_IO("#%d status = %d\n", msock->s_debug_nr, status);
if (status == -EAGAIN) {
// linearly increasing backoff
if (sleeptime < 100)
@ -363,12 +396,12 @@ int mars_recv(struct mars_socket *msock, void *buf, int minlen, int maxlen)
continue;
}
if (!status) { // EOF
MARS_WRN("got EOF from socket (done=%d, req_size=%d)\n", done, maxlen - done);
MARS_WRN("#%d got EOF from socket (done=%d, req_size=%d)\n", msock->s_debug_nr, done, maxlen - done);
status = -EPIPE;
goto err;
}
if (status < 0) {
MARS_WRN("bad recvmsg, status = %d\n", status);
MARS_WRN("#%d bad recvmsg, status = %d\n", msock->s_debug_nr, status);
goto err;
}
done += status;
@ -378,9 +411,10 @@ int mars_recv(struct mars_socket *msock, void *buf, int minlen, int maxlen)
err:
mars_put_socket(msock);
final:
return status;
}
EXPORT_SYMBOL_GPL(mars_recv);
EXPORT_SYMBOL_GPL(mars_recv_raw);
///////////////////////////////////////////////////////////////////////
@ -389,29 +423,26 @@ EXPORT_SYMBOL_GPL(mars_recv);
/* TODO: make this bytesex-aware
*/
#define MARS_NET_MAGIC 0x63f092ec6048f48cll
#define MARS_NET_MAGIC 0x63f092ec6148f48cll
#define MAX_FIELD_LEN 32
struct mars_net_header {
u64 h_magic;
char h_name[MAX_FIELD_LEN];
u16 h_seq;
u16 h_len;
char h_name[MAX_FIELD_LEN];
};
int _mars_send_struct(struct mars_socket *msock, void *data, const struct meta *meta, int *seq)
int _mars_send_struct(struct mars_socket *msock, void *data, const struct meta *meta, int *seq, bool cork)
{
int count = 0;
int status = 0;
if (!data) { // send EOR
struct mars_net_header header = {
.h_magic = MARS_NET_MAGIC,
.h_seq = -1,
};
return mars_send(msock, &header, sizeof(header));
if (!data) { // directly send EOR
goto done;
}
for (; ; meta++) {
for (; meta->field_name != NULL; meta++) {
struct mars_net_header header = {
.h_magic = MARS_NET_MAGIC,
.h_seq = ++(*seq),
@ -420,7 +451,7 @@ int _mars_send_struct(struct mars_socket *msock, void *data, const struct meta *
int len = meta->field_size;
#if 1
if (len > 16 * PAGE_SIZE) {
MARS_WRN("implausible len=%d, \n", len);
MARS_WRN("#%d implausible len=%d, \n", msock->s_debug_nr, len);
msleep(30000);
status = -EINVAL;
break;
@ -441,7 +472,7 @@ int _mars_send_struct(struct mars_socket *msock, void *data, const struct meta *
break;
case FIELD_REF:
if (!meta->field_ref) {
MARS_WRN("improper FIELD_REF definition\n");
MARS_WRN("#%d improper FIELD_REF definition\n", msock->s_debug_nr);
status = -EINVAL;
break;
}
@ -459,7 +490,7 @@ int _mars_send_struct(struct mars_socket *msock, void *data, const struct meta *
// all ok
break;
default:
MARS_WRN("invalid field type %d\n", meta->field_type);
MARS_WRN("#%d invalid field type %d\n", msock->s_debug_nr, meta->field_type);
status = -EINVAL;
break;
}
@ -467,28 +498,27 @@ int _mars_send_struct(struct mars_socket *msock, void *data, const struct meta *
break;
header.h_len = len;
if (meta->field_name) {
strncpy(header.h_name, meta->field_name, MAX_FIELD_LEN);
header.h_name[MAX_FIELD_LEN-1] = '\0';
}
strncpy(header.h_name, meta->field_name, MAX_FIELD_LEN);
header.h_name[MAX_FIELD_LEN-1] = '\0';
MARS_IO("sending header %d '%s' len = %d\n", header.h_seq, header.h_name, len);
status = mars_send(msock, &header, sizeof(header));
if (status < 0 || !meta->field_name) { // EOR
MARS_IO("#%d sending header %d '%s' len = %d\n", msock->s_debug_nr, header.h_seq, header.h_name, len);
status = mars_send_raw(msock, &header, sizeof(header), true);
if (status < 0)
break;
}
switch (meta->field_type) {
case FIELD_REF:
case FIELD_SUB:
status = _mars_send_struct(msock, item, meta->field_ref, seq);
if (status > 0)
count += status;
if (len > 0) {
status = _mars_send_struct(msock, item, meta->field_ref, seq, true);
if (status > 0)
count += status;
}
break;
default:
if (len > 0) {
MARS_IO("sending extra %d\n", len);
status = mars_send(msock, item, len);
MARS_IO("#%d sending extra %d\n", msock->s_debug_nr, len);
status = mars_send_raw(msock, item, len, true);
if (status > 0)
count++;
}
@ -498,6 +528,15 @@ int _mars_send_struct(struct mars_socket *msock, void *data, const struct meta *
break;
}
}
done:
if (status >= 0) { // send EOR
struct mars_net_header header = {
.h_magic = MARS_NET_MAGIC,
.h_seq = ++(*seq),
// .h_name is left empty
};
status = mars_send_raw(msock, &header, sizeof(header), cork);
}
if (status >= 0)
status = count;
return status;
@ -506,7 +545,7 @@ int _mars_send_struct(struct mars_socket *msock, void *data, const struct meta *
int mars_send_struct(struct mars_socket *msock, void *data, const struct meta *meta)
{
int seq = 0;
return _mars_send_struct(msock, data, meta, &seq);
return _mars_send_struct(msock, data, meta, &seq, false);
}
EXPORT_SYMBOL_GPL(mars_send_struct);
@ -515,7 +554,7 @@ int _mars_recv_struct(struct mars_socket *msock, void *data, const struct meta *
int count = 0;
int status = -EINVAL;
//MARS_IO("\n");
MARS_IO("#%d called from line %d\n", msock->s_debug_nr, line);
if (!data) {
goto done;
}
@ -524,27 +563,31 @@ int _mars_recv_struct(struct mars_socket *msock, void *data, const struct meta *
const struct meta *tmp;
void *item;
void *mem;
status = mars_recv(msock, &header, sizeof(header), sizeof(header));
status = mars_recv_raw(msock, &header, sizeof(header), sizeof(header));
if (status == -EAGAIN) {
msleep(50);
continue;
}
if (status < 0) {
MARS_WRN("status = %d\n", status);
MARS_WRN("#%d called from line %d status = %d\n", msock->s_debug_nr, line, status);
break;
}
MARS_IO("#%d called from line %d got header %d '%s' len = %d\n", msock->s_debug_nr, line, header.h_seq, header.h_name, header.h_len);
if (status != sizeof(header)) {
MARS_WRN("#%d called from line %d bad header len = %d (required=%d)\n", msock->s_debug_nr, line, status, (int)sizeof(header));
break;
}
MARS_IO("got header %d '%s' len = %d\n", header.h_seq, header.h_name, header.h_len);
if (header.h_magic != MARS_NET_MAGIC) {
MARS_WRN("bad packet header magic = %llx\n", header.h_magic);
MARS_WRN("#%d called from line %d bad packet header magic = %llx\n", msock->s_debug_nr, line, header.h_magic);
status = -ENOMSG;
break;
}
if (header.h_seq == -1) { // got EOR
if (!header.h_name[0]) { // got EOR
status = 0;
break;
};
if (header.h_seq <= *seq) {
MARS_WRN("unexpected packet data, seq=%d (expected=%d)\n", header.h_seq, (*seq) + 1);
MARS_WRN("#%d called from line %d unexpected packet data, seq=%d (expected=%d)\n", msock->s_debug_nr, line, header.h_seq, (*seq) + 1);
status = -ENOMSG;
break;
}
@ -556,14 +599,14 @@ int _mars_recv_struct(struct mars_socket *msock, void *data, const struct meta *
}
tmp = find_meta(meta, header.h_name);
if (!tmp) {
MARS_WRN("unknown field '%s'\n", header.h_name);
if (unlikely(!tmp)) {
MARS_WRN("#%d called from line %d unknown field '%s'\n", msock->s_debug_nr, line, header.h_name);
if (header.h_len > 0) { // try to continue by skipping the rest of data
void *dummy = brick_mem_alloc(header.h_len);
status = -ENOMEM;
if (!dummy)
break;
status = mars_recv(msock, dummy, header.h_len, header.h_len);
status = mars_recv_raw(msock, dummy, header.h_len, header.h_len);
brick_mem_free(dummy);
if (status < 0)
break;
@ -598,36 +641,38 @@ int _mars_recv_struct(struct mars_socket *msock, void *data, const struct meta *
case FIELD_REF:
case FIELD_SUB:
if (!item) {
MARS_WRN("bad item\n");
MARS_WRN("#%d called from line %d bad item\n", msock->s_debug_nr, line);
status = -EINVAL;
break;
}
MARS_IO("starting recursive structure\n");
status = _mars_recv_struct(msock, item, tmp->field_ref, seq, line);
MARS_IO("ending recursive structure, status = %d\n", status);
if (header.h_len > 0) {
MARS_IO("#%d called from line %d starting recursive structure\n", msock->s_debug_nr, line);
status = _mars_recv_struct(msock, item, tmp->field_ref, seq, line);
MARS_IO("#%d called from line %d ending recursive structure, status = %d\n", msock->s_debug_nr, line, status);
if (status > 0)
count += status;
if (status > 0)
count += status;
}
break;
default:
if (header.h_len > 0) {
if (!item) {
MARS_WRN("bad item\n");
MARS_WRN("#%d called from line %d bad item\n", msock->s_debug_nr, line);
status = -EINVAL;
break;
}
MARS_IO("reading extra %d\n", header.h_len);
status = mars_recv(msock, item, header.h_len, header.h_len);
MARS_IO("#%d called from line %d reading extra %d\n", msock->s_debug_nr, line, header.h_len);
status = mars_recv_raw(msock, item, header.h_len, header.h_len);
while (status == -EAGAIN) {
msleep(50);
status = mars_recv(msock, item, header.h_len, header.h_len);
status = mars_recv_raw(msock, item, header.h_len, header.h_len);
}
if (status >= 0) {
//MARS_IO("got data len = %d status = %d\n", header.h_len, status);
//MARS_IO("#%d got data len = %d status = %d\n", msock->s_debug_nr, header.h_len, status);
count++;
} else {
MARS_WRN("len = %d, status = %d\n", header.h_len, status);
MARS_WRN("#%d called from line %d len = %d, status = %d\n", msock->s_debug_nr, line, header.h_len, status);
}
}
}
@ -639,7 +684,7 @@ done:
status = count;
mars_check_meta(meta, data);
} else {
MARS_WRN("status = %d\n", status);
MARS_WRN("#%d called from line %d status = %d\n", msock->s_debug_nr, line, status);
}
return status;
}
@ -680,18 +725,20 @@ int mars_send_mref(struct mars_socket *msock, struct mref_object *mref)
.cmd_code = CMD_MREF,
.cmd_int1 = mref->ref_id,
};
int seq = 0;
int status;
status = mars_send_struct(msock, &cmd, mars_cmd_meta);
status = _mars_send_struct(msock, &cmd, mars_cmd_meta, &seq, true);
if (status < 0)
goto done;
status = mars_send_struct(msock, mref, mars_mref_meta);
seq = 0;
status = _mars_send_struct(msock, mref, mars_mref_meta, &seq, mref->ref_rw != 0);
if (status < 0)
goto done;
if (mref->ref_rw) {
status = mars_send(msock, mref->ref_data, mref->ref_len);
if (mref->ref_rw != 0) {
status = mars_send_raw(msock, mref->ref_data, mref->ref_len, false);
}
done:
return status;
@ -700,8 +747,10 @@ EXPORT_SYMBOL_GPL(mars_send_mref);
int mars_recv_mref(struct mars_socket *msock, struct mref_object *mref)
{
int seq = 0;
int status;
status = mars_recv_struct(msock, mref, mars_mref_meta);
status = _mars_recv_struct(msock, mref, mars_mref_meta, &seq, __LINE__);
if (status < 0)
goto done;
if (mref->ref_rw) {
@ -711,9 +760,9 @@ int mars_recv_mref(struct mars_socket *msock, struct mref_object *mref)
status = -ENOMEM;
goto done;
}
status = mars_recv(msock, mref->ref_data, mref->ref_len, mref->ref_len);
status = mars_recv_raw(msock, mref->ref_data, mref->ref_len, mref->ref_len);
if (status < 0)
MARS_WRN("mref_len = %d, status = %d\n", mref->ref_len, status);
MARS_WRN("#%d mref_len = %d, status = %d\n", msock->s_debug_nr, mref->ref_len, status);
}
done:
return status;
@ -726,16 +775,21 @@ int mars_send_cb(struct mars_socket *msock, struct mref_object *mref)
.cmd_code = CMD_CB,
.cmd_int1 = mref->ref_id,
};
int seq = 0;
int status;
status = mars_send_struct(msock, &cmd, mars_cmd_meta);
status = _mars_send_struct(msock, &cmd, mars_cmd_meta, &seq, true);
if (status < 0)
goto done;
status = mars_send_struct(msock, mref, mars_mref_meta);
seq = 0;
status = _mars_send_struct(msock, mref, mars_mref_meta, &seq, !mref->ref_rw);
if (status < 0)
goto done;
if (!mref->ref_rw) {
MARS_IO("sending blocklen = %d\n", mref->ref_len);
status = mars_send(msock, mref->ref_data, mref->ref_len);
MARS_IO("#%d sending blocklen = %d\n", msock->s_debug_nr, mref->ref_len);
status = mars_send_raw(msock, mref->ref_data, mref->ref_len, false);
}
done:
return status;
@ -744,18 +798,20 @@ EXPORT_SYMBOL_GPL(mars_send_cb);
int mars_recv_cb(struct mars_socket *msock, struct mref_object *mref)
{
int seq = 0;
int status;
status = mars_recv_struct(msock, mref, mars_mref_meta);
status = _mars_recv_struct(msock, mref, mars_mref_meta, &seq, __LINE__);
if (status < 0)
goto done;
if (!mref->ref_rw) {
if (!mref->ref_data) {
MARS_WRN("no internal buffer available\n");
MARS_WRN("#%d no internal buffer available\n", msock->s_debug_nr);
status = -EINVAL;
goto done;
}
MARS_IO("receiving blocklen = %d\n", mref->ref_len);
status = mars_recv(msock, mref->ref_data, mref->ref_len, mref->ref_len);
MARS_IO("#%d receiving blocklen = %d\n", msock->s_debug_nr, mref->ref_len);
status = mars_recv_raw(msock, mref->ref_data, mref->ref_len, mref->ref_len);
}
done:
return status;

View File

@ -10,7 +10,16 @@
#define MARS_DEFAULT_PORT 7777
struct mars_socket; // opaque
/* The original struct socket has no refcount. This leads to problems
* during long-lasting system calls when racing with socket shutdown.
* This is just a small wrapper adding a refcount and some debugging aid.
*/
struct mars_socket {
struct socket *s_socket;
atomic_t s_count;
int s_debug_nr;
bool s_dead;
};
struct mars_tcp_params {
int tcp_timeout;
@ -52,15 +61,15 @@ extern char *(*mars_translate_hostname)(const char *name);
*/
extern int mars_create_sockaddr(struct sockaddr_storage *addr, const char *spec);
extern struct mars_socket *mars_create_socket(struct sockaddr_storage *addr, bool is_server);
extern struct mars_socket *mars_accept_socket(struct mars_socket *msock, bool do_block);
extern struct mars_socket *mars_get_socket(struct mars_socket *msock);
extern int mars_create_socket(struct mars_socket *msock, struct sockaddr_storage *addr, bool is_server);
extern int mars_accept_socket(struct mars_socket *new_msock, struct mars_socket *old_msock, bool do_block);
extern bool mars_get_socket(struct mars_socket *msock);
extern void mars_put_socket(struct mars_socket *msock);
extern void mars_shutdown_socket(struct mars_socket *msock);
extern bool mars_socket_is_alive(struct mars_socket *msock);
extern int mars_send(struct mars_socket *msock, void *buf, int len);
extern int mars_recv(struct mars_socket *msock, void *buf, int minlen, int maxlen);
extern int mars_send_raw(struct mars_socket *msock, void *buf, int len, bool cork);
extern int mars_recv_raw(struct mars_socket *msock, void *buf, int minlen, int maxlen);
/* Mid-level generic field data exchange
*/

View File

@ -18,7 +18,7 @@
#include "mars_server.h"
static struct mars_socket *server_socket = NULL;
static struct mars_socket server_socket = {};
static struct task_struct *server_thread = NULL;
static LIST_HEAD(server_list);
static spinlock_t server_lock = SPIN_LOCK_UNLOCKED;
@ -30,13 +30,16 @@ static
int cb_thread(void *data)
{
struct server_brick *brick = data;
struct mars_socket *sock = brick->handler_socket;
struct mars_socket *sock = &brick->handler_socket;
bool ok = mars_get_socket(sock);
int status = -EINVAL;
brick->cb_running = true;
brick->cb_running = ok;
wake_up_interruptible(&brick->startup_event);
MARS_DBG("--------------- cb_thread starting on socket %p\n", brick->handler_socket);
MARS_DBG("--------------- cb_thread starting on socket #%d, ok = %d\n", sock->s_debug_nr, ok);
if (!ok)
goto done;
while (!kthread_should_stop() || !list_empty(&brick->cb_read_list) || !list_empty(&brick->cb_write_list)) {
struct server_mref_aspect *mref_a;
@ -92,9 +95,11 @@ int cb_thread(void *data)
atomic_dec(&brick->in_flight);
GENERIC_INPUT_CALL(brick->inputs[0], mref_put, mref);
}
done:
brick->cb_running = false;
MARS_DBG("---------- cb_thread terminating, status = %d\n", status);
if (ok)
mars_put_socket(sock);
wake_up_interruptible(&brick->startup_event);
return status;
}
@ -217,7 +222,7 @@ static
int handler_thread(void *data)
{
struct server_brick *brick = data;
struct mars_socket *sock = brick->handler_socket;
struct mars_socket *sock = &brick->handler_socket;
struct task_struct *cb_thread = brick->cb_thread;
int status = 0;
@ -366,7 +371,6 @@ int handler_thread(void *data)
MARS_DBG("self-shutdown\n");
h_thread = _grab_handler(brick);
mars_put_socket(sock);
brick->handler_socket = NULL;
if (h_thread) {
int status;
MARS_DBG("self cleanup...\n");
@ -452,15 +456,16 @@ static int server_switch(struct server_brick *brick)
}
} else {
struct task_struct *thread;
struct mars_socket *sock = &brick->handler_socket;
mars_power_led_on((void*)brick, false);
mars_shutdown_socket(sock);
thread = _grab_handler(brick);
if (thread) {
brick->handler_thread = NULL;
MARS_INF("stopping handler thread....\n");
mars_shutdown_socket(brick->handler_socket);
kthread_stop(thread);
mars_put_socket(brick->handler_socket);
brick->handler_socket = NULL;
if (sock->s_socket)
mars_put_socket(sock);
put_task_struct(thread);
} else {
MARS_WRN("handler thread does not exist\n");
@ -559,6 +564,7 @@ EXPORT_SYMBOL_GPL(server_brick_type);
static int _server_thread(void *data)
{
char *id = my_id();
struct server_brick *brick = NULL;
int status = 0;
MARS_INF("-------- server starting on host '%s' ----------\n", id);
@ -572,22 +578,27 @@ static int _server_thread(void *data)
MARS_INF("-------- server now working on host '%s' ----------\n", id);
while (!kthread_should_stop() && mars_global && mars_global->global_power.button) {
struct server_brick *brick;
struct mars_socket *new_socket;
new_socket = mars_accept_socket(server_socket, false);
if (IS_ERR(new_socket)) {
status = PTR_ERR(new_socket);
new_socket = NULL;
if (!brick) {
brick = (void*)mars_make_brick(mars_global, NULL, true, &server_brick_type, "server", "server");
if (!brick) {
MARS_ERR("cannot create server instance\n");
msleep(5000);
continue;
}
}
status = mars_accept_socket(&brick->handler_socket, &server_socket, false);
if (unlikely(status < 0)) {
msleep(500);
if (status == -EAGAIN)
continue; // without error message
MARS_WRN("accept status = %d\n", status);
msleep(4000);
msleep(2000);
continue;
}
MARS_DBG("got new connection %p\n", new_socket);
MARS_DBG("got new connection #%d\n", brick->handler_socket.s_debug_nr);
/* TODO: check authorization.
*/
@ -597,27 +608,24 @@ static int _server_thread(void *data)
goto err;
}
brick = (void*)mars_make_brick(mars_global, NULL, true, &server_brick_type, "server", "server");
if (!brick) {
MARS_ERR("cannot create server instance\n");
goto err;
}
brick->handler_socket = new_socket;
brick->power.button = true;
status = server_switch(brick);
if (status < 0) {
if (unlikely(status < 0)) {
MARS_ERR("cannot switch on server brick, status = %d\n", status);
goto err;
}
brick = NULL;
msleep(1000);
continue;
err:
if (new_socket) {
mars_put_socket(new_socket);
if (brick) {
mars_put_socket(&brick->handler_socket);
status = mars_kill_brick((void*)brick);
if (status < 0) {
BRICK_ERR("kill status = %d, giving up\n", status);
}
}
msleep(3000);
}
@ -664,10 +672,9 @@ int __init init_mars_server(void)
if (status < 0)
return status;
server_socket = mars_create_socket(&sockaddr, true);
if (unlikely(IS_ERR(server_socket))) {
status = PTR_ERR(server_socket);
server_socket = NULL;
status = mars_create_socket(&server_socket, &sockaddr, true);
if (unlikely(status < 0)) {
MARS_ERR("could not create server socket, status = %d\n", status);
return status;
}
@ -690,15 +697,9 @@ void __exit exit_mars_server(void)
MARS_INF("exit_server()\n");
server_unregister_brick_type();
if (server_thread) {
if (server_socket) {
mars_shutdown_socket(server_socket);
}
mars_put_socket(&server_socket);
MARS_INF("stopping server thread...\n");
kthread_stop(server_thread);
if (server_socket) {
mars_put_socket(server_socket);
server_socket = NULL;
}
put_task_struct(server_thread);
server_thread = NULL;
}

View File

@ -21,7 +21,7 @@ struct server_brick {
struct list_head server_link;
atomic_t in_flight;
struct semaphore socket_sem;
struct mars_socket *handler_socket;
struct mars_socket handler_socket;
struct task_struct *handler_thread;
struct task_struct *cb_thread;
wait_queue_head_t startup_event;

View File

@ -315,7 +315,7 @@ int _set_copy_params(struct mars_brick *_brick, void *private)
for (i = 0; i < 2; i++) {
status = cc->output[i]->ops->mars_get_info(cc->output[i], &cc->info[i]);
if (status < 0) {
MARS_ERR("cannot determine current size of '%s'\n", cc->argv[i]);
MARS_WRN("cannot determine current size of '%s'\n", cc->argv[i]);
goto done;
}
MARS_DBG("%d '%s' current_size = %lld\n", i, cc->fullpath[i], cc->info[i].current_size);
@ -664,7 +664,7 @@ struct mars_peerinfo {
struct mars_global *global;
char *peer;
char *path;
struct mars_socket *socket;
struct mars_socket socket;
struct task_struct *peer_thread;
spinlock_t lock;
struct list_head remote_dent_list;
@ -949,36 +949,19 @@ int run_bones(struct mars_peerinfo *peer)
// remote working infrastructure
static
void _peer_cleanup(struct mars_peerinfo *peer, bool do_stop)
void _peer_cleanup(struct mars_peerinfo *peer)
{
unsigned long flags;
MARS_DBG("\n");
if (peer->socket) {
mars_shutdown_socket(peer->socket);
}
if (do_stop && peer->peer_thread) {
kthread_stop(peer->peer_thread);
put_task_struct(peer->peer_thread);
peer->peer_thread = NULL;
}
if (peer->socket) {
mars_put_socket(peer->socket);
peer->socket = NULL;
MARS_DBG("cleanup\n");
if (peer->socket.s_socket) {
MARS_DBG("really shutdown socket\n");
mars_shutdown_socket(&peer->socket);
mars_put_socket(&peer->socket);
}
if (do_stop) {
LIST_HEAD(tmp_list);
traced_lock(&peer->lock, flags);
list_replace_init(&peer->remote_dent_list, &tmp_list);
traced_unlock(&peer->lock, flags);
mars_free_dent_all(NULL, &tmp_list);
brick_string_free(peer->peer);
brick_string_free(peer->path);
}
}
static
int remote_thread(void *data)
int peer_thread(void *data)
{
struct mars_peerinfo *peer = data;
char *real_peer;
@ -989,7 +972,7 @@ int remote_thread(void *data)
return -1;
real_peer = mars_translate_hostname(peer->peer);
MARS_INF("-------- remote thread starting on peer '%s' (%s)\n", peer->peer, real_peer);
MARS_INF("-------- peer thread starting on peer '%s' (%s)\n", peer->peer, real_peer);
status = mars_create_sockaddr(&sockaddr, real_peer);
if (unlikely(status < 0)) {
@ -1007,32 +990,35 @@ int remote_thread(void *data)
.cmd_int1 = peer->maxdepth,
};
if (!peer->socket) {
peer->socket = mars_create_socket(&sockaddr, false);
if (unlikely(IS_ERR(peer->socket))) {
status = PTR_ERR(peer->socket);
peer->socket = NULL;
if (!mars_socket_is_alive(&peer->socket)) {
if (peer->socket.s_socket) {
_peer_cleanup(peer);
msleep(5000);
continue;
}
status = mars_create_socket(&peer->socket, &sockaddr, false);
if (unlikely(status < 0)) {
MARS_INF("no connection to '%s'\n", real_peer);
msleep(5000);
continue;
}
MARS_DBG("successfully opened socket to '%s'\n", real_peer);
msleep(1000);
msleep(100);
continue;
}
status = mars_send_struct(peer->socket, &cmd, mars_cmd_meta);
status = mars_send_struct(&peer->socket, &cmd, mars_cmd_meta);
if (unlikely(status < 0)) {
MARS_WRN("communication error on send, status = %d\n", status);
_peer_cleanup(peer, false);
_peer_cleanup(peer);
msleep(2000);
continue;
}
status = mars_recv_dent_list(peer->socket, &tmp_list);
status = mars_recv_dent_list(&peer->socket, &tmp_list);
if (unlikely(status < 0)) {
MARS_WRN("communication error on receive, status = %d\n", status);
_peer_cleanup(peer, false);
_peer_cleanup(peer);
msleep(5000);
continue;
}
@ -1059,10 +1045,9 @@ int remote_thread(void *data)
MARS_INF("-------- remote thread terminating\n");
_peer_cleanup(peer, false);
_peer_cleanup(peer);
done:
//cleanup_mm();
brick_string_free(real_peer);
return 0;
}
@ -1073,8 +1058,10 @@ done:
static int _kill_peer(void *buf, struct mars_dent *dent)
{
LIST_HEAD(tmp_list);
struct mars_global *global = buf;
struct mars_peerinfo *peer = dent->d_private;
unsigned long flags;
if (global->global_power.button) {
return 0;
@ -1084,7 +1071,17 @@ static int _kill_peer(void *buf, struct mars_dent *dent)
}
MARS_INF("stopping peer thread...\n");
_peer_cleanup(peer, true);
if (peer->peer_thread) {
kthread_stop(peer->peer_thread);
put_task_struct(peer->peer_thread);
peer->peer_thread = NULL;
}
traced_lock(&peer->lock, flags);
list_replace_init(&peer->remote_dent_list, &tmp_list);
traced_unlock(&peer->lock, flags);
mars_free_dent_all(NULL, &tmp_list);
brick_string_free(peer->peer);
brick_string_free(peer->path);
dent->d_private = NULL;
brick_mem_free(peer);
return 0;
@ -1126,7 +1123,7 @@ static int _make_peer(struct mars_global *global, struct mars_dent *dent, char *
peer = dent->d_private;
if (!peer->peer_thread) {
peer->peer_thread = kthread_create(remote_thread, peer, "mars_peer%d", serial++);
peer->peer_thread = kthread_create(peer_thread, peer, "mars_peer%d", serial++);
if (unlikely(IS_ERR(peer->peer_thread))) {
MARS_ERR("cannot start peer thread, status = %d\n", (int)PTR_ERR(peer->peer_thread));
peer->peer_thread = NULL;
@ -3368,6 +3365,7 @@ static void (*exit_fn[INIT_MAX])(void) = {};
static int exit_fn_nr = 0;
#define DO_INIT(name) \
MARS_DBG("=== starting module " #name "...\n"); \
do { \
if ((status = init_##name()) < 0) goto done; \
exit_names[exit_fn_nr] = #name; \