threadpool.bmx 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604
  1. ' Copyright (c)2019 Bruce A Henderson
  2. '
  3. ' This software is provided 'as-is', without any express or implied
  4. ' warranty. In no event will the authors be held liable for any damages
  5. ' arising from the use of this software.
  6. '
  7. ' Permission is granted to anyone to use this software for any purpose,
  8. ' including commercial applications, and to alter it and redistribute it
  9. ' freely, subject to the following restrictions:
  10. '
  11. ' 1. The origin of this software must not be misrepresented; you must not
  12. ' claim that you wrote the original software. If you use this software
  13. ' in a product, an acknowledgment in the product documentation would be
  14. ' appreciated but is not required.
  15. '
  16. ' 2. Altered source versions must be plainly marked as such, and must not be
  17. ' misrepresented as being the original software.
  18. '
  19. ' 3. This notice may not be removed or altered from any source
  20. ' distribution.
  21. '
  22. SuperStrict
  23. Rem
  24. bbdoc: System/ThreadPool
  25. End Rem
  26. Module BRL.ThreadPool
  27. ModuleInfo "Version: 1.02"
  28. ModuleInfo "Author: Bruce A Henderson"
  29. ModuleInfo "License: zlib/libpng"
  30. ModuleInfo "Copyright: Bruce A Henderson"
  31. ModuleInfo "History: 1.02"
  32. ModuleInfo "History: Added scheduled pool executor"
  33. ModuleInfo "History: 1.01"
  34. ModuleInfo "History: Added cached pool executor"
  35. ModuleInfo "History: 1.00"
  36. ModuleInfo "History: Initial Release"
  37. ?threaded
  38. Import BRL.Threads
  39. Import BRL.LinkedList
  40. Import BRL.Time
  41. Import pub.stdc
  42. Rem
  43. bbdoc: An object that is intended to be executed by a thread pool.
  44. End Rem
  45. Type TRunnable Abstract
  46. Rem
  47. bbdoc: Called when the object is executed by the thread pool.
  48. End Rem
  49. Method run() Abstract
  50. End Type
  51. Type TExecutor Abstract
  52. Method execute(command:TRunnable) Abstract
  53. End Type
  54. Rem
  55. bbdoc: An executor that executes each submitted task using one of possibly several pooled threads.
  56. End Rem
  57. Type TThreadPoolExecutor Extends TExecutor
  58. Field keepThreadsAlive:Int
  59. Field isShutdown:Int
  60. Field threadCount:Int
  61. Field maxThreads:Int
  62. Field threads:TList
  63. Field threadsLock:TMutex
  64. Field jobQueue:TJobQueue
  65. Field threadsAlive:Int
  66. Field threadsWorking:Int
  67. Field countLock:TMutex
  68. Field threadsIdle:TCondVar
  69. Field maxIdleWait:Int
  70. Rem
  71. bbdoc:
  72. End Rem
  73. Method New(initial:Int, idleWait:Int = 0)
  74. maxThreads = initial
  75. maxIdleWait = idleWait
  76. threadsLock = TMutex.Create()
  77. jobQueue = New TJobQueue
  78. countLock = TMutex.Create()
  79. threadsIdle = TCondVar.Create()
  80. threads = New TList
  81. keepThreadsAlive = True
  82. ' initialise threads
  83. If maxThreads > 0 Then
  84. For threadCount = 0 Until maxThreads
  85. threads.AddLast(New TPooledThread(Self, _processThread))
  86. Next
  87. End If
  88. End Method
  89. ' thread callback
  90. Function _processThread:Object( data:Object)
  91. Local thread:TPooledThread = TPooledThread(data)
  92. Local pool:TThreadPoolExecutor = thread.pool
  93. Return pool.processThread(thread)
  94. End Function
  95. Method processThread:Object(thread:TPooledThread)
  96. countLock.Lock()
  97. threadsAlive :+ 1
  98. countLock.Unlock()
  99. While keepThreadsAlive
  100. If maxIdleWait Then
  101. If jobQueue.TimedWait(maxIdleWait) = 1 Then
  102. Exit
  103. End If
  104. Else
  105. jobQueue.Wait()
  106. End If
  107. If keepThreadsAlive Then
  108. countLock.Lock()
  109. threadsWorking :+ 1
  110. countLock.Unlock()
  111. jobQueue.Lock()
  112. Local job:TRunnable = jobQueue.Remove()
  113. jobQueue.Unlock()
  114. If job Then
  115. job.run()
  116. End If
  117. countLock.Lock()
  118. threadsWorking :- 1
  119. If Not threadsWorking Then
  120. threadsIdle.Signal()
  121. End If
  122. countLock.Unlock()
  123. End If
  124. Wend
  125. countLock.Lock()
  126. threadsAlive :- 1
  127. countLock.Unlock()
  128. threadsLock.Lock()
  129. threads.Remove(thread)
  130. threadsLock.Unlock()
  131. Return Null
  132. End Method
  133. Rem
  134. bbdoc: Executes the given command at some time in the future.
  135. End Rem
  136. Method execute(command:TRunnable) Override
  137. If Not isShutdown Then
  138. doExecute(command)
  139. End If
  140. End Method
  141. Private
  142. Method doExecute(command:TRunnable)
  143. If maxThreads < 0 Then
  144. Local newThread:Int
  145. countLock.Lock()
  146. If threadsWorking = threadsAlive Then
  147. newThread = True
  148. End If
  149. countLock.Unlock()
  150. If newThread Then
  151. threadsLock.Lock()
  152. threads.AddLast(New TPooledThread(Self, _processThread))
  153. threadsLock.Unlock()
  154. End If
  155. End If
  156. jobQueue.Lock()
  157. jobQueue.Add(command)
  158. jobQueue.Unlock()
  159. End Method
  160. Public
  161. Rem
  162. bbdoc: Creates an executor that uses a single worker thread operating off an unbounded queue.
  163. End Rem
  164. Function newSingleThreadExecutor:TThreadPoolExecutor()
  165. Return New TThreadPoolExecutor(1)
  166. End Function
  167. Rem
  168. bbdoc: Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue.
  169. about: At any point, at most @threads threads will be active processing tasks. If additional tasks are
  170. submitted when all threads are active, they will wait in the queue until a thread is available.
  171. End Rem
  172. Function newFixedThreadPool:TThreadPoolExecutor(threads:Int)
  173. Assert threads > 0
  174. Return New TThreadPoolExecutor(threads)
  175. End Function
  176. Rem
  177. bbdoc:
  178. End Rem
  179. Function newCachedThreadPool:TThreadPoolExecutor(idleWait:Int = 60000)
  180. Return New TThreadPoolExecutor(-1, idleWait)
  181. End Function
  182. Rem
  183. bbdoc: Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted.
  184. End Rem
  185. Method shutdown()
  186. isShutdown = True
  187. Local threadsTotal:Int = threadsAlive
  188. ' wait for queued jobs to be processed
  189. While Not jobQueue.jobs.IsEmpty()
  190. Delay 100
  191. Wend
  192. ' threads can shutdown now
  193. keepThreadsAlive = False
  194. ' remove idle threads
  195. Local time:Int = MilliSecs() & $7FFFFFFF
  196. Local waiting:Int = 0
  197. While waiting < 1000 And threadsAlive
  198. jobQueue.PostAll()
  199. waiting = (MilliSecs() & $7FFFFFFF) - time
  200. Wend
  201. ' poll remaining threads
  202. While threadsAlive
  203. jobQueue.PostAll()
  204. Delay 10
  205. Wend
  206. ' clear down threads
  207. For Local thread:TPooledThread = EachIn threads
  208. thread.pool = Null
  209. Next
  210. threads.Clear()
  211. End Method
  212. Rem
  213. bbdoc: Returns the approximate number of threads that are actively executing tasks.
  214. end rem
  215. Method getActiveCount:Int()
  216. countLock.Lock()
  217. Local count:Int = threadsWorking
  218. countLock.unlock()
  219. return count
  220. End Method
  221. Method IsQueueEmpty:Int()
  222. return jobQueue.IsEmpty()
  223. end method
  224. End Type
  225. Rem
  226. bbdoc: An executor that can be used to schedule commands to run after a given delay, or to execute commands periodically.
  227. End Rem
  228. Type TScheduledThreadPoolExecutor Extends TThreadPoolExecutor
  229. Field tasks:TScheduledTask
  230. Field taskMutex:TMutex
  231. Field taskCond:TCondVar
  232. Field schedulerThread:TThread
  233. Method New(initial:Int, idleWait:Int = 0)
  234. Super.New(initial, idleWait)
  235. taskMutex = TMutex.Create()
  236. taskCond = TCondVar.Create()
  237. schedulerThread = CreateThread(taskScheduler, Self)
  238. End Method
  239. Method schedule(command:TRunnable, delay_:Int, unit:ETimeUnit = ETimeUnit.Milliseconds)
  240. schedule(command, ULong(delay_), 0, unit)
  241. End Method
  242. Method schedule(command:TRunnable, initialDelay:Int, period:Int, unit:ETimeUnit = ETimeUnit.Milliseconds)
  243. schedule(command, ULong(initialDelay), ULong(period), unit)
  244. End Method
  245. Rem
  246. bbdoc: Schedules a one-shot command to run after a given delay.
  247. End Rem
  248. Method schedule(command:TRunnable, delay_:ULong, unit:ETimeUnit = ETimeUnit.Milliseconds)
  249. schedule(command, delay_, 0, unit)
  250. End Method
  251. Rem
  252. bbdoc: Schedules a recurring command to run after a given initial delay, and subsequently with the given period.
  253. End Rem
  254. Method schedule(command:TRunnable, initialDelay:ULong, period:ULong, unit:ETimeUnit = ETimeUnit.Milliseconds)
  255. Local now:ULong = CurrentUnixTime()
  256. Local newTask:TScheduledTask = New TScheduledTask
  257. Local delayMs:ULong = TimeUnitToMillis(initialDelay, unit)
  258. Local periodMs:ULong = TimeUnitToMillis(period, unit)
  259. newTask.executeAt = now + delayMs
  260. newTask.intervalMs = periodMs
  261. newTask.command = command
  262. taskMutex.Lock()
  263. insertTask(newTask)
  264. taskMutex.Unlock()
  265. End Method
  266. Rem
  267. bbdoc: Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted.
  268. End Rem
  269. Method shutdown() Override
  270. isShutdown = True
  271. schedulerThread.Wait()
  272. Super.shutdown()
  273. End Method
  274. Private
  275. Method insertTask(newTask:TScheduledTask)
  276. Local headChanged:Int = False
  277. If Not tasks Or newTask.executeAt < tasks.executeAt Then
  278. newTask.nextTask = tasks
  279. tasks = newTask
  280. headChanged = True
  281. Else
  282. Local current:TScheduledTask = tasks
  283. While current.nextTask And current.nextTask.executeAt < newTask.executeAt
  284. current = current.nextTask
  285. Wend
  286. newTask.nextTask = current.nextTask
  287. current.nextTask = newTask
  288. End If
  289. If headChanged Then
  290. taskCond.Signal()
  291. End If
  292. End Method
  293. Function taskScheduler:Object( data:Object )
  294. Local exec:TScheduledThreadPoolExecutor = TScheduledThreadPoolExecutor(data)
  295. While True
  296. exec.taskMutex.Lock()
  297. While Not exec.tasks
  298. If exec.isShutdown Then
  299. exec.taskMutex.Unlock()
  300. Return Null
  301. End If
  302. exec.taskCond.Wait(exec.taskMutex)
  303. Wend
  304. Local now:ULong = CurrentUnixTime()
  305. If now < exec.tasks.executeAt Then
  306. ' Wait until the next task is due or a new task is scheduled
  307. exec.taskCond.TimedWait(exec.taskMutex, Int(exec.tasks.executeAt - now))
  308. End If
  309. now = CurrentUnixTime()
  310. While exec.tasks And exec.tasks.executeAt <= now
  311. Local task:TScheduledTask = exec.tasks
  312. exec.doExecute(task.command)
  313. If task.intervalMs And Not exec.isShutdown Then
  314. ' If the task is recurring, reschedule it, unless the executor is shutting down
  315. task.executeAt = now + task.intervalMs
  316. exec.tasks = task.nextTask
  317. exec.insertTask(task)
  318. Else
  319. ' Otherwise, remove it from the list
  320. exec.tasks = task.nextTask
  321. End If
  322. Wend
  323. exec.taskMutex.Unlock()
  324. Wend
  325. End Function
  326. Public
  327. Rem
  328. bbdoc: Creates an executor that uses a single worker thread operating off an unbounded queue.
  329. End Rem
  330. Function newSingleThreadExecutor:TScheduledThreadPoolExecutor()
  331. Return New TScheduledThreadPoolExecutor(1)
  332. End Function
  333. Rem
  334. bbdoc: Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue.
  335. about: At any point, at most @threads threads will be active processing tasks. If additional tasks are
  336. submitted when all threads are active, they will wait in the queue until a thread is available.
  337. End Rem
  338. Function newFixedThreadPool:TScheduledThreadPoolExecutor(threads:Int)
  339. Assert threads > 0
  340. Return New TScheduledThreadPoolExecutor(threads)
  341. End Function
  342. Rem
  343. bbdoc: Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available.
  344. about: These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks.
  345. Threads that remain idle for more than the specified @idleWait time will be terminated and removed from the pool.
  346. End Rem
  347. Function newCachedThreadPool:TScheduledThreadPoolExecutor(idleWait:Int = 60000)
  348. Return New TScheduledThreadPoolExecutor(-1, idleWait)
  349. End Function
  350. End Type
  351. Type TScheduledTask
  352. Field executeAt:Long ' the time to execute the task, in ms since the epoch
  353. Field command:TRunnable
  354. Field intervalMs:ULong ' zero for one-shot tasks
  355. Field nextTask:TScheduledTask
  356. End Type
  357. Private
  358. Type TBinarySemaphore
  359. Field mutex:TMutex
  360. Field cond:TCondVar
  361. Field value:Int
  362. Method New(value:Int)
  363. Init(value)
  364. End Method
  365. Method Init(value:Int)
  366. If value = 0 Or value = 1 Then
  367. mutex = TMutex.Create()
  368. cond = TCondVar.Create()
  369. Self.value = value
  370. End If
  371. End Method
  372. Method Wait()
  373. mutex.lock()
  374. While value <> 1
  375. cond.Wait(mutex)
  376. Wend
  377. value = 0
  378. mutex.Unlock()
  379. End Method
  380. Method TimedWait:Int(millis:Int)
  381. mutex.lock()
  382. While value <> 1
  383. Local res:Int = cond.TimedWait(mutex, millis)
  384. If res = 1 Then
  385. value = 0
  386. mutex.Unlock()
  387. Return res
  388. End If
  389. Wend
  390. value = 0
  391. mutex.Unlock()
  392. End Method
  393. Method Post()
  394. mutex.Lock()
  395. value = 1
  396. cond.Signal()
  397. mutex.Unlock()
  398. End Method
  399. Method PostAll()
  400. mutex.Lock()
  401. value = 1
  402. cond.Broadcast()
  403. mutex.Unlock()
  404. End Method
  405. Method Reset()
  406. cond.Close()
  407. mutex.Close()
  408. Init(0)
  409. End Method
  410. End Type
  411. Type TJobQueue
  412. Field mutex:TMutex
  413. Field hasJobs:TBinarySemaphore
  414. Field jobs:TList
  415. Method New()
  416. hasJobs = New TBinarySemaphore(0)
  417. mutex = TMutex.Create()
  418. jobs = New TList
  419. End Method
  420. Method Add(job:TRunnable)
  421. jobs.AddLast(job)
  422. hasJobs.Post()
  423. End Method
  424. Method Remove:TRunnable()
  425. Local job:TRunnable
  426. If Not jobs.IsEmpty() Then
  427. job = TRunnable(jobs.RemoveFirst())
  428. hasJobs.Post()
  429. End If
  430. Return job
  431. End Method
  432. Method IsEmpty:Int()
  433. Lock()
  434. Local empty:Int = jobs.IsEmpty()
  435. UnLock()
  436. return empty
  437. End Method
  438. Method Lock()
  439. mutex.Lock()
  440. End Method
  441. Method Unlock()
  442. mutex.Unlock()
  443. End Method
  444. Method Wait()
  445. hasJobs.Wait()
  446. End Method
  447. Method TimedWait:Int(millis:Int)
  448. Return hasJobs.TimedWait(millis)
  449. End Method
  450. Method PostAll()
  451. hasJobs.PostAll()
  452. End Method
  453. End Type
  454. Type TPooledThread Extends TThread
  455. Field pool:TThreadPoolExecutor
  456. Field id:Int
  457. Method New(pool:TThreadPoolExecutor, entry:Object( data:Object))
  458. Self.pool = pool
  459. _entry=entry
  460. _data=Self
  461. _running=True
  462. _handle=threads_CreateThread( _EntryStub, Self )
  463. End Method
  464. End Type
  465. Extern
  466. Function threads_CreateThread:Byte Ptr( entry:Object( data:Object ),data:Object )
  467. End Extern
  468. ?