From 6b35e9bfbf0f37c8a584c7aabb475afb81b7008c Mon Sep 17 00:00:00 2001 From: Emeric Brun Date: Fri, 30 Jun 2017 16:23:45 +0200 Subject: [PATCH] MEDIUM: threads/stream: Make streams list thread safe Adds a global lock to protect the full streams list used to dump sessions on stats socket. --- include/common/hathreads.h | 4 ++-- src/stream.c | 24 ++++++++++++++++++++++-- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/include/common/hathreads.h b/include/common/hathreads.h index 1717cc9b7..81c2aad4a 100644 --- a/include/common/hathreads.h +++ b/include/common/hathreads.h @@ -157,6 +157,7 @@ enum lock_label { APPLETS_LOCK, PEER_LOCK, BUF_WQ_LOCK, + STRMS_LOCK, LOCK_LABELS }; struct lock_stat { @@ -243,7 +244,7 @@ static inline void show_lock_stats() "TASK_RQ", "TASK_WQ", "POOL", "LISTENER", "LISTENER_QUEUE", "PROXY", "SERVER", "UPDATED_SERVERS", "LBPRM", "SIGNALS", "STK_TABLE", "STK_SESS", - "APPLETS", "PEER", "BUF_WQ" }; + "APPLETS", "PEER", "BUF_WQ", "STREAMS" }; int lbl; for (lbl = 0; lbl < LOCK_LABELS; lbl++) { @@ -524,7 +525,6 @@ static inline void __spin_unlock(enum lock_label lbl, struct ha_spinlock *l, l->info.last_location.line = line; __RWLOCK_WRUNLOCK(&l->lock); - HA_ATOMIC_ADD(&lock_stats[lbl].num_write_unlocked, 1); } diff --git a/src/stream.c b/src/stream.c index 51d235454..889908f4e 100644 --- a/src/stream.c +++ b/src/stream.c @@ -63,6 +63,9 @@ struct pool_head *pool2_stream; struct list streams; +#ifdef USE_THREAD +HA_SPINLOCK_T streams_lock; +#endif /* List of all use-service keywords. */ static struct list service_keywords = LIST_HEAD_INIT(service_keywords); @@ -154,7 +157,6 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin) s->uniq_id = global.req_count++; /* OK, we're keeping the stream, so let's properly initialize the stream */ - LIST_ADDQ(&streams, &s->list); LIST_INIT(&s->back_refs); LIST_INIT(&s->buffer_wait.list); @@ -251,6 +253,10 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin) s->txn = NULL; s->hlua = NULL; + SPIN_LOCK(STRMS_LOCK, &streams_lock); + LIST_ADDQ(&streams, &s->list); + SPIN_UNLOCK(STRMS_LOCK, &streams_lock); + if (flt_stream_init(s) < 0 || flt_stream_start(s) < 0) goto out_fail_accept; @@ -369,6 +375,7 @@ static void stream_free(struct stream *s) stream_store_counters(s); + SPIN_LOCK(STRMS_LOCK, &streams_lock); list_for_each_entry_safe(bref, back, &s->back_refs, users) { /* we have to unlink all watchers. We must not relink them if * this stream was the last one in the list. @@ -380,6 +387,8 @@ static void stream_free(struct stream *s) bref->ref = s->list.n; } LIST_DEL(&s->list); + SPIN_UNLOCK(STRMS_LOCK, &streams_lock); + si_release_endpoint(&s->si[1]); si_release_endpoint(&s->si[0]); @@ -462,6 +471,7 @@ void stream_release_buffers(struct stream *s) int init_stream() { LIST_INIT(&streams); + SPIN_INIT(&streams_lock); pool2_stream = create_pool("stream", sizeof(struct stream), MEM_F_SHARED); return pool2_stream != NULL; } @@ -3039,11 +3049,14 @@ static int cli_io_handler_dump_sess(struct appctx *appctx) * pointer points back to the head of the streams list. */ LIST_INIT(&appctx->ctx.sess.bref.users); + SPIN_LOCK(STRMS_LOCK, &streams_lock); appctx->ctx.sess.bref.ref = streams.n; + SPIN_UNLOCK(STRMS_LOCK, &streams_lock); appctx->st2 = STAT_ST_LIST; /* fall through */ case STAT_ST_LIST: + SPIN_LOCK(STRMS_LOCK, &streams_lock); /* first, let's detach the back-ref from a possible previous stream */ if (!LIST_ISEMPTY(&appctx->ctx.sess.bref.users)) { LIST_DEL(&appctx->ctx.sess.bref.users); @@ -3063,8 +3076,10 @@ static int cli_io_handler_dump_sess(struct appctx *appctx) LIST_ADDQ(&curr_strm->back_refs, &appctx->ctx.sess.bref.users); /* call the proper dump() function and return if we're missing space */ - if (!stats_dump_full_strm_to_buffer(si, curr_strm)) + if (!stats_dump_full_strm_to_buffer(si, curr_strm)) { + SPIN_UNLOCK(STRMS_LOCK, &streams_lock); return 0; + } /* stream dump complete */ LIST_DEL(&appctx->ctx.sess.bref.users); @@ -3190,6 +3205,7 @@ static int cli_io_handler_dump_sess(struct appctx *appctx) */ si_applet_cant_put(si); LIST_ADDQ(&curr_strm->back_refs, &appctx->ctx.sess.bref.users); + SPIN_UNLOCK(STRMS_LOCK, &streams_lock); return 0; } @@ -3211,9 +3227,11 @@ static int cli_io_handler_dump_sess(struct appctx *appctx) appctx->ctx.sess.target = NULL; appctx->ctx.sess.uid = 0; + SPIN_UNLOCK(STRMS_LOCK, &streams_lock); return 1; } + SPIN_UNLOCK(STRMS_LOCK, &streams_lock); appctx->st2 = STAT_ST_FIN; /* fall through */ @@ -3226,8 +3244,10 @@ static int cli_io_handler_dump_sess(struct appctx *appctx) static void cli_release_show_sess(struct appctx *appctx) { if (appctx->st2 == STAT_ST_LIST) { + SPIN_UNLOCK(STRMS_LOCK, &streams_lock); if (!LIST_ISEMPTY(&appctx->ctx.sess.bref.users)) LIST_DEL(&appctx->ctx.sess.bref.users); + SPIN_UNLOCK(STRMS_LOCK, &streams_lock); } }