Ver código fonte

amicommon: AThreads improvements
- now supports creation of suspended threads
- reworked CriticalSections handling to avoid outside-heap dynamic allocations of mutex space
- allocate threadvars area into the heapPool, this avoids leaking the mainthread threadvar area on exit
- now supports setting thread stacksize in a Delphi-compatible way (by default inherits the stacksize of its parent)
- better subthread naming

git-svn-id: trunk@30912 -

Károly Balogh 10 anos atrás
pai
commit
73f451f3d3
2 arquivos alterados com 221 adições e 91 exclusões
  1. 212 87
      rtl/amicommon/athreads.pp
  2. 9 4
      rtl/amicommon/sysosh.inc

+ 212 - 87
rtl/amicommon/athreads.pp

@@ -19,6 +19,14 @@ unit athreads;
 
 interface
 
+{$WARNING These should be in the system unit }
+{ some BeginThread flags we support }
+const
+  CREATE_SUSPENDED = 1;
+  STACK_SIZE_PARAM_IS_A_RESERVATION = 2;
+
+procedure SetAThreadBaseName(s: String);
+
 implementation
 
 uses
@@ -27,36 +35,47 @@ uses
 const
   threadvarblocksize : dword = 0;
 
+var
+  SubThreadBaseName: String = 'FPC Subthread';
+
 {.$define DEBUG_MT}
+type
+  TThreadOperation = ( toNone, toStart, toResume, toExit );
 
 type
+  PThreadMsg = ^TThreadMsg;
+
   PThreadInfo = ^TThreadInfo;
   TThreadInfo = record
