Browse Source

amicommon: more work on AThreads
- reworked exit syncronization/waiting. now uses signalsemaphores instead of messaging. this avoids the requirement for an extra message port per thread, which caused signal shortages earlier.
- additional cleanups
- now try to free up "leaked" resources on exit

git-svn-id: trunk@30933 -

Károly Balogh 10 years ago
parent
commit
24677c9309
1 changed files with 48 additions and 52 deletions
  1. 48 52
      rtl/amicommon/athreads.pp

+ 48 - 52
rtl/amicommon/athreads.pp

@@ -25,13 +25,13 @@ procedure SetAThreadBaseName(s: String);
 implementation
 implementation
 
 
 { enable this to compile athreads easily outside the RTL }
 { enable this to compile athreads easily outside the RTL }
-{.$DEFINE ATHREADS_STANDALONE} 
+{.$DEFINE ATHREADS_STANDALONE}
 
 
 {$IFDEF ATHREADS_STANDALONE}
 {$IFDEF ATHREADS_STANDALONE}
 uses
 uses
   exec, amigados, utility;
   exec, amigados, utility;
 {$ELSE}
 {$ELSE}
-{ * Include sytem specific includes * }
+{ * Include required system specific includes * }
 {$include execd.inc}
 {$include execd.inc}
 {$include execf.inc}
 {$include execf.inc}
 {$include timerd.inc}
 {$include timerd.inc}
@@ -67,8 +67,7 @@ type
     mainthread: boolean;     { true if this is our main thread }
     mainthread: boolean;     { true if this is our main thread }
     exited: boolean;         { true if the thread has exited, and can be cleaned up }
     exited: boolean;         { true if the thread has exited, and can be cleaned up }
     suspended: boolean;      { true if the thread was started suspended, and not resumed yet }
     suspended: boolean;      { true if the thread was started suspended, and not resumed yet }
