diff --git a/src/stream.c b/src/stream.c index ed97e4886..15c5538f8 100644 --- a/src/stream.c +++ b/src/stream.c @@ -2262,10 +2262,10 @@ struct task *process_stream(struct task *t) if ((sess->fe->options & PR_O_CONTSTATS) && (s->flags & SF_BE_ASSIGNED)) stream_process_counters(s); - if (si_f->state == SI_ST_EST && obj_type(si_f->end) != OBJ_TYPE_APPCTX) + if (si_f->state == SI_ST_EST) si_update(si_f); - if (si_b->state == SI_ST_EST && obj_type(si_b->end) != OBJ_TYPE_APPCTX) + if (si_b->state == SI_ST_EST) si_update(si_b); req->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_WRITE_NULL|CF_WRITE_PARTIAL|CF_READ_ATTACHED); @@ -2289,23 +2289,6 @@ struct task *process_stream(struct task *t) req->rex = TICK_ETERNITY; } - /* When any of the stream interfaces is attached to an applet, - * we have to call it here. Note that this one may wake the - * task up again. If at least one applet was called, the current - * task might have been woken up, in which case we don't want it - * to be requeued to the wait queue but rather to the run queue - * to run ASAP. The bitwise "or" in the condition ensures that - * both functions are always called and that we wake up if at - * least one did something. - */ - if ((si_applet_call(si_b) | si_applet_call(si_f)) != 0) { - if (task_in_rq(t)) { - t->expire = TICK_ETERNITY; - stream_release_buffers(s); - return t; - } - } - update_exp_and_leave: t->expire = tick_first(tick_first(req->rex, req->wex), tick_first(res->rex, res->wex)); diff --git a/src/stream_interface.c b/src/stream_interface.c index 2fdd94f6b..55f0e2211 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -1466,89 +1466,75 @@ void si_applet_done(struct stream_interface *si) stream_release_buffers(si_strm(si)); } -/* default update function for applets, to be used at the end of the i/o handler */ -static void stream_int_update_applet(struct stream_interface *si) +/* updates the timers and flags of a stream interface attached to an applet. + * it's called from the upper layers after the buffers/channels have been + * updated. + */ +void stream_int_update_applet(struct stream_interface *si) { - int old_flags = si->flags; struct channel *ic = si_ic(si); struct channel *oc = si_oc(si); - DPRINTF(stderr, "%s: si=%p, si->state=%d ic->flags=%08x oc->flags=%08x\n", - __FUNCTION__, - si, si->state, ic->flags, oc->flags); - - if (si->state != SI_ST_EST) - return; - - if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW && - channel_is_empty(oc)) - si_shutw(si); - - if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == 0 && channel_may_recv(oc)) - si->flags |= SI_FL_WAIT_DATA; - - /* we're almost sure that we need some space if the buffer is not - * empty, even if it's not full, because the applets can't fill it. - */ - if ((ic->flags & (CF_SHUTR|CF_DONT_READ)) == 0 && !channel_is_empty(ic)) - si->flags |= SI_FL_WAIT_ROOM; - - if (oc->flags & CF_WRITE_ACTIVITY) { - if (tick_isset(oc->wex)) - oc->wex = tick_add_ifset(now_ms, oc->wto); - } - - if (ic->flags & CF_READ_ACTIVITY || - (oc->flags & CF_WRITE_ACTIVITY && !(si->flags & SI_FL_INDEP_STR))) { - if (tick_isset(ic->rex)) - ic->rex = tick_add_ifset(now_ms, ic->rto); - } - - /* save flags to detect changes */ - old_flags = si->flags; - if (likely((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL && - channel_may_recv(oc) && - (si_opposite(si)->flags & SI_FL_WAIT_ROOM))) - si_chk_rcv(si_opposite(si)); - - if (((ic->flags & CF_READ_PARTIAL) && !channel_is_empty(ic)) && - (ic->pipe /* always try to send spliced data */ || - (ic->buf->i == 0 && (si_opposite(si)->flags & SI_FL_WAIT_DATA)))) { - si_chk_snd(si_opposite(si)); - /* check if the consumer has freed some space */ - if (channel_may_recv(ic) && !ic->pipe) + /* Check if we need to close the read side */ + if (!(ic->flags & CF_SHUTR)) { + /* Read not closed, update FD status and timeout for reads */ + if ((ic->flags & CF_DONT_READ) || !channel_may_recv(ic)) { + /* stop reading */ + if (!(si->flags & SI_FL_WAIT_ROOM)) { + if (!(ic->flags & CF_DONT_READ)) /* full */ + si->flags |= SI_FL_WAIT_ROOM; + ic->rex = TICK_ETERNITY; + } + } + else { + /* (re)start reading and update timeout. Note: we don't recompute the timeout + * everytime we get here, otherwise it would risk never to expire. We only + * update it if is was not yet set. The stream socket handler will already + * have updated it if there has been a completed I/O. + */ si->flags &= ~SI_FL_WAIT_ROOM; + if (!(ic->flags & (CF_READ_NOEXP|CF_DONT_READ)) && !tick_isset(ic->rex)) + ic->rex = tick_add_ifset(now_ms, ic->rto); + } } - /* Note that we're trying to wake up in two conditions here : - * - special event, which needs the holder task attention - * - status indicating that the applet can go on working. This - * is rather hard because we might be blocking on output and - * don't want to wake up on input and vice-versa. The idea is - * to only rely on the changes the chk_* might have performed. - */ - if (/* check stream interface changes */ - ((old_flags & ~si->flags) & (SI_FL_WAIT_ROOM|SI_FL_WAIT_DATA)) || - - /* changes on the production side */ - (ic->flags & (CF_READ_NULL|CF_READ_ERROR)) || - si->state != SI_ST_EST || - (si->flags & SI_FL_ERR) || - ((ic->flags & CF_READ_PARTIAL) && - (!ic->to_forward || si_opposite(si)->state != SI_ST_EST)) || - - /* changes on the consumption side */ - (oc->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) || - ((oc->flags & CF_WRITE_ACTIVITY) && - ((oc->flags & CF_SHUTW) || - ((oc->flags & CF_WAKE_WRITE) && - (si_opposite(si)->state != SI_ST_EST || - (channel_is_empty(oc) && !oc->to_forward)))))) { - if (!(si->flags & SI_FL_DONT_WAKE)) - task_wakeup(si_task(si), TASK_WOKEN_IO); + /* Check if we need to close the write side */ + if (!(oc->flags & CF_SHUTW)) { + /* Write not closed, update FD status and timeout for writes */ + if (channel_is_empty(oc)) { + /* stop writing */ + if (!(si->flags & SI_FL_WAIT_DATA)) { + if ((oc->flags & CF_SHUTW_NOW) == 0) + si->flags |= SI_FL_WAIT_DATA; + oc->wex = TICK_ETERNITY; + } + } + else { + /* (re)start writing and update timeout. Note: we don't recompute the timeout + * everytime we get here, otherwise it would risk never to expire. We only + * update it if is was not yet set. The stream socket handler will already + * have updated it if there has been a completed I/O. + */ + si->flags &= ~SI_FL_WAIT_DATA; + if (!tick_isset(oc->wex)) { + oc->wex = tick_add_ifset(now_ms, oc->wto); + if (tick_isset(ic->rex) && !(si->flags & SI_FL_INDEP_STR)) { + /* Note: depending on the protocol, we don't know if we're waiting + * for incoming data or not. So in order to prevent the socket from + * expiring read timeouts during writes, we refresh the read timeout, + * except if it was already infinite or if we have explicitly setup + * independent streams. + */ + ic->rex = tick_add_ifset(now_ms, ic->rto); + } + } + } } - if (ic->flags & CF_READ_ACTIVITY) - ic->flags &= ~CF_READ_DONTWAIT; + + if (!(si->flags & (SI_FL_WAIT_ROOM|SI_FL_WAIT_DATA)) && + !(ic->flags & CF_DONT_READ) && + (!(ic->flags & CF_SHUTR) || !(oc->flags & CF_SHUTW))) + appctx_wakeup(si_appctx(si)); } /*