mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2025-01-02 10:12:03 +00:00
MEDIUM: mux-fcgi: merge recv_wait and send_wait event notifications
This is the last of the "recv_wait+send_wait merge" patches and is functionally equivalent to previous commit "MEDIUM: mux-h2: merge recv_wait and send_wait event notifications" but for FCGI this time. The principle is pretty much the same, since the code is very similar. We use a single wait_event for both recv and send and rely on the subscribe flags to know the desired notifications.
This commit is contained in:
parent
f96508aae6
commit
8907e4ddb8
123
src/mux_fcgi.c
123
src/mux_fcgi.c
@ -164,8 +164,7 @@ struct fcgi_strm {
|
||||
struct buffer rxbuf; /* receive buffer, always valid (buf_empty or real buffer) */
|
||||
|
||||
struct eb32_node by_id; /* place in fcgi_conn's streams_by_id */
|
||||
struct wait_event *recv_wait; /* Address of the wait_event the conn_stream associated is waiting on */
|
||||
struct wait_event *send_wait; /* Address of the wait_event the conn_stream associated is waiting on */
|
||||
struct wait_event *subs; /* Address of the wait_event the conn_stream associated is waiting on */
|
||||
struct list send_list; /* To be used when adding in fcgi_conn->send_list */
|
||||
struct tasklet *shut_tl; /* deferred shutdown tasklet, to retry to close after we failed to by lack of space */
|
||||
};
|
||||
@ -911,29 +910,25 @@ static inline void fcgi_strm_error(struct fcgi_strm *fstrm)
|
||||
/* Attempts to notify the data layer of recv availability */
|
||||
static void fcgi_strm_notify_recv(struct fcgi_strm *fstrm)
|
||||
{
|
||||
struct wait_event *sw;
|
||||
|
||||
if (fstrm->recv_wait) {
|
||||
if (fstrm->subs && (fstrm->subs->events & SUB_RETRY_RECV)) {
|
||||
TRACE_POINT(FCGI_EV_STRM_WAKE, fstrm->fconn->conn, fstrm);
|
||||
sw = fstrm->recv_wait;
|
||||
sw->events &= ~SUB_RETRY_RECV;
|
||||
tasklet_wakeup(sw->tasklet);
|
||||
fstrm->recv_wait = NULL;
|
||||
tasklet_wakeup(fstrm->subs->tasklet);
|
||||
fstrm->subs->events &= ~SUB_RETRY_RECV;
|
||||
if (!fstrm->subs->events)
|
||||
fstrm->subs = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
/* Attempts to notify the data layer of send availability */
|
||||
static void fcgi_strm_notify_send(struct fcgi_strm *fstrm)
|
||||
{
|
||||
struct wait_event *sw;
|
||||
|
||||
if (fstrm->send_wait) {
|
||||
if (fstrm->subs && (fstrm->subs->events & SUB_RETRY_SEND)) {
|
||||
TRACE_POINT(FCGI_EV_STRM_WAKE, fstrm->fconn->conn, fstrm);
|
||||
sw = fstrm->send_wait;
|
||||
sw->events &= ~SUB_RETRY_SEND;
|
||||
fstrm->flags |= FCGI_SF_NOTIFIED;
|
||||
tasklet_wakeup(sw->tasklet);
|
||||
fstrm->send_wait = NULL;
|
||||
tasklet_wakeup(fstrm->subs->tasklet);
|
||||
fstrm->subs->events &= ~SUB_RETRY_SEND;
|
||||
if (!fstrm->subs->events)
|
||||
fstrm->subs = NULL;
|
||||
}
|
||||
else if (fstrm->flags & (FCGI_SF_WANT_SHUTR | FCGI_SF_WANT_SHUTW)) {
|
||||
TRACE_POINT(FCGI_EV_STRM_WAKE, fstrm->fconn->conn, fstrm);
|
||||
@ -953,7 +948,7 @@ static void fcgi_strm_notify_send(struct fcgi_strm *fstrm)
|
||||
static void fcgi_strm_alert(struct fcgi_strm *fstrm)
|
||||
{
|
||||
TRACE_POINT(FCGI_EV_STRM_WAKE, fstrm->fconn->conn, fstrm);
|
||||
if (fstrm->recv_wait || fstrm->send_wait ||
|
||||
if (fstrm->subs ||
|
||||
(fstrm->flags & (FCGI_SF_WANT_SHUTR|FCGI_SF_WANT_SHUTW))) {
|
||||
fcgi_strm_notify_recv(fstrm);
|
||||
fcgi_strm_notify_send(fstrm);
|
||||
@ -1018,10 +1013,8 @@ static void fcgi_strm_destroy(struct fcgi_strm *fstrm)
|
||||
b_free(&fstrm->rxbuf);
|
||||
offer_buffers(NULL, tasks_run_queue);
|
||||
}
|
||||
if (fstrm->send_wait != NULL)
|
||||
fstrm->send_wait->events &= ~SUB_RETRY_SEND;
|
||||
if (fstrm->recv_wait != NULL)
|
||||
fstrm->recv_wait->events &= ~SUB_RETRY_RECV;
|
||||
if (fstrm->subs)
|
||||
fstrm->subs->events = 0;
|
||||
/* There's no need to explicitly call unsubscribe here, the only
|
||||
* reference left would be in the fconn send_list/fctl_list, and if
|
||||
* we're in it, we're getting out anyway
|
||||
@ -1054,8 +1047,7 @@ static struct fcgi_strm *fcgi_strm_new(struct fcgi_conn *fconn, int id)
|
||||
pool_free(pool_head_fcgi_strm, fstrm);
|
||||
goto out;
|
||||
}
|
||||
fstrm->send_wait = NULL;
|
||||
fstrm->recv_wait = NULL;
|
||||
fstrm->subs = NULL;
|
||||
fstrm->shut_tl->process = fcgi_deferred_shut;
|
||||
fstrm->shut_tl->context = fstrm;
|
||||
LIST_INIT(&fstrm->send_list);
|
||||
@ -2653,17 +2645,20 @@ static int fcgi_process_mux(struct fcgi_conn *fconn)
|
||||
/* If the sender changed his mind and unsubscribed, let's just
|
||||
* remove the stream from the send_list.
|
||||
*/
|
||||
if (!fstrm->send_wait &&
|
||||
!(fstrm->flags & (FCGI_SF_WANT_SHUTR|FCGI_SF_WANT_SHUTW))) {
|
||||
if (!(fstrm->flags & (FCGI_SF_WANT_SHUTR|FCGI_SF_WANT_SHUTW)) &&
|
||||
(!fstrm->subs || !(fstrm->subs->events & SUB_RETRY_SEND))) {
|
||||
LIST_DEL_INIT(&fstrm->send_list);
|
||||
continue;
|
||||
}
|
||||
if (fstrm->send_wait) {
|
||||
|
||||
if (fstrm->subs && fstrm->subs->events & SUB_RETRY_SEND) {
|
||||
TRACE_POINT(FCGI_EV_STRM_WAKE, fconn->conn, fstrm);
|
||||
fstrm->flags &= ~FCGI_SF_BLK_ANY;
|
||||
fstrm->send_wait->events &= ~SUB_RETRY_SEND;
|
||||
fstrm->flags |= FCGI_SF_NOTIFIED;
|
||||
tasklet_wakeup(fstrm->send_wait->tasklet);
|
||||
tasklet_wakeup(fstrm->subs->tasklet);
|
||||
fstrm->subs->events &= ~SUB_RETRY_SEND;
|
||||
if (!fstrm->subs->events)
|
||||
fstrm->subs = NULL;
|
||||
} else {
|
||||
/* it's the shut request that was queued */
|
||||
TRACE_POINT(FCGI_EV_STRM_WAKE, fconn->conn, fstrm);
|
||||
@ -2868,17 +2863,20 @@ static int fcgi_send(struct fcgi_conn *fconn)
|
||||
/* If the sender changed his mind and unsubscribed, let's just
|
||||
* remove the stream from the send_list.
|
||||
*/
|
||||
if (!fstrm->send_wait &&
|
||||
!(fstrm->flags & (FCGI_SF_WANT_SHUTR|FCGI_SF_WANT_SHUTW))) {
|
||||
if (!(fstrm->flags & (FCGI_SF_WANT_SHUTR|FCGI_SF_WANT_SHUTW)) &&
|
||||
(!fstrm->subs || !(fstrm->subs->events & SUB_RETRY_SEND))) {
|
||||
LIST_DEL_INIT(&fstrm->send_list);
|
||||
continue;
|
||||
}
|
||||
if (fstrm->send_wait) {
|
||||
fstrm->flags &= ~FCGI_SF_BLK_ANY;
|
||||
fstrm->send_wait->events &= ~SUB_RETRY_SEND;
|
||||
|
||||
if (fstrm->subs && fstrm->subs->events & SUB_RETRY_SEND) {
|
||||
TRACE_DEVEL("waking up pending stream", FCGI_EV_FCONN_SEND|FCGI_EV_STRM_WAKE, conn, fstrm);
|
||||
tasklet_wakeup(fstrm->send_wait->tasklet);
|
||||
fstrm->flags &= ~FCGI_SF_BLK_ANY;
|
||||
fstrm->flags |= FCGI_SF_NOTIFIED;
|
||||
tasklet_wakeup(fstrm->subs->tasklet);
|
||||
fstrm->subs->events &= ~SUB_RETRY_SEND;
|
||||
if (!fstrm->subs->events)
|
||||
fstrm->subs = NULL;
|
||||
} else {
|
||||
/* it's the shut request that was queued */
|
||||
TRACE_POINT(FCGI_EV_STRM_WAKE, fconn->conn, fstrm);
|
||||
@ -3448,7 +3446,7 @@ static void fcgi_detach(struct conn_stream *cs)
|
||||
if (!(cs->conn->flags & CO_FL_ERROR) &&
|
||||
(fconn->state != FCGI_CS_CLOSED) &&
|
||||
(fstrm->flags & (FCGI_SF_BLK_MBUSY|FCGI_SF_BLK_MROOM)) &&
|
||||
(fstrm->send_wait || fstrm->recv_wait || (fstrm->flags & (FCGI_SF_WANT_SHUTR|FCGI_SF_WANT_SHUTW)))) {
|
||||
(fstrm->subs || (fstrm->flags & (FCGI_SF_WANT_SHUTR|FCGI_SF_WANT_SHUTW)))) {
|
||||
TRACE_DEVEL("leaving on stream blocked", FCGI_EV_STRM_END|FCGI_EV_FSTRM_BLK, fconn->conn, fstrm);
|
||||
return;
|
||||
}
|
||||
@ -3699,37 +3697,31 @@ static void fcgi_shutw(struct conn_stream *cs, enum cs_shw_mode mode)
|
||||
|
||||
/* Called from the upper layer, to subscribe to events, such as being able to send.
|
||||
* The <param> argument here is supposed to be a pointer to a wait_event struct
|
||||
* which will be passed to fstrm->recv_wait or fstrm->send_wait depending on the
|
||||
* event_type. The event_type must only be a combination of SUB_RETRY_RECV and
|
||||
* SUB_RETRY_SEND, other values will lead to -1 being returned. It always
|
||||
* returns 0 except for the error above.
|
||||
* which will be passed to fstrm->subs. The event_type must only be a
|
||||
* combination of SUB_RETRY_RECV and SUB_RETRY_SEND, other values will lead to -1
|
||||
* being returned. It always returns 0 except for the error above.
|
||||
*/
|
||||
static int fcgi_subscribe(struct conn_stream *cs, int event_type, void *param)
|
||||
{
|
||||
struct wait_event *sw;
|
||||
struct wait_event *sw = param;
|
||||
struct fcgi_strm *fstrm = cs->ctx;
|
||||
struct fcgi_conn *fconn = fstrm->fconn;
|
||||
|
||||
if (event_type & SUB_RETRY_RECV) {
|
||||
BUG_ON(event_type & ~(SUB_RETRY_SEND|SUB_RETRY_RECV));
|
||||
BUG_ON(fstrm->subs && fstrm->subs->events & event_type);
|
||||
BUG_ON(fstrm->subs && fstrm->subs != sw);
|
||||
|
||||
sw->events |= event_type;
|
||||
fstrm->subs = sw;
|
||||
|
||||
if (event_type & SUB_RETRY_RECV)
|
||||
TRACE_DEVEL("unsubscribe(recv)", FCGI_EV_STRM_RECV, fconn->conn, fstrm);
|
||||
sw = param;
|
||||
BUG_ON(fstrm->recv_wait != NULL || (sw->events & SUB_RETRY_RECV));
|
||||
sw->events |= SUB_RETRY_RECV;
|
||||
fstrm->recv_wait = sw;
|
||||
event_type &= ~SUB_RETRY_RECV;
|
||||
}
|
||||
|
||||
if (event_type & SUB_RETRY_SEND) {
|
||||
TRACE_DEVEL("unsubscribe(send)", FCGI_EV_STRM_SEND, fconn->conn, fstrm);
|
||||
sw = param;
|
||||
BUG_ON(fstrm->send_wait != NULL || (sw->events & SUB_RETRY_SEND));
|
||||
sw->events |= SUB_RETRY_SEND;
|
||||
fstrm->send_wait = sw;
|
||||
if (!LIST_ADDED(&fstrm->send_list))
|
||||
LIST_ADDQ(&fconn->send_list, &fstrm->send_list);
|
||||
event_type &= ~SUB_RETRY_SEND;
|
||||
}
|
||||
if (event_type != 0)
|
||||
return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -3740,26 +3732,25 @@ static int fcgi_subscribe(struct conn_stream *cs, int event_type, void *param)
|
||||
*/
|
||||
static int fcgi_unsubscribe(struct conn_stream *cs, int event_type, void *param)
|
||||
{
|
||||
struct wait_event *sw;
|
||||
struct wait_event *sw = param;
|
||||
struct fcgi_strm *fstrm = cs->ctx;
|
||||
struct fcgi_conn *fconn = fstrm->fconn;
|
||||
|
||||
if (event_type & SUB_RETRY_RECV) {
|
||||
BUG_ON(event_type & ~(SUB_RETRY_SEND|SUB_RETRY_RECV));
|
||||
BUG_ON(fstrm->subs && fstrm->subs != sw);
|
||||
|
||||
sw->events &= ~event_type;
|
||||
if (!sw->events)
|
||||
fstrm->subs = NULL;
|
||||
|
||||
if (event_type & SUB_RETRY_RECV)
|
||||
TRACE_DEVEL("subscribe(recv)", FCGI_EV_STRM_RECV, fconn->conn, fstrm);
|
||||
sw = param;
|
||||
BUG_ON(fstrm->recv_wait != sw);
|
||||
sw->events &= ~SUB_RETRY_RECV;
|
||||
fstrm->recv_wait = NULL;
|
||||
}
|
||||
|
||||
if (event_type & SUB_RETRY_SEND) {
|
||||
TRACE_DEVEL("subscribe(send)", FCGI_EV_STRM_SEND, fconn->conn, fstrm);
|
||||
sw = param;
|
||||
BUG_ON(fstrm->send_wait != sw);
|
||||
fstrm->flags &= ~FCGI_SF_NOTIFIED;
|
||||
if (!(fstrm->flags & (FCGI_SF_WANT_SHUTR|FCGI_SF_WANT_SHUTW)))
|
||||
LIST_DEL_INIT(&fstrm->send_list);
|
||||
sw->events &= ~SUB_RETRY_SEND;
|
||||
fstrm->flags &= ~FCGI_SF_NOTIFIED;
|
||||
fstrm->send_wait = NULL;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user