mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2024-12-13 15:04:42 +00:00
MEDIUM: stream-int: use the same stream notification function for applets and conns
The code to report completion after a connection update or an applet update was almost the same since applets stole it from the connection. But the differences made them hard to maintain and prevented the creation of new functions doing only one part of the work. This patch replaces the common code from the si_conn_wake_cb() and si_applet_wake_cb() with a single call to stream_int_notify() which only notifies the stream (si+channels+task) from the outside. No functional change was made beyond this.
This commit is contained in:
parent
615f28bec1
commit
651e18292d
@ -628,11 +628,11 @@ void stream_int_notify(struct stream_interface *si)
|
||||
}
|
||||
|
||||
|
||||
/* Callback to be used by connection I/O handlers upon completion. It differs from
|
||||
* the update function in that it is designed to be called by lower layers after I/O
|
||||
* events have been completed. It will also try to wake the associated task up if
|
||||
* an important event requires special handling. It relies on the connection handler
|
||||
* to commit any polling updates. The function always returns 0.
|
||||
/* Callback to be used by connection I/O handlers upon completion. It propagates
|
||||
* connection flags to the stream interface, updates the stream (which may or
|
||||
* may not take this opportunity to try to forward data), then update the
|
||||
* connection's polling based on the channels and stream interface's final
|
||||
* states. The function always returns 0.
|
||||
*/
|
||||
static int si_conn_wake_cb(struct connection *conn)
|
||||
{
|
||||
@ -640,114 +640,37 @@ static int si_conn_wake_cb(struct connection *conn)
|
||||
struct channel *ic = si_ic(si);
|
||||
struct channel *oc = si_oc(si);
|
||||
|
||||
DPRINTF(stderr, "%s: si=%p, si->state=%d ic->flags=%08x oc->flags=%08x\n",
|
||||
__FUNCTION__,
|
||||
si, si->state, ic->flags, oc->flags);
|
||||
|
||||
/* First step, report to the stream-int what was detected at the
|
||||
* connection layer : errors and connection establishment.
|
||||
*/
|
||||
if (conn->flags & CO_FL_ERROR)
|
||||
si->flags |= SI_FL_ERR;
|
||||
|
||||
/* check for recent connection establishment */
|
||||
if (unlikely(!(conn->flags & (CO_FL_WAIT_L4_CONN | CO_FL_WAIT_L6_CONN | CO_FL_CONNECTED)))) {
|
||||
si->exp = TICK_ETERNITY;
|
||||
oc->flags |= CF_WRITE_NULL;
|
||||
}
|
||||
|
||||
/* process consumer side */
|
||||
if (channel_is_empty(oc)) {
|
||||
if (((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW) &&
|
||||
(si->state == SI_ST_EST))
|
||||
stream_int_shutw_conn(si);
|
||||
__conn_data_stop_send(conn);
|
||||
oc->wex = TICK_ETERNITY;
|
||||
}
|
||||
|
||||
if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == 0 && channel_may_recv(oc))
|
||||
si->flags |= SI_FL_WAIT_DATA;
|
||||
|
||||
if (oc->flags & CF_WRITE_ACTIVITY) {
|
||||
/* update timeouts if we have written something */
|
||||
if ((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL)) == CF_WRITE_PARTIAL &&
|
||||
!channel_is_empty(oc))
|
||||
if (tick_isset(oc->wex))
|
||||
oc->wex = tick_add_ifset(now_ms, oc->wto);
|
||||
|
||||
if (!(si->flags & SI_FL_INDEP_STR))
|
||||
if (tick_isset(ic->rex))
|
||||
ic->rex = tick_add_ifset(now_ms, ic->rto);
|
||||
|
||||
if (likely((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL &&
|
||||
channel_may_recv(oc) &&
|
||||
(si_opposite(si)->flags & SI_FL_WAIT_ROOM)))
|
||||
si_chk_rcv(si_opposite(si));
|
||||
}
|
||||
|
||||
/* Notify the other side when we've injected data into the IC that
|
||||
* needs to be forwarded. We can do fast-forwarding as soon as there
|
||||
* are output data, but we avoid doing this if some of the data are
|
||||
* not yet scheduled for being forwarded, because it is very likely
|
||||
* that it will be done again immediately afterwards once the following
|
||||
* data are parsed (eg: HTTP chunking). We only SI_FL_WAIT_ROOM once
|
||||
* we've emptied *some* of the output buffer, and not just when there
|
||||
* is available room, because applets are often forced to stop before
|
||||
* the buffer is full. We must not stop based on input data alone because
|
||||
* an HTTP parser might need more data to complete the parsing.
|
||||
/* Second step : update the stream-int and channels, try to forward any
|
||||
* pending data, then possibly wake the stream up based on the new
|
||||
* stream-int status.
|
||||
*/
|
||||
if (!channel_is_empty(ic) &&
|
||||
(si_opposite(si)->flags & SI_FL_WAIT_DATA) &&
|
||||
(ic->buf->i == 0 || ic->pipe)) {
|
||||
int new_len, last_len;
|
||||
stream_int_notify(si);
|
||||
|
||||
last_len = ic->buf->o;
|
||||
if (ic->pipe)
|
||||
last_len += ic->pipe->data;
|
||||
/* Third step : update the connection's polling status based on what
|
||||
* was done above (eg: maybe some buffers got emptied).
|
||||
*/
|
||||
if (channel_is_empty(oc))
|
||||
__conn_data_stop_send(conn);
|
||||
|
||||
si_chk_snd(si_opposite(si));
|
||||
|
||||
new_len = ic->buf->o;
|
||||
if (ic->pipe)
|
||||
new_len += ic->pipe->data;
|
||||
|
||||
/* check if the consumer has freed some space either in the
|
||||
* buffer or in the pipe.
|
||||
*/
|
||||
if (channel_may_recv(ic) && new_len < last_len)
|
||||
si->flags &= ~SI_FL_WAIT_ROOM;
|
||||
}
|
||||
|
||||
if (si->flags & SI_FL_WAIT_ROOM) {
|
||||
__conn_data_stop_recv(conn);
|
||||
ic->rex = TICK_ETERNITY;
|
||||
}
|
||||
else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL|CF_DONT_READ)) == CF_READ_PARTIAL &&
|
||||
channel_may_recv(ic)) {
|
||||
/* we must re-enable reading if si_chk_snd() has freed some space */
|
||||
__conn_data_want_recv(conn);
|
||||
if (!(ic->flags & CF_READ_NOEXP) && tick_isset(ic->rex))
|
||||
ic->rex = tick_add_ifset(now_ms, ic->rto);
|
||||
}
|
||||
|
||||
/* wake the task up only when needed */
|
||||
if (/* changes on the production side */
|
||||
(ic->flags & (CF_READ_NULL|CF_READ_ERROR)) ||
|
||||
si->state != SI_ST_EST ||
|
||||
(si->flags & SI_FL_ERR) ||
|
||||
((ic->flags & CF_READ_PARTIAL) &&
|
||||
(!ic->to_forward || si_opposite(si)->state != SI_ST_EST)) ||
|
||||
|
||||
/* changes on the consumption side */
|
||||
(oc->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) ||
|
||||
((oc->flags & CF_WRITE_ACTIVITY) &&
|
||||
((oc->flags & CF_SHUTW) ||
|
||||
((oc->flags & CF_WAKE_WRITE) &&
|
||||
(si_opposite(si)->state != SI_ST_EST ||
|
||||
(channel_is_empty(oc) && !oc->to_forward)))))) {
|
||||
task_wakeup(si_task(si), TASK_WOKEN_IO);
|
||||
}
|
||||
if (ic->flags & CF_READ_ACTIVITY)
|
||||
ic->flags &= ~CF_READ_DONTWAIT;
|
||||
|
||||
stream_release_buffers(si_strm(si));
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -1500,91 +1423,15 @@ void stream_sock_read0(struct stream_interface *si)
|
||||
return;
|
||||
}
|
||||
|
||||
/* notifies the stream interface that the applet has completed its work */
|
||||
/* Callback to be used by applet handlers upon completion. It updates the stream
|
||||
* (which may or may not take this opportunity to try to forward data), then
|
||||
* may disable the applet's based on the channels and stream interface's final
|
||||
* states.
|
||||
*/
|
||||
void si_applet_done(struct stream_interface *si)
|
||||
{
|
||||
struct channel *ic = si_ic(si);
|
||||
struct channel *oc = si_oc(si);
|
||||
|
||||
/* process consumer side */
|
||||
if (channel_is_empty(oc)) {
|
||||
if (((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW) &&
|
||||
(si->state == SI_ST_EST))
|
||||
stream_int_shutw_applet(si);
|
||||
oc->wex = TICK_ETERNITY;
|
||||
}
|
||||
|
||||
/* indicate that we may be waiting for data from the output channel */
|
||||
if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == 0 && channel_may_recv(oc))
|
||||
si->flags |= SI_FL_WAIT_DATA;
|
||||
|
||||
/* update OC timeouts and wake the other side up if it's waiting for room */
|
||||
if (oc->flags & CF_WRITE_ACTIVITY) {
|
||||
if ((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL)) == CF_WRITE_PARTIAL &&
|
||||
!channel_is_empty(oc))
|
||||
if (tick_isset(oc->wex))
|
||||
oc->wex = tick_add_ifset(now_ms, oc->wto);
|
||||
|
||||
if (!(si->flags & SI_FL_INDEP_STR))
|
||||
if (tick_isset(ic->rex))
|
||||
ic->rex = tick_add_ifset(now_ms, ic->rto);
|
||||
|
||||
if (likely((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL &&
|
||||
channel_may_recv(oc) &&
|
||||
(si_opposite(si)->flags & SI_FL_WAIT_ROOM)))
|
||||
si_chk_rcv(si_opposite(si));
|
||||
}
|
||||
|
||||
/* Notify the other side when we've injected data into the IC that
|
||||
* needs to be forwarded. We can do fast-forwarding as soon as there
|
||||
* are output data, but we avoid doing this for partial buffers,
|
||||
* because it is very likely that it will be done again immediately
|
||||
* afterwards once the following data are parsed (eg: HTTP chunking).
|
||||
* We only remove SI_FL_WAIT_ROOM once we've emptied the whole output
|
||||
* buffer, because applets are often forced to stop before the buffer
|
||||
* is full. We must not stop based on input data alone because an HTTP
|
||||
* parser might need more data to complete the parsing.
|
||||
*/
|
||||
if (!channel_is_empty(ic) &&
|
||||
(si_opposite(si)->flags & SI_FL_WAIT_DATA) &&
|
||||
(si_ib(si)->i == 0 || ic->pipe)) {
|
||||
si_chk_snd(si_opposite(si));
|
||||
if (channel_is_empty(ic))
|
||||
si->flags &= ~SI_FL_WAIT_ROOM;
|
||||
}
|
||||
|
||||
if (si->flags & SI_FL_WAIT_ROOM) {
|
||||
ic->rex = TICK_ETERNITY;
|
||||
}
|
||||
else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL|CF_DONT_READ)) == CF_READ_PARTIAL &&
|
||||
channel_may_recv(ic)) {
|
||||
/* we must re-enable reading if si_chk_snd() has freed some space */
|
||||
if (!(ic->flags & CF_READ_NOEXP) && tick_isset(ic->rex))
|
||||
ic->rex = tick_add_ifset(now_ms, ic->rto);
|
||||
}
|
||||
|
||||
/* wake the task up only when needed */
|
||||
if (/* changes on the production side */
|
||||
(ic->flags & (CF_READ_NULL|CF_READ_ERROR)) ||
|
||||
si->state != SI_ST_EST ||
|
||||
(si->flags & SI_FL_ERR) ||
|
||||
((ic->flags & CF_READ_PARTIAL) &&
|
||||
(!ic->to_forward || si_opposite(si)->state != SI_ST_EST)) ||
|
||||
|
||||
/* changes on the consumption side */
|
||||
(oc->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) ||
|
||||
((oc->flags & CF_WRITE_ACTIVITY) &&
|
||||
((oc->flags & CF_SHUTW) ||
|
||||
((oc->flags & CF_WAKE_WRITE) &&
|
||||
(si_opposite(si)->state != SI_ST_EST ||
|
||||
(channel_is_empty(oc) && !oc->to_forward)))))) {
|
||||
task_wakeup(si_task(si), TASK_WOKEN_IO);
|
||||
}
|
||||
|
||||
if (ic->flags & CF_READ_ACTIVITY)
|
||||
ic->flags &= ~CF_READ_DONTWAIT;
|
||||
|
||||
stream_release_buffers(si_strm(si));
|
||||
/* update the stream-int, channels, and possibly wake the stream up */
|
||||
stream_int_notify(si);
|
||||
|
||||
/* Get away from the active list if we can't work anymore.
|
||||
* We also do that if the main task has already scheduled, because it
|
||||
|
Loading…
Reference in New Issue
Block a user