MEDIUM: stconn: Add two date to track successful reads and blocked sends

The stream endpoint descriptor now owns two date, lra (last read activity) and
fsb (first send blocked).

The first one is updated every time a read activity is reported, including data
received from the endpoint, successful connect, end of input and shutdown for
reads. A read activity is also reported when receives are unblocked. It will be
used to detect read timeouts.

The other one is updated when no data can be sent to the endpoint and reset
when some data are sent. It is the date of the first send blocked by the
endpoint. It will be used to detect write timeouts.

Helper functions are added to report read/send activity and to retrieve lra/fsb
date.
This commit is contained in:
Christopher Faulet 2023-02-16 11:09:31 +01:00
parent 5aaacfbccd
commit 4c13568b49
6 changed files with 102 additions and 7 deletions

View File

@ -309,8 +309,10 @@ static inline int sc_is_recv_allowed(const struct stconn *sc)
static inline void sc_chk_rcv(struct stconn *sc)
{
if (sc_ep_test(sc, SE_FL_APPLET_NEED_CONN) &&
sc_state_in(sc_opposite(sc)->state, SC_SB_RDY|SC_SB_EST|SC_SB_DIS|SC_SB_CLO))
sc_state_in(sc_opposite(sc)->state, SC_SB_RDY|SC_SB_EST|SC_SB_DIS|SC_SB_CLO)) {
sc_ep_clr(sc, SE_FL_APPLET_NEED_CONN);
sc_ep_report_read_activity(sc);
}
if (!sc_is_recv_allowed(sc))
return;

View File

@ -199,15 +199,26 @@ struct stconn;
* <se> is the stream endpoint, i.e. the mux stream or the appctx
* <conn> is the connection for connection-based streams
* <sc> is the stream connector we're attached to, or NULL
* <lra> is the last read activity
* <fsb> is the first send blocked
* <rex> is the expiration date for a read, in ticks
* <wex> is the expiration date for a write or connect, in ticks
* <flags> SE_FL_*
*/
*
* <lra> should be updated when a read activity is detected. It can be a
* sucessful receive, when a shutr is reported or when receives are
* unblocked.
* <fsb> should be updated when the first send of a series is blocked and reset
* when a successful send is reported.
*/
struct sedesc {
void *se;
struct connection *conn;
struct stconn *sc;
unsigned int flags;
unsigned int lra;
unsigned int fsb;
int rex;
int wex;
};

View File

