From 73b013b070ddfe8d38e20c52a5d836e3f2be1838 Mon Sep 17 00:00:00 2001 From: Willy Tarreau Date: Mon, 21 May 2012 16:31:45 +0200 Subject: [PATCH] MINOR: stream_interface: introduce a new "struct connection" type We start to move everything needed to manage a connection to a special entity "struct connection". We have the data layer operations and the control operations there. We'll also have more info in the future such as file descriptors and applet contexts, so that in the end it becomes detachable from the stream interface, which will allow connections to be reused between sessions. For now on, we start with minimal changes. --- include/proto/stream_interface.h | 65 +++++++++++++++++++++++++++----- include/types/stream_interface.h | 15 +++++++- src/backend.c | 8 ++-- src/dumpstats.c | 18 ++++----- src/peers.c | 10 ++--- src/proto_http.c | 8 ++-- src/proto_tcp.c | 10 ++--- src/session.c | 64 +++++++++++++++---------------- src/sock_raw.c | 7 ++-- src/stream_interface.c | 10 ++--- 10 files changed, 137 insertions(+), 78 deletions(-) diff --git a/include/proto/stream_interface.h b/include/proto/stream_interface.h index ad5ae6871..9a619f862 100644 --- a/include/proto/stream_interface.h +++ b/include/proto/stream_interface.h @@ -42,6 +42,16 @@ struct task *stream_int_register_handler_task(struct stream_interface *si, struct task *(*fct)(struct task *)); void stream_int_unregister_handler(struct stream_interface *si); +static inline const struct protocol *si_ctrl(struct stream_interface *si) +{ + return si->conn.ctrl; +} + +static inline const struct sock_ops *si_data(struct stream_interface *si) +{ + return si->conn.data; +} + static inline void clear_target(struct target *dest) { dest->type = TARG_TYPE_NONE; @@ -98,7 +108,7 @@ static inline struct server *target_srv(struct target *t) static inline void stream_interface_prepare(struct stream_interface *si, const struct sock_ops *ops) { - memcpy(&si->sock, ops, sizeof(si->sock)); + si->conn.data = ops; } @@ -108,12 +118,12 @@ static inline void si_get_from_addr(struct stream_interface *si) if (si->flags & SI_FL_FROM_SET) return; - if (!si->proto || !si->proto->get_src) + if (!si_ctrl(si) || !si_ctrl(si)->get_src) return; - if (si->proto->get_src(si->fd, (struct sockaddr *)&si->addr.from, - sizeof(si->addr.from), - si->target.type != TARG_TYPE_CLIENT) == -1) + if (si_ctrl(si)->get_src(si->fd, (struct sockaddr *)&si->addr.from, + sizeof(si->addr.from), + si->target.type != TARG_TYPE_CLIENT) == -1) return; si->flags |= SI_FL_FROM_SET; } @@ -124,16 +134,53 @@ static inline void si_get_to_addr(struct stream_interface *si) if (si->flags & SI_FL_TO_SET) return; - if (!si->proto || !si->proto->get_dst) + if (!si_ctrl(si) || !si_ctrl(si)->get_dst) return; - if (si->proto->get_dst(si->fd, (struct sockaddr *)&si->addr.to, - sizeof(si->addr.to), - si->target.type != TARG_TYPE_CLIENT) == -1) + if (si_ctrl(si)->get_dst(si->fd, (struct sockaddr *)&si->addr.to, + sizeof(si->addr.to), + si->target.type != TARG_TYPE_CLIENT) == -1) return; si->flags |= SI_FL_TO_SET; } +/* Sends a shutr to the connection using the data layer */ +static inline void si_shutr(struct stream_interface *si) +{ + si_data(si)->shutr(si); +} + +/* Sends a shutw to the connection using the data layer */ +static inline void si_shutw(struct stream_interface *si) +{ + si_data(si)->shutw(si); +} + +/* Calls the data state update on the stream interfaace */ +static inline void si_update(struct stream_interface *si) +{ + si_data(si)->update(si); +} + +/* Calls chk_rcv on the connection using the data layer */ +static inline void si_chk_rcv(struct stream_interface *si) +{ + si_data(si)->chk_rcv(si); +} + +/* Calls chk_snd on the connection using the data layer */ +static inline void si_chk_snd(struct stream_interface *si) +{ + si_data(si)->chk_snd(si); +} + +/* Calls chk_snd on the connection using the ctrl layer */ +static inline int si_connect(struct stream_interface *si) +{ + if (unlikely(!si_ctrl(si) || !si_ctrl(si)->connect)) + return SN_ERR_INTERNAL; + return si_ctrl(si)->connect(si); +} #endif /* _PROTO_STREAM_INTERFACE_H */ diff --git a/include/types/stream_interface.h b/include/types/stream_interface.h index 0eb3730c8..d573c585e 100644 --- a/include/types/stream_interface.h +++ b/include/types/stream_interface.h @@ -95,6 +95,18 @@ struct proxy; struct si_applet; struct stream_interface; +/* This structure describes a connection with its methods and data. + * A connection may be performed to proxy or server via a local or remote + * socket, and can also be made to an internal applet. It can support + * several data schemes (applet, raw, ssl, ...). It can support several + * connection control schemes, generally a protocol for socket-oriented + * connections, but other methods for applets. + */ +struct connection { + const struct sock_ops *data; /* operations at the data layer */ + const struct protocol *ctrl; /* operations at the control layer, generally a protocol */ +}; + struct target { int type; union { @@ -138,8 +150,7 @@ struct stream_interface { unsigned int err_type; /* first error detected, one of SI_ET_* */ void *err_loc; /* commonly the server, NULL when SI_ET_NONE */ - struct sock_ops sock; /* socket level operations */ - struct protocol *proto; /* socket protocol */ + struct connection conn; /* descriptor for a connection */ void (*release)(struct stream_interface *); /* handler to call after the last close() */ diff --git a/src/backend.c b/src/backend.c index 9a5b1968f..31f1a0fbd 100644 --- a/src/backend.c +++ b/src/backend.c @@ -984,14 +984,14 @@ int connect_server(struct session *s) /* set the correct protocol on the output stream interface */ if (s->target.type == TARG_TYPE_SERVER) { - s->req->cons->proto = target_srv(&s->target)->proto; + s->req->cons->conn.ctrl = target_srv(&s->target)->proto; stream_interface_prepare(s->req->cons, target_srv(&s->target)->sock); } else if (s->target.type == TARG_TYPE_PROXY) { /* proxies exclusively run on sock_raw right now */ - s->req->cons->proto = protocol_by_family(s->req->cons->addr.to.ss_family); + s->req->cons->conn.ctrl = protocol_by_family(s->req->cons->addr.to.ss_family); stream_interface_prepare(s->req->cons, &sock_raw); - if (!s->req->cons->proto) + if (!si_ctrl(s->req->cons)) return SN_ERR_INTERNAL; } else @@ -1010,7 +1010,7 @@ int connect_server(struct session *s) if (s->fe->options2 & PR_O2_SRC_ADDR) s->req->cons->flags |= SI_FL_SRC_ADDR; - err = s->req->cons->proto->connect(s->req->cons); + err = si_connect(s->req->cons); if (err != SN_ERR_NONE) return err; diff --git a/src/dumpstats.c b/src/dumpstats.c index 4b051d7bf..65f076da9 100644 --- a/src/dumpstats.c +++ b/src/dumpstats.c @@ -1379,7 +1379,7 @@ static void cli_io_handler(struct stream_interface *si) /* Let's close for real now. We just close the request * side, the conditions below will complete if needed. */ - si->sock.shutw(si); + si_shutw(si); break; } else if (si->applet.st0 == STAT_CLI_GETREQ) { @@ -1521,7 +1521,7 @@ static void cli_io_handler(struct stream_interface *si) * we forward the close to the request side so that it flows upstream to * the client. */ - si->sock.shutw(si); + si_shutw(si); } if ((req->flags & BF_SHUTW) && (si->state == SI_ST_EST) && (si->applet.st0 < STAT_CLI_OUTPUT)) { @@ -1531,12 +1531,12 @@ static void cli_io_handler(struct stream_interface *si) * the client side has closed. So we'll forward this state downstream * on the response buffer. */ - si->sock.shutr(si); + si_shutr(si); res->flags |= BF_READ_NULL; } /* update all other flags and resync with the other side */ - si->sock.update(si); + si_update(si); /* we don't want to expire timeouts while we're processing requests */ si->ib->rex = TICK_ETERNITY; @@ -1738,26 +1738,26 @@ static void http_stats_io_handler(struct stream_interface *si) if (s->txn.meth == HTTP_METH_POST) { if (stats_http_redir(si, s->be->uri_auth)) { si->applet.st0 = 1; - si->sock.shutw(si); + si_shutw(si); } } else { if (stats_dump_http(si, s->be->uri_auth)) { si->applet.st0 = 1; - si->sock.shutw(si); + si_shutw(si); } } } if ((res->flags & BF_SHUTR) && (si->state == SI_ST_EST)) - si->sock.shutw(si); + si_shutw(si); if ((req->flags & BF_SHUTW) && (si->state == SI_ST_EST) && si->applet.st0) { - si->sock.shutr(si); + si_shutr(si); res->flags |= BF_READ_NULL; } /* update all other flags and resync with the other side */ - si->sock.update(si); + si_update(si); /* we don't want to expire timeouts while we're processing requests */ si->ib->rex = TICK_ETERNITY; diff --git a/src/peers.c b/src/peers.c index f39c7df35..db22a843d 100644 --- a/src/peers.c +++ b/src/peers.c @@ -1023,15 +1023,15 @@ incomplete: si->applet.st0 = PEER_SESSION_END; /* fall through */ case PEER_SESSION_END: { - si->sock.shutw(si); - si->sock.shutr(si); + si_shutw(si); + si_shutr(si); si->ib->flags |= BF_READ_NULL; goto quit; } } } out: - si->sock.update(si); + si_update(si); si->ob->flags |= BF_READ_DONTWAIT; /* we don't want to expire timeouts while we're processing requests */ si->ib->rex = TICK_ETERNITY; @@ -1154,7 +1154,7 @@ static struct session *peer_session_create(struct peer *peer, struct peer_sessio s->si[0].state = s->si[0].prev_state = SI_ST_EST; s->si[0].err_type = SI_ET_NONE; s->si[0].err_loc = NULL; - s->si[0].proto = NULL; + s->si[0].conn.ctrl = NULL; s->si[0].release = NULL; s->si[0].send_proxy_ofs = 0; set_target_client(&s->si[0].target); @@ -1173,7 +1173,7 @@ static struct session *peer_session_create(struct peer *peer, struct peer_sessio s->si[1].conn_retries = p->conn_retries; s->si[1].err_type = SI_ET_NONE; s->si[1].err_loc = NULL; - s->si[1].proto = peer->proto; + s->si[1].conn.ctrl = peer->proto; s->si[1].release = NULL; s->si[1].send_proxy_ofs = 0; set_target_proxy(&s->si[1].target, s->be); diff --git a/src/proto_http.c b/src/proto_http.c index 157694f40..850589eb6 100644 --- a/src/proto_http.c +++ b/src/proto_http.c @@ -803,8 +803,8 @@ void perform_http_redirect(struct session *s, struct stream_interface *si) } /* prepare to return without error. */ - si->sock.shutr(si); - si->sock.shutw(si); + si_shutr(si); + si_shutw(si); si->err_type = SI_ET_NONE; si->err_loc = NULL; si->state = SI_ST_CLO; @@ -3690,8 +3690,8 @@ void http_end_txn_clean_session(struct session *s) http_silent_debug(__LINE__, s); s->req->cons->flags |= SI_FL_NOLINGER | SI_FL_NOHALF; - s->req->cons->sock.shutr(s->req->cons); - s->req->cons->sock.shutw(s->req->cons); + si_shutr(s->req->cons); + si_shutw(s->req->cons); http_silent_debug(__LINE__, s); diff --git a/src/proto_tcp.c b/src/proto_tcp.c index 9931d28d7..02b20dd4a 100644 --- a/src/proto_tcp.c +++ b/src/proto_tcp.c @@ -461,8 +461,8 @@ int tcp_connect_server(struct stream_interface *si) fdtab[fd].cb[DIR_WR].f = tcp_connect_write; } else { - fdtab[fd].cb[DIR_RD].f = si->sock.read; - fdtab[fd].cb[DIR_WR].f = si->sock.write; + fdtab[fd].cb[DIR_RD].f = si_data(si)->read; + fdtab[fd].cb[DIR_WR].f = si_data(si)->write; } fdinfo[fd].peeraddr = (struct sockaddr *)&si->addr.to; @@ -602,11 +602,11 @@ static int tcp_connect_write(int fd) /* The FD is ready now, we can hand the handlers to the socket layer * and forward the event there to start working on the socket. */ - fdtab[fd].cb[DIR_RD].f = si->sock.read; - fdtab[fd].cb[DIR_WR].f = si->sock.write; + fdtab[fd].cb[DIR_RD].f = si_data(si)->read; + fdtab[fd].cb[DIR_WR].f = si_data(si)->write; fdtab[fd].state = FD_STREADY; si->exp = TICK_ETERNITY; - return si->sock.write(fd); + return si_data(si)->write(fd); out_wakeup: task_wakeup(si->owner, TASK_WOKEN_IO); diff --git a/src/session.c b/src/session.c index ed289b443..8a2704148 100644 --- a/src/session.c +++ b/src/session.c @@ -166,7 +166,7 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) s->si[0].state = s->si[0].prev_state = SI_ST_EST; s->si[0].err_type = SI_ET_NONE; s->si[0].err_loc = NULL; - s->si[0].proto = l->proto; + s->si[0].conn.ctrl = l->proto; s->si[0].release = NULL; s->si[0].send_proxy_ofs = 0; set_target_client(&s->si[0].target); @@ -191,7 +191,7 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) s->si[1].err_type = SI_ET_NONE; s->si[1].conn_retries = 0; /* used for logging too */ s->si[1].err_loc = NULL; - s->si[1].proto = NULL; + s->si[1].conn.ctrl = NULL; s->si[1].release = NULL; s->si[1].send_proxy_ofs = 0; clear_target(&s->si[1].target); @@ -279,8 +279,8 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) fdtab[cfd].owner = &s->si[0]; fdtab[cfd].state = FD_STREADY; fdtab[cfd].flags = 0; - fdtab[cfd].cb[DIR_RD].f = s->si[0].sock.read; - fdtab[cfd].cb[DIR_WR].f = s->si[0].sock.write; + fdtab[cfd].cb[DIR_RD].f = si_data(&s->si[0])->read; + fdtab[cfd].cb[DIR_WR].f = si_data(&s->si[0])->write; fdinfo[cfd].peeraddr = (struct sockaddr *)&s->si[0].addr.from; fdinfo[cfd].peerlen = sizeof(s->si[0].addr.from); EV_FD_SET(cfd, DIR_RD); @@ -565,7 +565,7 @@ static int sess_update_st_con_tcp(struct session *s, struct stream_interface *si (((req->flags & (BF_OUT_EMPTY|BF_WRITE_ACTIVITY)) == BF_OUT_EMPTY) || s->be->options & PR_O_ABRT_CLOSE)))) { /* give up */ - si->sock.shutw(si); + si_shutw(si); si->err_type |= SI_ET_CONN_ABRT; si->err_loc = target_srv(&s->target); si->flags &= ~SI_FL_CAP_SPLICE; @@ -626,7 +626,7 @@ static int sess_update_st_cer(struct session *s, struct stream_interface *si) process_srv_queue(target_srv(&s->target)); /* shutw is enough so stop a connecting socket */ - si->sock.shutw(si); + si_shutw(si); si->ob->flags |= BF_WRITE_ERROR; si->ib->flags |= BF_READ_ERROR; @@ -705,7 +705,7 @@ static void sess_establish(struct session *s, struct stream_interface *si) rep->analysers |= s->fe->fe_rsp_ana | s->be->be_rsp_ana; rep->flags |= BF_READ_ATTACHED; /* producer is now attached */ - if (si->proto) { + if (si_ctrl(si)) { /* real connections have timeouts */ req->wto = s->be->timeout.server; rep->rto = s->be->timeout.server; @@ -765,8 +765,8 @@ static void sess_update_stream_int(struct session *s, struct stream_interface *s process_srv_queue(srv); /* Failed and not retryable. */ - si->sock.shutr(si); - si->sock.shutw(si); + si_shutr(si); + si_shutw(si); si->ob->flags |= BF_WRITE_ERROR; s->logs.t_queue = tv_ms_elapsed(&s->logs.tv_accept, &now); @@ -814,8 +814,8 @@ static void sess_update_stream_int(struct session *s, struct stream_interface *s if (srv) srv->counters.failed_conns++; s->be->be_counters.failed_conns++; - si->sock.shutr(si); - si->sock.shutw(si); + si_shutr(si); + si_shutw(si); si->ob->flags |= BF_WRITE_TIMEOUT; if (!si->err_type) si->err_type = SI_ET_QUEUE_TO; @@ -832,8 +832,8 @@ static void sess_update_stream_int(struct session *s, struct stream_interface *s /* give up */ si->exp = TICK_ETERNITY; s->logs.t_queue = tv_ms_elapsed(&s->logs.tv_accept, &now); - si->sock.shutr(si); - si->sock.shutw(si); + si_shutr(si); + si_shutw(si); si->err_type |= SI_ET_QUEUE_ABRT; si->state = SI_ST_CLO; if (s->srv_error) @@ -851,8 +851,8 @@ static void sess_update_stream_int(struct session *s, struct stream_interface *s (si->ob->flags & BF_OUT_EMPTY || s->be->options & PR_O_ABRT_CLOSE))) { /* give up */ si->exp = TICK_ETERNITY; - si->sock.shutr(si); - si->sock.shutw(si); + si_shutr(si); + si_shutw(si); si->err_type |= SI_ET_CONN_ABRT; si->state = SI_ST_CLO; if (s->srv_error) @@ -930,8 +930,8 @@ static void sess_prepare_conn_req(struct session *s, struct stream_interface *si return; /* we did not get any server, let's check the cause */ - si->sock.shutr(si); - si->sock.shutw(si); + si_shutr(si); + si_shutw(si); si->ob->flags |= BF_WRITE_ERROR; if (!si->err_type) si->err_type = SI_ET_CONN_OTHER; @@ -1344,26 +1344,26 @@ struct task *process_session(struct task *t) if (unlikely((s->req->flags & (BF_SHUTW|BF_WRITE_TIMEOUT)) == BF_WRITE_TIMEOUT)) { s->req->cons->flags |= SI_FL_NOLINGER; - s->req->cons->sock.shutw(s->req->cons); + si_shutw(s->req->cons); } if (unlikely((s->req->flags & (BF_SHUTR|BF_READ_TIMEOUT)) == BF_READ_TIMEOUT)) { if (s->req->prod->flags & SI_FL_NOHALF) s->req->prod->flags |= SI_FL_NOLINGER; - s->req->prod->sock.shutr(s->req->prod); + si_shutr(s->req->prod); } buffer_check_timeouts(s->rep); if (unlikely((s->rep->flags & (BF_SHUTW|BF_WRITE_TIMEOUT)) == BF_WRITE_TIMEOUT)) { s->rep->cons->flags |= SI_FL_NOLINGER; - s->rep->cons->sock.shutw(s->rep->cons); + si_shutw(s->rep->cons); } if (unlikely((s->rep->flags & (BF_SHUTR|BF_READ_TIMEOUT)) == BF_READ_TIMEOUT)) { if (s->rep->prod->flags & SI_FL_NOHALF) s->rep->prod->flags |= SI_FL_NOLINGER; - s->rep->prod->sock.shutr(s->rep->prod); + si_shutr(s->rep->prod); } } @@ -1377,8 +1377,8 @@ struct task *process_session(struct task *t) srv = target_srv(&s->target); if (unlikely(s->si[0].flags & SI_FL_ERR)) { if (s->si[0].state == SI_ST_EST || s->si[0].state == SI_ST_DIS) { - s->si[0].sock.shutr(&s->si[0]); - s->si[0].sock.shutw(&s->si[0]); + si_shutr(&s->si[0]); + si_shutw(&s->si[0]); stream_int_report_error(&s->si[0]); if (!(s->req->analysers) && !(s->rep->analysers)) { s->be->be_counters.cli_aborts++; @@ -1395,8 +1395,8 @@ struct task *process_session(struct task *t) if (unlikely(s->si[1].flags & SI_FL_ERR)) { if (s->si[1].state == SI_ST_EST || s->si[1].state == SI_ST_DIS) { - s->si[1].sock.shutr(&s->si[1]); - s->si[1].sock.shutw(&s->si[1]); + si_shutr(&s->si[1]); + si_shutw(&s->si[1]); stream_int_report_error(&s->si[1]); s->be->be_counters.failed_resp++; if (srv) @@ -1893,7 +1893,7 @@ struct task *process_session(struct task *t) /* shutdown(write) pending */ if (unlikely((s->req->flags & (BF_SHUTW|BF_SHUTW_NOW|BF_OUT_EMPTY)) == (BF_SHUTW_NOW|BF_OUT_EMPTY))) - s->req->cons->sock.shutw(s->req->cons); + si_shutw(s->req->cons); /* shutdown(write) done on server side, we must stop the client too */ if (unlikely((s->req->flags & (BF_SHUTW|BF_SHUTR|BF_SHUTR_NOW)) == BF_SHUTW && @@ -1904,7 +1904,7 @@ struct task *process_session(struct task *t) if (unlikely((s->req->flags & (BF_SHUTR|BF_SHUTR_NOW)) == BF_SHUTR_NOW)) { if (s->req->prod->flags & SI_FL_NOHALF) s->req->prod->flags |= SI_FL_NOLINGER; - s->req->prod->sock.shutr(s->req->prod); + si_shutr(s->req->prod); } /* it's possible that an upper layer has requested a connection setup or abort. @@ -1922,7 +1922,7 @@ struct task *process_session(struct task *t) s->req->cons->state = SI_ST_REQ; /* new connection requested */ s->req->cons->conn_retries = s->be->conn_retries; if (unlikely(s->req->cons->target.type == TARG_TYPE_APPLET && - !(s->req->cons->proto && s->req->cons->proto->connect))) { + !(si_ctrl(s->req->cons) && si_ctrl(s->req->cons)->connect))) { s->req->cons->state = SI_ST_EST; /* connection established */ s->rep->flags |= BF_READ_ATTACHED; /* producer is now attached */ s->req->wex = TICK_ETERNITY; @@ -2038,7 +2038,7 @@ struct task *process_session(struct task *t) /* shutdown(write) pending */ if (unlikely((s->rep->flags & (BF_SHUTW|BF_OUT_EMPTY|BF_SHUTW_NOW)) == (BF_OUT_EMPTY|BF_SHUTW_NOW))) - s->rep->cons->sock.shutw(s->rep->cons); + si_shutw(s->rep->cons); /* shutdown(write) done on the client side, we must stop the server too */ if (unlikely((s->rep->flags & (BF_SHUTW|BF_SHUTR|BF_SHUTR_NOW)) == BF_SHUTW) && @@ -2049,7 +2049,7 @@ struct task *process_session(struct task *t) if (unlikely((s->rep->flags & (BF_SHUTR|BF_SHUTR_NOW)) == BF_SHUTR_NOW)) { if (s->rep->prod->flags & SI_FL_NOHALF) s->rep->prod->flags |= SI_FL_NOLINGER; - s->rep->prod->sock.shutr(s->rep->prod); + si_shutr(s->rep->prod); } if (s->req->prod->state == SI_ST_DIS || s->req->cons->state == SI_ST_DIS) @@ -2101,10 +2101,10 @@ struct task *process_session(struct task *t) session_process_counters(s); if (s->rep->cons->state == SI_ST_EST && s->rep->cons->target.type != TARG_TYPE_APPLET) - s->rep->cons->sock.update(s->rep->cons); + si_update(s->rep->cons); if (s->req->cons->state == SI_ST_EST && s->req->cons->target.type != TARG_TYPE_APPLET) - s->req->cons->sock.update(s->req->cons); + si_update(s->req->cons); s->req->flags &= ~(BF_READ_NULL|BF_READ_PARTIAL|BF_WRITE_NULL|BF_WRITE_PARTIAL|BF_READ_ATTACHED); s->rep->flags &= ~(BF_READ_NULL|BF_READ_PARTIAL|BF_WRITE_NULL|BF_WRITE_PARTIAL|BF_READ_ATTACHED); diff --git a/src/sock_raw.c b/src/sock_raw.c index af260274e..143d14511 100644 --- a/src/sock_raw.c +++ b/src/sock_raw.c @@ -36,6 +36,7 @@ #include #include #include +#include #include #include @@ -103,7 +104,7 @@ static int sock_raw_splice_in(struct buffer *b, struct stream_interface *si) si->flags |= SI_FL_WAIT_ROOM; EV_FD_CLR(fd, DIR_RD); b->rex = TICK_ETERNITY; - b->cons->sock.chk_snd(b->cons); + si_chk_snd(b->cons); return 1; } @@ -455,7 +456,7 @@ static int sock_raw_read(int fd) (b->i == 0 && (b->cons->flags & SI_FL_WAIT_DATA))) { int last_len = b->pipe ? b->pipe->data : 0; - b->cons->sock.chk_snd(b->cons); + si_chk_snd(b->cons); /* check if the consumer has freed some space */ if (!(b->flags & BF_FULL) && @@ -726,7 +727,7 @@ static int sock_raw_write(int fd) /* the producer might be waiting for more room to store data */ if (likely((b->flags & (BF_SHUTW|BF_WRITE_PARTIAL|BF_FULL|BF_DONT_READ)) == BF_WRITE_PARTIAL && (b->prod->flags & SI_FL_WAIT_ROOM))) - b->prod->sock.chk_rcv(b->prod); + si_chk_rcv(b->prod); /* we have to wake up if there is a special event or if we don't have * any more data to forward and it's not planned to send any more. diff --git a/src/stream_interface.c b/src/stream_interface.c index 9dfda93a2..c70ee35e7 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -138,7 +138,7 @@ static void stream_int_update_embedded(struct stream_interface *si) return; if ((si->ob->flags & (BF_OUT_EMPTY|BF_SHUTW|BF_HIJACK|BF_SHUTW_NOW)) == (BF_OUT_EMPTY|BF_SHUTW_NOW)) - si->sock.shutw(si); + si_shutw(si); if ((si->ob->flags & (BF_FULL|BF_SHUTW|BF_SHUTW_NOW|BF_HIJACK)) == 0) si->flags |= SI_FL_WAIT_DATA; @@ -164,11 +164,11 @@ static void stream_int_update_embedded(struct stream_interface *si) old_flags = si->flags; if (likely((si->ob->flags & (BF_SHUTW|BF_WRITE_PARTIAL|BF_FULL|BF_DONT_READ)) == BF_WRITE_PARTIAL && (si->ob->prod->flags & SI_FL_WAIT_ROOM))) - si->ob->prod->sock.chk_rcv(si->ob->prod); + si_chk_rcv(si->ob->prod); if (((si->ib->flags & (BF_READ_PARTIAL|BF_OUT_EMPTY)) == BF_READ_PARTIAL) && (si->ib->cons->flags & SI_FL_WAIT_DATA)) { - si->ib->cons->sock.chk_snd(si->ib->cons); + si_chk_snd(si->ib->cons); /* check if the consumer has freed some space */ if (!(si->ib->flags & BF_FULL)) si->flags &= ~SI_FL_WAIT_ROOM; @@ -339,7 +339,7 @@ struct task *stream_int_register_handler(struct stream_interface *si, struct si_ DPRINTF(stderr, "registering handler %p for si %p (was %p)\n", app, si, si->owner); stream_interface_prepare(si, &stream_int_embedded); - si->proto = NULL; + si->conn.ctrl = NULL; set_target_applet(&si->target, app); si->applet.state = 0; si->release = app->release; @@ -362,7 +362,7 @@ struct task *stream_int_register_handler_task(struct stream_interface *si, DPRINTF(stderr, "registering handler %p for si %p (was %p)\n", fct, si, si->owner); stream_interface_prepare(si, &stream_int_task); - si->proto = NULL; + si->conn.ctrl = NULL; clear_target(&si->target); si->release = NULL; si->flags |= SI_FL_WAIT_DATA;