diff --git a/include/haproxy/channel.h b/include/haproxy/channel.h index 65fd57d2f..de51d0467 100644 --- a/include/haproxy/channel.h +++ b/include/haproxy/channel.h @@ -312,16 +312,16 @@ static inline size_t ci_contig_data(const struct channel *c) } /* Initialize all fields in the channel. */ -static inline void channel_init(struct channel *chn, struct buffer *input) +static inline void channel_init(struct channel *chn) { - chn->buf = *input; + chn->buf = BUF_NULL; chn->to_forward = 0; chn->last_read = now_ms; chn->xfer_small = chn->xfer_large = 0; - chn->total = (IS_HTX_STRM(chn_strm(chn)) ? htxbuf(input)->data : b_data(input)); + chn->total = 0; chn->pipe = NULL; chn->analysers = 0; - chn->flags = (chn->total ? CF_READ_PARTIAL : 0); + chn->flags = 0; chn->output = 0; } diff --git a/src/mux_h1.c b/src/mux_h1.c index ced300e17..adfe0e51c 100644 --- a/src/mux_h1.c +++ b/src/mux_h1.c @@ -563,6 +563,13 @@ static inline size_t h1s_data_pending(const struct h1s *h1s) return b_data(&h1s->h1c->ibuf); } +/* Creates a new conn-stream and the associate stream. <input> is used as input + * buffer for the stream. On success, it is transferred to the stream and the + * mux is no longer responsible of it. On error, <input> is unchanged, thus the + * mux must still take care of it. However, there is nothing special to do + * because, on success, <input> is updated to points on BUF_NULL. Thus, calling + * b_free() on it is always safe. This function returns the conn-stream on + * success or NULL on error. */ static struct conn_stream *h1s_new_cs(struct h1s *h1s, struct buffer *input) { struct conn_stream *cs; @@ -589,7 +596,6 @@ static struct conn_stream *h1s_new_cs(struct h1s *h1s, struct buffer *input) TRACE_DEVEL("leaving on stream creation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1s->h1c->conn, h1s); goto err; } - *input = BUF_NULL; TRACE_LEAVE(H1_EV_STRM_NEW, h1s->h1c->conn, h1s); return cs; diff --git a/src/stream.c b/src/stream.c index d121fbe7b..14138f066 100644 --- a/src/stream.c +++ b/src/stream.c @@ -267,7 +267,10 @@ static void strm_trace(enum trace_level level, uint64_t mask, const struct trace /* Create a new stream for connection <conn>. Return < 0 on error. This is only * valid right after the handshake, before the connection's data layer is - * initialized, because it relies on the session to be in conn->owner. + * initialized, because it relies on the session to be in conn->owner. On + * success, <input> buffer is transferred to the stream and thus points to + * BUF_NULL. On error, it is unchanged and it is the caller responsibility to + * release it. */ int stream_create_from_cs(struct conn_stream *cs, struct buffer *input) { @@ -313,9 +316,10 @@ int stream_buf_available(void *arg) * end point is assigned to <origin>, which must be valid. The stream's task * is configured with a nice value inherited from the listener's nice if any. * The task's context is set to the new stream, and its function is set to - * process_stream(). Target and analysers are null. <input> is always used as - * Input buffer and may contain data. It is the caller responsibility to not - * reuse it anymore. <input> may point on BUF_NULL. + * process_stream(). Target and analysers are null. <input> is used as input + * buffer for the request channel and may contain data. On success, it is + * transfer to the stream and <input> is set to BUF_NULL. On error, <input> + * buffer is unchanged and it is the caller responsibility to release it. */ struct stream *stream_new(struct session *sess, enum obj_type *origin, struct buffer *input) { @@ -462,7 +466,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin, struct bu /* init store persistence */ s->store_count = 0; - channel_init(&s->req, input); + channel_init(&s->req); s->req.flags |= CF_READ_ATTACHED; /* the producer is already connected */ s->req.analysers = sess->listener ? sess->listener->analysers : 0; @@ -477,7 +481,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin, struct bu s->req.wex = TICK_ETERNITY; s->req.analyse_exp = TICK_ETERNITY; - channel_init(&s->res, &BUF_NULL); + channel_init(&s->res); s->res.flags |= CF_ISRESP; s->res.analysers = 0; @@ -515,6 +519,17 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin, struct bu if (sess->fe->accept && sess->fe->accept(s) < 0) goto out_fail_accept; + if (!b_is_null(input)) { + /* Xfer the input buffer to the request channel. <input> will + * than point to BUF_NULL. From this point, it is the stream + * responsibility to release it. + */ + s->req.buf = *input; + *input = BUF_NULL; + s->req.total = (IS_HTX_STRM(s) ? htxbuf(input)->data : b_data(input)); + s->req.flags |= (s->req.total ? CF_READ_PARTIAL : 0); + } + /* it is important not to call the wakeup function directly but to * pass through task_wakeup(), because this one knows how to apply * priorities to tasks. Using multi thread we must be sure that