REORG: dns/ring: split the ring between the generic one and the DNS one

A ring is used for the DNS code but slightly differently from the generic
one, which prevents some important changes from being made to the generic
code without breaking DNS. As the use cases differ, it's better to just
split them apart for now and have the DNS code use its own ring that we
rename dns_ring and let the generic code continue to live on its own.

The unused parts, such as CLI registration, resizing and allocation from a
mapped area, were dropped. dns_ring_detach_appctx() was
kept despite not being used, so as to stay consistent with the comments
that say it must be called, despite the DNS code explicitly mentioning
that it skips it for now (i.e. this may change in the future).

Hopefully after the generic rings are converted the DNS code can migrate
back to them, though this is really not necessary.
This commit is contained in:
Willy Tarreau 2024-02-19 14:38:59 +01:00
parent 8022ae326c
commit ad31e53287
7 changed files with 408 additions and 28 deletions

View File

@ -926,7 +926,7 @@ OBJS += src/mux_h2.o src/mux_fcgi.o src/mux_h1.o src/tcpcheck.o \
src/http_client.o src/listener.o src/dns.o src/vars.o src/debug.o \
src/tcp_rules.o src/sink.o src/h1_htx.o src/task.o src/mjson.o \
src/h2.o src/filters.o src/server_state.o src/payload.o \
src/fcgi-app.o src/map.o src/htx.o src/h1.o src/pool.o \
src/fcgi-app.o src/map.o src/htx.o src/h1.o src/pool.o src/dns_ring.o \
src/cfgparse-global.o src/trace.o src/tcp_sample.o src/http_ext.o \
src/flt_http_comp.o src/mux_pt.o src/flt_trace.o src/mqtt.o \
src/acl.o src/sock.o src/mworker.o src/tcp_act.o src/ring.o \

View File

