mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2025-04-20 05:56:54 +00:00
[MAJOR] implement autonomous inter-socket forwarding
If an analyser sets buf->to_forward to a given value, that many data will be forwarded between the two stream interfaces attached to a buffer without waking the task up. The same applies once all analysers have been released. This saves a large amount of calls to process_session() and a number of task_dequeue/queue.
This commit is contained in:
parent
3ffeba1f67
commit
6b66f3e4f6
@ -2,7 +2,7 @@
|
|||||||
include/common/defaults.h
|
include/common/defaults.h
|
||||||
Miscellaneous default values.
|
Miscellaneous default values.
|
||||||
|
|
||||||
Copyright (C) 2000-2007 Willy Tarreau - w@1wt.eu
|
Copyright (C) 2000-2008 Willy Tarreau - w@1wt.eu
|
||||||
|
|
||||||
This library is free software; you can redistribute it and/or
|
This library is free software; you can redistribute it and/or
|
||||||
modify it under the terms of the GNU Lesser General Public
|
modify it under the terms of the GNU Lesser General Public
|
||||||
@ -40,6 +40,16 @@
|
|||||||
#define MAXREWRITE (BUFSIZE / 2)
|
#define MAXREWRITE (BUFSIZE / 2)
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
/* FORWARD_DEFAULT_SIZE
|
||||||
|
* Indicates how many bytes may be forwarded at once in low-level stream-socks
|
||||||
|
* without waking the owner task up. This should be much larger than the buffer
|
||||||
|
* size. A few megabytes seem appropriate.
|
||||||
|
*/
|
||||||
|
#ifndef FORWARD_DEFAULT_SIZE
|
||||||
|
#define FORWARD_DEFAULT_SIZE (16*1024*1024)
|
||||||
|
#endif
|
||||||
|
|
||||||
|
|
||||||
#define REQURI_LEN 1024
|
#define REQURI_LEN 1024
|
||||||
#define CAPTURE_LEN 64
|
#define CAPTURE_LEN 64
|
||||||
|
|
||||||
|
@ -46,6 +46,7 @@ int init_buffer();
|
|||||||
static inline void buffer_init(struct buffer *buf)
|
static inline void buffer_init(struct buffer *buf)
|
||||||
{
|
{
|
||||||
buf->send_max = 0;
|
buf->send_max = 0;
|
||||||
|
buf->to_forward = 0;
|
||||||
buf->l = buf->total = 0;
|
buf->l = buf->total = 0;
|
||||||
buf->analysers = 0;
|
buf->analysers = 0;
|
||||||
buf->cons = NULL;
|
buf->cons = NULL;
|
||||||
@ -92,6 +93,7 @@ static inline void buffer_check_timeouts(struct buffer *b)
|
|||||||
static inline void buffer_flush(struct buffer *buf)
|
static inline void buffer_flush(struct buffer *buf)
|
||||||
{
|
{
|
||||||
buf->send_max = 0;
|
buf->send_max = 0;
|
||||||
|
buf->to_forward = 0;
|
||||||
buf->r = buf->lr = buf->w = buf->data;
|
buf->r = buf->lr = buf->w = buf->data;
|
||||||
buf->l = 0;
|
buf->l = 0;
|
||||||
buf->flags |= BF_EMPTY | BF_FULL;
|
buf->flags |= BF_EMPTY | BF_FULL;
|
||||||
|
@ -130,6 +130,7 @@ struct buffer {
|
|||||||
char *r, *w, *lr; /* read ptr, write ptr, last read */
|
char *r, *w, *lr; /* read ptr, write ptr, last read */
|
||||||
char *rlim; /* read limit, used for header rewriting */
|
char *rlim; /* read limit, used for header rewriting */
|
||||||
unsigned int send_max; /* number of bytes the sender can consume */
|
unsigned int send_max; /* number of bytes the sender can consume */
|
||||||
|
unsigned int to_forward; /* number of bytes that can send without a wake-up, >= send_max */
|
||||||
unsigned int analysers; /* bit field indicating what to do on the buffer */
|
unsigned int analysers; /* bit field indicating what to do on the buffer */
|
||||||
int analyse_exp; /* expiration date for current analysers (if set) */
|
int analyse_exp; /* expiration date for current analysers (if set) */
|
||||||
void (*hijacker)(struct session *, struct buffer *); /* alternative content producer */
|
void (*hijacker)(struct session *, struct buffer *); /* alternative content producer */
|
||||||
|
@ -809,6 +809,17 @@ void uxst_process_session(struct task *t, int *next)
|
|||||||
if (!s->req->analysers && !(s->req->flags & BF_HIJACK))
|
if (!s->req->analysers && !(s->req->flags & BF_HIJACK))
|
||||||
s->req->send_max = s->req->l;
|
s->req->send_max = s->req->l;
|
||||||
|
|
||||||
|
/* if noone is interested in analysing data, let's forward everything
|
||||||
|
* and only wake up every 1-2 MB. We still wake up when send_max is
|
||||||
|
* reached though.
|
||||||
|
*/
|
||||||
|
if (!s->req->send_max && s->req->prod->state >= SI_ST_EST &&
|
||||||
|
!s->req->analysers && !(s->req->flags & BF_HIJACK)) {
|
||||||
|
if (s->req->to_forward < FORWARD_DEFAULT_SIZE)
|
||||||
|
s->req->to_forward += FORWARD_DEFAULT_SIZE;
|
||||||
|
s->req->send_max = s->req->l;
|
||||||
|
}
|
||||||
|
|
||||||
/* reflect what the L7 analysers have seen last */
|
/* reflect what the L7 analysers have seen last */
|
||||||
rqf_last = s->req->flags;
|
rqf_last = s->req->flags;
|
||||||
|
|
||||||
@ -879,9 +890,17 @@ void uxst_process_session(struct task *t, int *next)
|
|||||||
resync = 1;
|
resync = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* if noone is interested in analysing data, let's forward everything */
|
/* if noone is interested in analysing data, let's forward everything
|
||||||
if (!s->rep->analysers && !(s->rep->flags & BF_HIJACK))
|
* and only wake up every 1-2 MB. We still wake up when send_max is
|
||||||
|
* reached though.
|
||||||
|
*/
|
||||||
|
if (!s->rep->send_max && s->rep->prod->state >= SI_ST_EST &&
|
||||||
|
!s->rep->analysers && !(s->rep->flags & BF_HIJACK)) {
|
||||||
|
if (s->rep->to_forward < FORWARD_DEFAULT_SIZE) {
|
||||||
|
s->rep->to_forward += FORWARD_DEFAULT_SIZE;
|
||||||
|
}
|
||||||
s->rep->send_max = s->rep->l;
|
s->rep->send_max = s->rep->l;
|
||||||
|
}
|
||||||
|
|
||||||
/* reflect what the L7 analysers have seen last */
|
/* reflect what the L7 analysers have seen last */
|
||||||
rpf_last = s->rep->flags;
|
rpf_last = s->rep->flags;
|
||||||
|
@ -746,9 +746,16 @@ resync_stream_interface:
|
|||||||
resync = 1;
|
resync = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* if noone is interested in analysing data, let's forward everything */
|
/* if noone is interested in analysing data, let's forward everything
|
||||||
if (!s->req->analysers && !(s->req->flags & BF_HIJACK))
|
* and only wake up every 1-2 MB. We still wake up when send_max is
|
||||||
|
* reached though.
|
||||||
|
*/
|
||||||
|
if (!s->req->send_max && s->req->prod->state >= SI_ST_EST &&
|
||||||
|
!s->req->analysers && !(s->req->flags & BF_HIJACK)) {
|
||||||
|
if (s->req->to_forward < FORWARD_DEFAULT_SIZE)
|
||||||
|
s->req->to_forward += FORWARD_DEFAULT_SIZE;
|
||||||
s->req->send_max = s->req->l;
|
s->req->send_max = s->req->l;
|
||||||
|
}
|
||||||
|
|
||||||
/* reflect what the L7 analysers have seen last */
|
/* reflect what the L7 analysers have seen last */
|
||||||
rqf_last = s->req->flags;
|
rqf_last = s->req->flags;
|
||||||
@ -855,9 +862,17 @@ resync_stream_interface:
|
|||||||
resync = 1;
|
resync = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* if noone is interested in analysing data, let's forward everything */
|
/* if noone is interested in analysing data, let's forward everything
|
||||||
if (!s->rep->analysers && !(s->rep->flags & BF_HIJACK))
|
* and only wake up every 1-2 MB. We still wake up when send_max is
|
||||||
|
* reached though.
|
||||||
|
*/
|
||||||
|
if (!s->rep->send_max && s->rep->prod->state >= SI_ST_EST &&
|
||||||
|
!s->rep->analysers && !(s->rep->flags & BF_HIJACK)) {
|
||||||
|
if (s->rep->to_forward < FORWARD_DEFAULT_SIZE) {
|
||||||
|
s->rep->to_forward += FORWARD_DEFAULT_SIZE;
|
||||||
|
}
|
||||||
s->rep->send_max = s->rep->l;
|
s->rep->send_max = s->rep->l;
|
||||||
|
}
|
||||||
|
|
||||||
/* reflect what the L7 analysers have seen last */
|
/* reflect what the L7 analysers have seen last */
|
||||||
rpf_last = s->rep->flags;
|
rpf_last = s->rep->flags;
|
||||||
@ -870,7 +885,7 @@ resync_stream_interface:
|
|||||||
* FIXME: this is probably where we should produce error responses.
|
* FIXME: this is probably where we should produce error responses.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/* first, let's check if the request buffer needs to shutdown(write) */
|
/* first, let's check if the response buffer needs to shutdown(write) */
|
||||||
if (unlikely((s->rep->flags & (BF_SHUTW|BF_SHUTW_NOW|BF_EMPTY|BF_HIJACK|BF_WRITE_ENA|BF_SHUTR)) ==
|
if (unlikely((s->rep->flags & (BF_SHUTW|BF_SHUTW_NOW|BF_EMPTY|BF_HIJACK|BF_WRITE_ENA|BF_SHUTR)) ==
|
||||||
(BF_EMPTY|BF_WRITE_ENA|BF_SHUTR)))
|
(BF_EMPTY|BF_WRITE_ENA|BF_SHUTR)))
|
||||||
buffer_shutw_now(s->rep);
|
buffer_shutw_now(s->rep);
|
||||||
|
@ -116,8 +116,8 @@ int stream_sock_read(int fd) {
|
|||||||
cur_read += ret;
|
cur_read += ret;
|
||||||
|
|
||||||
/* if noone is interested in analysing data, let's forward everything */
|
/* if noone is interested in analysing data, let's forward everything */
|
||||||
if (!b->analysers)
|
if (b->to_forward > b->send_max)
|
||||||
b->send_max += ret;
|
b->send_max = MIN(b->to_forward, b->l);
|
||||||
|
|
||||||
if (fdtab[fd].state == FD_STCONN)
|
if (fdtab[fd].state == FD_STCONN)
|
||||||
fdtab[fd].state = FD_STREADY;
|
fdtab[fd].state = FD_STREADY;
|
||||||
@ -251,9 +251,16 @@ int stream_sock_read(int fd) {
|
|||||||
goto out_skip_wakeup;
|
goto out_skip_wakeup;
|
||||||
out_wakeup:
|
out_wakeup:
|
||||||
/* the consumer might be waiting for data */
|
/* the consumer might be waiting for data */
|
||||||
if (b->cons->flags & SI_FL_WAIT_DATA && (b->flags & BF_READ_PARTIAL))
|
if (b->cons->flags & SI_FL_WAIT_DATA && (b->flags & BF_READ_PARTIAL) && !(b->flags & BF_EMPTY))
|
||||||
b->cons->chk_snd(b->cons);
|
b->cons->chk_snd(b->cons);
|
||||||
|
|
||||||
|
/* we have to wake up if there is a special event or if we don't have
|
||||||
|
* any more data to forward.
|
||||||
|
*/
|
||||||
|
if ((b->flags & (BF_READ_NULL|BF_READ_ERROR|BF_SHUTR)) ||
|
||||||
|
!b->to_forward ||
|
||||||
|
si->state != SI_ST_EST ||
|
||||||
|
b->cons->state != SI_ST_EST)
|
||||||
task_wakeup(si->owner, TASK_WOKEN_IO);
|
task_wakeup(si->owner, TASK_WOKEN_IO);
|
||||||
|
|
||||||
out_skip_wakeup:
|
out_skip_wakeup:
|
||||||
@ -379,6 +386,13 @@ int stream_sock_write(int fd) {
|
|||||||
b->l -= ret;
|
b->l -= ret;
|
||||||
b->w += ret;
|
b->w += ret;
|
||||||
b->send_max -= ret;
|
b->send_max -= ret;
|
||||||
|
/* we can send up to send_max, we just want to know when
|
||||||
|
* to_forward has been reached.
|
||||||
|
*/
|
||||||
|
if ((signed)(b->to_forward - ret) >= 0)
|
||||||
|
b->to_forward -= ret;
|
||||||
|
else
|
||||||
|
b->to_forward = 0;
|
||||||
|
|
||||||
if (fdtab[fd].state == FD_STCONN)
|
if (fdtab[fd].state == FD_STCONN)
|
||||||
fdtab[fd].state = FD_STREADY;
|
fdtab[fd].state = FD_STREADY;
|
||||||
@ -453,9 +467,16 @@ int stream_sock_write(int fd) {
|
|||||||
goto out_skip_wakeup;
|
goto out_skip_wakeup;
|
||||||
out_wakeup:
|
out_wakeup:
|
||||||
/* the producer might be waiting for more room to store data */
|
/* the producer might be waiting for more room to store data */
|
||||||
if ((b->prod->flags & SI_FL_WAIT_ROOM) && (b->flags & BF_WRITE_PARTIAL))
|
if ((b->prod->flags & SI_FL_WAIT_ROOM) && (b->flags & BF_WRITE_PARTIAL) && !(b->flags & BF_FULL))
|
||||||
b->prod->chk_rcv(b->prod);
|
b->prod->chk_rcv(b->prod);
|
||||||
|
|
||||||
|
/* we have to wake up if there is a special event or if we don't have
|
||||||
|
* any more data to forward.
|
||||||
|
*/
|
||||||
|
if ((b->flags & (BF_WRITE_NULL|BF_WRITE_ERROR|BF_SHUTW)) ||
|
||||||
|
!b->to_forward ||
|
||||||
|
si->state != SI_ST_EST ||
|
||||||
|
b->prod->state != SI_ST_EST)
|
||||||
task_wakeup(si->owner, TASK_WOKEN_IO);
|
task_wakeup(si->owner, TASK_WOKEN_IO);
|
||||||
|
|
||||||
out_skip_wakeup:
|
out_skip_wakeup:
|
||||||
|
Loading…
Reference in New Issue
Block a user