MEDIUM: threads/filters: Add init/deinit callback per thread

Now, it is possible to define init_per_thread and deinit_per_thread callbacks to
deal with ressources allocation for each thread.

This is the filter responsibility to deal with concurrency. This is also the
filter responsibility to know if HAProxy is started with some threads. A good
way to do so is to check "global.nbthread" value. If it is greater than 1, then
_per_thread callbacks will be called.
This commit is contained in:
Christopher Faulet 2017-07-27 16:33:28 +02:00 committed by Willy Tarreau
parent e95f2c3ef5
commit 71a6a8efaa
3 changed files with 107 additions and 8 deletions

View File

@ -1,6 +1,6 @@
----------------------------------------- -----------------------------------------
Filters Guide - version 1.8 Filters Guide - version 1.8
( Last update: 2016-11-10 ) ( Last update: 2017-07-27 )
------------------------------------------ ------------------------------------------
Author : Christopher Faulet Author : Christopher Faulet
Contact : christopher dot faulet at capflam dot org Contact : christopher dot faulet at capflam dot org
@ -33,6 +33,7 @@ SUMMARY
3.1. API Overview 3.1. API Overview
3.2. Defining the filter name and its configuration 3.2. Defining the filter name and its configuration
3.3. Managing the filter lifecycle 3.3. Managing the filter lifecycle
3.3.1. Dealing with threads
3.4. Handling the streams activity 3.4. Handling the streams activity
3.5. Analyzing the channels activity 3.5. Analyzing the channels activity
3.6. Filtering the data exchanged 3.6. Filtering the data exchanged
@ -180,8 +181,10 @@ existing callbacks. Available callbacks are listed in the following structure:
* Callbacks to manage the filter lifecycle * Callbacks to manage the filter lifecycle
*/ */
int (*init) (struct proxy *p, struct flt_conf *fconf); int (*init) (struct proxy *p, struct flt_conf *fconf);
void (*deinit)(struct proxy *p, struct flt_conf *fconf); void (*deinit) (struct proxy *p, struct flt_conf *fconf);
int (*check) (struct proxy *p, struct flt_conf *fconf); int (*check) (struct proxy *p, struct flt_conf *fconf);
int (*init_per_thread) (struct proxy *p, struct flt_conf *fconf);
void (*deinit_per_thread)(struct proxy *p, struct flt_conf *fconf);
/* /*
* Stream callbacks * Stream callbacks
@ -473,7 +476,7 @@ the HAProxy configuration is fully defined. For example:
---------------------------------- ----------------------------------
Once the configuration parsed and checked, filters are ready to by used. There Once the configuration parsed and checked, filters are ready to by used. There
are two callbacks to manage the filter lifecycle: are two main callbacks to manage the filter lifecycle:
* 'flt_ops.init': It initializes the filter for a proxy. You may define this * 'flt_ops.init': It initializes the filter for a proxy. You may define this
callback if you need to complete your filter configuration. callback if you need to complete your filter configuration.
@ -514,6 +517,30 @@ TODO: Add callbacks to handle creation/destruction of filter instances. And
document it. document it.
3.3.1 DEALING WITH THREADS
--------------------------
When HAProxy is compiled with the threads support and started with more that one
thread (global.nbthread > 1), then it is possible to manage the filter per
thread with following callbacks:
* 'flt_ops.init_per_thread': It initializes the filter for each thread. It
works the same way than 'flt_ops.init' but in the
context of a thread. This callback is called
after the thread creation.
* 'flt_ops.deinit_per_thread': It cleans up what the init_per_thread callback
have done. It is called in the context of a
thread, before exiting it.
This is the filter's responsibility to deal with concurrency. check, init and
deinit callbacks are called on the main thread. All others are called on a
"worker" thread (not always the same). This is also the filter's responsibility
to know if HAProxy is started with more than one thread. If it is started with
one thread (or compiled without the threads support), these callbacks will be
silently ignored (in this case, global.nbthread will be always equal to one).
3.4. HANDLING THE STREAMS ACTIVITY 3.4. HANDLING THE STREAMS ACTIVITY
----------------------------------- -----------------------------------

View File

@ -62,6 +62,11 @@ struct flt_kw_list {
* - deinit : Cleans up what the init function has done. * - deinit : Cleans up what the init function has done.
* - check : Check the filter config for a proxy. Returns the * - check : Check the filter config for a proxy. Returns the
* number of errors encountered. * number of errors encountered.
* - init_per_thread : Initializes the filter for a proxy for a specific
* thread. Returns a negative value if an error
* occurs.
* - deinit_per_thread : Cleans up what the init_per_thread funcion has
* done.
* *
* *
* - attach : Called after a filter instance creation, when it is * - attach : Called after a filter instance creation, when it is
@ -151,8 +156,10 @@ struct flt_ops {
* Callbacks to manage the filter lifecycle * Callbacks to manage the filter lifecycle
*/ */
int (*init) (struct proxy *p, struct flt_conf *fconf); int (*init) (struct proxy *p, struct flt_conf *fconf);
void (*deinit)(struct proxy *p, struct flt_conf *fconf); void (*deinit) (struct proxy *p, struct flt_conf *fconf);
int (*check) (struct proxy *p, struct flt_conf *fconf); int (*check) (struct proxy *p, struct flt_conf *fconf);
int (*init_per_thread) (struct proxy *p, struct flt_conf *fconf);
void (*deinit_per_thread)(struct proxy *p, struct flt_conf *fconf);
/* /*
* Stream callbacks * Stream callbacks
*/ */

