MAJOR: mux-h1: Drain requests on client side before shutting a stream down

Unlike H2 and H3, H1 has no mechanism to tell the client to stop uploading
data when a response is sent before the end of the request, short of closing
the connection. There is no equivalent to the RST_STREAM frame.

Thus there are only two ways to deal with this situation: closing the
connection or draining the request. Until now, HAProxy did not support
draining H1 messages. But closing the connection has a major drawback: it
sends a TCP reset, dropping all in-flight data, with no guarantee that the
client has fully received the response.

Draining H1 messages was never implemented because it was tricky to do in
older versions. It is now far simpler to support, because an H1 stream can
exist without any application stream. That is the purpose of this patch:
when a shutdown is requested and the stream is detached from the connection,
if the request is unfinished while the response has been fully sent, the
request is drained.

To do so, the shutdown and the detach are delayed in this case. From the
upper layer's point of view, nothing changes: the endpoint is shut down and
detached as usual. But from the H1 mux's point of view, the H1 stream stays
alive and is able to drain data, while the stream-endpoint descriptor becomes
orphan. Once the request is fully received (and drained), the H1 stream is
destroyed and the connection is shut down if it cannot be reused for a new
transaction.
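
To make the decision concrete, here is a minimal, self-contained sketch of
the drain-or-close choice taken at detach time. The types and helpers below
(h1_conn, detach_stream, the MSG_*/CS_* enums) are simplified stand-ins for
illustration, not HAProxy's real structures; the actual logic lives in
h1_detach() in the diff below.

    /* Sketch: decide between draining and closing when the application
     * stream detaches. Simplified stand-ins, not HAProxy's real types. */
    #include <stdio.h>

    enum msg_state  { MSG_BODY, MSG_DONE };                  /* like H1_MSG_* */
    enum conn_state { CS_RUNNING, CS_DRAINING, CS_CLOSING }; /* like H1_CS_* */

    struct h1_conn {
            enum conn_state state;
            int is_back;            /* like H1C_F_IS_BACK (backend side) */
            enum msg_state req, res;
    };

    static void detach_stream(struct h1_conn *c)
    {
            if (c->state == CS_RUNNING && !c->is_back && c->req != MSG_DONE) {
                    /* response fully sent, request unfinished: drain it */
                    c->state = CS_DRAINING;
                    printf("defer close, drain the request\n");
            }
            else {
                    c->state = CS_CLOSING;
                    printf("close the connection\n");
            }
    }

    int main(void)
    {
            struct h1_conn c = { CS_RUNNING, 0, MSG_BODY, MSG_DONE };
            detach_stream(&c); /* prints: defer close, drain the request */
            return 0;
    }
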
Author: Christopher Faulet
Date:   2024-02-26 07:50:23 +01:00
parent 14db433db9
commit 077906da14
2 changed files with 69 additions and 6 deletions


@@ -134,6 +134,7 @@ enum h1_cs {
 	H1_CS_EMBRYONIC, /* Connection is waiting for the message headers (H1S is not NULL, not attached to a SC - Frontend connection only) */
 	H1_CS_UPGRADING, /* TCP>H1 upgrade in-progress (H1S is not NULL and attached to a SC - Frontend connection only) */
 	H1_CS_RUNNING, /* Connection fully established and the H1S is processing data (H1S is not NULL and attached to a SC) */
+	H1_CS_DRAINING, /* H1C is draining the message before destroying the H1S (H1S is not NULL but no SC attached) */
 	H1_CS_CLOSING, /* Send pending outgoing data and close the connection ASAP (H1S may be NULL) */
 	H1_CS_CLOSED, /* Connection must be closed now and H1C must be released (H1S is NULL) */
 	H1_CS_ENTRIES,
@@ -150,6 +151,7 @@ static inline const char *h1c_st_to_str(enum h1_cs st)
 	case H1_CS_EMBRYONIC: return "EMB";
 	case H1_CS_UPGRADING: return "UPG";
 	case H1_CS_RUNNING: return "RUN";
+	case H1_CS_DRAINING: return "DRN";
 	case H1_CS_CLOSING: return "CLI";
 	case H1_CS_CLOSED: return "CLD";
 	default: return "???";


@@ -556,11 +556,11 @@ static inline void h1_release_buf(struct h1c *h1c, struct buffer *bptr)
 }
 
 /* Returns 1 if the H1 connection is alive (IDLE, EMBRYONIC, RUNNING or
- * RUNNING). Ortherwise 0 is returned.
+ * DRAINING). Ortherwise 0 is returned.
  */
 static inline int h1_is_alive(const struct h1c *h1c)
 {
-	return (h1c->state <= H1_CS_RUNNING);
+	return (h1c->state <= H1_CS_DRAINING);
 }
 
 /* Switch the H1 connection to CLOSING or CLOSED mode, depending on the output
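
The one-line change to h1_is_alive() works because it relies on the
declaration order of enum h1_cs: the new DRAINING state is inserted between
RUNNING and CLOSING, so every state up to and including DRAINING is an
"alive" state and a single comparison covers them all. A compilable
illustration (the enum mirrors the first hunk; H1_CS_IDLE is assumed to be
its first entry, as the comment above suggests):

    #include <assert.h>

    enum h1_cs {
            H1_CS_IDLE,
            H1_CS_EMBRYONIC,
            H1_CS_UPGRADING,
            H1_CS_RUNNING,
            H1_CS_DRAINING, /* must stay declared right before H1_CS_CLOSING */
            H1_CS_CLOSING,
            H1_CS_CLOSED,
            H1_CS_ENTRIES,
    };

    static int is_alive(enum h1_cs state)
    {
            return (state <= H1_CS_DRAINING);
    }

    int main(void)
    {
            assert(is_alive(H1_CS_RUNNING));
            assert(is_alive(H1_CS_DRAINING)); /* draining connections stay alive */
            assert(!is_alive(H1_CS_CLOSING));
            assert(!is_alive(H1_CS_CLOSED));
            return 0;
    }
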
@@ -952,6 +952,10 @@ static int h1s_must_shut_conn(struct h1s *h1s)
 		TRACE_STATE("keep connection alive (UPGRADING)", H1_EV_STRM_SHUT, h1c->conn, h1s);
 		ret = 0;
 	}
+	else if (!(h1c->flags & H1C_F_IS_BACK) && h1s->req.state != H1_MSG_DONE && h1s->res.state == H1_MSG_DONE) {
+		TRACE_STATE("defer shutdown to drain request first", H1_EV_STRM_SHUT, h1c->conn, h1s);
+		ret = 0;
+	}
 	else if (((h1s->flags & H1S_F_WANT_KAL) && h1s->req.state == H1_MSG_DONE && h1s->res.state == H1_MSG_DONE)) {
 		TRACE_STATE("keep connection alive (want_kal)", H1_EV_STRM_SHUT, h1c->conn, h1s);
 		ret = 0;
@@ -3485,6 +3489,11 @@ static int h1_handle_internal_err(struct h1c *h1c)
 	struct session *sess = h1c->conn->owner;
 	int ret = 0;
 
+	if (h1c->state == H1_CS_DRAINING) {
+		h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ABRTED;
+		h1s_destroy(h1c->h1s);
+		goto end;
+	}
 	session_inc_http_req_ctr(sess);
 	proxy_inc_fe_req_ctr(sess->listener, sess->fe, 1);
 	_HA_ATOMIC_INC(&sess->fe->fe_counters.p.http.rsp[5]);
@@ -3495,6 +3504,7 @@ static int h1_handle_internal_err(struct h1c *h1c)
 	h1c->errcode = 500;
 	ret = h1_send_error(h1c);
 	sess_log(sess);
+  end:
 	return ret;
 }
 
@@ -3508,6 +3518,11 @@ static int h1_handle_parsing_error(struct h1c *h1c)
 	struct session *sess = h1c->conn->owner;
 	int ret = 0;
 
+	if (h1c->state == H1_CS_DRAINING) {
+		h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ABRTED;
+		h1s_destroy(h1c->h1s);
+		goto end;
+	}
 	if (!b_data(&h1c->ibuf) && ((h1c->flags & H1C_F_WAIT_NEXT_REQ) || (sess->fe->options & PR_O_IGNORE_PRB))) {
 		h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ABRTED;
 		h1_close(h1c);
@@ -3541,6 +3556,11 @@ static int h1_handle_not_impl_err(struct h1c *h1c)
 	struct session *sess = h1c->conn->owner;
 	int ret = 0;
 
+	if (h1c->state == H1_CS_DRAINING) {
+		h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ABRTED;
+		h1s_destroy(h1c->h1s);
+		goto end;
+	}
 	if (!b_data(&h1c->ibuf) && ((h1c->flags & H1C_F_WAIT_NEXT_REQ) || (sess->fe->options & PR_O_IGNORE_PRB))) {
 		h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ABRTED;
 		h1_close(h1c);
@@ -3571,6 +3591,11 @@ static int h1_handle_req_tout(struct h1c *h1c)
 	struct session *sess = h1c->conn->owner;
 	int ret = 0;
 
+	if (h1c->state == H1_CS_DRAINING) {
+		h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ABRTED;
+		h1s_destroy(h1c->h1s);
+		goto end;
+	}
 	if (!b_data(&h1c->ibuf) && ((h1c->flags & H1C_F_WAIT_NEXT_REQ) || (sess->fe->options & PR_O_IGNORE_PRB))) {
 		h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ABRTED;
 		h1_close(h1c);
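
All four error handlers above (internal error, parsing error, not
implemented, request timeout) gain the same prologue: while draining, the
response has already been fully sent, so there is no one to send an error
reply to and no request to log; the handler only marks the connection
aborted and destroys the H1 stream. A condensed sketch of that shared
pattern, with simplified stand-ins for HAProxy's types and helpers:

    #include <stdio.h>

    #define F_WAIT_NEXT_REQ 0x01 /* like H1C_F_WAIT_NEXT_REQ */
    #define F_ABRTED        0x02 /* like H1C_F_ABRTED */

    enum conn_state { CS_RUNNING, CS_DRAINING };

    struct h1_conn { enum conn_state state; unsigned int flags; };

    static void destroy_stream(struct h1_conn *c) /* h1s_destroy() stand-in */
    {
            (void)c;
            printf("h1 stream destroyed\n");
    }

    static int handle_error(struct h1_conn *c)
    {
            int ret = 0;

            if (c->state == CS_DRAINING) {
                    /* response already sent: no error reply, no log */
                    c->flags = (c->flags & ~F_WAIT_NEXT_REQ) | F_ABRTED;
                    destroy_stream(c);
                    goto end;
            }
            /* normal path: emit 500/400/501/408, log, bump counters... */
            ret = 1;
      end:
            return ret;
    }

    int main(void)
    {
            struct h1_conn c = { CS_DRAINING, F_WAIT_NEXT_REQ };
            handle_error(&c);
            return 0;
    }
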
@@ -3788,7 +3813,7 @@ static int h1_process(struct h1c * h1c)
 
 	/* Try to parse now the first block of a request, creating the H1 stream if necessary */
 	if (b_data(&h1c->ibuf) && /* Input data to be processed */
-	    (h1c->state < H1_CS_RUNNING) && /* IDLE, EMBRYONIC or UPGRADING */
+	    ((h1c->state < H1_CS_RUNNING) || (h1c->state == H1_CS_DRAINING)) && /* IDLE, EMBRYONIC, UPGRADING or DRAINING */
 	    !(h1c->flags & (H1C_F_IN_SALLOC|H1C_F_ABRT_PENDING))) { /* No allocation failure on the stream rxbuf and no ERROR on the H1C */
 		struct h1s *h1s = h1c->h1s;
 		struct buffer *buf;
@@ -3799,7 +3824,8 @@ static int h1_process(struct h1c * h1c)
 			goto release;
 
 		/* First of all handle H1 to H2 upgrade (no need to create the H1 stream) */
-		if (!(h1c->flags & H1C_F_WAIT_NEXT_REQ) && /* First request */
+		if (h1c->state != H1_CS_DRAINING && /* Not draining message */
+		    !(h1c->flags & H1C_F_WAIT_NEXT_REQ) && /* First request */
 		    !(h1c->px->options2 & PR_O2_NO_H2_UPGRADE) && /* H2 upgrade supported by the proxy */
 		    !(conn->mux->flags & MX_FL_NO_UPG)) { /* the current mux supports upgrades */
 			/* Try to match H2 preface before parsing the request headers. */
@@ -3840,7 +3866,7 @@ static int h1_process(struct h1c * h1c)
 		h1_process_demux(h1c, buf, count);
 		h1_release_buf(h1c, &h1s->rxbuf);
 		h1_set_idle_expiration(h1c);
-		if (h1c->state < H1_CS_RUNNING) {
+		if (h1c->state != H1_CS_RUNNING) { // TODO: be sure state cannot change in h1_process_demux
 			if (h1s->flags & H1S_F_INTERNAL_ERROR) {
 				h1_handle_internal_err(h1c);
 				TRACE_ERROR("internal error detected", H1_EV_H1C_WAKE|H1_EV_H1C_ERR);
@@ -3883,6 +3909,11 @@ static int h1_process(struct h1c * h1c)
 			if (h1_send_error(h1c))
 				h1_send(h1c);
 		}
+		else if (h1c->state == H1_CS_DRAINING) {
+			BUG_ON(h1c->h1s->sd && !se_fl_test(h1c->h1s->sd, SE_FL_ORPHAN));
+			h1s_destroy(h1c->h1s);
+			TRACE_STATE("abort/error when draining message. destroy h1s and close h1c", H1_EV_H1S_END, h1c->conn);
+		}
 		else {
 			h1_close(h1c);
 			TRACE_STATE("close h1c", H1_EV_H1S_END, h1c->conn);
@@ -3911,6 +3942,17 @@ static int h1_process(struct h1c * h1c)
 			h1_alert(h1s);
 		}
 	}
+	else if (h1c->state == H1_CS_DRAINING) {
+		BUG_ON(!h1c->h1s);
+		if (se_fl_test(h1c->h1s->sd, SE_FL_EOI)) {
+			if (h1s_must_shut_conn(h1c->h1s)) {
+				h1_shutw_conn(conn);
+				goto release;
+			}
+			h1s_finish_detach(h1c->h1s);
+			goto end;
+		}
+	}
 
 	if (!b_data(&h1c->ibuf))
 		h1_release_buf(h1c, &h1c->ibuf);
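
Once h1_process() sees the connection in DRAINING state and the orphan
descriptor reports end-of-input (SE_FL_EOI), the deferred work finally
happens: the connection is shut and released if it must not be reused,
otherwise the detach is completed so the connection can serve the next
transaction. A simplified sketch of that decision, with illustrative
stand-ins for the real helpers:

    #include <stdbool.h>
    #include <stdio.h>

    /* stand-ins for SE_FL_EOI and h1s_must_shut_conn() */
    struct drain_state {
            bool eoi;       /* request fully received and drained */
            bool must_shut; /* connection cannot be kept alive */
    };

    /* returns true when the connection was shut and released */
    static bool drain_step(const struct drain_state *d)
    {
            if (!d->eoi) {
                    printf("keep draining, wait for more request data\n");
                    return false;
            }
            if (d->must_shut) {
                    printf("shut and release the connection\n");
                    return true;
            }
            printf("finish the deferred detach; connection is reusable\n");
            return false;
    }

    int main(void)
    {
            struct drain_state d = { .eoi = true, .must_shut = false };
            drain_step(&d);
            return 0;
    }
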
@@ -4218,6 +4260,7 @@ static void h1_destroy(void *ctx)
 static void h1_detach(struct sedesc *sd)
 {
 	struct h1s *h1s = sd->se;
+	struct h1c *h1c;
 
 	TRACE_ENTER(H1_EV_STRM_END, h1s ? h1s->h1c->conn : NULL, h1s);
@@ -4225,7 +4268,25 @@ static void h1_detach(struct sedesc *sd)
 		TRACE_LEAVE(H1_EV_STRM_END);
 		return;
 	}
-	h1s_finish_detach(h1s);
+	h1c = h1s->h1c;
+	if (h1c->state == H1_CS_RUNNING && !(h1c->flags & H1C_F_IS_BACK) && h1s->req.state != H1_MSG_DONE) {
+		h1c->state = H1_CS_DRAINING;
+		TRACE_DEVEL("Deferring H1S destroy to drain message", H1_EV_STRM_END, h1s->h1c->conn, h1s);
+		/* If we have a pending data, process it immediately or
+		 * subscribe for reads waiting for new data
+		 */
+		if (unlikely(b_data(&h1c->ibuf))) {
+			if (h1_process(h1c) == -1)
+				goto end;
+		}
+		else
+			h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
+		h1_set_idle_expiration(h1c);
+		h1_refresh_timeout(h1c);
+	}
+	else
+		h1s_finish_detach(h1s);
+  end:
 	TRACE_LEAVE(H1_EV_STRM_END);
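
Finally, note the two ways draining makes progress in the new h1_detach()
path above: if request data is already buffered, it is processed
immediately; otherwise the mux subscribes for receive events, and the idle
expiration and timeouts are refreshed so a draining connection cannot hang
around forever. A minimal sketch of that choice (process_now,
subscribe_recv and refresh_timeouts are illustrative stand-ins for
h1_process(), the transport-layer subscribe call and the timeout helpers):

    #include <stddef.h>
    #include <stdio.h>

    static void process_now(void)      { printf("drain buffered request data now\n"); }
    static void subscribe_recv(void)   { printf("subscribe for read events\n"); }
    static void refresh_timeouts(void) { printf("re-arm idle expiration and timeouts\n"); }

    static void start_draining(size_t buffered_bytes)
    {
            if (buffered_bytes)
                    process_now();    /* pending input: drain immediately */
            else
                    subscribe_recv(); /* wait for the client to send more */
            refresh_timeouts();       /* a draining connection still times out */
    }

    int main(void)
    {
            start_draining(0); /* no pending data: subscribe and wait */
            return 0;
    }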