mirror of http://git.haproxy.org/git/haproxy.git/
MINOR: task: provide 3 task_new_* wrappers to simplify the API
We'll need to improve the API to pass other arguments in the future, so
let's start to adapt better to the current use cases. task_new() is used:
  - 18 times as task_new(tid_bit)
  - 18 times as task_new(MAX_THREADS_MASK)
  - 2 times with a single bit (in a loop)
  - 1 in the debug code that uses a mask

This patch provides 3 new functions to achieve this:
  - task_new_here() to create a task on the calling thread
  - task_new_anywhere() to create a task to be run anywhere
  - task_new_on() to create a task to run on a specific thread

The change is trivial and will allow us to later concentrate the required
adaptations to these 3 functions only. It's still possible to call task_new()
if needed, but a comment was added to encourage the use of the new ones
instead. The debug code was not changed and still uses it.
This commit is contained in:
parent 6a2a912cb8
commit beeabf5314
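Reading the diff below, every conversion falls into one of three shapes. A minimal sketch of the mapping, assuming HAProxy's internal <haproxy/task.h> is available; my_init_tasks() and its three pointers are illustrative, not part of the patch:

#include <haproxy/task.h>

/* Hypothetical function: shows which wrapper replaces which mask-based
 * call. Only the task_new_*() names come from the patch itself. */
static int my_init_tasks(void)
{
	struct task *local  = task_new_here();      /* was task_new(tid_bit) */
	struct task *any    = task_new_anywhere();  /* was task_new(MAX_THREADS_MASK) */
	struct task *pinned = task_new_on(2);       /* was task_new(1UL << 2) */

	if (!local || !any || !pinned)
		return 0;  /* same failure convention as the call sites below */
	return 1;
}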
@@ -68,7 +68,7 @@ static inline struct appctx *appctx_new(struct applet *applet)
 	appctx->obj_type = OBJ_TYPE_APPCTX;
 	appctx->applet = applet;
 	appctx_init(appctx);
-	appctx->t = task_new(tid_bit);
+	appctx->t = task_new_here();
 	if (unlikely(appctx->t == NULL)) {
 		pool_free(pool_head_appctx, appctx);
 		return NULL;
@@ -462,8 +462,9 @@ static inline struct tasklet *tasklet_new(void)
 
 /*
  * Allocate and initialise a new task. The new task is returned, or NULL in
- * case of lack of memory. The task count is incremented. Tasks should only
- * be allocated this way, and must be freed using task_free().
+ * case of lack of memory. The task count is incremented. This API might change
+ * in the near future, so prefer one of the task_new_*() wrappers below which
+ * are usually more suitable. Tasks must be freed using task_free().
  */
 static inline struct task *task_new(unsigned long thread_mask)
 {
@@ -475,6 +476,33 @@ static inline struct task *task_new(unsigned long thread_mask)
 	return t;
 }
 
+/* Allocate and initialize a new task, to run on global thread <thr>. The new
+ * task is returned, or NULL in case of lack of memory. It's up to the caller
+ * to pass a valid thread number (in tid space, 0 to nbthread-1). The task
+ * count is incremented.
+ */
+static inline struct task *task_new_on(uint thr)
+{
+	return task_new(1UL << thr);
+}
+
+/* Allocate and initialize a new task, to run on the calling thread. The new
+ * task is returned, or NULL in case of lack of memory. The task count is
+ * incremented.
+ */
+static inline struct task *task_new_here()
+{
+	return task_new(tid_bit);
+}
+
+/* Allocate and initialize a new task, to run on any thread. The new task is
+ * returned, or NULL in case of lack of memory. The task count is incremented.
+ */
+static inline struct task *task_new_anywhere()
+{
+	return task_new(MAX_THREADS_MASK);
+}
+
 /*
  * Free a task. Its context must have been freed since it will be lost. The
  * task count is decremented. It it is the current task, this one is reset.
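Because task_new_on() takes a tid-space thread number instead of a mask, the old "one bit per iteration" loop converts directly. A minimal sketch, assuming HAProxy's MAX_THREADS and global.nbthread; the cleanup_tasks array is hypothetical (the real per-thread case is the idle_conns loop in the cfgparse.c hunk further down):

/* Hypothetical per-thread allocation using the new wrapper. */
struct task *cleanup_tasks[MAX_THREADS];
int i;

for (i = 0; i < global.nbthread; i++) {
	cleanup_tasks[i] = task_new_on(i);  /* was task_new(1UL << i) */
	if (!cleanup_tasks[i])
		return 0;
}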
@@ -3680,7 +3680,7 @@ out_uri_auth_compat:
 		}
 	}
 
-	idle_conn_task = task_new(MAX_THREADS_MASK);
+	idle_conn_task = task_new_anywhere();
 	if (!idle_conn_task) {
 		ha_alert("parsing : failed to allocate global idle connection task.\n");
 		cfgerr++;
@@ -3690,7 +3690,7 @@ out_uri_auth_compat:
 	idle_conn_task->context = NULL;
 
 	for (i = 0; i < global.nbthread; i++) {
-		idle_conns[i].cleanup_task = task_new(1UL << i);
+		idle_conns[i].cleanup_task = task_new_on(i);
 		if (!idle_conns[i].cleanup_task) {
 			ha_alert("parsing : failed to allocate idle connection tasks for thread '%d'.\n", i);
 			cfgerr++;
@@ -3769,7 +3769,7 @@ out_uri_auth_compat:
 	}
 
 	/* create the task associated with the proxy */
-	curproxy->task = task_new(MAX_THREADS_MASK);
+	curproxy->task = task_new_anywhere();
 	if (curproxy->task) {
 		curproxy->task->context = curproxy;
 		curproxy->task->process = manage_proxy;
@@ -1388,13 +1388,14 @@ int start_check_task(struct check *check, int mininter,
 			  int nbcheck, int srvpos)
 {
 	struct task *t;
-	unsigned long thread_mask = MAX_THREADS_MASK;
 
+	/* task for the check. Process-based checks exclusively run on thread 1. */
 	if (check->type == PR_O2_EXT_CHK)
-		thread_mask = 1;
+		t = task_new_on(1);
+	else
+		t = task_new_anywhere();
 
-	/* task for the check */
-	if ((t = task_new(thread_mask)) == NULL) {
+	if (!t) {
 		ha_alert("Starting [%s:%s] check: out of memory.\n",
 			 check->server->proxy->id, check->server->id);
 		return 0;
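The check.c hunk above is the one non-mechanical conversion in the patch: a thread mask computed at runtime becomes an explicit branch between wrappers. The same pattern in isolation, as a sketch (must_pin is hypothetical; recall that task_new_on() expects a tid-space thread number, 0 to nbthread-1):

/* Hypothetical: a runtime-computed mask turned into a wrapper choice. */
struct task *t;

if (must_pin)                     /* e.g. a process-based external check */
	t = task_new_on(0);       /* one fixed thread, tid-space numbering */
else
	t = task_new_anywhere();  /* any thread may run it */

if (!t)
	return 0;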
@@ -1686,7 +1686,7 @@ static struct task *mux_stopping_process(struct task *t, void *ctx, unsigned int
 static int allocate_mux_cleanup(void)
 {
 	/* allocates the thread bound mux_stopping_data task */
-	mux_stopping_data[tid].task = task_new(tid_bit);
+	mux_stopping_data[tid].task = task_new_here();
 	if (!mux_stopping_data[tid].task) {
 		ha_alert("Failed to allocate the task for connection cleanup on thread %d.\n", tid);
 		return 0;
@@ -1027,7 +1027,7 @@ struct dns_session *dns_session_new(struct dns_stream_server *dss)
 	/* never fail because it is the first watcher attached to the ring */
 	DISGUISE(ring_attach(&ds->ring));
 
-	if ((ds->task_exp = task_new(tid_bit)) == NULL)
+	if ((ds->task_exp = task_new_here()) == NULL)
 		goto error;
 
 	ds->task_exp->process = dns_process_query_exp;
@@ -1223,7 +1223,7 @@ int dns_stream_init(struct dns_nameserver *ns, struct server *srv)
 		goto out;
 	}
 	/* Create the task associated to the resolver target handling conns */
-	if ((dss->task_req = task_new(MAX_THREADS_MASK)) == NULL) {
+	if ((dss->task_req = task_new_anywhere()) == NULL) {
 		ha_alert("memory allocation error initializing the ring for dns tcp server '%s'.\n", srv->id);
 		goto out;
 	}
@@ -1240,7 +1240,7 @@ int dns_stream_init(struct dns_nameserver *ns, struct server *srv)
 	}
 
 	/* Create the task associated to the resolver target handling conns */
-	if ((dss->task_rsp = task_new(MAX_THREADS_MASK)) == NULL) {
+	if ((dss->task_rsp = task_new_anywhere()) == NULL) {
 		ha_alert("memory allocation error initializing the ring for dns tcp server '%s'.\n", srv->id);
 		goto out;
 	}
@@ -1250,7 +1250,7 @@ int dns_stream_init(struct dns_nameserver *ns, struct server *srv)
 	dss->task_rsp->context = ns;
 
 	/* Create the task associated to the resolver target handling conns */
-	if ((dss->task_idle = task_new(MAX_THREADS_MASK)) == NULL) {
+	if ((dss->task_idle = task_new_anywhere()) == NULL) {
 		ha_alert("memory allocation error initializing the ring for dns tcp server '%s'.\n", srv->id);
 		goto out;
 	}
@@ -1998,7 +1998,7 @@ spoe_create_appctx(struct spoe_config *conf)
 		goto out_free_appctx;
 
 	appctx->st0 = SPOE_APPCTX_ST_CONNECT;
-	if ((SPOE_APPCTX(appctx)->task = task_new(tid_bit)) == NULL)
+	if ((SPOE_APPCTX(appctx)->task = task_new_here()) == NULL)
 		goto out_free_spoe_appctx;
 
 	SPOE_APPCTX(appctx)->owner = appctx;
src/hlua.c
@@ -8251,9 +8251,9 @@ static int hlua_register_task(lua_State *L)
 	 * otherwise, inherit the current thread identifier
 	 */
 	if (state_id == 0)
-		task = task_new(MAX_THREADS_MASK);
+		task = task_new_anywhere();
 	else
-		task = task_new(tid_bit);
+		task = task_new_here();
 	if (!task)
 		goto alloc_error;
 
@@ -8941,7 +8941,7 @@ static int hlua_applet_tcp_init(struct appctx *ctx, struct proxy *px, struct str
 	ctx->ctx.hlua_apptcp.flags = 0;
 
 	/* Create task used by signal to wakeup applets. */
-	task = task_new(tid_bit);
+	task = task_new_here();
 	if (!task) {
 		SEND_ERR(px, "Lua applet tcp '%s': out of memory.\n",
 			 ctx->rule->arg.hlua_rule->fcn->name);
@@ -9134,7 +9134,7 @@ static int hlua_applet_http_init(struct appctx *ctx, struct proxy *px, struct st
 		ctx->ctx.hlua_apphttp.flags |= APPLET_HTTP11;
 
 	/* Create task used by signal to wakeup applets. */
-	task = task_new(tid_bit);
+	task = task_new_here();
 	if (!task) {
 		SEND_ERR(px, "Lua applet http '%s': out of memory.\n",
 			 ctx->rule->arg.hlua_rule->fcn->name);
@@ -9753,7 +9753,7 @@ static int hlua_cli_parse_fct(char **args, char *payload, struct appctx *appctx,
 	 * We use the same wakeup function than the Lua applet_tcp and
 	 * applet_http. It is absolutely compatible.
 	 */
-	appctx->ctx.hlua_cli.task = task_new(tid_bit);
+	appctx->ctx.hlua_cli.task = task_new_here();
 	if (!appctx->ctx.hlua_cli.task) {
 		SEND_ERR(NULL, "Lua cli '%s': out of memory.\n", fcn->name);
 		goto error;
@@ -1134,7 +1134,7 @@ void listener_release(struct listener *l)
 /* Initializes the listener queues. Returns 0 on success, otherwise ERR_* flags */
 static int listener_queue_init()
 {
-	global_listener_queue_task = task_new(MAX_THREADS_MASK);
+	global_listener_queue_task = task_new_anywhere();
 	if (!global_listener_queue_task) {
 		ha_alert("Out of memory when initializing global listener queue\n");
 		return ERR_FATAL|ERR_ABORT;
@@ -133,7 +133,7 @@ int init_email_alert(struct mailers *mls, struct proxy *p, char **err)
 		check->addr = mailer->addr;
 		check->port = get_host_port(&mailer->addr);
 
-		if ((t = task_new(MAX_THREADS_MASK)) == NULL) {
+		if ((t = task_new_anywhere()) == NULL) {
 			memprintf(err, "out of memory while allocating mailer alerts task");
 			goto error;
 		}
@@ -734,7 +734,7 @@ static int fcgi_init(struct connection *conn, struct proxy *px, struct session *
 	fconn->app = app;
 	fconn->task = NULL;
 	if (tick_isset(fconn->timeout)) {
-		t = task_new(tid_bit);
+		t = task_new_here();
 		if (!t) {
 			TRACE_ERROR("fconn task allocation failure", FCGI_EV_FCONN_NEW|FCGI_EV_FCONN_END|FCGI_EV_FCONN_ERR);
 			goto fail;
@@ -4247,7 +4247,7 @@ static int fcgi_takeover(struct connection *conn, int orig_tid)
 	__ha_barrier_store();
 	task_kill(task);
 
-	fcgi->task = task_new(tid_bit);
+	fcgi->task = task_new_here();
 	if (!fcgi->task) {
 		fcgi_release(fcgi);
 		return -1;
@@ -808,7 +808,7 @@ static int h1_init(struct connection *conn, struct proxy *proxy, struct session
 		              &h1c->conn->stopping_list);
 	}
 	if (tick_isset(h1c->timeout)) {
-		t = task_new(tid_bit);
+		t = task_new_here();
 		if (!t) {
 			TRACE_ERROR("H1C task allocation failure", H1_EV_H1C_NEW|H1_EV_H1C_END|H1_EV_H1C_ERR);
 			goto fail;
@@ -3738,7 +3738,7 @@ static int h1_takeover(struct connection *conn, int orig_tid)
 	__ha_barrier_store();
 	task_kill(task);
 
-	h1c->task = task_new(tid_bit);
+	h1c->task = task_new_here();
 	if (!h1c->task) {
 		h1_release(h1c);
 		return -1;
@@ -945,7 +945,7 @@ static int h2_init(struct connection *conn, struct proxy *prx, struct session *s
 	h2c->proxy = prx;
 	h2c->task = NULL;
 	if (tick_isset(h2c->timeout)) {
-		t = task_new(tid_bit);
+		t = task_new_here();
 		if (!t)
 			goto fail;
 
@@ -6636,7 +6636,7 @@ static int h2_takeover(struct connection *conn, int orig_tid)
 	__ha_barrier_store();
 	task_kill(task);
 
-	h2c->task = task_new(tid_bit);
+	h2c->task = task_new_here();
 	if (!h2c->task) {
 		h2_release(h2c);
 		return -1;
@@ -602,7 +602,7 @@ static int qc_init(struct connection *conn, struct proxy *prx,
 	qcc->proxy = prx;
 	qcc->task = NULL;
 	if (tick_isset(qcc->timeout)) {
-		t = task_new(tid_bit);
+		t = task_new_here();
 		if (!t)
 			goto fail;
 
@@ -2107,7 +2107,7 @@ static int qc_takeover(struct connection *conn, int orig_tid)
 	__ha_barrier_store();
 	task_kill(task);
 
-	qcc->task = task_new(tid_bit);
+	qcc->task = task_new_here();
 	if (!qcc->task) {
 		qc_release(qcc);
 		return -1;
@@ -3503,7 +3503,7 @@ int peers_init_sync(struct peers *peers)
 		peers->peers_fe->maxconn += 3;
 	}
 
-	peers->sync_task = task_new(MAX_THREADS_MASK);
+	peers->sync_task = task_new_anywhere();
 	if (!peers->sync_task)
 		return 0;
 
@@ -2039,7 +2039,7 @@ static void do_soft_stop_now()
 
 	/* schedule a hard-stop after a delay if needed */
 	if (tick_isset(global.hard_stop_after)) {
-		task = task_new(MAX_THREADS_MASK);
+		task = task_new_anywhere();
 		if (task) {
 			task->process = hard_stop;
 			task_schedule(task, tick_add(now_ms, global.hard_stop_after));
@@ -2077,7 +2077,7 @@ void soft_stop(void)
 	stopping = 1;
 
 	if (tick_isset(global.grace_delay)) {
-		task = task_new(MAX_THREADS_MASK);
+		task = task_new_anywhere();
 		if (task) {
 			ha_notice("Scheduling a soft-stop in %u ms.\n", global.grace_delay);
 			send_log(NULL, LOG_WARNING, "Scheduling a soft-stop in %u ms.\n", global.grace_delay);
@@ -2412,7 +2412,7 @@ static int resolvers_finalize_config(void)
 		}
 
 		/* Create the task associated to the resolvers section */
-		if ((t = task_new(MAX_THREADS_MASK)) == NULL) {
+		if ((t = task_new_anywhere()) == NULL) {
 			ha_alert("resolvers '%s' : out of memory.\n", resolvers->id);
 			err_code |= (ERR_ALERT|ERR_ABORT);
 			goto err;
@@ -2453,7 +2453,7 @@ static int resolvers_finalize_config(void)
 			}
 		}
 
-		srv->srvrq_check = task_new(MAX_THREADS_MASK);
+		srv->srvrq_check = task_new_anywhere();
 		if (!srv->srvrq_check) {
 			ha_alert("%s '%s' : unable to create SRVRQ task for server '%s'.\n",
 				 proxy_type_str(px), px->id, srv->id);
@@ -4480,7 +4480,7 @@ static int init_srv_slowstart(struct server *srv)
 	struct task *t;
 
 	if (srv->slowstart) {
-		if ((t = task_new(MAX_THREADS_MASK)) == NULL) {
+		if ((t = task_new_anywhere()) == NULL) {
 			ha_alert("Cannot activate slowstart for server %s/%s: out of memory.\n", srv->proxy->id, srv->id);
 			return ERR_ALERT | ERR_FATAL;
 		}
@@ -248,7 +248,7 @@ int session_accept_fd(struct connection *cli_conn)
 	 *           conn -- owner ---> task <-----+
 	 */
 	if (cli_conn->flags & (CO_FL_WAIT_XPRT | CO_FL_EARLY_SSL_HS)) {
-		if (unlikely((sess->task = task_new(tid_bit)) == NULL))
+		if (unlikely((sess->task = task_new_here()) == NULL))
 			goto out_free_sess;
 
 		sess->task->context = sess;
@@ -731,7 +731,7 @@ static struct task *process_sink_forward(struct task * task, void *context, unsi
  */
 int sink_init_forward(struct sink *sink)
 {
-	sink->forward_task = task_new(MAX_THREADS_MASK);
+	sink->forward_task = task_new_anywhere();
 	if (!sink->forward_task)
 		return 0;
 
@@ -648,7 +648,7 @@ int stktable_init(struct stktable *t)
 
 	t->exp_next = TICK_ETERNITY;
 	if ( t->expire ) {
-		t->exp_task = task_new(MAX_THREADS_MASK);
+		t->exp_task = task_new_anywhere();
 		if (!t->exp_task)
 			return 0;
 		t->exp_task->process = process_table_expire;
@@ -429,7 +429,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin, struct bu
 	s->pcli_flags = 0;
 	s->unique_id = IST_NULL;
 
-	if ((t = task_new(tid_bit)) == NULL)
+	if ((t = task_new_here()) == NULL)
 		goto out_fail_alloc;
 
 	s->task = t;
@@ -3046,7 +3046,7 @@ static struct quic_conn *qc_new_conn(unsigned int version, int ipv4,
  */
 static int quic_conn_init_timer(struct quic_conn *qc)
 {
-	qc->timer_task = task_new(MAX_THREADS_MASK);
+	qc->timer_task = task_new_anywhere();
 	if (!qc->timer_task)
 		return 0;
 