@ -136,6 +136,56 @@ static forceinline uint sc_ep_get(const struct stconn *sc)
return se_fl_get(sc->sedesc);
}
/* Return the last read activity timestamp. May be TICK_ETERNITY */
static forceinline unsigned int sc_ep_lra(const struct stconn *sc)
{
return sc->sedesc->lra;
}
/* Return the first send blocked timestamp. May be TICK_ETERNITY */
static forceinline unsigned int sc_ep_fsb(const struct stconn *sc)
{
return sc->sedesc->fsb;
}
/* Report a read activity. This function sets <lra> to now_ms */
static forceinline void sc_ep_report_read_activity(struct stconn *sc)
{
sc->sedesc->lra = now_ms;
}
/* Report a send blocked. This function sets <fsb> to now_ms if it was not
* already set
*/
static forceinline void sc_ep_report_blocked_send(struct stconn *sc)
{
if (!tick_isset(sc->sedesc->fsb))
sc->sedesc->fsb = now_ms;
}
/* Report a send activity by setting <fsb> to TICK_ETERNITY.
* For non-independent stream, a read activity is reported.
*/
static forceinline void sc_ep_report_send_activity(struct stconn *sc)
{
sc->sedesc->fsb = TICK_ETERNITY;
if (!(sc->flags & SC_FL_INDEP_STR))
sc_ep_report_read_activity(sc);
}
static forceinline int sc_ep_rcv_ex(const struct stconn *sc)
{
return (tick_isset(sc->sedesc->lra)
? tick_add_ifset(sc->sedesc->lra, sc->ioto)
: TICK_ETERNITY);
}
static forceinline int sc_ep_snd_ex(const struct stconn *sc)
{
return (tick_isset(sc->sedesc->fsb)
? tick_add_ifset(sc->sedesc->fsb, sc->ioto)
: TICK_ETERNITY);
}
static forceinline int sc_ep_rex(const struct stconn *sc)
{
@ -345,11 +395,14 @@ static inline void se_have_no_more_data(struct sedesc *se)
}
/* The application layer informs a stream connector that it's willing to
* receive data from the endpoint.
* receive data from the endpoint. A read activity is reported.
*/
static inline void sc_will_read(struct stconn *sc)
{
sc->flags &= ~SC_FL_WONT_READ;
if (sc->flags & SC_FL_WONT_READ) {
sc->flags &= ~SC_FL_WONT_READ;
sc_ep_report_read_activity(sc);
}
}
/* The application layer informs a stream connector that it will not receive
@ -372,11 +425,14 @@ static inline void se_need_remote_conn(struct sedesc *se)
}
/* The application layer tells the stream connector that it just got the input
* buffer it was waiting for.
* buffer it was waiting for. A read activity is reported.
*/
static inline void sc_have_buff(struct stconn *sc)
{
sc->flags &= ~SC_FL_NEED_BUFF;
if (sc->flags & SC_FL_NEED_BUFF) {
sc->flags &= ~SC_FL_NEED_BUFF;
sc_ep_report_read_activity(sc);
}
}
/* The stream connector failed to get an input buffer and is waiting for it.
@ -392,10 +448,14 @@ static inline void sc_need_buff(struct stconn *sc)
/* Tell a stream connector some room was made in the input buffer and any
* failed attempt to inject data into it may be tried again. This is usually
* called after a successful transfer of buffer contents to the other side.
* A read activity is reported.
*/
static inline void sc_have_room(struct stconn *sc)
{
sc->flags &= ~SC_FL_NEED_ROOM;
if (sc->flags & SC_FL_NEED_ROOM) {
sc->flags &= ~SC_FL_NEED_ROOM;
sc_ep_report_read_activity(sc);
}
}
/* The stream connector announces it failed to put data into the input buffer

View File

@ -250,7 +250,17 @@ struct task *task_run_applet(struct task *t, void *context, unsigned int state)
if (count != co_data(sc_oc(sc))) {
sc_oc(sc)->flags |= CF_WRITE_EVENT | CF_WROTE_DATA;
sc_have_room(sc_opposite(sc));
sc_ep_report_send_activity(sc);
}
else {
if (sc_ep_test(sc, SE_FL_WONT_CONSUME))
sc_ep_report_send_activity(sc);
else
sc_ep_report_blocked_send(sc);
}
if (sc_ic(sc)->flags & CF_READ_EVENT)
sc_ep_report_read_activity(sc);
/* measure the call rate and check for anomalies when too high */
if (((b_size(sc_ib(sc)) && sc->flags & SC_FL_NEED_BUFF) || // asks for a buffer which is present

View File

@ -92,6 +92,8 @@ void sedesc_init(struct sedesc *sedesc)
sedesc->se = NULL;
sedesc->conn = NULL;
sedesc->sc = NULL;
sedesc->lra = TICK_ETERNITY;
sedesc->fsb = TICK_ETERNITY;
sedesc->rex = sedesc->wex = TICK_ETERNITY;
se_fl_setall(sedesc, SE_FL_NONE);
}
@ -533,6 +535,7 @@ static void sc_app_shutr(struct stconn *sc)
if (ic->flags & CF_SHUTR)
return;
ic->flags |= CF_SHUTR;
sc_ep_report_read_activity(sc);
sc_ep_reset_rex(sc);
if (!sc_state_in(sc->state, SC_SB_CON|SC_SB_RDY|SC_SB_EST))
@ -1238,6 +1241,7 @@ static void sc_conn_read0(struct stconn *sc)
if (ic->flags & CF_SHUTR)
return;
ic->flags |= CF_SHUTR;
sc_ep_report_read_activity(sc);
sc_ep_reset_rex(sc);
if (!sc_state_in(sc->state, SC_SB_CON|SC_SB_RDY|SC_SB_EST))
@ -1567,6 +1571,7 @@ static int sc_conn_recv(struct stconn *sc)
ic->xfer_large = 0;
}
ic->last_read = now_ms;
sc_ep_report_read_activity(sc);
}
end_recv:
@ -1575,6 +1580,7 @@ static int sc_conn_recv(struct stconn *sc)
/* Report EOI on the channel if it was reached from the mux point of
* view. */
if (sc_ep_test(sc, SE_FL_EOI) && !(ic->flags & CF_EOI)) {
sc_ep_report_read_activity(sc);
ic->flags |= (CF_EOI|CF_READ_EVENT);
ret = 1;
}
@ -1760,7 +1766,10 @@ static int sc_conn_send(struct stconn *sc)
sc->state = SC_ST_RDY;
sc_have_room(sc_opposite(sc));
sc_ep_report_send_activity(sc);
}
else
sc_ep_report_blocked_send(sc);
if (sc_ep_test(sc, SE_FL_ERROR | SE_FL_ERR_PENDING)) {
oc->flags |= CF_WRITE_EVENT;

View File

@ -295,6 +295,7 @@ int stream_upgrade_from_sc(struct stconn *sc, struct buffer *input)
s->req.buf = *input;
*input = BUF_NULL;
s->req.total = (IS_HTX_STRM(s) ? htxbuf(&s->req.buf)->data : b_data(&s->req.buf));
sc_ep_report_read_activity(s->scf);
}
s->req.flags |= CF_READ_EVENT; /* Always report a read event */
@ -562,6 +563,7 @@ struct stream *stream_new(struct session *sess, struct stconn *sc, struct buffer
s->req.buf = *input;
*input = BUF_NULL;
s->req.total = (IS_HTX_STRM(s) ? htxbuf(&s->req.buf)->data : b_data(&s->req.buf));
sc_ep_report_read_activity(s->scf);
}
/* it is important not to call the wakeup function directly but to
@ -925,6 +927,7 @@ static void back_establish(struct stream *s)
se_have_more_data(s->scb->sedesc);
rep->flags |= CF_READ_EVENT; /* producer is now attached */
sc_ep_report_read_activity(s->scb);
if (conn) {
/* real connections have timeouts
* if already defined, it means that a set-timeout rule has