In this post I will reveal some simple yet effective techniques for finding and resolving race conditions in parallel algorithms.
Do you remember my shiny parallel version of the triangle intersection code?
procedure IntersectTriangles(i: integer; Triangle: TTriangles);
begin
  Grid.GetTriangleExtents(Triangle[i], ixMin, ixMax, iyMin, iyMax);
  for iy := iyMin to iyMax do begin
    for ix := ixMin to ixMax do begin
      if RayTraceTriangle(Triangle[i], x(ix), y(iy), z) then begin
        Grid[ix,iy].AddIntPoint(Triangle[i], z);
      end;
    end;
  end;
end;

Parallel.ForI(0, Triangle.Count-1, @IntersectTriangles, @Triangle);
There is one problem with it. Imagine this situation: two threads try to add intersection points with the same [ix, iy] to the Grid at the same time. What result will we get? Most likely only one intersection point will be added, and the whole algorithm will fail. Such situations are usually called race conditions.
Basically there is only one thing that can lead to race conditions in your parallel code: writing to memory shared across threads. Typical examples of shared memory include input and output variables, counters, storage, etc. While working on my own problem I ran into only some of them. Here I will just show you my actual solutions to the actual problems, but first enjoy my boring lecture.
I started thinking about parallel programming a long time ago. I would look at the modules I had written and wonder how to turn existing, heavily optimized sequential code into an equally efficient parallel version. And almost every time I got stuck, because I couldn't figure out how to avoid writing to shared memory. Then I started thinking that avoiding writes to shared memory is impossible, that this is a common problem, and that there should be existing solutions for it. So I started digging through the internet, MSDN, programming manuals, etc. What I found at first were mutexes, semaphores, critical sections, and many other scary words, the meaning of which I still do not understand. The proposed solutions seemed to me cumbersome and inefficient at the same time. Take critical sections: what a stupid idea it is to serialize a piece of potentially thread-unsafe code every time you call it, when the probability of a race condition is one in a thousand. I am pretty sure that critical sections would make my parallel code many times slower than the sequential version.
So I continued my research and I found it – the LOCK. LOCK is an instruction prefix in x86 assembly. It turns ordinary arithmetic instructions such as inc, dec, add, sub and others into their atomic versions.
This is the right time to explain the term atomic.
Every CPU instruction decomposes into more elementary operations. For example, the inc dword ptr [eax] instruction involves at least three steps:
- Read a DWORD from the memory location pointed to by eax into an accumulator.
- Increment the accumulator.
- Write the result back to the memory.
That's why, when several threads operate on the same variable at the same time, there is no guarantee that the resulting value will be correct.
As opposed to ordinary instructions, atomic ones are executed as a whole, without subdivision into more elementary operations (hence their name). This fact makes them thread safe. On the x86 architecture atomics are implemented either by locking the system bus or via the cache coherency mechanism when appropriate. Hence the lock prefix.
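To see the difference in action, here is a small C11 demo (my own sketch, not from the post): several threads bump two shared counters, one with a plain non-atomic increment and one with atomic_fetch_add, which compiles down to the same lock-prefixed read-modify-write discussed above. The plain counter may lose updates; the atomic one never does.

```c
#include <pthread.h>
#include <stdatomic.h>
#include <stddef.h>

enum { THREADS = 4, ITERS = 100000 };

static int plain_counter = 0;           /* incremented WITHOUT atomics */
static _Atomic int atomic_counter = 0;  /* incremented atomically      */

static void *worker(void *arg)
{
    (void)arg;
    for (int i = 0; i < ITERS; i++) {
        plain_counter++;                      /* deliberately NOT thread safe */
        atomic_fetch_add(&atomic_counter, 1); /* atomic, like lock inc        */
    }
    return NULL;
}

/* Runs the demo and returns the atomic counter's final value. */
static int run_demo(void)
{
    pthread_t tt[THREADS];
    for (int i = 0; i < THREADS; i++)
        pthread_create(&tt[i], NULL, worker, NULL);
    for (int i = 0; i < THREADS; i++)
        pthread_join(tt[i], NULL);
    /* atomic_counter is always THREADS*ITERS; plain_counter may be less */
    return atomic_counter;
}
```

The plain counter's final value varies from run to run, which is exactly the "no guarantee" the paragraph above warns about.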
In the Windows API the atomic instructions are referred to as interlocked. You have probably seen or used the InterlockedIncrement and InterlockedDecrement functions in the standard implementations of the IUnknown.AddRef and IUnknown.Release methods. Windows offers wrappers for all atomic instructions, but I wrote my own versions for some of them. Here they are.
procedure AtomicInc(var i: integer);
procedure AtomicDec(var i: integer);
function AtomicAdd(Addend: integer; var Dest: integer): integer;
function AtomicSwap(Data: pointer; var Dest): pointer; overload;
function CAS32(oldValue: integer; newValue: integer; var destination): boolean;
procedure AtomicInc(var i: integer);
//i := i+1;
asm
  lock inc dword ptr [eax];
end;

procedure AtomicDec(var i: integer);
//i := i-1;
asm
  lock dec dword ptr [eax];
end;
Sometimes you just need to count something without caring about the actual value of the count. Then just use the AtomicInc and AtomicDec procedures.
You have already seen how I used atomic counters in my implementation of the Parallel For loop to increment and decrement fRunCounter.
procedure TSTThreadPool.ForI(FromIndex, ToIndex: integer;
  const Func: TParallelFunction; UserData: pointer);
begin
  StartWorkers;
  try
    while (fTaskCounter>0) or (fRunCounter>0) do begin
      ParallelForExecute(0);
    end;
  finally
    StopWorkers;
  end;
end;

procedure TSTThreadPool.ParallelForExecute(tdi: integer);
var
  i: integer;
begin
  if (fTaskCounter>0) then begin
    AtomicInc(fRunCounter);
    i := AtomicAdd(-1, fTaskCounter);
    if i>=0 then begin
      fParallelFunction(fToIndex-i, fParallelData);
    end;
    AtomicDec(fRunCounter);
  end;
end;
function AtomicAdd(Addend: integer; var Dest: integer): integer;
//Dest := Dest+Addend;
//result := Dest;
asm
  mov ecx, eax;
  lock xadd [edx], eax; // eax receives the previous value of Dest
  add eax, ecx;         // result := previous value + Addend = new Dest
end;
The AtomicAdd function is much more powerful than AtomicInc or AtomicDec, as it can return the result of the addition. This is possible because internally it is implemented not with the add instruction but with the xadd instruction, which performs two operations at once: it adds the two values and exchanges them, leaving the old value of the destination in the source register.
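The same semantics exist in portable C11: atomic_fetch_add is the xadd counterpart and returns the value *before* the addition, so to match the Delphi AtomicAdd above (which returns the new value) you add the addend back, just like the trailing "add eax, ecx". A small sketch with a helper name of my own choosing:

```c
#include <stdatomic.h>

/* Returns the NEW value of *dest after atomically adding addend,
   mirroring the Delphi AtomicAdd above. */
static int atomic_add_returning_new(_Atomic int *dest, int addend)
{
    int old = atomic_fetch_add(dest, addend); /* xadd: yields previous value */
    return old + addend;                      /* mimic "add eax, ecx"        */
}
```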
AtomicAdd is very useful if you want to implement a lock-free, thread-safe array. Here is a simple example, assuming you know the maximum number of items in advance and you only ever add items.
TThreadSafePointerArray = class
protected
  fTop, fMaxCount: integer; // fTop must start at -1 so the first Add returns index 0
  fpp: array of pointer;
public
  function Add(p: pointer): integer;
end;

function TThreadSafePointerArray.Add(p: pointer): integer;
begin
  result := AtomicAdd(1, fTop); // reserve a unique slot
  ASSERT(result<fMaxCount, 'Array bounds exceeded!');
  fpp[result] := p;
end;
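The same idea can be sketched in C11 (the names here are mine, not from the post): each Add reserves a unique slot by atomically incrementing the top index, then writes into its own slot without any further synchronization, because no other thread can ever receive that index.

```c
#include <assert.h>
#include <stdatomic.h>

#define MAX_COUNT 1024

typedef struct {
    _Atomic int top;          /* index of the last used slot, starts at -1 */
    void *items[MAX_COUNT];
} TSArray;

/* Adds p and returns its index; safe to call from many threads at once. */
static int tsarray_add(TSArray *a, void *p)
{
    int i = atomic_fetch_add(&a->top, 1) + 1; /* new value, like AtomicAdd */
    assert(i < MAX_COUNT && "Array bounds exceeded!");
    a->items[i] = p;
    return i;
}
```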
function AtomicSwap(Data: pointer; var Dest): pointer; overload;
//result := Dest;
//Dest := Data;
asm
  lock xchg [edx], eax;
end;
AtomicSwap replaces a DWORD-sized variable Dest (int32, pointer, etc.) with your Data (in this case a pointer) and returns the previous value of the variable as the result.
What is it useful for? For example, for thread safe destruction of objects and releasing interface references.
procedure FreeAndNilObject(var Obj);
var
  Temp: TObject;
begin
  if pointer(Obj)<>nil then begin // untyped var params need a cast to compare to nil
    Temp := TObject(AtomicSwap(nil, Obj));
    if Temp<>nil then Temp.Destroy;
  end;
end;

procedure FreeAndNilInterface(var Intf);
var
  Temp: IUnknown;
begin
  if pointer(Intf)<>nil then begin
    Temp := IUnknown(AtomicSwap(nil, Intf));
    Temp := nil; // releases the reference that was held by Intf
  end;
end;
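The trick translates directly to C11's atomic_exchange (a sketch under my own naming, not the post's code): whichever thread swaps the pointer out first receives the old value and frees it; every later caller sees NULL and does nothing, so the object is destroyed exactly once.

```c
#include <stdatomic.h>
#include <stdlib.h>

/* Atomically takes ownership of *obj and frees it; safe to call from
   several threads, and a no-op once the pointer is already NULL. */
static void free_and_nil(_Atomic(void *) *obj)
{
    void *old = atomic_exchange(obj, NULL); /* like lock xchg */
    if (old != NULL)
        free(old);
}
```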
Atomic Compare and Swap/CAS
function CAS32(oldValue: integer; newValue: integer; var destination): boolean;
//if destination=oldValue then begin
//  destination := newValue;
//  result := true;
//end else
//  result := false;
asm
  // register calling convention: oldValue in eax, newValue in edx, destination in ecx
  lock cmpxchg dword ptr [ecx], edx;
  setz al;
end;
The most powerful atomic function is the so-called Compare And Swap routine, or simply CAS. It takes three parameters: OldValue, NewValue and Destination. If the value of the destination variable equals OldValue, then CAS replaces it with NewValue and returns true; otherwise it returns false.
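For reference, here is a C11 sketch of the same routine built on atomic_compare_exchange_strong (my wrapper, not the post's code); it succeeds and stores newValue only if the destination still holds oldValue:

```c
#include <stdatomic.h>
#include <stdbool.h>

/* C11 counterpart of CAS32: returns true and stores newValue only if
   *dest currently equals oldValue. */
static bool cas32(_Atomic int *dest, int oldValue, int newValue)
{
    /* on failure compare_exchange writes the current value back into
       oldValue; we discard it to match the boolean-only CAS32 API */
    return atomic_compare_exchange_strong(dest, &oldValue, newValue);
}
```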
CAS is widely used in thread-safe implementations of dynamic data structures such as queues, stacks, etc. Here is an example of how CAS is used to safely insert a node into a queue (note: it works only if you are not trying to add and remove nodes at the same time).
TThreadSafeQueue = class
type
  PQueueNode = ^TQueueNode;
  TQueueNode = record
    next: PQueueNode;
    Data: TQueueData;
  end;
protected
  fFirstNode: PQueueNode;
  fStorage: TThreadSafeStorage;
public
  procedure PushFront(const Data: TQueueData);
end;

procedure TThreadSafeQueue.PushFront(const Data: TQueueData);
var
  NewNode: PQueueNode;
begin
  NewNode := fStorage.GetNew;
  NewNode.Data := Data;
  repeat
    NewNode.next := fFirstNode;
    // compare against the head we just read, not a fresh re-read
  until CAS32(integer(NewNode.next), integer(NewNode), fFirstNode);
end;
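The same push-front loop looks like this in C11 (my sketch): read the current head, point the new node at it, then CAS the head pointer; if another thread won the race, the CAS fails and the loop retries with the fresh head value.

```c
#include <stdatomic.h>
#include <stddef.h>

typedef struct node {
    struct node *next;
    int data;
} node_t;

/* Lock-free push to the front of a singly linked list. */
static void push_front(_Atomic(node_t *) *head, node_t *n)
{
    node_t *old = atomic_load(head);
    do {
        n->next = old;
        /* on failure, old is refreshed with the current head value */
    } while (!atomic_compare_exchange_weak(head, &old, n));
}
```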
The above examples are widespread but still special cases. In more complicated situations you need to serialize thread-unsafe code. A super simple, elegant and efficient way to do so is to use spinlocks, which again are easily implemented with the atomic CAS and dec instructions.
Here is my humble implementation of spinlocks.
procedure Lock(var Lk: integer);
//while not CAS32(0, 1, Lk) do;
asm
  mov edx, eax;
  mov ecx, 1;
@Loop:
  mov eax, 0;
  lock cmpxchg dword ptr [edx], ecx;
  jnz @Loop;
end;

procedure UnLock(var Lk: integer);
asm
  lock dec dword ptr [eax];
end;
The Lock procedure takes one “ByRef” argument Lk, which is intended to be a lock variable. A lock variable is a DWORD that equals 0 in the unlocked state and 1 in the locked state. Lock spins in a tight loop until it manages to change the Lk value from 0 to 1 using the CAS instruction. UnLock just decrements the Lk variable by one, returning it to the unlocked 0 state.
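In portable C11 the same spinlock can be sketched like this (my code, not the post's; the unlock here stores 0 directly rather than decrementing, which has the same effect on a 0/1 lock word):

```c
#include <stdatomic.h>

/* Spin until we change the lock word from 0 (free) to 1 (taken). */
static void spin_lock(_Atomic int *lk)
{
    int expected = 0;
    while (!atomic_compare_exchange_weak(lk, &expected, 1))
        expected = 0; /* expected was overwritten with the current value */
}

static void spin_unlock(_Atomic int *lk)
{
    atomic_store(lk, 0); /* the Delphi version does lock dec instead */
}
```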
Here is a good example of the power of the Lock/Unlock pair.
type
  TIntPointGrid = class
  type
    PIntPointArray = ^TIntPointArray;
    TIntPointArray = record
      Lk: integer;
      pp: array of TIntPoint;
    end;
  protected
    fPP: array of array of TIntPointArray;
  public
    procedure AddIntPoint(ix, iy: integer; const p: TIntPoint);
  end;

procedure TIntPointGrid.AddIntPoint(ix, iy: integer; const p: TIntPoint);
var
  ip: PIntPointArray;
  n: integer;
begin
  ip := @fPP[ix, iy];
  Lock(ip.Lk);
  try
    n := length(ip.pp);
    SetLength(ip.pp, n+1);
    ip.pp[n] := p;
  finally
    UnLock(ip.Lk);
  end;
end;
That is a possible solution (not the actual one) for my add intersection problem. I just added the Lk field to my intersection points array data structure and used it as a lock variable. Notice that the worker threads are locked only when they are trying to add intersection points to the same cell, which is a rare case. If I used critical sections, I would be forced either to create N×M of them, one for every grid cell, which is a waste, or to wrap the whole AddIntPoint function in one critical section, which would lead to poor performance.
Completely avoiding race conditions
Using atomics and spinlocks to resolve memory conflicts is good but even better is to completely avoid race conditions. The good news is that I’ve invented an easy way to do it in most cases.
As you know, I’ve implemented my own version of the Parallel For loop, so I am free to modify it any way I want. Here is what I’ve done:
- I’ve introduced the global variable tdn: integer, which holds the maximum number of worker threads in my Parallel.ForI.
- Each worker thread has an index greater than or equal to zero and less than tdn.
- The TParallelFunction callback now takes one more parameter – ThreadIndex, the index of the calling thread.
TParallelFunction2 = procedure(i: integer; TdI: integer; UserData: pointer);
Knowing tdn and the ThreadIndex, you can create tdn supporting data structures instead of one, so that each thread deals with its own data set. Then, at the final stage, you just collect the generated data from the tdn structures into one. Nice and easy. Here is an example, which computes the bounding box of some objects in a parallel loop.
function CalculateBox(const Objects: array of IObjectWithBox): TBox;
type
  PBoxData = ^TBoxData;
  TBoxData = record
    Objects: array of IObjectWithBox;
    bb: array of TBox;
  end;

  procedure CalcBox(i: integer; tdi: integer; BoxData: PBoxData);
  begin
    BoxData.bb[tdi].AddBox(BoxData.Objects[i].GetBox);
  end;

var
  i: integer;
  BoxData: TBoxData;
begin
  result.IsEmpty := true;
  BoxData.Objects := Objects;
  SetLength(BoxData.bb, Parallel.tdn);
  for i := 0 to Parallel.tdn-1 do BoxData.bb[i].IsEmpty := true;
  try
    Parallel.ForI(0, High(Objects), @CalcBox, @BoxData);
  finally
    for i := 0 to Parallel.tdn-1 do result.AddBox(BoxData.bb[i]);
  end;
end;
As you can see, I just create tdn temporary boxes, so that each parallel worker fills its own box; then at the final stage I add the temporary boxes into the result. You can do many things in a similar way.
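Here is the same per-thread-slot pattern sketched in C with pthreads (my example, not the post's code; it sums an array instead of merging boxes, but the shape is identical): each worker accumulates into its own partial[tdi], so there are no shared writes at all, and the results are combined sequentially at the end.

```c
#include <pthread.h>
#include <stddef.h>

enum { TDN = 4, COUNT = 1000 };

typedef struct {
    const int *values;
    long partial[TDN]; /* one private accumulator per worker thread */
} job_t;

typedef struct { job_t *job; int tdi; } arg_t;

static void *worker(void *p)
{
    arg_t *a = p;
    /* each thread handles a disjoint stripe of the input */
    for (int i = a->tdi; i < COUNT; i += TDN)
        a->job->partial[a->tdi] += a->job->values[i];
    return NULL;
}

static long parallel_sum(const int *values)
{
    job_t job = { .values = values }; /* partial[] zero-initialized */
    arg_t args[TDN];
    pthread_t tt[TDN];
    for (int t = 0; t < TDN; t++) {
        args[t] = (arg_t){ &job, t };
        pthread_create(&tt[t], NULL, worker, &args[t]);
    }
    long total = 0;
    for (int t = 0; t < TDN; t++) {
        pthread_join(tt[t], NULL);     /* wait, like the finally stage */
        total += job.partial[t];       /* combine the per-thread results */
    }
    return total;
}
```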
Parallel programming is a little bit tricky, mostly due to shared memory conflicts and things like that. Even such common operations as assigning a value to a variable or destroying an object may require special treatment when used in parallel algorithms. On the other hand, parallel programming is all about performance.
- Use tdn and tdi in Parallel.ForI and create separate data structures for each thread to completely avoid memory conflicts.
- Use atomics and spinlocks instead of critical sections and other crap. This way you lock the data, not the code, and only when it is really necessary. Here is the source code, which includes STParallel.pas with Parallel.ForI and the atomics, and PageStorage.pas with a thread-safe memory manager.