diff --git a/kernel/mars_net.c b/kernel/mars_net.c index 170e4e8c..5378f80a 100644 --- a/kernel/mars_net.c +++ b/kernel/mars_net.c @@ -54,6 +54,9 @@ int mars_net_default_port = CONFIG_MARS_DEFAULT_PORT; EXPORT_SYMBOL_GPL(mars_net_default_port); module_param_named(mars_port, mars_net_default_port, int, 0); +__u32 enabled_net_compressions = 0; +__u32 used_net_compression = 0; + /* TODO: allow binding to specific source addresses instead of catch-all. * TODO: make all the socket options configurable. * TODO: implement signal handling. @@ -1183,6 +1186,8 @@ const struct meta mars_cmd_meta[] = { META_INI_SUB(cmd_stamp, struct mars_cmd, mars_lamport_time_meta), META_INI(cmd_proto, struct mars_cmd, FIELD_INT), META_INI(cmd_code, struct mars_cmd, FIELD_INT), + META_INI(cmd_compr_flags, struct mars_cmd, FIELD_UINT), + META_INI(cmd_compr_len, struct mars_cmd, FIELD_INT), META_INI(cmd_int1, struct mars_cmd, FIELD_INT), META_INI(cmd_str1, struct mars_cmd, FIELD_STRING), {} @@ -1235,12 +1240,35 @@ int _mars_send_mref(struct mars_socket *msock, struct mars_cmd *cmd, bool cork) { + __u32 compr_flags = 0; + void *compr_buf = NULL; + int compr_len = 0; int seq = 0; int status; if (!cork || !msock->s_pos) get_lamport(NULL, &cmd->cmd_stamp); + if ((cmd->cmd_code & CMD_FLAG_HAS_DATA) && + (mref->ref_flags & MREF_COMPRESS_ANY)) { + compr_buf = brick_mem_alloc(mref->ref_len + compress_overhead); + compr_len = mars_compress(mref->ref_data, mref->ref_len, + compr_buf, mref->ref_len, + mref->ref_flags, &compr_flags); + + if (likely(compr_len > 0 && + compr_len < mref->ref_len && + compr_flags)) { + used_net_compression = compr_flags; + cmd->cmd_compr_flags = compr_flags; + cmd->cmd_compr_len = compr_len; + } else { + /* continue with old uncompressed method */ + brick_mem_free(compr_buf); + compr_buf = NULL; + } + } + status = mars_send_cmd(msock, cmd, true); if (status < 0) goto done; @@ -1253,11 +1281,20 @@ int _mars_send_mref(struct mars_socket *msock, goto done; if (cmd->cmd_code & CMD_FLAG_HAS_DATA) { + if (compr_buf) { + MARS_IO("#%d sending compressed len = %d/%d\n", + msock->s_debug_nr, compr_len, mref->ref_len); + status = mars_send_raw(msock, + compr_buf, compr_len, + cork); + goto done; + } MARS_IO("#%d sending blocklen = %d\n", msock->s_debug_nr, mref->ref_len); status = mars_send_raw(msock, mref->ref_data, mref->ref_len, cork); } done: + brick_mem_free(compr_buf); return status; } @@ -1314,6 +1351,7 @@ static void _recv_deprecated(struct mref_object *mref) int mars_recv_mref(struct mars_socket *msock, struct mref_object *mref, struct mars_cmd *cmd) { + void *tmp_buf = NULL; int status; status = desc_recv_struct(msock, mref, mars_mref_meta, __LINE__); @@ -1326,6 +1364,37 @@ int mars_recv_mref(struct mars_socket *msock, struct mref_object *mref, struct m if (cmd->cmd_stamp.tv_sec) set_lamport_nonstrict(&cmd->cmd_stamp); + if (cmd->cmd_compr_flags && cmd->cmd_compr_len) { + void *decompr_buf; + + tmp_buf = brick_mem_alloc(cmd->cmd_compr_len); + + status = mars_recv_raw(msock, + tmp_buf, + cmd->cmd_compr_len, + cmd->cmd_compr_len); + if (status < 0) { + MARS_WRN("#%d compr_len = %d mref_len = %d, status = %d\n", + msock->s_debug_nr, + cmd->cmd_compr_len, mref->ref_len, + status); + goto done; + } + + status = -EBADMSG; + decompr_buf = + mars_decompress(tmp_buf, cmd->cmd_compr_len, + mref->ref_data, mref->ref_len, + cmd->cmd_compr_flags); + if (unlikely(!decompr_buf)) + goto done; + + /* decompression success */ + mref->ref_data = decompr_buf; + status = mref->ref_len; + goto done; + } + if (cmd->cmd_code & CMD_FLAG_HAS_DATA) { if (!mref->ref_data) mref->ref_data = brick_block_alloc(0, mref->ref_len); @@ -1337,7 +1406,9 @@ int mars_recv_mref(struct mars_socket *msock, struct mref_object *mref, struct m if (status < 0) MARS_WRN("#%d mref_len = %d, status = %d\n", msock->s_debug_nr, mref->ref_len, status); } + done: + brick_mem_free(tmp_buf); return status; } EXPORT_SYMBOL_GPL(mars_recv_mref); diff --git a/kernel/mars_net.h b/kernel/mars_net.h index 1a4ce4f2..51306bf2 100644 --- a/kernel/mars_net.h +++ b/kernel/mars_net.h @@ -33,6 +33,9 @@ extern int mars_net_default_port; extern bool mars_net_is_alive; +extern __u32 enabled_net_compressions; +extern __u32 used_net_compression; + #define MARS_PROTO_LEVEL 1 #define MAX_FIELD_LEN 32 @@ -116,6 +119,8 @@ struct mars_cmd { struct lamport_time cmd_stamp; // for automatic lamport clock int cmd_proto; int cmd_code; + __u32 cmd_compr_flags; + int cmd_compr_len; int cmd_int1; //int cmd_int2; //int cmd_int3; diff --git a/kernel/sy_old/mars_main.c b/kernel/sy_old/mars_main.c index 4ace9009..b9bb1990 100644 --- a/kernel/sy_old/mars_main.c +++ b/kernel/sy_old/mars_main.c @@ -2698,6 +2698,7 @@ void _make_alive(void) __make_alivelink("used-log-digest", used_log_digest, true); __make_alivelink("used-net-digest", used_net_digest, true); __make_alivelink("used-log-compression", used_log_compression, true); + __make_alivelink("used-net-compression", used_net_compression, true); } void from_remote_trigger(void) @@ -5660,6 +5661,8 @@ int make_defaults(void *buf, struct mars_dent *dent) disabled_net_digests = tmp; } else if (!strcmp(dent->d_name, "enabled-log-compressions")) { sscanf(dent->new_link, "0x%x", &enabled_log_compressions); + } else if (!strcmp(dent->d_name, "enabled-net-compressions")) { + sscanf(dent->new_link, "0x%x", &enabled_net_compressions); } else { MARS_DBG("unimplemented default '%s'\n", dent->d_name); }