mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2025-04-07 01:31:35 +00:00
MINOR: stream: Pass an optional input buffer when a stream is created
It is now possible to set the buffer used by the channel request buffer when a stream is created. It may be useful if input data are already received, instead of waiting the first call to the mux rcv_buf() callback. This change is mandatory to support H1 connection with no stream attached. For now, the multiplexers don't pass any buffer. BUF_NULL is thus used to call stream_create_from_cs().
This commit is contained in:
parent
afc02a4436
commit
26256f86e1
@ -312,16 +312,16 @@ static inline size_t ci_contig_data(const struct channel *c)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Initialize all fields in the channel. */
|
/* Initialize all fields in the channel. */
|
||||||
static inline void channel_init(struct channel *chn)
|
static inline void channel_init(struct channel *chn, struct buffer *input)
|
||||||
{
|
{
|
||||||
chn->buf = BUF_NULL;
|
chn->buf = *input;
|
||||||
chn->to_forward = 0;
|
chn->to_forward = 0;
|
||||||
chn->last_read = now_ms;
|
chn->last_read = now_ms;
|
||||||
chn->xfer_small = chn->xfer_large = 0;
|
chn->xfer_small = chn->xfer_large = 0;
|
||||||
chn->total = 0;
|
chn->total = (IS_HTX_STRM(chn_strm(chn)) ? htxbuf(input)->data : b_data(input));
|
||||||
chn->pipe = NULL;
|
chn->pipe = NULL;
|
||||||
chn->analysers = 0;
|
chn->analysers = 0;
|
||||||
chn->flags = 0;
|
chn->flags = (chn->total ? CF_READ_PARTIAL : 0);
|
||||||
chn->output = 0;
|
chn->output = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -58,8 +58,8 @@ extern struct list streams;
|
|||||||
|
|
||||||
extern struct data_cb sess_conn_cb;
|
extern struct data_cb sess_conn_cb;
|
||||||
|
|
||||||
struct stream *stream_new(struct session *sess, enum obj_type *origin);
|
struct stream *stream_new(struct session *sess, enum obj_type *origin, struct buffer *input);
|
||||||
int stream_create_from_cs(struct conn_stream *cs);
|
int stream_create_from_cs(struct conn_stream *cs, struct buffer *input);
|
||||||
|
|
||||||
/* kill a stream and set the termination flags to <why> (one of SF_ERR_*) */
|
/* kill a stream and set the termination flags to <why> (one of SF_ERR_*) */
|
||||||
void stream_shutdown(struct stream *stream, int why);
|
void stream_shutdown(struct stream *stream, int why);
|
||||||
|
@ -1983,7 +1983,7 @@ spoe_create_appctx(struct spoe_config *conf)
|
|||||||
if (!sess)
|
if (!sess)
|
||||||
goto out_free_spoe;
|
goto out_free_spoe;
|
||||||
|
|
||||||
if ((strm = stream_new(sess, &appctx->obj_type)) == NULL)
|
if ((strm = stream_new(sess, &appctx->obj_type, &BUF_NULL)) == NULL)
|
||||||
goto out_free_sess;
|
goto out_free_sess;
|
||||||
|
|
||||||
stream_set_backend(strm, conf->agent->b.be);
|
stream_set_backend(strm, conf->agent->b.be);
|
||||||
|
@ -2843,7 +2843,7 @@ __LJMP static int hlua_socket_new(lua_State *L)
|
|||||||
goto out_fail_sess;
|
goto out_fail_sess;
|
||||||
}
|
}
|
||||||
|
|
||||||
strm = stream_new(sess, &appctx->obj_type);
|
strm = stream_new(sess, &appctx->obj_type, &BUF_NULL);
|
||||||
if (!strm) {
|
if (!strm) {
|
||||||
hlua_pusherror(L, "socket: out of memory");
|
hlua_pusherror(L, "socket: out of memory");
|
||||||
goto out_fail_stream;
|
goto out_fail_stream;
|
||||||
|
@ -508,7 +508,7 @@ static inline size_t h1s_data_pending(const struct h1s *h1s)
|
|||||||
return b_data(&h1s->h1c->ibuf);
|
return b_data(&h1s->h1c->ibuf);
|
||||||
}
|
}
|
||||||
|
|
||||||
static struct conn_stream *h1s_new_cs(struct h1s *h1s)
|
static struct conn_stream *h1s_new_cs(struct h1s *h1s, struct buffer *input)
|
||||||
{
|
{
|
||||||
struct conn_stream *cs;
|
struct conn_stream *cs;
|
||||||
|
|
||||||
@ -529,10 +529,11 @@ static struct conn_stream *h1s_new_cs(struct h1s *h1s)
|
|||||||
cs->flags |= CS_FL_MAY_SPLICE;
|
cs->flags |= CS_FL_MAY_SPLICE;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (stream_create_from_cs(cs) < 0) {
|
if (stream_create_from_cs(cs, input) < 0) {
|
||||||
TRACE_DEVEL("leaving on stream creation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1s->h1c->conn, h1s);
|
TRACE_DEVEL("leaving on stream creation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1s->h1c->conn, h1s);
|
||||||
goto err;
|
goto err;
|
||||||
}
|
}
|
||||||
|
*input = BUF_NULL;
|
||||||
|
|
||||||
TRACE_LEAVE(H1_EV_STRM_NEW, h1s->h1c->conn, h1s);
|
TRACE_LEAVE(H1_EV_STRM_NEW, h1s->h1c->conn, h1s);
|
||||||
return cs;
|
return cs;
|
||||||
@ -597,7 +598,7 @@ static struct h1s *h1s_create(struct h1c *h1c, struct conn_stream *cs, struct se
|
|||||||
h1s->cs = cs;
|
h1s->cs = cs;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
cs = h1s_new_cs(h1s);
|
cs = h1s_new_cs(h1s, &BUF_NULL);
|
||||||
if (!cs)
|
if (!cs)
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
|
@ -1483,7 +1483,7 @@ static struct h2s *h2c_frt_stream_new(struct h2c *h2c, int id)
|
|||||||
cs->ctx = h2s;
|
cs->ctx = h2s;
|
||||||
h2c->nb_cs++;
|
h2c->nb_cs++;
|
||||||
|
|
||||||
if (stream_create_from_cs(cs) < 0)
|
if (stream_create_from_cs(cs, &BUF_NULL) < 0)
|
||||||
goto out_free_cs;
|
goto out_free_cs;
|
||||||
|
|
||||||
/* We want the accept date presented to the next stream to be the one
|
/* We want the accept date presented to the next stream to be the one
|
||||||
|
@ -112,7 +112,7 @@ static int mux_pt_init(struct connection *conn, struct proxy *prx, struct sessio
|
|||||||
if (!cs)
|
if (!cs)
|
||||||
goto fail_free_ctx;
|
goto fail_free_ctx;
|
||||||
|
|
||||||
if (stream_create_from_cs(cs) < 0)
|
if (stream_create_from_cs(cs, &BUF_NULL) < 0)
|
||||||
goto fail_free;
|
goto fail_free;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -2675,7 +2675,7 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
|
|||||||
goto out_free_appctx;
|
goto out_free_appctx;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((s = stream_new(sess, &appctx->obj_type)) == NULL) {
|
if ((s = stream_new(sess, &appctx->obj_type, &BUF_NULL)) == NULL) {
|
||||||
ha_alert("Failed to initialize stream in peer_session_create().\n");
|
ha_alert("Failed to initialize stream in peer_session_create().\n");
|
||||||
goto out_free_sess;
|
goto out_free_sess;
|
||||||
}
|
}
|
||||||
|
@ -640,7 +640,7 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink
|
|||||||
goto out_free_appctx;
|
goto out_free_appctx;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((s = stream_new(sess, &appctx->obj_type)) == NULL) {
|
if ((s = stream_new(sess, &appctx->obj_type, &BUF_NULL)) == NULL) {
|
||||||
ha_alert("Failed to initialize stream in peer_session_create().\n");
|
ha_alert("Failed to initialize stream in peer_session_create().\n");
|
||||||
goto out_free_sess;
|
goto out_free_sess;
|
||||||
}
|
}
|
||||||
|
16
src/stream.c
16
src/stream.c
@ -269,11 +269,11 @@ static void strm_trace(enum trace_level level, uint64_t mask, const struct trace
|
|||||||
* valid right after the handshake, before the connection's data layer is
|
* valid right after the handshake, before the connection's data layer is
|
||||||
* initialized, because it relies on the session to be in conn->owner.
|
* initialized, because it relies on the session to be in conn->owner.
|
||||||
*/
|
*/
|
||||||
int stream_create_from_cs(struct conn_stream *cs)
|
int stream_create_from_cs(struct conn_stream *cs, struct buffer *input)
|
||||||
{
|
{
|
||||||
struct stream *strm;
|
struct stream *strm;
|
||||||
|
|
||||||
strm = stream_new(cs->conn->owner, &cs->obj_type);
|
strm = stream_new(cs->conn->owner, &cs->obj_type, input);
|
||||||
if (strm == NULL)
|
if (strm == NULL)
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
@ -313,9 +313,11 @@ int stream_buf_available(void *arg)
|
|||||||
* end point is assigned to <origin>, which must be valid. The stream's task
|
* end point is assigned to <origin>, which must be valid. The stream's task
|
||||||
* is configured with a nice value inherited from the listener's nice if any.
|
* is configured with a nice value inherited from the listener's nice if any.
|
||||||
* The task's context is set to the new stream, and its function is set to
|
* The task's context is set to the new stream, and its function is set to
|
||||||
* process_stream(). Target and analysers are null.
|
* process_stream(). Target and analysers are null. <input> is always used as
|
||||||
|
* Input buffer and may contain data. It is the caller responsibility to not
|
||||||
|
* reuse it anymore. <input> may point on BUF_NULL.
|
||||||
*/
|
*/
|
||||||
struct stream *stream_new(struct session *sess, enum obj_type *origin)
|
struct stream *stream_new(struct session *sess, enum obj_type *origin, struct buffer *input)
|
||||||
{
|
{
|
||||||
struct stream *s;
|
struct stream *s;
|
||||||
struct task *t;
|
struct task *t;
|
||||||
@ -405,8 +407,6 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
|
|||||||
* when the default backend is assigned.
|
* when the default backend is assigned.
|
||||||
*/
|
*/
|
||||||
s->be = sess->fe;
|
s->be = sess->fe;
|
||||||
s->req.buf = BUF_NULL;
|
|
||||||
s->res.buf = BUF_NULL;
|
|
||||||
s->req_cap = NULL;
|
s->req_cap = NULL;
|
||||||
s->res_cap = NULL;
|
s->res_cap = NULL;
|
||||||
|
|
||||||
@ -462,7 +462,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
|
|||||||
/* init store persistence */
|
/* init store persistence */
|
||||||
s->store_count = 0;
|
s->store_count = 0;
|
||||||
|
|
||||||
channel_init(&s->req);
|
channel_init(&s->req, input);
|
||||||
s->req.flags |= CF_READ_ATTACHED; /* the producer is already connected */
|
s->req.flags |= CF_READ_ATTACHED; /* the producer is already connected */
|
||||||
s->req.analysers = sess->listener ? sess->listener->analysers : 0;
|
s->req.analysers = sess->listener ? sess->listener->analysers : 0;
|
||||||
|
|
||||||
@ -477,7 +477,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
|
|||||||
s->req.wex = TICK_ETERNITY;
|
s->req.wex = TICK_ETERNITY;
|
||||||
s->req.analyse_exp = TICK_ETERNITY;
|
s->req.analyse_exp = TICK_ETERNITY;
|
||||||
|
|
||||||
channel_init(&s->res);
|
channel_init(&s->res, &BUF_NULL);
|
||||||
s->res.flags |= CF_ISRESP;
|
s->res.flags |= CF_ISRESP;
|
||||||
s->res.analysers = 0;
|
s->res.analysers = 0;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user