mars_net: drastically speedup network

This commit is contained in:
Thomas Schoebel-Theuer 2012-08-06 15:04:32 +02:00 committed by Thomas Schoebel-Theuer
parent 5737198009
commit 9f28a4ef5f
4 changed files with 163 additions and 53 deletions

View File

@ -390,6 +390,7 @@ void _do_timeout(struct client_output *output, struct list_head *anchor, bool fo
mref = mref_a->object; mref = mref_a->object;
if (!force && if (!force &&
mars_net_is_alive &&
(io_timeout <= 0 || !time_is_before_jiffies(mref_a->submit_jiffies + io_timeout * HZ))) { (io_timeout <= 0 || !time_is_before_jiffies(mref_a->submit_jiffies + io_timeout * HZ))) {
break; break;
} }

View File

@ -12,6 +12,9 @@
#include "mars.h" #include "mars.h"
#include "mars_net.h" #include "mars_net.h"
#undef USE_SENDPAGE // FIXME: does not work, leads to data corruption (probably due to races with asynchrous sending)
#define USE_BUFFERING
static static
void mars_check_meta(const struct meta *meta, void *data); void mars_check_meta(const struct meta *meta, void *data);
@ -28,7 +31,7 @@ void mars_check_meta(const struct meta *meta, void *data);
struct mars_tcp_params default_tcp_params = { struct mars_tcp_params default_tcp_params = {
.window_size = 8 * 1024 * 1024, // for long distance replications .window_size = 8 * 1024 * 1024, // for long distance replications
.tcp_timeout = 20, .tcp_timeout = 20,
.tcp_keepcnt = 6, .tcp_keepcnt = 3,
.tcp_keepintvl = 10, // keepalive ping time .tcp_keepintvl = 10, // keepalive ping time
.tcp_keepidle = 10, .tcp_keepidle = 10,
.tos = IPTOS_LOWDELAY, .tos = IPTOS_LOWDELAY,
@ -136,8 +139,8 @@ void _set_socketopts(struct socket *sock)
} }
#endif #endif
#if 0 // do not use for now #if 1
if (!do_block && sock->file) { // switch back to blocking mode if (sock->file) { // switch back to blocking mode
sock->file->f_flags &= ~O_NONBLOCK; sock->file->f_flags &= ~O_NONBLOCK;
} }
#endif #endif
@ -259,6 +262,8 @@ void mars_put_socket(struct mars_socket *msock)
kernel_sock_shutdown(sock, SHUT_WR); kernel_sock_shutdown(sock, SHUT_WR);
sock_release(sock); sock_release(sock);
} }
brick_block_free(msock->s_buffer, PAGE_SIZE);
msock->s_buffer = NULL;
memset(msock, 0, sizeof(struct mars_socket)); memset(msock, 0, sizeof(struct mars_socket));
} }
} }
@ -299,75 +304,149 @@ done:
} }
EXPORT_SYMBOL_GPL(mars_socket_is_alive); EXPORT_SYMBOL_GPL(mars_socket_is_alive);
int mars_send_raw(struct mars_socket *msock, void *buf, int len, bool cork) static
int _mars_send_raw(struct mars_socket *msock, void *buf, int len)
{ {
struct kvec iov = { int sleeptime = 1000 / HZ;
.iov_base = buf,
.iov_len = len,
};
struct msghdr msg = {
.msg_iov = (struct iovec*)&iov,
.msg_flags = 0 | MSG_NOSIGNAL,
};
int status = -EIDRM;
int sent = 0; int sent = 0;
int status = 0;
if (!mars_get_socket(msock)) while (len > 0) {
goto final; int this_len = len;
#if 0 // leads to obscure effects (short reads at other end) if (!mars_net_is_alive || kthread_should_stop()) {
if (cork) MARS_WRN("interrupting, sent = %d\n", sent);
msg.msg_flags |= TCP_CORK; status = -EIDRM;
break;
}
#ifdef USE_SENDPAGE // FIXME: does not work, leads to data corruption (probably due to races with asynchrous sending)
{
int page_offset = 0;
struct page *page;
int flags = MSG_NOSIGNAL;
page = brick_iomap(buf, &page_offset, &this_len);
if (unlikely(!page)) {
MARS_ERR("cannot iomap() kernel address %p\n", buf);
status = -EINVAL;
break;
}
if (this_len < len)
flags |= MSG_MORE;
status = kernel_sendpage(msock->s_socket, page, page_offset, this_len, flags);
if (status > 0 && status != this_len) {
MARS_WRN("#%d status = %d this_len = %d\n", msock->s_debug_nr, status, this_len);
}
}
#else // spare code, activate in case of problems with sendpage()
{
struct kvec iov = {
.iov_base = buf,
.iov_len = this_len,
};
struct msghdr msg = {
.msg_iov = (struct iovec*)&iov,
.msg_flags = 0 | MSG_NOSIGNAL,
};
status = kernel_sendmsg(msock->s_socket, &msg, &iov, 1, this_len);
}
#endif #endif
MARS_IO("#%d buf = %p, len = %d, cork = %d\n", msock->s_debug_nr, buf, len, cork);
while (sent < len) {
if (unlikely(msock->s_dead)) {
MARS_WRN("#%d socket has disappeared\n", msock->s_debug_nr);
msleep(50);
status = -EIDRM;
goto done;
}
status = kernel_sendmsg(msock->s_socket, &msg, &iov, 1, len);
MARS_IO("#%d sendmsg status = %d\n", msock->s_debug_nr, status);
if (status == -EAGAIN) { if (status == -EAGAIN) {
msleep(50); msleep(sleeptime);
// linearly increasing backoff
if (sleeptime < 100) {
sleeptime += 1000 / HZ;
}
continue; continue;
} }
if (unlikely(status == -EINTR)) { // ignore it
if (status == -EINTR) { // ignore it
flush_signals(current); flush_signals(current);
MARS_IO("#%d got signal\n", msock->s_debug_nr); MARS_IO("#%d got signal\n", msock->s_debug_nr);
msleep(50); msleep(50);
continue; continue;
} }
if (unlikely(!status)) {
if (status < 0) { MARS_WRN("#%d EOF from socket upon send_page()\n", msock->s_debug_nr);
MARS_WRN("#%d bad socket sendmsg, len=%d, iov_len=%d, sent=%d, status = %d\n", msock->s_debug_nr, len, (int)iov.iov_len, sent, status);
msleep(50);
goto done;
}
if (!status) {
MARS_WRN("#%d EOF from socket upon sendmsg\n", msock->s_debug_nr);
msleep(50); msleep(50);
status = -ECOMM; status = -ECOMM;
goto done; break;
}
if (unlikely(status < 0)) {
MARS_WRN("#%d bad socket sendmsg, len=%d, this_len=%d, sent=%d, status = %d\n", msock->s_debug_nr, len, this_len, sent, status);
break;
} }
iov.iov_base += status; len -= status;
iov.iov_len -= status; buf += status;
sent += status; sent += status;
sleeptime = 1000 / HZ;
}
if (sent > 0)
status = sent;
return status;
}
int mars_send_raw(struct mars_socket *msock, void *buf, int len, bool cork)
{
#ifdef USE_BUFFERING
int sent = 0;
int rest = len;
#endif
int status = -EINVAL;
if (!mars_get_socket(msock))
goto final;
#ifdef USE_BUFFERING
restart:
while (!msock->s_buffer) {
msock->s_pos = 0;
msock->s_buffer = brick_block_alloc(0, PAGE_SIZE);
if (unlikely(!msock->s_buffer))
msleep(100);
}
if (msock->s_pos + rest < PAGE_SIZE) {
memcpy(msock->s_buffer + msock->s_pos, buf, rest);
msock->s_pos += rest;
sent += rest;
rest = 0;
status = sent;
if (cork)
goto done;
}
if (msock->s_pos > 0) {
status = _mars_send_raw(msock, msock->s_buffer, msock->s_pos);
if (status < 0)
goto done;
brick_block_free(msock->s_buffer, PAGE_SIZE);
msock->s_buffer = NULL;
msock->s_pos = 0;
}
if (rest >= PAGE_SIZE) {
return _mars_send_raw(msock, buf, rest);
} else if (rest > 0) {
goto restart;
} }
status = sent; status = sent;
MARS_IO("#%d sent %d\n", msock->s_debug_nr, sent);
done: done:
#else
status = _mars_send_raw(msock, buf, len);
#endif
if (status < 0 && msock->s_shutdown_on_err) if (status < 0 && msock->s_shutdown_on_err)
mars_shutdown_socket(msock); mars_shutdown_socket(msock);
mars_put_socket(msock); mars_put_socket(msock);
final: final:
return status; return status;
} }
@ -408,22 +487,31 @@ int mars_recv_raw(struct mars_socket *msock, void *buf, int minlen, int maxlen)
goto err; goto err;
} }
if (!mars_net_is_alive || kthread_should_stop()) {
MARS_WRN("#%d interrupting, done = %d\n", msock->s_debug_nr, done);
if (done > 0)
status = -EIDRM;
goto err;
}
MARS_IO("#%d done %d, fetching %d bytes\n", msock->s_debug_nr, done, maxlen-done); MARS_IO("#%d done %d, fetching %d bytes\n", msock->s_debug_nr, done, maxlen-done);
status = kernel_recvmsg(msock->s_socket, &msg, &iov, 1, maxlen-done, msg.msg_flags); status = kernel_recvmsg(msock->s_socket, &msg, &iov, 1, maxlen-done, msg.msg_flags);
MARS_IO("#%d status = %d\n", msock->s_debug_nr, status); MARS_IO("#%d status = %d\n", msock->s_debug_nr, status);
if (!mars_net_is_alive || kthread_should_stop()) {
MARS_WRN("#%d interrupting, done = %d\n", msock->s_debug_nr, done);
if (done > 0)
status = -EIDRM;
goto err;
}
if (status == -EAGAIN) { if (status == -EAGAIN) {
msleep(sleeptime); msleep(sleeptime);
// linearly increasing backoff // linearly increasing backoff
if (sleeptime < 100) { if (sleeptime < 100) {
sleeptime += 1000 / HZ; sleeptime += 1000 / HZ;
} else if (kthread_should_stop()) {
MARS_WRN("interrupting, done = %d\n", done);
if (done > 0)
status = -EIDRM;
goto err;
} }
continue; continue;
} }
@ -865,14 +953,19 @@ EXPORT_SYMBOL_GPL(mars_recv_cb);
char *(*mars_translate_hostname)(const char *name) = NULL; char *(*mars_translate_hostname)(const char *name) = NULL;
EXPORT_SYMBOL_GPL(mars_translate_hostname); EXPORT_SYMBOL_GPL(mars_translate_hostname);
bool mars_net_is_alive = false;
EXPORT_SYMBOL_GPL(mars_net_is_alive);
int __init init_mars_net(void) int __init init_mars_net(void)
{ {
MARS_INF("init_net()\n"); MARS_INF("init_net()\n");
mars_net_is_alive = true;
return 0; return 0;
} }
void __exit exit_mars_net(void) void __exit exit_mars_net(void)
{ {
mars_net_is_alive = false;
MARS_INF("exit_net()\n"); MARS_INF("exit_net()\n");
} }

