aminer10
Beginner
66 Views

Lockfree_mpmc and lockfree ParallelQueue ...


Hello all,

As you will notice, even NASA and other institutions, such as those
in France (Ariane...), make errors in their programs...

Read this:

"Although unexpected computer resets and human errors
continue to frustrate NASA/JPL scientists, both Pathfinder
and Sojourner... Current priorities involve fixing
the software bug that is causing Pathfinder to reset its
computer"

"The trouble experienced by the Mars lander "Mars Pathfinder"
is a classic example of problems caused by priority inversion
in realtime systems"

And don't forget about the French Ariane 5 rocket explosion:

http://www.youtube.com/watch?v=z-r9cYp3tTE

So, I have just updated lockfree_mpmc and corrected a problem in it.
If you take a look at lockfree_mpmc.pas
inside my lockfree Parallel Queue zip file:

http://pages.videotron.com/aminer/parallelqueue/parallelqueue.htm

You will notice that I have modified the algorithm on the push()
side, and I have used a test like this:

if getlength >= fsize
then
begin
  result:=false;
  exit;
end;


Now I have tested my new algorithm and it is working very well.

Correctness:

To keep the correctness verification short, I will concentrate on
the more important parts. If you take a look at the lock-free ParallelQueue
source code, inside lockfree_mpmc.pas you will read this on the push side:

function TLockfree_MPMC.push(tm : tNodeQueue):boolean;//stdcall;
var lasttail,newtemp:longword;
begin
  if getlength >= fsize
  then
  begin
    result:=false;
    exit;
  end;
  result:=true;
  //newTemp:=windows.interlockedincrement(temp);
  newTemp:=LockedIncLong(temp);

  lastTail:=newTemp-1;
  setObject(lastTail,tm);
  repeat
    if CAS(tail[0],lasttail,newtemp)
    then
    begin
      exit;
    end;
    sleep(0);
  until false;
end;


So, let's say the size of the bounded queue is 1000, and imagine
that the threads are all executing "if getlength >= fsize" at the
same time, and that getlength returns 999. The test
"if getlength >= fsize" will then evaluate to false for every thread,
so they will all go on to push. But since we have
fSize:=(1 shl aPower) - margin, with a margin of 1000
(margin must be >= the number of cores), there will be no
problem (overflow...).


Now in the pop side, you will read this:

function TLockfree_MPMC.pop(var obj:tNodeQueue):boolean;
var lastHead : longword;
begin
  repeat
    lastHead:=head[0];
    if tail[0]<>head[0]
    then
    begin
      obj:=getObject(lastHead);
      if CAS(head[0],lasthead,lasthead+1)
      then
      begin
        result:=true;
        exit;
      end;
    end
    else
    begin
      result:=false;
      exit;
    end;
  until false;
end;


So, as you have noticed, there is a test like this:
if CAS(head[0],lasthead,lasthead+1), and this test
avoids something like the ABA problem, because if head
has changed, the CAS() will fail.
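
To illustrate the claim about the CAS on head, here is a minimal sketch, assuming Free Pascal. The CAS wrapper below is a hypothetical stand-in for the CAS() routine in lockfree_mpmc.pas, built on InterlockedCompareExchange from the system unit, and the two "consumers" are replayed sequentially in a single thread rather than run concurrently.

```pascal
program CasRetryDemo;
{$IFDEF FPC}{$mode objfpc}{$ENDIF}

var
  head: longint;      // stands in for head[0]
  lastHead: longint;  // consumer A's snapshot of head

// Hypothetical stand-in for the thread's CAS(): returns true when Target
// still held Comperand and was replaced by NewValue. Assumes Free Pascal's
// InterlockedCompareExchange(Target, NewValue, Comperand).
function CAS(var Target: longint; Comperand, NewValue: longint): boolean;
begin
  result := InterlockedCompareExchange(Target, NewValue, Comperand) = Comperand;
end;

begin
  head := 5;
  lastHead := head;                  // consumer A reads head
  if CAS(head, 5, 6) then            // consumer B pops first and advances head
    writeln('B popped slot 5');
  if CAS(head, lastHead, lastHead + 1) then
    writeln('A popped slot ', lastHead)
  else
    writeln('A lost the race and must retry');  // head is now 6, not 5
  writeln('head = ', head);
end.
```

The sketch only shows that a stale snapshot makes the CAS fail. Since head only ever increments, the same value can reappear only after the 32-bit counter wraps all the way around; whether that is enough for the queue as a whole is not something this example establishes.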

I have updated my Threadpool engine with the new lockfree_mpmc,
please download my other modules that use lockfree_mpmc.


Sincerely
Amine Moulay Ramdane.

Dmitry_Vyukov
Valued Contributor I
63 Views

Quoting aminer10

Correctness:

To keep the correctness verification short, I will concentrate on
the more important parts. If you take a look at the lock-free ParallelQueue
source code, inside lockfree_mpmc.pas you will read this on the push side:

The proof looks rather weak.

The queue passed your "verification", and then you were pointed to a serious problem. Then you fixed it, and the queue passed your "verification" again. Then you were pointed to another serious problem. You fixed it again, and the queue passed your "verification" again. And now you find one more problem, and assert that this time it should be correct... It brings your "verification" process into question...

aminer10
Beginner
63 Views


Hello,


Dmitry Vyukov the prince Jedi is coming back...

Let me tell you that I have reasoned about the algorithm
and used logic, and I have looked at it from different sides/perspectives,
and I can tell you that it's working.

I invite you to look at the source code, Dmitry, and you
will see that I am not wrong...



Both the SPMC and MPMC are working now, look at them yourself.


Sincerely
Amine Moulay Ramdane





aminer10
Beginner
63 Views


>The queue passed your "verification", and then you were pointed to some serious problem. Then you fixed
> [...]


That was with the SPMC. I have corrected the problem with the SPMC and it's working
well.

Now look at the MPMC: it's working well, and I invite you to verify it yourself.


Regards,
Amine.

aminer10
Beginner
63 Views



Hello again,


And of course I am using the lockfree ParallelQueue
inside my threadpool engine, which uses work-stealing and
a lockfree queue for each worker...

Now if you want to reduce the contention even more, you can,
for example, 'batch' the reads...

I have even tested it against the locked version,
and it scores far better than the locked version.

It's one of the best algorithms and I tell you that it's useful, Dmitry :)


Regards,
Amine Moulay Ramdane.


Dmitry_Vyukov
Valued Contributor I
63 Views

Quoting aminer10
Dmitry Vyukov the prince Jedi is coming back...

Let me tell you that I have reasoned about the algorithm
and used logic, and I have looked at it from different sides/perspectives,
and I can tell you that it's working.

I invite you to look at the source code, Dmitry, and you
will see that I am not wrong...

function TLockfree_MPMC.getLength:longword;
begin
  if tail[0] < head[0]
  then result:= (High(longword)-head[0])+(1+tail[0])
  else result:=(tail[0]-head[0]);
end;


Here you are checking the condition with one set of values, and then doing the calculation with another set of values, because head[0] and tail[0] are read again and may have changed in between. It renders the check basically useless. You can just remove it w/o any change in behavior. What am I missing?

aminer10
Beginner
63 Views

>Here you are checking the condition with one set of values,
>and then doing the calculation with another set of values.
>It renders the check basically useless. You can just
>remove it w/o any change in behavior. What am I missing?

It is just to get an idea of the length, and this is not a problem.




Amine.



aminer10
Beginner
63 Views



I understand, you mean that I have to save the head and tail
before doing the calculation...


That's correct. Anything else?


Amine.


Dmitry_Vyukov
Valued Contributor I
63 Views

Quoting aminer10
>Here you are checking the condition with one set of values,
>and then doing the calculation with another set of values.
>It renders the check basically useless. You can just
>remove it w/o any change in behavior. What am I missing?

It is just to get an idea of the length, and this is not a problem.

Well, but you are using getLength() in your proof...

aminer10
Beginner
63 Views



Take a closer look at the code, and tell me if there are other things...


Amine.


Dmitry_Vyukov
Valued Contributor I
63 Views

Quoting aminer10
So, let's say the size of the bounded queue is 1000, and imagine
that the threads are all executing "if getlength >= fsize" at the
same time, and that getlength returns 999. The test
"if getlength >= fsize" will then evaluate to false for every thread,
and since we have
fSize:=(1 shl aPower) - margin, with a margin of 1000
(margin must be >= the number of cores), there will be no
problem (overflow...).


And what if another thread pushes margin items in between? Won't you still get overflow?

aminer10
Beginner
63 Views


Dmitry wrote:
>And what if another thread pushes margin items in between? Won't you still get overflow?

I have tried to make the algorithm at http://www.emadar.com/fpc/lockfree.htm
not overflow, but apparently it is not working at all...


Do you have any idea, Dmitry, how to make it work and NOT overflow?
Because this algorithm is very fast...


Amine.

Dmitry_Vyukov
Valued Contributor I
63 Views

Quoting aminer10
Take a closer look at the code, and tell me if there are other things...

I am fed up with concurrent code that works only in sequential mode.

In either case, the best I can do is to verify it with Relacy... which you can do too.

aminer10
Beginner
63 Views



This algorithm was not mine: http://www.emadar.com/fpc/lockfree.htm

I have tried to make the algorithm not overflow...

The truth is:

It can overflow, and you can have the kind of Ariane 5 rocket explosion.
Look at this:

http://www.youtube.com/watch?v=z-r9cYp3tTE


Amine Moulay Ramdane.



Dmitry_Vyukov
Valued Contributor I
63 Views

Quoting aminer10

Do you have any idea, Dmitry, how to make it work and NOT overflow?
Because this algorithm is very fast...


I have no idea how to fix it. However I have another more scalable algorithm:

http://groups.google.com/group/lock-free/browse_frm/thread/f9ba9895f6d0987d

aminer10
Beginner
63 Views



Hello,

Dmitry, look at the following code. It does use locks
on the push side, but, combined with my ParallelQueue, it gives
better performance than the lockfree_mpmc algorithm,
and it avoids the overflow problem...

Look at this:

------------------

unit RingBuffer1;

interface

uses
  {$IFDEF Delphi}windows,{$ENDIF}
  {$IFDEF DELPHI2005+}windows,{$ENDIF}
  {$IFDEF FreePascal}windows,sysutils,{$ENDIF}
  syncobjs,spinlock;

Const
  ctfree = 0 ;
  ctbusy = 1 ;

type
  tNodeQueue = tObject;
  typecache1 = array[0..15] of longword;

  TRingBuffer = class
  private
    tail,
    head: typecache1;
    fMask : longword;
    fSize : longword;
    temp:integer;
    tab : array of tNodeQueue;
    cs1,cs2:TCriticalSection;
    sl1,sl2:TSpinLock;
    v,t:longword;
    procedure setobject(lp : longword;const aobject : tNodeQueue);
    function getLength:longword;
    function getSize:longword;
    function getObject(lp : longword):tNodeQueue;
  public
    constructor create(aPower : integer =20); {allocate tab with size equal 2^aPower, for 20 size is equal 1048576}
    destructor Destroy; override;
    procedure EnterCS;
    procedure LeaveCS;
    function push(tm : tNodeQueue):boolean;
    function pop(var obj:tNodeQueue):boolean;
    property length : longword read getLength;
    property count: longword read getLength;
    property size : longword read getSize;
  end;

implementation

function CAS(var Target: longword; Comperand: longword;NewValue: longword ): boolean; assembler;stdcall;
asm
  mov ecx,Target
  mov edx,NewValue
  mov eax,Comperand
  lock cmpxchg [ecx],edx
  JNZ @@2
  MOV AL,01
  JMP @@Exit
@@2:
  XOR AL,AL
@@Exit:
end;

Procedure TRingBuffer.EnterCS;
Begin
  Repeat
    asm pause end;
  Until CAS(V,0,1);
  sleep(0);
End;

Procedure TRingBuffer.LeaveCS;
Begin
  V:=ctfree;
  asm pause end;
  //sleep(0);
end;

constructor TRingBuffer.create(aPower : integer );
begin
  cs1:=TCriticalSection.create;
  cs2:=TCriticalSection.create;
  sl1:=TSpinlock.create;
  sl2:=TSpinlock.create;
  fMask:=not($FFFFFFFF shl aPower);
  fSize:=(1 shl aPower) ;
  setLength(tab,fSize);
  tail[0]:=0;
  head[0]:=0;
  V:=ctfree;
end;

destructor TRingBuffer.Destroy;
begin
  cs1.free;
  cs2.free;
  setLength(tab,0);
  inherited Destroy;
end;

procedure TRingBuffer.setObject(lp : longword;const aobject : tNodeQueue);
begin
  tab[lp and fMask]:=aObject;
end;

function TRingBuffer.getObject(lp : longword):tNodeQueue;
begin
  result:=tab[lp and fMask];
end;

function TRingBuffer.push(tm : tNodeQueue):boolean;
begin
  result:=true;
  //cs1.enter;
  sl1.Acquire;
  if getlength >= fsize
  then
  begin
    result:=false;
    //cs1.leave;
    sl1.Release;
    exit;
  end;
  setObject(tail[0],tm);
  inc(tail[0]);
  //cs1.leave;
  sl1.release;
end;

function TRingBuffer.pop(var obj:tNodeQueue):boolean;
var lastHead : longword;
begin
  repeat
    lastHead:=head[0];
    if tail[0]<>head[0]
    then
    begin
      obj:=getObject(lastHead);
      if CAS(head[0],lasthead,lasthead+1)
      then
      begin
        result:=true;
        exit;
      end;
    end
    else
    begin
      result:=false;
      exit;
    end;
  until false;
end;

function TRingBuffer.getLength:longword;
begin
  if tail[0] < head[0]
  then result:= (High(longword)-head[0])+(1+tail[0])
  else result:=(tail[0]-head[0]);
end;

function TRingBuffer.getSize:longword;
begin
  result:=fSize;
end;

end.

---------------

aminer10
Beginner
63 Views




I have tested RingBuffer1 (it does use locks on the push side), and if
I combine it with my ParallelQueue it gives better performance
than lockfree_mpmc + ParallelQueue.



Amine.
aminer10
Beginner
63 Views


Hello,

I have uploaded the new ParallelQueue version 1.22 to my website. lockfree_mpmc now uses a margin of 1000 threads...


I have changed getlength to:


---------

function TLockfree_MPMC.getLength:longword;
var head1,tail1:longword;
begin
  head1:=head[0];
  tail1:=tail[0];
  if tail1 < head1
  then result:= (High(longword)-head1)+(1+tail1)
  else result:=(tail1-head1);
end;

-----------
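
As a side note on the length computation from a snapshot: a small sketch, assuming Free Pascal with overflow and range checking switched off, suggesting that the two-branch formula gives the same result as a single wrapping subtraction on longword values. LengthBranch and LengthWrap are hypothetical names used only for this illustration.

```pascal
program LengthDemo;
{$IFDEF FPC}{$mode objfpc}{$ENDIF}
{$Q-}{$R-} // overflow and range checks off, so longword results wrap modulo 2^32

// Two-branch formula from the thread, applied to a snapshot of head and tail.
function LengthBranch(head1, tail1: longword): longword;
begin
  if tail1 < head1
  then result := (High(longword) - head1) + (1 + tail1)
  else result := tail1 - head1;
end;

// Single subtraction; relies on the wrapping behaviour enabled above.
function LengthWrap(head1, tail1: longword): longword;
begin
  result := tail1 - head1;
end;

begin
  // No wraparound: head = 10, tail = 15, five items queued.
  writeln(LengthBranch(10, 15), ' ', LengthWrap(10, 15));
  // tail has wrapped past High(longword): five items queued as well.
  writeln(LengthBranch(High(longword) - 1, 3), ' ', LengthWrap(High(longword) - 1, 3));
end.
```

Both lines print "5 5". With checks enabled ({$Q+} or {$R+}) the single subtraction may raise a runtime error in the wrapped case, which is one reason to keep the explicit branch.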


So, let's say the size of the bounded queue is 1000, and imagine that the threads are all executing "if getlength >= fsize" at the same time, and that getlength returns 999. The test "if getlength >= fsize" will then evaluate to false for every thread, and since we have fSize:=(1 shl aPower) - margin, with a margin of 1000 (margin must be >= the number of threads), there will be no problem (overflow...).


And I think that's correct now.


Sincerely,
Amine Moulay Ramdane


aminer10
Beginner
63 Views



My website:

http://pages.videotron.com/aminer/


Regards,
Amine Moulay Ramdane.
aminer10
Beginner
63 Views



Dmitry wrote:
>And what if another thread pushes margin items in between? Won't you still get overflow?


The margin is set to 1000 threads in version 1.22, so it will work.

Imagine that fsize is 1000 and, in the worst case, we are at 999 items: even if
all 1000 threads have passed the check, there will still be no problem, no overflow.
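
The counting part of this argument can be replayed sequentially. The sketch below is only an illustration under stated assumptions: Overflows and Report are hypothetical helpers that are not part of lockfree_mpmc.pas, the array is assumed to hold 1 shl aPower slots, and every producer is assumed to read the same stale length before any of them stores an item.

```pascal
program MarginDemo;
{$IFDEF FPC}{$mode objfpc}{$ENDIF}

// Hypothetical helper: replays the worst case in a single thread.
// All producers evaluate the bound check on the same stale length,
// and only afterwards does each of them add one item.
function Overflows(aPower, margin, startLength, producers: longword): boolean;
var
  capacity, fSize, passed, i: longword;
begin
  capacity := longword(1) shl aPower;   // physical slots in the array
  fSize := capacity - margin;           // logical bound tested by push()
  passed := 0;
  for i := 1 to producers do
    if not (startLength >= fSize) then  // same test as "if getlength >= fsize"
      inc(passed);
  // Every producer that passed the check adds one item.
  result := (startLength + passed) > capacity;
end;

procedure Report(producers: longword);
begin
  // aPower = 11 gives 2048 slots; margin = 1000 gives fSize = 1048.
  if Overflows(11, 1000, 1047, producers) then
    writeln(producers, ' producers: overflow')
  else
    writeln(producers, ' producers: fits');
end;

begin
  Report(1000);
  Report(1002);
end.
```

This prints "1000 producers: fits" and then "1002 producers: overflow", so in this simplified model the bound holds only while the number of producers in flight stays within the margin. The model counts items only; it says nothing about memory ordering or about what happens between LockedIncLong(temp) and the CAS on tail, which is the kind of interleaving a tool such as Relacy is meant to explore.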

The algorithm is correct now.



Amine.
aminer10
Beginner
11 Views



Hello,

And if you need a margin of more than 1000 threads, you can change it easily...


That's cool now.


Welcome: http://pages.videotron.com/aminer/


Amine Moulay Ramdane.