diff --git a/kernel/mars_client.c b/kernel/mars_client.c index 1d9c90dd..b69a00bb 100644 --- a/kernel/mars_client.c +++ b/kernel/mars_client.c @@ -54,12 +54,56 @@ int max_client_channels = 2; int max_client_bulk = 16; +/* //////////////////////// human readable numbers //////////////////////// */ + +#define MAX_HUMAN_NR 512 + +struct human_number_generator { + int hum_got; + u8 hum_wrap; + u8 hum_table[MAX_HUMAN_NR]; +}; + +typedef u8 human_numbers_t[MAX_HUMAN_NR]; + +/* No locking necessary for now (no parallelism). + */ +static +int get_human_nr(struct human_number_generator *hum_gen) +{ + int restarted = 0; + int searched = MAX_HUMAN_NR; + int res = (hum_gen->hum_got + 1) % MAX_HUMAN_NR; + + restart: + while (searched-- >= 0) { + u8 cand = hum_gen->hum_table[res]; + if (cand == hum_gen->hum_wrap) { + hum_gen->hum_table[res]++; + return res; + } + } + /* no candidate found: aliases are no longer avoidable */ + hum_gen->hum_wrap = (hum_gen->hum_wrap + 1) % MAX_HUMAN_NR; + if (!restarted++) + goto restart; + /* fallback is now exhausted: we need to lie */ + return 0; +} + +static +void put_human_nr(struct human_number_generator *hum_gen, int nr) +{ + int index = nr % MAX_HUMAN_NR; + hum_gen->hum_table[index] = hum_gen->hum_wrap; +} + +static struct human_number_generator global_numbers; + ///////////////////////// own helper functions //////////////////////// static atomic_t sender_count = ATOMIC_INIT(0); -static int thread_count = 0; - static void _do_resubmit(struct client_channel *ch) { @@ -176,7 +220,12 @@ int _setup_channel(struct client_bundle *bundle, int ch_nr) ch->socket.s_recv_abort = mars_client_abort; ch->is_open = true; - ch->receiver.thread = brick_thread_create(receiver_thread, ch, "mars_receiver%d.%d.%d", bundle->thread_count, ch_nr, ch->thread_count++); + ch->receiver.thread = + brick_thread_create(receiver_thread, ch, + "mars_rcv%d.%d/%d", + bundle->last_thread_nr, + ch->thread_restart_count++, + ch_nr); if (unlikely(!ch->receiver.thread)) { MARS_ERR("cannot start receiver thread %d, status = %d\n", ch_nr, status); status = -ENOENT; @@ -369,9 +418,13 @@ int _setup_bundle(struct client_bundle *bundle, const char *str) } *bundle->host++ = '\0'; - bundle->thread_count = thread_count++; - bundle->sender.thread = brick_thread_create(sender_thread, bundle, "mars_sender%d", bundle->thread_count); + bundle->last_thread_nr = get_human_nr(&global_numbers); + bundle->sender.thread = + brick_thread_create(sender_thread, bundle, + "mars_snd%d", + bundle->last_thread_nr); if (unlikely(!bundle->sender.thread)) { + put_human_nr(&global_numbers, bundle->last_thread_nr); MARS_ERR("cannot start sender thread for '%s' @%s\n", bundle->path, bundle->host); @@ -887,11 +940,11 @@ static int sender_thread(void *data) // timeouting is a rather expensive operation, don't do it too often if (do_timeout) { do_timeout = false; - _maintain_bundle(&output->bundle); + _maintain_bundle(bundle); _do_timeout_all(output, false); } - brick_wait_smp(output->bundle.sender_event, + brick_wait_smp(bundle->sender_event, !list_empty(&output->mref_list) || output->get_info, 2 * HZ); @@ -912,8 +965,8 @@ static int sender_thread(void *data) status = _request_info(ch); if (unlikely(status < 0)) { MARS_WRN("cannot send info request '%s' @%s, status = %d\n", - output->bundle.path, - output->bundle.host, + bundle->path, + bundle->host, status); do_timeout = true; brick_msleep(1000); @@ -995,8 +1048,8 @@ static int sender_thread(void *data) ch = NULL; // retry submission on next occasion.. MARS_WRN("mref send '%s' @%s failed, status = %d\n", - output->bundle.path, - output->bundle.host, + bundle->path, + bundle->host, status); brick_msleep(100); @@ -1006,8 +1059,8 @@ static int sender_thread(void *data) if (unlikely(status < 0)) { MARS_WRN("sender thread '%s' @%s terminated with status = %d\n", - output->bundle.path, - output->bundle.host, + bundle->path, + bundle->host, status); } @@ -1023,9 +1076,10 @@ static int sender_thread(void *data) if (!atomic_dec_return(&sender_count)) mars_limit_reset(&client_limiter); - brick_wake_smp(&output->bundle.sender_event); - MARS_DBG("sender terminated\n"); atomic_dec(&brick->sender_count); + put_human_nr(&global_numbers, bundle->last_thread_nr); + brick_wake_smp(&bundle->sender_event); + MARS_DBG("sender terminated\n"); atomic_dec(&client_sender_count); return status; } diff --git a/kernel/mars_client.h b/kernel/mars_client.h index bed95862..16f3b3bb 100644 --- a/kernel/mars_client.h +++ b/kernel/mars_client.h @@ -81,8 +81,8 @@ struct client_channel { struct list_head wait_list; struct client_output *output; long current_space; - int thread_count; int recv_error; + int thread_restart_count; int ch_nr; bool is_used; bool is_open; @@ -93,7 +93,7 @@ struct client_bundle { char *host; char *path; struct mars_tcp_params *params; - int thread_count; + int last_thread_nr; int old_channel; wait_queue_head_t sender_event; struct client_threadinfo sender;