MEDIUM: log/sink: make the log forwarder code use ring_dispatch_messages()

This code becomes even simpler and almost does not need any knowledge
of the structure of the ring anymore. It even highlighted that an old
race had not been fixed due to code duplication, but that's now done.
This commit is contained in:
Willy Tarreau 2024-02-27 17:05:11 +01:00
parent c262442b1a
commit 8f3edf2ac6

View File

@ -425,11 +425,8 @@ static void sink_forward_oc_io_handler(struct appctx *appctx)
struct sink_forward_target *sft = appctx->svcctx;
struct sink *sink = sft->sink;
struct ring *ring = sink->ctx.ring;
struct buffer *buf = &ring->buf;
uint64_t msg_len;
size_t len, cnt, ofs, last_ofs;
size_t ofs, last_ofs;
int ret = 0;
ssize_t copied;
if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR|SE_FL_SHR|SE_FL_SHW))))
goto out;
@ -458,67 +455,12 @@ static void sink_forward_oc_io_handler(struct appctx *appctx)
LIST_DEL_INIT(&appctx->wait_entry);
HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock);
/* explanation for the initialization below: it would be better to do
* this in the parsing function but this would occasionally result in
* dropped events because we'd take a reference on the oldest message
* and keep it while being scheduled. Thus instead let's take it the
* first time we enter here so that we have a chance to pass many
* existing messages before grabbing a reference to a location. This
* value cannot be produced after initialization.
*/
if (unlikely(sft->ofs == ~0)) {
sft->ofs = b_peek_ofs(buf, 0);
HA_ATOMIC_INC(b_orig(buf) + sft->ofs);
}
/* we were already there, adjust the offset to be relative to
* the buffer's head and remove us from the counter.
*/
ofs = sft->ofs - b_head_ofs(buf);
if (sft->ofs < b_head_ofs(buf))
ofs += b_size(buf);
BUG_ON(ofs >= buf->size);
HA_ATOMIC_DEC(b_peek(buf, ofs));
/* in this loop, ofs always points to the counter byte that precedes
* the message so that we can take our reference there if we have to
* stop before the end (ret=0).
*/
ret = 1;
while (ofs + 1 < b_data(buf)) {
cnt = 1;
len = b_peek_varint(buf, ofs + cnt, &msg_len);
if (!len)
break;
cnt += len;
BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
copied = syslog_applet_append_event(appctx, buf, ofs + cnt, msg_len);
if (copied == -2) {
/* too large a message to ever fit, let's skip it */
ofs += cnt + msg_len;
continue;
}
else if (copied == -1) {
/* output full */
ret = 0;
break;
}
ofs += cnt + msg_len;
}
HA_ATOMIC_INC(b_peek(buf, ofs));
last_ofs = b_tail_ofs(buf);
sft->ofs = b_peek_ofs(buf, ofs);
HA_RWLOCK_RDUNLOCK(RING_LOCK, &ring->lock);
ret = ring_dispatch_messages(ring, appctx, &sft->ofs, &last_ofs, 0, syslog_applet_append_event);
if (ret) {
/* let's be woken up once new data arrive */
HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
LIST_APPEND(&ring->waiters, &appctx->wait_entry);
ofs = b_tail_ofs(buf);
ofs = b_tail_ofs(&ring->buf);
HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
if (ofs != last_ofs) {
/* more data was added into the ring between the