[FFmpeg-devel] [PATCH v2 1/2] avformat: Add AMQP version 0-9-1 protocol support
Marton Balint
cus at passwd.hu
Sun Mar 8 13:25:29 EET 2020
> Subject: [FFmpeg-devel] [PATCH v2 1/2] avformat: Add AMQP version 0-9-1 protocol support
>
> From: Andriy Gelman <andriy.gelman at gmail.com>
>
> Supports connecting to a RabbitMQ broker via AMQP version 0-9-1.
>
> Signed-off-by: Andriy Gelman <andriy.gelman at gmail.com>
> ---
>
> Changes in v2:
> - Addressed comments from Marton
> - Updated documentation
>
> Compilation notes:
> - Requires librabbitmq-dev package (on ubuntu).
> - The pkg-config librabbitmq.pc has a corrupt entry.
> **update: fixed on the github master branch**
> The line "Libs.private: rt; -lpthread" should be changed to
> "Libs.private: -lrt -pthread".
> - Compile FFmpeg with --enable-librabbitmq
>
[...]
> + at item connection_timeout
> +The timeout in microseconds during the initial connection to the broker. The
In *seconds* (because it is an AV_OPT_TYPE_DURATION, so a bare number given by the user is parsed as seconds)
> +default value is rw_timeout, or 5000000 microseconds if rw_timeout is not set.
5 seconds
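To make the unit point concrete: an AV_OPT_TYPE_DURATION option is stored as an int64_t count of microseconds, even though the user types seconds. Below is a minimal standalone sketch of the fallback and conversion the patch performs before connecting. The helper name timeout_to_timeval() is made up for illustration and is not part of the patch.

```c
#include <assert.h>
#include <stdint.h>
#include <sys/time.h>

/* Hypothetical helper (not in the patch): resolve the effective connection
 * timeout and convert it from microseconds to a struct timeval.
 * A negative connection_timeout means "not set". */
static struct timeval timeout_to_timeval(int64_t connection_timeout,
                                         int64_t rw_timeout)
{
    struct timeval tv;

    /* Fall back to rw_timeout, or to 5 seconds if that is unset too. */
    if (connection_timeout < 0)
        connection_timeout = rw_timeout > 0 ? rw_timeout : 5000000;
    assert(connection_timeout >= 0);

    tv.tv_sec  = connection_timeout / 1000000; /* whole seconds */
    tv.tv_usec = connection_timeout % 1000000; /* remaining microseconds */
    return tv;
}
```

With both values unset this yields tv_sec = 5 and tv_usec = 0, which is why the documentation should say "5 seconds".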
[...]
> +static const AVOption options[] = {
> + { "pkt_size", "Maximum send/read packet size", OFFSET(pkt_size), AV_OPT_TYPE_INT, { .i64 = 131072 }, 4096, INT_MAX, .flags = D | E },
> + { "exchange", "Exchange to send/read packets", OFFSET(exchange), AV_OPT_TYPE_STRING, { .str = "amq.direct" }, 0, 0, .flags = D | E },
> + { "routing_key", "Key to filter streams", OFFSET(routing_key), AV_OPT_TYPE_STRING, { .str = "amqp" }, 0, 0, .flags = D | E },
> + { "connection_timeout", "Initial connection timeout", OFFSET(connection_timeout), AV_OPT_TYPE_DURATION, { .i64 = -1 }, -1, INT_MAX, .flags = D | E},
INT64_MAX can be the maximum.
> + { NULL }
> +};
> +
> +static int amqp_proto_open(URLContext *h, const char *uri, int flags)
> +{
> + int ret, server_msg;
> + char hostname[STR_LEN], credentials[STR_LEN];
> + char *credentials_decoded;
> + int port;
> + const char *user, *password = NULL;
> + char *p;
> + amqp_rpc_reply_t broker_reply;
> + struct timeval tval = { 0 };
> +
> + AMQPContext *s = h->priv_data;
> +
> + h->is_streamed = 1;
> + h->max_packet_size = s->pkt_size;
> +
> + av_url_split(NULL, 0, credentials, sizeof(credentials),
> + hostname, sizeof(hostname), &port, NULL, 0, uri);
> +
> + if (port < 0)
> + port = 5672;
> +
> + if (hostname[0] == '\0' || port <= 0 || port > 65535 ) {
> + av_log(h, AV_LOG_ERROR, "Invalid hostname/port\n");
> + return AVERROR(EINVAL);
> + }
> +
> + credentials_decoded = ff_urldecode(credentials, 0);
This is not entirely correct, because the username may contain an encoded
':' character, which would be mistaken for the separator after decoding.
So you should split on ':' first and urldecode the resulting components
separately.
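A rough sketch of what I mean, written as standalone C so it can be tried outside the tree. Here percent_decode() is only a hypothetical stand-in for ff_urldecode(), and split_credentials() is an illustrative name, not an existing function:

```c
#include <assert.h>
#include <ctype.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for ff_urldecode(): decodes %XX escapes into a
 * newly allocated string. Returns NULL on allocation failure. */
static char *percent_decode(const char *in)
{
    size_t len = strlen(in);
    char *out = malloc(len + 1);
    char *o = out;

    if (!out)
        return NULL;
    for (size_t i = 0; i < len; i++) {
        if (in[i] == '%' && i + 2 < len &&
            isxdigit((unsigned char)in[i + 1]) &&
            isxdigit((unsigned char)in[i + 2])) {
            char hex[3] = { in[i + 1], in[i + 2], '\0' };
            *o++ = (char)strtol(hex, NULL, 16);
            i += 2; /* skip the two hex digits */
        } else {
            *o++ = in[i];
        }
    }
    *o = '\0';
    return out;
}

/* Split "user:password" on the first raw ':' and only then decode each
 * part, so an encoded "%3A" in the username is not taken as the separator.
 * On success returns 0 and the caller frees *user and *password. */
static int split_credentials(const char *credentials,
                             char **user, char **password)
{
    const char *sep;
    size_t user_len;
    char *raw_user;

    assert(credentials && user && password);
    *user = *password = NULL;

    sep = strchr(credentials, ':');
    user_len = sep ? (size_t)(sep - credentials) : strlen(credentials);
    raw_user = malloc(user_len + 1);
    if (!raw_user)
        return -1;
    memcpy(raw_user, credentials, user_len);
    raw_user[user_len] = '\0';

    *user = percent_decode(raw_user);
    free(raw_user);
    *password = percent_decode(sep ? sep + 1 : "");
    if (!*user || !*password) {
        free(*user);
        free(*password);
        *user = *password = NULL;
        return -1;
    }
    return 0;
}
```

With this ordering, credentials of "us%3Aer:pa%40ss" give the user "us:er" and the password "pa@ss". Decoding first and splitting afterwards would cut the username at "us". The "guest" defaults for empty components can be applied after the split as before.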
> + if (!credentials_decoded)
> + return AVERROR(ENOMEM);
> +
> + p = strchr(credentials_decoded, ':');
> + if (p) {
> + *p = '\0';
> + password = p + 1;
> + }
> +
> + if (!password || *password == '\0')
> + password = "guest";
> +
> + user = credentials_decoded;
> + if (*user == '\0')
> + user = "guest";
> +
> + s->conn = amqp_new_connection();
> + if (!s->conn) {
> + av_log(h, AV_LOG_ERROR, "Error creating connection\n");
> + return AVERROR_EXTERNAL;
> + }
> +
> + s->socket = amqp_tcp_socket_new(s->conn);
> + if (!s->socket) {
> + av_log(h, AV_LOG_ERROR, "Error creating socket\n");
> + goto destroy_connection;
> + }
> +
> + if (s->connection_timeout < 0)
> + s->connection_timeout = (h->rw_timeout > 0 ? h->rw_timeout : 5000000);
> +
> + tval.tv_sec = s->connection_timeout / 1000000;
> + tval.tv_usec = s->connection_timeout % 1000000;
> + ret = amqp_socket_open_noblock(s->socket, hostname, port, &tval);
> +
> + if (ret) {
> + av_log(h, AV_LOG_ERROR, "Error connecting to server\n");
This should log the actual error reported by the library, e.g.:
av_log(h, AV_LOG_ERROR, "Error connecting to server: %s\n", amqp_error_string2(ret));
[...]
> +static int amqp_proto_write(URLContext *h, const unsigned char *buf, int size)
> +{
> + int ret;
> + AMQPContext *s = h->priv_data;
> + int fd = amqp_socket_get_sockfd(s->socket);
> +
> + amqp_bytes_t message = { size, (void *)buf };
> + amqp_basic_properties_t props;
> +
> + ret = ff_network_wait_fd_timeout(fd, 1, h->rw_timeout, &h->interrupt_callback);
> + if (ret)
> + return ret;
> +
> + props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
> + props.content_type = amqp_cstring_bytes("octet/stream");
> + props.delivery_mode = 2; /* persistent delivery mode */
> +
> + ret = amqp_basic_publish(s->conn, DEFAULT_CHANNEL, amqp_cstring_bytes(s->exchange),
> + amqp_cstring_bytes(s->routing_key), 0, 0,
> + &props, message);
> +
> + if (ret) {
> + av_log(h, AV_LOG_ERROR, "Error publish\n");
Same here: please include amqp_error_string2(ret) in the message.
> + return AVERROR_EXTERNAL;
> + }
> +
> + return size;
> +}
> +
[...]
Thanks,
Marton