mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2025-05-06 01:37:59 +00:00
MEDIUM: stream-int/conn-stream: Handle I/O subscriptions in the conn-stream
wait_event structure is moved in the conn-stream. The tasklet is only created if the conn-stream is attached to a mux and released when the mux is detached. This implies a subtle change. In stream_int_chk_rcv() function, the wakeup of the tasklet was removed because there is no longer tasklet at this stage (stream_int_chk_rcv() is a callback function of si_embedded_ops).
This commit is contained in:
parent
070b91bc11
commit
2f35e7b6ab
@ -157,6 +157,7 @@ struct conn_stream {
|
||||
|
||||
unsigned int flags; /* CS_FL_* */
|
||||
unsigned int hcto; /* half-closed timeout (0 = unset) */
|
||||
struct wait_event wait_event; /* We're in a wait list */
|
||||
struct cs_endpoint *endp; /* points to the end point (MUX stream or appctx) */
|
||||
enum obj_type *app; /* points to the applicative point (stream or check) */
|
||||
struct stream_interface *si;
|
||||
|
@ -63,8 +63,6 @@ struct stream_interface {
|
||||
unsigned int flags; /* SI_FL_* */
|
||||
struct conn_stream *cs; /* points to the conn-streams that owns the endpoint (connection or applet) */
|
||||
struct si_ops *ops; /* general operations at the stream interface layer */
|
||||
|
||||
struct wait_event wait_event; /* We're in a wait list */
|
||||
};
|
||||
|
||||
/* operations available on a stream-interface */
|
||||
|
@ -108,12 +108,6 @@ static inline int si_init(struct stream_interface *si)
|
||||
si->flags &= SI_FL_ISBACK;
|
||||
si->cs = NULL;
|
||||
si->ops = &si_embedded_ops;
|
||||
si->wait_event.tasklet = tasklet_new();
|
||||
if (!si->wait_event.tasklet)
|
||||
return -1;
|
||||
si->wait_event.tasklet->process = si_cs_io_cb;
|
||||
si->wait_event.tasklet->context = si;
|
||||
si->wait_event.events = 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -64,6 +64,9 @@ struct conn_stream *cs_new(struct cs_endpoint *endp)
|
||||
cs->data_cb = NULL;
|
||||
cs->src = NULL;
|
||||
cs->dst = NULL;
|
||||
cs->wait_event.tasklet = NULL;
|
||||
cs->wait_event.events = 0;
|
||||
|
||||
if (!endp) {
|
||||
endp = cs_endpoint_new();
|
||||
if (unlikely(!endp))
|
||||
@ -156,6 +159,8 @@ void cs_free(struct conn_stream *cs)
|
||||
BUG_ON(!(cs->endp->flags & CS_EP_DETACHED));
|
||||
cs_endpoint_free(cs->endp);
|
||||
}
|
||||
if (cs->wait_event.tasklet)
|
||||
tasklet_free(cs->wait_event.tasklet);
|
||||
pool_free(pool_head_connstream, cs);
|
||||
}
|
||||
|
||||
@ -172,6 +177,15 @@ int cs_attach_mux(struct conn_stream *cs, void *target, void *ctx)
|
||||
if (!conn->ctx)
|
||||
conn->ctx = cs;
|
||||
if (cs_strm(cs)) {
|
||||
if (!cs->wait_event.tasklet) {
|
||||
cs->wait_event.tasklet = tasklet_new();
|
||||
if (!cs->wait_event.tasklet)
|
||||
return -1;
|
||||
cs->wait_event.tasklet->process = si_cs_io_cb;
|
||||
cs->wait_event.tasklet->context = cs->si;
|
||||
cs->wait_event.events = 0;
|
||||
}
|
||||
|
||||
cs->si->ops = &si_conn_ops;
|
||||
cs->data_cb = &si_conn_cb;
|
||||
}
|
||||
@ -204,8 +218,19 @@ int cs_attach_strm(struct conn_stream *cs, struct stream *strm)
|
||||
cs->si = si_new(cs);
|
||||
if (unlikely(!cs->si))
|
||||
return -1;
|
||||
|
||||
cs->endp->flags &= ~CS_EP_ORPHAN;
|
||||
if (cs->endp->flags & CS_EP_T_MUX) {
|
||||
cs->wait_event.tasklet = tasklet_new();
|
||||
if (!cs->wait_event.tasklet) {
|
||||
si_free(cs->si);
|
||||
cs->si = NULL;
|
||||
return -1;
|
||||
}
|
||||
cs->wait_event.tasklet->process = si_cs_io_cb;
|
||||
cs->wait_event.tasklet->context = cs->si;
|
||||
cs->wait_event.events = 0;
|
||||
|
||||
cs->si->ops = &si_conn_ops;
|
||||
cs->data_cb = &si_conn_cb;
|
||||
}
|
||||
@ -237,8 +262,8 @@ void cs_detach_endp(struct conn_stream *cs)
|
||||
if (conn->mux) {
|
||||
/* TODO: handle unsubscribe for healthchecks too */
|
||||
cs->endp->flags |= CS_EP_ORPHAN;
|
||||
if (cs->si && cs->si->wait_event.events != 0)
|
||||
conn->mux->unsubscribe(cs, cs->si->wait_event.events, &cs->si->wait_event);
|
||||
if (cs->wait_event.events != 0)
|
||||
conn->mux->unsubscribe(cs, cs->wait_event.events, &cs->wait_event);
|
||||
conn->mux->detach(cs);
|
||||
cs->endp = NULL;
|
||||
}
|
||||
@ -290,6 +315,12 @@ void cs_detach_app(struct conn_stream *cs)
|
||||
cs->data_cb = NULL;
|
||||
sockaddr_free(&cs->src);
|
||||
sockaddr_free(&cs->dst);
|
||||
|
||||
if (cs->wait_event.tasklet)
|
||||
tasklet_free(cs->wait_event.tasklet);
|
||||
cs->wait_event.tasklet = NULL;
|
||||
cs->wait_event.events = 0;
|
||||
|
||||
if (!cs->endp || (cs->endp->flags & CS_EP_DETACHED))
|
||||
cs_free(cs);
|
||||
}
|
||||
|
21
src/stream.c
21
src/stream.c
@ -1502,9 +1502,8 @@ int stream_set_http_mode(struct stream *s, const struct mux_proto_list *mux_prot
|
||||
* mux will probably want to subscribe to
|
||||
* the underlying XPRT
|
||||
*/
|
||||
if (cs_si(s->csf)->wait_event.events)
|
||||
conn->mux->unsubscribe(cs, cs_si(s->csf)->wait_event.events,
|
||||
&(cs_si(s->csf)->wait_event));
|
||||
if (s->csf->wait_event.events)
|
||||
conn->mux->unsubscribe(cs, s->csf->wait_event.events, &(s->csf->wait_event));
|
||||
|
||||
if (conn->mux->flags & MX_FL_NO_UPG)
|
||||
return 0;
|
||||
@ -3278,22 +3277,22 @@ static int stats_dump_full_strm_to_buffer(struct conn_stream *cs, struct stream
|
||||
strm->txn->req.flags, strm->txn->rsp.flags);
|
||||
|
||||
chunk_appendf(&trash,
|
||||
" si[0]=%p (flags=0x%02x endp0=%s:%p sub=%d)\n",
|
||||
" si[0]=%p (flags=0x%02x endp0=%s:%p)\n",
|
||||
strm->csf->si,
|
||||
strm->csf->si->flags,
|
||||
(strm->csf->endp->flags & CS_EP_T_MUX ? "CONN" : "APPCTX"),
|
||||
__cs_endp_target(strm->csf), strm->csf->si->wait_event.events);
|
||||
__cs_endp_target(strm->csf));
|
||||
|
||||
chunk_appendf(&trash,
|
||||
" si[1]=%p (flags=0x%02x endp1=%s:%p sub=%d)\n",
|
||||
" si[1]=%p (flags=0x%02x endp1=%s:%p)\n",
|
||||
strm->csb->si,
|
||||
strm->csb->si->flags,
|
||||
(strm->csb->endp->flags & CS_EP_T_MUX ? "CONN" : "APPCTX"),
|
||||
__cs_endp_target(strm->csb), strm->csb->si->wait_event.events);
|
||||
__cs_endp_target(strm->csb));
|
||||
|
||||
csf = strm->csf;
|
||||
chunk_appendf(&trash, " cs=%p csf=0x%08x state=%s endp=%p,0x%08x\n", csf, csf->flags,
|
||||
cs_state_str(csf->state), csf->endp->target, csf->endp->flags);
|
||||
chunk_appendf(&trash, " cs=%p csf=0x%08x state=%s endp=%p,0x%08x sub=%d\n", csf, csf->flags,
|
||||
cs_state_str(csf->state), csf->endp->target, csf->endp->flags, csf->wait_event.events);
|
||||
|
||||
if ((conn = cs_conn(csf)) != NULL) {
|
||||
chunk_appendf(&trash,
|
||||
@ -3329,8 +3328,8 @@ static int stats_dump_full_strm_to_buffer(struct conn_stream *cs, struct stream
|
||||
}
|
||||
|
||||
csb = strm->csb;
|
||||
chunk_appendf(&trash, " cs=%p csb=0x%08x state=%s endp=%p,0x%08x\n", csb, csb->flags,
|
||||
cs_state_str(csb->state), csb->endp->target, csb->endp->flags);
|
||||
chunk_appendf(&trash, " cs=%p csb=0x%08x state=%s endp=%p,0x%08x sub=%d\n", csb, csb->flags,
|
||||
cs_state_str(csb->state), csb->endp->target, csb->endp->flags, csb->wait_event.events);
|
||||
if ((conn = cs_conn(csb)) != NULL) {
|
||||
chunk_appendf(&trash,
|
||||
" co1=%p ctrl=%s xprt=%s mux=%s data=%s target=%s:%p\n",
|
||||
|
@ -124,7 +124,6 @@ void si_free(struct stream_interface *si)
|
||||
if (!si)
|
||||
return;
|
||||
|
||||
tasklet_free(si->wait_event.tasklet);
|
||||
pool_free(pool_head_streaminterface, si);
|
||||
}
|
||||
|
||||
@ -235,7 +234,6 @@ static void stream_int_chk_rcv(struct stream_interface *si)
|
||||
}
|
||||
else {
|
||||
/* (re)start reading */
|
||||
tasklet_wakeup(si->wait_event.tasklet);
|
||||
if (!(si->cs->flags & CS_FL_DONT_WAKE))
|
||||
task_wakeup(si_task(si), TASK_WOKEN_IO);
|
||||
}
|
||||
@ -555,7 +553,7 @@ static int si_cs_process(struct conn_stream *cs)
|
||||
BUG_ON(!conn);
|
||||
|
||||
/* If we have data to send, try it now */
|
||||
if (!channel_is_empty(oc) && !(si->wait_event.events & SUB_RETRY_SEND))
|
||||
if (!channel_is_empty(oc) && !(si->cs->wait_event.events & SUB_RETRY_SEND))
|
||||
si_cs_send(cs);
|
||||
|
||||
/* First step, report to the conn-stream what was detected at the
|
||||
@ -658,7 +656,7 @@ static int si_cs_send(struct conn_stream *cs)
|
||||
}
|
||||
|
||||
/* We're already waiting to be able to send, give up */
|
||||
if (si->wait_event.events & SUB_RETRY_SEND)
|
||||
if (si->cs->wait_event.events & SUB_RETRY_SEND)
|
||||
return 0;
|
||||
|
||||
/* we might have been called just after an asynchronous shutw */
|
||||
@ -773,11 +771,11 @@ static int si_cs_send(struct conn_stream *cs)
|
||||
|
||||
/* We couldn't send all of our data, let the mux know we'd like to send more */
|
||||
if (!channel_is_empty(oc))
|
||||
conn->mux->subscribe(cs, SUB_RETRY_SEND, &si->wait_event);
|
||||
conn->mux->subscribe(cs, SUB_RETRY_SEND, &si->cs->wait_event);
|
||||
return did_send;
|
||||
}
|
||||
|
||||
/* This is the ->process() function for any stream-interface's wait_event task.
|
||||
/* This is the ->process() function for any conn-stream's wait_event task.
|
||||
* It's assigned during the stream-interface's initialization, for any type of
|
||||
* stream interface. Thus it is always safe to perform a tasklet_wakeup() on a
|
||||
* stream interface, as the presence of the CS is checked there.
|
||||
@ -791,9 +789,9 @@ struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned int state)
|
||||
if (!cs_conn(cs))
|
||||
return t;
|
||||
|
||||
if (!(si->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(si_oc(si)))
|
||||
if (!(cs->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(si_oc(si)))
|
||||
ret = si_cs_send(cs);
|
||||
if (!(si->wait_event.events & SUB_RETRY_RECV))
|
||||
if (!(cs->wait_event.events & SUB_RETRY_RECV))
|
||||
ret |= si_cs_recv(cs);
|
||||
if (ret != 0)
|
||||
si_cs_process(cs);
|
||||
@ -909,7 +907,7 @@ int si_sync_recv(struct stream_interface *si)
|
||||
if (!cs_conn_mux(si->cs))
|
||||
return 0; // only conn_streams are supported
|
||||
|
||||
if (si->wait_event.events & SUB_RETRY_RECV)
|
||||
if (si->cs->wait_event.events & SUB_RETRY_RECV)
|
||||
return 0; // already subscribed
|
||||
|
||||
if (!si_rx_endp_ready(si) || si_rx_blocked(si))
|
||||
@ -1111,7 +1109,7 @@ static void stream_int_chk_rcv_conn(struct stream_interface *si)
|
||||
{
|
||||
/* (re)start reading */
|
||||
if (cs_state_in(si->cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST))
|
||||
tasklet_wakeup(si->wait_event.tasklet);
|
||||
tasklet_wakeup(si->cs->wait_event.tasklet);
|
||||
}
|
||||
|
||||
|
||||
@ -1138,7 +1136,7 @@ static void stream_int_chk_snd_conn(struct stream_interface *si)
|
||||
!(si->flags & SI_FL_WAIT_DATA)) /* not waiting for data */
|
||||
return;
|
||||
|
||||
if (!(si->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(si_oc(si)))
|
||||
if (!(si->cs->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(si_oc(si)))
|
||||
si_cs_send(cs);
|
||||
|
||||
if (cs->endp->flags & (CS_EP_ERROR|CS_EP_ERR_PENDING) || si_is_conn_error(si)) {
|
||||
@ -1232,7 +1230,7 @@ static int si_cs_recv(struct conn_stream *cs)
|
||||
/* If another call to si_cs_recv() failed, and we subscribed to
|
||||
* recv events already, give up now.
|
||||
*/
|
||||
if (si->wait_event.events & SUB_RETRY_RECV)
|
||||
if (si->cs->wait_event.events & SUB_RETRY_RECV)
|
||||
return 0;
|
||||
|
||||
/* maybe we were called immediately after an asynchronous shutr */
|
||||
@ -1532,7 +1530,7 @@ static int si_cs_recv(struct conn_stream *cs)
|
||||
}
|
||||
else if (!si_rx_blocked(si)) {
|
||||
/* Subscribe to receive events if we're blocking on I/O */
|
||||
conn->mux->subscribe(cs, SUB_RETRY_RECV, &si->wait_event);
|
||||
conn->mux->subscribe(cs, SUB_RETRY_RECV, &si->cs->wait_event);
|
||||
si_rx_endp_done(si);
|
||||
} else {
|
||||
si_rx_endp_more(si);
|
||||
|
Loading…
Reference in New Issue
Block a user