View File

@ -10,13 +10,21 @@
#define MARS_DEFAULT_PORT 7777 #define MARS_DEFAULT_PORT 7777
extern bool mars_net_is_alive;
/* The original struct socket has no refcount. This leads to problems /* The original struct socket has no refcount. This leads to problems
* during long-lasting system calls when racing with socket shutdown. * during long-lasting system calls when racing with socket shutdown.
* This is just a small wrapper adding a refcount and some debugging aid. *
* The original idea of struct mars_docket was just a small wrapper
* adding a refcount and some debugging aid.
* Nowadays, some buffering was added in order to take advantage of
* kernel_sendpage().
*/ */
struct mars_socket { struct mars_socket {
struct socket *s_socket; struct socket *s_socket;
void *s_buffer;
atomic_t s_count; atomic_t s_count;
int s_pos;
int s_debug_nr; int s_debug_nr;
bool s_dead; bool s_dead;
bool s_shutdown_on_err; bool s_shutdown_on_err;

View File

@ -1045,6 +1045,11 @@ int peer_thread(void *data)
msleep(5000); msleep(5000);
continue; continue;
} }
if (!mars_net_is_alive) {
msleep(1000);
continue;
}
status = mars_create_socket(&peer->socket, &sockaddr, false); status = mars_create_socket(&peer->socket, &sockaddr, false);
if (unlikely(status < 0)) { if (unlikely(status < 0)) {
MARS_INF("no connection to '%s'\n", real_peer); MARS_INF("no connection to '%s'\n", real_peer);
@ -3771,7 +3776,10 @@ static int light_thread(void *data)
msleep(100); msleep(100);
_global.global_power.button = !kthread_should_stop(); if (kthread_should_stop()) {
_global.global_power.button = false;
mars_net_is_alive = false;
}
_make_alivelink("alive", _global.global_power.button ? 1 : 0); _make_alivelink("alive", _global.global_power.button ? 1 : 0);
mars_remaining_space("/mars", &_global.total_space, &_global.remaining_space); mars_remaining_space("/mars", &_global.total_space, &_global.remaining_space);