MINOR: task: provide 3 task_new_* wrappers to simplify the API

We'll need to improve the API to pass other arguments in the future, so
let's start to adapt better to the current use cases. task_new() is used:
  - 18 times as task_new(tid_bit)
  - 18 times as task_new(MAX_THREADS_MASK)
  - 2 times with a single bit (in a loop)
  - 1 in the debug code that uses a mask

This patch provides 3 new functions to achieve this:
  - task_new_here()     to create a task on the calling thread
  - task_new_anywhere() to create a task to be run anywhere
  - task_new_on()       to create a task to run on a specific thread

The change is trivial and will allow us to later concentrate the
required adaptations to these 3 functions only. It's still possible
to call task_new() if needed but a comment was added to encourage the
use of the new ones instead. The debug code was not changed and still
uses it.
This commit is contained in:
Willy Tarreau 2021-10-01 18:23:30 +02:00
parent 6a2a912cb8
commit beeabf5314
23 changed files with 71 additions and 42 deletions

View File

@ -68,7 +68,7 @@ static inline struct appctx *appctx_new(struct applet *applet)
appctx->obj_type = OBJ_TYPE_APPCTX;
appctx->applet = applet;
appctx_init(appctx);
appctx->t = task_new(tid_bit);
appctx->t = task_new_here();
if (unlikely(appctx->t == NULL)) {
pool_free(pool_head_appctx, appctx);
return NULL;

View File

@ -462,8 +462,9 @@ static inline struct tasklet *tasklet_new(void)
/*
* Allocate and initialise a new task. The new task is returned, or NULL in
* case of lack of memory. The task count is incremented. Tasks should only
* be allocated this way, and must be freed using task_free().
* case of lack of memory. The task count is incremented. This API might change
* in the near future, so prefer one of the task_new_*() wrappers below which
* are usually more suitable. Tasks must be freed using task_free().
*/
static inline struct task *task_new(unsigned long thread_mask)
{
@ -475,6 +476,33 @@ static inline struct task *task_new(unsigned long thread_mask)
return t;
}
/* Allocate and initialize a new task, to run on global thread <thr>. The new
* task is returned, or NULL in case of lack of memory. It's up to the caller
* to pass a valid thread number (in tid space, 0 to nbthread-1). The task
* count is incremented.
*/
static inline struct task *task_new_on(uint thr)
{
return task_new(1UL << thr);
}
/* Allocate and initialize a new task, to run on the calling thread. The new
* task is returned, or NULL in case of lack of memory. The task count is
* incremented.
*/
static inline struct task *task_new_here()
{
return task_new(tid_bit);
}
/* Allocate and initialize a new task, to run on any thread. The new task is
* returned, or NULL in case of lack of memory. The task count is incremented.
*/
static inline struct task *task_new_anywhere()
{
return task_new(MAX_THREADS_MASK);
}
/*
* Free a task. Its context must have been freed since it will be lost. The
* task count is decremented. It it is the current task, this one is reset.

View File

@ -3680,7 +3680,7 @@ out_uri_auth_compat:
}
}
idle_conn_task = task_new(MAX_THREADS_MASK);
idle_conn_task = task_new_anywhere();
if (!idle_conn_task) {
ha_alert("parsing : failed to allocate global idle connection task.\n");
cfgerr++;
@ -3690,7 +3690,7 @@ out_uri_auth_compat:
idle_conn_task->context = NULL;
for (i = 0; i < global.nbthread; i++) {
idle_conns[i].cleanup_task = task_new(1UL << i);
idle_conns[i].cleanup_task = task_new_on(i);
if (!idle_conns[i].cleanup_task) {
ha_alert("parsing : failed to allocate idle connection tasks for thread '%d'.\n", i);
cfgerr++;
@ -3769,7 +3769,7 @@ out_uri_auth_compat:
}
/* create the task associated with the proxy */
curproxy->task = task_new(MAX_THREADS_MASK);
curproxy->task = task_new_anywhere();
if (curproxy->task) {
curproxy->task->context = curproxy;
curproxy->task->process = manage_proxy;

View File

@ -1388,13 +1388,14 @@ int start_check_task(struct check *check, int mininter,
int nbcheck, int srvpos)
{
struct task *t;
unsigned long thread_mask = MAX_THREADS_MASK;
/* task for the check. Process-based checks exclusively run on thread 1. */
if (check->type == PR_O2_EXT_CHK)
thread_mask = 1;
t = task_new_on(1);
else
t = task_new_anywhere();
/* task for the check */
if ((t = task_new(thread_mask)) == NULL) {
if (!t) {
ha_alert("Starting [%s:%s] check: out of memory.\n",
check->server->proxy->id, check->server->id);
return 0;

View File

@ -1686,7 +1686,7 @@ static struct task *mux_stopping_process(struct task *t, void *ctx, unsigned int
static int allocate_mux_cleanup(void)
{
/* allocates the thread bound mux_stopping_data task */
mux_stopping_data[tid].task = task_new(tid_bit);
mux_stopping_data[tid].task = task_new_here();
if (!mux_stopping_data[tid].task) {
ha_alert("Failed to allocate the task for connection cleanup on thread %d.\n", tid);
return 0;

View File

@ -1027,7 +1027,7 @@ struct dns_session *dns_session_new(struct dns_stream_server *dss)
/* never fail because it is the first watcher attached to the ring */
DISGUISE(ring_attach(&ds->ring));
if ((ds->task_exp = task_new(tid_bit)) == NULL)
if ((ds->task_exp = task_new_here()) == NULL)
goto error;
ds->task_exp->process = dns_process_query_exp;
@ -1223,7 +1223,7 @@ int dns_stream_init(struct dns_nameserver *ns, struct server *srv)
goto out;
}
/* Create the task associated to the resolver target handling conns */
if ((dss->task_req = task_new(MAX_THREADS_MASK)) == NULL) {
if ((dss->task_req = task_new_anywhere()) == NULL) {
ha_alert("memory allocation error initializing the ring for dns tcp server '%s'.\n", srv->id);
goto out;
}
@ -1240,7 +1240,7 @@ int dns_stream_init(struct dns_nameserver *ns, struct server *srv)
}
/* Create the task associated to the resolver target handling conns */
if ((dss->task_rsp = task_new(MAX_THREADS_MASK)) == NULL) {
if ((dss->task_rsp = task_new_anywhere()) == NULL) {
ha_alert("memory allocation error initializing the ring for dns tcp server '%s'.\n", srv->id);
goto out;
}
@ -1250,7 +1250,7 @@ int dns_stream_init(struct dns_nameserver *ns, struct server *srv)
dss->task_rsp->context = ns;
/* Create the task associated to the resolver target handling conns */
if ((dss->task_idle = task_new(MAX_THREADS_MASK)) == NULL) {
if ((dss->task_idle = task_new_anywhere()) == NULL) {
ha_alert("memory allocation error initializing the ring for dns tcp server '%s'.\n", srv->id);
goto out;
}

View File

@ -1998,7 +1998,7 @@ spoe_create_appctx(struct spoe_config *conf)
goto out_free_appctx;
appctx->st0 = SPOE_APPCTX_ST_CONNECT;
if ((SPOE_APPCTX(appctx)->task = task_new(tid_bit)) == NULL)
if ((SPOE_APPCTX(appctx)->task = task_new_here()) == NULL)
goto out_free_spoe_appctx;
SPOE_APPCTX(appctx)->owner = appctx;

View File

@ -8251,9 +8251,9 @@ static int hlua_register_task(lua_State *L)
* otherwise, inherit the current thread identifier
*/
if (state_id == 0)
task = task_new(MAX_THREADS_MASK);
task = task_new_anywhere();
else
task = task_new(tid_bit);
task = task_new_here();
if (!task)
goto alloc_error;
@ -8941,7 +8941,7 @@ static int hlua_applet_tcp_init(struct appctx *ctx, struct proxy *px, struct str
ctx->ctx.hlua_apptcp.flags = 0;
/* Create task used by signal to wakeup applets. */
task = task_new(tid_bit);
task = task_new_here();
if (!task) {
SEND_ERR(px, "Lua applet tcp '%s': out of memory.\n",
ctx->rule->arg.hlua_rule->fcn->name);
@ -9134,7 +9134,7 @@ static int hlua_applet_http_init(struct appctx *ctx, struct proxy *px, struct st
ctx->ctx.hlua_apphttp.flags |= APPLET_HTTP11;
/* Create task used by signal to wakeup applets. */
task = task_new(tid_bit);
task = task_new_here();
if (!task) {
SEND_ERR(px, "Lua applet http '%s': out of memory.\n",
ctx->rule->arg.hlua_rule->fcn->name);
@ -9753,7 +9753,7 @@ static int hlua_cli_parse_fct(char **args, char *payload, struct appctx *appctx,
* We use the same wakeup function than the Lua applet_tcp and
* applet_http. It is absolutely compatible.
*/
appctx->ctx.hlua_cli.task = task_new(tid_bit);
appctx->ctx.hlua_cli.task = task_new_here();
if (!appctx->ctx.hlua_cli.task) {
SEND_ERR(NULL, "Lua cli '%s': out of memory.\n", fcn->name);
goto error;

View File

@ -1134,7 +1134,7 @@ void listener_release(struct listener *l)
/* Initializes the listener queues. Returns 0 on success, otherwise ERR_* flags */
static int listener_queue_init()
{
global_listener_queue_task = task_new(MAX_THREADS_MASK);
global_listener_queue_task = task_new_anywhere();
if (!global_listener_queue_task) {
ha_alert("Out of memory when initializing global listener queue\n");
return ERR_FATAL|ERR_ABORT;

View File

@ -133,7 +133,7 @@ int init_email_alert(struct mailers *mls, struct proxy *p, char **err)
check->addr = mailer->addr;
check->port = get_host_port(&mailer->addr);
if ((t = task_new(MAX_THREADS_MASK)) == NULL) {
if ((t = task_new_anywhere()) == NULL) {
memprintf(err, "out of memory while allocating mailer alerts task");
goto error;
}

View File

@ -734,7 +734,7 @@ static int fcgi_init(struct connection *conn, struct proxy *px, struct session *
fconn->app = app;
fconn->task = NULL;
if (tick_isset(fconn->timeout)) {
t = task_new(tid_bit);
t = task_new_here();
if (!t) {
TRACE_ERROR("fconn task allocation failure", FCGI_EV_FCONN_NEW|FCGI_EV_FCONN_END|FCGI_EV_FCONN_ERR);
goto fail;
@ -4247,7 +4247,7 @@ static int fcgi_takeover(struct connection *conn, int orig_tid)
__ha_barrier_store();
task_kill(task);
fcgi->task = task_new(tid_bit);
fcgi->task = task_new_here();
if (!fcgi->task) {
fcgi_release(fcgi);
return -1;

View File

@ -808,7 +808,7 @@ static int h1_init(struct connection *conn, struct proxy *proxy, struct session
&h1c->conn->stopping_list);
}
if (tick_isset(h1c->timeout)) {
t = task_new(tid_bit);
t = task_new_here();
if (!t) {
TRACE_ERROR("H1C task allocation failure", H1_EV_H1C_NEW|H1_EV_H1C_END|H1_EV_H1C_ERR);
goto fail;
@ -3738,7 +3738,7 @@ static int h1_takeover(struct connection *conn, int orig_tid)
__ha_barrier_store();
task_kill(task);
h1c->task = task_new(tid_bit);
h1c->task = task_new_here();
if (!h1c->task) {
h1_release(h1c);
return -1;

View File

@ -945,7 +945,7 @@ static int h2_init(struct connection *conn, struct proxy *prx, struct session *s
h2c->proxy = prx;
h2c->task = NULL;
if (tick_isset(h2c->timeout)) {
t = task_new(tid_bit);
t = task_new_here();
if (!t)
goto fail;
@ -6636,7 +6636,7 @@ static int h2_takeover(struct connection *conn, int orig_tid)
__ha_barrier_store();
task_kill(task);
h2c->task = task_new(tid_bit);
h2c->task = task_new_here();
if (!h2c->task) {
h2_release(h2c);
return -1;

View File

@ -602,7 +602,7 @@ static int qc_init(struct connection *conn, struct proxy *prx,
qcc->proxy = prx;
qcc->task = NULL;
if (tick_isset(qcc->timeout)) {
t = task_new(tid_bit);
t = task_new_here();
if (!t)
goto fail;
@ -2107,7 +2107,7 @@ static int qc_takeover(struct connection *conn, int orig_tid)
__ha_barrier_store();
task_kill(task);
qcc->task = task_new(tid_bit);
qcc->task = task_new_here();
if (!qcc->task) {
qc_release(qcc);
return -1;

View File

@ -3503,7 +3503,7 @@ int peers_init_sync(struct peers *peers)
peers->peers_fe->maxconn += 3;
}
peers->sync_task = task_new(MAX_THREADS_MASK);
peers->sync_task = task_new_anywhere();
if (!peers->sync_task)
return 0;

View File

@ -2039,7 +2039,7 @@ static void do_soft_stop_now()
/* schedule a hard-stop after a delay if needed */
if (tick_isset(global.hard_stop_after)) {
task = task_new(MAX_THREADS_MASK);
task = task_new_anywhere();
if (task) {
task->process = hard_stop;
task_schedule(task, tick_add(now_ms, global.hard_stop_after));
@ -2077,7 +2077,7 @@ void soft_stop(void)
stopping = 1;
if (tick_isset(global.grace_delay)) {
task = task_new(MAX_THREADS_MASK);
task = task_new_anywhere();
if (task) {
ha_notice("Scheduling a soft-stop in %u ms.\n", global.grace_delay);
send_log(NULL, LOG_WARNING, "Scheduling a soft-stop in %u ms.\n", global.grace_delay);

View File

@ -2412,7 +2412,7 @@ static int resolvers_finalize_config(void)
}
/* Create the task associated to the resolvers section */
if ((t = task_new(MAX_THREADS_MASK)) == NULL) {
if ((t = task_new_anywhere()) == NULL) {
ha_alert("resolvers '%s' : out of memory.\n", resolvers->id);
err_code |= (ERR_ALERT|ERR_ABORT);
goto err;
@ -2453,7 +2453,7 @@ static int resolvers_finalize_config(void)
}
}
srv->srvrq_check = task_new(MAX_THREADS_MASK);
srv->srvrq_check = task_new_anywhere();
if (!srv->srvrq_check) {
ha_alert("%s '%s' : unable to create SRVRQ task for server '%s'.\n",
proxy_type_str(px), px->id, srv->id);

View File

@ -4480,7 +4480,7 @@ static int init_srv_slowstart(struct server *srv)
struct task *t;
if (srv->slowstart) {
if ((t = task_new(MAX_THREADS_MASK)) == NULL) {
if ((t = task_new_anywhere()) == NULL) {
ha_alert("Cannot activate slowstart for server %s/%s: out of memory.\n", srv->proxy->id, srv->id);
return ERR_ALERT | ERR_FATAL;
}

View File

@ -248,7 +248,7 @@ int session_accept_fd(struct connection *cli_conn)
* conn -- owner ---> task <-----+
*/
if (cli_conn->flags & (CO_FL_WAIT_XPRT | CO_FL_EARLY_SSL_HS)) {
if (unlikely((sess->task = task_new(tid_bit)) == NULL))
if (unlikely((sess->task = task_new_here()) == NULL))
goto out_free_sess;
sess->task->context = sess;

View File

@ -731,7 +731,7 @@ static struct task *process_sink_forward(struct task * task, void *context, unsi
*/
int sink_init_forward(struct sink *sink)
{
sink->forward_task = task_new(MAX_THREADS_MASK);
sink->forward_task = task_new_anywhere();
if (!sink->forward_task)
return 0;

View File

@ -648,7 +648,7 @@ int stktable_init(struct stktable *t)
t->exp_next = TICK_ETERNITY;
if ( t->expire ) {
t->exp_task = task_new(MAX_THREADS_MASK);
t->exp_task = task_new_anywhere();
if (!t->exp_task)
return 0;
t->exp_task->process = process_table_expire;

View File

@ -429,7 +429,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin, struct bu
s->pcli_flags = 0;
s->unique_id = IST_NULL;
if ((t = task_new(tid_bit)) == NULL)
if ((t = task_new_here()) == NULL)
goto out_fail_alloc;
s->task = t;

View File

@ -3046,7 +3046,7 @@ static struct quic_conn *qc_new_conn(unsigned int version, int ipv4,
*/
static int quic_conn_init_timer(struct quic_conn *qc)
{
qc->timer_task = task_new(MAX_THREADS_MASK);
qc->timer_task = task_new_anywhere();
if (!qc->timer_task)
return 0;