mirror of
https://github.com/schoebel/mars
synced 2025-04-01 00:06:32 +00:00
net: speedup struct communication
This commit is contained in:
parent
bcea0eac40
commit
d07aaa75c3
712
mars_net.c
712
mars_net.c
@ -3,6 +3,13 @@
|
||||
//#define BRICK_DEBUGGING
|
||||
//#define MARS_DEBUGGING
|
||||
//#define IO_DEBUGGING
|
||||
//#define LOWLEVEL_DEBUGGING
|
||||
|
||||
#ifdef LOWLEVEL_DEBUGGING
|
||||
#define MARS_LOW MARS_IO
|
||||
#else
|
||||
#define MARS_LOW(args...) /*empty*/
|
||||
#endif
|
||||
|
||||
#include <linux/kernel.h>
|
||||
#include <linux/module.h>
|
||||
@ -14,9 +21,6 @@
|
||||
#undef USE_SENDPAGE // FIXME: does not work, leads to data corruption (probably due to races with asynchrous sending)
|
||||
#define USE_BUFFERING
|
||||
|
||||
static
|
||||
void mars_check_meta(const struct meta *meta, void *data);
|
||||
|
||||
/* Low-level network traffic
|
||||
*/
|
||||
|
||||
@ -236,7 +240,7 @@ EXPORT_SYMBOL_GPL(mars_accept_socket);
|
||||
|
||||
bool mars_get_socket(struct mars_socket *msock)
|
||||
{
|
||||
MARS_IO("try socket #%d %p s_dead = %d s_count=%d\n", msock->s_debug_nr, msock->s_socket, msock->s_dead, atomic_read(&msock->s_count));
|
||||
MARS_LOW("try socket #%d %p s_dead = %d s_count=%d\n", msock->s_debug_nr, msock->s_socket, msock->s_dead, atomic_read(&msock->s_count));
|
||||
if (unlikely(atomic_read(&msock->s_count) <= 0))
|
||||
return false;
|
||||
atomic_inc(&msock->s_count);
|
||||
@ -244,25 +248,32 @@ bool mars_get_socket(struct mars_socket *msock)
|
||||
mars_put_socket(msock);
|
||||
return false;
|
||||
}
|
||||
MARS_IO("got socket #%d\n", msock->s_debug_nr);
|
||||
MARS_LOW("got socket #%d\n", msock->s_debug_nr);
|
||||
return true;
|
||||
}
|
||||
EXPORT_SYMBOL_GPL(mars_get_socket);
|
||||
|
||||
void mars_put_socket(struct mars_socket *msock)
|
||||
{
|
||||
MARS_IO("try socket #%d %p s_dead = %d s_count=%d\n", msock->s_debug_nr, msock->s_socket, msock->s_dead, atomic_read(&msock->s_count));
|
||||
MARS_LOW("try socket #%d %p s_dead = %d s_count=%d\n", msock->s_debug_nr, msock->s_socket, msock->s_dead, atomic_read(&msock->s_count));
|
||||
if (unlikely(atomic_read(&msock->s_count) <= 0)) {
|
||||
MARS_ERR("bad nesting on msock = %p\n", msock);
|
||||
} else if (atomic_dec_and_test(&msock->s_count)) {
|
||||
struct socket *sock = msock->s_socket;
|
||||
int i;
|
||||
|
||||
MARS_DBG("closing socket #%d %p\n", msock->s_debug_nr, sock);
|
||||
if (likely(sock)) {
|
||||
kernel_sock_shutdown(sock, SHUT_WR);
|
||||
sock_release(sock);
|
||||
}
|
||||
for (i = 0; i < MAX_DESC_CACHE; i++) {
|
||||
if (msock->s_desc_send[i])
|
||||
brick_block_free(msock->s_desc_send[i], PAGE_SIZE);
|
||||
if (msock->s_desc_recv[i])
|
||||
brick_block_free(msock->s_desc_recv[i], PAGE_SIZE);
|
||||
}
|
||||
brick_block_free(msock->s_buffer, PAGE_SIZE);
|
||||
msock->s_buffer = NULL;
|
||||
memset(msock, 0, sizeof(struct mars_socket));
|
||||
}
|
||||
}
|
||||
@ -298,13 +309,13 @@ bool mars_socket_is_alive(struct mars_socket *msock)
|
||||
goto done;
|
||||
res = true;
|
||||
done:
|
||||
MARS_IO("#%d %p s_count = %d s_dead = %d\n", msock->s_debug_nr, msock->s_socket, atomic_read(&msock->s_count), msock->s_dead);
|
||||
MARS_LOW("#%d %p s_count = %d s_dead = %d\n", msock->s_debug_nr, msock->s_socket, atomic_read(&msock->s_count), msock->s_dead);
|
||||
return res;
|
||||
}
|
||||
EXPORT_SYMBOL_GPL(mars_socket_is_alive);
|
||||
|
||||
static
|
||||
int _mars_send_raw(struct mars_socket *msock, void *buf, int len)
|
||||
int _mars_send_raw(struct mars_socket *msock, const void *buf, int len)
|
||||
{
|
||||
int sleeptime = 1000 / HZ;
|
||||
int sent = 0;
|
||||
@ -342,7 +353,7 @@ int _mars_send_raw(struct mars_socket *msock, void *buf, int len)
|
||||
#else // spare code, activate in case of problems with sendpage()
|
||||
{
|
||||
struct kvec iov = {
|
||||
.iov_base = buf,
|
||||
.iov_base = (void*)buf,
|
||||
.iov_len = this_len,
|
||||
};
|
||||
struct msghdr msg = {
|
||||
@ -390,7 +401,7 @@ int _mars_send_raw(struct mars_socket *msock, void *buf, int len)
|
||||
return status;
|
||||
}
|
||||
|
||||
int mars_send_raw(struct mars_socket *msock, void *buf, int len, bool cork)
|
||||
int mars_send_raw(struct mars_socket *msock, const void *buf, int len, bool cork)
|
||||
{
|
||||
#ifdef USE_BUFFERING
|
||||
int sent = 0;
|
||||
@ -401,6 +412,8 @@ int mars_send_raw(struct mars_socket *msock, void *buf, int len, bool cork)
|
||||
if (!mars_get_socket(msock))
|
||||
goto final;
|
||||
|
||||
MARS_IO("#%d cork=%d sending len=%d bytes\n", msock->s_debug_nr, cork, len);
|
||||
|
||||
#ifdef USE_BUFFERING
|
||||
restart:
|
||||
while (!msock->s_buffer) {
|
||||
@ -422,6 +435,7 @@ restart:
|
||||
|
||||
if (msock->s_pos > 0) {
|
||||
status = _mars_send_raw(msock, msock->s_buffer, msock->s_pos);
|
||||
MARS_IO("#%d buffer send %d bytes status=%d\n", msock->s_debug_nr, msock->s_pos, status);
|
||||
if (status < 0)
|
||||
goto done;
|
||||
|
||||
@ -431,7 +445,9 @@ restart:
|
||||
}
|
||||
|
||||
if (rest >= PAGE_SIZE) {
|
||||
return _mars_send_raw(msock, buf, rest);
|
||||
status = _mars_send_raw(msock, buf, rest);
|
||||
MARS_IO("#%d bulk send %d bytes status=%d\n", msock->s_debug_nr, rest, status);
|
||||
goto done;
|
||||
} else if (rest > 0) {
|
||||
goto restart;
|
||||
}
|
||||
@ -451,12 +467,19 @@ final:
|
||||
}
|
||||
EXPORT_SYMBOL_GPL(mars_send_raw);
|
||||
|
||||
/* Note: buf may be NULL. In this case, the data is simply consumed,
|
||||
* like /dev/null
|
||||
*/
|
||||
int mars_recv_raw(struct mars_socket *msock, void *buf, int minlen, int maxlen)
|
||||
{
|
||||
void *dummy = NULL;
|
||||
int sleeptime = 1000 / HZ;
|
||||
int status = -EIDRM;
|
||||
int done = 0;
|
||||
|
||||
if (!buf) {
|
||||
buf = dummy = brick_block_alloc(0, maxlen);
|
||||
}
|
||||
if (!buf) {
|
||||
MARS_WRN("#%d bad receive buffer\n", msock->s_debug_nr);
|
||||
return -EINVAL;
|
||||
@ -465,6 +488,8 @@ int mars_recv_raw(struct mars_socket *msock, void *buf, int minlen, int maxlen)
|
||||
if (!mars_get_socket(msock))
|
||||
goto final;
|
||||
|
||||
MARS_IO("#%d receiving len=%d/%d bytes\n", msock->s_debug_nr, minlen, maxlen);
|
||||
|
||||
while (done < minlen) {
|
||||
struct kvec iov = {
|
||||
.iov_base = buf + done,
|
||||
@ -493,11 +518,11 @@ int mars_recv_raw(struct mars_socket *msock, void *buf, int minlen, int maxlen)
|
||||
goto err;
|
||||
}
|
||||
|
||||
MARS_IO("#%d done %d, fetching %d bytes\n", msock->s_debug_nr, done, maxlen-done);
|
||||
MARS_LOW("#%d done %d, fetching %d bytes\n", msock->s_debug_nr, done, maxlen-done);
|
||||
|
||||
status = kernel_recvmsg(msock->s_socket, &msg, &iov, 1, maxlen-done, msg.msg_flags);
|
||||
|
||||
MARS_IO("#%d status = %d\n", msock->s_debug_nr, status);
|
||||
MARS_LOW("#%d status = %d\n", msock->s_debug_nr, status);
|
||||
|
||||
if (!mars_net_is_alive || brick_thread_should_stop()) {
|
||||
MARS_WRN("#%d interrupting, done = %d\n", msock->s_debug_nr, done);
|
||||
@ -528,11 +553,15 @@ int mars_recv_raw(struct mars_socket *msock, void *buf, int minlen, int maxlen)
|
||||
}
|
||||
status = done;
|
||||
|
||||
MARS_IO("#%d got %d bytes\n", msock->s_debug_nr, done);
|
||||
|
||||
err:
|
||||
if (status < 0 && msock->s_shutdown_on_err)
|
||||
mars_shutdown_socket(msock);
|
||||
mars_put_socket(msock);
|
||||
final:
|
||||
if (dummy)
|
||||
brick_block_free(dummy, maxlen);
|
||||
return status;
|
||||
}
|
||||
EXPORT_SYMBOL_GPL(mars_recv_raw);
|
||||
@ -542,272 +571,405 @@ EXPORT_SYMBOL_GPL(mars_recv_raw);
|
||||
/* Mid-level field data exchange
|
||||
*/
|
||||
|
||||
/* TODO: make this bytesex-aware
|
||||
*/
|
||||
#define MARS_NET_MAGIC 0x63f0A2ec6148f48dll
|
||||
#define MAX_FIELD_LEN 32
|
||||
static
|
||||
int _add_fields(struct mars_desc_item *mi, const struct meta *meta, int offset, const char *prefix, int maxlen)
|
||||
{
|
||||
int count = 0;
|
||||
for (; meta->field_name != NULL; meta++) {
|
||||
const char *new_prefix;
|
||||
int new_offset;
|
||||
int len;
|
||||
|
||||
new_prefix = mi->field_name;
|
||||
new_offset = offset + meta->field_offset;
|
||||
|
||||
struct mars_net_header {
|
||||
MARS_IO("input field_name='%s' field_type=%d\n", meta->field_name, meta->field_type);
|
||||
|
||||
if (unlikely(maxlen < sizeof(struct mars_desc_item))) {
|
||||
MARS_ERR("desc cache item overflow\n");
|
||||
count = -1;
|
||||
goto done;
|
||||
}
|
||||
|
||||
len = snprintf(mi->field_name, MAX_FIELD_LEN, "%s.%s", prefix, meta->field_name);
|
||||
if (unlikely(len >= MAX_FIELD_LEN)) {
|
||||
MARS_ERR("field len overflow on '%s.%s'\n", prefix, meta->field_name);
|
||||
count = -1;
|
||||
goto done;
|
||||
}
|
||||
mi->field_type = meta->field_type;
|
||||
mi->field_size = meta->field_size;
|
||||
mi->field_sender_offset = new_offset;
|
||||
mi->field_recver_offset = -1;
|
||||
|
||||
MARS_IO("output field_name='%s' field_type=%d\n", mi->field_name, mi->field_type);
|
||||
|
||||
mi++;
|
||||
maxlen -= sizeof(struct mars_desc_item);
|
||||
count++;
|
||||
|
||||
if (meta->field_type == FIELD_SUB) {
|
||||
int sub_count;
|
||||
sub_count = _add_fields(mi, meta->field_ref, new_offset, new_prefix, maxlen);
|
||||
if (sub_count < 0)
|
||||
return sub_count;
|
||||
|
||||
mi += sub_count;
|
||||
count += sub_count;
|
||||
maxlen -= sub_count * sizeof(struct mars_desc_item);
|
||||
}
|
||||
}
|
||||
done:
|
||||
MARS_IO("count=%d\n", count);
|
||||
return count;
|
||||
}
|
||||
|
||||
static
|
||||
struct mars_desc_cache *make_sender_cache(struct mars_socket *msock, const struct meta *meta, int *cache_index)
|
||||
{
|
||||
int orig_len = PAGE_SIZE;
|
||||
int maxlen = orig_len;
|
||||
struct mars_desc_cache *mc;
|
||||
struct mars_desc_item *mi;
|
||||
int i;
|
||||
int status;
|
||||
|
||||
for (i = 0; i < MAX_DESC_CACHE; i++) {
|
||||
mc = msock->s_desc_send[i];
|
||||
if (!mc)
|
||||
break;
|
||||
if (mc->cache_sender_cookie == (u64)meta)
|
||||
goto done;
|
||||
}
|
||||
|
||||
MARS_IO("#%d meta=%p i=%d\n", msock->s_debug_nr, meta, i);
|
||||
|
||||
if (unlikely(i >= MAX_DESC_CACHE - 1)) {
|
||||
MARS_ERR("#%d desc cache overflow\n", msock->s_debug_nr);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
mc = brick_block_alloc(0, maxlen);
|
||||
if (unlikely(!mc)) {
|
||||
MARS_ERR("#%d desc cache alloc error\n", msock->s_debug_nr);
|
||||
goto done;
|
||||
}
|
||||
|
||||
memset(mc, 0, maxlen);
|
||||
mc->cache_sender_cookie = (u64)meta;
|
||||
|
||||
maxlen -= sizeof(struct mars_desc_cache);
|
||||
mi = (void*)(mc + 1);
|
||||
|
||||
status = _add_fields(mi, meta, 0, "", maxlen);
|
||||
|
||||
if (likely(status > 0)) {
|
||||
mc->cache_items = status;
|
||||
msock->s_desc_send[i] = mc;
|
||||
*cache_index = i;
|
||||
} else {
|
||||
brick_block_free(mc, orig_len);
|
||||
mc = NULL;
|
||||
}
|
||||
|
||||
done:
|
||||
return mc;
|
||||
}
|
||||
|
||||
static
|
||||
void _make_recver_cache(struct mars_desc_cache *mc, const struct meta *meta, int offset, const char *prefix)
|
||||
{
|
||||
char *tmp = brick_string_alloc(MAX_FIELD_LEN);
|
||||
int i;
|
||||
|
||||
for (; meta->field_name != NULL; meta++) {
|
||||
snprintf(tmp, MAX_FIELD_LEN, "%s.%s", prefix, meta->field_name);
|
||||
for (i = 0; i < mc->cache_items; i++) {
|
||||
struct mars_desc_item *mi = ((struct mars_desc_item*)(mc + 1)) + i;
|
||||
if (meta->field_type == mi->field_type &&
|
||||
!strcmp(tmp, mi->field_name)) {
|
||||
mi->field_recver_offset = offset + meta->field_offset;
|
||||
if (meta->field_type == FIELD_SUB) {
|
||||
_make_recver_cache(mc, meta->field_ref, mi->field_recver_offset, tmp);
|
||||
}
|
||||
goto found;
|
||||
}
|
||||
}
|
||||
MARS_WRN("field '%s' is missing\n", meta->field_name);
|
||||
found:;
|
||||
}
|
||||
brick_string_free(tmp);
|
||||
}
|
||||
|
||||
static
|
||||
void make_recver_cache(struct mars_desc_cache *mc, const struct meta *meta)
|
||||
{
|
||||
int i;
|
||||
|
||||
_make_recver_cache(mc, meta, 0, "");
|
||||
|
||||
for (i = 0; i < mc->cache_items; i++) {
|
||||
struct mars_desc_item *mi = ((struct mars_desc_item*)(mc + 1)) + i;
|
||||
if (unlikely(mi->field_recver_offset < 0)) {
|
||||
MARS_WRN("field '%s' is not transferred\n", mi->field_name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static
|
||||
int _desc_send_item(struct mars_socket *msock, const void *data, const struct mars_desc_cache *mc, int index, bool cork)
|
||||
{
|
||||
struct mars_desc_item *mi = ((struct mars_desc_item*)(mc + 1)) + index;
|
||||
const void *item = data + mi->field_sender_offset;
|
||||
int len = mi->field_size;
|
||||
int status;
|
||||
int res = -1;
|
||||
|
||||
MARS_IO("#%d cork=%d mc=%p field_name='%s' field_type=%d\n", msock->s_debug_nr, cork, mc, mi->field_name, mi->field_type);
|
||||
|
||||
switch (mi->field_type) {
|
||||
case FIELD_REF:
|
||||
MARS_ERR("NYI\n");
|
||||
goto done;
|
||||
case FIELD_SUB:
|
||||
/* skip this */
|
||||
res = 0;
|
||||
break;
|
||||
case FIELD_STRING:
|
||||
item = *(void**)item;
|
||||
len = 0;
|
||||
if (item)
|
||||
len = strlen(item) + 1;
|
||||
|
||||
status = mars_send_raw(msock, &len, sizeof(len), cork || len > 0);
|
||||
if (unlikely(status < 0))
|
||||
goto done;
|
||||
/* fallthrough */
|
||||
default:
|
||||
if (likely(len > 0)) {
|
||||
status = mars_send_raw(msock, item, len, cork);
|
||||
if (unlikely(status < 0))
|
||||
goto done;
|
||||
}
|
||||
res = len;
|
||||
}
|
||||
done:
|
||||
return res;
|
||||
}
|
||||
|
||||
static
|
||||
int _desc_recv_item(struct mars_socket *msock, void *data, const struct mars_desc_cache *mc, int index, int line)
|
||||
{
|
||||
struct mars_desc_item *mi = ((struct mars_desc_item*)(mc + 1)) + index;
|
||||
void *item = NULL;
|
||||
int len = mi->field_size;
|
||||
int status;
|
||||
int res = -1;
|
||||
|
||||
if (likely(data && mi->field_recver_offset >= 0)) {
|
||||
item = data + mi->field_recver_offset;
|
||||
}
|
||||
|
||||
switch (mi->field_type) {
|
||||
case FIELD_REF:
|
||||
MARS_ERR("NYI\n");
|
||||
goto done;
|
||||
case FIELD_SUB:
|
||||
/* skip this */
|
||||
res = 0;
|
||||
break;
|
||||
case FIELD_STRING:
|
||||
len = 0;
|
||||
status = mars_recv_raw(msock, &len, sizeof(len), sizeof(len));
|
||||
if (unlikely(status < 0))
|
||||
goto done;
|
||||
|
||||
if (len > 0 && item) {
|
||||
char *str = _brick_string_alloc(len, line);
|
||||
if (unlikely(!str)) {
|
||||
MARS_ERR("#%d string alloc error\n", msock->s_debug_nr);
|
||||
goto done;
|
||||
}
|
||||
*(void**)item = str;
|
||||
item = str;
|
||||
}
|
||||
|
||||
/* fallthrough */
|
||||
default:
|
||||
if (likely(len > 0)) {
|
||||
status = mars_recv_raw(msock, item, len, len);
|
||||
if (unlikely(status < 0))
|
||||
goto done;
|
||||
}
|
||||
res = len;
|
||||
}
|
||||
done:
|
||||
return res;
|
||||
}
|
||||
|
||||
#define MARS_DESC_MAGIC 0x73f0A2ec6148f48dll
|
||||
|
||||
struct mars_desc_header {
|
||||
u64 h_magic;
|
||||
u16 h_seq;
|
||||
u16 h_len;
|
||||
char h_name[MAX_FIELD_LEN];
|
||||
u64 h_cookie;
|
||||
s16 h_meta_len;
|
||||
s16 h_index;
|
||||
};
|
||||
|
||||
int _mars_send_struct(struct mars_socket *msock, void *data, const struct meta *meta, int *seq, bool cork)
|
||||
static inline
|
||||
int _desc_send_struct(struct mars_socket *msock, int cache_index, const void *data, int h_meta_len, bool cork)
|
||||
{
|
||||
const struct mars_desc_cache *mc = msock->s_desc_send[cache_index];
|
||||
struct mars_desc_header header = {
|
||||
.h_magic = MARS_DESC_MAGIC,
|
||||
.h_cookie = mc->cache_sender_cookie,
|
||||
.h_meta_len = h_meta_len,
|
||||
.h_index = data ? cache_index : -1,
|
||||
};
|
||||
int index;
|
||||
int count = 0;
|
||||
int status = 0;
|
||||
|
||||
if (!data) { // directly send EOR
|
||||
goto done;
|
||||
MARS_IO("#%d cork=%d mc=%p h_meta_len=%d\n", msock->s_debug_nr, cork, mc, h_meta_len);
|
||||
|
||||
status = mars_send_raw(msock, &header, sizeof(header), cork || data);
|
||||
if (unlikely(status < 0))
|
||||
goto err;
|
||||
|
||||
if (unlikely(h_meta_len > 0)) {
|
||||
status = mars_send_raw(msock, mc, h_meta_len, true);
|
||||
MARS_IO("#%d sent mc=%p h_meta_len=%d status=%d\n", msock->s_debug_nr, mc, h_meta_len, status);
|
||||
if (unlikely(status < 0))
|
||||
goto err;
|
||||
}
|
||||
for (; meta->field_name != NULL; meta++) {
|
||||
struct mars_net_header header = {
|
||||
.h_magic = MARS_NET_MAGIC,
|
||||
.h_seq = ++(*seq),
|
||||
};
|
||||
void *item = data + meta->field_offset;
|
||||
int len = meta->field_size;
|
||||
#if 1
|
||||
if (len > 16 * PAGE_SIZE) {
|
||||
MARS_WRN("#%d implausible len=%d, \n", msock->s_debug_nr, len);
|
||||
brick_msleep(30000);
|
||||
status = -EINVAL;
|
||||
break;
|
||||
}
|
||||
#endif
|
||||
|
||||
/* Automatically keep the lamport clock correct.
|
||||
*/
|
||||
mars_check_meta(meta, data);
|
||||
|
||||
status = 0;
|
||||
switch (meta->field_type) {
|
||||
case FIELD_STRING:
|
||||
item = *(void**)item;
|
||||
len = 0;
|
||||
if (item)
|
||||
len = strlen(item) + 1;
|
||||
break;
|
||||
case FIELD_REF:
|
||||
if (!meta->field_ref) {
|
||||
MARS_WRN("#%d improper FIELD_REF definition\n", msock->s_debug_nr);
|
||||
status = -EINVAL;
|
||||
break;
|
||||
}
|
||||
item = *(void**)item;
|
||||
len = meta->field_ref->field_size;
|
||||
if (!item)
|
||||
len = 0;
|
||||
break;
|
||||
case FIELD_DONE:
|
||||
len = 0;
|
||||
case FIELD_SUB:
|
||||
case FIELD_RAW:
|
||||
case FIELD_INT:
|
||||
case FIELD_UINT:
|
||||
// all ok
|
||||
break;
|
||||
default:
|
||||
MARS_WRN("#%d invalid field type %d\n", msock->s_debug_nr, meta->field_type);
|
||||
status = -EINVAL;
|
||||
break;
|
||||
}
|
||||
if (status < 0)
|
||||
break;
|
||||
|
||||
header.h_len = len;
|
||||
strncpy(header.h_name, meta->field_name, MAX_FIELD_LEN);
|
||||
header.h_name[MAX_FIELD_LEN-1] = '\0';
|
||||
|
||||
MARS_IO("#%d sending header %d '%s' len = %d\n", msock->s_debug_nr, header.h_seq, header.h_name, len);
|
||||
status = mars_send_raw(msock, &header, sizeof(header), true);
|
||||
if (status < 0)
|
||||
break;
|
||||
|
||||
switch (meta->field_type) {
|
||||
case FIELD_REF:
|
||||
case FIELD_SUB:
|
||||
if (len > 0) {
|
||||
status = _mars_send_struct(msock, item, meta->field_ref, seq, true);
|
||||
if (status > 0)
|
||||
count += status;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
if (len > 0) {
|
||||
MARS_IO("#%d sending extra %d\n", msock->s_debug_nr, len);
|
||||
status = mars_send_raw(msock, item, len, true);
|
||||
if (status > 0)
|
||||
count++;
|
||||
}
|
||||
}
|
||||
|
||||
if (status < 0) {
|
||||
break;
|
||||
if (likely(data)) {
|
||||
for (index = 0; index < mc->cache_items; index++) {
|
||||
status = _desc_send_item(msock, data, mc, index, cork || index < mc->cache_items-1);
|
||||
if (unlikely(status < 0))
|
||||
goto err;
|
||||
count++;
|
||||
}
|
||||
}
|
||||
done:
|
||||
if (status >= 0) { // send EOR
|
||||
struct mars_net_header header = {
|
||||
.h_magic = MARS_NET_MAGIC,
|
||||
.h_seq = ++(*seq),
|
||||
// .h_name is left empty
|
||||
};
|
||||
status = mars_send_raw(msock, &header, sizeof(header), cork);
|
||||
}
|
||||
|
||||
if (status >= 0)
|
||||
status = count;
|
||||
err:
|
||||
return status;
|
||||
}
|
||||
|
||||
int mars_send_struct(struct mars_socket *msock, void *data, const struct meta *meta)
|
||||
static
|
||||
int desc_send_struct(struct mars_socket *msock, const void *data, const struct meta *meta, bool cork)
|
||||
{
|
||||
int seq = 0;
|
||||
return _mars_send_struct(msock, data, meta, &seq, false);
|
||||
struct mars_desc_cache *mc;
|
||||
int i;
|
||||
int h_meta_len = 0;
|
||||
int status = -EINVAL;
|
||||
|
||||
for (i = 0; i < MAX_DESC_CACHE; i++) {
|
||||
mc = msock->s_desc_send[i];
|
||||
if (!mc)
|
||||
break;
|
||||
if (mc->cache_sender_cookie == (u64)meta)
|
||||
goto found;
|
||||
}
|
||||
|
||||
mc = make_sender_cache(msock, meta, &i);
|
||||
if (unlikely(!mc))
|
||||
goto done;
|
||||
|
||||
h_meta_len = mc->cache_items * sizeof(struct mars_desc_item) + sizeof(struct mars_desc_cache);
|
||||
|
||||
found:
|
||||
status = _desc_send_struct(msock, i, data, h_meta_len, cork);
|
||||
|
||||
done:
|
||||
return status;
|
||||
}
|
||||
|
||||
static
|
||||
int desc_recv_struct(struct mars_socket *msock, void *data, const struct meta *meta, int line)
|
||||
{
|
||||
struct mars_desc_header header = {};
|
||||
struct mars_desc_cache *mc;
|
||||
int cache_index;
|
||||
int index;
|
||||
int count = 0;
|
||||
int status = 0;
|
||||
|
||||
status = mars_recv_raw(msock, &header, sizeof(header), sizeof(header));
|
||||
if (unlikely(status < 0))
|
||||
goto err;
|
||||
|
||||
if (unlikely(header.h_magic != MARS_DESC_MAGIC)) {
|
||||
MARS_WRN("#%d called from line %d bad packet header magic = %llx\n", msock->s_debug_nr, line, header.h_magic);
|
||||
status = -ENOMSG;
|
||||
goto err;
|
||||
}
|
||||
|
||||
cache_index = header.h_index;
|
||||
if (cache_index < 0) { // EOR
|
||||
goto done;
|
||||
}
|
||||
if (unlikely(cache_index >= MAX_DESC_CACHE - 1)) {
|
||||
MARS_WRN("#%d called from line %d bad cache index %d\n", msock->s_debug_nr, line, cache_index);
|
||||
status = -EBADF;
|
||||
goto err;
|
||||
}
|
||||
|
||||
mc = msock->s_desc_recv[cache_index];
|
||||
if (unlikely(!mc)) {
|
||||
if (unlikely(header.h_meta_len <= 0)) {
|
||||
MARS_WRN("#%d called from line %d missing meta information\n", msock->s_debug_nr, line);
|
||||
status = -ENOMSG;
|
||||
goto err;
|
||||
}
|
||||
|
||||
mc = _brick_block_alloc(0, PAGE_SIZE, line);
|
||||
if (unlikely(!mc)) {
|
||||
MARS_WRN("#%d called from line %d out of memory\n", msock->s_debug_nr, line);
|
||||
status = -ENOMEM;
|
||||
goto err;
|
||||
}
|
||||
|
||||
status = mars_recv_raw(msock, mc, header.h_meta_len, header.h_meta_len);
|
||||
MARS_IO("#%d got mc=%p h_meta_len=%d status=%d\n", msock->s_debug_nr, mc, header.h_meta_len, status);
|
||||
if (unlikely(status < 0)) {
|
||||
brick_block_free(mc, PAGE_SIZE);
|
||||
goto err;
|
||||
}
|
||||
|
||||
make_recver_cache(mc, meta);
|
||||
msock->s_desc_recv[cache_index] = mc;
|
||||
} else if (unlikely(header.h_meta_len > 0)) {
|
||||
MARS_WRN("#%d called from line %d has %d unexpected meta bytes\n", msock->s_debug_nr, line, header.h_meta_len);
|
||||
}
|
||||
|
||||
for (index = 0; index < mc->cache_items; index++) {
|
||||
status = _desc_recv_item(msock, data, mc, index, line);
|
||||
if (unlikely(status < 0))
|
||||
goto err;
|
||||
count++;
|
||||
}
|
||||
|
||||
done:
|
||||
if (status >= 0)
|
||||
status = count;
|
||||
err:
|
||||
return status;
|
||||
}
|
||||
|
||||
int mars_send_struct(struct mars_socket *msock, const void *data, const struct meta *meta)
|
||||
{
|
||||
MARS_IO("#%d meta=%p\n", msock->s_debug_nr, meta);
|
||||
return desc_send_struct(msock, data, meta, false);
|
||||
}
|
||||
EXPORT_SYMBOL_GPL(mars_send_struct);
|
||||
|
||||
int _mars_recv_struct(struct mars_socket *msock, void *data, const struct meta *meta, int *seq, int line)
|
||||
int _mars_recv_struct(struct mars_socket *msock, void *data, const struct meta *meta, int line)
|
||||
{
|
||||
int count = 0;
|
||||
int status = -EINVAL;
|
||||
|
||||
MARS_IO("#%d called from line %d\n", msock->s_debug_nr, line);
|
||||
if (!data) {
|
||||
goto done;
|
||||
}
|
||||
for (;;) {
|
||||
struct mars_net_header header = {};
|
||||
const struct meta *tmp;
|
||||
void *item;
|
||||
void *mem;
|
||||
status = mars_recv_raw(msock, &header, sizeof(header), sizeof(header));
|
||||
if (status == -EAGAIN) {
|
||||
brick_msleep(50);
|
||||
continue;
|
||||
}
|
||||
if (status < 0) {
|
||||
MARS_WRN("#%d called from line %d status = %d\n", msock->s_debug_nr, line, status);
|
||||
break;
|
||||
}
|
||||
MARS_IO("#%d called from line %d got header %d '%s' len = %d\n", msock->s_debug_nr, line, header.h_seq, header.h_name, header.h_len);
|
||||
if (status != sizeof(header)) {
|
||||
MARS_WRN("#%d called from line %d bad header len = %d (required=%d)\n", msock->s_debug_nr, line, status, (int)sizeof(header));
|
||||
break;
|
||||
}
|
||||
if (header.h_magic != MARS_NET_MAGIC) {
|
||||
MARS_WRN("#%d called from line %d bad packet header magic = %llx\n", msock->s_debug_nr, line, header.h_magic);
|
||||
status = -ENOMSG;
|
||||
break;
|
||||
}
|
||||
if (!header.h_name[0]) { // got EOR
|
||||
status = 0;
|
||||
break;
|
||||
};
|
||||
if (header.h_seq <= *seq) {
|
||||
MARS_WRN("#%d called from line %d unexpected packet data, seq=%d (expected=%d)\n", msock->s_debug_nr, line, header.h_seq, (*seq) + 1);
|
||||
status = -ENOMSG;
|
||||
break;
|
||||
}
|
||||
*seq = header.h_seq;
|
||||
|
||||
if (!header.h_name[0]) { // end of record (EOR)
|
||||
status = 0;
|
||||
break;
|
||||
}
|
||||
|
||||
tmp = find_meta(meta, header.h_name);
|
||||
if (unlikely(!tmp)) {
|
||||
MARS_WRN("#%d called from line %d unknown field '%s'\n", msock->s_debug_nr, line, header.h_name);
|
||||
if (header.h_len > 0) { // try to continue by skipping the rest of data
|
||||
void *dummy = brick_mem_alloc(header.h_len);
|
||||
status = -ENOMEM;
|
||||
if (!dummy)
|
||||
break;
|
||||
status = mars_recv_raw(msock, dummy, header.h_len, header.h_len);
|
||||
brick_mem_free(dummy);
|
||||
if (status < 0)
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
status = 0;
|
||||
item = data + tmp->field_offset;
|
||||
switch (tmp->field_type) {
|
||||
case FIELD_REF:
|
||||
case FIELD_STRING:
|
||||
if (header.h_len <= 0) {
|
||||
mem = NULL;
|
||||
} else {
|
||||
if (tmp->field_type == FIELD_STRING) {
|
||||
mem = _brick_string_alloc(header.h_len + 1, line);
|
||||
} else {
|
||||
mem = brick_zmem_alloc(header.h_len + 1);
|
||||
}
|
||||
if (!mem) {
|
||||
status = -ENOMEM;
|
||||
goto done;
|
||||
}
|
||||
}
|
||||
*(void**)item = mem;
|
||||
item = mem;
|
||||
break;
|
||||
}
|
||||
|
||||
switch (tmp->field_type) {
|
||||
case FIELD_REF:
|
||||
case FIELD_SUB:
|
||||
if (!item) {
|
||||
MARS_WRN("#%d called from line %d bad item\n", msock->s_debug_nr, line);
|
||||
status = -EINVAL;
|
||||
break;
|
||||
}
|
||||
|
||||
if (header.h_len > 0) {
|
||||
MARS_IO("#%d called from line %d starting recursive structure\n", msock->s_debug_nr, line);
|
||||
status = _mars_recv_struct(msock, item, tmp->field_ref, seq, line);
|
||||
MARS_IO("#%d called from line %d ending recursive structure, status = %d\n", msock->s_debug_nr, line, status);
|
||||
|
||||
if (status > 0)
|
||||
count += status;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
if (header.h_len > 0) {
|
||||
if (!item) {
|
||||
MARS_WRN("#%d called from line %d bad item\n", msock->s_debug_nr, line);
|
||||
status = -EINVAL;
|
||||
break;
|
||||
}
|
||||
MARS_IO("#%d called from line %d reading extra %d\n", msock->s_debug_nr, line, header.h_len);
|
||||
status = mars_recv_raw(msock, item, header.h_len, header.h_len);
|
||||
while (status == -EAGAIN) {
|
||||
brick_msleep(50);
|
||||
status = mars_recv_raw(msock, item, header.h_len, header.h_len);
|
||||
}
|
||||
if (status >= 0) {
|
||||
//MARS_IO("#%d got data len = %d status = %d\n", msock->s_debug_nr, header.h_len, status);
|
||||
count++;
|
||||
} else {
|
||||
MARS_WRN("#%d called from line %d len = %d, status = %d\n", msock->s_debug_nr, line, header.h_len, status);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (status < 0)
|
||||
break;
|
||||
}
|
||||
done:
|
||||
if (status >= 0) {
|
||||
status = count;
|
||||
mars_check_meta(meta, data);
|
||||
} else {
|
||||
MARS_WRN("#%d called from line %d status = %d\n", msock->s_debug_nr, line, status);
|
||||
}
|
||||
return status;
|
||||
MARS_IO("#%d meta=%p called from line %d\n", msock->s_debug_nr, meta, line);
|
||||
return desc_recv_struct(msock, data, meta, line);
|
||||
}
|
||||
EXPORT_SYMBOL_GPL(_mars_recv_struct);
|
||||
|
||||
@ -825,20 +987,6 @@ const struct meta mars_cmd_meta[] = {
|
||||
};
|
||||
EXPORT_SYMBOL_GPL(mars_cmd_meta);
|
||||
|
||||
static
|
||||
void mars_check_meta(const struct meta *meta, void *data)
|
||||
{
|
||||
/* Automatically keep the lamport clock correct.
|
||||
*/
|
||||
if (meta == mars_cmd_meta) {
|
||||
struct timespec *stamp = &((struct mars_cmd*)data)->cmd_stamp;
|
||||
get_lamport(stamp);
|
||||
} else if (meta == mars_timespec_meta) {
|
||||
set_lamport(data);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
int mars_send_mref(struct mars_socket *msock, struct mref_object *mref)
|
||||
{
|
||||
@ -852,12 +1000,14 @@ int mars_send_mref(struct mars_socket *msock, struct mref_object *mref)
|
||||
if (mref->ref_rw != 0 && mref->ref_data && mref->ref_cs_mode < 2)
|
||||
cmd.cmd_code |= CMD_FLAG_HAS_DATA;
|
||||
|
||||
status = _mars_send_struct(msock, &cmd, mars_cmd_meta, &seq, true);
|
||||
get_lamport(&cmd.cmd_stamp);
|
||||
|
||||
status = desc_send_struct(msock, &cmd, mars_cmd_meta, true);
|
||||
if (status < 0)
|
||||
goto done;
|
||||
|
||||
seq = 0;
|
||||
status = _mars_send_struct(msock, mref, mars_mref_meta, &seq, cmd.cmd_code & CMD_FLAG_HAS_DATA);
|
||||
status = desc_send_struct(msock, mref, mars_mref_meta, cmd.cmd_code & CMD_FLAG_HAS_DATA);
|
||||
if (status < 0)
|
||||
goto done;
|
||||
|
||||
@ -871,13 +1021,14 @@ EXPORT_SYMBOL_GPL(mars_send_mref);
|
||||
|
||||
int mars_recv_mref(struct mars_socket *msock, struct mref_object *mref, struct mars_cmd *cmd)
|
||||
{
|
||||
int seq = 0;
|
||||
int status;
|
||||
|
||||
status = _mars_recv_struct(msock, mref, mars_mref_meta, &seq, __LINE__);
|
||||
status = desc_recv_struct(msock, mref, mars_mref_meta, __LINE__);
|
||||
if (status < 0)
|
||||
goto done;
|
||||
|
||||
set_lamport(&cmd->cmd_stamp);
|
||||
|
||||
if (cmd->cmd_code & CMD_FLAG_HAS_DATA) {
|
||||
if (!mref->ref_data)
|
||||
mref->ref_data = brick_zmem_alloc(mref->ref_len);
|
||||
@ -906,12 +1057,14 @@ int mars_send_cb(struct mars_socket *msock, struct mref_object *mref)
|
||||
if (mref->ref_rw == 0 && mref->ref_data && mref->ref_cs_mode < 2)
|
||||
cmd.cmd_code |= CMD_FLAG_HAS_DATA;
|
||||
|
||||
status = _mars_send_struct(msock, &cmd, mars_cmd_meta, &seq, true);
|
||||
get_lamport(&cmd.cmd_stamp);
|
||||
|
||||
status = desc_send_struct(msock, &cmd, mars_cmd_meta, true);
|
||||
if (status < 0)
|
||||
goto done;
|
||||
|
||||
seq = 0;
|
||||
status = _mars_send_struct(msock, mref, mars_mref_meta, &seq, cmd.cmd_code & CMD_FLAG_HAS_DATA);
|
||||
status = desc_send_struct(msock, mref, mars_mref_meta, cmd.cmd_code & CMD_FLAG_HAS_DATA);
|
||||
if (status < 0)
|
||||
goto done;
|
||||
|
||||
@ -926,13 +1079,14 @@ EXPORT_SYMBOL_GPL(mars_send_cb);
|
||||
|
||||
int mars_recv_cb(struct mars_socket *msock, struct mref_object *mref, struct mars_cmd *cmd)
|
||||
{
|
||||
int seq = 0;
|
||||
int status;
|
||||
|
||||
status = _mars_recv_struct(msock, mref, mars_mref_meta, &seq, __LINE__);
|
||||
status = desc_recv_struct(msock, mref, mars_mref_meta, __LINE__);
|
||||
if (status < 0)
|
||||
goto done;
|
||||
|
||||
set_lamport(&cmd->cmd_stamp);
|
||||
|
||||
if (cmd->cmd_code & CMD_FLAG_HAS_DATA) {
|
||||
if (!mref->ref_data) {
|
||||
MARS_WRN("#%d no internal buffer available\n", msock->s_debug_nr);
|
||||
|
33
mars_net.h
33
mars_net.h
@ -10,13 +10,31 @@
|
||||
|
||||
extern bool mars_net_is_alive;
|
||||
|
||||
#define MAX_FIELD_LEN 32
|
||||
#define MAX_DESC_CACHE 16
|
||||
|
||||
struct mars_desc_cache {
|
||||
u64 cache_sender_cookie;
|
||||
u64 cache_recver_cookie;
|
||||
s32 cache_items;
|
||||
};
|
||||
|
||||
struct mars_desc_item {
|
||||
char field_name[MAX_FIELD_LEN];
|
||||
s32 field_type;
|
||||
s32 field_size;
|
||||
s32 field_sender_offset;
|
||||
s32 field_recver_offset;
|
||||
};
|
||||
|
||||
/* The original struct socket has no refcount. This leads to problems
|
||||
* during long-lasting system calls when racing with socket shutdown.
|
||||
*
|
||||
* The original idea of struct mars_docket was just a small wrapper
|
||||
* The original idea of struct mars_socket was just a small wrapper
|
||||
* adding a refcount and some debugging aid.
|
||||
* Nowadays, some buffering was added in order to take advantage of
|
||||
* Later, some buffering was added in order to take advantage of
|
||||
* kernel_sendpage().
|
||||
* Caching of meta description has also been added.
|
||||
*/
|
||||
struct mars_socket {
|
||||
struct socket *s_socket;
|
||||
@ -26,6 +44,8 @@ struct mars_socket {
|
||||
int s_debug_nr;
|
||||
bool s_dead;
|
||||
bool s_shutdown_on_err;
|
||||
struct mars_desc_cache *s_desc_send[MAX_DESC_CACHE];
|
||||
struct mars_desc_cache *s_desc_recv[MAX_DESC_CACHE];
|
||||
};
|
||||
|
||||
struct mars_tcp_params {
|
||||
@ -78,18 +98,17 @@ extern void mars_put_socket(struct mars_socket *msock);
|
||||
extern void mars_shutdown_socket(struct mars_socket *msock);
|
||||
extern bool mars_socket_is_alive(struct mars_socket *msock);
|
||||
|
||||
extern int mars_send_raw(struct mars_socket *msock, void *buf, int len, bool cork);
|
||||
extern int mars_send_raw(struct mars_socket *msock, const void *buf, int len, bool cork);
|
||||
extern int mars_recv_raw(struct mars_socket *msock, void *buf, int minlen, int maxlen);
|
||||
|
||||
/* Mid-level generic field data exchange
|
||||
*/
|
||||
extern int mars_send_struct(struct mars_socket *msock, void *data, const struct meta *meta);
|
||||
extern int mars_send_struct(struct mars_socket *msock, const void *data, const struct meta *meta);
|
||||
#define mars_recv_struct(_sock_,_data_,_meta_) \
|
||||
({ \
|
||||
int seq = 0; \
|
||||
_mars_recv_struct(_sock_, _data_, _meta_, &seq, __LINE__); \
|
||||
_mars_recv_struct(_sock_, _data_, _meta_, __LINE__); \
|
||||
})
|
||||
extern int _mars_recv_struct(struct mars_socket *msock, void *data, const struct meta *meta, int *seq, int line);
|
||||
extern int _mars_recv_struct(struct mars_socket *msock, void *data, const struct meta *meta, int line);
|
||||
|
||||
/* High-level transport of mars structures
|
||||
*/
|
||||
|
@ -152,6 +152,7 @@ int mars_symlink(const char *oldpath, const char *newpath, const struct timespec
|
||||
memcpy(×[0], &new_stamp, sizeof(struct timespec));
|
||||
memcpy(×[1], &new_stamp, sizeof(struct timespec));
|
||||
status = do_utimes(AT_FDCWD, tmp, times, AT_SYMLINK_NOFOLLOW);
|
||||
set_lamport(&new_stamp);
|
||||
}
|
||||
|
||||
if (status >= 0) {
|
||||
|
Loading…
Reference in New Issue
Block a user