Browse Source

* optimized version of TMultiReadExclusiveWriteSynchronizer that allows
concurrent readers (mantis #14451)

git-svn-id: trunk@14587 -

Jonas Maebe 15 years ago
parent
commit
106baa8b2f

+ 1 - 0
.gitattributes

@@ -9421,6 +9421,7 @@ tests/test/units/sysutils/tfile2.pp svneol=native#text/plain
 tests/test/units/sysutils/tfilename.pp svneol=native#text/plain
 tests/test/units/sysutils/tfloattostr.pp svneol=native#text/plain
 tests/test/units/sysutils/tlocale.pp svneol=native#text/plain
+tests/test/units/sysutils/trwsync.pp svneol=native#text/plain
 tests/test/units/sysutils/tsscanf.pp svneol=native#text/plain
 tests/test/units/sysutils/tstrtobool.pp svneol=native#text/plain
 tests/test/uobjc24.pp svneol=native#text/plain

+ 16 - 1
rtl/objpas/sysutils/systhrdh.inc

@@ -21,7 +21,7 @@ type
      procedure EndWrite;
    end;
 
-   TMultiReadExclusiveWriteSynchronizer = class(TInterfacedObject,IReadWriteSync)
+   TSimpleRWSync = class(TInterfacedObject,IReadWriteSync)
    private
       crit : TRtlCriticalSection;
    public
@@ -33,3 +33,18 @@ type
       procedure Endread;
    end;
 
+   TMultiReadExclusiveWriteSynchronizer = class(TInterfacedObject,IReadWriteSync)
+   private
+      freaderqueue: peventstate;
+      fwritelock,
+      fwaitingwriterlock: prtlevent;
+      freadercount: cardinal;
+      fwritelocked: longint;
+   public
+      constructor Create; virtual;
+      destructor  Destroy; override;
+      function Beginwrite : boolean;
+      procedure Endwrite;
+      procedure Beginread;
+      procedure Endread;
+   end;

+ 120 - 7
rtl/objpas/sysutils/sysuthrd.inc

@@ -1,6 +1,6 @@
 {
     This file is part of the Free Pascal run time library.
-    Copyright (c) 2005 by the Free Pascal development team
+    Copyright (c) 2005,2009 by the Free Pascal development team
 
     See the file COPYING.FPC, included in this distribution,
     for details about the copyright.
@@ -12,33 +12,146 @@
  **********************************************************************}
 
 
-constructor TMultiReadExclusiveWriteSynchronizer.Create;
+constructor TSimpleRWSync.Create;
 begin
   System.InitCriticalSection(Crit);
 end;
 
-destructor TMultiReadExclusiveWriteSynchronizer.Destroy;
+destructor TSimpleRWSync.Destroy;
 begin
   System.DoneCriticalSection(Crit);
 end;
 
-function  TMultiReadExclusiveWriteSynchronizer.Beginwrite : boolean;
+function  TSimpleRWSync.Beginwrite : boolean;
 begin
   System.EnterCriticalSection(Crit);
   result:=true;
 end;
 
-procedure  TMultiReadExclusiveWriteSynchronizer.Endwrite;
+procedure  TSimpleRWSync.Endwrite;
 begin
   System.LeaveCriticalSection(Crit);
 end;
 
-procedure  TMultiReadExclusiveWriteSynchronizer.Beginread;
+procedure  TSimpleRWSync.Beginread;
 begin
   System.EnterCriticalSection(Crit);
 end;
 
-procedure  TMultiReadExclusiveWriteSynchronizer.Endread;
+procedure  TSimpleRWSync.Endread;
 begin
   System.LeaveCriticalSection(Crit);
 end;
+
+
+
+constructor TMultiReadExclusiveWriteSynchronizer.Create;
+begin
+  fwritelock:=RTLEventCreate;
+  RTLeventSetEvent(fwritelock);
+  fwaitingwriterlock:=RTLEventCreate;
+  RTLEventResetEvent(fwaitingwriterlock);
+  fwritelocked:=0;
+  freadercount:=0;
+  freaderqueue:=BasicEventCreate(nil,true,false,'');
+end;
+
+
+destructor TMultiReadExclusiveWriteSynchronizer.Destroy;
+begin
+  InterlockedExchange(fwritelocked,0);
+  RtlEventDestroy(fwritelock);
+  BasicEventDestroy(freaderqueue);
+end;
+
+
+function  TMultiReadExclusiveWriteSynchronizer.Beginwrite : boolean;
+begin
+  { wait for any other writers that may be in progress }
+  RTLEventWaitFor(fwritelock);
+  { it is possible that we earlier on missed waiting on the
+    fwaitingwriterlock and that it's still set (must be done
+    after aquiring the fwritelock, because otherwise one
+    writer could reset the fwaitingwriterlock of another one
+    that's about to wait on it) }
+  BasicEventResetEvent(fwaitingwriterlock);
+  { new readers have to block from now on; writers get priority to avoid
+    writer starvation (since they have to compete with potentially many
+    concurrent readers) }
+  BasicEventResetEvent(freaderqueue);
+  { for quick checking by candidate-readers }
+  InterlockedExchange(fwritelocked,1);
+
+  { wait until all readers are gone -- freadercount and fwritelocked are only
+    accessed using atomic operations (that's why we use
+    InterLockedExchangeAdd(x,0) below) -> always in-order. The writer always
+    first sets fwritelocked and then checks freadercount, while the readers
+    always first increase freadercount and then check fwritelocked }
+  while (InterLockedExchangeAdd(freadercount,0)<>0) do
+    RTLEventWaitFor(fwaitingwriterlock);
+
+  { we have the writer lock, and all readers are gone }
+  result:=true;
+end;
+
+
+procedure  TMultiReadExclusiveWriteSynchronizer.Endwrite;
+begin
+  { Finish all writes inside the section so that everything executing
+    afterwards will certainly see these results }
+  WriteBarrier;
+
+  { signal potential readers that the coast is clear }
+  InterlockedExchange(fwritelocked,0);
+  { wake up waiting readers (if any); do not check first whether freadercount
+    is <> 0, because the InterlockedDecrement in the while loop of BeginRead
+    can have already occurred, so a single reader may be about to wait on
+    freaderqueue even though freadercount=0. Setting an event multiple times
+    is no problem. }
+  BasicEventSetEvent(freaderqueue);
+  { free the writer lock so another writer can become active }
+  RTLeventSetEvent(fwritelock);
+end;
+
+
+procedure  TMultiReadExclusiveWriteSynchronizer.Beginread;
+Const
+  wrSignaled = 0;
+  wrTimeout  = 1;
+  wrAbandoned= 2;
+  wrError    = 3;
+begin
+  InterlockedIncrement(freadercount);
+  { wait until there is no more writer }
+  while InterLockedExchangeAdd(fwritelocked,0)<>0 do
+    begin
+      { there's a writer busy or ir wanting to start -> wait until it's
+        finished; a writer may already be blocked in the mean time, so
+        wake it up if we're the last to go to sleep }
+      if InterlockedDecrement(freadercount)=0 then
+        RTLEventSetEvent(fwaitingwriterlock);
+      if (BasicEventWaitFor(high(cardinal),freaderqueue) in [wrAbandoned,wrError]) then
+        raise Exception.create('BasicEventWaitFor failed in TMultiReadExclusiveWriteSynchronizer.Beginread');
+      { and try again: first increase freadercount, only then check
+        fwritelocked }
+      InterlockedIncrement(freadercount);
+    end;
+end;
+
+
+procedure  TMultiReadExclusiveWriteSynchronizer.Endread;
+begin
+  { Make sure that all read operations have finished, so that none of those
+    can still be executed after a writer starts working and changes some
+    things }
+  ReadBarrier;
+
+  { If no more readers, wake writer in the ready-queue if any. Since a writer
+    always first atomically sets the fwritelocked and then atomically checks
+    the freadercount, first modifying freadercount and then checking fwritelock
+    ensures that we cannot miss one of the events regardless of execution
+    order. }
+  if (InterlockedDecrement(freadercount)=0) and
+     (InterLockedExchangeAdd(fwritelocked,0)<>0) then
+    RTLEventSetEvent(fwaitingwriterlock);
+end;

+ 146 - 0
tests/test/units/sysutils/trwsync.pp

@@ -0,0 +1,146 @@
+{$ifdef fpc}
+{$mode objfpc}
+{$h+}
+{$endif}
+
+uses
+{$ifdef unix}
+  cthreads,
+{$endif}
+  SysUtils, Classes;
+
+var
+  lock: TMultiReadExclusiveWriteSynchronizer;
+  gcount: longint;
+
+type
+  tcounter = class(tthread)
+   private
+    flock: TMultiReadExclusiveWriteSynchronizer;
+    flocalcount: longint;
+   public
+    constructor create;
+    property localcount: longint read flocalcount;
+  end;
+
+  treadcounter = class(tcounter)
+    procedure execute; override;
+  end;
+  
+  twritecounter = class(tcounter)
+    procedure execute; override;
+  end;
+  
+constructor tcounter.create;
+  begin
+    { create suspended }
+    inherited create(true);
+    freeonterminate:=false;
+    flock:=lock;
+    flocalcount:=0;
+  end;
+  
+procedure treadcounter.execute;
+  var
+    i: longint;
+    l: longint;
+    r: longint;
+  begin
+    for i:=1 to 100000 do
+      begin
+        lock.beginread;
+        inc(flocalcount);
+        l:=gcount;
+        if (random(10000)=0) then
+          sleep(20);
+        { this must cause data races/loss at some point }
+        gcount:=l+1;
+        lock.endread;
+        r:=random(30000);
+        if (r=0) then
+          sleep(30);
+      end;
+  end;
+
+
+procedure twritecounter.execute;
+  var
+    i: longint;
+    l: longint;
+    r: longint;
+  begin
+    for i:=1 to 500 do
+      begin
+        lock.beginwrite;
+        inc(flocalcount);
+        l:=gcount;
+        if (random(100)=0) then
+          sleep(20);
+        { we must be exclusive }
+        if gcount<>l then
+          halt(1);
+        gcount:=l+1;
+        lock.endwrite;
+        r:=random(30);
+        if (r>28) then
+          sleep(r);
+      end;
+  end;
+  
+var
+  r1,r2,r3,r4,r5,r6: treadcounter;
+  w1,w2,w3,w4: twritecounter;
+begin
+  randomize;
+  lock:=TMultiReadExclusiveWriteSynchronizer.create;
+  { first try some writers }
+  w1:=twritecounter.create;
+  w2:=twritecounter.create;
+  w3:=twritecounter.create;
+  w4:=twritecounter.create;
+  w1.resume;
+  w2.resume;
+  w3.resume;
+  w4.resume;
+  w1.waitfor;
+  w2.waitfor;
+  w3.waitfor;
+  w4.waitfor;
+  
+  { must not have caused any data races }
+  if (gcount<>w1.localcount+w2.localcount+w3.localcount+w4.localcount) then
+    halt(1);
+
+  { now try some mixed readers/writers }
+  gcount:=0;
+  r1:=treadcounter.create;
+  r2:=treadcounter.create;
+  r3:=treadcounter.create;
+  r4:=treadcounter.create;
+  r5:=treadcounter.create;
+  r6:=treadcounter.create;
+  w1:=twritecounter.create;
+  w2:=twritecounter.create;
+  
+  r1.resume;
+  r2.resume;
+  r3.resume;
+  r4.resume;
+  r5.resume;
+  r6.resume;
+  w1.resume;
+  w2.resume;
+  
+  r1.waitfor;
+  r2.waitfor;
+  r3.waitfor;
+  r4.waitfor;
+  r5.waitfor;
+  r6.waitfor;
+  w1.waitfor;
+  w2.waitfor;
+  
+  { updating via the readcount must have caused data races }
+  if (gcount>=r1.localcount+r2.localcount+r3.localcount+r4.localcount+r5.localcount+r6.localcount+w1.localcount+w2.localcount) then
+    halt(2);
+end.