MAJOR: mux-h1: Create the client stream as later as possible

This is the reason for all previous patches. The conn-stream and the
associated stream are created as later as possible. It only concerns the
frontend connections. But it means the request headers, and possibly the
first data block, are received and parsed before the conn-stream
creation. To do so, an embryonic H1 stream, with no conn-stream, is
created. The result of this "early parsing" is stored in its rx buffer, used
to fill the request channel when the stream is created. During this step,
some HTTP errors may be returned by the mux. It must also handle
http-request/keep-alive timeouts. A significative change is about H1 to H2
upgrade. It happens very early now, and no H1 stream are created (and thus
of course no conn-stream).

The most important part of this patch is located to the h1_process()
function. Because it must trigger the parsing when there is no H1
stream. h1_recv() function has also been simplified.
This commit is contained in:
Christopher Faulet 2020-10-06 17:45:34 +02:00
parent c18fc234d9
commit c4bfa59f1d

View File

@ -497,15 +497,8 @@ static void h1_refresh_timeout(struct h1c *h1c)
TRACE_DEVEL("refreshing connection's timeout (pending outgoing data)", H1_EV_H1C_SEND|H1_EV_H1C_RECV, h1c->conn);
}
else if (!(h1c->flags & H1C_F_IS_BACK) && (h1c->flags & (H1C_F_CS_IDLE|H1C_F_CS_EMBRYONIC))) {
/* front connections waiting for a stream need a timeout. client timeout by
* default but http-keep-alive if defined
*/
int timeout = h1c->timeout;
if (h1c->flags & H1C_F_WAIT_NEXT_REQ)
timeout = tick_first(timeout, h1c->px->timeout.httpka);
h1c->task->expire = tick_add(now_ms, timeout);
/* front connections waiting for a stream need a timeout. */
h1c->task->expire = tick_add(now_ms, h1c->timeout);
TRACE_DEVEL("refreshing connection's timeout (alive front h1c without a CS)", H1_EV_H1C_SEND|H1_EV_H1C_RECV, h1c->conn);
}
else {
@ -521,7 +514,7 @@ static void h1_refresh_timeout(struct h1c *h1c)
}
}
static __maybe_unused void h1_set_idle_expiration(struct h1c *h1c)
static void h1_set_idle_expiration(struct h1c *h1c)
{
if (h1c->flags & H1C_F_IS_BACK || !h1c->task) {
TRACE_DEVEL("no idle expiration (backend connection || no task)", H1_EV_H1C_RECV, h1c->conn);
@ -661,16 +654,12 @@ static struct h1s *h1c_frt_stream_new(struct h1c *h1c)
if (h1c->px->options2 & PR_O2_REQBUG_OK)
h1s->req.err_pos = -1;
if (!h1s_new_cs(h1s, &BUF_NULL))
goto fail_cs;
h1c->idle_exp = TICK_ETERNITY;
h1_set_idle_expiration(h1c);
TRACE_LEAVE(H1_EV_H1S_NEW, h1c->conn, h1s);
return h1s;
fail_cs:
pool_free(pool_head_h1s, h1s);
fail:
sess_log(sess);
TRACE_DEVEL("leaving in error", H1_EV_H1S_NEW|H1_EV_H1S_END|H1_EV_H1S_ERR, h1c->conn);
return NULL;
}
@ -795,26 +784,26 @@ static int h1_init(struct connection *conn, struct proxy *proxy, struct session
h1c->task = t;
t->process = h1_timeout_task;
t->context = h1c;
t->expire = tick_add(now_ms, h1c->timeout);
}
conn->ctx = h1c;
/* Always Create a new H1S */
if (!(h1c->flags & H1C_F_IS_BACK)) {
if (!h1c_frt_stream_new(h1c))
goto fail;
}
else {
if (h1c->flags & H1C_F_IS_BACK) {
/* Create a new H1S now for backend connection only */
if (!h1c_bck_stream_new(h1c, conn_ctx, sess))
goto fail;
}
if (t)
if (t) {
h1_set_idle_expiration(h1c);
t->expire = tick_first(t->expire, h1c->idle_exp);
task_queue(t);
}
/* Try to read, if nothing is available yet we'll just subscribe */
if (!h1_recv_allowed(h1c))
/* prepare to read something */
if (h1_recv_allowed(h1c))
h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
/* mux->wake will be called soon to complete the operation */
@ -850,7 +839,6 @@ static void h1_release(struct h1c *h1c)
if (conn && h1c->flags & H1C_F_UPG_H2C) {
TRACE_DEVEL("upgrading H1 to H2", H1_EV_H1C_END, conn);
h1c->flags &= ~H1C_F_UPG_H2C;
/* Make sure we're no longer subscribed to anything */
if (h1c->wait_event.events)
conn->xprt->unsubscribe(conn, conn->xprt_ctx,
@ -1304,21 +1292,10 @@ static size_t h1_process_headers(struct h1s *h1s, struct h1m *h1m, struct htx *h
TRACE_ENTER(H1_EV_RX_DATA|H1_EV_RX_HDRS, h1s->h1c->conn, h1s, 0, (size_t[]){max});
if (!(h1s->h1c->px->options2 & PR_O2_NO_H2_UPGRADE) && /* H2 upgrade supported by the proxy */
!(h1s->flags & H1S_F_NOT_FIRST) && /* It is the first transaction */
!(h1m->flags & H1_MF_RESP)) { /* It is a request */
/* Try to match H2 preface before parsing the request headers. */
ret = b_isteq(buf, 0, b_data(buf), ist(H2_CONN_PREFACE));
if (ret > 0) {
goto h2c_upgrade;
}
}
else {
if (h1s->meth == HTTP_METH_CONNECT)
h1m->flags |= H1_MF_METH_CONNECT;
if (h1s->meth == HTTP_METH_HEAD)
h1m->flags |= H1_MF_METH_HEAD;
}
if (h1s->meth == HTTP_METH_CONNECT)
h1m->flags |= H1_MF_METH_CONNECT;
if (h1s->meth == HTTP_METH_HEAD)
h1m->flags |= H1_MF_METH_HEAD;
ret = h1_parse_msg_hdrs(h1m, &h1sl, htx, buf, *ofs, max);
if (!ret) {
@ -1356,13 +1333,6 @@ static size_t h1_process_headers(struct h1s *h1s, struct h1m *h1m, struct htx *h
end:
TRACE_LEAVE(H1_EV_RX_DATA|H1_EV_RX_HDRS, h1s->h1c->conn, h1s, 0, (size_t[]){ret});
return ret;
h2c_upgrade:
h1s->h1c->flags |= H1C_F_UPG_H2C;
h1s->flags |= H1S_F_PARSING_DONE;
htx->flags |= HTX_FL_UPGRADE;
TRACE_DEVEL("leaving on H2 update", H1_EV_RX_DATA|H1_EV_RX_HDRS|H1_EV_RX_EOI, h1s->h1c->conn, h1s);
return 0;
}
/*
@ -1462,15 +1432,16 @@ static size_t h1_process_input(struct h1c *h1c, struct buffer *buf, size_t count
struct h1s *h1s = h1c->h1s;
struct h1m *h1m;
struct htx *htx;
size_t ret, data;
size_t data;
size_t ret = 0;
size_t total = 0;
htx = htx_from_buf(buf);
TRACE_ENTER(H1_EV_RX_DATA, h1c->conn, h1s, htx, (size_t[]){count});
h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->req : &h1s->res);
data = htx->data;
if (h1s->flags & H1S_F_PARSING_ERROR)
goto end;
@ -1549,20 +1520,38 @@ static size_t h1_process_input(struct h1c *h1c, struct buffer *buf, size_t count
if (h1s->flags & H1S_F_PARSING_ERROR) {
TRACE_PROTO("parsing error", H1_EV_RX_DATA, h1c->conn, h1s);
goto parsing_err;
goto err;
}
b_del(&h1c->ibuf, total);
end:
htx_to_buf(htx, buf);
TRACE_DEVEL("incoming data parsed", H1_EV_RX_DATA, h1c->conn, h1s, htx, (size_t[]){ret});
ret = htx->data - data;
if ((h1c->flags & H1C_F_IN_FULL) && buf_room_for_htx_data(&h1c->ibuf)) {
h1c->flags &= ~H1C_F_IN_FULL;
TRACE_STATE("h1c ibuf not full anymore", H1_EV_RX_DATA|H1_EV_H1C_BLK|H1_EV_H1C_WAKE);
TRACE_STATE("h1c ibuf not full anymore", H1_EV_RX_DATA|H1_EV_H1C_BLK|H1_EV_H1C_WAKE, h1c->conn, h1s);
h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
}
if (!b_data(&h1c->ibuf))
h1_release_buf(h1c, &h1c->ibuf);
if (!h1s->cs) {
if (h1m->state <= H1_MSG_LAST_LF) {
TRACE_STATE("Incomplete message, subscribing", H1_EV_RX_DATA|H1_EV_H1C_BLK|H1_EV_H1C_WAKE, h1c->conn, h1s);
h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
goto end;
}
if (!h1s_new_cs(h1s, buf)) {
h1c->flags |= H1C_F_CS_ERROR;
goto err;
}
}
/* Here h1s->cs is always defined */
if (!(h1m->flags & H1_MF_CHNK) &&
((h1m->state == H1_MSG_DATA && h1m->curr_len) || (h1m->state == H1_MSG_TUNNEL))) {
TRACE_STATE("notify the mux can use splicing", H1_EV_RX_DATA|H1_EV_RX_BODY, h1c->conn, h1s);
@ -1576,27 +1565,28 @@ static size_t h1_process_input(struct h1c *h1c, struct buffer *buf, size_t count
if (h1s->flags & H1S_F_PARSING_DONE)
h1s->cs->flags |= CS_FL_EOI;
h1s->cs->flags &= ~(CS_FL_RCV_MORE | CS_FL_WANT_ROOM);
if (!b_data(&h1c->ibuf))
h1_release_buf(h1c, &h1c->ibuf);
if (h1s_data_pending(h1s) && !htx_is_empty(htx))
h1s->cs->flags |= CS_FL_RCV_MORE | CS_FL_WANT_ROOM;
else if (h1s->flags & H1S_F_REOS) {
h1s->cs->flags |= CS_FL_EOS;
if (h1m->state == H1_MSG_TUNNEL)
h1s->cs->flags |= CS_FL_EOI;
else if (h1m->state > H1_MSG_LAST_LF && h1m->state < H1_MSG_DONE)
h1s->cs->flags |= CS_FL_ERROR;
else {
h1s->cs->flags &= ~(CS_FL_RCV_MORE | CS_FL_WANT_ROOM);
if (h1s->flags & H1S_F_REOS) {
h1s->cs->flags |= CS_FL_EOS;
if (h1m->state == H1_MSG_TUNNEL)
h1s->cs->flags |= CS_FL_EOI;
else if (h1m->state > H1_MSG_LAST_LF && h1m->state < H1_MSG_DONE)
h1s->cs->flags |= CS_FL_ERROR;
}
}
end:
TRACE_LEAVE(H1_EV_RX_DATA, h1c->conn, h1s, htx, (size_t[]){ret});
return ret;
parsing_err:
err:
b_reset(&h1c->ibuf);
htx_to_buf(htx, buf);
h1s->cs->flags |= CS_FL_EOI;
if (h1s->cs)
h1s->cs->flags |= CS_FL_EOI;
TRACE_DEVEL("leaving on error", H1_EV_RX_DATA|H1_EV_STRM_ERR, h1c->conn, h1s);
return 0;
}
@ -1624,6 +1614,9 @@ static size_t h1_process_output(struct h1c *h1c, struct buffer *buf, size_t coun
if (htx_is_empty(chn_htx))
goto end;
if (h1s->flags & H1S_F_PROCESSING_ERROR)
goto end;
if (!h1_get_buf(h1c, &h1c->obuf)) {
h1c->flags |= H1C_F_OUT_ALLOC;
TRACE_STATE("waiting for h1c obuf allocation", H1_EV_TX_DATA|H1_EV_H1S_BLK, h1c->conn, h1s);
@ -1632,9 +1625,6 @@ static size_t h1_process_output(struct h1c *h1c, struct buffer *buf, size_t coun
h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->res : &h1s->req);
if (h1s->flags & H1S_F_PROCESSING_ERROR)
goto end;
/* the htx is non-empty thus has at least one block */
blk = htx_get_head_blk(chn_htx);
@ -2000,7 +1990,6 @@ static size_t h1_process_output(struct h1c *h1c, struct buffer *buf, size_t coun
/* Unexpected error during output processing */
chn_htx->flags |= HTX_FL_PROCESSING_ERROR;
h1s->flags |= H1S_F_PROCESSING_ERROR;
h1c->flags |= H1C_F_CS_ERROR;
TRACE_STATE("processing error, set error on h1c/h1s", H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s);
TRACE_DEVEL("unexpected error", H1_EV_TX_DATA|H1_EV_STRM_ERR, h1c->conn, h1s);
break;
@ -2071,7 +2060,7 @@ static void h1_wake_stream_for_send(struct h1s *h1s)
* retryable errors (allocation error or buffer full). On success, the error is
* copied in the output buffer.
*/
static __maybe_unused int h1_send_error(struct h1c *h1c)
static int h1_send_error(struct h1c *h1c)
{
int rc = http_get_status_idx(h1c->errcode);
int ret = 0;
@ -2120,7 +2109,7 @@ static __maybe_unused int h1_send_error(struct h1c *h1c)
/* Try to send a 500 internal error. It relies on h1_send_error to send the
* error. This function takes care of incrementing stats and tracked counters.
*/
static __maybe_unused int h1_handle_internal_err(struct h1c *h1c)
static int h1_handle_internal_err(struct h1c *h1c)
{
struct session *sess = h1c->conn->owner;
int ret = 1;
@ -2142,7 +2131,7 @@ static __maybe_unused int h1_handle_internal_err(struct h1c *h1c)
/* Try to send a 400 bad request error. It relies on h1_send_error to send the
* error. This function takes care of incrementing stats and tracked counters.
*/
static __maybe_unused int h1_handle_bad_req(struct h1c *h1c)
static int h1_handle_bad_req(struct h1c *h1c)
{
struct session *sess = h1c->conn->owner;
int ret = 1;
@ -2169,7 +2158,7 @@ static __maybe_unused int h1_handle_bad_req(struct h1c *h1c)
/* Try to send a 408 timeout error. It relies on h1_send_error to send the
* error. This function takes care of incrementing stats and tracked counters.
*/
static __maybe_unused int h1_handle_req_tout(struct h1c *h1c)
static int h1_handle_req_tout(struct h1c *h1c)
{
struct session *sess = h1c->conn->owner;
int ret = 1;
@ -2199,9 +2188,7 @@ static __maybe_unused int h1_handle_req_tout(struct h1c *h1c)
static int h1_recv(struct h1c *h1c)
{
struct connection *conn = h1c->conn;
struct h1s *h1s = h1c->h1s;
size_t ret = 0, max;
int rcvd = 0;
int flags = 0;
TRACE_ENTER(H1_EV_H1C_RECV, h1c->conn);
@ -2213,14 +2200,13 @@ static int h1_recv(struct h1c *h1c)
if ((h1c->flags & H1C_F_WANT_SPLICE) || !h1_recv_allowed(h1c)) {
TRACE_DEVEL("leaving on (want_splice|!recv_allowed)", H1_EV_H1C_RECV, h1c->conn);
rcvd = 1;
goto end;
return 1;
}
if (!h1_get_buf(h1c, &h1c->ibuf)) {
h1c->flags |= H1C_F_IN_ALLOC;
TRACE_STATE("waiting for h1c ibuf allocation", H1_EV_H1C_RECV|H1_EV_H1C_BLK, h1c->conn);
goto end;
return 0;
}
/*
@ -2231,8 +2217,9 @@ static int h1_recv(struct h1c *h1c)
b_slow_realign(&h1c->ibuf, trash.area, 0);
/* avoid useless reads after first responses */
if (h1s && ((!(h1c->flags & H1C_F_IS_BACK) && h1s->req.state == H1_MSG_RQBEFORE) ||
((h1c->flags & H1C_F_IS_BACK) && h1s->res.state == H1_MSG_RPBEFORE)))
if (!h1c->h1s ||
(!(h1c->flags & H1C_F_IS_BACK) && h1c->h1s->req.state == H1_MSG_RQBEFORE) ||
((h1c->flags & H1C_F_IS_BACK) && h1c->h1s->res.state == H1_MSG_RPBEFORE))
flags |= CO_RFL_READ_ONCE;
max = buf_room_for_htx_data(&h1c->ibuf);
@ -2251,29 +2238,13 @@ static int h1_recv(struct h1c *h1c)
}
ret = conn->xprt->rcv_buf(conn, conn->xprt_ctx, &h1c->ibuf, max, flags);
}
if (ret > 0) {
if (max && !ret && h1_recv_allowed(h1c)) {
TRACE_STATE("failed to receive data, subscribing", H1_EV_H1C_RECV, h1c->conn);
conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
}
else {
h1_wake_stream_for_recv(h1c->h1s);
TRACE_DATA("data received", H1_EV_H1C_RECV, h1c->conn, 0, 0, (size_t[]){ret});
rcvd = 1;
if (h1c->flags & H1C_F_CS_ATTACHED)
h1s->cs->flags |= (CS_FL_READ_PARTIAL|CS_FL_RCV_MORE);
}
if (ret > 0 || !h1_recv_allowed(h1c) || !buf_room_for_htx_data(&h1c->ibuf)) {
rcvd = 1;
goto end;
}
TRACE_STATE("failed to receive data, subscribing", H1_EV_H1C_RECV, h1c->conn);
conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
end:
if (ret > 0 || (conn->flags & CO_FL_ERROR) || conn_xprt_read0_pending(conn))
h1_wake_stream_for_recv(h1s);
if (conn_xprt_read0_pending(conn) && h1s) {
h1s->flags |= H1S_F_REOS;
TRACE_STATE("read0 on connection", H1_EV_H1C_RECV, conn, h1s);
rcvd = 1;
}
if (!b_data(&h1c->ibuf))
@ -2284,7 +2255,7 @@ static int h1_recv(struct h1c *h1c)
}
TRACE_LEAVE(H1_EV_H1C_RECV, h1c->conn);
return rcvd;
return !!ret || (conn->flags & CO_FL_ERROR) || conn_xprt_read0_pending(conn);
}
@ -2302,7 +2273,8 @@ static int h1_send(struct h1c *h1c)
if (conn->flags & CO_FL_ERROR) {
TRACE_DEVEL("leaving on connection error", H1_EV_H1C_SEND, h1c->conn);
return 0;
b_reset(&h1c->obuf);
return 1;
}
if (!b_data(&h1c->obuf))
@ -2352,7 +2324,6 @@ static int h1_send(struct h1c *h1c)
return sent;
}
/* callback called on any event by the connection handler.
* It applies changes and returns zero, or < 0 if it wants immediate
* destruction of the connection.
@ -2364,45 +2335,117 @@ static int h1_process(struct h1c * h1c)
TRACE_ENTER(H1_EV_H1C_WAKE, conn);
if (!conn->ctx)
return -1;
/* Try to parse now the first block of a request, creating the H1 stream if necessary */
if (b_data(&h1c->ibuf) && /* Input data to be processed */
(h1c->flags & (H1C_F_CS_IDLE|H1C_F_CS_EMBRYONIC)) && /* IDLE h1 connection or no CS attached to the h1 stream */
!(h1c->flags & H1C_F_IN_SALLOC)) { /* No allocation failure on the stream rxbuf */
struct buffer *buf;
size_t count;
if (!h1s) {
if ((h1c->flags & H1C_F_CS_ERROR) ||
((h1c->flags & H1C_F_CS_SHUTDOWN) && !b_data(&h1c->obuf)) ||
conn->flags & (CO_FL_ERROR|CO_FL_SOCK_RD_SH|CO_FL_SOCK_WR_SH))
/* When it happens for a backend connection, we may release it (it is probably a 408) */
if (h1c->flags & H1C_F_IS_BACK)
goto release;
if (!(h1c->flags & H1C_F_IS_BACK) && (h1c->flags & H1C_F_CS_IDLE)) {
TRACE_STATE("K/A incoming connection, create new H1 stream", H1_EV_H1C_WAKE, conn);
if (!h1c_frt_stream_new(h1c))
/* First of all handle H1 to H2 upgrade (no need to create the H1 stream) */
if (((h1c->flags & (H1C_F_CS_IDLE|H1C_F_WAIT_NEXT_REQ)) == H1C_F_CS_IDLE) && /* First request with no h1s */
!(h1c->px->options2 & PR_O2_NO_H2_UPGRADE)) { /* H2 upgrade supported by the proxy */
/* Try to match H2 preface before parsing the request headers. */
if (b_isteq(&h1c->ibuf, 0, b_data(&h1c->ibuf), ist(H2_CONN_PREFACE)) > 0) {
h1c->flags |= H1C_F_UPG_H2C;
TRACE_STATE("release h1c to perform H2 upgrade ", H1_EV_RX_DATA|H1_EV_H1C_WAKE);
goto release;
}
}
/* Create the H1 stream if not already there */
if (!h1s) {
h1s = h1c_frt_stream_new(h1c);
if (!h1s) {
b_reset(&h1c->ibuf);
h1c->flags = (h1c->flags & ~(H1C_F_CS_IDLE|H1C_F_WAIT_NEXT_REQ)) | H1C_F_CS_ERROR;
goto no_parsing;
}
}
if (h1s->sess->t_idle == -1)
h1s->sess->t_idle = tv_ms_elapsed(&h1s->sess->tv_accept, &now) - h1s->sess->t_handshake;
/* Get the stream rxbuf */
buf = h1_get_buf(h1c, &h1s->rxbuf);
if (!buf) {
h1c->flags |= H1C_F_IN_SALLOC;
TRACE_STATE("waiting for stream rxbuf allocation", H1_EV_H1C_WAKE|H1_EV_H1C_BLK, h1c->conn);
return 0;
}
count = (buf->size - sizeof(struct htx) - global.tune.maxrewrite);
h1_process_input(h1c, buf, count);
h1_release_buf(h1c, &h1s->rxbuf);
h1_set_idle_expiration(h1c);
no_parsing:
if (h1c->flags & H1C_F_CS_ERROR) {
h1_handle_internal_err(h1c);
h1c->flags &= ~(H1C_F_CS_IDLE|H1C_F_WAIT_NEXT_REQ);
}
else if (h1s->flags & H1S_F_PARSING_ERROR) {
h1_handle_bad_req(h1c);
h1c->flags = (h1c->flags & ~(H1C_F_CS_IDLE|H1C_F_WAIT_NEXT_REQ)) | H1C_F_CS_ERROR;
}
else
goto end;
h1s = h1c->h1s;
}
h1_send(h1c);
if ((conn->flags & CO_FL_ERROR) || conn_xprt_read0_pending(conn) || (h1c->flags & H1C_F_CS_ERROR)) {
if (!(h1c->flags & H1C_F_CS_ATTACHED)) {
/* No conn-stream */
/* shutdown for reads and error on the frontend connection: Send an error */
if (!(h1c->flags & (H1C_F_IS_BACK|H1C_F_CS_ERROR))) {
if (h1_handle_bad_req(h1c))
h1_send(h1c);
h1c->flags = (h1c->flags & ~(H1C_F_CS_IDLE|H1C_F_WAIT_NEXT_REQ)) | H1C_F_CS_ERROR;
}
/* Handle pending error, if any (only possible on frontend connection) */
if (h1c->flags & H1C_F_ERR_PENDING) {
BUG_ON(h1c->flags & H1C_F_IS_BACK);
if (h1_send_error(h1c))
h1_send(h1c);
}
/* If there is some pending outgoing data or error, just wait */
if (b_data(&h1c->obuf) || (h1c->flags & H1C_F_ERR_PENDING))
goto end;
/* Otherwise we can release the H1 connection */
goto release;
}
else {
/* Here there is still a H1 stream with a conn-stream.
* Report the connection state at the stream level
*/
if (conn_xprt_read0_pending(conn)) {
h1s->flags |= H1S_F_REOS;
TRACE_STATE("read0 on connection", H1_EV_H1C_RECV, conn, h1s);
}
if ((h1c->flags & H1C_F_CS_ERROR) || (conn->flags & CO_FL_ERROR))
h1s->cs->flags |= CS_FL_ERROR;
TRACE_POINT(H1_EV_STRM_WAKE, h1c->conn, h1s);
if (h1s->cs->data_cb->wake) {
TRACE_POINT(H1_EV_STRM_WAKE, h1c->conn, h1s);
h1s->cs->data_cb->wake(h1s->cs);
}
}
}
if (!b_data(&h1c->ibuf))
h1_release_buf(h1c, &h1c->ibuf);
if ((h1c->flags & H1C_F_WANT_SPLICE) && !h1s_data_pending(h1s)) {
TRACE_DEVEL("xprt rcv_buf blocked (want_splice), notify h1s for recv", H1_EV_H1C_RECV, h1c->conn);
h1_wake_stream_for_recv(h1s);
}
if (b_data(&h1c->ibuf) && h1s->sess->t_idle == -1)
h1s->sess->t_idle = tv_ms_elapsed(&h1s->sess->tv_accept, &now) - h1s->sess->t_handshake;
if (conn_xprt_read0_pending(conn)) {
h1s->flags |= H1S_F_REOS;
TRACE_STATE("read0 on connection", H1_EV_H1C_RECV, conn, h1s);
}
if (!h1s_data_pending(h1s) && h1s && (h1c->flags & H1C_F_CS_ATTACHED) && h1s->cs->data_cb->wake &&
(h1s->flags & H1S_F_REOS || h1c->flags & H1C_F_CS_ERROR ||
conn->flags & (CO_FL_ERROR | CO_FL_SOCK_WR_SH))) {
if (h1c->flags & H1C_F_CS_ERROR || conn->flags & CO_FL_ERROR)
h1s->cs->flags |= CS_FL_ERROR;
TRACE_POINT(H1_EV_STRM_WAKE, h1c->conn, h1s);
h1s->cs->data_cb->wake(h1s->cs);
}
end:
h1_refresh_timeout(h1c);
TRACE_LEAVE(H1_EV_H1C_WAKE, conn);
@ -2451,7 +2494,7 @@ static struct task *h1_io_cb(struct task *t, void *ctx, unsigned short status)
ret = h1_send(h1c);
if (!(h1c->wait_event.events & SUB_RETRY_RECV))
ret |= h1_recv(h1c);
if (ret || !h1c->h1s)
if (ret || b_data(&h1c->ibuf))
ret = h1_process(h1c);
/* If we were in an idle list, we want to add it back into it,
* unless h1_process() returned -1, which mean it has destroyed
@ -2533,6 +2576,17 @@ static struct task *h1_timeout_task(struct task *t, void *context, unsigned shor
return t;
}
/* Try to send an error to the client */
if (!(h1c->flags & (H1C_F_IS_BACK|H1C_F_CS_ERROR|H1C_F_ERR_PENDING|H1C_F_CS_SHUTDOWN))) {
h1c->flags = (h1c->flags & ~H1C_F_CS_IDLE) | H1C_F_CS_ERROR;
if (h1_handle_req_tout(h1c))
h1_send(h1c);
if (b_data(&h1c->obuf) || (h1c->flags & H1C_F_ERR_PENDING)) {
h1_refresh_timeout(h1c);
return t;
}
}
/* We're about to destroy the connection, so make sure nobody attempts
* to steal it from us.
*/
@ -2649,7 +2703,7 @@ static void h1_detach(struct conn_stream *cs)
is_not_first = h1s->flags & H1S_F_NOT_FIRST;
h1s_destroy(h1s);
if ((h1c->flags & H1C_F_IS_BACK) && (h1c->flags & H1C_F_CS_IDLE)) {
if ((h1c->flags & (H1C_F_IS_BACK|H1C_F_CS_IDLE)) == (H1C_F_IS_BACK|H1C_F_CS_IDLE)) {
/* If there are any excess server data in the input buffer,
* release it and close the connection ASAP (some data may
* remain in the output buffer). This happens if a server sends
@ -2697,7 +2751,7 @@ static void h1_detach(struct conn_stream *cs)
release:
/* We don't want to close right now unless the connection is in error or shut down for writes */
if ((h1c->flags & (H1C_F_CS_ERROR|H1C_F_UPG_H2C)) ||
if ((h1c->flags & H1C_F_CS_ERROR) ||
(h1c->conn->flags & (CO_FL_ERROR|CO_FL_SOCK_WR_SH)) ||
((h1c->flags & H1C_F_CS_SHUTDOWN) && !b_data(&h1c->obuf)) ||
!h1c->conn->owner) {
@ -2714,6 +2768,7 @@ static void h1_detach(struct conn_stream *cs)
else
h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
}
h1_set_idle_expiration(h1c);
h1_refresh_timeout(h1c);
}
end:
@ -2743,8 +2798,8 @@ static void h1_shutr(struct conn_stream *cs, enum cs_shr_mode mode)
goto do_shutr;
}
if ((h1c->flags & H1C_F_UPG_H2C) || (h1s->flags & H1S_F_WANT_KAL)) {
TRACE_STATE("keep connection alive (upg_h2c|want_kal)", H1_EV_STRM_SHUT, h1c->conn, h1s);
if (h1s->flags & H1S_F_WANT_KAL) {
TRACE_STATE("keep connection alive (want_kal)", H1_EV_STRM_SHUT, h1c->conn, h1s);
goto end;
}
@ -2781,9 +2836,8 @@ static void h1_shutw(struct conn_stream *cs, enum cs_shw_mode mode)
goto do_shutw;
}
if ((h1c->flags & H1C_F_UPG_H2C) ||
((h1s->flags & H1S_F_WANT_KAL) && h1s->req.state == H1_MSG_DONE && h1s->res.state == H1_MSG_DONE)) {
TRACE_STATE("keep connection alive (upg_h2c|want_kal)", H1_EV_STRM_SHUT, h1c->conn, h1s);
if (((h1s->flags & H1S_F_WANT_KAL) && h1s->req.state == H1_MSG_DONE && h1s->res.state == H1_MSG_DONE)) {
TRACE_STATE("keep connection alive (want_kal)", H1_EV_STRM_SHUT, h1c->conn, h1s);
goto end;
}