bmk_pm_legacy.bmx 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. SuperStrict
  2. Import "bmk_ng_utils.bmx"
  Rem
  bbdoc: Feeds build commands and generic tasks to a #TThreadPool sized from the core count.
  about: Producers are throttled: a task is only submitted once the pool's queue has a free slot.
  End Rem
  Type TProcessManager

      ' Worker pool that executes the submitted tasks.
      Field pool:TThreadPool
      ' Core count reported by GetCoreCount(), clamped to at least 1.
      Field cpuCount:Int
      ' Not referenced by any method of this type; kept because it is a public field.
      Field threads:TList = New TList

      Rem
      bbdoc: Creates the pool with (cores - 1) workers, minimum 1, and a queue of (cores * 6) slots.
      End Rem
      Method New()
          ' Clamp the core count: a value of 0 would create a zero-length queue,
          ' and CheckTasks() would then spin forever because 0 = 0 reads as "full".
          cpuCount = Max(1, GetCoreCount())
          pool = TThreadPool.Create(Max(1, cpuCount - 1), cpuCount * 6)
      End Method

      Rem
      bbdoc: Blocks, polling every 5 ms, while the pool's queue is full.
      End Rem
      Method CheckTasks()
          While pool.Count() = pool.Size()
              Delay 5
          Wend
      End Method

      Rem
      bbdoc: Blocks, polling every 5 ms, until no task is queued and no worker is running one.
      End Rem
      Method WaitForTasks()
          While pool.Count() Or pool.Running()
              Delay 5
          Wend
      End Method

      Rem
      bbdoc: Queues a process task built from @cmd, @src, @obj and @supp.
      about: Waits for a free queue slot first. The task object comes from CreateProcessTask()
      and is executed through TProcessTask._DoTasks.
      End Rem
      Method DoSystem(cmd:String, src:String, obj:String, supp:String)
          CheckTasks()
          pool.AddTask(TProcessTask._DoTasks, CreateProcessTask(cmd, src, obj, supp))
      End Method

      Rem
      bbdoc: Queues @func to be called with @data once a queue slot is free.
      returns: The result of the pool's AddTask(): True if the task was queued, False if it was rejected.
      End Rem
      Method AddTask:Int(func:Object(data:Object), data:Object)
          CheckTasks()
          ' Hand the pool's verdict back to the caller; the method is declared :Int
          ' and previously had no Return statement at all.
          Return pool.AddTask(func, data)
      End Method

  End Type
  Rem
  bbdoc: A thread pool.
  about: A fixed set of worker threads takes tasks from a fixed-size circular queue.
  All queue and counter state is protected by a single mutex; workers sleep on a
  condition variable while the queue is empty.
  End Rem
  Type TThreadPool

      ' Worker threads, all running _ThreadPoolThread.
      Field _threads:TThread[]
      ' Circular buffer of preallocated, reusable task slots.
      Field _queue:TThreadPoolTask[]
      ' Guards every field below; Null once the pool has been shut down.
      Field _lock:TMutex
      ' Signalled when a task is queued or a shutdown is requested.
      Field _waitVar:TCondVar
      ' Number of tasks waiting in the queue.
      Field _count:Int
      ' Index of the next slot to execute.
      Field _head:Int
      ' Index of the next slot to fill.
      Field _tail:Int
      ' Number of workers currently executing a task.
      Field _running:Int
      ' 0 = active, 1 = graceful shutdown requested, 2 = immediate shutdown requested.
      Field _shutdown:Int

      Rem
      bbdoc: Creates a new thread pool of @threadCount threads and a queue size of @queueSize.
      End Rem
      Function Create:TThreadPool(threadCount:Int, queueSize:Int)
          Local pool:TThreadPool = New TThreadPool

          pool._lock = TMutex.Create()
          pool._waitVar = TCondVar.Create()

          ' Preallocate the task slots before any worker thread exists, so a
          ' worker can never observe a partially built queue.
          pool._queue = New TThreadPoolTask[queueSize]
          For Local i:Int = 0 Until queueSize
              pool._queue[i] = New TThreadPoolTask
          Next

          pool._threads = New TThread[threadCount]
          For Local i:Int = 0 Until threadCount
              pool._threads[i] = TThread.Create(_ThreadPoolThread, pool)
          Next

          Return pool
      End Function

      Rem
      bbdoc: Returns the number of tasks in the queue.
      about: Read under the pool lock, so a worker's "dequeue + mark running" step is seen
      as a whole. Must not be called concurrently with #Shutdown.
      End Rem
      Method Count:Int()
          ' No lock exists before Create() or after Shutdown(); nothing to race with then.
          If _lock = Null Then
              Return _count
          End If
          _lock.Lock()
          Local pending:Int = _count
          _lock.Unlock()
          Return pending
      End Method

      Rem
      bbdoc: Returns the size of the queue.
      End Rem
      Method Size:Int()
          Return _queue.length
      End Method

      Rem
      bbdoc: Returns the number of busy/running threads.
      about: Read under the pool lock. Must not be called concurrently with #Shutdown.
      End Rem
      Method Running:Int()
          If _lock = Null Then
              Return _running
          End If
          _lock.Lock()
          Local busy:Int = _running
          _lock.Unlock()
          Return busy
      End Method

      Rem
      bbdoc: Adds a task to the queue.
      returns: True if the task was queued, False if the queue is full or the pool is shut down or shutting down.
      End Rem
      Method AddTask:Int(func:Object(data:Object), data:Object)
          ' The lock is gone once the pool has been shut down: reject instead of crashing.
          If _lock = Null Then
              Return False
          End If

          Local accepted:Int = False

          _lock.Lock()
          If _count < _queue.length And Not _shutdown Then
              Local slot:TThreadPoolTask = _queue[_tail]
              slot.func = func
              slot.data = data

              ' Advance the write index, wrapping at the end of the buffer.
              _tail :+ 1
              If _tail = _queue.length Then
                  _tail = 0
              End If
              _count :+ 1

              _waitVar.Broadcast()
              accepted = True
          End If
          _lock.Unlock()

          Return accepted
      End Method

      Rem
      bbdoc: Shutdown the pool.
      about: If @immediately is False, the queue will be processed to the end.
      If @immediately is True, each worker finishes the task it is currently running and
      then exits; tasks still waiting in the queue are discarded.
      Blocks until every worker thread has finished. Calling it again afterwards does nothing.
      End Rem
      Method Shutdown(immediately:Int = False)
          ' Already shut down (or never created): the mutex no longer exists.
          If _lock = Null Then
              Return
          End If

          _lock.Lock()
          If _shutdown Then
              ' A shutdown is already under way; release the lock before leaving.
              _lock.Unlock()
              Return
          End If
          If immediately Then
              _shutdown = 2
          Else
              _shutdown = 1
          End If
          ' Wake every sleeping worker so it can notice the request.
          _waitVar.Broadcast()
          _lock.Unlock()

          ' Join all workers.
          For Local i:Int = 0 Until _threads.length
              _threads[i].Wait()
          Next

          ' No worker is left, so the state can be touched without the lock.
          ' Anything still queued (immediate shutdown only) is dropped so that
          ' Count() reports an empty queue and the slots release their references.
          For Local i:Int = 0 Until _queue.length
              _queue[i].func = Null
              _queue[i].data = Null
          Next
          _count = 0
          _head = 0
          _tail = 0

          ' Close the primitives while unlocked, then forget them so later calls
          ' (including the one from Delete()) cannot touch a closed mutex.
          _lock.Close()
          _waitVar.Close()
          _lock = Null
          _waitVar = Null
      End Method

      Rem
      bbdoc: Worker thread entry point; @data is the owning #TThreadPool.
      returns: Always Null.
      End Rem
      Function _ThreadPoolThread:Object(data:Object)
          Local pool:TThreadPool = TThreadPool(data)

          While True
              pool._lock.Lock()

              ' Sleep until there is work or a shutdown request. Wait() is called
              ' with the pool lock, and the predicate is re-checked after every
              ' wake-up. No extra Delay here: it would stall every other thread
              ' that needs the lock.
              While pool._count = 0 And Not pool._shutdown
                  pool._waitVar.Wait(pool._lock)
              Wend

              ' Immediate shutdown leaves at once; graceful shutdown leaves only
              ' when the queue has been drained.
              If pool._shutdown = 2 Or (pool._shutdown <> 0 And pool._count = 0) Then
                  Exit
              End If

              ' Take the next task and release the slot's references.
              Local task:TThreadPoolTask = pool._queue[pool._head]
              Local work:Object(arg:Object) = task.func
              Local workData:Object = task.data
              task.func = Null
              task.data = Null

              ' Advance the read index, wrapping at the end of the buffer.
              pool._head :+ 1
              If pool._head = pool._queue.length Then
                  pool._head = 0
              End If

              ' One task fewer queued, one more worker busy. Both change inside
              ' the same critical section.
              pool._count :- 1
              pool._running :+ 1
              pool._lock.Unlock()

              ' Run the task without holding the lock.
              work(workData)

              pool._lock.Lock()
              pool._running :- 1
              pool._lock.Unlock()
          Wend

          ' The loop is only left via Exit, which happens with the lock held.
          pool._lock.Unlock()
          Return Null
      End Function

      Rem
      bbdoc: Performs a graceful shutdown if the pool has not been shut down yet.
      End Rem
      Method Delete()
          Shutdown()
      End Method

  End Type
  Rem
  bbdoc: One unit of work: a function to call and the object to pass to it.
  End Rem
  Type TThreadPoolTask

      ' Argument passed to func when the task is run.
      Field data:Object

      ' Function to call for this task.
      Field func:Object(data:Object)

  End Type