Intel® oneAPI Threading Building Blocks
Ask questions and share information about adding parallelism to your applications when using this threading library.

ffmpeg with TBB

nagy
New Contributor I
625 Views
The latest revision of ffmpeg has some multithreading functionality. However, it uses explicit threads for each instance of a codec.
I have written some code which enables ffmpeg to use the TBB task-scheduler instead of explicit threads.
This makes it possible to call ffmpeg from inside tbb tasks (without any thread blocking) and also avoid oversubscription by explicit threeds.
Thought I'd share for anyone interested in this scenario.
Simply use tbb_avcodec_open/tbb_avcodec_close instead of av_codec_open/av_codec_close.
[cpp]// Author Robert Nagy

#include "tbb_avcodec.h"

#include 
#include 

extern "C" 
{
	#define __STDC_CONSTANT_MACROS
	#define __STDC_LIMIT_MACROS
	#include 
}
	
int task_execute(AVCodecContext* s, std::function&& func, void* arg, int* ret, int count, int size)
{	
    tbb::atomic counter;
	counter = 0;
		
	// Execute s->thread_count number of tasks in parallel.
	tbb::parallel_for(0, s->thread_count, 1, [&](int threadnr) 
	{
		while(true)
		{
			int jobnr = counter++;
			if(jobnr >= count)
				break;

			int r = func(arg, size, jobnr, threadnr);
			if (ret)
				ret[jobnr] = r;
		}
	});
	
    return 0;
}
	
int thread_execute(AVCodecContext* s, int (*func)(AVCodecContext *c2, void *arg2), void* arg, int* ret, int count, int size)
{
	return task_execute(s, [&](void* arg, int arg_size, int jobnr, int threadnr) -> int
	{
		return func(s, reinterpret_cast(arg) + jobnr*size);
	}, arg, ret, count, size);
}

int thread_execute2(AVCodecContext* s, int (*func)(AVCodecContext* c2, void* arg2, int, int), void* arg, int* ret, int count)
{
	return task_execute(s, [&](void* arg, int arg_size, int jobnr, int threadnr) -> int
	{
		return func(s, arg, jobnr, threadnr);
	}, arg, ret, count, 0);
}

void thread_init(AVCodecContext* s)
{
	static const size_t MAX_THREADS = 16; // See mpegvideo.h
	static int dummy_opaque;

    s->active_thread_type = FF_THREAD_SLICE;
	s->thread_opaque	  = &dummy_opaque; 
    s->execute			  = thread_execute;
    s->execute2			  = thread_execute2;
    s->thread_count		  = MAX_THREADS; // We are using a task-scheduler, so use as many "threads/tasks" as possible.
}

void thread_free(AVCodecContext* s)
{
	s->thread_opaque = nullptr;
}

int tbb_avcodec_open(AVCodecContext* avctx, AVCodec* codec)
{
	avctx->thread_count = 1;
	if((codec->capabilities & CODEC_CAP_SLICE_THREADS) && (avctx->thread_type & FF_THREAD_SLICE))
		thread_init(avctx);
	return avcodec_open(avctx, codec); 
}

int tbb_avcodec_close(AVCodecContext* avctx)
{
	thread_free(avctx);
	// ff_thread_free will not be executed since thread_opaque == nullptr.
	return avcodec_close(avctx); 
}[/cpp]
0 Kudos
2 Replies
Alexey-Kukanov
Employee
625 Views

This looks good to me; a clever way to use TBB in a context that was designed with native threads in mind.

I wonder if it can be made even more TBBish, so to say. I have a rough idea, which you can try if you have time and desire.
The following two items can be of interest if there is a desire/need to control the number of threads.

- in thread_init, create a heap-allocated tbb::task_scheduler_init (TSI) object, and initialize it with as many threads as desired (not necessary MAX_THREADS). Keep the address of this object in s->thread_opaque if possible/allowed; if not, a possible solution is a global map that maps AVCodecContext to the address of the corresponding TSI.
- correspondingly in thread_free, obtain and remove the TSI object.

Independently of the above, another potential change is in how to call parallel_for. Instead of using itto merely create enoughthreads, cannot it be used for its direct purpose, like below?

[cpp]    int task_execute(AVCodecContext* s,
                     std::function&& f,
                     void* arg, int* ret, int count, int size)   
    {      
        tbb::atomic counter;   
        counter = 0;   
               
        // Execute 'count' number of tasks in parallel.   
        tbb::parallel_for(tbb::blocked_range(0, count, 2),
                          [&](const tbb::blocked_range &r)    
        {   
            int threadnr = counter++;   
            for(int jobnr=r.begin(); jobnr!=r.end(); ++jobnr)
            {   
                int r = func(arg, size, jobnr, threadnr);   
                if (ret)   
                    ret[jobnr] = r;   
            }
            --counter;
        });   
           
        return 0;   
    }

[/cpp]


This can perform better if count is significantly greater than thread_count, because a) more parallel slack means TBB works more efficiently (which you apparently know), and b) the overhead of the centralized atomic counter is spread over more iterations. Note that I selected the grain size of 2 for blocked_range; this is because the counter is both incremented and decremented inside the loop body, and so at least two iterations per task (and correspondingly, count>=2*thread_count) are necessary to "match" your variant.

0 Kudos
nagy
New Contributor I
625 Views
Some improvements, based on and inspired by Alexeys code above.
[bash]int thread_execute(AVCodecContext* s, int (*func)(AVCodecContext *c2, void *arg2), void* arg, int* ret, int count, int size)
{
	tbb::parallel_for(tbb::blocked_range(0, count), [&](const tbb::blocked_range& r)
	{
		for(size_t n = r.begin(); n != r.end(); ++n)		
		{
			int r = func(s, reinterpret_cast(arg) + n*size);
			if(ret)
				ret = r;
		}
	});

	return 0;
}

int thread_execute2(AVCodecContext* s, int (*func)(AVCodecContext* c2, void* arg2, int, int), void* arg, int* ret, int count)
{	
	tbb::atomic counter;   
    counter = 0;   

	// Note: this will probably only work when tbb::task_scheduler_init::num_threads() < MAX_THREADS(16).
    tbb::parallel_for(tbb::blocked_range(0, count, 2), [&](const tbb::blocked_range &r)    
    {   
        int threadnr = counter++;   
        for(int jobnr = r.begin(); jobnr != r.end(); ++jobnr)
        {   
            int r = func(s, arg, jobnr, threadnr);   
            if (ret)   
                ret[jobnr] = r;   
        }
        --counter;
    });   

    return 0;  
}[/bash]
0 Kudos
Reply