mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2025-02-03 12:03:02 +00:00
REORG: stream-int: create si_applet_ops dedicated to applets
These functions are dedicated to applets so that we don't use the default ones anymore in this case.
This commit is contained in:
parent
3057645b37
commit
d45b9f8991
@ -41,6 +41,7 @@ void stream_sock_read0(struct stream_interface *si);
|
|||||||
|
|
||||||
extern struct si_ops si_embedded_ops;
|
extern struct si_ops si_embedded_ops;
|
||||||
extern struct si_ops si_conn_ops;
|
extern struct si_ops si_conn_ops;
|
||||||
|
extern struct si_ops si_applet_ops;
|
||||||
extern struct data_cb si_conn_cb;
|
extern struct data_cb si_conn_cb;
|
||||||
extern struct data_cb si_idle_conn_cb;
|
extern struct data_cb si_idle_conn_cb;
|
||||||
|
|
||||||
@ -198,7 +199,7 @@ static inline int si_conn_ready(struct stream_interface *si)
|
|||||||
*/
|
*/
|
||||||
static inline void si_attach_appctx(struct stream_interface *si, struct appctx *appctx)
|
static inline void si_attach_appctx(struct stream_interface *si, struct appctx *appctx)
|
||||||
{
|
{
|
||||||
si->ops = &si_embedded_ops;
|
si->ops = &si_applet_ops;
|
||||||
si->end = &appctx->obj_type;
|
si->end = &appctx->obj_type;
|
||||||
appctx->owner = si;
|
appctx->owner = si;
|
||||||
}
|
}
|
||||||
|
@ -48,6 +48,11 @@ static void stream_int_shutr_conn(struct stream_interface *si);
|
|||||||
static void stream_int_shutw_conn(struct stream_interface *si);
|
static void stream_int_shutw_conn(struct stream_interface *si);
|
||||||
static void stream_int_chk_rcv_conn(struct stream_interface *si);
|
static void stream_int_chk_rcv_conn(struct stream_interface *si);
|
||||||
static void stream_int_chk_snd_conn(struct stream_interface *si);
|
static void stream_int_chk_snd_conn(struct stream_interface *si);
|
||||||
|
static void stream_int_update_applet(struct stream_interface *si);
|
||||||
|
static void stream_int_shutr_applet(struct stream_interface *si);
|
||||||
|
static void stream_int_shutw_applet(struct stream_interface *si);
|
||||||
|
static void stream_int_chk_rcv_applet(struct stream_interface *si);
|
||||||
|
static void stream_int_chk_snd_applet(struct stream_interface *si);
|
||||||
static void si_conn_recv_cb(struct connection *conn);
|
static void si_conn_recv_cb(struct connection *conn);
|
||||||
static void si_conn_send_cb(struct connection *conn);
|
static void si_conn_send_cb(struct connection *conn);
|
||||||
static int si_conn_wake_cb(struct connection *conn);
|
static int si_conn_wake_cb(struct connection *conn);
|
||||||
@ -72,6 +77,15 @@ struct si_ops si_conn_ops = {
|
|||||||
.shutw = stream_int_shutw_conn,
|
.shutw = stream_int_shutw_conn,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/* stream-interface operations for connections */
|
||||||
|
struct si_ops si_applet_ops = {
|
||||||
|
.update = stream_int_update_applet,
|
||||||
|
.chk_rcv = stream_int_chk_rcv_applet,
|
||||||
|
.chk_snd = stream_int_chk_snd_applet,
|
||||||
|
.shutr = stream_int_shutr_applet,
|
||||||
|
.shutw = stream_int_shutw_applet,
|
||||||
|
};
|
||||||
|
|
||||||
struct data_cb si_conn_cb = {
|
struct data_cb si_conn_cb = {
|
||||||
.recv = si_conn_recv_cb,
|
.recv = si_conn_recv_cb,
|
||||||
.send = si_conn_send_cb,
|
.send = si_conn_send_cb,
|
||||||
@ -225,12 +239,11 @@ static void stream_int_update_embedded(struct stream_interface *si)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* This function performs a shutdown-read on a stream interface attached to an
|
* This function performs a shutdown-read on a detached stream interface in a
|
||||||
* applet in a connected or init state (it does nothing for other states). It
|
* connected or init state (it does nothing for other states). It either shuts
|
||||||
* either shuts the read side or marks itself as closed. The buffer flags are
|
* the read side or marks itself as closed. The buffer flags are updated to
|
||||||
* updated to reflect the new state. If the stream interface has SI_FL_NOHALF,
|
* reflect the new state. If the stream interface has SI_FL_NOHALF, we also
|
||||||
* we also forward the close to the write side. The owner task is woken up if
|
* forward the close to the write side. The owner task is woken up if it exists.
|
||||||
* it exists.
|
|
||||||
*/
|
*/
|
||||||
static void stream_int_shutr(struct stream_interface *si)
|
static void stream_int_shutr(struct stream_interface *si)
|
||||||
{
|
{
|
||||||
@ -249,7 +262,6 @@ static void stream_int_shutr(struct stream_interface *si)
|
|||||||
if (si_oc(si)->flags & CF_SHUTW) {
|
if (si_oc(si)->flags & CF_SHUTW) {
|
||||||
si->state = SI_ST_DIS;
|
si->state = SI_ST_DIS;
|
||||||
si->exp = TICK_ETERNITY;
|
si->exp = TICK_ETERNITY;
|
||||||
si_applet_release(si);
|
|
||||||
}
|
}
|
||||||
else if (si->flags & SI_FL_NOHALF) {
|
else if (si->flags & SI_FL_NOHALF) {
|
||||||
/* we want to immediately forward this close to the write side */
|
/* we want to immediately forward this close to the write side */
|
||||||
@ -262,11 +274,11 @@ static void stream_int_shutr(struct stream_interface *si)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* This function performs a shutdown-write on a stream interface attached to an
|
* This function performs a shutdown-write on a detached stream interface in a
|
||||||
* applet in a connected or init state (it does nothing for other states). It
|
* connected or init state (it does nothing for other states). It either shuts
|
||||||
* either shuts the write side or marks itself as closed. The buffer flags are
|
* the write side or marks itself as closed. The buffer flags are updated to
|
||||||
* updated to reflect the new state. It does also close everything if the SI
|
* reflect the new state. It does also close everything if the SI was marked as
|
||||||
* was marked as being in error state. The owner task is woken up if it exists.
|
* being in error state. The owner task is woken up if it exists.
|
||||||
*/
|
*/
|
||||||
static void stream_int_shutw(struct stream_interface *si)
|
static void stream_int_shutw(struct stream_interface *si)
|
||||||
{
|
{
|
||||||
@ -299,7 +311,6 @@ static void stream_int_shutw(struct stream_interface *si)
|
|||||||
case SI_ST_TAR:
|
case SI_ST_TAR:
|
||||||
/* Note that none of these states may happen with applets */
|
/* Note that none of these states may happen with applets */
|
||||||
si->state = SI_ST_DIS;
|
si->state = SI_ST_DIS;
|
||||||
si_applet_release(si);
|
|
||||||
default:
|
default:
|
||||||
si->flags &= ~(SI_FL_WAIT_ROOM | SI_FL_NOLINGER);
|
si->flags &= ~(SI_FL_WAIT_ROOM | SI_FL_NOLINGER);
|
||||||
ic->flags &= ~CF_SHUTR_NOW;
|
ic->flags &= ~CF_SHUTR_NOW;
|
||||||
@ -1358,6 +1369,231 @@ void stream_sock_read0(struct stream_interface *si)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* default update function for applets, to be used at the end of the i/o handler */
|
||||||
|
static void stream_int_update_applet(struct stream_interface *si)
|
||||||
|
{
|
||||||
|
int old_flags = si->flags;
|
||||||
|
struct channel *ic = si_ic(si);
|
||||||
|
struct channel *oc = si_oc(si);
|
||||||
|
|
||||||
|
DPRINTF(stderr, "%s: si=%p, si->state=%d ic->flags=%08x oc->flags=%08x\n",
|
||||||
|
__FUNCTION__,
|
||||||
|
si, si->state, ic->flags, oc->flags);
|
||||||
|
|
||||||
|
if (si->state != SI_ST_EST)
|
||||||
|
return;
|
||||||
|
|
||||||
|
if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW &&
|
||||||
|
channel_is_empty(oc))
|
||||||
|
si_shutw(si);
|
||||||
|
|
||||||
|
if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == 0 && channel_may_recv(oc))
|
||||||
|
si->flags |= SI_FL_WAIT_DATA;
|
||||||
|
|
||||||
|
/* we're almost sure that we need some space if the buffer is not
|
||||||
|
* empty, even if it's not full, because the applets can't fill it.
|
||||||
|
*/
|
||||||
|
if ((ic->flags & (CF_SHUTR|CF_DONT_READ)) == 0 && !channel_is_empty(ic))
|
||||||
|
si->flags |= SI_FL_WAIT_ROOM;
|
||||||
|
|
||||||
|
if (oc->flags & CF_WRITE_ACTIVITY) {
|
||||||
|
if (tick_isset(oc->wex))
|
||||||
|
oc->wex = tick_add_ifset(now_ms, oc->wto);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ic->flags & CF_READ_ACTIVITY ||
|
||||||
|
(oc->flags & CF_WRITE_ACTIVITY && !(si->flags & SI_FL_INDEP_STR))) {
|
||||||
|
if (tick_isset(ic->rex))
|
||||||
|
ic->rex = tick_add_ifset(now_ms, ic->rto);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* save flags to detect changes */
|
||||||
|
old_flags = si->flags;
|
||||||
|
if (likely((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL &&
|
||||||
|
channel_may_recv(oc) &&
|
||||||
|
(si_opposite(si)->flags & SI_FL_WAIT_ROOM)))
|
||||||
|
si_chk_rcv(si_opposite(si));
|
||||||
|
|
||||||
|
if (((ic->flags & CF_READ_PARTIAL) && !channel_is_empty(ic)) &&
|
||||||
|
(ic->pipe /* always try to send spliced data */ ||
|
||||||
|
(ic->buf->i == 0 && (si_opposite(si)->flags & SI_FL_WAIT_DATA)))) {
|
||||||
|
si_chk_snd(si_opposite(si));
|
||||||
|
/* check if the consumer has freed some space */
|
||||||
|
if (channel_may_recv(ic) && !ic->pipe)
|
||||||
|
si->flags &= ~SI_FL_WAIT_ROOM;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Note that we're trying to wake up in two conditions here :
|
||||||
|
* - special event, which needs the holder task attention
|
||||||
|
* - status indicating that the applet can go on working. This
|
||||||
|
* is rather hard because we might be blocking on output and
|
||||||
|
* don't want to wake up on input and vice-versa. The idea is
|
||||||
|
* to only rely on the changes the chk_* might have performed.
|
||||||
|
*/
|
||||||
|
if (/* check stream interface changes */
|
||||||
|
((old_flags & ~si->flags) & (SI_FL_WAIT_ROOM|SI_FL_WAIT_DATA)) ||
|
||||||
|
|
||||||
|
/* changes on the production side */
|
||||||
|
(ic->flags & (CF_READ_NULL|CF_READ_ERROR)) ||
|
||||||
|
si->state != SI_ST_EST ||
|
||||||
|
(si->flags & SI_FL_ERR) ||
|
||||||
|
((ic->flags & CF_READ_PARTIAL) &&
|
||||||
|
(!ic->to_forward || si_opposite(si)->state != SI_ST_EST)) ||
|
||||||
|
|
||||||
|
/* changes on the consumption side */
|
||||||
|
(oc->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) ||
|
||||||
|
((oc->flags & CF_WRITE_ACTIVITY) &&
|
||||||
|
((oc->flags & CF_SHUTW) ||
|
||||||
|
((oc->flags & CF_WAKE_WRITE) &&
|
||||||
|
(si_opposite(si)->state != SI_ST_EST ||
|
||||||
|
(channel_is_empty(oc) && !oc->to_forward)))))) {
|
||||||
|
if (!(si->flags & SI_FL_DONT_WAKE))
|
||||||
|
task_wakeup(si_task(si), TASK_WOKEN_IO);
|
||||||
|
}
|
||||||
|
if (ic->flags & CF_READ_ACTIVITY)
|
||||||
|
ic->flags &= ~CF_READ_DONTWAIT;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This function performs a shutdown-read on a stream interface attached to an
|
||||||
|
* applet in a connected or init state (it does nothing for other states). It
|
||||||
|
* either shuts the read side or marks itself as closed. The buffer flags are
|
||||||
|
* updated to reflect the new state. If the stream interface has SI_FL_NOHALF,
|
||||||
|
* we also forward the close to the write side. The owner task is woken up if
|
||||||
|
* it exists.
|
||||||
|
*/
|
||||||
|
static void stream_int_shutr_applet(struct stream_interface *si)
|
||||||
|
{
|
||||||
|
struct channel *ic = si_ic(si);
|
||||||
|
|
||||||
|
ic->flags &= ~CF_SHUTR_NOW;
|
||||||
|
if (ic->flags & CF_SHUTR)
|
||||||
|
return;
|
||||||
|
ic->flags |= CF_SHUTR;
|
||||||
|
ic->rex = TICK_ETERNITY;
|
||||||
|
si->flags &= ~SI_FL_WAIT_ROOM;
|
||||||
|
|
||||||
|
if (si->state != SI_ST_EST && si->state != SI_ST_CON)
|
||||||
|
return;
|
||||||
|
|
||||||
|
if (si_oc(si)->flags & CF_SHUTW) {
|
||||||
|
si->state = SI_ST_DIS;
|
||||||
|
si->exp = TICK_ETERNITY;
|
||||||
|
si_applet_release(si);
|
||||||
|
}
|
||||||
|
else if (si->flags & SI_FL_NOHALF) {
|
||||||
|
/* we want to immediately forward this close to the write side */
|
||||||
|
return stream_int_shutw_applet(si);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* note that if the task exists, it must unregister itself once it runs */
|
||||||
|
if (!(si->flags & SI_FL_DONT_WAKE))
|
||||||
|
task_wakeup(si_task(si), TASK_WOKEN_IO);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This function performs a shutdown-write on a stream interface attached to an
|
||||||
|
* applet in a connected or init state (it does nothing for other states). It
|
||||||
|
* either shuts the write side or marks itself as closed. The buffer flags are
|
||||||
|
* updated to reflect the new state. It does also close everything if the SI
|
||||||
|
* was marked as being in error state. The owner task is woken up if it exists.
|
||||||
|
*/
|
||||||
|
static void stream_int_shutw_applet(struct stream_interface *si)
|
||||||
|
{
|
||||||
|
struct channel *ic = si_ic(si);
|
||||||
|
struct channel *oc = si_oc(si);
|
||||||
|
|
||||||
|
oc->flags &= ~CF_SHUTW_NOW;
|
||||||
|
if (oc->flags & CF_SHUTW)
|
||||||
|
return;
|
||||||
|
oc->flags |= CF_SHUTW;
|
||||||
|
oc->wex = TICK_ETERNITY;
|
||||||
|
si->flags &= ~SI_FL_WAIT_DATA;
|
||||||
|
|
||||||
|
switch (si->state) {
|
||||||
|
case SI_ST_EST:
|
||||||
|
/* we have to shut before closing, otherwise some short messages
|
||||||
|
* may never leave the system, especially when there are remaining
|
||||||
|
* unread data in the socket input buffer, or when nolinger is set.
|
||||||
|
* However, if SI_FL_NOLINGER is explicitly set, we know there is
|
||||||
|
* no risk so we close both sides immediately.
|
||||||
|
*/
|
||||||
|
if (!(si->flags & (SI_FL_ERR | SI_FL_NOLINGER)) &&
|
||||||
|
!(ic->flags & (CF_SHUTR|CF_DONT_READ)))
|
||||||
|
return;
|
||||||
|
|
||||||
|
/* fall through */
|
||||||
|
case SI_ST_CON:
|
||||||
|
case SI_ST_CER:
|
||||||
|
case SI_ST_QUE:
|
||||||
|
case SI_ST_TAR:
|
||||||
|
/* Note that none of these states may happen with applets */
|
||||||
|
si->state = SI_ST_DIS;
|
||||||
|
si_applet_release(si);
|
||||||
|
default:
|
||||||
|
si->flags &= ~(SI_FL_WAIT_ROOM | SI_FL_NOLINGER);
|
||||||
|
ic->flags &= ~CF_SHUTR_NOW;
|
||||||
|
ic->flags |= CF_SHUTR;
|
||||||
|
ic->rex = TICK_ETERNITY;
|
||||||
|
si->exp = TICK_ETERNITY;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* note that if the task exists, it must unregister itself once it runs */
|
||||||
|
if (!(si->flags & SI_FL_DONT_WAKE))
|
||||||
|
task_wakeup(si_task(si), TASK_WOKEN_IO);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* chk_rcv function for applets */
|
||||||
|
static void stream_int_chk_rcv_applet(struct stream_interface *si)
|
||||||
|
{
|
||||||
|
struct channel *ic = si_ic(si);
|
||||||
|
|
||||||
|
DPRINTF(stderr, "%s: si=%p, si->state=%d ic->flags=%08x oc->flags=%08x\n",
|
||||||
|
__FUNCTION__,
|
||||||
|
si, si->state, ic->flags, si_oc(si)->flags);
|
||||||
|
|
||||||
|
if (unlikely(si->state != SI_ST_EST || (ic->flags & (CF_SHUTR|CF_DONT_READ))))
|
||||||
|
return;
|
||||||
|
|
||||||
|
if (!channel_may_recv(ic) || ic->pipe) {
|
||||||
|
/* stop reading */
|
||||||
|
si->flags |= SI_FL_WAIT_ROOM;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
/* (re)start reading */
|
||||||
|
si->flags &= ~SI_FL_WAIT_ROOM;
|
||||||
|
if (!(si->flags & SI_FL_DONT_WAKE))
|
||||||
|
task_wakeup(si_task(si), TASK_WOKEN_IO);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* chk_snd function for applets */
|
||||||
|
static void stream_int_chk_snd_applet(struct stream_interface *si)
|
||||||
|
{
|
||||||
|
struct channel *oc = si_oc(si);
|
||||||
|
|
||||||
|
DPRINTF(stderr, "%s: si=%p, si->state=%d ic->flags=%08x oc->flags=%08x\n",
|
||||||
|
__FUNCTION__,
|
||||||
|
si, si->state, si_ic(si)->flags, oc->flags);
|
||||||
|
|
||||||
|
if (unlikely(si->state != SI_ST_EST || (oc->flags & CF_SHUTW)))
|
||||||
|
return;
|
||||||
|
|
||||||
|
if (!(si->flags & SI_FL_WAIT_DATA) || /* not waiting for data */
|
||||||
|
channel_is_empty(oc)) /* called with nothing to send ! */
|
||||||
|
return;
|
||||||
|
|
||||||
|
/* Otherwise there are remaining data to be sent in the buffer,
|
||||||
|
* so we tell the handler.
|
||||||
|
*/
|
||||||
|
si->flags &= ~SI_FL_WAIT_DATA;
|
||||||
|
if (!tick_isset(oc->wex))
|
||||||
|
oc->wex = tick_add_ifset(now_ms, oc->wto);
|
||||||
|
|
||||||
|
if (!(si->flags & SI_FL_DONT_WAKE))
|
||||||
|
task_wakeup(si_task(si), TASK_WOKEN_IO);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Local variables:
|
* Local variables:
|
||||||
* c-indent-level: 8
|
* c-indent-level: 8
|
||||||
|
Loading…
Reference in New Issue
Block a user