MEDIUM: mux-h1: Handle errors and timeouts in the stream

To do so, the stream is created as earlier as possible. It means, during the mux
creation for the first request, and for others, just at the end of the previous
transaction. Because all timeouts are handled by the strream, the mux's task is
now useless, so it is removed. Finally, to report errors, flags are set on the
HTX message. The HTX message is passed to the stream if there is some content to
analyse or if there is some error to handle.

All of this will probably be reworked later to handle errors and timeouts
directly in the mux. For now, it is the simpler way to handle all of this.
This commit is contained in:
Christopher Faulet 2018-10-31 17:40:50 +01:00 committed by Willy Tarreau
parent ed28da534a
commit 473652733a
2 changed files with 171 additions and 257 deletions

View File

@ -80,10 +80,6 @@ struct h1c {
struct wait_event wait_event; /* To be used if we're waiting for I/Os */
struct h1s *h1s; /* H1 stream descriptor */
struct task *task; /* timeout management task */
int idle_exp; /* expiration date for idle connections, in ticks (client-side only)*/
int http_exp; /* expiration date for HTTP headers parsing (client-side only) */
};
/* H1 stream descriptor */
@ -108,7 +104,6 @@ struct h1s {
static struct pool_head *pool_head_h1c;
static struct pool_head *pool_head_h1s;
static struct task *h1_timeout_task(struct task *t, void *context, unsigned short state);
static int h1_recv(struct h1c *h1c);
static int h1_send(struct h1c *h1c);
static int h1_process(struct h1c *h1c);
@ -225,80 +220,6 @@ static int h1_avail_streams(struct connection *conn)
/*****************************************************************/
/* functions below are dedicated to the mux setup and management */
/*****************************************************************/
static struct h1s *h1s_create(struct h1c *h1c, struct conn_stream *cs)
{
struct h1s *h1s;
h1s = pool_alloc(pool_head_h1s);
if (!h1s)
goto end;
h1s->h1c = h1c;
h1c->h1s = h1s;
h1s->cs = NULL;
h1s->rxbuf = BUF_NULL;
h1s->flags = H1S_F_NONE;
h1s->recv_wait = NULL;
h1s->send_wait = NULL;
h1m_init_req(&h1s->req);
h1s->req.flags |= H1_MF_NO_PHDR;
h1m_init_res(&h1s->res);
h1s->res.flags |= H1_MF_NO_PHDR;
h1s->status = 0;
h1s->meth = HTTP_METH_OTHER;
if (!conn_is_back(h1c->conn)) {
if (h1c->px->options2 & PR_O2_REQBUG_OK)
h1s->req.err_pos = -1;
if (h1c->flags & H1C_F_WAIT_NEXT_REQ)
h1s->flags |= H1S_F_NOT_FIRST;
h1c->flags &= ~H1C_F_WAIT_NEXT_REQ;
h1c->http_exp = tick_add_ifset(now_ms, h1c->px->timeout.httpreq);
}
else {
if (h1c->px->options2 & PR_O2_RSPBUG_OK)
h1s->res.err_pos = -1;
}
/* If a conn_stream already exists, attach it to this H1S */
if (cs) {
cs->ctx = h1s;
h1s->cs = cs;
}
end:
return h1s;
}
static void h1s_destroy(struct h1s *h1s)
{
if (h1s) {
struct h1c *h1c = h1s->h1c;
h1c->h1s = NULL;
h1c->flags &= ~(H1C_F_RX_FULL|H1C_F_RX_ALLOC);
if (h1s->recv_wait != NULL)
h1s->recv_wait->wait_reason &= ~SUB_CAN_RECV;
if (h1s->send_wait != NULL)
h1s->send_wait->wait_reason &= ~SUB_CAN_SEND;
if (!conn_is_back(h1c->conn)) {
h1c->flags |= H1C_F_WAIT_NEXT_REQ;
h1c->http_exp = tick_add_ifset(now_ms, h1c->px->timeout.httpka);
}
h1_release_buf(h1c, &h1s->rxbuf);
cs_free(h1s->cs);
pool_free(pool_head_h1s, h1s);
}
}
static struct conn_stream *h1s_new_cs(struct h1s *h1s)
{
struct conn_stream *cs;
@ -322,6 +243,88 @@ static struct conn_stream *h1s_new_cs(struct h1s *h1s)
return NULL;
}
static struct h1s *h1s_create(struct h1c *h1c, struct conn_stream *cs)
{
struct h1s *h1s;
h1s = pool_alloc(pool_head_h1s);
if (!h1s)
goto fail;
h1s->h1c = h1c;
h1c->h1s = h1s;
h1s->cs = NULL;
h1s->rxbuf = BUF_NULL;
h1s->flags = H1S_F_NONE;
h1s->recv_wait = NULL;
h1s->send_wait = NULL;
h1m_init_req(&h1s->req);
h1s->req.flags |= H1_MF_NO_PHDR;
h1m_init_res(&h1s->res);
h1s->res.flags |= H1_MF_NO_PHDR;
h1s->status = 0;
h1s->meth = HTTP_METH_OTHER;
if (h1c->flags & H1C_F_WAIT_NEXT_REQ)
h1s->flags |= H1S_F_NOT_FIRST;
h1c->flags &= ~H1C_F_WAIT_NEXT_REQ;
if (!conn_is_back(h1c->conn)) {
if (h1c->px->options2 & PR_O2_REQBUG_OK)
h1s->req.err_pos = -1;
}
else {
if (h1c->px->options2 & PR_O2_RSPBUG_OK)
h1s->res.err_pos = -1;
}
if (cs) {
/* If a conn_stream already exists, attach it to this H1S */
cs->ctx = h1s;
h1s->cs = cs;
}
#if 1
else {
cs = h1s_new_cs(h1s);
if (!cs)
goto fail;
}
#endif
return h1s;
fail:
pool_free(pool_head_h1s, h1s);
return NULL;
}
static void h1s_destroy(struct h1s *h1s)
{
if (h1s) {
struct h1c *h1c = h1s->h1c;
h1c->h1s = NULL;
h1c->flags &= ~(H1C_F_RX_FULL|H1C_F_RX_ALLOC);
if (h1s->recv_wait != NULL)
h1s->recv_wait->wait_reason &= ~SUB_CAN_RECV;
if (h1s->send_wait != NULL)
h1s->send_wait->wait_reason &= ~SUB_CAN_SEND;
h1c->flags |= H1C_F_WAIT_NEXT_REQ;
if (h1s->flags & (H1S_F_REQ_ERROR|H1S_F_RES_ERROR))
h1c->flags |= H1C_F_CS_ERROR;
h1_release_buf(h1c, &h1s->rxbuf);
cs_free(h1s->cs);
pool_free(pool_head_h1s, h1s);
}
}
/*
* Initialize the mux once it's attached. It is expected that conn->mux_ctx
* points to the existing conn_stream (for outgoing connections) or NULL (for
@ -330,7 +333,6 @@ static struct conn_stream *h1s_new_cs(struct h1s *h1s)
static int h1_init(struct connection *conn, struct proxy *proxy)
{
struct h1c *h1c;
struct task *t = NULL;
h1c = pool_alloc(pool_head_h1c);
if (!h1c)
@ -343,17 +345,6 @@ static int h1_init(struct connection *conn, struct proxy *proxy)
h1c->obuf = BUF_NULL;
h1c->h1s = NULL;
t = task_new(tid_bit);
if (!t)
goto fail;
h1c->task = t;
t->process = h1_timeout_task;
t->context = h1c;
t->expire = TICK_ETERNITY;
h1c->idle_exp = TICK_ETERNITY;
h1c->http_exp = TICK_ETERNITY;
LIST_INIT(&h1c->buf_wait.list);
h1c->wait_event.task = tasklet_new();
if (!h1c->wait_event.task)
@ -370,7 +361,6 @@ static int h1_init(struct connection *conn, struct proxy *proxy)
goto fail;
conn->mux_ctx = h1c;
task_wakeup(t, TASK_WOKEN_INIT);
/* Try to read, if nothing is available yet we'll just subscribe */
if (h1_recv(h1c))
@ -380,9 +370,7 @@ static int h1_init(struct connection *conn, struct proxy *proxy)
return 0;
fail:
if (t)
task_free(t);
if (h1c && h1c->wait_event.task)
if (h1c->wait_event.task)
tasklet_free(h1c->wait_event.task);
pool_free(pool_head_h1c, h1c);
fail_h1c:
@ -410,11 +398,6 @@ static void h1_release(struct connection *conn)
h1_release_buf(h1c, &h1c->ibuf);
h1_release_buf(h1c, &h1c->obuf);
if (h1c->task) {
h1c->task->context = NULL;
task_wakeup(h1c->task, TASK_WOKEN_OTHER);
h1c->task = NULL;
}
if (h1c->wait_event.task)
tasklet_free(h1c->wait_event.task);
@ -438,21 +421,6 @@ static void h1_release(struct connection *conn)
/******************************************************/
/* functions below are for the H1 protocol processing */
/******************************************************/
/*
* Set the appropriate error message. It first tries to get it from the proxy if
* it exists. Otherwise, it falls back on default one.
*/
static void h1_cpy_error_message(struct h1c *h1c, struct buffer *dst, int status)
{
const int msgnum = http_get_status_idx(status);
const struct buffer *err;
err = (h1c->px->errmsg[msgnum].area
? &h1c->px->errmsg[msgnum]
: &http_err_chunks[msgnum]);
b_putblk(dst, b_head(err), b_data(err));
}
/* Parse the request version and set H1_MF_VER_11 on <h1m> if the version is
* greater or equal to 1.1
*/
@ -1099,22 +1067,22 @@ static size_t h1_process_input(struct h1c *h1c, struct buffer *buf, size_t count
h1s = NULL;
/* Create a new H1S without CS if not already done */
/* Create a new H1S if not already done */
if (!h1c->h1s && !h1s_create(h1c, NULL))
goto err;
goto fatal_err;
h1s = h1c->h1s;
#if 0
// FIXME: Use a proxy option to enable early creation of the CS
/* Create the CS if not already attached to the H1S */
if (!h1s->cs && !h1s_new_cs(h1s))
goto err;
goto fatal_err;
#endif
if (!count)
goto end;
if (!h1_get_buf(h1c, &h1s->rxbuf)) {
h1c->flags |= H1C_F_RX_ALLOC;
goto end;
}
htx = htx_from_buf(&h1s->rxbuf);
if (!conn_is_back(h1c->conn)) {
@ -1132,13 +1100,11 @@ static size_t h1_process_input(struct h1c *h1c, struct buffer *buf, size_t count
ret = h1_process_headers(h1s, h1m, htx, buf, &total, max);
if (!ret)
break;
/* Reset request timeout */
h1s->h1c->http_exp = TICK_ETERNITY;
#if 0
/* Create the CS if not already attached to the H1S */
if (!h1s->cs && !h1s_new_cs(h1s))
goto err;
goto fatal_err;
#endif
}
else if (h1m->state <= H1_MSG_TRAILERS) {
/* Do not parse the body if the header part is not yet
@ -1165,59 +1131,41 @@ static size_t h1_process_input(struct h1c *h1c, struct buffer *buf, size_t count
max -= ret;
}
if (h1s->flags & errflag) {
if (conn_is_back(h1c->conn))
goto err;
// FIXME: Do following actions when an error is catched during
// the request parsing:
//
// * Do same than stream_inc_http_req_ctr,
// stream_inc_http_err_ctr and proxy_inc_fe_req_ctr
// * Capture bad message for snapshots
// * Increment fe->fe_counters.failed_req and
// listeners->counters->failed_req
//
// FIXME: Do following actions when an error is catched during
// the response parsing:
//
// * Capture bad message for snapshots
// * increment be->be_counters.failed_resp
// * increment srv->counters.failed_resp (if srv assigned)
if (!h1_get_buf(h1c, &h1c->obuf)) {
h1c->flags |= H1C_F_OUT_ALLOC;
goto err;
}
h1_cpy_error_message(h1c, &h1c->obuf, 400);
goto err;
}
if (h1s->flags & errflag)
goto parsing_err;
b_del(buf, total);
if (htx_is_not_empty(htx)) {
b_set_data(&h1s->rxbuf, b_size(&h1s->rxbuf));
if (!htx_free_data_space(htx))
h1c->flags |= H1C_F_RX_FULL;
if (h1s->recv_wait) {
h1s->recv_wait->wait_reason &= ~SUB_CAN_RECV;
tasklet_wakeup(h1s->recv_wait->task);
h1s->recv_wait = NULL;
}
}
else
h1_release_buf(h1c, &h1s->rxbuf);
ret = count - max;
if (h1s->recv_wait) {
h1s->recv_wait->wait_reason &= ~SUB_CAN_RECV;
tasklet_wakeup(h1s->recv_wait->task);
h1s->recv_wait = NULL;
}
end:
return ret;
err:
//h1s_destroy(h1s);
fatal_err:
h1c->flags |= H1C_F_CS_ERROR;
if (!h1s || !h1s->cs)
sess_log(h1c->conn->owner);
sess_log(h1c->conn->owner);
return 0;
parsing_err:
// FIXME: create an error snapshot here
b_reset(&h1c->ibuf);
h1s->cs->flags |= CS_FL_REOS;
if (h1s->recv_wait) {
h1s->recv_wait->wait_reason &= ~SUB_CAN_RECV;
tasklet_wakeup(h1s->recv_wait->task);
h1s->recv_wait = NULL;
}
return 0;
}
@ -1236,6 +1184,8 @@ static size_t h1_process_output(struct h1c *h1c, struct buffer *buf, size_t coun
size_t total = 0;
int errflag;
if (!count)
goto end;
chn_htx = htx_from_buf(buf);
if (!h1_get_buf(h1c, &h1c->obuf)) {
@ -1391,7 +1341,6 @@ static size_t h1_process_output(struct h1c *h1c, struct buffer *buf, size_t coun
htx_reset(chn_htx);
b_set_data(buf, 0);
}
end:
return total;
}
@ -1411,12 +1360,15 @@ static size_t h1_xfer(struct h1s *h1s, struct buffer *buf, int flags)
h1m = (!conn_is_back(h1c->conn) ? &h1s->req : &h1s->res);
mux_htx = htx_from_buf(&h1s->rxbuf);
if (htx_is_empty(mux_htx))
goto end;
chn_htx = htx_from_buf(buf);
if (h1s->flags & (H1S_F_REQ_ERROR|H1S_F_RES_ERROR)) {
chn_htx->flags |= HTX_FL_PARSING_ERROR;
b_set_data(buf, b_size(buf));
goto end;
}
if (htx_is_empty(mux_htx))
goto end;
count = htx_free_space(chn_htx);
if (flags & CO_RFL_KEEP_RSV) {
if (count < global.tune.maxrewrite)
@ -1442,7 +1394,6 @@ static size_t h1_xfer(struct h1s *h1s, struct buffer *buf, int flags)
if (htx_is_not_empty(chn_htx))
b_set_data(buf, b_size(buf));
end:
if (h1c->flags & H1C_F_RX_FULL && htx_free_data_space(mux_htx)) {
h1c->flags &= ~H1C_F_RX_FULL;
@ -1501,8 +1452,11 @@ static int h1_recv(struct h1c *h1c)
h1c->flags &= ~H1C_F_IN_FULL;
ret = conn->xprt->rcv_buf(conn, &h1c->ibuf, max, 0);
}
if (ret > 0)
if (ret > 0) {
rcvd = 1;
if (h1c->h1s && h1c->h1s->cs)
h1c->h1s->cs->flags |= CS_FL_READ_PARTIAL;
}
if (h1_recv_allowed(h1c))
conn->xprt->subscribe(conn, SUB_CAN_RECV, &h1c->wait_event);
@ -1609,7 +1563,7 @@ static int h1_process(struct h1c * h1c)
{
struct connection *conn = h1c->conn;
if (b_data(&h1c->ibuf) && !(h1c->flags & (H1C_F_CS_ERROR|H1C_F_RX_FULL|H1C_F_RX_ALLOC))) {
if (!(h1c->flags & (H1C_F_CS_ERROR|H1C_F_RX_FULL|H1C_F_RX_ALLOC))) {
size_t ret;
ret = h1_process_input(h1c, &h1c->ibuf, b_data(&h1c->ibuf));
@ -1630,7 +1584,7 @@ static int h1_process(struct h1c * h1c)
h1c->flags &= ~H1C_F_CS_WAIT_CONN;
h1_wake_stream(h1c);
}
return 0;
goto end;
}
if ((h1c->flags & H1C_F_CS_ERROR) || (conn->flags & CO_FL_ERROR) || conn_xprt_read0_pending(conn)) {
@ -1641,20 +1595,7 @@ static int h1_process(struct h1c * h1c)
}
}
/* If there is a stream attached to the mux, let it
* handle the timeout.
*/
if (h1c->h1s && h1c->h1s->cs)
h1c->idle_exp = TICK_ETERNITY;
else {
int tout = (!conn_is_back(conn)
? h1c->px->timeout.client
: h1c->px->timeout.server);
h1c->idle_exp = tick_add_ifset(now_ms, tout);
}
h1c->task->expire = tick_first(h1c->http_exp, h1c->idle_exp);
if (tick_isset(h1c->task->expire))
task_queue(h1c->task);
end:
return 0;
}
@ -1677,74 +1618,9 @@ static int h1_wake(struct connection *conn)
{
struct h1c *h1c = conn->mux_ctx;
//return 0;
return (h1_process(h1c));
}
/* Connection timeout management. The principle is that if there's no receipt
* nor sending for a certain amount of time, the connection is closed.
*/
static struct task *h1_timeout_task(struct task *t, void *context, unsigned short state)
{
struct h1c *h1c = context;
int expired = tick_is_expired(t->expire, now_ms);
if (!h1c)
goto end;
if (!expired) {
t->expire = tick_first(t->expire, tick_first(h1c->idle_exp, h1c->http_exp));
return t;
}
h1c->flags |= H1C_F_CS_ERROR;
h1c->idle_exp = TICK_ETERNITY;
h1c->http_exp = TICK_ETERNITY;
t->expire = TICK_ETERNITY;
/* Don't try send error message on the server-side */
if (conn_is_back(h1c->conn))
goto release;
/* Don't send error message if no input data is pending _AND_ if null
* requests is ignored or it's not the first request.
*/
if (!b_data(&h1c->ibuf) && (h1c->px->options & PR_O_IGNORE_PRB ||
h1c->flags & H1C_F_WAIT_NEXT_REQ))
goto release;
/* Try to allocate output buffer to store the error message. If
* allocation fails, just go away.
*/
if (!h1_get_buf(h1c, &h1c->obuf))
goto release;
// FIXME: Do the following:
//
// * Do same than stream_inc_http_req_ctr,
// stream_inc_http_err_ctr and proxy_inc_fe_req_ctr
// * Capture bad message for snapshots
// * Increment fe->fe_counters.failed_req and
// listeners->counters->failed_req
h1_cpy_error_message(h1c, &h1c->obuf, 408);
tasklet_wakeup(h1c->wait_event.task);
sess_log(h1c->conn->owner);
return t;
release:
if (h1c->h1s) {
tasklet_wakeup(h1c->wait_event.task);
return t;
}
h1c->task = NULL;
h1_release(h1c->conn);
end:
task_delete(t);
task_free(t);
return NULL;
}
/*******************************************/
/* functions below are used by the streams */
/*******************************************/

View File

@ -117,6 +117,16 @@ int htx_wait_for_request(struct stream *s, struct channel *req, int an_bit)
* a bad request is.
*/
if (unlikely(htx_is_empty(htx) || htx_get_tail_type(htx) < HTX_BLK_EOH)) {
/*
* First catch invalid request
*/
if (htx->flags & HTX_FL_PARSING_ERROR) {
stream_inc_http_req_ctr(s);
stream_inc_http_err_ctr(s);
proxy_inc_fe_req_ctr(sess->fe);
goto return_bad_req;
}
/* 1: have we encountered a read error ? */
if (req->flags & CF_READ_ERROR) {
if (!(s->flags & SF_ERR_MASK))
@ -217,8 +227,7 @@ int htx_wait_for_request(struct stream *s, struct channel *req, int an_bit)
setsockopt(__objt_conn(sess->origin)->handle.fd, IPPROTO_TCP, TCP_QUICKACK, &one, sizeof(one));
}
#endif
if ((msg->msg_state != HTTP_MSG_RQBEFORE) && (txn->flags & TX_WAIT_NEXT_RQ)) {
if ((req->flags & CF_READ_PARTIAL) && (txn->flags & TX_WAIT_NEXT_RQ)) {
/* If the client starts to talk, let's fall back to
* request timeout processing.
*/
@ -228,9 +237,7 @@ int htx_wait_for_request(struct stream *s, struct channel *req, int an_bit)
/* just set the request timeout once at the beginning of the request */
if (!tick_isset(req->analyse_exp)) {
if ((msg->msg_state == HTTP_MSG_RQBEFORE) &&
(txn->flags & TX_WAIT_NEXT_RQ) &&
tick_isset(s->be->timeout.httpka))
if ((txn->flags & TX_WAIT_NEXT_RQ) && tick_isset(s->be->timeout.httpka))
req->analyse_exp = tick_add(now_ms, s->be->timeout.httpka);
else
req->analyse_exp = tick_add_ifset(now_ms, s->be->timeout.httpreq);
@ -1091,6 +1098,9 @@ int htx_wait_for_request_body(struct stream *s, struct channel *req, int an_bit)
goto http_end;
missing_data:
if (htx->flags & HTX_FL_PARSING_ERROR)
goto return_bad_req;
if ((req->flags & CF_READ_TIMEOUT) || tick_is_expired(req->analyse_exp, now_ms)) {
txn->status = 408;
htx_reply_and_close(s, txn->status, http_error_message(s));
@ -1305,6 +1315,8 @@ int htx_request_forward_body(struct stream *s, struct channel *req, int an_bit)
if (req->flags & CF_SHUTW)
goto aborted_xfer;
if (htx->flags & HTX_FL_PARSING_ERROR)
goto return_bad_req;
/* When TE: chunked is used, we need to get there again to parse remaining
* chunks even if the client has closed, so we don't want to set CF_DONTCLOSE.
@ -1438,6 +1450,12 @@ int htx_wait_for_response(struct stream *s, struct channel *rep, int an_bit)
* errors somewhere else.
*/
if (unlikely(htx_is_empty(htx) || htx_get_tail_type(htx) < HTX_BLK_EOH)) {
/*
* First catch invalid response
*/
if (htx->flags & HTX_FL_PARSING_ERROR)
goto return_bad_res;
/* 1: have we encountered a read error ? */
if (rep->flags & CF_READ_ERROR) {
if (txn->flags & TX_NOT_FIRST)
@ -1704,6 +1722,23 @@ int htx_wait_for_response(struct stream *s, struct channel *rep, int an_bit)
channel_auto_close(rep);
return 1;
return_bad_res:
HA_ATOMIC_ADD(&s->be->be_counters.failed_resp, 1);
if (objt_server(s->target)) {
HA_ATOMIC_ADD(&objt_server(s->target)->counters.failed_resp, 1);
health_adjust(objt_server(s->target), HANA_STATUS_HTTP_HDRRSP);
}
txn->status = 502;
s->si[1].flags |= SI_FL_NOLINGER;
htx_reply_and_close(s, txn->status, http_error_message(s));
rep->analysers &= AN_RES_FLT_END;
if (!(s->flags & SF_ERR_MASK))
s->flags |= SF_ERR_PRXCOND;
if (!(s->flags & SF_FINST_MASK))
s->flags |= SF_FINST_H;
return 0;
abort_keep_alive:
/* A keep-alive request to the server failed on a network error.
* The client is required to retry. We need to close without returning
@ -2145,6 +2180,9 @@ int htx_response_forward_body(struct stream *s, struct channel *res, int an_bit)
if (res->flags & CF_SHUTW)
goto aborted_xfer;
if (htx->flags & HTX_FL_PARSING_ERROR)
goto return_bad_res;
/* stop waiting for data if the input is closed before the end. If the
* client side was already closed, it means that the client has aborted,
* so we don't want to count this as a server abort. Otherwise it's a