MEDIUM: check: Use the CS to handle subscriptions for read/write events

Instead of using the health-check to subscribe to read/write events, we now
rely on the conn-stream. Indeed, on the server side, the conn-stream's
endpoint is a multiplexer. Thus it seems appropriate to handle subscriptions
for read/write events the same way as for the streams. Of course, the I/O
callback function is not the same. We use srv_chk_io_cb() instead of
cs_conn_io_cb().
This commit is contained in:
Christopher Faulet 2022-05-18 15:57:15 +02:00
parent 361417f9b4
commit c95eaefbfd
4 changed files with 25 additions and 36 deletions

View File

@ -176,7 +176,6 @@ struct check {
char **envp; /* the environment to use if running a process-based check */
struct pid_list *curpid; /* entry in pid_list used for current process-based test, or -1 if not in test */
struct sockaddr_storage addr; /* the address to check */
struct wait_event wait_list; /* Waiting for I/O events */
char *sni; /* Server name */
char *alpn_str; /* ALPN to use for checks */
int alpn_len; /* ALPN string length */

View File

@ -1070,8 +1070,7 @@ static int wake_srv_chk(struct conn_stream *cs)
/* This function checks if any I/O is wanted, and if so, attempts to do so */
struct task *srv_chk_io_cb(struct task *t, void *ctx, unsigned int state)
{
struct check *check = ctx;
struct conn_stream *cs = check->cs;
struct conn_stream *cs = ctx;
wake_srv_chk(cs);
return NULL;
@ -1188,13 +1187,6 @@ struct task *process_chk_conn(struct task *t, void *context, unsigned int state)
}
if (cs) {
if (conn && check->wait_list.events)
conn->mux->unsubscribe(cs, check->wait_list.events, &check->wait_list);
/* We may have been scheduled to run, and the
* I/O handler expects to have a cs, so remove
* the tasklet
*/
tasklet_remove_from_tasklet_list(check->wait_list.tasklet);
cs_destroy(cs);
cs = check->cs = NULL;
conn = NULL;
@ -1280,16 +1272,18 @@ int check_buf_available(void *target)
{
struct check *check = target;
BUG_ON(!check->cs);
if ((check->state & CHK_ST_IN_ALLOC) && b_alloc(&check->bi)) {
TRACE_STATE("unblocking check, input buffer allocated", CHK_EV_TCPCHK_EXP|CHK_EV_RX_BLK, check);
check->state &= ~CHK_ST_IN_ALLOC;
tasklet_wakeup(check->wait_list.tasklet);
tasklet_wakeup(check->cs->wait_event.tasklet);
return 1;
}
if ((check->state & CHK_ST_OUT_ALLOC) && b_alloc(&check->bo)) {
TRACE_STATE("unblocking check, output buffer allocated", CHK_EV_TCPCHK_SND|CHK_EV_TX_BLK, check);
check->state &= ~CHK_ST_OUT_ALLOC;
tasklet_wakeup(check->wait_list.tasklet);
tasklet_wakeup(check->cs->wait_event.tasklet);
return 1;
}
@ -1331,13 +1325,6 @@ const char *init_check(struct check *check, int type)
check->bi = BUF_NULL;
check->bo = BUF_NULL;
LIST_INIT(&check->buf_wait.list);
check->wait_list.tasklet = tasklet_new();
if (!check->wait_list.tasklet)
return "out of memory while allocating check tasklet";
check->wait_list.events = 0;
check->wait_list.tasklet->process = srv_chk_io_cb;
check->wait_list.tasklet->context = check;
return NULL;
}
@ -1357,8 +1344,6 @@ void free_check(struct check *check)
}
task_destroy(check->task);
if (check->wait_list.tasklet)
tasklet_free(check->wait_list.tasklet);
check_release_buf(check, &check->bi);
check_release_buf(check, &check->bo);

View File

@ -266,8 +266,18 @@ int cs_attach_mux(struct conn_stream *cs, void *target, void *ctx)
cs->ops = &cs_app_conn_ops;
cs->data_cb = &cs_data_conn_cb;
}
else if (cs_check(cs))
else if (cs_check(cs)) {
if (!cs->wait_event.tasklet) {
cs->wait_event.tasklet = tasklet_new();
if (!cs->wait_event.tasklet)
return -1;
cs->wait_event.tasklet->process = srv_chk_io_cb;
cs->wait_event.tasklet->context = cs;
cs->wait_event.events = 0;
}
cs->data_cb = &check_conn_cb;
}
return 0;
}
@ -340,7 +350,6 @@ static void cs_detach_endp(struct conn_stream **csp)
struct cs_endpoint *endp = cs->endp;
if (conn->mux) {
/* TODO: handle unsubscribe for healthchecks too */
if (cs->wait_event.events != 0)
conn->mux->unsubscribe(cs, cs->wait_event.events, &cs->wait_event);
endp->flags |= CS_EP_ORPHAN;

View File

@ -1071,8 +1071,8 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec
if (conn->flags & CO_FL_WAIT_XPRT) {
/* We are still waiting for the connection establishment */
if (next && next->action == TCPCHK_ACT_SEND) {
if (!(check->wait_list.events & SUB_RETRY_SEND))
conn->mux->subscribe(check->cs, SUB_RETRY_SEND, &check->wait_list);
if (!(check->cs->wait_event.events & SUB_RETRY_SEND))
conn->mux->subscribe(check->cs, SUB_RETRY_SEND, &check->cs->wait_event);
ret = TCPCHK_EVAL_WAIT;
TRACE_DEVEL("not connected yet", CHK_EV_TCPCHK_CONN, check);
}
@ -1108,12 +1108,8 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec
goto fail_check;
}
conn->ctx = check->cs;
tasklet_set_tid(check->wait_list.tasklet, tid);
conn_set_owner(conn, check->sess, NULL);
/* Maybe there were an older connection we were waiting on */
check->wait_list.events = 0;
/* no client address */
if (!sockaddr_alloc(&conn->dst, NULL, 0)) {
TRACE_ERROR("sockaddr allocation error", CHK_EV_TCPCHK_CONN|CHK_EV_TCPCHK_ERR, check);
@ -1298,9 +1294,9 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec
if (conn->flags & CO_FL_WAIT_XPRT) {
if (conn->mux) {
if (next && next->action == TCPCHK_ACT_SEND)
conn->mux->subscribe(check->cs, SUB_RETRY_SEND, &check->wait_list);
conn->mux->subscribe(check->cs, SUB_RETRY_SEND, &check->cs->wait_event);
else
conn->mux->subscribe(check->cs, SUB_RETRY_RECV, &check->wait_list);
conn->mux->subscribe(check->cs, SUB_RETRY_RECV, &check->cs->wait_event);
}
ret = TCPCHK_EVAL_WAIT;
TRACE_DEVEL("not connected yet", CHK_EV_TCPCHK_CONN, check);
@ -1495,7 +1491,7 @@ enum tcpcheck_eval_ret tcpcheck_eval_send(struct check *check, struct tcpcheck_r
}
}
if ((IS_HTX_CONN(conn) && !htx_is_empty(htxbuf(&check->bo))) || (!IS_HTX_CONN(conn) && b_data(&check->bo))) {
conn->mux->subscribe(cs, SUB_RETRY_SEND, &check->wait_list);
conn->mux->subscribe(cs, SUB_RETRY_SEND, &cs->wait_event);
ret = TCPCHK_EVAL_WAIT;
TRACE_DEVEL("data not fully sent, wait", CHK_EV_TCPCHK_SND|CHK_EV_TX_DATA, check);
goto out;
@ -1547,7 +1543,7 @@ enum tcpcheck_eval_ret tcpcheck_eval_recv(struct check *check, struct tcpcheck_r
TRACE_ENTER(CHK_EV_RX_DATA, check);
if (check->wait_list.events & SUB_RETRY_RECV) {
if (cs->wait_event.events & SUB_RETRY_RECV) {
TRACE_DEVEL("waiting for response", CHK_EV_RX_DATA, check);
goto wait_more_data;
}
@ -1600,7 +1596,7 @@ enum tcpcheck_eval_ret tcpcheck_eval_recv(struct check *check, struct tcpcheck_r
goto out;
}
if (!(cs->endp->flags & (CS_EP_WANT_ROOM|CS_EP_ERROR|CS_EP_EOS))) {
conn->mux->subscribe(cs, SUB_RETRY_RECV, &check->wait_list);
conn->mux->subscribe(cs, SUB_RETRY_RECV, &cs->wait_event);
TRACE_DEVEL("waiting for response", CHK_EV_RX_DATA, check);
goto wait_more_data;
}
@ -2237,8 +2233,8 @@ int tcpcheck_main(struct check *check)
if (eval_ret == TCPCHK_EVAL_WAIT) {
check->current_step = rule->expect.head;
if (!(check->wait_list.events & SUB_RETRY_RECV))
conn->mux->subscribe(cs, SUB_RETRY_RECV, &check->wait_list);
if (!(cs->wait_event.events & SUB_RETRY_RECV))
conn->mux->subscribe(cs, SUB_RETRY_RECV, &cs->wait_event);
}
break;
case TCPCHK_ACT_ACTION_KW: