```cpp
// Author Robert Nagy

#include "tbb_avcodec.h"

#include <cstdint>
#include <functional>

#include <tbb/atomic.h>
#include <tbb/parallel_for.h>

extern "C"
{
    #define __STDC_CONSTANT_MACROS
    #define __STDC_LIMIT_MACROS
    #include <libavcodec/avcodec.h>
}

int task_execute(AVCodecContext* s, std::function<int(void*, int, int, int)>&& func, void* arg, int* ret, int count, int size)
{
    tbb::atomic<int> counter;
    counter = 0;

    // Execute s->thread_count number of tasks in parallel.
    // Each task keeps claiming the next unprocessed job index until none are left.
    tbb::parallel_for(0, s->thread_count, 1, [&](int threadnr)
    {
        while(true)
        {
            int jobnr = counter++;
            if(jobnr >= count)
                break;

            int r = func(arg, size, jobnr, threadnr);
            if (ret)
                ret[jobnr] = r;
        }
    });

    return 0;
}

// Adapter for AVCodecContext::execute: job 'jobnr' receives the element at arg + jobnr*size.
int thread_execute(AVCodecContext* s, int (*func)(AVCodecContext *c2, void *arg2), void* arg, int* ret, int count, int size)
{
    return task_execute(s, [&](void* arg, int arg_size, int jobnr, int threadnr) -> int
    {
        return func(s, reinterpret_cast<uint8_t*>(arg) + jobnr*size);
    }, arg, ret, count, size);
}

// Adapter for AVCodecContext::execute2: the callee receives jobnr and threadnr directly.
int thread_execute2(AVCodecContext* s, int (*func)(AVCodecContext* c2, void* arg2, int, int), void* arg, int* ret, int count)
{
    return task_execute(s, [&](void* arg, int arg_size, int jobnr, int threadnr) -> int
    {
        return func(s, arg, jobnr, threadnr);
    }, arg, ret, count, 0);
}

void thread_init(AVCodecContext* s)
{
    static const size_t MAX_THREADS = 16; // See mpegvideo.h
    static int dummy_opaque;

    s->active_thread_type = FF_THREAD_SLICE;
    s->thread_opaque      = &dummy_opaque;
    s->execute            = thread_execute;
    s->execute2           = thread_execute2;
    s->thread_count       = MAX_THREADS; // We are using a task-scheduler, so use as many "threads/tasks" as possible.
}

void thread_free(AVCodecContext* s)
{
    s->thread_opaque = nullptr;
}

int tbb_avcodec_open(AVCodecContext* avctx, AVCodec* codec)
{
    avctx->thread_count = 1;
    if((codec->capabilities & CODEC_CAP_SLICE_THREADS) && (avctx->thread_type & FF_THREAD_SLICE))
        thread_init(avctx);

    return avcodec_open(avctx, codec);
}

int tbb_avcodec_close(AVCodecContext* avctx)
{
    thread_free(avctx);
    // ff_thread_free will not be executed since thread_opaque == nullptr.
    return avcodec_close(avctx);
}
```
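To experiment with the job-claiming loop in task_execute without TBB or FFmpeg at hand, it can be reproduced with the standard library alone. This is an illustration under stated assumptions, not the poster's code: plain std::thread workers stand in for the tasks created by tbb::parallel_for, the function name run_jobs is hypothetical, and job(jobnr, threadnr) stands in for func. Because every worker claims the next index from one shared atomic counter, a worker that finishes early simply picks up more jobs.

```cpp
#include <atomic>
#include <functional>
#include <thread>
#include <vector>

// Hypothetical stand-in for task_execute: 'workers' plays the role of
// s->thread_count, and std::thread replaces tbb::parallel_for.
// job(jobnr, threadnr) returns a status code stored in (*ret)[jobnr].
// If ret is non-null it must hold at least 'count' elements.
void run_jobs(int workers, int count,
              const std::function<int(int, int)>& job,
              std::vector<int>* ret) {
    std::atomic<int> counter{0};  // next unclaimed job index
    std::vector<std::thread> pool;
    pool.reserve(workers);
    for (int threadnr = 0; threadnr < workers; ++threadnr) {
        pool.emplace_back([&, threadnr] {
            for (;;) {
                int jobnr = counter++;  // atomically claim one job
                if (jobnr >= count) break;
                int r = job(jobnr, threadnr);
                if (ret) (*ret)[jobnr] = r;  // each jobnr is written by exactly one worker
            }
        });
    }
    for (auto& t : pool) t.join();
}
```

For example, `run_jobs(4, 10, job, &out)` runs ten jobs on four workers and stores each job's return value in `out[jobnr]`. In total the counter is incremented count + workers times: once per job plus one final, unsuccessful claim per worker.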
This looks good to me; a clever way to use TBB in a context that was designed with native threads in mind.
I wonder if it can be made even more "TBBish", so to speak. I have a rough idea, which you can try if you have the time and inclination.
The following two items may be of interest if you want or need to control the number of threads.
- In thread_init, create a heap-allocated tbb::task_scheduler_init (TSI) object and initialize it with as many threads as desired (not necessarily MAX_THREADS). Keep the address of this object in s->thread_opaque if that is possible/allowed; if not, one possible solution is a global map from each AVCodecContext to the address of its TSI.
- Correspondingly, in thread_free, retrieve the TSI object and delete it.
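A minimal sketch of the global-map fallback from the list above, assuming C++14 and a hypothetical class name PerContextRegistry. The template parameter T stands in for tbb::task_scheduler_init and Ctx for AVCodecContext, so the sketch compiles without TBB or FFmpeg. The mutex is there on the assumption that different codec contexts may be opened and closed from different threads.

```cpp
#include <memory>
#include <mutex>
#include <unordered_map>
#include <utility>

// Hypothetical helper for the "global map" fallback: owns one T per context.
// In the scenario above, Ctx would be AVCodecContext and T would be
// tbb::task_scheduler_init; neither is needed to compile this sketch.
template <typename Ctx, typename T>
class PerContextRegistry {
public:
    // thread_init side: construct a T for 'ctx', replacing any previous one.
    template <typename... Args>
    T& create(const Ctx* ctx, Args&&... args) {
        auto obj = std::make_unique<T>(std::forward<Args>(args)...);
        T& ref = *obj;
        std::unique_ptr<T> old;  // destroyed after the lock is released
        {
            std::lock_guard<std::mutex> lock(mutex_);
            old = std::move(objects_[ctx]);
            objects_[ctx] = std::move(obj);
        }
        return ref;
    }

    // Returns the object registered for 'ctx', or nullptr if there is none.
    T* find(const Ctx* ctx) {
        std::lock_guard<std::mutex> lock(mutex_);
        auto it = objects_.find(ctx);
        return it == objects_.end() ? nullptr : it->second.get();
    }

    // thread_free side: remove and destroy the object for 'ctx'.
    // Returns false if nothing was registered for it.
    bool destroy(const Ctx* ctx) {
        std::unique_ptr<T> victim;  // destroyed after the lock is released
        {
            std::lock_guard<std::mutex> lock(mutex_);
            auto it = objects_.find(ctx);
            if (it == objects_.end()) return false;
            victim = std::move(it->second);
            objects_.erase(it);
        }
        return true;
    }

private:
    std::mutex mutex_;
    std::unordered_map<const Ctx*, std::unique_ptr<T>> objects_;
};
```

Wiring it in would mean something like `registry.create(s, n)` in thread_init and `registry.destroy(s)` in thread_free; those call sites and the name `registry` are hypothetical. If s->thread_opaque can hold the pointer directly, the map is unnecessary.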
Independently of the above, another potential change is in how parallel_for is called. Instead of using it merely to create enough threads, couldn't it be used for its intended purpose, as below?
```cpp
int task_execute(AVCodecContext* s, std::function<int(void*, int, int, int)>&& func, void* arg, int* ret, int count, int size)
{
    tbb::atomic<int> counter;
    counter = 0;

    // Execute 'count' jobs in parallel, letting TBB split the range into chunks.
    tbb::parallel_for(tbb::blocked_range<int>(0, count, 2), [&](const tbb::blocked_range<int>& r)
    {
        // Number of chunks already running when this one starts, used as the thread number.
        int threadnr = counter++;
        for(int jobnr = r.begin(); jobnr != r.end(); ++jobnr)
        {
            int result = func(arg, size, jobnr, threadnr);
            if (ret)
                ret[jobnr] = result;
        }
        --counter;
    });

    return 0;
}
```
This can perform better if count is significantly greater than thread_count, because a) more parallel slack means TBB works more efficiently (which you apparently know), and b) the overhead of the centralized atomic counter is spread over more iterations. Note that I chose a grain size of 2 for blocked_range: the counter is incremented and decremented once per body invocation (i.e. per chunk), so at least two iterations per chunk (and correspondingly, count >= 2*thread_count) are necessary to "match" your variant, which touches the counter once per job.
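The "match" can be made concrete by counting read-modify-write operations on the shared counter. In the first version, each of the T = thread_count tasks increments the counter once per job it claims and once more when it finds no job left; in the chunked version, each chunk increments and decrements it once. Assuming, as an idealization, that every chunk holds exactly g jobs (the chunk sizes TBB actually picks can differ):

$$
N_{\text{per-job}} = \text{count} + T, \qquad N_{\text{chunked}} = \frac{2\,\text{count}}{g}
$$

With g = 2 the chunked version performs count operations, essentially the same as count + T when count is much larger than T; larger chunks reduce it further.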
```cpp
int thread_execute(AVCodecContext* s, int (*func)(AVCodecContext *c2, void *arg2), void* arg, int* ret, int count, int size)
{
    tbb::parallel_for(tbb::blocked_range<int>(0, count), [&](const tbb::blocked_range<int>& r)
    {
        for(int n = r.begin(); n != r.end(); ++n)
        {
            int result = func(s, reinterpret_cast<uint8_t*>(arg) + n*size);
            if(ret)
                ret[n] = result;
        }
    });

    return 0;
}

int thread_execute2(AVCodecContext* s, int (*func)(AVCodecContext* c2, void* arg2, int, int), void* arg, int* ret, int count)
{
    tbb::atomic<int> counter;
    counter = 0;

    // Note: this will probably only work when the number of TBB threads is less than MAX_THREADS (16).
    tbb::parallel_for(tbb::blocked_range<int>(0, count, 2), [&](const tbb::blocked_range<int>& r)
    {
        int threadnr = counter++;
        for(int jobnr = r.begin(); jobnr != r.end(); ++jobnr)
        {
            int result = func(s, arg, jobnr, threadnr);
            if (ret)
                ret[jobnr] = result;
        }
        --counter;
    });

    return 0;
}
```
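One caveat applies to the increment/decrement counter used for threadnr in the chunked versions: it keeps threadnr below the number of chunks running at once, but it does not make it unique. For example, chunk A gets 0, chunk B gets 1, A finishes and the counter drops back to 1, and a new chunk C then also gets 1 while B is still running. If the callee uses threadnr to index per-thread state, two chunks could end up sharing it. The sketch below is one hedged alternative, not part of TBB or FFmpeg: a hypothetical SlotPool hands out the lowest free index from a 64-bit mask, so chunks that overlap in time never share an index. It assumes the pool's capacity (at most 64 here) is at least the number of chunks that can run at once, e.g. the TBB thread count; otherwise acquire() spins until a slot is released.

```cpp
#include <atomic>
#include <cstdint>
#include <stdexcept>

// Hypothetical helper: hands out indices in [0, capacity) so that no two
// holders that overlap in time ever receive the same index.
// Precondition: capacity >= the maximum number of concurrent holders.
class SlotPool {
public:
    explicit SlotPool(unsigned capacity) : capacity_(capacity) {
        if (capacity == 0 || capacity > 64)
            throw std::invalid_argument("SlotPool capacity must be in [1, 64]");
    }

    // Claims the lowest free index; spins while every index is taken.
    int acquire() {
        for (;;) {
            std::uint64_t used = used_.load(std::memory_order_relaxed);
            for (unsigned i = 0; i < capacity_; ++i) {
                const std::uint64_t bit = std::uint64_t{1} << i;
                if (used & bit)
                    continue;
                // Set the bit only if no other thread changed the mask meanwhile.
                if (used_.compare_exchange_weak(used, used | bit,
                                                std::memory_order_acquire,
                                                std::memory_order_relaxed))
                    return static_cast<int>(i);
                break;  // mask changed (or spurious failure): rescan from scratch
            }
        }
    }

    // Returns an index obtained from acquire() to the pool.
    void release(int slot) {
        used_.fetch_and(~(std::uint64_t{1} << slot), std::memory_order_release);
    }

private:
    unsigned capacity_;
    std::atomic<std::uint64_t> used_{0};  // bit i set => index i is in use
};

// RAII wrapper so the index is released even if the job throws.
class SlotGuard {
public:
    explicit SlotGuard(SlotPool& pool) : pool_(pool), slot_(pool.acquire()) {}
    ~SlotGuard() { pool_.release(slot_); }
    SlotGuard(const SlotGuard&) = delete;
    SlotGuard& operator=(const SlotGuard&) = delete;
    int index() const { return slot_; }

private:
    SlotPool& pool_;
    int slot_;
};
```

In thread_execute2, a pool constructed before the parallel_for call with a capacity of MAX_THREADS, plus `SlotGuard slot(pool); int threadnr = slot.index();` at the top of the body, could take the place of the counter++/--counter pair. MAX_THREADS would have to be made visible there, and this wiring is an untested suggestion.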