mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2024-12-12 14:35:14 +00:00
MAJOR: polling: centralize calls to I/O callbacks
In order for HTTP/2 not to eat too much memory, we'll have to support on-the-fly buffer allocation, since most streams will have an empty request buffer at some point. Supporting allocation on the fly means being able to sleep inside I/O callbacks if a buffer is not available. Till now, the I/O callbacks were called from two locations : - when processing the cached events - when processing the polled events from the poller This change cleans up the design a bit further than what was started in 1.5. It now ensures that we never call any iocb from the poller itself and that instead, events learned by the poller are put into the cache. The benefit is important in terms of stability : we don't have to care anymore about the risk that new events are added into the poller while processing its events, and we're certain that updates are processed at a single location. To achieve this, we now modify all the fd_* functions so that instead of creating updates, they add/remove the fd to/from the cache depending on its state, and only create an update when the polling status reaches a state where it will have to change. Since the pollers make use of these functions to notify readiness (using fd_may_recv/fd_may_send), the cache is always up to date with the poller. Creating updates only when the polling status needs to change saves a significant amount of work for the pollers : a benchmark showed that on a typical TCP proxy test, the amount of updates per connection dropped from 11 to 1 on average. This also means that the update list is smaller and has more chances of not thrashing too many CPU cache lines. The first observed benefit is a net 2% performance gain on the connection rate. A second benefit is that when a connection is accepted, it's only when we're processing the cache, and the recv event is automatically added into the cache *after* the current one, resulting in this event to be processed immediately during the same loop. Previously we used to have a second run over the updates to detect if new events were added to catch them before waking up tasks. The next gain will be offered by the next steps on this subject consisting in implementing an I/O queue containing all cached events ordered by priority just like the run queue, and to be able to leave some events pending there as long as needed. That will allow us *not* to perform some FD processing if it's not the proper time for this (typically keep waiting for a buffer to be allocated if none is available for an recv()). And by only processing a small bunch of them, we'll allow priorities to take place even at the I/O level. As a result of this change, functions fd_alloc_or_release_cache_entry() and fd_process_polled_events() have disappeared, and the code dedicated to checking for new fd events after the callback during the poll() loop was removed as well. Despite the patch looking large, it's mostly a change of what function is falled upon fd_*() and almost nothing was added.
This commit is contained in:
parent
5506e3f8b6
commit
5be2f35231
@ -81,18 +81,10 @@ void run_poller();
|
||||
*/
|
||||
void fd_process_cached_events();
|
||||
|
||||
/* Check the events attached to a file descriptor, update its cache
|
||||
* accordingly, and call the associated I/O callback. If new updates are
|
||||
* detected, the function tries to process them as well in order to save
|
||||
* wakeups after accept().
|
||||
/* Mark fd <fd> as updated for polling and allocate an entry in the update list
|
||||
* for this if it was not already there. This can be done at any time.
|
||||
*/
|
||||
void fd_process_polled_events(int fd);
|
||||
|
||||
|
||||
/* Mark fd <fd> as updated and allocate an entry in the update list for this if
|
||||
* it was not already there. This can be done at any time.
|
||||
*/
|
||||
static inline void updt_fd(const int fd)
|
||||
static inline void updt_fd_polling(const int fd)
|
||||
{
|
||||
if (fdtab[fd].updated)
|
||||
/* already scheduled for update */
|
||||
@ -157,15 +149,22 @@ static inline int fd_compute_new_polled_status(int state)
|
||||
return state;
|
||||
}
|
||||
|
||||
/* Automatically allocates or releases a cache entry for fd <fd> depending on
|
||||
* its new state. This is meant to be used by pollers while processing updates.
|
||||
/* This function automatically enables/disables caching for an entry depending
|
||||
* on its state, and also possibly creates an update entry so that the poller
|
||||
* does its job as well. It is only called on state changes.
|
||||
*/
|
||||
static inline void fd_alloc_or_release_cache_entry(int fd, int new_state)
|
||||
static inline void fd_update_cache(int fd)
|
||||
{
|
||||
/* READY and ACTIVE states (the two with both flags set) require a cache entry */
|
||||
/* 3 states for each direction require a polling update */
|
||||
if ((fdtab[fd].state & (FD_EV_POLLED_R | FD_EV_ACTIVE_R)) == FD_EV_POLLED_R ||
|
||||
(fdtab[fd].state & (FD_EV_POLLED_R | FD_EV_READY_R | FD_EV_ACTIVE_R)) == FD_EV_ACTIVE_R ||
|
||||
(fdtab[fd].state & (FD_EV_POLLED_W | FD_EV_ACTIVE_W)) == FD_EV_POLLED_W ||
|
||||
(fdtab[fd].state & (FD_EV_POLLED_W | FD_EV_READY_W | FD_EV_ACTIVE_W)) == FD_EV_ACTIVE_W)
|
||||
updt_fd_polling(fd);
|
||||
|
||||
if (((new_state & (FD_EV_READY_R | FD_EV_ACTIVE_R)) == (FD_EV_READY_R | FD_EV_ACTIVE_R)) ||
|
||||
((new_state & (FD_EV_READY_W | FD_EV_ACTIVE_W)) == (FD_EV_READY_W | FD_EV_ACTIVE_W))) {
|
||||
/* only READY and ACTIVE states (the two with both flags set) require a cache entry */
|
||||
if (((fdtab[fd].state & (FD_EV_READY_R | FD_EV_ACTIVE_R)) == (FD_EV_READY_R | FD_EV_ACTIVE_R)) ||
|
||||
((fdtab[fd].state & (FD_EV_READY_W | FD_EV_ACTIVE_W)) == (FD_EV_READY_W | FD_EV_ACTIVE_W))) {
|
||||
fd_alloc_cache_entry(fd);
|
||||
}
|
||||
else {
|
||||
@ -243,7 +242,7 @@ static inline void fd_stop_recv(int fd)
|
||||
if (!((unsigned int)fdtab[fd].state & FD_EV_ACTIVE_R))
|
||||
return; /* already disabled */
|
||||
fdtab[fd].state &= ~FD_EV_ACTIVE_R;
|
||||
updt_fd(fd); /* need an update entry to change the state */
|
||||
fd_update_cache(fd); /* need an update entry to change the state */
|
||||
}
|
||||
|
||||
/* Disable processing send events on fd <fd> */
|
||||
@ -252,7 +251,7 @@ static inline void fd_stop_send(int fd)
|
||||
if (!((unsigned int)fdtab[fd].state & FD_EV_ACTIVE_W))
|
||||
return; /* already disabled */
|
||||
fdtab[fd].state &= ~FD_EV_ACTIVE_W;
|
||||
updt_fd(fd); /* need an update entry to change the state */
|
||||
fd_update_cache(fd); /* need an update entry to change the state */
|
||||
}
|
||||
|
||||
/* Disable processing of events on fd <fd> for both directions. */
|
||||
@ -261,7 +260,7 @@ static inline void fd_stop_both(int fd)
|
||||
if (!((unsigned int)fdtab[fd].state & FD_EV_ACTIVE_RW))
|
||||
return; /* already disabled */
|
||||
fdtab[fd].state &= ~FD_EV_ACTIVE_RW;
|
||||
updt_fd(fd); /* need an update entry to change the state */
|
||||
fd_update_cache(fd); /* need an update entry to change the state */
|
||||
}
|
||||
|
||||
/* Report that FD <fd> cannot receive anymore without polling (EAGAIN detected). */
|
||||
@ -270,7 +269,7 @@ static inline void fd_cant_recv(const int fd)
|
||||
if (!(((unsigned int)fdtab[fd].state) & FD_EV_READY_R))
|
||||
return; /* already marked as blocked */
|
||||
fdtab[fd].state &= ~FD_EV_READY_R;
|
||||
updt_fd(fd);
|
||||
fd_update_cache(fd);
|
||||
}
|
||||
|
||||
/* Report that FD <fd> can receive anymore without polling. */
|
||||
@ -279,7 +278,7 @@ static inline void fd_may_recv(const int fd)
|
||||
if (((unsigned int)fdtab[fd].state) & FD_EV_READY_R)
|
||||
return; /* already marked as blocked */
|
||||
fdtab[fd].state |= FD_EV_READY_R;
|
||||
updt_fd(fd);
|
||||
fd_update_cache(fd);
|
||||
}
|
||||
|
||||
/* Disable readiness when polled. This is useful to interrupt reading when it
|
||||
@ -299,7 +298,7 @@ static inline void fd_cant_send(const int fd)
|
||||
if (!(((unsigned int)fdtab[fd].state) & FD_EV_READY_W))
|
||||
return; /* already marked as blocked */
|
||||
fdtab[fd].state &= ~FD_EV_READY_W;
|
||||
updt_fd(fd);
|
||||
fd_update_cache(fd);
|
||||
}
|
||||
|
||||
/* Report that FD <fd> can send anymore without polling (EAGAIN detected). */
|
||||
@ -308,7 +307,7 @@ static inline void fd_may_send(const int fd)
|
||||
if (((unsigned int)fdtab[fd].state) & FD_EV_READY_W)
|
||||
return; /* already marked as blocked */
|
||||
fdtab[fd].state |= FD_EV_READY_W;
|
||||
updt_fd(fd);
|
||||
fd_update_cache(fd);
|
||||
}
|
||||
|
||||
/* Prepare FD <fd> to try to receive */
|
||||
@ -317,7 +316,7 @@ static inline void fd_want_recv(int fd)
|
||||
if (((unsigned int)fdtab[fd].state & FD_EV_ACTIVE_R))
|
||||
return; /* already enabled */
|
||||
fdtab[fd].state |= FD_EV_ACTIVE_R;
|
||||
updt_fd(fd); /* need an update entry to change the state */
|
||||
fd_update_cache(fd); /* need an update entry to change the state */
|
||||
}
|
||||
|
||||
/* Prepare FD <fd> to try to send */
|
||||
@ -326,7 +325,7 @@ static inline void fd_want_send(int fd)
|
||||
if (((unsigned int)fdtab[fd].state & FD_EV_ACTIVE_W))
|
||||
return; /* already enabled */
|
||||
fdtab[fd].state |= FD_EV_ACTIVE_W;
|
||||
updt_fd(fd); /* need an update entry to change the state */
|
||||
fd_update_cache(fd); /* need an update entry to change the state */
|
||||
}
|
||||
|
||||
/* Prepares <fd> for being polled */
|
||||
|
@ -69,7 +69,7 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
|
||||
int updt_idx;
|
||||
int wait_time;
|
||||
|
||||
/* first, scan the update list to find changes */
|
||||
/* first, scan the update list to find polling changes */
|
||||
for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
|
||||
fd = fd_updt[updt_idx];
|
||||
fdtab[fd].updated = 0;
|
||||
@ -109,8 +109,6 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
|
||||
ev.data.fd = fd;
|
||||
epoll_ctl(epoll_fd, opcode, fd, &ev);
|
||||
}
|
||||
|
||||
fd_alloc_or_release_cache_entry(fd, en);
|
||||
}
|
||||
fd_nbupdt = 0;
|
||||
|
||||
@ -175,7 +173,11 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
|
||||
n |= FD_POLL_HUP;
|
||||
|
||||
fdtab[fd].ev |= n;
|
||||
fd_process_polled_events(fd);
|
||||
if (n & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
|
||||
fd_may_recv(fd);
|
||||
|
||||
if (n & (FD_POLL_OUT | FD_POLL_ERR))
|
||||
fd_may_send(fd);
|
||||
}
|
||||
/* the caller will take care of cached events */
|
||||
}
|
||||
|
@ -84,8 +84,6 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fd_alloc_or_release_cache_entry(fd, en);
|
||||
}
|
||||
if (changes)
|
||||
kevent(kqueue_fd, kev, changes, NULL, 0, NULL);
|
||||
@ -138,7 +136,11 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
|
||||
fdtab[fd].ev |= FD_POLL_OUT;
|
||||
}
|
||||
|
||||
fd_process_polled_events(fd);
|
||||
if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
|
||||
fd_may_recv(fd);
|
||||
|
||||
if (fdtab[fd].ev & (FD_POLL_OUT | FD_POLL_ERR))
|
||||
fd_may_send(fd);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -93,8 +93,6 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
|
||||
else if ((en & ~eo) & FD_EV_POLLED_W)
|
||||
hap_fd_set(fd, fd_evts[DIR_WR]);
|
||||
}
|
||||
|
||||
fd_alloc_or_release_cache_entry(fd, en);
|
||||
}
|
||||
fd_nbupdt = 0;
|
||||
|
||||
@ -164,7 +162,11 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
|
||||
((e & POLLHUP) ? FD_POLL_HUP : 0);
|
||||
}
|
||||
|
||||
fd_process_polled_events(fd);
|
||||
if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
|
||||
fd_may_recv(fd);
|
||||
|
||||
if (fdtab[fd].ev & (FD_POLL_OUT | FD_POLL_ERR))
|
||||
fd_may_send(fd);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -76,8 +76,6 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
|
||||
else if ((en & ~eo) & FD_EV_POLLED_W)
|
||||
FD_SET(fd, fd_evts[DIR_WR]);
|
||||
}
|
||||
|
||||
fd_alloc_or_release_cache_entry(fd, en);
|
||||
}
|
||||
fd_nbupdt = 0;
|
||||
|
||||
@ -148,7 +146,11 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
|
||||
if (FD_ISSET(fd, tmp_evts[DIR_WR]))
|
||||
fdtab[fd].ev |= FD_POLL_OUT;
|
||||
|
||||
fd_process_polled_events(fd);
|
||||
if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
|
||||
fd_may_recv(fd);
|
||||
|
||||
if (fdtab[fd].ev & (FD_POLL_OUT | FD_POLL_ERR))
|
||||
fd_may_send(fd);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
81
src/fd.c
81
src/fd.c
@ -199,7 +199,9 @@ void fd_delete(int fd)
|
||||
}
|
||||
|
||||
/* Scan and process the cached events. This should be called right after
|
||||
* the poller.
|
||||
* the poller. The loop may cause new entries to be created, for example
|
||||
* if a listener causes an accept() to initiate a new incoming connection
|
||||
* wanting to attempt an recv().
|
||||
*/
|
||||
void fd_process_cached_events()
|
||||
{
|
||||
@ -209,12 +211,6 @@ void fd_process_cached_events()
|
||||
fd = fd_cache[entry];
|
||||
e = fdtab[fd].state;
|
||||
|
||||
/* Principle: events which are marked FD_EV_ACTIVE are processed
|
||||
* with their usual I/O callback. The callback may remove the
|
||||
* events from the cache or tag them for polling. Changes will be
|
||||
* applied on next round. Cache entries with no more activity are
|
||||
* automatically scheduled for removal.
|
||||
*/
|
||||
fdtab[fd].ev &= FD_POLL_STICKY;
|
||||
|
||||
if ((e & (FD_EV_READY_R | FD_EV_ACTIVE_R)) == (FD_EV_READY_R | FD_EV_ACTIVE_R))
|
||||
@ -226,7 +222,7 @@ void fd_process_cached_events()
|
||||
if (fdtab[fd].iocb && fdtab[fd].owner && fdtab[fd].ev)
|
||||
fdtab[fd].iocb(fd);
|
||||
else
|
||||
updt_fd(fd);
|
||||
fd_release_cache_entry(fd);
|
||||
|
||||
/* If the fd was removed from the cache, it has been
|
||||
* replaced by the next one that we don't want to skip !
|
||||
@ -237,75 +233,6 @@ void fd_process_cached_events()
|
||||
}
|
||||
}
|
||||
|
||||
/* Check the events attached to a file descriptor, update its cache
|
||||
* accordingly, and call the associated I/O callback. If new updates are
|
||||
* detected, the function tries to process them as well in order to save
|
||||
* wakeups after accept().
|
||||
*/
|
||||
void fd_process_polled_events(int fd)
|
||||
{
|
||||
int new_updt, old_updt;
|
||||
|
||||
/* First thing to do is to mark the reported events as ready, in order
|
||||
* for them to later be continued from the cache without polling if
|
||||
* they have to be interrupted (eg: recv fills a buffer).
|
||||
*/
|
||||
if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
|
||||
fd_may_recv(fd);
|
||||
|
||||
if (fdtab[fd].ev & (FD_POLL_OUT | FD_POLL_ERR))
|
||||
fd_may_send(fd);
|
||||
|
||||
if (fdtab[fd].cache) {
|
||||
/* This fd is already cached, no need to process it now. */
|
||||
return;
|
||||
}
|
||||
|
||||
if (unlikely(!fdtab[fd].iocb || !fdtab[fd].ev)) {
|
||||
/* nothing to do */
|
||||
return;
|
||||
}
|
||||
|
||||
/* Save number of updates to detect creation of new FDs. */
|
||||
old_updt = fd_nbupdt;
|
||||
fdtab[fd].iocb(fd);
|
||||
|
||||
/* One or more fd might have been created during the iocb().
|
||||
* This mainly happens with new incoming connections that have
|
||||
* just been accepted, so we'd like to process them immediately
|
||||
* for better efficiency, as it saves one useless task wakeup.
|
||||
* Second benefit, if at the end the fds are disabled again, we can
|
||||
* safely destroy their update entry to reduce the scope of later
|
||||
* scans. This is the reason we scan the new entries backwards.
|
||||
*/
|
||||
for (new_updt = fd_nbupdt; new_updt > old_updt; new_updt--) {
|
||||
fd = fd_updt[new_updt - 1];
|
||||
if (!fdtab[fd].new)
|
||||
continue;
|
||||
|
||||
fdtab[fd].new = 0;
|
||||
fdtab[fd].ev &= FD_POLL_STICKY;
|
||||
|
||||
if ((fdtab[fd].state & FD_EV_STATUS_R) == (FD_EV_READY_R | FD_EV_ACTIVE_R))
|
||||
fdtab[fd].ev |= FD_POLL_IN;
|
||||
|
||||
if ((fdtab[fd].state & FD_EV_STATUS_W) == (FD_EV_READY_W | FD_EV_ACTIVE_W))
|
||||
fdtab[fd].ev |= FD_POLL_OUT;
|
||||
|
||||
if (fdtab[fd].ev && fdtab[fd].iocb && fdtab[fd].owner)
|
||||
fdtab[fd].iocb(fd);
|
||||
|
||||
/* we can remove this update entry if it's the last one and is
|
||||
* unused, otherwise we don't touch anything, especially given
|
||||
* that the FD might have been closed already.
|
||||
*/
|
||||
if (new_updt == fd_nbupdt && !fd_recv_active(fd) && !fd_send_active(fd)) {
|
||||
fdtab[fd].updated = 0;
|
||||
fd_nbupdt--;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* disable the specified poller */
|
||||
void disable_poller(const char *poller_name)
|
||||
{
|
||||
|
Loading…
Reference in New Issue
Block a user