MINOR: stream: Handle stream HTTP upgrade in a dedicated function

The code responsible to perform an HTTP upgrade from a TCP stream is moved
in a dedicated function, stream_set_http_mode().

The stream_set_backend() function is slightly updated, especially to
correctly set the request analysers.
This commit is contained in:
Christopher Faulet 2021-03-15 10:42:02 +01:00
parent 75f619ad92
commit 6c1fd987f6
3 changed files with 94 additions and 59 deletions

View File

@ -60,6 +60,7 @@ extern struct data_cb sess_conn_cb;
struct stream *stream_new(struct session *sess, enum obj_type *origin, struct buffer *input);
int stream_create_from_cs(struct conn_stream *cs, struct buffer *input);
int stream_upgrade_from_cs(struct conn_stream *cs, struct buffer *input);
int stream_set_http_mode(struct stream *s);
/* kill a stream and set the termination flags to <why> (one of SF_ERR_*) */
void stream_shutdown(struct stream *stream, int why);

View File

@ -2116,6 +2116,8 @@ int resume_proxy(struct proxy *p)
*/
int stream_set_backend(struct stream *s, struct proxy *be)
{
unsigned int req_ana;
if (s->flags & SF_BE_ASSIGNED)
return 1;
@ -2140,69 +2142,41 @@ int stream_set_backend(struct stream *s, struct proxy *be)
* be more reliable to store the list of analysers that have been run,
* but what we do here is OK for now.
*/
s->req.analysers |= be->be_req_ana & ~(strm_li(s) ? strm_li(s)->analysers : 0);
req_ana = be->be_req_ana;
if (!(strm_fe(s)->options & PR_O_WREQ_BODY) && be->options & PR_O_WREQ_BODY) {
/* The backend request to parse a request body while it was not
* performed on the frontend, so add the corresponding analyser
*/
req_ana |= AN_REQ_HTTP_BODY;
}
if (IS_HTX_STRM(s) && strm_fe(s)->mode != PR_MODE_HTTP) {
/* The stream was already upgraded to HTTP, so remove analysers
* set during the upgrade
*/
req_ana &= ~(AN_REQ_WAIT_HTTP|AN_REQ_HTTP_PROCESS_FE);
}
s->req.analysers |= req_ana & ~(strm_li(s) ? strm_li(s)->analysers : 0);
/* If the target backend requires HTTP processing, we have to allocate
* the HTTP transaction if we did not have one.
*/
if (unlikely(!s->txn && be->http_needed && !http_create_txn(s)))
return 0;
if (s->txn) {
if (!IS_HTX_STRM(s) && be->mode == PR_MODE_HTTP) {
/* If we chain a TCP frontend to an HTX backend, we must upgrade
* the client mux */
if (!IS_HTX_STRM(s) && be->mode == PR_MODE_HTTP) {
struct connection *conn = objt_conn(strm_sess(s)->origin);
struct conn_stream *cs = objt_cs(s->si[0].end);
if (conn && cs) {
si_rx_endp_more(&s->si[0]);
/* Make sure we're unsubscribed, the the new
* mux will probably want to subscribe to
* the underlying XPRT
*/
if (s->si[0].wait_event.events)
conn->mux->unsubscribe(cs, s->si[0].wait_event.events,
&s->si[0].wait_event);
if (conn->mux->flags & MX_FL_NO_UPG)
return 0;
if (conn_upgrade_mux_fe(conn, cs, &s->req.buf, ist(""), PROTO_MODE_HTTP) == -1)
return 0;
s->req.flags &= ~(CF_READ_PARTIAL|CF_AUTO_CONNECT);
s->req.total = 0;
s->flags |= SF_IGNORE;
if (strcmp(conn->mux->name, "H2") == 0) {
/* For HTTP/2, destroy the conn_stream,
* disable logging, and abort the stream
* process. Thus it will be silently
* destroyed. The new mux will create
* new streams.
*/
cs_free(cs);
si_detach_endpoint(&s->si[0]);
s->logs.logwait = 0;
s->logs.level = 0;
channel_abort(&s->req);
channel_abort(&s->res);
s->req.analysers &= AN_REQ_FLT_END;
s->req.analyse_exp = TICK_ETERNITY;
return 1;
}
}
}
else if (IS_HTX_STRM(s) && be->mode != PR_MODE_HTTP) {
/* If a TCP backend is assgiend to an HTX stream, return
* an error. It may happens for a new stream on a
* previously upgraded connections. */
if (!(s->flags & SF_ERR_MASK))
s->flags |= SF_ERR_INTERNAL;
if (!stream_set_http_mode(s))
return 0;
}
else if (IS_HTX_STRM(s) && be->mode != PR_MODE_HTTP) {
/* If a TCP backend is assgiend to an HTX stream, return an
* error. It may happens for a new stream on a previously
* upgraded connections. */
if (!(s->flags & SF_ERR_MASK))
s->flags |= SF_ERR_INTERNAL;
return 0;
}
else {
/* If the target backend requires HTTP processing, we have to allocate
* the HTTP transaction if we did not have one.
*/
if (unlikely(!s->txn && be->http_needed && !http_create_txn(s)))
return 0;
}
/* we may request to parse a request body */
if (be->options & PR_O_WREQ_BODY)
s->req.analysers |= AN_REQ_HTTP_BODY;
}
s->flags |= SF_BE_ASSIGNED;

View File

@ -1478,6 +1478,66 @@ static int process_store_rules(struct stream *s, struct channel *rep, int an_bit
return 1;
}
/* Set the stream to HTTP mode, if necessary. The minimal request HTTP analysers
* are set and the client mux is upgraded. It returns 1 if the stream processing
* may continue or 0 if it should be stopped. It happens on error or if the
* upgrade required a new stream.
*/
int stream_set_http_mode(struct stream *s)
{
struct connection *conn;
struct conn_stream *cs;
/* Already an HTTP stream */
if (IS_HTX_STRM(s))
return 1;
s->req.analysers |= AN_REQ_WAIT_HTTP|AN_REQ_HTTP_PROCESS_FE;
if (unlikely(!s->txn && !http_create_txn(s)))
return 0;
conn = objt_conn(strm_sess(s)->origin);
cs = objt_cs(s->si[0].end);
if (conn && cs) {
si_rx_endp_more(&s->si[0]);
/* Make sure we're unsubscribed, the the new
* mux will probably want to subscribe to
* the underlying XPRT
*/
if (s->si[0].wait_event.events)
conn->mux->unsubscribe(cs, s->si[0].wait_event.events,
&s->si[0].wait_event);
if (conn->mux->flags & MX_FL_NO_UPG)
return 0;
if (conn_upgrade_mux_fe(conn, cs, &s->req.buf, ist(""), PROTO_MODE_HTTP) == -1)
return 0;
s->req.flags &= ~(CF_READ_PARTIAL|CF_AUTO_CONNECT);
s->req.total = 0;
s->flags |= SF_IGNORE;
if (strcmp(conn->mux->name, "H2") == 0) {
/* For HTTP/2, destroy the conn_stream, disable logging,
* and abort the stream process. Thus it will be
* silently destroyed. The new mux will create new
* streams.
*/
cs_free(cs);
si_detach_endpoint(&s->si[0]);
s->logs.logwait = 0;
s->logs.level = 0;
channel_abort(&s->req);
channel_abort(&s->res);
s->req.analysers &= AN_REQ_FLT_END;
s->req.analyse_exp = TICK_ETERNITY;
}
}
return 1;
}
/* This macro is very specific to the function below. See the comments in
* process_stream() below to understand the logic and the tests.
*/