mirror of
https://github.com/schoebel/mars
synced 2025-01-30 03:42:48 +00:00
client: better errors and warnings
This commit is contained in:
parent
4793b2c0d2
commit
ae7d89fdaf
@ -273,8 +273,12 @@ struct client_channel *_get_channel(struct client_bundle *bundle, int min_channe
|
||||
}
|
||||
}
|
||||
|
||||
if (unlikely(!res))
|
||||
if (unlikely(!res)) {
|
||||
MARS_WRN("cannot setup communication channel '%s' @%s\n",
|
||||
bundle->path,
|
||||
bundle->host);
|
||||
goto done;
|
||||
}
|
||||
|
||||
// send initial connect command
|
||||
if (unlikely(!res->is_connected)) {
|
||||
@ -285,6 +289,11 @@ struct client_channel *_get_channel(struct client_bundle *bundle, int min_channe
|
||||
int status = mars_send_struct(&res->socket, &cmd, mars_cmd_meta);
|
||||
MARS_DBG("send CMD_CONNECT status = %d\n", status);
|
||||
if (unlikely(status < 0)) {
|
||||
MARS_WRN("connect '%s' @%s on channel %d failed, status = %d\n",
|
||||
bundle->path,
|
||||
bundle->host,
|
||||
best_channel,
|
||||
status);
|
||||
_kill_channel(res);
|
||||
res = NULL;
|
||||
goto done;
|
||||
@ -342,7 +351,9 @@ int _setup_bundle(struct client_bundle *bundle, const char *str)
|
||||
bundle->thread_count = thread_count++;
|
||||
bundle->sender.thread = brick_thread_create(sender_thread, bundle, "mars_sender%d", bundle->thread_count);
|
||||
if (unlikely(!bundle->sender.thread)) {
|
||||
MARS_ERR("cannot start sender thread\n");
|
||||
MARS_ERR("cannot start sender thread for '%s' @%s\n",
|
||||
bundle->path,
|
||||
bundle->host);
|
||||
status = -ENOENT;
|
||||
goto done;
|
||||
}
|
||||
@ -540,7 +551,10 @@ int receiver_thread(void *data)
|
||||
case CMD_CONNECT:
|
||||
if (cmd.cmd_int1 < 0) {
|
||||
status = cmd.cmd_int1;
|
||||
MARS_ERR("at remote side: brick connect failed, remote status = %d\n", status);
|
||||
MARS_ERR("remote brick connect '%s' @%s failed, remote status = %d\n",
|
||||
output->bundle.path,
|
||||
output->bundle.host,
|
||||
status);
|
||||
goto done;
|
||||
}
|
||||
break;
|
||||
@ -553,23 +567,26 @@ int receiver_thread(void *data)
|
||||
struct mref_object *tmp_mref;
|
||||
mref_a = container_of(tmp, struct client_mref_aspect, hash_head);
|
||||
tmp_mref = mref_a->object;
|
||||
if (unlikely(!tmp_mref)) {
|
||||
traced_unlock(&output->lock, flags);
|
||||
MARS_ERR("bad internal mref pointer\n");
|
||||
status = -EBADR;
|
||||
goto done;
|
||||
}
|
||||
if (tmp_mref->ref_id == cmd.cmd_int1) {
|
||||
mref = tmp_mref;
|
||||
list_del_init(&mref_a->hash_head);
|
||||
list_del_init(&mref_a->io_head);
|
||||
break;
|
||||
}
|
||||
CHECK_PTR(tmp_mref, err);
|
||||
if (tmp_mref->ref_id != cmd.cmd_int1)
|
||||
continue;
|
||||
mref = tmp_mref;
|
||||
list_del_init(&mref_a->hash_head);
|
||||
list_del_init(&mref_a->io_head);
|
||||
break;
|
||||
|
||||
err:
|
||||
traced_unlock(&output->lock, flags);
|
||||
status = -EBADR;
|
||||
goto done;
|
||||
}
|
||||
traced_unlock(&output->lock, flags);
|
||||
|
||||
if (unlikely(!mref)) {
|
||||
MARS_WRN("got unknown id = %d for callback\n", cmd.cmd_int1);
|
||||
MARS_WRN("got unknown callback id %d on '%s' @%s\n",
|
||||
cmd.cmd_int1,
|
||||
output->bundle.path,
|
||||
output->bundle.host);
|
||||
// try to consume the corresponding payload
|
||||
mref = client_alloc_mref(output->brick);
|
||||
status = mars_recv_cb(&ch->socket, mref, &cmd);
|
||||
@ -582,7 +599,10 @@ int receiver_thread(void *data)
|
||||
status = mars_recv_cb(&ch->socket, mref, &cmd);
|
||||
MARS_IO("new status = %d, pos = %lld len = %d rw = %d\n", status, mref->ref_pos, mref->ref_len, mref->ref_rw);
|
||||
if (unlikely(status < 0)) {
|
||||
MARS_WRN("interrupted data transfer during callback, status = %d\n", status);
|
||||
MARS_WRN("interrupted data transfer during callback on '%s' @%s, status = %d\n",
|
||||
output->bundle.path,
|
||||
output->bundle.host,
|
||||
status);
|
||||
_hash_insert(output, mref_a);
|
||||
goto done;
|
||||
}
|
||||
@ -601,14 +621,20 @@ int receiver_thread(void *data)
|
||||
case CMD_GETINFO:
|
||||
status = mars_recv_struct(&ch->socket, &output->info, mars_info_meta);
|
||||
if (status < 0) {
|
||||
MARS_WRN("got bad info from remote side, status = %d\n", status);
|
||||
MARS_WRN("got bad info from remote '%s' @%s, status = %d\n",
|
||||
output->bundle.path,
|
||||
output->bundle.host,
|
||||
status);
|
||||
goto done;
|
||||
}
|
||||
output->got_info = true;
|
||||
wake_up_interruptible_all(&output->info_event);
|
||||
break;
|
||||
default:
|
||||
MARS_ERR("got bad command %d from remote side, terminating.\n", cmd.cmd_code);
|
||||
MARS_ERR("got bad command %d from remote '%s' @%s, terminating.\n",
|
||||
cmd.cmd_code,
|
||||
output->bundle.path,
|
||||
output->bundle.host);
|
||||
status = -EBADR;
|
||||
goto done;
|
||||
}
|
||||
@ -625,8 +651,11 @@ int receiver_thread(void *data)
|
||||
wake_up_interruptible_all(&output->bundle.sender_event);
|
||||
}
|
||||
|
||||
if (status < 0) {
|
||||
MARS_WRN("receiver thread terminated with status = %d\n", status);
|
||||
if (unlikely(status < 0)) {
|
||||
MARS_WRN("receiver thread '%s' @%s terminated with status = %d\n",
|
||||
output->bundle.path,
|
||||
output->bundle.host,
|
||||
status);
|
||||
}
|
||||
|
||||
mars_shutdown_socket(&ch->socket);
|
||||
@ -634,14 +663,12 @@ int receiver_thread(void *data)
|
||||
}
|
||||
|
||||
static
|
||||
void _do_timeout(struct client_output *output, struct list_head *anchor, bool force)
|
||||
void _do_timeout(struct client_output *output, struct list_head *anchor, int *rounds, bool force)
|
||||
{
|
||||
struct client_brick *brick = output->brick;
|
||||
struct list_head *tmp;
|
||||
struct list_head *next;
|
||||
LIST_HEAD(tmp_list);
|
||||
int count = 0;
|
||||
int rounds = 0;
|
||||
long io_timeout = _compute_timeout(brick);
|
||||
unsigned long flags;
|
||||
int i;
|
||||
@ -697,14 +724,15 @@ void _do_timeout(struct client_output *output, struct list_head *anchor, bool fo
|
||||
mref_a = container_of(tmp, struct client_mref_aspect, tmp_head);
|
||||
mref = mref_a->object;
|
||||
|
||||
if (!rounds++) {
|
||||
MARS_WRN("timeout after %ld: signalling IO error at pos = %lld len = %d\n",
|
||||
if (unlikely(!(*rounds)++)) {
|
||||
MARS_WRN("'%s' @%s timeout after %ld: signalling IO error at pos = %lld len = %d\n",
|
||||
output->bundle.path,
|
||||
output->bundle.host,
|
||||
io_timeout,
|
||||
mref->ref_pos,
|
||||
mref->ref_len);
|
||||
}
|
||||
|
||||
count++;
|
||||
atomic_inc(&output->timeout_count);
|
||||
|
||||
SIMPLE_CALLBACK(mref, -ETIME);
|
||||
@ -714,21 +742,28 @@ void _do_timeout(struct client_output *output, struct list_head *anchor, bool fo
|
||||
atomic_dec(&output->fly_count);
|
||||
atomic_dec(&mars_global_io_flying);
|
||||
}
|
||||
MARS_DBG("force = %d count = %d\n", force, count);
|
||||
}
|
||||
|
||||
static
|
||||
void _do_timeout_all(struct client_output *output, bool force)
|
||||
{
|
||||
int rounds = 0;
|
||||
int i;
|
||||
for (i = 0; i < MAX_CLIENT_CHANNELS; i++) {
|
||||
struct client_channel *ch = &output->bundle.channel[i];
|
||||
|
||||
if (!ch->is_used)
|
||||
continue;
|
||||
_do_timeout(output, &ch->wait_list, force);
|
||||
_do_timeout(output, &ch->wait_list, &rounds, force);
|
||||
}
|
||||
_do_timeout(output, &output->mref_list, &rounds, force);
|
||||
if (unlikely(rounds > 0)) {
|
||||
MARS_WRN("'%s' @%s had %d timeouts, force = %d\n",
|
||||
output->bundle.path,
|
||||
output->bundle.host,
|
||||
rounds,
|
||||
force);
|
||||
}
|
||||
_do_timeout(output, &output->mref_list, force);
|
||||
}
|
||||
|
||||
static int sender_thread(void *data)
|
||||
@ -764,19 +799,21 @@ static int sender_thread(void *data)
|
||||
if (output->get_info) {
|
||||
ch = _get_channel(bundle, 0, 1);
|
||||
if (unlikely(!ch)) {
|
||||
MARS_WRN("cannot setup info communication channel\n");
|
||||
do_timeout = true;
|
||||
brick_msleep(1000);
|
||||
continue;
|
||||
}
|
||||
status = _request_info(ch);
|
||||
if (likely(status >= 0)) {
|
||||
output->get_info = false;
|
||||
} else {
|
||||
MARS_WRN("cannot send info request, status = %d\n", status);
|
||||
if (unlikely(status < 0)) {
|
||||
MARS_WRN("cannot send info request '%s' @%s, status = %d\n",
|
||||
output->bundle.path,
|
||||
output->bundle.host,
|
||||
status);
|
||||
do_timeout = true;
|
||||
brick_msleep(1000);
|
||||
continue;
|
||||
}
|
||||
output->get_info = false;
|
||||
}
|
||||
|
||||
/* Grab the next mref from the queue
|
||||
@ -817,20 +854,18 @@ static int sender_thread(void *data)
|
||||
!mars_socket_is_alive(&ch->socket) ||
|
||||
ch->ch_nr >= max_nr || --ch_skip < 0) {
|
||||
ch = _get_channel(bundle, min_nr, max_nr);
|
||||
if (likely(ch)) {
|
||||
/* estimate: add some headroom for overhead */
|
||||
ch_skip = ch->current_space / PAGE_SIZE +
|
||||
ch->current_space / (PAGE_SIZE * 8);
|
||||
if (ch_skip > max_client_bulk)
|
||||
ch_skip = max_client_bulk;
|
||||
} else {
|
||||
if (unlikely(!ch)) {
|
||||
// notice: this will re-assign hash_head without harm
|
||||
_hash_insert(output, mref_a);
|
||||
do_timeout = true;
|
||||
MARS_WRN("cannot setup communication channel\n");
|
||||
brick_msleep(1000);
|
||||
continue;
|
||||
}
|
||||
/* estimate: add some headroom for overhead */
|
||||
ch_skip = ch->current_space / PAGE_SIZE +
|
||||
ch->current_space / (PAGE_SIZE * 8);
|
||||
if (ch_skip > max_client_bulk)
|
||||
ch_skip = max_client_bulk;
|
||||
}
|
||||
|
||||
spin_lock(&output->lock);
|
||||
@ -844,15 +879,21 @@ static int sender_thread(void *data)
|
||||
do_timeout = true;
|
||||
ch = NULL;
|
||||
// retry submission on next occasion..
|
||||
MARS_WRN("sending failed, status = %d\n", status);
|
||||
MARS_WRN("mref send '%s' @%s failed, status = %d\n",
|
||||
output->bundle.path,
|
||||
output->bundle.host,
|
||||
status);
|
||||
|
||||
brick_msleep(100);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (status < 0) {
|
||||
MARS_WRN("sender thread terminated with status = %d\n", status);
|
||||
if (unlikely(status < 0)) {
|
||||
MARS_WRN("sender thread '%s' @%s terminated with status = %d\n",
|
||||
output->bundle.path,
|
||||
output->bundle.host,
|
||||
status);
|
||||
}
|
||||
|
||||
_kill_all_channels(bundle);
|
||||
|
Loading…
Reference in New Issue
Block a user