MINOR: proto: extend connection thread rebind API

MINOR: listener: define callback for accept queue push

Extend API for connection thread rebind API by replacing single callback
set_affinity by three different ones. Each one of them is used at a
different stage of the operation :

* set_affinity1 is used similarly to previous set_affinity

* set_affinity2 is called directly from accept_queue_push_mp() when an
  entry has been found in accept ring. This operation cannot fail.

* reset_affinity is called after set_affinity1 in case of failure from
  accept_queue_push_mp() due to no space left in accept ring. This is
  necessary for protocols which must reconfigure resources before
  fallback on the current tid.

This patch does not have any functional changes. However, it will be
required to fix crashes for QUIC connections when accept queue ring is
full. As such, it must be backported with it.
This commit is contained in:
Amaury Denoyelle 2024-07-04 15:23:13 +02:00
parent ff024206f0
commit 1a43b9f32c
6 changed files with 35 additions and 16 deletions

View File

@ -119,7 +119,14 @@ struct protocol {
void (*ignore_events)(struct connection *conn, int event_type); /* unsubscribe from socket events */
int (*get_src)(struct connection *conn, struct sockaddr *, socklen_t); /* retrieve connection's source address; -1=fail */
int (*get_dst)(struct connection *conn, struct sockaddr *, socklen_t); /* retrieve connection's dest address; -1=fail */
int (*set_affinity)(struct connection *conn, int new_tid);
/* functions related to thread affinity update */
/* prepare rebind connection on a new thread, may fail */
int (*set_affinity1)(struct connection *conn, int new_tid);
/* complete connection thread rebinding, no error possible */
void (*set_affinity2)(struct connection *conn);
/* cancel connection thread rebinding */
void (*reset_affinity)(struct connection *conn);
/* functions acting on the receiver */
int (*rx_suspend)(struct receiver *rx); /* temporarily suspend this receiver for a soft restart */

View File

@ -177,7 +177,7 @@ void qc_kill_conn(struct quic_conn *qc);
int qc_parse_hd_form(struct quic_rx_packet *pkt,
unsigned char **buf, const unsigned char *end);
int qc_set_tid_affinity(struct quic_conn *qc, uint new_tid, struct listener *new_li);
int qc_set_tid_affinity1(struct quic_conn *qc, uint new_tid, struct listener *new_li);
void qc_finalize_affinity_rebind(struct quic_conn *qc);
int qc_handle_conn_migration(struct quic_conn *qc,
const struct sockaddr_storage *peer_addr,

View File

@ -105,11 +105,14 @@ struct connection *accept_queue_pop_sc(struct accept_queue_ring *ring)
}
/* tries to push a new accepted connection <conn> into ring <ring>. Returns
* non-zero if it succeeds, or zero if the ring is full. Supports multiple
* producers.
/* Tries to push a new accepted connection <conn> into ring <ring>.
* <accept_push_cb> is called if not NULL just prior to the push operation.
*
* Returns non-zero if it succeeds, or zero if the ring is full. Supports
* multiple producers.
*/
int accept_queue_push_mp(struct accept_queue_ring *ring, struct connection *conn)
int accept_queue_push_mp(struct accept_queue_ring *ring, struct connection *conn,
void (*accept_push_cb)(struct connection *))
{
unsigned int pos, next;
uint32_t idx = _HA_ATOMIC_LOAD(&ring->idx); /* (head << 16) + tail */
@ -124,6 +127,9 @@ int accept_queue_push_mp(struct accept_queue_ring *ring, struct connection *conn
next |= (idx & 0xffff0000U);
} while (unlikely(!_HA_ATOMIC_CAS(&ring->idx, &idx, next) && __ha_cpu_relax()));
if (accept_push_cb)
accept_push_cb(conn);
ring->entry[pos] = conn;
__ha_barrier_store();
return 1;
@ -1013,6 +1019,7 @@ static inline int listener_uses_maxconn(const struct listener *l)
*/
void listener_accept(struct listener *l)
{
void (*li_set_affinity2)(struct connection *);
struct connection *cli_conn;
struct proxy *p;
unsigned int max_accept;
@ -1023,6 +1030,7 @@ void listener_accept(struct listener *l)
int ret;
p = l->bind_conf->frontend;
li_set_affinity2 = l->rx.proto ? l->rx.proto->set_affinity2 : NULL;
/* if l->bind_conf->maxaccept is -1, then max_accept is UINT_MAX. It is
* not really illimited, but it is probably enough.
@ -1461,8 +1469,8 @@ void listener_accept(struct listener *l)
* reservation in the target ring.
*/
if (l->rx.proto && l->rx.proto->set_affinity) {
if (l->rx.proto->set_affinity(cli_conn, t)) {
if (l->rx.proto && l->rx.proto->set_affinity1) {
if (l->rx.proto->set_affinity1(cli_conn, t)) {
/* Failed migration, stay on the same thread. */
goto local_accept;
}
@ -1475,15 +1483,19 @@ void listener_accept(struct listener *l)
* when processing this loop.
*/
ring = &accept_queue_rings[t];
if (accept_queue_push_mp(ring, cli_conn)) {
if (accept_queue_push_mp(ring, cli_conn, li_set_affinity2)) {
_HA_ATOMIC_INC(&activity[t].accq_pushed);
tasklet_wakeup(ring->tasklet);
continue;
}
/* If the ring is full we do a synchronous accept on
* the local thread here.
*/
_HA_ATOMIC_INC(&activity[t].accq_full);
if (l->rx.proto && l->rx.proto->reset_affinity)
l->rx.proto->reset_affinity(cli_conn);
}
#endif // USE_THREAD

View File

@ -61,7 +61,7 @@ static int quic_bind_listener(struct listener *listener, char *errmsg, int errle
static int quic_connect_server(struct connection *conn, int flags);
static void quic_enable_listener(struct listener *listener);
static void quic_disable_listener(struct listener *listener);
static int quic_set_affinity(struct connection *conn, int new_tid);
static int quic_set_affinity1(struct connection *conn, int new_tid);
/* Note: must not be declared <const> as its list will be overwritten */
struct protocol proto_quic4 = {
@ -80,7 +80,7 @@ struct protocol proto_quic4 = {
.get_src = quic_sock_get_src,
.get_dst = quic_sock_get_dst,
.connect = quic_connect_server,
.set_affinity = quic_set_affinity,
.set_affinity1 = quic_set_affinity1,
/* binding layer */
.rx_suspend = udp_suspend_receiver,
@ -124,7 +124,7 @@ struct protocol proto_quic6 = {
.get_src = quic_sock_get_src,
.get_dst = quic_sock_get_dst,
.connect = quic_connect_server,
.set_affinity = quic_set_affinity,
.set_affinity1 = quic_set_affinity1,
/* binding layer */
.rx_suspend = udp_suspend_receiver,
@ -668,10 +668,10 @@ static void quic_disable_listener(struct listener *l)
* target is a listener, and the caller is responsible for guaranteeing that
* the listener assigned to the connection is bound to the requested thread.
*/
static int quic_set_affinity(struct connection *conn, int new_tid)
static int quic_set_affinity1(struct connection *conn, int new_tid)
{
struct quic_conn *qc = conn->handle.qc;
return qc_set_tid_affinity(qc, new_tid, objt_listener(conn->target));
return qc_set_tid_affinity1(qc, new_tid, objt_listener(conn->target));
}
static int quic_alloc_dghdlrs(void)

View File

@ -39,7 +39,7 @@ struct protocol proto_rhttp = {
.unbind = rhttp_unbind_receiver,
.resume = default_resume_listener,
.accept_conn = rhttp_accept_conn,
.set_affinity = rhttp_set_affinity,
.set_affinity1 = rhttp_set_affinity,
/* address family */
.fam = &proto_fam_rhttp,

View File

@ -1730,7 +1730,7 @@ void qc_notify_err(struct quic_conn *qc)
*
* Returns 0 on success else non-zero.
*/
int qc_set_tid_affinity(struct quic_conn *qc, uint new_tid, struct listener *new_li)
int qc_set_tid_affinity1(struct quic_conn *qc, uint new_tid, struct listener *new_li)
{
struct task *t1 = NULL, *t2 = NULL;
struct tasklet *t3 = NULL;