From 47f502df5ec54b6f9c3ebaaab6e72fb09e218874 Mon Sep 17 00:00:00 2001 From: Amaury Denoyelle Date: Wed, 23 Aug 2023 17:16:07 +0200 Subject: [PATCH] MEDIUM: proto_reverse_connect: bootstrap active reverse connection Implement active reverse connection initialization. This is done through a new task stored in the receiver structure. This task is instantiated via bind callback and first woken up via enable callback. Task handler is separated into two halves. On the first step, a new connection is allocated and stored in member of the receiver. This new client connection will proceed to connect using the server instance referenced in the bind_conf. When connect has successfully been executed and HTTP/2 connection is ready for exchange after SETTINGS, reverse_connect task is woken up. As is still set, the second halve is executed which only execute listener_accept(). This will in turn execute accept_conn callback which is defined to return the pending connection. The task is automatically requeued inside accept_conn callback if bind maxconn is not yet reached. This allows to specify how many connection should be opened. Each connection is instantiated and reversed serially one by one until maxconn is reached. conn_free() has been modified to handle failure if a reverse connection fails before being accepted. In this case, no session exists to notify about the failure. Instead, reverse_connect task is requeud with a 1 second delay, giving time to fix a possible network issue. This will allow to attempt a new connection reverse. Note that for the moment connection rebinding after accept is disabled for simplicity. Extra operations are required to migrate an existing connection and its stack to a new thread which will be implemented later. --- include/haproxy/connection.h | 17 ++- include/haproxy/proto_reverse_connect.h | 5 + include/haproxy/receiver-t.h | 2 + src/connection.c | 16 ++- src/listener.c | 8 +- src/proto_reverse_connect.c | 149 +++++++++++++++++++++++- src/proxy.c | 1 + 7 files changed, 192 insertions(+), 6 deletions(-) diff --git a/include/haproxy/connection.h b/include/haproxy/connection.h index a0af6334c..92b85afde 100644 --- a/include/haproxy/connection.h +++ b/include/haproxy/connection.h @@ -703,15 +703,30 @@ static inline int conn_is_reverse(const struct connection *conn) return !!(conn->reverse.target); } +/* Returns true if connection must be actively reversed or waiting to be accepted. */ +static inline int conn_reverse_in_preconnect(const struct connection *conn) +{ + return conn_is_back(conn) ? !!(conn->reverse.target) : + !!(conn->flags & CO_FL_REVERSED); +} + /* Initialize as a reverse connection to . */ static inline void conn_set_reverse(struct connection *conn, enum obj_type *target) { /* Ensure the correct target type is used depending on the connection side before reverse. */ - BUG_ON(!conn_is_back(conn) && !objt_server(target)); + BUG_ON((!conn_is_back(conn) && !objt_server(target)) || + (conn_is_back(conn) && !objt_listener(target))); conn->reverse.target = target; } +/* Returns the listener instance for connection used for active reverse. */ +static inline struct listener *conn_active_reverse_listener(const struct connection *conn) +{ + return conn_is_back(conn) ? __objt_listener(conn->reverse.target) : + __objt_listener(conn->target); +} + #endif /* _HAPROXY_CONNECTION_H */ /* diff --git a/include/haproxy/proto_reverse_connect.h b/include/haproxy/proto_reverse_connect.h index 2fdf4b1d4..e40a0217a 100644 --- a/include/haproxy/proto_reverse_connect.h +++ b/include/haproxy/proto_reverse_connect.h @@ -1,13 +1,18 @@ #ifndef _HAPROXY_PROTO_REVERSE_CONNECT_H #define _HAPROXY_PROTO_REVERSE_CONNECT_H +#include #include #include int rev_bind_receiver(struct receiver *rx, char **errmsg); int rev_bind_listener(struct listener *listener, char *errmsg, int errlen); +void rev_enable_listener(struct listener *l); +void rev_disable_listener(struct listener *l); +struct connection *rev_accept_conn(struct listener *l, int *status); void rev_unbind_receiver(struct listener *l); +int rev_set_affinity(struct connection *conn, int new_tid); int rev_accepting_conn(const struct receiver *rx); diff --git a/include/haproxy/receiver-t.h b/include/haproxy/receiver-t.h index 87dc61e34..6b1c2c616 100644 --- a/include/haproxy/receiver-t.h +++ b/include/haproxy/receiver-t.h @@ -80,7 +80,9 @@ struct receiver { struct mt_list rxbuf_list; /* list of buffers to receive and dispatch QUIC datagrams. */ #endif struct { + struct task *task; /* Task used to open connection for reverse. */ struct server *srv; /* Underlying server used to initiate reverse pre-connect. */ + struct connection *pend_conn; /* Pending connection waiting to complete reversal before being accepted. */ } reverse_connect; /* warning: this struct is huge, keep it at the bottom */ diff --git a/src/connection.c b/src/connection.c index ae4ee386d..5ebc1d90a 100644 --- a/src/connection.c +++ b/src/connection.c @@ -535,6 +535,16 @@ void conn_free(struct connection *conn) ha_free(&conn->reverse.name.area); + if (conn_reverse_in_preconnect(conn)) { + struct listener *l = conn_active_reverse_listener(conn); + + /* For the moment reverse connection are bound only on first thread. */ + BUG_ON(tid != 0); + /* Receiver must reference a reverse connection as pending. */ + BUG_ON(!l->rx.reverse_connect.pend_conn); + l->rx.reverse_connect.pend_conn = NULL; + } + conn_force_unsubscribe(conn); pool_free(pool_head_connection, conn); } @@ -2492,10 +2502,14 @@ int conn_reverse(struct connection *conn) conn_set_owner(conn, NULL, NULL); } else { + /* Wake up receiver to proceed to connection accept. */ + struct listener *l = __objt_listener(conn->reverse.target); + conn_backend_deinit(conn); - conn->target = conn->reverse.target; + conn->target = &l->obj_type; conn->flags |= CO_FL_REVERSED; + task_wakeup(l->rx.reverse_connect.task, TASK_WOKEN_ANY); } /* Invert source and destination addresses if already set. */ diff --git a/src/listener.c b/src/listener.c index 14c2b05cb..434291476 100644 --- a/src/listener.c +++ b/src/listener.c @@ -288,13 +288,13 @@ void listener_set_state(struct listener *l, enum li_state st) _HA_ATOMIC_INC(&px->li_paused); break; case LI_LISTEN: - BUG_ON(l->rx.fd == -1); + BUG_ON(l->rx.fd == -1 && !l->rx.reverse_connect.task); _HA_ATOMIC_INC(&px->li_bound); break; case LI_READY: case LI_FULL: case LI_LIMITED: - BUG_ON(l->rx.fd == -1); + BUG_ON(l->rx.fd == -1 && !l->rx.reverse_connect.task); _HA_ATOMIC_INC(&px->li_ready); l->flags |= LI_F_FINALIZED; break; @@ -321,7 +321,7 @@ void enable_listener(struct listener *listener) do_unbind_listener(listener); if (listener->state == LI_LISTEN) { - BUG_ON(listener->rx.fd == -1); + BUG_ON(listener->rx.fd == -1 && !listener->rx.reverse_connect.task); if ((global.mode & (MODE_DAEMON | MODE_MWORKER)) && (!!master != !!(listener->rx.flags & RX_F_MWORKER))) { /* we don't want to enable this listener and don't @@ -799,7 +799,9 @@ int create_listeners(struct bind_conf *bc, const struct sockaddr_storage *ss, l->rx.iocb = proto->default_iocb; l->rx.fd = fd; + l->rx.reverse_connect.task = NULL; l->rx.reverse_connect.srv = NULL; + l->rx.reverse_connect.pend_conn = NULL; memcpy(&l->rx.addr, ss, sizeof(*ss)); if (proto->fam->set_port) diff --git a/src/proto_reverse_connect.c b/src/proto_reverse_connect.c index d081482d4..1bbc70957 100644 --- a/src/proto_reverse_connect.c +++ b/src/proto_reverse_connect.c @@ -2,12 +2,16 @@ #include #include +#include #include #include #include +#include #include #include #include +#include +#include #include @@ -23,9 +27,13 @@ struct protocol proto_reverse_connect = { /* connection layer */ .listen = rev_bind_listener, - .unbind = rev_unbind_receiver, + .enable = rev_enable_listener, + .disable = rev_disable_listener, .add = default_add_listener, + .unbind = rev_unbind_receiver, .resume = default_resume_listener, + .accept_conn = rev_accept_conn, + .set_affinity = rev_set_affinity, /* address family */ .fam = &proto_fam_reverse_connect, @@ -38,6 +46,89 @@ struct protocol proto_reverse_connect = { .receivers = LIST_HEAD_INIT(proto_reverse_connect.receivers), }; +static struct connection *new_reverse_conn(struct listener *l, struct server *srv) +{ + struct connection *conn = conn_new(srv); + if (!conn) + goto err; + + conn_set_reverse(conn, &l->obj_type); + + /* These options is incompatible with a reverse connection. */ + BUG_ON(srv->conn_src.opts & CO_SRC_BIND); + BUG_ON(srv->proxy->conn_src.opts & CO_SRC_BIND); + + sockaddr_alloc(&conn->dst, 0, 0); + if (!conn->dst) + goto err; + *conn->dst = srv->addr; + + if (conn_prepare(conn, protocol_lookup(conn->dst->ss_family, PROTO_TYPE_STREAM, 0), srv->xprt)) + goto err; + + /* TODO simplification of tcp_connect_server() */ + conn->handle.fd = sock_create_server_socket(conn); + if (fd_set_nonblock(conn->handle.fd) == -1) + goto err; + + if (conn->ctrl->connect(conn, 0) != SF_ERR_NONE) + goto err; + + if (conn_xprt_start(conn) < 0) + goto err; + + if (!srv->use_ssl || + (!srv->ssl_ctx.alpn_str && !srv->ssl_ctx.npn_str) || + srv->mux_proto) { + if (conn_install_mux_be(conn, NULL, NULL, NULL) < 0) + goto err; + } + + /* Not expected here. */ + BUG_ON((conn->flags & CO_FL_HANDSHAKE)); + return conn; + + err: + if (conn) { + /* Mark connection as non-reversable. This prevents conn_free() + * to reschedule reverse_connect task on freeing a preconnect + * connection. + */ + conn->reverse.target = NULL; + conn_free(conn); + } + + return NULL; +} + +struct task *rev_process(struct task *task, void *ctx, unsigned int state) +{ + struct listener *l = ctx; + struct connection *conn = l->rx.reverse_connect.pend_conn; + + if (conn) { + /* Spurrious receiver task wake up when pend_conn is not ready/on error. */ + BUG_ON(!(conn->flags & CO_FL_REVERSED)); + /* A connection is ready to be accepted. */ + listener_accept(l); + l->rx.reverse_connect.task->expire = TICK_ETERNITY; + } + else { + /* No pending reverse connection, prepare a new one. Store it in the + * listener and return NULL. Connection will be returned later after + * reversal is completed. + */ + conn = new_reverse_conn(l, l->rx.reverse_connect.srv); + l->rx.reverse_connect.pend_conn = conn; + + /* On success task will be woken up by H2 mux after reversal. */ + l->rx.reverse_connect.task->expire = conn ? TICK_ETERNITY : + MS_TO_TICKS(now_ms + 1000); + } + + return task; +} + int rev_bind_receiver(struct receiver *rx, char **errmsg) { rx->flags |= RX_F_BOUND; @@ -46,11 +137,21 @@ int rev_bind_receiver(struct receiver *rx, char **errmsg) int rev_bind_listener(struct listener *listener, char *errmsg, int errlen) { + struct task *task; struct proxy *be; struct server *srv; struct ist be_name, sv_name; char *name = NULL; + /* TODO for the moment reverse conn creation is pinned to the first thread only. */ + if (!(task = task_new_here())) { + snprintf(errmsg, errlen, "Out of memory."); + goto err; + } + task->process = rev_process; + task->context = listener; + listener->rx.reverse_connect.task = task; + name = strdup(listener->bind_conf->reverse_srvname); if (!name) { snprintf(errmsg, errlen, "Out of memory."); @@ -88,6 +189,8 @@ int rev_bind_listener(struct listener *listener, char *errmsg, int errlen) ha_free(&name); listener->rx.reverse_connect.srv = srv; + listener_set_state(listener, LI_LISTEN); + task_wakeup(listener->rx.reverse_connect.task, TASK_WOKEN_ANY); return ERR_NONE; @@ -96,11 +199,55 @@ int rev_bind_listener(struct listener *listener, char *errmsg, int errlen) return ERR_ALERT | ERR_FATAL; } +void rev_enable_listener(struct listener *l) +{ + task_wakeup(l->rx.reverse_connect.task, TASK_WOKEN_ANY); +} + +void rev_disable_listener(struct listener *l) +{ +} + +struct connection *rev_accept_conn(struct listener *l, int *status) +{ + struct connection *conn = l->rx.reverse_connect.pend_conn; + + if (!conn) { + /* Instantiate a new conn if maxconn not yet exceeded. */ + if (l->bind_conf->maxconn && l->nbconn <= l->bind_conf->maxconn) { + l->rx.reverse_connect.pend_conn = new_reverse_conn(l, l->rx.reverse_connect.srv); + if (!l->rx.reverse_connect.pend_conn) { + *status = CO_AC_PAUSE; + return NULL; + } + } + + *status = CO_AC_DONE; + return NULL; + } + + /* listener_accept() must not be called if no pending connection is not yet reversed. */ + BUG_ON(!(conn->flags & CO_FL_REVERSED)); + conn->flags &= ~CO_FL_REVERSED; + l->rx.reverse_connect.pend_conn = NULL; + *status = CO_AC_NONE; + + return conn; +} + void rev_unbind_receiver(struct listener *l) { l->rx.flags &= ~RX_F_BOUND; } +int rev_set_affinity(struct connection *conn, int new_tid) +{ + /* TODO reversal conn rebinding after is disabled for the moment as we + * did not test possible race conditions. + */ + return -1; +} + int rev_accepting_conn(const struct receiver *rx) { return 1; diff --git a/src/proxy.c b/src/proxy.c index 89e673d94..ae8049dbb 100644 --- a/src/proxy.c +++ b/src/proxy.c @@ -321,6 +321,7 @@ void free_proxy(struct proxy *p) free(l->name); free(l->per_thr); free(l->counters); + task_destroy(l->rx.reverse_connect.task); EXTRA_COUNTERS_FREE(l->extra_counters); free(l);