Parallel programming with Delphi part II. Resolving race conditions

In this post I will reveal some simple yet efficient techniques to address and resolve race conditions in parallel algorithms.

Do you remember my shiny parallel version of the triangle intersection code?

  procedure IntersectTriangles(i: integer; Triangle: TTriangles);
  begin
    Grid.GetTriangleExtents(Triangle[i], ixMin, ixMax, iyMin, iyMax);
    for iy := iyMin to iyMax do begin
      for ix := ixMin to ixMax do begin
        if RayTraceTriangle(Triangle[i], x(ix), y(iy), z) then begin
          Grid[ix,iy].AddIntPoint(Triangle[i], z);
        end;
      end;
    end;
  end;

  Parallel.ForI(0, Triangle.Count-1, @IntersectTriangles, @Triangle);

There is one problem with it. Imagine the following situation: two threads try to add intersection points with the same [ix, iy] to the Grid at the same time. What result will we get? Most likely only one of the intersection points will actually be added, and the whole algorithm will fail. Such situations are called race conditions.

Basically, there is only one thing that can lead to race conditions in your parallel code: writing to memory shared across threads. Typical examples of shared memory include input and output variables, counters, storage structures, etc. While working on my own problem I faced only some of them. Here I will just show you my actual solutions to the actual problems, but first enjoy my boring lecture.

Atomic instructions

I started thinking about parallel programming a long time ago. I would look at the modules I had written and wonder how to turn existing, heavily optimized sequential code into an equally efficient parallel version. And almost every time I was stuck, because I couldn’t figure out how to avoid writing to shared memory. Then I started thinking that avoiding writes to shared memory is impossible, that this is a common problem, and that there should be existing solutions for it. So I started digging through the internet, MSDN, programming manuals, etc. What I found at first were mutexes, semaphores, critical sections, and many other scary words, the meaning of which I still do not understand. The proposed solutions seemed to me cumbersome and inefficient at the same time. Take critical sections: what a stupid idea it is to serialize a piece of potentially thread-unsafe code every time you call it, when the probability of a race condition is one in a thousand! I am pretty sure that critical sections would make my parallel code many times slower than the sequential version.

So I continued my research and I found it – the LOCK. LOCK is an instruction prefix in x86 assembly language. It turns ordinary arithmetic instructions such as inc, dec, add, sub and others into their atomic versions.

Now is the right time to explain the term atomic.

Every CPU instruction decomposes into more elementary operations. For example, the inc dword ptr [eax] instruction involves at least three steps.

  1. Read a DWORD from the memory location pointed to by eax into an accumulator.
  2. Increment the accumulator.
  3. Write the result back to the memory.

That’s why, when several threads operate on the same variable at the same time, there is no guarantee that the resulting value will be correct: two threads can both read the old value, both increment it, and one of the increments is lost.

As opposed to ordinary instructions, atomics are executed as a whole, without subdivision into more elementary operations (hence their name). This fact makes them thread safe. On the x86 architecture atomics are implemented either by locking the system bus or via the cache coherency mechanism when appropriate. Hence the LOCK prefix.

In the Windows API the atomic instructions are referred to as interlocked. You have probably seen or used the InterlockedIncrement and InterlockedDecrement functions in standard implementations of the IUnknown.AddRef and IUnknown.Release methods. Windows offers wrappers for all atomic instructions, but I wrote my own versions of some of them. Here they are.

    procedure AtomicInc(var i: integer);
    procedure AtomicDec(var i: integer);
    function AtomicAdd(Addend: integer; var Dest: integer): integer;
    function AtomicSwap(Data: pointer; var Dest): pointer; overload;
    function CAS32(oldValue: integer; newValue: integer; var destination): boolean;

Atomic counters

procedure AtomicInc(var i: integer); //i := i+1;
asm  
  lock inc dword ptr[eax];
end;
procedure AtomicDec(var i: integer); //i := i-1;
asm  
  lock dec dword ptr[eax];
end;

Sometimes you just need to count something without caring about the actual value of the counter. Then just use the AtomicInc and AtomicDec procedures.
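The same atomic-increment guarantee the LOCK prefix gives is available portably through C11 `<stdatomic.h>`. Here is a minimal sketch (the names `counter`, `count_up` and `run_counters` are mine, and a POSIX threads environment is assumed): two threads hammer one counter, yet the final value is always exact, because each `atomic_fetch_add` is one indivisible read-modify-write.

```c
#include <pthread.h>
#include <stdatomic.h>

atomic_int counter;

static void *count_up(void *arg) {
    (void)arg;
    for (int i = 0; i < 100000; i++)
        atomic_fetch_add(&counter, 1);   /* the lock inc equivalent */
    return NULL;
}

/* run two incrementing threads and return the final counter value */
int run_counters(void) {
    pthread_t a, b;
    atomic_store(&counter, 0);
    pthread_create(&a, NULL, count_up, NULL);
    pthread_create(&b, NULL, count_up, NULL);
    pthread_join(a, NULL);
    pthread_join(b, NULL);
    return atomic_load(&counter);   /* always exactly 200000 */
}
```

With a plain (non-atomic) `int` and `counter++` the same program could return anything up to 200000, which is exactly the lost-update problem described above.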

You have already seen how I used atomic counters in my implementation of the Parallel For cycle to increment and decrement fRunCounter.

procedure TSTThreadPool.ForI(FromIndex, ToIndex: integer; const Func:
  TParallelFunction; UserData: pointer);
begin
  StartWorkers;
  try
    while (fTaskCounter>0) or (fRunCounter>0) do begin
      ParallelForExecute(0)
    end;
  finally
    StopWorkers;
  end;
end;

procedure TSTThreadPool.ParallelForExecute(tdi: integer);
var
  i: integer;
begin
  if (fTaskCounter>0) then begin
    AtomicInc(fRunCounter);
    i := AtomicAdd(-1, fTaskCounter);
    if i>=0 then begin
      fParallelFunction(fToIndex-i, fParallelData);
    end;
    AtomicDec(fRunCounter);
  end;
end;

Atomic add

function AtomicAdd(Addend: integer; var Dest: integer): integer;
asm
  //Dest := Dest+Addend;
  //result := Dest;
  mov ecx, eax;
  lock xadd [edx], eax;
  add eax, ecx;
end;

The AtomicAdd function is much more powerful than AtomicInc or AtomicDec because it can return the result of the addition. This is possible because internally it is implemented not with the add instruction but with the xadd instruction, which exchanges its two operands and then stores their sum in the destination, leaving the old destination value in the source register.
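The C11 counterpart of xadd is `atomic_fetch_add`, which returns the *previous* value of the destination. A sketch of the same wrapper in C (the name `atomic_add_new` and the demo variable are mine): adding the addend back to the returned old value yields the new value, mirroring the trailing `add eax, ecx` above.

```c
#include <stdatomic.h>

/* Mirrors the Delphi AtomicAdd: atomic_fetch_add returns the previous
 * value (the xadd behaviour), so adding the addend back produces the
 * new value of the destination. */
int atomic_add_new(atomic_int *dest, int addend) {
    return atomic_fetch_add(dest, addend) + addend;
}

atomic_int demo = 10;   /* for the usage example below */
```

For example, `atomic_add_new(&demo, 5)` returns 15 and leaves `demo` at 15.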

AtomicAdd is very useful if you want to implement a lock-free, thread-safe array. Here is a simple example, assuming you know the maximum number of items in advance and you only add items.

TThreadSafePointerArray = class
protected
  fTop, fMaxCount: integer;
  fpp: array of pointer;
public
  function Add(p: pointer): integer;
end;

function TThreadSafePointerArray.Add(p: pointer): integer;
begin
  //fTop must be initialized to -1, so the first returned index is 0
  result := AtomicAdd(1, fTop);
  ASSERT(result<fMaxCount, 'Array bounds exceeded!');
  fpp[result] := p;
end;
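The same idea sketched in C11 (the names `tsarray_add`, `slots` and `top` are mine): one atomic add reserves a unique slot index, so two threads can never write to the same position.

```c
#include <assert.h>
#include <stdatomic.h>

#define MAX_ITEMS 1024

static void *slots[MAX_ITEMS];
static atomic_int top = -1;     /* index of the last occupied slot */

/* Each caller reserves a unique index with one atomic add and then
 * fills its own slot. */
int tsarray_add(void *p) {
    int i = atomic_fetch_add(&top, 1) + 1;   /* new value, as AtomicAdd returns */
    assert(i < MAX_ITEMS && "Array bounds exceeded!");
    slots[i] = p;
    return i;
}
```

One caveat worth knowing: a reader that observes `top` may briefly see an index whose slot has not been written yet, so this sketch, like the Delphi original, is safe for concurrent appends but not for concurrent reads during appends.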

Atomic swap

function AtomicSwap(Data: pointer; var Dest): pointer; overload;
asm
  //result := Dest;
  //Dest := Data;
  lock xchg [edx], eax;
end;

AtomicSwap replaces a DWORD-sized variable Dest (int32, 32-bit pointer, etc.) with your Data (in this case a pointer) and returns the previous value of the variable.

What is it useful for? For example, for thread safe destruction of objects and releasing interface references.

procedure FreeAndNilObject(var Obj);
var
  Temp: TObject;
begin
  if pointer(Obj)<>nil then begin
    Temp := TObject(AtomicSwap(nil, Obj));
    if Temp<>nil then
      Temp.Destroy;
  end;
end;

procedure FreeAndNilInterface(var Intf);
var
  Temp: IUnknown;
begin
  if pointer(Intf)<>nil then begin
    Temp := IUnknown(AtomicSwap(nil, Intf));
    Temp := nil;
  end;
end;
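The same trick in C11 uses `atomic_exchange` on a pointer slot. This sketch is mine (the names `free_and_nil` and `shared_obj` are assumptions, and unlike the Delphi procedures it returns a flag for demonstration): only one of several racing callers receives the non-NULL pointer and frees it; the rest swap NULL for NULL and do nothing.

```c
#include <stdatomic.h>
#include <stdlib.h>

/* Atomically detach the object from the shared slot before destroying
 * it, so exactly one caller ends up owning (and freeing) the pointer. */
int free_and_nil(void *_Atomic *slot) {
    void *p = atomic_exchange(slot, NULL);  /* the lock xchg step */
    free(p);                                /* free(NULL) is a no-op */
    return p != NULL;                       /* whether this caller owned it */
}

void *_Atomic shared_obj;   /* demo slot for the usage below */
```

Calling `free_and_nil(&shared_obj)` twice is safe: the first call frees the object and returns 1, the second finds NULL and returns 0.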

Atomic Compare and Swap/CAS

function CAS32(oldValue: integer; newValue: integer; var destination): boolean;
asm
  //if destination=oldValue then begin
  //  destination := newValue;
  //  result := true;
  //end else
  // result := false;
  lock cmpxchg dword ptr [destination], newValue
  setz  al
end;

The most powerful atomic function is the so called Compare And Swap routine, or simply CAS. It takes three parameters: OldValue, NewValue and Destination. If the value of the destination variable equals OldValue, then CAS replaces it with NewValue and returns true; otherwise it returns false.
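C11 exposes the same primitive as `atomic_compare_exchange_strong`, which on x86 compiles down to the same lock cmpxchg. A sketch with the CAS32 contract (the names `cas32` and `cas_demo` are mine):

```c
#include <stdatomic.h>
#include <stdbool.h>

/* Same contract as CAS32 above: store newValue only if *dest still
 * holds oldValue, and report whether the swap happened. */
bool cas32(atomic_int *dest, int oldValue, int newValue) {
    return atomic_compare_exchange_strong(dest, &oldValue, newValue);
}

atomic_int cas_demo = 5;   /* for the usage example */
```

So `cas32(&cas_demo, 5, 7)` succeeds and sets `cas_demo` to 7, after which `cas32(&cas_demo, 5, 9)` fails and leaves it at 7.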

CAS is widely used in thread-safe implementations of dynamic data structures such as queues, stacks, etc. Here is an example of how CAS is used to safely push a node onto a queue (note: it works only if you are not trying to add and remove nodes at the same time).

TThreadSafeQueue = class
type
  PQueueNode = ^TQueueNode;
  TQueueNode = record
    next: PQueueNode;
    Data: TQueueData;
  end;
protected
  fFirstNode: PQueueNode;
  fStorage: TThreadSafeStorage;
public
  procedure PushFront(const Data: TQueueData);
end;

procedure TThreadSafeQueue.PushFront(const Data: TQueueData);
var
  NewNode: PQueueNode;
begin
  NewNode := fStorage.GetNew;
  NewNode.Data := Data;
  repeat
    NewNode.next := fFirstNode;
    //compare against the head we actually linked to, not a fresh read
  until CAS32(integer(NewNode.next), integer(NewNode), fFirstNode);
end;
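The same retry loop sketched in C11 (the names `Node`, `first` and `push_front` are mine): publish the new node only if nobody moved the head since we read it. A convenient property of the C API is that on failure `atomic_compare_exchange_weak` writes the current head back into the expected argument, so each retry automatically links against the fresh value.

```c
#include <stdatomic.h>
#include <stdlib.h>

typedef struct Node {
    struct Node *next;
    int data;
} Node;

_Atomic(Node *) first = NULL;

/* Lock-free push: retry until the CAS confirms the head is unchanged. */
Node *push_front(int data) {
    Node *n = malloc(sizeof *n);
    n->data = data;
    n->next = atomic_load(&first);
    while (!atomic_compare_exchange_weak(&first, &n->next, n))
        ;   /* spurious or real failure: n->next now holds the new head */
    return n;
}
```

As with the Delphi version, this handles concurrent pushes only; mixing concurrent pops in safely requires more machinery (the ABA problem).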

Lock/Unlock

The above examples are widespread but still special cases. In more complicated situations you need to serialize a piece of thread-unsafe code. A super simple, elegant and efficient way to do so is to use spinlocks, which again are easily implemented with the atomic CAS and dec instructions.

Here is my humble implementation of spinlocks.

procedure Lock(var Lk: integer);
asm
  //while not CAS32(0, 1, Lk) do;
  mov edx, eax
  mov ecx, 1
@Loop:
  mov eax, 0
  lock cmpxchg dword ptr [edx], ecx
  jnz @Loop
end;

procedure UnLock(var Lk: integer);
asm
  lock dec dword ptr[eax];
end;

The Lock procedure takes one “ByRef” argument, Lk, which is intended to be a lock variable. A lock variable is a DWORD which equals 0 in the unlocked state and 1 in the locked state. Lock spins in a tight loop until it manages to change the Lk value from 0 to 1 using the CAS instruction. The UnLock procedure just decrements the Lk variable by one, returning it to the unlocked 0 state.
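The same spinlock sketched with C11 atomics (the names `spin_lock`, `spin_unlock` and the demo identifiers are mine; under real contention you would also want a pause/backoff in the loop):

```c
#include <stdatomic.h>

/* 0 = unlocked, 1 = locked, exactly like the Lk variable above */
void spin_lock(atomic_int *lk) {
    int expected;
    do {
        expected = 0;   /* cmpxchg compares against the expected old value */
    } while (!atomic_compare_exchange_weak(lk, &expected, 1));
}

void spin_unlock(atomic_int *lk) {
    atomic_store(lk, 0);   /* a plain atomic store releases the lock */
}

/* demo: serialize a tiny critical section, as AddIntPoint does with Lk */
atomic_int demo_lk;
int demo_counter;

int locked_increment(void) {
    spin_lock(&demo_lk);
    int v = ++demo_counter;   /* only one thread can be here at a time */
    spin_unlock(&demo_lk);
    return v;
}
```

Note the unlock is just a store to 0 here, whereas the Delphi UnLock uses lock dec; both return the variable to the unlocked state.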

Here is a good example of the power of the Lock/Unlock pair.

type
  TIntPointGrid = class
  type
    PIntPointArray = ^TIntPointArray;
    TIntPointArray = record
      Lk: integer;
      pp: array of TIntPoint;
    end;
  protected
    fPP: array of array of TIntPointArray;
  public
    procedure AddIntPoint(ix, iy: integer; const p: TIntPoint);
  end;

procedure TIntPointGrid.AddIntPoint(ix, iy: integer; const p: TIntPoint);
var
  ip: PIntPointArray;
  n: integer;
begin
  ip := @fPP[ix, iy];
  Lock(ip.Lk);
  try
    n := length(ip.pp);
    SetLength(ip.pp, n+1);
    ip.pp[n] := p;
  finally
    UnLock(ip.Lk);
  end;
end;

That is a possible solution (not the actual one) to my add-intersection problem. I just added the Lk field to my intersection points array data structure and used it as a lock variable. Notice that the worker threads are blocked only when they are trying to add intersection points to the same cell, which is a rare case. If I used critical sections, I would be forced either to create NxM of them, one for every grid cell, which is a waste, or to wrap the whole AddIntPoint function in one critical section, which would lead to poor performance.

Completely avoiding race conditions

Using atomics and spinlocks to resolve memory conflicts is good, but it is even better to avoid race conditions completely. The good news is that I’ve invented an easy way to do it in most cases.

As you know, I’ve implemented my own version of the Parallel For cycle, so I am free to modify it any way I want. What I’ve done is:

  • I’ve introduced the global variable tdn: integer, which holds the number of worker threads in my Parallel.ForI.
  • Each worker thread has an index greater than or equal to zero and less than tdn.
  • The TParallelFunction callback now takes one more parameter – ThreadIndex, the index of the calling thread.
    TParallelFunction2 = procedure(i: integer; TdI: integer; UserData: pointer);
    
  • Knowing tdn and the ThreadIndex, you can create tdn supporting data structures instead of one, so that each thread deals with its own data set. Then at the final stage you just collect the generated data from the tdn structures into one. Nice and easy. Here is an example, which computes the bounding box of some objects in a parallel loop.

    function CalculateBox(const Objects: array of IObjectWithBox): TBox;
    type
      PBoxData = ^TBoxData;
      TBoxData = record
        Objects: array of IObjectWithBox;
        bb: array of TBox;
      end;
      procedure CalcBox(i: integer; tdi: integer; BoxData: PBoxData);
      begin
        BoxData.bb[tdi].AddBox(BoxData.Objects[i].GetBox);
      end;
    
    var
      i: integer;
      BoxData: TBoxData;
    begin
      result.IsEmpty := true;
      BoxData.Objects := Objects;
      SetLength(BoxData.bb, tdn);
      for i := 0 to tdn-1 do
        BoxData.bb[i].IsEmpty := true;
      try
        Parallel.ForI(0, High(Objects), @CalcBox, @BoxData);
      finally
        for i := 0 to tdn-1 do
          result.AddBox(BoxData.bb[i]);
      end;
    end;
    

As you can see, I just create tdn temporary boxes so that each parallel worker fills its own box; then at the final stage I just add the temporary boxes into the result. You can do many things in a similar way.
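The pattern generalizes to any reduction. A minimal C sketch of the same idea (all names here, `TDN`, `partial`, `sum_worker`, `parallel_sum`, are mine, standing in for tdn, tdi and Parallel.ForI): each worker accumulates into its own slot of `partial`, so no synchronization is needed at all, and the merge happens after the joins.

```c
#include <pthread.h>

#define TDN 4       /* worker count, playing the role of Parallel.tdn */
#define N   1000

long partial[TDN];  /* one accumulator per thread: nothing is shared */

static void *sum_worker(void *arg) {
    int tdi = (int)(long)arg;            /* this worker's thread index */
    for (int i = tdi; i < N; i += TDN)   /* its own slice of the range */
        partial[tdi] += i + 1;
    return NULL;
}

/* sum 1..N with TDN workers and a final single-threaded merge stage */
long parallel_sum(void) {
    pthread_t th[TDN];
    long total = 0;
    for (int t = 0; t < TDN; t++) {
        partial[t] = 0;
        pthread_create(&th[t], NULL, sum_worker, (void *)(long)t);
    }
    for (int t = 0; t < TDN; t++) {
        pthread_join(th[t], NULL);
        total += partial[t];             /* collect the partial results */
    }
    return total;
}
```

The merge loop runs after every worker has been joined, so reading `partial` there is race-free by construction, just like adding the temporary boxes into the result above.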

Conclusion

Parallel programming is a little bit tricky, mostly due to shared memory conflicts and things like that. Even such common operations as assigning a value to a variable or destroying an object may require special treatment when used in parallel algorithms. On the other hand, parallel programming is all about performance.

That’s why

  1. Use tdn and tdi in Parallel.ForI and create separate data structures for each thread to completely avoid memory conflicts.
  2. Use atomics and spinlocks instead of critical sections and other crap. This way you will lock the data, not the code, and only when it is really necessary. Here is the source code, which includes STParallel.pas with Parallel.ForI and atomics, and PageStorage.pas with a thread-safe memory manager.

12 thoughts on “Parallel programming with Delphi part II. Resolving race conditions”

  1. Vitali, hi! In regards to your comments on my blog – OTL already goes through a very complicated process to minimize probability of race conditions. In fact, if you iterate over an integer range, there will be no race conditions at all, even if threads are stealing workload from each other.

    BTW, you should register your log with delphifeeds.org.

    1. Gabr, hi! Thanks for the comment. I’m not sure I understand you very well. Even though the parallel for scheduler itself may not cause race conditions, they can still appear, since you have to store the data you generate in your parallel loop somewhere. In this case you can use thread-safe data structures, but this may reduce performance. I think separating data structures between workers, by knowing their number and the index of each worker, is a neat solution in many cases.

      1. I was talking about the input, but if you use .PreserveOrder, you’ll also get a separate output buffer for each thread. ForEach will merge those buffers automatically into one output.

  2. The design for your site is a bit off in Epiphany. Nevertheless I like your blog. I might need to use a normal web browser just to enjoy it.

  3. Marin says:

    Ok, but when I use the class “TSTThreadPool” it shows me the message “access violation at address 005A8599…Read of 00000000.”
