mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2025-05-05 09:18:10 +00:00
MINOR: stream-int-conn-stream: Move si_update_* in conn-stream scope
si_update_rx(), si_update_tx() and si_update() are renamed cs_update_rx(), cs_upate_tx() and cs_update() and updated to manipulate a conn-stream instead of a stream-interface.
This commit is contained in:
parent
9ffddd5ca5
commit
13045f0eae
@ -33,6 +33,10 @@
|
|||||||
#include <haproxy/stream.h>
|
#include <haproxy/stream.h>
|
||||||
#include <haproxy/stream_interface.h>
|
#include <haproxy/stream_interface.h>
|
||||||
|
|
||||||
|
void cs_update_rx(struct conn_stream *cs);
|
||||||
|
void cs_update_tx(struct conn_stream *cs);
|
||||||
|
void cs_update_both(struct conn_stream *csf, struct conn_stream *csb);
|
||||||
|
|
||||||
/* returns the channel which receives data from this conn-stream (input channel) */
|
/* returns the channel which receives data from this conn-stream (input channel) */
|
||||||
static inline struct channel *cs_ic(struct conn_stream *cs)
|
static inline struct channel *cs_ic(struct conn_stream *cs)
|
||||||
{
|
{
|
||||||
@ -268,6 +272,13 @@ static inline void cs_chk_snd(struct conn_stream *cs)
|
|||||||
cs->ops->chk_snd(cs);
|
cs->ops->chk_snd(cs);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Combines both cs_update_rx() and cs_update_tx() at once */
|
||||||
|
static inline void cs_update(struct conn_stream *cs)
|
||||||
|
{
|
||||||
|
cs_update_rx(cs);
|
||||||
|
cs_update_tx(cs);
|
||||||
|
}
|
||||||
|
|
||||||
/* for debugging, reports the stream interface state name */
|
/* for debugging, reports the stream interface state name */
|
||||||
static inline const char *cs_state_str(int state)
|
static inline const char *cs_state_str(int state)
|
||||||
{
|
{
|
||||||
|
@ -55,7 +55,7 @@ enum {
|
|||||||
|
|
||||||
/* Note that if an applet is registered, the update function will not be called
|
/* Note that if an applet is registered, the update function will not be called
|
||||||
* by the session handler, so it may be used to resync flags at the end of the
|
* by the session handler, so it may be used to resync flags at the end of the
|
||||||
* applet handler. See si_update() for reference.
|
* applet handler.
|
||||||
*/
|
*/
|
||||||
struct stream_interface {
|
struct stream_interface {
|
||||||
/* struct members used by the "buffer" side */
|
/* struct members used by the "buffer" side */
|
||||||
|
@ -37,10 +37,7 @@ void si_free(struct stream_interface *si);
|
|||||||
|
|
||||||
/* main event functions used to move data between sockets and buffers */
|
/* main event functions used to move data between sockets and buffers */
|
||||||
void si_applet_wake_cb(struct stream_interface *si);
|
void si_applet_wake_cb(struct stream_interface *si);
|
||||||
void si_update_rx(struct stream_interface *si);
|
|
||||||
void si_update_tx(struct stream_interface *si);
|
|
||||||
struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned int state);
|
struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned int state);
|
||||||
void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b);
|
|
||||||
int si_sync_recv(struct stream_interface *si);
|
int si_sync_recv(struct stream_interface *si);
|
||||||
void si_sync_send(struct stream_interface *si);
|
void si_sync_send(struct stream_interface *si);
|
||||||
|
|
||||||
@ -263,13 +260,6 @@ static inline int si_alloc_ibuf(struct stream_interface *si, struct buffer_wait
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Combines both si_update_rx() and si_update_tx() at once */
|
|
||||||
static inline void si_update(struct stream_interface *si)
|
|
||||||
{
|
|
||||||
si_update_rx(si);
|
|
||||||
si_update_tx(si);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* The stream interface is only responsible for the connection during the early
|
/* The stream interface is only responsible for the connection during the early
|
||||||
* states, before plugging a mux. Thus it should only care about CO_FL_ERROR
|
* states, before plugging a mux. Thus it should only care about CO_FL_ERROR
|
||||||
* before CS_ST_EST, and after that it must absolutely ignore it since the mux
|
* before CS_ST_EST, and after that it must absolutely ignore it since the mux
|
||||||
|
@ -940,3 +940,134 @@ static void cs_app_chk_snd_applet(struct conn_stream *cs)
|
|||||||
appctx_wakeup(__cs_appctx(cs));
|
appctx_wakeup(__cs_appctx(cs));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/* This function is designed to be called from within the stream handler to
|
||||||
|
* update the input channel's expiration timer and the conn-stream's
|
||||||
|
* Rx flags based on the channel's flags. It needs to be called only once
|
||||||
|
* after the channel's flags have settled down, and before they are cleared,
|
||||||
|
* though it doesn't harm to call it as often as desired (it just slightly
|
||||||
|
* hurts performance). It must not be called from outside of the stream
|
||||||
|
* handler, as what it does will be used to compute the stream task's
|
||||||
|
* expiration.
|
||||||
|
*/
|
||||||
|
void cs_update_rx(struct conn_stream *cs)
|
||||||
|
{
|
||||||
|
struct channel *ic = cs_ic(cs);
|
||||||
|
|
||||||
|
if (ic->flags & CF_SHUTR) {
|
||||||
|
si_rx_shut_blk(cs->si);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Read not closed, update FD status and timeout for reads */
|
||||||
|
if (ic->flags & CF_DONT_READ)
|
||||||
|
si_rx_chan_blk(cs->si);
|
||||||
|
else
|
||||||
|
si_rx_chan_rdy(cs->si);
|
||||||
|
|
||||||
|
if (!channel_is_empty(ic) || !channel_may_recv(ic)) {
|
||||||
|
/* stop reading, imposed by channel's policy or contents */
|
||||||
|
si_rx_room_blk(cs->si);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
/* (re)start reading and update timeout. Note: we don't recompute the timeout
|
||||||
|
* every time we get here, otherwise it would risk never to expire. We only
|
||||||
|
* update it if is was not yet set. The stream socket handler will already
|
||||||
|
* have updated it if there has been a completed I/O.
|
||||||
|
*/
|
||||||
|
si_rx_room_rdy(cs->si);
|
||||||
|
}
|
||||||
|
if (cs->si->flags & SI_FL_RXBLK_ANY & ~SI_FL_RX_WAIT_EP)
|
||||||
|
ic->rex = TICK_ETERNITY;
|
||||||
|
else if (!(ic->flags & CF_READ_NOEXP) && !tick_isset(ic->rex))
|
||||||
|
ic->rex = tick_add_ifset(now_ms, ic->rto);
|
||||||
|
|
||||||
|
cs_chk_rcv(cs);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* This function is designed to be called from within the stream handler to
|
||||||
|
* update the output channel's expiration timer and the conn-stream's
|
||||||
|
* Tx flags based on the channel's flags. It needs to be called only once
|
||||||
|
* after the channel's flags have settled down, and before they are cleared,
|
||||||
|
* though it doesn't harm to call it as often as desired (it just slightly
|
||||||
|
* hurts performance). It must not be called from outside of the stream
|
||||||
|
* handler, as what it does will be used to compute the stream task's
|
||||||
|
* expiration.
|
||||||
|
*/
|
||||||
|
void cs_update_tx(struct conn_stream *cs)
|
||||||
|
{
|
||||||
|
struct channel *oc = cs_oc(cs);
|
||||||
|
struct channel *ic = cs_ic(cs);
|
||||||
|
|
||||||
|
if (oc->flags & CF_SHUTW)
|
||||||
|
return;
|
||||||
|
|
||||||
|
/* Write not closed, update FD status and timeout for writes */
|
||||||
|
if (channel_is_empty(oc)) {
|
||||||
|
/* stop writing */
|
||||||
|
if (!(cs->si->flags & SI_FL_WAIT_DATA)) {
|
||||||
|
if ((oc->flags & CF_SHUTW_NOW) == 0)
|
||||||
|
cs->si->flags |= SI_FL_WAIT_DATA;
|
||||||
|
oc->wex = TICK_ETERNITY;
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* (re)start writing and update timeout. Note: we don't recompute the timeout
|
||||||
|
* every time we get here, otherwise it would risk never to expire. We only
|
||||||
|
* update it if is was not yet set. The stream socket handler will already
|
||||||
|
* have updated it if there has been a completed I/O.
|
||||||
|
*/
|
||||||
|
cs->si->flags &= ~SI_FL_WAIT_DATA;
|
||||||
|
if (!tick_isset(oc->wex)) {
|
||||||
|
oc->wex = tick_add_ifset(now_ms, oc->wto);
|
||||||
|
if (tick_isset(ic->rex) && !(cs->flags & CS_FL_INDEP_STR)) {
|
||||||
|
/* Note: depending on the protocol, we don't know if we're waiting
|
||||||
|
* for incoming data or not. So in order to prevent the socket from
|
||||||
|
* expiring read timeouts during writes, we refresh the read timeout,
|
||||||
|
* except if it was already infinite or if we have explicitly setup
|
||||||
|
* independent streams.
|
||||||
|
*/
|
||||||
|
ic->rex = tick_add_ifset(now_ms, ic->rto);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Updates at once the channel flags, and timers of both conn-streams of a
|
||||||
|
* same stream, to complete the work after the analysers, then updates the data
|
||||||
|
* layer below. This will ensure that any synchronous update performed at the
|
||||||
|
* data layer will be reflected in the channel flags and/or conn-stream.
|
||||||
|
* Note that this does not change the conn-stream's current state, though
|
||||||
|
* it updates the previous state to the current one.
|
||||||
|
*/
|
||||||
|
void cs_update_both(struct conn_stream *csf, struct conn_stream *csb)
|
||||||
|
{
|
||||||
|
struct channel *req = cs_ic(csf);
|
||||||
|
struct channel *res = cs_oc(csf);
|
||||||
|
|
||||||
|
req->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ATTACHED|CF_WRITE_NULL|CF_WRITE_PARTIAL);
|
||||||
|
res->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ATTACHED|CF_WRITE_NULL|CF_WRITE_PARTIAL);
|
||||||
|
|
||||||
|
__cs_strm(csb)->prev_conn_state = csb->state;
|
||||||
|
|
||||||
|
/* let's recompute both sides states */
|
||||||
|
if (cs_state_in(csf->state, CS_SB_RDY|CS_SB_EST))
|
||||||
|
cs_update(csf);
|
||||||
|
|
||||||
|
if (cs_state_in(csb->state, CS_SB_RDY|CS_SB_EST))
|
||||||
|
cs_update(csb);
|
||||||
|
|
||||||
|
/* stream ints are processed outside of process_stream() and must be
|
||||||
|
* handled at the latest moment.
|
||||||
|
*/
|
||||||
|
if (cs_appctx(csf) &&
|
||||||
|
((si_rx_endp_ready(csf->si) && !si_rx_blocked(csf->si)) ||
|
||||||
|
(si_tx_endp_ready(csf->si) && !si_tx_blocked(csf->si))))
|
||||||
|
appctx_wakeup(__cs_appctx(csf));
|
||||||
|
|
||||||
|
if (cs_appctx(csb) &&
|
||||||
|
((si_rx_endp_ready(csb->si) && !si_rx_blocked(csb->si)) ||
|
||||||
|
(si_tx_endp_ready(csb->si) && !si_tx_blocked(csb->si))))
|
||||||
|
appctx_wakeup(__cs_appctx(csb));
|
||||||
|
}
|
||||||
|
@ -1953,7 +1953,7 @@ static void hlua_socket_handler(struct appctx *appctx)
|
|||||||
* interface.
|
* interface.
|
||||||
*/
|
*/
|
||||||
if (!channel_is_empty(cs_ic(cs)))
|
if (!channel_is_empty(cs_ic(cs)))
|
||||||
si_update(cs->si);
|
cs_update(cs);
|
||||||
|
|
||||||
/* If write notifications are registered, we considers we want
|
/* If write notifications are registered, we considers we want
|
||||||
* to write, so we clear the blocking flag.
|
* to write, so we clear the blocking flag.
|
||||||
|
@ -2455,7 +2455,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
|
|||||||
if ((sess->fe->options & PR_O_CONTSTATS) && (s->flags & SF_BE_ASSIGNED) && !(s->flags & SF_IGNORE))
|
if ((sess->fe->options & PR_O_CONTSTATS) && (s->flags & SF_BE_ASSIGNED) && !(s->flags & SF_IGNORE))
|
||||||
stream_process_counters(s);
|
stream_process_counters(s);
|
||||||
|
|
||||||
si_update_both(si_f, si_b);
|
cs_update_both(s->csf, s->csb);
|
||||||
|
|
||||||
/* Trick: if a request is being waiting for the server to respond,
|
/* Trick: if a request is being waiting for the server to respond,
|
||||||
* and if we know the server can timeout, we don't want the timeout
|
* and if we know the server can timeout, we don't want the timeout
|
||||||
|
@ -77,14 +77,14 @@ void si_free(struct stream_interface *si)
|
|||||||
pool_free(pool_head_streaminterface, si);
|
pool_free(pool_head_streaminterface, si);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This function is the equivalent to si_update() except that it's
|
/* This function is the equivalent to cs_update() except that it's
|
||||||
* designed to be called from outside the stream handlers, typically the lower
|
* designed to be called from outside the stream handlers, typically the lower
|
||||||
* layers (applets, connections) after I/O completion. After updating the stream
|
* layers (applets, connections) after I/O completion. After updating the stream
|
||||||
* interface and timeouts, it will try to forward what can be forwarded, then to
|
* interface and timeouts, it will try to forward what can be forwarded, then to
|
||||||
* wake the associated task up if an important event requires special handling.
|
* wake the associated task up if an important event requires special handling.
|
||||||
* It may update SI_FL_WAIT_DATA and/or SI_FL_RXBLK_ROOM, that the callers are
|
* It may update SI_FL_WAIT_DATA and/or SI_FL_RXBLK_ROOM, that the callers are
|
||||||
* encouraged to watch to take appropriate action.
|
* encouraged to watch to take appropriate action.
|
||||||
* It should not be called from within the stream itself, si_update()
|
* It should not be called from within the stream itself, cs_update()
|
||||||
* is designed for this.
|
* is designed for this.
|
||||||
*/
|
*/
|
||||||
static void stream_int_notify(struct stream_interface *si)
|
static void stream_int_notify(struct stream_interface *si)
|
||||||
@ -474,98 +474,6 @@ struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned int state)
|
|||||||
return t;
|
return t;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This function is designed to be called from within the stream handler to
|
|
||||||
* update the input channel's expiration timer and the stream interface's
|
|
||||||
* Rx flags based on the channel's flags. It needs to be called only once
|
|
||||||
* after the channel's flags have settled down, and before they are cleared,
|
|
||||||
* though it doesn't harm to call it as often as desired (it just slightly
|
|
||||||
* hurts performance). It must not be called from outside of the stream
|
|
||||||
* handler, as what it does will be used to compute the stream task's
|
|
||||||
* expiration.
|
|
||||||
*/
|
|
||||||
void si_update_rx(struct stream_interface *si)
|
|
||||||
{
|
|
||||||
struct channel *ic = si_ic(si);
|
|
||||||
|
|
||||||
if (ic->flags & CF_SHUTR) {
|
|
||||||
si_rx_shut_blk(si);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Read not closed, update FD status and timeout for reads */
|
|
||||||
if (ic->flags & CF_DONT_READ)
|
|
||||||
si_rx_chan_blk(si);
|
|
||||||
else
|
|
||||||
si_rx_chan_rdy(si);
|
|
||||||
|
|
||||||
if (!channel_is_empty(ic) || !channel_may_recv(ic)) {
|
|
||||||
/* stop reading, imposed by channel's policy or contents */
|
|
||||||
si_rx_room_blk(si);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
/* (re)start reading and update timeout. Note: we don't recompute the timeout
|
|
||||||
* every time we get here, otherwise it would risk never to expire. We only
|
|
||||||
* update it if is was not yet set. The stream socket handler will already
|
|
||||||
* have updated it if there has been a completed I/O.
|
|
||||||
*/
|
|
||||||
si_rx_room_rdy(si);
|
|
||||||
}
|
|
||||||
if (si->flags & SI_FL_RXBLK_ANY & ~SI_FL_RX_WAIT_EP)
|
|
||||||
ic->rex = TICK_ETERNITY;
|
|
||||||
else if (!(ic->flags & CF_READ_NOEXP) && !tick_isset(ic->rex))
|
|
||||||
ic->rex = tick_add_ifset(now_ms, ic->rto);
|
|
||||||
|
|
||||||
cs_chk_rcv(si->cs);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* This function is designed to be called from within the stream handler to
|
|
||||||
* update the output channel's expiration timer and the stream interface's
|
|
||||||
* Tx flags based on the channel's flags. It needs to be called only once
|
|
||||||
* after the channel's flags have settled down, and before they are cleared,
|
|
||||||
* though it doesn't harm to call it as often as desired (it just slightly
|
|
||||||
* hurts performance). It must not be called from outside of the stream
|
|
||||||
* handler, as what it does will be used to compute the stream task's
|
|
||||||
* expiration.
|
|
||||||
*/
|
|
||||||
void si_update_tx(struct stream_interface *si)
|
|
||||||
{
|
|
||||||
struct channel *oc = si_oc(si);
|
|
||||||
struct channel *ic = si_ic(si);
|
|
||||||
|
|
||||||
if (oc->flags & CF_SHUTW)
|
|
||||||
return;
|
|
||||||
|
|
||||||
/* Write not closed, update FD status and timeout for writes */
|
|
||||||
if (channel_is_empty(oc)) {
|
|
||||||
/* stop writing */
|
|
||||||
if (!(si->flags & SI_FL_WAIT_DATA)) {
|
|
||||||
if ((oc->flags & CF_SHUTW_NOW) == 0)
|
|
||||||
si->flags |= SI_FL_WAIT_DATA;
|
|
||||||
oc->wex = TICK_ETERNITY;
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* (re)start writing and update timeout. Note: we don't recompute the timeout
|
|
||||||
* every time we get here, otherwise it would risk never to expire. We only
|
|
||||||
* update it if is was not yet set. The stream socket handler will already
|
|
||||||
* have updated it if there has been a completed I/O.
|
|
||||||
*/
|
|
||||||
si->flags &= ~SI_FL_WAIT_DATA;
|
|
||||||
if (!tick_isset(oc->wex)) {
|
|
||||||
oc->wex = tick_add_ifset(now_ms, oc->wto);
|
|
||||||
if (tick_isset(ic->rex) && !(si->cs->flags & CS_FL_INDEP_STR)) {
|
|
||||||
/* Note: depending on the protocol, we don't know if we're waiting
|
|
||||||
* for incoming data or not. So in order to prevent the socket from
|
|
||||||
* expiring read timeouts during writes, we refresh the read timeout,
|
|
||||||
* except if it was already infinite or if we have explicitly setup
|
|
||||||
* independent streams.
|
|
||||||
*/
|
|
||||||
ic->rex = tick_add_ifset(now_ms, ic->rto);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* This tries to perform a synchronous receive on the stream interface to
|
/* This tries to perform a synchronous receive on the stream interface to
|
||||||
* try to collect last arrived data. In practice it's only implemented on
|
* try to collect last arrived data. In practice it's only implemented on
|
||||||
* conn_streams. Returns 0 if nothing was done, non-zero if new data or a
|
* conn_streams. Returns 0 if nothing was done, non-zero if new data or a
|
||||||
@ -615,44 +523,6 @@ void si_sync_send(struct stream_interface *si)
|
|||||||
si_cs_send(si->cs);
|
si_cs_send(si->cs);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Updates at once the channel flags, and timers of both stream interfaces of a
|
|
||||||
* same stream, to complete the work after the analysers, then updates the data
|
|
||||||
* layer below. This will ensure that any synchronous update performed at the
|
|
||||||
* data layer will be reflected in the channel flags and/or stream-interface.
|
|
||||||
* Note that this does not change the stream interface's current state, though
|
|
||||||
* it updates the previous state to the current one.
|
|
||||||
*/
|
|
||||||
void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b)
|
|
||||||
{
|
|
||||||
struct channel *req = si_ic(si_f);
|
|
||||||
struct channel *res = si_oc(si_f);
|
|
||||||
|
|
||||||
req->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ATTACHED|CF_WRITE_NULL|CF_WRITE_PARTIAL);
|
|
||||||
res->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ATTACHED|CF_WRITE_NULL|CF_WRITE_PARTIAL);
|
|
||||||
|
|
||||||
si_strm(si_b)->prev_conn_state = si_b->cs->state;
|
|
||||||
|
|
||||||
/* let's recompute both sides states */
|
|
||||||
if (cs_state_in(si_f->cs->state, CS_SB_RDY|CS_SB_EST))
|
|
||||||
si_update(si_f);
|
|
||||||
|
|
||||||
if (cs_state_in(si_b->cs->state, CS_SB_RDY|CS_SB_EST))
|
|
||||||
si_update(si_b);
|
|
||||||
|
|
||||||
/* stream ints are processed outside of process_stream() and must be
|
|
||||||
* handled at the latest moment.
|
|
||||||
*/
|
|
||||||
if (cs_appctx(si_f->cs) &&
|
|
||||||
((si_rx_endp_ready(si_f) && !si_rx_blocked(si_f)) ||
|
|
||||||
(si_tx_endp_ready(si_f) && !si_tx_blocked(si_f))))
|
|
||||||
appctx_wakeup(__cs_appctx(si_f->cs));
|
|
||||||
|
|
||||||
if (cs_appctx(si_b->cs) &&
|
|
||||||
((si_rx_endp_ready(si_b) && !si_rx_blocked(si_b)) ||
|
|
||||||
(si_tx_endp_ready(si_b) && !si_tx_blocked(si_b))))
|
|
||||||
appctx_wakeup(__cs_appctx(si_b->cs));
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* This is the callback which is called by the connection layer to receive data
|
* This is the callback which is called by the connection layer to receive data
|
||||||
* into the buffer from the connection. It iterates over the mux layer's
|
* into the buffer from the connection. It iterates over the mux layer's
|
||||||
|
Loading…
Reference in New Issue
Block a user