MINOR: connection: Be prepared to handle conn-stream with no connection

The conn-stream will progressively replace the stream-interface. Thus, a
stream will have to allocate the backend conn-stream during its
creation. This means it will be possible to have a conn-stream with no
connection. To prepare this change, we test the conn-stream's connection
when we retrieve it.
This commit is contained in:
Christopher Faulet 2021-12-15 09:50:17 +01:00
parent 719ceef79c
commit 0256da14a5
7 changed files with 75 additions and 69 deletions

View File

@ -246,10 +246,16 @@ static inline void conn_xprt_shutw_hard(struct connection *c)
c->xprt->shutw(c, c->xprt_ctx, 0);
}
/* Returns the conn from a cs. If cs is NULL, returns NULL */
static inline struct connection *cs_conn(const struct conn_stream *cs)
{
return cs ? cs->conn : NULL;
}
/* shut read */
static inline void cs_shutr(struct conn_stream *cs, enum cs_shr_mode mode)
{
if (cs->flags & CS_FL_SHR)
if (!cs_conn(cs) || cs->flags & CS_FL_SHR)
return;
/* clean data-layer shutdown */
@ -261,7 +267,7 @@ static inline void cs_shutr(struct conn_stream *cs, enum cs_shr_mode mode)
/* shut write */
static inline void cs_shutw(struct conn_stream *cs, enum cs_shw_mode mode)
{
if (cs->flags & CS_FL_SHW)
if (!cs_conn(cs) || cs->flags & CS_FL_SHW)
return;
/* clean data-layer shutdown */
@ -387,29 +393,25 @@ static inline void conn_force_unsubscribe(struct connection *conn)
/* Release a conn_stream */
static inline void cs_destroy(struct conn_stream *cs)
{
if (cs->conn->mux)
cs->conn->mux->detach(cs);
else {
/* It's too early to have a mux, let's just destroy
* the connection
*/
struct connection *conn = cs->conn;
if (cs_conn(cs)) {
if (cs->conn->mux)
cs->conn->mux->detach(cs);
else {
/* It's too early to have a mux, let's just destroy
* the connection
*/
struct connection *conn = cs->conn;
conn_stop_tracking(conn);
conn_full_close(conn);
if (conn->destroy_cb)
conn->destroy_cb(conn);
conn_free(conn);
conn_stop_tracking(conn);
conn_full_close(conn);
if (conn->destroy_cb)
conn->destroy_cb(conn);
conn_free(conn);
}
}
cs_free(cs);
}
/* Returns the conn from a cs. If cs is NULL, returns NULL */
static inline struct connection *cs_conn(const struct conn_stream *cs)
{
return cs ? cs->conn : NULL;
}
/* Returns the source address of the connection or NULL if not set */
static inline const struct sockaddr_storage *conn_src(struct connection *conn)
{

View File

@ -183,9 +183,9 @@ static inline void si_release_endpoint(struct stream_interface *si)
appctx_free(appctx);
}
else if ((cs = objt_cs(si->end))) {
if (si->wait_event.events != 0)
if (cs_conn(cs) && si->wait_event.events != 0)
cs->conn->mux->unsubscribe(cs, si->wait_event.events,
&si->wait_event);
&si->wait_event);
cs_destroy(cs);
}
si_detach_endpoint(si);
@ -481,7 +481,7 @@ static inline int si_sync_recv(struct stream_interface *si)
return 0;
cs = objt_cs(si->end);
if (!cs || !cs->conn->mux)
if (!cs_conn(cs) || !cs->conn->mux)
return 0; // only conn_streams are supported
if (si->wait_event.events & SUB_RETRY_RECV)
@ -578,7 +578,7 @@ static inline const struct sockaddr_storage *si_src(struct stream_interface *si)
else {
struct conn_stream *cs = objt_cs(si->end);
if (cs && cs->conn)
if (cs_conn(cs))
return conn_src(cs->conn);
}
return NULL;
@ -598,7 +598,7 @@ static inline const struct sockaddr_storage *si_dst(struct stream_interface *si)
else {
struct conn_stream *cs = objt_cs(si->end);
if (cs && cs->conn)
if (cs_conn(cs))
return conn_dst(cs->conn);
}
return NULL;
@ -622,7 +622,7 @@ static inline int si_get_src(struct stream_interface *si)
else {
struct conn_stream *cs = objt_cs(si->end);
if (cs && cs->conn)
if (cs_conn(cs))
src = conn_src(cs->conn);
}
if (!src)
@ -653,7 +653,7 @@ static inline int si_get_dst(struct stream_interface *si)
else {
struct conn_stream *cs = objt_cs(si->end);
if (cs && cs->conn)
if (cs_conn(cs))
dst = conn_dst(cs->conn);
}
if (!dst)

View File

@ -2220,8 +2220,6 @@ void back_handle_st_con(struct stream *s)
void back_handle_st_cer(struct stream *s)
{
struct stream_interface *si = &s->si[1];
struct conn_stream *cs = objt_cs(si->end);
struct connection *conn = cs_conn(cs);
DBG_TRACE_ENTER(STRM_EV_STRM_PROC|STRM_EV_SI_ST, s);
@ -2230,6 +2228,8 @@ void back_handle_st_cer(struct stream *s)
/* we probably have to release last stream from the server */
if (objt_server(s->target)) {
struct connection *conn = cs_conn(objt_cs(si->end));
health_adjust(__objt_server(s->target), HANA_STATUS_L4_ERR);
if (s->flags & SF_CURR_SESS) {

View File

@ -233,7 +233,9 @@ static void check_trace(enum trace_level level, uint64_t mask,
if (check->cs) {
chunk_appendf(&trace_buf, " - conn=%p(0x%08x)", check->cs->conn, check->cs->conn->flags);
struct connection *conn = cs_conn(check->cs);
chunk_appendf(&trace_buf, " - conn=%p(0x%08x)", conn, conn ? conn->flags : 0);
chunk_appendf(&trace_buf, " cs=%p(0x%08x)", check->cs, check->cs->flags);
}
@ -791,7 +793,7 @@ void chk_report_conn_err(struct check *check, int errno_bck, int expired)
retrieve_errno_from_socket(conn);
if (conn && !(conn->flags & CO_FL_ERROR) &&
!(cs->flags & CS_FL_ERROR) && !expired)
cs && !(cs->flags & CS_FL_ERROR) && !expired)
return;
TRACE_ENTER(CHK_EV_HCHK_END|CHK_EV_HCHK_ERR, check, 0, 0, (size_t[]){expired});
@ -904,7 +906,7 @@ void chk_report_conn_err(struct check *check, int errno_bck, int expired)
set_server_check_status(check, HCHK_STATUS_SOCKERR, err_msg);
}
if (!conn || !conn->ctrl) {
if (!cs || !conn || !conn->ctrl) {
/* error before any connection attempt (connection allocation error or no control layer) */
set_server_check_status(check, HCHK_STATUS_SOCKERR, err_msg);
}
@ -1016,7 +1018,7 @@ int httpchk_build_status_header(struct server *s, struct buffer *buf)
*/
static int wake_srv_chk(struct conn_stream *cs)
{
struct connection *conn = cs->conn;
struct connection *conn;
struct check *check = cs->data;
struct email_alertq *q = container_of(check, typeof(*q), check);
int ret = 0;
@ -1031,9 +1033,9 @@ static int wake_srv_chk(struct conn_stream *cs)
ret = tcpcheck_main(check);
cs = check->cs;
conn = cs->conn;
conn = cs_conn(cs);
if (unlikely(conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)) {
if (unlikely(!conn || !cs || conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)) {
/* We may get error reports bypassing the I/O handlers, typically
* the case when sending a pure TCP check which fails, then the I/O
* handlers above are not called. This is completely handled by the
@ -1053,7 +1055,7 @@ static int wake_srv_chk(struct conn_stream *cs)
ret = -1;
if (check->wait_list.events)
cs->conn->mux->unsubscribe(cs, check->wait_list.events, &check->wait_list);
conn->mux->unsubscribe(cs, check->wait_list.events, &check->wait_list);
/* We may have been scheduled to run, and the
* I/O handler expects to have a cs, so remove
@ -1171,6 +1173,8 @@ struct task *process_chk_conn(struct task *t, void *context, unsigned int state)
TRACE_STATE("health-check complete or aborted", CHK_EV_TASK_WAKE|CHK_EV_HCHK_END, check);
check->current_step = NULL;
cs = check->cs;
conn = cs_conn(cs);
if (conn && conn->xprt) {
/* The check was aborted and the connection was not yet closed.
@ -1182,8 +1186,8 @@ struct task *process_chk_conn(struct task *t, void *context, unsigned int state)
}
if (cs) {
if (check->wait_list.events)
cs->conn->mux->unsubscribe(cs, check->wait_list.events, &check->wait_list);
if (conn && check->wait_list.events)
conn->mux->unsubscribe(cs, check->wait_list.events, &check->wait_list);
/* We may have been scheduled to run, and the
* I/O handler expects to have a cs, so remove
* the tasklet
@ -1352,7 +1356,10 @@ void free_check(struct check *check)
check_release_buf(check, &check->bi);
check_release_buf(check, &check->bo);
if (check->cs) {
ha_free(&check->cs->conn);
struct connection *conn = cs_conn(check->cs);
if (conn)
conn_free(conn);
cs_free(check->cs);
check->cs = NULL;
}

View File

@ -1325,10 +1325,7 @@ int http_wait_for_response(struct stream *s, struct channel *rep, int an_bit)
if (unlikely(htx_is_empty(htx) || htx->first == -1)) {
/* 1: have we encountered a read error ? */
if (rep->flags & CF_READ_ERROR) {
struct connection *conn = NULL;
if (objt_cs(s->si[1].end))
conn = __objt_cs(s->si[1].end)->conn;
struct connection *conn = cs_conn(objt_cs(s->si[1].end));
/* Perform a L7 retry because server refuses the early data. */
if ((si_b->flags & SI_FL_L7_RETRY) &&
@ -5007,7 +5004,7 @@ static void http_debug_stline(const char *dir, struct stream *s, const struct ht
chunk_printf(&trash, "%08x:%s.%s[%04x:%04x]: ", s->uniq_id, s->be->id,
dir,
objt_conn(sess->origin) ? (unsigned short)__objt_conn(sess->origin)->handle.fd : -1,
objt_cs(s->si[1].end) ? (unsigned short)__objt_cs(s->si[1].end)->conn->handle.fd : -1);
cs_conn(objt_cs(s->si[1].end)) ? (unsigned short)(cs_conn(__objt_cs(s->si[1].end)))->handle.fd : -1);
max = HTX_SL_P1_LEN(sl);
UBOUND(max, trash.size - trash.data - 3);
@ -5038,7 +5035,7 @@ static void http_debug_hdr(const char *dir, struct stream *s, const struct ist n
chunk_printf(&trash, "%08x:%s.%s[%04x:%04x]: ", s->uniq_id, s->be->id,
dir,
objt_conn(sess->origin) ? (unsigned short)__objt_conn(sess->origin)->handle.fd : -1,
objt_cs(s->si[1].end) ? (unsigned short)__objt_cs(s->si[1].end)->conn->handle.fd : -1);
cs_conn(objt_cs(s->si[1].end)) ? (unsigned short)(cs_conn(__objt_cs(s->si[1].end)))->handle.fd : -1);
max = n.len;
UBOUND(max, trash.size - trash.data - 3);

View File

@ -460,7 +460,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin, struct bu
si_set_state(&s->si[0], SI_ST_EST);
s->si[0].hcto = sess->fe->timeout.clientfin;
if (cs && cs->conn->mux) {
if (cs_conn(cs) && cs->conn->mux) {
if (cs->conn->mux->flags & MX_FL_CLEAN_ABRT)
s->si[0].flags |= SI_FL_CLEAN_ABRT;
if (cs->conn->mux->flags & MX_FL_HTX)
@ -883,8 +883,7 @@ int stream_set_timeout(struct stream *s, enum act_timeout_name name, int timeout
static void back_establish(struct stream *s)
{
struct stream_interface *si = &s->si[1];
struct conn_stream *srv_cs = objt_cs(si->end);
struct connection *conn = srv_cs ? srv_cs->conn : objt_conn(si->end);
struct connection *conn = cs_conn(objt_cs(si->end));
struct channel *req = &s->req;
struct channel *rep = &s->res;
@ -930,7 +929,7 @@ static void back_establish(struct stream *s)
si_rx_endp_more(si);
rep->flags |= CF_READ_ATTACHED; /* producer is now attached */
if (objt_cs(si->end)) {
if (conn) {
/* real connections have timeouts
* if already defined, it means that a set-timeout rule has
* been executed so do not overwrite them
@ -2164,9 +2163,9 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
if (!(req->flags & (CF_KERN_SPLICING|CF_SHUTR)) &&
req->to_forward &&
(global.tune.options & GTUNE_USE_SPLICE) &&
(objt_cs(si_f->end) && __objt_cs(si_f->end)->conn->xprt && __objt_cs(si_f->end)->conn->xprt->rcv_pipe &&
(cs_conn(objt_cs(si_f->end)) && __objt_cs(si_f->end)->conn->xprt && __objt_cs(si_f->end)->conn->xprt->rcv_pipe &&
__objt_cs(si_f->end)->conn->mux && __objt_cs(si_f->end)->conn->mux->rcv_pipe) &&
(objt_cs(si_b->end) && __objt_cs(si_b->end)->conn->xprt && __objt_cs(si_b->end)->conn->xprt->snd_pipe &&
(cs_conn(objt_cs(si_b->end)) && __objt_cs(si_b->end)->conn->xprt && __objt_cs(si_b->end)->conn->xprt->snd_pipe &&
__objt_cs(si_b->end)->conn->mux && __objt_cs(si_b->end)->conn->mux->snd_pipe) &&
(pipes_used < global.maxpipes) &&
(((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_REQ) ||
@ -2357,9 +2356,9 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
if (!(res->flags & (CF_KERN_SPLICING|CF_SHUTR)) &&
res->to_forward &&
(global.tune.options & GTUNE_USE_SPLICE) &&
(objt_cs(si_f->end) && __objt_cs(si_f->end)->conn->xprt && __objt_cs(si_f->end)->conn->xprt->snd_pipe &&
(cs_conn(objt_cs(si_f->end)) && __objt_cs(si_f->end)->conn->xprt && __objt_cs(si_f->end)->conn->xprt->snd_pipe &&
__objt_cs(si_f->end)->conn->mux && __objt_cs(si_f->end)->conn->mux->snd_pipe) &&
(objt_cs(si_b->end) && __objt_cs(si_b->end)->conn->xprt && __objt_cs(si_b->end)->conn->xprt->rcv_pipe &&
(cs_conn(objt_cs(si_b->end)) && __objt_cs(si_b->end)->conn->xprt && __objt_cs(si_b->end)->conn->xprt->rcv_pipe &&
__objt_cs(si_b->end)->conn->mux && __objt_cs(si_b->end)->conn->mux->rcv_pipe) &&
(pipes_used < global.maxpipes) &&
(((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_RTR) ||
@ -2436,18 +2435,18 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
if (si_b->state == SI_ST_CLO &&
si_b->prev_state == SI_ST_EST) {
chunk_printf(&trash, "%08x:%s.srvcls[%04x:%04x]\n",
s->uniq_id, s->be->id,
objt_cs(si_f->end) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1,
objt_cs(si_b->end) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1);
s->uniq_id, s->be->id,
cs_conn(objt_cs(si_f->end)) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1,
cs_conn(objt_cs(si_b->end)) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1);
DISGUISE(write(1, trash.area, trash.data));
}
if (si_f->state == SI_ST_CLO &&
si_f->prev_state == SI_ST_EST) {
chunk_printf(&trash, "%08x:%s.clicls[%04x:%04x]\n",
s->uniq_id, s->be->id,
objt_cs(si_f->end) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1,
objt_cs(si_b->end) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1);
s->uniq_id, s->be->id,
cs_conn(objt_cs(si_f->end)) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1,
cs_conn(objt_cs(si_b->end)) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1);
DISGUISE(write(1, trash.area, trash.data));
}
}
@ -2513,9 +2512,9 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
if (unlikely((global.mode & MODE_DEBUG) &&
(!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)))) {
chunk_printf(&trash, "%08x:%s.closed[%04x:%04x]\n",
s->uniq_id, s->be->id,
objt_cs(si_f->end) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1,
objt_cs(si_b->end) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1);
s->uniq_id, s->be->id,
cs_conn(objt_cs(si_f->end)) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1,
cs_conn(objt_cs(si_b->end)) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1);
DISGUISE(write(1, trash.area, trash.data));
}
@ -3291,7 +3290,8 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st
TICKS_TO_MS(1000)) : "<NEVER>",
strm->si[1].err_type, strm->si[1].wait_event.events);
if ((cs = objt_cs(strm->si[0].end)) != NULL) {
if (cs_conn(objt_cs(strm->si[0].end)) != NULL) {
cs = __objt_cs(strm->si[0].end);
conn = cs->conn;
chunk_appendf(&trash,
@ -3327,7 +3327,8 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st
(unsigned long long)tmpctx->t->cpu_time, (unsigned long long)tmpctx->t->lat_time);
}
if ((cs = objt_cs(strm->si[1].end)) != NULL) {
if (cs_conn(objt_cs(strm->si[1].end)) != NULL) {
cs = __objt_cs(strm->si[1].end);
conn = cs->conn;
chunk_appendf(&trash,

View File

@ -354,12 +354,11 @@ int conn_si_send_proxy(struct connection *conn, unsigned int flag)
if (cs && cs->data_cb == &si_conn_cb) {
struct stream_interface *si = cs->data;
struct conn_stream *remote_cs = objt_cs(si_opposite(si)->end);
struct stream *strm = si_strm(si);
ret = make_proxy_line(trash.area, trash.size,
objt_server(conn->target),
remote_cs ? remote_cs->conn : NULL,
cs_conn(objt_cs(si_opposite(si)->end)),
strm);
}
else {
@ -434,7 +433,7 @@ static void stream_int_notify(struct stream_interface *si)
/* process consumer side */
if (channel_is_empty(oc)) {
struct connection *conn = objt_cs(si->end) ? __objt_cs(si->end)->conn : NULL;
struct connection *conn = cs_conn(objt_cs(si->end));
if (((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW) &&
(si->state == SI_ST_EST) && (!conn || !(conn->flags & (CO_FL_WAIT_XPRT | CO_FL_EARLY_SSL_HS))))
@ -800,7 +799,7 @@ struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned int state)
struct conn_stream *cs = objt_cs(si->end);
int ret = 0;
if (!cs)
if (!cs_conn(cs))
return t;
if (!(si->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(si_oc(si)))
@ -927,7 +926,7 @@ void si_sync_send(struct stream_interface *si)
return;
cs = objt_cs(si->end);
if (!cs || !cs->conn->mux)
if (!cs_conn(cs) || !cs->conn->mux)
return;
si_cs_send(cs);