ElasticThreadPool.hx 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  /*
   * Copyright (C)2005-2019 Haxe Foundation
   *
   * Permission is hereby granted, free of charge, to any person obtaining a
   * copy of this software and associated documentation files (the "Software"),
   * to deal in the Software without restriction, including without limitation
   * the rights to use, copy, modify, merge, publish, distribute, sublicense,
   * and/or sell copies of the Software, and to permit persons to whom the
   * Software is furnished to do so, subject to the following conditions:
   *
   * The above copyright notice and this permission notice shall be included in
   * all copies or substantial portions of the Software.
   *
   * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
   * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
   * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
   * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
   * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
   * DEALINGS IN THE SOFTWARE.
   */
  22. package sys.thread;
  23. #if (!target.threaded)
  24. #error "This class is not available on this target"
  25. #end
  26. import haxe.Exception;
  /**
      Thread pool with a varying amount of threads.

      A submitted task is handed to an idle worker thread if there is one.
      Otherwise a terminated worker is restarted or, while fewer than
      `maxThreadsCount` workers exist, a new worker is created. If none of
      that is possible the task is put into a queue which busy workers drain
      after finishing their current task.
  **/
  @:coreApi
  class ElasticThreadPool implements IThreadPool {
      /** Amount of alive threads in this pool. */
      public var threadsCount(get,null):Int;
      /** Maximum amount of threads in this pool. */
      public var maxThreadsCount:Int;
      /** Indicates if `shutdown` method of this pool has been called. */
      public var isShutdown(get,never):Bool;
      var _isShutdown = false;
      function get_isShutdown():Bool return _isShutdown;

      /** Every worker ever created by this pool, including terminated ones. Guarded by `mutex`. */
      final pool:Array<Worker> = [];
      /** Tasks submitted while all workers were busy and the pool was at its limit. */
      final queue = new Deque<()->Void>();
      /** Serializes task submission, shutdown and reads of `pool`. */
      final mutex = new Mutex();
      /** Idle time in seconds after which a worker thread terminates. */
      final threadTimeout:Float;

      /**
          Create a new thread pool which runs at most `maxThreadsCount` threads.

          If a worker thread does not receive a task for `threadTimeout` seconds it
          is terminated.

          Throws `ThreadPoolException` if `maxThreadsCount` is less than 1.
      **/
      public function new(maxThreadsCount:Int, threadTimeout:Float = 60):Void {
          if(maxThreadsCount < 1)
              throw new ThreadPoolException('ElasticThreadPool needs maxThreadsCount to be at least 1.');
          this.maxThreadsCount = maxThreadsCount;
          this.threadTimeout = threadTimeout;
      }

      /**
          Submit a task to run in a thread.

          Throws an exception if the pool is shut down or if `task` is `null`.
      **/
      public function run(task:()->Void):Void {
          // Cheap early rejection without taking the lock.
          if(_isShutdown)
              throw new ThreadPoolException('Task is rejected. Thread pool is shut down.');
          if(task == null)
              throw new ThreadPoolException('Task to run must not be null.');

          mutex.acquire();
          try {
              // `shutdown` may have completed between the check above and
              // acquiring the mutex; do not accept the task in that case.
              if(_isShutdown)
                  throw new ThreadPoolException('Task is rejected. Thread pool is shut down.');
              dispatch(task);
          } catch(e) {
              // Never leave the pool locked if dispatching failed
              // (e.g. a worker thread could not be created).
              mutex.release();
              throw e;
          }
          mutex.release();
      }

      /**
          Initiates a shutdown.
          All previously submitted tasks will be executed, but no new tasks will
          be accepted.

          Multiple calls to this method have no effect.
      **/
      public function shutdown():Void {
          if(_isShutdown) return;
          mutex.acquire();
          // Re-check under the lock so that concurrent callers signal the workers only once.
          if(!_isShutdown) {
              _isShutdown = true;
              for(worker in pool) {
                  worker.shutdown();
              }
          }
          mutex.release();
      }

      /**
          Hands `task` over to a worker or queues it.
          Must be called with `mutex` held.
      **/
      function dispatch(task:()->Void):Void {
          var deadWorker:Null<Worker> = null;
          for(worker in pool) {
              if(worker.dead) {
                  // Remember the first terminated worker, but keep looking:
                  // reusing a live idle thread avoids spawning a new one.
                  if(deadWorker == null)
                      deadWorker = worker;
              } else if(worker.task == null) {
                  worker.wakeup(task);
                  return;
              }
          }
          if(deadWorker != null) {
              // `wakeup` restarts the thread of a terminated worker.
              deadWorker.wakeup(task);
          } else if(pool.length < maxThreadsCount) {
              var worker = new Worker(queue, threadTimeout);
              pool.push(worker);
              worker.wakeup(task);
          } else {
              // Pool is at its limit and everybody is busy.
              queue.add(task);
          }
      }

      function get_threadsCount():Int {
          // `pool` is modified by `run` under the mutex, so read it under the mutex too.
          mutex.acquire();
          var result = 0;
          for(worker in pool)
              if(!worker.dead)
                  ++result;
          mutex.release();
          return result;
      }
  }
  /**
      A restartable worker thread used by `ElasticThreadPool`.

      The thread sleeps on `waiter` until `wakeup` or `shutdown` releases it.
      After running the task it was woken up with, it also runs everything it
      can pop from the shared `queue`. If nothing wakes it up for `timeout`
      seconds the thread exits and the worker is marked as `dead` until the
      next `wakeup` restarts it.
  **/
  private class Worker {
      /** The task this worker is currently responsible for; `null` while idle. */
      public var task(default,null):Null<()->Void>;
      /** `true` after the thread left its loop with no pending task, until it is restarted. */
      public var dead(default,null) = false;

      /** Guards the "thread is exiting" vs. "new task arrives" decision. */
      final deathMutex = new Mutex();
      /** Released once per `wakeup`/`shutdown` call to wake the thread. */
      final waiter = new Lock();
      /** Queue shared with the pool; holds tasks submitted while all workers were busy. */
      final queue:Deque<()->Void>;
      /** Maximum idle time in seconds, passed to `waiter.wait`. */
      final timeout:Float;
      var thread:Thread;
      var isShutdown = false;

      /**
          Creates the worker and immediately starts its thread.
      **/
      public function new(queue:Deque<()->Void>, timeout:Float) {
          this.queue = queue;
          this.timeout = timeout;
          start();
      }

      /**
          Assigns `task` to this worker and wakes its thread up.
          A terminated worker gets a fresh thread first.
      **/
      public function wakeup(task:()->Void) {
          deathMutex.acquire();
          if(dead)
              start();
          this.task = task;
          waiter.release();
          deathMutex.release();
      }

      /**
          Asks the thread to exit the next time it wakes up without a task.
      **/
      public function shutdown() {
          isShutdown = true;
          waiter.release();
      }

      function start() {
          dead = false;
          thread = Thread.create(loop);
      }

      /**
          Runs queued tasks until the queue yields nothing.
      **/
      function drainQueue() {
          while(true) {
              switch queue.pop(false) {
                  case null: return;
                  case fn: fn();
              }
          }
      }

      function loop() {
          try {
              // `wait` returns `false` when `timeout` elapsed without a release.
              while(waiter.wait(timeout)) {
                  switch task {
                      case null:
                          // Woken up without a task: exit if that was a shutdown request.
                          if(isShutdown)
                              break;
                      case fn:
                          fn();
                          //if more tasks were added while all threads were busy
                          drainQueue();
                          // NOTE(review): a task queued between the last `pop` and this
                          // assignment waits for the next wake-up - confirm whether the
                          // pool needs to handle that.
                          task = null;
                  }
              }
              deathMutex.acquire();
              //in case a task was submitted right after the lock timed out
              if(task != null)
                  start()
              else
                  dead = true;
              deathMutex.release();
          } catch(e) {
              // This thread ends with the rethrown exception, so a replacement
              // thread takes over. Tasks may still be waiting in the queue:
              // stay marked as busy with a no-op task and release the waiter so
              // that the replacement resumes draining the queue right away
              // instead of sleeping until some later `wakeup`.
              task = function() {};
              start();
              waiter.release();
              throw e;
          }
      }
  }