[MEDIUM] enable inter-stream_interface wakeup calls

By letting the producer tell the consumer there is data to check,
and the consumer tell the producer there is some space left again,
we can cut in half the number of session wakeups.

This is also an important starting point for future splicing support.
This commit is contained in:
Willy Tarreau 2008-12-14 14:42:35 +01:00
parent b0ef735c71
commit 3ffeba1f67
5 changed files with 94 additions and 4 deletions

View File

@ -3,7 +3,7 @@
This file contains client-side definitions.
Copyright (C) 2000-2008 Willy Tarreau - w@1wt.eu
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation, version 2.1
@ -36,6 +36,8 @@ int stream_sock_write(int fd);
void stream_sock_data_finish(struct stream_interface *si);
void stream_sock_shutr(struct stream_interface *si);
void stream_sock_shutw(struct stream_interface *si);
void stream_sock_chk_rcv(struct stream_interface *si);
void stream_sock_chk_snd(struct stream_interface *si);
/* This either returns the sockname or the original destination address. Code

View File

@ -78,6 +78,8 @@ struct stream_interface {
unsigned int exp; /* wake up time for connect, queue, turn-around, ... */
void (*shutr)(struct stream_interface *); /* shutr function */
void (*shutw)(struct stream_interface *); /* shutw function */
void (*chk_rcv)(struct stream_interface *);/* chk_rcv function */
void (*chk_snd)(struct stream_interface *);/* chk_snd function */
struct buffer *ib, *ob; /* input and output buffers */
unsigned int err_type; /* first error detected, one of SI_ET_* */
void *err_loc; /* commonly the server, NULL when SI_ET_NONE */

View File

