diff --git a/src/sink.c b/src/sink.c index 38a082ed4..889acd2f4 100644 --- a/src/sink.c +++ b/src/sink.c @@ -306,7 +306,7 @@ static void sink_forward_io_handler(struct appctx *appctx) struct ring *ring = sink->ctx.ring; struct buffer *buf = &ring->buf; uint64_t msg_len; - size_t len, cnt, ofs; + size_t len, cnt, ofs, last_ofs; int ret = 0; /* if stopping was requested, close immediately */ @@ -409,6 +409,7 @@ static void sink_forward_io_handler(struct appctx *appctx) HA_ATOMIC_INC(b_peek(buf, ofs)); ofs += ring->ofs; sft->ofs = ofs; + last_ofs = ring->ofs; } HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock); @@ -416,8 +417,16 @@ static void sink_forward_io_handler(struct appctx *appctx) /* let's be woken up once new data arrive */ HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock); LIST_APPEND(&ring->waiters, &appctx->wait_entry); + ofs = ring->ofs; HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock); - applet_have_no_more_data(appctx); + if (ofs != last_ofs) { + /* more data was added into the ring between the + * unlock and the lock, and the writer might not + * have seen us. We need to reschedule a read. + */ + applet_have_more_data(appctx); + } else + applet_have_no_more_data(appctx); } HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);