diff --git a/src/stream_interface.c b/src/stream_interface.c index dbc481f79..a13311cf9 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -628,11 +628,11 @@ void stream_int_notify(struct stream_interface *si) } -/* Callback to be used by connection I/O handlers upon completion. It differs from - * the update function in that it is designed to be called by lower layers after I/O - * events have been completed. It will also try to wake the associated task up if - * an important event requires special handling. It relies on the connection handler - * to commit any polling updates. The function always returns 0. +/* Callback to be used by connection I/O handlers upon completion. It propagates + * connection flags to the stream interface, updates the stream (which may or + * may not take this opportunity to try to forward data), then update the + * connection's polling based on the channels and stream interface's final + * states. The function always returns 0. */ static int si_conn_wake_cb(struct connection *conn) { @@ -640,114 +640,37 @@ static int si_conn_wake_cb(struct connection *conn) struct channel *ic = si_ic(si); struct channel *oc = si_oc(si); - DPRINTF(stderr, "%s: si=%p, si->state=%d ic->flags=%08x oc->flags=%08x\n", - __FUNCTION__, - si, si->state, ic->flags, oc->flags); - + /* First step, report to the stream-int what was detected at the + * connection layer : errors and connection establishment. + */ if (conn->flags & CO_FL_ERROR) si->flags |= SI_FL_ERR; - /* check for recent connection establishment */ if (unlikely(!(conn->flags & (CO_FL_WAIT_L4_CONN | CO_FL_WAIT_L6_CONN | CO_FL_CONNECTED)))) { si->exp = TICK_ETERNITY; oc->flags |= CF_WRITE_NULL; } - /* process consumer side */ - if (channel_is_empty(oc)) { - if (((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW) && - (si->state == SI_ST_EST)) - stream_int_shutw_conn(si); - __conn_data_stop_send(conn); - oc->wex = TICK_ETERNITY; - } - - if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == 0 && channel_may_recv(oc)) - si->flags |= SI_FL_WAIT_DATA; - - if (oc->flags & CF_WRITE_ACTIVITY) { - /* update timeouts if we have written something */ - if ((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL)) == CF_WRITE_PARTIAL && - !channel_is_empty(oc)) - if (tick_isset(oc->wex)) - oc->wex = tick_add_ifset(now_ms, oc->wto); - - if (!(si->flags & SI_FL_INDEP_STR)) - if (tick_isset(ic->rex)) - ic->rex = tick_add_ifset(now_ms, ic->rto); - - if (likely((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL && - channel_may_recv(oc) && - (si_opposite(si)->flags & SI_FL_WAIT_ROOM))) - si_chk_rcv(si_opposite(si)); - } - - /* Notify the other side when we've injected data into the IC that - * needs to be forwarded. We can do fast-forwarding as soon as there - * are output data, but we avoid doing this if some of the data are - * not yet scheduled for being forwarded, because it is very likely - * that it will be done again immediately afterwards once the following - * data are parsed (eg: HTTP chunking). We only SI_FL_WAIT_ROOM once - * we've emptied *some* of the output buffer, and not just when there - * is available room, because applets are often forced to stop before - * the buffer is full. We must not stop based on input data alone because - * an HTTP parser might need more data to complete the parsing. + /* Second step : update the stream-int and channels, try to forward any + * pending data, then possibly wake the stream up based on the new + * stream-int status. */ - if (!channel_is_empty(ic) && - (si_opposite(si)->flags & SI_FL_WAIT_DATA) && - (ic->buf->i == 0 || ic->pipe)) { - int new_len, last_len; + stream_int_notify(si); - last_len = ic->buf->o; - if (ic->pipe) - last_len += ic->pipe->data; + /* Third step : update the connection's polling status based on what + * was done above (eg: maybe some buffers got emptied). + */ + if (channel_is_empty(oc)) + __conn_data_stop_send(conn); - si_chk_snd(si_opposite(si)); - - new_len = ic->buf->o; - if (ic->pipe) - new_len += ic->pipe->data; - - /* check if the consumer has freed some space either in the - * buffer or in the pipe. - */ - if (channel_may_recv(ic) && new_len < last_len) - si->flags &= ~SI_FL_WAIT_ROOM; - } if (si->flags & SI_FL_WAIT_ROOM) { __conn_data_stop_recv(conn); - ic->rex = TICK_ETERNITY; } else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL|CF_DONT_READ)) == CF_READ_PARTIAL && channel_may_recv(ic)) { - /* we must re-enable reading if si_chk_snd() has freed some space */ __conn_data_want_recv(conn); - if (!(ic->flags & CF_READ_NOEXP) && tick_isset(ic->rex)) - ic->rex = tick_add_ifset(now_ms, ic->rto); } - - /* wake the task up only when needed */ - if (/* changes on the production side */ - (ic->flags & (CF_READ_NULL|CF_READ_ERROR)) || - si->state != SI_ST_EST || - (si->flags & SI_FL_ERR) || - ((ic->flags & CF_READ_PARTIAL) && - (!ic->to_forward || si_opposite(si)->state != SI_ST_EST)) || - - /* changes on the consumption side */ - (oc->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) || - ((oc->flags & CF_WRITE_ACTIVITY) && - ((oc->flags & CF_SHUTW) || - ((oc->flags & CF_WAKE_WRITE) && - (si_opposite(si)->state != SI_ST_EST || - (channel_is_empty(oc) && !oc->to_forward)))))) { - task_wakeup(si_task(si), TASK_WOKEN_IO); - } - if (ic->flags & CF_READ_ACTIVITY) - ic->flags &= ~CF_READ_DONTWAIT; - - stream_release_buffers(si_strm(si)); return 0; } @@ -1500,91 +1423,15 @@ void stream_sock_read0(struct stream_interface *si) return; } -/* notifies the stream interface that the applet has completed its work */ +/* Callback to be used by applet handlers upon completion. It updates the stream + * (which may or may not take this opportunity to try to forward data), then + * may disable the applet's based on the channels and stream interface's final + * states. + */ void si_applet_done(struct stream_interface *si) { - struct channel *ic = si_ic(si); - struct channel *oc = si_oc(si); - - /* process consumer side */ - if (channel_is_empty(oc)) { - if (((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW) && - (si->state == SI_ST_EST)) - stream_int_shutw_applet(si); - oc->wex = TICK_ETERNITY; - } - - /* indicate that we may be waiting for data from the output channel */ - if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == 0 && channel_may_recv(oc)) - si->flags |= SI_FL_WAIT_DATA; - - /* update OC timeouts and wake the other side up if it's waiting for room */ - if (oc->flags & CF_WRITE_ACTIVITY) { - if ((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL)) == CF_WRITE_PARTIAL && - !channel_is_empty(oc)) - if (tick_isset(oc->wex)) - oc->wex = tick_add_ifset(now_ms, oc->wto); - - if (!(si->flags & SI_FL_INDEP_STR)) - if (tick_isset(ic->rex)) - ic->rex = tick_add_ifset(now_ms, ic->rto); - - if (likely((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL && - channel_may_recv(oc) && - (si_opposite(si)->flags & SI_FL_WAIT_ROOM))) - si_chk_rcv(si_opposite(si)); - } - - /* Notify the other side when we've injected data into the IC that - * needs to be forwarded. We can do fast-forwarding as soon as there - * are output data, but we avoid doing this for partial buffers, - * because it is very likely that it will be done again immediately - * afterwards once the following data are parsed (eg: HTTP chunking). - * We only remove SI_FL_WAIT_ROOM once we've emptied the whole output - * buffer, because applets are often forced to stop before the buffer - * is full. We must not stop based on input data alone because an HTTP - * parser might need more data to complete the parsing. - */ - if (!channel_is_empty(ic) && - (si_opposite(si)->flags & SI_FL_WAIT_DATA) && - (si_ib(si)->i == 0 || ic->pipe)) { - si_chk_snd(si_opposite(si)); - if (channel_is_empty(ic)) - si->flags &= ~SI_FL_WAIT_ROOM; - } - - if (si->flags & SI_FL_WAIT_ROOM) { - ic->rex = TICK_ETERNITY; - } - else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL|CF_DONT_READ)) == CF_READ_PARTIAL && - channel_may_recv(ic)) { - /* we must re-enable reading if si_chk_snd() has freed some space */ - if (!(ic->flags & CF_READ_NOEXP) && tick_isset(ic->rex)) - ic->rex = tick_add_ifset(now_ms, ic->rto); - } - - /* wake the task up only when needed */ - if (/* changes on the production side */ - (ic->flags & (CF_READ_NULL|CF_READ_ERROR)) || - si->state != SI_ST_EST || - (si->flags & SI_FL_ERR) || - ((ic->flags & CF_READ_PARTIAL) && - (!ic->to_forward || si_opposite(si)->state != SI_ST_EST)) || - - /* changes on the consumption side */ - (oc->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) || - ((oc->flags & CF_WRITE_ACTIVITY) && - ((oc->flags & CF_SHUTW) || - ((oc->flags & CF_WAKE_WRITE) && - (si_opposite(si)->state != SI_ST_EST || - (channel_is_empty(oc) && !oc->to_forward)))))) { - task_wakeup(si_task(si), TASK_WOKEN_IO); - } - - if (ic->flags & CF_READ_ACTIVITY) - ic->flags &= ~CF_READ_DONTWAIT; - - stream_release_buffers(si_strm(si)); + /* update the stream-int, channels, and possibly wake the stream up */ + stream_int_notify(si); /* Get away from the active list if we can't work anymore. * We also do that if the main task has already scheduled, because it