mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2025-01-31 02:22:07 +00:00
MEDIUM: hlua/event_hdl: initial support for event handlers
Now that the event handler API is pretty mature, we can expose it in the lua API. Introducing the core.event_sub(<event_types>, <cb>) lua function that takes an array of event types <event_types> as well as a callback function <cb> as argument. The function returns a subscription <sub> on success. Subscription <sub> allows you to manage the subscription from anywhere in the script. To this day only the sub->unsub method is implemented. The following event types are currently supported: - "SERVER_ADD": when a server is added - "SERVER_DEL": when a server is removed from haproxy - "SERVER_DOWN": server states goes from up to down - "SERVER_UP": server states goes from down to up As for the <cb> function: it will be called when one of the registered event types occur. The function will be called with 3 arguments: cb(<event>,<data>,<sub>) <event>: event type (string) that triggered the function. (could be any of the types used in <event_types> when registering the subscription) <data>: data associated with the event (specific to each event family). For "SERVER_" family events, server details such as server name/id/proxy will be provided. If the server still exists (not yet deleted), a reference to the live server is provided to spare you from an additionnal lookup if you need to have direct access to the server from lua. <sub> refers to the subscription. In case you need to manage it from within an event handler. (It refers to the same subscription that the one returned from core.event_sub()) Subscriptions are per-thread: the thread that will be handling the event is the one who performed the subscription using core.event_sub() function. Each thread treats events sequentially, it means that if you have, let's say SERVER_UP, then SERVER_DOWN in a short timelapse, then your cb function will first be called with SERVER_UP, and once you're done handling the event, your function will be called again with SERVER_DOWN. This is to ensure event consitency when it comes to logging / triggering logic from lua. Your lua cb function may yield if needed, but you're pleased to process the event as fast as possible to prevent the event queue from growing up To prevent abuses, if the event queue for the current subscription goes over 100 unconsumed events, the subscription will pause itself automatically for as long as it takes for your handler to catch up. This would lead to events being missed, so a warning will be emitted in the logs to inform you about that. This is not something you want to let happen too often, it may indicate that you subscribed to an event that is occurring too frequently or/and that your callback function is too slow to keep up the pace and you should review it. If you want to do some parallel processing because your callback functions are slow: you might want to create subtasks from lua using core.register_task() from within your callback function to perform the heavy job in a dedicated task and allow remaining events to be processed more quickly. Please check the lua documentation for more information.
This commit is contained in:
parent
4e5e26641d
commit
c84899c636
@ -20,7 +20,7 @@ is the **initialisation mode**, and the second is the **runtime mode**.
|
||||
the Lua code seems to be run in blocking, but it is not the case.
|
||||
|
||||
The Lua code is loaded in one or more files. These files contains main code and
|
||||
functions. Lua have 7 execution context.
|
||||
functions. Lua has 8 execution contexts.
|
||||
|
||||
1. The Lua file **body context**. It is executed during the load of the Lua file
|
||||
in the HAProxy `[global]` section with the directive `lua-load`. It is
|
||||
@ -72,6 +72,9 @@ functions. Lua have 7 execution context.
|
||||
`core.register_filter()`. Each declared filter is prefixed by the string
|
||||
"lua.".
|
||||
|
||||
8. The **event context**: Inside a function that handles events subscribed
|
||||
through `core.event_sub()` or `Server.event_sub()`.
|
||||
|
||||
|
||||
HAProxy Lua Hello world
|
||||
-----------------------
|
||||
@ -726,7 +729,7 @@ Core class
|
||||
|
||||
.. js:function:: core.register_task(func[, arg1[, arg2[, ...[, arg4]]]])
|
||||
|
||||
**context**: body, init, task, action, sample-fetch, converter
|
||||
**context**: body, init, task, action, sample-fetch, converter, event
|
||||
|
||||
Register and start independent task. The task is started when the HAProxy
|
||||
main scheduler starts. For example this type of tasks can be executed to
|
||||
@ -912,6 +915,70 @@ Core class
|
||||
]
|
||||
..
|
||||
|
||||
.. js:function:: core.event_sub(event_types, func)
|
||||
|
||||
**context**: body, init, task, action, sample-fetch, converter
|
||||
|
||||
Register a function that will be called on specific system events.
|
||||
|
||||
:param array event_types: array of string containing the event types you want to subscribe to
|
||||
:param function func: is the Lua function called when one of the subscribed events occur.
|
||||
:returns: A :ref:`event_sub_class` object.
|
||||
|
||||
List of available event types :
|
||||
|
||||
**SERVER** Family:
|
||||
|
||||
* **SERVER_ADD**: when a server is added
|
||||
* **SERVER_DEL**: when a server is removed
|
||||
* **SERVER_DOWN**: when a server state goes from UP to DOWN
|
||||
* **SERVER_UP**: when a server state goes from DOWN to UP
|
||||
|
||||
.. Note::
|
||||
You may also use **SERVER** in **event_types** to subscribe to all server events types at once.
|
||||
|
||||
The prototype of the Lua function used as argument is:
|
||||
|
||||
.. code-block:: lua
|
||||
|
||||
function(event, event_data, sub)
|
||||
..
|
||||
|
||||
* **event** (*string*): the event type (one of the **event_types** you specified when subscribing)
|
||||
* **event_data**: specific to each event family (For **SERVER** family, a :ref:`server_event_class` object)
|
||||
* **sub**: class to manage the subscription from within the event (a :ref:`event_sub_class` object)
|
||||
|
||||
.. Warning::
|
||||
The callback function will only be scheduled on the very same thread that
|
||||
performed the subscription.
|
||||
|
||||
Moreover, each thread treats events sequentially. It means that if you have,
|
||||
let's say SERVER_UP followed by a SERVER_DOWN in a short timelapse, then
|
||||
the cb function will first be called with SERVER_UP, and once it's done
|
||||
handling the event, the cb function will be called again with SERVER_DOWN.
|
||||
|
||||
This is to ensure event consistency when it comes to logging / triggering logic
|
||||
from lua.
|
||||
|
||||
Your lua cb function may yield if needed, but you're pleased to process the
|
||||
event as fast as possible to prevent the event queue from growing up, depending
|
||||
on the event flow that is expected for the given subscription.
|
||||
|
||||
To prevent abuses, if the event queue for the current subscription goes over
|
||||
a certain amount of unconsumed events, the subscription will pause itself
|
||||
automatically for as long as it takes for your handler to catch up. This would
|
||||
lead to events being missed, so an error will be reported in the logs to warn
|
||||
you about that.
|
||||
This is not something you want to let happen too often, it may indicate that
|
||||
you subscribed to an event that is occurring too frequently or/and that your
|
||||
callback function is too slow to keep up the pace and you should review it.
|
||||
|
||||
If you want to do some parallel processing because your callback functions are
|
||||
slow: you might want to create subtasks from lua using
|
||||
:js:func:`core.register_task()` from within your callback function to perform
|
||||
the heavy job in a dedicated task and allow remaining events to be processed
|
||||
more quickly.
|
||||
|
||||
.. _proxy_class:
|
||||
|
||||
Proxy class
|
||||
@ -1244,6 +1311,44 @@ Listener class
|
||||
manipulated listener.
|
||||
:returns: a key/value table containing stats
|
||||
|
||||
.. _event_sub_class:
|
||||
|
||||
EventSub class
|
||||
==============
|
||||
|
||||
.. js:function:: EventSub.unsub()
|
||||
|
||||
End the subscription, the callback function will not be called again.
|
||||
|
||||
.. _server_event_class:
|
||||
|
||||
ServerEvent class
|
||||
=================
|
||||
|
||||
.. js:attribute:: ServerEvent.name
|
||||
|
||||
Contains the name of the server.
|
||||
|
||||
.. js:attribute:: ServerEvent.puid
|
||||
|
||||
Contains the proxy-unique uid of the server
|
||||
|
||||
.. js:attribute:: ServerEvent.rid
|
||||
|
||||
Contains the revision ID of the server
|
||||
|
||||
.. js:attribute:: ServerEvent.proxy_name
|
||||
|
||||
Contains the name of the proxy to which the server belongs
|
||||
|
||||
.. js:attribute:: ServerEvent.reference
|
||||
|
||||
Reference to the live server (A :ref:`server_class`).
|
||||
|
||||
.. Warning::
|
||||
Not available if the server was removed in the meantime.
|
||||
(Will never be set for SERVER_DEL event since the server does not exist anymore)
|
||||
|
||||
.. _concat_class:
|
||||
|
||||
Concat class
|
||||
|
@ -34,6 +34,7 @@
|
||||
#include <haproxy/server-t.h>
|
||||
#include <haproxy/stick_table-t.h>
|
||||
#include <haproxy/xref-t.h>
|
||||
#include <haproxy/event_hdl-t.h>
|
||||
|
||||
#define CLASS_CORE "Core"
|
||||
#define CLASS_TXN "TXN"
|
||||
@ -50,6 +51,7 @@
|
||||
#define CLASS_PROXY "Proxy"
|
||||
#define CLASS_SERVER "Server"
|
||||
#define CLASS_LISTENER "Listener"
|
||||
#define CLASS_EVENT_SUB "EventSub"
|
||||
#define CLASS_REGEX "Regex"
|
||||
#define CLASS_STKTABLE "StickTable"
|
||||
#define CLASS_CERTCACHE "CertCache"
|
||||
|
@ -53,6 +53,7 @@ void hlua_init();
|
||||
int hlua_post_init();
|
||||
void hlua_applet_tcp_fct(struct appctx *ctx);
|
||||
void hlua_applet_http_fct(struct appctx *ctx);
|
||||
int hlua_event_sub(lua_State *L, event_hdl_sub_list *sub_list);
|
||||
struct task *hlua_process_task(struct task *task, void *context, unsigned int state);
|
||||
const char *hlua_show_current_location(const char *pfx);
|
||||
|
||||
|
@ -23,6 +23,7 @@
|
||||
#define _HAPROXY_HLUA_FCN_H
|
||||
|
||||
#include <lua.h>
|
||||
#include <haproxy/hlua-t.h>
|
||||
|
||||
int hlua_checkboolean(lua_State *L, int index);
|
||||
|
||||
@ -33,5 +34,7 @@ void *hlua_checkudata(lua_State *L, int ud, int class_ref);
|
||||
int hlua_register_metatable(struct lua_State *L, char *name);
|
||||
void hlua_fcn_reg_core_fcn(lua_State *L);
|
||||
int hlua_dump_object(lua_State *L);
|
||||
int hlua_fcn_new_server(lua_State *L, struct server *srv);
|
||||
int hlua_fcn_new_event_sub(lua_State *L, struct event_hdl_sub *sub);
|
||||
|
||||
#endif /* _HAPROXY_HLUA_FCN_H */
|
||||
|
490
src/hlua.c
490
src/hlua.c
@ -65,7 +65,7 @@
|
||||
#include <haproxy/tools.h>
|
||||
#include <haproxy/vars.h>
|
||||
#include <haproxy/xref.h>
|
||||
|
||||
#include <haproxy/event_hdl.h>
|
||||
|
||||
/* Lua uses longjmp to perform yield or throwing errors. This
|
||||
* macro is used only for identifying the function that can
|
||||
@ -394,6 +394,22 @@ struct hlua_mem_allocator {
|
||||
|
||||
static struct hlua_mem_allocator hlua_global_allocator THREAD_ALIGNED(64);
|
||||
|
||||
/* hlua event subscription */
|
||||
struct hlua_event_sub {
|
||||
int fcn_ref;
|
||||
int state_id;
|
||||
struct hlua *hlua;
|
||||
struct task *task;
|
||||
event_hdl_async_equeue equeue;
|
||||
struct event_hdl_sub *sub;
|
||||
uint8_t paused;
|
||||
};
|
||||
|
||||
/* This is the memory pool containing struct hlua_event_sub
|
||||
* for event subscriptions from lua
|
||||
*/
|
||||
DECLARE_STATIC_POOL(pool_head_hlua_event_sub, "hlua_esub", sizeof(struct hlua_event_sub));
|
||||
|
||||
/* These functions converts types between HAProxy internal args or
|
||||
* sample and LUA types. Another function permits to check if the
|
||||
* LUA stack contains arguments according with an required ARG_T
|
||||
@ -8787,6 +8803,477 @@ static int hlua_register_task(lua_State *L)
|
||||
return 0; /* Never reached */
|
||||
}
|
||||
|
||||
/* called from unsafe location */
|
||||
static void hlua_event_subscription_destroy(struct hlua_event_sub *hlua_sub)
|
||||
{
|
||||
/* hlua cleanup */
|
||||
|
||||
hlua_lock(hlua_sub->hlua);
|
||||
/* registry is shared between coroutines */
|
||||
hlua_unref(hlua_sub->hlua->T, hlua_sub->fcn_ref);
|
||||
hlua_unlock(hlua_sub->hlua);
|
||||
|
||||
hlua_ctx_destroy(hlua_sub->hlua);
|
||||
|
||||
/* free */
|
||||
pool_free(pool_head_hlua_event_sub, hlua_sub);
|
||||
}
|
||||
|
||||
/* single event handler: hlua ctx is shared between multiple events handlers
|
||||
* issued from the same subscription. Thus, it is not destroyed when the event
|
||||
* is processed: it is destroyed when no more events are expected for the
|
||||
* subscription (ie: when the subscription ends).
|
||||
*
|
||||
* Moreover, events are processed sequentially within the subscription:
|
||||
* one event must be fully processed before another one may be processed.
|
||||
* This ensures proper consistency for lua event handling from an ordering
|
||||
* point of view. This is especially useful with server events for example
|
||||
* where ADD/DEL/UP/DOWN events ordering really matters to trigger specific
|
||||
* actions from lua (e.g.: sending emails or making API calls).
|
||||
*
|
||||
* Due to this design, each lua event handler is pleased to process the event
|
||||
* as fast as possible to prevent the event queue from growing up.
|
||||
* Strictly speaking, there is no runtime limit for the callback function
|
||||
* (timeout set to default task timeout), but if the event queue goes past
|
||||
* the limit of unconsumed events an error will be reported and the
|
||||
* susbscription will pause itself for as long as it takes for the handler to
|
||||
* catch up (events will be lost as a result).
|
||||
* If the event handler does not need the sequential ordering and wants to
|
||||
* process multiple events at a time, it may spawn a new side-task using
|
||||
* 'core.register_task' to delegate the event handling and make parallel event
|
||||
* processing within the same subscription set.
|
||||
*/
|
||||
static void hlua_event_handler(struct hlua *hlua)
|
||||
{
|
||||
enum hlua_exec status;
|
||||
|
||||
/* If it is the first call to the task, we must initialize the
|
||||
* execution timeouts.
|
||||
*/
|
||||
if (!HLUA_IS_RUNNING(hlua)) {
|
||||
hlua->max_time = hlua_timeout_task;
|
||||
}
|
||||
|
||||
/* make sure to reset the task expiry before each hlua_ctx_resume()
|
||||
* since the task is re-used for multiple cb function calls
|
||||
* We couldn't risk to have t->expire pointing to a past date because
|
||||
* it was set during last function invocation but was never reset since
|
||||
* (ie: E_AGAIN)
|
||||
*/
|
||||
hlua->task->expire = TICK_ETERNITY;
|
||||
|
||||
/* Execute the Lua code. */
|
||||
status = hlua_ctx_resume(hlua, 1);
|
||||
|
||||
switch (status) {
|
||||
/* finished or yield */
|
||||
case HLUA_E_OK:
|
||||
break;
|
||||
|
||||
case HLUA_E_AGAIN: /* co process or timeout wake me later. */
|
||||
notification_gc(&hlua->com);
|
||||
hlua->task->expire = hlua->wake_time;
|
||||
break;
|
||||
|
||||
/* finished with error. */
|
||||
case HLUA_E_ETMOUT:
|
||||
SEND_ERR(NULL, "Lua event_hdl: execution timeout.\n");
|
||||
break;
|
||||
|
||||
case HLUA_E_ERRMSG:
|
||||
SEND_ERR(NULL, "Lua event_hdl: %s.\n", lua_tostring(hlua->T, -1));
|
||||
break;
|
||||
|
||||
case HLUA_E_ERR:
|
||||
default:
|
||||
SEND_ERR(NULL, "Lua event_hdl: unknown error.\n");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
__LJMP static int hlua_event_hdl_cb_data_push_args(struct hlua_event_sub *hlua_sub,
|
||||
struct event_hdl_sub_type event, void *data)
|
||||
{
|
||||
struct hlua *hlua = hlua_sub->hlua;
|
||||
|
||||
hlua->nargs = 1;
|
||||
lua_pushstring(hlua->T, event_hdl_sub_type_to_string(event));
|
||||
if (event_hdl_sub_family_equal(EVENT_HDL_SUB_SERVER, event)) {
|
||||
struct event_hdl_cb_data_server *e_server = data;
|
||||
struct proxy *px;
|
||||
struct server *server;
|
||||
|
||||
hlua->nargs += 1;
|
||||
lua_newtable(hlua->T);
|
||||
/* Add server name */
|
||||
lua_pushstring(hlua->T, "name");
|
||||
lua_pushstring(hlua->T, e_server->safe.name);
|
||||
lua_settable(hlua->T, -3);
|
||||
/* Add server puid */
|
||||
lua_pushstring(hlua->T, "puid");
|
||||
lua_pushinteger(hlua->T, e_server->safe.puid);
|
||||
lua_settable(hlua->T, -3);
|
||||
/* Add server rid */
|
||||
lua_pushstring(hlua->T, "rid");
|
||||
lua_pushinteger(hlua->T, e_server->safe.rid);
|
||||
lua_settable(hlua->T, -3);
|
||||
/* Add server proxy name */
|
||||
lua_pushstring(hlua->T, "proxy_name");
|
||||
lua_pushstring(hlua->T, e_server->safe.proxy_name);
|
||||
lua_settable(hlua->T, -3);
|
||||
|
||||
/* attempt to provide reference server object
|
||||
* (if it wasn't removed yet, SERVER_DEL will never succeed here)
|
||||
*/
|
||||
px = proxy_find_by_name(e_server->safe.proxy_name, PR_CAP_BE, 0);
|
||||
BUG_ON(!px);
|
||||
server = findserver_unique_id(px, e_server->safe.puid, e_server->safe.rid);
|
||||
if (server) {
|
||||
lua_pushstring(hlua->T, "reference");
|
||||
hlua_fcn_new_server(hlua->T, server);
|
||||
lua_settable(hlua->T, -3);
|
||||
}
|
||||
}
|
||||
/* sub mgmt */
|
||||
hlua->nargs += 1;
|
||||
hlua_fcn_new_event_sub(hlua->T, hlua_sub->sub);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* events runner: if there's an ongoing hlua event handling process, finish it
|
||||
* then, check if there are new events waiting to be processed
|
||||
* (events are processed sequentially)
|
||||
*
|
||||
* We have a safety measure to warn/guard if the event queue is growing up
|
||||
* too much due to many events being generated and lua handler is unable to
|
||||
* keep up the pace (e.g.: when the event queue grows past 100 unconsumed events).
|
||||
* TODO: make it tunable
|
||||
*/
|
||||
static struct task *hlua_event_runner(struct task *task, void *context, unsigned int state)
|
||||
{
|
||||
struct hlua_event_sub *hlua_sub = context;
|
||||
struct event_hdl_async_event *event;
|
||||
const char *error = NULL;
|
||||
|
||||
if (!hlua_sub->paused && event_hdl_async_equeue_size(&hlua_sub->equeue) > 100) {
|
||||
const char *trace;
|
||||
|
||||
/* We reached the limit of pending events in the queue: we should
|
||||
* warn the user, and temporarily pause the subscription to give a chance
|
||||
* to the handler to catch up? (it also prevents ressource shortage since
|
||||
* the queue could grow indefinitely otherwise)
|
||||
* TODO: find a way to inform the handler that it missed some events
|
||||
* (example: stats within the subscription in event_hdl api exposed via lua api?)
|
||||
*
|
||||
* Nonetheless, reaching this limit means that the handler is not fast enough
|
||||
* and/or that it subscribed to events that happen too frequently and did not
|
||||
* expect it. This could come from an inadequate design in the user's script.
|
||||
*/
|
||||
event_hdl_pause(hlua_sub->sub);
|
||||
hlua_sub->paused = 1;
|
||||
|
||||
/* The following Lua calls can fail. */
|
||||
if (!SET_SAFE_LJMP(hlua_sub->hlua))
|
||||
trace = NULL;
|
||||
trace = hlua_traceback(hlua_sub->hlua->T, ", ");
|
||||
/* At this point the execution is safe. */
|
||||
RESET_SAFE_LJMP(hlua_sub->hlua);
|
||||
|
||||
ha_warning("Lua event_hdl: pausing the subscription because the function fails to keep up the pace (%u unconsumed events): %s\n", event_hdl_async_equeue_size(&hlua_sub->equeue), ((trace) ? trace : ""));
|
||||
}
|
||||
|
||||
if (HLUA_IS_RUNNING(hlua_sub->hlua)) {
|
||||
/* ongoing hlua event handler, resume it */
|
||||
hlua_event_handler(hlua_sub->hlua);
|
||||
} else if ((event = event_hdl_async_equeue_pop(&hlua_sub->equeue))) { /* check for new events */
|
||||
if (event_hdl_sub_type_equal(event->type, EVENT_HDL_SUB_END)) {
|
||||
/* ending event: no more events to come */
|
||||
event_hdl_async_free_event(event);
|
||||
task_destroy(task);
|
||||
hlua_event_subscription_destroy(hlua_sub);
|
||||
return NULL;
|
||||
}
|
||||
/* new event: start processing it */
|
||||
|
||||
/* The following Lua calls can fail. */
|
||||
if (!SET_SAFE_LJMP(hlua_sub->hlua)) {
|
||||
if (lua_type(hlua_sub->hlua->T, -1) == LUA_TSTRING)
|
||||
error = lua_tostring(hlua_sub->hlua->T, -1);
|
||||
else
|
||||
error = "critical error";
|
||||
ha_alert("Lua event_hdl: %s.\n", error);
|
||||
goto skip_event;
|
||||
}
|
||||
|
||||
/* Check stack available size. */
|
||||
if (!lua_checkstack(hlua_sub->hlua->T, 5)) {
|
||||
ha_alert("Lua event_hdl: full stack.\n");
|
||||
RESET_SAFE_LJMP(hlua_sub->hlua);
|
||||
goto skip_event;
|
||||
}
|
||||
|
||||
/* Restore the function in the stack. */
|
||||
hlua_pushref(hlua_sub->hlua->T, hlua_sub->fcn_ref);
|
||||
|
||||
/* push args */
|
||||
hlua_sub->hlua->nargs = 0;
|
||||
if (!hlua_event_hdl_cb_data_push_args(hlua_sub, event->type, event->data)) {
|
||||
RESET_SAFE_LJMP(hlua_sub->hlua);
|
||||
goto skip_event;
|
||||
}
|
||||
|
||||
/* At this point the execution is safe. */
|
||||
RESET_SAFE_LJMP(hlua_sub->hlua);
|
||||
|
||||
/* At this point the event was successfully translated into hlua ctx,
|
||||
* or hlua error occured, so we can safely discard it
|
||||
*/
|
||||
event_hdl_async_free_event(event);
|
||||
event = NULL;
|
||||
|
||||
hlua_event_handler(hlua_sub->hlua);
|
||||
skip_event:
|
||||
if (event)
|
||||
event_hdl_async_free_event(event);
|
||||
|
||||
}
|
||||
|
||||
if (!HLUA_IS_RUNNING(hlua_sub->hlua)) {
|
||||
/* we just finished the processing of one event..
|
||||
* check for new events before becoming idle
|
||||
*/
|
||||
if (!event_hdl_async_equeue_isempty(&hlua_sub->equeue)) {
|
||||
/* more events to process, make sure the task
|
||||
* will be resumed ASAP to process pending events
|
||||
*/
|
||||
task_wakeup(task, TASK_WOKEN_OTHER);
|
||||
}
|
||||
else if (hlua_sub->paused) {
|
||||
/* empty queue, the handler catched up: resume the subscription */
|
||||
event_hdl_resume(hlua_sub->sub);
|
||||
hlua_sub->paused = 0;
|
||||
}
|
||||
}
|
||||
|
||||
return task;
|
||||
}
|
||||
|
||||
/* Must be called directly under lua protected/safe environment
|
||||
* (not from external callback)
|
||||
* <fcn_ref> should NOT be dropped after the function successfully returns:
|
||||
* it will be done automatically in hlua_event_subscription_destroy() when the
|
||||
* subscription ends.
|
||||
*
|
||||
* Returns the new subscription on success and NULL on failure (memory error)
|
||||
*/
|
||||
static struct event_hdl_sub *hlua_event_subscribe(event_hdl_sub_list *list, struct event_hdl_sub_type e_type,
|
||||
int state_id, int fcn_ref)
|
||||
{
|
||||
struct hlua_event_sub *hlua_sub;
|
||||
struct task *task = NULL;
|
||||
|
||||
hlua_sub = pool_alloc(pool_head_hlua_event_sub);
|
||||
if (!hlua_sub)
|
||||
goto mem_error;
|
||||
hlua_sub->task = NULL;
|
||||
hlua_sub->hlua = NULL;
|
||||
hlua_sub->paused = 0;
|
||||
if ((task = task_new_here()) == NULL) {
|
||||
ha_alert("out of memory while allocating hlua event task");
|
||||
goto mem_error;
|
||||
}
|
||||
task->process = hlua_event_runner;
|
||||
task->context = hlua_sub;
|
||||
event_hdl_async_equeue_init(&hlua_sub->equeue);
|
||||
hlua_sub->task = task;
|
||||
hlua_sub->fcn_ref = fcn_ref;
|
||||
hlua_sub->state_id = state_id;
|
||||
hlua_sub->hlua = pool_alloc(pool_head_hlua);
|
||||
if (!hlua_sub->hlua)
|
||||
goto mem_error;
|
||||
HLUA_INIT(hlua_sub->hlua);
|
||||
if (!hlua_ctx_init(hlua_sub->hlua, hlua_sub->state_id, task))
|
||||
goto mem_error;
|
||||
|
||||
hlua_sub->sub = event_hdl_subscribe_ptr(list, e_type,
|
||||
EVENT_HDL_ASYNC_TASK(&hlua_sub->equeue,
|
||||
task,
|
||||
hlua_sub,
|
||||
NULL));
|
||||
if (!hlua_sub->sub)
|
||||
goto mem_error;
|
||||
|
||||
return hlua_sub->sub; /* returns pointer to event_hdl_sub struct */
|
||||
|
||||
mem_error:
|
||||
if (hlua_sub) {
|
||||
if (hlua_sub->task)
|
||||
task_destroy(hlua_sub->task);
|
||||
if (hlua_sub->hlua)
|
||||
hlua_ctx_destroy(hlua_sub->hlua);
|
||||
pool_free(pool_head_hlua_event_sub, hlua_sub);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* looks for an array of strings referring to a composition of event_hdl subscription
|
||||
* types at <index> in <L> stack
|
||||
*/
|
||||
__LJMP static struct event_hdl_sub_type hlua_check_event_sub_types(lua_State *L, int index)
|
||||
{
|
||||
struct event_hdl_sub_type subscriptions;
|
||||
const char *msg;
|
||||
|
||||
if (lua_type(L, index) != LUA_TTABLE) {
|
||||
msg = lua_pushfstring(L, "table of strings expected, got %s", luaL_typename(L, index));
|
||||
luaL_argerror(L, index, msg);
|
||||
}
|
||||
|
||||
subscriptions = EVENT_HDL_SUB_NONE;
|
||||
|
||||
/* browse the argument as an array. */
|
||||
lua_pushnil(L);
|
||||
while (lua_next(L, index) != 0) {
|
||||
if (lua_type(L, -1) != LUA_TSTRING) {
|
||||
msg = lua_pushfstring(L, "table of strings expected, got %s", luaL_typename(L, index));
|
||||
luaL_argerror(L, index, msg);
|
||||
}
|
||||
|
||||
if (event_hdl_sub_type_equal(EVENT_HDL_SUB_NONE, event_hdl_string_to_sub_type(lua_tostring(L, -1)))) {
|
||||
msg = lua_pushfstring(L, "'%s' event type is unknown", lua_tostring(L, -1));
|
||||
luaL_argerror(L, index, msg);
|
||||
}
|
||||
|
||||
/* perform subscriptions |= current sub */
|
||||
subscriptions = event_hdl_sub_type_add(subscriptions, event_hdl_string_to_sub_type(lua_tostring(L, -1)));
|
||||
|
||||
/* pop the current value. */
|
||||
lua_pop(L, 1);
|
||||
}
|
||||
|
||||
return subscriptions;
|
||||
}
|
||||
|
||||
/* Wrapper for hlua_fcn_new_event_sub(): catch errors raised by
|
||||
* the function to prevent LJMP
|
||||
*
|
||||
* If no error occured, the function returns 1, else it returns 0 and
|
||||
* the error message is pushed at the top of the stack
|
||||
*/
|
||||
__LJMP static int _hlua_new_event_sub_safe(lua_State *L)
|
||||
{
|
||||
struct event_hdl_sub *sub = lua_touserdata(L, 1);
|
||||
|
||||
/* this function may raise errors */
|
||||
return MAY_LJMP(hlua_fcn_new_event_sub(L, sub));
|
||||
}
|
||||
static int hlua_new_event_sub_safe(lua_State *L, struct event_hdl_sub *sub)
|
||||
{
|
||||
if (!lua_checkstack(L, 2))
|
||||
return 0;
|
||||
lua_pushcfunction(L, _hlua_new_event_sub_safe);
|
||||
lua_pushlightuserdata(L, sub);
|
||||
switch (lua_pcall(L, 1, 1, 0)) {
|
||||
case LUA_OK:
|
||||
return 1;
|
||||
default:
|
||||
/* error was catched */
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
/* This function is a LUA helper used for registering lua event callbacks.
|
||||
* It expects an event subscription array and the function to be executed
|
||||
* when subscribed events occur (stack arguments).
|
||||
* It can be called from the "init" section, "post init" or during the runtime.
|
||||
*
|
||||
* <sub_list> is the subscription list where the subscription will be attempted
|
||||
*
|
||||
* Pushes the newly allocated subscription on the stack on success
|
||||
*/
|
||||
__LJMP int hlua_event_sub(lua_State *L, event_hdl_sub_list *sub_list)
|
||||
{
|
||||
struct hlua *hlua;
|
||||
struct event_hdl_sub *sub;
|
||||
struct event_hdl_sub_type subscriptions;
|
||||
int fcn_ref;
|
||||
int state_id;
|
||||
|
||||
MAY_LJMP(check_args(L, 2, "event_sub"));
|
||||
|
||||
/* Get the reference state */
|
||||
hlua = hlua_gethlua(L);
|
||||
if (hlua)
|
||||
/* we are in runtime processing, any thread may subscribe to events:
|
||||
* subscription events will be handled by the thread who performed
|
||||
* the registration.
|
||||
*/
|
||||
state_id = hlua->state_id;
|
||||
else {
|
||||
/* we are in initialization mode, only thread 0 (actual calling thread)
|
||||
* may subscribe to events to prevent the same handler (from different lua
|
||||
* stacks) from being registered multiple times
|
||||
*
|
||||
* hlua_state_id == 0: monostack (lua-load)
|
||||
* hlua_state_id > 0: hlua_state_id=tid+1, multi-stack (lua-load-per-thread)
|
||||
* (thus if hlua_state_id > 1, it means we are not in primary thread ctx)
|
||||
*/
|
||||
if (hlua_state_id > 1)
|
||||
return 0; /* skip registration */
|
||||
state_id = hlua_state_id;
|
||||
}
|
||||
|
||||
/* First argument : event subscriptions. */
|
||||
subscriptions = MAY_LJMP(hlua_check_event_sub_types(L, 1));
|
||||
|
||||
if (event_hdl_sub_type_equal(subscriptions, EVENT_HDL_SUB_NONE)) {
|
||||
WILL_LJMP(luaL_error(L, "event_sub: no valid event types were provided"));
|
||||
return 0; /* Never reached */
|
||||
}
|
||||
|
||||
/* Second argument : lua function. */
|
||||
fcn_ref = MAY_LJMP(hlua_checkfunction(L, 2));
|
||||
|
||||
/* try to subscribe */
|
||||
sub = hlua_event_subscribe(sub_list, subscriptions, state_id, fcn_ref);
|
||||
if (!sub) {
|
||||
hlua_unref(L, fcn_ref);
|
||||
WILL_LJMP(luaL_error(L, "event_sub: lua out of memory error"));
|
||||
return 0; /* Never reached */
|
||||
}
|
||||
|
||||
/* push the subscription to the stack
|
||||
*
|
||||
* Here we use the safe function so that lua errors will be
|
||||
* handled explicitly to prevent 'sub' from being lost
|
||||
*/
|
||||
if (!hlua_new_event_sub_safe(L, sub)) {
|
||||
/* Some events could already be pending in the handler's queue.
|
||||
* However it is wiser to cancel the subscription since we are unable to
|
||||
* provide a valid reference to it.
|
||||
* Pending events will be delivered (unless lua keeps raising errors).
|
||||
*/
|
||||
event_hdl_unsubscribe(sub); /* cancel the subscription */
|
||||
WILL_LJMP(luaL_error(L, "event_sub: cannot push the subscription (%s)", lua_tostring(L, -1)));
|
||||
return 0; /* Never reached */
|
||||
}
|
||||
event_hdl_drop(sub); /* sub has been duplicated, discard old ref */
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* This function is a LUA wrapper used for registering global lua event callbacks
|
||||
* The new subscription is pushed onto the stack on success
|
||||
* Returns the number of arguments pushed to the stack (1 for success)
|
||||
*/
|
||||
__LJMP static int hlua_event_global_sub(lua_State *L)
|
||||
{
|
||||
/* NULL <sub_list> = global subscription list */
|
||||
return MAY_LJMP(hlua_event_sub(L, NULL));
|
||||
}
|
||||
|
||||
/* Wrapper called by HAProxy to execute an LUA converter. This wrapper
|
||||
* doesn't allow "yield" functions because the HAProxy engine cannot
|
||||
* resume converters.
|
||||
@ -12271,6 +12758,7 @@ lua_State *hlua_init_state(int thread_num)
|
||||
hlua_class_function(L, "del_map", hlua_del_map);
|
||||
hlua_class_function(L, "tcp", hlua_socket_new);
|
||||
hlua_class_function(L, "httpclient", hlua_httpclient_new);
|
||||
hlua_class_function(L, "event_sub", hlua_event_global_sub);
|
||||
hlua_class_function(L, "log", hlua_log);
|
||||
hlua_class_function(L, "Debug", hlua_log_debug);
|
||||
hlua_class_function(L, "Info", hlua_log_info);
|
||||
|
@ -35,6 +35,7 @@
|
||||
#include <haproxy/server.h>
|
||||
#include <haproxy/stats.h>
|
||||
#include <haproxy/stick_table.h>
|
||||
#include <haproxy/event_hdl.h>
|
||||
#include <haproxy/stream-t.h>
|
||||
#include <haproxy/time.h>
|
||||
#include <haproxy/tools.h>
|
||||
@ -44,6 +45,7 @@ static int class_concat_ref;
|
||||
static int class_proxy_ref;
|
||||
static int class_server_ref;
|
||||
static int class_listener_ref;
|
||||
static int class_event_sub_ref;
|
||||
static int class_regex_ref;
|
||||
static int class_stktable_ref;
|
||||
static int class_proxy_list_ref;
|
||||
@ -1810,6 +1812,45 @@ void hlua_listable_proxies(lua_State *L, char capabilities)
|
||||
lua_setmetatable(L, -2);
|
||||
}
|
||||
|
||||
int hlua_event_sub_unsub(lua_State *L)
|
||||
{
|
||||
struct event_hdl_sub *sub = hlua_checkudata(L, 1, class_event_sub_ref);
|
||||
|
||||
BUG_ON(!sub);
|
||||
event_hdl_take(sub); /* keep a reference on sub until the item is GCed */
|
||||
event_hdl_unsubscribe(sub); /* will automatically call event_hdl_drop() */
|
||||
return 0;
|
||||
}
|
||||
|
||||
int hlua_event_sub_gc(lua_State *L)
|
||||
{
|
||||
struct event_hdl_sub *sub = hlua_checkudata(L, 1, class_event_sub_ref);
|
||||
|
||||
BUG_ON(!sub);
|
||||
event_hdl_drop(sub); /* final drop of the reference */
|
||||
return 0;
|
||||
}
|
||||
|
||||
int hlua_fcn_new_event_sub(lua_State *L, struct event_hdl_sub *sub)
|
||||
{
|
||||
lua_newtable(L);
|
||||
|
||||
/* Pop a class event_sub metatable and affect it to the userdata. */
|
||||
lua_rawgeti(L, LUA_REGISTRYINDEX, class_event_sub_ref);
|
||||
lua_setmetatable(L, -2);
|
||||
|
||||
lua_pushlightuserdata(L, sub);
|
||||
lua_rawseti(L, -2, 0);
|
||||
|
||||
/* userdata is affected: increment sub refcount */
|
||||
event_hdl_take(sub);
|
||||
|
||||
/* set public methods */
|
||||
hlua_class_function(L, "unsub", hlua_event_sub_unsub);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* This Lua function take a string, a list of separators.
|
||||
* It tokenize the input string using the list of separators
|
||||
* as separator.
|
||||
@ -2098,6 +2139,11 @@ void hlua_fcn_reg_core_fcn(lua_State *L)
|
||||
lua_settable(L, -3); /* -> META["__index"] = TABLE */
|
||||
class_listener_ref = hlua_register_metatable(L, CLASS_LISTENER);
|
||||
|
||||
/* Create event_sub object. */
|
||||
lua_newtable(L);
|
||||
hlua_class_function(L, "__gc", hlua_event_sub_gc);
|
||||
class_event_sub_ref = hlua_register_metatable(L, CLASS_EVENT_SUB);
|
||||
|
||||
/* Create server object. */
|
||||
lua_newtable(L);
|
||||
hlua_class_function(L, "__gc", hlua_server_gc);
|
||||
|
Loading…
Reference in New Issue
Block a user