MEDIUM: mux-spop: Introduce the SPOP multiplexer

It is no possible yet to use it. Idles connections and pipelining mode are
not supported for now. But it should be possible to open a SPOP connection,
perform the HELLO handshake, send a NOTIFY frame based on data produced by
the client side and receive the corresponding ACK frame to transfer its
content to the client side.

The related issue is #2502.
This commit is contained in:
Christopher Faulet 2024-07-04 11:37:23 +02:00
parent d0d23a7a66
commit 7e1bb7283b
7 changed files with 3570 additions and 2 deletions

View File

@ -983,7 +983,7 @@ OBJS += src/mux_h2.o src/mux_h1.o src/mux_fcgi.o src/stream.o \
src/hpack-huff.o src/freq_ctr.o src/dict.o src/wdt.o \
src/pipe.o src/init.o src/http_acl.o src/hpack-enc.o \
src/ebtree.o src/dgram.o src/hash.o src/version.o \
src/limits.o
src/limits.o src/mux_spop.o
ifneq ($(TRACE),)
OBJS += src/calltrace.o

View File

@ -12,6 +12,7 @@
#include <haproxy/mux_fcgi-t.h>
#include <haproxy/mux_h2-t.h>
#include <haproxy/mux_h1-t.h>
#include <haproxy/mux_spop-t.h>
#include <haproxy/peers-t.h>
#include <haproxy/quic_conn-t.h>
#include <haproxy/stconn-t.h>
@ -41,11 +42,13 @@
#define SHOW_AS_PEERS 0x00080000
#define SHOW_AS_PEER 0x00100000
#define SHOW_AS_QC 0x00200000
#define SHOW_AS_SPOPC 0x00400000
#define SHOW_AS_SPOPS 0x00800000
// command line names, must be in exact same order as the SHOW_AS_* flags above
// so that show_as_words[i] matches flag 1U<<i.
const char *show_as_words[] = { "ana", "chn", "conn", "sc", "stet", "strm", "task", "txn", "sd", "hsl", "htx", "hmsg", "fd", "h2c", "h2s", "h1c", "h1s", "fconn", "fstrm",
"peers", "peer", "qc"};
"peers", "peer", "qc", "spopc", "spops"};
/* will be sufficient for even largest flag names */
static char buf[4096];
@ -161,6 +164,8 @@ int main(int argc, char **argv)
if (show_as & SHOW_AS_PEERS) printf("peers->flags = %s\n",(peers_show_flags (buf, bsz, " | ", flags), buf));
if (show_as & SHOW_AS_PEER) printf("peer->flags = %s\n", (peer_show_flags (buf, bsz, " | ", flags), buf));
if (show_as & SHOW_AS_QC) printf("qc->flags = %s\n", (qc_show_flags (buf, bsz, " | ", flags), buf));
if (show_as & SHOW_AS_SPOPC) printf("spopc->flags = %s\n",(spop_conn_show_flags(buf, bsz, " | ", flags), buf));
if (show_as & SHOW_AS_SPOPS) printf("spops->flags = %s\n",(spop_strm_show_flags(buf, bsz, " | ", flags), buf));
}
return 0;
}

View File

