ThreadPool.cs 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. //
  2. // System.Threading.ThreadPool
  3. //
  4. // Author:
  5. // Patrik Torstensson
  6. // Dick Porter ([email protected])
  7. //
  8. // (C) Ximian, Inc. http://www.ximian.com
  9. // (C) Patrik Torstensson
  10. //
  11. using System;
  12. using System.Collections;
  13. namespace System.Threading {
  14. /// <summary> (Patrik T notes)
  15. /// This threadpool is focused on saving resources not giving max performance.
  16. ///
  17. /// Note, this class is not perfect but it works. ;-) Should also replace
  18. /// the queue with an internal one (performance)
  19. ///
  20. /// This class should also use a specialized queue to increase performance..
  21. /// </summary>
  22. ///
  23. public sealed class ThreadPool {
  24. internal struct ThreadPoolWorkItem {
  25. public WaitCallback _CallBack;
  26. public object _Context;
  27. }
  28. private int _ThreadTimeout;
  29. private long _MaxThreads;
  30. private long _CurrentThreads;
  31. private long _ThreadsInUse;
  32. private long _RequestInQueue;
  33. private long _ThreadCreateTriggerRequests;
  34. private Thread _MonitorThread;
  35. private Queue _RequestQueue;
  36. private ArrayList _Threads;
  37. private ManualResetEvent _DataInQueue;
  38. static ThreadPool _Threadpool;
  39. static ThreadPool() {
  40. _Threadpool = new ThreadPool();
  41. }
  42. private ThreadPool() {
  43. // 30 sec timeout default
  44. _ThreadTimeout = 30 * 1000;
  45. // Used to signal that there is data in the queue
  46. _DataInQueue = new ManualResetEvent(false);
  47. _Threads = ArrayList.Synchronized(new ArrayList());
  48. // Holds requests..
  49. _RequestQueue = Queue.Synchronized(new Queue(128));
  50. // TODO: This should be 2 x number of CPU:s in the box
  51. _MaxThreads = 16;
  52. _CurrentThreads = 0;
  53. _RequestInQueue = 0;
  54. _ThreadsInUse = 0;
  55. _ThreadCreateTriggerRequests = 5;
  56. // TODO: This temp starts one thread, remove this..
  57. CheckIfStartThread();
  58. // Keeps track of requests in the queue and increases the number of threads if needed
  59. // PT: Disabled - causes problems during shutdown
  60. //_MonitorThread = new Thread(new ThreadStart(MonitorThread));
  61. //_MonitorThread.Start();
  62. }
  63. internal void RemoveThread() {
  64. Interlocked.Decrement(ref _CurrentThreads);
  65. _Threads.Remove(Thread.CurrentThread);
  66. }
  67. internal void CheckIfStartThread() {
  68. bool bCreateThread = false;
  69. if (_CurrentThreads == 0) {
  70. bCreateThread = true;
  71. }
  72. if (( _MaxThreads == -1 || _CurrentThreads < _MaxThreads) &&
  73. _ThreadsInUse > 0 &&
  74. _RequestInQueue >= _ThreadCreateTriggerRequests) {
  75. bCreateThread = true;
  76. }
  77. if (bCreateThread) {
  78. Interlocked.Increment(ref _CurrentThreads);
  79. Thread Start = new Thread(new ThreadStart(WorkerThread));
  80. Start.IsThreadPoolThreadInternal = true;
  81. Start.IsBackground = true;
  82. Start.Start();
  83. _Threads.Add(Start);
  84. }
  85. }
  86. internal void AddItem(ref ThreadPoolWorkItem Item) {
  87. _RequestQueue.Enqueue(Item);
  88. if (Interlocked.Increment(ref _RequestInQueue) == 1) {
  89. _DataInQueue.Set();
  90. }
  91. }
  92. // Work Thread main function
  93. internal void WorkerThread() {
  94. bool bWaitForData = true;
  95. while (true) {
  96. if (bWaitForData) {
  97. if (!_DataInQueue.WaitOne(_ThreadTimeout, false)) {
  98. // Keep one thread running
  99. if (_CurrentThreads > 1) {
  100. // timeout
  101. RemoveThread();
  102. return;
  103. }
  104. continue;
  105. }
  106. }
  107. Interlocked.Increment(ref _ThreadsInUse);
  108. // TODO: Remove when we know how to stop the watch thread
  109. CheckIfStartThread();
  110. try {
  111. ThreadPoolWorkItem oItem = (ThreadPoolWorkItem) _RequestQueue.Dequeue();
  112. if (Interlocked.Decrement(ref _RequestInQueue) == 0) {
  113. _DataInQueue.Reset();
  114. }
  115. oItem._CallBack(oItem._Context);
  116. }
  117. catch (InvalidOperationException) {
  118. // Queue empty
  119. bWaitForData = true;
  120. }
  121. catch (ThreadAbortException) {
  122. // We will leave here.. (thread abort can't be handled)
  123. RemoveThread();
  124. }
  125. finally {
  126. Interlocked.Decrement(ref _ThreadsInUse);
  127. }
  128. }
  129. }
  130. /* This is currently not in use
  131. internal void MonitorThread() {
  132. while (true) {
  133. if (_DataInQueue.WaitOne ()) {
  134. CheckIfStartThread();
  135. }
  136. Thread.Sleep(500);
  137. }
  138. }
  139. */
  140. internal bool QueueUserWorkItemInternal(WaitCallback callback) {
  141. return QueueUserWorkItem(callback, null);
  142. }
  143. internal bool QueueUserWorkItemInternal(WaitCallback callback, object context) {
  144. ThreadPoolWorkItem Item = new ThreadPoolWorkItem();
  145. Item._CallBack = callback;
  146. Item._Context = context;
  147. AddItem(ref Item);
  148. // LAMESPEC: Return value? should use exception here if anything goes wrong
  149. return true;
  150. }
  151. public static bool BindHandle(IntPtr osHandle) {
  152. throw new NotSupportedException("This is a win32 specific method, not supported Mono");
  153. }
  154. public static bool QueueUserWorkItem(WaitCallback callback) {
  155. return _Threadpool.QueueUserWorkItemInternal(callback);
  156. }
  157. public static bool QueueUserWorkItem(WaitCallback callback, object state) {
  158. return _Threadpool.QueueUserWorkItemInternal(callback, state);
  159. }
  160. public static bool UnsafeQueueUserWorkItem(WaitCallback callback, object state) {
  161. return _Threadpool.QueueUserWorkItemInternal(callback, state);
  162. }
  163. static TimeSpan GetTSFromMS (long ms)
  164. {
  165. if (ms < -1)
  166. throw new ArgumentOutOfRangeException ("millisecondsTimeOutInterval", "timeout < -1");
  167. return new TimeSpan (0, 0, 0, 0, (int) ms);
  168. }
  169. public static RegisteredWaitHandle RegisterWaitForSingleObject (WaitHandle waitObject,
  170. WaitOrTimerCallback callback,
  171. object state,
  172. int millisecondsTimeOutInterval,
  173. bool executeOnlyOnce)
  174. {
  175. TimeSpan ts = GetTSFromMS ((long) millisecondsTimeOutInterval);
  176. return RegisterWaitForSingleObject (waitObject, callback, state, ts, executeOnlyOnce);
  177. }
  178. public static RegisteredWaitHandle RegisterWaitForSingleObject (WaitHandle waitObject,
  179. WaitOrTimerCallback callback,
  180. object state,
  181. long millisecondsTimeOutInterval,
  182. bool executeOnlyOnce)
  183. {
  184. TimeSpan ts = GetTSFromMS (millisecondsTimeOutInterval);
  185. return RegisterWaitForSingleObject (waitObject, callback, state, ts, executeOnlyOnce);
  186. }
  187. public static RegisteredWaitHandle RegisterWaitForSingleObject (WaitHandle waitObject,
  188. WaitOrTimerCallback callback,
  189. object state,
  190. TimeSpan timeout,
  191. bool executeOnlyOnce)
  192. {
  193. long ms = (long) timeout.TotalMilliseconds;
  194. if (ms < -1)
  195. throw new ArgumentOutOfRangeException ("timeout", "timeout < -1");
  196. if (ms > Int32.MaxValue)
  197. throw new NotSupportedException ("Timeout is too big. Maximum is Int32.MaxValue");
  198. RegisteredWaitHandle waiter = new RegisteredWaitHandle (waitObject, callback, state, timeout, executeOnlyOnce);
  199. _Threadpool.QueueUserWorkItemInternal (new WaitCallback (waiter.Wait), null);
  200. return waiter;
  201. }
  202. [CLSCompliant(false)]
  203. public static RegisteredWaitHandle RegisterWaitForSingleObject (WaitHandle waitObject,
  204. WaitOrTimerCallback callback,
  205. object state,
  206. uint millisecondsTimeOutInterval,
  207. bool executeOnlyOnce)
  208. {
  209. TimeSpan ts = GetTSFromMS ((long) millisecondsTimeOutInterval);
  210. return RegisterWaitForSingleObject (waitObject, callback, state, ts, executeOnlyOnce);
  211. }
  212. [MonoTODO]
  213. public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject(WaitHandle waitObject, WaitOrTimerCallback callback, object state, int millisecondsTimeOutInterval, bool executeOnlyOnce) {
  214. throw new NotImplementedException();
  215. }
  216. [MonoTODO]
  217. public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject(WaitHandle waitObject, WaitOrTimerCallback callback, object state, long millisecondsTimeOutInterval, bool executeOnlyOnce) {
  218. throw new NotImplementedException();
  219. }
  220. [MonoTODO]
  221. public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject(WaitHandle waitObject, WaitOrTimerCallback callback, object state, TimeSpan timeout, bool executeOnlyOnce) {
  222. throw new NotImplementedException();
  223. }
  224. [CLSCompliant(false)][MonoTODO]
  225. public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject(WaitHandle waitObject, WaitOrTimerCallback callback, object state, uint millisecondsTimeOutInterval, bool executeOnlyOnce) {
  226. throw new NotImplementedException();
  227. }
  228. }
  229. }