Thread.cpp

  1. /******************************************************************************
  2. 'ReadWriteSync' is better than 'std::shared_mutex' because:
  3. -allows for 'enterRead' followed by 'enterWrite' on the same thread, 'shared_mutex' does not allow this
  4. -allows for 'enterWrite' followed by 'enterRead' on the same thread, 'shared_mutex' does not allow this
  5. -it's write-preferring (all new readers are blocked once a writer has requested the lock), 'shared_mutex' preference is unspecified
  6. /******************************************************************************/
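// A minimal usage sketch (illustrative only; assumes a 'ReadWriteSync sync' member guarding some shared data, 'need_update' is a hypothetical flag):
//    sync.enterRead ();         // many threads may read at the same time
//    if(need_update)
//    {
//       sync.enterWrite();      // allowed even though this thread already holds a read lock (see notes above)
//       // .. modify the shared data ..
//       sync.leaveWrite();
//    }
//    sync.leaveRead ();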
  7. #include "stdafx.h"
  8. namespace EE{
  9. /******************************************************************************/
  10. #define STACK_SIZE (1024*1024) // set 1MB stack size
  11. #if !HAS_THREADS
  12. ThreadEmulation EmulatedThreads;
  13. #endif
  14. /******************************************************************************/
  15. // 32-bit AtomicGet and AtomicSet are simple:
  16. // https://github.com/mintomic/mintomic/tree/master/include/mintomic/private
  17. #if WINDOWS // versions with _ are faster than those without
  18. #if !X64
  19. Long AtomicGet(C Long &x) {Long old=x; return _InterlockedCompareExchange64((LONG64*)&x, old, old);} // this version was the fastest
  20. //Long AtomicGet(C Long &x) { return _InterlockedCompareExchange64((LONG64*)&x, 0, 0);} // slightly slower (this can work with 0 because it compares x to 0 and sets 0 only then) but always returns the old value regardless of whether it was zero or not
  21. //Long AtomicGet(C Long &x) {Long old; do old=x; while(_InterlockedCompareExchange64((LONG64*)&x, old, old)!=old); return old;}
  22. void AtomicSet(Long &x, Long y) {Long old; do old=x; while(_InterlockedCompareExchange64((LONG64*)&x, y, old)!=old);}
  23. #endif
  24. Int AtomicInc(Int &x) {return _InterlockedIncrement((LONG*)&x)-1;} // 'InterlockedIncrement' returns the new value
  25. Int AtomicDec(Int &x) {return _InterlockedDecrement((LONG*)&x)+1;} // 'InterlockedDecrement' returns the new value
  26. Int AtomicAdd(Int &x, Int y) {return _InterlockedExchangeAdd((LONG*)&x, y);} // 'InterlockedExchangeAdd' returns the old value
  27. Int AtomicSub(Int &x, Int y) {return _InterlockedExchangeAdd((LONG*)&x, -y);} // 'InterlockedExchangeAdd' returns the old value
  28. #if X64
  29. Long AtomicAdd(Long &x, Long y) {return _InterlockedExchangeAdd64((LONG64*)&x, y);} // 'InterlockedExchangeAdd64' returns the old value
  30. Long AtomicSub(Long &x, Long y) {return _InterlockedExchangeAdd64((LONG64*)&x, -y);} // 'InterlockedExchangeAdd64' returns the old value
  31. #else
  32. Long AtomicAdd(Long &x, Long y) {return InterlockedExchangeAdd64((LONG64*)&x, y);} // 'InterlockedExchangeAdd64' returns the old value
  33. Long AtomicSub(Long &x, Long y) {return InterlockedExchangeAdd64((LONG64*)&x, -y);} // 'InterlockedExchangeAdd64' returns the old value
  34. #endif
  35. Int AtomicAnd (Int &x, Int y) {return _InterlockedAnd((LONG*)&x, y);} // 'InterlockedAnd' returns the old value
  36. Int AtomicDisable(Int &x, Int y) {return _InterlockedAnd((LONG*)&x, ~y);} // 'InterlockedAnd' returns the old value
  37. Int AtomicOr (Int &x, Int y) {return _InterlockedOr ((LONG*)&x, y);} // 'InterlockedOr' returns the old value
  38. Int AtomicXor (Int &x, Int y) {return _InterlockedXor((LONG*)&x, y);} // 'InterlockedXor' returns the old value
  39. Int AtomicGetSet(Int &x, Int y) {return _InterlockedExchange((LONG*)&x, y);} // 'InterlockedExchange' returns the old value
  40. Bool AtomicCAS(Int &x, Int compare, Int new_value) {return _InterlockedCompareExchange ((LONG *)&x, new_value, compare)== compare;} // 'InterlockedCompareExchange' returns the old value
  41. Bool AtomicCAS(Long &x, Long compare, Long new_value) {return _InterlockedCompareExchange64((LONG64*)&x, new_value, compare)== compare;} // 'InterlockedCompareExchange' returns the old value
  42. Bool AtomicCAS(Flt &x, Flt compare, Flt new_value) {return _InterlockedCompareExchange ((LONG *)&x, (Int&)new_value, (Int&)compare)==(Int&)compare;} // 'InterlockedCompareExchange' returns the old value
  43. #else
  44. #if !X64
  45. Long AtomicGet(C Long &x) {Long old=x; return __sync_val_compare_and_swap(&ConstCast(x), old, old);} // 'ConstCast' is used to mute a warning, it can be used because 'x' will be modified to 'old' only if it's equal to 'old' already
  46. //Long AtomicGet(C Long &x) { return __sync_val_compare_and_swap(&ConstCast(x), 0, 0);} // 'ConstCast' is used to mute a warning, it can be used because 'x' will be modified to 'old' only if it's equal to 'old' already
  47. //Long AtomicGet(C Long &x) {Long old; do old=x; while(!__sync_bool_compare_and_swap(&ConstCast(x), old, old)); return old;} // 'ConstCast' is used to mute a warning, it can be used because 'x' will be modified to 'old' only if it's equal to 'old' already
  48. void AtomicSet(Long &x, Long y) {Long old; do old=x; while(!__sync_bool_compare_and_swap(&x, old, y));}
  49. #endif
  50. Int AtomicInc(Int &x) {return __sync_fetch_and_add(&x, +1);} // '__sync_fetch_and_add' returns the old value
  51. Int AtomicDec(Int &x) {return __sync_fetch_and_add(&x, -1);} // '__sync_fetch_and_add' returns the old value
  52. Int AtomicAdd(Int &x, Int y) {return __sync_fetch_and_add(&x, y);} // '__sync_fetch_and_add' returns the old value
  53. Int AtomicSub(Int &x, Int y) {return __sync_fetch_and_sub(&x, y);} // '__sync_fetch_and_sub' returns the old value
  54. Long AtomicAdd(Long &x, Long y) {return __sync_fetch_and_add(&x, y);} // '__sync_fetch_and_add' returns the old value
  55. Long AtomicSub(Long &x, Long y) {return __sync_fetch_and_sub(&x, y);} // '__sync_fetch_and_sub' returns the old value
  56. Int AtomicAnd (Int &x, Int y) {return __sync_fetch_and_and(&x, y);} // '__sync_fetch_and_and' returns the old value
  57. Int AtomicDisable(Int &x, Int y) {return __sync_fetch_and_and(&x, ~y);} // '__sync_fetch_and_and' returns the old value
  58. Int AtomicOr (Int &x, Int y) {return __sync_fetch_and_or (&x, y);} // '__sync_fetch_and_or' returns the old value
  59. Int AtomicXor (Int &x, Int y) {return __sync_fetch_and_xor(&x, y);} // '__sync_fetch_and_xor' returns the old value
  60. Int AtomicGetSet(Int &x, Int y) {return __sync_lock_test_and_set(&x, y);} // '__sync_lock_test_and_set' returns the old value
  61. Bool AtomicCAS(Int &x, Int compare, Int new_value) {return __sync_bool_compare_and_swap( &x, compare, new_value);}
  62. Bool AtomicCAS(Long &x, Long compare, Long new_value) {return __sync_bool_compare_and_swap( &x, compare, new_value);}
  63. Bool AtomicCAS(Flt &x, Flt compare, Flt new_value) {return __sync_bool_compare_and_swap((Int*)&x, (Int&)compare, (Int&)new_value);}
  64. #endif
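// Illustrative sketch of a typical retry loop built on top of these helpers ('AtomicMax' is hypothetical, shown only to demonstrate the compare-and-swap pattern):
//    void AtomicMax(Int &x, Int y)
//    {
//       for(;;)
//       {
//          Int old=x;                      // read the current value
//          if(y<=old)return;               // nothing to update
//          if(AtomicCAS(x, old, y))return; // succeeds only if 'x' still equals 'old', otherwise retry
//       }
//    }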
  65. /******************************************************************************/
  66. #undef GetThreadId
  67. UIntPtr Thread::id()C {return PLATFORM(::GetThreadId(_handle), (UIntPtr)_handle);}
  68. UIntPtr GetThreadId() {return _GetThreadId();}
  69. #define GetThreadId _GetThreadId
  70. #if WINDOWS_OLD
  71. UIntPtr GetThreadIdFromWindow(Ptr hwnd) {return GetWindowThreadProcessId((HWND)hwnd, null);}
  72. #else
  73. UIntPtr GetThreadIdFromWindow(Ptr hwnd) {return (hwnd==App.hwnd()) ? App.threadID() : 0;}
  74. #endif
  75. /******************************************************************************/
  76. void SetThreadName(C Str8 &name, UIntPtr thread_id)
  77. {
  78. #if WINDOWS
  79. #pragma pack(push, 8)
  80. struct ThreadName
  81. {
  82. DWORD dwType;
  83. LPCSTR szName;
  84. DWORD dwThreadID;
  85. DWORD dwFlags;
  86. };
  87. #pragma pack(pop)
  88. ThreadName info;
  89. info.dwType =0x1000;
  90. info.szName =name;
  91. info.dwThreadID=thread_id;
  92. info.dwFlags =0;
  93. __try{RaiseException(0x406D1388, 0, SIZE(info)/SIZE(ULONG_PTR), (ULONG_PTR*)&info);}
  94. __except(EXCEPTION_EXECUTE_HANDLER) {}
  95. // TODO: check 'SetThreadDescription'
  96. #elif APPLE
  97. if(thread_id==GetThreadId()) // on Apple the name can be set only for the current thread
  98. {
  99. #if 1 // works the same but with less overhead
  100. pthread_setname_np(name); // doesn't crash if 'name' is null
  101. #else
  102. if(NSStringAuto ns_name=name)[[NSThread currentThread] setName:ns_name];
  103. #endif
  104. }
  105. #elif ANDROID || LINUX
  106. pthread_setname_np((pthread_t)thread_id, name ? name() : ""); // 'pthread_setname_np' will crash on Linux if `name` is null
  107. #endif
  108. }
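// Note on the 'SetThreadDescription' TODO above: on Windows 10 1607+ the name could presumably also be set through the OS API directly, roughly like this (untested sketch, wide-string conversion of 'name' omitted):
//    if(HANDLE thread=OpenThread(THREAD_SET_LIMITED_INFORMATION, false, (DWORD)thread_id))
//    {
//       SetThreadDescription(thread, L"thread name here");
//       CloseHandle(thread);
//    }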
  109. /******************************************************************************/
  110. // SYNCHRONIZATION LOCK
  111. /******************************************************************************/
  112. #if !HAS_THREADS
  113. SyncLock::~SyncLock() {_is=false;}
  114. SyncLock:: SyncLock() {_lock_count=0; _owner=0; _is=true;}
  115. Bool SyncLock:: tryOn ()C {on(); return true;}
  116. void SyncLock:: on ()C {if( _lock_count++==0)_owner=GetThreadId();}
  117. void SyncLock:: off ()C {if(--_lock_count ==0)_owner=0;}
  118. Bool SyncLock:: owned ()C {return _lock_count>0;}
  119. Bool SyncLock:: created ()C {return _is!=0;}
  120. #elif WINDOWS
  121. Bool SyncLock::owned ()C {return _lock.OwningThread==(HANDLE)GetThreadId();}
  122. Bool SyncLock::created()C {return _lock.DebugInfo!=null;}
  123. #if SUPPORT_WINDOWS_XP
  124. static BOOL (WINAPI *InitializeCriticalSectionEx)(LPCRITICAL_SECTION lpCriticalSection, DWORD dwSpinCount, DWORD Flags);
  125. static Bool InitializeCriticalSectionExTried;
  126. SyncLock::SyncLock()
  127. {
  128. if(InitializeCriticalSectionEx)
  129. {
  130. ex:
  131. InitializeCriticalSectionEx(&_lock, 0, CRITICAL_SECTION_NO_DEBUG_INFO);
  132. }else
  133. {
  134. if(!InitializeCriticalSectionExTried)
  135. {
  136. if(HMODULE kernel=GetModuleHandle(L"Kernel32.dll"))InitializeCriticalSectionEx=(decltype(InitializeCriticalSectionEx))GetProcAddress(kernel, "InitializeCriticalSectionEx"); // available on Vista+
  137. InitializeCriticalSectionExTried=true;
  138. if(InitializeCriticalSectionEx)goto ex;
  139. }
  140. InitializeCriticalSection(&_lock);
  141. }
  142. DEBUG_ASSERT(created(), "SyncLock");
  143. }
  144. #else
  145. SyncLock:: SyncLock() {InitializeCriticalSectionEx(&_lock, 0, CRITICAL_SECTION_NO_DEBUG_INFO); DEBUG_ASSERT(created(), "SyncLock");}
  146. #endif
  147. SyncLock::~SyncLock() { DeleteCriticalSection (&_lock); DEBUG_ASSERT(!created(), "SyncLock");}
  148. #if SYNC_LOCK_SAFE
  149. Bool SyncLock::tryOn()C {return created() ? TryEnterCriticalSection(&_lock)!=0 : false;}
  150. void SyncLock::on ()C { if(created()) EnterCriticalSection(&_lock);}
  151. void SyncLock::off ()C { if(created()) LeaveCriticalSection(&_lock);}
  152. #else
  153. Bool SyncLock::tryOn()C {return TryEnterCriticalSection(&_lock);}
  154. void SyncLock::on ()C { EnterCriticalSection(&_lock);}
  155. void SyncLock::off ()C { LeaveCriticalSection(&_lock);}
  156. #endif
  157. #else
  158. #define CUSTOM_RECURSIVE 0
  159. Bool SyncLock::created()C {return _is!=0;}
  160. SyncLock::SyncLock()
  161. {
  162. #if CUSTOM_RECURSIVE
  163. pthread_mutex_init(&_lock, null);
  164. #else
  165. pthread_mutexattr_t attr;
  166. pthread_mutexattr_init ( &attr);
  167. pthread_mutexattr_settype( &attr, PTHREAD_MUTEX_RECURSIVE);
  168. pthread_mutex_init (&_lock, &attr);
  169. pthread_mutexattr_destroy( &attr);
  170. #endif
  171. _lock_count=0;
  172. _owner=0;
  173. _is=true;
  174. }
  175. SyncLock::~SyncLock()
  176. {
  177. if(owned())REP(_lock_count)off();
  178. _is=false;
  179. pthread_mutex_destroy(&_lock);
  180. _lock_count=0;
  181. _owner=0;
  182. }
  183. Bool SyncLock::owned()C
  184. {
  185. return _lock_count>0 && _owner==GetThreadId();
  186. }
  187. Bool SyncLock::tryOn()C
  188. {
  189. #if SYNC_LOCK_SAFE
  190. if(created())
  191. #endif
  192. {
  193. #if CUSTOM_RECURSIVE
  194. if(owned())
  195. {
  196. _lock_count++;
  197. return true;
  198. }
  199. if(pthread_mutex_trylock(&_lock)==0)
  200. {
  201. _owner=GetThreadId();
  202. _lock_count++;
  203. return true;
  204. }
  205. #else
  206. if(pthread_mutex_trylock(&_lock)==0)
  207. {
  208. if(!_lock_count)_owner=GetThreadId();
  209. _lock_count++;
  210. return true;
  211. }
  212. #endif
  213. }
  214. return false;
  215. }
  216. void SyncLock::on()C
  217. {
  218. #if SYNC_LOCK_SAFE
  219. if(created())
  220. #endif
  221. {
  222. #if CUSTOM_RECURSIVE
  223. if(!owned())
  224. {
  225. pthread_mutex_lock(&_lock);
  226. _owner=GetThreadId();
  227. }
  228. _lock_count++;
  229. #else
  230. pthread_mutex_lock(&_lock);
  231. if(!_lock_count)_owner=GetThreadId();
  232. _lock_count++;
  233. #endif
  234. }
  235. }
  236. void SyncLock::off()C
  237. {
  238. #if SYNC_LOCK_SAFE
  239. if(created())
  240. #endif
  241. {
  242. #if CUSTOM_RECURSIVE
  243. _lock_count--;
  244. if(_lock_count<=0)
  245. {
  246. _owner=0;
  247. pthread_mutex_unlock(&_lock);
  248. }
  249. #else
  250. _lock_count--;
  251. if(_lock_count<=0)_owner=0;
  252. pthread_mutex_unlock(&_lock);
  253. #endif
  254. }
  255. }
  256. #endif
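// Illustrative usage sketch (not part of this file): 'SyncLock' is recursive, so the same thread may call 'on' repeatedly; 'SyncLocker' (used further below) is its scoped RAII wrapper:
//    SyncLock lock;
//    {SyncLocker locker(lock); /* .. access shared data .. */} // locks in the constructor, unlocks in the destructor
//    if(lock.tryOn()){/* got the lock without blocking */ lock.off();}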
  257. /******************************************************************************/
  258. // SYNCHRONIZATION EVENT
  259. /******************************************************************************/
  260. #if !HAS_THREADS
  261. SyncEvent:: SyncEvent(Bool auto_off ) {_handle=null; _condition=false; _auto_off=auto_off;}
  262. SyncEvent::~SyncEvent( ) {}
  263. void SyncEvent:: on ( )C {_condition=true ;}
  264. void SyncEvent:: off ( )C {_condition=false;}
  265. Bool SyncEvent:: wait ( )C {Bool ok=_condition; if(_auto_off)_condition=false; return ok;}
  266. Bool SyncEvent:: wait (Int milliseconds)C {Bool ok=_condition; if(_auto_off)_condition=false; return ok;}
  267. #elif WINDOWS
  268. SyncEvent:: SyncEvent(Bool auto_off ) { _handle= CreateEvent(null, !auto_off, false, null); }
  269. SyncEvent::~SyncEvent( ) { if(_handle){ CloseHandle(_handle ); _handle=null;}}
  270. void SyncEvent:: on ( )C {/*if(_handle)*/ SetEvent(_handle ); } // checking for handle!=null is not needed, as the function will do nothing on null
  271. void SyncEvent:: off ( )C {/*if(_handle)*/ ResetEvent(_handle ); } // checking for handle!=null is not needed, as the function will do nothing on null
  272. Bool SyncEvent:: wait ( )C {/*if(_handle)*/return WaitForSingleObject(_handle, INFINITE)!=WAIT_TIMEOUT; return true;} // checking for handle!=null is not needed, as the function will return -1 on null, WAIT_TIMEOUT=258 so result will be -1!=258 -> true
  273. Bool SyncEvent:: wait (Int milliseconds)C {/*if(_handle)*/return WaitForSingleObject(_handle, (milliseconds>=0) ? milliseconds : INFINITE)!=WAIT_TIMEOUT; return true;} // checking for handle!=null is not needed, as the function will return -1 on null, WAIT_TIMEOUT=258 so result will be -1!=258 -> true
  274. #else
  275. SyncEvent::SyncEvent(Bool auto_off)
  276. {
  277. _condition=false; _auto_off=auto_off;
  278. Alloc(_handle); pthread_cond_init (_handle, null);
  279. Alloc(_mutex ); pthread_mutex_init(_mutex , null);
  280. }
  281. SyncEvent::~SyncEvent()
  282. {
  283. if(_handle)
  284. {
  285. pthread_mutex_lock (_mutex); _condition=false; pthread_cond_destroy(_handle); Free(_handle);
  286. pthread_mutex_unlock (_mutex);
  287. pthread_mutex_destroy(_mutex); Free(_mutex);
  288. }
  289. }
  290. void SyncEvent::on()C
  291. {
  292. if(_handle)
  293. {
  294. pthread_mutex_lock (_mutex); _condition=true; if(_auto_off)pthread_cond_signal(_handle);else pthread_cond_broadcast(_handle); // _auto_off ? wake up 1 : wake up all
  295. pthread_mutex_unlock(_mutex);
  296. }
  297. }
  298. void SyncEvent::off()C
  299. {
  300. if(_handle)
  301. {
  302. pthread_mutex_lock (_mutex); _condition=false;
  303. pthread_mutex_unlock(_mutex);
  304. }
  305. }
  306. Bool SyncEvent::wait()C
  307. {
  308. if(_handle)
  309. {
  310. pthread_mutex_lock(_mutex);
  311. for(; !_condition && !pthread_cond_wait(_handle, _mutex); ); // check for '_condition' first in case it's already met to avoid 'pthread_cond_wait' overhead, have to check in a loop because 'pthread_cond_wait' may return if OS interrupted it, 'pthread_cond_wait' automatically unlocks and locks the mutex
  312. Bool ok=_condition;
  313. if(_auto_off)_condition=false;
  314. pthread_mutex_unlock(_mutex);
  315. return ok;
  316. }
  317. return true;
  318. }
  319. Bool SyncEvent::wait(Int milliseconds)C
  320. {
  321. if(_handle)
  322. {
  323. pthread_mutex_lock(_mutex);
  324. if(!_condition) // if condition not met yet
  325. {
  326. if(milliseconds<0) // infinite wait
  327. {
  328. for(; !_condition && !pthread_cond_wait(_handle, _mutex); ); // check for '_condition' first in case it's already met to avoid 'pthread_cond_wait' overhead, have to check in a loop because 'pthread_cond_wait' may return if OS interrupted it, 'pthread_cond_wait' automatically unlocks and locks the mutex
  329. }else
  330. if(milliseconds>0) // timed wait
  331. {
  332. timeval tv; gettimeofday(&tv, null);
  333. timespec ts; ts.tv_sec =tv.tv_sec ;
  334. ts.tv_nsec =tv.tv_usec*1000;
  335. ts.tv_nsec+=(milliseconds%1000)*1000000;
  336. ts.tv_sec +=(milliseconds/1000) + ts.tv_nsec/1000000000;
  337. ts.tv_nsec%=1000000000;
  338. // 'ts' specifies the absolute time at which waiting times out (a point in time, not a duration)
  339. for(; !_condition && !pthread_cond_timedwait(_handle, _mutex, &ts); ); // 'pthread_cond_timedwait' automatically unlocks and locks the mutex, keep waiting in the loop in case it returns before the deadline with the condition still false, and stop as soon as it returns a timeout or other error
  340. }
  341. }
  342. Bool ok=_condition;
  343. if(_auto_off)_condition=false;
  344. pthread_mutex_unlock(_mutex);
  345. return ok;
  346. }
  347. return true;
  348. }
  349. #endif
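// Illustrative usage sketch (hypothetical producer/consumer, not part of this file):
//    SyncEvent work_ready(true);                          // auto_off=true -> 'wait' consumes the signal and wakes one waiter
//    producer thread: work_ready.on();                    // signal that work is available
//    consumer thread: if(work_ready.wait(1000)){/*..*/}   // wait up to 1000 ms, returns false on timeout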
  350. /******************************************************************************/
  351. // SYNC COUNTER
  352. /******************************************************************************/
  353. #if !HAS_THREADS
  354. SyncCounter:: SyncCounter( ) {_handle=null; _counter=0;}
  355. SyncCounter::~SyncCounter( ) {}
  356. void SyncCounter:: operator+= (Int count )C {_counter=Mid(Long(_counter)+count, (Long)INT_MIN, (Long)INT_MAX);} // clamp to prevent overflow
  357. Bool SyncCounter:: wait ( )C {if(_counter>0){_counter--; return true;} return false;}
  358. Bool SyncCounter:: wait (Int milliseconds)C {if(_counter>0){_counter--; return true;} return false;}
  359. #elif WINDOWS
  360. SyncCounter:: SyncCounter( ) { _handle= CreateSemaphore(null, 0, INT_MAX, null);}
  361. SyncCounter::~SyncCounter( ) { if(_handle){ CloseHandle(_handle ); _handle=null;}}
  362. void SyncCounter:: operator+= (Int count )C {/*if(_handle)*/ ReleaseSemaphore(_handle, count, null );} // checking for handle!=null is not needed, as the function will do nothing on null
  363. Bool SyncCounter:: wait ( )C {/*if(_handle)*/return WaitForSingleObject(_handle, INFINITE)!=WAIT_TIMEOUT; return true;} // checking for handle!=null is not needed, as the function will return -1 on null, WAIT_TIMEOUT=258 so result will be -1!=258 -> true
  364. Bool SyncCounter:: wait (Int milliseconds)C {/*if(_handle)*/return WaitForSingleObject(_handle, (milliseconds>=0) ? milliseconds : INFINITE)!=WAIT_TIMEOUT; return true;} // checking for handle!=null is not needed, as the function will return -1 on null, WAIT_TIMEOUT=258 so result will be -1!=258 -> true
  365. #else
  366. SyncCounter::SyncCounter()
  367. {
  368. _counter=0;
  369. Alloc(_handle); pthread_cond_init (_handle, null);
  370. Alloc(_mutex ); pthread_mutex_init(_mutex , null);
  371. }
  372. SyncCounter::~SyncCounter()
  373. {
  374. if(_handle)
  375. {
  376. pthread_mutex_lock (_mutex); _counter=0; pthread_cond_destroy(_handle); Free(_handle);
  377. pthread_mutex_unlock (_mutex);
  378. pthread_mutex_destroy(_mutex); Free(_mutex);
  379. }
  380. }
  381. void SyncCounter::operator+=(Int count)C
  382. {
  383. if(_handle)
  384. {
  385. pthread_mutex_lock (_mutex); _counter=Mid(Long(_counter)+count, (Long)INT_MIN, (Long)INT_MAX); if(count>1)pthread_cond_broadcast(_handle);else pthread_cond_signal(_handle); // clamp to prevent overflow, if adding more than 1 then call 'pthread_cond_broadcast' to unlock all waiting threads, 'pthread_cond_signal' will unlock only 1
  386. pthread_mutex_unlock(_mutex);
  387. }
  388. }
  389. Bool SyncCounter::wait()C
  390. {
  391. if(_handle)
  392. {
  393. pthread_mutex_lock(_mutex);
  394. for(; _counter<=0 && !pthread_cond_wait(_handle, _mutex); ); // check for '_counter' first in case it's already positive to avoid 'pthread_cond_wait' overhead, have to check in a loop because 'pthread_cond_wait' may return if OS interrupted it, 'pthread_cond_wait' automatically unlocks and locks the mutex
  395. Bool ok=false; if(_counter>0){_counter--; ok=true;}
  396. pthread_mutex_unlock(_mutex);
  397. return ok;
  398. }
  399. return true;
  400. }
  401. Bool SyncCounter::wait(Int milliseconds)C
  402. {
  403. if(_handle)
  404. {
  405. pthread_mutex_lock(_mutex);
  406. if(_counter<=0) // if condition not met yet
  407. {
  408. if(milliseconds<0) // infinite wait
  409. {
  410. for(; _counter<=0 && !pthread_cond_wait(_handle, _mutex); ); // check for '_counter' first in case it's already positive to avoid 'pthread_cond_wait' overhead, have to check in a loop because 'pthread_cond_wait' may return if OS interrupted it, 'pthread_cond_wait' automatically unlocks and locks the mutex
  411. }else
  412. if(milliseconds>0) // timed wait
  413. {
  414. timeval tv; gettimeofday(&tv, null);
  415. timespec ts; ts.tv_sec =tv.tv_sec ;
  416. ts.tv_nsec =tv.tv_usec*1000;
  417. ts.tv_nsec+=(milliseconds%1000)*1000000;
  418. ts.tv_sec +=(milliseconds/1000) + ts.tv_nsec/1000000000;
  419. ts.tv_nsec%=1000000000;
  420. // 'ts' specifies the absolute time at which waiting times out (a point in time, not a duration)
  421. for(; _counter<=0 && !pthread_cond_timedwait(_handle, _mutex, &ts); ); // 'pthread_cond_timedwait' automatically unlocks and locks the mutex, keep waiting in the loop in case it returns before the deadline with the counter still not positive, and stop as soon as it returns a timeout or other error
  422. }
  423. }
  424. Bool ok=false; if(_counter>0){_counter--; ok=true;}
  425. pthread_mutex_unlock(_mutex);
  426. return ok;
  427. }
  428. return true;
  429. }
  430. #endif
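// Illustrative usage sketch (hypothetical job queue, not part of this file): 'SyncCounter' behaves like a counting semaphore:
//    SyncCounter jobs;
//    producer thread: jobs+=1;                  // each added count releases one waiting thread
//    worker   thread: if(jobs.wait()){/*..*/}   // returns true after consuming one count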
  431. /******************************************************************************/
  432. // READER WRITER SYNC
  433. /******************************************************************************/
  434. void ReadWriteSync::enterRead()
  435. {
  436. UIntPtr thread_id=GetThreadId();
  437. // check if this thread already has a lock present, it's important to test this without taking '_write_lock' yet, in case some other thread called 'enterWrite'
  438. if(_locks.elms()) // this is safe because we're interested only in locks from this thread, so if this thread has made a lock before, then it will be visible here without the need of taking '_locks_lock'
  439. {
  440. _locks_lock.on();
  441. REPA(_locks) // iterate all locks
  442. {
  443. Lock &lock=_locks[i];
  444. if( lock.thread_id==thread_id){lock.locks++; _locks_lock.off(); return;} // if found an existing lock for this thread, then increase its lock counter, unlock, and return immediately without unlocking again below
  445. }
  446. _locks_lock.off(); // unlock this first, because we need to lock '_write_lock' before '_locks_lock' when adding a new lock
  447. }
  448. // create new lock on this thread
  449. _write_lock.on (); // this prevents creating a new reader lock when a different thread has called 'enterWrite'
  450. _locks_lock.on (); Lock &l=_locks.New(); l.thread_id=thread_id; l.locks=1; // even though we've temporarily unlocked '_locks_lock' above, we can always create a new lock here without checking again if one is already listed, because even if another thread adds a new lock in the meantime, it will always be a different thread's lock
  451. _locks_lock.off();
  452. _write_lock.off();
  453. }
  454. void ReadWriteSync::leaveRead()
  455. {
  456. //if(_locks.elms()) don't check this, because if we're calling 'leaveRead', then most likely we are locked
  457. {
  458. UIntPtr thread_id=GetThreadId();
  459. _locks_lock.on();
  460. REPA(_locks)
  461. {
  462. Lock &lock=_locks[i];
  463. if( lock.thread_id==thread_id)
  464. {
  465. if(--lock.locks<=0) // this is the last lock
  466. {
  467. _locks .remove(i); // remove it first
  468. _locks_lock .off(); // unlock before signaling
  469. _left_reading.on(); // signal that we've finished reading
  470. return; // return immediately without unlocking again below
  471. }
  472. break; // we've found a lock for this thread, so no need to look any further
  473. }
  474. }
  475. _locks_lock.off();
  476. }
  477. }
  478. void ReadWriteSync::enterWrite()
  479. {
  480. UIntPtr thread_id=GetThreadId();
  481. _write_lock.on(); // block adding new 'readers' and 'writers'
  482. // wait until all 'readers' from other threads have exited
  483. again:
  484. if(_locks.elms())
  485. {
  486. _locks_lock.on();
  487. REPA(_locks)if(_locks[i].thread_id!=thread_id) // if found at least one reader from another thread
  488. {
  489. _locks_lock.off(); // unlock first before waiting
  490. _left_reading.wait(); // wait for any thread to finish reading
  491. goto again; // check again
  492. }
  493. _locks_lock.off(); // no readers were found, unlock and return
  494. }
  495. }
  496. void ReadWriteSync::leaveWrite()
  497. {
  498. _write_lock.off();
  499. }
  500. Bool ReadWriteSync::ownedRead()
  501. {
  502. if(_locks.elms())
  503. {
  504. UIntPtr thread_id=GetThreadId();
  505. SyncLocker lock(_locks_lock); REPA(_locks)if(_locks[i].thread_id==thread_id)return true;
  506. }
  507. return false;
  508. }
  509. /******************************************************************************/
  510. // SIMPLE READER WRITER SYNC
  511. /******************************************************************************/
  512. void SimpleReadWriteSync::enterWrite()C
  513. {
  514. _lock.on();
  515. for(; AtomicGet(_readers); )_finished.wait(); // wait for readers to finish
  516. }
  517. void SimpleReadWriteSync::leaveWrite()C
  518. {
  519. _lock.off();
  520. }
  521. void SimpleReadWriteSync::enterRead()C
  522. {
  523. SyncLocker lock(_lock); // make sure there is no writer
  524. AtomicInc(_readers);
  525. }
  526. void SimpleReadWriteSync::leaveRead()C
  527. {
  528. if(AtomicDec(_readers)==1)_finished.on();
  529. }
  530. /******************************************************************************/
  531. // THREAD
  532. /******************************************************************************/
  533. #if HAS_THREADS
  534. #if WINDOWS
  535. static unsigned int WINAPI ThreadFunc(Ptr user)
  536. #else
  537. static Ptr ThreadFunc(Ptr user)
  538. #endif
  539. {
  540. // !! Do not use objects with destructors here because '_endthreadex' below does not allow for destructors !!
  541. ((Thread*)user)->func();
  542. #if WINDOWS
  543. _endthreadex(0);
  544. #endif
  545. return 0;
  546. }
  547. #endif
  548. void Thread::func()
  549. {
  550. Cpu.set();
  551. #if APPLE
  552. if(_name.is()){SetThreadName(_name); _name.del();} // delete because it's not needed anymore
  553. #endif
  554. again:
  555. for(; !wantStop(); ) // main thread loop
  556. {
  557. if(wantPause())
  558. {
  559. _paused=true;
  560. _resume.wait();
  561. _paused=false;
  562. goto again; // check if after unpause we want to stop or pause again
  563. }
  564. if(!_func(T))break;
  565. }
  566. ThreadFinishedUsingGPUData(); // automatically disable OpenGL context if it was enabled, deactivating any active context on a thread is required before exiting the thread
  567. #if !WINDOWS
  568. _finished.on(); // !! we have to notify of finish before clearing '_active', because '_finished.on' operates on SyncEvent, so it needs to be valid until this 'on' call finishes, if we would clear '_active' before that, then this 'Thread' memory could have been already released and we would be operating on invalid memory !!
  569. #endif
  570. _active=false;
  571. // !! do not perform any operations after clearing '_active' because this object could have been already deleted !!
  572. }
  573. void Thread::zero()
  574. {
  575. user=null;
  576. _want_stop=_want_pause=_paused=_active=false;
  577. _priority=0;
  578. _handle=NULL;
  579. _func=null;
  580. }
  581. void Thread::stop()
  582. {
  583. if(active())
  584. {
  585. #if HAS_THREADS
  586. _want_stop=true;
  587. resume();
  588. #else
  589. EmulatedThreads.exclude(T);
  590. _active=false;
  591. #endif
  592. }
  593. }
  594. void Thread::cancelStop()
  595. {
  596. #if HAS_THREADS
  597. _want_stop=false;
  598. #endif
  599. }
  600. void Thread::pause()
  601. {
  602. if(active())
  603. {
  604. _want_pause=true;
  605. #if !HAS_THREADS
  606. if(!_paused) // ignore if it's already paused
  607. {
  608. _paused=true;
  609. EmulatedThreads.exclude(T);
  610. }
  611. #endif
  612. }
  613. }
  614. void Thread::resume()
  615. {
  616. if(_want_pause) // no need to check for 'active' because '_want_pause' can be enabled only for active threads
  617. {
  618. _want_pause=false; // disable this as soon as possible
  619. #if !HAS_THREADS
  620. if(_paused)
  621. {
  622. _paused=false;
  623. EmulatedThreads.include(T);
  624. }
  625. #else
  626. _resume.on();
  627. #endif
  628. }
  629. }
  630. #if HAS_THREADS && !WINDOWS
  631. #define PRIORITY_POLICY (APPLE ? SCHED_OTHER : SCHED_RR) // SCHED_OTHER gives better results on Apple but offers no priority range on Android/Linux, SCHED_RR (Mac/iOS 15..47, Android 1..99), SCHED_OTHER (Mac/iOS 15..47, Android 0..0)
  632. static const Int PriorityBase =sched_get_priority_min(PRIORITY_POLICY),
  633. PriorityRange=sched_get_priority_max(PRIORITY_POLICY)-PriorityBase;
  634. /* Tested using following program:
  635. const int PR=3;
  636. int v[PR*2+1]; Memx<Thread> threads;
  637. bool Func(Thread &t) {AtomicInc(v[t.priority()+PR]); return true;}
  638. void InitPre() {App.flag|=APP_WORK_IN_BACKGROUND; DataPath("../Data");}
  639. bool Init () {for(int p=-PR; p<=PR; p++)REP(2)threads.New().create(Func, null, p, true); FREPAO(threads).resume(); return true;}
  640. void Shut () {threads.del();}
  641. bool Update () {if(Kb.bp(KB_ESC))return false; return true;}
  642. void Draw () {D.clear(TURQ); Str s; int max=0; REPA(v)MAX(max, v[i]); FREPA(v){if(i)s+=", "; s+=Flt(v[i])/max;} D.text(0, 0, s);}
  643. */
  644. #endif
  645. void Thread::priority(Int priority)
  646. {
  647. Clamp(priority, -3, 3);
  648. if(active() && T._priority!=priority)
  649. {
  650. T._priority=priority;
  651. #if HAS_THREADS
  652. #if WINDOWS
  653. ASSERT(THREAD_PRIORITY_LOWEST==-2 && THREAD_PRIORITY_HIGHEST==2);
  654. SetThreadPriority(_handle, (priority<-2) ? THREAD_PRIORITY_IDLE : (priority>2) ? THREAD_PRIORITY_TIME_CRITICAL : priority);
  655. #else
  656. //LogN(S+PriorityBase+' '+(PriorityBase+PriorityRange));
  657. sched_param param; param.sched_priority=PriorityBase+PriorityRange*(priority+3)/6; // div by 6 because "priority==3" should give max
  658. pthread_setschedparam(_handle, PRIORITY_POLICY, &param);
  659. #endif
  660. #endif
  661. }
  662. }
  663. void Thread::kill()
  664. {
  665. if(created())
  666. {
  667. if(active())
  668. {
  669. #if WINDOWS_NEW || ANDROID // WINDOWS_NEW doesn't have 'TerminateThread', ANDROID doesn't have 'pthread_cancel'
  670. stop(); resume(); wait();
  671. #elif WINDOWS_OLD
  672. TerminateThread(_handle, 0);
  673. #else
  674. pthread_cancel(_handle);
  675. #endif
  676. }
  677. #if WINDOWS
  678. CloseHandle(_handle);
  679. #else
  680. //pthread_join(_handle, null); this has to be called for joinable threads to release system resources, but we use detached threads, which auto-release their resources when they finish
  681. _handle=NULL;
  682. //_finished.off(); don't do this here, instead do it in 'create', to give other threads waiting for this one a longer chance to detect the finish
  683. #endif
  684. _resume.off();
  685. zero();
  686. }
  687. }
  688. Bool Thread::wait(Int time)
  689. {
  690. if(active())
  691. {
  692. #if !HAS_THREADS
  693. if(time<0) // infinite wait
  694. {
  695. for(; active(); )EmulatedThreads.update();
  696. }else
  697. if(!time)EmulatedThreads.update();else // single step
  698. {
  699. // timed wait
  700. for(UInt start=Time.curTimeMs(); ; )
  701. {
  702. EmulatedThreads.update();
  703. if(!active() || (Time.curTimeMs()-start)>=time)break; // this code was tested OK for UInt overflow
  704. }
  705. }
  706. return !active();
  707. #elif WINDOWS
  708. return WaitForSingleObject(_handle, (time>=0) ? time : INFINITE)!=WAIT_TIMEOUT;
  709. #else
  710. return _finished.wait(time);
  711. #endif
  712. }
  713. return true;
  714. }
  715. void Thread::del(Int time)
  716. {
  717. if(created())
  718. {
  719. if(active())
  720. {
  721. stop ( ); if(_priority<0)priority(0);
  722. resume( );
  723. wait (time);
  724. }
  725. kill();
  726. }
  727. }
  728. Bool Thread::create(Bool func(Thread &thread), Ptr user, Int priority, Bool paused, C Str8 &name)
  729. {
  730. del(); if(!func)return true;
  731. T._active =true;
  732. T._func =func;
  733. T. user =user;
  734. T._want_pause=paused;
  735. //T._sleep =Max(0, sleep);
  736. #if !HAS_THREADS
  737. T._handle=pthread_t(this); // set dummy handle, which is used in 'created' method
  738. T._paused=paused; if(!paused)EmulatedThreads.include(T);
  739. #elif WINDOWS
  740. //if(_handle= CreateThread(null, STACK_SIZE, ThreadFunc, this, CREATE_SUSPENDED, null)) this causes memory leaks on some systems
  741. if(_handle=(HANDLE)_beginthreadex(null, STACK_SIZE, ThreadFunc, this, CREATE_SUSPENDED, null)) // threads are always created in suspended mode on Windows to access the 'handle' first and only then resume the thread
  742. #else
  743. pthread_attr_t attr;
  744. Bool ok=false;
  745. if(!pthread_attr_init(&attr))
  746. {
  747. #if APPLE
  748. T._name=name; // keep for Apple because on that platform we can set the name only when called from that thread
  749. #endif
  750. if(!pthread_attr_setstacksize (&attr, STACK_SIZE))
  751. if(!pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED)) // create as detached to auto-release system resources without having to call 'pthread_join'
  752. {
  753. _finished.off(); // reset here instead of when deleting, to give other threads a longer chance to wait for the finish
  754. if(!pthread_create(&_handle, &attr, ThreadFunc, this))ok=true;
  755. }
  756. pthread_attr_destroy(&attr);
  757. }
  758. if(ok)
  759. #endif
  760. {
  761. #if !APPLE
  762. SetThreadName(name, id()); // for non-Apple platforms we can change the name from any thread
  763. #endif
  764. T.priority(priority);
  765. #if WINDOWS
  766. ResumeThread(_handle); // CREATE_SUSPENDED was used so resume it
  767. #endif
  768. return true;
  769. }
  770. zero(); return false;
  771. }
  772. Thread::Thread()
  773. #if !WINDOWS
  774. : _finished(false) // disable 'auto_off' to allow multiple threads waiting for this one to be woken up
  775. #endif
  776. {
  777. zero();
  778. }
  779. Thread::Thread(Bool func(Thread &thread), Ptr user, Int priority, Bool paused)
  780. {
  781. zero(); create(func, user, priority, paused);
  782. }
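// Illustrative usage sketch (hypothetical 'MyFunc' worker, not part of this file):
//    Bool MyFunc(Thread &thread) {/* do one unit of work */ return true;} // return true to keep looping, false to let the thread finish
//    Thread worker; worker.create(MyFunc, null, 0, false, "Worker");      // start immediately, default priority
//    ...
//    worker.del(); // request stop, resume if paused, wait for the function to exit and release resources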
  783. /******************************************************************************/
  784. // THREADS
  785. /******************************************************************************/
  786. enum THREADS_FUNC_MODE : Byte
  787. {
  788. TF_INDEX,
  789. TF_DATA,
  790. TF_DATA_PTR,
  791. TF_MEMB,
  792. TF_MEMX,
  793. };
  794. /******************************************************************************/
  795. INLINE Bool Threads::callsLeft() {return _calls.elms()>0;} // this check is OK, we don't need to compare against '_calls_pos' because in all places we call 'checkEnd' which will clear '_calls' if we reached the end
  796. /******************************************************************************/
  797. static Bool ThreadsFunc(Threads::ThreadEx &thread)
  798. {
  799. Threads &threads=*(Threads*)thread.user;
  800. Int thread_index=threads._threads.index(&thread);
  801. // process priority elements first
  802. Int processed=0; // !! in first part set this only to priority elements (from 'process' methods), because it will be added to '_processed' !!
  803. if(threads._left>0) // have elements to process
  804. {
  805. process:
  806. Int index=AtomicDec(threads._left);
  807. if( index>0)
  808. {
  809. index=threads._elms-index; // process elements starting from zero index
  810. Ptr data;
  811. switch(threads._func_mode)
  812. {
  813. case TF_INDEX : data= Ptr(index) ; break;
  814. case TF_DATA : data= (Byte*)threads._func_data+ index*threads._elm_size ; break;
  815. case TF_DATA_PTR: data=*(Ptr*)((Byte*)threads._func_data+ index*threads._elm_size); break;
  816. case TF_MEMB : data= (*threads._func_memb)[index] ; break;
  817. case TF_MEMX : data= (*threads._func_memx)[index] ; break;
  818. }
  819. threads._func(data, threads._func_user, thread_index);
  820. //if(thread.wantStop())return false; not needed since '_left' is zeroed at the start of 'Threads.del'
  821. processed++;
  822. goto process; // proceed to next element
  823. }
  824. if(processed)
  825. if(AtomicAdd(threads._processed, processed) // set what was processed by this thread, do this in just one atomic operation (and not many times inside the loop)
  826. +processed>=threads._elms) // add the 'AtomicAdd' result (which is the old value of 'threads._processed') and 'processed' to get the total number of elements processed at this time
  827. threads._finished.on(); // notify of finished processing only after all elements have been finished (to avoid overhead of finished notification on every thread, and the waiting thread being woken up multiple times)
  828. }
  829. // process queued elements next
  830. if(threads.callsLeft())
  831. {
  832. SyncLockerEx locker(threads._lock_calls);
  833. if(threads.callsLeft())
  834. {
  835. if(threads._ordered){thread.call=threads._calls[threads._calls_pos++]; threads.checkEnd();}
  836. else {thread.call=threads._calls.last(); threads._calls.removeLast();}
  837. // can't check 'threads._waiting' here during lock, because 'Threads.wait' could happen later, during the 'call.call'
  838. locker.off();
  839. thread.call.call (thread_index);
  840. thread.call.clear(); // clear as soon as finished processing so other threads checking for this can see that it's now empty
  841. if(Int waiting=AtomicGet(threads._waiting))threads._queued_finished+=waiting; // notify of finished processing only if there are any waiting threads (to avoid overhead of +=), this is needed for 'Threads.wait', this is not perfect because there is a small possibility that when 2 threads are waiting, 1 of them would quickly consume both notifications
  842. //if(thread.wantStop())return false; not needed since we're about to return anyway
  843. processed++;
  844. }
  845. }
  846. if(!processed)threads._wake_threads.wait(); // sleep only if we haven't processed any data (this is important for 2 reasons: #1 to avoid overhead of the 'wait' function, #2 in a scenario where we have big number of calls queued, always calling 'wait' could potentially quickly consume all requested wake ups on a single thread preventing other threads from waking up), sleep until any thread was requested to wake up
  847. return true;
  848. }
  849. /******************************************************************************/
  850. void Threads::del()
  851. {
  852. AtomicSet(_left, 0); // this will stop processing priority elements so we don't have to check 'wantStop' every time
  853. REPAO(_threads).stop(); // notify all threads that we want to stop
  854. _wake_threads+=_threads.elms(); // wake up all of them in case they are sleeping
  855. _threads.del(); // delete threads before anything else
  856. _calls .del(); _calls_pos=0;
  857. _func=null;
  858. _func_data=null;
  859. _func_memb=null;
  860. _func_memx=null;
  861. _func_user=null;
  862. _func_mode=TF_INDEX;
  863. _ordered=false;
  864. _left=_processed=_elms=0;
  865. _elm_size=0;
  866. _waiting=0;
  867. }
  868. void Threads::create(Bool ordered, Int threads, Int priority, C Str8 &name)
  869. {
  870. del();
  871. T._ordered=ordered;
  872. #if !HAS_THREADS
  873. Clamp(threads, 0, 1); // there's no point in creating more threads than 1 since they'd all be processed on the main thread either way
  874. #endif
  875. T._threads.setNum(threads);
  876. ASSERT_BASE_EXTENDED<Thread, ThreadEx>();
  877. REPAO(_threads).create((Bool(*)(Thread&))ThreadsFunc, this, priority, false, name.is() ? name+i : name);
  878. }
  879. /******************************************************************************/
  880. Bool Threads::wantStop ()C {return _threads.elms() ? _threads[0].wantStop() : false;}
  881. Int Threads::activeThreads()C
  882. {
  883. Int paused=0; REPA(_threads)paused+=_threads[i].wantPause();
  884. return threads()-paused;
  885. }
  886. Threads& Threads::activeThreads(Int active)
  887. {
  888. Clamp(active, 0, threads());
  889. for(Int i=active; i<threads(); i++)_threads[i].pause (); // pause unwanted threads first
  890. REP(active )_threads[i].resume(); // resume wanted threads after
  891. return T;
  892. }
  893. Int Threads::priority( )C {return _threads.elms() ? _threads[0].priority() : 0;}
  894. Threads& Threads::priority(Int priority) {REPAO( _threads).priority(priority); return T;}
  895. /******************************************************************************/
  896. // !! 'elm_index' MUST BE 'IntPtr' and not 'Int' because we're casting to '_func' of 'Ptr' type !!
  897. void Threads::_process(Int elms, void func(IntPtr elm_index, Ptr user, Int thread_index), Ptr user, Int max_threads, Bool allow_processing_on_this_thread)
  898. {
  899. #if HAS_THREADS
  900. if (max_threads<0)max_threads=threads();else MIN(max_threads, threads()+allow_processing_on_this_thread);
  901. MIN(max_threads, elms);
  902. if (max_threads>1)
  903. {
  904. SyncLocker locker(_lock_process); // this allows multiple 'process' calls on multiple threads
  905. _func_mode=TF_INDEX; _func=(void (*)(Ptr data, Ptr user, Int thread_index))func; _func_user=user; _elms=elms; AtomicSet(_left, _elms); // set '_left' as last
  906. // wake up threads
  907. _wake_threads+=max_threads-allow_processing_on_this_thread;
  908. // process on this thread
  909. if(allow_processing_on_this_thread)
  910. {
  911. Int processed=0;
  912. process:
  913. Int index=AtomicDec(_left);
  914. if( index>0)
  915. {
  916. index=_elms-index; // process elements starting from zero index
  917. func(index, user, threads()); // set 'thread_index' to be "last_thread_index+1", have to use 'threads' because we don't know which threads (with what indexes) are going to wake up, but we have to make sure they won't overlap
  918. processed++;
  919. goto process; // proceed to next element
  920. }
  921. if(processed)AtomicAdd(_processed, processed); // set what was processed by this thread, do this in just one atomic operation (not inside the loop)
  922. }
  923. // wait until all finished
  924. for(; _processed<_elms; )_finished.wait();
  925. _elms=0; _processed=0; // other members don't need to be cleared
  926. }else
  927. {
  928. #endif
  929. FREP(elms)func(i, user, 0);
  930. #if HAS_THREADS
  931. }
  932. #endif
  933. }
  934. void Threads::_process(Ptr data, Int elms, Int elm_size, void func(Ptr data, Ptr user, Int thread_index), Ptr user, Int max_threads, Bool allow_processing_on_this_thread, Bool data_ptr)
  935. {
  936. #if HAS_THREADS
  937. if (max_threads<0)max_threads=threads();else MIN(max_threads, threads()+allow_processing_on_this_thread);
  938. MIN(max_threads, elms);
  939. if (max_threads>1)
  940. {
  941. SyncLocker locker(_lock_process); // this allows multiple 'process' calls on multiple threads
  942. _func_mode=(data_ptr ? TF_DATA_PTR : TF_DATA); _func=func; _func_data=data; _func_user=user; _elm_size=elm_size; _elms=elms; AtomicSet(_left, _elms); // set '_left' as last
  943. // wake up threads
  944. _wake_threads+=max_threads-allow_processing_on_this_thread;
  945. // process on this thread
  946. if(allow_processing_on_this_thread)
  947. {
  948. Int processed=0;
  949. process:
  950. Int index=AtomicDec(_left);
  951. if( index>0)
  952. {
  953. index=_elms-index; // process elements starting from zero index
  954. Ptr d=(Byte*)data+index*elm_size; if(data_ptr)d=*(Ptr*)d;
  955. func(d, user, threads()); // set 'thread_index' to be "last_thread_index+1", have to use 'threads' because we don't know which threads (with what indexes) are going to wake up, but we have to make sure they won't overlap
  956. processed++;
  957. goto process; // proceed to next element
  958. }
  959. if(processed)AtomicAdd(_processed, processed); // set what was processed by this thread, do this in just one atomic operation (not inside the loop)
  960. }
  961. // wait until all finished
  962. for(; _processed<_elms; )_finished.wait();
  963. _elms=0; _processed=0; // other members don't need to be cleared
  964. }else
  965. {
  966. #endif
  967. FREP(elms)
  968. {
  969. Ptr d=(Byte*)data+i*elm_size; if(data_ptr)d=*(Ptr*)d;
  970. func(d, user, 0);
  971. }
  972. #if HAS_THREADS
  973. }
  974. #endif
  975. }
  976. void Threads::_process(_Memb &data, void func(Ptr data, Ptr user, Int thread_index), Ptr user, Int max_threads, Bool allow_processing_on_this_thread)
  977. {
  978. #if HAS_THREADS
  979. if (max_threads<0)max_threads=threads();else MIN(max_threads, threads()+allow_processing_on_this_thread);
  980. MIN(max_threads, data.elms());
  981. if (max_threads>1)
  982. {
  983. SyncLocker locker(_lock_process); // this allows multiple 'process' calls on multiple threads
  984. _func_mode=TF_MEMB; _func=func; _func_memb=&data; _func_user=user; _elms=data.elms(); AtomicSet(_left, _elms); // set '_left' as last
  985. // wake up threads
  986. _wake_threads+=max_threads-allow_processing_on_this_thread;
  987. // process on this thread
  988. if(allow_processing_on_this_thread)
  989. {
  990. Int processed=0;
  991. process:
  992. Int index=AtomicDec(_left);
  993. if( index>0)
  994. {
  995. index=_elms-index; // process elements starting from zero index
  996. func(data[index], user, threads()); // set 'thread_index' to be "last_thread_index+1", have to use 'threads' because we don't know which threads (with what indexes) are going to wake up, but we have to make sure they won't overlap
  997. processed++;
  998. goto process; // proceed to next element
  999. }
  1000. if(processed)AtomicAdd(_processed, processed); // set what was processed by this thread, do this in just one atomic operation (not inside the loop)
  1001. }
  1002. // wait until all finished
  1003. for(; _processed<_elms; )_finished.wait();
  1004. _elms=0; _processed=0; // other members don't need to be cleared
  1005. }else
  1006. {
  1007. #endif
  1008. FREPA(data)func(data[i], user, 0);
  1009. #if HAS_THREADS
  1010. }
  1011. #endif
  1012. }
  1013. void Threads::_process(_Memx &data, void func(Ptr data, Ptr user, Int thread_index), Ptr user, Int max_threads, Bool allow_processing_on_this_thread)
  1014. {
  1015. #if HAS_THREADS
  1016. if (max_threads<0)max_threads=threads();else MIN(max_threads, threads()+allow_processing_on_this_thread);
  1017. MIN(max_threads, data.elms());
  1018. if (max_threads>1)
  1019. {
  1020. SyncLocker locker(_lock_process); // this allows multiple 'process' calls on multiple threads
  1021. _func_mode=TF_MEMX; _func=func; _func_memx=&data; _func_user=user; _elms=data.elms(); AtomicSet(_left, _elms); // set '_left' as last
  1022. // wake up threads
  1023. _wake_threads+=max_threads-allow_processing_on_this_thread;
  1024. // process on this thread
  1025. if(allow_processing_on_this_thread)
  1026. {
  1027. Int processed=0;
  1028. process:
  1029. Int index=AtomicDec(_left);
  1030. if( index>0)
  1031. {
  1032. index=_elms-index; // process elements starting from zero index
  1033. func(data[index], user, threads()); // set 'thread_index' to be "last_thread_index+1", have to use 'threads' because we don't know which threads (with what indexes) are going to wake up, but we have to make sure they won't overlap
  1034. processed++;
  1035. goto process; // proceed to next element
  1036. }
  1037. if(processed)AtomicAdd(_processed, processed); // set what was processed by this thread, do this in just one atomic operation (not inside the loop)
  1038. }
  1039. // wait until all finished
  1040. for(; _processed<_elms; )_finished.wait();
  1041. _elms=0; _processed=0; // other members don't need to be cleared
  1042. }else
  1043. {
  1044. #endif
  1045. FREPA(data)func(data[i], user, 0);
  1046. #if HAS_THREADS
  1047. }
  1048. #endif
  1049. }
  1050. /******************************************************************************/
  1051. void Threads::free() // !! assumes that '_lock_calls' is locked !!
  1052. {
  1053. if(_calls_pos>=1024) // this will occur only for '_ordered', the 1024 value was chosen so that calls are moved to the start of the container only after a decent portion has been processed, this is so that they are not moved every time, because that would cause a significant slowdown if there are many calls in the container
  1054. {
  1055. _calls.removeNum(0, _calls_pos, true); // remove all that have been processed
  1056. _calls_pos=0;
  1057. }
  1058. }
  1059. void Threads::checkEnd() // !! assumes that '_lock_calls' is locked !!
  1060. {
  1061. if(_calls_pos>=_calls.elms()){_calls.clear(); _calls_pos=0;} // if none are left, then clear, this is important because "callsLeft()" can be simplified thanks to this
  1062. }
  1063. /******************************************************************************/
  1064. // !! 'elm_index' MUST BE IntPtr and not Int because we're casting to '_func' of Ptr type !!
  1065. void Threads::_queue(Int elms, void func(IntPtr elm_index, Ptr user, Int thread_index), Ptr user)
  1066. {
  1067. if(elms>0)
  1068. {
  1069. if(_threads.elms())
  1070. {
  1071. {
  1072. SyncLocker locker(_lock_calls);
  1073. free();
  1074. Int start=_calls.addNum(elms); REP(elms)_calls[start+i]._set(Ptr(i), (void (*)(Ptr data, Ptr user, Int thread_index))func, user);
  1075. }
  1076. _wake_threads+=Min(_threads.elms(), elms);
  1077. }else
  1078. FREP(elms) // process in order
  1079. {
  1080. func(i, user, 0); // if have no threads then call on this thread
  1081. }
  1082. }
  1083. }
  1084. void Threads::_queue(Ptr data, void func(Ptr data, Ptr user, Int thread_index), Ptr user)
  1085. {
  1086. if(_threads.elms())
  1087. {
  1088. {
  1089. SyncLocker locker(_lock_calls);
  1090. free();
  1091. _calls.New()._set(data, func, user);
  1092. }
  1093. _wake_threads++;
  1094. }else
  1095. {
  1096. func(data, user, 0); // if have no threads then call on this thread
  1097. }
  1098. }
Threads& Threads::queue(C MemPtr<Call> &calls)
{
   if(calls.elms())
   {
      if(_threads.elms())
      {
         {
            SyncLocker locker(_lock_calls);
            free();
            FREPA(calls)_calls.add(calls[i]);
         }
         _wake_threads+=Min(_threads.elms(), calls.elms());
      }else
      {
         FREPAO(calls).call(0); // if we have no threads then call on this thread
      }
   }
   return T;
}
/******************************************************************************/
Threads& Threads::cancel()
{
   if(_calls.elms())
   {
      SyncLocker locker(_lock_calls);
      _calls.clear(); _calls_pos=0;
      if(_waiting)_queued_finished+=_waiting; // access '_waiting' only during lock !! if there's any thread waiting then notify of potential finish
   }
   return T;
}
Int Threads::_cancel(void func(Ptr data, Ptr user, Int thread_index))
{
   Int canceled=0; if(_calls.elms())
   {
      SyncLocker locker(_lock_calls);
      for(Int i=_calls.elms(); --i>=_calls_pos; )if(_calls[i].func==func){_calls.remove(i, _ordered); canceled++;}
      if(canceled)
      {
         checkEnd();
         if(_waiting)_queued_finished+=_waiting; // access '_waiting' only during lock !! if there's any thread waiting then notify of potential finish
      }
   }
   return canceled;
}
Int Threads::_cancel(void func(Ptr data, Ptr user, Int thread_index), Ptr user)
{
   Int canceled=0; if(_calls.elms())
   {
      SyncLocker locker(_lock_calls);
      for(Int i=_calls.elms(); --i>=_calls_pos; )if(_calls[i].isFuncUser(func, user)){_calls.remove(i, _ordered); canceled++;}
      if(canceled)
      {
         checkEnd();
         if(_waiting)_queued_finished+=_waiting; // access '_waiting' only during lock !! if there's any thread waiting then notify of potential finish
      }
   }
   return canceled;
}
Int Threads::_cancel(Ptr data, void func(Ptr data, Ptr user, Int thread_index), Ptr user)
{
   Int canceled=0; if(_calls.elms())
   {
      Call call(data, func, user);
      SyncLocker locker(_lock_calls);
      for(Int i=_calls.elms(); --i>=_calls_pos; )if(_calls[i]==call){_calls.remove(i, _ordered); canceled++;}
      if(canceled)
      {
         checkEnd();
         if(_waiting)_queued_finished+=_waiting; // access '_waiting' only during lock !! if there's any thread waiting then notify of potential finish
      }
   }
   return canceled;
}
/******************************************************************************/
Threads& Threads::wait()
{
#if !SYNC_LOCK_SAFE
   if(_lock_calls.created())
#endif
   for(Bool not_added=true; ; )
   {
    //if(callsLeft())goto wait; // this can be outside of 'locker', but in that case it must be checked before '_threads'. It can't be outside anymore because we need to modify '_waiting' during lock
      { // braces to create scope for 'locker' !! we can modify '_waiting' only here !!
         SyncLocker locker(_lock_calls);
         _waiting+=not_added; not_added=false; // add this 'wait' call to the waiting list (if not yet added), we have to do this here, before checking '_threads[i].call', so that the thread processing function will already see this call's '_waiting' after processing 'thread.call.call', because that one isn't covered by the '_lock_calls' lock
         if(callsLeft())goto wait;
         REPA(_threads)if(_threads[i].call.is())goto wait;
         _waiting--; // _waiting-=added; if nothing was found then remove this call from the waiting list (if added)
      }
      break;
   wait:
      _queued_finished.wait(); // wait when 'locker' is off !!
   }
   return T;
}
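/* Usage sketch (illustrative only; assumes 'Call' is publicly accessible as 'Threads::Call', a
   'Threads' instance named 'workers', and hypothetical 'Job'/'job_data'/'job_user'):

      static void Job(Ptr data, Ptr user, Int thread_index) {/*...*/}

      Memt<Threads::Call> calls;
      FREP(4)calls.add(Threads::Call(job_data, Job, job_user)); // build a batch of 4 calls
      workers.queue(calls); // hand the batch to the worker threads (or run inline when there are none)
      workers.wait (); // block until queued calls, including ones already being processed, have completed */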
void Threads::_wait(void func(Ptr data, Ptr user, Int thread_index))
{
#if !SYNC_LOCK_SAFE
   if(_lock_calls.created())
#endif
   for(Bool not_added=true; ; )
   {
      { // braces to create scope for 'locker' !! we can modify '_waiting' only here !!
         SyncLocker locker(_lock_calls);
         _waiting+=not_added; not_added=false; // add this 'wait' call to the waiting list (if not yet added), we have to do this here, before checking '_threads[i].call', so that the thread processing function will already see this call's '_waiting' after processing 'thread.call.call', because that one isn't covered by the '_lock_calls' lock
         for(Int i=_calls.elms(); --i>=_calls_pos; )if( _calls[i]. func==func)goto wait;
         REPA(_threads)if(_threads[i].call.func==func)goto wait;
         _waiting--; // _waiting-=added; if nothing was found then remove this call from the waiting list (if added)
      }
      break;
   wait:
      _queued_finished.wait(); // wait when 'locker' is off !!
   }
}
void Threads::_wait(void func(Ptr data, Ptr user, Int thread_index), Ptr user)
{
#if !SYNC_LOCK_SAFE
   if(_lock_calls.created())
#endif
   for(Bool not_added=true; ; )
   {
      { // braces to create scope for 'locker' !! we can modify '_waiting' only here !!
         SyncLocker locker(_lock_calls);
         _waiting+=not_added; not_added=false; // add this 'wait' call to the waiting list (if not yet added), we have to do this here, before checking '_threads[i].call', so that the thread processing function will already see this call's '_waiting' after processing 'thread.call.call', because that one isn't covered by the '_lock_calls' lock
         for(Int i=_calls.elms(); --i>=_calls_pos; )if( _calls[i]. isFuncUser(func, user))goto wait;
         REPA(_threads)if(_threads[i].call.isFuncUser(func, user))goto wait;
         _waiting--; // _waiting-=added; if nothing was found then remove this call from the waiting list (if added)
      }
      break;
   wait:
      _queued_finished.wait(); // wait when 'locker' is off !!
   }
}
void Threads::_wait(Ptr data, void func(Ptr data, Ptr user, Int thread_index), Ptr user)
{
#if !SYNC_LOCK_SAFE
   if(_lock_calls.created())
#endif
   {
      Call call(data, func, user);
      for(Bool not_added=true; ; )
      {
         { // braces to create scope for 'locker' !! we can modify '_waiting' only here !!
            SyncLocker locker(_lock_calls);
            _waiting+=not_added; not_added=false; // add this 'wait' call to the waiting list (if not yet added), we have to do this here, before checking '_threads[i].call', so that the thread processing function will already see this call's '_waiting' after processing 'thread.call.call', because that one isn't covered by the '_lock_calls' lock
            for(Int i=_calls.elms(); --i>=_calls_pos; )if( _calls[i] ==call)goto wait;
            REPA(_threads)if(_threads[i].call==call)goto wait;
            _waiting--; // _waiting-=added; if nothing was found then remove this call from the waiting list (if added)
         }
         break;
      wait:
         _queued_finished.wait(); // wait when 'locker' is off !!
      }
   }
}
/******************************************************************************/
Int Threads::queued()C
{
   if(_threads.elms())
   {
      SyncLocker locker(_lock_calls);
      Int queued=_calls.elms()-_calls_pos; REPA(_threads)queued+=_threads[i].call.is();
      return queued;
   }
   return 0;
}
Int Threads::_queued(void func(Ptr data, Ptr user, Int thread_index))C
{
   Int queued=0; if(_threads.elms())
   {
      SyncLocker locker(_lock_calls);
      for(Int i=_calls.elms(); --i>=_calls_pos; )if(_calls [i]. func==func)queued++;
      REPA(_threads)if(_threads[i].call.func==func)queued++;
   }
   return queued;
}
Int Threads::_queued(void func(Ptr data, Ptr user, Int thread_index), Ptr user)C
{
   Int queued=0; if(_threads.elms())
   {
      SyncLocker locker(_lock_calls);
      for(Int i=_calls.elms(); --i>=_calls_pos; )if(_calls [i]. isFuncUser(func, user))queued++;
      REPA(_threads)if(_threads[i].call.isFuncUser(func, user))queued++;
   }
   return queued;
}
Int Threads::_queued(Ptr data, void func(Ptr data, Ptr user, Int thread_index), Ptr user)C
{
   Int queued=0; if(_threads.elms())
   {
      Call call(data, func, user);
      SyncLocker locker(_lock_calls);
      for(Int i=_calls.elms(); --i>=_calls_pos; )if(_calls [i] ==call)queued++;
      REPA(_threads)if(_threads[i].call==call)queued++;
   }
   return queued;
}
/******************************************************************************/
Bool Threads::busy()C
{
   if(_calls  .elms())return true;
   if(_threads.elms())
   {
      SyncLocker locker(_lock_calls);
      REPA(_threads)if(_threads[i].call.is())return true;
   }
   return false;
}
Bool Threads::_busy(void func(Ptr data, Ptr user, Int thread_index))C
{
   if(_threads.elms())
   {
      SyncLocker locker(_lock_calls);
      for(Int i=_calls.elms(); --i>=_calls_pos; )if(_calls [i]. func==func)return true;
      REPA(_threads)if(_threads[i].call.func==func)return true;
   }
   return false;
}
Bool Threads::_busy(void func(Ptr data, Ptr user, Int thread_index), Ptr user)C
{
   if(_threads.elms())
   {
      SyncLocker locker(_lock_calls);
      for(Int i=_calls.elms(); --i>=_calls_pos; )if(_calls [i]. isFuncUser(func, user))return true;
      REPA(_threads)if(_threads[i].call.isFuncUser(func, user))return true;
   }
   return false;
}
Bool Threads::_busy(Ptr data, void func(Ptr data, Ptr user, Int thread_index), Ptr user)C
{
   if(_threads.elms())
   {
      Call call(data, func, user);
      SyncLocker locker(_lock_calls);
      for(Int i=_calls.elms(); --i>=_calls_pos; )if(_calls [i] ==call)return true;
      REPA(_threads)if(_threads[i].call==call)return true;
   }
   return false;
}
/******************************************************************************/
// MULTI-THREADED FUNCTION CALLER
/******************************************************************************/
struct MTFC
{
   Int index, // index of element to process next
       elms ; // number of elements to process
   void (*func)(Int elm_index, Ptr user, Int thread_index);
   Ptr user;
   MemtN<Thread, 16-1> threads; // one less because we will make calls on the caller thread as well

   static Bool Func(Thread &thread)
   {
      MTFC &mtfc=*(MTFC*)thread.user;
      for(Int thread_index=mtfc.threads.index(&thread); ; )
      {
         Int i=AtomicInc(mtfc.index);
         if(InRange(i, mtfc.elms))mtfc.func(i, mtfc.user, thread_index);else break;
      }
      return false;
   }
   MTFC(Int elms, void func(Int elm_index, Ptr user, Int thread_index), Ptr user, Int threads)
   {
      T.index=0;
      T.elms =elms;
      T.func =func;
      T.user =user;
      T.threads.setNum(Min(elms, threads)-1); // allocate all threads at start, because they need to be in constant memory, allocate one less, because we will make calls on this thread as well
      REPAO(T.threads).create(Func, this);
      // make calls on this thread
      for(Int thread_index=T.threads.elms(); ; )
      {
         Int i=AtomicInc(index);
         if(InRange(i, elms))func(i, user, thread_index);else break;
      }
      // wait for other threads
      REPAO(T.threads).wait();
   }
};
void MultiThreadedCall(Int elms, void func(Int elm_index, Ptr user, Int thread_index), Ptr user, Int threads)
{
#if HAS_THREADS
   if(elms>1 && threads>1){MTFC(elms, func, user, threads);}else
#endif
   FREP(elms)func(i, user, 0);
}
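/* Usage sketch (illustrative; 'Scale' and 'values' are hypothetical):

      static void Scale(Int elm_index, Ptr user, Int thread_index) {((Flt*)user)[elm_index]*=2;}

      Flt values[1024];
      MultiThreadedCall(1024, Scale, values, 8); // up to 7 worker threads plus the caller share the 1024 calls

   Each element index is claimed exactly once via the shared atomic counter; with 'elms<=1', 'threads<=1',
   or without HAS_THREADS, the calls simply run in order on the calling thread. */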
/******************************************************************************/
struct MTFC_Ptr // MTFC for Ptr
{
   Byte *data;
   Int   elm_size;
   void (*func)(Ptr data, Ptr user, Int thread_index);
   Ptr   user;
   MTFC_Ptr(Ptr data, Int elm_size, void func(Ptr data, Ptr user, Int thread_index), Ptr user) {T.data=(Byte*)data; T.elm_size=elm_size; T.func=func; T.user=user;}
   static void Func   (Int elm_index, MTFC_Ptr &mtfc, Int thread_index) {mtfc.func(        mtfc.data+elm_index*mtfc.elm_size , mtfc.user, thread_index);}
   static void FuncPtr(Int elm_index, MTFC_Ptr &mtfc, Int thread_index) {mtfc.func(*(Ptr*)(mtfc.data+elm_index*mtfc.elm_size), mtfc.user, thread_index);}
};
struct MTFC_Memb // MTFC for Memb
{
  _Memb &data;
   void (*func)(Ptr data, Ptr user, Int thread_index);
   Ptr user;
   MTFC_Memb(_Memb &data, void func(Ptr data, Ptr user, Int thread_index), Ptr user) : data(data) {T.func=func; T.user=user;}
   static void Func   (Int elm_index, MTFC_Memb &mtfc, Int thread_index) {mtfc.func(        mtfc.data[elm_index] , mtfc.user, thread_index);}
   static void FuncPtr(Int elm_index, MTFC_Memb &mtfc, Int thread_index) {mtfc.func(*(Ptr*)(mtfc.data[elm_index]), mtfc.user, thread_index);}
};
struct MTFC_Memx // MTFC for Memx
{
  _Memx &data;
   void (*func)(Ptr data, Ptr user, Int thread_index);
   Ptr user;
   MTFC_Memx(_Memx &data, void func(Ptr data, Ptr user, Int thread_index), Ptr user) : data(data) {T.func=func; T.user=user;}
   static void Func   (Int elm_index, MTFC_Memx &mtfc, Int thread_index) {mtfc.func(        mtfc.data[elm_index] , mtfc.user, thread_index);}
   static void FuncPtr(Int elm_index, MTFC_Memx &mtfc, Int thread_index) {mtfc.func(*(Ptr*)(mtfc.data[elm_index]), mtfc.user, thread_index);}
};
void _MultiThreadedCall( Ptr   data, Int elms, Int elm_size, void func(Ptr data, Ptr user, Int thread_index), Ptr user, Int threads, Bool data_ptr) {MTFC_Ptr  mtfc(data, elm_size, func, user); MultiThreadedCall(       elms , data_ptr ? *MTFC_Ptr ::FuncPtr : *MTFC_Ptr ::Func, mtfc, threads);}
void _MultiThreadedCall(_Memb &data,                         void func(Ptr data, Ptr user, Int thread_index), Ptr user, Int threads, Bool data_ptr) {MTFC_Memb mtfc(data,           func, user); MultiThreadedCall(data.elms(), data_ptr ? *MTFC_Memb::FuncPtr : *MTFC_Memb::Func, mtfc, threads);}
void _MultiThreadedCall(_Memx &data,                         void func(Ptr data, Ptr user, Int thread_index), Ptr user, Int threads, Bool data_ptr) {MTFC_Memx mtfc(data,           func, user); MultiThreadedCall(data.elms(), data_ptr ? *MTFC_Memx::FuncPtr : *MTFC_Memx::Func, mtfc, threads);}
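// Note on 'data_ptr' (descriptive only): 'Func' passes the address of the element itself, while 'FuncPtr'
// treats each element as a stored pointer and passes its dereferenced value instead, which is what
// containers holding pointers to objects need.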
/******************************************************************************/
// THREAD EMULATION
/******************************************************************************/
ThreadEmulation::ThreadEmulation() {_process_left=0; _process_type=_time=0;}
void ThreadEmulation::include(Thread &thread)
{
   if(thread.sleep())
   {
      REPA(_delayed_threads)if(_delayed_threads[i].thread==&thread)return;
      _delayed_threads.New().thread=&thread;
   }else
   {
      _rt_threads.include(&thread);
   }
}
void ThreadEmulation::exclude(Thread &thread)
{
   if(thread.sleep())
   {
      REPA(_delayed_threads)if(_delayed_threads[i].thread==&thread)
      {
         if(_process_type==1 && i<_process_left)_process_left--; // if this element was going to be processed soon
         _delayed_threads.remove(i, true);
         break;
      }
   }else
   {
      REPA(_rt_threads)if(_rt_threads[i]==&thread)
      {
         if(_process_type==0 && i<_process_left)_process_left--; // if this element was going to be processed soon
         _rt_threads.remove(i, true);
         break;
      }
   }
}
void ThreadEmulation::update()
{
   // real-time threads
   for(_process_type=0, _process_left=_rt_threads.elms(); _process_left--; )
   {
      Thread &thread=*_rt_threads[_process_left]; // inside the function below, '_process_left' points to the element being processed right now
      if(!thread._func(thread))
      {
         thread._active=false;
         exclude(thread);
      }
   }
   // delayed threads
   UInt time=Time.curTimeMs(), delta=time-_time; // this code was tested OK for UInt overflow
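   // Wraparound example (unsigned modular arithmetic): if '_time'==0xFFFFFFF0 and 'time'==0x00000010,
   // then 'time-_time'==0x00000020, so the 32 ms elapsed across the UInt overflow are still measured correctly.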
   if(delta>0)
   {
      _time=time;
      for(_process_type=1, _process_left=_delayed_threads.elms(); _process_left--; )
      {
         DelayedThread &del_thread=_delayed_threads[_process_left]; // inside the function below, '_process_left' points to the element being processed right now
         del_thread.waited+=delta;
         Int sleep=del_thread.thread->sleep();
         if(del_thread.waited>=sleep)
         {
            del_thread.waited-=sleep;
            Thread &thread=*del_thread.thread; // keep a standalone reference in case 'del_thread' gets removed in the function
            if(!thread._func(thread))
            {
               thread._active=false;
               exclude(thread);
            }
         }
      }
   }
}
/******************************************************************************/
}
/******************************************************************************/