client: add control keep_idle_sockets

This commit is contained in:
Thomas Schoebel-Theuer 2016-02-02 09:37:39 +01:00 committed by Thomas Schoebel-Theuer
parent e5f0305cd0
commit 9989e1260a
2 changed files with 37 additions and 12 deletions

View File

@ -122,11 +122,23 @@ void _kill_all_channels(struct client_bundle *bundle)
}
}
static inline
void _set_channel_idlemode(struct client_channel *ch, bool keep_idle_sockets)
{
if (keep_idle_sockets) {
ch->socket.s_send_abort = 0;
ch->socket.s_recv_abort = 0;
} else {
ch->socket.s_send_abort = mars_client_abort;
ch->socket.s_recv_abort = mars_client_abort;
}
}
static
int receiver_thread(void *data);
static
int _setup_channel(struct client_bundle *bundle, int ch_nr)
int _setup_channel(struct client_bundle *bundle, int ch_nr, bool keep_idle_sockets)
{
struct client_channel *ch = &bundle->channel[ch_nr];
struct sockaddr_storage sockaddr = {};
@ -162,9 +174,8 @@ int _setup_channel(struct client_bundle *bundle, int ch_nr)
MARS_DBG("no socket, status = %d\n", status);
goto really_done;
}
_set_channel_idlemode(ch, keep_idle_sockets);
ch->socket.s_shutdown_on_err = true;
ch->socket.s_send_abort = mars_client_abort;
ch->socket.s_recv_abort = mars_client_abort;
ch->is_open = true;
ch->receiver.thread = brick_thread_create(receiver_thread, ch, "mars_receiver%d.%d.%d", bundle->thread_count, ch_nr, ch->thread_count++);
@ -197,7 +208,7 @@ void _kill_bundle(struct client_bundle *bundle)
}
static
void _maintain_bundle(struct client_bundle *bundle)
void _maintain_bundle(struct client_bundle *bundle, bool keep_idle_sockets)
{
int i;
@ -207,8 +218,12 @@ void _maintain_bundle(struct client_bundle *bundle)
for (i = 0; i < MAX_CLIENT_CHANNELS; i++) {
struct client_channel *ch = &bundle->channel[i];
if (!ch->is_used ||
(!ch->recv_error && mars_socket_is_alive(&ch->socket)))
if (!ch->is_used)
continue;
_set_channel_idlemode(ch, keep_idle_sockets);
if (!ch->recv_error && mars_socket_is_alive(&ch->socket))
continue;
MARS_DBG("killing channel %d\n", i);
@ -246,7 +261,9 @@ int _request_info(struct client_channel *ch)
}
static
struct client_channel *_get_channel(struct client_bundle *bundle, int min_channel, int max_channel)
struct client_channel *_get_channel(struct client_bundle *bundle,
int min_channel, int max_channel,
bool keep_idle_sockets)
{
struct client_channel *res;
long best_space;
@ -277,7 +294,7 @@ struct client_channel *_get_channel(struct client_bundle *bundle, int min_channe
/* Slow path. Do all the teady work.
*/
_maintain_bundle(bundle);
_maintain_bundle(bundle, keep_idle_sockets);
res = NULL;
best_space = -1;
@ -290,7 +307,7 @@ struct client_channel *_get_channel(struct client_bundle *bundle, int min_channe
if (unlikely(!ch->is_open)) {
int status;
// only create one new channel at a time
status = _setup_channel(bundle, i);
status = _setup_channel(bundle, i, keep_idle_sockets);
MARS_DBG("setup channel %d status=%d\n", i, status);
if (unlikely(status < 0))
continue;
@ -876,7 +893,7 @@ static int sender_thread(void *data)
// timeouting is a rather expensive operation, don't do it too often
if (do_timeout) {
do_timeout = false;
_maintain_bundle(&output->bundle);
_maintain_bundle(&output->bundle, brick->keep_idle_sockets);
_do_timeout_all(output, false);
}
@ -892,7 +909,7 @@ static int sender_thread(void *data)
old_cork = false;
mars_send_raw(&ch->socket, NULL, 0, false);
}
ch = _get_channel(bundle, 0, 1);
ch = _get_channel(bundle, 0, 1, brick->keep_idle_sockets);
if (unlikely(!ch)) {
do_timeout = true;
brick_msleep(100);
@ -919,6 +936,13 @@ static int sender_thread(void *data)
mutex_unlock(&output->mutex);
MARS_DBG("empty %d %d\n", output->get_info, brick_thread_should_stop());
do_timeout = true;
if (brick->keep_idle_sockets) {
ch = _get_channel(bundle, 0, 1, true);
if (!ch) {
output->get_info = true;
brick_msleep(100);
}
}
brick_yield();
continue;
}
@ -957,7 +981,7 @@ static int sender_thread(void *data)
old_cork = false;
mars_send_raw(&ch->socket, NULL, 0, false);
}
ch = _get_channel(bundle, min_nr, max_nr);
ch = _get_channel(bundle, min_nr, max_nr, brick->keep_idle_sockets);
if (unlikely(!ch)) {
// notice: this will re-assign hash_head without harm
_hash_insert(output, mref_a);

View File

@ -50,6 +50,7 @@ struct client_brick {
MARS_BRICK(client);
// tunables
int max_flying; // limit on parallelism
bool keep_idle_sockets;
bool limit_mode;
bool allow_permuting_writes;
bool separate_reads;