View File

@ -18,6 +18,7 @@
#include <common/errors.h> #include <common/errors.h>
#include <common/namespace.h> #include <common/namespace.h>
#include <common/standard.h> #include <common/standard.h>
#include <common/hathreads.h>
#include <types/filters.h> #include <types/filters.h>
#include <types/proto_http.h> #include <types/proto_http.h>
@ -263,6 +264,23 @@ flt_init(struct proxy *proxy)
return 0; return 0;
} }
/*
* Calls 'init_per_thread' callback for all filters attached to a proxy for each
* threads. This happens after the thread creation. Filters can finish to fill
* their config. Returns (ERR_ALERT|ERR_FATAL) if an error occurs, 0 otherwise.
*/
static int
flt_init_per_thread(struct proxy *proxy)
{
struct flt_conf *fconf;
list_for_each_entry(fconf, &proxy->filter_configs, list) {
if (fconf->ops->init_per_thread && fconf->ops->init_per_thread(proxy, fconf) < 0)
return ERR_ALERT|ERR_FATAL;
}
return 0;
}
/* Calls flt_init() for all proxies, see above */ /* Calls flt_init() for all proxies, see above */
static int static int
flt_init_all() flt_init_all()
@ -281,6 +299,25 @@ flt_init_all()
return 0; return 0;
} }
/* Calls flt_init_per_thread() for all proxies, see above. Be carefull here, it
* returns 0 if an error occured. This is the opposite of flt_init_all. */
static int
flt_init_all_per_thread()
{
struct proxy *px;
int err_code = 0;
for (px = proxy; px; px = px->next) {
err_code = flt_init_per_thread(px);
if (err_code & (ERR_ABORT|ERR_FATAL)) {
Alert("Failed to initialize filters for proxy '%s' for thread %u.\n",
px->id, tid);
return 0;
}
}
return 1;
}
/* /*
* Calls 'check' callback for all filters attached to a proxy. This happens * Calls 'check' callback for all filters attached to a proxy. This happens
* after the configuration parsing but before filters initialization. Returns * after the configuration parsing but before filters initialization. Returns
@ -317,6 +354,32 @@ flt_deinit(struct proxy *proxy)
} }
} }
/*
* Calls 'denit_per_thread' callback for all filters attached to a proxy for
* each threads. This happens before exiting a thread.
*/
void
flt_deinit_per_thread(struct proxy *proxy)
{
struct flt_conf *fconf, *back;
list_for_each_entry_safe(fconf, back, &proxy->filter_configs, list) {
if (fconf->ops->deinit_per_thread)
fconf->ops->deinit_per_thread(proxy, fconf);
}
}
/* Calls flt_deinit_per_thread() for all proxies, see above */
static void
flt_deinit_all_per_thread()
{
struct proxy *px;
for (px = proxy; px; px = px->next)
flt_deinit_per_thread(px);
}
/* Attaches a filter to a stream. Returns -1 if an error occurs, 0 otherwise. */ /* Attaches a filter to a stream. Returns -1 if an error occurs, 0 otherwise. */
static int static int
flt_stream_add_filter(struct stream *s, struct flt_conf *fconf, unsigned int flags) flt_stream_add_filter(struct stream *s, struct flt_conf *fconf, unsigned int flags)
@ -1124,6 +1187,8 @@ __filters_init(void)
pool2_filter = create_pool("filter", sizeof(struct filter), MEM_F_SHARED); pool2_filter = create_pool("filter", sizeof(struct filter), MEM_F_SHARED);
cfg_register_keywords(&cfg_kws); cfg_register_keywords(&cfg_kws);
hap_register_post_check(flt_init_all); hap_register_post_check(flt_init_all);
hap_register_per_thread_init(flt_init_all_per_thread);
hap_register_per_thread_deinit(flt_deinit_all_per_thread);
} }
__attribute__((destructor)) __attribute__((destructor))