-    replyPort: PMsgPort;     { Amiga exec.library IPC message reply port }
-    replyMsg: PMessage;      { exit message for the thread }
+    mutex: TSignalSemaphore; { thread's mutex. locked during the thread's life. }
     name: String;            { Thread's name }
     name: String;            { Thread's name }
   end;
   end;
 
 
@@ -85,13 +84,12 @@ var
   AThreadNum: LongInt;
   AThreadNum: LongInt;
   AThreadListSemaphore: TSignalSemaphore;
   AThreadListSemaphore: TSignalSemaphore;
 
 
-{$IFDEF DEBUG_MT}
+
 { Simple IntToStr() replacement which works with ShortStrings }
 { Simple IntToStr() replacement which works with ShortStrings }
 function IToStr(const i: LongInt): String;
 function IToStr(const i: LongInt): String;
 begin
 begin
   Str(I,result);
   Str(I,result);
 end;
 end;
-{$ENDIF}
 
 
 { Function to add a thread to the running threads list }
 { Function to add a thread to the running threads list }
 procedure AddToThreadList(var l: PThreadInfo; ti: PThreadInfo);
 procedure AddToThreadList(var l: PThreadInfo; ti: PThreadInfo);
@@ -153,21 +151,9 @@ begin
         begin
         begin
 {$IFDEF DEBUG_MT}
 {$IFDEF DEBUG_MT}
           SysDebugLn('FPC AThreads: Releasing resources for thread ID:'+hexStr(Pointer(threadID)));
           SysDebugLn('FPC AThreads: Releasing resources for thread ID:'+hexStr(Pointer(threadID)));
+          if (p^.threadVars <> nil) or (p^.threadVarsSize <> 0) then
+            SysDebugLn('FPC AThreads: WARNING, threadvars area wasn''t properly freed!'+hexStr(Pointer(threadID)));
 {$ENDIF}
 {$ENDIF}
-          while GetMsg(p^.replyPort) <> nil do begin end;
-          DeleteMsgPort(p^.replyPort);
-          dispose(p^.replyMsg);
-{$ifdef DEBUG_MT}
-          { When debug mode enabled, release the threadvars here, later, because the "normal" location }
-          { is too early, because debug messages on the thread might still use the heap manager (KB) }
-{$ifdef AMIGA}
-          ObtainSemaphore(ASYS_heapSemaphore);
-{$endif}
-          FreePooled(ASYS_heapPool,p^.threadVars,p^.threadVarsSize);
-{$ifdef AMIGA}
-          ReleaseSemaphore(ASYS_heapSemaphore);
-{$endif}
-{$endif}
           dispose(p);
           dispose(p);
           if pprev <> nil then
           if pprev <> nil then
             pprev^.nextThread:=tmpNext;
             pprev^.nextThread:=tmpNext;
@@ -228,7 +214,11 @@ end;
 
 
 { Helper function for IPC }
 { Helper function for IPC }
 procedure SendMessageToThread(var threadMsg: TThreadMsg; p: PThreadInfo; const op: TThreadOperation; waitReply: boolean);
 procedure SendMessageToThread(var threadMsg: TThreadMsg; p: PThreadInfo; const op: TThreadOperation; waitReply: boolean);
+var
+  replyPort: PMsgPort;
 begin
 begin
+  replyPort:=@PProcess(FindTask(nil))^.pr_MsgPort;
+
   FillChar(threadMsg,sizeof(threadMsg),0);
   FillChar(threadMsg,sizeof(threadMsg),0);
   with threadMsg do
   with threadMsg do
     begin
     begin
@@ -237,7 +227,7 @@ begin
           mn_Node.ln_Type:=NT_MESSAGE;
           mn_Node.ln_Type:=NT_MESSAGE;
           mn_Length:=SizeOf(TThreadMsg);
           mn_Length:=SizeOf(TThreadMsg);
           if waitReply then
           if waitReply then
-            mn_ReplyPort:=p^.replyPort
+            mn_ReplyPort:=replyPort
           else
           else
             mn_ReplyPort:=nil;
             mn_ReplyPort:=nil;
         end;
         end;
@@ -248,8 +238,8 @@ begin
 
 
   if waitReply then
   if waitReply then
     begin
     begin
-      WaitPort(p^.replyPort);
-      GetMsg(p^.replyPort);
+      WaitPort(replyPort);
+      GetMsg(replyPort);
     end;
     end;
 end;
 end;
 
 
@@ -342,10 +332,6 @@ begin
 {$ifdef DEBUG_MT}
 {$ifdef DEBUG_MT}
       SysDebugLn('FPC AThreads: Releasing threadvars, ID:'+hexStr(Pointer(p^.threadID)));
       SysDebugLn('FPC AThreads: Releasing threadvars, ID:'+hexStr(Pointer(p^.threadID)));
 {$endif}
 {$endif}
-{$ifndef DEBUG_MT}
-      { When debug mode is enabled, do not release threadvars here, because }
-      { Debug messages later might still need the heapmanager, which depends }
-      { on the threadvar (KB) }
 {$ifdef AMIGA}
 {$ifdef AMIGA}
       ObtainSemaphore(ASYS_heapSemaphore);
       ObtainSemaphore(ASYS_heapSemaphore);
 {$endif}
 {$endif}
@@ -355,7 +341,6 @@ begin
 {$ifdef AMIGA}
 {$ifdef AMIGA}
       ReleaseSemaphore(ASYS_heapSemaphore);
       ReleaseSemaphore(ASYS_heapSemaphore);
 {$endif}
 {$endif}
-{$endif DEBUG_MT}
     end
     end
   else
   else
     begin
     begin
@@ -381,8 +366,9 @@ begin
       new(threadInfo);
       new(threadInfo);
       FillChar(threadInfo^,sizeof(TThreadInfo),0);
       FillChar(threadInfo^,sizeof(TThreadInfo),0);
       p^.pr_Task.tc_UserData:=threadInfo;
       p^.pr_Task.tc_UserData:=threadInfo;
-      threadInfo^.replyPort:=@p^.pr_MsgPort;
       threadInfo^.mainThread:=true;
       threadInfo^.mainThread:=true;
+      InitSemaphore(@threadInfo^.mutex);
+      ObtainSemaphore(@threadInfo^.mutex);
       threadInfo^.threadID:=TThreadID(p);
       threadInfo^.threadID:=TThreadID(p);
       InitThreadVars(@ARelocateThreadvar);
       InitThreadVars(@ARelocateThreadvar);
       AddToThreadList(AThreadList,threadInfo);
       AddToThreadList(AThreadList,threadInfo);
@@ -412,6 +398,9 @@ begin
   threadInfo:=threadMsg^.tm_ThreadInfo;
   threadInfo:=threadMsg^.tm_ThreadInfo;
   thisThread^.pr_Task.tc_userData:=threadInfo;
   thisThread^.pr_Task.tc_userData:=threadInfo;
 
 
+  { Obtain the threads' mutex, used for exit sync }
+  ObtainSemaphore(@threadInfo^.mutex);
+
   { Allocate local thread vars, this must be the first thing,
   { Allocate local thread vars, this must be the first thing,
     because the exception management and io depends on threadvars }
     because the exception management and io depends on threadvars }
   AAllocateThreadVars;
   AAllocateThreadVars;
@@ -463,16 +452,8 @@ begin
   Forbid();
   Forbid();
   threadInfo^.exited:=true;
   threadInfo^.exited:=true;
 
 
-  { Send our exit message... }
-  with threadInfo^.replyMsg^ do
-    begin
-      mn_Node.ln_Type:=NT_MESSAGE;
-      mn_Length:=SizeOf(TMessage);
-      mn_ReplyPort:=nil;
-    end;
-  Forbid();
-  threadInfo^.exited:=true;
-  PutMsg(threadInfo^.replyPort,threadInfo^.replyMsg);
+  { Finally, Release our exit mutex. }
+  ReleaseSemaphore(@threadInfo^.mutex);
 end;
 end;
 
 
 
 
@@ -488,7 +469,6 @@ var
   threadInfo: PThreadInfo;
   threadInfo: PThreadInfo;
   threadMsg: TThreadMsg;
   threadMsg: TThreadMsg;
   threadName: String;
   threadName: String;
-  replyPort: PMsgPort;
   subThread: PProcess;
   subThread: PProcess;
 begin
 begin
   ABeginThread:=TThreadID(0);
   ABeginThread:=TThreadID(0);
@@ -529,12 +509,9 @@ begin
 {$endif}
 {$endif}
       exit;
       exit;
     end;
     end;
-  replyPort:=CreateMsgPort;
-
   ThreadID:=TThreadID(subThread);
   ThreadID:=TThreadID(subThread);
   threadInfo^.threadID:=ThreadID;
   threadInfo^.threadID:=ThreadID;
-  threadInfo^.replyPort:=replyPort;
-  new(threadInfo^.replyMsg);
+  InitSemaphore(@threadInfo^.mutex);
 
 
   // the thread should be started here, and waiting 
   // the thread should be started here, and waiting 
   // for our start message, so send it
   // for our start message, so send it
@@ -655,9 +632,11 @@ begin
 {$endif}
 {$endif}
             end;
             end;
 
 
-          { WaitPort will break the Forbid() state... }
-          WaitPort(p^.replyPort);
-          GetMsg(p^.replyPort);
+          { Wait for the thread to exit... }
+          Permit();
+          ObtainSemaphore(@p^.mutex);
+          ReleaseSemaphore(@p^.mutex);
+          Forbid();
         end
         end
       else
       else
 {$ifdef DEBUG_MT}
 {$ifdef DEBUG_MT}
@@ -914,6 +893,9 @@ Procedure InitSystemThreads; external name '_FPC_InitSystemThreads';
 
 
 { This should only be called from the finalization }
 { This should only be called from the finalization }
 procedure WaitForAllThreads;
 procedure WaitForAllThreads;
+var
+  p: PThreadInfo;
+  pn: PThreadInfo;
 begin
 begin
   { If we are the main thread exiting, we have to wait for our subprocesses to
   { If we are the main thread exiting, we have to wait for our subprocesses to
     exit. Because AmigaOS won't clean up for us. Also, after exiting the main
     exit. Because AmigaOS won't clean up for us. Also, after exiting the main
@@ -921,7 +903,7 @@ begin
     running in the background... So even waiting here forever is better than
     running in the background... So even waiting here forever is better than
     exiting with active threads, which will most likely just kill the OS
     exiting with active threads, which will most likely just kill the OS
     immediately. (KB) }
     immediately. (KB) }
-  ObtainSemaphoreShared(@AThreadListSemaphore);
+  ObtainSemaphore(@AThreadListSemaphore);
 
 
 {$IFDEF DEBUG_MT}
 {$IFDEF DEBUG_MT}
   if AThreadListLen > 1 then
   if AThreadListLen > 1 then
@@ -937,15 +919,29 @@ begin
       ReleaseSemaphore(@AThreadListSemaphore);
       ReleaseSemaphore(@AThreadListSemaphore);
       DOSDelay(1);
       DOSDelay(1);
       { Reobtain the semaphore... }
       { Reobtain the semaphore... }
-      ObtainSemaphoreShared(@AThreadListSemaphore);
+      ObtainSemaphore(@AThreadListSemaphore);
     end;
     end;
 
 
-{$IFDEF DEBUG_MT}
   if AThreadListLen > 1 then
   if AThreadListLen > 1 then
-    SysDebugLn('FPC AThreads: All threads exited but some lacking cleanup - resources will be leaked!')
+    begin
+{$IFDEF DEBUG_MT}
+      SysDebugLn('FPC AThreads: All threads exited but some lacking cleanup - trying to free up resources...');
+{$ENDIF}
+      p:=AThreadList;
+      while p <> nil do
+        begin
+          pn:=p^.nextThread;
+          if not p^.mainThread then
+            RemoveFromThreadList(AThreadList,p^.threadID);
+          p:=pn;
+        end;
+    end
   else
   else
-    SysDebugLn('FPC AThreads: All threads exited normally.');
+    begin
+{$IFDEF DEBUG_MT}
+      SysDebugLn('FPC AThreads: All threads exited normally.');
 {$ENDIF}
 {$ENDIF}
+    end;
   ReleaseSemaphore(@AThreadListSemaphore);
   ReleaseSemaphore(@AThreadListSemaphore);
 end;
 end;