diff --git a/include/haproxy/conn_stream-t.h b/include/haproxy/conn_stream-t.h index a1e35389f..3491cfed7 100644 --- a/include/haproxy/conn_stream-t.h +++ b/include/haproxy/conn_stream-t.h @@ -157,6 +157,7 @@ struct conn_stream { unsigned int flags; /* CS_FL_* */ unsigned int hcto; /* half-closed timeout (0 = unset) */ + struct wait_event wait_event; /* We're in a wait list */ struct cs_endpoint *endp; /* points to the end point (MUX stream or appctx) */ enum obj_type *app; /* points to the applicative point (stream or check) */ struct stream_interface *si; diff --git a/include/haproxy/stream_interface-t.h b/include/haproxy/stream_interface-t.h index c435b3193..2b44e30cd 100644 --- a/include/haproxy/stream_interface-t.h +++ b/include/haproxy/stream_interface-t.h @@ -63,8 +63,6 @@ struct stream_interface { unsigned int flags; /* SI_FL_* */ struct conn_stream *cs; /* points to the conn-streams that owns the endpoint (connection or applet) */ struct si_ops *ops; /* general operations at the stream interface layer */ - - struct wait_event wait_event; /* We're in a wait list */ }; /* operations available on a stream-interface */ diff --git a/include/haproxy/stream_interface.h b/include/haproxy/stream_interface.h index 9aba8478c..ce2e4595f 100644 --- a/include/haproxy/stream_interface.h +++ b/include/haproxy/stream_interface.h @@ -108,12 +108,6 @@ static inline int si_init(struct stream_interface *si) si->flags &= SI_FL_ISBACK; si->cs = NULL; si->ops = &si_embedded_ops; - si->wait_event.tasklet = tasklet_new(); - if (!si->wait_event.tasklet) - return -1; - si->wait_event.tasklet->process = si_cs_io_cb; - si->wait_event.tasklet->context = si; - si->wait_event.events = 0; return 0; } diff --git a/src/conn_stream.c b/src/conn_stream.c index b63291291..68db216f2 100644 --- a/src/conn_stream.c +++ b/src/conn_stream.c @@ -64,6 +64,9 @@ struct conn_stream *cs_new(struct cs_endpoint *endp) cs->data_cb = NULL; cs->src = NULL; cs->dst = NULL; + cs->wait_event.tasklet = NULL; + cs->wait_event.events = 0; + if (!endp) { endp = cs_endpoint_new(); if (unlikely(!endp)) @@ -156,6 +159,8 @@ void cs_free(struct conn_stream *cs) BUG_ON(!(cs->endp->flags & CS_EP_DETACHED)); cs_endpoint_free(cs->endp); } + if (cs->wait_event.tasklet) + tasklet_free(cs->wait_event.tasklet); pool_free(pool_head_connstream, cs); } @@ -172,6 +177,15 @@ int cs_attach_mux(struct conn_stream *cs, void *target, void *ctx) if (!conn->ctx) conn->ctx = cs; if (cs_strm(cs)) { + if (!cs->wait_event.tasklet) { + cs->wait_event.tasklet = tasklet_new(); + if (!cs->wait_event.tasklet) + return -1; + cs->wait_event.tasklet->process = si_cs_io_cb; + cs->wait_event.tasklet->context = cs->si; + cs->wait_event.events = 0; + } + cs->si->ops = &si_conn_ops; cs->data_cb = &si_conn_cb; } @@ -204,8 +218,19 @@ int cs_attach_strm(struct conn_stream *cs, struct stream *strm) cs->si = si_new(cs); if (unlikely(!cs->si)) return -1; + cs->endp->flags &= ~CS_EP_ORPHAN; if (cs->endp->flags & CS_EP_T_MUX) { + cs->wait_event.tasklet = tasklet_new(); + if (!cs->wait_event.tasklet) { + si_free(cs->si); + cs->si = NULL; + return -1; + } + cs->wait_event.tasklet->process = si_cs_io_cb; + cs->wait_event.tasklet->context = cs->si; + cs->wait_event.events = 0; + cs->si->ops = &si_conn_ops; cs->data_cb = &si_conn_cb; } @@ -237,8 +262,8 @@ void cs_detach_endp(struct conn_stream *cs) if (conn->mux) { /* TODO: handle unsubscribe for healthchecks too */ cs->endp->flags |= CS_EP_ORPHAN; - if (cs->si && cs->si->wait_event.events != 0) - conn->mux->unsubscribe(cs, cs->si->wait_event.events, &cs->si->wait_event); + if (cs->wait_event.events != 0) + conn->mux->unsubscribe(cs, cs->wait_event.events, &cs->wait_event); conn->mux->detach(cs); cs->endp = NULL; } @@ -290,6 +315,12 @@ void cs_detach_app(struct conn_stream *cs) cs->data_cb = NULL; sockaddr_free(&cs->src); sockaddr_free(&cs->dst); + + if (cs->wait_event.tasklet) + tasklet_free(cs->wait_event.tasklet); + cs->wait_event.tasklet = NULL; + cs->wait_event.events = 0; + if (!cs->endp || (cs->endp->flags & CS_EP_DETACHED)) cs_free(cs); } diff --git a/src/stream.c b/src/stream.c index ec6e1f620..ec7c15993 100644 --- a/src/stream.c +++ b/src/stream.c @@ -1502,9 +1502,8 @@ int stream_set_http_mode(struct stream *s, const struct mux_proto_list *mux_prot * mux will probably want to subscribe to * the underlying XPRT */ - if (cs_si(s->csf)->wait_event.events) - conn->mux->unsubscribe(cs, cs_si(s->csf)->wait_event.events, - &(cs_si(s->csf)->wait_event)); + if (s->csf->wait_event.events) + conn->mux->unsubscribe(cs, s->csf->wait_event.events, &(s->csf->wait_event)); if (conn->mux->flags & MX_FL_NO_UPG) return 0; @@ -3278,22 +3277,22 @@ static int stats_dump_full_strm_to_buffer(struct conn_stream *cs, struct stream strm->txn->req.flags, strm->txn->rsp.flags); chunk_appendf(&trash, - " si[0]=%p (flags=0x%02x endp0=%s:%p sub=%d)\n", + " si[0]=%p (flags=0x%02x endp0=%s:%p)\n", strm->csf->si, strm->csf->si->flags, (strm->csf->endp->flags & CS_EP_T_MUX ? "CONN" : "APPCTX"), - __cs_endp_target(strm->csf), strm->csf->si->wait_event.events); + __cs_endp_target(strm->csf)); chunk_appendf(&trash, - " si[1]=%p (flags=0x%02x endp1=%s:%p sub=%d)\n", + " si[1]=%p (flags=0x%02x endp1=%s:%p)\n", strm->csb->si, strm->csb->si->flags, (strm->csb->endp->flags & CS_EP_T_MUX ? "CONN" : "APPCTX"), - __cs_endp_target(strm->csb), strm->csb->si->wait_event.events); + __cs_endp_target(strm->csb)); csf = strm->csf; - chunk_appendf(&trash, " cs=%p csf=0x%08x state=%s endp=%p,0x%08x\n", csf, csf->flags, - cs_state_str(csf->state), csf->endp->target, csf->endp->flags); + chunk_appendf(&trash, " cs=%p csf=0x%08x state=%s endp=%p,0x%08x sub=%d\n", csf, csf->flags, + cs_state_str(csf->state), csf->endp->target, csf->endp->flags, csf->wait_event.events); if ((conn = cs_conn(csf)) != NULL) { chunk_appendf(&trash, @@ -3329,8 +3328,8 @@ static int stats_dump_full_strm_to_buffer(struct conn_stream *cs, struct stream } csb = strm->csb; - chunk_appendf(&trash, " cs=%p csb=0x%08x state=%s endp=%p,0x%08x\n", csb, csb->flags, - cs_state_str(csb->state), csb->endp->target, csb->endp->flags); + chunk_appendf(&trash, " cs=%p csb=0x%08x state=%s endp=%p,0x%08x sub=%d\n", csb, csb->flags, + cs_state_str(csb->state), csb->endp->target, csb->endp->flags, csb->wait_event.events); if ((conn = cs_conn(csb)) != NULL) { chunk_appendf(&trash, " co1=%p ctrl=%s xprt=%s mux=%s data=%s target=%s:%p\n", diff --git a/src/stream_interface.c b/src/stream_interface.c index 139b6548c..d206e9cf9 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -124,7 +124,6 @@ void si_free(struct stream_interface *si) if (!si) return; - tasklet_free(si->wait_event.tasklet); pool_free(pool_head_streaminterface, si); } @@ -235,7 +234,6 @@ static void stream_int_chk_rcv(struct stream_interface *si) } else { /* (re)start reading */ - tasklet_wakeup(si->wait_event.tasklet); if (!(si->cs->flags & CS_FL_DONT_WAKE)) task_wakeup(si_task(si), TASK_WOKEN_IO); } @@ -555,7 +553,7 @@ static int si_cs_process(struct conn_stream *cs) BUG_ON(!conn); /* If we have data to send, try it now */ - if (!channel_is_empty(oc) && !(si->wait_event.events & SUB_RETRY_SEND)) + if (!channel_is_empty(oc) && !(si->cs->wait_event.events & SUB_RETRY_SEND)) si_cs_send(cs); /* First step, report to the conn-stream what was detected at the @@ -658,7 +656,7 @@ static int si_cs_send(struct conn_stream *cs) } /* We're already waiting to be able to send, give up */ - if (si->wait_event.events & SUB_RETRY_SEND) + if (si->cs->wait_event.events & SUB_RETRY_SEND) return 0; /* we might have been called just after an asynchronous shutw */ @@ -773,11 +771,11 @@ static int si_cs_send(struct conn_stream *cs) /* We couldn't send all of our data, let the mux know we'd like to send more */ if (!channel_is_empty(oc)) - conn->mux->subscribe(cs, SUB_RETRY_SEND, &si->wait_event); + conn->mux->subscribe(cs, SUB_RETRY_SEND, &si->cs->wait_event); return did_send; } -/* This is the ->process() function for any stream-interface's wait_event task. +/* This is the ->process() function for any conn-stream's wait_event task. * It's assigned during the stream-interface's initialization, for any type of * stream interface. Thus it is always safe to perform a tasklet_wakeup() on a * stream interface, as the presence of the CS is checked there. @@ -791,9 +789,9 @@ struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned int state) if (!cs_conn(cs)) return t; - if (!(si->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(si_oc(si))) + if (!(cs->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(si_oc(si))) ret = si_cs_send(cs); - if (!(si->wait_event.events & SUB_RETRY_RECV)) + if (!(cs->wait_event.events & SUB_RETRY_RECV)) ret |= si_cs_recv(cs); if (ret != 0) si_cs_process(cs); @@ -909,7 +907,7 @@ int si_sync_recv(struct stream_interface *si) if (!cs_conn_mux(si->cs)) return 0; // only conn_streams are supported - if (si->wait_event.events & SUB_RETRY_RECV) + if (si->cs->wait_event.events & SUB_RETRY_RECV) return 0; // already subscribed if (!si_rx_endp_ready(si) || si_rx_blocked(si)) @@ -1111,7 +1109,7 @@ static void stream_int_chk_rcv_conn(struct stream_interface *si) { /* (re)start reading */ if (cs_state_in(si->cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST)) - tasklet_wakeup(si->wait_event.tasklet); + tasklet_wakeup(si->cs->wait_event.tasklet); } @@ -1138,7 +1136,7 @@ static void stream_int_chk_snd_conn(struct stream_interface *si) !(si->flags & SI_FL_WAIT_DATA)) /* not waiting for data */ return; - if (!(si->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(si_oc(si))) + if (!(si->cs->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(si_oc(si))) si_cs_send(cs); if (cs->endp->flags & (CS_EP_ERROR|CS_EP_ERR_PENDING) || si_is_conn_error(si)) { @@ -1232,7 +1230,7 @@ static int si_cs_recv(struct conn_stream *cs) /* If another call to si_cs_recv() failed, and we subscribed to * recv events already, give up now. */ - if (si->wait_event.events & SUB_RETRY_RECV) + if (si->cs->wait_event.events & SUB_RETRY_RECV) return 0; /* maybe we were called immediately after an asynchronous shutr */ @@ -1532,7 +1530,7 @@ static int si_cs_recv(struct conn_stream *cs) } else if (!si_rx_blocked(si)) { /* Subscribe to receive events if we're blocking on I/O */ - conn->mux->subscribe(cs, SUB_RETRY_RECV, &si->wait_event); + conn->mux->subscribe(cs, SUB_RETRY_RECV, &si->cs->wait_event); si_rx_endp_done(si); } else { si_rx_endp_more(si);