mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2025-01-17 11:10:42 +00:00
MINOR: hlua_fcn: add Queue class
Adding a new lua class: Queue. This class provides a generic FIFO storage mechanism that may be shared between multiple lua contexts to easily pass data between them, as stock Lua doesn't provide easy methods for passing data between multiple coroutines. New Queue object may be obtained using core.queue() (it works like core.concat() for a concat Class) Lua documentation was updated (including some usage examples)
This commit is contained in:
parent
fc4ec0d653
commit
86fb22c557
@ -758,6 +758,9 @@ Core class
|
||||
It takes up to 4 optional arguments (provided when registering), and no
|
||||
output is expected.
|
||||
|
||||
See also :js:func:`core.queue` to dynamically pass data between main context
|
||||
and tasks or even between tasks.
|
||||
|
||||
.. js:function:: core.register_cli([path], usage, func)
|
||||
|
||||
**context**: body
|
||||
@ -849,6 +852,14 @@ Core class
|
||||
|
||||
:returns: A :ref:`concat_class` object.
|
||||
|
||||
.. js:function:: core.queue()
|
||||
|
||||
**context**: body, init, task, event, action, sample-fetch, converter
|
||||
|
||||
This function returns a new queue object.
|
||||
|
||||
:returns: A :ref:`queue_class` object.
|
||||
|
||||
.. js:function:: core.done(data)
|
||||
|
||||
**context**: body, init, task, action, sample-fetch, converter
|
||||
@ -1758,6 +1769,83 @@ This class contains additional info related to **SERVER_ADMIN** event.
|
||||
Same as :js:attr:`ServerEventState.requeued` but when the requeue is due to
|
||||
the server administrative state change.
|
||||
|
||||
.. _queue_class:
|
||||
|
||||
Queue class
|
||||
===========
|
||||
|
||||
.. js:class:: Queue
|
||||
|
||||
This class provides a generic FIFO storage mechanism that may be shared
|
||||
between multiple lua contexts to easily pass data between them, as stock
|
||||
Lua doesn't provide easy methods for passing data between multiple coroutines.
|
||||
|
||||
inter-task example:
|
||||
|
||||
.. code-block:: lua
|
||||
|
||||
-- script wide shared queue
|
||||
local queue = core.queue()
|
||||
|
||||
-- master task
|
||||
core.register_task(function()
|
||||
-- send the date every second
|
||||
while true do
|
||||
queue:push(os.date("%c", core.now().sec))
|
||||
core.sleep(1)
|
||||
end
|
||||
end)
|
||||
|
||||
-- worker task
|
||||
core.register_task(function()
|
||||
while true do
|
||||
-- print the date sent by master
|
||||
print(queue:pop_wait())
|
||||
end
|
||||
end)
|
||||
..
|
||||
|
||||
Of course, queue may also be used as a local storage mechanism.
|
||||
|
||||
Use :js:func:`core.queue` to get a new Queue object.
|
||||
|
||||
.. js:function:: Queue.size(queue)
|
||||
|
||||
This function returns the number of items within the Queue.
|
||||
|
||||
:param class_queue queue: A :ref:`queue_class` to the current queue
|
||||
|
||||
.. js:function:: Queue.push(queue, item)
|
||||
|
||||
This function pushes the item (may be of any type) to the queue.
|
||||
Pushed item cannot be nil or invalid, or an error will be thrown.
|
||||
|
||||
:param class_queue queue: A :ref:`queue_class` to the current queue
|
||||
:returns: boolean true for success and false for error
|
||||
|
||||
.. js:function:: Queue.pop(queue)
|
||||
|
||||
This function immediately tries to pop an item from the queue.
|
||||
It returns nil of no item is available at the time of the call.
|
||||
|
||||
:param class_queue queue: A :ref:`queue_class` to the current queue
|
||||
:returns: the item at the top of the stack (any type) or nil if no items
|
||||
|
||||
.. js:function:: Queue.pop_wait(queue)
|
||||
|
||||
**context**: task
|
||||
|
||||
This is an alternative to pop() that may be used within task contexts.
|
||||
|
||||
The call waits for data if no item is currently available. This may be
|
||||
useful when used in a while loop to prevent cpu waste.
|
||||
|
||||
Note that this requires yielding, thus it is only available within contexts
|
||||
that support yielding (mainly task context).
|
||||
|
||||
:param class_queue queue: A :ref:`queue_class` to the current queue
|
||||
:returns: the item at the top of the stack (any type) or nil in case of error
|
||||
|
||||
.. _concat_class:
|
||||
|
||||
Concat class
|
||||
|
238
src/hlua_fcn.c
238
src/hlua_fcn.c
@ -43,6 +43,7 @@
|
||||
|
||||
/* Contains the class reference of the concat object. */
|
||||
static int class_concat_ref;
|
||||
static int class_queue_ref;
|
||||
static int class_proxy_ref;
|
||||
static int class_server_ref;
|
||||
static int class_listener_ref;
|
||||
@ -499,6 +500,241 @@ static void hlua_concat_init(lua_State *L)
|
||||
class_concat_ref = luaL_ref(L, LUA_REGISTRYINDEX);
|
||||
}
|
||||
|
||||
/* C backing storage for lua Queue class */
|
||||
struct hlua_queue {
|
||||
uint32_t size;
|
||||
struct mt_list list;
|
||||
struct mt_list wait_tasks;
|
||||
};
|
||||
|
||||
/* used to store lua objects in queue->list */
|
||||
struct hlua_queue_item {
|
||||
int ref; /* lua object reference id */
|
||||
struct mt_list list;
|
||||
};
|
||||
|
||||
/* used to store wait entries in queue->wait_tasks */
|
||||
struct hlua_queue_wait
|
||||
{
|
||||
struct task *task;
|
||||
struct mt_list entry;
|
||||
};
|
||||
|
||||
/* This is the memory pool containing struct hlua_queue_item (queue items)
|
||||
*/
|
||||
DECLARE_STATIC_POOL(pool_head_hlua_queue, "hlua_queue", sizeof(struct hlua_queue_item));
|
||||
|
||||
/* This is the memory pool containing struct hlua_queue_wait
|
||||
* (queue waiting tasks)
|
||||
*/
|
||||
DECLARE_STATIC_POOL(pool_head_hlua_queuew, "hlua_queuew", sizeof(struct hlua_queue_wait));
|
||||
|
||||
static struct hlua_queue *hlua_check_queue(lua_State *L, int ud)
|
||||
{
|
||||
return hlua_checkudata(L, ud, class_queue_ref);
|
||||
}
|
||||
|
||||
/* queue:size(): returns an integer containing the current number of queued
|
||||
* items.
|
||||
*/
|
||||
static int hlua_queue_size(lua_State *L)
|
||||
{
|
||||
struct hlua_queue *queue = hlua_check_queue(L, 1);
|
||||
|
||||
BUG_ON(!queue);
|
||||
lua_pushinteger(L, queue->size);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* queue:push(): push an item (any type, except nil) at the end of the queue
|
||||
*
|
||||
* Returns boolean:true for success and boolean:false on error
|
||||
*/
|
||||
static int hlua_queue_push(lua_State *L)
|
||||
{
|
||||
struct hlua_queue *queue = hlua_check_queue(L, 1);
|
||||
struct hlua_queue_item *item;
|
||||
struct mt_list *elt1, elt2;
|
||||
struct hlua_queue_wait *waiter;
|
||||
|
||||
if (lua_gettop(L) != 2 || lua_isnoneornil(L, 2)) {
|
||||
luaL_error(L, "unexpected argument");
|
||||
/* not reached */
|
||||
return 0;
|
||||
}
|
||||
BUG_ON(!queue);
|
||||
item = pool_alloc(pool_head_hlua_queue);
|
||||
if (!item) {
|
||||
lua_pushboolean(L, 0);
|
||||
return 1;
|
||||
}
|
||||
item->ref = hlua_ref(L);
|
||||
MT_LIST_INIT(&item->list);
|
||||
HA_ATOMIC_INC(&queue->size);
|
||||
MT_LIST_APPEND(&queue->list, &item->list);
|
||||
|
||||
/* notify tasks waiting on queue:pop_wait() (if any) */
|
||||
mt_list_for_each_entry_safe(waiter, &queue->wait_tasks, entry, elt1, elt2) {
|
||||
task_wakeup(waiter->task, TASK_WOKEN_MSG);
|
||||
}
|
||||
|
||||
lua_pushboolean(L, 1);
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* queue:pop(): returns the first item at the top of que queue or nil if
|
||||
* the queue is empty.
|
||||
*/
|
||||
static int _hlua_queue_pop(lua_State *L, struct hlua_queue *queue)
|
||||
{
|
||||
struct hlua_queue_item *item;
|
||||
|
||||
item = MT_LIST_POP(&queue->list, typeof(item), list);
|
||||
if (!item) {
|
||||
lua_pushnil(L);
|
||||
return 1; /* nothing in queue, return nil */
|
||||
}
|
||||
|
||||
HA_ATOMIC_DEC(&queue->size);
|
||||
/* push lua obj on the stack */
|
||||
hlua_pushref(L, item->ref);
|
||||
|
||||
/* free the queue item */
|
||||
pool_free(pool_head_hlua_queue, item);
|
||||
|
||||
return 1;
|
||||
}
|
||||
static int hlua_queue_pop(lua_State *L)
|
||||
{
|
||||
struct hlua_queue *queue = hlua_check_queue(L, 1);
|
||||
|
||||
BUG_ON(!queue);
|
||||
return _hlua_queue_pop(L, queue);
|
||||
}
|
||||
|
||||
/* queue:pop_wait(): same as queue:pop() but doesn't return on empty queue.
|
||||
*
|
||||
* Aborts if used incorrectly and returns nil in case of memory error.
|
||||
*/
|
||||
static int _hlua_queue_pop_wait(lua_State *L, int status, lua_KContext ctx)
|
||||
{
|
||||
struct hlua_queue *queue = hlua_check_queue(L, 1);
|
||||
|
||||
/* new pop attempt */
|
||||
if (!_hlua_queue_pop(L, queue))
|
||||
hlua_yieldk(L, 0, 0, _hlua_queue_pop_wait, TICK_ETERNITY, 0); // wait retry
|
||||
return 1; // success
|
||||
}
|
||||
static int hlua_queue_pop_wait(lua_State *L)
|
||||
{
|
||||
struct hlua_queue *queue = hlua_check_queue(L, 1);
|
||||
struct hlua_queue_wait *wait;
|
||||
struct hlua *hlua;
|
||||
|
||||
BUG_ON(!queue);
|
||||
|
||||
/* Get hlua struct, or NULL if we execute from main lua state */
|
||||
hlua = hlua_gethlua(L);
|
||||
|
||||
if (!hlua || HLUA_CANT_YIELD(hlua)) {
|
||||
luaL_error(L, "pop_wait() may only be used within task context "
|
||||
"(requires yielding)");
|
||||
return 0; /* not reached */
|
||||
}
|
||||
|
||||
wait = pool_alloc(pool_head_hlua_queuew);
|
||||
if (!wait) {
|
||||
lua_pushnil(L);
|
||||
return 1; /* memory error, return nil */
|
||||
}
|
||||
|
||||
wait->task = hlua->task;
|
||||
MT_LIST_INIT(&wait->entry);
|
||||
|
||||
/* add task to queue's wait list */
|
||||
MT_LIST_TRY_APPEND(&queue->wait_tasks, &wait->entry);
|
||||
|
||||
/* push queue on the top of the stack */
|
||||
lua_pushlightuserdata(L, queue);
|
||||
|
||||
/* try to pop without waiting (there could be already pending items) */
|
||||
if (!_hlua_queue_pop(L, queue)) {
|
||||
/* no item immediately available, go to waiting loop */
|
||||
hlua_yieldk(L, 0, 0, _hlua_queue_pop_wait, TICK_ETERNITY, 0);
|
||||
}
|
||||
|
||||
/* remove task from waiting list */
|
||||
MT_LIST_DELETE(&wait->entry);
|
||||
pool_free(pool_head_hlua_queuew, wait);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
static int hlua_queue_new(lua_State *L)
|
||||
{
|
||||
struct hlua_queue *q;
|
||||
|
||||
lua_newtable(L);
|
||||
|
||||
/* set class metatable */
|
||||
lua_rawgeti(L, LUA_REGISTRYINDEX, class_queue_ref);
|
||||
lua_setmetatable(L, -2);
|
||||
|
||||
/* index:0 is queue userdata (c data) */
|
||||
q = lua_newuserdata(L, sizeof(*q));
|
||||
MT_LIST_INIT(&q->list);
|
||||
MT_LIST_INIT(&q->wait_tasks);
|
||||
q->size = 0;
|
||||
lua_rawseti(L, -2, 0);
|
||||
|
||||
/* class methods */
|
||||
hlua_class_function(L, "size", hlua_queue_size);
|
||||
hlua_class_function(L, "pop", hlua_queue_pop);
|
||||
hlua_class_function(L, "pop_wait", hlua_queue_pop_wait);
|
||||
hlua_class_function(L, "push", hlua_queue_push);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
static int hlua_queue_gc(struct lua_State *L)
|
||||
{
|
||||
struct hlua_queue *queue = hlua_check_queue(L, 1);
|
||||
struct hlua_queue_wait *wait;
|
||||
struct hlua_queue_item *item;
|
||||
|
||||
/* Purge waiting tasks (if any)
|
||||
*
|
||||
* It is normally not expected to have waiting tasks, except if such
|
||||
* task has been aborted while in the middle of a queue:pop_wait()
|
||||
* function call.
|
||||
*/
|
||||
while ((wait = MT_LIST_POP(&queue->wait_tasks, typeof(wait), entry))) {
|
||||
/* free the wait entry */
|
||||
pool_free(pool_head_hlua_queuew, wait);
|
||||
}
|
||||
|
||||
/* purge remaining (unconsumed) items in the queue */
|
||||
while ((item = MT_LIST_POP(&queue->list, typeof(item), list))) {
|
||||
/* free the queue item */
|
||||
pool_free(pool_head_hlua_queue, item);
|
||||
}
|
||||
|
||||
/* queue (userdata) will automatically be freed by lua gc */
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void hlua_queue_init(lua_State *L)
|
||||
{
|
||||
/* Creates the queue object. */
|
||||
lua_newtable(L);
|
||||
|
||||
hlua_class_function(L, "__gc", hlua_queue_gc);
|
||||
|
||||
class_queue_ref = luaL_ref(L, LUA_REGISTRYINDEX);
|
||||
}
|
||||
|
||||
int hlua_fcn_new_stktable(lua_State *L, struct stktable *tbl)
|
||||
{
|
||||
lua_newtable(L);
|
||||
@ -2339,6 +2575,7 @@ static int hlua_regex_free(struct lua_State *L)
|
||||
void hlua_fcn_reg_core_fcn(lua_State *L)
|
||||
{
|
||||
hlua_concat_init(L);
|
||||
hlua_queue_init(L);
|
||||
|
||||
hlua_class_function(L, "now", hlua_now);
|
||||
hlua_class_function(L, "http_date", hlua_http_date);
|
||||
@ -2346,6 +2583,7 @@ void hlua_fcn_reg_core_fcn(lua_State *L)
|
||||
hlua_class_function(L, "rfc850_date", hlua_rfc850_date);
|
||||
hlua_class_function(L, "asctime_date", hlua_asctime_date);
|
||||
hlua_class_function(L, "concat", hlua_concat_new);
|
||||
hlua_class_function(L, "queue", hlua_queue_new);
|
||||
hlua_class_function(L, "get_info", hlua_get_info);
|
||||
hlua_class_function(L, "parse_addr", hlua_parse_addr);
|
||||
hlua_class_function(L, "match_addr", hlua_match_addr);
|
||||
|
Loading…
Reference in New Issue
Block a user