threadpool.bmx 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618
  1. ' Copyright (c)2019 Bruce A Henderson
  2. '
  3. ' This software is provided 'as-is', without any express or implied
  4. ' warranty. In no event will the authors be held liable for any damages
  5. ' arising from the use of this software.
  6. '
  7. ' Permission is granted to anyone to use this software for any purpose,
  8. ' including commercial applications, and to alter it and redistribute it
  9. ' freely, subject to the following restrictions:
  10. '
  11. ' 1. The origin of this software must not be misrepresented; you must not
  12. ' claim that you wrote the original software. If you use this software
  13. ' in a product, an acknowledgment in the product documentation would be
  14. ' appreciated but is not required.
  15. '
  16. ' 2. Altered source versions must be plainly marked as such, and must not be
  17. ' misrepresented as being the original software.
  18. '
  19. ' 3. This notice may not be removed or altered from any source
  20. ' distribution.
  21. '
  22. SuperStrict
  23. Rem
  24. bbdoc: System/ThreadPool
  25. End Rem
  26. Module BRL.ThreadPool
  27. ModuleInfo "Version: 1.02"
  28. ModuleInfo "Author: Bruce A Henderson"
  29. ModuleInfo "License: zlib/libpng"
  30. ModuleInfo "Copyright: Bruce A Henderson"
  31. ModuleInfo "History: 1.02"
  32. ModuleInfo "History: Added scheduled pool executor"
  33. ModuleInfo "History: 1.01"
  34. ModuleInfo "History: Added cached pool executor"
  35. ModuleInfo "History: 1.00"
  36. ModuleInfo "History: Initial Release"
  37. ?threaded
  38. Import BRL.Threads
  39. Import BRL.LinkedList
  40. Import BRL.Time
  41. Import pub.stdc
  Rem
  bbdoc: A unit of work that can be handed to a thread pool for execution.
  about: Extend this type and implement #run with the work to be performed.
  End Rem
  Type TRunnable Abstract

      Rem
      bbdoc: Invoked when this object is executed by the thread pool.
      End Rem
      Method run() Abstract

  End Type
  Rem
  bbdoc: Base type for objects that accept #TRunnable commands for execution.
  End Rem
  Type TExecutor Abstract

      Rem
      bbdoc: Submits @command for execution.
      End Rem
      Method execute(command:TRunnable) Abstract

  End Type
  Rem
  bbdoc: An executor that executes each submitted task using one of possibly several pooled threads.
  End Rem
  Type TThreadPoolExecutor Extends TExecutor

      ' Cleared by shutdown() to make the worker threads leave their processing loop.
      Field keepThreadsAlive:Int
      ' Set by shutdown(); once set, execute() ignores new commands.
      Field isShutdown:Int
      ' Loop counter used by New() while creating the initial threads.
      Field threadCount:Int
      ' Number of threads created up front. A negative value makes doExecute() create threads on demand.
      Field maxThreads:Int
      ' The pooled threads. Guarded by threadsLock.
      Field threads:TList
      Field threadsLock:TMutex
      ' Commands waiting to be run.
      Field jobQueue:TJobQueue
      ' Number of worker threads currently inside processThread(). Guarded by countLock.
      Field threadsAlive:Int
      ' Number of worker threads currently handling a job. Guarded by countLock.
      Field threadsWorking:Int
      Field countLock:TMutex
      ' Signalled by a worker when threadsWorking drops to zero. Nothing in this type waits on it.
      Field threadsIdle:TCondVar
      ' How long, in milliseconds, an idle worker waits for a job before exiting. Zero means wait indefinitely.
      Field maxIdleWait:Int

      Rem
      bbdoc: Creates a new executor.
      about: When @initial is greater than zero, that many worker threads are created immediately.
      When it is negative, no threads are created up front; they are created on demand as commands are submitted.
      A non-zero @idleWait is the time, in milliseconds, that an idle thread waits for work before it terminates.
      End Rem
      Method New(initial:Int, idleWait:Int = 0)
          maxThreads = initial
          maxIdleWait = idleWait

          threadsLock = TMutex.Create()
          jobQueue = New TJobQueue

          countLock = TMutex.Create()
          threadsIdle = TCondVar.Create()

          threads = New TList

          keepThreadsAlive = True

          ' initialise threads
          If maxThreads > 0 Then
              For threadCount = 0 Until maxThreads
                  ' same locking discipline as doExecute(): the list is only touched under threadsLock
                  threadsLock.Lock()
                  threads.AddLast(New TPooledThread(Self, _processThread))
                  threadsLock.Unlock()
              Next
          End If
      End Method

      ' thread callback - forwards to the owning pool's processThread()
      Function _processThread:Object( data:Object)
          Local thread:TPooledThread = TPooledThread(data)
          Local pool:TThreadPoolExecutor = thread.pool
          Return pool.processThread(thread)
      End Function

      Rem
      bbdoc: The main loop of a worker thread.
      about: Waits for jobs and runs them until the pool is shut down or, when an idle wait is configured,
      until no job arrives within that time. The thread removes itself from the pool on exit.
      End Rem
      Method processThread:Object(thread:TPooledThread)
          countLock.Lock()
          threadsAlive :+ 1
          countLock.Unlock()

          While keepThreadsAlive

              If maxIdleWait Then
                  ' a result of 1 means the wait ran out without a job being posted - retire this thread
                  If jobQueue.TimedWait(maxIdleWait) = 1 Then
                      Exit
                  End If
              Else
                  jobQueue.Wait()
              End If

              ' shutdown() also wakes the workers, so check again before touching the queue
              If keepThreadsAlive Then

                  countLock.Lock()
                  threadsWorking :+ 1
                  countLock.Unlock()

                  jobQueue.Lock()
                  Local job:TRunnable = jobQueue.Remove()
                  jobQueue.Unlock()

                  ' the queue may already have been drained by another worker
                  If job Then
                      job.run()
                  End If

                  countLock.Lock()
                  threadsWorking :- 1
                  If Not threadsWorking Then
                      threadsIdle.Signal()
                  End If
                  countLock.Unlock()

              End If
          Wend

          countLock.Lock()
          threadsAlive :- 1
          countLock.Unlock()

          threadsLock.Lock()
          threads.Remove(thread)
          threadsLock.Unlock()

          Return Null
      End Method

      Rem
      bbdoc: Executes the given command at some time in the future.
      about: Commands submitted after #shutdown has been called are ignored.
      End Rem
      Method execute(command:TRunnable) Override
          If Not isShutdown Then
              doExecute(command)
          End If
      End Method

  Private

      ' Queues the command without checking isShutdown. When the pool creates threads on
      ' demand (maxThreads < 0), a new thread is started first if every live thread is busy.
      Method doExecute(command:TRunnable)
          If maxThreads < 0 Then
              Local newThread:Int

              countLock.Lock()
              If threadsWorking = threadsAlive Then
                  newThread = True
              End If
              countLock.Unlock()

              If newThread Then
                  threadsLock.Lock()
                  threads.AddLast(New TPooledThread(Self, _processThread))
                  threadsLock.Unlock()
              End If
          End If

          jobQueue.Lock()
          jobQueue.Add(command)
          jobQueue.Unlock()
      End Method

  Public

      Rem
      bbdoc: Creates an executor that uses a single worker thread operating off an unbounded queue.
      End Rem
      Function newSingleThreadExecutor:TThreadPoolExecutor()
          Return New TThreadPoolExecutor(1)
      End Function

      Rem
      bbdoc: Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue.
      about: At any point, at most @threads threads will be active processing tasks. If additional tasks are
      submitted when all threads are active, they will wait in the queue until a thread is available.
      End Rem
      Function newFixedThreadPool:TThreadPoolExecutor(threads:Int)
          Assert threads > 0
          Return New TThreadPoolExecutor(threads)
      End Function

      Rem
      bbdoc: Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available.
      about: Threads that remain idle for more than the specified @idleWait time, in milliseconds, will be terminated
      and removed from the pool.
      End Rem
      Function newCachedThreadPool:TThreadPoolExecutor(idleWait:Int = 60000)
          Return New TThreadPoolExecutor(-1, idleWait)
      End Function

      Rem
      bbdoc: Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted.
      about: Blocks until the queue has been drained and every worker thread has finished.
      End Rem
      Method shutdown()
          isShutdown = True

          ' wait for queued jobs to be processed
          ' IsEmpty() takes the queue lock, so the list is never inspected while a worker is modifying it
          While Not jobQueue.IsEmpty()
              Delay 100
          Wend

          ' threads can shutdown now
          keepThreadsAlive = False

          ' remove idle threads
          Local time:Int = MilliSecs() & $7FFFFFFF
          Local waiting:Int = 0
          While waiting < 1000 And threadsAlive
              jobQueue.PostAll()
              waiting = (MilliSecs() & $7FFFFFFF) - time
          Wend

          ' poll remaining threads
          While threadsAlive
              jobQueue.PostAll()
              Delay 10
          Wend

          ' clear down threads
          ' a worker decrements threadsAlive before it removes itself from the list,
          ' so the list may still be in use by an exiting thread - hold the lock
          threadsLock.Lock()
          For Local thread:TPooledThread = EachIn threads
              thread.pool = Null
          Next
          threads.Clear()
          threadsLock.Unlock()
      End Method

      Rem
      bbdoc: Returns the approximate number of threads that are actively executing tasks.
      End Rem
      Method getActiveCount:Int()
          countLock.Lock()
          Local count:Int = threadsWorking
          countLock.Unlock()

          Return count
      End Method

      Rem
      bbdoc: Returns #True if there are no commands waiting in the queue.
      End Rem
      Method IsQueueEmpty:Int()
          Return jobQueue.IsEmpty()
      End Method

  End Type
  Rem
  bbdoc: An executor that can be used to schedule commands to run after a given delay, or to execute commands periodically.
  End Rem
  Type TScheduledThreadPoolExecutor Extends TThreadPoolExecutor

      ' Head of the list of pending tasks, kept in ascending executeAt order. Guarded by taskMutex.
      Field tasks:TScheduledTask
      Field taskMutex:TMutex
      ' Signalled when the head of the task list changes, and by shutdown().
      Field taskCond:TCondVar
      ' The thread running taskScheduler().
      Field schedulerThread:TThread

      Rem
      bbdoc: Creates a new scheduled executor and starts its scheduler thread.
      about: @initial and @idleWait are passed on to the #TThreadPoolExecutor constructor.
      End Rem
      Method New(initial:Int, idleWait:Int = 0)
          Super.New(initial, idleWait)
          taskMutex = TMutex.Create()
          taskCond = TCondVar.Create()
          schedulerThread = CreateThread(taskScheduler, Self)
      End Method

      Rem
      bbdoc: Schedules a one-shot command to run after a given delay.
      about: A negative @delay_ is treated as zero.
      End Rem
      Method schedule(command:TRunnable, delay_:Int, unit:ETimeUnit = ETimeUnit.Milliseconds)
          ' clamp before widening, so a negative value does not turn into a huge unsigned one
          If delay_ < 0 Then
              delay_ = 0
          End If
          schedule(command, ULong(delay_), 0, unit)
      End Method

      Rem
      bbdoc: Schedules a recurring command to run after a given initial delay, and subsequently with the given period.
      about: Negative values for @initialDelay or @period are treated as zero. A @period of zero schedules a one-shot command.
      End Rem
      Method schedule(command:TRunnable, initialDelay:Int, period:Int, unit:ETimeUnit = ETimeUnit.Milliseconds)
          If initialDelay < 0 Then
              initialDelay = 0
          End If
          ' a negative period would wrap to a huge unsigned interval, so every rescheduled
          ' time would land in the past and the scheduler would re-run the task without end
          If period < 0 Then
              period = 0
          End If
          schedule(command, ULong(initialDelay), ULong(period), unit)
      End Method

      Rem
      bbdoc: Schedules a one-shot command to run after a given delay.
      End Rem
      Method schedule(command:TRunnable, delay_:ULong, unit:ETimeUnit = ETimeUnit.Milliseconds)
          schedule(command, delay_, 0, unit)
      End Method

      Rem
      bbdoc: Schedules a recurring command to run after a given initial delay, and subsequently with the given period.
      about: A @period of zero schedules a one-shot command. Commands scheduled after #shutdown has been called are ignored.
      End Rem
      Method schedule(command:TRunnable, initialDelay:ULong, period:ULong, unit:ETimeUnit = ETimeUnit.Milliseconds)
          Local now:ULong = CurrentUnixTime()

          Local newTask:TScheduledTask = New TScheduledTask

          ' convert both values to milliseconds
          Local delayMs:ULong = initialDelay
          Local periodMs:ULong = period

          Select unit
              Case ETimeUnit.Seconds
                  delayMs :* 1000
                  periodMs :* 1000
              Case ETimeUnit.Minutes
                  delayMs :* 60000
                  periodMs :* 60000
              Case ETimeUnit.Hours
                  delayMs :* 3600000
                  periodMs :* 3600000
              Case ETimeUnit.Days
                  delayMs :* 86400000
                  periodMs :* 86400000
          End Select

          newTask.executeAt = now + delayMs
          newTask.intervalMs = periodMs
          newTask.command = command

          taskMutex.Lock()
          ' once shutdown has begun no new tasks are accepted, matching execute()
          If Not isShutdown Then
              insertTask(newTask)
          End If
          taskMutex.Unlock()
      End Method

      Rem
      bbdoc: Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted.
      about: Tasks that are already scheduled are still run when they fall due; recurring tasks are not rescheduled.
      End Rem
      Method shutdown() Override
          ' Set the flag and signal under the task mutex. The scheduler may be parked in
          ' taskCond.Wait() with an empty task list; without the signal it would never
          ' notice the flag and the Wait() below would block forever.
          taskMutex.Lock()
          isShutdown = True
          taskCond.Signal()
          taskMutex.Unlock()

          schedulerThread.Wait()
          Super.shutdown()
      End Method

  Private

      ' Inserts newTask into the list in executeAt order, and signals the scheduler when it
      ' becomes the new head. The caller must hold taskMutex.
      Method insertTask(newTask:TScheduledTask)
          Local headChanged:Int = False

          If Not tasks Or newTask.executeAt < tasks.executeAt Then
              newTask.nextTask = tasks
              tasks = newTask
              headChanged = True
          Else
              Local current:TScheduledTask = tasks
              While current.nextTask And current.nextTask.executeAt < newTask.executeAt
                  current = current.nextTask
              Wend
              newTask.nextTask = current.nextTask
              current.nextTask = newTask
          End If

          If headChanged Then
              taskCond.Signal()
          End If
      End Method

      ' Scheduler thread entry point. Sleeps until the earliest task is due, then hands every
      ' due task to the pool. Returns once shutdown has been requested and no tasks remain.
      Function taskScheduler:Object( data:Object )
          Local exec:TScheduledThreadPoolExecutor = TScheduledThreadPoolExecutor(data)

          While True
              exec.taskMutex.Lock()

              While Not exec.tasks
                  If exec.isShutdown Then
                      exec.taskMutex.Unlock()
                      Return Null
                  End If
                  exec.taskCond.Wait(exec.taskMutex)
              Wend

              Local now:ULong = CurrentUnixTime()
              If now < exec.tasks.executeAt Then
                  ' Wait until the next task is due or a new task is scheduled.
                  ' The timeout is an Int, so a task more than ~24.8 days away would overflow the
                  ' cast; clamp it instead - the outer loop simply waits again if it is not yet due.
                  Local waitMs:Long = exec.tasks.executeAt - Long(now)
                  If waitMs > $7FFFFFFF Then
                      waitMs = $7FFFFFFF
                  End If
                  exec.taskCond.TimedWait(exec.taskMutex, Int(waitMs))
              End If

              now = CurrentUnixTime()
              While exec.tasks And exec.tasks.executeAt <= now
                  Local task:TScheduledTask = exec.tasks
                  exec.doExecute(task.command)

                  If task.intervalMs And Not exec.isShutdown Then
                      ' If the task is recurring, reschedule it, unless the executor is shutting down
                      task.executeAt = now + task.intervalMs
                      exec.tasks = task.nextTask
                      exec.insertTask(task)
                  Else
                      ' Otherwise, remove it from the list
                      exec.tasks = task.nextTask
                  End If
              Wend

              exec.taskMutex.Unlock()
          Wend
      End Function

  Public

      Rem
      bbdoc: Creates an executor that uses a single worker thread operating off an unbounded queue.
      End Rem
      Function newSingleThreadExecutor:TScheduledThreadPoolExecutor()
          Return New TScheduledThreadPoolExecutor(1)
      End Function

      Rem
      bbdoc: Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue.
      about: At any point, at most @threads threads will be active processing tasks. If additional tasks are
      submitted when all threads are active, they will wait in the queue until a thread is available.
      End Rem
      Function newFixedThreadPool:TScheduledThreadPoolExecutor(threads:Int)
          Assert threads > 0
          Return New TScheduledThreadPoolExecutor(threads)
      End Function

      Rem
      bbdoc: Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available.
      about: These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks.
      Threads that remain idle for more than the specified @idleWait time will be terminated and removed from the pool.
      End Rem
      Function newCachedThreadPool:TScheduledThreadPoolExecutor(idleWait:Int = 60000)
          Return New TScheduledThreadPoolExecutor(-1, idleWait)
      End Function

  End Type
  Rem
  bbdoc: An entry in the scheduled executor's singly linked list of pending tasks.
  End Rem
  Type TScheduledTask

      ' the time to execute the task, in ms since the epoch
      Field executeAt:Long
      ' the command handed to the pool when the task falls due
      Field command:TRunnable
      ' repeat interval in ms; zero for one-shot tasks
      Field intervalMs:ULong
      ' the next task in the list, or Null at the end
      Field nextTask:TScheduledTask

  End Type
  371. Private
  ' A binary semaphore built from a mutex and a condition variable.
  ' value is 1 when a post is pending and 0 otherwise.
  Type TBinarySemaphore

      Field mutex:TMutex
      Field cond:TCondVar
      Field value:Int

      Method New(value:Int)
          Init(value)
      End Method

      ' Creates the mutex and condition variable and sets the initial state.
      ' Only 0 and 1 are accepted; any other value leaves the semaphore uninitialised.
      Method Init(value:Int)
          If value = 0 Or value = 1 Then
              mutex = TMutex.Create()
              cond = TCondVar.Create()
              Self.value = value
          End If
      End Method

      ' Blocks until a post is pending, then consumes it.
      Method Wait()
          mutex.Lock()
          While value <> 1
              cond.Wait(mutex)
          Wend
          value = 0
          mutex.Unlock()
      End Method

      ' Waits for a pending post, giving up when the condition variable reports a timeout.
      ' Returns 0 when a post was consumed, or 1 when the wait timed out.
      ' Each pass of the loop waits for up to millis again, so the total wait can exceed millis
      ' if the thread is woken without a post being available.
      Method TimedWait:Int(millis:Int)
          mutex.Lock()
          While value <> 1
              Local res:Int = cond.TimedWait(mutex, millis)
              If res = 1 Then
                  ' The mutex is only re-acquired after the timeout fires, so a Post() may have
                  ' landed in between. Consume it rather than discarding it and reporting a timeout.
                  If value = 1 Then
                      Exit
                  End If
                  mutex.Unlock()
                  Return 1
              End If
          Wend
          value = 0
          mutex.Unlock()
          Return 0
      End Method

      ' Marks a post as pending and wakes one waiter.
      Method Post()
          mutex.Lock()
          value = 1
          cond.Signal()
          mutex.Unlock()
      End Method

      ' Marks a post as pending and wakes every waiter. Only one of them consumes the post;
      ' the others go back to waiting.
      Method PostAll()
          mutex.Lock()
          value = 1
          cond.Broadcast()
          mutex.Unlock()
      End Method

      ' Closes the current mutex and condition variable and re-creates the semaphore with no post pending.
      Method Reset()
          cond.Close()
          mutex.Close()
          Init(0)
      End Method

  End Type
  ' A queue of pending jobs plus the semaphore that workers wait on.
  ' Add() and Remove() do not lock the queue themselves; callers bracket them with Lock() / Unlock().
  Type TJobQueue

      Field mutex:TMutex
      Field hasJobs:TBinarySemaphore
      Field jobs:TList

      Method New()
          hasJobs = New TBinarySemaphore(0)
          mutex = TMutex.Create()
          jobs = New TList
      End Method

      ' Appends a job and posts the semaphore to wake a waiting worker.
      Method Add(job:TRunnable)
          jobs.AddLast(job)
          hasJobs.Post()
      End Method

      ' Takes the oldest job off the queue, or returns Null when the queue is empty.
      ' The semaphore is posted again after a successful removal.
      Method Remove:TRunnable()
          If jobs.IsEmpty() Then
              Return Null
          End If

          Local nextJob:TRunnable = TRunnable(jobs.RemoveFirst())
          hasJobs.Post()
          Return nextJob
      End Method

      ' Reports whether the queue is empty, taking the queue lock for the check.
      Method IsEmpty:Int()
          mutex.Lock()
          Local nothingQueued:Int = jobs.IsEmpty()
          mutex.Unlock()
          Return nothingQueued
      End Method

      Method Lock()
          mutex.Lock()
      End Method

      Method Unlock()
          mutex.Unlock()
      End Method

      ' Blocks until the semaphore is posted.
      Method Wait()
          hasJobs.Wait()
      End Method

      ' Like Wait(), but passes the semaphore's timed-wait result back to the caller.
      Method TimedWait:Int(millis:Int)
          Return hasJobs.TimedWait(millis)
      End Method

      ' Posts the semaphore and wakes every waiting worker.
      Method PostAll()
          hasJobs.PostAll()
      End Method

  End Type
  ' A thread that knows which pool it belongs to.
  Type TPooledThread Extends TThread

      ' The owning pool; cleared by the pool's shutdown().
      Field pool:TThreadPoolExecutor
      ' Not assigned anywhere in this file.
      Field id:Int

      ' Stores the pool, sets up the thread state and starts the thread straight away.
      ' NOTE(review): _entry, _data, _running, _handle and _EntryStub are not declared in this
      ' file - presumably inherited from TThread; confirm against BRL.Threads.
      Method New(pool:TThreadPoolExecutor, entry:Object( data:Object))
          Self.pool = pool

          ' the thread passes itself as the data argument, which is how _processThread finds the pool
          Self._entry = entry
          Self._data = Self
          Self._running = True

          ' must come last: the new thread may start running immediately
          Self._handle = threads_CreateThread( _EntryStub, Self )
      End Method

  End Type
  ' External thread-creation routine called directly by TPooledThread.New.
  ' It is implemented outside this file.
  Extern
      Function threads_CreateThread:Byte Ptr( entry:Object( data:Object ), data:Object )
  End Extern
  482. ?