@ -182,6 +182,8 @@ int event_accept(int fd) {
s->si[0].owner = t;
s->si[0].shutr = stream_sock_shutr;
s->si[0].shutw = stream_sock_shutw;
s->si[0].chk_rcv = stream_sock_chk_rcv;
s->si[0].chk_snd = stream_sock_chk_snd;
s->si[0].fd = cfd;
s->si[0].flags = SI_FL_NONE;
s->si[0].exp = TICK_ETERNITY;
@ -192,6 +194,8 @@ int event_accept(int fd) {
s->si[1].owner = t;
s->si[1].shutr = stream_sock_shutr;
s->si[1].shutw = stream_sock_shutw;
s->si[1].chk_rcv = stream_sock_chk_rcv;
s->si[1].chk_snd = stream_sock_chk_snd;
s->si[1].exp = TICK_ETERNITY;
s->si[1].fd = -1; /* just to help with debugging */
s->si[1].flags = SI_FL_NONE;

View File

@ -452,6 +452,8 @@ int uxst_event_accept(int fd) {
s->si[0].owner = t;
s->si[0].shutr = stream_sock_shutr;
s->si[0].shutw = stream_sock_shutw;
s->si[0].chk_rcv = stream_sock_chk_rcv;
s->si[0].chk_snd = stream_sock_chk_snd;
s->si[0].fd = cfd;
s->si[0].flags = SI_FL_NONE;
s->si[0].exp = TICK_ETERNITY;
@ -462,6 +464,8 @@ int uxst_event_accept(int fd) {
s->si[1].owner = t;
s->si[1].shutr = stream_sock_shutr;
s->si[1].shutw = stream_sock_shutw;
s->si[1].chk_rcv = stream_sock_chk_rcv;
s->si[1].chk_snd = stream_sock_chk_snd;
s->si[1].exp = TICK_ETERNITY;
s->si[1].fd = -1; /* just to help with debugging */
s->si[1].flags = SI_FL_NONE;

View File

@ -244,12 +244,16 @@ int stream_sock_read(int fd) {
* have at least read something.
*/
if (tick_isset(b->rex) && b->flags & BF_READ_PARTIAL)
if ((b->flags & (BF_READ_PARTIAL|BF_FULL|BF_READ_NOEXP)) == BF_READ_PARTIAL)
b->rex = tick_add_ifset(now_ms, b->rto);
if (!(b->flags & BF_READ_ACTIVITY))
goto out_skip_wakeup;
out_wakeup:
/* the consumer might be waiting for data */
if (b->cons->flags & SI_FL_WAIT_DATA && (b->flags & BF_READ_PARTIAL))
b->cons->chk_snd(b->cons);
task_wakeup(si->owner, TASK_WOKEN_IO);
out_skip_wakeup:
@ -433,7 +437,7 @@ int stream_sock_write(int fd) {
* written something.
*/
if (tick_isset(b->wex) && b->flags & BF_WRITE_PARTIAL) {
if ((b->flags & (BF_WRITE_PARTIAL|BF_EMPTY|BF_SHUTW)) == BF_WRITE_PARTIAL) {
b->wex = tick_add_ifset(now_ms, b->wto);
if (tick_isset(b->wex) & tick_isset(si->ib->rex)) {
/* FIXME: to prevent the client from expiring read timeouts during writes,
@ -448,6 +452,10 @@ int stream_sock_write(int fd) {
if (!(b->flags & BF_WRITE_ACTIVITY))
goto out_skip_wakeup;
out_wakeup:
/* the producer might be waiting for more room to store data */
if ((b->prod->flags & SI_FL_WAIT_ROOM) && (b->flags & BF_WRITE_PARTIAL))
b->prod->chk_rcv(b->prod);
task_wakeup(si->owner, TASK_WOKEN_IO);
out_skip_wakeup:
@ -579,7 +587,8 @@ void stream_sock_data_finish(struct stream_interface *si)
/* Check if we need to close the write side */
if (!(ob->flags & BF_SHUTW)) {
/* Write not closed, update FD status and timeout for writes */
if ((ob->flags & BF_EMPTY) ||
if ((ob->send_max == 0) ||
(ob->flags & BF_EMPTY) ||
(ob->flags & (BF_HIJACK|BF_WRITE_ENA)) == 0) {
/* stop writing */
if ((ob->flags & (BF_EMPTY|BF_HIJACK|BF_WRITE_ENA)) == (BF_EMPTY|BF_WRITE_ENA))
@ -609,6 +618,75 @@ void stream_sock_data_finish(struct stream_interface *si)
}
}
/* This function is used for inter-stream-interface calls. It is called by the
* consumer to inform the producer side that it may be interested in checking
* for free space in the buffer. Note that it intentionally does not update
* timeouts, so that we can still check them later at wake-up.
*/
void stream_sock_chk_rcv(struct stream_interface *si)
{
struct buffer *ib = si->ib;
DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d si=%d\n",
now_ms, __FUNCTION__,
fd, fdtab[fd].owner,
ib, ob,
ib->rex, ob->wex,
ib->flags, ob->flags,
ib->l, ob->l, si->state);
if (unlikely(si->state != SI_ST_EST || (ib->flags & BF_SHUTR)))
return;
if (ib->flags & (BF_FULL|BF_HIJACK)) {
/* stop reading */
if ((ib->flags & (BF_FULL|BF_HIJACK)) == BF_FULL)
si->flags |= SI_FL_WAIT_ROOM;
EV_FD_COND_C(si->fd, DIR_RD);
}
else {
/* (re)start reading */
si->flags &= ~SI_FL_WAIT_ROOM;
EV_FD_COND_S(si->fd, DIR_RD);
}
}
/* This function is used for inter-stream-interface calls. It is called by the
* producer to inform the consumer side that it may be interested in checking
* for data in the buffer. Note that it intentionally does not update timeouts,
* so that we can still check them later at wake-up.
*/
void stream_sock_chk_snd(struct stream_interface *si)
{
struct buffer *ob = si->ob;
DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d si=%d\n",
now_ms, __FUNCTION__,
fd, fdtab[fd].owner,
ib, ob,
ib->rex, ob->wex,
ib->flags, ob->flags,
ib->l, ob->l, si->state);
if (unlikely(si->state != SI_ST_EST || (ob->flags & BF_SHUTW)))
return;
if ((ob->send_max == 0) ||
(ob->flags & BF_EMPTY) ||
(ob->flags & (BF_HIJACK|BF_WRITE_ENA)) == 0) {
/* stop writing */
if ((ob->flags & (BF_EMPTY|BF_HIJACK|BF_WRITE_ENA)) == (BF_EMPTY|BF_WRITE_ENA))
si->flags |= SI_FL_WAIT_DATA;
EV_FD_COND_C(si->fd, DIR_WR);
}
else {
/* (re)start writing. */
si->flags &= ~SI_FL_WAIT_DATA;
EV_FD_COND_S(si->fd, DIR_WR);
}
}
/*
* Local variables: