Today multicore CPUs are commonplace, yet many programmers still haven't written a single line of parallel code. Neither had I until recently. In this post I will share the results of my first experience with parallel programming in Delphi. Maybe it will help someone start implementing parallel algorithms on their own.
This summer I was working on a voxel 5d NC simulation module. In this module I had code that looked like this:
for i := 0 to Triangle.Count-1 do begin
  Grid.GetTriangleExtents(Triangle[i], ixMin, ixMax, iyMin, iyMax);
  for iy := iyMin to iyMax do begin
    for ix := ixMin to ixMax do begin
      if RayTraceTriangle(Triangle[i], x(ix), y(iy), z) then begin
        Grid[ix,iy].AddIntPoint(Triangle[i], z);
      end;
    end;
  end;
end;
So what I had to do was parallelize the outer loop. I know there are thread libraries with a parallel for out there (actually, for Delphi I know of only one – the OmniThreadLibrary – which is free and open source). In spite of this I decided to write my own (the reason being that I could not compile the OTL in BDS 2006, as it was designed for later Delphi versions).
The goal was to write a unit STParallel.pas that would look like this:
unit STParallel;

interface

type
  TParallelFunction = procedure(i: integer; UserData: pointer);

  TSTThreadPool = class
  public
    procedure ForI(FromIndex, ToIndex: integer; const Func: TParallelFunction; UserData: pointer);
  end;

var
  Parallel: TSTThreadPool = nil;
So I could rewrite my triangle intersection routine in the form of
procedure IntersectTriangles(i: integer; UserData: pointer);
var
  Triangle: TTriangles;
begin
  Triangle := TTriangles(UserData); // the triangle list arrives through UserData
  Grid.GetTriangleExtents(Triangle[i], ixMin, ixMax, iyMin, iyMax);
  for iy := iyMin to iyMax do begin
    for ix := ixMin to ixMax do begin
      if RayTraceTriangle(Triangle[i], x(ix), y(iy), z) then begin
        Grid[ix,iy].AddIntPoint(Triangle[i], z);
      end;
    end;
  end;
end;

Parallel.ForI(0, Triangle.Count-1, @IntersectTriangles, pointer(Triangle));
To implement such a class you have to decide how to:
- determine the number of available cores and create an array/pool of windows threads,
- suspend and wake up the threads all at once,
- subdivide the task into smaller pieces between the threads.
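These three decisions are not Delphi-specific. As a rough illustration (a Python model of the same design, not the post's code), they map onto a core-count query, one event that suspends and wakes all workers at once, and an even split of the index range:

```python
import os
import threading

# 1) Number of available cores (the Delphi code uses Windows.GetSystemInfo).
cores = os.cpu_count() or 1

# 2) One event wakes every worker simultaneously
#    (analogous to CreateEvent/SetEvent in the post).
start = threading.Event()
results = []

def worker(idx, lo, hi):
    start.wait()                    # suspended until the pool is started
    results.append((idx, lo, hi))   # here the real loop body would run

# 3) Subdivide the range [0..99] evenly between the workers.
n, threads = 100, []
chunk = (n + cores - 1) // cores    # round up so the range is fully covered
for idx in range(cores):
    lo, hi = idx * chunk, min(n, (idx + 1) * chunk)
    t = threading.Thread(target=worker, args=(idx, lo, hi))
    t.start()
    threads.append(t)

start.set()                         # wake all workers at once
for t in threads:
    t.join()
```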
Let’s start. The first point is pretty simple: the number of available cores is returned by Windows.GetSystemInfo.
function GetProcessorNum: integer;
var
  si: _SYSTEM_INFO;
begin
  GetSystemInfo(si);
  Result := si.dwNumberOfProcessors;
end;
For the threads I have created the TSTWorkerThread class
interface

type
  TExecuteFunction = procedure(tdi: integer) of object;

  TSTWorkerThread = class(TThread)
  private
    fOnExecute: TExecuteFunction;
    fIsRunning: boolean;
    fIndex: integer;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Execute; override;
    property OnExecute: TExecuteFunction read fOnExecute write fOnExecute;
    property IsRunning: boolean read fIsRunning write fIsRunning;
    property Index: integer read fIndex write fIndex;
  end;

implementation

procedure TSTThreadPool.InitWorkers;
var
  i: integer;
begin
  tdn := STDef.GetProcessorNum;
  SetLength(fWorkers, tdn);
  for i := 1 to High(fWorkers) do begin // Notice! Only tdn-1 threads are created
    fWorkers[i] := TSTWorkerThread.Create;
    fWorkers[i].Index := i;
    fWorkers[i].OnExecute := self.ParallelForExecute;
    fWorkers[i].Resume;
  end;
end;

procedure TSTWorkerThread.Execute;
begin
  while not self.Terminated do
    fOnExecute(fIndex);
end;

procedure TSTThreadPool.ParallelForExecute(tdi: integer);
begin
  ...
end;
constructor TSTThreadPool.Create;
begin
  fStartEvent := Windows.CreateEvent(nil, true, false, nil); // manual-reset, initially non-signaled
end;

procedure TSTThreadPool.StartWorkers;
begin
  while not Windows.SetEvent(fStartEvent) do;
end;

procedure TSTThreadPool.StopWorkers;
begin
  while not Windows.ResetEvent(fStartEvent) do;
end;

procedure TSTThreadPool.ForI(FromIndex, ToIndex: integer; const Func: TParallelFunction; const UserData: pointer);
begin
  fParallelFunction := Func;
  fParallelData := UserData;
  StartWorkers;
  try
    ScheduleForTask(FromIndex, ToIndex);
  finally
    StopWorkers;
  end;
end;

procedure TSTThreadPool.ParallelForExecute(tdi: integer);
var
  i: integer;
begin
  if GetForIndex(tdi, i) then begin
    fParallelFunction(i, fParallelData);
  end else if tdi > 0 then begin
    WaitForSingleObject(fStartEvent, windows.INFINITE);
  end;
end;
The most interesting part is thread scheduling. Here is the problem: we have to call the loop body function N times while having M threads. How can we distribute those N tasks between the M threads to minimize the execution time of our Parallel.ForI function? Obviously the time equals the execution time of the slowest thread plus some overhead. We should also consider that different iterations can take different amounts of time (e.g. when intersecting triangles, some triangles are small, some are large, and some may lie far away and will be pruned quickly).
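To make the "slowest thread" point concrete, here is a small Python model (an illustration, not the post's code): with uneven iteration costs, a contiguous block split can lose badly to a more balanced assignment, even though both do the same total work.

```python
# Iteration costs: the second half of the loop is 9x more expensive
# (think small vs. large triangles).
costs = [1] * 8 + [9] * 8   # 16 iterations, total work = 80
threads = 4

# Block split: thread k gets a contiguous chunk of 4 iterations.
block = [sum(costs[k*4:(k+1)*4]) for k in range(threads)]

# Interleaved split: thread k gets iterations k, k+4, k+8, ...
inter = [sum(costs[k::threads]) for k in range(threads)]

# Parallel time = slowest thread; the ideal would be 80/4 = 20.
print(max(block), max(inter))   # prints: 36 20
```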
Another question is what the calling thread will do meanwhile: will it just wait until the workers finish, or will it do some work too?
The answers depend on the actual use cases of our class, so I implemented four different scheduling strategies:
- Atomic,
- Log atomic,
- Divide and conquer,
- Interleaved.

Atomic strategy
This was the first strategy I implemented, because in theory it achieves the most even workload distribution between the threads. Here is the implementation.
procedure TSTThreadPool.ForI(FromIndex, ToIndex: integer; const Func: TParallelFunction; UserData: pointer);
begin
  fFromIndex := FromIndex;
  fToIndex := ToIndex;
  fTaskCounter := ToIndex - FromIndex + 1;
  fParallelFunction := Func;
  fParallelData := UserData;
  StartWorkers;
  try
    while (fTaskCounter > 0) or (fRunCounter > 0) do begin
      ParallelForExecute(0);
    end;
  finally
    StopWorkers;
  end;
end;

procedure TSTThreadPool.ParallelForExecute(tdi: integer);
var
  i: integer;
begin
  if fTaskCounter > 0 then begin
    AtomicInc(fRunCounter);
    i := AtomicAdd(-1, fTaskCounter); // returns the decremented value
    if i >= 0 then begin
      fParallelFunction(fToIndex - i, fParallelData);
    end;
    AtomicDec(fRunCounter);
  end else if tdi > 0 then begin
    WaitForSingleObject(fStartEvent, windows.INFINITE);
  end;
end;
How it works.
The fTaskCounter variable holds the number of remaining iterations.
The calling thread and the worker threads all execute the same ParallelForExecute function, so we need only N-1 worker threads on an N-core system.
The task is completed when fTaskCounter is zero and all the workers have finished their subtasks. To detect the latter I introduced the fRunCounter variable: every worker increments it when it starts executing a task and decrements it at the end. So the exit condition is (fTaskCounter=0) and (fRunCounter=0).
The work stealing itself happens in these lines:
i := AtomicAdd(-1, fTaskCounter);
if i >= 0 then begin
  fParallelFunction(fToIndex - i, fParallelData);
end;
The AtomicAdd function decrements the task counter by one and returns the result. The term “atomic” means the operation is interlocked: when two or more threads try to decrement the same variable at the same time, the hardware serializes their accesses, so you always get a correct result. Ordinary arithmetic operations don’t guarantee a correct result in such situations.
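The counter-based stealing can be modelled in a few lines of Python (a sketch only, not the post's code; AtomicAdd is emulated here with a lock, since mutual exclusion is exactly what an interlocked decrement guarantees):

```python
import threading

class AtomicCounter:
    """Emulates AtomicAdd(-1, counter): decrement and return the new value."""
    def __init__(self, value):
        self.value = value
        self.lock = threading.Lock()

    def add(self, delta):
        with self.lock:
            self.value += delta
            return self.value

def parallel_for(from_idx, to_idx, func, workers=4):
    task_counter = AtomicCounter(to_idx - from_idx + 1)

    def run():
        while True:
            i = task_counter.add(-1)   # steal one iteration
            if i < 0:
                break                  # nothing left to do
            func(to_idx - i)           # same index mapping as the Delphi code

    threads = [threading.Thread(target=run) for _ in range(workers)]
    for t in threads: t.start()
    for t in threads: t.join()

done = []
parallel_for(0, 99, done.append, workers=4)
```

Because every decrement is serialized, each index from 0 to 99 is claimed by exactly one thread, no matter how the threads interleave.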
That’s it. In theory the solution should demonstrate outstanding performance. But what about real life? Well, it depends… The problem is those magic atomic functions: it seems they are not cheap at all. I thought the triangle intersection routine would be heavy enough to make the overhead look negligible. It is not.
Thus the atomic strategy is ideal only for compute-intensive tasks where each loop iteration takes a long time. In my case a loop iteration takes little time while the iteration count is large (1000-1000000). So I developed yet another scheduling strategy.
Log Atomic strategy
The idea of the Log Atomic strategy is pretty simple: what if each worker thread decremented the task counter not by one, but by some number depending on the current value of the counter? The larger the counter, the more we decrement. This reduces the overhead introduced by the atomic functions many times over while still ensuring a pretty even workload distribution.
Playing with the step-choosing function I found one that seems just right (there is surely some math theory behind it). Here it is.
function GetStep(TaskCounter: integer): integer;
begin
  Result := Max(1, TaskCounter div (2*ThreadNumber));
end;
For 100 iterations and 4 threads the sequence is the following: (12-11-9-8-7-6-5-5-4-4-3-3-2-2-2-2-1-1-1-1-1-1-1-1-1-1-1-1-1-1-1). So there are only 31 decrements instead of 100. For 1000 iterations and 4 threads the sequence is (125-109-95-83-73-64-56-49-43-37-33-29-25-22-19-17-15-13-11-10-9-7-7-6-5-4-4-3-3-3-2-2-2-1-1-1-1-1-1-1-1-1-1-1-1-1-1-1), i.e. 48 decrements instead of 1000. As you can see, the law is logarithmic.
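A quick way to check these sequences is to replay GetStep outside Delphi. Here is a Python equivalent (integer division mirrors Pascal's div; this is just a verification sketch):

```python
def steps(task_count, thread_number):
    """Replays the Log Atomic decrement sequence for a given iteration count."""
    seq = []
    while task_count > 0:
        step = max(1, task_count // (2 * thread_number))  # GetStep
        seq.append(step)
        task_count -= step
    return seq

print(len(steps(100, 4)), len(steps(1000, 4)))   # prints: 31 48
```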
Divide and conquer and Interleaved strategies
Sometimes you have a loop whose many iterations take nearly the same time. In such situations work-stealing strategies may not be as efficient as a simple even subdivision of the tasks. By even subdivision I mean an approach where we just take the number of tasks, divide it by the number of workers, and then each worker fulfills its task range independently of the others.
I’ve implemented two subdivision strategies: the Simple Divide and the Interleaved.
In the Divide strategy every thread executes the contiguous range of iterations [FromIndex+Worker.Index*N .. Min(ToIndex, FromIndex+(Worker.Index+1)*N-1)], where N := UpRound((ToIndex-FromIndex+1)/ThreadCount).
In the Interleaved strategy every worker executes iterations from FromIndex+Worker.Index to ToIndex with a step of ThreadCount.
Here is the implementation.
procedure RunWorkers(const w: TSTWorkerArray); inline;
var
  i: integer;
begin
  for i := 0 to High(w) do
    w[i].IsRunning := true;
  StartWorkers;
end;

procedure WaitWorkers(const w: TSTWorkerArray);
var
  i: integer;
begin
  i := 1;
  repeat
    if not w[i].IsRunning then
      inc(i);
  until i = tdn;
  StopWorkers;
end;

procedure TSTThreadPool.ForI(FromIndex, ToIndex: integer; const Func: TParallelFunction; UserData: pointer);
begin
  fFromIndex := FromIndex;
  fToIndex := ToIndex;
  fTaskCounter := ToIndex - FromIndex + 1;
  fParallelFunction := Func;
  fParallelData := UserData;
  RunWorkers(fWorkers);
  try
    ParallelForExecute(0);
  finally
    WaitWorkers(fWorkers);
  end;
end;

procedure TSTThreadPool.ParallelForExecute(tdi: integer);
var
  i, n, si, ti, Step: integer;
begin
  case fForType of
    pfDivide:
      begin
        if fWorkers[tdi].IsRunning then begin
          n := fToIndex - fFromIndex + 1;
          Step := UpRound(n/tdn);
          si := fFromIndex + Step*tdi;
          ti := Min(fToIndex, si + Step - 1);
          for i := si to ti do
            fParallelFunction(i, fParallelData);
          fWorkers[tdi].IsRunning := false;
        end else if tdi > 0 then begin
          WaitForSingleObject(fStartEvent, windows.INFINITE);
        end;
      end;
    pfInterleaved:
      begin
        if fWorkers[tdi].IsRunning then begin
          i := fFromIndex + tdi;
          while i <= fToIndex do begin
            fParallelFunction(i, fParallelData);
            i := i + tdn;
          end;
          fWorkers[tdi].IsRunning := false;
        end else if tdi > 0 then begin
          WaitForSingleObject(fStartEvent, windows.INFINITE);
        end;
      end;
  end;
end;
As you can see, the StartWorkers and StopWorkers calls were replaced with RunWorkers and WaitWorkers, and we do not use atomic operations at all.
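The index arithmetic of both strategies is easy to sanity-check. This Python sketch (illustrative names, not the post's API) builds each worker's index set and lets us verify that the sets tile the range exactly once:

```python
def divide_ranges(from_idx, to_idx, thread_count):
    """pfDivide: worker k gets the contiguous block [si..ti]."""
    n = to_idx - from_idx + 1
    step = -(-n // thread_count)           # UpRound(n / thread_count)
    out = []
    for k in range(thread_count):
        si = from_idx + step * k
        ti = min(to_idx, si + step - 1)
        out.append(list(range(si, ti + 1)))  # may be empty for trailing workers
    return out

def interleaved_ranges(from_idx, to_idx, thread_count):
    """pfInterleaved: worker k gets from_idx+k, from_idx+k+tc, ..."""
    return [list(range(from_idx + k, to_idx + 1, thread_count))
            for k in range(thread_count)]

# Example: 10 iterations, 4 workers.
print(divide_ranges(0, 9, 4))       # contiguous blocks of UpRound(10/4) = 3
print(interleaved_ranges(0, 9, 4))  # strided sets with step 4
```

Flattening either result gives every index from FromIndex to ToIndex exactly once, which is the correctness condition for both subdivision schemes.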
While testing these approaches on small tasks I found that the WaitWorkers function takes a relatively long time, even when the loop iterations take equal time and distribute evenly across the CPU cores. The reason seems to lie in the Windows WaitForSingleObject function: the waiting threads react to our fStartEvent asynchronously and with some delay, so they also finish asynchronously. That’s a pity.
My measurements showed that none of the Parallel.ForI versions likes small loops: you will never reach the theoretical 4x boost on a quad-core CPU by parallelizing them. In my opinion, the reason is that many critical operations, such as thread scheduling and synchronization, are implemented in software rather than hardware. And this is a common problem: as far as I know, no thread library is free of this drawback. So my recommendation is: parallelize only your outermost loops.
Here is the full source code.