OPTIM: sink: try to merge "dropped" messages faster

When a reader doesn't read fast enough and causes drops, subsequent
threads try to produce a "dropped" message. But it takes time to
produce and emit this message, in part due to the use of chunk_printf()
that relies on vfprintf() which has to parse the printf format, and
during this time other threads may continue to increment the counter.
This is the reason why this is currently performed in a loop. When
reading what is received, it's common to see a large count followed
by one or two single-digit counts, indicating that we could possibly
have improved that by writing faster.

Let's improve the situation a little bit. First we're now using a
static message prefixed with enough space to write the digits, and a
call to ultoa_r() fills these digits from right to left so that we
don't have to process a format string nor perform a copy of the message.

Second, we now re-check the counter immediately after having prepared
the message so that we still get an opportunity for updating it. In
order to avoid too long loops, this is limited to 10 iterations.

Tests show that the number of single-digit "dropped" counters on output
now dropped roughly by 15-30%. Also, it was observed that with 8 threads,
there's almost never more than one retry.
This commit is contained in:
Willy Tarreau 2024-03-01 17:59:59 +01:00
parent 571232535a
commit eb7b2ec83a

View File

@ -212,16 +212,33 @@ send:
*/
int sink_announce_dropped(struct sink *sink, struct log_header hdr)
{
unsigned int dropped;
struct buffer msg;
static THREAD_LOCAL char msg_dropped1[] = "1 event dropped";
static THREAD_LOCAL char msg_dropped2[] = "0000000000 events dropped";
uint dropped, last_dropped;
struct ist msgvec[1];
char logbuf[64];
uint retries = 10;
while (unlikely((dropped = sink->ctx.dropped) > 0)) {
chunk_init(&msg, logbuf, sizeof(logbuf));
chunk_printf(&msg, "%u event%s dropped", dropped, dropped > 1 ? "s" : "");
msgvec[0] = ist2(msg.area, msg.data);
last_dropped = 0;
while (1) {
while (unlikely((dropped = HA_ATOMIC_LOAD(&sink->ctx.dropped)) > last_dropped) && retries-- > 0) {
/* try to aggregate multiple messages if other threads arrive while
* we're producing the dropped message.
*/
uint msglen = sizeof(msg_dropped1);
const char *msg = msg_dropped1;
last_dropped = dropped;
if (dropped > 1) {
msg = ultoa_r(dropped, msg_dropped2, 11);
msg_dropped2[10] = ' ';
msglen = msg_dropped2 + sizeof(msg_dropped2) - msg;
}
msgvec[0] = ist2(msg, msglen);
}
if (!dropped)
break;
last_dropped = 0;
hdr.level = LOG_NOTICE; /* override level but keep original log header data */
if (__sink_write(sink, hdr, 0, msgvec, 1) <= 0)