mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2025-04-11 03:31:36 +00:00
BUG/MINOR: event_hdl: make event_hdl_subscribe thread-safe
List insertion in event_hdl_subscribe() was not thread-safe when dealing
with unique identifiers. Indeed, in this case the list insertion is
conditional (we check for a duplicate, then we insert). And while we're
using mt lists for this, the whole operation is not atomic: there is a
race between the check and the insertion.
This could lead to the same ID being registered multiple times with
concurrent calls to event_hdl_subscribe() on the same ID.
To fix this, we add 'insert_lock' dedicated lock in the subscription
list struct. The lock's cost is nearly 0 since it is only used when
registering identified subscriptions and the lock window is very short:
we only guard the duplicate check and the list insertion to make the
conditional insertion "atomic" within a given subscription list.
This is the only place where we need the lock: as soon as the item is
properly inserted we're out of trouble because all other operations on
the list are already thread-safe thanks to mt lists.
A new lock hint is introduced: LOCK_EHDL which is dedicated to event_hdl
The patch may seem quite large since we had to rework the logic around
the subscribe function and switch from simple mt_list to a dedicated
struct wrapping both the mt_list and the insert_lock for the
event_hdl_sub_list type.
(sizeof(event_hdl_sub_list) is now 24 instead of 16)
However, all the changes are internal: we don't break the API.
If 68e692da0
("MINOR: event_hdl: add event handler base api")
is being backported, then this commit should be backported with it.
This commit is contained in:
parent
53eb6aecce
commit
d514ca45c6
@ -24,7 +24,8 @@
|
||||
|
||||
#include <stdint.h>
|
||||
|
||||
#include <haproxy/list-t.h>
|
||||
#include <haproxy/api-t.h>
|
||||
#include <haproxy/thread-t.h>
|
||||
|
||||
/* event data struct are defined as followed */
|
||||
struct event_hdl_cb_data_template {
|
||||
@ -72,8 +73,13 @@ struct event_hdl_sub_type
|
||||
uint16_t subtype;
|
||||
};
|
||||
|
||||
/* event_hdl_sub_list is an alias to mt_list (please use this for portability) */
|
||||
typedef struct mt_list event_hdl_sub_list;
|
||||
struct event_hdl_sub_list_head {
|
||||
struct mt_list head;
|
||||
__decl_thread(HA_SPINLOCK_T insert_lock);
|
||||
};
|
||||
|
||||
/* event_hdl_sub_list is an alias (please use this for portability) */
|
||||
typedef struct event_hdl_sub_list_head event_hdl_sub_list;
|
||||
/* event_hdl_async_equeue is an alias to mt_list (please use this for portability) */
|
||||
typedef struct mt_list event_hdl_async_equeue;
|
||||
|
||||
|
@ -221,6 +221,14 @@ uint64_t event_hdl_id(const char *scope, const char *name);
|
||||
* If <sub_list> is not specified (equals NULL):
|
||||
* global subscription list (process wide) will be used.
|
||||
*
|
||||
* For identified subscriptions (EVENT_HDL_ID_*), the function is safe against
|
||||
* concurrent subscriptions attempts with the same ID: the ID will only be
|
||||
* inserted once in the list and subsequent attempts will yield an error.
|
||||
* However, trying to register the same ID multiple times is considered as
|
||||
* an error (no specific error code is returned in this case) so the check should
|
||||
* be performed by the caller if it is expected. (The caller must ensure that the ID
|
||||
* is unique to prevent the error from being raised)
|
||||
*
|
||||
* Returns 1 in case of success, 0 in case of failure (invalid argument / memory error)
|
||||
*/
|
||||
int event_hdl_subscribe(event_hdl_sub_list *sub_list,
|
||||
@ -422,7 +430,8 @@ static inline struct event_hdl_async_event *event_hdl_async_equeue_pop(event_hdl
|
||||
*/
|
||||
static inline void event_hdl_sub_list_init(event_hdl_sub_list *sub_list)
|
||||
{
|
||||
MT_LIST_INIT(sub_list);
|
||||
MT_LIST_INIT(&sub_list->head);
|
||||
HA_SPIN_INIT(&sub_list->insert_lock);
|
||||
}
|
||||
|
||||
/* use this function when you need to destroy <sub_list>
|
||||
|
@ -425,6 +425,7 @@ enum lock_label {
|
||||
IDLE_CONNS_LOCK,
|
||||
QUIC_LOCK,
|
||||
OCSP_LOCK,
|
||||
EHDL_LOCK,
|
||||
OTHER_LOCK,
|
||||
/* WT: make sure never to use these ones outside of development,
|
||||
* we need them for lock profiling!
|
||||
|
114
src/event_hdl.c
114
src/event_hdl.c
@ -43,14 +43,20 @@ DECLARE_STATIC_POOL(pool_head_sub_event, "ehdl_sub_e", sizeof(struct event_hdl_a
|
||||
DECLARE_STATIC_POOL(pool_head_sub_event_data, "ehdl_sub_ed", sizeof(struct event_hdl_async_event_data));
|
||||
DECLARE_STATIC_POOL(pool_head_sub_taskctx, "ehdl_sub_tctx", sizeof(struct event_hdl_async_task_default_ctx));
|
||||
|
||||
/* global subscription list (implicit where NULL is used as sublist argument) */
|
||||
static struct mt_list global_event_hdl_sub_list = MT_LIST_HEAD_INIT(global_event_hdl_sub_list);
|
||||
|
||||
/* TODO: will become a config tunable
|
||||
* ie: tune.events.max-async-notif-at-once
|
||||
*/
|
||||
static int event_hdl_async_max_notif_at_once = 10;
|
||||
|
||||
/* global subscription list (implicit where NULL is used as sublist argument) */
|
||||
static event_hdl_sub_list global_event_hdl_sub_list;
|
||||
|
||||
static void event_hdl_init(void)
|
||||
{
|
||||
/* initialize global subscription list */
|
||||
event_hdl_sub_list_init(&global_event_hdl_sub_list);
|
||||
}
|
||||
|
||||
/* general purpose hashing function when you want to compute
|
||||
* an ID based on <scope> x <name>
|
||||
* It is your responsibility to make sure <scope> is not used
|
||||
@ -372,10 +378,9 @@ static void event_hdl_unsubscribe_async(const struct event_hdl_sub_mgmt *mgmt)
|
||||
struct event_hdl_sub *event_hdl_subscribe_ptr(event_hdl_sub_list *sub_list,
|
||||
struct event_hdl_sub_type e_type, struct event_hdl hdl)
|
||||
{
|
||||
struct event_hdl_sub *new_sub;
|
||||
struct event_hdl_sub *new_sub = NULL;
|
||||
struct mt_list *elt1, elt2;
|
||||
uint8_t found = 0;
|
||||
struct event_hdl_async_task_default_ctx *task_ctx;
|
||||
struct event_hdl_async_task_default_ctx *task_ctx = NULL;
|
||||
|
||||
if (!sub_list)
|
||||
sub_list = &global_event_hdl_sub_list; /* fall back to global list */
|
||||
@ -387,26 +392,9 @@ struct event_hdl_sub *event_hdl_subscribe_ptr(event_hdl_sub_list *sub_list,
|
||||
(hdl.async == EVENT_HDL_ASYNC_MODE_ADVANCED &&
|
||||
(!hdl.async_equeue || !hdl.async_task)));
|
||||
|
||||
/* first check if such identified hdl is not already registered */
|
||||
if (hdl.id) {
|
||||
mt_list_for_each_entry_safe(new_sub, sub_list, mt_list, elt1, elt2) {
|
||||
if (hdl.id == new_sub->hdl.id) {
|
||||
/* we found matching registered hdl */
|
||||
found = 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (found) {
|
||||
/* error already registered */
|
||||
event_hdl_report_hdl_state(ha_warning, &hdl, "SUB", "could not subscribe: subscription with this id already exists");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
new_sub = pool_alloc(pool_head_sub);
|
||||
if (new_sub == NULL) {
|
||||
goto new_sub_memory_error;
|
||||
goto memory_error;
|
||||
}
|
||||
|
||||
/* assignments */
|
||||
@ -419,7 +407,7 @@ struct event_hdl_sub *event_hdl_subscribe_ptr(event_hdl_sub_list *sub_list,
|
||||
new_sub->async_end = pool_alloc(pool_head_sub_event);
|
||||
if (!new_sub->async_end) {
|
||||
/* memory error */
|
||||
goto new_sub_memory_error_event_end;
|
||||
goto memory_error;
|
||||
}
|
||||
if (hdl.async == EVENT_HDL_ASYNC_MODE_NORMAL) {
|
||||
/* normal mode: no task provided, we must initialize it */
|
||||
@ -429,7 +417,7 @@ struct event_hdl_sub *event_hdl_subscribe_ptr(event_hdl_sub_list *sub_list,
|
||||
|
||||
if (!task_ctx) {
|
||||
/* memory error */
|
||||
goto new_sub_memory_error_task_ctx;
|
||||
goto memory_error;
|
||||
}
|
||||
MT_LIST_INIT(&task_ctx->e_queue);
|
||||
task_ctx->func = new_sub->hdl.async_ptr;
|
||||
@ -439,13 +427,11 @@ struct event_hdl_sub *event_hdl_subscribe_ptr(event_hdl_sub_list *sub_list,
|
||||
|
||||
if (!new_sub->hdl.async_task) {
|
||||
/* memory error */
|
||||
goto new_sub_memory_error_task;
|
||||
goto memory_error;
|
||||
}
|
||||
new_sub->hdl.async_task->context = task_ctx;
|
||||
new_sub->hdl.async_task->process = event_hdl_async_task_default;
|
||||
}
|
||||
/* registration cannot fail anymore */
|
||||
|
||||
/* initialize END event (used to notify about subscription ending)
|
||||
* used by both normal and advanced mode:
|
||||
* - to safely terminate the task in normal mode
|
||||
@ -464,32 +450,65 @@ struct event_hdl_sub *event_hdl_subscribe_ptr(event_hdl_sub_list *sub_list,
|
||||
*/
|
||||
new_sub->refcount = 2;
|
||||
|
||||
/* ready for registration */
|
||||
MT_LIST_INIT(&new_sub->mt_list);
|
||||
|
||||
/* check if such identified hdl is not already registered */
|
||||
if (hdl.id) {
|
||||
struct event_hdl_sub *cur_sub;
|
||||
uint8_t found = 0;
|
||||
|
||||
HA_SPIN_LOCK(EHDL_LOCK, &sub_list->insert_lock);
|
||||
mt_list_for_each_entry_safe(cur_sub, &sub_list->head, mt_list, elt1, elt2) {
|
||||
if (hdl.id == cur_sub->hdl.id) {
|
||||
/* we found matching registered hdl */
|
||||
found = 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (found) {
|
||||
/* error already registered */
|
||||
HA_SPIN_UNLOCK(EHDL_LOCK, &sub_list->insert_lock);
|
||||
event_hdl_report_hdl_state(ha_alert, &hdl, "SUB", "could not subscribe: subscription with this id already exists");
|
||||
goto cleanup;
|
||||
}
|
||||
}
|
||||
|
||||
/* Append in list (global or user specified list).
|
||||
* For now, append when sync mode, and insert when async mode
|
||||
* so that async handlers are executed first
|
||||
*/
|
||||
MT_LIST_INIT(&new_sub->mt_list);
|
||||
if (hdl.async) {
|
||||
/* async mode, insert at the beginning of the list */
|
||||
MT_LIST_INSERT(sub_list, &new_sub->mt_list);
|
||||
MT_LIST_INSERT(&sub_list->head, &new_sub->mt_list);
|
||||
} else {
|
||||
/* sync mode, append at the end of the list */
|
||||
MT_LIST_APPEND(sub_list, &new_sub->mt_list);
|
||||
MT_LIST_APPEND(&sub_list->head, &new_sub->mt_list);
|
||||
}
|
||||
|
||||
if (hdl.id)
|
||||
HA_SPIN_UNLOCK(EHDL_LOCK, &sub_list->insert_lock);
|
||||
|
||||
return new_sub;
|
||||
|
||||
new_sub_memory_error_task:
|
||||
pool_free(pool_head_sub_taskctx, task_ctx);
|
||||
new_sub_memory_error_task_ctx:
|
||||
pool_free(pool_head_sub_event, new_sub->async_end);
|
||||
new_sub_memory_error_event_end:
|
||||
pool_free(pool_head_sub, new_sub);
|
||||
new_sub_memory_error:
|
||||
|
||||
event_hdl_report_hdl_state(ha_warning, &hdl, "SUB", "could not register subscription due to memory error");
|
||||
cleanup:
|
||||
if (new_sub) {
|
||||
if (hdl.async == EVENT_HDL_ASYNC_MODE_NORMAL) {
|
||||
if (new_sub->hdl.async_task)
|
||||
tasklet_free(new_sub->hdl.async_task);
|
||||
if (task_ctx)
|
||||
pool_free(pool_head_sub_taskctx, task_ctx);
|
||||
}
|
||||
if (hdl.async)
|
||||
pool_free(pool_head_sub_event, new_sub->async_end);
|
||||
pool_free(pool_head_sub, new_sub);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
|
||||
memory_error:
|
||||
event_hdl_report_hdl_state(ha_warning, &hdl, "SUB", "could not register subscription due to memory error");
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
void event_hdl_take(struct event_hdl_sub *sub)
|
||||
@ -547,7 +566,7 @@ int event_hdl_lookup_unsubscribe(event_hdl_sub_list *sub_list,
|
||||
if (!sub_list)
|
||||
sub_list = &global_event_hdl_sub_list; /* fall back to global list */
|
||||
|
||||
mt_list_for_each_entry_safe(del_sub, sub_list, mt_list, elt1, elt2) {
|
||||
mt_list_for_each_entry_safe(del_sub, &sub_list->head, mt_list, elt1, elt2) {
|
||||
if (lookup_id == del_sub->hdl.id) {
|
||||
/* we found matching registered hdl */
|
||||
MT_LIST_DELETE_SAFE(elt1);
|
||||
@ -569,7 +588,7 @@ int event_hdl_lookup_resubscribe(event_hdl_sub_list *sub_list,
|
||||
if (!sub_list)
|
||||
sub_list = &global_event_hdl_sub_list; /* fall back to global list */
|
||||
|
||||
mt_list_for_each_entry_safe(cur_sub, sub_list, mt_list, elt1, elt2) {
|
||||
mt_list_for_each_entry_safe(cur_sub, &sub_list->head, mt_list, elt1, elt2) {
|
||||
if (lookup_id == cur_sub->hdl.id) {
|
||||
/* we found matching registered hdl */
|
||||
status = _event_hdl_resub(cur_sub, type);
|
||||
@ -589,7 +608,7 @@ struct event_hdl_sub *event_hdl_lookup_take(event_hdl_sub_list *sub_list,
|
||||
if (!sub_list)
|
||||
sub_list = &global_event_hdl_sub_list; /* fall back to global list */
|
||||
|
||||
mt_list_for_each_entry_safe(cur_sub, sub_list, mt_list, elt1, elt2) {
|
||||
mt_list_for_each_entry_safe(cur_sub, &sub_list->head, mt_list, elt1, elt2) {
|
||||
if (lookup_id == cur_sub->hdl.id) {
|
||||
/* we found matching registered hdl */
|
||||
event_hdl_take(cur_sub);
|
||||
@ -612,7 +631,7 @@ static int _event_hdl_publish(event_hdl_sub_list *sub_list, struct event_hdl_sub
|
||||
struct event_hdl_async_event_data *async_data = NULL; /* reuse async data for multiple async hdls */
|
||||
int error = 0;
|
||||
|
||||
mt_list_for_each_entry_safe(cur_sub, sub_list, mt_list, elt1, elt2) {
|
||||
mt_list_for_each_entry_safe(cur_sub, &sub_list->head, mt_list, elt1, elt2) {
|
||||
/* notify each function that has subscribed to sub_family.type */
|
||||
if ((cur_sub->sub.family == e_type.family) &&
|
||||
((cur_sub->sub.subtype & e_type.subtype) == e_type.subtype)) {
|
||||
@ -751,10 +770,13 @@ void event_hdl_sub_list_destroy(event_hdl_sub_list *sub_list)
|
||||
|
||||
if (!sub_list)
|
||||
sub_list = &global_event_hdl_sub_list; /* fall back to global list */
|
||||
mt_list_for_each_entry_safe(cur_sub, sub_list, mt_list, elt1, elt2) {
|
||||
mt_list_for_each_entry_safe(cur_sub, &sub_list->head, mt_list, elt1, elt2) {
|
||||
/* remove cur elem from list */
|
||||
MT_LIST_DELETE_SAFE(elt1);
|
||||
/* then free it */
|
||||
_event_hdl_unsubscribe(cur_sub);
|
||||
}
|
||||
HA_SPIN_DESTROY(&sub_list->insert_lock);
|
||||
}
|
||||
|
||||
INITCALL0(STG_INIT, event_hdl_init);
|
||||
|
@ -444,6 +444,7 @@ static const char *lock_label(enum lock_label label)
|
||||
case IDLE_CONNS_LOCK: return "IDLE_CONNS";
|
||||
case QUIC_LOCK: return "QUIC";
|
||||
case OCSP_LOCK: return "OCSP";
|
||||
case EHDL_LOCK: return "EVENT_HDL";
|
||||
case OTHER_LOCK: return "OTHER";
|
||||
case DEBUG1_LOCK: return "DEBUG1";
|
||||
case DEBUG2_LOCK: return "DEBUG2";
|
||||
|
Loading…
Reference in New Issue
Block a user