avformat/libamqp: parse vhost in uri

Reviewed-by: Marton Balint <cus@passwd.hu>
Signed-off-by: Andriy Gelman <andriy.gelman@gmail.com>
This commit is contained in:
Andriy Gelman 2020-12-19 22:32:04 -05:00
parent 6e96e3d970
commit cd97e1ff4d
2 changed files with 30 additions and 8 deletions

View File

@ -63,16 +63,17 @@ After starting the broker, an FFmpeg client may stream data to the broker using
the command: the command:
@example @example
ffmpeg -re -i input -f mpegts amqp://[[user]:[password]@@]hostname[:port] ffmpeg -re -i input -f mpegts amqp://[[user]:[password]@@]hostname[:port][/vhost]
@end example @end example
Where hostname and port (default is 5672) is the address of the broker. The Where hostname and port (default is 5672) is the address of the broker. The
client may also set a user/password for authentication. The default for both client may also set a user/password for authentication. The default for both
fields is "guest". fields is "guest". Name of virtual host on broker can be set with vhost. The
default value is "/".
Muliple subscribers may stream from the broker using the command: Muliple subscribers may stream from the broker using the command:
@example @example
ffplay amqp://[[user]:[password]@@]hostname[:port] ffplay amqp://[[user]:[password]@@]hostname[:port][/vhost]
@end example @end example
In RabbitMQ all data published to the broker flows through a specific exchange, In RabbitMQ all data published to the broker flows through a specific exchange,

View File

@ -62,10 +62,10 @@ static const AVOption options[] = {
static int amqp_proto_open(URLContext *h, const char *uri, int flags) static int amqp_proto_open(URLContext *h, const char *uri, int flags)
{ {
int ret, server_msg; int ret, server_msg;
char hostname[STR_LEN], credentials[STR_LEN]; char hostname[STR_LEN], credentials[STR_LEN], path[STR_LEN];
int port; int port;
const char *user, *password = NULL; const char *user, *password = NULL, *vhost;
const char *user_decoded, *password_decoded; const char *user_decoded, *password_decoded, *vhost_decoded;
char *p; char *p;
amqp_rpc_reply_t broker_reply; amqp_rpc_reply_t broker_reply;
struct timeval tval = { 0 }; struct timeval tval = { 0 };
@ -76,7 +76,7 @@ static int amqp_proto_open(URLContext *h, const char *uri, int flags)
h->max_packet_size = s->pkt_size; h->max_packet_size = s->pkt_size;
av_url_split(NULL, 0, credentials, sizeof(credentials), av_url_split(NULL, 0, credentials, sizeof(credentials),
hostname, sizeof(hostname), &port, NULL, 0, uri); hostname, sizeof(hostname), &port, path, sizeof(path), uri);
if (port < 0) if (port < 0)
port = 5672; port = 5672;
@ -109,8 +109,27 @@ static int amqp_proto_open(URLContext *h, const char *uri, int flags)
return AVERROR(ENOMEM); return AVERROR(ENOMEM);
} }
/* skip query for now */
p = strchr(path, '?');
if (p)
*p = '\0';
vhost = path;
if (*vhost == '\0')
vhost = "/";
else
vhost++; /* skip leading '/' */
vhost_decoded = ff_urldecode(vhost, 0);
if (!vhost_decoded) {
av_freep(&user_decoded);
av_freep(&password_decoded);
return AVERROR(ENOMEM);
}
s->conn = amqp_new_connection(); s->conn = amqp_new_connection();
if (!s->conn) { if (!s->conn) {
av_freep(&vhost_decoded);
av_freep(&user_decoded); av_freep(&user_decoded);
av_freep(&password_decoded); av_freep(&password_decoded);
av_log(h, AV_LOG_ERROR, "Error creating connection\n"); av_log(h, AV_LOG_ERROR, "Error creating connection\n");
@ -136,7 +155,7 @@ static int amqp_proto_open(URLContext *h, const char *uri, int flags)
goto destroy_connection; goto destroy_connection;
} }
broker_reply = amqp_login(s->conn, "/", 0, s->pkt_size, 0, broker_reply = amqp_login(s->conn, vhost_decoded, 0, s->pkt_size, 0,
AMQP_SASL_METHOD_PLAIN, user_decoded, password_decoded); AMQP_SASL_METHOD_PLAIN, user_decoded, password_decoded);
if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) { if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
@ -195,6 +214,7 @@ static int amqp_proto_open(URLContext *h, const char *uri, int flags)
} }
} }
av_freep(&vhost_decoded);
av_freep(&user_decoded); av_freep(&user_decoded);
av_freep(&password_decoded); av_freep(&password_decoded);
return 0; return 0;
@ -206,6 +226,7 @@ close_connection:
destroy_connection: destroy_connection:
amqp_destroy_connection(s->conn); amqp_destroy_connection(s->conn);
av_freep(&vhost_decoded);
av_freep(&user_decoded); av_freep(&user_decoded);
av_freep(&password_decoded); av_freep(&password_decoded);
return AVERROR_EXTERNAL; return AVERROR_EXTERNAL;