MINOR: tasks: Change the task API so that the callback takes 3 arguments.

In preparation for thread-specific runqueues, change the task API so that
the callback takes 3 arguments, the task itself, the context, and the state,
those were retrieved from the task before. This will allow these elements to
change atomically in the scheduler while the application uses the copied
value, and even to have NULL tasks later.
This commit is contained in:
Olivier Houchard 2018-05-25 14:04:04 +02:00 committed by Willy Tarreau
parent 8c126c7235
commit 9f6af33222
15 changed files with 46 additions and 46 deletions

View File

@ -37,7 +37,7 @@ extern unsigned int error_snapshot_id; /* global ID assigned to each error then
extern struct eb_root proxy_by_name; /* tree of proxies sorted by name */ extern struct eb_root proxy_by_name; /* tree of proxies sorted by name */
int start_proxies(int verbose); int start_proxies(int verbose);
struct task *manage_proxy(struct task *t); struct task *manage_proxy(struct task *t, void *context, unsigned short state);
void soft_stop(void); void soft_stop(void);
int pause_proxy(struct proxy *p); int pause_proxy(struct proxy *p);
int resume_proxy(struct proxy *p); int resume_proxy(struct proxy *p);

View File

@ -46,7 +46,7 @@ void stream_shutdown(struct stream *stream, int why);
void stream_process_counters(struct stream *s); void stream_process_counters(struct stream *s);
void sess_change_server(struct stream *sess, struct server *newsrv); void sess_change_server(struct stream *sess, struct server *newsrv);
struct task *process_stream(struct task *t); struct task *process_stream(struct task *t, void *context, unsigned short state);
void default_srv_error(struct stream *s, struct stream_interface *si); void default_srv_error(struct stream *s, struct stream_interface *si);
int parse_track_counters(char **args, int *arg, int parse_track_counters(char **args, int *arg,
int section_type, struct proxy *curpx, int section_type, struct proxy *curpx,

View File

@ -67,7 +67,7 @@ struct task {
unsigned short pending_state; /* pending states for running talk */ unsigned short pending_state; /* pending states for running talk */
short nice; /* the task's current nice value from -1024 to +1024 */ short nice; /* the task's current nice value from -1024 to +1024 */
unsigned int calls; /* number of times ->process() was called */ unsigned int calls; /* number of times ->process() was called */
struct task * (*process)(struct task *t); /* the function which processes the task */ struct task * (*process)(struct task *t, void *ctx, unsigned short state); /* the function which processes the task */
void *context; /* the task's context */ void *context; /* the task's context */
struct eb32_node wq; /* ebtree node used to hold the task in the wait queue */ struct eb32_node wq; /* ebtree node used to hold the task in the wait queue */
int expire; /* next expiration date for this task, in ticks */ int expire; /* next expiration date for this task, in ticks */

View File

@ -1434,9 +1434,9 @@ struct data_cb check_conn_cb = {
* reached, the task automatically stops. Note that any server status change * reached, the task automatically stops. Note that any server status change
* must have updated s->last_change accordingly. * must have updated s->last_change accordingly.
*/ */
static struct task *server_warmup(struct task *t) static struct task *server_warmup(struct task *t, void *context, unsigned short state)
{ {
struct server *s = t->context; struct server *s = context;
/* by default, plan on stopping the task */ /* by default, plan on stopping the task */
t->expire = TICK_ETERNITY; t->expire = TICK_ETERNITY;
@ -1967,9 +1967,9 @@ out:
* Please do NOT place any return statement in this function and only leave * Please do NOT place any return statement in this function and only leave
* via the out_unlock label. * via the out_unlock label.
*/ */
static struct task *process_chk_proc(struct task *t) static struct task *process_chk_proc(struct task *t, void *context, unsigned short state)
{ {
struct check *check = t->context; struct check *check = context;
struct server *s = check->server; struct server *s = check->server;
int rv; int rv;
int ret; int ret;
@ -2099,9 +2099,9 @@ static struct task *process_chk_proc(struct task *t)
* Please do NOT place any return statement in this function and only leave * Please do NOT place any return statement in this function and only leave
* via the out_unlock label. * via the out_unlock label.
*/ */
static struct task *process_chk_conn(struct task *t) static struct task *process_chk_conn(struct task *t, void *context, unsigned short state)
{ {
struct check *check = t->context; struct check *check = context;
struct server *s = check->server; struct server *s = check->server;
struct conn_stream *cs = check->cs; struct conn_stream *cs = check->cs;
struct connection *conn = cs_conn(cs); struct connection *conn = cs_conn(cs);
@ -2272,13 +2272,13 @@ static struct task *process_chk_conn(struct task *t)
* manages a server health-check. Returns * manages a server health-check. Returns
* the time the task accepts to wait, or TIME_ETERNITY for infinity. * the time the task accepts to wait, or TIME_ETERNITY for infinity.
*/ */
static struct task *process_chk(struct task *t) static struct task *process_chk(struct task *t, void *context, unsigned short state)
{ {
struct check *check = t->context; struct check *check = context;
if (check->type == PR_O2_EXT_CHK) if (check->type == PR_O2_EXT_CHK)
return process_chk_proc(t); return process_chk_proc(t, context, state);
return process_chk_conn(t); return process_chk_conn(t, context, state);
} }
@ -3126,9 +3126,9 @@ void email_alert_free(struct email_alert *alert)
pool_free(pool_head_email_alert, alert); pool_free(pool_head_email_alert, alert);
} }
static struct task *process_email_alert(struct task *t) static struct task *process_email_alert(struct task *t, void *context, unsigned short state)
{ {
struct check *check = t->context; struct check *check = context;
struct email_alertq *q; struct email_alertq *q;
struct email_alert *alert; struct email_alert *alert;
@ -3153,7 +3153,7 @@ static struct task *process_email_alert(struct task *t)
check->state |= CHK_ST_ENABLED; check->state |= CHK_ST_ENABLED;
} }
process_chk(t); process_chk(t, context, state);
if (check->state & CHK_ST_INPROGRESS) if (check->state & CHK_ST_INPROGRESS)
break; break;

View File

@ -1698,9 +1698,9 @@ static void dns_resolve_send(struct dgram_conn *dgram)
* resolutions and retry them if possible. Else a timeout is reported. Then, it * resolutions and retry them if possible. Else a timeout is reported. Then, it
* checks the wait list to trigger new resolutions. * checks the wait list to trigger new resolutions.
*/ */
static struct task *dns_process_resolvers(struct task *t) static struct task *dns_process_resolvers(struct task *t, void *context, unsigned short state)
{ {
struct dns_resolvers *resolvers = t->context; struct dns_resolvers *resolvers = context;
struct dns_resolution *res, *resback; struct dns_resolution *res, *resback;
int exp; int exp;

View File

@ -1205,9 +1205,9 @@ spoe_wakeup_appctx(struct appctx *appctx)
/* Callback function that catches applet timeouts. If a timeout occurred, we set /* Callback function that catches applet timeouts. If a timeout occurred, we set
* <appctx->st1> flag and the SPOE applet is woken up. */ * <appctx->st1> flag and the SPOE applet is woken up. */
static struct task * static struct task *
spoe_process_appctx(struct task * task) spoe_process_appctx(struct task * task, void *context, unsigned short state)
{ {
struct appctx *appctx = task->context; struct appctx *appctx = context;
appctx->st1 = SPOE_APPCTX_ERR_NONE; appctx->st1 = SPOE_APPCTX_ERR_NONE;
if (tick_is_expired(task->expire, now_ms)) { if (tick_is_expired(task->expire, now_ms)) {

View File

@ -211,7 +211,7 @@ int mworker_pipe[2];
/* list of the temporarily limited listeners because of lack of resource */ /* list of the temporarily limited listeners because of lack of resource */
struct list global_listener_queue = LIST_HEAD_INIT(global_listener_queue); struct list global_listener_queue = LIST_HEAD_INIT(global_listener_queue);
struct task *global_listener_queue_task; struct task *global_listener_queue_task;
static struct task *manage_global_listener_queue(struct task *t); static struct task *manage_global_listener_queue(struct task *t, void *context, unsigned short state);
/* bitfield of a few warnings to emit just once (WARN_*) */ /* bitfield of a few warnings to emit just once (WARN_*) */
unsigned int warned = 0; unsigned int warned = 0;
@ -2476,7 +2476,7 @@ static void *run_thread_poll_loop(void *data)
* for global resources when there are enough free resource, or at least once in * for global resources when there are enough free resource, or at least once in
* a while. It is designed to be called as a task. * a while. It is designed to be called as a task.
*/ */
static struct task *manage_global_listener_queue(struct task *t) static struct task *manage_global_listener_queue(struct task *t, void *context, unsigned short state)
{ {
int next = TICK_ETERNITY; int next = TICK_ETERNITY;
/* queue is empty, nothing to do */ /* queue is empty, nothing to do */

View File

@ -5517,9 +5517,9 @@ __LJMP static int hlua_set_nice(lua_State *L)
* Task wrapper are longjmp safe because the only one Lua code * Task wrapper are longjmp safe because the only one Lua code
* executed is the safe hlua_ctx_resume(); * executed is the safe hlua_ctx_resume();
*/ */
static struct task *hlua_process_task(struct task *task) static struct task *hlua_process_task(struct task *task, void *context, unsigned short state)
{ {
struct hlua *hlua = task->context; struct hlua *hlua = context;
enum hlua_exec status; enum hlua_exec status;
if (task->thread_mask == MAX_THREADS_MASK) if (task->thread_mask == MAX_THREADS_MASK)
@ -6216,9 +6216,9 @@ static enum act_return hlua_action(struct act_rule *rule, struct proxy *px,
} }
} }
struct task *hlua_applet_wakeup(struct task *t) struct task *hlua_applet_wakeup(struct task *t, void *context, unsigned short state)
{ {
struct appctx *ctx = t->context; struct appctx *ctx = context;
struct stream_interface *si = ctx->owner; struct stream_interface *si = ctx->owner;
/* If the applet is wake up without any expected work, the sheduler /* If the applet is wake up without any expected work, the sheduler

View File

@ -215,7 +215,7 @@ static const struct h2s *h2_idle_stream = &(const struct h2s){
.id = 0, .id = 0,
}; };
static struct task *h2_timeout_task(struct task *t); static struct task *h2_timeout_task(struct task *t, void *context, unsigned short state);
/*****************************************************/ /*****************************************************/
/* functions below are for dynamic buffer management */ /* functions below are for dynamic buffer management */
@ -2324,9 +2324,9 @@ static int h2_wake(struct connection *conn)
* immediately killed. If it's allocatable and empty, we attempt to send a * immediately killed. If it's allocatable and empty, we attempt to send a
* GOAWAY frame. * GOAWAY frame.
*/ */
static struct task *h2_timeout_task(struct task *t) static struct task *h2_timeout_task(struct task *t, void *context, unsigned short state)
{ {
struct h2c *h2c = t->context; struct h2c *h2c = context;
int expired = tick_is_expired(t->expire, now_ms); int expired = tick_is_expired(t->expire, now_ms);
if (!expired && h2c) if (!expired && h2c)

View File

@ -1958,9 +1958,9 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
* Task processing function to manage re-connect and peer session * Task processing function to manage re-connect and peer session
* tasks wakeup on local update. * tasks wakeup on local update.
*/ */
static struct task *process_peer_sync(struct task * task) static struct task *process_peer_sync(struct task * task, void *context, unsigned short state)
{ {
struct peers *peers = task->context; struct peers *peers = context;
struct peer *ps; struct peer *ps;
struct shared_table *st; struct shared_table *st;

View File

@ -834,9 +834,9 @@ int start_proxies(int verbose)
* called as a task which is woken up upon stopping or when rate limiting must * called as a task which is woken up upon stopping or when rate limiting must
* be enforced. * be enforced.
*/ */
struct task *manage_proxy(struct task *t) struct task *manage_proxy(struct task *t, void *context, unsigned short state)
{ {
struct proxy *p = t->context; struct proxy *p = context;
int next = TICK_ETERNITY; int next = TICK_ETERNITY;
unsigned int wait; unsigned int wait;
@ -934,7 +934,7 @@ static int proxy_parse_hard_stop_after(char **args, int section_type, struct pro
return 0; return 0;
} }
struct task *hard_stop(struct task *t) struct task *hard_stop(struct task *t, void *context, unsigned short state)
{ {
struct proxy *p; struct proxy *p;
struct stream *s; struct stream *s;

View File

@ -31,7 +31,7 @@
struct pool_head *pool_head_session; struct pool_head *pool_head_session;
static int conn_complete_session(struct connection *conn); static int conn_complete_session(struct connection *conn);
static struct task *session_expire_embryonic(struct task *t); static struct task *session_expire_embryonic(struct task *t, void *context, unsigned short state);
/* Create a a new session and assign it to frontend <fe>, listener <li>, /* Create a a new session and assign it to frontend <fe>, listener <li>,
* origin <origin>, set the current date and clear the stick counters pointers. * origin <origin>, set the current date and clear the stick counters pointers.
@ -381,9 +381,9 @@ static void session_kill_embryonic(struct session *sess)
/* Manages the embryonic session timeout. It is only called when the timeout /* Manages the embryonic session timeout. It is only called when the timeout
* strikes and performs the required cleanup. * strikes and performs the required cleanup.
*/ */
static struct task *session_expire_embryonic(struct task *t) static struct task *session_expire_embryonic(struct task *t, void *context, unsigned short state)
{ {
struct session *sess = t->context; struct session *sess = context;
if (!(t->state & TASK_WOKEN_TIMER)) if (!(t->state & TASK_WOKEN_TIMER))
return t; return t;

View File

@ -578,9 +578,9 @@ out_unlock:
* Task processing function to trash expired sticky sessions. A pointer to the * Task processing function to trash expired sticky sessions. A pointer to the
* task itself is returned since it never dies. * task itself is returned since it never dies.
*/ */
static struct task *process_table_expire(struct task *task) static struct task *process_table_expire(struct task *task, void *context, unsigned short state)
{ {
struct stktable *t = task->context; struct stktable *t = context;
task->expire = stktable_trash_expired(t); task->expire = stktable_trash_expired(t);
return task; return task;

View File

@ -1615,10 +1615,10 @@ static int process_store_rules(struct stream *s, struct channel *rep, int an_bit
* and each function is called only if at least another function has changed at * and each function is called only if at least another function has changed at
* least one flag it is interested in. * least one flag it is interested in.
*/ */
struct task *process_stream(struct task *t) struct task *process_stream(struct task *t, void *context, unsigned short state)
{ {
struct server *srv; struct server *srv;
struct stream *s = t->context; struct stream *s = context;
struct session *sess = s->sess; struct session *sess = s->sess;
unsigned int rqf_last, rpf_last; unsigned int rqf_last, rpf_last;
unsigned int rq_prod_last, rq_cons_last; unsigned int rq_prod_last, rq_cons_last;
@ -1655,7 +1655,7 @@ struct task *process_stream(struct task *t)
si_b->flags |= SI_FL_DONT_WAKE; si_b->flags |= SI_FL_DONT_WAKE;
/* update pending events */ /* update pending events */
s->pending_events |= (t->state & TASK_WOKEN_ANY); s->pending_events |= (state & TASK_WOKEN_ANY);
/* 1a: Check for low level timeouts if needed. We just set a flag on /* 1a: Check for low level timeouts if needed. We just set a flag on
* stream interfaces when their timeouts have expired. * stream interfaces when their timeouts have expired.

View File

@ -226,10 +226,10 @@ void process_runnable_tasks()
* predictor take this most common call. * predictor take this most common call.
*/ */
if (likely(t->process == process_stream)) if (likely(t->process == process_stream))
t = process_stream(t); t = process_stream(t, t->context, t->state);
else { else {
if (t->process != NULL) if (t->process != NULL)
t = t->process(t); t = t->process(t, t->context, t->state);
else { else {
__task_free(t); __task_free(t);
t = NULL; t = NULL;
@ -314,10 +314,10 @@ void process_runnable_tasks()
*/ */
curr_task = t; curr_task = t;
if (likely(t->process == process_stream)) if (likely(t->process == process_stream))
t = process_stream(t); t = process_stream(t, t->context, t->state);
else { else {
if (t->process != NULL) if (t->process != NULL)
t = t->process(t); t = t->process(t, t->context, t->state);
else { else {
__task_free(t); __task_free(t);
t = NULL; t = NULL;