mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2025-01-12 00:39:32 +00:00
MEDIUM: threads/xref: Convert xref function to a thread safe model
Ensure that the unlink is done safely between thread and that the peer struct will not destroy between the usage of the peer.
This commit is contained in:
parent
94a6bfce9b
commit
952939d294
@ -1,6 +1,8 @@
|
||||
#ifndef __XREF_H__
|
||||
#define __XREF_H__
|
||||
|
||||
#include <common/hathreads.h>
|
||||
|
||||
/* xref is used to create relation between two elements.
|
||||
* Once an element is released, it breaks the relation. If the
|
||||
* relation is already broken, it frees the xref struct.
|
||||
@ -13,25 +15,64 @@ struct xref {
|
||||
struct xref *peer;
|
||||
};
|
||||
|
||||
#define XREF_BUSY ((struct xref *)1)
|
||||
|
||||
static inline void xref_create(struct xref *xref_a, struct xref *xref_b)
|
||||
{
|
||||
xref_a->peer = xref_b;
|
||||
xref_b->peer = xref_a;
|
||||
}
|
||||
|
||||
static inline struct xref *xref_get_peer(struct xref *xref)
|
||||
static inline struct xref *xref_get_peer_and_lock(struct xref *xref)
|
||||
{
|
||||
if (!xref->peer)
|
||||
return NULL;
|
||||
return xref->peer;
|
||||
struct xref *local;
|
||||
struct xref *remote;
|
||||
|
||||
while (1) {
|
||||
|
||||
/* Get the local pointer to the peer. */
|
||||
local = HA_ATOMIC_XCHG(&xref->peer, XREF_BUSY);
|
||||
|
||||
/* If the local pointer is NULL, the peer no longer exists. */
|
||||
if (local == NULL) {
|
||||
xref->peer = NULL;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* If the local pointeru is BUSY, the peer try to acquire the
|
||||
* lock. We retry the process.
|
||||
*/
|
||||
if (local == XREF_BUSY)
|
||||
continue;
|
||||
|
||||
/* We are locked, the peer cant disapear, try to acquire
|
||||
* the pper's lock. Note that remote can't be NULL.
|
||||
*/
|
||||
remote = HA_ATOMIC_XCHG(&local->peer, XREF_BUSY);
|
||||
|
||||
/* The remote lock is BUSY, We retry the process. */
|
||||
if (remote == XREF_BUSY) {
|
||||
xref->peer = local;
|
||||
continue;
|
||||
}
|
||||
|
||||
/* We have the lock, we return the value of the xref. */
|
||||
return local;
|
||||
}
|
||||
}
|
||||
|
||||
static inline void xref_disconnect(struct xref *xref)
|
||||
static inline void xref_unlock(struct xref *xref, struct xref *peer)
|
||||
{
|
||||
if (!xref->peer)
|
||||
return;
|
||||
/* Release the peer. */
|
||||
peer->peer = xref;
|
||||
|
||||
xref->peer->peer = NULL;
|
||||
/* Release myself. */
|
||||
xref->peer = peer;
|
||||
}
|
||||
|
||||
static inline void xref_disconnect(struct xref *xref, struct xref *peer)
|
||||
{
|
||||
peer->peer = NULL;
|
||||
xref->peer = NULL;
|
||||
}
|
||||
|
||||
|
128
src/hlua.c
128
src/hlua.c
@ -1579,8 +1579,12 @@ static void hlua_socket_handler(struct appctx *appctx)
|
||||
*/
|
||||
static void hlua_socket_release(struct appctx *appctx)
|
||||
{
|
||||
struct xref *peer;
|
||||
|
||||
/* Remove my link in the original object. */
|
||||
xref_disconnect(&appctx->ctx.hlua_cosocket.xref);
|
||||
peer = xref_get_peer_and_lock(&appctx->ctx.hlua_cosocket.xref);
|
||||
if (peer)
|
||||
xref_disconnect(&appctx->ctx.hlua_cosocket.xref, peer);
|
||||
|
||||
/* Wake all the task waiting for me. */
|
||||
notification_wake(&appctx->ctx.hlua_cosocket.wake_on_read);
|
||||
@ -1602,11 +1606,9 @@ __LJMP static int hlua_socket_gc(lua_State *L)
|
||||
MAY_LJMP(check_args(L, 1, "__gc"));
|
||||
|
||||
socket = MAY_LJMP(hlua_checksocket(L, 1));
|
||||
peer = xref_get_peer(&socket->xref);
|
||||
if (!peer) {
|
||||
xref_disconnect(&socket->xref);
|
||||
peer = xref_get_peer_and_lock(&socket->xref);
|
||||
if (!peer)
|
||||
return 0;
|
||||
}
|
||||
appctx = container_of(peer, struct appctx, ctx.hlua_cosocket.xref);
|
||||
|
||||
/* Set the flag which destroy the session. */
|
||||
@ -1614,7 +1616,7 @@ __LJMP static int hlua_socket_gc(lua_State *L)
|
||||
appctx_wakeup(appctx);
|
||||
|
||||
/* Remove all reference between the Lua stack and the coroutine stream. */
|
||||
xref_disconnect(&socket->xref);
|
||||
xref_disconnect(&socket->xref, peer);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -1637,11 +1639,9 @@ __LJMP static int hlua_socket_close(lua_State *L)
|
||||
if (socket->tid != tid)
|
||||
WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread"));
|
||||
|
||||
peer = xref_get_peer(&socket->xref);
|
||||
if (!peer) {
|
||||
xref_disconnect(&socket->xref);
|
||||
peer = xref_get_peer_and_lock(&socket->xref);
|
||||
if (!peer)
|
||||
return 0;
|
||||
}
|
||||
appctx = container_of(peer, struct appctx, ctx.hlua_cosocket.xref);
|
||||
|
||||
/* Set the flag which destroy the session. */
|
||||
@ -1649,7 +1649,7 @@ __LJMP static int hlua_socket_close(lua_State *L)
|
||||
appctx_wakeup(appctx);
|
||||
|
||||
/* Remove all reference between the Lua stack and the coroutine stream. */
|
||||
xref_disconnect(&socket->xref);
|
||||
xref_disconnect(&socket->xref, peer);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -1694,11 +1694,9 @@ __LJMP static int hlua_socket_receive_yield(struct lua_State *L, int status, lua
|
||||
WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread"));
|
||||
|
||||
/* check for connection break. If some data where read, return it. */
|
||||
peer = xref_get_peer(&socket->xref);
|
||||
if (!peer) {
|
||||
xref_disconnect(&socket->xref);
|
||||
goto connection_closed;
|
||||
}
|
||||
peer = xref_get_peer_and_lock(&socket->xref);
|
||||
if (!peer)
|
||||
goto no_peer;
|
||||
appctx = container_of(peer, struct appctx, ctx.hlua_cosocket.xref);
|
||||
si = appctx->owner;
|
||||
s = si_strm(si);
|
||||
@ -1784,10 +1782,15 @@ __LJMP static int hlua_socket_receive_yield(struct lua_State *L, int status, lua
|
||||
|
||||
/* Return result. */
|
||||
luaL_pushresult(&socket->b);
|
||||
xref_unlock(&socket->xref, peer);
|
||||
return 1;
|
||||
|
||||
connection_closed:
|
||||
|
||||
xref_unlock(&socket->xref, peer);
|
||||
|
||||
no_peer:
|
||||
|
||||
/* If the buffer containds data. */
|
||||
if (socket->b.n > 0) {
|
||||
luaL_pushresult(&socket->b);
|
||||
@ -1800,8 +1803,11 @@ connection_closed:
|
||||
connection_empty:
|
||||
|
||||
appctx = objt_appctx(s->si[0].end);
|
||||
if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_read, hlua->task))
|
||||
if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_read, hlua->task)) {
|
||||
xref_unlock(&socket->xref, peer);
|
||||
WILL_LJMP(luaL_error(L, "out of memory"));
|
||||
}
|
||||
xref_unlock(&socket->xref, peer);
|
||||
WILL_LJMP(hlua_yieldk(L, 0, 0, hlua_socket_receive_yield, TICK_ETERNITY, 0));
|
||||
return 0;
|
||||
}
|
||||
@ -1915,9 +1921,8 @@ static int hlua_socket_write_yield(struct lua_State *L,int status, lua_KContext
|
||||
WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread"));
|
||||
|
||||
/* check for connection break. If some data where read, return it. */
|
||||
peer = xref_get_peer(&socket->xref);
|
||||
peer = xref_get_peer_and_lock(&socket->xref);
|
||||
if (!peer) {
|
||||
xref_disconnect(&socket->xref);
|
||||
lua_pushinteger(L, -1);
|
||||
return 1;
|
||||
}
|
||||
@ -1927,6 +1932,7 @@ static int hlua_socket_write_yield(struct lua_State *L,int status, lua_KContext
|
||||
|
||||
/* Check for connection close. */
|
||||
if (channel_output_closed(&s->req)) {
|
||||
xref_unlock(&socket->xref, peer);
|
||||
lua_pushinteger(L, -1);
|
||||
return 1;
|
||||
}
|
||||
@ -1936,8 +1942,10 @@ static int hlua_socket_write_yield(struct lua_State *L,int status, lua_KContext
|
||||
send_len = buf_len - sent;
|
||||
|
||||
/* All the data are sent. */
|
||||
if (sent >= buf_len)
|
||||
if (sent >= buf_len) {
|
||||
xref_unlock(&socket->xref, peer);
|
||||
return 1; /* Implicitly return the length sent. */
|
||||
}
|
||||
|
||||
/* Check if the buffer is avalaible because HAProxy doesn't allocate
|
||||
* the request buffer if its not required.
|
||||
@ -1952,8 +1960,10 @@ static int hlua_socket_write_yield(struct lua_State *L,int status, lua_KContext
|
||||
len = buffer_total_space(s->req.buf);
|
||||
if (len <= 0) {
|
||||
appctx = objt_appctx(s->si[0].end);
|
||||
if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_write, hlua->task))
|
||||
if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_write, hlua->task)) {
|
||||
xref_unlock(&socket->xref, peer);
|
||||
WILL_LJMP(luaL_error(L, "out of memory"));
|
||||
}
|
||||
goto hlua_socket_write_yield_return;
|
||||
}
|
||||
|
||||
@ -1974,6 +1984,7 @@ static int hlua_socket_write_yield(struct lua_State *L,int status, lua_KContext
|
||||
MAY_LJMP(hlua_socket_close(L));
|
||||
lua_pop(L, 1);
|
||||
lua_pushinteger(L, -1);
|
||||
xref_unlock(&socket->xref, peer);
|
||||
return 1;
|
||||
}
|
||||
|
||||
@ -1989,10 +2000,13 @@ static int hlua_socket_write_yield(struct lua_State *L,int status, lua_KContext
|
||||
lua_pushinteger(L, sent + len);
|
||||
|
||||
/* All the data buffer is sent ? */
|
||||
if (sent + len >= buf_len)
|
||||
if (sent + len >= buf_len) {
|
||||
xref_unlock(&socket->xref, peer);
|
||||
return 1;
|
||||
}
|
||||
|
||||
hlua_socket_write_yield_return:
|
||||
xref_unlock(&socket->xref, peer);
|
||||
WILL_LJMP(hlua_yieldk(L, 0, 0, hlua_socket_write_yield, TICK_ETERNITY, 0));
|
||||
return 0;
|
||||
}
|
||||
@ -2131,6 +2145,7 @@ __LJMP static int hlua_socket_getpeername(struct lua_State *L)
|
||||
struct appctx *appctx;
|
||||
struct stream_interface *si;
|
||||
struct stream *s;
|
||||
int ret;
|
||||
|
||||
MAY_LJMP(check_args(L, 1, "getpeername"));
|
||||
|
||||
@ -2143,9 +2158,8 @@ __LJMP static int hlua_socket_getpeername(struct lua_State *L)
|
||||
WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread"));
|
||||
|
||||
/* check for connection break. If some data where read, return it. */
|
||||
peer = xref_get_peer(&socket->xref);
|
||||
peer = xref_get_peer_and_lock(&socket->xref);
|
||||
if (!peer) {
|
||||
xref_disconnect(&socket->xref);
|
||||
lua_pushnil(L);
|
||||
return 1;
|
||||
}
|
||||
@ -2155,17 +2169,21 @@ __LJMP static int hlua_socket_getpeername(struct lua_State *L)
|
||||
|
||||
conn = objt_conn(s->si[1].end);
|
||||
if (!conn) {
|
||||
xref_unlock(&socket->xref, peer);
|
||||
lua_pushnil(L);
|
||||
return 1;
|
||||
}
|
||||
|
||||
conn_get_to_addr(conn);
|
||||
if (!(conn->flags & CO_FL_ADDR_TO_SET)) {
|
||||
xref_unlock(&socket->xref, peer);
|
||||
lua_pushnil(L);
|
||||
return 1;
|
||||
}
|
||||
|
||||
return MAY_LJMP(hlua_socket_info(L, &conn->addr.to));
|
||||
ret = MAY_LJMP(hlua_socket_info(L, &conn->addr.to));
|
||||
xref_unlock(&socket->xref, peer);
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* Returns information about my connection side. */
|
||||
@ -2177,6 +2195,7 @@ static int hlua_socket_getsockname(struct lua_State *L)
|
||||
struct xref *peer;
|
||||
struct stream_interface *si;
|
||||
struct stream *s;
|
||||
int ret;
|
||||
|
||||
MAY_LJMP(check_args(L, 1, "getsockname"));
|
||||
|
||||
@ -2189,9 +2208,8 @@ static int hlua_socket_getsockname(struct lua_State *L)
|
||||
WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread"));
|
||||
|
||||
/* check for connection break. If some data where read, return it. */
|
||||
peer = xref_get_peer(&socket->xref);
|
||||
peer = xref_get_peer_and_lock(&socket->xref);
|
||||
if (!peer) {
|
||||
xref_disconnect(&socket->xref);
|
||||
lua_pushnil(L);
|
||||
return 1;
|
||||
}
|
||||
@ -2201,17 +2219,21 @@ static int hlua_socket_getsockname(struct lua_State *L)
|
||||
|
||||
conn = objt_conn(s->si[1].end);
|
||||
if (!conn) {
|
||||
xref_unlock(&socket->xref, peer);
|
||||
lua_pushnil(L);
|
||||
return 1;
|
||||
}
|
||||
|
||||
conn_get_from_addr(conn);
|
||||
if (!(conn->flags & CO_FL_ADDR_FROM_SET)) {
|
||||
xref_unlock(&socket->xref, peer);
|
||||
lua_pushnil(L);
|
||||
return 1;
|
||||
}
|
||||
|
||||
return hlua_socket_info(L, &conn->addr.from);
|
||||
ret = hlua_socket_info(L, &conn->addr.from);
|
||||
xref_unlock(&socket->xref, peer);
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* This struct define the applet. */
|
||||
@ -2238,9 +2260,8 @@ __LJMP static int hlua_socket_connect_yield(struct lua_State *L, int status, lua
|
||||
WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread"));
|
||||
|
||||
/* check for connection break. If some data where read, return it. */
|
||||
peer = xref_get_peer(&socket->xref);
|
||||
peer = xref_get_peer_and_lock(&socket->xref);
|
||||
if (!peer) {
|
||||
xref_disconnect(&socket->xref);
|
||||
lua_pushnil(L);
|
||||
lua_pushstring(L, "Can't connect");
|
||||
return 2;
|
||||
@ -2252,11 +2273,14 @@ __LJMP static int hlua_socket_connect_yield(struct lua_State *L, int status, lua
|
||||
/* Check if we run on the same thread than the xreator thread.
|
||||
* We cannot access to the socket if the thread is different.
|
||||
*/
|
||||
if (socket->tid != tid)
|
||||
if (socket->tid != tid) {
|
||||
xref_unlock(&socket->xref, peer);
|
||||
WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread"));
|
||||
}
|
||||
|
||||
/* Check for connection close. */
|
||||
if (!hlua || channel_output_closed(&s->req)) {
|
||||
xref_unlock(&socket->xref, peer);
|
||||
lua_pushnil(L);
|
||||
lua_pushstring(L, "Can't connect");
|
||||
return 2;
|
||||
@ -2266,12 +2290,16 @@ __LJMP static int hlua_socket_connect_yield(struct lua_State *L, int status, lua
|
||||
|
||||
/* Check for connection established. */
|
||||
if (appctx->ctx.hlua_cosocket.connected) {
|
||||
xref_unlock(&socket->xref, peer);
|
||||
lua_pushinteger(L, 1);
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_write, hlua->task))
|
||||
if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_write, hlua->task)) {
|
||||
xref_unlock(&socket->xref, peer);
|
||||
WILL_LJMP(luaL_error(L, "out of memory error"));
|
||||
}
|
||||
xref_unlock(&socket->xref, peer);
|
||||
WILL_LJMP(hlua_yieldk(L, 0, 0, hlua_socket_connect_yield, TICK_ETERNITY, 0));
|
||||
return 0;
|
||||
}
|
||||
@ -2308,9 +2336,8 @@ __LJMP static int hlua_socket_connect(struct lua_State *L)
|
||||
port = MAY_LJMP(luaL_checkinteger(L, 3));
|
||||
|
||||
/* check for connection break. If some data where read, return it. */
|
||||
peer = xref_get_peer(&socket->xref);
|
||||
peer = xref_get_peer_and_lock(&socket->xref);
|
||||
if (!peer) {
|
||||
xref_disconnect(&socket->xref);
|
||||
lua_pushnil(L);
|
||||
return 1;
|
||||
}
|
||||
@ -2320,29 +2347,39 @@ __LJMP static int hlua_socket_connect(struct lua_State *L)
|
||||
|
||||
/* Initialise connection. */
|
||||
conn = si_alloc_conn(&s->si[1]);
|
||||
if (!conn)
|
||||
if (!conn) {
|
||||
xref_unlock(&socket->xref, peer);
|
||||
WILL_LJMP(luaL_error(L, "connect: internal error"));
|
||||
}
|
||||
|
||||
/* needed for the connection not to be closed */
|
||||
conn->target = s->target;
|
||||
|
||||
/* Parse ip address. */
|
||||
addr = str2sa_range(ip, NULL, &low, &high, NULL, NULL, NULL, 0);
|
||||
if (!addr)
|
||||
if (!addr) {
|
||||
xref_unlock(&socket->xref, peer);
|
||||
WILL_LJMP(luaL_error(L, "connect: cannot parse destination address '%s'", ip));
|
||||
if (low != high)
|
||||
}
|
||||
if (low != high) {
|
||||
xref_unlock(&socket->xref, peer);
|
||||
WILL_LJMP(luaL_error(L, "connect: port ranges not supported : address '%s'", ip));
|
||||
}
|
||||
memcpy(&conn->addr.to, addr, sizeof(struct sockaddr_storage));
|
||||
|
||||
/* Set port. */
|
||||
if (low == 0) {
|
||||
if (conn->addr.to.ss_family == AF_INET) {
|
||||
if (port == -1)
|
||||
if (port == -1) {
|
||||
xref_unlock(&socket->xref, peer);
|
||||
WILL_LJMP(luaL_error(L, "connect: port missing"));
|
||||
}
|
||||
((struct sockaddr_in *)&conn->addr.to)->sin_port = htons(port);
|
||||
} else if (conn->addr.to.ss_family == AF_INET6) {
|
||||
if (port == -1)
|
||||
if (port == -1) {
|
||||
xref_unlock(&socket->xref, peer);
|
||||
WILL_LJMP(luaL_error(L, "connect: port missing"));
|
||||
}
|
||||
((struct sockaddr_in6 *)&conn->addr.to)->sin6_port = htons(port);
|
||||
}
|
||||
}
|
||||
@ -2359,8 +2396,11 @@ __LJMP static int hlua_socket_connect(struct lua_State *L)
|
||||
|
||||
hlua->flags |= HLUA_MUST_GC;
|
||||
|
||||
if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_write, hlua->task))
|
||||
if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_write, hlua->task)) {
|
||||
xref_unlock(&socket->xref, peer);
|
||||
WILL_LJMP(luaL_error(L, "out of memory"));
|
||||
}
|
||||
xref_unlock(&socket->xref, peer);
|
||||
WILL_LJMP(hlua_yieldk(L, 0, 0, hlua_socket_connect_yield, TICK_ETERNITY, 0));
|
||||
|
||||
return 0;
|
||||
@ -2379,9 +2419,8 @@ __LJMP static int hlua_socket_connect_ssl(struct lua_State *L)
|
||||
socket = MAY_LJMP(hlua_checksocket(L, 1));
|
||||
|
||||
/* check for connection break. If some data where read, return it. */
|
||||
peer = xref_get_peer(&socket->xref);
|
||||
peer = xref_get_peer_and_lock(&socket->xref);
|
||||
if (!peer) {
|
||||
xref_disconnect(&socket->xref);
|
||||
lua_pushnil(L);
|
||||
return 1;
|
||||
}
|
||||
@ -2390,6 +2429,7 @@ __LJMP static int hlua_socket_connect_ssl(struct lua_State *L)
|
||||
s = si_strm(si);
|
||||
|
||||
s->target = &socket_ssl.obj_type;
|
||||
xref_unlock(&socket->xref, peer);
|
||||
return MAY_LJMP(hlua_socket_connect(L));
|
||||
}
|
||||
#endif
|
||||
@ -2420,9 +2460,8 @@ __LJMP static int hlua_socket_settimeout(struct lua_State *L)
|
||||
WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread"));
|
||||
|
||||
/* check for connection break. If some data where read, return it. */
|
||||
peer = xref_get_peer(&socket->xref);
|
||||
peer = xref_get_peer_and_lock(&socket->xref);
|
||||
if (!peer) {
|
||||
xref_disconnect(&socket->xref);
|
||||
hlua_pusherror(L, "socket: not yet initialised, you can't set timeouts.");
|
||||
WILL_LJMP(lua_error(L));
|
||||
return 0;
|
||||
@ -2435,6 +2474,7 @@ __LJMP static int hlua_socket_settimeout(struct lua_State *L)
|
||||
s->req.wto = tmout;
|
||||
s->res.rto = tmout;
|
||||
s->res.wto = tmout;
|
||||
xref_unlock(&socket->xref, peer);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user