diff --git a/src/sink.c b/src/sink.c index dcbe61e38..4d7cb9c0e 100644 --- a/src/sink.c +++ b/src/sink.c @@ -355,10 +355,7 @@ static void sink_forward_io_handler(struct appctx *appctx) struct sink_forward_target *sft = appctx->svcctx; struct sink *sink = sft->sink; struct ring *ring = sink->ctx.ring; - struct buffer *buf = &ring->buf; - uint64_t msg_len; - size_t len, cnt, ofs, last_ofs; - ssize_t copied; + size_t ofs, last_ofs; int ret = 0; if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR)))) { @@ -389,68 +386,13 @@ static void sink_forward_io_handler(struct appctx *appctx) LIST_DEL_INIT(&appctx->wait_entry); HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock); - HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock); - - /* explanation for the initialization below: it would be better to do - * this in the parsing function but this would occasionally result in - * dropped events because we'd take a reference on the oldest message - * and keep it while being scheduled. Thus instead let's take it the - * first time we enter here so that we have a chance to pass many - * existing messages before grabbing a reference to a location. This - * value cannot be produced after initialization. - */ - if (unlikely(sft->ofs == ~0)) { - sft->ofs = b_peek_ofs(buf, 0); - HA_ATOMIC_INC(b_orig(buf) + sft->ofs); - } - - /* we were already there, adjust the offset to be relative to - * the buffer's head and remove us from the counter. - */ - ofs = sft->ofs - b_head_ofs(buf); - if (sft->ofs < b_head_ofs(buf)) - ofs += b_size(buf); - BUG_ON(ofs >= buf->size); - HA_ATOMIC_DEC(b_peek(buf, ofs)); - - /* in this loop, ofs always points to the counter byte that precedes - * the message so that we can take our reference there if we have to - * stop before the end (ret=0). - */ - ret = 1; - while (ofs + 1 < b_data(buf)) { - cnt = 1; - len = b_peek_varint(buf, ofs + cnt, &msg_len); - if (!len) - break; - cnt += len; - BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf)); - - copied = applet_append_line(appctx, buf, ofs + cnt, msg_len); - if (copied == -2) { - /* too large a message to ever fit, let's skip it */ - ofs += cnt + msg_len; - continue; - } - else if (copied == -1) { - /* output full */ - ret = 0; - break; - } - ofs += cnt + msg_len; - } - - HA_ATOMIC_INC(b_peek(buf, ofs)); - last_ofs = b_tail_ofs(buf); - sft->ofs = b_peek_ofs(buf, ofs); - - HA_RWLOCK_RDUNLOCK(RING_LOCK, &ring->lock); + ret = ring_dispatch_messages(ring, appctx, &sft->ofs, &last_ofs, 0, applet_append_line); if (ret) { /* let's be woken up once new data arrive */ HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock); LIST_APPEND(&ring->waiters, &appctx->wait_entry); - ofs = b_tail_ofs(buf); + ofs = b_tail_ofs(&ring->buf); HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock); if (ofs != last_ofs) { /* more data was added into the ring between the