infra: allow transport compression

This commit is contained in:
Thomas Schoebel-Theuer 2019-08-05 09:42:41 +02:00 committed by Thomas Schoebel-Theuer
parent 66d6659462
commit 95883b055c
3 changed files with 79 additions and 0 deletions

View File

@ -54,6 +54,9 @@ int mars_net_default_port = CONFIG_MARS_DEFAULT_PORT;
EXPORT_SYMBOL_GPL(mars_net_default_port); EXPORT_SYMBOL_GPL(mars_net_default_port);
module_param_named(mars_port, mars_net_default_port, int, 0); module_param_named(mars_port, mars_net_default_port, int, 0);
__u32 enabled_net_compressions = 0;
__u32 used_net_compression = 0;
/* TODO: allow binding to specific source addresses instead of catch-all. /* TODO: allow binding to specific source addresses instead of catch-all.
* TODO: make all the socket options configurable. * TODO: make all the socket options configurable.
* TODO: implement signal handling. * TODO: implement signal handling.
@ -1183,6 +1186,8 @@ const struct meta mars_cmd_meta[] = {
META_INI_SUB(cmd_stamp, struct mars_cmd, mars_lamport_time_meta), META_INI_SUB(cmd_stamp, struct mars_cmd, mars_lamport_time_meta),
META_INI(cmd_proto, struct mars_cmd, FIELD_INT), META_INI(cmd_proto, struct mars_cmd, FIELD_INT),
META_INI(cmd_code, struct mars_cmd, FIELD_INT), META_INI(cmd_code, struct mars_cmd, FIELD_INT),
META_INI(cmd_compr_flags, struct mars_cmd, FIELD_UINT),
META_INI(cmd_compr_len, struct mars_cmd, FIELD_INT),
META_INI(cmd_int1, struct mars_cmd, FIELD_INT), META_INI(cmd_int1, struct mars_cmd, FIELD_INT),
META_INI(cmd_str1, struct mars_cmd, FIELD_STRING), META_INI(cmd_str1, struct mars_cmd, FIELD_STRING),
{} {}
@ -1235,12 +1240,35 @@ int _mars_send_mref(struct mars_socket *msock,
struct mars_cmd *cmd, struct mars_cmd *cmd,
bool cork) bool cork)
{ {
__u32 compr_flags = 0;
void *compr_buf = NULL;
int compr_len = 0;
int seq = 0; int seq = 0;
int status; int status;
if (!cork || !msock->s_pos) if (!cork || !msock->s_pos)
get_lamport(NULL, &cmd->cmd_stamp); get_lamport(NULL, &cmd->cmd_stamp);
if ((cmd->cmd_code & CMD_FLAG_HAS_DATA) &&
(mref->ref_flags & MREF_COMPRESS_ANY)) {
compr_buf = brick_mem_alloc(mref->ref_len + compress_overhead);
compr_len = mars_compress(mref->ref_data, mref->ref_len,
compr_buf, mref->ref_len,
mref->ref_flags, &compr_flags);
if (likely(compr_len > 0 &&
compr_len < mref->ref_len &&
compr_flags)) {
used_net_compression = compr_flags;
cmd->cmd_compr_flags = compr_flags;
cmd->cmd_compr_len = compr_len;
} else {
/* continue with old uncompressed method */
brick_mem_free(compr_buf);
compr_buf = NULL;
}
}
status = mars_send_cmd(msock, cmd, true); status = mars_send_cmd(msock, cmd, true);
if (status < 0) if (status < 0)
goto done; goto done;
@ -1253,11 +1281,20 @@ int _mars_send_mref(struct mars_socket *msock,
goto done; goto done;
if (cmd->cmd_code & CMD_FLAG_HAS_DATA) { if (cmd->cmd_code & CMD_FLAG_HAS_DATA) {
if (compr_buf) {
MARS_IO("#%d sending compressed len = %d/%d\n",
msock->s_debug_nr, compr_len, mref->ref_len);
status = mars_send_raw(msock,
compr_buf, compr_len,
cork);
goto done;
}
MARS_IO("#%d sending blocklen = %d\n", MARS_IO("#%d sending blocklen = %d\n",
msock->s_debug_nr, mref->ref_len); msock->s_debug_nr, mref->ref_len);
status = mars_send_raw(msock, mref->ref_data, mref->ref_len, cork); status = mars_send_raw(msock, mref->ref_data, mref->ref_len, cork);
} }
done: done:
brick_mem_free(compr_buf);
return status; return status;
} }
@ -1314,6 +1351,7 @@ static void _recv_deprecated(struct mref_object *mref)
int mars_recv_mref(struct mars_socket *msock, struct mref_object *mref, struct mars_cmd *cmd) int mars_recv_mref(struct mars_socket *msock, struct mref_object *mref, struct mars_cmd *cmd)
{ {
void *tmp_buf = NULL;
int status; int status;
status = desc_recv_struct(msock, mref, mars_mref_meta, __LINE__); status = desc_recv_struct(msock, mref, mars_mref_meta, __LINE__);
@ -1326,6 +1364,37 @@ int mars_recv_mref(struct mars_socket *msock, struct mref_object *mref, struct m
if (cmd->cmd_stamp.tv_sec) if (cmd->cmd_stamp.tv_sec)
set_lamport_nonstrict(&cmd->cmd_stamp); set_lamport_nonstrict(&cmd->cmd_stamp);
if (cmd->cmd_compr_flags && cmd->cmd_compr_len) {
void *decompr_buf;
tmp_buf = brick_mem_alloc(cmd->cmd_compr_len);
status = mars_recv_raw(msock,
tmp_buf,
cmd->cmd_compr_len,
cmd->cmd_compr_len);
if (status < 0) {
MARS_WRN("#%d compr_len = %d mref_len = %d, status = %d\n",
msock->s_debug_nr,
cmd->cmd_compr_len, mref->ref_len,
status);
goto done;
}
status = -EBADMSG;
decompr_buf =
mars_decompress(tmp_buf, cmd->cmd_compr_len,
mref->ref_data, mref->ref_len,
cmd->cmd_compr_flags);
if (unlikely(!decompr_buf))
goto done;
/* decompression success */
mref->ref_data = decompr_buf;
status = mref->ref_len;
goto done;
}
if (cmd->cmd_code & CMD_FLAG_HAS_DATA) { if (cmd->cmd_code & CMD_FLAG_HAS_DATA) {
if (!mref->ref_data) if (!mref->ref_data)
mref->ref_data = brick_block_alloc(0, mref->ref_len); mref->ref_data = brick_block_alloc(0, mref->ref_len);
@ -1337,7 +1406,9 @@ int mars_recv_mref(struct mars_socket *msock, struct mref_object *mref, struct m
if (status < 0) if (status < 0)
MARS_WRN("#%d mref_len = %d, status = %d\n", msock->s_debug_nr, mref->ref_len, status); MARS_WRN("#%d mref_len = %d, status = %d\n", msock->s_debug_nr, mref->ref_len, status);
} }
done: done:
brick_mem_free(tmp_buf);
return status; return status;
} }
EXPORT_SYMBOL_GPL(mars_recv_mref); EXPORT_SYMBOL_GPL(mars_recv_mref);

View File

@ -33,6 +33,9 @@
extern int mars_net_default_port; extern int mars_net_default_port;
extern bool mars_net_is_alive; extern bool mars_net_is_alive;
extern __u32 enabled_net_compressions;
extern __u32 used_net_compression;
#define MARS_PROTO_LEVEL 1 #define MARS_PROTO_LEVEL 1
#define MAX_FIELD_LEN 32 #define MAX_FIELD_LEN 32
@ -116,6 +119,8 @@ struct mars_cmd {
struct lamport_time cmd_stamp; // for automatic lamport clock struct lamport_time cmd_stamp; // for automatic lamport clock
int cmd_proto; int cmd_proto;
int cmd_code; int cmd_code;
__u32 cmd_compr_flags;
int cmd_compr_len;
int cmd_int1; int cmd_int1;
//int cmd_int2; //int cmd_int2;
//int cmd_int3; //int cmd_int3;

View File

@ -2698,6 +2698,7 @@ void _make_alive(void)
__make_alivelink("used-log-digest", used_log_digest, true); __make_alivelink("used-log-digest", used_log_digest, true);
__make_alivelink("used-net-digest", used_net_digest, true); __make_alivelink("used-net-digest", used_net_digest, true);
__make_alivelink("used-log-compression", used_log_compression, true); __make_alivelink("used-log-compression", used_log_compression, true);
__make_alivelink("used-net-compression", used_net_compression, true);
} }
void from_remote_trigger(void) void from_remote_trigger(void)
@ -5660,6 +5661,8 @@ int make_defaults(void *buf, struct mars_dent *dent)
disabled_net_digests = tmp; disabled_net_digests = tmp;
} else if (!strcmp(dent->d_name, "enabled-log-compressions")) { } else if (!strcmp(dent->d_name, "enabled-log-compressions")) {
sscanf(dent->new_link, "0x%x", &enabled_log_compressions); sscanf(dent->new_link, "0x%x", &enabled_log_compressions);
} else if (!strcmp(dent->d_name, "enabled-net-compressions")) {
sscanf(dent->new_link, "0x%x", &enabled_net_compressions);
} else { } else {
MARS_DBG("unimplemented default '%s'\n", dent->d_name); MARS_DBG("unimplemented default '%s'\n", dent->d_name);
} }