MINOR: quic: replace custom buf on Tx by default struct buffer

On first prototype version of QUIC, emission was multithreaded. To
support this, a custom thread-safe ring-buffer has been implemented with
qring/cbuf.

Now the thread model has been adjusted : a quic-conn is always used on
the same thread and emission is not multi-threaded. Thus, qring/cbuf
usage can be replace by a standard struct buffer.

The code has been simplified even more as for now buffer is always
drained after a prepare/send invocation. This is the case since a
datagram is always considered as sent even on sendto() error. BUG_ON
statements guard are here to ensure that this model is always valid.
Thus, code to handle data wrapping and consume too small contiguous
space with a 0-length datagram is removed.
This commit is contained in:
Amaury Denoyelle 2022-08-04 16:19:57 +02:00
parent 56c6154dba
commit f2476053f9
2 changed files with 120 additions and 172 deletions

View File

@ -657,8 +657,8 @@ struct quic_conn {
uint64_t prep_bytes;
/* Transport parameters sent by the peer */
struct quic_transport_params params;
/* A pointer to a list of TX ring buffers */
struct mt_list *qring_list;
/* Send buffer used to write datagrams. */
struct buffer buf;
} tx;
struct {
/* Number of received bytes. */

View File

@ -2638,62 +2638,16 @@ static int qc_parse_pkt_frms(struct quic_rx_packet *pkt, struct ssl_sock_ctx *ct
return 0;
}
/* Must be called only by a <cbuf> writer (packet builder).
* Return 1 if <cbuf> may be reused to build packets, depending on its <rd> and
* <wr> internal indexes, 0 if not. When this is the case, reset <wr> writer
* index after having marked the end of written data. This the responsibility
* of the caller to ensure there is enough room in <cbuf> to write the end of
* data made of a uint16_t null field.
/* Write datagram header used for sending into <buf>. It is composed of the
* datagram length and address of the first packet in datagram.
*
* +XXXXXXXXXXXXXXXXXXXXXXX---------------+ (cannot be reused)
* ^ ^
* r w
*
* +-------XXXXXXXXXXXXXXXX---------------+ (can be reused)
* ^ ^
* r w
* +--------------------------------------+ (empty buffer, can be reused)
* ^
* (r = w)
*
* +XXXXXXXXXXXXXXXXXXXXX-XXXXXXXXXXXXXXXX+ (full buffer, cannot be reused)
* ^ ^
* w r
* Caller is responsible that there is enough space in the buffer.
*/
static int qc_may_reuse_cbuf(struct cbuf *cbuf)
{
int rd = HA_ATOMIC_LOAD(&cbuf->rd);
/* We can reset the writer index only if in front of the reader index and
* if the reader index is not null. Resetting the writer when the reader
* index is null would empty the buffer.
* XXX Note than the writer index cannot reach the reader index.
* Only the reader index can reach the writer index.
*/
if (rd && rd <= cbuf->wr) {
/* Mark the end of contiguous data for the reader */
write_u16(cb_wr(cbuf), 0);
cb_add(cbuf, sizeof(uint16_t));
cb_wr_reset(cbuf);
return 1;
}
return 0;
}
/* Write <dglen> datagram length and <pkt> first packet address into <cbuf> ring
* buffer. This is the responsibility of the caller to check there is enough
* room in <cbuf>. Also increase the <cbuf> write index consequently.
* This function must be called only after having built a correct datagram.
* Always succeeds.
*/
static inline void qc_set_dg(struct cbuf *cbuf,
static inline void qc_set_dg(struct buffer *buf,
uint16_t dglen, struct quic_tx_packet *pkt)
{
write_u16(cb_wr(cbuf), dglen);
write_ptr(cb_wr(cbuf) + sizeof dglen, pkt);
cb_add(cbuf, dglen + sizeof dglen + sizeof pkt);
write_u16(b_tail(buf), dglen);
write_ptr(b_tail(buf) + sizeof(dglen), pkt);
}
/* Returns 1 if a packet may be built for <qc> from <qel> encryption level
@ -2724,40 +2678,33 @@ static int qc_may_build_pkt(struct quic_conn *qc, struct list *frms,
return 1;
}
/* Prepare as much as possible short packets which are also datagrams into <qr>
* ring buffer for the QUIC connection with <ctx> as I/O handler context from
* <frms> list of prebuilt frames.
* A header made of two fields is added to each datagram: the datagram length followed
* by the address of the first packet in this datagram.
* Returns the number of bytes prepared in packets if succeeded (may be 0),
* or -1 if something wrong happened.
/* Prepare as much as possible QUIC packets for sending from prebuilt frames
* <frms>. Each packet is stored in a distinct datagram written to <buf>.
*
* Each datagram is prepended by a two fields header : the datagram length and
* the address of the packet contained in the datagram.
*
* Returns the number of bytes prepared in packets if succeeded (may be 0), or
* -1 if something wrong happened.
*/
static int qc_prep_app_pkts(struct quic_conn *qc, struct qring *qr,
static int qc_prep_app_pkts(struct quic_conn *qc, struct buffer *buf,
struct list *frms)
{
struct quic_enc_level *qel;
struct cbuf *cbuf;
unsigned char *end_buf, *end, *pos;
unsigned char *end, *pos, *beg;
struct quic_tx_packet *pkt;
size_t total;
size_t dg_headlen;
/* Each datagram is prepended with its length followed by the address
* of the first packet in the datagram.
*/
const size_t dg_headlen = sizeof(uint16_t) + sizeof(pkt);
TRACE_ENTER(QUIC_EV_CONN_PHPKTS, qc);
/* Each datagram is prepended with its length followed by the
* address of the first packet in the datagram.
*/
dg_headlen = sizeof(uint16_t) + sizeof pkt;
qel = &qc->els[QUIC_TLS_ENC_LEVEL_APP];
total = 0;
start:
cbuf = qr->cbuf;
pos = cb_wr(cbuf);
/* Leave at least <sizeof(uint16_t)> bytes at the end of this buffer
* to ensure there is enough room to mark the end of prepared
* contiguous data with a zero length.
*/
end_buf = pos + cb_contig_space(cbuf) - sizeof(uint16_t);
while (end_buf - pos >= (int)qc->path->mtu + dg_headlen) {
beg = pos = (unsigned char *)b_tail(buf);
while (b_contig_space(buf) >= (int)qc->path->mtu + dg_headlen) {
int err, probe, cc;
TRACE_POINT(QUIC_EV_CONN_PHPKTS, qc, qel);
@ -2802,15 +2749,11 @@ static int qc_prep_app_pkts(struct quic_conn *qc, struct qring *qr,
pkt->flags |= QUIC_FL_TX_PACKET_PROBE_WITH_OLD_DATA;
total += pkt->len;
/* Set the current datagram as prepared into <cbuf>. */
qc_set_dg(cbuf, pkt->len, pkt);
}
/* Reset <wr> writer index if in front of <rd> index */
if (end_buf - pos < (int)qc->path->mtu + dg_headlen) {
TRACE_DEVEL("buffer full", QUIC_EV_CONN_PHPKTS, qc);
if (qc_may_reuse_cbuf(cbuf))
goto start;
/* Write datagram header. */
qc_set_dg(buf, pkt->len, pkt);
b_add(buf, pos - beg);
beg = pos;
}
out:
@ -2822,49 +2765,48 @@ static int qc_prep_app_pkts(struct quic_conn *qc, struct qring *qr,
return -1;
}
/* Prepare as much as possible packets into <qr> ring buffer for
* the QUIC connection with <ctx> as I/O handler context, possibly concatenating
* several packets in the same datagram. A header made of two fields is added
* to each datagram: the datagram length followed by the address of the first
* packet in this datagram.
* Returns the number of bytes prepared in packets if succeeded (may be 0),
* or -1 if something wrong happened.
/* Prepare as much as possible QUIC packets for sending from prebuilt frames
* <frms>. Several packets can be regrouped in a single datagram. The result is
* written into <buf>.
*
* Each datagram is prepended by a two fields header : the datagram length and
* the address of first packet in the datagram.
*
* Returns the number of bytes prepared in packets if succeeded (may be 0), or
* -1 if something wrong happened.
*/
static int qc_prep_pkts(struct quic_conn *qc, struct qring *qr,
static int qc_prep_pkts(struct quic_conn *qc, struct buffer *buf,
enum quic_tls_enc_level tel, struct list *tel_frms,
enum quic_tls_enc_level next_tel, struct list *next_tel_frms)
{
struct quic_enc_level *qel;
struct cbuf *cbuf;
unsigned char *end_buf, *end, *pos;
unsigned char *end, *pos, *beg;
struct quic_tx_packet *first_pkt, *cur_pkt, *prv_pkt;
/* length of datagrams */
uint16_t dglen;
size_t total;
int padding;
/* Each datagram is prepended with its length followed by the
* address of the first packet in the datagram.
/* Each datagram is prepended with its length followed by the address
* of the first packet in the datagram.
*/
size_t dg_headlen = sizeof dglen + sizeof first_pkt;
const size_t dg_headlen = sizeof(uint16_t) + sizeof(first_pkt);
struct list *frms;
TRACE_ENTER(QUIC_EV_CONN_PHPKTS, qc);
/* Currently qc_prep_pkts() does not handle buffer wrapping so the
* caller must ensure that buf is resetted.
*/
BUG_ON_HOT(buf->head || buf->data);
total = 0;
qel = &qc->els[tel];
frms = tel_frms;
start:
dglen = 0;
padding = 0;
cbuf = qr->cbuf;
pos = cb_wr(cbuf);
/* Leave at least <dglen> bytes at the end of this buffer
* to ensure there is enough room to mark the end of prepared
* contiguous data with a zero length.
*/
end_buf = pos + cb_contig_space(cbuf) - sizeof dglen;
beg = pos = (unsigned char *)b_head(buf);
first_pkt = prv_pkt = NULL;
while (end_buf - pos >= (int)qc->path->mtu + dg_headlen || prv_pkt) {
while (b_contig_space(buf) >= (int)qc->path->mtu + dg_headlen || prv_pkt) {
int err, probe, cc;
enum quic_pkt_type pkt_type;
struct quic_tls_ctx *tls_ctx;
@ -2881,8 +2823,11 @@ static int qc_prep_pkts(struct quic_conn *qc, struct qring *qr,
probe = qel->pktns->tx.pto_probe;
if (!qc_may_build_pkt(qc, frms, qel, cc, probe, force_ack)) {
if (prv_pkt)
qc_set_dg(cbuf, dglen, first_pkt);
if (prv_pkt) {
qc_set_dg(buf, dglen, first_pkt);
b_add(buf, pos - beg);
beg = pos;
}
/* Let's select the next encryption level */
if (tel != next_tel && next_tel != QUIC_TLS_ENC_LEVEL_NONE) {
tel = next_tel;
@ -2929,9 +2874,12 @@ static int qc_prep_pkts(struct quic_conn *qc, struct qring *qr,
/* If there was already a correct packet present, set the
* current datagram as prepared into <cbuf>.
*/
if (prv_pkt)
qc_set_dg(cbuf, dglen, first_pkt);
goto stop_build;
if (prv_pkt) {
qc_set_dg(buf, dglen, first_pkt);
b_add(buf, pos - beg);
beg = pos;
}
goto out;
default:
break;
}
@ -2989,11 +2937,14 @@ static int qc_prep_pkts(struct quic_conn *qc, struct qring *qr,
prv_pkt = cur_pkt;
}
}
/* If we have to build a new datagram, set the current datagram as
* prepared into <cbuf>.
*/
if (!prv_pkt) {
qc_set_dg(cbuf, dglen, first_pkt);
qc_set_dg(buf, dglen, first_pkt);
b_add(buf, pos - beg);
beg = pos;
first_pkt = NULL;
dglen = 0;
padding = 0;
@ -3005,14 +2956,6 @@ static int qc_prep_pkts(struct quic_conn *qc, struct qring *qr,
}
}
stop_build:
/* Reset <wr> writer index if in front of <rd> index */
if (end_buf - pos < (int)qc->path->mtu + dg_headlen) {
TRACE_DEVEL("buffer full", QUIC_EV_CONN_PHPKTS, qc);
if (qc_may_reuse_cbuf(cbuf))
goto start;
}
out:
TRACE_LEAVE(QUIC_EV_CONN_PHPKTS, qc);
return total;
@ -3022,19 +2965,20 @@ static int qc_prep_pkts(struct quic_conn *qc, struct qring *qr,
return -1;
}
/* Send the QUIC packets which have been prepared for QUIC connections
* from <qr> ring buffer with <ctx> as I/O handler context.
/* Send datagrams stored in <buf>.
*
* This function always returns 1 for success. Even if sendto() syscall failed,
* buffer is drained and packets are considered as emitted. QUIC loss detection
* mechanism is used as a back door way to retry sending.
*/
int qc_send_ppkts(struct qring *qr, struct ssl_sock_ctx *ctx)
int qc_send_ppkts(struct buffer *buf, struct ssl_sock_ctx *ctx)
{
struct quic_conn *qc;
struct cbuf *cbuf;
char skip_sendto = 0;
qc = ctx->qc;
cbuf = qr->cbuf;
TRACE_ENTER(QUIC_EV_CONN_SPPKTS, qc);
while (cb_contig_data(cbuf)) {
while (b_contig_data(buf, 0)) {
unsigned char *pos;
struct buffer tmpbuf = { };
struct quic_tx_packet *first_pkt, *pkt, *next_pkt;
@ -3042,20 +2986,9 @@ int qc_send_ppkts(struct qring *qr, struct ssl_sock_ctx *ctx)
size_t headlen = sizeof dglen + sizeof first_pkt;
unsigned int time_sent;
pos = cb_rd(cbuf);
pos = (unsigned char *)b_head(buf);
dglen = read_u16(pos);
/* End of prepared datagrams.
* Reset the reader index only if in front of the writer index.
*/
if (!dglen) {
int wr = HA_ATOMIC_LOAD(&cbuf->wr);
if (wr && wr < cbuf->rd) {
cb_rd_reset(cbuf);
continue;
}
break;
}
BUG_ON_HOT(!dglen); /* this should not happen */
pos += sizeof dglen;
first_pkt = read_ptr(pos);
@ -3081,7 +3014,7 @@ int qc_send_ppkts(struct qring *qr, struct ssl_sock_ctx *ctx)
}
}
cb_del(cbuf, dglen + headlen);
b_del(buf, dglen + headlen);
qc->tx.bytes += tmpbuf.data;
time_sent = now_ms;
@ -3684,35 +3617,39 @@ static int qc_qel_may_rm_hp(struct quic_conn *qc, struct quic_enc_level *qel)
int qc_send_app_pkts(struct quic_conn *qc, int old_data, struct list *frms)
{
int ret;
struct qring *qr;
qr = MT_LIST_POP(qc->tx.qring_list, typeof(qr), mt_list);
if (!qr)
/* Never happens */
return 1;
struct buffer *buf = b_alloc(&qc->tx.buf);
if (!buf)
return 0;
/* Prepare and send packets until we could not further prepare packets. */
while (1) {
/* Currently buf cannot be non-empty at this stage. Even if a
* previous sendto() has failed it is emptied to simulate
* packet emission and rely on QUIC lost detection to try to
* emit it.
*/
BUG_ON_HOT(b_data(buf));
b_reset(buf);
if (old_data)
qc->flags |= QUIC_FL_CONN_RETRANS_OLD_DATA;
ret = qc_prep_app_pkts(qc, qr, frms);
ret = qc_prep_app_pkts(qc, buf, frms);
if (ret == -1)
goto err;
else if (ret == 0)
goto out;
if (!qc_send_ppkts(qr, qc->xprt_ctx))
if (!qc_send_ppkts(buf, qc->xprt_ctx))
goto err;
}
out:
qc->flags &= ~QUIC_FL_CONN_RETRANS_OLD_DATA;
MT_LIST_APPEND(qc->tx.qring_list, &qr->mt_list);
return 1;
err:
qc->flags &= ~QUIC_FL_CONN_RETRANS_OLD_DATA;
MT_LIST_APPEND(qc->tx.qring_list, &qr->mt_list);
TRACE_DEVEL("leaving in error", QUIC_EV_CONN_IO_CB, qc);
return 0;
}
@ -3727,32 +3664,35 @@ int qc_send_hdshk_pkts(struct quic_conn *qc, int old_data,
enum quic_tls_enc_level next_tel, struct list *next_tel_frms)
{
int ret;
struct qring *qr;
struct buffer *buf = b_alloc(&qc->tx.buf);
if (!buf)
return 0;
qr = MT_LIST_POP(qc->tx.qring_list, typeof(qr), mt_list);
if (!qr)
/* Never happens */
return 1;
/* Currently buf cannot be non-empty at this stage. Even if a previous
* sendto() has failed it is emptied to simulate packet emission and
* rely on QUIC lost detection to try to emit it.
*/
BUG_ON_HOT(b_data(buf));
b_reset(buf);
if (old_data)
qc->flags |= QUIC_FL_CONN_RETRANS_OLD_DATA;
ret = qc_prep_pkts(qc, qr, tel, tel_frms, next_tel, next_tel_frms);
ret = qc_prep_pkts(qc, buf, tel, tel_frms, next_tel, next_tel_frms);
if (ret == -1)
goto err;
else if (ret == 0)
goto out;
if (!qc_send_ppkts(qr, qc->xprt_ctx))
if (!qc_send_ppkts(buf, qc->xprt_ctx))
goto err;
out:
qc->flags &= ~QUIC_FL_CONN_RETRANS_OLD_DATA;
MT_LIST_APPEND(qc->tx.qring_list, &qr->mt_list);
return 1;
err:
qc->flags &= ~QUIC_FL_CONN_RETRANS_OLD_DATA;
MT_LIST_APPEND(qc->tx.qring_list, &qr->mt_list);
TRACE_DEVEL("leaving in error", QUIC_EV_CONN_IO_CB, qc);
return 0;
}
@ -3877,13 +3817,12 @@ struct task *quic_conn_io_cb(struct task *t, void *context, unsigned int state)
struct quic_conn *qc;
enum quic_tls_enc_level tel, next_tel;
struct quic_enc_level *qel, *next_qel;
struct qring *qr; // Tx ring
struct buffer *buf = NULL;
int st, force_ack, zero_rtt;
ctx = context;
qc = ctx->qc;
TRACE_ENTER(QUIC_EV_CONN_IO_CB, qc);
qr = NULL;
st = qc->state;
TRACE_PROTO("state", QUIC_EV_CONN_IO_CB, qc, &st);
@ -3979,21 +3918,31 @@ struct task *quic_conn_io_cb(struct task *t, void *context, unsigned int state)
}
}
if (!qr)
qr = MT_LIST_POP(qc->tx.qring_list, typeof(qr), mt_list);
/* A listener does not send any O-RTT packet. O-RTT packet number space must not
* be considered.
*/
if (!quic_get_tls_enc_levels(&tel, &next_tel, st, 0))
goto err;
ret = qc_prep_pkts(qc, qr, tel, &qc->els[tel].pktns->tx.frms,
buf = b_alloc(&qc->tx.buf);
if (!buf)
goto err;
/* Currently buf cannot be non-empty at this stage. Even if a previous
* sendto() has failed it is emptied to simulate packet emission and
* rely on QUIC lost detection to try to emit it.
*/
BUG_ON_HOT(b_data(buf));
b_reset(buf);
ret = qc_prep_pkts(qc, buf, tel, &qc->els[tel].pktns->tx.frms,
next_tel, &qc->els[next_tel].pktns->tx.frms);
if (ret == -1)
goto err;
else if (ret == 0)
goto skip_send;
if (!qc_send_ppkts(qr, ctx))
if (!qc_send_ppkts(buf, ctx))
goto err;
skip_send:
@ -4008,14 +3957,10 @@ struct task *quic_conn_io_cb(struct task *t, void *context, unsigned int state)
}
out:
if (qr)
MT_LIST_APPEND(qc->tx.qring_list, &qr->mt_list);
TRACE_LEAVE(QUIC_EV_CONN_IO_CB, qc, &st);
return t;
err:
if (qr)
MT_LIST_APPEND(qc->tx.qring_list, &qr->mt_list);
TRACE_DEVEL("leaving in error", QUIC_EV_CONN_IO_CB, qc, &st, &ssl_err);
return t;
}
@ -4116,6 +4061,8 @@ static void quic_conn_release(struct quic_conn *qc)
pool_free(pool_head_quic_rx_packet, pkt);
}
b_free(&qc->tx.buf);
if (qc->idle_timer_task) {
task_destroy(qc->idle_timer_task);
qc->idle_timer_task = NULL;
@ -4338,7 +4285,7 @@ static struct quic_conn *qc_new_conn(const struct quic_version *qv, int ipv4,
if (scid->len)
memcpy(qc->dcid.data, scid->data, scid->len);
qc->dcid.len = scid->len;
qc->tx.qring_list = &l->rx.tx_qring_list;
qc->tx.buf = BUF_NULL;
qc->li = l;
}
/* QUIC Client (outgoing connection to servers) */
@ -4386,6 +4333,7 @@ static struct quic_conn *qc_new_conn(const struct quic_version *qv, int ipv4,
qc->tx.nb_buf = QUIC_CONN_TX_BUFS_NB;
qc->tx.wbuf = qc->tx.rbuf = 0;
qc->tx.bytes = 0;
qc->tx.buf = BUF_NULL;
/* RX part. */
qc->rx.bytes = 0;
qc->rx.buf = b_make(buf_area, QUIC_CONN_RX_BUFSZ, 0, 0);