@ -0,0 +1,147 @@
/* SPDX-License-Identifier: LGPL-2.0-or-later */
#ifndef _HAPROXY_MUX_SPOP_T_H
#define _HAPROXY_MUX_SPOP_T_H
#include <haproxy/api-t.h>
#include <haproxy/show_flags-t.h>
/**** SPOP connection flags (32 bit), in spop_conn->flags ****/
#define SPOP_CF_NONE 0x00000000
/* Flags indicating why writing to the mux is blocked */
#define SPOP_CF_MUX_MALLOC 0x00000001 /* mux is blocked on lack connection's mux buffer */
#define SPOP_CF_MUX_MFULL 0x00000002 /* mux is blocked on connection's mux buffer full */
#define SPOP_CF_MUX_BLOCK_ANY 0x00000003 /* mux is blocked on connection's mux buffer full */
#define SPOP_CF_WAIT_INLIST 0x00000010 // there is at least one stream blocked by another stream in send_list
#define SPOP_CF_DEM_DALLOC 0x00000020 /* demux blocked on lack of connection's demux buffer */
#define SPOP_CF_DEM_DFULL 0x00000040 /* demux blocked on connection's demux buffer full */
#define SPOP_CF_DEM_MROOM 0x00000080 /* demux blocked on lack of room in mux buffer */
#define SPOP_CF_DEM_SALLOC 0x00000100 /* demux blocked on lack of stream's rx buffer */
#define SPOP_CF_DEM_SFULL 0x00000200 /* demux blocked on stream request buffer full */
#define SPOP_CF_DEM_TOOMANY 0x00000400 /* demux blocked waiting for some stream connectors to leave */
#define SPOP_CF_DEM_BLOCK_ANY 0x000007E0 /* aggregate of the demux flags above except DALLOC/DFULL */
/* Other flags */
#define SPOP_CF_DISCO_SENT 0x00001000 /* a frame DISCONNECT was successfully sent */
#define SPOP_CF_DISCO_FAILED 0x00002000 /* failed to disconnect */
#define SPOP_CF_WAIT_FOR_HS 0x00004000 /* We did check that at least a stream was waiting for handshake */
#define SPOP_CF_DEM_SHORT_READ 0x00008000 // demux blocked on incomplete frame
/* unused 0x00010000 */
#define SPOP_CF_RCVD_SHUT 0x00020000 // a recv() attempt already failed on a shutdown
#define SPOP_CF_END_REACHED 0x00040000 // pending data too short with RCVD_SHUT present
#define SPOP_CF_ERR_PENDING 0x00080000 /* A write error was detected (block sends but not reads) */
#define SPOP_CF_ERROR 0x00100000 /* A read error was detected (handled has an abort) */
/* This function is used to report flags in debugging tools. Please reflect
* below any single-bit flag addition above in the same order via the
* __APPEND_FLAG macro. The new end of the buffer is returned.
*/
static forceinline char *spop_conn_show_flags(char *buf, size_t len, const char *delim, uint flg)
{
#define _(f, ...) __APPEND_FLAG(buf, len, delim, flg, f, #f, __VA_ARGS__)
/* prologue */
_(0);
/* flags */
_(SPOP_CF_MUX_MALLOC, _(SPOP_CF_MUX_MFULL, _(SPOP_CF_WAIT_INLIST,
_(SPOP_CF_DEM_DALLOC, _(SPOP_CF_DEM_DFULL, _(SPOP_CF_DEM_MROOM,
_(SPOP_CF_DEM_SALLOC, _(SPOP_CF_DEM_SFULL, _(SPOP_CF_DEM_TOOMANY,
_(SPOP_CF_DISCO_SENT, _(SPOP_CF_DISCO_FAILED, _(SPOP_CF_WAIT_FOR_HS,
_(SPOP_CF_DEM_SHORT_READ, _(SPOP_CF_RCVD_SHUT, _(SPOP_CF_END_REACHED,
_(SPOP_CF_ERR_PENDING, _(SPOP_CF_ERROR)))))))))))))))));
/* epilogue */
_(~0U);
return buf;
#undef _
}
/**** SPOP stream flags (32 bit), in spop_strm->flags ****/
#define SPOP_SF_NONE 0x00000000
// #define SPOP_SF_ACK_RCVD 0x00000001 /* ACK freme received */
//#define SPOP_SF_ES_SENT 0x00000002 /* end-of-stream sent */
//#define SPOP_SF_EP_SENT 0x00000004 /* end-of-param sent */
//#define SPOP_SF_DISCON_SENT 0x00000008 /* disconnect sent */
/* Stream flags indicating the reason the stream is blocked */
#define SPOP_SF_BLK_MBUSY 0x00000010 /* blocked waiting for mux access (transient) */
#define SPOP_SF_BLK_MROOM 0x00000020 /* blocked waiting for room in the mux */
#define SPOP_SF_BLK_ANY 0x00000030 /* any of the reasons above */
//#define SPOP_SF_BEGIN_SENT 0x00000100 /* a BEGIN_REQUEST record was sent for this stream */
//#define SPOP_SF_OUTGOING_DATA 0x00000200 /* set whenever we've seen outgoing data */
#define SPOP_SF_NOTIFIED 0x00000400 /* a paused stream was notified to try to send again */
/* This function is used to report flags in debugging tools. Please reflect
* below any single-bit flag addition above in the same order via the
* __APPEND_FLAG macro. The new end of the buffer is returned.
*/
static forceinline char *spop_strm_show_flags(char *buf, size_t len, const char *delim, uint flg)
{
#define _(f, ...) __APPEND_FLAG(buf, len, delim, flg, f, #f, __VA_ARGS__)
/* prologue */
_(0);
/* flags */
_(SPOP_SF_BLK_MBUSY, _(SPOP_SF_BLK_MROOM, _(SPOP_SF_NOTIFIED)));
/* epilogue */
_(~0U);
return buf;
#undef _
}
/* SPOP connection state (spop_conn->state) */
enum spop_conn_st {
SPOP_CS_HA_HELLO = 0, /* init done, waiting for sending HELLO frame */
SPOP_CS_AGENT_HELLO, /* HELLO frame sent, waiting for agent HELLO frame to define the connection settings */
SPOP_CS_FRAME_H, /* HELLO handshake finished, waiting for a frame header */
SPOP_CS_FRAME_P, /* Frame header received, waiting for a frame data */
SPOP_CS_ERROR, /* send DISCONNECT frame to be able ti close the conneciton */
SPOP_CS_CLOSING, /* DISCONNECT frame sent, waiting for the agent DISCONNECT frame before closing */
SPOP_CS_CLOSED, /* Agent DISCONNECT frame received and close the connection ASAP */
SPOP_CS_ENTRIES
} __attribute__((packed));
/* returns a spop_conn state as an abbreviated 3-letter string, or "???" if unknown */
static inline const char *spop_conn_st_to_str(enum spop_conn_st st)
{
switch (st) {
case SPOP_CS_HA_HELLO : return "HHL";
case SPOP_CS_AGENT_HELLO: return "AHL";
case SPOP_CS_FRAME_H : return "FRH";
case SPOP_CS_FRAME_P : return "FRP";
case SPOP_CS_ERROR : return "ERR";
case SPOP_CS_CLOSING : return "CLI";
case SPOP_CS_CLOSED : return "CLO";
default : return "???";
}
}
/* SPOP stream state, in spop_strm->state */
enum spop_strm_st {
SPOP_SS_IDLE = 0,
SPOP_SS_OPEN,
SPOP_SS_HREM, // half-closed(remote)
SPOP_SS_HLOC, // half-closed(local)
SPOP_SS_ERROR,
SPOP_SS_CLOSED,
SPOP_SS_ENTRIES
} __attribute__((packed));
/* returns a spop_strm state as an abbreviated 3-letter string, or "???" if unknown */
static inline const char *spop_strm_st_to_str(enum spop_strm_st st)
{
switch (st) {
case SPOP_SS_IDLE : return "IDL";
case SPOP_SS_OPEN : return "OPN";
case SPOP_SS_HREM : return "RCL";
case SPOP_SS_HLOC : return "HCL";
case SPOP_SS_ERROR : return "ERR";
case SPOP_SS_CLOSED : return "CLO";
default : return "???";
}
}
#endif /* _HAPROXY_MUX_SPOP_T_H */

