diff --git a/include/common/xref.h b/include/common/xref.h index b020280ef..6dfa7b627 100644 --- a/include/common/xref.h +++ b/include/common/xref.h @@ -1,6 +1,8 @@ #ifndef __XREF_H__ #define __XREF_H__ +#include + /* xref is used to create relation between two elements. * Once an element is released, it breaks the relation. If the * relation is already broken, it frees the xref struct. @@ -13,25 +15,64 @@ struct xref { struct xref *peer; }; +#define XREF_BUSY ((struct xref *)1) + static inline void xref_create(struct xref *xref_a, struct xref *xref_b) { xref_a->peer = xref_b; xref_b->peer = xref_a; } -static inline struct xref *xref_get_peer(struct xref *xref) +static inline struct xref *xref_get_peer_and_lock(struct xref *xref) { - if (!xref->peer) - return NULL; - return xref->peer; + struct xref *local; + struct xref *remote; + + while (1) { + + /* Get the local pointer to the peer. */ + local = HA_ATOMIC_XCHG(&xref->peer, XREF_BUSY); + + /* If the local pointer is NULL, the peer no longer exists. */ + if (local == NULL) { + xref->peer = NULL; + return NULL; + } + + /* If the local pointeru is BUSY, the peer try to acquire the + * lock. We retry the process. + */ + if (local == XREF_BUSY) + continue; + + /* We are locked, the peer cant disapear, try to acquire + * the pper's lock. Note that remote can't be NULL. + */ + remote = HA_ATOMIC_XCHG(&local->peer, XREF_BUSY); + + /* The remote lock is BUSY, We retry the process. */ + if (remote == XREF_BUSY) { + xref->peer = local; + continue; + } + + /* We have the lock, we return the value of the xref. */ + return local; + } } -static inline void xref_disconnect(struct xref *xref) +static inline void xref_unlock(struct xref *xref, struct xref *peer) { - if (!xref->peer) - return; + /* Release the peer. */ + peer->peer = xref; - xref->peer->peer = NULL; + /* Release myself. */ + xref->peer = peer; +} + +static inline void xref_disconnect(struct xref *xref, struct xref *peer) +{ + peer->peer = NULL; xref->peer = NULL; } diff --git a/src/hlua.c b/src/hlua.c index 1d14e5486..6edb08a83 100644 --- a/src/hlua.c +++ b/src/hlua.c @@ -1579,8 +1579,12 @@ static void hlua_socket_handler(struct appctx *appctx) */ static void hlua_socket_release(struct appctx *appctx) { + struct xref *peer; + /* Remove my link in the original object. */ - xref_disconnect(&appctx->ctx.hlua_cosocket.xref); + peer = xref_get_peer_and_lock(&appctx->ctx.hlua_cosocket.xref); + if (peer) + xref_disconnect(&appctx->ctx.hlua_cosocket.xref, peer); /* Wake all the task waiting for me. */ notification_wake(&appctx->ctx.hlua_cosocket.wake_on_read); @@ -1602,11 +1606,9 @@ __LJMP static int hlua_socket_gc(lua_State *L) MAY_LJMP(check_args(L, 1, "__gc")); socket = MAY_LJMP(hlua_checksocket(L, 1)); - peer = xref_get_peer(&socket->xref); - if (!peer) { - xref_disconnect(&socket->xref); + peer = xref_get_peer_and_lock(&socket->xref); + if (!peer) return 0; - } appctx = container_of(peer, struct appctx, ctx.hlua_cosocket.xref); /* Set the flag which destroy the session. */ @@ -1614,7 +1616,7 @@ __LJMP static int hlua_socket_gc(lua_State *L) appctx_wakeup(appctx); /* Remove all reference between the Lua stack and the coroutine stream. */ - xref_disconnect(&socket->xref); + xref_disconnect(&socket->xref, peer); return 0; } @@ -1637,11 +1639,9 @@ __LJMP static int hlua_socket_close(lua_State *L) if (socket->tid != tid) WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread")); - peer = xref_get_peer(&socket->xref); - if (!peer) { - xref_disconnect(&socket->xref); + peer = xref_get_peer_and_lock(&socket->xref); + if (!peer) return 0; - } appctx = container_of(peer, struct appctx, ctx.hlua_cosocket.xref); /* Set the flag which destroy the session. */ @@ -1649,7 +1649,7 @@ __LJMP static int hlua_socket_close(lua_State *L) appctx_wakeup(appctx); /* Remove all reference between the Lua stack and the coroutine stream. */ - xref_disconnect(&socket->xref); + xref_disconnect(&socket->xref, peer); return 0; } @@ -1694,11 +1694,9 @@ __LJMP static int hlua_socket_receive_yield(struct lua_State *L, int status, lua WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread")); /* check for connection break. If some data where read, return it. */ - peer = xref_get_peer(&socket->xref); - if (!peer) { - xref_disconnect(&socket->xref); - goto connection_closed; - } + peer = xref_get_peer_and_lock(&socket->xref); + if (!peer) + goto no_peer; appctx = container_of(peer, struct appctx, ctx.hlua_cosocket.xref); si = appctx->owner; s = si_strm(si); @@ -1784,10 +1782,15 @@ __LJMP static int hlua_socket_receive_yield(struct lua_State *L, int status, lua /* Return result. */ luaL_pushresult(&socket->b); + xref_unlock(&socket->xref, peer); return 1; connection_closed: + xref_unlock(&socket->xref, peer); + +no_peer: + /* If the buffer containds data. */ if (socket->b.n > 0) { luaL_pushresult(&socket->b); @@ -1800,8 +1803,11 @@ connection_closed: connection_empty: appctx = objt_appctx(s->si[0].end); - if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_read, hlua->task)) + if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_read, hlua->task)) { + xref_unlock(&socket->xref, peer); WILL_LJMP(luaL_error(L, "out of memory")); + } + xref_unlock(&socket->xref, peer); WILL_LJMP(hlua_yieldk(L, 0, 0, hlua_socket_receive_yield, TICK_ETERNITY, 0)); return 0; } @@ -1915,9 +1921,8 @@ static int hlua_socket_write_yield(struct lua_State *L,int status, lua_KContext WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread")); /* check for connection break. If some data where read, return it. */ - peer = xref_get_peer(&socket->xref); + peer = xref_get_peer_and_lock(&socket->xref); if (!peer) { - xref_disconnect(&socket->xref); lua_pushinteger(L, -1); return 1; } @@ -1927,6 +1932,7 @@ static int hlua_socket_write_yield(struct lua_State *L,int status, lua_KContext /* Check for connection close. */ if (channel_output_closed(&s->req)) { + xref_unlock(&socket->xref, peer); lua_pushinteger(L, -1); return 1; } @@ -1936,8 +1942,10 @@ static int hlua_socket_write_yield(struct lua_State *L,int status, lua_KContext send_len = buf_len - sent; /* All the data are sent. */ - if (sent >= buf_len) + if (sent >= buf_len) { + xref_unlock(&socket->xref, peer); return 1; /* Implicitly return the length sent. */ + } /* Check if the buffer is avalaible because HAProxy doesn't allocate * the request buffer if its not required. @@ -1952,8 +1960,10 @@ static int hlua_socket_write_yield(struct lua_State *L,int status, lua_KContext len = buffer_total_space(s->req.buf); if (len <= 0) { appctx = objt_appctx(s->si[0].end); - if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_write, hlua->task)) + if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_write, hlua->task)) { + xref_unlock(&socket->xref, peer); WILL_LJMP(luaL_error(L, "out of memory")); + } goto hlua_socket_write_yield_return; } @@ -1974,6 +1984,7 @@ static int hlua_socket_write_yield(struct lua_State *L,int status, lua_KContext MAY_LJMP(hlua_socket_close(L)); lua_pop(L, 1); lua_pushinteger(L, -1); + xref_unlock(&socket->xref, peer); return 1; } @@ -1989,10 +2000,13 @@ static int hlua_socket_write_yield(struct lua_State *L,int status, lua_KContext lua_pushinteger(L, sent + len); /* All the data buffer is sent ? */ - if (sent + len >= buf_len) + if (sent + len >= buf_len) { + xref_unlock(&socket->xref, peer); return 1; + } hlua_socket_write_yield_return: + xref_unlock(&socket->xref, peer); WILL_LJMP(hlua_yieldk(L, 0, 0, hlua_socket_write_yield, TICK_ETERNITY, 0)); return 0; } @@ -2131,6 +2145,7 @@ __LJMP static int hlua_socket_getpeername(struct lua_State *L) struct appctx *appctx; struct stream_interface *si; struct stream *s; + int ret; MAY_LJMP(check_args(L, 1, "getpeername")); @@ -2143,9 +2158,8 @@ __LJMP static int hlua_socket_getpeername(struct lua_State *L) WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread")); /* check for connection break. If some data where read, return it. */ - peer = xref_get_peer(&socket->xref); + peer = xref_get_peer_and_lock(&socket->xref); if (!peer) { - xref_disconnect(&socket->xref); lua_pushnil(L); return 1; } @@ -2155,17 +2169,21 @@ __LJMP static int hlua_socket_getpeername(struct lua_State *L) conn = objt_conn(s->si[1].end); if (!conn) { + xref_unlock(&socket->xref, peer); lua_pushnil(L); return 1; } conn_get_to_addr(conn); if (!(conn->flags & CO_FL_ADDR_TO_SET)) { + xref_unlock(&socket->xref, peer); lua_pushnil(L); return 1; } - return MAY_LJMP(hlua_socket_info(L, &conn->addr.to)); + ret = MAY_LJMP(hlua_socket_info(L, &conn->addr.to)); + xref_unlock(&socket->xref, peer); + return ret; } /* Returns information about my connection side. */ @@ -2177,6 +2195,7 @@ static int hlua_socket_getsockname(struct lua_State *L) struct xref *peer; struct stream_interface *si; struct stream *s; + int ret; MAY_LJMP(check_args(L, 1, "getsockname")); @@ -2189,9 +2208,8 @@ static int hlua_socket_getsockname(struct lua_State *L) WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread")); /* check for connection break. If some data where read, return it. */ - peer = xref_get_peer(&socket->xref); + peer = xref_get_peer_and_lock(&socket->xref); if (!peer) { - xref_disconnect(&socket->xref); lua_pushnil(L); return 1; } @@ -2201,17 +2219,21 @@ static int hlua_socket_getsockname(struct lua_State *L) conn = objt_conn(s->si[1].end); if (!conn) { + xref_unlock(&socket->xref, peer); lua_pushnil(L); return 1; } conn_get_from_addr(conn); if (!(conn->flags & CO_FL_ADDR_FROM_SET)) { + xref_unlock(&socket->xref, peer); lua_pushnil(L); return 1; } - return hlua_socket_info(L, &conn->addr.from); + ret = hlua_socket_info(L, &conn->addr.from); + xref_unlock(&socket->xref, peer); + return ret; } /* This struct define the applet. */ @@ -2238,9 +2260,8 @@ __LJMP static int hlua_socket_connect_yield(struct lua_State *L, int status, lua WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread")); /* check for connection break. If some data where read, return it. */ - peer = xref_get_peer(&socket->xref); + peer = xref_get_peer_and_lock(&socket->xref); if (!peer) { - xref_disconnect(&socket->xref); lua_pushnil(L); lua_pushstring(L, "Can't connect"); return 2; @@ -2252,11 +2273,14 @@ __LJMP static int hlua_socket_connect_yield(struct lua_State *L, int status, lua /* Check if we run on the same thread than the xreator thread. * We cannot access to the socket if the thread is different. */ - if (socket->tid != tid) + if (socket->tid != tid) { + xref_unlock(&socket->xref, peer); WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread")); + } /* Check for connection close. */ if (!hlua || channel_output_closed(&s->req)) { + xref_unlock(&socket->xref, peer); lua_pushnil(L); lua_pushstring(L, "Can't connect"); return 2; @@ -2266,12 +2290,16 @@ __LJMP static int hlua_socket_connect_yield(struct lua_State *L, int status, lua /* Check for connection established. */ if (appctx->ctx.hlua_cosocket.connected) { + xref_unlock(&socket->xref, peer); lua_pushinteger(L, 1); return 1; } - if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_write, hlua->task)) + if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_write, hlua->task)) { + xref_unlock(&socket->xref, peer); WILL_LJMP(luaL_error(L, "out of memory error")); + } + xref_unlock(&socket->xref, peer); WILL_LJMP(hlua_yieldk(L, 0, 0, hlua_socket_connect_yield, TICK_ETERNITY, 0)); return 0; } @@ -2308,9 +2336,8 @@ __LJMP static int hlua_socket_connect(struct lua_State *L) port = MAY_LJMP(luaL_checkinteger(L, 3)); /* check for connection break. If some data where read, return it. */ - peer = xref_get_peer(&socket->xref); + peer = xref_get_peer_and_lock(&socket->xref); if (!peer) { - xref_disconnect(&socket->xref); lua_pushnil(L); return 1; } @@ -2320,29 +2347,39 @@ __LJMP static int hlua_socket_connect(struct lua_State *L) /* Initialise connection. */ conn = si_alloc_conn(&s->si[1]); - if (!conn) + if (!conn) { + xref_unlock(&socket->xref, peer); WILL_LJMP(luaL_error(L, "connect: internal error")); + } /* needed for the connection not to be closed */ conn->target = s->target; /* Parse ip address. */ addr = str2sa_range(ip, NULL, &low, &high, NULL, NULL, NULL, 0); - if (!addr) + if (!addr) { + xref_unlock(&socket->xref, peer); WILL_LJMP(luaL_error(L, "connect: cannot parse destination address '%s'", ip)); - if (low != high) + } + if (low != high) { + xref_unlock(&socket->xref, peer); WILL_LJMP(luaL_error(L, "connect: port ranges not supported : address '%s'", ip)); + } memcpy(&conn->addr.to, addr, sizeof(struct sockaddr_storage)); /* Set port. */ if (low == 0) { if (conn->addr.to.ss_family == AF_INET) { - if (port == -1) + if (port == -1) { + xref_unlock(&socket->xref, peer); WILL_LJMP(luaL_error(L, "connect: port missing")); + } ((struct sockaddr_in *)&conn->addr.to)->sin_port = htons(port); } else if (conn->addr.to.ss_family == AF_INET6) { - if (port == -1) + if (port == -1) { + xref_unlock(&socket->xref, peer); WILL_LJMP(luaL_error(L, "connect: port missing")); + } ((struct sockaddr_in6 *)&conn->addr.to)->sin6_port = htons(port); } } @@ -2359,8 +2396,11 @@ __LJMP static int hlua_socket_connect(struct lua_State *L) hlua->flags |= HLUA_MUST_GC; - if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_write, hlua->task)) + if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_write, hlua->task)) { + xref_unlock(&socket->xref, peer); WILL_LJMP(luaL_error(L, "out of memory")); + } + xref_unlock(&socket->xref, peer); WILL_LJMP(hlua_yieldk(L, 0, 0, hlua_socket_connect_yield, TICK_ETERNITY, 0)); return 0; @@ -2379,9 +2419,8 @@ __LJMP static int hlua_socket_connect_ssl(struct lua_State *L) socket = MAY_LJMP(hlua_checksocket(L, 1)); /* check for connection break. If some data where read, return it. */ - peer = xref_get_peer(&socket->xref); + peer = xref_get_peer_and_lock(&socket->xref); if (!peer) { - xref_disconnect(&socket->xref); lua_pushnil(L); return 1; } @@ -2390,6 +2429,7 @@ __LJMP static int hlua_socket_connect_ssl(struct lua_State *L) s = si_strm(si); s->target = &socket_ssl.obj_type; + xref_unlock(&socket->xref, peer); return MAY_LJMP(hlua_socket_connect(L)); } #endif @@ -2420,9 +2460,8 @@ __LJMP static int hlua_socket_settimeout(struct lua_State *L) WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread")); /* check for connection break. If some data where read, return it. */ - peer = xref_get_peer(&socket->xref); + peer = xref_get_peer_and_lock(&socket->xref); if (!peer) { - xref_disconnect(&socket->xref); hlua_pusherror(L, "socket: not yet initialised, you can't set timeouts."); WILL_LJMP(lua_error(L)); return 0; @@ -2435,6 +2474,7 @@ __LJMP static int hlua_socket_settimeout(struct lua_State *L) s->req.wto = tmout; s->res.rto = tmout; s->res.wto = tmout; + xref_unlock(&socket->xref, peer); return 0; }