[FFmpeg-devel] [PATCH 2/4] libavformat/tcp: Added an option to reuse sockets
Karthick J
kjeyapal at akamai.com
Fri Nov 3 10:27:01 EET 2017
---
doc/protocols.texi | 4 ++
libavformat/tcp.c | 150 ++++++++++++++++++++++++++++++++++++++++++++++++++++
libavformat/tcp.h | 27 ++++++++++
libavformat/utils.c | 2 +
4 files changed, 183 insertions(+)
create mode 100644 libavformat/tcp.h
diff --git a/doc/protocols.texi b/doc/protocols.texi
index a7968ff..62d317d 100644
--- a/doc/protocols.texi
+++ b/doc/protocols.texi
@@ -1242,6 +1242,10 @@ Set receive buffer size, expressed bytes.
@item send_buffer_size=@var{bytes}
Set send buffer size, expressed bytes.
+
+@item reuse_sockets=@var{1|0}
+Reuse sockets instead of opening a new socket each time.
+Default value is 0.
@end table
The following example shows how to setup a listening TCP connection
diff --git a/libavformat/tcp.c b/libavformat/tcp.c
index 06368ff..8bca628 100644
--- a/libavformat/tcp.c
+++ b/libavformat/tcp.c
@@ -1,6 +1,7 @@
/*
* TCP protocol
* Copyright (c) 2002 Fabrice Bellard
+ * Copyright (c) 2017 Akamai Technologies, Inc
*
* This file is part of FFmpeg.
*
@@ -19,6 +20,8 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include "avformat.h"
+#include "tcp.h"
+#include "libavcodec/internal.h"
#include "libavutil/avassert.h"
#include "libavutil/parseutils.h"
#include "libavutil/opt.h"
@@ -38,6 +41,7 @@ typedef struct TCPOptions {
int listen_timeout;
int recv_buffer_size;
int send_buffer_size;
+ int reuse_sockets;
} TCPOptions;
typedef struct TCPContext {
@@ -47,6 +51,16 @@ typedef struct TCPContext {
TCPOptions options;
} TCPContext;
+typedef struct TCPSocket { /* one entry of the list of sockets kept for reuse */
+ char *hostname; /* heap copy of the host name given to tcp_socket_create() */
+ int port; /* port given to tcp_socket_create() */
+ int in_use; /* 1 while handed out (create/find), 0 after tcp_socket_release() */
+ int64_t last_close_time; /* av_gettime() value stored by tcp_socket_release() */
+ int fd; /* socket descriptor copied from the TCPContext */
+ TCPOptions options; /* copy of the context options; compared with memcmp() on lookup */
+ struct TCPSocket *next; /* next list entry, NULL at the tail */
+} TCPSocket;
+
#define OFFSET(x) (offsetof(TCPContext, options) + offsetof(TCPOptions, x))
#define D AV_OPT_FLAG_DECODING_PARAM
#define E AV_OPT_FLAG_ENCODING_PARAM
@@ -56,6 +70,7 @@ static const AVOption options[] = {
{ "listen_timeout", "Connection awaiting timeout (in milliseconds)", OFFSET(listen_timeout), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E },
{ "send_buffer_size", "Socket send buffer size (in bytes)", OFFSET(send_buffer_size), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E },
{ "recv_buffer_size", "Socket receive buffer size (in bytes)", OFFSET(recv_buffer_size), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E },
+ { "reuse_sockets", "Reuse sockets instead of opening a new socket each time", OFFSET(reuse_sockets), AV_OPT_TYPE_BOOL, { .i64 = 0 }, 0, 1, .flags = D|E },
{ NULL }
};
@@ -66,6 +81,116 @@ static const AVClass tcp_class = {
.version = LIBAVUTIL_VERSION_INT,
};
+static TCPSocket *first_socket = NULL; /* head of the singly linked list of TCPSocket entries */
+
+/**
+ * Allocate a list entry describing the socket currently held by s.
+ *
+ * The entry stores a private copy of the host name, the port, the descriptor
+ * s->fd and a copy of s->options. It is created already marked as in use and
+ * is not linked into the list by this function.
+ *
+ * @param s    TCP context whose fd and options are recorded
+ * @param name host name to remember for this socket (duplicated)
+ * @param port port to remember for this socket
+ * @return the new entry, or NULL if either allocation fails
+ */
+static TCPSocket* tcp_socket_create (TCPContext *s, char *name, int port)
+{
+    TCPSocket *p = calloc(1, sizeof(*p));
+
+    if (!p)
+        return NULL;
+
+    /* tcp_socket_find() hands hostname to strcmp(), so an entry must never
+     * be created with a NULL host name. */
+    p->hostname = strdup(name);
+    if (!p->hostname) {
+        free(p);
+        return NULL;
+    }
+
+    p->port    = port;
+    p->in_use  = 1;
+    p->fd      = s->fd;
+    p->options = s->options;
+    p->next    = NULL;
+
+    return p;
+}
+
+/**
+ * Close the descriptor of a list entry and release the entry's memory,
+ * including its host name copy. A NULL entry is ignored.
+ *
+ * @param socket entry to destroy, may be NULL
+ * @return always 0
+ */
+static int tcp_socket_free (TCPSocket *socket)
+{
+    if (!socket)
+        return 0;
+
+    closesocket(socket->fd);
+    free(socket->hostname);
+    free(socket);
+
+    return 0;
+}
+
+/**
+ * Create an entry for the socket held by s and push it onto the front of
+ * the socket list, under the avformat lock.
+ *
+ * @param s    TCP context whose fd and options are recorded
+ * @param name host name associated with the socket
+ * @param port port associated with the socket
+ */
+static void tcp_list_add(TCPContext *s, char *name, int port)
+{
+    TCPSocket *socket = tcp_socket_create(s, name, port);
+
+    if (!socket) {
+        /* The socket could not be recorded. With reuse_sockets still set,
+         * tcp_shutdown() and tcp_close() return early without closing the
+         * descriptor, and nothing in the list would ever close it either.
+         * Turn reuse off for this context so the regular close path runs. */
+        s->options.reuse_sockets = 0;
+        return;
+    }
+
+    avpriv_lock_avformat();
+    socket->next = first_socket;
+    first_socket = socket;
+    avpriv_unlock_avformat();
+}
+
+/**
+ * Unlink and destroy the entry that follows socket in the list.
+ *
+ * The callers in this file (tcp_socket_find() and ff_tcp_deinit()) hold the
+ * avformat lock while calling this function.
+ *
+ * @param socket predecessor of the entry to remove, or NULL to remove the
+ *               head of the list
+ */
+static void tcp_list_remove_next(TCPSocket *socket)
+{
+    /* Pointer to the link that currently refers to the entry being removed. */
+    TCPSocket **link = socket ? &socket->next : &first_socket;
+    TCPSocket *victim = *link;
+
+    /* Nothing follows (or the list is empty): avoid dereferencing NULL. */
+    if (!victim)
+        return;
+
+    *link = victim->next;
+    tcp_socket_free(victim);
+}
+
+/**
+ * Look for an unused socket matching the given host, port and options, and
+ * drop unused entries that have been idle for too long along the way.
+ *
+ * The whole walk runs under the avformat lock. The first matching entry is
+ * marked as in use; later matches are left untouched.
+ *
+ * @param s    TCP context whose options must equal the stored ones
+ * @param name host name to match
+ * @param port port to match
+ * @return descriptor of the matching socket, or -1 if none was found
+ */
+static int tcp_socket_find (TCPContext *s, char *name, int port)
+{
+    /* Idle limit in av_gettime() units (microseconds): 60 seconds. */
+    const int64_t idle_timeout = 60 * 1000000;
+    int64_t current_time = av_gettime();
+    TCPSocket *p, *next, *prev = NULL;
+    int lfd = -1;
+
+    avpriv_lock_avformat();
+    /* The list head is read only once the lock is held. */
+    for (p = first_socket; p; p = next) {
+        /* Saved up front: p is freed below when it has expired. */
+        next = p->next;
+
+        // Remove idle connections
+        if (!p->in_use && (current_time - p->last_close_time) > idle_timeout) {
+            tcp_list_remove_next(prev);
+            continue; /* prev stays the predecessor of the next entry */
+        }
+
+        // Reuse the connection if a correct match is found
+        if (lfd == -1 && !p->in_use &&
+            p->port == port && !strcmp(p->hostname, name) &&
+            !memcmp(&p->options, &s->options, sizeof(s->options))) {
+            p->in_use = 1;
+            lfd = p->fd;
+        }
+        prev = p;
+    }
+    avpriv_unlock_avformat();
+
+    return lfd;
+}
+
+/**
+ * Mark the list entry owning search_fd as no longer in use and record the
+ * time of release. Only the first entry with that descriptor is updated;
+ * nothing happens if no entry matches.
+ *
+ * @param search_fd descriptor of the socket being released
+ */
+static void tcp_socket_release (int search_fd)
+{
+    TCPSocket *p;
+
+    /* tcp_list_add(), tcp_socket_find() and ff_tcp_deinit() modify or free
+     * list entries under the avformat lock, so the walk takes it as well. */
+    avpriv_lock_avformat();
+    for (p = first_socket; p; p = p->next) {
+        if (p->fd == search_fd) {
+            p->last_close_time = av_gettime();
+            p->in_use = 0;
+            break;
+        }
+    }
+    avpriv_unlock_avformat();
+}
+
+/**
+ * Empty the socket list: every entry is removed from the head and destroyed
+ * while the avformat lock is held.
+ */
+void ff_tcp_deinit(void)
+{
+    avpriv_lock_avformat();
+    while (first_socket != NULL)
+        tcp_list_remove_next(NULL); /* NULL selects the list head */
+    avpriv_unlock_avformat();
+}
+
/* return non zero if error */
static int tcp_open(URLContext *h, const char *uri, int flags)
{
@@ -107,6 +232,16 @@ static int tcp_open(URLContext *h, const char *uri, int flags)
s->open_timeout =
h->rw_timeout = s->options.rw_timeout;
}
+ /* Check for an existing connection */
+ if (s->options.reuse_sockets) {
+ int reuse_fd = tcp_socket_find(s, hostname, port);
+ if (-1 != reuse_fd) {
+ h->is_streamed = 1;
+ s->fd = reuse_fd;
+ av_log (h, AV_LOG_INFO, "reusing socket fd = %d\n", reuse_fd);
+ return 0;
+ }
+ }
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
snprintf(portstr, sizeof(portstr), "%d", port);
@@ -178,6 +313,10 @@ static int tcp_open(URLContext *h, const char *uri, int flags)
h->is_streamed = 1;
s->fd = fd;
+ if (s->options.reuse_sockets) {
+ tcp_list_add (s, hostname, port);
+ av_log (h, AV_LOG_DEBUG, "add socket fd = %d\n", fd);
+ }
freeaddrinfo(ai);
return 0;
@@ -246,6 +385,11 @@ static int tcp_shutdown(URLContext *h, int flags)
TCPContext *s = h->priv_data;
int how;
+ if (s->options.reuse_sockets) {
+ av_log (h, AV_LOG_DEBUG, "Not shutting down.. assuming Re-use later\n");
+ return 0;
+ }
+
if (flags & AVIO_FLAG_WRITE && flags & AVIO_FLAG_READ) {
how = SHUT_RDWR;
} else if (flags & AVIO_FLAG_WRITE) {
@@ -260,6 +404,12 @@ static int tcp_shutdown(URLContext *h, int flags)
static int tcp_close(URLContext *h)
{
TCPContext *s = h->priv_data;
+
+ if (s->options.reuse_sockets) { /* reuse mode: the descriptor is kept open */
+ av_log (h, AV_LOG_DEBUG, "Not closing.. assuming Re-use later\n");
+ tcp_socket_release(s->fd); /* marks the matching list entry as unused and timestamps it */
+ return 0; /* returns before closesocket() below */
+ }
closesocket(s->fd);
return 0;
}
diff --git a/libavformat/tcp.h b/libavformat/tcp.h
new file mode 100644
index 0000000..c36a4de
--- /dev/null
+++ b/libavformat/tcp.h
@@ -0,0 +1,27 @@
+/*
+ * TCP Protocol
+ * Copyright (c) 2017 Akamai Technologies, Inc.
+ *
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef AVFORMAT_TCP_H_
+#define AVFORMAT_TCP_H_
+
+void ff_tcp_deinit(void); /* Close and free every socket the tcp protocol kept for reuse. */
+
+#endif /* AVFORMAT_TCP_H_ */
diff --git a/libavformat/utils.c b/libavformat/utils.c
index cbfb78b..9b22c47 100644
--- a/libavformat/utils.c
+++ b/libavformat/utils.c
@@ -48,6 +48,7 @@
#include "metadata.h"
#if CONFIG_NETWORK
#include "network.h"
+#include "tcp.h"
#endif
#include "riff.h"
#include "url.h"
@@ -4859,6 +4860,7 @@ int avformat_network_deinit(void)
#if CONFIG_NETWORK
ff_network_close();
ff_tls_deinit();
+ ff_tcp_deinit();
ff_network_inited_globally = 0;
#endif
return 0;
--
1.9.1
More information about the ffmpeg-devel
mailing list