MAJOR: check: Use a persistent conn-stream for health-checks

In the same way a stream always has valid conn-streams, when a health-check
is created, a conn-stream is now created and the health-check is attached to
it, as an app. This simplifies the connect part a bit when a health-check is
running.
This commit is contained in:
Christopher Faulet 2022-01-06 08:46:56 +01:00
parent 14fd99a20c
commit 54e85cbfc7
3 changed files with 47 additions and 65 deletions

View File

@ -1094,7 +1094,6 @@ struct task *process_chk_conn(struct task *t, void *context, unsigned int state)
{
struct check *check = context;
struct proxy *proxy = check->proxy;
struct conn_stream *cs;
struct connection *conn;
int rv;
int expired = tick_is_expired(t->expire, now_ms);
@ -1137,8 +1136,7 @@ struct task *process_chk_conn(struct task *t, void *context, unsigned int state)
expired = 0;
}
cs = check->cs;
conn = cs_conn(cs);
conn = cs_conn(check->cs);
/* there was a test running.
* First, let's check whether there was an uncaught error,
@ -1148,17 +1146,15 @@ struct task *process_chk_conn(struct task *t, void *context, unsigned int state)
/* Here the connection must be defined. Otherwise the
* error would have already been detected
*/
if ((conn && ((conn->flags & CO_FL_ERROR) || (cs->flags & CS_FL_ERROR))) || expired) {
if ((conn && ((conn->flags & CO_FL_ERROR) || (check->cs->flags & CS_FL_ERROR))) || expired) {
TRACE_ERROR("report connection error", CHK_EV_TASK_WAKE|CHK_EV_HCHK_END|CHK_EV_HCHK_ERR, check);
chk_report_conn_err(check, 0, expired);
}
else {
if (check->state & CHK_ST_CLOSE_CONN) {
TRACE_DEVEL("closing current connection", CHK_EV_TASK_WAKE|CHK_EV_HCHK_RUN, check);
cs_destroy(cs);
cs = NULL;
cs_detach_endp(check->cs);
conn = NULL;
check->cs = NULL;
check->state &= ~CHK_ST_CLOSE_CONN;
tcpcheck_main(check);
}
@ -1173,8 +1169,7 @@ struct task *process_chk_conn(struct task *t, void *context, unsigned int state)
TRACE_STATE("health-check complete or aborted", CHK_EV_TASK_WAKE|CHK_EV_HCHK_END, check);
check->current_step = NULL;
cs = check->cs;
conn = cs_conn(cs);
conn = cs_conn(check->cs);
if (conn && conn->xprt) {
/* The check was aborted and the connection was not yet closed.
@ -1182,21 +1177,18 @@ struct task *process_chk_conn(struct task *t, void *context, unsigned int state)
* as a failed response coupled with "observe layer7" caused the
* server state to be suddenly changed.
*/
cs_drain_and_close(cs);
cs_drain_and_close(check->cs);
}
if (cs) {
if (conn && check->wait_list.events)
conn->mux->unsubscribe(cs, check->wait_list.events, &check->wait_list);
/* We may have been scheduled to run, and the
* I/O handler expects to have a cs, so remove
* the tasklet
*/
tasklet_remove_from_tasklet_list(check->wait_list.tasklet);
cs_destroy(cs);
cs = check->cs = NULL;
conn = NULL;
}
/* TODO: must be handled by cs_detach_endp */
if (conn && check->wait_list.events)
conn->mux->unsubscribe(check->cs, check->wait_list.events, &check->wait_list);
/* We may have been scheduled to run, and the
* I/O handler expects to have a cs, so remove
* the tasklet
*/
tasklet_remove_from_tasklet_list(check->wait_list.tasklet);
cs_detach_endp(check->cs);
if (check->sess != NULL) {
vars_prune(&check->vars, check->sess, NULL);
@ -1356,11 +1348,8 @@ void free_check(struct check *check)
check_release_buf(check, &check->bi);
check_release_buf(check, &check->bo);
if (check->cs) {
struct connection *conn = cs_conn(check->cs);
if (conn)
conn_free(conn);
cs_free(check->cs);
cs_detach_endp(check->cs);
cs_detach_app(check->cs);
check->cs = NULL;
}
}
@ -1399,15 +1388,18 @@ int start_check_task(struct check *check, int mininter,
/* task for the check. Process-based checks exclusively run on thread 1. */
if (check->type == PR_O2_EXT_CHK)
t = task_new_on(0);
else
else {
check->cs = cs_new();
if (!check->cs)
goto fail_alloc_cs;
if (cs_attach_app(check->cs, &check->obj_type) < 0)
goto fail_attach_cs;
t = task_new_anywhere();
if (!t) {
ha_alert("Starting [%s:%s] check: out of memory.\n",
check->server->proxy->id, check->server->id);
return 0;
}
if (!t)
goto fail_alloc_task;
check->task = t;
t->process = process_chk;
t->context = check;
@ -1424,6 +1416,14 @@ int start_check_task(struct check *check, int mininter,
task_queue(t);
return 1;
fail_alloc_task:
fail_attach_cs:
cs_free(check->cs);
fail_alloc_cs:
ha_alert("Starting [%s:%s] check: out of memory.\n",
check->server->proxy->id, check->server->id);
return 0;
}
/*

View File

@ -112,6 +112,7 @@ void cs_detach_endp(struct conn_stream *cs)
if ((conn = cs_conn(cs))) {
if (conn->mux) {
/* TODO: handle unsubscribe for healthchecks too */
if (cs->si && cs->si->wait_event.events != 0)
conn->mux->unsubscribe(cs, cs->si->wait_event.events, &cs->si->wait_event);
conn->mux->detach(cs);

View File

@ -1057,8 +1057,7 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec
struct proxy *proxy = check->proxy;
struct server *s = check->server;
struct task *t = check->task;
struct conn_stream *cs = check->cs;
struct connection *conn = cs_conn(cs);
struct connection *conn = cs_conn(check->cs);
struct protocol *proto;
struct xprt_ops *xprt;
struct tcpcheck_rule *next;
@ -1074,7 +1073,7 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec
/* We are still waiting for the connection establishment */
if (next && next->action == TCPCHK_ACT_SEND) {
if (!(check->wait_list.events & SUB_RETRY_SEND))
conn->mux->subscribe(cs, SUB_RETRY_SEND, &check->wait_list);
conn->mux->subscribe(check->cs, SUB_RETRY_SEND, &check->wait_list);
ret = TCPCHK_EVAL_WAIT;
TRACE_DEVEL("not connected yet", CHK_EV_TCPCHK_CONN, check);
}
@ -1092,9 +1091,7 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec
/* No connection, prepare a new one */
conn = conn_new((s ? &s->obj_type : &proxy->obj_type));
if (conn)
cs = cs_new();
if (!conn || !cs) {
if (!conn) {
chunk_printf(&trash, "TCPCHK error allocating connection at step %d",
tcpcheck_get_step_id(check, rule));
if (rule->comment)
@ -1102,25 +1099,10 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec
set_server_check_status(check, HCHK_STATUS_SOCKERR, trash.area);
ret = TCPCHK_EVAL_STOP;
TRACE_ERROR("conn-stream allocation error", CHK_EV_TCPCHK_CONN|CHK_EV_TCPCHK_ERR, check);
if (conn)
conn_free(conn);
goto out;
}
cs_attach_endp(cs, &conn->obj_type, conn);
if (cs_attach_app(cs, &check->obj_type) < 0) {
chunk_printf(&trash, "TCPCHK error allocating connection at step %d",
tcpcheck_get_step_id(check, rule));
if (rule->comment)
chunk_appendf(&trash, " comment: '%s'", rule->comment);
set_server_check_status(check, HCHK_STATUS_SOCKERR, trash.area);
ret = TCPCHK_EVAL_STOP;
TRACE_ERROR("conn-stream allocation error", CHK_EV_TCPCHK_CONN|CHK_EV_TCPCHK_ERR, check);
cs_destroy(cs);
goto out;
}
cs_attach_endp(check->cs, &conn->obj_type, conn);
tasklet_set_tid(check->wait_list.tasklet, tid);
check->cs = cs;
conn_set_owner(conn, check->sess, NULL);
/* Maybe there were an older connection we were waiting on */
@ -1214,7 +1196,7 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec
goto fail_check;
conn_set_private(conn);
conn->ctx = cs;
conn->ctx = check->cs;
#ifdef USE_OPENSSL
if (connect->sni)
@ -1263,7 +1245,7 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec
mux_ops = conn_get_best_mux(conn, IST_NULL, PROTO_SIDE_BE, mode);
}
if (mux_ops && conn_install_mux(conn, mux_ops, cs, proxy, check->sess) < 0) {
if (mux_ops && conn_install_mux(conn, mux_ops, check->cs, proxy, check->sess) < 0) {
TRACE_ERROR("failed to install mux", CHK_EV_TCPCHK_CONN|CHK_EV_TCPCHK_ERR, check);
status = SF_ERR_INTERNAL;
goto fail_check;
@ -1310,9 +1292,9 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec
if (conn->flags & CO_FL_WAIT_XPRT) {
if (conn->mux) {
if (next && next->action == TCPCHK_ACT_SEND)
conn->mux->subscribe(cs, SUB_RETRY_SEND, &check->wait_list);
conn->mux->subscribe(check->cs, SUB_RETRY_SEND, &check->wait_list);
else
conn->mux->subscribe(cs, SUB_RETRY_RECV, &check->wait_list);
conn->mux->subscribe(check->cs, SUB_RETRY_RECV, &check->wait_list);
}
ret = TCPCHK_EVAL_WAIT;
TRACE_DEVEL("not connected yet", CHK_EV_TCPCHK_CONN, check);
@ -2157,7 +2139,7 @@ int tcpcheck_main(struct check *check)
*/
/* 1- check for connection error, if any */
if ((conn && conn->flags & CO_FL_ERROR) || (cs && cs->flags & CS_FL_ERROR))
if ((conn && conn->flags & CO_FL_ERROR) || (cs->flags & CS_FL_ERROR))
goto out_end_tcpcheck;
/* 2- check if a rule must be resume. It happens if check->current_step
@ -2202,7 +2184,7 @@ int tcpcheck_main(struct check *check)
switch (rule->action) {
case TCPCHK_ACT_CONNECT:
/* Not the first connection, release it first */
if (cs && check->current_step != rule) {
if (cs_conn(cs) && check->current_step != rule) {
check->state |= CHK_ST_CLOSE_CONN;
retcode = -1;
}
@ -2219,11 +2201,10 @@ int tcpcheck_main(struct check *check)
TRACE_PROTO("eval connect rule", CHK_EV_TCPCHK_EVAL|CHK_EV_TCPCHK_CONN, check);
eval_ret = tcpcheck_eval_connect(check, rule);
/* Refresh conn-stream and connection */
cs = check->cs;
/* Refresh connection */
conn = cs_conn(cs);
last_read = 0;
must_read = ((cs && IS_HTX_CS(cs)) ? htx_is_empty(htxbuf(&check->bi)) : !b_data(&check->bi));
must_read = (IS_HTX_CS(cs) ? htx_is_empty(htxbuf(&check->bi)) : !b_data(&check->bi));
break;
case TCPCHK_ACT_SEND:
check->current_step = rule;
@ -2321,7 +2302,7 @@ int tcpcheck_main(struct check *check)
TRACE_PROTO("tcp-check passed", CHK_EV_TCPCHK_EVAL, check);
out_end_tcpcheck:
if ((conn && conn->flags & CO_FL_ERROR) || (cs && cs->flags & CS_FL_ERROR)) {
if ((conn && conn->flags & CO_FL_ERROR) || (cs->flags & CS_FL_ERROR)) {
TRACE_ERROR("report connection error", CHK_EV_TCPCHK_EVAL|CHK_EV_TCPCHK_ERR, check);
chk_report_conn_err(check, errno, 0);
}