MINOR: http_fetch: add "req.ungrpc" sample fetch for gRPC.

This patch implements "req.ungrpc" sample fetch method to decode and
parse a gRPC request. It takes only one argument: a protocol buffers
field number to identify the protocol buffers message number to be looked up.
This argument is a sort of path in dotted notation to the terminal field number
to be retrieved.

  ex:
    req.ungrpc(1.2.3.4)

This sample fetch catch the data in raw mode, without interpreting them.
Some protocol buffers specific converters may be used to convert the data
to the correct type.
This commit is contained in:
Frdric Lcaille 2019-02-25 15:30:36 +01:00 committed by Willy Tarreau
parent 3a463c92cf
commit 1fceee8316
2 changed files with 426 additions and 0 deletions

View File

@ -0,0 +1,185 @@
/*
* include/proto/protocol_buffers.h
* This file contains functions and macros declarations for protocol buffers decoding.
*
* Copyright 2012 Willy Tarreau <w@1wt.eu>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation, version 2.1
* exclusively.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#ifndef _PROTO_PROTOCOL_BUFFERS_H
#define _PROTO_PROTOCOL_BUFFERS_H
#include <types/protocol_buffers.h>
#define PBUF_TYPE_VARINT 0
#define PBUF_TYPE_64BIT 1
#define PBUF_TYPE_LENGTH_DELIMITED 2
#define PBUF_TYPE_START_GROUP 3
#define PBUF_TYPE_STOP_GROUP 4
#define PBUF_TYPE_32BIT 5
#define PBUF_VARINT_DONT_STOP_BIT 7
#define PBUF_VARINT_DONT_STOP_BITMASK (1 << PBUF_VARINT_DONT_STOP_BIT)
#define PBUF_VARINT_DATA_BITMASK ~PBUF_VARINT_DONT_STOP_BITMASK
/*
* Decode a protocol buffers varint located in a buffer at <pos> address with
* <len> as length. The decoded value is stored at <val>.
* Returns 1 if succeeded, 0 if not.
*/
static inline int
protobuf_varint(uint64_t *val, unsigned char *pos, size_t len)
{
unsigned int shift;
*val = 0;
shift = 0;
while (len > 0) {
int stop = !(*pos & PBUF_VARINT_DONT_STOP_BITMASK);
*val |= ((uint64_t)(*pos & PBUF_VARINT_DATA_BITMASK)) << shift;
++pos;
--len;
if (stop)
break;
else if (!len)
return 0;
shift += 7;
/* The maximum length in bytes of a 64-bit encoded value is 10. */
if (shift > 70)
return 0;
}
return 1;
}
/*
* Decode a protocol buffers varint located in a buffer at <pos> offset address with
* <len> as length address. Update <pos> and <len> consequently. Decrease <*len>
* by the number of decoded bytes. The decoded value is stored at <val>.
* Returns 1 if succeeded, 0 if not.
*/
static inline int
protobuf_decode_varint(uint64_t *val, unsigned char **pos, size_t *len)
{
unsigned int shift;
*val = 0;
shift = 0;
while (*len > 0) {
int stop = !(**pos & PBUF_VARINT_DONT_STOP_BITMASK);
*val |= ((uint64_t)**pos & PBUF_VARINT_DATA_BITMASK) << shift;
++*pos;
--*len;
if (stop)
break;
else if (!*len)
return 0;
shift += 7;
/* The maximum length in bytes of a 64-bit encoded value is 10. */
if (shift > 70)
return 0;
}
return 1;
}
/*
* Skip a protocol buffer varint found at <pos> as position address with <len>
* as available length address. Update <*pos> to make it point to the next
* available byte. Decrease <*len> by the number of skipped bytes.
* Returns 1 if succeeded, 0 if not.
*/
static inline int
protobuf_skip_varint(unsigned char **pos, size_t *len)
{
unsigned int shift;
shift = 0;
while (*len > 0) {
int stop = !(**pos & PBUF_VARINT_DONT_STOP_BITMASK);
++*pos;
--*len;
if (stop)
break;
else if (!*len)
return 0;
shift += 7;
/* The maximum length in bytes of a 64-bit encoded value is 10. */
if (shift > 70)
return 0;
}
return 1;
}
/*
* If succeeded, return the length of a prococol buffers varint found at <pos> as
* position address, with <len> as address of the available bytes at <*pos>.
* Update <*pos> to make it point to the next available byte. Decrease <*len>
* by the number of bytes used to encode this varint.
* Return -1 if failed.
*/
static inline int
protobuf_varint_getlen(unsigned char **pos, size_t *len)
{
unsigned char *spos;
unsigned int shift;
shift = 0;
spos = *pos;
while (*len > 0) {
int stop = !(**pos & PBUF_VARINT_DONT_STOP_BITMASK);
++*pos;
--*len;
if (stop)
break;
else if (!*len)
return -1;
shift += 7;
/* The maximum length in bytes of a 64-bit encoded value is 10. */
if (shift > 70)
return -1;
}
return *pos - spos;
}
#endif /* _PROTO_PROTOCOL_BUFFERS_H */
/*
* Local variables:
* c-indent-level: 8
* c-basic-offset: 8
* End:
*/

