diff --git a/src/mux_h2.c b/src/mux_h2.c
index a8f7792e9..35b440e9f 100644
--- a/src/mux_h2.c
+++ b/src/mux_h2.c
@@ -32,6 +32,7 @@
 #include
 #include
 #include
+#include <haproxy/xref.h>
 
 /* dummy streams returned for closed, error, refused, idle and states */
@@ -6871,6 +6872,192 @@ static size_t h2_snd_buf(struct stconn *sc, struct buffer *buf, size_t count, in
 	return total;
 }
 
+static size_t h2_init_ff(struct stconn *sc, struct buffer *input, size_t count, unsigned int may_splice)
+{
+	struct h2s *h2s = __sc_mux_strm(sc);
+	struct h2c *h2c = h2s->h2c;
+	struct buffer *mbuf;
+	size_t sz, ret = 0;
+
+	TRACE_ENTER(H2_EV_H2S_SEND|H2_EV_STRM_SEND, h2s->h2c->conn, h2s);
+
+	/* If we were not just woken because we wanted to send but couldn't,
+	 * and there's somebody else that is waiting to send, do nothing,
+	 * we will subscribe later and be put at the end of the list
+	 */
+	if (!(h2s->flags & H2_SF_NOTIFIED) &&
+	    (!LIST_ISEMPTY(&h2c->send_list) || !LIST_ISEMPTY(&h2c->fctl_list))) {
+		if (LIST_INLIST(&h2s->list))
+			TRACE_DEVEL("stream already waiting, leaving", H2_EV_H2S_SEND|H2_EV_H2S_BLK, h2s->h2c->conn, h2s);
+		else {
+			TRACE_DEVEL("other streams already waiting, going to the queue and leaving", H2_EV_H2S_SEND|H2_EV_H2S_BLK, h2s->h2c->conn, h2s);
+			h2s->h2c->flags |= H2_CF_WAIT_INLIST;
+		}
+		h2s->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
+		goto end;
+	}
+	h2s->flags &= ~H2_SF_NOTIFIED;
+
+	if (h2s_mws(h2s) <= 0) {
+		h2s->flags |= H2_SF_BLK_SFCTL;
+		if (LIST_INLIST(&h2s->list))
+			LIST_DEL_INIT(&h2s->list);
+		LIST_APPEND(&h2c->blocked_list, &h2s->list);
+		h2s->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
+		TRACE_STATE("stream window <=0, flow-controlled", H2_EV_H2S_SEND|H2_EV_H2S_FCTL, h2c->conn, h2s);
+		goto end;
+	}
+	if (h2c->mws <= 0) {
+		h2s->flags |= H2_SF_BLK_MFCTL;
+		h2s->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
+		TRACE_STATE("connection window <=0, stream flow-controlled", H2_EV_H2S_SEND|H2_EV_H2C_FCTL, h2c->conn, h2s);
+		goto end;
+	}
+
+	sz = count;
+	if (sz > h2s_mws(h2s))
+		sz = h2s_mws(h2s);
+	if (h2c->mfs && sz > h2c->mfs)
+		sz = h2c->mfs; // >0
+	if (sz > h2c->mws)
+		sz = h2c->mws;
+
+	if (count > sz)
+		count = sz;
+
+	mbuf = br_tail(h2c->mbuf);
+  retry:
+	if (!h2_get_buf(h2c, mbuf)) {
+		h2c->flags |= H2_CF_MUX_MALLOC;
+		h2s->flags |= H2_SF_BLK_MROOM;
+		h2s->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
+		TRACE_STATE("waiting for room in output buffer", H2_EV_H2S_SEND|H2_EV_H2S_BLK, h2c->conn, h2s);
+		goto end;
+	}
+
+	if (b_room(mbuf) < sz && b_room(mbuf) < b_size(mbuf) / 4) {
+		if ((mbuf = br_tail_add(h2c->mbuf)) != NULL)
+			goto retry;
+		h2c->flags |= H2_CF_MUX_MFULL;
+		h2s->flags |= H2_SF_BLK_MROOM;
+		h2s->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
+		TRACE_STATE("too large data present in output buffer, waiting for emptiness", H2_EV_H2S_SEND|H2_EV_H2S_BLK, h2c->conn, h2s);
+		goto end;
+	}
+
+	while (1) {
+		if (b_contig_space(mbuf) >= 9 || !b_space_wraps(mbuf))
+			break;
+		b_slow_realign(mbuf, trash.area, b_data(mbuf));
+	}
+
+	if (b_contig_space(mbuf) <= 9) {
+		if ((mbuf = br_tail_add(h2c->mbuf)) != NULL)
+			goto retry;
+		h2c->flags |= H2_CF_MUX_MFULL;
+		h2s->flags |= H2_SF_BLK_MROOM;
+		h2s->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
+		TRACE_STATE("output buffer full", H2_EV_H2S_SEND|H2_EV_H2S_BLK, h2c->conn, h2s);
+		goto end;
+	}
+
+	/* Cannot forward more than available room in output buffer */
+	sz = b_contig_space(mbuf) - 9;
+	if (count > sz)
+		count = sz;
+
+	/* len: 0x000000 (fill later), type: 0(DATA), flags: none=0 */
+	memcpy(b_tail(mbuf), "\x00\x00\x00\x00\x00", 5);
+	write_n32(b_tail(mbuf) + 5, h2s->id); // 4 bytes
+
+	h2s->sd->iobuf.buf = mbuf;
+	h2s->sd->iobuf.offset = 9;
+	h2s->sd->iobuf.data = 0;
+
+	/* forward remaining input data */
+	if (b_data(input)) {
+		size_t xfer = count;
+
+		if (xfer > b_data(input))
+			xfer = b_data(input);
+		b_add(mbuf, 9);
+		h2s->sd->iobuf.data = b_xfer(mbuf, input, xfer);
+		b_sub(mbuf, 9);
+
+		/* Cannot forward more data, wait for room */
+		if (b_data(input))
+			goto end;
+	}
+
+	ret = count - h2s->sd->iobuf.data;
+  end:
+	TRACE_LEAVE(H2_EV_H2S_SEND|H2_EV_STRM_SEND, h2s->h2c->conn, h2s);
+	return ret;
+}
+
+static void h2_done_ff(struct stconn *sc)
+{
+	struct h2s *h2s = __sc_mux_strm(sc);
+	struct h2c *h2c = h2s->h2c;
+	struct sedesc *sd = h2s->sd;
+	struct buffer *mbuf;
+	char *head;
+
+	TRACE_ENTER(H2_EV_H2S_SEND|H2_EV_STRM_SEND, h2s->h2c->conn, h2s);
+
+	mbuf = sd->iobuf.buf;
+	if (!mbuf)
+		goto end;
+	head = b_peek(mbuf, b_data(mbuf) - sd->iobuf.data);
+
+	/* FIXME: Must be handled with a flag. It is just a temporary hack */
+	{
+		struct xref *peer;
+		struct sedesc *sdo;
+
+		peer = xref_get_peer_and_lock(&h2s->sd->xref);
+		if (!peer)
+			goto end;
+
+		sdo = container_of(peer, struct sedesc, xref);
+		xref_unlock(&h2s->sd->xref, peer);
+
+		if (se_fl_test(sdo, SE_FL_EOI))
+			h2s->flags &= ~H2_SF_MORE_HTX_DATA;
+	}
+
+	if (!(sd->iobuf.flags & IOBUF_FL_FF_BLOCKED) &&
+	    !(h2s->flags & H2_SF_BLK_SFCTL) &&
+	    !(h2s->flags & (H2_SF_WANT_SHUTR|H2_SF_WANT_SHUTW))) {
+		/* Ok we managed to send something, leave the send_list if we were still there */
+		h2_remove_from_list(h2s);
+	}
+
+	if (!sd->iobuf.data)
+		goto end;
+
+	/* Perform a synchronous send but in all cases, consider
+	 * everything was already sent from the SC point of view.
+	 */
+	h2_set_frame_size(head, sd->iobuf.data);
+	b_add(mbuf, 9);
+	h2s->sws -= sd->iobuf.data;
+	h2c->mws -= sd->iobuf.data;
+	h2_process(h2c);
+
+  end:
+	sd->iobuf.buf = NULL;
+	sd->iobuf.offset = 0;
+	sd->iobuf.data = 0;
+
+	TRACE_LEAVE(H2_EV_H2S_SEND|H2_EV_STRM_SEND, h2s->h2c->conn, h2s);
+}
+
+static int h2_resume_ff(struct stconn *sc, unsigned int flags)
+{
+	return 0;
+}
+
 /* appends some info about stream <h2s> to buffer <msg>, or does nothing if
  * <h2s> is NULL. Returns non-zero if the stream is considered suspicious. May
  * emit multiple lines, each new one being prefixed with <pfx>, if <pfx> is not
@@ -7185,6 +7372,9 @@ static const struct mux_ops h2_ops = {
 	.wake = h2_wake,
 	.snd_buf = h2_snd_buf,
 	.rcv_buf = h2_rcv_buf,
+	.init_fastfwd = h2_init_ff,
+	.done_fastfwd = h2_done_ff,
+	.resume_fastfwd = h2_resume_ff,
 	.subscribe = h2_subscribe,
 	.unsubscribe = h2_unsubscribe,
 	.attach = h2_attach,
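
Note on the frame header handling in the patch above: h2_init_ff() reserves a 9-byte HTTP/2 frame header in the output buffer (five zero bytes for length/type/flags via memcpy(), then the stream ID via write_n32()), lets the payload be appended behind it, and h2_done_ff() only patches the 3-byte length field once the amount actually forwarded is known (h2_set_frame_size()). The standalone sketch below illustrates that layout with plain C; the helper names h2_hdr_init() and h2_hdr_set_len() are hypothetical and stand in for HAProxy's buffer and frame helpers, they are not part of the patch.

#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Illustrative only: build the 9-byte HTTP/2 DATA frame header that the
 * fast-forward path reserves up front with a zero length, and fill the
 * length once the forwarded payload size is known.
 */
static void h2_hdr_init(uint8_t *hdr, uint32_t stream_id)
{
	/* len: 0x000000 (filled later), type: 0 (DATA), flags: none=0 */
	memcpy(hdr, "\x00\x00\x00\x00\x00", 5);
	/* 4-byte stream identifier in network byte order, reserved bit cleared */
	hdr[5] = (stream_id >> 24) & 0x7f;
	hdr[6] = (stream_id >> 16) & 0xff;
	hdr[7] = (stream_id >> 8) & 0xff;
	hdr[8] = stream_id & 0xff;
}

static void h2_hdr_set_len(uint8_t *hdr, uint32_t len)
{
	/* 24-bit big-endian payload length, the job h2_set_frame_size() does */
	hdr[0] = (len >> 16) & 0xff;
	hdr[1] = (len >> 8) & 0xff;
	hdr[2] = len & 0xff;
}

int main(void)
{
	uint8_t frame[9 + 16] = { 0 };
	const char payload[] = "hello, world";
	size_t plen = sizeof(payload) - 1;

	h2_hdr_init(frame, 1);                 /* stream 1, length still 0 */
	memcpy(frame + 9, payload, plen);      /* payload appended behind the header */
	h2_hdr_set_len(frame, (uint32_t)plen); /* patch the length afterwards */

	printf("header:");
	for (int i = 0; i < 9; i++)
		printf(" %02x", frame[i]);
	printf("\npayload: %.*s\n", (int)plen, (const char *)(frame + 9));
	return 0;
}

The deferred length write is the reason h2_done_ff() keeps a pointer to the header (head) rather than emitting the frame immediately: the number of bytes actually forwarded is only known after the zero-copy transfer has completed.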