MEDIUM: mux: provide the session to the init() and attach() method.

Instead of trying to get the session from the connection, which is not
always there, and of course there could be multiple sessions per connection,
provide it with the init() and attach() methods, so that we know the
session for each outgoing stream.
This commit is contained in:
Olivier Houchard 2018-12-14 19:42:40 +01:00 committed by Willy Tarreau
parent 8a78690229
commit f502aca5c2
8 changed files with 48 additions and 45 deletions

View File

@ -816,11 +816,11 @@ static inline struct wait_event *wl_set_waitcb(struct wait_event *wl, struct tas
* Returns < 0 on error.
*/
static inline int conn_install_mux(struct connection *conn, const struct mux_ops *mux,
void *ctx, struct proxy *prx)
void *ctx, struct proxy *prx, struct session *sess)
{
conn->mux = mux;
conn->mux_ctx = ctx;
return mux->init ? mux->init(conn, prx) : 0;
return mux->init ? mux->init(conn, prx, sess) : 0;
}
/* returns a human-readable error code for conn->err_code, or NULL if the code
@ -1093,14 +1093,14 @@ static inline int conn_install_mux_fe(struct connection *conn, void *ctx)
if (!mux_ops)
return -1;
}
return conn_install_mux(conn, mux_ops, ctx, bind_conf->frontend);
return conn_install_mux(conn, mux_ops, ctx, bind_conf->frontend, conn->owner);
}
/* installs the best mux for outgoing connection <conn> using the upper context
* <ctx>. If the mux protocol is forced, we use it to find the best mux. Returns
* < 0 on error.
*/
static inline int conn_install_mux_be(struct connection *conn, void *ctx)
static inline int conn_install_mux_be(struct connection *conn, void *ctx, struct session *sess)
{
struct server *srv = objt_server(conn->target);
struct proxy *prx = objt_proxy(conn->target);
@ -1134,7 +1134,7 @@ static inline int conn_install_mux_be(struct connection *conn, void *ctx)
if (!mux_ops)
return -1;
}
return conn_install_mux(conn, mux_ops, ctx, prx);
return conn_install_mux(conn, mux_ops, ctx, prx, sess);
}
#endif /* _PROTO_CONNECTION_H */

View File

