mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2025-03-04 18:39:37 +00:00
MEDIUM: stream-int: add a new function si_applet_done()
This is the equivalent of si_conn_wake() but for applets. It will be called after changes to the stream interface are brought by the applet I/O handler. Ultimately it will release buffers and may be even wake the stream's task up if some important changes are detected. It would be nice to be able to merge it with the connection's wake function since it mostly manipulates the stream interface, but there are minor differences (such as how to enable/disable polling on a fd vs applet) and some specificities to applets (eg: don't wake the applet up until the output is empty) which would require abstract functions which would slow down everything.
This commit is contained in:
parent
3c595ac3ad
commit
e5f8649102
@ -47,6 +47,7 @@ extern struct data_cb si_idle_conn_cb;
|
||||
|
||||
struct appctx *stream_int_register_handler(struct stream_interface *si, struct applet *app);
|
||||
void stream_int_unregister_handler(struct stream_interface *si);
|
||||
void si_applet_done(struct stream_interface *si);
|
||||
|
||||
/* returns the channel which receives data from this stream interface (input channel) */
|
||||
static inline struct channel *si_ic(struct stream_interface *si)
|
||||
|
@ -1369,6 +1369,102 @@ void stream_sock_read0(struct stream_interface *si)
|
||||
return;
|
||||
}
|
||||
|
||||
/* notifies the stream interface that the applet has completed its work */
|
||||
void si_applet_done(struct stream_interface *si)
|
||||
{
|
||||
struct channel *ic = si_ic(si);
|
||||
struct channel *oc = si_oc(si);
|
||||
|
||||
/* process consumer side */
|
||||
if (channel_is_empty(oc)) {
|
||||
if (((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW) &&
|
||||
(si->state == SI_ST_EST))
|
||||
stream_int_shutw_applet(si);
|
||||
oc->wex = TICK_ETERNITY;
|
||||
}
|
||||
|
||||
/* indicate that we may be waiting for data from the output channel */
|
||||
if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == 0 && channel_may_recv(oc))
|
||||
si->flags |= SI_FL_WAIT_DATA;
|
||||
|
||||
/* update OC timeouts and wake the other side up if it's waiting for room */
|
||||
if (oc->flags & CF_WRITE_ACTIVITY) {
|
||||
if ((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL)) == CF_WRITE_PARTIAL &&
|
||||
!channel_is_empty(oc))
|
||||
if (tick_isset(oc->wex))
|
||||
oc->wex = tick_add_ifset(now_ms, oc->wto);
|
||||
|
||||
if (!(si->flags & SI_FL_INDEP_STR))
|
||||
if (tick_isset(ic->rex))
|
||||
ic->rex = tick_add_ifset(now_ms, ic->rto);
|
||||
|
||||
if (likely((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL &&
|
||||
channel_may_recv(oc) &&
|
||||
(si_opposite(si)->flags & SI_FL_WAIT_ROOM)))
|
||||
si_chk_rcv(si_opposite(si));
|
||||
}
|
||||
|
||||
/* Notify the other side when we've injected data into the IC that
|
||||
* needs to be forwarded. We can do fast-forwarding as soon as there
|
||||
* are output data, but we avoid doing this for partial buffers,
|
||||
* because it is very likely that it will be done again immediately
|
||||
* afterwards once the following data are parsed (eg: HTTP chunking).
|
||||
* We only remove SI_FL_WAIT_ROOM once we've emptied the whole output
|
||||
* buffer, because applets are often forced to stop before the buffer
|
||||
* is full. We must not stop based on input data alone because an HTTP
|
||||
* parser might need more data to complete the parsing.
|
||||
*/
|
||||
if (!channel_is_empty(ic) &&
|
||||
(si_opposite(si)->flags & SI_FL_WAIT_DATA) &&
|
||||
(si_ib(si)->i == 0 || ic->pipe)) {
|
||||
si_chk_snd(si_opposite(si));
|
||||
if (channel_is_empty(ic))
|
||||
si->flags &= ~SI_FL_WAIT_ROOM;
|
||||
}
|
||||
|
||||
if (si->flags & SI_FL_WAIT_ROOM) {
|
||||
ic->rex = TICK_ETERNITY;
|
||||
}
|
||||
else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL|CF_DONT_READ)) == CF_READ_PARTIAL &&
|
||||
channel_may_recv(ic)) {
|
||||
/* we must re-enable reading if si_chk_snd() has freed some space */
|
||||
if (!(ic->flags & CF_READ_NOEXP) && tick_isset(ic->rex))
|
||||
ic->rex = tick_add_ifset(now_ms, ic->rto);
|
||||
}
|
||||
|
||||
/* get away from the active list if we can't work anymore, that is
|
||||
* we're blocked both for reads or writes or once both sides are closed.
|
||||
* FIXME: we may have a problem here with bidirectional applets which
|
||||
* might block on a single direction while the other one is still free.
|
||||
*/
|
||||
if ((si->flags & (SI_FL_WAIT_ROOM|SI_FL_WAIT_DATA)) ||
|
||||
(ic->flags & CF_DONT_READ) ||
|
||||
(ic->flags & CF_SHUTR && oc->flags & CF_SHUTW))
|
||||
appctx_pause(si_appctx(si));
|
||||
|
||||
/* wake the task up only when needed */
|
||||
if (/* changes on the production side */
|
||||
(ic->flags & (CF_READ_NULL|CF_READ_ERROR)) ||
|
||||
si->state != SI_ST_EST ||
|
||||
(si->flags & SI_FL_ERR) ||
|
||||
((ic->flags & CF_READ_PARTIAL) &&
|
||||
(!ic->to_forward || si_opposite(si)->state != SI_ST_EST)) ||
|
||||
|
||||
/* changes on the consumption side */
|
||||
(oc->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) ||
|
||||
((oc->flags & CF_WRITE_ACTIVITY) &&
|
||||
((oc->flags & CF_SHUTW) ||
|
||||
((oc->flags & CF_WAKE_WRITE) &&
|
||||
(si_opposite(si)->state != SI_ST_EST ||
|
||||
(channel_is_empty(oc) && !oc->to_forward)))))) {
|
||||
task_wakeup(si_task(si), TASK_WOKEN_IO);
|
||||
}
|
||||
if (ic->flags & CF_READ_ACTIVITY)
|
||||
ic->flags &= ~CF_READ_DONTWAIT;
|
||||
|
||||
stream_release_buffers(si_strm(si));
|
||||
}
|
||||
|
||||
/* default update function for applets, to be used at the end of the i/o handler */
|
||||
static void stream_int_update_applet(struct stream_interface *si)
|
||||
{
|
||||
|
Loading…
Reference in New Issue
Block a user