@ -27,8 +27,8 @@
#include <haproxy/connection-t.h>
#include <haproxy/buf-t.h>
#include <haproxy/dgram-t.h>
#include <haproxy/dns_ring-t.h>
#include <haproxy/obj_type-t.h>
#include <haproxy/ring-t.h>
#include <haproxy/stats-t.h>
#include <haproxy/task-t.h>
#include <haproxy/thread.h>
@ -78,7 +78,7 @@ struct dns_additional_record {
*/
struct dns_stream_server {
struct server *srv;
struct ring *ring_req;
struct dns_ring *ring_req;
int max_slots;
int maxconn;
int idle_conns;
@ -97,7 +97,7 @@ struct dns_stream_server {
struct dns_dgram_server {
struct dgram_conn conn; /* transport layer */
struct ring *ring_req;
struct dns_ring *ring_req;
size_t ofs_req; // ring buffer reader offset
};
@ -121,7 +121,7 @@ struct dns_session {
struct task *task_exp;
struct eb_root query_ids; /* tree to quickly lookup/retrieve query ids currently in use */
size_t ofs; // ring buffer reader offset
struct ring ring;
struct dns_ring ring;
struct {
uint16_t len;
uint16_t offset;

View File

@ -0,0 +1,110 @@
/*
* include/haproxy/dns_ring-t.h
* This file provides definitions for ring buffers used for disposable data.
* This is a fork of ring-t.h for DNS usages.
*
* Copyright (C) 2000-2019 Willy Tarreau - w@1wt.eu
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation, version 2.1
* exclusively.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#ifndef _HAPROXY_DNS_RING_T_H
#define _HAPROXY_DNS_RING_T_H
#include <haproxy/api-t.h>
#include <haproxy/buf-t.h>
#include <haproxy/thread.h>
/* The code below handles circular buffers with single-producer and multiple
* readers (up to 255). The buffer storage area must remain always allocated.
* It's made of series of payload blocks followed by a readers count (RC).
* There is always a readers count at the beginning of the buffer as well. Each
* payload block is composed of a varint-encoded size (VI) followed by the
* actual payload (PL).
*
* The readers count is encoded on a single byte. It indicates how many readers
* are still waiting at this position. The writer writes after the buffer's
* tail, which initially starts just past the first readers count. Then it
* knows by reading this count that it must wake up the readers to indicate
* data availability. When a reader reads the payload block, it increments the
* next readers count and decrements the current one. The area between the
* initial readers count and the next one is protected from overwriting for as
* long as the initial count is non-null. As such these readers count are
* effective barriers against data recycling.
*
* Only the writer is allowed to update the buffer's tail/head. This ensures
* that events can remain as long as possible so that late readers can get the
* maximum history available. It also helps dealing with multi-thread accesses
* using a simple RW lock during the buffer head's manipulation. The writer
* will have to delete some old records starting at the head until the new
* message can fit or a non-null readers count is encountered. If a message
* cannot fit due to insufficient room, the message is lost and the drop
* counter must be incremented.
*
* Like any buffer, this buffer naturally wraps at the end and continues at the
* beginning. The creation process consists in immediately adding a null
* readers count byte into the buffer. The write process consists in always
* writing a payload block followed by a new readers count. The delete process
* consists in removing a null readers count and payload block. As such, there
* is always at least one readers count byte in the buffer available at the
* head for new readers to attach to, and one before the tail, both of which
* may be the same when the buffer doesn't contain any event. It is thus safe
* for any reader to simply keep the absolute offset of the last visited
* position and to restart from there. The writer will update the buffer's
* absolute offset when deleting entries. All this also has the benefit of
* allowing a buffer to be hot-resized without losing its contents.
*
* Thus we have this :
* - init of empty buffer:
* head-, ,-tail
* [ RC | xxxxxxxxxxxxxxxxxxxxxxxxxx ]
*
* - reader attached:
* head-, ,-tail
* [ RC | xxxxxxxxxxxxxxxxxxxxxxxxxx ]
* ^- +1
*
* - append of one event:
* appended
* head-, <----------> ,-tail
* [ RC | VI | PL | RC | xxxxxxxxxxx ]
*
* - reader advancing:
* head-, ,-tail
* [ RC | VI | PL | RC | xxxxxxxxxxx ]
* ^- -1 ^- +1
*
* - writer removing older message:
* head-, ,-tail
* [ xxxxxxxxxxxx | RC | xxxxxxxxxxx ]
* <---------->
* removed
*/
/* Ring buffer instance used by the DNS code. The storage layout (readers
 * count bytes interleaved with varint-prefixed payload blocks) is the one
 * described in the comment above.
 */
struct dns_ring {
struct buffer buf; // storage area
struct list waiters; // list of waiting appctx, woken up by dns_ring_write()
__decl_thread(HA_RWLOCK_T lock); // taken for writing by dns_ring_write() and dns_ring_detach_appctx()
int readers_count; // number of attached readers, limited to 255 by dns_ring_attach()
};
#endif /* _HAPROXY_DNS_RING_T_H */
/*
* Local variables:
* c-indent-level: 8
* c-basic-offset: 8
* End:
*/

View File

@ -0,0 +1,46 @@
/*
* include/haproxy/dns_ring.h
* Exported functions for ring buffers used for disposable data.
* This is a fork of ring.h for DNS usage.
*
* Copyright (C) 2000-2019 Willy Tarreau - w@1wt.eu
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation, version 2.1
* exclusively.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#ifndef _HAPROXY_DNS_RING_H
#define _HAPROXY_DNS_RING_H
#include <stdlib.h>
#include <import/ist.h>
#include <haproxy/dns_ring-t.h>
/* only pointers to struct appctx are used below */
struct appctx;
/* allocates a ring with a <size>-byte storage area; returns NULL on failure */
struct dns_ring *dns_ring_new(size_t size);
/* initializes a pre-allocated ring over the caller-provided <area> of <size> bytes */
void dns_ring_init(struct dns_ring *ring, void* area, size_t size);
/* releases a ring and its storage area; does nothing when <ring> is NULL */
void dns_ring_free(struct dns_ring *ring);
/* appends one message made of <pfx> then <msg> parts; returns the number of bytes written or 0 on failure */
ssize_t dns_ring_write(struct dns_ring *ring, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg);
/* registers one more reader; returns zero if 255 readers are already attached */
int dns_ring_attach(struct dns_ring *ring);
/* unregisters a reader waiting at offset <ofs> (or ~0 if not waiting yet) */
void dns_ring_detach_appctx(struct dns_ring *ring, struct appctx *appctx, size_t ofs);
#endif /* _HAPROXY_DNS_RING_H */
/*
* Local variables:
* c-indent-level: 8
* c-basic-offset: 8
* End:
*/

View File

@ -27,10 +27,10 @@
#include <haproxy/cli.h>
#include <haproxy/dgram.h>
#include <haproxy/dns.h>
#include <haproxy/dns_ring.h>
#include <haproxy/errors.h>
#include <haproxy/fd.h>
#include <haproxy/log.h>
#include <haproxy/ring.h>
#include <haproxy/sc_strm.h>
#include <haproxy/stconn.h>
#include <haproxy/stream.h>
@ -108,7 +108,7 @@ int dns_send_nameserver(struct dns_nameserver *ns, void *buf, size_t len)
struct ist myist;
myist = ist2(buf, len);
ret = ring_write(ns->dgram->ring_req, DNS_TCP_MSG_MAX_SIZE, NULL, 0, &myist, 1);
ret = dns_ring_write(ns->dgram->ring_req, DNS_TCP_MSG_MAX_SIZE, NULL, 0, &myist, 1);
if (!ret) {
ns->counters->snd_error++;
HA_SPIN_UNLOCK(DNS_LOCK, &dgram->lock);
@ -131,7 +131,7 @@ int dns_send_nameserver(struct dns_nameserver *ns, void *buf, size_t len)
struct ist myist;
myist = ist2(buf, len);
ret = ring_write(ns->stream->ring_req, DNS_TCP_MSG_MAX_SIZE, NULL, 0, &myist, 1);
ret = dns_ring_write(ns->stream->ring_req, DNS_TCP_MSG_MAX_SIZE, NULL, 0, &myist, 1);
if (!ret) {
ns->counters->snd_error++;
return -1;
@ -290,7 +290,7 @@ static void dns_resolve_send(struct dgram_conn *dgram)
{
int fd;
struct dns_nameserver *ns;
struct ring *ring;
struct dns_ring *ring;
struct buffer *buf;
uint64_t msg_len;
size_t len, cnt, ofs;
@ -407,21 +407,21 @@ int dns_dgram_init(struct dns_nameserver *ns, struct sockaddr_storage *sk)
ns->dgram = dgram;
dgram->ofs_req = ~0; /* init ring offset */
dgram->ring_req = ring_new(2*DNS_TCP_MSG_RING_MAX_SIZE);
dgram->ring_req = dns_ring_new(2*DNS_TCP_MSG_RING_MAX_SIZE);
if (!dgram->ring_req) {
ha_alert("memory allocation error initializing the ring for nameserver.\n");
goto out;
}
/* attach the task as reader */
if (!ring_attach(dgram->ring_req)) {
if (!dns_ring_attach(dgram->ring_req)) {
/* mark server attached to the ring */
ha_alert("nameserver sets too many watchers > 255 on ring. This is a bug and should not happen.\n");
goto out;
}
return 0;
out:
ring_free(dgram->ring_req);
dns_ring_free(dgram->ring_req);
free(dgram);
@ -436,7 +436,7 @@ static void dns_session_io_handler(struct appctx *appctx)
{
struct stconn *sc = appctx_sc(appctx);
struct dns_session *ds = appctx->svcctx;
struct ring *ring = &ds->ring;
struct dns_ring *ring = &ds->ring;
struct buffer *buf = &ring->buf;
uint64_t msg_len;
int available_room;
@ -844,7 +844,7 @@ static void dns_session_release(struct appctx *appctx)
if (!ds)
return;
/* We do not call ring_appctx_detach here
/* We do not call dns_ring_appctx_detach here
* because we want to keep readers counters
* to retry a conn with a different appctx.
*/
@ -1058,9 +1058,9 @@ struct dns_session *dns_session_new(struct dns_stream_server *dss)
if (!ds->tx_ring_area)
goto error;
ring_init(&ds->ring, ds->tx_ring_area, DNS_TCP_MSG_RING_MAX_SIZE);
dns_ring_init(&ds->ring, ds->tx_ring_area, DNS_TCP_MSG_RING_MAX_SIZE);
/* never fail because it is the first watcher attached to the ring */
DISGUISE(ring_attach(&ds->ring));
DISGUISE(dns_ring_attach(&ds->ring));
if ((ds->task_exp = task_new_here()) == NULL)
goto error;
@ -1095,7 +1095,7 @@ static struct task *dns_process_req(struct task *t, void *context, unsigned int
{
struct dns_nameserver *ns = (struct dns_nameserver *)context;
struct dns_stream_server *dss = ns->stream;
struct ring *ring = dss->ring_req;
struct dns_ring *ring = dss->ring_req;
struct buffer *buf = &ring->buf;
uint64_t msg_len;
size_t len, cnt, ofs;
@ -1151,7 +1151,7 @@ static struct task *dns_process_req(struct task *t, void *context, unsigned int
if (!LIST_ISEMPTY(&dss->free_sess)) {
ds = LIST_NEXT(&dss->free_sess, struct dns_session *, list);
if (ring_write(&ds->ring, DNS_TCP_MSG_MAX_SIZE, NULL, 0, &myist, 1) > 0) {
if (dns_ring_write(&ds->ring, DNS_TCP_MSG_MAX_SIZE, NULL, 0, &myist, 1) > 0) {
ds->nb_queries++;
if (ds->nb_queries >= DNS_STREAM_MAX_PIPELINED_REQ)
LIST_DEL_INIT(&ds->list);
@ -1171,8 +1171,8 @@ static struct task *dns_process_req(struct task *t, void *context, unsigned int
if (!LIST_ISEMPTY(&dss->idle_sess)) {
ds = LIST_NEXT(&dss->idle_sess, struct dns_session *, list);
/* ring is empty so this ring_write should never fail */
ring_write(&ds->ring, DNS_TCP_MSG_MAX_SIZE, NULL, 0, &myist, 1);
/* ring is empty so this dns_ring_write should never fail */
dns_ring_write(&ds->ring, DNS_TCP_MSG_MAX_SIZE, NULL, 0, &myist, 1);
ds->nb_queries++;
LIST_DEL_INIT(&ds->list);
@ -1196,8 +1196,8 @@ static struct task *dns_process_req(struct task *t, void *context, unsigned int
/* allocate a new session */
ads = dns_session_new(dss);
if (ads) {
/* ring is empty so this ring_write should never fail */
ring_write(&ads->ring, DNS_TCP_MSG_MAX_SIZE, NULL, 0, &myist, 1);
/* ring is empty so this dns_ring_write should never fail */
dns_ring_write(&ads->ring, DNS_TCP_MSG_MAX_SIZE, NULL, 0, &myist, 1);
ads->nb_queries++;
LIST_INSERT(&dss->free_sess, &ads->list);
}
@ -1248,7 +1248,7 @@ int dns_stream_init(struct dns_nameserver *ns, struct server *srv)
dss->maxconn = srv->maxconn;
dss->ofs_req = ~0; /* init ring offset */
dss->ring_req = ring_new(2*DNS_TCP_MSG_RING_MAX_SIZE);
dss->ring_req = dns_ring_new(2*DNS_TCP_MSG_RING_MAX_SIZE);
if (!dss->ring_req) {
ha_alert("memory allocation error initializing the ring for dns tcp server '%s'.\n", srv->id);
goto out;
@ -1264,7 +1264,7 @@ int dns_stream_init(struct dns_nameserver *ns, struct server *srv)
dss->task_req->context = ns;
/* attach the task as reader */
if (!ring_attach(dss->ring_req)) {
if (!dns_ring_attach(dss->ring_req)) {
/* mark server attached to the ring */
ha_alert("server '%s': too many watchers for ring. this should never happen.\n", srv->id);
goto out;
@ -1306,7 +1306,7 @@ int dns_stream_init(struct dns_nameserver *ns, struct server *srv)
if (dss && dss->task_req)
task_destroy(dss->task_req);
if (dss && dss->ring_req)
ring_free(dss->ring_req);
dns_ring_free(dss->ring_req);
free(dss);
return -1;

224
src/dns_ring.c Normal file
View File

@ -0,0 +1,224 @@
/*
* Ring buffer management
* This is a fork of ring.c for DNS usage.
*
* Copyright (C) 2000-2019 Willy Tarreau - w@1wt.eu
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation, version 2.1
* exclusively.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include <stdlib.h>
#include <haproxy/api.h>
#include <haproxy/applet.h>
#include <haproxy/buf.h>
#include <haproxy/cli.h>
#include <haproxy/dns_ring.h>
#include <haproxy/sc_strm.h>
#include <haproxy/stconn.h>
#include <haproxy/thread.h>
/* Prepares the pre-allocated ring <ring> so that it uses the storage area
 * <area> of <size> bytes. After the call the ring has no attached reader, an
 * empty waiters list, an initialized lock, and its buffer contains nothing
 * but the initial readers count byte.
 */
void dns_ring_init(struct dns_ring *ring, void *area, size_t size)
{
	ring->buf = b_make(area, size, 0, 0);
	ring->readers_count = 0;
	LIST_INIT(&ring->waiters);
	HA_RWLOCK_INIT(&ring->lock);

	/* the buffer always starts with a null readers count (RC) byte */
	b_putchr(&ring->buf, 0);
}
/* Allocates a new ring together with a storage area of <size> bytes and
 * initializes it using dns_ring_init(). <size> must be at least 2. Returns
 * the new ring, or NULL if <size> is too small or if an allocation failed, in
 * which case nothing remains allocated.
 */
struct dns_ring *dns_ring_new(size_t size)
{
	struct dns_ring *ring;
	void *area;

	if (size < 2)
		return NULL;

	ring = malloc(sizeof(*ring));
	if (!ring)
		return NULL;

	area = malloc(size);
	if (!area) {
		/* do not leak the descriptor when the storage cannot be allocated */
		free(ring);
		return NULL;
	}

	dns_ring_init(ring, area, size);
	return ring;
}
/* Releases ring <ring>: its storage area is freed first, then the ring
 * itself. Nothing is done when <ring> is NULL.
 */
void dns_ring_free(struct dns_ring *ring)
{
	if (ring) {
		free(ring->buf.area);
		free(ring);
	}
}
/* Tries to send <npfx> parts from <pfx> followed by <nmsg> parts from <msg>
 * to ring <ring>. The message is sent atomically. Its payload is truncated to
 * <maxlen> bytes whenever the cumulated length of all parts exceeds <maxlen>
 * (a zero <maxlen> is not special-cased and results in an empty payload).
 * There is no distinction between the two lists, it's just a convenience to
 * help the caller prepend some prefixes when necessary. It takes the ring's
 * write lock to make sure no other thread will touch the buffer during the
 * update. Older messages are deleted from the head as needed to make room,
 * unless a reader is still attached there. All waiters are woken up once the
 * message is stored.
 *
 * Returns the number of bytes written into the ring (varint-encoded length +
 * payload + trailing readers count), or 0 if the message could not be stored,
 * either because it can never fit in the ring, or because the room could not
 * be released (reader stuck at the head, or undecodable length).
 */
ssize_t dns_ring_write(struct dns_ring *ring, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg)
{
	struct buffer *buf = &ring->buf;
	struct appctx *appctx;
	size_t totlen = 0;
	size_t lenlen;
	uint64_t dellen;
	int dellenlen;
	ssize_t sent = 0;
	size_t i; /* same type as <npfx>/<nmsg> to avoid signed/unsigned compares */

	/* we have to find some room to add our message (the buffer is
	 * never empty and at least contains the previous counter) and
	 * to update both the buffer contents and heads at the same
	 * time (it's doable using atomic ops but not worth the
	 * trouble, let's just lock). For this we first need to know
	 * the total message's length. We cannot measure it while
	 * copying due to the varint encoding of the length.
	 */
	for (i = 0; i < npfx; i++)
		totlen += pfx[i].len;
	for (i = 0; i < nmsg; i++)
		totlen += msg[i].len;

	if (totlen > maxlen)
		totlen = maxlen;

	lenlen = varint_bytes(totlen);

	HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);

	/* the message needs its length, its payload, the trailing readers
	 * count, plus the readers count which always precedes it.
	 */
	if (lenlen + totlen + 1 + 1 > b_size(buf))
		goto done_buf;

	while (b_room(buf) < lenlen + totlen + 1) {
		/* we need to delete the oldest message (from the end),
		 * and we have to stop if there's a reader stuck there.
		 * Unless there's corruption in the buffer it's guaranteed
		 * that we have enough data to find 1 counter byte, a
		 * varint-encoded length (1 byte min) and the message
		 * payload (0 bytes min).
		 */
		if (*b_head(buf))
			goto done_buf;
		dellenlen = b_peek_varint(buf, 1, &dellen);
		if (!dellenlen)
			goto done_buf;
		BUG_ON(b_data(buf) < 1 + dellenlen + dellen);

		b_del(buf, 1 + dellenlen + dellen);
	}

	/* OK now we do have room */
	__b_put_varint(buf, totlen);

	/* <totlen> now counts the bytes copied so far, so that the parts
	 * are truncated exactly like the length announced above.
	 */
	totlen = 0;
	for (i = 0; i < npfx; i++) {
		size_t len = pfx[i].len;

		if (len + totlen > maxlen)
			len = maxlen - totlen;
		if (len)
			__b_putblk(buf, pfx[i].ptr, len);
		totlen += len;
	}

	for (i = 0; i < nmsg; i++) {
		size_t len = msg[i].len;

		if (len + totlen > maxlen)
			len = maxlen - totlen;
		if (len)
			__b_putblk(buf, msg[i].ptr, len);
		totlen += len;
	}

	*b_tail(buf) = 0; buf->data++; // new read counter
	sent = lenlen + totlen + 1;

	/* notify potential readers */
	list_for_each_entry(appctx, &ring->waiters, wait_entry)
		appctx_wakeup(appctx);

 done_buf:
	HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
	return sent;
}
/* Registers one more reader on ring <ring> by atomically incrementing its
 * readers count. It returns non-zero on success or zero on failure if 255
 * readers are already attached. On success, the caller MUST call
 * dns_ring_detach_appctx() to detach itself, even if it was never woken up.
 */
int dns_ring_attach(struct dns_ring *ring)
{
	int cur = ring->readers_count;

	/* the CAS is expected to reload <cur> when it fails, so the limit is
	 * checked again against the fresh value before each new attempt.
	 */
	while (cur < 255) {
		if (_HA_ATOMIC_CAS(&ring->readers_count, &cur, cur + 1))
			return 1;
	}
	return 0;
}
/* Detaches appctx <appctx> from ring <ring>. <ofs> is the offset, relative to
 * the beginning of the storage, at which the appctx is expected to be
 * waiting, or ~0 if it is not waiting yet. When waiting, the appctx is
 * removed from the waiters list and the readers count byte found at that
 * position is decremented. In both cases the ring's number of attached
 * readers is decremented. Nothing is done if <ring> is NULL.
 */
void dns_ring_detach_appctx(struct dns_ring *ring, struct appctx *appctx, size_t ofs)
{
	struct buffer *buf;

	if (!ring)
		return;

	buf = &ring->buf;

	HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
	if (ofs != ~0) {
		/* reader was still attached: turn the storage-relative
		 * offset into one relative to the buffer's head, taking
		 * wrapping into account.
		 */
		size_t head = b_head_ofs(buf);

		if (ofs < head)
			ofs += b_size(buf);
		ofs -= head;

		BUG_ON(ofs >= b_size(buf));
		LIST_DEL_INIT(&appctx->wait_entry);
		HA_ATOMIC_DEC(b_peek(buf, ofs));
	}
	HA_ATOMIC_DEC(&ring->readers_count);
	HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
}
/*
* Local variables:
* c-indent-level: 8
* c-basic-offset: 8
* End:
*/

View File

@ -28,6 +28,7 @@
#include <haproxy/check.h>
#include <haproxy/cli.h>
#include <haproxy/dns.h>
#include <haproxy/dns_ring.h>
#include <haproxy/errors.h>
#include <haproxy/fd.h>
#include <haproxy/http_rules.h>
@ -36,7 +37,6 @@
#include <haproxy/protocol.h>
#include <haproxy/proxy.h>
#include <haproxy/resolvers.h>
#include <haproxy/ring.h>
#include <haproxy/sample.h>
#include <haproxy/sc_strm.h>
#include <haproxy/server.h>
@ -2571,11 +2571,11 @@ static void resolvers_destroy(struct resolvers *resolvers)
fd_delete(ns->dgram->conn.t.sock.fd);
close(ns->dgram->conn.t.sock.fd);
}
ring_free(ns->dgram->ring_req);
dns_ring_free(ns->dgram->ring_req);
free(ns->dgram);
}
if (ns->stream) {
ring_free(ns->stream->ring_req);
dns_ring_free(ns->stream->ring_req);
task_destroy(ns->stream->task_req);
task_destroy(ns->stream->task_rsp);
free(ns->stream);