@ -44,6 +44,7 @@ struct cs_info;
struct buffer;
struct proxy;
struct server;
struct session;
struct pipe;
enum sub_event_type {
@ -319,7 +320,7 @@ struct xprt_ops {
* layer is not ready yet.
*/
struct mux_ops {
int (*init)(struct connection *conn, struct proxy *prx); /* early initialization */
int (*init)(struct connection *conn, struct proxy *prx, struct session *sess); /* early initialization */
int (*wake)(struct connection *conn); /* mux-layer callback to report activity, mandatory */
size_t (*rcv_buf)(struct conn_stream *cs, struct buffer *buf, size_t count, int flags); /* Called from the upper layer to get data */
size_t (*snd_buf)(struct conn_stream *cs, struct buffer *buf, size_t count, int flags); /* Called from the upper layer to send data */
@ -328,7 +329,7 @@ struct mux_ops {
void (*shutr)(struct conn_stream *cs, enum cs_shr_mode); /* shutr function */
void (*shutw)(struct conn_stream *cs, enum cs_shw_mode); /* shutw function */
struct conn_stream *(*attach)(struct connection *); /* Create and attach a conn_stream to an outgoing connection */
struct conn_stream *(*attach)(struct connection *, struct session *sess); /* Create and attach a conn_stream to an outgoing connection */
const struct conn_stream *(*get_first_cs)(const struct connection *); /* retrieves any valid conn_stream from this connection */
void (*detach)(struct conn_stream *); /* Detach a conn_stream from an outgoing connection, when the request is done */
void (*show_fd)(struct buffer *, struct connection *); /* append some data about connection into chunk for "show fd" */

View File

@ -1074,7 +1074,7 @@ static int conn_complete_server(struct connection *conn)
cs = si_alloc_cs(&s->si[1], conn);
if (!cs)
goto fail;
if (conn_install_mux_be(conn, cs) < 0)
if (conn_install_mux_be(conn, cs, s->sess) < 0)
goto fail;
srv = objt_server(s->target);
if (srv && ((s->be->options & PR_O_REUSE_MASK) == PR_O_REUSE_ALWS) &&
@ -1275,7 +1275,7 @@ int connect_server(struct stream *s)
LIST_DEL(&srv_conn->list);
LIST_INIT(&srv_conn->list);
}
srv_cs = srv_conn->mux->attach(srv_conn);
srv_cs = srv_conn->mux->attach(srv_conn, s->sess);
if (srv_cs)
si_attach_cs(&s->si[1], srv_cs);
}
@ -1324,7 +1324,7 @@ int connect_server(struct stream *s)
conn_free(srv_conn);
return SF_ERR_RESOURCE;
}
if (conn_install_mux_be(srv_conn, srv_cs) < 0)
if (conn_install_mux_be(srv_conn, srv_cs, s->sess) < 0)
return SF_ERR_INTERNAL;
/* If we're doing http-reuse always, and the connection
* is an http2 connection, add it to the available list,

View File

@ -1604,7 +1604,7 @@ static int connect_conn_chk(struct task *t)
clear_addr(&conn->addr.from);
conn_prepare(conn, proto, check->xprt);
conn_install_mux(conn, &mux_pt_ops, cs, s->proxy);
conn_install_mux(conn, &mux_pt_ops, cs, s->proxy, NULL);
cs_attach(cs, check, &check_conn_cb);
/* only plain tcp-check supports quick ACK */
@ -2787,7 +2787,7 @@ static int tcpcheck_main(struct check *check)
}
conn_prepare(conn, proto, xprt);
conn_install_mux(conn, &mux_pt_ops, cs, s->proxy);
conn_install_mux(conn, &mux_pt_ops, cs, s->proxy, NULL);
cs_attach(cs, check, &check_conn_cb);
ret = SF_ERR_INTERNAL;

View File

@ -92,6 +92,7 @@ struct h1s {
struct wait_event *recv_wait; /* Address of the wait_event the conn_stream associated is waiting on */
struct wait_event *send_wait; /* Address of the wait_event the conn_stream associated is waiting on */
struct session *sess; /* Associated session */
struct h1m req;
struct h1m res;
@ -239,7 +240,7 @@ static struct conn_stream *h1s_new_cs(struct h1s *h1s)
return NULL;
}
static struct h1s *h1s_create(struct h1c *h1c, struct conn_stream *cs)
static struct h1s *h1s_create(struct h1c *h1c, struct conn_stream *cs, struct session *sess)
{
struct h1s *h1s;
@ -250,6 +251,8 @@ static struct h1s *h1s_create(struct h1c *h1c, struct conn_stream *cs)
h1s->h1c = h1c;
h1c->h1s = h1s;
h1s->sess = sess;
h1s->cs = NULL;
h1s->flags = H1S_F_NONE;
@ -291,7 +294,8 @@ static struct h1s *h1s_create(struct h1c *h1c, struct conn_stream *cs)
h1s->cs = cs;
}
else {
struct session *sess = h1c->conn->owner;
/* For frontend connections we should always have a session */
sess = h1c->conn->owner;
h1s->csinfo.create_date = sess->accept_date;
h1s->csinfo.tv_create = sess->tv_accept;
@ -345,7 +349,7 @@ static const struct cs_info *h1_get_cs_info(struct conn_stream *cs)
* points to the existing conn_stream (for outgoing connections) or NULL (for
* incoming ones). Returns < 0 on error.
*/
static int h1_init(struct connection *conn, struct proxy *proxy)
static int h1_init(struct connection *conn, struct proxy *proxy, struct session *sess)
{
struct h1c *h1c;
@ -372,7 +376,7 @@ static int h1_init(struct connection *conn, struct proxy *proxy)
h1c->flags |= H1C_F_CS_WAIT_CONN;
/* Always Create a new H1S */
if (!h1s_create(h1c, conn->mux_ctx))
if (!h1s_create(h1c, conn->mux_ctx, sess))
goto fail;
conn->mux_ctx = h1c;
@ -616,19 +620,19 @@ static void h1_set_cli_conn_mode(struct h1s *h1s, struct h1m *h1m)
static void h1_set_srv_conn_mode(struct h1s *h1s, struct h1m *h1m)
{
struct h1c *h1c = h1s->h1c;
struct session *sess = h1c->conn->owner;
struct proxy *fe = sess->fe;
struct session *sess = h1s->sess;
struct proxy *be = h1c->px;
int flag = H1S_F_WANT_KAL;
int fe_flags = sess ? sess->fe->options : 0;
/* Tunnel mode can only by set on the frontend */
if ((fe->options & PR_O_HTTP_MODE) == PR_O_HTTP_TUN)
if ((fe_flags & PR_O_HTTP_MODE) == PR_O_HTTP_TUN)
flag = H1S_F_WANT_TUN;
/* For the server connection: server-close == httpclose */
if ((fe->options & PR_O_HTTP_MODE) == PR_O_HTTP_SCL ||
if ((fe_flags & PR_O_HTTP_MODE) == PR_O_HTTP_SCL ||
(be->options & PR_O_HTTP_MODE) == PR_O_HTTP_SCL ||
(fe->options & PR_O_HTTP_MODE) == PR_O_HTTP_CLO ||
(fe_flags & PR_O_HTTP_MODE) == PR_O_HTTP_CLO ||
(be->options & PR_O_HTTP_MODE) == PR_O_HTTP_CLO)
flag = H1S_F_WANT_CLO;
@ -1789,7 +1793,7 @@ static int h1_process(struct h1c * h1c)
conn_xprt_read0_pending(conn))
goto release;
if (!conn_is_back(conn) && !(h1c->flags & (H1C_F_CS_SHUTW_NOW|H1C_F_CS_SHUTW))) {
if (!h1s_create(h1c, NULL))
if (!h1s_create(h1c, NULL, NULL))
goto release;
}
else
@ -1865,7 +1869,7 @@ static int h1_wake(struct connection *conn)
* Attach a new stream to a connection
* (Used for outgoing connections)
*/
static struct conn_stream *h1_attach(struct connection *conn)
static struct conn_stream *h1_attach(struct connection *conn, struct session *sess)
{
struct h1c *h1c = conn->mux_ctx;
struct conn_stream *cs = NULL;
@ -1878,7 +1882,7 @@ static struct conn_stream *h1_attach(struct connection *conn)
if (!cs)
goto end;
h1s = h1s_create(h1c, cs);
h1s = h1s_create(h1c, cs, sess);
if (h1s == NULL)
goto end;
@ -1917,6 +1921,7 @@ static void h1_detach(struct conn_stream *cs)
{
struct h1s *h1s = cs->ctx;
struct h1c *h1c;
struct session *sess;
int has_keepalive;
int is_not_first;
@ -1924,6 +1929,7 @@ static void h1_detach(struct conn_stream *cs)
if (!h1s)
return;
sess = h1s->sess;
h1c = h1s->h1c;
h1s->cs = NULL;
@ -1933,16 +1939,13 @@ static void h1_detach(struct conn_stream *cs)
if (conn_is_back(h1c->conn) && has_keepalive &&
!(h1c->conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_SOCK_WR_SH))) {
struct stream_interface *si = cs->data;
struct stream *s = si_strm(si);
/* Never ever allow to reuse a connection from a non-reuse backend */
if ((h1c->px->options & PR_O_REUSE_MASK) == PR_O_REUSE_NEVR)
h1c->conn->flags |= CO_FL_PRIVATE;
if (!(h1c->conn->owner)) {
h1c->conn->owner = s->sess;
session_add_conn(s->sess, h1c->conn, s->target);
h1c->conn->owner = sess;
session_add_conn(sess, h1c->conn, h1c->conn->target);
}
/* we're in keep-alive with an idle connection, monitor it if not already done */
if (LIST_ISEMPTY(&h1c->conn->list)) {

View File

@ -175,6 +175,7 @@ enum h2_ss {
*/
struct h2s {
struct conn_stream *cs;
struct session *sess;
struct h2c *h2c;
struct h1m h1m; /* request or response parser state for H1 */
struct eb32_node by_id; /* place in h2c's streams_by_id */
@ -245,7 +246,7 @@ static inline struct h2s *h2c_st_by_id(struct h2c *h2c, int id);
static int h2s_decode_headers(struct h2s *h2s);
static int h2_frt_transfer_data(struct h2s *h2s);
static struct task *h2_deferred_shut(struct task *t, void *ctx, unsigned short state);
static struct h2s *h2c_bck_stream_new(struct h2c *h2c, struct conn_stream *cs);
static struct h2s *h2c_bck_stream_new(struct h2c *h2c, struct conn_stream *cs, struct session *sess);
/*****************************************************/
/* functions below are for dynamic buffer management */
@ -376,7 +377,7 @@ static int h2_max_streams(struct connection *conn)
* connections from the fact that the context is still NULL. Returns < 0 on
* error.
*/
static int h2_init(struct connection *conn, struct proxy *prx)
static int h2_init(struct connection *conn, struct proxy *prx, struct session *sess)
{
struct h2c *h2c;
struct task *t = NULL;
@ -457,7 +458,7 @@ static int h2_init(struct connection *conn, struct proxy *prx)
*/
struct h2s *h2s;
h2s = h2c_bck_stream_new(h2c, conn->mux_ctx);
h2s = h2c_bck_stream_new(h2c, conn->mux_ctx, sess);
if (!h2s)
goto fail_stream;
}
@ -830,7 +831,7 @@ static struct h2s *h2c_frt_stream_new(struct h2c *h2c, int id)
* and returns it, or NULL in case of memory allocation error or if the highest
* possible stream ID was reached.
*/
static struct h2s *h2c_bck_stream_new(struct h2c *h2c, struct conn_stream *cs)
static struct h2s *h2c_bck_stream_new(struct h2c *h2c, struct conn_stream *cs, struct session *sess)
{
struct h2s *h2s = NULL;
@ -843,6 +844,7 @@ static struct h2s *h2c_bck_stream_new(struct h2c *h2c, struct conn_stream *cs)
goto out;
h2s->cs = cs;
h2s->sess = sess;
cs->ctx = h2s;
h2c->nb_cs++;
@ -2747,7 +2749,7 @@ static struct task *h2_timeout_task(struct task *t, void *context, unsigned shor
* Attach a new stream to a connection
* (Used for outgoing connections)
*/
static struct conn_stream *h2_attach(struct connection *conn)
static struct conn_stream *h2_attach(struct connection *conn, struct session *sess)
{
struct conn_stream *cs;
struct h2s *h2s;
@ -2756,7 +2758,7 @@ static struct conn_stream *h2_attach(struct connection *conn)
cs = cs_new(conn);
if (!cs)
return NULL;
h2s = h2c_bck_stream_new(h2c, cs);
h2s = h2c_bck_stream_new(h2c, cs, sess);
if (!h2s) {
cs_free(cs);
return NULL;
@ -2803,11 +2805,13 @@ static void h2_detach(struct conn_stream *cs)
{
struct h2s *h2s = cs->ctx;
struct h2c *h2c;
struct session *sess;
cs->ctx = NULL;
if (!h2s)
return;
sess = h2s->sess;
h2c = h2s->h2c;
h2s->cs = NULL;
h2c->nb_cs--;
@ -2840,16 +2844,11 @@ static void h2_detach(struct conn_stream *cs)
if (h2c->flags & H2_CF_IS_BACK &&
(h2c->proxy->options2 & PR_O2_USE_HTX)) {
struct stream_interface *si;
struct stream *s;
si = cs->data;
s = si_strm(si);
if (!(h2c->conn->flags &
(CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_SOCK_WR_SH))) {
if (!h2c->conn->owner) {
h2c->conn->owner = s->sess;
session_add_conn(s->sess, h2c->conn, s->target);
h2c->conn->owner = sess;
session_add_conn(sess, h2c->conn, h2c->conn->target);
}
/* Never ever allow to reuse a connection from a non-reuse backend */
if ((h2c->proxy->options & PR_O_REUSE_MASK) == PR_O_REUSE_NEVR)

View File

@ -63,7 +63,7 @@ static struct task *mux_pt_io_cb(struct task *t, void *tctx, unsigned short stat
* incoming ones, in which case one will be allocated and a new stream will be
* instanciated). Returns < 0 on error.
*/
static int mux_pt_init(struct connection *conn, struct proxy *prx)
static int mux_pt_init(struct connection *conn, struct proxy *prx, struct session *sess)
{
struct conn_stream *cs = conn->mux_ctx;
struct mux_pt_ctx *ctx = pool_alloc(pool_head_pt_ctx);
@ -138,7 +138,7 @@ static int mux_pt_wake(struct connection *conn)
* Attach a new stream to a connection
* (Used for outgoing connections)
*/
static struct conn_stream *mux_pt_attach(struct connection *conn)
static struct conn_stream *mux_pt_attach(struct connection *conn, struct session *sess)
{
struct conn_stream *cs;
struct mux_pt_ctx *ctx = conn->mux_ctx;

View File

@ -2000,7 +2000,7 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
memcpy(&conn->addr.to, &peer->addr, sizeof(conn->addr.to));
conn_prepare(conn, peer->proto, peer->xprt);
conn_install_mux(conn, &mux_pt_ops, cs, s->be);
conn_install_mux(conn, &mux_pt_ops, cs, s->be, NULL);
si_attach_cs(&s->si[1], cs);
s->do_log = NULL;