MEDIUM: threads/stream: Make streams list thread safe

Adds a global lock to protect the full streams list used to dump
sessions on stats socket.
This commit is contained in:
Emeric Brun 2017-06-30 16:23:45 +02:00 committed by Willy Tarreau
parent a1dd243adb
commit 6b35e9bfbf
2 changed files with 24 additions and 4 deletions

View File

@ -157,6 +157,7 @@ enum lock_label {
APPLETS_LOCK,
PEER_LOCK,
BUF_WQ_LOCK,
STRMS_LOCK,
LOCK_LABELS
};
struct lock_stat {
@ -243,7 +244,7 @@ static inline void show_lock_stats()
"TASK_RQ", "TASK_WQ", "POOL",
"LISTENER", "LISTENER_QUEUE", "PROXY", "SERVER",
"UPDATED_SERVERS", "LBPRM", "SIGNALS", "STK_TABLE", "STK_SESS",
"APPLETS", "PEER", "BUF_WQ" };
"APPLETS", "PEER", "BUF_WQ", "STREAMS" };
int lbl;
for (lbl = 0; lbl < LOCK_LABELS; lbl++) {
@ -524,7 +525,6 @@ static inline void __spin_unlock(enum lock_label lbl, struct ha_spinlock *l,
l->info.last_location.line = line;
__RWLOCK_WRUNLOCK(&l->lock);
HA_ATOMIC_ADD(&lock_stats[lbl].num_write_unlocked, 1);
}

View File

@ -63,6 +63,9 @@
struct pool_head *pool2_stream;
struct list streams;
#ifdef USE_THREAD
HA_SPINLOCK_T streams_lock;
#endif
/* List of all use-service keywords. */
static struct list service_keywords = LIST_HEAD_INIT(service_keywords);
@ -154,7 +157,6 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
s->uniq_id = global.req_count++;
/* OK, we're keeping the stream, so let's properly initialize the stream */
LIST_ADDQ(&streams, &s->list);
LIST_INIT(&s->back_refs);
LIST_INIT(&s->buffer_wait.list);
@ -251,6 +253,10 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
s->txn = NULL;
s->hlua = NULL;
SPIN_LOCK(STRMS_LOCK, &streams_lock);
LIST_ADDQ(&streams, &s->list);
SPIN_UNLOCK(STRMS_LOCK, &streams_lock);
if (flt_stream_init(s) < 0 || flt_stream_start(s) < 0)
goto out_fail_accept;
@ -369,6 +375,7 @@ static void stream_free(struct stream *s)
stream_store_counters(s);
SPIN_LOCK(STRMS_LOCK, &streams_lock);
list_for_each_entry_safe(bref, back, &s->back_refs, users) {
/* we have to unlink all watchers. We must not relink them if
* this stream was the last one in the list.
@ -380,6 +387,8 @@ static void stream_free(struct stream *s)
bref->ref = s->list.n;
}
LIST_DEL(&s->list);
SPIN_UNLOCK(STRMS_LOCK, &streams_lock);
si_release_endpoint(&s->si[1]);
si_release_endpoint(&s->si[0]);
@ -462,6 +471,7 @@ void stream_release_buffers(struct stream *s)
int init_stream()
{
LIST_INIT(&streams);
SPIN_INIT(&streams_lock);
pool2_stream = create_pool("stream", sizeof(struct stream), MEM_F_SHARED);
return pool2_stream != NULL;
}
@ -3039,11 +3049,14 @@ static int cli_io_handler_dump_sess(struct appctx *appctx)
* pointer points back to the head of the streams list.
*/
LIST_INIT(&appctx->ctx.sess.bref.users);
SPIN_LOCK(STRMS_LOCK, &streams_lock);
appctx->ctx.sess.bref.ref = streams.n;
SPIN_UNLOCK(STRMS_LOCK, &streams_lock);
appctx->st2 = STAT_ST_LIST;
/* fall through */
case STAT_ST_LIST:
SPIN_LOCK(STRMS_LOCK, &streams_lock);
/* first, let's detach the back-ref from a possible previous stream */
if (!LIST_ISEMPTY(&appctx->ctx.sess.bref.users)) {
LIST_DEL(&appctx->ctx.sess.bref.users);
@ -3063,8 +3076,10 @@ static int cli_io_handler_dump_sess(struct appctx *appctx)
LIST_ADDQ(&curr_strm->back_refs, &appctx->ctx.sess.bref.users);
/* call the proper dump() function and return if we're missing space */
if (!stats_dump_full_strm_to_buffer(si, curr_strm))
if (!stats_dump_full_strm_to_buffer(si, curr_strm)) {
SPIN_UNLOCK(STRMS_LOCK, &streams_lock);
return 0;
}
/* stream dump complete */
LIST_DEL(&appctx->ctx.sess.bref.users);
@ -3190,6 +3205,7 @@ static int cli_io_handler_dump_sess(struct appctx *appctx)
*/
si_applet_cant_put(si);
LIST_ADDQ(&curr_strm->back_refs, &appctx->ctx.sess.bref.users);
SPIN_UNLOCK(STRMS_LOCK, &streams_lock);
return 0;
}
@ -3211,9 +3227,11 @@ static int cli_io_handler_dump_sess(struct appctx *appctx)
appctx->ctx.sess.target = NULL;
appctx->ctx.sess.uid = 0;
SPIN_UNLOCK(STRMS_LOCK, &streams_lock);
return 1;
}
SPIN_UNLOCK(STRMS_LOCK, &streams_lock);
appctx->st2 = STAT_ST_FIN;
/* fall through */
@ -3226,8 +3244,10 @@ static int cli_io_handler_dump_sess(struct appctx *appctx)
static void cli_release_show_sess(struct appctx *appctx)
{
if (appctx->st2 == STAT_ST_LIST) {
SPIN_UNLOCK(STRMS_LOCK, &streams_lock);
if (!LIST_ISEMPTY(&appctx->ctx.sess.bref.users))
LIST_DEL(&appctx->ctx.sess.bref.users);
SPIN_UNLOCK(STRMS_LOCK, &streams_lock);
}
}