From 285f7616ee1b038aa1ec40b9568a954e9a61d1af Mon Sep 17 00:00:00 2001 From: Christopher Faulet Date: Mon, 12 Dec 2022 08:28:55 +0100 Subject: [PATCH] MEDIUM: channel: Use CF_READ_EVENT instead of CF_READ_PARTIAL CF_READ_PARTIAL flag is now merged with CF_READ_EVENT. It means CF_READ_EVENT is set when a read0 is received (formely CF_READ_NULL) or when data are received (formely CF_READ_ACTIVITY). There is nothing special here, except conditions to wake the stream up in sc_notify(). Indeed, the test was a bit changed to reflect recent change. read0 event is now formalized by (CF_READ_EVENT + CF_SHUTR). --- include/haproxy/channel-t.h | 8 ++++---- include/haproxy/channel.h | 2 +- src/backend.c | 2 +- src/channel.c | 2 +- src/stconn.c | 27 +++++++++++++++------------ src/stream.c | 12 ++++++------ 6 files changed, 28 insertions(+), 25 deletions(-) diff --git a/include/haproxy/channel-t.h b/include/haproxy/channel-t.h index 4e7557cb7..b87005319 100644 --- a/include/haproxy/channel-t.h +++ b/include/haproxy/channel-t.h @@ -54,10 +54,10 @@ */ #define CF_READ_EVENT 0x00000001 /* a read event detected on producer side */ -#define CF_READ_PARTIAL 0x00000002 /* some data were read from producer or a read exception occurred */ +/* unused: 0x00000002 */ #define CF_READ_TIMEOUT 0x00000004 /* timeout while waiting for producer */ #define CF_READ_ERROR 0x00000008 /* unrecoverable error on producer side */ -#define CF_READ_ACTIVITY (CF_READ_EVENT|CF_READ_PARTIAL|CF_READ_ERROR) +#define CF_READ_ACTIVITY (CF_READ_EVENT|CF_READ_ERROR) /* unused: 0x00000010 */ #define CF_SHUTR 0x00000020 /* producer has already shut down */ @@ -138,7 +138,7 @@ static forceinline char *chn_show_flags(char *buf, size_t len, const char *delim /* prologue */ _(0); /* flags */ - _(CF_READ_EVENT, _(CF_READ_PARTIAL, _(CF_READ_TIMEOUT, _(CF_READ_ERROR, + _(CF_READ_EVENT, _(CF_READ_TIMEOUT, _(CF_READ_ERROR, _(CF_SHUTR, _(CF_SHUTR_NOW, _(CF_READ_NOEXP, _(CF_WRITE_EVENT, _(CF_WRITE_PARTIAL, _(CF_WRITE_TIMEOUT, _(CF_WRITE_ERROR, _(CF_WAKE_WRITE, _(CF_SHUTW, _(CF_SHUTW_NOW, _(CF_AUTO_CLOSE, @@ -146,7 +146,7 @@ static forceinline char *chn_show_flags(char *buf, size_t len, const char *delim _(CF_READ_ATTACHED, _(CF_KERN_SPLICING, _(CF_READ_DONTWAIT, _(CF_AUTO_CONNECT, _(CF_DONT_READ, _(CF_EXPECT_MORE, _(CF_SEND_DONTWAIT, _(CF_NEVER_WAIT, _(CF_WAKE_ONCE, _(CF_FLT_ANALYZE, - _(CF_EOI, _(CF_ISRESP))))))))))))))))))))))))))))))); + _(CF_EOI, _(CF_ISRESP)))))))))))))))))))))))))))))); /* epilogue */ _(~0U); return buf; diff --git a/include/haproxy/channel.h b/include/haproxy/channel.h index 354e56a16..b8d6c81de 100644 --- a/include/haproxy/channel.h +++ b/include/haproxy/channel.h @@ -378,7 +378,7 @@ static inline void channel_add_input(struct channel *chn, unsigned int len) } /* notify that some data was read */ chn->total += len; - chn->flags |= CF_READ_PARTIAL; + chn->flags |= CF_READ_EVENT; } static inline unsigned long long channel_htx_forward(struct channel *chn, struct htx *htx, unsigned long long bytes) diff --git a/src/backend.c b/src/backend.c index 6e7c989e1..0e5dbfb2f 100644 --- a/src/backend.c +++ b/src/backend.c @@ -1849,7 +1849,7 @@ skip_reuse: * care of it. */ if (sc_ep_test(s->scb, SE_FL_EOI) && !(sc_ic(s->scb)->flags & CF_EOI)) - sc_ic(s->scb)->flags |= (CF_EOI|CF_READ_PARTIAL); + sc_ic(s->scb)->flags |= (CF_EOI|CF_READ_EVENT); /* catch all sync connect while the mux is not already installed */ if (!srv_conn->mux && !(srv_conn->flags & CO_FL_WAIT_XPRT)) { diff --git a/src/channel.c b/src/channel.c index 997057533..62fff1b0e 100644 --- a/src/channel.c +++ b/src/channel.c @@ -119,7 +119,7 @@ int ci_putchr(struct channel *chn, char c) *ci_tail(chn) = c; b_add(&chn->buf, 1); - chn->flags |= CF_READ_PARTIAL; + chn->flags |= CF_READ_EVENT; if (chn->to_forward >= 1) { if (chn->to_forward != CHN_INFINITE_FORWARD) diff --git a/src/stconn.c b/src/stconn.c index b7588a5f7..2238f1d51 100644 --- a/src/stconn.c +++ b/src/stconn.c @@ -1186,19 +1186,22 @@ static void sc_notify(struct stconn *sc) (sc->flags & (SC_FL_WONT_READ|SC_FL_NEED_BUFF|SC_FL_NEED_ROOM))) { ic->rex = TICK_ETERNITY; } - else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL)) == CF_READ_PARTIAL) { + else if ((ic->flags & (CF_SHUTR|CF_READ_EVENT)) == CF_READ_EVENT) { /* we must re-enable reading if sc_chk_snd() has freed some space */ if (!(ic->flags & CF_READ_NOEXP) && tick_isset(ic->rex)) ic->rex = tick_add_ifset(now_ms, ic->rto); } /* wake the task up only when needed */ - if (/* changes on the production side */ - (ic->flags & (CF_READ_EVENT|CF_READ_ERROR)) || - !sc_state_in(sc->state, SC_SB_CON|SC_SB_RDY|SC_SB_EST) || - sc_ep_test(sc, SE_FL_ERROR) || - ((ic->flags & CF_READ_PARTIAL) && - ((ic->flags & CF_EOI) || !ic->to_forward || sco->state != SC_ST_EST)) || + if (/* changes on the production side that must be handled: + * - An error on receipt: CF_READ_ERROR or SE_FL_ERROR + * - A read event: shutdown for reads (CF_READ_EVENT + SHUTR) + * end of input (CF_READ_EVENT + CF_EOI) + * data received and no fast-forwarding (CF_READ_EVENT + !to_forward) + * read event while consumer side is not established (CF_READ_EVENT + sco->state != SC_ST_EST) + */ + ((ic->flags & CF_READ_EVENT) && ((ic->flags & (CF_SHUTR|CF_EOI)) || !ic->to_forward || sco->state != SC_ST_EST)) || + (ic->flags & CF_READ_ERROR) || sc_ep_test(sc, SE_FL_ERROR) || /* changes on the consumption side */ (oc->flags & (CF_WRITE_EVENT|CF_WRITE_ERROR)) || @@ -1371,7 +1374,7 @@ static int sc_conn_recv(struct stconn *sc) ic->to_forward -= ret; ic->total += ret; cur_read += ret; - ic->flags |= CF_READ_PARTIAL; + ic->flags |= CF_READ_EVENT; } if (sc_ep_test(sc, SE_FL_EOS | SE_FL_ERROR)) @@ -1455,7 +1458,7 @@ static int sc_conn_recv(struct stconn *sc) /* Add READ_PARTIAL because some data are pending but * cannot be xferred to the channel */ - ic->flags |= CF_READ_PARTIAL; + ic->flags |= CF_READ_EVENT; } if (ret <= 0) { @@ -1482,7 +1485,7 @@ static int sc_conn_recv(struct stconn *sc) c_adv(ic, fwd); } - ic->flags |= CF_READ_PARTIAL; + ic->flags |= CF_READ_EVENT; ic->total += ret; /* End-of-input reached, we can leave. In this case, it is @@ -1577,7 +1580,7 @@ static int sc_conn_recv(struct stconn *sc) /* Report EOI on the channel if it was reached from the mux point of * view. */ if (sc_ep_test(sc, SE_FL_EOI) && !(ic->flags & CF_EOI)) { - ic->flags |= (CF_EOI|CF_READ_PARTIAL); + ic->flags |= (CF_EOI|CF_READ_EVENT); ret = 1; } @@ -1880,7 +1883,7 @@ static int sc_conn_process(struct stconn *sc) * care of it. */ if (sc_ep_test(sc, SE_FL_EOI) && !(ic->flags & CF_EOI)) - ic->flags |= (CF_EOI|CF_READ_PARTIAL); + ic->flags |= (CF_EOI|CF_READ_EVENT); /* Second step : update the stream connector and channels, try to forward any * pending data, then possibly wake the stream up based on the new diff --git a/src/stream.c b/src/stream.c index 438d3ce08..79815bd6b 100644 --- a/src/stream.c +++ b/src/stream.c @@ -294,7 +294,7 @@ int stream_upgrade_from_sc(struct stconn *sc, struct buffer *input) s->req.buf = *input; *input = BUF_NULL; s->req.total = (IS_HTX_STRM(s) ? htxbuf(&s->req.buf)->data : b_data(&s->req.buf)); - s->req.flags |= (s->req.total ? CF_READ_PARTIAL : 0); + s->req.flags |= (s->req.total ? CF_READ_EVENT : 0); } s->flags &= ~SF_IGNORE; @@ -567,7 +567,7 @@ struct stream *stream_new(struct session *sess, struct stconn *sc, struct buffer s->req.buf = *input; *input = BUF_NULL; s->req.total = (IS_HTX_STRM(s) ? htxbuf(&s->req.buf)->data : b_data(&s->req.buf)); - s->req.flags |= (s->req.total ? CF_READ_PARTIAL : 0); + s->req.flags |= (s->req.total ? CF_READ_EVENT : 0); } /* it is important not to call the wakeup function directly but to @@ -1516,7 +1516,7 @@ int stream_set_http_mode(struct stream *s, const struct mux_proto_list *mux_prot } sc_conn_commit_endp_upgrade(sc); - s->req.flags &= ~(CF_READ_PARTIAL|CF_AUTO_CONNECT); + s->req.flags &= ~(CF_READ_EVENT|CF_AUTO_CONNECT); s->req.total = 0; s->flags |= SF_IGNORE; if (sc_ep_test(sc, SE_FL_DETACHED)) { @@ -1553,8 +1553,8 @@ static void stream_update_both_sc(struct stream *s) struct channel *req = &s->req; struct channel *res = &s->res; - req->flags &= ~(CF_READ_EVENT|CF_READ_PARTIAL|CF_READ_ATTACHED|CF_WRITE_EVENT|CF_WRITE_PARTIAL); - res->flags &= ~(CF_READ_EVENT|CF_READ_PARTIAL|CF_READ_ATTACHED|CF_WRITE_EVENT|CF_WRITE_PARTIAL); + req->flags &= ~(CF_READ_EVENT|CF_READ_ATTACHED|CF_WRITE_EVENT|CF_WRITE_PARTIAL); + res->flags &= ~(CF_READ_EVENT|CF_READ_ATTACHED|CF_WRITE_EVENT|CF_WRITE_PARTIAL); s->prev_conn_state = scb->state; @@ -1710,7 +1710,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) * to a bogus analyser or the fact that we're ignoring a read0. The * call_rate counter only counts calls with no progress made. */ - if (!((req->flags | res->flags) & (CF_READ_PARTIAL|CF_WRITE_PARTIAL))) { + if (!((req->flags | res->flags) & (CF_READ_EVENT|CF_WRITE_PARTIAL))) { rate = update_freq_ctr(&s->call_rate, 1); if (rate >= 100000 && s->call_rate.prev_ctr) // make sure to wait at least a full second stream_dump_and_crash(&s->obj_type, read_freq_ctr(&s->call_rate));