diff --git a/include/proto/stream_sock.h b/include/proto/stream_sock.h index a3fd992e9..929cb082f 100644 --- a/include/proto/stream_sock.h +++ b/include/proto/stream_sock.h @@ -3,7 +3,7 @@ This file contains client-side definitions. Copyright (C) 2000-2008 Willy Tarreau - w@1wt.eu - + This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation, version 2.1 @@ -36,6 +36,8 @@ int stream_sock_write(int fd); void stream_sock_data_finish(struct stream_interface *si); void stream_sock_shutr(struct stream_interface *si); void stream_sock_shutw(struct stream_interface *si); +void stream_sock_chk_rcv(struct stream_interface *si); +void stream_sock_chk_snd(struct stream_interface *si); /* This either returns the sockname or the original destination address. Code diff --git a/include/types/stream_interface.h b/include/types/stream_interface.h index d34cfa4a8..bb4a9e3b5 100644 --- a/include/types/stream_interface.h +++ b/include/types/stream_interface.h @@ -78,6 +78,8 @@ struct stream_interface { unsigned int exp; /* wake up time for connect, queue, turn-around, ... */ void (*shutr)(struct stream_interface *); /* shutr function */ void (*shutw)(struct stream_interface *); /* shutw function */ + void (*chk_rcv)(struct stream_interface *);/* chk_rcv function */ + void (*chk_snd)(struct stream_interface *);/* chk_snd function */ struct buffer *ib, *ob; /* input and output buffers */ unsigned int err_type; /* first error detected, one of SI_ET_* */ void *err_loc; /* commonly the server, NULL when SI_ET_NONE */ diff --git a/src/client.c b/src/client.c index 4e8004e76..5885f06f4 100644 --- a/src/client.c +++ b/src/client.c @@ -182,6 +182,8 @@ int event_accept(int fd) { s->si[0].owner = t; s->si[0].shutr = stream_sock_shutr; s->si[0].shutw = stream_sock_shutw; + s->si[0].chk_rcv = stream_sock_chk_rcv; + s->si[0].chk_snd = stream_sock_chk_snd; s->si[0].fd = cfd; s->si[0].flags = SI_FL_NONE; s->si[0].exp = TICK_ETERNITY; @@ -192,6 +194,8 @@ int event_accept(int fd) { s->si[1].owner = t; s->si[1].shutr = stream_sock_shutr; s->si[1].shutw = stream_sock_shutw; + s->si[1].chk_rcv = stream_sock_chk_rcv; + s->si[1].chk_snd = stream_sock_chk_snd; s->si[1].exp = TICK_ETERNITY; s->si[1].fd = -1; /* just to help with debugging */ s->si[1].flags = SI_FL_NONE; diff --git a/src/proto_uxst.c b/src/proto_uxst.c index 3da304999..6c1367936 100644 --- a/src/proto_uxst.c +++ b/src/proto_uxst.c @@ -452,6 +452,8 @@ int uxst_event_accept(int fd) { s->si[0].owner = t; s->si[0].shutr = stream_sock_shutr; s->si[0].shutw = stream_sock_shutw; + s->si[0].chk_rcv = stream_sock_chk_rcv; + s->si[0].chk_snd = stream_sock_chk_snd; s->si[0].fd = cfd; s->si[0].flags = SI_FL_NONE; s->si[0].exp = TICK_ETERNITY; @@ -462,6 +464,8 @@ int uxst_event_accept(int fd) { s->si[1].owner = t; s->si[1].shutr = stream_sock_shutr; s->si[1].shutw = stream_sock_shutw; + s->si[1].chk_rcv = stream_sock_chk_rcv; + s->si[1].chk_snd = stream_sock_chk_snd; s->si[1].exp = TICK_ETERNITY; s->si[1].fd = -1; /* just to help with debugging */ s->si[1].flags = SI_FL_NONE; diff --git a/src/stream_sock.c b/src/stream_sock.c index 72a42c70d..fdd0dbdf3 100644 --- a/src/stream_sock.c +++ b/src/stream_sock.c @@ -244,12 +244,16 @@ int stream_sock_read(int fd) { * have at least read something. */ - if (tick_isset(b->rex) && b->flags & BF_READ_PARTIAL) + if ((b->flags & (BF_READ_PARTIAL|BF_FULL|BF_READ_NOEXP)) == BF_READ_PARTIAL) b->rex = tick_add_ifset(now_ms, b->rto); if (!(b->flags & BF_READ_ACTIVITY)) goto out_skip_wakeup; out_wakeup: + /* the consumer might be waiting for data */ + if (b->cons->flags & SI_FL_WAIT_DATA && (b->flags & BF_READ_PARTIAL)) + b->cons->chk_snd(b->cons); + task_wakeup(si->owner, TASK_WOKEN_IO); out_skip_wakeup: @@ -433,7 +437,7 @@ int stream_sock_write(int fd) { * written something. */ - if (tick_isset(b->wex) && b->flags & BF_WRITE_PARTIAL) { + if ((b->flags & (BF_WRITE_PARTIAL|BF_EMPTY|BF_SHUTW)) == BF_WRITE_PARTIAL) { b->wex = tick_add_ifset(now_ms, b->wto); if (tick_isset(b->wex) & tick_isset(si->ib->rex)) { /* FIXME: to prevent the client from expiring read timeouts during writes, @@ -448,6 +452,10 @@ int stream_sock_write(int fd) { if (!(b->flags & BF_WRITE_ACTIVITY)) goto out_skip_wakeup; out_wakeup: + /* the producer might be waiting for more room to store data */ + if ((b->prod->flags & SI_FL_WAIT_ROOM) && (b->flags & BF_WRITE_PARTIAL)) + b->prod->chk_rcv(b->prod); + task_wakeup(si->owner, TASK_WOKEN_IO); out_skip_wakeup: @@ -579,7 +587,8 @@ void stream_sock_data_finish(struct stream_interface *si) /* Check if we need to close the write side */ if (!(ob->flags & BF_SHUTW)) { /* Write not closed, update FD status and timeout for writes */ - if ((ob->flags & BF_EMPTY) || + if ((ob->send_max == 0) || + (ob->flags & BF_EMPTY) || (ob->flags & (BF_HIJACK|BF_WRITE_ENA)) == 0) { /* stop writing */ if ((ob->flags & (BF_EMPTY|BF_HIJACK|BF_WRITE_ENA)) == (BF_EMPTY|BF_WRITE_ENA)) @@ -609,6 +618,75 @@ void stream_sock_data_finish(struct stream_interface *si) } } +/* This function is used for inter-stream-interface calls. It is called by the + * consumer to inform the producer side that it may be interested in checking + * for free space in the buffer. Note that it intentionally does not update + * timeouts, so that we can still check them later at wake-up. + */ +void stream_sock_chk_rcv(struct stream_interface *si) +{ + struct buffer *ib = si->ib; + + DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d si=%d\n", + now_ms, __FUNCTION__, + fd, fdtab[fd].owner, + ib, ob, + ib->rex, ob->wex, + ib->flags, ob->flags, + ib->l, ob->l, si->state); + + if (unlikely(si->state != SI_ST_EST || (ib->flags & BF_SHUTR))) + return; + + if (ib->flags & (BF_FULL|BF_HIJACK)) { + /* stop reading */ + if ((ib->flags & (BF_FULL|BF_HIJACK)) == BF_FULL) + si->flags |= SI_FL_WAIT_ROOM; + EV_FD_COND_C(si->fd, DIR_RD); + } + else { + /* (re)start reading */ + si->flags &= ~SI_FL_WAIT_ROOM; + EV_FD_COND_S(si->fd, DIR_RD); + } +} + + +/* This function is used for inter-stream-interface calls. It is called by the + * producer to inform the consumer side that it may be interested in checking + * for data in the buffer. Note that it intentionally does not update timeouts, + * so that we can still check them later at wake-up. + */ +void stream_sock_chk_snd(struct stream_interface *si) +{ + struct buffer *ob = si->ob; + + DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d si=%d\n", + now_ms, __FUNCTION__, + fd, fdtab[fd].owner, + ib, ob, + ib->rex, ob->wex, + ib->flags, ob->flags, + ib->l, ob->l, si->state); + + if (unlikely(si->state != SI_ST_EST || (ob->flags & BF_SHUTW))) + return; + + if ((ob->send_max == 0) || + (ob->flags & BF_EMPTY) || + (ob->flags & (BF_HIJACK|BF_WRITE_ENA)) == 0) { + /* stop writing */ + if ((ob->flags & (BF_EMPTY|BF_HIJACK|BF_WRITE_ENA)) == (BF_EMPTY|BF_WRITE_ENA)) + si->flags |= SI_FL_WAIT_DATA; + EV_FD_COND_C(si->fd, DIR_WR); + } + else { + /* (re)start writing. */ + si->flags &= ~SI_FL_WAIT_DATA; + EV_FD_COND_S(si->fd, DIR_WR); + } +} + /* * Local variables: