MEDIUM: proto_reverse_connect: bootstrap active reverse connection

Implement active reverse connection initialization. This is done through
a new task stored in the receiver structure. This task is instantiated
via bind callback and first woken up via enable callback.

Task handler is separated into two halves. On the first step, a new
connection is allocated and stored in <pend_conn> member of the
receiver. This new client connection will proceed to connect using the
server instance referenced in the bind_conf.

When connect has successfully been executed and HTTP/2 connection is
ready for exchange after SETTINGS, reverse_connect task is woken up. As
<pend_conn> is still set, the second halve is executed which only
execute listener_accept(). This will in turn execute accept_conn
callback which is defined to return the pending connection.

The task is automatically requeued inside accept_conn callback if bind
maxconn is not yet reached. This allows to specify how many connection
should be opened. Each connection is instantiated and reversed serially
one by one until maxconn is reached.

conn_free() has been modified to handle failure if a reverse connection
fails before being accepted. In this case, no session exists to notify
about the failure. Instead, reverse_connect task is requeud with a 1
second delay, giving time to fix a possible network issue. This will
allow to attempt a new connection reverse.

Note that for the moment connection rebinding after accept is disabled
for simplicity. Extra operations are required to migrate an existing
connection and its stack to a new thread which will be implemented
later.
This commit is contained in:
Amaury Denoyelle 2023-08-23 17:16:07 +02:00
parent b781a1bb09
commit 47f502df5e
7 changed files with 192 additions and 6 deletions

View File

@ -703,15 +703,30 @@ static inline int conn_is_reverse(const struct connection *conn)
return !!(conn->reverse.target);
}
/* Returns true if connection must be actively reversed or waiting to be accepted. */
static inline int conn_reverse_in_preconnect(const struct connection *conn)
{
return conn_is_back(conn) ? !!(conn->reverse.target) :
!!(conn->flags & CO_FL_REVERSED);
}
/* Initialize <conn> as a reverse connection to <target>. */
static inline void conn_set_reverse(struct connection *conn, enum obj_type *target)
{
/* Ensure the correct target type is used depending on the connection side before reverse. */
BUG_ON(!conn_is_back(conn) && !objt_server(target));
BUG_ON((!conn_is_back(conn) && !objt_server(target)) ||
(conn_is_back(conn) && !objt_listener(target)));
conn->reverse.target = target;
}
/* Returns the listener instance for connection used for active reverse. */
static inline struct listener *conn_active_reverse_listener(const struct connection *conn)
{
return conn_is_back(conn) ? __objt_listener(conn->reverse.target) :
__objt_listener(conn->target);
}
#endif /* _HAPROXY_CONNECTION_H */
/*

View File

@ -1,13 +1,18 @@
#ifndef _HAPROXY_PROTO_REVERSE_CONNECT_H
#define _HAPROXY_PROTO_REVERSE_CONNECT_H
#include <haproxy/connection-t.h>
#include <haproxy/listener-t.h>
#include <haproxy/receiver-t.h>
int rev_bind_receiver(struct receiver *rx, char **errmsg);
int rev_bind_listener(struct listener *listener, char *errmsg, int errlen);
void rev_enable_listener(struct listener *l);
void rev_disable_listener(struct listener *l);
struct connection *rev_accept_conn(struct listener *l, int *status);
void rev_unbind_receiver(struct listener *l);
int rev_set_affinity(struct connection *conn, int new_tid);
int rev_accepting_conn(const struct receiver *rx);

View File

@ -80,7 +80,9 @@ struct receiver {
struct mt_list rxbuf_list; /* list of buffers to receive and dispatch QUIC datagrams. */
#endif
struct {
struct task *task; /* Task used to open connection for reverse. */
struct server *srv; /* Underlying server used to initiate reverse pre-connect. */
struct connection *pend_conn; /* Pending connection waiting to complete reversal before being accepted. */
} reverse_connect;
/* warning: this struct is huge, keep it at the bottom */

View File

@ -535,6 +535,16 @@ void conn_free(struct connection *conn)
ha_free(&conn->reverse.name.area);
if (conn_reverse_in_preconnect(conn)) {
struct listener *l = conn_active_reverse_listener(conn);
/* For the moment reverse connection are bound only on first thread. */
BUG_ON(tid != 0);
/* Receiver must reference a reverse connection as pending. */
BUG_ON(!l->rx.reverse_connect.pend_conn);
l->rx.reverse_connect.pend_conn = NULL;
}
conn_force_unsubscribe(conn);
pool_free(pool_head_connection, conn);
}
@ -2492,10 +2502,14 @@ int conn_reverse(struct connection *conn)
conn_set_owner(conn, NULL, NULL);
}
else {
/* Wake up receiver to proceed to connection accept. */
struct listener *l = __objt_listener(conn->reverse.target);
conn_backend_deinit(conn);
conn->target = conn->reverse.target;
conn->target = &l->obj_type;
conn->flags |= CO_FL_REVERSED;
task_wakeup(l->rx.reverse_connect.task, TASK_WOKEN_ANY);
}
/* Invert source and destination addresses if already set. */

