From f4795b6c747c68b6727960d663a130efa566b4b9 Mon Sep 17 00:00:00 2001 From: Thomas Schoebel-Theuer Date: Tue, 1 Apr 2014 11:24:46 +0200 Subject: [PATCH] client: implement socket bundling --- kernel/mars_client.c | 622 ++++++++++++++++++++++++-------------- kernel/mars_client.h | 40 ++- kernel/sy_old/mars_proc.c | 2 + 3 files changed, 433 insertions(+), 231 deletions(-) diff --git a/kernel/mars_client.c b/kernel/mars_client.c index 568eb66f..25c44e1c 100644 --- a/kernel/mars_client.c +++ b/kernel/mars_client.c @@ -22,8 +22,6 @@ */ -// Client brick (just for demonstration) - //#define BRICK_DEBUGGING //#define MARS_DEBUGGING //#define IO_DEBUGGING @@ -31,6 +29,7 @@ #include #include #include +#include #include "mars.h" @@ -43,33 +42,234 @@ int mars_client_abort = 10; EXPORT_SYMBOL_GPL(mars_client_abort); +int max_client_channels = 1; +EXPORT_SYMBOL_GPL(max_client_channels); + +int max_client_bulk = 16; +EXPORT_SYMBOL_GPL(max_client_bulk); + ///////////////////////// own helper functions //////////////////////// static int thread_count = 0; -static void _kill_thread(struct client_threadinfo *ti, const char *name) +static +void _do_resubmit(struct client_channel *ch) { - if (ti->thread) { + struct client_output *output = ch->output; + + spin_lock(&output->lock); + if (!list_empty(&ch->wait_list)) { + struct list_head *first = ch->wait_list.next; + struct list_head *last = ch->wait_list.prev; + struct list_head *old_start = output->mref_list.next; +#define list_connect __list_del // the original routine has a misleading name: in reality it is more general + list_connect(&output->mref_list, first); + list_connect(last, old_start); + INIT_LIST_HEAD(&ch->wait_list); + } + spin_unlock(&output->lock); +} + +static +void _kill_thread(struct client_threadinfo *ti, const char *name) +{ + struct task_struct *thread = ti->thread; + if (thread) { MARS_DBG("stopping %s thread\n", name); - brick_thread_stop(ti->thread); ti->thread = NULL; + brick_thread_stop(thread); } } -static 
void _kill_socket(struct client_output *output) +static +void _kill_channel(struct client_channel *ch) { - output->brick->connection_state = 1; - if (mars_socket_is_alive(&output->socket)) { + MARS_DBG("channel = %p\n", ch); + if (mars_socket_is_alive(&ch->socket)) { MARS_DBG("shutdown socket\n"); - mars_shutdown_socket(&output->socket); + mars_shutdown_socket(&ch->socket); } - _kill_thread(&output->receiver, "receiver"); - output->recv_error = 0; - MARS_DBG("close socket\n"); - mars_put_socket(&output->socket); + _kill_thread(&ch->receiver, "receiver"); + if (ch->is_open) { + MARS_DBG("close socket\n"); + mars_put_socket(&ch->socket); + } + ch->recv_error = 0; + ch->is_open = false; + ch->is_connected = false; + /* Re-Submit any waiting requests + */ + _do_resubmit(ch); } -static int _request_info(struct client_output *output) +static inline +void _kill_all_channels(struct client_bundle *bundle) +{ + int i; + // first pass: shutdown in parallel without waiting + for (i = 0; i < MAX_CLIENT_CHANNELS; i++) { + struct client_channel *ch =&bundle->channel[i]; + if (mars_socket_is_alive(&ch->socket)) { + MARS_DBG("shutdown socket %d\n", i); + mars_shutdown_socket(&ch->socket); + } + } + // separate pass (may wait) + for (i = 0; i < MAX_CLIENT_CHANNELS; i++) { + _kill_channel(&bundle->channel[i]); + } +} + +static +int receiver_thread(void *data); + +static +int _setup_channel(struct client_bundle *bundle, int ch_nr) +{ + struct client_channel *ch = &bundle->channel[ch_nr]; + struct sockaddr_storage sockaddr = {}; + int status; + + ch->ch_nr = ch_nr; + if (unlikely(ch->receiver.thread)) { + MARS_WRN("receiver thread %d unexpectedly not dead\n", ch_nr); + _kill_thread(&ch->receiver, "receiver"); + } + + status = mars_create_sockaddr(&sockaddr, bundle->host); + if (unlikely(status < 0)) { + MARS_DBG("no sockaddr, status = %d\n", status); + goto done; + } + + status = mars_create_socket(&ch->socket, &sockaddr, false); + if (unlikely(status < 0)) { + MARS_DBG("no socket, 
status = %d\n", status); + goto really_done; + } + ch->socket.s_shutdown_on_err = true; + ch->socket.s_send_abort = mars_client_abort; + ch->socket.s_recv_abort = mars_client_abort; + ch->is_open = true; + + ch->receiver.thread = brick_thread_create(receiver_thread, ch, "mars_receiver%d.%d.%d", bundle->thread_count, ch_nr, ch->thread_count++); + if (unlikely(!ch->receiver.thread)) { + status = -ENOENT; /* set first: status still holds the non-negative socket result here */ + MARS_ERR("cannot start receiver thread %d, status = %d\n", ch_nr, status); + goto done; + } + ch->is_used = true; + +done: + if (status < 0) { + MARS_INF("cannot connect channel %d to remote host '%s' (status = %d) -- retrying\n", + ch_nr, + bundle->host ? bundle->host : "NULL", + status); + _kill_channel(ch); + } + +really_done: + return status; +} + +static +void _kill_bundle(struct client_bundle *bundle) +{ + MARS_DBG("\n"); + _kill_thread(&bundle->sender, "sender"); + _kill_all_channels(bundle); +} + +static +void _maintain_bundle(struct client_bundle *bundle) +{ + int i; + + /* Re-open _any_ failed channel, even old ones. + * Reason: the number of channels might change during operation. 
+ */ + for (i = 0; i < MAX_CLIENT_CHANNELS; i++) { + struct client_channel *ch = &bundle->channel[i]; + int status; + + if (!ch->is_used || + (!ch->recv_error && mars_socket_is_alive(&ch->socket))) + continue; + + MARS_DBG("killing channel %d\n", i); + _kill_channel(ch); + + status = _setup_channel(bundle, i); + MARS_DBG("setup channel %d status=%d\n", i, status); + } +} + +static +struct client_channel *_get_channel(struct client_bundle *bundle, int min_channel, int max_channel) +{ + struct client_channel *res = NULL; + long best_space = -1; + int i; + + _maintain_bundle(bundle); + + if (unlikely(max_channel <= 0 || max_channel > MAX_CLIENT_CHANNELS)) + max_channel = MAX_CLIENT_CHANNELS; + if (unlikely(min_channel < 0 || min_channel >= max_channel)) + min_channel = max_channel - 1; + + for (i = min_channel; i < max_channel; i++) { + struct client_channel *ch = &bundle->channel[i]; + long this_space; + + // create new channels when necessary + if (unlikely(!ch->is_open)) { + int status; + // only create one new channel at a time + status = _setup_channel(bundle, i); + MARS_DBG("setup channel %d status=%d\n", i, status); + if (unlikely(status < 0)) + continue; + + this_space = mars_socket_send_space_available(&ch->socket); + ch->current_space = this_space; + /* Always prefer the newly opened channel */ + res = ch; + break; + } + + // select the best usable channel + this_space = mars_socket_send_space_available(&ch->socket); + ch->current_space = this_space; + if (this_space > best_space) { + best_space = this_space; + res = ch; + } + } + + // send initial connect command + if (res && !res->is_connected) { + struct mars_cmd cmd = { + .cmd_code = CMD_CONNECT, + .cmd_str1 = bundle->path, + }; + int status = mars_send_struct(&res->socket, &cmd, mars_cmd_meta); + MARS_DBG("send CMD_CONNECT status = %d\n", status); + if (unlikely(status < 0)) { + _kill_channel(res); + res = NULL; + goto done; + } + res->is_connected = true; + } + + done: + return res; +} + +static +int 
_request_info(struct client_channel *ch) { struct mars_cmd cmd = { .cmd_code = CMD_GETINFO, @@ -77,88 +277,60 @@ static int _request_info(struct client_output *output) int status; MARS_DBG("\n"); - status = mars_send_struct(&output->socket, &cmd, mars_cmd_meta); + status = mars_send_struct(&ch->socket, &cmd, mars_cmd_meta); + MARS_DBG("send CMD_GETINFO status = %d\n", status); if (unlikely(status < 0)) { MARS_DBG("send of getinfo failed, status = %d\n", status); } return status; } -static int receiver_thread(void *data); +static int sender_thread(void *data); -static int _connect(struct client_output *output, const char *str) +/* (Re-)initialize the bundle from a "path@host" parameter string. + * Kills the old sender thread and all channels, replaces bundle->path by a + * copy of str and splits it in place at the first '@'; bundle->host points + * behind the '@'. Finally a new sender thread is started. + * Returns 0 on success, or -ENOMEM, -EINVAL, -ENOENT on failure. + */ +static +int _setup_bundle(struct client_bundle *bundle, const char *str) { - struct sockaddr_storage sockaddr = {}; - int status; + int status = -ENOMEM; - if (unlikely(!output->path)) { - output->path = brick_strdup(str); - status = -ENOMEM; - if (!output->path) { - MARS_DBG("no mem\n"); - goto done; - } - status = -EINVAL; - output->host = strchr(output->path, '@'); - if (!output->host) { - brick_string_free(output->path); - output->path = NULL; - MARS_ERR("parameter string '%s' contains no remote specifier with '@'-syntax\n", str); - goto done; - } - *output->host++ = '\0'; - } + MARS_DBG("\n"); + _kill_bundle(bundle); + brick_string_free(bundle->path); - if (unlikely(output->receiver.thread)) { - MARS_WRN("receiver thread unexpectedly not dead\n"); - _kill_thread(&output->receiver, "receiver"); - } + bundle->path = brick_strdup(str); + if (unlikely(!bundle->path)) { /* status is still -ENOMEM; never strchr() a NULL pointer */ + MARS_DBG("no mem\n"); + bundle->host = NULL; /* old host pointed into the string freed above */ + goto done; + } - status = mars_create_sockaddr(&sockaddr, output->host); - if (unlikely(status < 0)) { - MARS_DBG("no sockaddr, status = %d\n", status); + status = -EINVAL; + bundle->host = strchr(bundle->path, '@'); + if (unlikely(!bundle->host)) { + brick_string_free(bundle->path); + bundle->path = NULL; + MARS_ERR("parameter string '%s' contains no remote specifier with '@'-syntax\n", str); goto done; } - - status = mars_create_socket(&output->socket, &sockaddr, false); - if (unlikely(status < 0)) { - MARS_DBG("no 
socket, status = %d\n", status); - goto really_done; - } - output->socket.s_shutdown_on_err = true; - output->socket.s_send_abort = mars_client_abort; - output->socket.s_recv_abort = mars_client_abort; + *bundle->host++ = '\0'; - output->receiver.thread = brick_thread_create(receiver_thread, output, "mars_receiver%d", thread_count++); - if (unlikely(!output->receiver.thread)) { - MARS_ERR("cannot start receiver thread, status = %d\n", status); + bundle->thread_count = thread_count++; + bundle->sender.thread = brick_thread_create(sender_thread, bundle, "mars_sender%d", bundle->thread_count); + if (unlikely(!bundle->sender.thread)) { + MARS_ERR("cannot start sender thread\n"); status = -ENOENT; goto done; } - - { - struct mars_cmd cmd = { - .cmd_code = CMD_CONNECT, - .cmd_str1 = output->path, - }; - - status = mars_send_struct(&output->socket, &cmd, mars_cmd_meta); - if (unlikely(status < 0)) { - MARS_DBG("send of connect failed, status = %d\n", status); - goto done; - } - } - if (status >= 0) { - status = _request_info(output); - } + status = 0; done: - if (status < 0) { - MARS_INF("cannot connect to remote host '%s' (status = %d) -- retrying\n", output->host ? 
output->host : "NULL", status); - _kill_socket(output); - } -really_done: + MARS_DBG("status = %d\n", status); return status; } @@ -186,7 +347,7 @@ static int client_get_info(struct client_output *output, struct mars_info *info) goto timeout; output->get_info = true; - wake_up_interruptible(&output->event); + wake_up_interruptible_all(&output->bundle.sender_event); wait_event_interruptible_timeout(output->info_event, output->got_info, io_timeout * HZ); timeout: @@ -283,6 +444,10 @@ static void client_ref_io(struct client_output *output, struct mref_object *mref #endif } + if (!output->brick->power.led_on) { + MARS_ERR("IO submission on dead instance\n"); + } + atomic_inc(&mars_global_io_flying); atomic_inc(&output->fly_count); _mref_get(mref); @@ -292,7 +457,7 @@ static void client_ref_io(struct client_output *output, struct mref_object *mref MARS_IO("added request id = %d pos = %lld len = %d rw = %d (flying = %d)\n", mref->ref_id, mref->ref_pos, mref->ref_len, mref->ref_rw, atomic_read(&output->fly_count)); - wake_up_interruptible(&output->event); + wake_up_interruptible_all(&output->bundle.sender_event); return; @@ -305,7 +470,8 @@ error: static int receiver_thread(void *data) { - struct client_output *output = data; + struct client_channel *ch = data; + struct client_output *output = ch->output; int status = 0; while (!brick_thread_should_stop()) { @@ -315,21 +481,26 @@ int receiver_thread(void *data) struct mref_object *mref = NULL; unsigned long flags; - if (output->recv_error) { + if (ch->recv_error) { /* The protocol may be out of sync. * Consume some data to avoid distributed deadlocks. 
*/ - (void)mars_recv_raw(&output->socket, &cmd, 0, sizeof(cmd)); - wake_up_interruptible(&output->event); + (void)mars_recv_raw(&ch->socket, &cmd, 0, sizeof(cmd)); brick_msleep(100); - status = output->recv_error; + status = ch->recv_error; continue; } - status = mars_recv_struct(&output->socket, &cmd, mars_cmd_meta); + status = mars_recv_struct(&ch->socket, &cmd, mars_cmd_meta); MARS_IO("got cmd = %d status = %d\n", cmd.cmd_code, status); - if (status <= 0) + if (status <= 0) { + if (!mars_socket_is_alive(&ch->socket)) { + MARS_DBG("socket is dead\n"); + brick_msleep(1000); + continue; + } goto done; + } switch (cmd.cmd_code & CMD_FLAG_MASK) { case CMD_NOTIFY: @@ -368,13 +539,16 @@ int receiver_thread(void *data) if (unlikely(!mref)) { MARS_WRN("got unknown id = %d for callback\n", cmd.cmd_int1); - status = -EBADR; + // try to consume the corresponding payload + mref = client_alloc_mref(output->brick); + status = mars_recv_cb(&ch->socket, mref, &cmd); + client_free_mref(mref); goto done; } MARS_IO("got callback id = %d, old pos = %lld len = %d rw = %d\n", mref->ref_id, mref->ref_pos, mref->ref_len, mref->ref_rw); - status = mars_recv_cb(&output->socket, mref, &cmd); + status = mars_recv_cb(&ch->socket, mref, &cmd); MARS_IO("new status = %d, pos = %lld len = %d rw = %d\n", status, mref->ref_pos, mref->ref_len, mref->ref_rw); if (unlikely(status < 0)) { MARS_WRN("interrupted data transfer during callback, status = %d\n", status); @@ -382,6 +556,9 @@ int receiver_thread(void *data) goto done; } + if (mref->_object_cb.cb_error < 0) { + MARS_DBG("ERROR %d\n", mref->_object_cb.cb_error); + } SIMPLE_CALLBACK(mref, mref->_object_cb.cb_error); client_ref_put(output, mref); @@ -391,13 +568,13 @@ int receiver_thread(void *data) break; } case CMD_GETINFO: - status = mars_recv_struct(&output->socket, &output->info, mars_info_meta); + status = mars_recv_struct(&ch->socket, &output->info, mars_info_meta); if (status < 0) { MARS_WRN("got bad info from remote side, status = %d\n", 
status); goto done; } output->got_info = true; - wake_up_interruptible(&output->info_event); + wake_up_interruptible_all(&output->info_event); break; default: MARS_ERR("got bad command %d from remote side, terminating.\n", cmd.cmd_code); @@ -407,43 +584,24 @@ int receiver_thread(void *data) done: brick_string_free(cmd.cmd_str1); if (unlikely(status < 0)) { - if (!output->recv_error) { - MARS_DBG("signalling status = %d\n", status); - output->recv_error = status; + if (!ch->recv_error) { + MARS_DBG("signalling recv_error = %d\n", status); + ch->recv_error = status; } - wake_up_interruptible(&output->event); brick_msleep(100); } + // wake up sender in any case + wake_up_interruptible_all(&output->bundle.sender_event); } if (status < 0) { - MARS_WRN("receiver thread terminated with status = %d, recv_error = %d\n", status, output->recv_error); + MARS_WRN("receiver thread terminated with status = %d\n", status); } - mars_shutdown_socket(&output->socket); - wake_up_interruptible(&output->receiver.run_event); + mars_shutdown_socket(&ch->socket); return status; } -static -void _do_resubmit(struct client_output *output) -{ - unsigned long flags; - - traced_lock(&output->lock, flags); - if (!list_empty(&output->wait_list)) { - struct list_head *first = output->wait_list.next; - struct list_head *last = output->wait_list.prev; - struct list_head *old_start = output->mref_list.next; -#define list_connect __list_del // the original routine has a misleading name: in reality it is more general - list_connect(&output->mref_list, first); - list_connect(last, old_start); - INIT_LIST_HEAD(&output->wait_list); - MARS_IO("done re-submit %p %p\n", first, last); - } - traced_unlock(&output->lock, flags); -} - static void _do_timeout(struct client_output *output, struct list_head *anchor, bool force) { @@ -451,21 +609,34 @@ void _do_timeout(struct client_output *output, struct list_head *anchor, bool fo struct list_head *tmp; struct list_head *next; LIST_HEAD(tmp_list); + int count = 0; 
int rounds = 0; long io_timeout = _compute_timeout(brick); unsigned long flags; + int i; + + if (list_empty(anchor)) + return; if (!mars_net_is_alive) force = true; if (!force && io_timeout <= 0) { - output->socket.s_send_abort = mars_client_abort; - output->socket.s_recv_abort = mars_client_abort; + for (i = 0; i < MAX_CLIENT_CHANNELS; i++) { + struct client_channel *ch = &output->bundle.channel[i]; + + ch->socket.s_send_abort = mars_client_abort; + ch->socket.s_recv_abort = mars_client_abort; + } return; } - output->socket.s_send_abort = 1; - output->socket.s_recv_abort = 1; + for (i = 0; i < MAX_CLIENT_CHANNELS; i++) { + struct client_channel *ch = &output->bundle.channel[i]; + + ch->socket.s_send_abort = 1; + ch->socket.s_recv_abort = 1; + } io_timeout *= HZ; @@ -502,6 +673,7 @@ void _do_timeout(struct client_output *output, struct list_head *anchor, bool fo mref->ref_len); } + count++; atomic_inc(&output->timeout_count); SIMPLE_CALLBACK(mref, -ETIME); @@ -511,75 +683,67 @@ void _do_timeout(struct client_output *output, struct list_head *anchor, bool fo atomic_dec(&output->fly_count); atomic_dec(&mars_global_io_flying); } + MARS_DBG("force = %d count = %d\n", force, count); +} + +static +void _do_timeout_all(struct client_output *output, bool force) +{ + int i; + for (i = 0; i < MAX_CLIENT_CHANNELS; i++) { + struct client_channel *ch = &output->bundle.channel[i]; + + if (!ch->is_used) + continue; + _do_timeout(output, &ch->wait_list, force); + } + _do_timeout(output, &output->mref_list, force); } static int sender_thread(void *data) { - struct client_output *output = data; + struct client_bundle *bundle = data; + struct client_output *output = container_of(bundle, struct client_output, bundle); struct client_brick *brick = output->brick; + struct client_channel *ch = NULL; + bool do_timeout = false; + int ch_skip = max_client_bulk; + int status = -ENOTCONN; unsigned long flags; - bool do_kill = false; - int status = 0; - - output->receiver.restart_count = 0; 
while (brick->power.button && !brick_thread_should_stop()) { struct list_head *tmp = NULL; struct client_mref_aspect *mref_a; struct mref_object *mref; + int min_nr; + int max_nr; - if (brick->power.io_timeout > 0) { - _do_timeout(output, &output->wait_list, false); - _do_timeout(output, &output->mref_list, false); + // timeouting is a rather expensive operation, don't do it too often + if (do_timeout) { + do_timeout = false; + _do_timeout_all(output, false); } - if (unlikely(output->recv_error != 0 || !mars_socket_is_alive(&output->socket))) { - MARS_DBG("recv_error = %d do_kill = %d\n", output->recv_error, do_kill); - if (do_kill) { - do_kill = false; - _kill_socket(output); - brick_msleep(3000); - } + wait_event_interruptible_timeout(output->bundle.sender_event, + !list_empty(&output->mref_list) || + output->get_info, + 2 * HZ); - status = _connect(output, brick->brick_name); - MARS_IO("connect status = %d\n", status); - if (unlikely(status < 0)) { - brick_msleep(3000); - _do_timeout(output, &output->wait_list, false); - _do_timeout(output, &output->mref_list, false); + + if (output->get_info) { + ch = _get_channel(bundle, 0, 1); + if (unlikely(!ch)) { + MARS_WRN("cannot setup info communication channel\n"); + do_timeout = true; + brick_msleep(1000); continue; } - brick->connection_state = 2; - do_kill = true; - /* Re-Submit any waiting requests - */ - MARS_IO("re-submit\n"); - _do_resubmit(output); - } - - wait_event_interruptible_timeout(output->event, - !list_empty(&output->mref_list) || - output->get_info || - output->recv_error != 0 || - !brick->power.button || - brick_thread_should_stop(), - 1 * HZ); - - if (unlikely(!brick->power.button || brick_thread_should_stop())) - break; - - if (unlikely(output->recv_error != 0)) { - MARS_DBG("recv_error = %d\n", output->recv_error); - brick_msleep(1000); - continue; - } - - if (output->get_info) { - status = _request_info(output); - if (status >= 0) { + status = _request_info(ch); + if (likely(status >= 0)) { 
output->get_info = false; } else { - MARS_WRN("cannot get info, status = %d\n", status); + MARS_WRN("cannot send info request, status = %d\n", status); + do_timeout = true; brick_msleep(1000); } } @@ -587,16 +751,18 @@ static int sender_thread(void *data) /* Grab the next mref from the queue */ traced_lock(&output->lock, flags); - if (list_empty(&output->mref_list)) { + tmp = output->mref_list.next; + if (tmp == &output->mref_list) { traced_unlock(&output->lock, flags); + MARS_DBG("empty %d %d\n", output->get_info, brick_thread_should_stop()); + do_timeout = true; continue; } - tmp = output->mref_list.next; - list_del(tmp); - list_add(tmp, &output->wait_list); - mref_a = container_of(tmp, struct client_mref_aspect, io_head); + list_del_init(tmp); + // notice: hash_head remains in its list! traced_unlock(&output->lock, flags); + mref_a = container_of(tmp, struct client_mref_aspect, io_head); mref = mref_a->object; if (brick->limit_mode) { @@ -606,41 +772,67 @@ static int sender_thread(void *data) mars_limit_sleep(&client_limiter, amount); } - MARS_IO("sending mref, id = %d pos = %lld len = %d rw = %d\n", mref->ref_id, mref->ref_pos, mref->ref_len, mref->ref_rw); + // try to spread reads over multiple channels.... 
+ min_nr = 0; + max_nr = max_client_channels; + if (!mref->ref_rw) { + /* optionally separate reads from writes */ + if (brick->separate_reads && max_nr > 1) + min_nr = 1; + } else if (!brick->allow_permuting_writes) { + max_nr = 1; + } + if (!ch || ch->recv_error || + !mars_socket_is_alive(&ch->socket) || + ch->ch_nr >= max_nr || --ch_skip < 0) { + ch = _get_channel(bundle, min_nr, max_nr); + if (likely(ch)) { + /* estimate: add some headroom for overhead */ + ch_skip = ch->current_space / PAGE_SIZE + + ch->current_space / (PAGE_SIZE * 8); + if (ch_skip > max_client_bulk) + ch_skip = max_client_bulk; + } else { + // notice: this will re-assign hash_head without harm + _hash_insert(output, mref_a); + do_timeout = true; + MARS_WRN("cannot setup communication channel\n"); + brick_msleep(1000); + continue; + } + } - status = mars_send_mref(&output->socket, mref); - MARS_IO("status = %d\n", status); + spin_lock(&output->lock); + list_add(tmp, &ch->wait_list); + // notice: hash_head is already there! + spin_unlock(&output->lock); + + status = mars_send_mref(&ch->socket, mref); if (unlikely(status < 0)) { + _hash_insert(output, mref_a); + do_timeout = true; + ch = NULL; // retry submission on next occasion.. MARS_WRN("sending failed, status = %d\n", status); - if (do_kill) { - do_kill = false; - _kill_socket(output); - } - _hash_insert(output, mref_a); - brick_msleep(1000); + brick_msleep(100); continue; } } -//done: + if (status < 0) { MARS_WRN("sender thread terminated with status = %d\n", status); } - if (do_kill) { - _kill_socket(output); - } + _kill_all_channels(bundle); /* Signal error on all pending IO requests. * We have no other chance (except probably delaying * this until destruction which is probably not what * we want). 
*/ - _do_timeout(output, &output->wait_list, true); - _do_timeout(output, &output->mref_list, true); - - wake_up_interruptible(&output->sender.run_event); + _do_timeout_all(output, true); + wake_up_interruptible_all(&output->bundle.sender_event); MARS_DBG("sender terminated\n"); return status; } @@ -654,34 +846,21 @@ static int client_switch(struct client_brick *brick) if (brick->power.led_on) goto done; mars_power_led_off((void*)brick, false); - if (!output->sender.thread) { + status = _setup_bundle(&output->bundle, brick->brick_name); + if (likely(status >= 0)) { + output->get_info = true; brick->connection_state = 1; - output->sender.thread = brick_thread_create(sender_thread, output, "mars_sender%d", thread_count++); - if (unlikely(!output->sender.thread)) { - MARS_ERR("cannot start sender thread\n"); - status = -ENOENT; - goto done; - } - } - if (output->sender.thread) { mars_power_led_on((void*)brick, true); } } else { if (brick->power.led_off) goto done; mars_power_led_on((void*)brick, false); - if (mars_get_socket(&output->socket)) { - if (mars_socket_is_alive(&output->socket)) { - MARS_DBG("shutdown socket\n"); - mars_shutdown_socket(&output->socket); - } - mars_put_socket(&output->socket); - } - _kill_thread(&output->sender, "sender"); + _kill_bundle(&output->bundle); + _do_timeout_all(output, true); + output->got_info = false; brick->connection_state = 0; - if (!output->sender.thread) { - mars_power_led_off((void*)brick, !output->sender.thread); - } + mars_power_led_off((void*)brick, !output->bundle.sender.thread); } done: return status; @@ -699,12 +878,10 @@ char *client_statistics(struct client_brick *brick, int verbose) return NULL; snprintf(res, 1024, - "#%d socket " "max_flying = %d " "io_timeout = %d | " "timeout_count = %d " "fly_count = %d\n", - output->socket.s_debug_nr, brick->max_flying, brick->power.io_timeout, atomic_read(&output->timeout_count), @@ -760,22 +937,25 @@ static int client_output_construct(struct client_output *output) for (i = 
0; i < CLIENT_HASH_MAX; i++) { INIT_LIST_HEAD(&output->hash_table[i]); } + + for (i = 0; i < MAX_CLIENT_CHANNELS; i++) { + struct client_channel *ch = &output->bundle.channel[i]; + ch->output = output; + INIT_LIST_HEAD(&ch->wait_list); + } + + init_waitqueue_head(&output->bundle.sender_event); + spin_lock_init(&output->lock); INIT_LIST_HEAD(&output->mref_list); - INIT_LIST_HEAD(&output->wait_list); - init_waitqueue_head(&output->event); - init_waitqueue_head(&output->sender.run_event); - init_waitqueue_head(&output->receiver.run_event); init_waitqueue_head(&output->info_event); return 0; } static int client_output_destruct(struct client_output *output) { - if (output->path) { - brick_string_free(output->path); - output->path = NULL; - } + brick_string_free(output->bundle.path); + output->bundle.path = NULL; brick_block_free(output->hash_table, PAGE_SIZE); return 0; } diff --git a/kernel/mars_client.h b/kernel/mars_client.h index f821905f..2bd5893a 100644 --- a/kernel/mars_client.h +++ b/kernel/mars_client.h @@ -30,6 +30,10 @@ extern struct mars_limiter client_limiter; extern int global_net_io_timeout; extern int mars_client_abort; +extern int max_client_channels; +extern int max_client_bulk; + +#define MAX_CLIENT_CHANNELS 4 struct client_mref_aspect { GENERIC_ASPECT(mref); @@ -46,6 +50,8 @@ struct client_brick { // tunables int max_flying; // limit on parallelism bool limit_mode; + bool allow_permuting_writes; + bool separate_reads; // readonly from outside int connection_state; // 0 = switched off, 1 = not connected, 2 = connected }; @@ -56,8 +62,29 @@ struct client_input { struct client_threadinfo { struct task_struct *thread; - wait_queue_head_t run_event; - int restart_count; +}; + +struct client_channel { + struct mars_socket socket; + struct client_threadinfo receiver; + struct list_head wait_list; + struct client_output *output; + long current_space; + int thread_count; + int recv_error; + int ch_nr; + bool is_used; + bool is_open; + bool is_connected; +}; + 
+struct client_bundle { + char *host; + char *path; + int thread_count; + wait_queue_head_t sender_event; + struct client_threadinfo sender; + struct client_channel channel[MAX_CLIENT_CHANNELS]; }; struct client_output { @@ -66,15 +93,8 @@ struct client_output { atomic_t timeout_count; spinlock_t lock; struct list_head mref_list; - struct list_head wait_list; - wait_queue_head_t event; int last_id; - int recv_error; - struct mars_socket socket; - char *host; - char *path; - struct client_threadinfo sender; - struct client_threadinfo receiver; + struct client_bundle bundle; struct mars_info info; wait_queue_head_t info_event; bool get_info; diff --git a/kernel/sy_old/mars_proc.c b/kernel/sy_old/mars_proc.c index 3cd359ea..b92ef364 100644 --- a/kernel/sy_old/mars_proc.c +++ b/kernel/sy_old/mars_proc.c @@ -356,6 +356,8 @@ struct ctl_table mars_table[] = { // changing makes no sense because the server will immediately start upon modprobe INT_ENTRY("mars_port", mars_net_default_port, 0400), INT_ENTRY("network_io_timeout", global_net_io_timeout, 0600), + INT_ENTRY("parallel_connections", max_client_channels, 0600), + INT_ENTRY("parallel_bulk_feed", max_client_bulk, 0600), { _CTL_NAME .procname = "traffic_tuning",