diff --git a/include/haproxy/connection.h b/include/haproxy/connection.h index a0af6334c..92b85afde 100644 --- a/include/haproxy/connection.h +++ b/include/haproxy/connection.h @@ -703,15 +703,30 @@ static inline int conn_is_reverse(const struct connection *conn) return !!(conn->reverse.target); } +/* Returns true if connection must be actively reversed or waiting to be accepted. */ +static inline int conn_reverse_in_preconnect(const struct connection *conn) +{ + return conn_is_back(conn) ? !!(conn->reverse.target) : + !!(conn->flags & CO_FL_REVERSED); +} + /* Initialize as a reverse connection to . */ static inline void conn_set_reverse(struct connection *conn, enum obj_type *target) { /* Ensure the correct target type is used depending on the connection side before reverse. */ - BUG_ON(!conn_is_back(conn) && !objt_server(target)); + BUG_ON((!conn_is_back(conn) && !objt_server(target)) || + (conn_is_back(conn) && !objt_listener(target))); conn->reverse.target = target; } +/* Returns the listener instance for connection used for active reverse. */ +static inline struct listener *conn_active_reverse_listener(const struct connection *conn) +{ + return conn_is_back(conn) ? __objt_listener(conn->reverse.target) : + __objt_listener(conn->target); +} + #endif /* _HAPROXY_CONNECTION_H */ /* diff --git a/include/haproxy/proto_reverse_connect.h b/include/haproxy/proto_reverse_connect.h index 2fdf4b1d4..e40a0217a 100644 --- a/include/haproxy/proto_reverse_connect.h +++ b/include/haproxy/proto_reverse_connect.h @@ -1,13 +1,18 @@ #ifndef _HAPROXY_PROTO_REVERSE_CONNECT_H #define _HAPROXY_PROTO_REVERSE_CONNECT_H +#include #include #include int rev_bind_receiver(struct receiver *rx, char **errmsg); int rev_bind_listener(struct listener *listener, char *errmsg, int errlen); +void rev_enable_listener(struct listener *l); +void rev_disable_listener(struct listener *l); +struct connection *rev_accept_conn(struct listener *l, int *status); void rev_unbind_receiver(struct listener *l); +int rev_set_affinity(struct connection *conn, int new_tid); int rev_accepting_conn(const struct receiver *rx); diff --git a/include/haproxy/receiver-t.h b/include/haproxy/receiver-t.h index 87dc61e34..6b1c2c616 100644 --- a/include/haproxy/receiver-t.h +++ b/include/haproxy/receiver-t.h @@ -80,7 +80,9 @@ struct receiver { struct mt_list rxbuf_list; /* list of buffers to receive and dispatch QUIC datagrams. */ #endif struct { + struct task *task; /* Task used to open connection for reverse. */ struct server *srv; /* Underlying server used to initiate reverse pre-connect. */ + struct connection *pend_conn; /* Pending connection waiting to complete reversal before being accepted. */ } reverse_connect; /* warning: this struct is huge, keep it at the bottom */ diff --git a/src/connection.c b/src/connection.c index ae4ee386d..5ebc1d90a 100644 --- a/src/connection.c +++ b/src/connection.c @@ -535,6 +535,16 @@ void conn_free(struct connection *conn) ha_free(&conn->reverse.name.area); + if (conn_reverse_in_preconnect(conn)) { + struct listener *l = conn_active_reverse_listener(conn); + + /* For the moment reverse connection are bound only on first thread. */ + BUG_ON(tid != 0); + /* Receiver must reference a reverse connection as pending. */ + BUG_ON(!l->rx.reverse_connect.pend_conn); + l->rx.reverse_connect.pend_conn = NULL; + } + conn_force_unsubscribe(conn); pool_free(pool_head_connection, conn); } @@ -2492,10 +2502,14 @@ int conn_reverse(struct connection *conn) conn_set_owner(conn, NULL, NULL); } else { + /* Wake up receiver to proceed to connection accept. */ + struct listener *l = __objt_listener(conn->reverse.target); + conn_backend_deinit(conn); - conn->target = conn->reverse.target; + conn->target = &l->obj_type; conn->flags |= CO_FL_REVERSED; + task_wakeup(l->rx.reverse_connect.task, TASK_WOKEN_ANY); } /* Invert source and destination addresses if already set. */ diff --git a/src/listener.c b/src/listener.c index 14c2b05cb..434291476 100644 --- a/src/listener.c +++ b/src/listener.c @@ -288,13 +288,13 @@ void listener_set_state(struct listener *l, enum li_state st) _HA_ATOMIC_INC(&px->li_paused); break; case LI_LISTEN: - BUG_ON(l->rx.fd == -1); + BUG_ON(l->rx.fd == -1 && !l->rx.reverse_connect.task); _HA_ATOMIC_INC(&px->li_bound); break; case LI_READY: case LI_FULL: case LI_LIMITED: - BUG_ON(l->rx.fd == -1); + BUG_ON(l->rx.fd == -1 && !l->rx.reverse_connect.task); _HA_ATOMIC_INC(&px->li_ready); l->flags |= LI_F_FINALIZED; break; @@ -321,7 +321,7 @@ void enable_listener(struct listener *listener) do_unbind_listener(listener); if (listener->state == LI_LISTEN) { - BUG_ON(listener->rx.fd == -1); + BUG_ON(listener->rx.fd == -1 && !listener->rx.reverse_connect.task); if ((global.mode & (MODE_DAEMON | MODE_MWORKER)) && (!!master != !!(listener->rx.flags & RX_F_MWORKER))) { /* we don't want to enable this listener and don't @@ -799,7 +799,9 @@ int create_listeners(struct bind_conf *bc, const struct sockaddr_storage *ss, l->rx.iocb = proto->default_iocb; l->rx.fd = fd; + l->rx.reverse_connect.task = NULL; l->rx.reverse_connect.srv = NULL; + l->rx.reverse_connect.pend_conn = NULL; memcpy(&l->rx.addr, ss, sizeof(*ss)); if (proto->fam->set_port) diff --git a/src/proto_reverse_connect.c b/src/proto_reverse_connect.c index d081482d4..1bbc70957 100644 --- a/src/proto_reverse_connect.c +++ b/src/proto_reverse_connect.c @@ -2,12 +2,16 @@ #include #include +#include #include #include #include +#include #include #include #include +#include +#include #include @@ -23,9 +27,13 @@ struct protocol proto_reverse_connect = { /* connection layer */ .listen = rev_bind_listener, - .unbind = rev_unbind_receiver, + .enable = rev_enable_listener, + .disable = rev_disable_listener, .add = default_add_listener, + .unbind = rev_unbind_receiver, .resume = default_resume_listener, + .accept_conn = rev_accept_conn, + .set_affinity = rev_set_affinity, /* address family */ .fam = &proto_fam_reverse_connect, @@ -38,6 +46,89 @@ struct protocol proto_reverse_connect = { .receivers = LIST_HEAD_INIT(proto_reverse_connect.receivers), }; +static struct connection *new_reverse_conn(struct listener *l, struct server *srv) +{ + struct connection *conn = conn_new(srv); + if (!conn) + goto err; + + conn_set_reverse(conn, &l->obj_type); + + /* These options is incompatible with a reverse connection. */ + BUG_ON(srv->conn_src.opts & CO_SRC_BIND); + BUG_ON(srv->proxy->conn_src.opts & CO_SRC_BIND); + + sockaddr_alloc(&conn->dst, 0, 0); + if (!conn->dst) + goto err; + *conn->dst = srv->addr; + + if (conn_prepare(conn, protocol_lookup(conn->dst->ss_family, PROTO_TYPE_STREAM, 0), srv->xprt)) + goto err; + + /* TODO simplification of tcp_connect_server() */ + conn->handle.fd = sock_create_server_socket(conn); + if (fd_set_nonblock(conn->handle.fd) == -1) + goto err; + + if (conn->ctrl->connect(conn, 0) != SF_ERR_NONE) + goto err; + + if (conn_xprt_start(conn) < 0) + goto err; + + if (!srv->use_ssl || + (!srv->ssl_ctx.alpn_str && !srv->ssl_ctx.npn_str) || + srv->mux_proto) { + if (conn_install_mux_be(conn, NULL, NULL, NULL) < 0) + goto err; + } + + /* Not expected here. */ + BUG_ON((conn->flags & CO_FL_HANDSHAKE)); + return conn; + + err: + if (conn) { + /* Mark connection as non-reversable. This prevents conn_free() + * to reschedule reverse_connect task on freeing a preconnect + * connection. + */ + conn->reverse.target = NULL; + conn_free(conn); + } + + return NULL; +} + +struct task *rev_process(struct task *task, void *ctx, unsigned int state) +{ + struct listener *l = ctx; + struct connection *conn = l->rx.reverse_connect.pend_conn; + + if (conn) { + /* Spurrious receiver task wake up when pend_conn is not ready/on error. */ + BUG_ON(!(conn->flags & CO_FL_REVERSED)); + /* A connection is ready to be accepted. */ + listener_accept(l); + l->rx.reverse_connect.task->expire = TICK_ETERNITY; + } + else { + /* No pending reverse connection, prepare a new one. Store it in the + * listener and return NULL. Connection will be returned later after + * reversal is completed. + */ + conn = new_reverse_conn(l, l->rx.reverse_connect.srv); + l->rx.reverse_connect.pend_conn = conn; + + /* On success task will be woken up by H2 mux after reversal. */ + l->rx.reverse_connect.task->expire = conn ? TICK_ETERNITY : + MS_TO_TICKS(now_ms + 1000); + } + + return task; +} + int rev_bind_receiver(struct receiver *rx, char **errmsg) { rx->flags |= RX_F_BOUND; @@ -46,11 +137,21 @@ int rev_bind_receiver(struct receiver *rx, char **errmsg) int rev_bind_listener(struct listener *listener, char *errmsg, int errlen) { + struct task *task; struct proxy *be; struct server *srv; struct ist be_name, sv_name; char *name = NULL; + /* TODO for the moment reverse conn creation is pinned to the first thread only. */ + if (!(task = task_new_here())) { + snprintf(errmsg, errlen, "Out of memory."); + goto err; + } + task->process = rev_process; + task->context = listener; + listener->rx.reverse_connect.task = task; + name = strdup(listener->bind_conf->reverse_srvname); if (!name) { snprintf(errmsg, errlen, "Out of memory."); @@ -88,6 +189,8 @@ int rev_bind_listener(struct listener *listener, char *errmsg, int errlen) ha_free(&name); listener->rx.reverse_connect.srv = srv; + listener_set_state(listener, LI_LISTEN); + task_wakeup(listener->rx.reverse_connect.task, TASK_WOKEN_ANY); return ERR_NONE; @@ -96,11 +199,55 @@ int rev_bind_listener(struct listener *listener, char *errmsg, int errlen) return ERR_ALERT | ERR_FATAL; } +void rev_enable_listener(struct listener *l) +{ + task_wakeup(l->rx.reverse_connect.task, TASK_WOKEN_ANY); +} + +void rev_disable_listener(struct listener *l) +{ +} + +struct connection *rev_accept_conn(struct listener *l, int *status) +{ + struct connection *conn = l->rx.reverse_connect.pend_conn; + + if (!conn) { + /* Instantiate a new conn if maxconn not yet exceeded. */ + if (l->bind_conf->maxconn && l->nbconn <= l->bind_conf->maxconn) { + l->rx.reverse_connect.pend_conn = new_reverse_conn(l, l->rx.reverse_connect.srv); + if (!l->rx.reverse_connect.pend_conn) { + *status = CO_AC_PAUSE; + return NULL; + } + } + + *status = CO_AC_DONE; + return NULL; + } + + /* listener_accept() must not be called if no pending connection is not yet reversed. */ + BUG_ON(!(conn->flags & CO_FL_REVERSED)); + conn->flags &= ~CO_FL_REVERSED; + l->rx.reverse_connect.pend_conn = NULL; + *status = CO_AC_NONE; + + return conn; +} + void rev_unbind_receiver(struct listener *l) { l->rx.flags &= ~RX_F_BOUND; } +int rev_set_affinity(struct connection *conn, int new_tid) +{ + /* TODO reversal conn rebinding after is disabled for the moment as we + * did not test possible race conditions. + */ + return -1; +} + int rev_accepting_conn(const struct receiver *rx) { return 1; diff --git a/src/proxy.c b/src/proxy.c index 89e673d94..ae8049dbb 100644 --- a/src/proxy.c +++ b/src/proxy.c @@ -321,6 +321,7 @@ void free_proxy(struct proxy *p) free(l->name); free(l->per_thr); free(l->counters); + task_destroy(l->rx.reverse_connect.task); EXTRA_COUNTERS_FREE(l->extra_counters); free(l);