View File

@ -288,13 +288,13 @@ void listener_set_state(struct listener *l, enum li_state st)
_HA_ATOMIC_INC(&px->li_paused);
break;
case LI_LISTEN:
BUG_ON(l->rx.fd == -1);
BUG_ON(l->rx.fd == -1 && !l->rx.reverse_connect.task);
_HA_ATOMIC_INC(&px->li_bound);
break;
case LI_READY:
case LI_FULL:
case LI_LIMITED:
BUG_ON(l->rx.fd == -1);
BUG_ON(l->rx.fd == -1 && !l->rx.reverse_connect.task);
_HA_ATOMIC_INC(&px->li_ready);
l->flags |= LI_F_FINALIZED;
break;
@ -321,7 +321,7 @@ void enable_listener(struct listener *listener)
do_unbind_listener(listener);
if (listener->state == LI_LISTEN) {
BUG_ON(listener->rx.fd == -1);
BUG_ON(listener->rx.fd == -1 && !listener->rx.reverse_connect.task);
if ((global.mode & (MODE_DAEMON | MODE_MWORKER)) &&
(!!master != !!(listener->rx.flags & RX_F_MWORKER))) {
/* we don't want to enable this listener and don't
@ -799,7 +799,9 @@ int create_listeners(struct bind_conf *bc, const struct sockaddr_storage *ss,
l->rx.iocb = proto->default_iocb;
l->rx.fd = fd;
l->rx.reverse_connect.task = NULL;
l->rx.reverse_connect.srv = NULL;
l->rx.reverse_connect.pend_conn = NULL;
memcpy(&l->rx.addr, ss, sizeof(*ss));
if (proto->fam->set_port)

View File

@ -2,12 +2,16 @@
#include <string.h>
#include <haproxy/api.h>
#include <haproxy/connection.h>
#include <haproxy/errors.h>
#include <haproxy/list.h>
#include <haproxy/listener.h>
#include <haproxy/proto_tcp.h>
#include <haproxy/protocol.h>
#include <haproxy/proxy.h>
#include <haproxy/server.h>
#include <haproxy/sock.h>
#include <haproxy/task.h>
#include <haproxy/proto_reverse_connect.h>
@ -23,9 +27,13 @@ struct protocol proto_reverse_connect = {
/* connection layer */
.listen = rev_bind_listener,
.unbind = rev_unbind_receiver,
.enable = rev_enable_listener,
.disable = rev_disable_listener,
.add = default_add_listener,
.unbind = rev_unbind_receiver,
.resume = default_resume_listener,
.accept_conn = rev_accept_conn,
.set_affinity = rev_set_affinity,
/* address family */
.fam = &proto_fam_reverse_connect,
@ -38,6 +46,89 @@ struct protocol proto_reverse_connect = {
.receivers = LIST_HEAD_INIT(proto_reverse_connect.receivers),
};
static struct connection *new_reverse_conn(struct listener *l, struct server *srv)
{
struct connection *conn = conn_new(srv);
if (!conn)
goto err;
conn_set_reverse(conn, &l->obj_type);
/* These options is incompatible with a reverse connection. */
BUG_ON(srv->conn_src.opts & CO_SRC_BIND);
BUG_ON(srv->proxy->conn_src.opts & CO_SRC_BIND);
sockaddr_alloc(&conn->dst, 0, 0);
if (!conn->dst)
goto err;
*conn->dst = srv->addr;
if (conn_prepare(conn, protocol_lookup(conn->dst->ss_family, PROTO_TYPE_STREAM, 0), srv->xprt))
goto err;
/* TODO simplification of tcp_connect_server() */
conn->handle.fd = sock_create_server_socket(conn);
if (fd_set_nonblock(conn->handle.fd) == -1)
goto err;
if (conn->ctrl->connect(conn, 0) != SF_ERR_NONE)
goto err;
if (conn_xprt_start(conn) < 0)
goto err;
if (!srv->use_ssl ||
(!srv->ssl_ctx.alpn_str && !srv->ssl_ctx.npn_str) ||
srv->mux_proto) {
if (conn_install_mux_be(conn, NULL, NULL, NULL) < 0)
goto err;
}
/* Not expected here. */
BUG_ON((conn->flags & CO_FL_HANDSHAKE));
return conn;
err:
if (conn) {
/* Mark connection as non-reversable. This prevents conn_free()
* to reschedule reverse_connect task on freeing a preconnect
* connection.
*/
conn->reverse.target = NULL;
conn_free(conn);
}
return NULL;
}
struct task *rev_process(struct task *task, void *ctx, unsigned int state)
{
struct listener *l = ctx;
struct connection *conn = l->rx.reverse_connect.pend_conn;
if (conn) {
/* Spurrious receiver task wake up when pend_conn is not ready/on error. */
BUG_ON(!(conn->flags & CO_FL_REVERSED));
/* A connection is ready to be accepted. */
listener_accept(l);
l->rx.reverse_connect.task->expire = TICK_ETERNITY;
}
else {
/* No pending reverse connection, prepare a new one. Store it in the
* listener and return NULL. Connection will be returned later after
* reversal is completed.
*/
conn = new_reverse_conn(l, l->rx.reverse_connect.srv);
l->rx.reverse_connect.pend_conn = conn;
/* On success task will be woken up by H2 mux after reversal. */
l->rx.reverse_connect.task->expire = conn ? TICK_ETERNITY :
MS_TO_TICKS(now_ms + 1000);
}
return task;
}
int rev_bind_receiver(struct receiver *rx, char **errmsg)
{
rx->flags |= RX_F_BOUND;
@ -46,11 +137,21 @@ int rev_bind_receiver(struct receiver *rx, char **errmsg)
int rev_bind_listener(struct listener *listener, char *errmsg, int errlen)
{
struct task *task;
struct proxy *be;
struct server *srv;
struct ist be_name, sv_name;
char *name = NULL;
/* TODO for the moment reverse conn creation is pinned to the first thread only. */
if (!(task = task_new_here())) {
snprintf(errmsg, errlen, "Out of memory.");
goto err;
}
task->process = rev_process;
task->context = listener;
listener->rx.reverse_connect.task = task;
name = strdup(listener->bind_conf->reverse_srvname);
if (!name) {
snprintf(errmsg, errlen, "Out of memory.");
@ -88,6 +189,8 @@ int rev_bind_listener(struct listener *listener, char *errmsg, int errlen)
ha_free(&name);
listener->rx.reverse_connect.srv = srv;
listener_set_state(listener, LI_LISTEN);
task_wakeup(listener->rx.reverse_connect.task, TASK_WOKEN_ANY);
return ERR_NONE;
@ -96,11 +199,55 @@ int rev_bind_listener(struct listener *listener, char *errmsg, int errlen)
return ERR_ALERT | ERR_FATAL;
}
void rev_enable_listener(struct listener *l)
{
task_wakeup(l->rx.reverse_connect.task, TASK_WOKEN_ANY);
}
void rev_disable_listener(struct listener *l)
{
}
struct connection *rev_accept_conn(struct listener *l, int *status)
{
struct connection *conn = l->rx.reverse_connect.pend_conn;
if (!conn) {
/* Instantiate a new conn if maxconn not yet exceeded. */
if (l->bind_conf->maxconn && l->nbconn <= l->bind_conf->maxconn) {
l->rx.reverse_connect.pend_conn = new_reverse_conn(l, l->rx.reverse_connect.srv);
if (!l->rx.reverse_connect.pend_conn) {
*status = CO_AC_PAUSE;
return NULL;
}
}
*status = CO_AC_DONE;
return NULL;
}
/* listener_accept() must not be called if no pending connection is not yet reversed. */
BUG_ON(!(conn->flags & CO_FL_REVERSED));
conn->flags &= ~CO_FL_REVERSED;
l->rx.reverse_connect.pend_conn = NULL;
*status = CO_AC_NONE;
return conn;
}
void rev_unbind_receiver(struct listener *l)
{
l->rx.flags &= ~RX_F_BOUND;
}
int rev_set_affinity(struct connection *conn, int new_tid)
{
/* TODO reversal conn rebinding after is disabled for the moment as we
* did not test possible race conditions.
*/
return -1;
}
int rev_accepting_conn(const struct receiver *rx)
{
return 1;

View File

@ -321,6 +321,7 @@ void free_proxy(struct proxy *p)
free(l->name);
free(l->per_thr);
free(l->counters);
task_destroy(l->rx.reverse_connect.task);
EXTRA_COUNTERS_FREE(l->extra_counters);
free(l);