mirror of
https://github.com/schoebel/mars
synced 2025-02-22 07:16:50 +00:00
client: rework channel states and transitions
This commit is contained in:
parent
ba67e756a7
commit
74321eee3c
@ -144,14 +144,12 @@ void _kill_channel(struct client_channel *ch)
|
||||
mars_shutdown_socket(&ch->socket);
|
||||
}
|
||||
_kill_thread(&ch->receiver, "receiver");
|
||||
if (ch->is_open) {
|
||||
if (ch->ch_state >= CL_CHANNEL_OPEN) {
|
||||
MARS_DBG("close socket\n");
|
||||
mars_put_socket(&ch->socket);
|
||||
}
|
||||
ch->recv_error = 0;
|
||||
ch->is_used = false;
|
||||
ch->is_open = false;
|
||||
ch->is_connected = false;
|
||||
ch->ch_state = CL_CHANNEL_INITIALIZED;
|
||||
/* Re-Submit any waiting requests
|
||||
*/
|
||||
_do_resubmit(ch);
|
||||
@ -218,7 +216,7 @@ int _setup_channel(struct client_bundle *bundle, int ch_nr)
|
||||
ch->socket.s_shutdown_on_err = true;
|
||||
ch->socket.s_send_abort = mars_client_abort;
|
||||
ch->socket.s_recv_abort = mars_client_abort;
|
||||
ch->is_open = true;
|
||||
ch->ch_state = CL_CHANNEL_OPEN;
|
||||
|
||||
ch->receiver.thread =
|
||||
brick_thread_create(receiver_thread, ch,
|
||||
@ -228,10 +226,12 @@ int _setup_channel(struct client_bundle *bundle, int ch_nr)
|
||||
ch_nr);
|
||||
if (unlikely(!ch->receiver.thread)) {
|
||||
MARS_ERR("cannot start receiver thread %d, status = %d\n", ch_nr, status);
|
||||
mars_shutdown_socket(&ch->socket);
|
||||
ch->ch_state = CL_CHANNEL_INITIALIZED;
|
||||
status = -ENOENT;
|
||||
goto done;
|
||||
}
|
||||
ch->is_used = true;
|
||||
ch->ch_state = CL_CHANNEL_USED;
|
||||
atomic_inc(&client_receiver_count);
|
||||
|
||||
done:
|
||||
@ -266,7 +266,7 @@ void _maintain_bundle(struct client_bundle *bundle)
|
||||
for (i = 0; i < MAX_CLIENT_CHANNELS; i++) {
|
||||
struct client_channel *ch = &bundle->channel[i];
|
||||
|
||||
if (!ch->is_used ||
|
||||
if (ch->ch_state <= CL_CHANNEL_USED ||
|
||||
(!ch->recv_error && mars_socket_is_alive(&ch->socket)))
|
||||
continue;
|
||||
|
||||
@ -294,8 +294,7 @@ struct client_channel *_get_channel(struct client_bundle *bundle, int min_channe
|
||||
}
|
||||
/* Use higher channels only when the first one is fully established */
|
||||
if (max_channel > 1 &&
|
||||
(!bundle->channel[0].is_connected ||
|
||||
!bundle->channel[0].is_open)) {
|
||||
(bundle->channel[0].ch_state < CL_CHANNEL_CONNECTED)) {
|
||||
max_channel = 1;
|
||||
min_channel = 0;
|
||||
}
|
||||
@ -308,7 +307,9 @@ struct client_channel *_get_channel(struct client_bundle *bundle, int min_channe
|
||||
if (best_channel >= max_channel)
|
||||
best_channel = min_channel;
|
||||
res = &bundle->channel[best_channel];
|
||||
if (res->is_connected && !res->recv_error && mars_socket_is_alive(&res->socket)) {
|
||||
if (res->ch_state >= CL_CHANNEL_CONNECTED &&
|
||||
!res->recv_error &&
|
||||
mars_socket_is_alive(&res->socket)) {
|
||||
res->current_space = mars_socket_send_space_available(&res->socket);
|
||||
if (res->current_space > (PAGE_SIZE + PAGE_SIZE / 4))
|
||||
goto found;
|
||||
@ -326,7 +327,7 @@ struct client_channel *_get_channel(struct client_bundle *bundle, int min_channe
|
||||
long this_space;
|
||||
|
||||
// create new channels when necessary
|
||||
if (unlikely(!ch->is_open)) {
|
||||
if (ch->ch_state < CL_CHANNEL_OPEN) {
|
||||
int status;
|
||||
// only create one new channel at a time
|
||||
status = _setup_channel(bundle, i);
|
||||
@ -360,7 +361,7 @@ struct client_channel *_get_channel(struct client_bundle *bundle, int min_channe
|
||||
}
|
||||
|
||||
// send initial connect command
|
||||
if (unlikely(!res->is_connected)) {
|
||||
if (unlikely(res->ch_state < CL_CHANNEL_CONNECTED)) {
|
||||
struct mars_cmd cmd = {
|
||||
.cmd_code = CMD_CONNECT,
|
||||
.cmd_str1 = bundle->path,
|
||||
@ -377,7 +378,7 @@ struct client_channel *_get_channel(struct client_bundle *bundle, int min_channe
|
||||
res = NULL;
|
||||
goto done;
|
||||
}
|
||||
res->is_connected = true;
|
||||
res->ch_state = CL_CHANNEL_CONNECTED;
|
||||
}
|
||||
|
||||
found:
|
||||
@ -907,7 +908,7 @@ void _do_timeout_all(struct client_output *output, bool force)
|
||||
for (i = 0; i < MAX_CLIENT_CHANNELS; i++) {
|
||||
struct client_channel *ch = &output->bundle.channel[i];
|
||||
|
||||
if (!ch->is_used)
|
||||
if (ch->ch_state < CL_CHANNEL_USED)
|
||||
continue;
|
||||
_do_timeout(output, &ch->wait_list, &rounds, force);
|
||||
}
|
||||
@ -1101,7 +1102,7 @@ static int client_switch(struct client_brick *brick)
|
||||
int i;
|
||||
|
||||
for (i = 0; i < MAX_CLIENT_CHANNELS; i++)
|
||||
if (output->bundle.channel[i].is_connected)
|
||||
if (output->bundle.channel[i].ch_state >= CL_CHANNEL_CONNECTED)
|
||||
socket_count++;
|
||||
brick->socket_count = socket_count;
|
||||
if (brick->power.led_on)
|
||||
|
@ -75,6 +75,13 @@ struct client_threadinfo {
|
||||
struct task_struct *thread;
|
||||
};
|
||||
|
||||
enum CL_CHANNEL_STATE {
|
||||
CL_CHANNEL_INITIALIZED,
|
||||
CL_CHANNEL_OPEN, /* socket is estabished */
|
||||
CL_CHANNEL_USED, /* receiver thread has been created */
|
||||
CL_CHANNEL_CONNECTED, /* first communication had no error */
|
||||
};
|
||||
|
||||
struct client_channel {
|
||||
struct mars_socket socket;
|
||||
struct client_threadinfo receiver;
|
||||
@ -84,9 +91,7 @@ struct client_channel {
|
||||
int recv_error;
|
||||
int thread_restart_count;
|
||||
int ch_nr;
|
||||
bool is_used;
|
||||
bool is_open;
|
||||
bool is_connected;
|
||||
enum CL_CHANNEL_STATE ch_state;
|
||||
};
|
||||
|
||||
struct client_bundle {
|
||||
|
Loading…
Reference in New Issue
Block a user