From 8a7af6031240c162a09095c7c7e7e99c1e6a9549 Mon Sep 17 00:00:00 2001 From: Willy Tarreau Date: Sat, 3 May 2008 23:07:14 +0200 Subject: [PATCH] [MEDIUM] detect streaming buffers and tag them as such Add the ability to detect streaming buffers, and set a flag indicating it. It will later serve us in order to dynamically resize them, and to prioritize file descriptors during polls. --- include/types/buffers.h | 4 ++++ src/stream_sock.c | 46 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/include/types/buffers.h b/include/types/buffers.h index a79641d531..583c517d13 100644 --- a/include/types/buffers.h +++ b/include/types/buffers.h @@ -53,6 +53,8 @@ #define BF_WRITE_STATUS (BF_PARTIAL_WRITE|BF_COMPLETE_WRITE|BF_WRITE_ERROR|BF_WRITE_NULL) #define BF_CLEAR_WRITE (~BF_WRITE_STATUS) +#define BF_STREAMER 4096 +#define BF_STREAMER_FAST 8192 /* describes a chunk of string */ @@ -72,6 +74,8 @@ struct buffer { unsigned int l; /* data length */ char *r, *w, *lr; /* read ptr, write ptr, last read */ char *rlim; /* read limit, used for header rewriting */ + unsigned char xfer_large; /* number of consecutive large xfers */ + unsigned char xfer_small; /* number of consecutive small xfers */ unsigned long long total; /* total data read */ char data[BUFSIZE]; }; diff --git a/src/stream_sock.c b/src/stream_sock.c index 08cc65b650..fab32ca298 100644 --- a/src/stream_sock.c +++ b/src/stream_sock.c @@ -44,7 +44,7 @@ int stream_sock_read(int fd) { __label__ out_eternity, out_wakeup, out_shutdown_r, out_error; struct buffer *b = fdtab[fd].cb[DIR_RD].b; - int ret, max, retval; + int ret, max, retval, cur_read; int read_poll = MAX_READ_POLL_LOOPS; #ifdef DEBUG_FULL @@ -61,6 +61,7 @@ int stream_sock_read(int fd) { if ((fdtab[fd].ev & (FD_POLL_IN|FD_POLL_HUP)) == FD_POLL_HUP) goto out_shutdown_r; + cur_read = 0; while (1) { /* * 1. compute the maximum block size we can read at once. @@ -110,6 +111,7 @@ int stream_sock_read(int fd) { if (ret > 0) { b->r += ret; b->l += ret; + cur_read += ret; b->flags |= BF_PARTIAL_READ; if (b->r == b->data + BUFSIZE) { @@ -122,6 +124,35 @@ int stream_sock_read(int fd) { /* The buffer is now full, there's no point in going through * the loop again. */ + if (!(b->flags & BF_STREAMER_FAST) && (cur_read == b->l)) { + b->xfer_small = 0; + b->xfer_large++; + if (b->xfer_large >= 3) { + /* we call this buffer a fast streamer if it manages + * to be filled in one call 3 consecutive times. + */ + b->flags |= (BF_STREAMER | BF_STREAMER_FAST); + //fputc('+', stderr); + } + } + else if ((b->flags & (BF_STREAMER | BF_STREAMER_FAST)) && + (cur_read <= BUFSIZE / 2)) { + b->xfer_large = 0; + b->xfer_small++; + if (b->xfer_small >= 2) { + /* if the buffer has been at least half full twice, + * we receive faster than we send, so at least it + * is not a "fast streamer". + */ + b->flags &= ~BF_STREAMER_FAST; + //fputc('-', stderr); + } + } + else { + b->xfer_small = 0; + b->xfer_large = 0; + } + EV_FD_CLR(fd, DIR_RD); goto out_eternity; } @@ -133,6 +164,19 @@ int stream_sock_read(int fd) { * is closed. */ if (ret < max) { + if ((b->flags & (BF_STREAMER | BF_STREAMER_FAST)) && + (cur_read <= BUFSIZE / 2)) { + b->xfer_large = 0; + b->xfer_small++; + if (b->xfer_small >= 3) { + /* we have read less than half of the buffer in + * one pass, and this happened at least 3 times. + * This is definitely not a streamer. + */ + b->flags &= ~(BF_STREAMER | BF_STREAMER_FAST); + //fputc('!', stderr); + } + } if (fdtab[fd].ev & FD_POLL_HUP) goto out_shutdown_r; break;