-    threadVars: Pointer; { have threadvars ptr as first field,
-                           so no offset is needed to access it (faster) }
-    nextThread: PThreadInfo;
-    threadID: TThreadID;
-    stackLen: PtrUInt;
-    exitCode: Pointer;
-    f: TThreadFunc;
-    p: Pointer;
-    name: String;
-    mainthread: boolean;
-    exited: boolean;
-    replyPort: PMsgPort;
-    replyMsg: PMessage;
+    threadVars: Pointer;     { have threadvars ptr as first field, so no offset is needed to access it (faster) }
+    threadVarsSize: DWord;   { size of the allocated threadvars block }
+    nextThread: PThreadInfo; { threadinfos are a linked list, using this field }
+    threadID: TThreadID;     { thread ID, as returned by CreateNewProc() }
+    stackLen: PtrUInt;       { stack size the thread was construced with }
+    exitCode: Pointer;       { exitcode after the process has exited     }
+    f: TThreadFunc;          { ThreadFunc function pointer }
+    p: Pointer;              { ThreadFunc argument }
+    flags: dword;            { Flags this thread were created with }
+    num: longint;            { This was the "num"th thread to created }
+    mainthread: boolean;     { true if this is our main thread }
+    exited: boolean;         { true if the thread has exited, and can be cleaned up }
+    suspended: boolean;      { true if the thread was started suspended, and not resumed yet }
+    replyPort: PMsgPort;     { Amiga exec.library IPC message reply port }
+    replyMsg: PMessage;      { exit message for the thread }
+    name: String;            { Thread's name }
   end;
 
-  PThreadMsg = ^TThreadMsg;
   TThreadMsg = record
     tm_MsgNode   : TMessage;
     tm_ThreadInfo: PThreadInfo;
+    tm_Operation : TThreadOperation;
   end;
 
 var
   AThreadManager: TThreadManager;
   AThreadList: PThreadInfo;
   AThreadListLen: LongInt;
+  AThreadNum: LongInt;
   AThreadListSemaphore: TSignalSemaphore;
 
 { Function to add a thread to the running threads list }
@@ -78,6 +97,8 @@ begin
       while (p^.nextThread<>nil) do p:=p^.nextThread;
       p^.nextThread:=ti;
     end;
+  inc(AThreadNum);
+  ti^.num:=AThreadNum;
   inc(AThreadListLen);
 {$IFDEF DEBUG_MT}
   SysDebugLn('FPC AThreads: thread ID:'+hexstr(Pointer(ti^.threadID))+' added, now '+inttostr(AThreadListLen)+' thread(s) in list.');
@@ -154,7 +175,7 @@ begin
 
   ObtainSemaphore(@AThreadListSemaphore);
   p:=l;
-  while (p <> nil) and (p^.threadID <> threadID) do 
+  while (p <> nil) and (p^.threadID <> threadID) do
     p:=p^.nextThread;
   GetThreadInfo:=p;
   ReleaseSemaphore(@AThreadListSemaphore);
@@ -173,7 +194,48 @@ begin
       inc(CountRunningThreads,ord(not p^.exited));
       p:=p^.nextThread;
     end;
-  ReleaseSemaphore(@AThreadListSemaphore); 
+  ReleaseSemaphore(@AThreadListSemaphore);
+end;
+
+{ Helper function for IPC }
+procedure SendMessageToThread(var threadMsg: TThreadMsg; p: PThreadInfo; const op: TThreadOperation; waitReply: boolean);
+begin
+  FillChar(threadMsg,sizeof(threadMsg),0);
+  with threadMsg do
+    begin
+      with tm_MsgNode do
+        begin
+          mn_Node.ln_Type:=NT_MESSAGE;
+          mn_Length:=SizeOf(TThreadMsg);
+          if waitReply then
+            mn_ReplyPort:=p^.replyPort
+          else
+            mn_ReplyPort:=nil;
+        end;
+      tm_ThreadInfo:=p;
+      tm_Operation:=op;
+    end;
+  PutMsg(@PProcess(p^.threadID)^.pr_MsgPort,@threadMsg);
+
+  if waitReply then
+    begin
+      WaitPort(p^.replyPort);
+      GetMsg(p^.replyPort);
+    end;
+end;
+
+procedure SetAThreadBaseName(s: String);
+begin
+  ObtainSemaphore(@AThreadListSemaphore);
+  SubThreadBaseName:=s;
+  ReleaseSemaphore(@AThreadListSemaphore);
+end;
+
+function GetAThreadBaseName: String;
+begin
+  ObtainSemaphore(@AThreadListSemaphore);
+  GetAThreadBaseName:=SubThreadBaseName;
+  ReleaseSemaphore(@AThreadListSemaphore);
 end;
 
 
@@ -189,22 +251,22 @@ end;
 
 function ARelocateThreadvar(offset : dword) : pointer;
 var
-  userData: Pointer;
+  p: PThreadInfo;
 begin
 {$IFDEF DEBUG_MT}
   {SysDebugLn('FPC AThreads: RelocateThreadvar');}
 {$ENDIF}
-  userData:=PProcess(FindTask(nil))^.pr_Task.tc_UserData;
-  if userData = nil then
-    result:=nil
+  p:=PThreadInfo(PProcess(FindTask(nil))^.pr_Task.tc_UserData);
+  if (p <> nil) and (p^.threadVars <> nil) then
+    result:=p^.threadVars + Offset
   else
-    result:=PThreadInfo(userData)^.threadVars + Offset;
+    result:=nil;
 end;
 
 
 procedure AAllocateThreadVars;
 var
-  userData: pointer;
+  p: PThreadInfo;
 begin
 {$ifdef DEBUG_MT}
   SysDebugLn('FPC AThreads: Allocating threadvars');
@@ -214,9 +276,24 @@ begin
   { exceptions which use threadvars but       }
   { these aren't allocated yet ...            }
   { allocate room on the heap for the thread vars }
-  userData:=PProcess(FindTask(nil))^.pr_Task.tc_UserData;
-  if userData <> nil then
-    PThreadInfo(userData)^.threadVars:=AllocVec(threadvarblocksize,MEMF_CLEAR)
+  p:=PThreadInfo(PProcess(FindTask(nil))^.pr_Task.tc_UserData);
+  if p <> nil then
+    begin
+{$ifdef AMIGA}
+      ObtainSemaphore(ASYS_heapSemaphore);
+{$endif}
+      p^.threadVars:=AllocPooled(ASYS_heapPool,threadvarblocksize);
+      if p^.threadVars = nil then
+        SysDebugLn('FPC AThreads: Failed to allocate threadvar memory!')
+      else
+        begin
+          p^.threadVarsSize:=threadvarblocksize;
+          FillChar(p^.threadVars^,threadvarblocksize,0);
+        end;
+{$ifdef AMIGA}
+      ReleaseSemaphore(ASYS_heapSemaphore);
+{$endif}
+    end
   else
     begin
 {$ifdef DEBUG_MT}
@@ -228,18 +305,26 @@ end;
 
 procedure AReleaseThreadVars;
 var
-  userData: pointer;
+  p: PThreadInfo;
 begin
 {$ifdef DEBUG_MT}
   SysDebugLn('FPC AThreads: Releasing threadvars');
 {$endif}
-  userData:=PProcess(FindTask(nil))^.pr_Task.tc_UserData;
-  if userdata <> nil then
-    FreeVec(PThreadInfo(userData)^.threadVars)
+  p:=PThreadInfo(PProcess(FindTask(nil))^.pr_Task.tc_UserData);
+  if (p <> nil) and (p^.threadVars <> nil) then
+    begin
+{$ifdef AMIGA}
+      ObtainSemaphore(ASYS_heapSemaphore);
+{$endif}
+      FreePooled(ASYS_heapPool,p^.threadVars,p^.threadVarsSize);
+{$ifdef AMIGA}
+      ReleaseSemaphore(ASYS_heapSemaphore);
+{$endif}
+    end
   else
     begin
 {$ifdef DEBUG_MT}
-      SysDebugLn('FPC AThreads: ReleaseThreadVars: tc_UserData of this process was nil!')
+      SysDebugLn('FPC AThreads: ReleaseThreadVars: tc_UserData or threadVars area of this process was nil!')
 {$endif}
     end;
 end;
@@ -258,14 +343,11 @@ begin
 {$endif}
       p:=PProcess(FindTask(nil));
       new(threadInfo);
+      FillChar(threadInfo^,sizeof(TThreadInfo),0);
       p^.pr_Task.tc_UserData:=threadInfo;
       threadInfo^.replyPort:=@p^.pr_MsgPort;
       threadInfo^.mainThread:=true;
-      threadInfo^.exited:=false;
       threadInfo^.threadID:=TThreadID(p);
-      threadInfo^.replyMsg:=nil;
-      threadInfo^.f:=nil;
-      threadInfo^.p:=nil;
       InitThreadVars(@ARelocateThreadvar);
       AddToThreadList(AThreadList,threadInfo);
     end;
@@ -281,6 +363,8 @@ procedure ThreadFunc; cdecl;
 var
   thisThread: PProcess;
   threadMsg: PThreadMsg;
+  resumeMsg: PThreadMsg;
+  exitSuspend: boolean; // true if we have to exit instead of suspend
   threadInfo: PThreadInfo;
 begin
   thisThread:=PProcess(FindTask(nil));
@@ -299,24 +383,57 @@ begin
   AAllocateThreadVars;
 
 {$ifdef DEBUG_MT}
-  { this line can't be before threadvar allocation }
-  SysDebugLn('FPC AThreads: Entering Subthread function, ID:'+hexStr(thisThread));
+  { first debug line can't be before threadvar allocation }
+  SysDebugLn('FPC AThreads: Entering subthread function, ID:'+hexStr(thisThread));
 {$endif}
 
+  if threadInfo^.name <> '' then
+    begin
+{$ifdef DEBUG_MT}
+      { this line can't be before threadvar allocation }
+      SysDebugLn('FPC AThreads: Renaming thread ID:'+hexStr(thisThread)+' to '+threadInfo^.name);
+{$endif}
+      thisThread^.pr_Task.tc_Node.ln_Name:=PChar(@threadInfo^.name[1]);
+    end;
+
   { Reply the message, so the calling thread could continue }
   { note that threadMsg was allocated on the caller's task, so }
   { it will be invalid below this point }
   ReplyMsg(PMessage(threadMsg));
 
-  InitThread(threadInfo^.stackLen);
-  threadInfo^.exitCode:=Pointer(threadInfo^.f(threadInfo^.p));
-  DoneThread;
+  { if creating a suspended thread, wait for the wakeup message to arrive }
+  { then check if we actually have to resume, or exit }
+  exitSuspend:=false;
+  if threadInfo^.suspended then
+    begin
+{$ifdef DEBUG_MT}
+      SysDebugLn('FPC AThreads: Suspending subthread on entry, ID:'+hexStr(thisThread));
+{$endif}
+      WaitPort(@thisThread^.pr_MsgPort);
+      resumeMsg:=PThreadMsg(GetMsg(@thisThread^.pr_MsgPort));
+      exitSuspend:=resumeMsg^.tm_Operation <> toResume;
+      threadInfo^.suspended:=false;
+      ReplyMsg(PMessage(resumeMsg));
+{$ifdef DEBUG_MT}
+      SysDebugLn('FPC AThreads: Resuming subthread on entry, ID:'+hexStr(thisThread)+', resumed only to exit: '+inttostr(ord(exitSuspend)));
+{$endif}
+    end;
+
+  { Finally, call the user code }
+  if not exitSuspend then
+    begin
+      InitThread(threadInfo^.stackLen);
+      threadInfo^.exitCode:=Pointer(threadInfo^.f(threadInfo^.p));
+      DoneThread;
+    end;
 
 {$ifdef DEBUG_MT}
   SysDebugLn('FPC AThreads: Exiting Subthread function, ID:'+hexStr(thisThread));
 {$endif}
   Forbid();
   threadInfo^.exited:=true;
+
+  { Send our exit message... }
   with threadInfo^.replyMsg^ do
     begin
       mn_Node.ln_Type:=NT_MESSAGE;
@@ -358,24 +475,26 @@ begin
   { the only way to pass data to the newly created thread
     in a MT safe way, is to use the heap }
   new(threadInfo);
+  FillChar(threadInfo^,sizeof(TThreadInfo),0);
   threadInfo^.f:=ThreadFunction;
   threadInfo^.p:=p;
-  threadInfo^.stackLen:=stacksize;
-  threadInfo^.exited:=false;
-  threadInfo^.mainThread:=false;
+  if (creationFlags and STACK_SIZE_PARAM_IS_A_RESERVATION) > 0 then
+    threadInfo^.stackLen:=stacksize
+  else
+    threadInfo^.stackLen:=System.StackLength; { inherit parent's stack size }
+  threadInfo^.suspended:=(creationFlags and CREATE_SUSPENDED) > 0;
 
 {$ifdef DEBUG_MT}
-  SysDebugLn('FPC AThreads: Starting new thread...');
+  SysDebugLn('FPC AThreads: Starting new thread... Stack size: '+inttostr(threadInfo^.stackLen));
 {$endif}
   subThread:=CreateNewProc([
                            {$IFDEF MORPHOS}
                            NP_CodeType,CODETYPE_PPC,
-                           NP_PPCStackSize, stacksize,
+                           NP_PPCStackSize, threadInfo^.stacklen,
                            {$ELSE}
-                           NP_StackSize, stacksize,
+                           NP_StackSize, threadInfo^.stacklen,
                            {$ENDIF}
                            NP_Entry,PtrUInt(@ThreadFunc),
-                           NP_Name,PtrUInt(PChar('FPC Subthread')),
                            TAG_DONE]);
   if subThread = nil then
     begin
@@ -393,30 +512,13 @@ begin
 
   // the thread should be started here, and waiting 
   // for our start message, so send it
-  FillChar(threadMsg,sizeof(threadMsg),0);
-  with threadMsg do
-    begin
-      with tm_MsgNode do
-        begin
-          mn_Node.ln_Type:=NT_MESSAGE;
-          mn_Length:=SizeOf(TThreadMsg);
-          mn_ReplyPort:=replyPort;
-        end;
-      tm_ThreadInfo:=threadInfo;
-    end;
-  AddToThreadList(AThreadList,threadInfo);
-
 {$ifdef DEBUG_MT}
-  SysDebugLn('FPC AThreads: Sending start message to subthread ID:'+hexStr(subThread));
+  SysDebugLn('FPC AThreads: Sending start message to subthread and waiting for reply ID:'+hexStr(subThread));
 {$endif}
-  PutMsg(@subThread^.pr_MsgPort,PMessage(@threadMsg));
-
-  { wait for a reply, so we know the thread has initialized properly }
-{$ifdef DEBUG_MT}
-  SysDebugLn('FPC AThreads: Waiting for reply...');
-{$endif}
-  WaitPort(replyPort);
-  GetMsg(replyPort);
+  AddToThreadList(AThreadList,threadInfo);
+  { AddToThreadList assigned us a number, so use it to name the thread }
+  threadInfo^.name:=GetAThreadBaseName+' #'+inttostr(threadInfo^.num);
+  SendMessageToThread(threadMsg,threadInfo,toStart,true);
 
   ABeginThread:=ThreadId;
 {$ifdef DEBUG_MT}
@@ -442,12 +544,30 @@ end;
 
 
 function AResumeThread (threadHandle : TThreadID) : dword;
+var
+  m: TThreadMsg;
+  p: PThreadInfo;
 begin
+  AResumeThread:=0;
+  Forbid();
+  p:=GetThreadInfo(AThreadList,threadHandle);
+  if (p <> nil) and p^.suspended then
+    begin
 {$ifdef DEBUG_MT}
-  SysDebugLn('FPC AThreads: unsupported operation: ResumeThread called for ID:'+hexStr(Pointer(threadHandle)));
+      SysDebugLn('FPC AThreads: Waiting for thread to resume, ID:'+hexStr(Pointer(threadHandle)));
 {$endif}
-  // cannot be properly supported on Amiga
-  result:=dword(-1);
+      { WaitPort in SendMessageToThread will break the Forbid() state... }
+      SendMessageToThread(m,p,toResume,true);
+      AResumeThread:=0;
+    end
+  else
+    begin
+{$ifdef DEBUG_MT}
+      SysDebugLn('FPC AThreads: Error, attempt to resume a non-suspended thread, or invalid thread ID:'+hexStr(Pointer(threadHandle)));
+{$endif}
+      AResumeThread:=dword(-1);
+    end;
+  Permit();
 end;
 
 
@@ -484,8 +604,8 @@ end;
 
 function AWaitForThreadTerminate (threadHandle : TThreadID; TimeoutMs : longint) : dword;  {0=no timeout}
 var
-  LResultP: Pointer;
   p: PThreadInfo;
+  m: TThreadMsg;
 begin
 {.$WARNING Support for timeout argument is not implemented}
 { But since CThreads uses pthread_join, which has also no timeout,
@@ -498,16 +618,22 @@ begin
 {$ifdef DEBUG_MT}
       SysDebugLn('FPC AThreads: Waiting for thread to exit, ID:'+hexStr(Pointer(threadHandle)));
 {$endif}
+      { WaitPort in SendMessageToThread will break the Forbid() state... }
+      if p^.suspended then
+        SendMessageToThread(m,p,toExit,true);
+
       { WaitPort will break the Forbid() state... }
       WaitPort(p^.replyPort);
       GetMsg(p^.replyPort);
       AWaitForThreadTerminate:=DWord(p^.exitCode);
     end
   else
+    begin
 {$ifdef DEBUG_MT}
-    SysDebugLn('FPC AThreads: Error, attempt to wait for invalid thread ID to exit, ID:'+hexStr(Pointer(threadHandle)))
+      SysDebugLn('FPC AThreads: Error, attempt to wait for invalid thread ID to exit, ID:'+hexStr(Pointer(threadHandle)));
 {$endif}
-  ;
+      AWaitForThreadTerminate:=dword(-1); { Return non-zero code on error. }
+    end;
   Permit();
 end;
 
@@ -592,28 +718,27 @@ end;
 procedure AInitCriticalSection(var CS);
 begin
 {$IFDEF DEBUG_MT}
-  SysDebugLn('FPC AThreads: InitCriticalSection $'+hexStr(@CS));
+  SysDebugLn('FPC AThreads: InitCriticalSection '+hexStr(@CS));
 {$ENDIF}
-  PSignalSemaPhore(CS):=AllocVec(sizeof(TSignalSemaphore),MEMF_CLEAR);
-  InitSemaphore(PSignalSemaphore(CS));
+  InitSemaphore(PSignalSemaphore(@CS));
 end;
 
 
 procedure AEnterCriticalSection(var CS);
 begin
 {$IFDEF DEBUG_MT}
-  SysDebugLn('FPC AThreads: EnterCriticalSection $'+hexStr(@CS));
+  SysDebugLn('FPC AThreads: EnterCriticalSection '+hexStr(@CS));
 {$ENDIF}
-  ObtainSemaphore(PSignalSemaphore(CS));
+  ObtainSemaphore(PSignalSemaphore(@CS));
 end;
 
 
 function ATryEnterCriticalSection(var CS):longint;
 begin
 {$IFDEF DEBUG_MT}
-  SysDebugLn('FPC AThreads: TryEnterCriticalSection $'+hexStr(@CS));
+  SysDebugLn('FPC AThreads: TryEnterCriticalSection '+hexStr(@CS));
 {$ENDIF}
-  result:=DWord(AttemptSemaphore(PSignalSemaphore(CS)));
+  result:=DWord(AttemptSemaphore(PSignalSemaphore(@CS)));
   if result<>0 then
     result:=1;
 end;
@@ -622,23 +747,22 @@ end;
 procedure ALeaveCriticalSection(var CS);
 begin
 {$IFDEF DEBUG_MT}
-  SysDebugLn('FPC AThreads: LeaveCriticalSection $'+hexStr(@CS));
+  SysDebugLn('FPC AThreads: LeaveCriticalSection '+hexStr(@CS));
 {$ENDIF}
-  ReleaseSemaphore(PSignalSemaphore(CS));
+  ReleaseSemaphore(PSignalSemaphore(@CS));
 end;
 
 
 procedure ADoneCriticalSection(var CS);
 begin
 {$IFDEF DEBUG_MT}
-  SysDebugLn('FPC AThreads: DoneCriticalSection $'+hexStr(@CS));
+  SysDebugLn('FPC AThreads: DoneCriticalSection '+hexStr(@CS));
 {$ENDIF}
   { unlock as long as unlocking works to unlock it if it is recursive
     some Delphi code might call this function with a locked mutex }
-  with PSignalSemaphore(CS)^ do
+  with TSignalSemaphore(CS) do
     while ss_NestCount > 0 do
-      ReleaseSemaphore(PSignalSemaphore(CS));
-  FreeVec(Pointer(CS));
+      ReleaseSemaphore(PSignalSemaphore(@CS));
 end;
 
 
@@ -800,6 +924,7 @@ initialization
     end;
   AThreadList:=nil;
   AThreadListLen:=0;
+  AThreadNum:=-1; { Mainthread will be 0. }
   InitSemaphore(@AThreadListSemaphore);
   SetAThreadManager;
 

+ 9 - 4
rtl/amicommon/sysosh.inc

@@ -23,9 +23,14 @@ type
   THandle = LongInt;
 {$endif CPU64}
   TThreadID = THandle;
-  
+
   PRTLCriticalSection = ^TRTLCriticalSection;
+{$IFDEF AROS}
   TRTLCriticalSection = Pointer;
-
-
-
+{$ELSE}
+  TRTLCriticalSection = record
+    { This must actually be bigger or equal to sizeof(TSignalSemaphore)
+      which seems to be 46 bytes on MorphOS and Amiga/m68k. }
+    semaphore: array[0..63] of byte; 
+  end;
+{$ENDIF}