View File

@ -39,6 +39,7 @@
#include <proto/log.h>
#include <proto/obj_type.h>
#include <proto/proto_http.h>
#include <proto/protocol_buffers.h>
#include <proto/sample.h>
#include <proto/stream.h>
@ -1516,6 +1517,245 @@ static int smp_fetch_hdr_val(const struct arg *args, struct sample *smp, const c
return ret;
}
static inline struct buffer *
smp_fetch_body_buf(const struct arg *args, struct sample *smp)
{
struct buffer *buf;
if (IS_HTX_SMP(smp) || (smp->px->mode == PR_MODE_TCP)) {
/* HTX version */
struct htx *htx = smp_prefetch_htx(smp, args);
int32_t pos;
if (!htx)
return NULL;
buf = get_trash_chunk();
for (pos = htx_get_head(htx); pos != -1; pos = htx_get_next(htx, pos)) {
struct htx_blk *blk = htx_get_blk(htx, pos);
enum htx_blk_type type = htx_get_blk_type(blk);
if (type == HTX_BLK_EOM || type == HTX_BLK_EOD)
break;
if (type == HTX_BLK_DATA) {
if (!htx_data_to_h1(htx_get_blk_value(htx, blk), buf, 0))
return NULL;
}
}
}
else {
/* LEGACY version */
struct http_msg *msg;
unsigned long len;
unsigned long block1;
char *body;
if (smp_prefetch_http(smp->px, smp->strm, smp->opt, args, smp, 1) <= 0)
return NULL;
if ((smp->opt & SMP_OPT_DIR) == SMP_OPT_DIR_REQ)
msg = &smp->strm->txn->req;
else
msg = &smp->strm->txn->rsp;
len = http_body_bytes(msg);
body = c_ptr(msg->chn, -http_data_rewind(msg));
block1 = len;
if (block1 > b_wrap(&msg->chn->buf) - body)
block1 = b_wrap(&msg->chn->buf) - body;
buf = get_trash_chunk();
if (block1 == len) {
/* buffer is not wrapped (or empty) */
memcpy(buf->area, body, len);
}
else {
/* buffer is wrapped, we need to defragment it */
memcpy(buf->area, body, block1);
memcpy(buf->area + block1, b_orig(&msg->chn->buf),
len - block1);
}
buf->data = len;
}
return buf;
}
#define GRPC_MSG_COMPRESS_FLAG_SZ 1 /* 1 byte */
#define GRPC_MSG_LENGTH_SZ 4 /* 4 bytes */
#define GRPC_MSG_HEADER_SZ (GRPC_MSG_COMPRESS_FLAG_SZ + GRPC_MSG_LENGTH_SZ)
/*
* Fetch a gRPC field value. Takes a mandatory argument: the field identifier
* (dotted notation) internally represented as an array of unsigned integers
* and its size.
* Return 1 if the field was found, 0 if not.
*/
static int smp_fetch_req_ungrpc(const struct arg *args, struct sample *smp, const char *kw, void *private)
{
struct buffer *body;
unsigned char *pos;
size_t grpc_left;
unsigned int *fid;
size_t fid_sz;
if (!smp->strm)
return 0;
fid = args[0].data.fid.ids;
fid_sz = args[0].data.fid.sz;
body = smp_fetch_body_buf(args, smp);
if (!body)
return 0;
pos = (unsigned char *)body->area;
/* Remaining bytes in the body to be parsed. */
grpc_left = body->data;
while (grpc_left > GRPC_MSG_COMPRESS_FLAG_SZ + GRPC_MSG_LENGTH_SZ) {
int next_field, found;
size_t grpc_msg_len, left;
unsigned int wire_type, field_number;
uint64_t key, elen;
grpc_msg_len = left = ntohl(*(uint32_t *)(pos + GRPC_MSG_COMPRESS_FLAG_SZ));
pos += GRPC_MSG_HEADER_SZ;
grpc_left -= GRPC_MSG_HEADER_SZ;
if (grpc_left < left)
return 0;
found = 1;
/* Length of the length-delimited messages if any. */
elen = 0;
/* Message decoding: there may be serveral key+value protobuf pairs by
* gRPC message.
*/
next_field = 0;
while (next_field < fid_sz) {
uint64_t sleft;
if ((ssize_t)left <= 0)
return 0;
/* Remaining bytes saving. */
sleft = left;
/* Key decoding */
if (!protobuf_decode_varint(&key, &pos, &left))
return 0;
wire_type = key & 0x7;
field_number = key >> 3;
found = field_number == fid[next_field];
if (found && field_number != fid[next_field])
found = 0;
switch (wire_type) {
case PBUF_TYPE_VARINT:
{
if (!found) {
protobuf_skip_varint(&pos, &left);
} else if (next_field == fid_sz - 1) {
int varint_len;
unsigned char *spos = pos;
varint_len = protobuf_varint_getlen(&pos, &left);
if (varint_len == -1)
return 0;
smp->data.type = SMP_T_BIN;
smp->data.u.str.area = (char *)spos;
smp->data.u.str.data = varint_len;
smp->flags = SMP_F_VOL_TEST;
return 1;
}
break;
}
case PBUF_TYPE_64BIT:
{
if (!found) {
pos += sizeof(uint64_t);
left -= sizeof(uint64_t);
} else if (next_field == fid_sz - 1) {
smp->data.type = SMP_T_BIN;
smp->data.u.str.area = (char *)pos;
smp->data.u.str.data = sizeof(uint64_t);
smp->flags = SMP_F_VOL_TEST;
return 1;
}
break;
}
case PBUF_TYPE_LENGTH_DELIMITED:
{
/* Decode the length of this length-delimited field. */
if (!protobuf_decode_varint(&elen, &pos, &left))
return 0;
if (elen > left)
return 0;
/* The size of the current field is computed from here do skip
* the bytes to encode the previous lenght.*
*/
sleft = left;
if (!found) {
/* Skip the current length-delimited field. */
pos += elen;
left -= elen;
break;
} else if (next_field == fid_sz - 1) {
smp->data.type = SMP_T_BIN;
smp->data.u.str.area = (char *)pos;
smp->data.u.str.data = elen;
smp->flags = SMP_F_VOL_TEST;
return 1;
}
break;
}
case PBUF_TYPE_32BIT:
{
if (!found) {
pos += sizeof(uint32_t);
left -= sizeof(uint32_t);
} else if (next_field == fid_sz - 1) {
smp->data.type = SMP_T_BIN;
smp->data.u.str.area = (char *)pos;
smp->data.u.str.data = sizeof(uint32_t);
smp->flags = SMP_F_VOL_TEST;
return 1;
}
break;
}
default:
return 0;
}
if ((ssize_t)(elen) > 0)
elen -= sleft - left;
if (found) {
next_field++;
}
else if ((ssize_t)elen <= 0) {
next_field = 0;
}
}
grpc_left -= grpc_msg_len;
}
return 0;
}
/* Fetch an HTTP header's IP value. takes a mandatory argument of type string
* and an optional one of type int to designate a specific occurrence.
* It returns an IPv4 or IPv6 address.
@ -2882,6 +3122,7 @@ static struct sample_fetch_kw_list sample_fetch_keywords = {ILH, {
{ "req.hdr_ip", smp_fetch_hdr_ip, ARG2(0,STR,SINT), val_hdr, SMP_T_IPV4, SMP_USE_HRQHV },
{ "req.hdr_names", smp_fetch_hdr_names, ARG1(0,STR), NULL, SMP_T_STR, SMP_USE_HRQHV },
{ "req.hdr_val", smp_fetch_hdr_val, ARG2(0,STR,SINT), val_hdr, SMP_T_SINT, SMP_USE_HRQHV },
{ "req.ungrpc", smp_fetch_req_ungrpc, ARG1(1, PBUF_FNUM), NULL, SMP_T_BIN, SMP_USE_HRQHV },
/* explicit req.{cook,hdr} are used to force the fetch direction to be response-only */
{ "res.cook", smp_fetch_cookie, ARG1(0,STR), NULL, SMP_T_STR, SMP_USE_HRSHV },