diff --git a/include/proto/fd.h b/include/proto/fd.h index b0a478e4c..87309bf0a 100644 --- a/include/proto/fd.h +++ b/include/proto/fd.h @@ -81,18 +81,10 @@ void run_poller(); */ void fd_process_cached_events(); -/* Check the events attached to a file descriptor, update its cache - * accordingly, and call the associated I/O callback. If new updates are - * detected, the function tries to process them as well in order to save - * wakeups after accept(). +/* Mark fd as updated for polling and allocate an entry in the update list + * for this if it was not already there. This can be done at any time. */ -void fd_process_polled_events(int fd); - - -/* Mark fd as updated and allocate an entry in the update list for this if - * it was not already there. This can be done at any time. - */ -static inline void updt_fd(const int fd) +static inline void updt_fd_polling(const int fd) { if (fdtab[fd].updated) /* already scheduled for update */ @@ -157,15 +149,22 @@ static inline int fd_compute_new_polled_status(int state) return state; } -/* Automatically allocates or releases a cache entry for fd depending on - * its new state. This is meant to be used by pollers while processing updates. +/* This function automatically enables/disables caching for an entry depending + * on its state, and also possibly creates an update entry so that the poller + * does its job as well. It is only called on state changes. 
*/ -static inline void fd_alloc_or_release_cache_entry(int fd, int new_state) +static inline void fd_update_cache(int fd) { - /* READY and ACTIVE states (the two with both flags set) require a cache entry */ + /* 3 states for each direction require a polling update */ + if ((fdtab[fd].state & (FD_EV_POLLED_R | FD_EV_ACTIVE_R)) == FD_EV_POLLED_R || + (fdtab[fd].state & (FD_EV_POLLED_R | FD_EV_READY_R | FD_EV_ACTIVE_R)) == FD_EV_ACTIVE_R || + (fdtab[fd].state & (FD_EV_POLLED_W | FD_EV_ACTIVE_W)) == FD_EV_POLLED_W || + (fdtab[fd].state & (FD_EV_POLLED_W | FD_EV_READY_W | FD_EV_ACTIVE_W)) == FD_EV_ACTIVE_W) + updt_fd_polling(fd); - if (((new_state & (FD_EV_READY_R | FD_EV_ACTIVE_R)) == (FD_EV_READY_R | FD_EV_ACTIVE_R)) || - ((new_state & (FD_EV_READY_W | FD_EV_ACTIVE_W)) == (FD_EV_READY_W | FD_EV_ACTIVE_W))) { + /* only READY and ACTIVE states (the two with both flags set) require a cache entry */ + if (((fdtab[fd].state & (FD_EV_READY_R | FD_EV_ACTIVE_R)) == (FD_EV_READY_R | FD_EV_ACTIVE_R)) || + ((fdtab[fd].state & (FD_EV_READY_W | FD_EV_ACTIVE_W)) == (FD_EV_READY_W | FD_EV_ACTIVE_W))) { fd_alloc_cache_entry(fd); } else { @@ -243,7 +242,7 @@ static inline void fd_stop_recv(int fd) if (!((unsigned int)fdtab[fd].state & FD_EV_ACTIVE_R)) return; /* already disabled */ fdtab[fd].state &= ~FD_EV_ACTIVE_R; - updt_fd(fd); /* need an update entry to change the state */ + fd_update_cache(fd); /* need an update entry to change the state */ } /* Disable processing send events on fd */ @@ -252,7 +251,7 @@ static inline void fd_stop_send(int fd) if (!((unsigned int)fdtab[fd].state & FD_EV_ACTIVE_W)) return; /* already disabled */ fdtab[fd].state &= ~FD_EV_ACTIVE_W; - updt_fd(fd); /* need an update entry to change the state */ + fd_update_cache(fd); /* need an update entry to change the state */ } /* Disable processing of events on fd for both directions. 
*/ @@ -261,7 +260,7 @@ static inline void fd_stop_both(int fd) if (!((unsigned int)fdtab[fd].state & FD_EV_ACTIVE_RW)) return; /* already disabled */ fdtab[fd].state &= ~FD_EV_ACTIVE_RW; - updt_fd(fd); /* need an update entry to change the state */ + fd_update_cache(fd); /* need an update entry to change the state */ } /* Report that FD cannot receive anymore without polling (EAGAIN detected). */ @@ -270,7 +269,7 @@ static inline void fd_cant_recv(const int fd) if (!(((unsigned int)fdtab[fd].state) & FD_EV_READY_R)) return; /* already marked as blocked */ fdtab[fd].state &= ~FD_EV_READY_R; - updt_fd(fd); + fd_update_cache(fd); } /* Report that FD can receive anymore without polling. */ @@ -279,7 +278,7 @@ static inline void fd_may_recv(const int fd) if (((unsigned int)fdtab[fd].state) & FD_EV_READY_R) return; /* already marked as blocked */ fdtab[fd].state |= FD_EV_READY_R; - updt_fd(fd); + fd_update_cache(fd); } /* Disable readiness when polled. This is useful to interrupt reading when it @@ -299,7 +298,7 @@ static inline void fd_cant_send(const int fd) if (!(((unsigned int)fdtab[fd].state) & FD_EV_READY_W)) return; /* already marked as blocked */ fdtab[fd].state &= ~FD_EV_READY_W; - updt_fd(fd); + fd_update_cache(fd); } /* Report that FD can send anymore without polling (EAGAIN detected). 
*/ @@ -308,7 +307,7 @@ static inline void fd_may_send(const int fd) if (((unsigned int)fdtab[fd].state) & FD_EV_READY_W) return; /* already marked as blocked */ fdtab[fd].state |= FD_EV_READY_W; - updt_fd(fd); + fd_update_cache(fd); } /* Prepare FD to try to receive */ @@ -317,7 +316,7 @@ static inline void fd_want_recv(int fd) if (((unsigned int)fdtab[fd].state & FD_EV_ACTIVE_R)) return; /* already enabled */ fdtab[fd].state |= FD_EV_ACTIVE_R; - updt_fd(fd); /* need an update entry to change the state */ + fd_update_cache(fd); /* need an update entry to change the state */ } /* Prepare FD to try to send */ @@ -326,7 +325,7 @@ static inline void fd_want_send(int fd) if (((unsigned int)fdtab[fd].state & FD_EV_ACTIVE_W)) return; /* already enabled */ fdtab[fd].state |= FD_EV_ACTIVE_W; - updt_fd(fd); /* need an update entry to change the state */ + fd_update_cache(fd); /* need an update entry to change the state */ } /* Prepares for being polled */ diff --git a/src/ev_epoll.c b/src/ev_epoll.c index 9d359b2ab..755c6fa58 100644 --- a/src/ev_epoll.c +++ b/src/ev_epoll.c @@ -69,7 +69,7 @@ REGPRM2 static void _do_poll(struct poller *p, int exp) int updt_idx; int wait_time; - /* first, scan the update list to find changes */ + /* first, scan the update list to find polling changes */ for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) { fd = fd_updt[updt_idx]; fdtab[fd].updated = 0; @@ -109,8 +109,6 @@ REGPRM2 static void _do_poll(struct poller *p, int exp) ev.data.fd = fd; epoll_ctl(epoll_fd, opcode, fd, &ev); } - - fd_alloc_or_release_cache_entry(fd, en); } fd_nbupdt = 0; @@ -175,7 +173,11 @@ REGPRM2 static void _do_poll(struct poller *p, int exp) n |= FD_POLL_HUP; fdtab[fd].ev |= n; - fd_process_polled_events(fd); + if (n & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR)) + fd_may_recv(fd); + + if (n & (FD_POLL_OUT | FD_POLL_ERR)) + fd_may_send(fd); } /* the caller will take care of cached events */ } diff --git a/src/ev_kqueue.c b/src/ev_kqueue.c index 06ccaee29..2af94b647 
100644 --- a/src/ev_kqueue.c +++ b/src/ev_kqueue.c @@ -84,8 +84,6 @@ REGPRM2 static void _do_poll(struct poller *p, int exp) } } } - - fd_alloc_or_release_cache_entry(fd, en); } if (changes) kevent(kqueue_fd, kev, changes, NULL, 0, NULL); @@ -138,7 +136,11 @@ REGPRM2 static void _do_poll(struct poller *p, int exp) fdtab[fd].ev |= FD_POLL_OUT; } - fd_process_polled_events(fd); + if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR)) + fd_may_recv(fd); + + if (fdtab[fd].ev & (FD_POLL_OUT | FD_POLL_ERR)) + fd_may_send(fd); } } diff --git a/src/ev_poll.c b/src/ev_poll.c index 2f6e56d3c..866906c3a 100644 --- a/src/ev_poll.c +++ b/src/ev_poll.c @@ -93,8 +93,6 @@ REGPRM2 static void _do_poll(struct poller *p, int exp) else if ((en & ~eo) & FD_EV_POLLED_W) hap_fd_set(fd, fd_evts[DIR_WR]); } - - fd_alloc_or_release_cache_entry(fd, en); } fd_nbupdt = 0; @@ -164,7 +162,11 @@ REGPRM2 static void _do_poll(struct poller *p, int exp) ((e & POLLHUP) ? FD_POLL_HUP : 0); } - fd_process_polled_events(fd); + if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR)) + fd_may_recv(fd); + + if (fdtab[fd].ev & (FD_POLL_OUT | FD_POLL_ERR)) + fd_may_send(fd); } } diff --git a/src/ev_select.c b/src/ev_select.c index 5a76d44c0..73fe327aa 100644 --- a/src/ev_select.c +++ b/src/ev_select.c @@ -76,8 +76,6 @@ REGPRM2 static void _do_poll(struct poller *p, int exp) else if ((en & ~eo) & FD_EV_POLLED_W) FD_SET(fd, fd_evts[DIR_WR]); } - - fd_alloc_or_release_cache_entry(fd, en); } fd_nbupdt = 0; @@ -148,7 +146,11 @@ REGPRM2 static void _do_poll(struct poller *p, int exp) if (FD_ISSET(fd, tmp_evts[DIR_WR])) fdtab[fd].ev |= FD_POLL_OUT; - fd_process_polled_events(fd); + if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR)) + fd_may_recv(fd); + + if (fdtab[fd].ev & (FD_POLL_OUT | FD_POLL_ERR)) + fd_may_send(fd); } } } diff --git a/src/fd.c b/src/fd.c index c238bc880..2a6179f9f 100644 --- a/src/fd.c +++ b/src/fd.c @@ -199,7 +199,9 @@ void fd_delete(int fd) } /* Scan and process the 
cached events. This should be called right after - * the poller. + * the poller. The loop may cause new entries to be created, for example + if a listener causes an accept() to initiate a new incoming connection + wanting to attempt a recv(). */ void fd_process_cached_events() { @@ -209,12 +211,6 @@ void fd_process_cached_events() { fd = fd_cache[entry]; e = fdtab[fd].state; - /* Principle: events which are marked FD_EV_ACTIVE are processed - * with their usual I/O callback. The callback may remove the - * events from the cache or tag them for polling. Changes will be - * applied on next round. Cache entries with no more activity are - * automatically scheduled for removal. - */ fdtab[fd].ev &= FD_POLL_STICKY; if ((e & (FD_EV_READY_R | FD_EV_ACTIVE_R)) == (FD_EV_READY_R | FD_EV_ACTIVE_R)) @@ -226,7 +222,7 @@ void fd_process_cached_events() if (fdtab[fd].iocb && fdtab[fd].owner && fdtab[fd].ev) fdtab[fd].iocb(fd); else - updt_fd(fd); + fd_release_cache_entry(fd); /* If the fd was removed from the cache, it has been * replaced by the next one that we don't want to skip ! @@ -237,75 +233,6 @@ void fd_process_cached_events() } } -/* Check the events attached to a file descriptor, update its cache - * accordingly, and call the associated I/O callback. If new updates are - * detected, the function tries to process them as well in order to save - * wakeups after accept(). - */ -void fd_process_polled_events(int fd) -{ - int new_updt, old_updt; - - /* First thing to do is to mark the reported events as ready, in order - * for them to later be continued from the cache without polling if - * they have to be interrupted (eg: recv fills a buffer). - */ - if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR)) - fd_may_recv(fd); - - if (fdtab[fd].ev & (FD_POLL_OUT | FD_POLL_ERR)) - fd_may_send(fd); - - if (fdtab[fd].cache) { - /* This fd is already cached, no need to process it now.
*/ - return; - } - - if (unlikely(!fdtab[fd].iocb || !fdtab[fd].ev)) { - /* nothing to do */ - return; - } - - /* Save number of updates to detect creation of new FDs. */ - old_updt = fd_nbupdt; - fdtab[fd].iocb(fd); - - /* One or more fd might have been created during the iocb(). - * This mainly happens with new incoming connections that have - * just been accepted, so we'd like to process them immediately - * for better efficiency, as it saves one useless task wakeup. - * Second benefit, if at the end the fds are disabled again, we can - * safely destroy their update entry to reduce the scope of later - * scans. This is the reason we scan the new entries backwards. - */ - for (new_updt = fd_nbupdt; new_updt > old_updt; new_updt--) { - fd = fd_updt[new_updt - 1]; - if (!fdtab[fd].new) - continue; - - fdtab[fd].new = 0; - fdtab[fd].ev &= FD_POLL_STICKY; - - if ((fdtab[fd].state & FD_EV_STATUS_R) == (FD_EV_READY_R | FD_EV_ACTIVE_R)) - fdtab[fd].ev |= FD_POLL_IN; - - if ((fdtab[fd].state & FD_EV_STATUS_W) == (FD_EV_READY_W | FD_EV_ACTIVE_W)) - fdtab[fd].ev |= FD_POLL_OUT; - - if (fdtab[fd].ev && fdtab[fd].iocb && fdtab[fd].owner) - fdtab[fd].iocb(fd); - - /* we can remove this update entry if it's the last one and is - * unused, otherwise we don't touch anything, especially given - * that the FD might have been closed already. - */ - if (new_updt == fd_nbupdt && !fd_recv_active(fd) && !fd_send_active(fd)) { - fdtab[fd].updated = 0; - fd_nbupdt--; - } - } -} - /* disable the specified poller */ void disable_poller(const char *poller_name) {