View File

@ -27,6 +27,9 @@
#include <haproxy/sample-t.h>
#include <haproxy/spoe-t.h>
struct appctx;
struct spoe_agent *spoe_appctx_agent(struct appctx *appctx);
/* Encode a buffer. Its length <len> is encoded as a varint, followed by a copy
* of <str>. It must have enough space in <*buf> to encode the buffer, else an

View File

@ -280,6 +280,7 @@ struct stconn;
#define SE_ABRT_SRC_MUX_H2 0x03 /* Code set by the H2 mux */
#define SE_ABRT_SRC_MUX_QUIC 0x04 /* Code set by the QUIC/H3 mux */
#define SE_ABRT_SRC_MUX_FCGI 0x05 /* Code set by the FCGI mux */
#define SE_ABRT_SRC_MUX_SPOP 0x06 /* Code set by the SPOP mux */
struct se_abort_info {
uint32_t info;

View File

@ -1099,6 +1099,20 @@ spoe_recv_frame(struct appctx *appctx, char *buf, size_t framesz)
/********************************************************************
* Functions that manage the SPOE applet
********************************************************************/
struct spoe_agent *spoe_appctx_agent(struct appctx *appctx)
{
struct spoe_appctx *spoe_appctx;
if (!appctx)
return NULL;
spoe_appctx = SPOE_APPCTX(appctx);
if (!spoe_appctx)
return NULL;
return spoe_appctx->agent;
}
static int
spoe_wakeup_appctx(struct appctx *appctx)
{

3398
src/mux_spop.c Normal file

File diff suppressed because it is too large Load Diff