diff --git a/kernel/mars_client.c b/kernel/mars_client.c index 9c34bab5..a884dc62 100644 --- a/kernel/mars_client.c +++ b/kernel/mars_client.c @@ -273,8 +273,12 @@ struct client_channel *_get_channel(struct client_bundle *bundle, int min_channe } } - if (unlikely(!res)) + if (unlikely(!res)) { + MARS_WRN("cannot setup communication channel '%s' @%s\n", + bundle->path, + bundle->host); goto done; + } // send initial connect command if (unlikely(!res->is_connected)) { @@ -285,6 +289,11 @@ struct client_channel *_get_channel(struct client_bundle *bundle, int min_channe int status = mars_send_struct(&res->socket, &cmd, mars_cmd_meta); MARS_DBG("send CMD_CONNECT status = %d\n", status); if (unlikely(status < 0)) { + MARS_WRN("connect '%s' @%s on channel %d failed, status = %d\n", + bundle->path, + bundle->host, + best_channel, + status); _kill_channel(res); res = NULL; goto done; @@ -342,7 +351,9 @@ int _setup_bundle(struct client_bundle *bundle, const char *str) bundle->thread_count = thread_count++; bundle->sender.thread = brick_thread_create(sender_thread, bundle, "mars_sender%d", bundle->thread_count); if (unlikely(!bundle->sender.thread)) { - MARS_ERR("cannot start sender thread\n"); + MARS_ERR("cannot start sender thread for '%s' @%s\n", + bundle->path, + bundle->host); status = -ENOENT; goto done; } @@ -540,7 +551,10 @@ int receiver_thread(void *data) case CMD_CONNECT: if (cmd.cmd_int1 < 0) { status = cmd.cmd_int1; - MARS_ERR("at remote side: brick connect failed, remote status = %d\n", status); + MARS_ERR("remote brick connect '%s' @%s failed, remote status = %d\n", + output->bundle.path, + output->bundle.host, + status); goto done; } break; @@ -553,23 +567,26 @@ int receiver_thread(void *data) struct mref_object *tmp_mref; mref_a = container_of(tmp, struct client_mref_aspect, hash_head); tmp_mref = mref_a->object; - if (unlikely(!tmp_mref)) { - traced_unlock(&output->lock, flags); - MARS_ERR("bad internal mref pointer\n"); - status = -EBADR; - goto done; - } - if (tmp_mref->ref_id == cmd.cmd_int1) { - mref = tmp_mref; - list_del_init(&mref_a->hash_head); - list_del_init(&mref_a->io_head); - break; - } + CHECK_PTR(tmp_mref, err); + if (tmp_mref->ref_id != cmd.cmd_int1) + continue; + mref = tmp_mref; + list_del_init(&mref_a->hash_head); + list_del_init(&mref_a->io_head); + break; + + err: + traced_unlock(&output->lock, flags); + status = -EBADR; + goto done; } traced_unlock(&output->lock, flags); if (unlikely(!mref)) { - MARS_WRN("got unknown id = %d for callback\n", cmd.cmd_int1); + MARS_WRN("got unknown callback id %d on '%s' @%s\n", + cmd.cmd_int1, + output->bundle.path, + output->bundle.host); // try to consume the corresponding payload mref = client_alloc_mref(output->brick); status = mars_recv_cb(&ch->socket, mref, &cmd); @@ -582,7 +599,10 @@ int receiver_thread(void *data) status = mars_recv_cb(&ch->socket, mref, &cmd); MARS_IO("new status = %d, pos = %lld len = %d rw = %d\n", status, mref->ref_pos, mref->ref_len, mref->ref_rw); if (unlikely(status < 0)) { - MARS_WRN("interrupted data transfer during callback, status = %d\n", status); + MARS_WRN("interrupted data transfer during callback on '%s' @%s, status = %d\n", + output->bundle.path, + output->bundle.host, + status); _hash_insert(output, mref_a); goto done; } @@ -601,14 +621,20 @@ int receiver_thread(void *data) case CMD_GETINFO: status = mars_recv_struct(&ch->socket, &output->info, mars_info_meta); if (status < 0) { - MARS_WRN("got bad info from remote side, status = %d\n", status); + MARS_WRN("got bad info from remote '%s' @%s, status = %d\n", + output->bundle.path, + output->bundle.host, + status); goto done; } output->got_info = true; wake_up_interruptible_all(&output->info_event); break; default: - MARS_ERR("got bad command %d from remote side, terminating.\n", cmd.cmd_code); + MARS_ERR("got bad command %d from remote '%s' @%s, terminating.\n", + cmd.cmd_code, + output->bundle.path, + output->bundle.host); status = -EBADR; goto done; } @@ -625,8 +651,11 @@ int receiver_thread(void *data) wake_up_interruptible_all(&output->bundle.sender_event); } - if (status < 0) { - MARS_WRN("receiver thread terminated with status = %d\n", status); + if (unlikely(status < 0)) { + MARS_WRN("receiver thread '%s' @%s terminated with status = %d\n", + output->bundle.path, + output->bundle.host, + status); } mars_shutdown_socket(&ch->socket); @@ -634,14 +663,12 @@ int receiver_thread(void *data) } static -void _do_timeout(struct client_output *output, struct list_head *anchor, bool force) +void _do_timeout(struct client_output *output, struct list_head *anchor, int *rounds, bool force) { struct client_brick *brick = output->brick; struct list_head *tmp; struct list_head *next; LIST_HEAD(tmp_list); - int count = 0; - int rounds = 0; long io_timeout = _compute_timeout(brick); unsigned long flags; int i; @@ -697,14 +724,15 @@ void _do_timeout(struct client_output *output, struct list_head *anchor, bool fo mref_a = container_of(tmp, struct client_mref_aspect, tmp_head); mref = mref_a->object; - if (!rounds++) { - MARS_WRN("timeout after %ld: signalling IO error at pos = %lld len = %d\n", + if (unlikely(!(*rounds)++)) { + MARS_WRN("'%s' @%s timeout after %ld: signalling IO error at pos = %lld len = %d\n", + output->bundle.path, + output->bundle.host, io_timeout, mref->ref_pos, mref->ref_len); } - count++; atomic_inc(&output->timeout_count); SIMPLE_CALLBACK(mref, -ETIME); @@ -714,21 +742,28 @@ void _do_timeout(struct client_output *output, struct list_head *anchor, bool fo atomic_dec(&output->fly_count); atomic_dec(&mars_global_io_flying); } - MARS_DBG("force = %d count = %d\n", force, count); } static void _do_timeout_all(struct client_output *output, bool force) { + int rounds = 0; int i; for (i = 0; i < MAX_CLIENT_CHANNELS; i++) { struct client_channel *ch = &output->bundle.channel[i]; if (!ch->is_used) continue; - _do_timeout(output, &ch->wait_list, force); + _do_timeout(output, &ch->wait_list, &rounds, force); + } + _do_timeout(output, &output->mref_list, &rounds, force); + if (unlikely(rounds > 0)) { + MARS_WRN("'%s' @%s had %d timeouts, force = %d\n", + output->bundle.path, + output->bundle.host, + rounds, + force); } - _do_timeout(output, &output->mref_list, force); } static int sender_thread(void *data) @@ -764,19 +799,21 @@ static int sender_thread(void *data) if (output->get_info) { ch = _get_channel(bundle, 0, 1); if (unlikely(!ch)) { - MARS_WRN("cannot setup info communication channel\n"); do_timeout = true; brick_msleep(1000); continue; } status = _request_info(ch); - if (likely(status >= 0)) { - output->get_info = false; - } else { - MARS_WRN("cannot send info request, status = %d\n", status); + if (unlikely(status < 0)) { + MARS_WRN("cannot send info request '%s' @%s, status = %d\n", + output->bundle.path, + output->bundle.host, + status); do_timeout = true; brick_msleep(1000); + continue; } + output->get_info = false; } /* Grab the next mref from the queue @@ -817,20 +854,18 @@ static int sender_thread(void *data) !mars_socket_is_alive(&ch->socket) || ch->ch_nr >= max_nr || --ch_skip < 0) { ch = _get_channel(bundle, min_nr, max_nr); - if (likely(ch)) { - /* estimate: add some headroom for overhead */ - ch_skip = ch->current_space / PAGE_SIZE + - ch->current_space / (PAGE_SIZE * 8); - if (ch_skip > max_client_bulk) - ch_skip = max_client_bulk; - } else { + if (unlikely(!ch)) { // notice: this will re-assign hash_head without harm _hash_insert(output, mref_a); do_timeout = true; - MARS_WRN("cannot setup communication channel\n"); brick_msleep(1000); continue; } + /* estimate: add some headroom for overhead */ + ch_skip = ch->current_space / PAGE_SIZE + + ch->current_space / (PAGE_SIZE * 8); + if (ch_skip > max_client_bulk) + ch_skip = max_client_bulk; } spin_lock(&output->lock); @@ -844,15 +879,21 @@ static int sender_thread(void *data) do_timeout = true; ch = NULL; // retry submission on next occasion.. - MARS_WRN("sending failed, status = %d\n", status); + MARS_WRN("mref send '%s' @%s failed, status = %d\n", + output->bundle.path, + output->bundle.host, + status); brick_msleep(100); continue; } } - if (status < 0) { - MARS_WRN("sender thread terminated with status = %d\n", status); + if (unlikely(status < 0)) { + MARS_WRN("sender thread '%s' @%s terminated with status = %d\n", + output->bundle.path, + output->bundle.host, + status); } _kill_all_channels(bundle);