From c95eaefbfd45ae2ed47d1181a7c9179d7742a01b Mon Sep 17 00:00:00 2001 From: Christopher Faulet Date: Wed, 18 May 2022 15:57:15 +0200 Subject: [PATCH] MEDIUM: check: Use the CS to handle subscriptions for read/write events Instead of using the health-check to subscribe to read/write events, we now rely on the conn-stream. Indeed, on the server side, the conn-stream's endpoint is a multiplexer. Thus it seems appropriate to handle subscriptions for read/write events the same way than for the streams. Of course, the I/O callback function is not the same. We use srv_chk_io_cb() instead of cs_conn_io_cb(). --- include/haproxy/check-t.h | 1 - src/check.c | 25 +++++-------------------- src/conn_stream.c | 13 +++++++++++-- src/tcpcheck.c | 22 +++++++++------------- 4 files changed, 25 insertions(+), 36 deletions(-) diff --git a/include/haproxy/check-t.h b/include/haproxy/check-t.h index c7a88a344..066737751 100644 --- a/include/haproxy/check-t.h +++ b/include/haproxy/check-t.h @@ -176,7 +176,6 @@ struct check { char **envp; /* the environment to use if running a process-based check */ struct pid_list *curpid; /* entry in pid_list used for current process-based test, or -1 if not in test */ struct sockaddr_storage addr; /* the address to check */ - struct wait_event wait_list; /* Waiting for I/O events */ char *sni; /* Server name */ char *alpn_str; /* ALPN to use for checks */ int alpn_len; /* ALPN string length */ diff --git a/src/check.c b/src/check.c index f80c66e74..c8891666c 100644 --- a/src/check.c +++ b/src/check.c @@ -1070,8 +1070,7 @@ static int wake_srv_chk(struct conn_stream *cs) /* This function checks if any I/O is wanted, and if so, attempts to do so */ struct task *srv_chk_io_cb(struct task *t, void *ctx, unsigned int state) { - struct check *check = ctx; - struct conn_stream *cs = check->cs; + struct conn_stream *cs = ctx; wake_srv_chk(cs); return NULL; @@ -1188,13 +1187,6 @@ struct task *process_chk_conn(struct task *t, void *context, unsigned int state) } if (cs) { - if (conn && check->wait_list.events) - conn->mux->unsubscribe(cs, check->wait_list.events, &check->wait_list); - /* We may have been scheduled to run, and the - * I/O handler expects to have a cs, so remove - * the tasklet - */ - tasklet_remove_from_tasklet_list(check->wait_list.tasklet); cs_destroy(cs); cs = check->cs = NULL; conn = NULL; @@ -1280,16 +1272,18 @@ int check_buf_available(void *target) { struct check *check = target; + BUG_ON(!check->cs); + if ((check->state & CHK_ST_IN_ALLOC) && b_alloc(&check->bi)) { TRACE_STATE("unblocking check, input buffer allocated", CHK_EV_TCPCHK_EXP|CHK_EV_RX_BLK, check); check->state &= ~CHK_ST_IN_ALLOC; - tasklet_wakeup(check->wait_list.tasklet); + tasklet_wakeup(check->cs->wait_event.tasklet); return 1; } if ((check->state & CHK_ST_OUT_ALLOC) && b_alloc(&check->bo)) { TRACE_STATE("unblocking check, output buffer allocated", CHK_EV_TCPCHK_SND|CHK_EV_TX_BLK, check); check->state &= ~CHK_ST_OUT_ALLOC; - tasklet_wakeup(check->wait_list.tasklet); + tasklet_wakeup(check->cs->wait_event.tasklet); return 1; } @@ -1331,13 +1325,6 @@ const char *init_check(struct check *check, int type) check->bi = BUF_NULL; check->bo = BUF_NULL; LIST_INIT(&check->buf_wait.list); - - check->wait_list.tasklet = tasklet_new(); - if (!check->wait_list.tasklet) - return "out of memory while allocating check tasklet"; - check->wait_list.events = 0; - check->wait_list.tasklet->process = srv_chk_io_cb; - check->wait_list.tasklet->context = check; return NULL; } @@ -1357,8 +1344,6 @@ void free_check(struct check *check) } task_destroy(check->task); - if (check->wait_list.tasklet) - tasklet_free(check->wait_list.tasklet); check_release_buf(check, &check->bi); check_release_buf(check, &check->bo); diff --git a/src/conn_stream.c b/src/conn_stream.c index 7fc3a4935..df916cc9b 100644 --- a/src/conn_stream.c +++ b/src/conn_stream.c @@ -266,8 +266,18 @@ int cs_attach_mux(struct conn_stream *cs, void *target, void *ctx) cs->ops = &cs_app_conn_ops; cs->data_cb = &cs_data_conn_cb; } - else if (cs_check(cs)) + else if (cs_check(cs)) { + if (!cs->wait_event.tasklet) { + cs->wait_event.tasklet = tasklet_new(); + if (!cs->wait_event.tasklet) + return -1; + cs->wait_event.tasklet->process = srv_chk_io_cb; + cs->wait_event.tasklet->context = cs; + cs->wait_event.events = 0; + } + cs->data_cb = &check_conn_cb; + } return 0; } @@ -340,7 +350,6 @@ static void cs_detach_endp(struct conn_stream **csp) struct cs_endpoint *endp = cs->endp; if (conn->mux) { - /* TODO: handle unsubscribe for healthchecks too */ if (cs->wait_event.events != 0) conn->mux->unsubscribe(cs, cs->wait_event.events, &cs->wait_event); endp->flags |= CS_EP_ORPHAN; diff --git a/src/tcpcheck.c b/src/tcpcheck.c index 4a4e071d3..3b1d75a18 100644 --- a/src/tcpcheck.c +++ b/src/tcpcheck.c @@ -1071,8 +1071,8 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec if (conn->flags & CO_FL_WAIT_XPRT) { /* We are still waiting for the connection establishment */ if (next && next->action == TCPCHK_ACT_SEND) { - if (!(check->wait_list.events & SUB_RETRY_SEND)) - conn->mux->subscribe(check->cs, SUB_RETRY_SEND, &check->wait_list); + if (!(check->cs->wait_event.events & SUB_RETRY_SEND)) + conn->mux->subscribe(check->cs, SUB_RETRY_SEND, &check->cs->wait_event); ret = TCPCHK_EVAL_WAIT; TRACE_DEVEL("not connected yet", CHK_EV_TCPCHK_CONN, check); } @@ -1108,12 +1108,8 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec goto fail_check; } conn->ctx = check->cs; - tasklet_set_tid(check->wait_list.tasklet, tid); conn_set_owner(conn, check->sess, NULL); - /* Maybe there were an older connection we were waiting on */ - check->wait_list.events = 0; - /* no client address */ if (!sockaddr_alloc(&conn->dst, NULL, 0)) { TRACE_ERROR("sockaddr allocation error", CHK_EV_TCPCHK_CONN|CHK_EV_TCPCHK_ERR, check); @@ -1298,9 +1294,9 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec if (conn->flags & CO_FL_WAIT_XPRT) { if (conn->mux) { if (next && next->action == TCPCHK_ACT_SEND) - conn->mux->subscribe(check->cs, SUB_RETRY_SEND, &check->wait_list); + conn->mux->subscribe(check->cs, SUB_RETRY_SEND, &check->cs->wait_event); else - conn->mux->subscribe(check->cs, SUB_RETRY_RECV, &check->wait_list); + conn->mux->subscribe(check->cs, SUB_RETRY_RECV, &check->cs->wait_event); } ret = TCPCHK_EVAL_WAIT; TRACE_DEVEL("not connected yet", CHK_EV_TCPCHK_CONN, check); @@ -1495,7 +1491,7 @@ enum tcpcheck_eval_ret tcpcheck_eval_send(struct check *check, struct tcpcheck_r } } if ((IS_HTX_CONN(conn) && !htx_is_empty(htxbuf(&check->bo))) || (!IS_HTX_CONN(conn) && b_data(&check->bo))) { - conn->mux->subscribe(cs, SUB_RETRY_SEND, &check->wait_list); + conn->mux->subscribe(cs, SUB_RETRY_SEND, &cs->wait_event); ret = TCPCHK_EVAL_WAIT; TRACE_DEVEL("data not fully sent, wait", CHK_EV_TCPCHK_SND|CHK_EV_TX_DATA, check); goto out; @@ -1547,7 +1543,7 @@ enum tcpcheck_eval_ret tcpcheck_eval_recv(struct check *check, struct tcpcheck_r TRACE_ENTER(CHK_EV_RX_DATA, check); - if (check->wait_list.events & SUB_RETRY_RECV) { + if (cs->wait_event.events & SUB_RETRY_RECV) { TRACE_DEVEL("waiting for response", CHK_EV_RX_DATA, check); goto wait_more_data; } @@ -1600,7 +1596,7 @@ enum tcpcheck_eval_ret tcpcheck_eval_recv(struct check *check, struct tcpcheck_r goto out; } if (!(cs->endp->flags & (CS_EP_WANT_ROOM|CS_EP_ERROR|CS_EP_EOS))) { - conn->mux->subscribe(cs, SUB_RETRY_RECV, &check->wait_list); + conn->mux->subscribe(cs, SUB_RETRY_RECV, &cs->wait_event); TRACE_DEVEL("waiting for response", CHK_EV_RX_DATA, check); goto wait_more_data; } @@ -2237,8 +2233,8 @@ int tcpcheck_main(struct check *check) if (eval_ret == TCPCHK_EVAL_WAIT) { check->current_step = rule->expect.head; - if (!(check->wait_list.events & SUB_RETRY_RECV)) - conn->mux->subscribe(cs, SUB_RETRY_RECV, &check->wait_list); + if (!(cs->wait_event.events & SUB_RETRY_RECV)) + conn->mux->subscribe(cs, SUB_RETRY_RECV, &cs->wait_event); } break; case TCPCHK_ACT_ACTION_KW: