[FFmpeg-devel] [PATCH 2/4] ffserver: Implement publisher
Stephan Holljes
klaxa1337 at googlemail.com
Thu Apr 12 16:35:47 EEST 2018
---
publisher.c | 278 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
publisher.h | 134 +++++++++++++++++++++++++++++
2 files changed, 412 insertions(+)
create mode 100644 publisher.c
create mode 100644 publisher.h
diff --git a/publisher.c b/publisher.c
new file mode 100644
index 0000000..d1ccb95
--- /dev/null
+++ b/publisher.c
@@ -0,0 +1,278 @@
+#include "publisher.h"
+#include "segment.h"
+#include <libavutil/log.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+/*
+ * Log the current state of a client at AV_LOG_INFO level.
+ *
+ * The state is printed by name; a value outside the known enum State
+ * constants is reported as "UNDEFINED".
+ */
+void client_log(struct Client *c)
+{
+    const char *state;
+
+    switch (c->state) {
+    case FREE:
+        state = "FREE";
+        break;
+    case RESERVED:
+        state = "RESERVED";
+        break;
+    case WAIT:
+        state = "WAIT";
+        break;
+    case WRITABLE:
+        state = "WRITABLE";
+        break;
+    case BUSY:
+        state = "BUSY";
+        break;
+    case BUFFER_FULL:
+        state = "BUFFER_FULL";
+        break;
+    default:
+        state = "UNDEFINED";
+        break;
+    }
+    /* The "State: " prefix lives in the format string, so no scratch buffer
+     * (and no sprintf into it) is needed. */
+    av_log(NULL, AV_LOG_INFO, "State: %s\n", state);
+}
+
+/*
+ * Disconnect a client and return its slot to the FREE state.
+ *
+ * Writes the trailer, closes the I/O context and frees the output format
+ * context, then releases every Segment reference still queued for the client.
+ * The slot is only marked FREE after all of its fields have been reset, so a
+ * thread that picks up the FREE slot never observes stale values.
+ */
+void client_disconnect(struct Client *c)
+{
+    struct Segment *seg;
+
+    if (c->ofmt_ctx) {
+        av_write_trailer(c->ofmt_ctx);
+        avio_close(c->ofmt_ctx->pb);
+        avformat_free_context(c->ofmt_ctx);
+        /* Do not leave a dangling pointer to the freed context behind. */
+        c->ofmt_ctx = NULL;
+    }
+
+    while (av_fifo_size(c->buffer)) {
+        av_fifo_generic_read(c->buffer, &seg, sizeof(struct Segment*), NULL);
+        segment_unref(seg);
+    }
+
+    c->current_segment_id = -1;
+    client_set_state(c, FREE);
+}
+
+/*
+ * Assign a new state to a client while holding the client's state_lock.
+ */
+void client_set_state(struct Client *c, enum State state)
+{
+    pthread_mutex_t *lock = &c->state_lock;
+
+    pthread_mutex_lock(lock);
+    c->state = state;
+    pthread_mutex_unlock(lock);
+}
+
+/*
+ * Queue a Segment for a client.
+ *
+ * If the client's FIFO cannot take another Segment pointer, the Segment is
+ * dropped for this client and the client is marked BUFFER_FULL. Otherwise a
+ * reference is taken on the Segment, its pointer is appended to the FIFO and
+ * the client is marked WRITABLE.
+ */
+void client_push_segment(struct Client *c, struct Segment *seg)
+{
+    /* A write needs room for a whole pointer, not just a non-zero amount of
+     * free space. */
+    if (av_fifo_space(c->buffer) < (int) sizeof(seg)) {
+        av_log(NULL, AV_LOG_WARNING, "Client buffer full, dropping Segment.\n");
+        client_set_state(c, BUFFER_FULL);
+        return;
+    }
+    segment_ref(seg);
+    av_fifo_generic_write(c->buffer, &seg, sizeof(seg), NULL);
+    client_set_state(c, WRITABLE);
+}
+
+/*
+ * Allocate and initialize a PublisherContext.
+ *
+ * On success *pub points to the new context: both publisher FIFOs and every
+ * client FIFO are allocated, and every client slot is FREE with
+ * current_segment_id -1 and no output context. If any allocation fails,
+ * everything allocated so far is released and *pub is set to NULL.
+ *
+ * NOTE(review): the FIFOs store struct Segment pointers but are sized in
+ * units of sizeof(struct Segment); this is kept unchanged so the capacity
+ * stays what it was -- confirm the intended capacity.
+ */
+void publisher_init(struct PublisherContext **pub)
+{
+    int i;
+    int failed;
+    /* calloc leaves every pointer NULL, which keeps the cleanup below safe. */
+    struct PublisherContext *pc = calloc(1, sizeof(*pc));
+
+    *pub = NULL;
+    if (!pc) {
+        av_log(NULL, AV_LOG_ERROR, "Could not allocate PublisherContext.\n");
+        return;
+    }
+
+    pc->nb_threads = 4;
+    pc->current_segment_id = -1;
+    pc->shutdown = 0;
+    pc->buffer = av_fifo_alloc_array(sizeof(struct Segment), MAX_SEGMENTS);
+    pc->fs_buffer = av_fifo_alloc_array(sizeof(struct Segment), MAX_SEGMENTS);
+    failed = !pc->buffer || !pc->fs_buffer;
+    for (i = 0; i < MAX_CLIENTS; i++) {
+        pc->clients[i].buffer = av_fifo_alloc_array(sizeof(struct Segment), MAX_SEGMENTS);
+        if (!pc->clients[i].buffer)
+            failed = 1;
+    }
+
+    if (failed) {
+        av_log(NULL, AV_LOG_ERROR, "Could not allocate publisher buffers.\n");
+        for (i = 0; i < MAX_CLIENTS; i++)
+            av_fifo_freep(&pc->clients[i].buffer);
+        av_fifo_freep(&pc->fs_buffer);
+        av_fifo_freep(&pc->buffer);
+        free(pc);
+        return;
+    }
+
+    for (i = 0; i < MAX_CLIENTS; i++) {
+        struct Client *c = &pc->clients[i];
+        c->ofmt_ctx = NULL;
+        c->id = i;
+        c->current_segment_id = -1;
+        pthread_mutex_init(&c->state_lock, NULL);
+        client_set_state(c, FREE);
+    }
+    *pub = pc;
+}
+
+/*
+ * Hand a new Segment to the publisher.
+ *
+ * The Segment is queued in the publisher buffer (consumed by publish()) and
+ * in the fast-start buffer, which keeps at most BUFFER_SEGMENTS entries by
+ * dropping its oldest one. Each queue holds its own reference; the reference
+ * is taken before the pointer becomes visible in the FIFO.
+ *
+ * If the publisher buffer has no room for another pointer, the Segment is not
+ * queued there and a warning is logged; it is still added to the fast-start
+ * buffer.
+ */
+void publisher_push_segment(struct PublisherContext *pub, struct Segment *seg)
+{
+    struct Segment *drop;
+
+    if (av_fifo_space(pub->buffer) >= (int) sizeof(seg)) {
+        segment_ref(seg);
+        av_fifo_generic_write(pub->buffer, &seg, sizeof(seg), NULL);
+    } else {
+        av_log(NULL, AV_LOG_WARNING, "Publisher buffer full, dropping Segment.\n");
+    }
+
+    /* Evict the oldest fast-start entry once BUFFER_SEGMENTS are stored. */
+    if (av_fifo_size(pub->fs_buffer) >= (int) (BUFFER_SEGMENTS * sizeof(struct Segment*))) {
+        av_fifo_generic_read(pub->fs_buffer, &drop, sizeof(struct Segment*), NULL);
+        segment_unref(drop);
+    }
+    segment_ref(seg);
+    av_fifo_generic_write(pub->fs_buffer, &seg, sizeof(seg), NULL);
+}
+
+/*
+ * Reserve the first FREE client slot.
+ *
+ * The check and the transition to RESERVED happen under the slot's
+ * state_lock, so two concurrent callers cannot reserve the same slot.
+ *
+ * Returns 0 if a slot was reserved, 1 if no slot is FREE.
+ */
+int publisher_reserve_client(struct PublisherContext *pub)
+{
+    int i;
+
+    for (i = 0; i < MAX_CLIENTS; i++) {
+        struct Client *c = &pub->clients[i];
+        int reserved = 0;
+
+        pthread_mutex_lock(&c->state_lock);
+        if (c->state == FREE) {
+            c->state = RESERVED;
+            reserved = 1;
+        }
+        pthread_mutex_unlock(&c->state_lock);
+
+        if (reserved)
+            return 0;
+    }
+    return 1;
+}
+
+/*
+ * Cancel one reservation: the first slot found in RESERVED state is set back
+ * to FREE. Does nothing if no slot is RESERVED.
+ *
+ * The check and the transition happen under the slot's state_lock, so a slot
+ * whose state changes concurrently is not reset by mistake.
+ */
+void publisher_cancel_reserve(struct PublisherContext *pub)
+{
+    int i;
+
+    for (i = 0; i < MAX_CLIENTS; i++) {
+        struct Client *c = &pub->clients[i];
+        int cancelled = 0;
+
+        pthread_mutex_lock(&c->state_lock);
+        if (c->state == RESERVED) {
+            c->state = FREE;
+            cancelled = 1;
+        }
+        pthread_mutex_unlock(&c->state_lock);
+
+        if (cancelled)
+            return;
+    }
+}
+
+/*
+ * Queue every Segment currently held in the publisher's fast-start buffer
+ * for the given client, oldest first. The fast-start buffer is only peeked,
+ * not consumed.
+ */
+void client_push_prebuffer(struct PublisherContext *pub, struct Client *c)
+{
+    const int step = sizeof(struct Segment*);
+    const int total = av_fifo_size(pub->fs_buffer);
+    int pos = 0;
+
+    while (pos < total) {
+        struct Segment *cur;
+
+        av_fifo_generic_peek_at(pub->fs_buffer, &cur, pos, step, NULL);
+        client_push_segment(c, cur);
+        pos += step;
+    }
+}
+
+/*
+ * Attach a client's output context to the first RESERVED slot.
+ *
+ * The slot is claimed under its state_lock (RESERVED -> WRITABLE with
+ * ofmt_ctx set), so two concurrent callers cannot attach to the same slot.
+ * The fast-start buffer is then queued for the new client. Does nothing if
+ * no slot is RESERVED.
+ */
+void publisher_add_client(struct PublisherContext *pub, AVFormatContext *ofmt_ctx)
+{
+    int i;
+
+    for (i = 0; i < MAX_CLIENTS; i++) {
+        struct Client *c = &pub->clients[i];
+        int claimed = 0;
+
+        pthread_mutex_lock(&c->state_lock);
+        if (c->state == RESERVED) {
+            c->ofmt_ctx = ofmt_ctx;
+            c->state = WRITABLE;
+            claimed = 1;
+        }
+        pthread_mutex_unlock(&c->state_lock);
+
+        if (claimed) {
+            client_push_prebuffer(pub, c);
+            return;
+        }
+    }
+}
+
+/* Release every Segment reference queued in *fifo, then free the FIFO and
+ * set *fifo to NULL. A NULL FIFO is ignored. */
+static void free_segment_fifo(AVFifoBuffer **fifo)
+{
+    struct Segment *seg;
+
+    if (!*fifo)
+        return;
+    while (av_fifo_size(*fifo)) {
+        av_fifo_generic_read(*fifo, &seg, sizeof(struct Segment*), NULL);
+        segment_unref(seg);
+    }
+    av_fifo_freep(fifo);
+}
+
+/*
+ * Free the publisher buffers and all client buffers.
+ *
+ * Segment references still queued in any of these FIFOs (including the
+ * per-client ones) are released before the FIFO is freed. The
+ * PublisherContext allocation itself is not released here. A NULL pub is
+ * ignored.
+ */
+void publisher_free(struct PublisherContext *pub)
+{
+    int i;
+
+    if (!pub)
+        return;
+
+    free_segment_fifo(&pub->buffer);
+    free_segment_fifo(&pub->fs_buffer);
+    for (i = 0; i < MAX_CLIENTS; i++)
+        free_segment_fifo(&pub->clients[i].buffer);
+}
+
+/*
+ * Free a PublisherContext created by publisher_init() and set *pub to NULL.
+ *
+ * Releases the buffers through publisher_free() and then the context
+ * allocation itself, which would otherwise be leaked once the caller's
+ * pointer is cleared. A NULL pub or NULL *pub is ignored.
+ */
+void publisher_freep(struct PublisherContext **pub)
+{
+    if (!pub || !*pub)
+        return;
+
+    publisher_free(*pub);
+    free(*pub);
+    *pub = NULL;
+}
+
+/*
+ * Take one pending Segment from the publisher buffer, if there is one, and
+ * queue it for every client that is in WAIT or WRITABLE state. Clients in
+ * BUFFER_FULL state only get a warning logged. The publisher buffer's
+ * reference on the Segment is released afterwards.
+ */
+void publish(struct PublisherContext *pub)
+{
+    struct Segment *next;
+    int idx;
+
+    av_log(NULL, AV_LOG_DEBUG, "pub->buffer size: %d\n", av_fifo_size(pub->buffer));
+    if (!av_fifo_size(pub->buffer))
+        return;
+
+    av_log(NULL, AV_LOG_DEBUG, "Peeking buffer\n");
+    av_fifo_generic_read(pub->buffer, &next, sizeof(struct Segment*), NULL);
+    av_log(NULL, AV_LOG_DEBUG, "Peeked buffer\n");
+    if (!next)
+        return;
+
+    pub->current_segment_id = next->id;
+    for (idx = 0; idx < MAX_CLIENTS; idx++) {
+        struct Client *client = &pub->clients[idx];
+        enum State st = client->state;
+
+        if (st == BUFFER_FULL)
+            av_log(NULL, AV_LOG_WARNING, "Dropping segment for client %d, buffer full.\n", idx);
+        else if (st == WAIT || st == WRITABLE)
+            client_push_segment(client, next);
+    }
+    segment_unref(next);
+}
+
+/*
+ * Write a JSON object describing the publisher into status.
+ *
+ * The object holds the number of clients per state ("free", "reserved",
+ * "wait", "writable", "busy", "buffer_full"), the publisher's current segment
+ * id ("current_read") and the highest and lowest segment ids among the
+ * clients ("newest_write", "oldest_write"; client ids <= 0 are ignored for
+ * the minimum).
+ *
+ * status must point to at least 4096 bytes; the output is limited to 4095
+ * bytes including the terminating NUL.
+ */
+void publisher_gen_status_json(struct PublisherContext *pub, char *status)
+{
+    int nb_free = 0, nb_reserved = 0, nb_wait = 0, nb_writable = 0;
+    int nb_busy = 0, nb_buffer_full = 0;
+    int current_read, newest_write = 0, oldest_write;
+    int i;
+    struct Client *c;
+
+    current_read = pub->current_segment_id;
+    oldest_write = current_read;
+
+    for (i = 0; i < MAX_CLIENTS; i++) {
+        c = &pub->clients[i];
+        if (c->current_segment_id > 0 && c->current_segment_id < oldest_write) {
+            oldest_write = c->current_segment_id;
+        }
+        if (c->current_segment_id > newest_write) {
+            newest_write = c->current_segment_id;
+        }
+
+        switch (c->state) {
+        case FREE:
+            nb_free++;
+            break;
+        case RESERVED:
+            nb_reserved++;
+            break;
+        case WAIT:
+            nb_wait++;
+            break;
+        case WRITABLE:
+            nb_writable++;
+            break;
+        case BUSY:
+            nb_busy++;
+            break;
+        case BUFFER_FULL:
+            nb_buffer_full++;
+            break;
+        default:
+            break;
+        }
+    }
+
+    /* Every member except the last is followed by a comma so the result is
+     * valid JSON. */
+    snprintf(status, 4095,
+             "{\n\t\"free\": %d,\n"
+             "\t\"reserved\": %d,\n"
+             "\t\"wait\": %d,\n"
+             "\t\"writable\": %d,\n"
+             "\t\"busy\": %d,\n"
+             "\t\"buffer_full\": %d,\n"
+             "\t\"current_read\": %d,\n"
+             "\t\"newest_write\": %d,\n"
+             "\t\"oldest_write\": %d\n"
+             "}\n",
+             nb_free,
+             nb_reserved,
+             nb_wait,
+             nb_writable,
+             nb_busy,
+             nb_buffer_full,
+             current_read,
+             newest_write,
+             oldest_write);
+}
diff --git a/publisher.h b/publisher.h
new file mode 100644
index 0000000..7646fda
--- /dev/null
+++ b/publisher.h
@@ -0,0 +1,134 @@
+#ifndef PUBLISHER_H
+#define PUBLISHER_H
+
+#include <libavformat/avformat.h>
+#include <libavutil/fifo.h>
+#include <pthread.h>
+#include "segment.h"
+
+#define MAX_CLIENTS 16
+#define MAX_SEGMENTS 16
+#define BUFFER_SEGMENTS 10
+
+/*
+ * State of a client slot in PublisherContext.clients.
+ *
+ * In publisher.c, slots start out FREE, become RESERVED through
+ * publisher_reserve_client(), WRITABLE through publisher_add_client() or a
+ * successful client_push_segment(), and BUFFER_FULL when client_push_segment()
+ * finds no space. WAIT and BUSY are not assigned in publisher.c; presumably
+ * they are set by the code that writes to clients -- confirm against callers.
+ */
+
+enum State {
+ FREE, // no client connected
+ RESERVED, // reserved for a client that just connected
+ WAIT, // up to date, no new Segments to write
+ WRITABLE, // buffer is not full, new Segments can be pushed
+ BUSY, // currently writing to this client
+ BUFFER_FULL // client buffer is full, new Segments will be dropped
+};
+
+
+/* One client slot of a PublisherContext. */
+struct Client {
+ AVFormatContext *ofmt_ctx; // writable AVFormatContext, basically our tcp connection to the client
+ AVFifoBuffer *buffer; // Client buffer of Segment references (stored as struct Segment pointers)
+ enum State state; // current slot state; written through client_set_state()
+ pthread_mutex_t state_lock; // held by client_set_state() while it writes state
+ int id; // index of this slot in PublisherContext.clients
+ int current_segment_id; // The stream-based id of the segment that has last been worked on; -1 after init/disconnect.
+};
+
+/* Publisher state: the client slots plus the Segment queues feeding them. */
+struct PublisherContext {
+ struct Client clients[MAX_CLIENTS]; // currently compile-time configuration, easily made dynamic with malloc?
+ AVFifoBuffer *buffer; // publisher buffer for new Segments, consumed by publish()
+ AVFifoBuffer *fs_buffer; // fast start buffer; holds the most recent Segments for newly added clients
+ int nb_threads; // set to 4 by publisher_init(); not otherwise used in publisher.c
+ int current_segment_id; // id of the Segment most recently taken from buffer by publish(); -1 initially
+ int shutdown; // indicate shutdown, gracefully close client connections and files and exit
+};
+
+/**
+ * Log a client's current state through av_log() at AV_LOG_INFO level.
+ *
+ * @param c pointer to the client to print
+ */
+void client_log(struct Client *c);
+
+/**
+ * Disconnect a client: write the trailer, close and free its output context,
+ * release all Segments still queued for it and mark the slot FREE.
+ *
+ * @param c pointer to the client to disconnect.
+ */
+void client_disconnect(struct Client *c);
+
+/**
+ * Set a client's state. Note: This is protected by mutex locks.
+ *
+ * @param c pointer to the client to set the state of
+ * @param state the state to set the client to
+ */
+void client_set_state(struct Client *c, enum State state);
+
+/**
+ * Allocate and initialize a PublisherContext. All client slots start out FREE.
+ *
+ * @param pub pointer to a pointer to a PublisherContext. It will be allocated and initialized.
+ */
+void publisher_init(struct PublisherContext **pub);
+
+/**
+ * Push a Segment to a PublisherContext. The Segment is queued both for
+ * publishing and in the fast start buffer; the publisher takes its own
+ * references on it.
+ *
+ * @param pub pointer to a PublisherContext
+ * @param seg pointer to the Segment to add
+ */
+void publisher_push_segment(struct PublisherContext *pub, struct Segment *seg);
+
+/**
+ * Reserve a slot in the client struct of a PublisherContext. May fail if the
+ * maximum number of clients has been reached.
+ *
+ * @param pub pointer to a PublisherContext
+ * @return 0 in case of success, 1 in case of failure
+ */
+int publisher_reserve_client(struct PublisherContext *pub);
+
+/**
+ * Cancel a single reservation. This can be used if a client spot was reserved, but the client
+ * unexpectedly disconnects or sends an invalid request.
+ * The first slot found in RESERVED state is released; reservations are not
+ * tied to a particular caller.
+ *
+ * @param pub pointer to a PublisherContext
+ */
+void publisher_cancel_reserve(struct PublisherContext *pub);
+
+/**
+ * Add a client by its ofmt_ctx. This initializes an element in the client struct of the PublisherContext
+ * that has been reserved prior to calling this function. The client becomes
+ * WRITABLE and the contents of the fast start buffer are queued for it.
+ * Nothing happens if no slot is in RESERVED state.
+ *
+ * @param pub pointer to a PublisherContext
+ * @param ofmt_ctx AVFormatContext of a client
+ */
+void publisher_add_client(struct PublisherContext *pub, AVFormatContext *ofmt_ctx);
+
+/**
+ * Free buffers and associated client buffers. The PublisherContext
+ * allocation itself is not released by this function.
+ *
+ * @param pub pointer to the PublisherContext to free
+ */
+void publisher_free(struct PublisherContext *pub);
+
+/**
+ * Free buffers and associated client buffers and set *pub to NULL.
+ *
+ * @param pub pointer to the PublisherContext pointer to free
+ */
+void publisher_freep(struct PublisherContext **pub);
+
+/**
+ * Signal to the PublisherContext to check its buffer and publish pending Segments.
+ * Each call takes at most one Segment from the publisher buffer and queues it
+ * for every client in WAIT or WRITABLE state.
+ *
+ * @param pub pointer to a PublisherContext
+ */
+void publish(struct PublisherContext *pub);
+
+/**
+ * Print the current client and file reading status to a json string.
+ * @param pub pointer to a PublisherContext
+ * @param status string of at least 4096 bytes size.
+ */
+void publisher_gen_status_json(struct PublisherContext *pub, char *status);
+
+#endif // PUBLISHER_H
--
2.16.2
More information about the ffmpeg-devel
mailing list