MAJOR: stream: use a regular ->update for all stream interfaces

Now si->update() is used to update any type of stream interface, whether
it's an applet, a connection or even nothing. We don't call si_applet_call()
anymore at the end of the resync and we don't have the risk that the
stream's task is reinserted into the run queue, which makes the code
a bit simpler.

The stream_int_update_applet() function was simplified to ensure that
it remained compatible with this standardized calling convention. It
was almost copy-pasted from the update code dedicated to connections.
Just like for si_applet_done(), it seems that it should be possible to
merge the two functions except that it would require some slow operations,
except maybe if the type of end point is tested inside the update function
itself.
This commit is contained in:
Willy Tarreau 2015-04-19 18:13:56 +02:00
parent 828824af05
commit 563cc37609
2 changed files with 63 additions and 94 deletions

View File

@ -2262,10 +2262,10 @@ struct task *process_stream(struct task *t)
if ((sess->fe->options & PR_O_CONTSTATS) && (s->flags & SF_BE_ASSIGNED))
stream_process_counters(s);
if (si_f->state == SI_ST_EST && obj_type(si_f->end) != OBJ_TYPE_APPCTX)
if (si_f->state == SI_ST_EST)
si_update(si_f);
if (si_b->state == SI_ST_EST && obj_type(si_b->end) != OBJ_TYPE_APPCTX)
if (si_b->state == SI_ST_EST)
si_update(si_b);
req->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_WRITE_NULL|CF_WRITE_PARTIAL|CF_READ_ATTACHED);
@ -2289,23 +2289,6 @@ struct task *process_stream(struct task *t)
req->rex = TICK_ETERNITY;
}
/* When any of the stream interfaces is attached to an applet,
* we have to call it here. Note that this one may wake the
* task up again. If at least one applet was called, the current
* task might have been woken up, in which case we don't want it
* to be requeued to the wait queue but rather to the run queue
* to run ASAP. The bitwise "or" in the condition ensures that
* both functions are always called and that we wake up if at
* least one did something.
*/
if ((si_applet_call(si_b) | si_applet_call(si_f)) != 0) {
if (task_in_rq(t)) {
t->expire = TICK_ETERNITY;
stream_release_buffers(s);
return t;
}
}
update_exp_and_leave:
t->expire = tick_first(tick_first(req->rex, req->wex),
tick_first(res->rex, res->wex));

View File

