mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2024-12-27 23:22:09 +00:00
MEDIUM: connection: start to introduce a mux layer between xprt and data
For HTTP/2 and QUIC, we'll need to deal with multiplexed streams inside a connection. After quite a long brainstorming, it appears that the connection interface to the existing streams is appropriate just like the connection interface to the lower layers. In fact we need to have the mux layer in the middle of the connection, between the transport and the data layer. A mux can exist on two directions/sides. On the inbound direction, it instanciates new streams from incoming connections, while on the outbound direction it muxes streams into outgoing connections. The difference is visible on the mux->init() call : in one case, an upper context is already known (outgoing connection), and in the other case, the upper context is not yet known (incoming connection) and will have to be allocated by the mux. The session doesn't have to create the new streams anymore, as this is performed by the mux itself. This patch introduces this and creates a pass-through mux called "mux_pt" which is used for all new connections and which only calls the data layer's recv,send,wake() calls. One incoming stream is immediately created when init() is called on the inbound direction. There should not be any visible impact. Note that the connection's mux is purposely not set until the session is completed so that we don't accidently run with the wrong mux. This must not cause any issue as the xprt_done_cb function is always called prior to using mux's recv/send functions.
This commit is contained in:
parent
d7bddda151
commit
53a4766e40
2
Makefile
2
Makefile
@ -856,7 +856,7 @@ OBJS = src/cfgparse.o src/proto_http.o src/stats.o src/server.o src/stream.o \
|
||||
src/raw_sock.o src/lb_chash.o src/lb_fwlc.o src/lb_fwrr.o \
|
||||
src/lb_fas.o src/applet.o src/hdr_idx.o src/ev_select.o src/hash.o \
|
||||
src/lb_map.o src/base64.o src/sha1.o src/protocol.o src/h1.o \
|
||||
src/action.o src/hathreads.o
|
||||
src/action.o src/hathreads.o src/mux_pt.o
|
||||
|
||||
EBTREE_OBJS = $(EBTREE_DIR)/ebtree.o \
|
||||
$(EBTREE_DIR)/eb32tree.o $(EBTREE_DIR)/eb64tree.o \
|
||||
|
@ -474,14 +474,17 @@ static inline int conn_xprt_read0_pending(struct connection *c)
|
||||
}
|
||||
|
||||
/* prepares a connection to work with protocol <proto> and transport <xprt>.
|
||||
* The transport's context is initialized as well.
|
||||
* The transport's is initialized as well, and the mux and its context are
|
||||
* cleared.
|
||||
*/
|
||||
static inline void conn_prepare(struct connection *conn, const struct protocol *proto, const struct xprt_ops *xprt)
|
||||
{
|
||||
conn->ctrl = proto;
|
||||
conn->xprt = xprt;
|
||||
conn->mux = NULL;
|
||||
conn->xprt_st = 0;
|
||||
conn->xprt_ctx = NULL;
|
||||
conn->mux_ctx = NULL;
|
||||
}
|
||||
|
||||
/* Initializes all required fields for a new connection. Note that it does the
|
||||
@ -495,6 +498,8 @@ static inline void conn_init(struct connection *conn)
|
||||
conn->flags = CO_FL_NONE;
|
||||
conn->data = NULL;
|
||||
conn->tmp_early_data = -1;
|
||||
conn->mux = NULL;
|
||||
conn->mux_ctx = NULL;
|
||||
conn->owner = NULL;
|
||||
conn->send_proxy_ofs = 0;
|
||||
conn->handle.fd = DEAD_FD_MAGIC;
|
||||
@ -540,6 +545,8 @@ static inline struct connection *conn_new()
|
||||
/* Releases a connection previously allocated by conn_new() */
|
||||
static inline void conn_free(struct connection *conn)
|
||||
{
|
||||
if (conn->mux && conn->mux->release)
|
||||
conn->mux->release(conn);
|
||||
pool_free2(pool2_connection, conn);
|
||||
}
|
||||
|
||||
@ -583,6 +590,16 @@ static inline void conn_attach(struct connection *conn, void *owner, const struc
|
||||
conn->owner = owner;
|
||||
}
|
||||
|
||||
/* Installs the connection's mux layer for upper context <ctx>.
|
||||
* Returns < 0 on error.
|
||||
*/
|
||||
static inline int conn_install_mux(struct connection *conn, const struct mux_ops *mux, void *ctx)
|
||||
{
|
||||
conn->mux = mux;
|
||||
conn->mux_ctx = ctx;
|
||||
return mux->init ? mux->init(conn) : 0;
|
||||
}
|
||||
|
||||
/* returns a human-readable error code for conn->err_code, or NULL if the code
|
||||
* is unknown.
|
||||
*/
|
||||
@ -648,6 +665,13 @@ static inline const char *conn_get_xprt_name(const struct connection *conn)
|
||||
return conn->xprt->name;
|
||||
}
|
||||
|
||||
static inline const char *conn_get_mux_name(const struct connection *conn)
|
||||
{
|
||||
if (!conn->mux)
|
||||
return "NONE";
|
||||
return conn->mux->name;
|
||||
}
|
||||
|
||||
static inline const char *conn_get_data_name(const struct connection *conn)
|
||||
{
|
||||
if (!conn->data)
|
||||
|
37
include/proto/mux_pt.h
Normal file
37
include/proto/mux_pt.h
Normal file
@ -0,0 +1,37 @@
|
||||
/*
|
||||
* include/proto/mux_pt.h
|
||||
* This file contains the pass-though mux function prototypes
|
||||
*
|
||||
* Copyright (C) 2017 Willy Tarreau - w@1wt.eu
|
||||
*
|
||||
* This library is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Lesser General Public
|
||||
* License as published by the Free Software Foundation, version 2.1
|
||||
* exclusively.
|
||||
*
|
||||
* This library is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
* Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public
|
||||
* License along with this library; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
||||
*/
|
||||
|
||||
#ifndef _PROTO_MUX_PT_H
|
||||
#define _PROTO_MUX_PT_H
|
||||
|
||||
#include <common/config.h>
|
||||
#include <types/connection.h>
|
||||
|
||||
extern const struct mux_ops mux_pt_ops;
|
||||
|
||||
#endif /* _PROTO_MUX_PT_H */
|
||||
|
||||
/*
|
||||
* Local variables:
|
||||
* c-indent-level: 8
|
||||
* c-basic-offset: 8
|
||||
* End:
|
||||
*/
|
@ -243,6 +243,22 @@ struct xprt_ops {
|
||||
char name[8]; /* transport layer name, zero-terminated */
|
||||
};
|
||||
|
||||
/* mux_ops describes the mux operations, which are to be performed at the
|
||||
* connection level after data are exchanged with the transport layer in order
|
||||
* to propagate them to streams. The <init> function will automatically be
|
||||
* called once the mux is instanciated by the connection's owner at the end
|
||||
* of a transport handshake, when it is about to transfer data and the data
|
||||
* layer is not ready yet.
|
||||
*/
|
||||
struct mux_ops {
|
||||
int (*init)(struct connection *conn); /* early initialization */
|
||||
void (*recv)(struct connection *conn); /* mux-layer recv callback */
|
||||
void (*send)(struct connection *conn); /* mux-layer send callback */
|
||||
int (*wake)(struct connection *conn); /* mux-layer callback to report activity, mandatory */
|
||||
void (*release)(struct connection *conn); /* release all resources allocated by the mux */
|
||||
char name[8]; /* mux layer name, zero-terminated */
|
||||
};
|
||||
|
||||
/* data_cb describes the data layer's recv and send callbacks which are called
|
||||
* when I/O activity was detected after the transport layer is ready. These
|
||||
* callbacks are supposed to make use of the xprt_ops above to exchange data
|
||||
@ -297,11 +313,13 @@ struct connection {
|
||||
unsigned int flags; /* CO_FL_* */
|
||||
const struct protocol *ctrl; /* operations at the socket layer */
|
||||
const struct xprt_ops *xprt; /* operations at the transport layer */
|
||||
const struct mux_ops *mux; /* mux layer opreations. Must be set before xprt->init() */
|
||||
const struct data_cb *data; /* data layer callbacks. Must be set before xprt->init() */
|
||||
void *xprt_ctx; /* general purpose pointer, initialized to NULL */
|
||||
int tmp_early_data; /* 1st byte of early data, if any */
|
||||
void *mux_ctx; /* mux-specific context, initialized to NULL */
|
||||
void *owner; /* pointer to upper layer's entity (eg: session, stream interface) */
|
||||
int xprt_st; /* transport layer state, initialized to zero */
|
||||
int tmp_early_data; /* 1st byte of early data, if any */
|
||||
union conn_handle handle; /* connection handle at the socket layer */
|
||||
enum obj_type *target; /* the target to connect to (server, proxy, applet, ...) */
|
||||
struct list list; /* attach point to various connection lists (idle, ...) */
|
||||
|
@ -41,6 +41,7 @@
|
||||
#include <proto/lb_fwrr.h>
|
||||
#include <proto/lb_map.h>
|
||||
#include <proto/log.h>
|
||||
#include <proto/mux_pt.h>
|
||||
#include <proto/obj_type.h>
|
||||
#include <proto/payload.h>
|
||||
#include <proto/protocol.h>
|
||||
@ -1159,12 +1160,14 @@ int connect_server(struct stream *s)
|
||||
/* set the correct protocol on the output stream interface */
|
||||
if (srv) {
|
||||
conn_prepare(srv_conn, protocol_by_family(srv_conn->addr.to.ss_family), srv->xprt);
|
||||
conn_install_mux(srv_conn, &mux_pt_ops, srv_conn);
|
||||
}
|
||||
else if (obj_type(s->target) == OBJ_TYPE_PROXY) {
|
||||
/* proxies exclusively run on raw_sock right now */
|
||||
conn_prepare(srv_conn, protocol_by_family(srv_conn->addr.to.ss_family), xprt_get(XPRT_RAW));
|
||||
if (!objt_conn(s->si[1].end) || !objt_conn(s->si[1].end)->ctrl)
|
||||
return SF_ERR_INTERNAL;
|
||||
conn_install_mux(srv_conn, &mux_pt_ops, srv_conn);
|
||||
}
|
||||
else
|
||||
return SF_ERR_INTERNAL; /* how did we get there ? */
|
||||
|
@ -46,6 +46,7 @@
|
||||
#include <proto/stats.h>
|
||||
#include <proto/fd.h>
|
||||
#include <proto/log.h>
|
||||
#include <proto/mux_pt.h>
|
||||
#include <proto/queue.h>
|
||||
#include <proto/port_range.h>
|
||||
#include <proto/proto_http.h>
|
||||
@ -1562,6 +1563,7 @@ static int connect_conn_chk(struct task *t)
|
||||
proto = protocol_by_family(conn->addr.to.ss_family);
|
||||
|
||||
conn_prepare(conn, proto, check->xprt);
|
||||
conn_install_mux(conn, &mux_pt_ops, conn);
|
||||
conn_attach(conn, check, &check_conn_cb);
|
||||
conn->target = &s->obj_type;
|
||||
|
||||
@ -2725,6 +2727,7 @@ static int tcpcheck_main(struct check *check)
|
||||
xprt = xprt_get(XPRT_RAW);
|
||||
}
|
||||
conn_prepare(conn, proto, xprt);
|
||||
conn_install_mux(conn, &mux_pt_ops, conn);
|
||||
|
||||
ret = SF_ERR_INTERNAL;
|
||||
if (proto->connect)
|
||||
|
@ -105,7 +105,7 @@ void conn_fd_handler(int fd)
|
||||
* both of which will be detected below.
|
||||
*/
|
||||
flags = 0;
|
||||
conn->data->send(conn);
|
||||
conn->mux->send(conn);
|
||||
}
|
||||
|
||||
/* The data transfer starts here and stops on error and handshakes. Note
|
||||
@ -119,7 +119,7 @@ void conn_fd_handler(int fd)
|
||||
* both of which will be detected below.
|
||||
*/
|
||||
flags = 0;
|
||||
conn->data->recv(conn);
|
||||
conn->mux->recv(conn);
|
||||
}
|
||||
|
||||
/* It may happen during the data phase that a handshake is
|
||||
@ -169,7 +169,7 @@ void conn_fd_handler(int fd)
|
||||
if ((((conn->flags ^ flags) & CO_FL_NOTIFY_DATA) ||
|
||||
((flags & (CO_FL_CONNECTED|CO_FL_HANDSHAKE)) != CO_FL_CONNECTED &&
|
||||
(conn->flags & (CO_FL_CONNECTED|CO_FL_HANDSHAKE)) == CO_FL_CONNECTED)) &&
|
||||
conn->data->wake(conn) < 0)
|
||||
conn->mux->wake(conn) < 0)
|
||||
return;
|
||||
|
||||
/* remove the events before leaving */
|
||||
|
61
src/mux_pt.c
Normal file
61
src/mux_pt.c
Normal file
@ -0,0 +1,61 @@
|
||||
/*
|
||||
* Pass-through mux-demux for connections
|
||||
*
|
||||
* Copyright 2017 Willy Tarreau <w@1wt.eu>
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version
|
||||
* 2 of the License, or (at your option) any later version.
|
||||
*
|
||||
*/
|
||||
|
||||
#include <common/config.h>
|
||||
#include <proto/connection.h>
|
||||
#include <proto/stream.h>
|
||||
|
||||
/* Initialize the mux once it's attached. If conn->mux_ctx is NULL, it is
|
||||
* assumed that no data layer has yet been instanciated so the mux is
|
||||
* attached to an incoming connection and will instanciate a new stream. If
|
||||
* conn->mux_ctx exists, it is assumed that it is an outgoing connection
|
||||
* requested for this context. Returns < 0 on error.
|
||||
*/
|
||||
static int mux_pt_init(struct connection *conn)
|
||||
{
|
||||
if (!conn->mux_ctx)
|
||||
return stream_create_from_conn(conn);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* callback to be used by default for the pass-through mux. It calls the data
|
||||
* layer wake() callback if it is set otherwise returns 0.
|
||||
*/
|
||||
static int mux_pt_wake(struct connection *conn)
|
||||
{
|
||||
return conn->data->wake ? conn->data->wake(conn) : 0;
|
||||
}
|
||||
|
||||
/* callback to be used by default for the pass-through mux. It simply calls the
|
||||
* data layer recv() callback much must be set.
|
||||
*/
|
||||
static void mux_pt_recv(struct connection *conn)
|
||||
{
|
||||
conn->data->recv(conn);
|
||||
}
|
||||
|
||||
/* callback to be used by default for the pass-through mux. It simply calls the
|
||||
* data layer send() callback which must be set.
|
||||
*/
|
||||
static void mux_pt_send(struct connection *conn)
|
||||
{
|
||||
conn->data->send(conn);
|
||||
}
|
||||
|
||||
/* The mux operations */
|
||||
const struct mux_ops mux_pt_ops = {
|
||||
.init = mux_pt_init,
|
||||
.recv = mux_pt_recv,
|
||||
.send = mux_pt_send,
|
||||
.wake = mux_pt_wake,
|
||||
.name = "PASS",
|
||||
};
|
@ -38,6 +38,7 @@
|
||||
#include <proto/frontend.h>
|
||||
#include <proto/log.h>
|
||||
#include <proto/hdr_idx.h>
|
||||
#include <proto/mux_pt.h>
|
||||
#include <proto/proto_tcp.h>
|
||||
#include <proto/proto_http.h>
|
||||
#include <proto/proxy.h>
|
||||
@ -1912,6 +1913,7 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
|
||||
goto out_free_strm;
|
||||
|
||||
conn_prepare(conn, peer->proto, peer->xprt);
|
||||
conn_install_mux(conn, &mux_pt_ops, conn);
|
||||
si_attach_conn(&s->si[1], conn);
|
||||
|
||||
conn->target = s->target = &s->be->obj_type;
|
||||
|
@ -21,6 +21,7 @@
|
||||
#include <proto/connection.h>
|
||||
#include <proto/listener.h>
|
||||
#include <proto/log.h>
|
||||
#include <proto/mux_pt.h>
|
||||
#include <proto/proto_http.h>
|
||||
#include <proto/proxy.h>
|
||||
#include <proto/session.h>
|
||||
@ -406,7 +407,7 @@ static int conn_complete_session(struct connection *conn)
|
||||
goto fail;
|
||||
|
||||
session_count_new(sess);
|
||||
if (stream_create_from_conn(conn) < 0)
|
||||
if (conn_install_mux(conn, &mux_pt_ops, NULL) < 0)
|
||||
goto fail;
|
||||
|
||||
/* the embryonic session's task is not needed anymore */
|
||||
|
@ -2871,10 +2871,11 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st
|
||||
|
||||
if ((conn = objt_conn(strm->si[0].end)) != NULL) {
|
||||
chunk_appendf(&trash,
|
||||
" co0=%p ctrl=%s xprt=%s data=%s target=%s:%p\n",
|
||||
" co0=%p ctrl=%s xprt=%s mux=%s data=%s target=%s:%p\n",
|
||||
conn,
|
||||
conn_get_ctrl_name(conn),
|
||||
conn_get_xprt_name(conn),
|
||||
conn_get_mux_name(conn),
|
||||
conn_get_data_name(conn),
|
||||
obj_type_name(conn->target),
|
||||
obj_base_ptr(conn->target));
|
||||
@ -2899,10 +2900,11 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st
|
||||
|
||||
if ((conn = objt_conn(strm->si[1].end)) != NULL) {
|
||||
chunk_appendf(&trash,
|
||||
" co1=%p ctrl=%s xprt=%s data=%s target=%s:%p\n",
|
||||
" co1=%p ctrl=%s xprt=%s mux=%s data=%s target=%s:%p\n",
|
||||
conn,
|
||||
conn_get_ctrl_name(conn),
|
||||
conn_get_xprt_name(conn),
|
||||
conn_get_mux_name(conn),
|
||||
conn_get_data_name(conn),
|
||||
obj_type_name(conn->target),
|
||||
obj_base_ptr(conn->target));
|
||||
|
Loading…
Reference in New Issue
Block a user