MEDIUM: polling: centralize polled events processing

Currently, each poll loop handles the polled events the same way,
resulting in a lot of duplicated, complex code. Additionally, epoll
was the only poller to process newly created FDs (eg: just-accepted
connections) immediately instead of waiting for the next iteration.

So instead, let's move that code to fd.c in a new function dedicated
to this task: fd_process_polled_events(). All pollers now use this
function.
Author: Willy Tarreau
Date:   2014-01-25 09:58:06 +01:00
parent 6c11bd2f89
commit e852545594
6 changed files with 81 additions and 92 deletions
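For orientation, the change reduces every poller's event loop to the same two
steps: translate the poller-specific events into the generic FD_POLL_* flags
on fdtab[fd].ev, then hand the fd to the shared helper. A minimal sketch of
that pattern; wait_for_events(), event_fd() and decode_events() are
hypothetical stand-ins for the poller-specific parts, not HAProxy functions:

	/* Sketch of a poller's _do_poll() loop after this commit.  The
	 * three helpers below stand in for the poller-specific syscall
	 * (epoll_wait/kevent/poll/select) and its event decoding.
	 */
	static void do_poll_sketch(void)
	{
		int count = wait_for_events();            /* e.g. epoll_wait() */
		int i, fd;

		for (i = 0; i < count; i++) {
			fd = event_fd(i);                 /* fd carried by event i */
			fdtab[fd].ev |= decode_events(i); /* FD_POLL_IN/OUT/ERR/HUP */

			/* all per-event logic is now centralized here */
			fd_process_polled_events(fd);
		}
	}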

include/proto/fd.h

@@ -81,6 +81,14 @@ void run_poller();
 */
void fd_process_cached_events();

/* Check the events attached to a file descriptor, update its cache
 * accordingly, and call the associated I/O callback. If new updates are
 * detected, the function tries to process them as well in order to save
 * wakeups after accept().
 */
void fd_process_polled_events(int fd);

/* Mark fd <fd> as updated and allocate an entry in the update list for this if
 * it was not already there. This can be done at any time.
 */
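As the declarations suggest, the two entry points are complementary: the
poller feeds fresh events through fd_process_polled_events(), while leftover
speculative events are replayed from the cache by fd_process_cached_events()
(hence the "the caller will take care of cached events" note in ev_epoll.c
below). A hypothetical sketch of how a run loop could chain them;
run_one_iteration() and the exact call site are assumptions, not code from
this commit:

	/* Hypothetical run-loop sketch: poll first, then drain the cache.
	 * cur_poller.poll() invokes the poller's _do_poll(), which in turn
	 * calls fd_process_polled_events() for each reported fd.
	 */
	static void run_one_iteration(int next_expiration)
	{
		cur_poller.poll(&cur_poller, next_expiration); /* fresh events */
		fd_process_cached_events();                    /* replayed events */
	}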

src/ev_epoll.c

@@ -174,63 +174,8 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
		if (e & EPOLLRDHUP)
			n |= FD_POLL_HUP;

		if (!n)
			continue;

		fdtab[fd].ev |= n;

		if (fdtab[fd].iocb) {
			int new_updt, old_updt;

			/* Mark the events as ready before processing */
			if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
				fd_may_recv(fd);

			if (fdtab[fd].ev & (FD_POLL_OUT | FD_POLL_ERR))
				fd_may_send(fd);

			if (fdtab[fd].cache)
				continue;

			/* Save number of updates to detect creation of new FDs. */
			old_updt = fd_nbupdt;
			fdtab[fd].iocb(fd);

			/* One or more fd might have been created during the iocb().
			 * This mainly happens with new incoming connections that have
			 * just been accepted, so we'd like to process them immediately
			 * for better efficiency. Second benefit, if at the end the fds
			 * are disabled again, we can safely destroy their update entry
			 * to reduce the scope of later scans. This is the reason we
			 * scan the new entries backwards.
			 */
			for (new_updt = fd_nbupdt; new_updt > old_updt; new_updt--) {
				fd = fd_updt[new_updt - 1];
				if (!fdtab[fd].new)
					continue;

				fdtab[fd].new = 0;
				fdtab[fd].ev &= FD_POLL_STICKY;
				if ((fdtab[fd].state & FD_EV_STATUS_R) == (FD_EV_READY_R | FD_EV_ACTIVE_R))
					fdtab[fd].ev |= FD_POLL_IN;
				if ((fdtab[fd].state & FD_EV_STATUS_W) == (FD_EV_READY_W | FD_EV_ACTIVE_W))
					fdtab[fd].ev |= FD_POLL_OUT;

				if (fdtab[fd].ev && fdtab[fd].iocb && fdtab[fd].owner)
					fdtab[fd].iocb(fd);

				/* we can remove this update entry if it's the last one and is
				 * unused, otherwise we don't touch anything.
				 */
				if (new_updt == fd_nbupdt && !fd_recv_active(fd) && !fd_send_active(fd)) {
					fdtab[fd].updated = 0;
					fd_nbupdt--;
				}
			}
		}
		fd_process_polled_events(fd);
	}
	/* the caller will take care of cached events */
}
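The block removed above is exactly what moves into fd.c. Its subtle part is
the backward scan of the update list: entries appended during iocb()
(typically freshly accept()ed connections) are processed within the same
iteration, and trailing entries that end up unused are truncated in place. A
self-contained toy model of that trimming; the arrays and flags below merely
mirror the names in the diff and are not HAProxy's real structures:

	#include <stdio.h>

	#define MAXFD 16

	/* Toy stand-ins for HAProxy's update list (fd_updt/fd_nbupdt) and
	 * the per-fd "new"/"updated" flags -- invented for illustration.
	 */
	static int upd_list[MAXFD];
	static int upd_count;
	static struct { int is_new, updated, active; } fdinfo[MAXFD];

	static void upd_add(int fd)
	{
		fdinfo[fd].updated = 1;
		fdinfo[fd].is_new = 1;
		upd_list[upd_count++] = fd;
	}

	/* Scan the entries appended after old_count, newest first.  Going
	 * backwards is what makes the in-place truncation (upd_count--)
	 * safe: only the last, still-unused entry is ever dropped.
	 */
	static void process_new_updates(int old_count)
	{
		int i, fd;

		for (i = upd_count; i > old_count; i--) {
			fd = upd_list[i - 1];
			if (!fdinfo[fd].is_new)
				continue;
			fdinfo[fd].is_new = 0;

			/* ... the I/O callback would run here ... */

			if (i == upd_count && !fdinfo[fd].active) {
				fdinfo[fd].updated = 0;
				upd_count--; /* shrink the scope of later scans */
			}
		}
	}

	int main(void)
	{
		int old_count = upd_count;

		upd_add(4); /* e.g. two fds accept()ed inside an iocb() */
		upd_add(5);
		fdinfo[4].active = 1; /* fd 4 still polls, fd 5 was disabled */

		process_new_updates(old_count);
		printf("update entries left: %d\n", upd_count); /* prints 1 */
		return 0;
	}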

src/ev_kqueue.c

@@ -152,18 +152,7 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
			fdtab[fd].ev |= FD_POLL_OUT;
		}

		if (fdtab[fd].iocb && fdtab[fd].ev) {
			if (fdtab[fd].ev & FD_POLL_IN)
				fd_may_recv(fd);
			if (fdtab[fd].ev & FD_POLL_OUT)
				fd_may_send(fd);
			if (fdtab[fd].cache)
				continue;
			fdtab[fd].iocb(fd);
		}
		fd_process_polled_events(fd);
	}
}

src/ev_poll.c

@@ -177,18 +177,7 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
			                 ((e & POLLHUP) ? FD_POLL_HUP : 0);
		}

		if (fdtab[fd].iocb && fdtab[fd].ev) {
			if (fdtab[fd].ev & FD_POLL_IN)
				fd_may_recv(fd);
			if (fdtab[fd].ev & FD_POLL_OUT)
				fd_may_send(fd);
			if (fdtab[fd].cache)
				continue;
			fdtab[fd].iocb(fd);
		}
		fd_process_polled_events(fd);
	}
}

src/ev_select.c

@@ -161,18 +161,7 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
			if (FD_ISSET(fd, tmp_evts[DIR_WR]))
				fdtab[fd].ev |= FD_POLL_OUT;

			if (fdtab[fd].iocb && fdtab[fd].ev) {
				if (fdtab[fd].ev & FD_POLL_IN)
					fd_may_recv(fd);
				if (fdtab[fd].ev & FD_POLL_OUT)
					fd_may_send(fd);
				if (fdtab[fd].cache)
					continue;
				fdtab[fd].iocb(fd);
			}
			fd_process_polled_events(fd);
		}
	}
}

src/fd.c

@@ -237,6 +237,75 @@ void fd_process_cached_events()
	}
}

/* Check the events attached to a file descriptor, update its cache
 * accordingly, and call the associated I/O callback. If new updates are
 * detected, the function tries to process them as well in order to save
 * wakeups after accept().
 */
void fd_process_polled_events(int fd)
{
	int new_updt, old_updt;

	/* First thing to do is to mark the reported events as ready, in order
	 * for them to later be continued from the cache without polling if
	 * they have to be interrupted (eg: recv fills a buffer).
	 */
	if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
		fd_may_recv(fd);

	if (fdtab[fd].ev & (FD_POLL_OUT | FD_POLL_ERR))
		fd_may_send(fd);

	if (fdtab[fd].cache) {
		/* This fd is already cached, no need to process it now. */
		return;
	}

	if (unlikely(!fdtab[fd].iocb || !fdtab[fd].ev)) {
		/* nothing to do */
		return;
	}

	/* Save number of updates to detect creation of new FDs. */
	old_updt = fd_nbupdt;
	fdtab[fd].iocb(fd);

	/* One or more fd might have been created during the iocb().
	 * This mainly happens with new incoming connections that have
	 * just been accepted, so we'd like to process them immediately
	 * for better efficiency, as it saves one useless task wakeup.
	 * Second benefit, if at the end the fds are disabled again, we can
	 * safely destroy their update entry to reduce the scope of later
	 * scans. This is the reason we scan the new entries backwards.
	 */
	for (new_updt = fd_nbupdt; new_updt > old_updt; new_updt--) {
		fd = fd_updt[new_updt - 1];
		if (!fdtab[fd].new)
			continue;

		fdtab[fd].new = 0;
		fdtab[fd].ev &= FD_POLL_STICKY;
		if ((fdtab[fd].state & FD_EV_STATUS_R) == (FD_EV_READY_R | FD_EV_ACTIVE_R))
			fdtab[fd].ev |= FD_POLL_IN;
		if ((fdtab[fd].state & FD_EV_STATUS_W) == (FD_EV_READY_W | FD_EV_ACTIVE_W))
			fdtab[fd].ev |= FD_POLL_OUT;

		if (fdtab[fd].ev && fdtab[fd].iocb && fdtab[fd].owner)
			fdtab[fd].iocb(fd);

		/* we can remove this update entry if it's the last one and is
		 * unused, otherwise we don't touch anything, especially given
		 * that the FD might have been closed already.
		 */
		if (new_updt == fd_nbupdt && !fd_recv_active(fd) && !fd_send_active(fd)) {
			fdtab[fd].updated = 0;
			fd_nbupdt--;
		}
	}
}

/* disable the specified poller */
void disable_poller(const char *poller_name)
{
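One detail of the loop above deserves a note: after masking the event flags
down to FD_POLL_STICKY, an fd is re-reported as readable or writable only
when its state carries both the READY and the ACTIVE bit for that direction.
A tiny self-contained illustration of that mask-compare idiom, using invented
bit values rather than HAProxy's real FD_EV_* constants:

	#include <assert.h>

	#define EV_ACTIVE_R 0x01                       /* hypothetical: read polling enabled */
	#define EV_READY_R  0x02                       /* hypothetical: read known not to block */
	#define EV_STATUS_R (EV_ACTIVE_R | EV_READY_R) /* mask covering both bits */

	int main(void)
	{
		unsigned char state = EV_ACTIVE_R | EV_READY_R;

		/* the compare succeeds only when BOTH bits are set */
		assert((state & EV_STATUS_R) == (EV_READY_R | EV_ACTIVE_R));

		state &= ~EV_READY_R; /* e.g. a recv() drained the socket */
		assert((state & EV_STATUS_R) != (EV_READY_R | EV_ACTIVE_R));
		return 0;
	}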