threadpool.bmx 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349
  1. ' Copyright (c)2016 Bruce A Henderson
  2. '
  3. ' This software is provided 'as-is', without any express or implied
  4. ' warranty. In no event will the authors be held liable for any damages
  5. ' arising from the use of this software.
  6. '
  7. ' Permission is granted to anyone to use this software for any purpose,
  8. ' including commercial applications, and to alter it and redistribute it
  9. ' freely, subject to the following restrictions:
  10. '
  11. ' 1. The origin of this software must not be misrepresented; you must not
  12. ' claim that you wrote the original software. If you use this software
  13. ' in a product, an acknowledgment in the product documentation would be
  14. ' appreciated but is not required.
  15. '
  16. ' 2. Altered source versions must be plainly marked as such, and must not be
  17. ' misrepresented as being the original software.
  18. '
  19. ' 3. This notice may not be removed or altered from any source
  20. ' distribution.
  21. '
  22. SuperStrict
  23. Rem
  24. bbdoc: System/ThreadPool
  25. End Rem
  26. Module BRL.ThreadPool
  27. ModuleInfo "Version: 1.00"
  28. ModuleInfo "Author: Bruce A Henderson"
  29. ModuleInfo "License: zlib/libpng"
  30. ModuleInfo "Copyright: Bruce A Henderson"
  31. ModuleInfo "History: 1.00"
  32. ModuleInfo "History: Initial Release"
  33. ?threaded
  34. Import BRL.Threads
  35. Import BRL.LinkedList
  36. Rem
  37. bbdoc: An object that is intended to be executed by a thread pool.
  38. End Rem
  39. Type TRunnable Abstract
  40. Rem
  41. bbdoc: Called when the object is executed by the thread pool.
  42. End Rem
  43. Method run() Abstract
  44. End Type
  45. Type TExecutor Abstract
  46. Method execute(command:TRunnable) Abstract
  47. End Type
  48. Rem
  49. bbdoc: An executor that executes each submitted task using one of possibly several pooled threads.
  50. End Rem
  51. Type TThreadPoolExecutor Extends TExecutor
  52. Field keepThreadsAlive:Int
  53. Field isShutdown:Int
  54. Field threadCount:Int
  55. Field maxThreads:Int
  56. Field threads:TList
  57. Field jobQueue:TJobQueue
  58. Field threadsAlive:Int
  59. Field threadsWorking:Int
  60. Field countLock:TMutex
  61. Field threadsIdle:TCondVar
  62. Rem
  63. bbdoc:
  64. End Rem
  65. Method New(initial:Int)
  66. maxThreads = initial
  67. jobQueue = New TJobQueue
  68. countLock = TMutex.Create()
  69. threadsIdle = TCondVar.Create()
  70. threads = New TList
  71. keepThreadsAlive = True
  72. ' initialise threads
  73. If maxThreads > 0 Then
  74. For threadCount = 0 Until maxThreads
  75. threads.AddLast(New TPooledThread(Self, _processThread))
  76. Next
  77. End If
  78. End Method
  79. ' thread callback
  80. Function _processThread:Object( data:Object)
  81. Local thread:TPooledThread = TPooledThread(data)
  82. Local pool:TThreadPoolExecutor = thread.pool
  83. Return pool.processThread()
  84. End Function
  85. Method processThread:Object()
  86. countLock.Lock()
  87. threadsAlive :+ 1
  88. countLock.Unlock()
  89. While keepThreadsAlive
  90. jobQueue.Wait()
  91. If keepThreadsAlive Then
  92. countLock.Lock()
  93. threadsWorking :+ 1
  94. countLock.Unlock()
  95. jobQueue.Lock()
  96. Local job:TRunnable = jobQueue.Remove()
  97. jobQueue.Unlock()
  98. If job Then
  99. job.run()
  100. End If
  101. countLock.Lock()
  102. threadsWorking :- 1
  103. If Not threadsWorking Then
  104. threadsIdle.Signal()
  105. End If
  106. countLock.Unlock()
  107. End If
  108. Wend
  109. countLock.Lock()
  110. threadsAlive :- 1
  111. countLock.Unlock()
  112. Return Null
  113. End Method
  114. Rem
  115. bbdoc: Executes the given command at some time in the future.
  116. End Rem
  117. Method execute(command:TRunnable) Override
  118. If Not isShutdown Then
  119. jobQueue.Lock()
  120. jobQueue.Add(command)
  121. jobQueue.Unlock()
  122. End If
  123. End Method
  124. Rem
  125. bbdoc: Creates an executor that uses a single worker thread operating off an unbounded queue.
  126. End Rem
  127. Function newSingleThreadExecutor:TThreadPoolExecutor()
  128. Return New TThreadPoolExecutor(1)
  129. End Function
  130. Rem
  131. bbdoc: Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue.
  132. about: At any point, at most @threads threads will be active processing tasks. If additional tasks are
  133. submitted when all threads are active, they will wait in the queue until a thread is available.
  134. End Rem
  135. Function newFixedThreadPool:TThreadPoolExecutor(threads:Int)
  136. Assert threads > 0
  137. Return New TThreadPoolExecutor(threads)
  138. End Function
  139. Rem
  140. bbdoc: Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted.
  141. End Rem
  142. Method shutdown()
  143. isShutdown = True
  144. Local threadsTotal:Int = threadsAlive
  145. ' wait for queued jobs to be processed
  146. While Not jobQueue.jobs.IsEmpty()
  147. Delay 100
  148. Wend
  149. ' threads can shutdown now
  150. keepThreadsAlive = False
  151. ' remove idle threads
  152. Local time:Int = MilliSecs() & $7FFFFFFF
  153. Local waiting:Int = 0
  154. While waiting < 1000 And threadsAlive
  155. jobQueue.PostAll()
  156. waiting = (MilliSecs() & $7FFFFFFF) - time
  157. Wend
  158. ' poll remaining threads
  159. While threadsAlive
  160. jobQueue.PostAll()
  161. Delay 10
  162. Wend
  163. ' clear down threads
  164. For Local thread:TPooledThread = EachIn threads
  165. thread.pool = Null
  166. Next
  167. threads.Clear()
  168. End Method
  169. End Type
  170. Private
  171. Type TBinarySemaphore
  172. Field mutex:TMutex
  173. Field cond:TCondVar
  174. Field value:Int
  175. Method New(value:Int)
  176. Init(value)
  177. End Method
  178. Method Init(value:Int)
  179. If value = 0 Or value = 1 Then
  180. mutex = TMutex.Create()
  181. cond = TCondVar.Create()
  182. Self.value = value
  183. End If
  184. End Method
  185. Method Wait()
  186. mutex.lock()
  187. While value <> 1
  188. cond.Wait(mutex)
  189. Wend
  190. value = 0
  191. mutex.Unlock()
  192. End Method
  193. Method Post()
  194. mutex.Lock()
  195. value = 1
  196. cond.Signal()
  197. mutex.Unlock()
  198. End Method
  199. Method PostAll()
  200. mutex.Lock()
  201. value = 1
  202. cond.Broadcast()
  203. mutex.Unlock()
  204. End Method
  205. Method Reset()
  206. cond.Close()
  207. mutex.Close()
  208. Init(0)
  209. End Method
  210. End Type
  211. Type TJobQueue
  212. Field mutex:TMutex
  213. Field hasJobs:TBinarySemaphore
  214. Field jobs:TList
  215. Method New()
  216. hasJobs = New TBinarySemaphore(0)
  217. mutex = TMutex.Create()
  218. jobs = New TList
  219. End Method
  220. Method Add(job:TRunnable)
  221. jobs.AddLast(job)
  222. hasJobs.Post()
  223. End Method
  224. Method Remove:TRunnable()
  225. Local job:TRunnable
  226. If Not jobs.IsEmpty() Then
  227. job = TRunnable(jobs.RemoveFirst())
  228. hasJobs.Post()
  229. End If
  230. Return job
  231. End Method
  232. Method Lock()
  233. mutex.Lock()
  234. End Method
  235. Method Unlock()
  236. mutex.Unlock()
  237. End Method
  238. Method Wait()
  239. hasJobs.Wait()
  240. End Method
  241. Method PostAll()
  242. hasJobs.PostAll()
  243. End Method
  244. End Type
  245. Type TPooledThread Extends TThread
  246. Field pool:TThreadPoolExecutor
  247. Field id:Int
  248. Method New(pool:TThreadPoolExecutor, entry:Object( data:Object))
  249. Self.pool = pool
  250. _entry=entry
  251. _data=Self
  252. _running=True
  253. _handle=threads_CreateThread( _EntryStub, Self )
  254. End Method
  255. End Type
  256. Extern
  257. Function threads_CreateThread:Byte Ptr( entry:Object( data:Object ),data:Object )
  258. End Extern
  259. ?