mirror of
https://github.com/schoebel/mars
synced 2025-02-26 01:20:29 +00:00
client: improve thread naming
This commit is contained in:
parent
127cbd6d86
commit
1b156ea855
@ -54,12 +54,56 @@ int max_client_channels = 2;
|
|||||||
|
|
||||||
int max_client_bulk = 16;
|
int max_client_bulk = 16;
|
||||||
|
|
||||||
|
/* //////////////////////// human readable numbers //////////////////////// */
|
||||||
|
|
||||||
|
#define MAX_HUMAN_NR 512
|
||||||
|
|
||||||
|
struct human_number_generator {
|
||||||
|
int hum_got;
|
||||||
|
u8 hum_wrap;
|
||||||
|
u8 hum_table[MAX_HUMAN_NR];
|
||||||
|
};
|
||||||
|
|
||||||
|
typedef u8 human_numbers_t[MAX_HUMAN_NR];
|
||||||
|
|
||||||
|
/* No locking necessary for now (no parallelism).
|
||||||
|
*/
|
||||||
|
static
|
||||||
|
int get_human_nr(struct human_number_generator *hum_gen)
|
||||||
|
{
|
||||||
|
int restarted = 0;
|
||||||
|
int searched = MAX_HUMAN_NR;
|
||||||
|
int res = (hum_gen->hum_got + 1) % MAX_HUMAN_NR;
|
||||||
|
|
||||||
|
restart:
|
||||||
|
while (searched-- >= 0) {
|
||||||
|
u8 cand = hum_gen->hum_table[res];
|
||||||
|
if (cand == hum_gen->hum_wrap) {
|
||||||
|
hum_gen->hum_table[res]++;
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/* no candidate found: aliases are no longer avoidable */
|
||||||
|
hum_gen->hum_wrap = (hum_gen->hum_wrap + 1) % MAX_HUMAN_NR;
|
||||||
|
if (!restarted++)
|
||||||
|
goto restart;
|
||||||
|
/* fallback is now exhausted: we need to lie */
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static
|
||||||
|
void put_human_nr(struct human_number_generator *hum_gen, int nr)
|
||||||
|
{
|
||||||
|
int index = nr % MAX_HUMAN_NR;
|
||||||
|
hum_gen->hum_table[index] = hum_gen->hum_wrap;
|
||||||
|
}
|
||||||
|
|
||||||
|
static struct human_number_generator global_numbers;
|
||||||
|
|
||||||
///////////////////////// own helper functions ////////////////////////
|
///////////////////////// own helper functions ////////////////////////
|
||||||
|
|
||||||
static atomic_t sender_count = ATOMIC_INIT(0);
|
static atomic_t sender_count = ATOMIC_INIT(0);
|
||||||
|
|
||||||
static int thread_count = 0;
|
|
||||||
|
|
||||||
static
|
static
|
||||||
void _do_resubmit(struct client_channel *ch)
|
void _do_resubmit(struct client_channel *ch)
|
||||||
{
|
{
|
||||||
@ -176,7 +220,12 @@ int _setup_channel(struct client_bundle *bundle, int ch_nr)
|
|||||||
ch->socket.s_recv_abort = mars_client_abort;
|
ch->socket.s_recv_abort = mars_client_abort;
|
||||||
ch->is_open = true;
|
ch->is_open = true;
|
||||||
|
|
||||||
ch->receiver.thread = brick_thread_create(receiver_thread, ch, "mars_receiver%d.%d.%d", bundle->thread_count, ch_nr, ch->thread_count++);
|
ch->receiver.thread =
|
||||||
|
brick_thread_create(receiver_thread, ch,
|
||||||
|
"mars_rcv%d.%d/%d",
|
||||||
|
bundle->last_thread_nr,
|
||||||
|
ch->thread_restart_count++,
|
||||||
|
ch_nr);
|
||||||
if (unlikely(!ch->receiver.thread)) {
|
if (unlikely(!ch->receiver.thread)) {
|
||||||
MARS_ERR("cannot start receiver thread %d, status = %d\n", ch_nr, status);
|
MARS_ERR("cannot start receiver thread %d, status = %d\n", ch_nr, status);
|
||||||
status = -ENOENT;
|
status = -ENOENT;
|
||||||
@ -369,9 +418,13 @@ int _setup_bundle(struct client_bundle *bundle, const char *str)
|
|||||||
}
|
}
|
||||||
*bundle->host++ = '\0';
|
*bundle->host++ = '\0';
|
||||||
|
|
||||||
bundle->thread_count = thread_count++;
|
bundle->last_thread_nr = get_human_nr(&global_numbers);
|
||||||
bundle->sender.thread = brick_thread_create(sender_thread, bundle, "mars_sender%d", bundle->thread_count);
|
bundle->sender.thread =
|
||||||
|
brick_thread_create(sender_thread, bundle,
|
||||||
|
"mars_snd%d",
|
||||||
|
bundle->last_thread_nr);
|
||||||
if (unlikely(!bundle->sender.thread)) {
|
if (unlikely(!bundle->sender.thread)) {
|
||||||
|
put_human_nr(&global_numbers, bundle->last_thread_nr);
|
||||||
MARS_ERR("cannot start sender thread for '%s' @%s\n",
|
MARS_ERR("cannot start sender thread for '%s' @%s\n",
|
||||||
bundle->path,
|
bundle->path,
|
||||||
bundle->host);
|
bundle->host);
|
||||||
@ -887,11 +940,11 @@ static int sender_thread(void *data)
|
|||||||
// timeouting is a rather expensive operation, don't do it too often
|
// timeouting is a rather expensive operation, don't do it too often
|
||||||
if (do_timeout) {
|
if (do_timeout) {
|
||||||
do_timeout = false;
|
do_timeout = false;
|
||||||
_maintain_bundle(&output->bundle);
|
_maintain_bundle(bundle);
|
||||||
_do_timeout_all(output, false);
|
_do_timeout_all(output, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
brick_wait_smp(output->bundle.sender_event,
|
brick_wait_smp(bundle->sender_event,
|
||||||
!list_empty(&output->mref_list) ||
|
!list_empty(&output->mref_list) ||
|
||||||
output->get_info,
|
output->get_info,
|
||||||
2 * HZ);
|
2 * HZ);
|
||||||
@ -912,8 +965,8 @@ static int sender_thread(void *data)
|
|||||||
status = _request_info(ch);
|
status = _request_info(ch);
|
||||||
if (unlikely(status < 0)) {
|
if (unlikely(status < 0)) {
|
||||||
MARS_WRN("cannot send info request '%s' @%s, status = %d\n",
|
MARS_WRN("cannot send info request '%s' @%s, status = %d\n",
|
||||||
output->bundle.path,
|
bundle->path,
|
||||||
output->bundle.host,
|
bundle->host,
|
||||||
status);
|
status);
|
||||||
do_timeout = true;
|
do_timeout = true;
|
||||||
brick_msleep(1000);
|
brick_msleep(1000);
|
||||||
@ -995,8 +1048,8 @@ static int sender_thread(void *data)
|
|||||||
ch = NULL;
|
ch = NULL;
|
||||||
// retry submission on next occasion..
|
// retry submission on next occasion..
|
||||||
MARS_WRN("mref send '%s' @%s failed, status = %d\n",
|
MARS_WRN("mref send '%s' @%s failed, status = %d\n",
|
||||||
output->bundle.path,
|
bundle->path,
|
||||||
output->bundle.host,
|
bundle->host,
|
||||||
status);
|
status);
|
||||||
|
|
||||||
brick_msleep(100);
|
brick_msleep(100);
|
||||||
@ -1006,8 +1059,8 @@ static int sender_thread(void *data)
|
|||||||
|
|
||||||
if (unlikely(status < 0)) {
|
if (unlikely(status < 0)) {
|
||||||
MARS_WRN("sender thread '%s' @%s terminated with status = %d\n",
|
MARS_WRN("sender thread '%s' @%s terminated with status = %d\n",
|
||||||
output->bundle.path,
|
bundle->path,
|
||||||
output->bundle.host,
|
bundle->host,
|
||||||
status);
|
status);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1023,9 +1076,10 @@ static int sender_thread(void *data)
|
|||||||
if (!atomic_dec_return(&sender_count))
|
if (!atomic_dec_return(&sender_count))
|
||||||
mars_limit_reset(&client_limiter);
|
mars_limit_reset(&client_limiter);
|
||||||
|
|
||||||
brick_wake_smp(&output->bundle.sender_event);
|
|
||||||
MARS_DBG("sender terminated\n");
|
|
||||||
atomic_dec(&brick->sender_count);
|
atomic_dec(&brick->sender_count);
|
||||||
|
put_human_nr(&global_numbers, bundle->last_thread_nr);
|
||||||
|
brick_wake_smp(&bundle->sender_event);
|
||||||
|
MARS_DBG("sender terminated\n");
|
||||||
atomic_dec(&client_sender_count);
|
atomic_dec(&client_sender_count);
|
||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
|
@ -81,8 +81,8 @@ struct client_channel {
|
|||||||
struct list_head wait_list;
|
struct list_head wait_list;
|
||||||
struct client_output *output;
|
struct client_output *output;
|
||||||
long current_space;
|
long current_space;
|
||||||
int thread_count;
|
|
||||||
int recv_error;
|
int recv_error;
|
||||||
|
int thread_restart_count;
|
||||||
int ch_nr;
|
int ch_nr;
|
||||||
bool is_used;
|
bool is_used;
|
||||||
bool is_open;
|
bool is_open;
|
||||||
@ -93,7 +93,7 @@ struct client_bundle {
|
|||||||
char *host;
|
char *host;
|
||||||
char *path;
|
char *path;
|
||||||
struct mars_tcp_params *params;
|
struct mars_tcp_params *params;
|
||||||
int thread_count;
|
int last_thread_nr;
|
||||||
int old_channel;
|
int old_channel;
|
||||||
wait_queue_head_t sender_event;
|
wait_queue_head_t sender_event;
|
||||||
struct client_threadinfo sender;
|
struct client_threadinfo sender;
|
||||||
|
Loading…
Reference in New Issue
Block a user