mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2024-12-13 23:14:46 +00:00
[MEDIUM] stream_sock: try to send pending data on chk_snd()
When the producer calls stream_sock_chk_snd(), we now try to send all pending data asynchronously. If it succeeds, we don't have to enable polling on the FD which saves about half of the calls to epoll_wait(). In stream_sock_read(), we finally set the WAIT_ROOM flag as soon as possible, in preparation of the splice code. We reset it when we detect that some room has been released either in the buffer or in the splice.
This commit is contained in:
parent
d2def0fd25
commit
a456f2a059
@ -80,6 +80,7 @@ int stream_sock_read(int fd) {
|
||||
|
||||
if (max == 0) {
|
||||
b->flags |= BF_FULL;
|
||||
si->flags |= SI_FL_WAIT_ROOM;
|
||||
break;
|
||||
}
|
||||
|
||||
@ -158,6 +159,7 @@ int stream_sock_read(int fd) {
|
||||
}
|
||||
|
||||
b->flags |= BF_FULL;
|
||||
si->flags |= SI_FL_WAIT_ROOM;
|
||||
break;
|
||||
}
|
||||
|
||||
@ -232,29 +234,33 @@ int stream_sock_read(int fd) {
|
||||
|
||||
out_wakeup:
|
||||
/* We might have some data the consumer is waiting for */
|
||||
if (likely((b->send_max || b->splice_len) &&
|
||||
(b->cons->flags & SI_FL_WAIT_DATA)))
|
||||
if ((b->send_max || b->splice_len) && (b->cons->flags & SI_FL_WAIT_DATA)) {
|
||||
int last_len = b->splice_len;
|
||||
|
||||
b->cons->chk_snd(b->cons);
|
||||
|
||||
/* note that the consumer might have cleared BF_FULL */
|
||||
if ((b->flags & (BF_READ_PARTIAL|BF_FULL|BF_READ_NOEXP)) == BF_READ_PARTIAL)
|
||||
b->rex = tick_add_ifset(now_ms, b->rto);
|
||||
else if (b->flags & BF_FULL) {
|
||||
si->flags |= SI_FL_WAIT_ROOM;
|
||||
/* check if the consumer has freed some space */
|
||||
if (!(b->flags & BF_FULL) && (!last_len || b->splice_len < last_len))
|
||||
si->flags &= ~SI_FL_WAIT_ROOM;
|
||||
}
|
||||
|
||||
if (si->flags & SI_FL_WAIT_ROOM) {
|
||||
EV_FD_CLR(fd, DIR_RD);
|
||||
b->rex = TICK_ETERNITY;
|
||||
}
|
||||
else if ((b->flags & (BF_READ_PARTIAL|BF_FULL|BF_READ_NOEXP)) == BF_READ_PARTIAL)
|
||||
b->rex = tick_add_ifset(now_ms, b->rto);
|
||||
|
||||
/* we have to wake up if there is a special event or if we don't have
|
||||
* any more data to forward.
|
||||
*/
|
||||
if (likely((b->flags & (BF_READ_NULL|BF_READ_ERROR|BF_SHUTR)) ||
|
||||
!b->to_forward ||
|
||||
si->state != SI_ST_EST ||
|
||||
b->cons->state != SI_ST_EST ||
|
||||
(si->flags & SI_FL_ERR)))
|
||||
if ((b->flags & (BF_READ_NULL|BF_READ_ERROR|BF_SHUTR)) ||
|
||||
!b->to_forward ||
|
||||
si->state != SI_ST_EST ||
|
||||
b->cons->state != SI_ST_EST ||
|
||||
(si->flags & SI_FL_ERR))
|
||||
task_wakeup(si->owner, TASK_WOKEN_IO);
|
||||
|
||||
|
||||
fdtab[fd].ev &= ~FD_POLL_IN;
|
||||
return retval;
|
||||
|
||||
@ -286,7 +292,7 @@ int stream_sock_read(int fd) {
|
||||
* It returns -1 in case of unrecoverable error, 0 if the caller needs to poll
|
||||
* before calling it again, otherwise 1.
|
||||
*/
|
||||
int stream_sock_write_loop(struct stream_interface *si, struct buffer *b)
|
||||
static int stream_sock_write_loop(struct stream_interface *si, struct buffer *b)
|
||||
{
|
||||
int write_poll = MAX_WRITE_POLL_LOOPS;
|
||||
int retval = 1;
|
||||
@ -688,6 +694,7 @@ void stream_sock_chk_rcv(struct stream_interface *si)
|
||||
void stream_sock_chk_snd(struct stream_interface *si)
|
||||
{
|
||||
struct buffer *ob = si->ob;
|
||||
int retval;
|
||||
|
||||
DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d si=%d\n",
|
||||
now_ms, __FUNCTION__,
|
||||
@ -700,19 +707,55 @@ void stream_sock_chk_snd(struct stream_interface *si)
|
||||
if (unlikely(si->state != SI_ST_EST || (ob->flags & BF_SHUTW)))
|
||||
return;
|
||||
|
||||
if ((ob->send_max == 0 && ob->splice_len == 0) ||
|
||||
(ob->flags & BF_EMPTY) ||
|
||||
(ob->flags & (BF_HIJACK|BF_WRITE_ENA)) == 0) {
|
||||
/* stop writing */
|
||||
if (!(si->flags & SI_FL_WAIT_DATA) || /* not waiting for data */
|
||||
(fdtab[si->fd].ev & FD_POLL_OUT) || /* we'll be called anyway */
|
||||
!(ob->send_max || ob->splice_len) || /* called with nothing to send ! */
|
||||
!(ob->flags & (BF_HIJACK|BF_WRITE_ENA))) /* we may not write */
|
||||
return;
|
||||
|
||||
retval = stream_sock_write_loop(si, ob);
|
||||
if (retval < 0) {
|
||||
/* Write error on the file descriptor. We mark the FD as STERROR so
|
||||
* that we don't use it anymore and we notify the task.
|
||||
*/
|
||||
fdtab[si->fd].state = FD_STERROR;
|
||||
fdtab[si->fd].ev &= ~FD_POLL_STICKY;
|
||||
si->flags |= SI_FL_ERR;
|
||||
goto out_wakeup;
|
||||
}
|
||||
|
||||
if (retval > 0 || (ob->send_max == 0 && ob->splice_len == 0)) {
|
||||
/* the connection is established but we can't write. Either the
|
||||
* buffer is empty, or we just refrain from sending because the
|
||||
* send_max limit was reached. Maybe we just wrote the last
|
||||
* chunk and need to close.
|
||||
*/
|
||||
if (((ob->flags & (BF_SHUTW|BF_EMPTY|BF_HIJACK|BF_WRITE_ENA|BF_SHUTR)) ==
|
||||
(BF_EMPTY|BF_WRITE_ENA|BF_SHUTR)) &&
|
||||
(si->state == SI_ST_EST)) {
|
||||
stream_sock_shutw(si);
|
||||
goto out_wakeup;
|
||||
}
|
||||
|
||||
if ((ob->flags & (BF_EMPTY|BF_HIJACK|BF_WRITE_ENA)) == (BF_EMPTY|BF_WRITE_ENA))
|
||||
si->flags |= SI_FL_WAIT_DATA;
|
||||
EV_FD_COND_C(si->fd, DIR_WR);
|
||||
ob->wex = TICK_ETERNITY;
|
||||
}
|
||||
else {
|
||||
/* (re)start writing. */
|
||||
si->flags &= ~SI_FL_WAIT_DATA;
|
||||
EV_FD_COND_S(si->fd, DIR_WR);
|
||||
}
|
||||
|
||||
/* in case of special condition (error, shutdown, end of write...), we
|
||||
* have to notify the task.
|
||||
*/
|
||||
if (likely((ob->flags & (BF_WRITE_NULL|BF_WRITE_ERROR|BF_SHUTW)) ||
|
||||
(!ob->to_forward && !ob->send_max && !ob->splice_len) ||
|
||||
si->state != SI_ST_EST)) {
|
||||
out_wakeup:
|
||||
task_wakeup(si->owner, TASK_WOKEN_IO);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user