Parallel programming in Delphi part I. Parallel For

Today multicore CPUs are commonplace, yet many programmers still haven't written a single line of parallel code. Neither had I until recently. In this post I will try to share the results of my first experience with parallel programming in Delphi. Maybe it will help someone start implementing parallel algorithms themselves.

Parallel.ForI()

This summer I was working on the voxel 5d NC simulation module. In this module I had code that looked like this:

  for i := 0 to Triangle.Count-1 do begin
    Grid.GetTriangleExtents(Triangle[i], ixMin, ixMax, iyMin, iyMax);
    for iy := iyMin to iyMax do begin
      for ix := ixMin to ixMax do begin
        if RayTraceTriangle(Triangle[i], x(ix), y(iy), z) then begin
          Grid[ix,iy].AddIntPoint(Triangle[i], z);
        end;
      end;
    end;
  end;

So what I had to do was parallelize the outer loop. I know there are many threading libraries with a parallel for out there (actually I know only one for Delphi – the OmniThreadLibrary – which is free and open source). In spite of this I decided to write my own stuff (the reason was that I could not compile the OTL in BDS 2006, as it was designed for later Delphi versions).

The goal was to write a unit STParallel.pas whose interface would look like this:

unit STParallel;

interface

type
  TParallelFunction = procedure(i: integer; UserData: pointer);

  TSTThreadPool = class
  public
    procedure ForI(FromIndex, ToIndex: integer; const Func: TParallelFunction;
      UserData: pointer);
  end;

var
  Parallel: TSTThreadPool = nil;

So I could rewrite my triangles intersection routine in the form of

  procedure IntersectTriangles(i: integer; UserData: pointer);
  var
    Triangle: TTriangles;
  begin
    Triangle := TTriangles(UserData);  // UserData carries the triangle list
    Grid.GetTriangleExtents(Triangle[i], ixMin, ixMax, iyMin, iyMax);
    for iy := iyMin to iyMax do begin
      for ix := ixMin to ixMax do begin
        if RayTraceTriangle(Triangle[i], x(ix), y(iy), z) then begin
          Grid[ix,iy].AddIntPoint(Triangle[i], z);
        end;
      end;
    end;
  end;

  Parallel.ForI(0, Triangle.Count-1, @IntersectTriangles, pointer(Triangle));

To implement such a class you should decide how to

  • determine the number of available cores and create an array/pool of Windows threads,
  • suspend and wake up the threads all at once,
  • subdivide the task into smaller pieces between the threads.

Let's start. The first point is pretty simple: the number of available cores can be obtained from Windows.GetSystemInfo.

    function GetProcessorNum: integer;
    var
      si:    _SYSTEM_INFO;
    begin
      GetSystemInfo(si);
      Result:=si.dwNumberOfProcessors;
    end;
    

For the threads I created the TSTWorkerThread class:

interface

type
  TExecuteFunction = procedure(tdi: integer) of object;

  TSTWorkerThread = class(TThread)
  private
    fOnExecute: TExecuteFunction;
    fIsRunning: boolean;
    fIndex: integer;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Execute; override;
    property OnExecute: TExecuteFunction read fOnExecute write fOnExecute;
    property IsRunning: boolean read fIsRunning write fIsRunning;
    property Index: integer read fIndex write fIndex;
  end;

implementation

procedure TSTThreadPool.InitWorkers;
var
  i: integer;
begin
  tdn := STDef.GetProcessorNum;
  SetLength(fWorkers, tdn);
  for i := 1 to High(fWorkers) do begin  // Notice! Only tdn-1 threads are created
    fWorkers[i] := TSTWorkerThread.Create;
    fWorkers[i].Index := i;
    fWorkers[i].OnExecute := self.ParallelForExecute;
    fWorkers[i].Resume;
  end;
end;

procedure TSTWorkerThread.Execute;
begin
  while not Terminated do
    fOnExecute(fIndex);
end;

procedure TSTThreadPool.ParallelForExecute(tdi: integer);
begin
  ...
end;
    

The solution for the second point I borrowed from the OTL: they use the SetEvent, ResetEvent and WaitForSingleObject routines. So did I.

constructor TSTThreadPool.Create;
begin
  fStartEvent := Windows.CreateEvent(nil, true, false, nil);
end;

procedure TSTThreadPool.StartWorkers;
begin
  while not Windows.SetEvent(fStartEvent) do;
end;

procedure TSTThreadPool.StopWorkers;
begin
  while not Windows.ResetEvent(fStartEvent) do;
end;

procedure TSTThreadPool.ForI(FromIndex, ToIndex: integer; const Func:
  TParallelFunction; const UserData: pointer);
begin
  fParallelFunction := Func;
  fParallelData := UserData;
  StartWorkers;
  try
    ScheduleForTask(FromIndex, ToIndex);
  finally
    StopWorkers;
  end;
end;

procedure TSTThreadPool.ParallelForExecute(tdi: integer);
var
  i: integer;
begin
  if GetForIndex(tdi, i) then begin
    fParallelFunction(i, fParallelData);
  end else if tdi>0 then begin
    WaitForSingleObject(fStartEvent, windows.INFINITE);
  end;
end;

Thread scheduling

The most interesting part is thread scheduling. Here is the problem: we have to call the loop body function N times while having M threads. How can we distribute those N tasks between the M threads to minimize the execution time of our Parallel.ForI function? Obviously the time equals the execution time of the slowest thread plus some overhead. We should also account for the fact that different iterations can take different amounts of time (e.g. when intersecting triangles, some triangles are small, some are large, and some may lie far away and be pruned quickly).

Another question is what the calling thread should do meanwhile: will it just wait until the workers finish, or will it do some of the work too?

The answers depend on the actual use cases of our class, so I implemented four different scheduling strategies:

  • Atomic,
  • Log. atomic,
  • Divide and conquer,
  • Interleaved.

Atomic strategy

This was the first strategy I implemented, because in theory it achieves the most even workload distribution between threads. Here is the implementation.

procedure TSTThreadPool.ForI(FromIndex, ToIndex: integer; const Func:
  TParallelFunction; UserData: pointer);
begin
  fFromIndex := FromIndex;
  fToIndex := ToIndex;
  fTaskCounter := ToIndex-FromIndex+1;
  fParallelFunction := Func;
  fParallelData := UserData;
  StartWorkers;
  try
    while (fTaskCounter>0) or (fRunCounter>0) do begin
      ParallelForExecute(0);
    end;
  finally
    StopWorkers;
  end;
end;

procedure TSTThreadPool.ParallelForExecute(tdi: integer);
var
  i: integer;
begin
  if (fTaskCounter>0) then begin
    AtomicInc(fRunCounter);
    i := AtomicAdd(-1, fTaskCounter);
    if i>=0 then begin
      fParallelFunction(fToIndex-i, fParallelData);
    end;
    AtomicDec(fRunCounter);
  end else if tdi>0 then begin
    WaitForSingleObject(fStartEvent, windows.INFINITE);
  end;
end;
        

How it works.

The fTaskCounter variable holds the count of remaining iterations.

The calling thread and the worker threads all execute the same ParallelForExecute function, so we only need N-1 worker threads on an N-core system.

The task is completed when fTaskCounter is zero and all the workers have finished their subtasks. To detect the latter moment I introduced the fRunCounter variable: every worker increments it when it starts executing a task and decrements it at the end. So the exit condition is (fTaskCounter=0) and (fRunCounter=0).

The work stealing is accomplished in these lines:

  i := AtomicAdd(-1, fTaskCounter);
  if i>=0 then begin
    fParallelFunction(fToIndex-i, fParallelData);
  end;

The AtomicAdd function decrements the task counter by one and returns the result. The term "atomic" means the read-modify-write happens as a single indivisible operation: when two or more threads try to decrement the same variable at the same time, the hardware serializes them, so you always get the correct result. Ordinary arithmetic operations don't guarantee a correct result in such situations.
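The post doesn't show the bodies of AtomicAdd, AtomicInc and AtomicDec. On Win32 they might be thin wrappers around the interlocked intrinsics; here is a sketch with signatures inferred from the calls above (note that in some Delphi versions InterlockedExchangeAdd is declared with a PLongint parameter instead of var):

```pascal
uses Windows;

// Sketch of the atomic helpers, assuming they wrap the Win32
// interlocked intrinsics. Signatures are inferred from the calls
// AtomicAdd(-1, fTaskCounter), AtomicInc(fRunCounter), AtomicDec(fRunCounter).

function AtomicAdd(Value: integer; var Counter: integer): integer;
begin
  // InterlockedExchangeAdd returns the value *before* the addition,
  // so add Value once more to get the new value, which ForI relies on
  Result := InterlockedExchangeAdd(Counter, Value) + Value;
end;

procedure AtomicInc(var Counter: integer);
begin
  InterlockedIncrement(Counter);
end;

procedure AtomicDec(var Counter: integer);
begin
  InterlockedDecrement(Counter);
end;
```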

That's it. In theory the solution should demonstrate outstanding performance. But what about real life? Well, it depends… The problem is in those magic atomic functions: it seems they are not cheap at all. I thought the triangle intersection routine would be heavy enough to make the overhead look negligible, but it isn't.

Thus the atomic strategy is ideal only for compute-intensive tasks where each loop iteration takes a long time. In my case a loop iteration takes little time while the iteration count is large (1000–1000000). So I developed yet another scheduling strategy.

Log Atomic strategy

The idea of the Log Atomic strategy is pretty simple: what if each worker thread decremented the task counter not by one, but by some number that depends on the current value of the counter? The larger the counter, the more we decrement. This reduces the overhead introduced by the atomic functions many times while still ensuring a fairly even workload distribution.

Playing with the step-choosing function I found one which seems to be the only right one (there is surely some math theory behind it). Here it is:

function GetStep(TaskCounter: integer): integer;
begin
  result := Max(1, TaskCounter div (2*ThreadNumber));
end;

For 100 iterations and 4 threads the sequence of steps will be the following: (12-11-9-8-7-6-5-5-4-4-3-3-2-2-2-2-1-1-1-1-1-1-1-1-1-1-1-1-1-1-1). So there are only 31 decrements instead of 100. For 1000 iterations and 4 threads the sequence is (125-109-95-83-73-64-56-49-43-37-33-29-25-22-19-17-15-13-11-10-9-7-7-6-5-4-4-3-3-3-2-2-2-1-1-1-1-1-1-1-1-1-1-1-1-1-1-1), i.e. 48 decrements instead of 1000. As you can see, the law is logarithmic.
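The post doesn't show the log-atomic worker body. Under the counter conventions of the atomic strategy above (AtomicAdd returns the new counter value, and counter slot j maps to iteration fToIndex-j), the work-stealing step might change into something like this sketch (fRunCounter bookkeeping omitted for brevity):

```pascal
// Sketch of the log-atomic work-stealing step: one atomic operation
// reserves a whole block of GetStep(fTaskCounter) iterations.
var
  Step, i, j: integer;
begin
  Step := GetStep(fTaskCounter);
  i := AtomicAdd(-Step, fTaskCounter);   // new counter value
  if i > -Step then begin                // at least one reserved slot is valid
    // reserved counter slots are Max(0, i) .. i+Step-1;
    // slot j corresponds to iteration fToIndex-j
    for j := Max(0, i) to i+Step-1 do
      fParallelFunction(fToIndex-j, fParallelData);
  end;
end;
```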

Divide and conquer and Interleaved strategies

Sometimes you have a loop with many iterations taking nearly the same time. In such situations work-stealing strategies may not be as efficient as a simple even subdivision of the tasks. By even subdivision I mean an approach where we just take the number of tasks and divide it by the number of workers; then each worker processes its range of tasks independently of the others.

I've implemented two subdivision strategies: the Simple Divide and the Interleaved.

In the Divide strategy every thread executes the contiguous range of iterations [FromIndex+Worker.Index*N .. Min(ToIndex, FromIndex+(Worker.Index+1)*N-1)], where N := UpRound((ToIndex-FromIndex+1)/ThreadCount). For example, for 10 iterations on 4 threads N=3 and the ranges are 0..2, 3..5, 6..8 and 9..9.

In the Interleaved strategy every worker executes iterations from FromIndex+Worker.Index to ToIndex with a step of ThreadCount.

Here is the implementation.

procedure RunWorkers(const w: TSTWorkerArray); inline;
var
  i: integer;
begin
  for i := 0 to High(w) do
    w[i].IsRunning := true;
  StartWorkers;
end;

procedure WaitWorkers(const w: TSTWorkerArray);
var
  i: integer;
begin
  i := 1;
  repeat
    if not w[i].IsRunning then inc(i);
  until i=tdn;
  StopWorkers;
end;

procedure TSTThreadPool.ForI(FromIndex, ToIndex: integer; const Func:
  TParallelFunction; UserData: pointer);
begin
  fFromIndex := FromIndex;
  fToIndex := ToIndex;
  fTaskCounter := ToIndex-FromIndex+1;
  fParallelFunction := Func;
  fParallelData := UserData;
  RunWorkers(fWorkers);
  try
    ParallelForExecute(0);
  finally
    WaitWorkers(fWorkers);
  end;
end;

procedure TSTThreadPool.ParallelForExecute(tdi: integer);
var
  i, si, ti, Step: integer;
begin
  case fForType of
    pfDivide:
      begin
        if fWorkers[tdi].IsRunning then begin
          Step := UpRound(fTaskCounter/tdn);
          si := fFromIndex+Step*tdi;
          ti := Min(fToIndex, si+Step-1);
          for i := si to ti do
            fParallelFunction(i, fParallelData);
          fWorkers[tdi].IsRunning := false;
        end else if tdi>0 then begin
          WaitForSingleObject(fStartEvent, windows.INFINITE);
        end;
      end;
    pfInterleaved:
      begin
        if fWorkers[tdi].IsRunning then begin
          i := fFromIndex+tdi;
          while i<=fToIndex do begin
            fParallelFunction(i, fParallelData);
            i := i+tdn;
          end;
          fWorkers[tdi].IsRunning := false;
        end else if tdi>0 then begin
          WaitForSingleObject(fStartEvent, windows.INFINITE);
        end;
      end;
  end;
end;

As you can see, the StartWorkers and StopWorkers calls were replaced with the RunWorkers and WaitWorkers functions, and no atomic operations are used at all.

While testing those approaches on small tasks I found that the WaitWorkers function takes a relatively long time, even when the loop iterations take equal time and distribute evenly across the CPU cores. The reason seems to lie in the Windows WaitForSingleObject function: the waiting threads react to our fStartEvent with some delay and asynchronously, so they also finish asynchronously. That's a pity.

Conclusion

My measurements showed that none of the Parallel.ForI versions likes small loops. What I mean is that you will never reach the theoretical 4x boost on a quad-core CPU when parallelizing them. In my opinion the reason is that many critical tasks such as thread scheduling and synchronization are implemented in software rather than hardware, and this is a common problem: as far as I know, no thread library is free of that drawback. Thus my recommendation is: parallelize only your outermost loops.

Here is the full source code.

3 thoughts on “Parallel programming in Delphi part I. Parallel For”

1. Good work! Your approach makes sense and is quite similar to the OTL ForEach implementation. And I fully agree that there’s no point in parallelizing small loops.
