[FFmpeg-devel] [PATCH] avformat/udp: Add a delay between packets for streaming to clients with short buffer
Nicolas George
george at nsup.org
Thu Mar 3 20:51:44 CET 2016
Le primidi 11 ventôse, an CCXXIV, Pavel Nikiforov a écrit :
> Hello !
>
> This patch enables background sending of UDP packets with specified delay.
> When sending packets without a delay some devices with small RX buffer
> ( MAG200 STB, for example) will drop tail packets in burst causing
> decoding errors.
>
> It needs to specify "fifo_size" with "packet_gap" .
>
> The output url will looks like udp://xxx:yyy?fifo_size=<output fifo
> size>&packet_gap=<delay in usecs>
>
> Patch attached.
Thanks for the patch. Comments below.
> libavformat/udp.c | 133 ++++++++++++++++++++++++++++++++++++++++++++++++++++--
> 1 file changed, 129 insertions(+), 4 deletions(-)
Missing documentation update.
>
> diff --git a/libavformat/udp.c b/libavformat/udp.c
> index ea80e52..ff676f4 100644
> --- a/libavformat/udp.c
> +++ b/libavformat/udp.c
> @@ -92,6 +92,7 @@ typedef struct UDPContext {
> int circular_buffer_size;
> AVFifoBuffer *fifo;
> int circular_buffer_error;
> + int packet_gap; /* delay between transmitted packets */
> #if HAVE_PTHREAD_CANCEL
> pthread_t circular_buffer_thread;
> pthread_mutex_t mutex;
> @@ -112,6 +113,7 @@ typedef struct UDPContext {
> #define E AV_OPT_FLAG_ENCODING_PARAM
> static const AVOption options[] = {
> { "buffer_size", "System data size (in bytes)", OFFSET(buffer_size), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E },
> + { "packet_gap", "Delay between packets, in usec", OFFSET(packet_gap), AV_OPT_TYPE_INT, { .i64 = 0 }, 0, INT_MAX, .flags = E },
As pointed by Michael, this should use AV_OPT_TYPE_DURATION.
> { "localport", "Local port", OFFSET(local_port), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, D|E },
> { "local_port", "Local port", OFFSET(local_port), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E },
> { "localaddr", "Local address", OFFSET(localaddr), AV_OPT_TYPE_STRING, { .str = NULL }, .flags = D|E },
> @@ -486,7 +488,7 @@ static int udp_get_file_handle(URLContext *h)
> }
>
> #if HAVE_PTHREAD_CANCEL
> -static void *circular_buffer_task( void *_URLContext)
> +static void *circular_buffer_task_rx( void *_URLContext)
> {
> URLContext *h = _URLContext;
> UDPContext *s = h->priv_data;
> @@ -499,7 +501,7 @@ static void *circular_buffer_task( void *_URLContext)
> s->circular_buffer_error = AVERROR(EIO);
> goto end;
> }
> - while(1) {
> + for(;;) {
Stray change.
> int len;
>
> pthread_mutex_unlock(&s->mutex);
> @@ -542,6 +544,81 @@ end:
> pthread_mutex_unlock(&s->mutex);
> return NULL;
> }
> +
> +static void do_udp_write(void *arg, void *buf, int size) {
> + URLContext *h = arg;
> + UDPContext *s = h->priv_data;
> +
> + int ret;
> +
> + if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
This looks suspicious: blocking/nonblocking is a property of the protocol as
seen by the calling application, it should not apply in a thread used
internally. See below.
> + ret = ff_network_wait_fd(s->udp_fd, 1);
> + if (ret < 0) {
> + s->circular_buffer_error=ret;
Please keep in line with surrounding coding style. In this instance: spaces
around operators.
> + return;
> + }
> + }
> +
> + if (!s->is_connected) {
> + ret = sendto (s->udp_fd, buf, size, 0,
> + (struct sockaddr *) &s->dest_addr,
Style: no spaces after function names and casts.
> + s->dest_addr_len);
> + } else
> + ret = send(s->udp_fd, buf, size, 0);
> +
> + s->circular_buffer_error=ret;
> +}
> +
> +static void *circular_buffer_task_tx( void *_URLContext)
> +{
> + URLContext *h = _URLContext;
> + UDPContext *s = h->priv_data;
> + int old_cancelstate;
> +
> + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
> +
> + for(;;) {
> + int len;
> + uint8_t tmp[4];
> +
> + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
> +
> + av_usleep(s->packet_gap);
Not very reliable: if the task gets unscheduled for some time, it will be
added to the gap and never catch, possibly causing playback hiccups. A
system with a kind of rate control would probably be better.
> +
> + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
> +
> + pthread_mutex_lock(&s->mutex);
> +
> + len=av_fifo_size(s->fifo);
> +
> + while (len<4) {
> + if (pthread_cond_wait(&s->cond, &s->mutex) < 0) {
> + goto end;
> + }
> + len=av_fifo_size(s->fifo);
> + }
> +
> + av_fifo_generic_peek(s->fifo, tmp, 4, NULL);
> + len=AV_RL32(tmp);
> +
> + if (len>0 && av_fifo_size(s->fifo)>=len) {
len + 4
> + av_fifo_drain(s->fifo, 4); /* skip packet length */
> + av_fifo_generic_read(s->fifo, h, len, do_udp_write); /* use function for write from fifo buffer */
The comment looks useless.
> + if (s->circular_buffer_error == len) {
> + /* all ok - reset error */
> + s->circular_buffer_error=0;
> + }
> + }
> +
> + pthread_mutex_unlock(&s->mutex);
> + }
> +
> +end:
> + pthread_mutex_unlock(&s->mutex);
> + return NULL;
> +}
> +
> +
> #endif
>
> static int parse_source_list(char *buf, char **sources, int *num_sources,
> @@ -650,6 +727,13 @@ static int udp_open(URLContext *h, const char *uri, int flags)
> "'circular_buffer_size' option was set but it is not supported "
> "on this build (pthread support is required)\n");
> }
> + if (av_find_info_tag(buf, sizeof(buf), "packet_gap", p)) {
> + s->packet_gap = strtol(buf, NULL, 10);
> + if (!HAVE_PTHREAD_CANCEL)
> + av_log(h, AV_LOG_WARNING,
> + "'packet_gap' option was set but it is not supported "
> + "on this build (pthread support is required)\n");
> + }
> if (av_find_info_tag(buf, sizeof(buf), "localaddr", p)) {
> av_strlcpy(localaddr, buf, sizeof(localaddr));
> }
> @@ -829,7 +913,18 @@ static int udp_open(URLContext *h, const char *uri, int flags)
> s->udp_fd = udp_fd;
>
> #if HAVE_PTHREAD_CANCEL
> - if (!is_output && s->circular_buffer_size) {
> + /*
> + Create thread in case of:
> + 1. Input and circular_buffer_size is set
> + 2. Output and packet_gap and circular_buffer_size is set
> + */
UDP sockets can be read+write; in this case, two threads are needed.
> +
> + if (is_output && s->packet_gap && !s->circular_buffer_size) {
> + /* Warn user in case of 'circular_buffer_size' is not set */
> + av_log(h, AV_LOG_WARNING,"'packet_gap' option was set but 'circular_buffer_size' is not, but required\n");
> + }
> +
> + if ((!is_output && s->circular_buffer_size) || (is_output && s->packet_gap && s->circular_buffer_size)) {
> int ret;
>
> /* start the task going */
> @@ -844,7 +939,7 @@ static int udp_open(URLContext *h, const char *uri, int flags)
> av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
> goto cond_fail;
> }
> - ret = pthread_create(&s->circular_buffer_thread, NULL, circular_buffer_task, h);
> + ret = pthread_create(&s->circular_buffer_thread, NULL, is_output?circular_buffer_task_tx:circular_buffer_task_rx, h);
> if (ret != 0) {
> av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret));
> goto thread_fail;
> @@ -945,6 +1040,36 @@ static int udp_write(URLContext *h, const uint8_t *buf, int size)
> UDPContext *s = h->priv_data;
> int ret;
>
> +#if HAVE_PTHREAD_CANCEL
> + if (s->fifo) {
> + uint8_t tmp[4];
> +
> + pthread_mutex_lock(&s->mutex);
> +
> + /*
> + Return error if last tx failed.
> + Here we can't know on which packet error was, but it needs to know that error exists.
> + */
> + if (s->circular_buffer_error<0) {
> + int err=s->circular_buffer_error;
> + s->circular_buffer_error=0;
> + pthread_mutex_unlock(&s->mutex);
> + return err;
> + }
> +
> + if(av_fifo_space(s->fifo) < size + 4) {
> + /* What about a partial packet tx ? */
> + pthread_mutex_unlock(&s->mutex);
> + return AVERROR(ENOMEM);
> + }
ENOMEM means that memory allocation failed, it is not true here. This is the
point where blocking or non-blocking matters: in non-blocking mode, the
return value must be AVERROR(EAGAIN). In blocking mode, it should block
until the thread catches up.
> + AV_WL32(tmp, size);
> + av_fifo_generic_write(s->fifo, tmp, 4, NULL); /* size of packet */
> + av_fifo_generic_write(s->fifo, (uint8_t *)buf, size, NULL); /* the data */
> + pthread_cond_signal(&s->cond);
> + pthread_mutex_unlock(&s->mutex);
> + return size;
> + }
> +#endif
> if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
> ret = ff_network_wait_fd(s->udp_fd, 1);
> if (ret < 0)
Regards,
--
Nicolas George
-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 819 bytes
Desc: Digital signature
URL: <http://ffmpeg.org/pipermail/ffmpeg-devel/attachments/20160303/79342cce/attachment.sig>
More information about the ffmpeg-devel
mailing list