mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2024-12-12 22:44:32 +00:00
MINOR: stream_interface: introduce a new "struct connection" type
We start to move everything needed to manage a connection to a special entity "struct connection". We have the data layer operations and the control operations there. We'll also have more info in the future such as file descriptors and applet contexts, so that in the end it becomes detachable from the stream interface, which will allow connections to be reused between sessions. For now on, we start with minimal changes.
This commit is contained in:
parent
fe7f1ea68e
commit
73b013b070
@ -42,6 +42,16 @@ struct task *stream_int_register_handler_task(struct stream_interface *si,
|
||||
struct task *(*fct)(struct task *));
|
||||
void stream_int_unregister_handler(struct stream_interface *si);
|
||||
|
||||
static inline const struct protocol *si_ctrl(struct stream_interface *si)
|
||||
{
|
||||
return si->conn.ctrl;
|
||||
}
|
||||
|
||||
static inline const struct sock_ops *si_data(struct stream_interface *si)
|
||||
{
|
||||
return si->conn.data;
|
||||
}
|
||||
|
||||
static inline void clear_target(struct target *dest)
|
||||
{
|
||||
dest->type = TARG_TYPE_NONE;
|
||||
@ -98,7 +108,7 @@ static inline struct server *target_srv(struct target *t)
|
||||
|
||||
static inline void stream_interface_prepare(struct stream_interface *si, const struct sock_ops *ops)
|
||||
{
|
||||
memcpy(&si->sock, ops, sizeof(si->sock));
|
||||
si->conn.data = ops;
|
||||
}
|
||||
|
||||
|
||||
@ -108,12 +118,12 @@ static inline void si_get_from_addr(struct stream_interface *si)
|
||||
if (si->flags & SI_FL_FROM_SET)
|
||||
return;
|
||||
|
||||
if (!si->proto || !si->proto->get_src)
|
||||
if (!si_ctrl(si) || !si_ctrl(si)->get_src)
|
||||
return;
|
||||
|
||||
if (si->proto->get_src(si->fd, (struct sockaddr *)&si->addr.from,
|
||||
sizeof(si->addr.from),
|
||||
si->target.type != TARG_TYPE_CLIENT) == -1)
|
||||
if (si_ctrl(si)->get_src(si->fd, (struct sockaddr *)&si->addr.from,
|
||||
sizeof(si->addr.from),
|
||||
si->target.type != TARG_TYPE_CLIENT) == -1)
|
||||
return;
|
||||
si->flags |= SI_FL_FROM_SET;
|
||||
}
|
||||
@ -124,16 +134,53 @@ static inline void si_get_to_addr(struct stream_interface *si)
|
||||
if (si->flags & SI_FL_TO_SET)
|
||||
return;
|
||||
|
||||
if (!si->proto || !si->proto->get_dst)
|
||||
if (!si_ctrl(si) || !si_ctrl(si)->get_dst)
|
||||
return;
|
||||
|
||||
if (si->proto->get_dst(si->fd, (struct sockaddr *)&si->addr.to,
|
||||
sizeof(si->addr.to),
|
||||
si->target.type != TARG_TYPE_CLIENT) == -1)
|
||||
if (si_ctrl(si)->get_dst(si->fd, (struct sockaddr *)&si->addr.to,
|
||||
sizeof(si->addr.to),
|
||||
si->target.type != TARG_TYPE_CLIENT) == -1)
|
||||
return;
|
||||
si->flags |= SI_FL_TO_SET;
|
||||
}
|
||||
|
||||
/* Sends a shutr to the connection using the data layer */
|
||||
static inline void si_shutr(struct stream_interface *si)
|
||||
{
|
||||
si_data(si)->shutr(si);
|
||||
}
|
||||
|
||||
/* Sends a shutw to the connection using the data layer */
|
||||
static inline void si_shutw(struct stream_interface *si)
|
||||
{
|
||||
si_data(si)->shutw(si);
|
||||
}
|
||||
|
||||
/* Calls the data state update on the stream interfaace */
|
||||
static inline void si_update(struct stream_interface *si)
|
||||
{
|
||||
si_data(si)->update(si);
|
||||
}
|
||||
|
||||
/* Calls chk_rcv on the connection using the data layer */
|
||||
static inline void si_chk_rcv(struct stream_interface *si)
|
||||
{
|
||||
si_data(si)->chk_rcv(si);
|
||||
}
|
||||
|
||||
/* Calls chk_snd on the connection using the data layer */
|
||||
static inline void si_chk_snd(struct stream_interface *si)
|
||||
{
|
||||
si_data(si)->chk_snd(si);
|
||||
}
|
||||
|
||||
/* Calls chk_snd on the connection using the ctrl layer */
|
||||
static inline int si_connect(struct stream_interface *si)
|
||||
{
|
||||
if (unlikely(!si_ctrl(si) || !si_ctrl(si)->connect))
|
||||
return SN_ERR_INTERNAL;
|
||||
return si_ctrl(si)->connect(si);
|
||||
}
|
||||
|
||||
#endif /* _PROTO_STREAM_INTERFACE_H */
|
||||
|
||||
|
@ -95,6 +95,18 @@ struct proxy;
|
||||
struct si_applet;
|
||||
struct stream_interface;
|
||||
|
||||
/* This structure describes a connection with its methods and data.
|
||||
* A connection may be performed to proxy or server via a local or remote
|
||||
* socket, and can also be made to an internal applet. It can support
|
||||
* several data schemes (applet, raw, ssl, ...). It can support several
|
||||
* connection control schemes, generally a protocol for socket-oriented
|
||||
* connections, but other methods for applets.
|
||||
*/
|
||||
struct connection {
|
||||
const struct sock_ops *data; /* operations at the data layer */
|
||||
const struct protocol *ctrl; /* operations at the control layer, generally a protocol */
|
||||
};
|
||||
|
||||
struct target {
|
||||
int type;
|
||||
union {
|
||||
@ -138,8 +150,7 @@ struct stream_interface {
|
||||
unsigned int err_type; /* first error detected, one of SI_ET_* */
|
||||
void *err_loc; /* commonly the server, NULL when SI_ET_NONE */
|
||||
|
||||
struct sock_ops sock; /* socket level operations */
|
||||
struct protocol *proto; /* socket protocol */
|
||||
struct connection conn; /* descriptor for a connection */
|
||||
|
||||
void (*release)(struct stream_interface *); /* handler to call after the last close() */
|
||||
|
||||
|
@ -984,14 +984,14 @@ int connect_server(struct session *s)
|
||||
|
||||
/* set the correct protocol on the output stream interface */
|
||||
if (s->target.type == TARG_TYPE_SERVER) {
|
||||
s->req->cons->proto = target_srv(&s->target)->proto;
|
||||
s->req->cons->conn.ctrl = target_srv(&s->target)->proto;
|
||||
stream_interface_prepare(s->req->cons, target_srv(&s->target)->sock);
|
||||
}
|
||||
else if (s->target.type == TARG_TYPE_PROXY) {
|
||||
/* proxies exclusively run on sock_raw right now */
|
||||
s->req->cons->proto = protocol_by_family(s->req->cons->addr.to.ss_family);
|
||||
s->req->cons->conn.ctrl = protocol_by_family(s->req->cons->addr.to.ss_family);
|
||||
stream_interface_prepare(s->req->cons, &sock_raw);
|
||||
if (!s->req->cons->proto)
|
||||
if (!si_ctrl(s->req->cons))
|
||||
return SN_ERR_INTERNAL;
|
||||
}
|
||||
else
|
||||
@ -1010,7 +1010,7 @@ int connect_server(struct session *s)
|
||||
if (s->fe->options2 & PR_O2_SRC_ADDR)
|
||||
s->req->cons->flags |= SI_FL_SRC_ADDR;
|
||||
|
||||
err = s->req->cons->proto->connect(s->req->cons);
|
||||
err = si_connect(s->req->cons);
|
||||
|
||||
if (err != SN_ERR_NONE)
|
||||
return err;
|
||||
|
@ -1379,7 +1379,7 @@ static void cli_io_handler(struct stream_interface *si)
|
||||
/* Let's close for real now. We just close the request
|
||||
* side, the conditions below will complete if needed.
|
||||
*/
|
||||
si->sock.shutw(si);
|
||||
si_shutw(si);
|
||||
break;
|
||||
}
|
||||
else if (si->applet.st0 == STAT_CLI_GETREQ) {
|
||||
@ -1521,7 +1521,7 @@ static void cli_io_handler(struct stream_interface *si)
|
||||
* we forward the close to the request side so that it flows upstream to
|
||||
* the client.
|
||||
*/
|
||||
si->sock.shutw(si);
|
||||
si_shutw(si);
|
||||
}
|
||||
|
||||
if ((req->flags & BF_SHUTW) && (si->state == SI_ST_EST) && (si->applet.st0 < STAT_CLI_OUTPUT)) {
|
||||
@ -1531,12 +1531,12 @@ static void cli_io_handler(struct stream_interface *si)
|
||||
* the client side has closed. So we'll forward this state downstream
|
||||
* on the response buffer.
|
||||
*/
|
||||
si->sock.shutr(si);
|
||||
si_shutr(si);
|
||||
res->flags |= BF_READ_NULL;
|
||||
}
|
||||
|
||||
/* update all other flags and resync with the other side */
|
||||
si->sock.update(si);
|
||||
si_update(si);
|
||||
|
||||
/* we don't want to expire timeouts while we're processing requests */
|
||||
si->ib->rex = TICK_ETERNITY;
|
||||
@ -1738,26 +1738,26 @@ static void http_stats_io_handler(struct stream_interface *si)
|
||||
if (s->txn.meth == HTTP_METH_POST) {
|
||||
if (stats_http_redir(si, s->be->uri_auth)) {
|
||||
si->applet.st0 = 1;
|
||||
si->sock.shutw(si);
|
||||
si_shutw(si);
|
||||
}
|
||||
} else {
|
||||
if (stats_dump_http(si, s->be->uri_auth)) {
|
||||
si->applet.st0 = 1;
|
||||
si->sock.shutw(si);
|
||||
si_shutw(si);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if ((res->flags & BF_SHUTR) && (si->state == SI_ST_EST))
|
||||
si->sock.shutw(si);
|
||||
si_shutw(si);
|
||||
|
||||
if ((req->flags & BF_SHUTW) && (si->state == SI_ST_EST) && si->applet.st0) {
|
||||
si->sock.shutr(si);
|
||||
si_shutr(si);
|
||||
res->flags |= BF_READ_NULL;
|
||||
}
|
||||
|
||||
/* update all other flags and resync with the other side */
|
||||
si->sock.update(si);
|
||||
si_update(si);
|
||||
|
||||
/* we don't want to expire timeouts while we're processing requests */
|
||||
si->ib->rex = TICK_ETERNITY;
|
||||
|
10
src/peers.c
10
src/peers.c
@ -1023,15 +1023,15 @@ incomplete:
|
||||
si->applet.st0 = PEER_SESSION_END;
|
||||
/* fall through */
|
||||
case PEER_SESSION_END: {
|
||||
si->sock.shutw(si);
|
||||
si->sock.shutr(si);
|
||||
si_shutw(si);
|
||||
si_shutr(si);
|
||||
si->ib->flags |= BF_READ_NULL;
|
||||
goto quit;
|
||||
}
|
||||
}
|
||||
}
|
||||
out:
|
||||
si->sock.update(si);
|
||||
si_update(si);
|
||||
si->ob->flags |= BF_READ_DONTWAIT;
|
||||
/* we don't want to expire timeouts while we're processing requests */
|
||||
si->ib->rex = TICK_ETERNITY;
|
||||
@ -1154,7 +1154,7 @@ static struct session *peer_session_create(struct peer *peer, struct peer_sessio
|
||||
s->si[0].state = s->si[0].prev_state = SI_ST_EST;
|
||||
s->si[0].err_type = SI_ET_NONE;
|
||||
s->si[0].err_loc = NULL;
|
||||
s->si[0].proto = NULL;
|
||||
s->si[0].conn.ctrl = NULL;
|
||||
s->si[0].release = NULL;
|
||||
s->si[0].send_proxy_ofs = 0;
|
||||
set_target_client(&s->si[0].target);
|
||||
@ -1173,7 +1173,7 @@ static struct session *peer_session_create(struct peer *peer, struct peer_sessio
|
||||
s->si[1].conn_retries = p->conn_retries;
|
||||
s->si[1].err_type = SI_ET_NONE;
|
||||
s->si[1].err_loc = NULL;
|
||||
s->si[1].proto = peer->proto;
|
||||
s->si[1].conn.ctrl = peer->proto;
|
||||
s->si[1].release = NULL;
|
||||
s->si[1].send_proxy_ofs = 0;
|
||||
set_target_proxy(&s->si[1].target, s->be);
|
||||
|
@ -803,8 +803,8 @@ void perform_http_redirect(struct session *s, struct stream_interface *si)
|
||||
}
|
||||
|
||||
/* prepare to return without error. */
|
||||
si->sock.shutr(si);
|
||||
si->sock.shutw(si);
|
||||
si_shutr(si);
|
||||
si_shutw(si);
|
||||
si->err_type = SI_ET_NONE;
|
||||
si->err_loc = NULL;
|
||||
si->state = SI_ST_CLO;
|
||||
@ -3690,8 +3690,8 @@ void http_end_txn_clean_session(struct session *s)
|
||||
http_silent_debug(__LINE__, s);
|
||||
|
||||
s->req->cons->flags |= SI_FL_NOLINGER | SI_FL_NOHALF;
|
||||
s->req->cons->sock.shutr(s->req->cons);
|
||||
s->req->cons->sock.shutw(s->req->cons);
|
||||
si_shutr(s->req->cons);
|
||||
si_shutw(s->req->cons);
|
||||
|
||||
http_silent_debug(__LINE__, s);
|
||||
|
||||
|
@ -461,8 +461,8 @@ int tcp_connect_server(struct stream_interface *si)
|
||||
fdtab[fd].cb[DIR_WR].f = tcp_connect_write;
|
||||
}
|
||||
else {
|
||||
fdtab[fd].cb[DIR_RD].f = si->sock.read;
|
||||
fdtab[fd].cb[DIR_WR].f = si->sock.write;
|
||||
fdtab[fd].cb[DIR_RD].f = si_data(si)->read;
|
||||
fdtab[fd].cb[DIR_WR].f = si_data(si)->write;
|
||||
}
|
||||
|
||||
fdinfo[fd].peeraddr = (struct sockaddr *)&si->addr.to;
|
||||
@ -602,11 +602,11 @@ static int tcp_connect_write(int fd)
|
||||
/* The FD is ready now, we can hand the handlers to the socket layer
|
||||
* and forward the event there to start working on the socket.
|
||||
*/
|
||||
fdtab[fd].cb[DIR_RD].f = si->sock.read;
|
||||
fdtab[fd].cb[DIR_WR].f = si->sock.write;
|
||||
fdtab[fd].cb[DIR_RD].f = si_data(si)->read;
|
||||
fdtab[fd].cb[DIR_WR].f = si_data(si)->write;
|
||||
fdtab[fd].state = FD_STREADY;
|
||||
si->exp = TICK_ETERNITY;
|
||||
return si->sock.write(fd);
|
||||
return si_data(si)->write(fd);
|
||||
|
||||
out_wakeup:
|
||||
task_wakeup(si->owner, TASK_WOKEN_IO);
|
||||
|
@ -166,7 +166,7 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr)
|
||||
s->si[0].state = s->si[0].prev_state = SI_ST_EST;
|
||||
s->si[0].err_type = SI_ET_NONE;
|
||||
s->si[0].err_loc = NULL;
|
||||
s->si[0].proto = l->proto;
|
||||
s->si[0].conn.ctrl = l->proto;
|
||||
s->si[0].release = NULL;
|
||||
s->si[0].send_proxy_ofs = 0;
|
||||
set_target_client(&s->si[0].target);
|
||||
@ -191,7 +191,7 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr)
|
||||
s->si[1].err_type = SI_ET_NONE;
|
||||
s->si[1].conn_retries = 0; /* used for logging too */
|
||||
s->si[1].err_loc = NULL;
|
||||
s->si[1].proto = NULL;
|
||||
s->si[1].conn.ctrl = NULL;
|
||||
s->si[1].release = NULL;
|
||||
s->si[1].send_proxy_ofs = 0;
|
||||
clear_target(&s->si[1].target);
|
||||
@ -279,8 +279,8 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr)
|
||||
fdtab[cfd].owner = &s->si[0];
|
||||
fdtab[cfd].state = FD_STREADY;
|
||||
fdtab[cfd].flags = 0;
|
||||
fdtab[cfd].cb[DIR_RD].f = s->si[0].sock.read;
|
||||
fdtab[cfd].cb[DIR_WR].f = s->si[0].sock.write;
|
||||
fdtab[cfd].cb[DIR_RD].f = si_data(&s->si[0])->read;
|
||||
fdtab[cfd].cb[DIR_WR].f = si_data(&s->si[0])->write;
|
||||
fdinfo[cfd].peeraddr = (struct sockaddr *)&s->si[0].addr.from;
|
||||
fdinfo[cfd].peerlen = sizeof(s->si[0].addr.from);
|
||||
EV_FD_SET(cfd, DIR_RD);
|
||||
@ -565,7 +565,7 @@ static int sess_update_st_con_tcp(struct session *s, struct stream_interface *si
|
||||
(((req->flags & (BF_OUT_EMPTY|BF_WRITE_ACTIVITY)) == BF_OUT_EMPTY) ||
|
||||
s->be->options & PR_O_ABRT_CLOSE)))) {
|
||||
/* give up */
|
||||
si->sock.shutw(si);
|
||||
si_shutw(si);
|
||||
si->err_type |= SI_ET_CONN_ABRT;
|
||||
si->err_loc = target_srv(&s->target);
|
||||
si->flags &= ~SI_FL_CAP_SPLICE;
|
||||
@ -626,7 +626,7 @@ static int sess_update_st_cer(struct session *s, struct stream_interface *si)
|
||||
process_srv_queue(target_srv(&s->target));
|
||||
|
||||
/* shutw is enough so stop a connecting socket */
|
||||
si->sock.shutw(si);
|
||||
si_shutw(si);
|
||||
si->ob->flags |= BF_WRITE_ERROR;
|
||||
si->ib->flags |= BF_READ_ERROR;
|
||||
|
||||
@ -705,7 +705,7 @@ static void sess_establish(struct session *s, struct stream_interface *si)
|
||||
|
||||
rep->analysers |= s->fe->fe_rsp_ana | s->be->be_rsp_ana;
|
||||
rep->flags |= BF_READ_ATTACHED; /* producer is now attached */
|
||||
if (si->proto) {
|
||||
if (si_ctrl(si)) {
|
||||
/* real connections have timeouts */
|
||||
req->wto = s->be->timeout.server;
|
||||
rep->rto = s->be->timeout.server;
|
||||
@ -765,8 +765,8 @@ static void sess_update_stream_int(struct session *s, struct stream_interface *s
|
||||
process_srv_queue(srv);
|
||||
|
||||
/* Failed and not retryable. */
|
||||
si->sock.shutr(si);
|
||||
si->sock.shutw(si);
|
||||
si_shutr(si);
|
||||
si_shutw(si);
|
||||
si->ob->flags |= BF_WRITE_ERROR;
|
||||
|
||||
s->logs.t_queue = tv_ms_elapsed(&s->logs.tv_accept, &now);
|
||||
@ -814,8 +814,8 @@ static void sess_update_stream_int(struct session *s, struct stream_interface *s
|
||||
if (srv)
|
||||
srv->counters.failed_conns++;
|
||||
s->be->be_counters.failed_conns++;
|
||||
si->sock.shutr(si);
|
||||
si->sock.shutw(si);
|
||||
si_shutr(si);
|
||||
si_shutw(si);
|
||||
si->ob->flags |= BF_WRITE_TIMEOUT;
|
||||
if (!si->err_type)
|
||||
si->err_type = SI_ET_QUEUE_TO;
|
||||
@ -832,8 +832,8 @@ static void sess_update_stream_int(struct session *s, struct stream_interface *s
|
||||
/* give up */
|
||||
si->exp = TICK_ETERNITY;
|
||||
s->logs.t_queue = tv_ms_elapsed(&s->logs.tv_accept, &now);
|
||||
si->sock.shutr(si);
|
||||
si->sock.shutw(si);
|
||||
si_shutr(si);
|
||||
si_shutw(si);
|
||||
si->err_type |= SI_ET_QUEUE_ABRT;
|
||||
si->state = SI_ST_CLO;
|
||||
if (s->srv_error)
|
||||
@ -851,8 +851,8 @@ static void sess_update_stream_int(struct session *s, struct stream_interface *s
|
||||
(si->ob->flags & BF_OUT_EMPTY || s->be->options & PR_O_ABRT_CLOSE))) {
|
||||
/* give up */
|
||||
si->exp = TICK_ETERNITY;
|
||||
si->sock.shutr(si);
|
||||
si->sock.shutw(si);
|
||||
si_shutr(si);
|
||||
si_shutw(si);
|
||||
si->err_type |= SI_ET_CONN_ABRT;
|
||||
si->state = SI_ST_CLO;
|
||||
if (s->srv_error)
|
||||
@ -930,8 +930,8 @@ static void sess_prepare_conn_req(struct session *s, struct stream_interface *si
|
||||
return;
|
||||
|
||||
/* we did not get any server, let's check the cause */
|
||||
si->sock.shutr(si);
|
||||
si->sock.shutw(si);
|
||||
si_shutr(si);
|
||||
si_shutw(si);
|
||||
si->ob->flags |= BF_WRITE_ERROR;
|
||||
if (!si->err_type)
|
||||
si->err_type = SI_ET_CONN_OTHER;
|
||||
@ -1344,26 +1344,26 @@ struct task *process_session(struct task *t)
|
||||
|
||||
if (unlikely((s->req->flags & (BF_SHUTW|BF_WRITE_TIMEOUT)) == BF_WRITE_TIMEOUT)) {
|
||||
s->req->cons->flags |= SI_FL_NOLINGER;
|
||||
s->req->cons->sock.shutw(s->req->cons);
|
||||
si_shutw(s->req->cons);
|
||||
}
|
||||
|
||||
if (unlikely((s->req->flags & (BF_SHUTR|BF_READ_TIMEOUT)) == BF_READ_TIMEOUT)) {
|
||||
if (s->req->prod->flags & SI_FL_NOHALF)
|
||||
s->req->prod->flags |= SI_FL_NOLINGER;
|
||||
s->req->prod->sock.shutr(s->req->prod);
|
||||
si_shutr(s->req->prod);
|
||||
}
|
||||
|
||||
buffer_check_timeouts(s->rep);
|
||||
|
||||
if (unlikely((s->rep->flags & (BF_SHUTW|BF_WRITE_TIMEOUT)) == BF_WRITE_TIMEOUT)) {
|
||||
s->rep->cons->flags |= SI_FL_NOLINGER;
|
||||
s->rep->cons->sock.shutw(s->rep->cons);
|
||||
si_shutw(s->rep->cons);
|
||||
}
|
||||
|
||||
if (unlikely((s->rep->flags & (BF_SHUTR|BF_READ_TIMEOUT)) == BF_READ_TIMEOUT)) {
|
||||
if (s->rep->prod->flags & SI_FL_NOHALF)
|
||||
s->rep->prod->flags |= SI_FL_NOLINGER;
|
||||
s->rep->prod->sock.shutr(s->rep->prod);
|
||||
si_shutr(s->rep->prod);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1377,8 +1377,8 @@ struct task *process_session(struct task *t)
|
||||
srv = target_srv(&s->target);
|
||||
if (unlikely(s->si[0].flags & SI_FL_ERR)) {
|
||||
if (s->si[0].state == SI_ST_EST || s->si[0].state == SI_ST_DIS) {
|
||||
s->si[0].sock.shutr(&s->si[0]);
|
||||
s->si[0].sock.shutw(&s->si[0]);
|
||||
si_shutr(&s->si[0]);
|
||||
si_shutw(&s->si[0]);
|
||||
stream_int_report_error(&s->si[0]);
|
||||
if (!(s->req->analysers) && !(s->rep->analysers)) {
|
||||
s->be->be_counters.cli_aborts++;
|
||||
@ -1395,8 +1395,8 @@ struct task *process_session(struct task *t)
|
||||
|
||||
if (unlikely(s->si[1].flags & SI_FL_ERR)) {
|
||||
if (s->si[1].state == SI_ST_EST || s->si[1].state == SI_ST_DIS) {
|
||||
s->si[1].sock.shutr(&s->si[1]);
|
||||
s->si[1].sock.shutw(&s->si[1]);
|
||||
si_shutr(&s->si[1]);
|
||||
si_shutw(&s->si[1]);
|
||||
stream_int_report_error(&s->si[1]);
|
||||
s->be->be_counters.failed_resp++;
|
||||
if (srv)
|
||||
@ -1893,7 +1893,7 @@ struct task *process_session(struct task *t)
|
||||
|
||||
/* shutdown(write) pending */
|
||||
if (unlikely((s->req->flags & (BF_SHUTW|BF_SHUTW_NOW|BF_OUT_EMPTY)) == (BF_SHUTW_NOW|BF_OUT_EMPTY)))
|
||||
s->req->cons->sock.shutw(s->req->cons);
|
||||
si_shutw(s->req->cons);
|
||||
|
||||
/* shutdown(write) done on server side, we must stop the client too */
|
||||
if (unlikely((s->req->flags & (BF_SHUTW|BF_SHUTR|BF_SHUTR_NOW)) == BF_SHUTW &&
|
||||
@ -1904,7 +1904,7 @@ struct task *process_session(struct task *t)
|
||||
if (unlikely((s->req->flags & (BF_SHUTR|BF_SHUTR_NOW)) == BF_SHUTR_NOW)) {
|
||||
if (s->req->prod->flags & SI_FL_NOHALF)
|
||||
s->req->prod->flags |= SI_FL_NOLINGER;
|
||||
s->req->prod->sock.shutr(s->req->prod);
|
||||
si_shutr(s->req->prod);
|
||||
}
|
||||
|
||||
/* it's possible that an upper layer has requested a connection setup or abort.
|
||||
@ -1922,7 +1922,7 @@ struct task *process_session(struct task *t)
|
||||
s->req->cons->state = SI_ST_REQ; /* new connection requested */
|
||||
s->req->cons->conn_retries = s->be->conn_retries;
|
||||
if (unlikely(s->req->cons->target.type == TARG_TYPE_APPLET &&
|
||||
!(s->req->cons->proto && s->req->cons->proto->connect))) {
|
||||
!(si_ctrl(s->req->cons) && si_ctrl(s->req->cons)->connect))) {
|
||||
s->req->cons->state = SI_ST_EST; /* connection established */
|
||||
s->rep->flags |= BF_READ_ATTACHED; /* producer is now attached */
|
||||
s->req->wex = TICK_ETERNITY;
|
||||
@ -2038,7 +2038,7 @@ struct task *process_session(struct task *t)
|
||||
|
||||
/* shutdown(write) pending */
|
||||
if (unlikely((s->rep->flags & (BF_SHUTW|BF_OUT_EMPTY|BF_SHUTW_NOW)) == (BF_OUT_EMPTY|BF_SHUTW_NOW)))
|
||||
s->rep->cons->sock.shutw(s->rep->cons);
|
||||
si_shutw(s->rep->cons);
|
||||
|
||||
/* shutdown(write) done on the client side, we must stop the server too */
|
||||
if (unlikely((s->rep->flags & (BF_SHUTW|BF_SHUTR|BF_SHUTR_NOW)) == BF_SHUTW) &&
|
||||
@ -2049,7 +2049,7 @@ struct task *process_session(struct task *t)
|
||||
if (unlikely((s->rep->flags & (BF_SHUTR|BF_SHUTR_NOW)) == BF_SHUTR_NOW)) {
|
||||
if (s->rep->prod->flags & SI_FL_NOHALF)
|
||||
s->rep->prod->flags |= SI_FL_NOLINGER;
|
||||
s->rep->prod->sock.shutr(s->rep->prod);
|
||||
si_shutr(s->rep->prod);
|
||||
}
|
||||
|
||||
if (s->req->prod->state == SI_ST_DIS || s->req->cons->state == SI_ST_DIS)
|
||||
@ -2101,10 +2101,10 @@ struct task *process_session(struct task *t)
|
||||
session_process_counters(s);
|
||||
|
||||
if (s->rep->cons->state == SI_ST_EST && s->rep->cons->target.type != TARG_TYPE_APPLET)
|
||||
s->rep->cons->sock.update(s->rep->cons);
|
||||
si_update(s->rep->cons);
|
||||
|
||||
if (s->req->cons->state == SI_ST_EST && s->req->cons->target.type != TARG_TYPE_APPLET)
|
||||
s->req->cons->sock.update(s->req->cons);
|
||||
si_update(s->req->cons);
|
||||
|
||||
s->req->flags &= ~(BF_READ_NULL|BF_READ_PARTIAL|BF_WRITE_NULL|BF_WRITE_PARTIAL|BF_READ_ATTACHED);
|
||||
s->rep->flags &= ~(BF_READ_NULL|BF_READ_PARTIAL|BF_WRITE_NULL|BF_WRITE_PARTIAL|BF_READ_ATTACHED);
|
||||
|
@ -36,6 +36,7 @@
|
||||
#include <proto/pipe.h>
|
||||
#include <proto/protocols.h>
|
||||
#include <proto/sock_raw.h>
|
||||
#include <proto/stream_interface.h>
|
||||
#include <proto/task.h>
|
||||
|
||||
#include <types/global.h>
|
||||
@ -103,7 +104,7 @@ static int sock_raw_splice_in(struct buffer *b, struct stream_interface *si)
|
||||
si->flags |= SI_FL_WAIT_ROOM;
|
||||
EV_FD_CLR(fd, DIR_RD);
|
||||
b->rex = TICK_ETERNITY;
|
||||
b->cons->sock.chk_snd(b->cons);
|
||||
si_chk_snd(b->cons);
|
||||
return 1;
|
||||
}
|
||||
|
||||
@ -455,7 +456,7 @@ static int sock_raw_read(int fd)
|
||||
(b->i == 0 && (b->cons->flags & SI_FL_WAIT_DATA))) {
|
||||
int last_len = b->pipe ? b->pipe->data : 0;
|
||||
|
||||
b->cons->sock.chk_snd(b->cons);
|
||||
si_chk_snd(b->cons);
|
||||
|
||||
/* check if the consumer has freed some space */
|
||||
if (!(b->flags & BF_FULL) &&
|
||||
@ -726,7 +727,7 @@ static int sock_raw_write(int fd)
|
||||
/* the producer might be waiting for more room to store data */
|
||||
if (likely((b->flags & (BF_SHUTW|BF_WRITE_PARTIAL|BF_FULL|BF_DONT_READ)) == BF_WRITE_PARTIAL &&
|
||||
(b->prod->flags & SI_FL_WAIT_ROOM)))
|
||||
b->prod->sock.chk_rcv(b->prod);
|
||||
si_chk_rcv(b->prod);
|
||||
|
||||
/* we have to wake up if there is a special event or if we don't have
|
||||
* any more data to forward and it's not planned to send any more.
|
||||
|
@ -138,7 +138,7 @@ static void stream_int_update_embedded(struct stream_interface *si)
|
||||
return;
|
||||
|
||||
if ((si->ob->flags & (BF_OUT_EMPTY|BF_SHUTW|BF_HIJACK|BF_SHUTW_NOW)) == (BF_OUT_EMPTY|BF_SHUTW_NOW))
|
||||
si->sock.shutw(si);
|
||||
si_shutw(si);
|
||||
|
||||
if ((si->ob->flags & (BF_FULL|BF_SHUTW|BF_SHUTW_NOW|BF_HIJACK)) == 0)
|
||||
si->flags |= SI_FL_WAIT_DATA;
|
||||
@ -164,11 +164,11 @@ static void stream_int_update_embedded(struct stream_interface *si)
|
||||
old_flags = si->flags;
|
||||
if (likely((si->ob->flags & (BF_SHUTW|BF_WRITE_PARTIAL|BF_FULL|BF_DONT_READ)) == BF_WRITE_PARTIAL &&
|
||||
(si->ob->prod->flags & SI_FL_WAIT_ROOM)))
|
||||
si->ob->prod->sock.chk_rcv(si->ob->prod);
|
||||
si_chk_rcv(si->ob->prod);
|
||||
|
||||
if (((si->ib->flags & (BF_READ_PARTIAL|BF_OUT_EMPTY)) == BF_READ_PARTIAL) &&
|
||||
(si->ib->cons->flags & SI_FL_WAIT_DATA)) {
|
||||
si->ib->cons->sock.chk_snd(si->ib->cons);
|
||||
si_chk_snd(si->ib->cons);
|
||||
/* check if the consumer has freed some space */
|
||||
if (!(si->ib->flags & BF_FULL))
|
||||
si->flags &= ~SI_FL_WAIT_ROOM;
|
||||
@ -339,7 +339,7 @@ struct task *stream_int_register_handler(struct stream_interface *si, struct si_
|
||||
DPRINTF(stderr, "registering handler %p for si %p (was %p)\n", app, si, si->owner);
|
||||
|
||||
stream_interface_prepare(si, &stream_int_embedded);
|
||||
si->proto = NULL;
|
||||
si->conn.ctrl = NULL;
|
||||
set_target_applet(&si->target, app);
|
||||
si->applet.state = 0;
|
||||
si->release = app->release;
|
||||
@ -362,7 +362,7 @@ struct task *stream_int_register_handler_task(struct stream_interface *si,
|
||||
DPRINTF(stderr, "registering handler %p for si %p (was %p)\n", fct, si, si->owner);
|
||||
|
||||
stream_interface_prepare(si, &stream_int_task);
|
||||
si->proto = NULL;
|
||||
si->conn.ctrl = NULL;
|
||||
clear_target(&si->target);
|
||||
si->release = NULL;
|
||||
si->flags |= SI_FL_WAIT_DATA;
|
||||
|
Loading…
Reference in New Issue
Block a user