MINOR: quic: Enhance the listener RX buffering part

Add a buffer per QUIC connection. At this time the listener which receives
the UDP datagram is responsible of identifying the underlying QUIC connection
and must copy the QUIC packets to its buffer.
->pkt_list member has been added to quic_conn struct to enlist the packets
in the order they have been copied to the connection buffer so that to be
able to consume this buffer when the packets are freed. This list is locked
thanks to a R/W lock to protect it from concurent accesses.
quic_rx_packet struct does not use a static buffer anymore to store the QUIC
packets contents.
This commit is contained in:
Frédéric Lécaille 2021-11-02 10:14:44 +01:00 committed by Amaury Denoyelle
parent c5c69a0ad2
commit 324ecdafbb
4 changed files with 162 additions and 35 deletions

View File

@ -234,6 +234,8 @@ enum quic_pkt_type {
#define QUIC_TX_RING_BUFSZ (1UL << 12)
/* Size of the internal buffer of QUIC RX buffer. */
#define QUIC_RX_BUFSZ (1UL << 18)
/* Size of the QUIC RX buffer for the connections */
#define QUIC_CONN_RX_BUFSZ (1UL << 13)
extern struct trace_source trace_quic;
extern struct pool_head *pool_head_quic_tx_ring;
@ -408,6 +410,7 @@ extern struct quic_transport_params quic_dflt_transport_params;
struct quic_rx_packet {
struct mt_list list;
struct mt_list rx_list;
struct list qc_rx_pkt_list;
struct quic_conn *qc;
unsigned char type;
uint32_t version;
@ -423,9 +426,11 @@ struct quic_rx_packet {
uint64_t token_len;
/* Packet length */
uint64_t len;
/* Packet length before decryption */
uint64_t raw_len;
/* Additional authenticated data length */
size_t aad_len;
unsigned char data[QUIC_PACKET_MAXLEN];
unsigned char *data;
struct eb64_node pn_node;
volatile unsigned int refcnt;
/* Source address of this packet. */
@ -661,6 +666,11 @@ struct quic_conn {
size_t nb_ack_eliciting;
/* Transport parameters the peer will receive */
struct quic_transport_params params;
/* RX buffer */
struct buffer buf;
/* RX buffer read/write lock */
__decl_thread(HA_RWLOCK_T buf_rwlock);
struct list pkt_list;
} rx;
unsigned int max_ack_delay;
struct quic_path paths[1];

View File

@ -1026,6 +1026,27 @@ static inline int qc_pkt_long(const struct quic_rx_packet *pkt)
return pkt->type != QUIC_PACKET_TYPE_SHORT;
}
/* Release the memory for the RX packets which are no more referenced
* and consume their payloads which have been copied to the RX buffer
* for the connection.
* Always succeeds.
*/
static inline void quic_rx_packet_pool_purge(struct quic_conn *qc)
{
struct quic_rx_packet *pkt, *pktback;
list_for_each_entry_safe(pkt, pktback, &qc->rx.pkt_list, qc_rx_pkt_list) {
if (pkt->data != (unsigned char *)b_head(&qc->rx.buf))
break;
if (!HA_ATOMIC_LOAD(&pkt->refcnt)) {
b_del(&qc->rx.buf, pkt->raw_len);
LIST_DELETE(&pkt->qc_rx_pkt_list);
pool_free(pool_head_quic_rx_packet, pkt);
}
}
}
/* Increment the reference counter of <pkt> */
static inline void quic_rx_packet_refinc(struct quic_rx_packet *pkt)
{
@ -1035,8 +1056,27 @@ static inline void quic_rx_packet_refinc(struct quic_rx_packet *pkt)
/* Decrement the reference counter of <pkt> */
static inline void quic_rx_packet_refdec(struct quic_rx_packet *pkt)
{
if (!HA_ATOMIC_SUB_FETCH(&pkt->refcnt, 1))
if (HA_ATOMIC_SUB_FETCH(&pkt->refcnt, 1))
return;
if (!pkt->qc) {
/* It is possible the connection for this packet has not already been
* identified. In such a case, we only need to free this packet.
*/
pool_free(pool_head_quic_rx_packet, pkt);
}
else {
struct quic_conn *qc = pkt->qc;
HA_RWLOCK_WRLOCK(QUIC_LOCK, &qc->rx.buf_rwlock);
if (pkt->data == (unsigned char *)b_head(&qc->rx.buf)) {
b_del(&qc->rx.buf, pkt->raw_len);
LIST_DELETE(&pkt->qc_rx_pkt_list);
pool_free(pool_head_quic_rx_packet, pkt);
quic_rx_packet_pool_purge(qc);
}
HA_RWLOCK_WRUNLOCK(QUIC_LOCK, &qc->rx.buf_rwlock);
}
}
/* Increment the reference counter of <pkt> */
@ -1052,7 +1092,7 @@ static inline void quic_tx_packet_refdec(struct quic_tx_packet *pkt)
pool_free(pool_head_quic_tx_packet, pkt);
}
ssize_t quic_lstnr_dgram_read(char *buf, size_t len, void *owner,
ssize_t quic_lstnr_dgram_read(struct buffer *buf, size_t len, void *owner,
struct sockaddr_storage *saddr);
#endif /* USE_QUIC */
#endif /* _HAPROXY_XPRT_QUIC_H */

View File

@ -175,10 +175,13 @@ struct connection *quic_sock_accept_conn(struct listener *l, int *status)
void quic_sock_fd_iocb(int fd)
{
ssize_t ret;
struct rxbuf *rxbuf;
struct buffer *buf;
struct listener *l = objt_listener(fdtab[fd].owner);
struct quic_transport_params *params = &l->bind_conf->quic_params;
/* Source address */
struct sockaddr_storage saddr = {0};
size_t max_sz;
socklen_t saddrlen;
BUG_ON(!l);
@ -186,20 +189,32 @@ void quic_sock_fd_iocb(int fd)
if (!(fdtab[fd].state & FD_POLL_IN) || !fd_recv_ready(fd))
return;
buf = get_trash_chunk();
rxbuf = MT_LIST_POP(&l->rx.rxbuf_list, typeof(rxbuf), mt_list);
buf = &rxbuf->buf;
max_sz = params->max_udp_payload_size;
if (b_contig_space(buf) < max_sz) {
/* Note that when we enter this function, <buf> is always empty */
b_reset(buf);
if (b_contig_space(buf) < max_sz)
goto out;
}
saddrlen = sizeof saddr;
do {
ret = recvfrom(fd, buf->area, buf->size, 0,
ret = recvfrom(fd, b_tail(buf), max_sz, 0,
(struct sockaddr *)&saddr, &saddrlen);
if (ret < 0) {
if (errno == EINTR)
continue;
if (errno == EAGAIN)
fd_cant_recv(fd);
return;
goto out;
}
} while (0);
buf->data = ret;
quic_lstnr_dgram_read(buf->area, buf->data, l, &saddr);
b_add(buf, ret);
quic_lstnr_dgram_read(buf, ret, l, &saddr);
b_del(buf, ret);
out:
MT_LIST_APPEND(&l->rx.rxbuf_list, &rxbuf->mt_list);
}

View File

@ -140,6 +140,7 @@ static BIO_METHOD *ha_quic_meth;
DECLARE_POOL(pool_head_quic_tx_ring, "quic_tx_ring_pool", QUIC_TX_RING_BUFSZ);
DECLARE_POOL(pool_head_quic_rxbuf, "quic_rxbuf_pool", QUIC_RX_BUFSZ);
DECLARE_POOL(pool_head_quic_conn_rxbuf, "quic_conn_rxbuf", QUIC_CONN_RX_BUFSZ);
DECLARE_STATIC_POOL(pool_head_quic_conn_ctx,
"quic_conn_ctx_pool", sizeof(struct ssl_sock_ctx));
DECLARE_STATIC_POOL(pool_head_quic_conn, "quic_conn", sizeof(struct quic_conn));
@ -2921,6 +2922,7 @@ static void quic_conn_free(struct quic_conn *conn)
quic_conn_enc_level_uninit(&conn->els[i]);
if (conn->timer_task)
task_destroy(conn->timer_task);
pool_free(pool_head_quic_conn_rxbuf, conn->rx.buf.area);
pool_free(pool_head_quic_conn, conn);
}
@ -2994,6 +2996,7 @@ static struct quic_conn *qc_new_conn(unsigned int version, int ipv4,
struct quic_conn *qc;
/* Initial CID. */
struct quic_connection_id *icid;
char *buf_area;
TRACE_ENTER(QUIC_EV_CONN_INIT);
qc = pool_zalloc(pool_head_quic_conn);
@ -3002,6 +3005,12 @@ static struct quic_conn *qc_new_conn(unsigned int version, int ipv4,
goto err;
}
buf_area = pool_alloc(pool_head_quic_conn_rxbuf);
if (!buf_area) {
TRACE_PROTO("Could not allocate a new RX buffer", QUIC_EV_CONN_INIT);
goto err;
}
qc->cids = EB_ROOT;
/* QUIC Server (or listener). */
if (server) {
@ -3063,6 +3072,9 @@ static struct quic_conn *qc_new_conn(unsigned int version, int ipv4,
/* RX part. */
qc->rx.bytes = 0;
qc->rx.nb_ack_eliciting = 0;
qc->rx.buf = b_make(buf_area, QUIC_CONN_RX_BUFSZ, 0, 0);
HA_RWLOCK_INIT(&qc->rx.buf_rwlock);
LIST_INIT(&qc->rx.pkt_list);
/* XXX TO DO: Only one path at this time. */
qc->path = &qc->paths[0];
@ -3179,7 +3191,17 @@ static int qc_pkt_may_rm_hp(struct quic_rx_packet *pkt,
return 0;
}
/* Try to remove the header protecttion of <pkt> QUIC packet attached to <conn>
/* Insert <pkt> RX packet in its <qel> RX packets tree */
static void qc_pkt_insert(struct quic_rx_packet *pkt, struct quic_enc_level *qel)
{
pkt->pn_node.key = pkt->pn;
HA_RWLOCK_WRLOCK(QUIC_LOCK, &qel->rx.pkts_rwlock);
eb64_insert(&qel->rx.pkts, &pkt->pn_node);
HA_RWLOCK_WRUNLOCK(QUIC_LOCK, &qel->rx.pkts_rwlock);
quic_rx_packet_refinc(pkt);
}
/* Try to remove the header protection of <pkt> QUIC packet attached to <qc>
* QUIC connection with <buf> as packet number field address, <end> a pointer to one
* byte past the end of the buffer containing this packet and <beg> the address of
* the packet first byte.
@ -3189,7 +3211,8 @@ static int qc_pkt_may_rm_hp(struct quic_rx_packet *pkt,
static inline int qc_try_rm_hp(struct quic_rx_packet *pkt,
unsigned char **buf, unsigned char *beg,
const unsigned char *end,
struct quic_conn *qc, struct ssl_sock_ctx *ctx)
struct quic_conn *qc, struct quic_enc_level **el,
struct ssl_sock_ctx *ctx)
{
unsigned char *pn = NULL; /* Packet number field */
struct quic_enc_level *qel;
@ -3217,21 +3240,33 @@ static inline int qc_try_rm_hp(struct quic_rx_packet *pkt,
/* The AAD includes the packet number field found at <pn>. */
pkt->aad_len = pn - beg + pkt->pnl;
qpkt_trace = pkt;
/* Store the packet */
pkt->pn_node.key = pkt->pn;
HA_RWLOCK_WRLOCK(QUIC_LOCK, &qel->rx.pkts_rwlock);
eb64_insert(&qel->rx.pkts, &pkt->pn_node);
quic_rx_packet_refinc(pkt);
HA_RWLOCK_WRUNLOCK(QUIC_LOCK, &qel->rx.pkts_rwlock);
}
else if (qel) {
if (qel->tls_ctx.rx.flags & QUIC_FL_TLS_SECRETS_DCD) {
/* If the packet number space has been discarded, this packet
* will be not parsed.
*/
TRACE_PROTO("Discarded pktns", QUIC_EV_CONN_TRMHP, ctx ? ctx->conn : NULL, pkt);
goto out;
}
TRACE_PROTO("hp not removed", QUIC_EV_CONN_TRMHP, ctx ? ctx->conn : NULL, pkt);
pkt->pn_offset = pn - beg;
MT_LIST_APPEND(&qel->rx.pqpkts, &pkt->list);
quic_rx_packet_refinc(pkt);
}
else {
TRACE_PROTO("Unknown packet type", QUIC_EV_CONN_TRMHP, ctx ? ctx->conn : NULL);
goto err;
}
memcpy(pkt->data, beg, pkt->len);
*el = qel;
/* No reference counter incrementation here!!! */
LIST_APPEND(&qc->rx.pkt_list, &pkt->qc_rx_pkt_list);
memcpy(b_tail(&qc->rx.buf), beg, pkt->len);
pkt->data = (unsigned char *)b_tail(&qc->rx.buf);
b_add(&qc->rx.buf, pkt->len);
out:
/* Updtate the offset of <*buf> for the next QUIC packet. */
*buf = beg + pkt->len;
@ -3274,6 +3309,8 @@ static ssize_t qc_srv_pkt_rcv(unsigned char **buf, const unsigned char *end,
struct connection *srv_conn;
struct ssl_sock_ctx *conn_ctx;
int long_header;
size_t b_cspace;
struct quic_enc_level *qel;
qc = NULL;
TRACE_ENTER(QUIC_EV_CONN_SPKT);
@ -3377,14 +3414,27 @@ static ssize_t qc_srv_pkt_rcv(unsigned char **buf, const unsigned char *end,
goto err;
}
if (pkt->len > sizeof pkt->data) {
TRACE_PROTO("Too big packet", QUIC_EV_CONN_SPKT, qc->conn, pkt, &pkt->len);
HA_RWLOCK_WRLOCK(QUIC_LOCK, &qc->rx.buf_rwlock);
b_cspace = b_contig_space(&qc->rx.buf);
if (b_cspace < pkt->len) {
/* Let us consume the remaining contiguous space. */
b_add(&qc->rx.buf, b_cspace);
if (b_contig_space(&qc->rx.buf) < pkt->len) {
HA_RWLOCK_WRUNLOCK(QUIC_LOCK, &qc->rx.buf_rwlock);
TRACE_PROTO("Too big packet", QUIC_EV_CONN_SPKT, qc->conn, pkt, &pkt->len);
goto err;
}
}
if (!qc_try_rm_hp(pkt, buf, beg, end, qc, &qel, conn_ctx)) {
HA_RWLOCK_WRUNLOCK(QUIC_LOCK, &qc->rx.buf_rwlock);
TRACE_PROTO("Packet dropped", QUIC_EV_CONN_SPKT, qc->conn);
goto err;
}
if (!qc_try_rm_hp(pkt, buf, beg, end, qc, conn_ctx))
goto err;
HA_RWLOCK_WRUNLOCK(QUIC_LOCK, &qc->rx.buf_rwlock);
if (pkt->aad_len)
qc_pkt_insert(pkt, qel);
/* Wake the tasklet of the QUIC connection packet handler. */
if (conn_ctx)
tasklet_wakeup(conn_ctx->wait_event.tasklet);
@ -3410,6 +3460,8 @@ static ssize_t qc_lstnr_pkt_rcv(unsigned char **buf, const unsigned char *end,
struct listener *l;
struct ssl_sock_ctx *conn_ctx;
int long_header = 0;
size_t b_cspace;
struct quic_enc_level *qel;
qc = NULL;
conn_ctx = NULL;
@ -3600,6 +3652,8 @@ static ssize_t qc_lstnr_pkt_rcv(unsigned char **buf, const unsigned char *end,
pkt->len = end - *buf;
}
pkt->qc = qc;
/* Store the DCID used for this packet to check the packet which
* come in this UDP datagram match with it.
*/
@ -3609,25 +3663,35 @@ static ssize_t qc_lstnr_pkt_rcv(unsigned char **buf, const unsigned char *end,
}
/* Increase the total length of this packet by the header length. */
pkt->len += *buf - beg;
pkt->raw_len = pkt->len += *buf - beg;
/* Do not check the DCID node before the length. */
if (dgram_ctx->dcid_node != node) {
TRACE_PROTO("Packet dropped", QUIC_EV_CONN_LPKT, qc->conn);
goto err;
}
if (pkt->len > sizeof pkt->data) {
TRACE_PROTO("Too big packet", QUIC_EV_CONN_LPKT, qc->conn, pkt, &pkt->len);
goto err;
HA_RWLOCK_WRLOCK(QUIC_LOCK, &qc->rx.buf_rwlock);
b_cspace = b_contig_space(&qc->rx.buf);
if (b_cspace < pkt->len) {
/* Let us consume the remaining contiguous space. */
b_add(&qc->rx.buf, b_cspace);
if (b_contig_space(&qc->rx.buf) < pkt->len) {
HA_RWLOCK_WRUNLOCK(QUIC_LOCK, &qc->rx.buf_rwlock);
TRACE_PROTO("Too big packet", QUIC_EV_CONN_LPKT, qc->conn, pkt, &pkt->len);
goto err;
}
}
if (!qc_try_rm_hp(pkt, buf, beg, end, qc, conn_ctx)) {
if (!qc_try_rm_hp(pkt, buf, beg, end, qc, &qel, conn_ctx)) {
HA_RWLOCK_WRUNLOCK(QUIC_LOCK, &qc->rx.buf_rwlock);
TRACE_PROTO("Packet dropped", QUIC_EV_CONN_LPKT, qc->conn);
goto err;
}
HA_RWLOCK_WRUNLOCK(QUIC_LOCK, &qc->rx.buf_rwlock);
TRACE_PROTO("New packet", QUIC_EV_CONN_LPKT, qc->conn, pkt);
if (pkt->aad_len)
qc_pkt_insert(pkt, qel);
/* Wake up the connection packet handler task from here only if all
* the contexts have been initialized, especially the mux context
* conn_ctx->conn->ctx. Note that this is ->start xprt callback which
@ -4671,12 +4735,11 @@ static void __quic_conn_deinit(void)
BIO_meth_free(ha_quic_meth);
}
/* Read all the QUIC packets found in <buf> with <len> as length (typically a UDP
* datagram), <ctx> being the QUIC I/O handler context, from QUIC connections,
* calling <func> function;
/* Read all the QUIC packets found in <buf> from QUIC connection with <owner>
* as owner calling <func> function.
* Return the number of bytes read if succeeded, -1 if not.
*/
static ssize_t quic_dgram_read(char *buf, size_t len, void *owner,
static ssize_t quic_dgram_read(struct buffer *buf, size_t len, void *owner,
struct sockaddr_storage *saddr, qpkt_read_func *func)
{
unsigned char *pos;
@ -4686,9 +4749,8 @@ static ssize_t quic_dgram_read(char *buf, size_t len, void *owner,
.owner = owner,
};
pos = (unsigned char *)buf;
pos = (unsigned char *)b_head(buf);
end = pos + len;
do {
int ret;
struct quic_rx_packet *pkt;
@ -4720,7 +4782,7 @@ static ssize_t quic_dgram_read(char *buf, size_t len, void *owner,
return -1;
}
ssize_t quic_lstnr_dgram_read(char *buf, size_t len, void *owner,
ssize_t quic_lstnr_dgram_read(struct buffer *buf, size_t len, void *owner,
struct sockaddr_storage *saddr)
{
return quic_dgram_read(buf, len, owner, saddr, qc_lstnr_pkt_rcv);