MEDIUM: quic: rearchitecture Rx path for bidirectional STREAM frames

Reorganize the Rx path for STREAM frames on bidirectional streams. A new
function qcc_recv is implemented on the MUX. It will handle the STREAM
frames copy and offset calculation from transport to MUX.

Another function named qcc_decode_qcs from the MUX can be called by
transport each time new STREAM data has been copied.

The architecture is now cleaner with the MUX layer in charge of parsing
the STREAM frames offsets. This is required to be able to implement the
flow-control on the MUX layer.

Note that as a convenience, a STREAM frame is not partially copied to
the MUX buffer. This simplify the implementation for the moment but it
may change in the future to optimize the STREAM frames handling.

For the moment, only bidirectional streams benefit from this change. In
the future, it may be extended to unidirectional streams to unify the
STREAM frames processing.
This commit is contained in:
Amaury Denoyelle 2022-02-28 11:37:48 +01:00
parent 3c4303998f
commit 0e3010b1bb
3 changed files with 133 additions and 138 deletions

View File

@ -19,6 +19,10 @@ int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es);
void qcs_notify_recv(struct qcs *qcs);
void qcs_notify_send(struct qcs *qcs);
int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
char fin, char *data, struct qcs **out_qcs);
int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs);
/* Bit shift to get the stream sub ID for internal use which is obtained
* shifting the stream IDs by this value, knowing that the
* QCS_ID_TYPE_SHIFT less significant bits identify the stream ID

View File

@ -192,6 +192,91 @@ struct eb64_node *qcc_get_qcs(struct qcc *qcc, uint64_t id)
return NULL;
}
/* Handle a new STREAM frame <strm_frm>. The frame content will be copied in
* the buffer of the stream instance. The stream instance will be stored in
* <out_qcs>. In case of success, the caller can immediatly call qcc_decode_qcs
* to process the frame content.
*
* Returns 0 on success. On errors, two codes are present.
* - 1 is returned if the frame cannot be decoded and must be discarded.
* - 2 is returned if the stream cannot decode at the moment the frame. The
* frame should be buffered to be handled later.
*/
int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
char fin, char *data, struct qcs **out_qcs)
{
struct qcs *qcs;
struct eb64_node *strm_node;
size_t total, diff;
strm_node = qcc_get_qcs(qcc, id);
if (!strm_node) {
fprintf(stderr, "%s: stream not found\n", __func__);
return 1;
}
qcs = eb64_entry(&strm_node->node, struct qcs, by_id);
*out_qcs = qcs;
if (offset > qcs->rx.offset)
return 2;
if (offset + len <= qcs->rx.offset) {
fprintf(stderr, "%s: already received STREAM data\n", __func__);
return 1;
}
/* Last frame already handled for this stream. */
BUG_ON(qcs->flags & QC_SF_FIN_RECV);
if (!qc_get_buf(qcs, &qcs->rx.buf)) {
/* TODO should mark qcs as full */
return 2;
}
fprintf(stderr, "%s: new STREAM data\n", __func__);
diff = qcs->rx.offset - offset;
/* TODO do not partially copy a frame if not enough size left. Maybe
* this can be optimized.
*/
if (len > b_room(&qcs->rx.buf)) {
/* TODO handle STREAM frames larger than RX buffer. */
BUG_ON(len > b_size(&qcs->rx.buf));
return 2;
}
len -= diff;
data += diff;
total = b_putblk(&qcs->rx.buf, data, len);
/* TODO handle partial copy of a STREAM frame. */
BUG_ON(len != total);
qcs->rx.offset += total;
if (fin)
qcs->flags |= QC_SF_FIN_RECV;
out:
return 0;
}
/* Decode the content of STREAM frames already received on the stream instance
* <qcs>.
*
* Returns 0 on success else non-zero.
*/
int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs)
{
if (qcc->app_ops->decode_qcs(qcs, qcs->flags & QC_SF_FIN_RECV, qcc->ctx) < 0) {
fprintf(stderr, "%s: decoding error\n", __func__);
return 1;
}
return 0;
}
/* detaches the QUIC stream from its QCC and releases it to the QCS pool. */
static void qcs_destroy(struct qcs *qcs)
{

View File

@ -1984,92 +1984,6 @@ static size_t qc_strm_cpy(struct buffer *buf, struct quic_stream *strm_frm)
return ret;
}
/* Copy as most as possible STREAM data from <strm_frm> into <buf> buffer.
*
* Note that <strm_frm> is not updated as it is implied that the frame may be
* present in a tree and offset node is used as the key. The caller should
* update offset/lenght of the frame after the function call.
*
* Return the total count of copied bytes.
*/
static size_t qc_rx_strm_frm_cpy(struct buffer *buf,
struct quic_rx_strm_frm *strm_frm)
{
size_t flen = strm_frm->len;
size_t ret = 0;
size_t try;
ret = 0;
while (flen && (try = b_contig_space(buf))) {
if (try > flen)
try = flen;
memcpy(b_tail(buf), strm_frm->data + ret, try);
b_add(buf, try);
ret += try;
flen -= try;
}
return ret;
}
/* Process as much as possible RX STREAM frames received for <qcs> */
static size_t qc_treat_rx_strm_frms(struct qcs *qcs)
{
int total;
struct eb64_node *frm_node;
total = 0;
frm_node = eb64_first(&qcs->rx.frms);
while (frm_node) {
int ret;
struct quic_rx_strm_frm *frm;
size_t diff;
frm = eb64_entry(&frm_node->node, struct quic_rx_strm_frm, offset_node);
if (frm->offset_node.key + frm->len < qcs->rx.offset) {
/* fully already received STREAM offset */
goto next;
}
BUG_ON(qcs->flags & QC_SF_FIN_RECV);
if (frm->offset_node.key > qcs->rx.offset)
break;
diff = qcs->rx.offset - frm->offset_node.key;
frm->data += diff;
frm->len -= diff;
ret = qc_rx_strm_frm_cpy(&qcs->rx.buf, frm);
qcs->rx.offset += ret;
total += ret;
BUG_ON(frm->len < ret);
if (frm->len - ret > 0) {
/* Remove the frame from the tree before updating the
* offset field.
*/
eb64_delete(&frm->offset_node);
frm->offset_node.key += (diff + ret);
frm->data += ret;
frm->len -= ret;
eb64_insert(&qcs->rx.frms, &frm->offset_node);
break;
}
if (frm->fin)
qcs->flags |= QC_SF_FIN_RECV;
next:
frm_node = eb64_next(frm_node);
quic_rx_packet_refdec(frm->pkt);
eb64_delete(&frm->offset_node);
pool_free(pool_head_quic_rx_strm_frm, frm);
}
return total;
}
/* Handle <strm_frm> bidirectional STREAM frame. Depending on its ID, several
* streams may be open. The data are copied to the stream RX buffer if possible.
* If not, the STREAM frame is stored to be treated again later.
@ -2080,63 +1994,23 @@ static int qc_handle_bidi_strm_frm(struct quic_rx_packet *pkt,
struct quic_stream *strm_frm,
struct quic_conn *qc)
{
int total;
struct qcs *strm;
struct eb64_node *strm_node;
struct quic_rx_strm_frm *frm;
strm_node = qcc_get_qcs(qc->qcc, strm_frm->id);
if (!strm_node) {
TRACE_PROTO("Stream not found", QUIC_EV_CONN_PSTRM, qc);
return 0;
}
strm = eb64_entry(&strm_node->node, struct qcs, by_id);
if (strm_frm->offset.key < strm->rx.offset) {
size_t diff;
if (strm_frm->offset.key + strm_frm->len <= strm->rx.offset) {
TRACE_PROTO("Already received STREAM data",
QUIC_EV_CONN_PSTRM, qc);
goto out;
}
TRACE_PROTO("Partially already received STREAM data", QUIC_EV_CONN_PSTRM, qc);
diff = strm->rx.offset - strm_frm->offset.key;
strm_frm->offset.key = strm->rx.offset;
strm_frm->len -= diff;
strm_frm->data += diff;
}
BUG_ON(strm->flags & QC_SF_FIN_RECV);
total = 0;
if (strm_frm->offset.key == strm->rx.offset) {
struct eb64_node *frm_node;
struct qcs *qcs = NULL;
int ret;
if (!qc_get_buf(strm, &strm->rx.buf))
goto store_frm;
ret = qcc_recv(qc->qcc, strm_frm->id, strm_frm->len,
strm_frm->offset.key, strm_frm->fin,
(char *)strm_frm->data, &qcs);
ret = qc_strm_cpy(&strm->rx.buf, strm_frm);
total += ret;
strm->rx.offset += ret;
}
/* FIN is set only if all data were copied. */
if (strm_frm->fin && !strm_frm->len)
strm->flags |= QC_SF_FIN_RECV;
total += qc_treat_rx_strm_frms(strm);
if (total && qc->qcc->app_ops->decode_qcs(strm, strm->flags & QC_SF_FIN_RECV, qc->qcc->ctx) < 0) {
TRACE_PROTO("Decoding error", QUIC_EV_CONN_PSTRM, qc);
/* invalid or already received frame */
if (ret == 1)
return 0;
}
if (!strm_frm->len)
goto out;
store_frm:
if (ret == 2) {
/* frame cannot be parsed at the moment and should be
* buffered.
*/
frm = new_quic_rx_strm_frm(strm_frm, pkt);
if (!frm) {
TRACE_PROTO("Could not alloc RX STREAM frame",
@ -2144,10 +2018,42 @@ static int qc_handle_bidi_strm_frm(struct quic_rx_packet *pkt,
return 0;
}
eb64_insert(&strm->rx.frms, &frm->offset_node);
eb64_insert(&qcs->rx.frms, &frm->offset_node);
quic_rx_packet_refinc(pkt);
out:
return 1;
}
/* Frame correctly received by the mux.
* If there is buffered frame for next offset, it may be possible to
* receive them now.
*/
frm_node = eb64_first(&qcs->rx.frms);
while (frm_node) {
frm = eb64_entry(&frm_node->node,
struct quic_rx_strm_frm, offset_node);
ret = qcc_recv(qc->qcc, qcs->by_id.key, frm->len,
frm->offset_node.key, frm->fin,
(char *)frm->data, &qcs);
/* interrupt the parsing if the frame cannot be handled for the
* moment only by the MUX.
*/
if (ret == 2)
break;
/* Remove a newly received frame or an invalid one. */
frm_node = eb64_next(frm_node);
eb64_delete(&frm->offset_node);
quic_rx_packet_refdec(frm->pkt);
pool_free(pool_head_quic_rx_strm_frm, frm);
}
/* Decode the received data. */
if (qcc_decode_qcs(qc->qcc, qcs))
return 0;
return 1;
}