@ -1466,89 +1466,75 @@ void si_applet_done(struct stream_interface *si)
stream_release_buffers(si_strm(si));
}
/* default update function for applets, to be used at the end of the i/o handler */
static void stream_int_update_applet(struct stream_interface *si)
/* updates the timers and flags of a stream interface attached to an applet.
* it's called from the upper layers after the buffers/channels have been
* updated.
*/
void stream_int_update_applet(struct stream_interface *si)
{
int old_flags = si->flags;
struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si);
DPRINTF(stderr, "%s: si=%p, si->state=%d ic->flags=%08x oc->flags=%08x\n",
__FUNCTION__,
si, si->state, ic->flags, oc->flags);
if (si->state != SI_ST_EST)
return;
if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW &&
channel_is_empty(oc))
si_shutw(si);
if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == 0 && channel_may_recv(oc))
si->flags |= SI_FL_WAIT_DATA;
/* we're almost sure that we need some space if the buffer is not
* empty, even if it's not full, because the applets can't fill it.
*/
if ((ic->flags & (CF_SHUTR|CF_DONT_READ)) == 0 && !channel_is_empty(ic))
si->flags |= SI_FL_WAIT_ROOM;
if (oc->flags & CF_WRITE_ACTIVITY) {
if (tick_isset(oc->wex))
oc->wex = tick_add_ifset(now_ms, oc->wto);
}
if (ic->flags & CF_READ_ACTIVITY ||
(oc->flags & CF_WRITE_ACTIVITY && !(si->flags & SI_FL_INDEP_STR))) {
if (tick_isset(ic->rex))
ic->rex = tick_add_ifset(now_ms, ic->rto);
}
/* save flags to detect changes */
old_flags = si->flags;
if (likely((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL &&
channel_may_recv(oc) &&
(si_opposite(si)->flags & SI_FL_WAIT_ROOM)))
si_chk_rcv(si_opposite(si));
if (((ic->flags & CF_READ_PARTIAL) && !channel_is_empty(ic)) &&
(ic->pipe /* always try to send spliced data */ ||
(ic->buf->i == 0 && (si_opposite(si)->flags & SI_FL_WAIT_DATA)))) {
si_chk_snd(si_opposite(si));
/* check if the consumer has freed some space */
if (channel_may_recv(ic) && !ic->pipe)
/* Check if we need to close the read side */
if (!(ic->flags & CF_SHUTR)) {
/* Read not closed, update FD status and timeout for reads */
if ((ic->flags & CF_DONT_READ) || !channel_may_recv(ic)) {
/* stop reading */
if (!(si->flags & SI_FL_WAIT_ROOM)) {
if (!(ic->flags & CF_DONT_READ)) /* full */
si->flags |= SI_FL_WAIT_ROOM;
ic->rex = TICK_ETERNITY;
}
}
else {
/* (re)start reading and update timeout. Note: we don't recompute the timeout
* everytime we get here, otherwise it would risk never to expire. We only
* update it if is was not yet set. The stream socket handler will already
* have updated it if there has been a completed I/O.
*/
si->flags &= ~SI_FL_WAIT_ROOM;
if (!(ic->flags & (CF_READ_NOEXP|CF_DONT_READ)) && !tick_isset(ic->rex))
ic->rex = tick_add_ifset(now_ms, ic->rto);
}
}
/* Note that we're trying to wake up in two conditions here :
* - special event, which needs the holder task attention
* - status indicating that the applet can go on working. This
* is rather hard because we might be blocking on output and
* don't want to wake up on input and vice-versa. The idea is
* to only rely on the changes the chk_* might have performed.
*/
if (/* check stream interface changes */
((old_flags & ~si->flags) & (SI_FL_WAIT_ROOM|SI_FL_WAIT_DATA)) ||
/* changes on the production side */
(ic->flags & (CF_READ_NULL|CF_READ_ERROR)) ||
si->state != SI_ST_EST ||
(si->flags & SI_FL_ERR) ||
((ic->flags & CF_READ_PARTIAL) &&
(!ic->to_forward || si_opposite(si)->state != SI_ST_EST)) ||
/* changes on the consumption side */
(oc->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) ||
((oc->flags & CF_WRITE_ACTIVITY) &&
((oc->flags & CF_SHUTW) ||
((oc->flags & CF_WAKE_WRITE) &&
(si_opposite(si)->state != SI_ST_EST ||
(channel_is_empty(oc) && !oc->to_forward)))))) {
if (!(si->flags & SI_FL_DONT_WAKE))
task_wakeup(si_task(si), TASK_WOKEN_IO);
/* Check if we need to close the write side */
if (!(oc->flags & CF_SHUTW)) {
/* Write not closed, update FD status and timeout for writes */
if (channel_is_empty(oc)) {
/* stop writing */
if (!(si->flags & SI_FL_WAIT_DATA)) {
if ((oc->flags & CF_SHUTW_NOW) == 0)
si->flags |= SI_FL_WAIT_DATA;
oc->wex = TICK_ETERNITY;
}
}
else {
/* (re)start writing and update timeout. Note: we don't recompute the timeout
* everytime we get here, otherwise it would risk never to expire. We only
* update it if is was not yet set. The stream socket handler will already
* have updated it if there has been a completed I/O.
*/
si->flags &= ~SI_FL_WAIT_DATA;
if (!tick_isset(oc->wex)) {
oc->wex = tick_add_ifset(now_ms, oc->wto);
if (tick_isset(ic->rex) && !(si->flags & SI_FL_INDEP_STR)) {
/* Note: depending on the protocol, we don't know if we're waiting
* for incoming data or not. So in order to prevent the socket from
* expiring read timeouts during writes, we refresh the read timeout,
* except if it was already infinite or if we have explicitly setup
* independent streams.
*/
ic->rex = tick_add_ifset(now_ms, ic->rto);
}
}
}
}
if (ic->flags & CF_READ_ACTIVITY)
ic->flags &= ~CF_READ_DONTWAIT;
if (!(si->flags & (SI_FL_WAIT_ROOM|SI_FL_WAIT_DATA)) &&
!(ic->flags & CF_DONT_READ) &&
(!(ic->flags & CF_SHUTR) || !(oc->flags & CF_SHUTW)))
appctx_wakeup(si_appctx(si));
}
/*