[FFmpeg-devel] [PATCH 09/10] lavfi/dnn: Async Support for TensorFlow Backend
Shubhanshu Saxena
shubhanshu.e01 at gmail.com
Fri May 28 12:24:53 EEST 2021
This commit adds functions to execute inference requests on the
TensorFlow backend asynchronously, in detached threads.
Signed-off-by: Shubhanshu Saxena <shubhanshu.e01 at gmail.com>
---
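Notes (below the cut line, not part of the commit message):

The async path is a detached-worker pattern: the worker thread runs the
inference and then calls the completion callback itself, and the caller
polls the task for completion instead of joining the thread. Below is a
minimal standalone sketch of that pattern for reviewers who want to try
it outside FFmpeg -- Request, run_inference and on_complete are
illustrative stand-ins, not FFmpeg API:

    #include <pthread.h>
    #include <stdatomic.h>
    #include <stdio.h>
    #include <unistd.h>

    typedef struct Request {
        int id;
        atomic_int done;          /* set by the completion callback */
    } Request;

    /* stands in for TF_SessionRun() */
    static void run_inference(Request *req)
    {
        printf("request %d: running inference\n", req->id);
    }

    /* stands in for infer_completion_callback() */
    static void on_complete(void *arg)
    {
        Request *req = arg;
        atomic_store(&req->done, 1);
    }

    /* mirrors tf_thread_routine(): run inference, then the callback,
     * both inside the worker thread */
    static void *thread_routine(void *arg)
    {
        Request *req = arg;
        run_inference(req);
        on_complete(req);
        return NULL;
    }

    int main(void)
    {
        pthread_t thread;
        pthread_attr_t attr;
        Request req = { .id = 1 };

        /* detached thread: nobody joins it; completion is observed
         * through the request itself, as the patch does through
         * task->inference_done */
        pthread_attr_init(&attr);
        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
        if (pthread_create(&thread, &attr, thread_routine, &req) != 0)
            return 1;
        pthread_attr_destroy(&attr);

        while (!atomic_load(&req.done))   /* poll for completion */
            usleep(1000);
        printf("request %d: done\n", req.id);
        return 0;
    }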
libavfilter/dnn/dnn_backend_tf.c | 198 ++++++++++++++++++++++++++++---
libavfilter/dnn/dnn_backend_tf.h | 3 +
libavfilter/dnn/dnn_interface.c | 3 +
3 files changed, 187 insertions(+), 17 deletions(-)
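The retrieval side is poll-based. A hedged usage sketch of the two new
entry points as a caller might drive them (error handling elided; the
model and exec_params setup are assumed, not shown):

    AVFrame *in, *out;
    DNNAsyncStatusType st;

    /* queue the request; with pthreads it runs in a detached thread,
       otherwise it falls back to synchronous execution */
    if (ff_dnn_execute_model_async_tf(model, &exec_params) != DNN_SUCCESS)
        return DNN_ERROR;

    /* later: drain completed tasks; DAST_NOT_READY means the worker is
       still running, DAST_EMPTY_QUEUE means nothing is pending */
    while ((st = ff_dnn_get_async_result_tf(model, &in, &out)) == DAST_SUCCESS) {
        /* forward the (in, out) frame pair to the filter graph */
    }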
diff --git a/libavfilter/dnn/dnn_backend_tf.c b/libavfilter/dnn/dnn_backend_tf.c
index 31746deef4..296604461b 100644
--- a/libavfilter/dnn/dnn_backend_tf.c
+++ b/libavfilter/dnn/dnn_backend_tf.c
@@ -35,6 +35,7 @@
#include "dnn_backend_native_layer_maximum.h"
#include "dnn_io_proc.h"
#include "dnn_backend_common.h"
+#include "libavutil/thread.h"
#include "safe_queue.h"
#include "queue.h"
#include <tensorflow/c/c_api.h>
@@ -57,6 +58,7 @@ typedef struct TFModel{
TF_Status *status;
SafeQueue *request_queue;
Queue *inference_queue;
+ Queue *task_queue;
} TFModel;
typedef struct tf_infer_request {
@@ -69,7 +71,10 @@ typedef struct tf_infer_request {
typedef struct RequestItem {
tf_infer_request *infer_request;
InferenceItem *inference;
- // further properties will be added later for async
+#if HAVE_PTHREAD_CANCEL
+ pthread_t thread;
+ pthread_attr_t thread_attr;
+#endif
} RequestItem;
#define OFFSET(x) offsetof(TFContext, x)
@@ -83,6 +88,7 @@ static const AVOption dnn_tensorflow_options[] = {
AVFILTER_DEFINE_CLASS(dnn_tensorflow);
static DNNReturnType execute_model_tf(RequestItem *request, Queue *inference_queue);
+static void infer_completion_callback(void *args);
static void free_buffer(void *data, size_t length)
{
@@ -112,6 +118,59 @@ static tf_infer_request* tf_create_inference_request(void)
return infer_request;
}
+static void tf_start_inference(RequestItem *request)
+{
+ tf_infer_request *infer_request = request->infer_request;
+ InferenceItem *inference = request->inference;
+ TaskItem *task = inference->task;
+ TFModel *tf_model = task->model;
+
+ TF_SessionRun(tf_model->session, NULL,
+ infer_request->tf_input, &infer_request->input_tensor, 1,
+ infer_request->tf_outputs, infer_request->output_tensors,
+ task->nb_output, NULL, 0, NULL,
+ tf_model->status);
+}
+
+static void *tf_thread_routine(void *arg)
+{
+ RequestItem *request = arg;
+ tf_start_inference(request);
+ infer_completion_callback(request);
+#if HAVE_PTHREAD_CANCEL
+ pthread_exit(0);
+#endif
+    return NULL;
+}
+
+static DNNReturnType tf_start_inference_async(RequestItem *request)
+{
+ InferenceItem *inference = request->inference;
+ TaskItem *task = inference->task;
+ TFModel *tf_model = task->model;
+ TFContext *ctx = &tf_model->ctx;
+ int ret;
+
+#if HAVE_PTHREAD_CANCEL
+ ret = pthread_create(&request->thread, &request->thread_attr, tf_thread_routine, request);
+    if (ret != 0) {
+ av_log(ctx, AV_LOG_ERROR, "unable to start async inference\n");
+ return DNN_ERROR;
+ }
+ return DNN_SUCCESS;
+#else
+    av_log(ctx, AV_LOG_WARNING, "pthreads not supported, falling back to synchronous execution\n");
+ tf_start_inference(request);
+ if (TF_GetCode(tf_model->status) != TF_OK) {
+ tf_free_request(request->infer_request);
+ av_log(ctx, AV_LOG_ERROR, "Failed to run session when executing model\n");
+ return DNN_ERROR;
+ }
+ infer_completion_callback(request);
+ return (task->inference_done == task->inference_todo) ? DNN_SUCCESS : DNN_ERROR;
+#endif
+}
+
static DNNReturnType extract_inference_from_task(TaskItem *task, Queue *inference_queue)
{
TFModel *tf_model = task->model;
@@ -826,7 +885,10 @@ DNNModel *ff_dnn_load_model_tf(const char *model_filename, DNNFunctionType func_
av_freep(&item);
goto err;
}
-
+#if HAVE_PTHREAD_CANCEL
+ pthread_attr_init(&item->thread_attr);
+ pthread_attr_setdetachstate(&item->thread_attr, PTHREAD_CREATE_DETACHED);
+#endif
if (ff_safe_queue_push_back(tf_model->request_queue, item) < 0) {
av_freep(&item->infer_request);
av_freep(&item);
@@ -839,6 +901,11 @@ DNNModel *ff_dnn_load_model_tf(const char *model_filename, DNNFunctionType func_
goto err;
}
+ tf_model->task_queue = ff_queue_create();
+ if (!tf_model->task_queue) {
+ goto err;
+ }
+
model->model = tf_model;
model->get_input = &get_input_tf;
model->get_output = &get_output_tf;
@@ -1012,10 +1084,9 @@ final:
static DNNReturnType execute_model_tf(RequestItem *request, Queue *inference_queue)
{
TFModel *tf_model;
- TFContext *ctx;
- tf_infer_request *infer_request;
InferenceItem *inference;
TaskItem *task;
+ TFContext *ctx;
inference = ff_queue_peek_front(inference_queue);
if (!inference) {
@@ -1026,22 +1097,16 @@ static DNNReturnType execute_model_tf(RequestItem *request, Queue *inference_que
tf_model = task->model;
ctx = &tf_model->ctx;
- if (task->async) {
- avpriv_report_missing_feature(ctx, "Async execution not supported");
+ if (fill_model_input_tf(tf_model, request) != DNN_SUCCESS) {
return DNN_ERROR;
- } else {
- if (fill_model_input_tf(tf_model, request) != DNN_SUCCESS) {
- return DNN_ERROR;
- }
+ }
- infer_request = request->infer_request;
- TF_SessionRun(tf_model->session, NULL,
- infer_request->tf_input, &infer_request->input_tensor, 1,
- infer_request->tf_outputs, infer_request->output_tensors,
- task->nb_output, NULL, 0, NULL,
- tf_model->status);
+ if (task->async) {
+ return tf_start_inference_async(request);
+ } else {
+ tf_start_inference(request);
if (TF_GetCode(tf_model->status) != TF_OK) {
- tf_free_request(infer_request);
+ tf_free_request(request->infer_request);
av_log(ctx, AV_LOG_ERROR, "Failed to run session when executing model\n");
return DNN_ERROR;
}
@@ -1079,6 +1144,95 @@ DNNReturnType ff_dnn_execute_model_tf(const DNNModel *model, DNNExecBaseParams *
return execute_model_tf(request, tf_model->inference_queue);
}
+DNNReturnType ff_dnn_execute_model_async_tf(const DNNModel *model, DNNExecBaseParams *exec_params)
+{
+ TFModel *tf_model = model->model;
+ TFContext *ctx = &tf_model->ctx;
+ TaskItem *task;
+ RequestItem *request;
+
+ if (ff_check_exec_params(ctx, DNN_TF, model->func_type, exec_params) != 0) {
+ return DNN_ERROR;
+ }
+
+ task = av_malloc(sizeof(*task));
+ if (!task) {
+ av_log(ctx, AV_LOG_ERROR, "unable to alloc memory for task item.\n");
+ return DNN_ERROR;
+ }
+
+ if (ff_dnn_fill_task(task, exec_params, tf_model, 1, 1) != DNN_SUCCESS) {
+ av_freep(&task);
+ return DNN_ERROR;
+ }
+
+ if (ff_queue_push_back(tf_model->task_queue, task) < 0) {
+ av_freep(&task);
+ av_log(ctx, AV_LOG_ERROR, "unable to push back task_queue.\n");
+ return DNN_ERROR;
+ }
+
+ if (extract_inference_from_task(task, tf_model->inference_queue) != DNN_SUCCESS) {
+ av_log(ctx, AV_LOG_ERROR, "unable to extract inference from task.\n");
+ return DNN_ERROR;
+ }
+
+ request = ff_safe_queue_pop_front(tf_model->request_queue);
+ if (!request) {
+ av_log(ctx, AV_LOG_ERROR, "unable to get infer request.\n");
+ return DNN_ERROR;
+ }
+ return execute_model_tf(request, tf_model->inference_queue);
+}
+
+DNNAsyncStatusType ff_dnn_get_async_result_tf(const DNNModel *model, AVFrame **in, AVFrame **out)
+{
+ TFModel *tf_model = model->model;
+ TaskItem *task = ff_queue_peek_front(tf_model->task_queue);
+
+ if (!task) {
+ return DAST_EMPTY_QUEUE;
+ }
+
+ if (task->inference_done != task->inference_todo) {
+ return DAST_NOT_READY;
+ }
+
+ *in = task->in_frame;
+ *out = task->out_frame;
+ ff_queue_pop_front(tf_model->task_queue);
+ av_freep(&task);
+
+ return DAST_SUCCESS;
+}
+
+DNNReturnType ff_dnn_flush_tf(const DNNModel *model)
+{
+ TFModel *tf_model = model->model;
+ TFContext *ctx = &tf_model->ctx;
+ RequestItem *request;
+ DNNReturnType ret;
+
+ if (ff_queue_size(tf_model->inference_queue) == 0) {
+        // no pending tasks to flush
+ return DNN_SUCCESS;
+ }
+
+ request = ff_safe_queue_pop_front(tf_model->request_queue);
+ if (!request) {
+ av_log(ctx, AV_LOG_ERROR, "unable to get infer request.\n");
+ return DNN_ERROR;
+ }
+
+ ret = fill_model_input_tf(tf_model, request);
+ if (ret != DNN_SUCCESS) {
+ av_log(ctx, AV_LOG_ERROR, "Failed to fill model input.\n");
+ return ret;
+ }
+
+ return tf_start_inference_async(request);
+}
+
void ff_dnn_free_model_tf(DNNModel **model)
{
TFModel *tf_model;
@@ -1087,6 +1240,9 @@ void ff_dnn_free_model_tf(DNNModel **model)
tf_model = (*model)->model;
while (ff_safe_queue_size(tf_model->request_queue) != 0) {
RequestItem *item = ff_safe_queue_pop_front(tf_model->request_queue);
+#if HAVE_PTHREAD_CANCEL
+ pthread_attr_destroy(&item->thread_attr);
+#endif
tf_free_request(item->infer_request);
av_freep(&item->infer_request);
av_freep(&item);
@@ -1099,6 +1255,14 @@ void ff_dnn_free_model_tf(DNNModel **model)
}
ff_queue_destroy(tf_model->inference_queue);
+ while (ff_queue_size(tf_model->task_queue) != 0) {
+ TaskItem *item = ff_queue_pop_front(tf_model->task_queue);
+ av_frame_free(&item->in_frame);
+ av_frame_free(&item->out_frame);
+ av_freep(&item);
+ }
+ ff_queue_destroy(tf_model->task_queue);
+
if (tf_model->graph){
TF_DeleteGraph(tf_model->graph);
}
diff --git a/libavfilter/dnn/dnn_backend_tf.h b/libavfilter/dnn/dnn_backend_tf.h
index 3dfd6e4280..aec0fc2011 100644
--- a/libavfilter/dnn/dnn_backend_tf.h
+++ b/libavfilter/dnn/dnn_backend_tf.h
@@ -32,6 +32,9 @@
DNNModel *ff_dnn_load_model_tf(const char *model_filename, DNNFunctionType func_type, const char *options, AVFilterContext *filter_ctx);
DNNReturnType ff_dnn_execute_model_tf(const DNNModel *model, DNNExecBaseParams *exec_params);
+DNNReturnType ff_dnn_execute_model_async_tf(const DNNModel *model, DNNExecBaseParams *exec_params);
+DNNAsyncStatusType ff_dnn_get_async_result_tf(const DNNModel *model, AVFrame **in, AVFrame **out);
+DNNReturnType ff_dnn_flush_tf(const DNNModel *model);
void ff_dnn_free_model_tf(DNNModel **model);
diff --git a/libavfilter/dnn/dnn_interface.c b/libavfilter/dnn/dnn_interface.c
index 02e532fc1b..81af934dd5 100644
--- a/libavfilter/dnn/dnn_interface.c
+++ b/libavfilter/dnn/dnn_interface.c
@@ -48,6 +48,9 @@ DNNModule *ff_get_dnn_module(DNNBackendType backend_type)
#if (CONFIG_LIBTENSORFLOW == 1)
dnn_module->load_model = &ff_dnn_load_model_tf;
dnn_module->execute_model = &ff_dnn_execute_model_tf;
+ dnn_module->execute_model_async = &ff_dnn_execute_model_async_tf;
+ dnn_module->get_async_result = &ff_dnn_get_async_result_tf;
+ dnn_module->flush = &ff_dnn_flush_tf;
dnn_module->free_model = &ff_dnn_free_model_tf;
#else
av_freep(&dnn_module);
--
2.25.1