- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
After watching one of the GDC/TBB seminars, and afterwards picking the presenter's head, we came up with a simple Actor (Erlang-style) based on TBB.
And the current state is more or less like this:
[cpp]templateclass Actor { typedef Actor Actor_t; class MessageHandlerTask : public tbb::task { Actor_t& pActor; public: //MessageHandlerTask methods MessageHandlerTask(Actor_t& actor) : pActor(actor) {} tbb::task * execute() { pActor.processAllMessages(); return NULL; } }; public: void inbox(const MSG_TYPE& message,bool cleanup=false) { messageQueue.push(std::make_pair(message,cleanup)); if(isProcessingNow.compare_and_swap(false, true) == true) { // allocate and spawn the MessageHandlerTask pMessageHandlerTask = new(pRootTask->allocate_additional_child_of(*pRootTask)) MessageHandlerTask(*this); pRootTask->spawn(*pMessageHandlerTask); } } Actor(Actor* parent=NULL) :pParent(parent) { if(pParent) { //borrow parent's root pRootTask = pParent->pRootTask; } else { //I'm a root so allocate a new root pRootTask = new(tbb::task::allocate_root()) tbb::empty_task(); pRootTask->set_ref_count(2); } } virtual ~Actor() { if(!pParent) //I'm a root { if(pRootTask != NULL) { tbb::spin_mutex::scoped_lock(spinMutex); if(pRootTask != NULL) { pRootTask->wait_for_all(); pRootTask->destroy(*pRootTask); pRootTask = NULL; } } } } //protected: //in reality this method should only be called by the actor itself to create children Actor* CreateChildActor() { return new Actor(this); } protected: virtual void messageHandler(MSG_TYPE& message) { //User's Actor implementation } virtual void cleanup(MSG_TYPE& message) { //User's Actor implementation } private: void processAllMessages() { std::pair pData; while(messageQueue.pop_if_present(pData)) { messageHandler(pData.first); if(pData.second==true) { cleanup(pData.first); } } isProcessingNow = false; } Actor_t* pParent; tbb::task* pRootTask; MessageHandlerTask* pMessageHandlerTask; tbb::atomic isProcessingNow; tbb::spin_mutex spinMutex; tbb::concurrent_queue messageQueue; };[/cpp]
Any suggestions related to the implementation?
Link Copied
11 Replies
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Hmm, isn't TBB all about creating new tasks, not new threads (see pre-existing thread "Support for actor programming model")?
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Quoting - Raf Schietekat
Hmm, isn't TBB all about creating new tasks, not new threads (see "Support for actor programming model")?
Hmm, that's why this actor uses tasks to execute it's message handling, leaving threading to the scheduler, in a fairly TBB-style, or that's what it looks like to me. There is no thread creation in the above code
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Quoting - robert.jay.gould
Hmm, that's why this actor uses tasks to execute it's message handling, leaving threading to the scheduler, in a fairly TBB-style, or that's what it looks like to me. There is no thread creation in the above code
Here's a real question: are there any blocking calls, why (not), and (if so) wouldn't this cause problems like starvation and/or stack explosion? That's just what I remember from before, I haven't looked at the code yet (got to run now).
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
That may deadlock. Check handling of the isProcessingNow variable in processAllMessages() and in inbox(). Think of your queueing mechanism as Peterson's mutual exclusion algorithm with variables being isProcessingNow and messageQueue being empty or not.
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Quoting - Raf Schietekat
Sorry, that was just a silly joke, and you saw the version from before I slightly clarified what I meant.
Here's a real question: are there any blocking calls, why (not), and (if so) wouldn't this cause problems like starvation and/or stack explosion? That's just what I remember from before, I haven't looked at the code yet (got to run now).
Here's a real question: are there any blocking calls, why (not), and (if so) wouldn't this cause problems like starvation and/or stack explosion? That's just what I remember from before, I haven't looked at the code yet (got to run now).
Well the purpose of this Actor is for simple processing and AI-like uses, where IO shouldn't occur, because, well that's not what this Actor was made for :)
Anyways I was thinking about providing an async message system so an Actor can wrap an IO action and get a message back when finished (to allow for some IO when utterlynecessary), but this design ignores IO in general
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Quoting - Dmitriy Vyukov
That may deadlock. Check handling of the isProcessingNow variable in processAllMessages() and in inbox(). Think of your queueing mechanism as Peterson's mutual exclusion algorithm with variables being isProcessingNow and messageQueue being empty or not.
Thanks for the advice
I got rid of the deadlock and some other issues with that approach
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Quoting - robert.jay.gould
Any suggestions related to the implementation?
I agree with Dmitry that it can deadlock.
Another note is that since there is a single consumer of the queue in any given time, a queue implementation that is customized for such case might work better than the generic tbb::concurrent_queue. Only measurements will tell for sure, of course.
The third note is that the usage model should assume there is a lot of actors to be efficient and scalable. I am sure you targeted exactly that (as opposed to a few actors with a lot of processing for messages).
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Robert and all,
This is very cool to see my know-nothing-about-Actors sample turned into something real. I would be very interested in hearing from any of you how you are using a TBB-backed Actor implementation.
Also, I'm not seeing the possibility for deadlock on isProcessingNow... sounds like you've cleaned that up in an edit already.
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Quoting - Brad Werth (Intel)
Robert and all,
This is very cool to see my know-nothing-about-Actors sample turned into something real. I would be very interested in hearing from any of you how you are using a TBB-backed Actor implementation.
Also, I'm not seeing the possibility for deadlock on isProcessingNow... sounds like you've cleaned that up in an edit already.
Hi Brad,
the deadlock is really tricky, but more so was the dealing with the tbb's arena implementation's internals. Mostly because tbb doesn't like one thread touching tasks that belong to other threads.
anyways I improved the Actor and will post it's latest iteration now, comments more than welcomed
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Ok I fixed several issues and sprinkled in some task continuation. Now the actor seems to work fine, and doesn't deadlock (as far as I've been able to see). So here it is:
[cpp]//prototype templateNow I made a test scenario, which consists of each actor receiving a number, and passing the number-1 to the actor to it's "right". I'm calling this a spiral countdown. Anyways the objective is to have actors collaborating with heavy dependence on others.class Actor { public: typedef Actor Actor_t; typedef typename MSG_TYPE Msg_t; private: class MessageHandlerTask : public tbb::task { Actor_t& pActor; public: //MessageHandlerTask methods MessageHandlerTask(Actor_t& actor) : pActor(actor) {} tbb::task * execute() { pActor._processAllMessages(); return NULL; } }; public: Actor(long id, Actor_t* parent=NULL) : pParent(parent) , myId(id) { isProcessingNow = false; childrenCounter = 0; if(pParent) { //borrow parent's root pRootTask = pParent->pRootTask; } else { //I'm the root actor so allocate a root pRootTask = new(tbb::task::allocate_root()) tbb::empty_task(); pRootTask->set_ref_count(++childrenCounter); } } virtual ~Actor() { waitUntilDone(); if(!messageQueue.empty()) { printf("Actor %i was destroyed with %i messages in its queue leftn",myId,messageQueue.size()); } } template Actor* createChild(long id) { static_cast (this);//Static assert that the Child's class is derived from same Root class pRootTask->set_ref_count(++childrenCounter); //TODO recycling optimization return new CHILDCLASS(id,this); } const long getId() { return myId; } //Mailbox for external interface void inbox(const MSG_TYPE& message, bool cleanup=false) { _inbox(this,message,cleanup); } //TODO Not sure if this should really be a public interface or not, as it Root only void waitUntilDone() { if(!pParent) { if(pRootTask != NULL) { tbb::spin_mutex::scoped_lock(spinMutex); if(pRootTask != NULL) { while(childrenCounter!=1) { int c = pRootTask->ref_count(); pRootTask->wait_for_all(); } printf("all actions finishedn"); pRootTask->destroy(*pRootTask); pRootTask = NULL; } } } } protected: //Behavior overrides virtual bool messageHandler(MSG_TYPE& message) { //User's Actor implementation return false; } virtual void cleanup(MSG_TYPE& message) { //User's Actor implementation } //Access for an actor to mail another actor void sendMessage(Actor_t& target,const MSG_TYPE& message, bool cleanup=false) { target._inbox(this,message,cleanup); } private: void _inbox(Actor_t* sender, const MSG_TYPE& message, bool cleanup=false) { messageQueue.push(std::make_pair(message,cleanup)); if(sender==this) { _processMailBox(pRootTask); } else { _processMailBox(sender->pMessageHandlerTask); } } void _processAllMessages() { std::pair pData; bool ok = true; do { isProcessingNow = true; if(messageQueue.pop_if_present(pData)) { ok = messageHandler(pData.first); if(pData.second==true) { cleanup(pData.first); } } }while(ok && !messageQueue.empty()); isProcessingNow = false; if(!ok) { pParent->_destroyChild(const_cast (this)); } } void _processMailBox(tbb::task* root) { if((messageQueue.empty()==false) && !(isProcessingNow.compare_and_swap(true, false))) { // allocate and spawn the MessageHandlerTask tbb::empty_task& continuation = *new( root->allocate_continuation() ) tbb::empty_task; pMessageHandlerTask = new(continuation.allocate_child()) MessageHandlerTask(*this); continuation.set_ref_count(1); root->spawn(*pMessageHandlerTask); } } void _destroyChild(Actor* child) { pRootTask->set_ref_count(--childrenCounter); //TODO recycling optimization delete child; } const long myId;//Actor Id Actor* pParent; tbb::task* pRootTask; MessageHandlerTask* pMessageHandlerTask; tbb::concurrent_queue messageQueue; tbb::atomic isProcessingNow; //These could be refactored out to a RootActorClass tbb::atomic childrenCounter; tbb::spin_mutex spinMutex; }; [/cpp]
[cpp]typedef ActorTestActor; //Note: Because Actors are doing a spiral countdown, the std::vector's alignment seems to be better than concurrent_vector typedef std::vector Children; static const size_t ActorsN = 1000; static Children children (ActorsN); Children& GetChildren() { return children; } class ActorTypeA: public TestActor { public: ActorTypeA(long id,TestActor* root=NULL) : TestActor(id,root) {} protected: virtual bool messageHandler(TestActor::Msg_t& message) { if(message!=0) { size_t nextId = (getId()+1)%ActorsN; TestActor* next = GetChildren()[nextId]; sendMessage(*next,message-1); return true; } else { printf("Actor %i finished!n",getId()); return false;//kill me } } }; int main(int argc, char* argv[]) { tbb::task_scheduler_init init;//(1); { tbb::tick_count t0; Children &children = GetChildren(); { TestActor root(-1); { Children::iterator iter = children.begin(); Children::iterator end = children.end(); size_t counter = 0; for(;iter!=end;++iter,++counter) { *iter = root.createChild (counter); } for(iter = children.begin();iter!=end;++iter,++counter) { //worst case (?) style spiral countdown (*iter)->inbox(ActorsN); } } t0 = tbb::tick_count::now(); root.waitUntilDone(); tbb::tick_count t1 = tbb::tick_count::now(); printf ("time = %.1f msecn", (t1-t0).seconds()*1000); } } return 0; }[/cpp]
Some things I noticed were that interesting were that when the Actors' N was between 10 to 100, it scales linearly, but when N > 1000 it actually scales supralinearly, which I guess is due to the TBB scheduler working it's magic.
Any ideas?
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Also as per Alexey's suggestion I implemented another version of the actor with a custom queue (a C++ version of Dmitriy's agent queue here with a tbb::concurrent_allocator). That boost's performance by about 25% when using an N>1000.

Reply
Topic Options
- Subscribe to RSS Feed
- Mark Topic as New
- Mark Topic as Read
- Float this Topic for Current User
- Bookmark
- Subscribe
- Printer Friendly Page