diff --git a/include/proto/backend.h b/include/proto/backend.h index 69ee31c78..54ae057fd 100644 --- a/include/proto/backend.h +++ b/include/proto/backend.h @@ -31,7 +31,7 @@ #include int assign_server(struct stream *s); -int assign_server_address(struct stream *s); +int assign_server_address(struct stream *s, struct connection *srv_conn); int assign_server_and_queue(struct stream *s); int connect_server(struct stream *s); int srv_redispatch_connect(struct stream *t); diff --git a/include/proto/connection.h b/include/proto/connection.h index 611e6ad2d..11d4aa47d 100644 --- a/include/proto/connection.h +++ b/include/proto/connection.h @@ -672,6 +672,16 @@ static inline void conn_free(struct connection *conn) LIST_DEL(&sess->conn_list); LIST_INIT(&sess->conn_list); } + /* If we temporarily stored the connection as the stream_interface's + * end point, remove it. + */ + if (conn->mux_ctx != NULL && conn->mux == NULL) { + struct stream *s = conn->mux_ctx; + + if (objt_conn(s->si[1].end) == conn) + s->si[1].end = NULL; + } + conn_force_unsubscribe(conn); LIST_DEL(&conn->list); LIST_INIT(&conn->list); @@ -1034,6 +1044,9 @@ static inline int conn_install_mux_be(struct connection *conn, void *ctx) if (srv && srv->mux_proto) mux_ops = srv->mux_proto->mux; else { + struct ist mux_proto; + const char *alpn_str = NULL; + int alpn_len = 0; int mode; if (prx->mode == PR_MODE_TCP) @@ -1043,7 +1056,10 @@ static inline int conn_install_mux_be(struct connection *conn, void *ctx) else mode = PROTO_MODE_HTTP; - mux_ops = conn_get_best_mux(conn, ist(NULL), PROTO_SIDE_BE, mode); + conn_get_alpn(conn, &alpn_str, &alpn_len); + mux_proto = ist2(alpn_str, alpn_len); + + mux_ops = conn_get_best_mux(conn, mux_proto, PROTO_SIDE_BE, mode); if (!mux_ops) return -1; } diff --git a/include/proto/stream_interface.h b/include/proto/stream_interface.h index 3d2a6db6e..5568fdb68 100644 --- a/include/proto/stream_interface.h +++ b/include/proto/stream_interface.h @@ -460,10 +460,8 @@ static inline void si_chk_snd(struct stream_interface *si) } /* Calls chk_snd on the connection using the ctrl layer */ -static inline int si_connect(struct stream_interface *si) +static inline int si_connect(struct stream_interface *si, struct connection *conn) { - struct conn_stream *cs = objt_cs(si->end); - struct connection *conn = cs_conn(cs); int ret = SF_ERR_NONE; if (unlikely(!conn || !conn->ctrl || !conn->ctrl->connect)) diff --git a/src/backend.c b/src/backend.c index 0b45d8591..74de27ad3 100644 --- a/src/backend.c +++ b/src/backend.c @@ -783,10 +783,9 @@ int assign_server(struct stream *s) * to si->end. * */ -int assign_server_address(struct stream *s) +int assign_server_address(struct stream *s, struct connection *srv_conn) { struct connection *cli_conn = objt_conn(strm_orig(s)); - struct connection *srv_conn = cs_conn(objt_cs(s->si[1].end)); DPRINTF(stderr,"assign_server_address : s=%p\n",s); @@ -1036,6 +1035,41 @@ static void assign_tproxy_address(struct stream *s) #endif } +/* + * Pick the right mux once the connection is established, we should now have + * an alpn if available, so we are now able to choose. + */ +static int conn_complete_server(struct connection *conn) +{ + struct conn_stream *cs = NULL; + struct stream *s = conn->mux_ctx; + + conn_clear_xprt_done_cb(conn); + /* Verify if the connection just established. */ + if (unlikely(!(conn->flags & (CO_FL_WAIT_L4_CONN | CO_FL_WAIT_L6_CONN | CO_FL_CONNECTED)))) + conn->flags |= CO_FL_CONNECTED; + + if (!s) + goto fail; + if (conn->flags & CO_FL_ERROR) + goto fail; + cs = si_alloc_cs(&s->si[1], conn); + if (!cs) + goto fail; + if (conn_install_mux_be(conn, cs) < 0) + goto fail; + return 0; + +fail: + if (cs) + cs_free(cs); + /* kill the connection now */ + conn_stop_tracking(conn); + conn_full_close(conn); + conn_free(conn); + return -1; +} + /* * This function initiates a connection to the server assigned to this stream @@ -1179,8 +1213,8 @@ int connect_server(struct stream *s) } if (!reuse) { - srv_cs = si_alloc_cs(&s->si[1], NULL); - srv_conn = cs_conn(srv_cs); + srv_conn = conn_new(); + srv_cs = NULL; } else { if (srv_conn->mux->avail_streams(srv_conn) == 1) { /* No more streams available, remove it from the list */ @@ -1198,11 +1232,11 @@ int connect_server(struct stream *s) LIST_ADDQ(&srv_conn->session_list, &s->sess->conn_list); } - if (!srv_cs) + if (!srv_conn) return SF_ERR_RESOURCE; if (!(s->flags & SF_ADDR_SET)) { - err = assign_server_address(s); + err = assign_server_address(s, srv_conn); if (err != SRV_STATUS_OK) return SF_ERR_INTERNAL; } @@ -1223,8 +1257,33 @@ int connect_server(struct stream *s) else return SF_ERR_INTERNAL; /* how did we get there ? */ - if (conn_install_mux_be(srv_conn, srv_cs) < 0) - return SF_ERR_INTERNAL; +#ifdef USE_OPENSSL + if ((!(srv->ssl_ctx.alpn_str) && !(srv->ssl_ctx.npn_str)) || + srv->mux_proto) +#endif + { + srv_cs = si_alloc_cs(&s->si[1], srv_conn); + if (!srv_cs) { + conn_free(srv_conn); + return SF_ERR_RESOURCE; + } + if (conn_install_mux_be(srv_conn, srv_cs) < 0) + return SF_ERR_INTERNAL; + } +#ifdef USE_OPENSSL + else { + srv_conn->mux_ctx = s; + /* Store the connection into the stream interface, + * while we still don't have a mux, so that if the + * stream is destroyed before the connection is + * established, and a mux is set, we don't attempt + * to access the stream + */ + conn_set_xprt_done_cb(srv_conn, conn_complete_server); + } + +#endif + /* process the case where the server requires the PROXY protocol to be sent */ srv_conn->send_proxy_ofs = 0; @@ -1250,7 +1309,7 @@ int connect_server(struct stream *s) if (s->be->options & PR_O_TCP_NOLING) s->si[1].flags |= SI_FL_NOLINGER; - err = si_connect(&s->si[1]); + err = si_connect(&s->si[1], srv_conn); #ifdef USE_OPENSSL if (!reuse && cli_conn && srv && diff --git a/src/session.c b/src/session.c index 7d21a6a6c..8d9e83609 100644 --- a/src/session.c +++ b/src/session.c @@ -80,6 +80,14 @@ void session_free(struct session *sess) conn = sess->srv_conn; if (conn != NULL && conn->mux) conn->mux->destroy(conn); + else if (conn) { + /* We have a connection, but not yet an associated mux. + * So destroy it now. + */ + conn_stop_tracking(conn); + conn_full_close(conn); + conn_free(conn); + } pool_free(pool_head_session, sess); HA_ATOMIC_SUB(&jobs, 1); } diff --git a/src/stream.c b/src/stream.c index e73d9a163..e464230f3 100644 --- a/src/stream.c +++ b/src/stream.c @@ -638,10 +638,8 @@ static int sess_update_st_con_tcp(struct stream *s) si->exp = TICK_ETERNITY; si->state = SI_ST_CER; - /* XXX cognet: do we really want to kill the connection here ? - * Probably not for multiple streams. - */ - cs_close(srv_cs); + if (srv_cs) + cs_close(srv_cs); if (si->err_type) return 0;