diff --git a/include/haproxy/channel.h b/include/haproxy/channel.h index de51d0467..65fd57d2f 100644 --- a/include/haproxy/channel.h +++ b/include/haproxy/channel.h @@ -312,16 +312,16 @@ static inline size_t ci_contig_data(const struct channel *c) } /* Initialize all fields in the channel. */ -static inline void channel_init(struct channel *chn) +static inline void channel_init(struct channel *chn, struct buffer *input) { - chn->buf = BUF_NULL; + chn->buf = *input; chn->to_forward = 0; chn->last_read = now_ms; chn->xfer_small = chn->xfer_large = 0; - chn->total = 0; + chn->total = (IS_HTX_STRM(chn_strm(chn)) ? htxbuf(input)->data : b_data(input)); chn->pipe = NULL; chn->analysers = 0; - chn->flags = 0; + chn->flags = (chn->total ? CF_READ_PARTIAL : 0); chn->output = 0; } diff --git a/include/haproxy/stream.h b/include/haproxy/stream.h index e50fbc8af..e5c9c2937 100644 --- a/include/haproxy/stream.h +++ b/include/haproxy/stream.h @@ -58,8 +58,8 @@ extern struct list streams; extern struct data_cb sess_conn_cb; -struct stream *stream_new(struct session *sess, enum obj_type *origin); -int stream_create_from_cs(struct conn_stream *cs); +struct stream *stream_new(struct session *sess, enum obj_type *origin, struct buffer *input); +int stream_create_from_cs(struct conn_stream *cs, struct buffer *input); /* kill a stream and set the termination flags to (one of SF_ERR_*) */ void stream_shutdown(struct stream *stream, int why); diff --git a/src/flt_spoe.c b/src/flt_spoe.c index 58a7e7903..2de26b4a5 100644 --- a/src/flt_spoe.c +++ b/src/flt_spoe.c @@ -1983,7 +1983,7 @@ spoe_create_appctx(struct spoe_config *conf) if (!sess) goto out_free_spoe; - if ((strm = stream_new(sess, &appctx->obj_type)) == NULL) + if ((strm = stream_new(sess, &appctx->obj_type, &BUF_NULL)) == NULL) goto out_free_sess; stream_set_backend(strm, conf->agent->b.be); diff --git a/src/hlua.c b/src/hlua.c index fbccaaaaa..b6138251a 100644 --- a/src/hlua.c +++ b/src/hlua.c @@ -2843,7 +2843,7 @@ __LJMP static int hlua_socket_new(lua_State *L) goto out_fail_sess; } - strm = stream_new(sess, &appctx->obj_type); + strm = stream_new(sess, &appctx->obj_type, &BUF_NULL); if (!strm) { hlua_pusherror(L, "socket: out of memory"); goto out_fail_stream; diff --git a/src/mux_h1.c b/src/mux_h1.c index fc4fde7bb..e2c969361 100644 --- a/src/mux_h1.c +++ b/src/mux_h1.c @@ -508,7 +508,7 @@ static inline size_t h1s_data_pending(const struct h1s *h1s) return b_data(&h1s->h1c->ibuf); } -static struct conn_stream *h1s_new_cs(struct h1s *h1s) +static struct conn_stream *h1s_new_cs(struct h1s *h1s, struct buffer *input) { struct conn_stream *cs; @@ -529,10 +529,11 @@ static struct conn_stream *h1s_new_cs(struct h1s *h1s) cs->flags |= CS_FL_MAY_SPLICE; } - if (stream_create_from_cs(cs) < 0) { + if (stream_create_from_cs(cs, input) < 0) { TRACE_DEVEL("leaving on stream creation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1s->h1c->conn, h1s); goto err; } + *input = BUF_NULL; TRACE_LEAVE(H1_EV_STRM_NEW, h1s->h1c->conn, h1s); return cs; @@ -597,7 +598,7 @@ static struct h1s *h1s_create(struct h1c *h1c, struct conn_stream *cs, struct se h1s->cs = cs; } else { - cs = h1s_new_cs(h1s); + cs = h1s_new_cs(h1s, &BUF_NULL); if (!cs) goto fail; } diff --git a/src/mux_h2.c b/src/mux_h2.c index 741708d19..75874a38e 100644 --- a/src/mux_h2.c +++ b/src/mux_h2.c @@ -1483,7 +1483,7 @@ static struct h2s *h2c_frt_stream_new(struct h2c *h2c, int id) cs->ctx = h2s; h2c->nb_cs++; - if (stream_create_from_cs(cs) < 0) + if (stream_create_from_cs(cs, &BUF_NULL) < 0) goto out_free_cs; /* We want the accept date presented to the next stream to be the one diff --git a/src/mux_pt.c b/src/mux_pt.c index 57c1b9ef4..3161d1676 100644 --- a/src/mux_pt.c +++ b/src/mux_pt.c @@ -112,7 +112,7 @@ static int mux_pt_init(struct connection *conn, struct proxy *prx, struct sessio if (!cs) goto fail_free_ctx; - if (stream_create_from_cs(cs) < 0) + if (stream_create_from_cs(cs, &BUF_NULL) < 0) goto fail_free; } diff --git a/src/peers.c b/src/peers.c index abc2c596b..b5c1d429f 100644 --- a/src/peers.c +++ b/src/peers.c @@ -2675,7 +2675,7 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer goto out_free_appctx; } - if ((s = stream_new(sess, &appctx->obj_type)) == NULL) { + if ((s = stream_new(sess, &appctx->obj_type, &BUF_NULL)) == NULL) { ha_alert("Failed to initialize stream in peer_session_create().\n"); goto out_free_sess; } diff --git a/src/sink.c b/src/sink.c index a7f689780..35bf7707c 100644 --- a/src/sink.c +++ b/src/sink.c @@ -640,7 +640,7 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink goto out_free_appctx; } - if ((s = stream_new(sess, &appctx->obj_type)) == NULL) { + if ((s = stream_new(sess, &appctx->obj_type, &BUF_NULL)) == NULL) { ha_alert("Failed to initialize stream in peer_session_create().\n"); goto out_free_sess; } diff --git a/src/stream.c b/src/stream.c index abef21032..0ca54afe0 100644 --- a/src/stream.c +++ b/src/stream.c @@ -269,11 +269,11 @@ static void strm_trace(enum trace_level level, uint64_t mask, const struct trace * valid right after the handshake, before the connection's data layer is * initialized, because it relies on the session to be in conn->owner. */ -int stream_create_from_cs(struct conn_stream *cs) +int stream_create_from_cs(struct conn_stream *cs, struct buffer *input) { struct stream *strm; - strm = stream_new(cs->conn->owner, &cs->obj_type); + strm = stream_new(cs->conn->owner, &cs->obj_type, input); if (strm == NULL) return -1; @@ -313,9 +313,11 @@ int stream_buf_available(void *arg) * end point is assigned to , which must be valid. The stream's task * is configured with a nice value inherited from the listener's nice if any. * The task's context is set to the new stream, and its function is set to - * process_stream(). Target and analysers are null. + * process_stream(). Target and analysers are null. is always used as + * Input buffer and may contain data. It is the caller responsibility to not + * reuse it anymore. may point on BUF_NULL. */ -struct stream *stream_new(struct session *sess, enum obj_type *origin) +struct stream *stream_new(struct session *sess, enum obj_type *origin, struct buffer *input) { struct stream *s; struct task *t; @@ -405,8 +407,6 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin) * when the default backend is assigned. */ s->be = sess->fe; - s->req.buf = BUF_NULL; - s->res.buf = BUF_NULL; s->req_cap = NULL; s->res_cap = NULL; @@ -462,7 +462,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin) /* init store persistence */ s->store_count = 0; - channel_init(&s->req); + channel_init(&s->req, input); s->req.flags |= CF_READ_ATTACHED; /* the producer is already connected */ s->req.analysers = sess->listener ? sess->listener->analysers : 0; @@ -477,7 +477,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin) s->req.wex = TICK_ETERNITY; s->req.analyse_exp = TICK_ETERNITY; - channel_init(&s->res); + channel_init(&s->res, &BUF_NULL); s->res.flags |= CF_ISRESP; s->res.analysers = 0;