MEDIUM: stream-int: unconditionally call si_chk_rcv() in update and notify

For a long time, stream_int_update() and stream_int_notify() used to only
conditionally call si_chk_rcv() based on state change detection. This
detection is not reliable and quite complex. With the new blocked flags
that si_chk_rcv() checks, it's much more reliable to always call the
function to take into account recent changes,and let it decide if it needs
to wake something up or not.

This also removes the calls to si_chk_rcv() that were performed in
si_update_both() since these ones are systematically performed in
stream_int_update() after updating the Rx flags.
This commit is contained in:
Willy Tarreau 2018-11-15 07:46:57 +01:00
parent abb5d4202f
commit 47baeb85d4

View File

@ -448,6 +448,7 @@ void stream_int_notify(struct stream_interface *si)
{
struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si);
struct stream_interface *sio = si_opposite(si);
/* process consumer side */
if (channel_is_empty(oc)) {
@ -479,11 +480,10 @@ void stream_int_notify(struct stream_interface *si)
ic->rex = tick_add_ifset(now_ms, ic->rto);
}
if ((si_opposite(si)->flags & SI_FL_RXBLK_ROOM) &&
if ((sio->flags & SI_FL_RXBLK_ROOM) &&
((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL ||
channel_is_empty(oc))) {
si_opposite(si)->flags &= ~SI_FL_RXBLK_ROOM;
si_chk_rcv(si_opposite(si));
sio->flags &= ~SI_FL_RXBLK_ROOM;
}
/* Notify the other side when we've injected data into the IC that
@ -498,7 +498,7 @@ void stream_int_notify(struct stream_interface *si)
* an HTTP parser might need more data to complete the parsing.
*/
if (!channel_is_empty(ic) &&
(si_opposite(si)->flags & SI_FL_WAIT_DATA) &&
(sio->flags & SI_FL_WAIT_DATA) &&
(!(ic->flags & CF_EXPECT_MORE) || c_full(ic) || ci_data(ic) == 0 || ic->pipe)) {
int new_len, last_len;
@ -506,7 +506,7 @@ void stream_int_notify(struct stream_interface *si)
if (ic->pipe)
last_len += ic->pipe->data;
si_chk_snd(si_opposite(si));
si_chk_snd(sio);
new_len = co_data(ic);
if (ic->pipe)
@ -515,12 +515,13 @@ void stream_int_notify(struct stream_interface *si)
/* check if the consumer has freed some space either in the
* buffer or in the pipe.
*/
if (new_len < last_len) {
if (new_len < last_len)
si->flags &= ~SI_FL_RXBLK_ROOM;
si_chk_rcv(si);
}
}
si_chk_rcv(si);
si_chk_rcv(sio);
if (si->flags & SI_FL_RXBLK_ROOM) {
ic->rex = TICK_ETERNITY;
}
@ -536,14 +537,14 @@ void stream_int_notify(struct stream_interface *si)
(si->state != SI_ST_EST && si->state != SI_ST_CON) ||
(si->flags & SI_FL_ERR) ||
((ic->flags & CF_READ_PARTIAL) &&
(!ic->to_forward || si_opposite(si)->state != SI_ST_EST)) ||
(!ic->to_forward || sio->state != SI_ST_EST)) ||
/* changes on the consumption side */
(oc->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) ||
((oc->flags & CF_WRITE_ACTIVITY) &&
((oc->flags & CF_SHUTW) ||
((oc->flags & CF_WAKE_WRITE) &&
(si_opposite(si)->state != SI_ST_EST ||
(sio->state != SI_ST_EST ||
(channel_is_empty(oc) && !oc->to_forward)))))) {
task_wakeup(si_task(si), TASK_WOKEN_IO);
}
@ -762,10 +763,10 @@ void stream_int_update(struct stream_interface *si)
* have updated it if there has been a completed I/O.
*/
si->flags &= ~SI_FL_RXBLK_ROOM;
si_chk_rcv(si);
if (!(ic->flags & (CF_READ_NOEXP|CF_DONT_READ)) && !tick_isset(ic->rex))
ic->rex = tick_add_ifset(now_ms, ic->rto);
}
si_chk_rcv(si);
}
else
si_rx_shut_blk(si);
@ -848,10 +849,6 @@ void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b
si_f->flags &= ~SI_FL_RXBLK_ROOM;
}
/* it's time to try to receive */
si_chk_rcv(si_f);
si_chk_rcv(si_b);
/* let's recompute both sides states */
if (si_f->state == SI_ST_EST)
stream_int_update(si_f);