JobManager.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. from direct.directnotify.DirectNotifyGlobal import directNotify
  2. from direct.task.TaskManagerGlobal import taskMgr
  3. from direct.showbase.Job import Job
  4. from direct.showbase.PythonUtil import getBase
  5. class JobManager:
  6. """
  7. Similar to the taskMgr but designed for tasks that are CPU-intensive and/or
  8. not time-critical. Jobs run in a fixed timeslice that the JobManager is
  9. allotted each frame.
  10. """
  11. notify = directNotify.newCategory("JobManager")
  12. # there's one task for the JobManager, all jobs run in this task
  13. TaskName = 'jobManager'
  14. def __init__(self, timeslice=None):
  15. # how long do we run per frame
  16. self._timeslice = timeslice
  17. # store the jobs in these structures to allow fast lookup by various keys
  18. # priority -> jobId -> job
  19. self._pri2jobId2job = {}
  20. # priority -> chronological list of jobIds
  21. self._pri2jobIds = {}
  22. # jobId -> priority
  23. self._jobId2pri = {}
  24. # how many timeslices to give each job; this is used to efficiently implement
  25. # the relative job priorities
  26. self._jobId2timeslices = {}
  27. # how much time did the job use beyond the allotted timeslice, used to balance
  28. # out CPU usage
  29. self._jobId2overflowTime = {}
  30. self._useOverflowTime = None
  31. # this is a generator that we use to give high-priority jobs more timeslices,
  32. # it yields jobIds in a sequence that includes high-priority jobIds more often
  33. # than low-priority
  34. self._jobIdGenerator = None
  35. self._highestPriority = Job.Priorities.Normal
  36. def destroy(self):
  37. taskMgr.remove(JobManager.TaskName)
  38. del self._pri2jobId2job
  39. def add(self, job):
  40. pri = job.getPriority()
  41. jobId = job._getJobId()
  42. # store the job in the main table
  43. self._pri2jobId2job.setdefault(pri, {})
  44. self._pri2jobId2job[pri][jobId] = job
  45. # and also store a direct mapping from the job's ID to its priority
  46. self._jobId2pri[jobId] = pri
  47. # add the jobId onto the end of the list of jobIds for this priority
  48. self._pri2jobIds.setdefault(pri, [])
  49. self._pri2jobIds[pri].append(jobId)
  50. # record the job's relative timeslice count
  51. self._jobId2timeslices[jobId] = pri
  52. # init the overflow time tracking
  53. self._jobId2overflowTime[jobId] = 0.
  54. # reset the jobId round-robin
  55. self._jobIdGenerator = None
  56. if len(self._jobId2pri) == 1:
  57. taskMgr.add(self._process, JobManager.TaskName)
  58. self._highestPriority = pri
  59. elif pri > self._highestPriority:
  60. self._highestPriority = pri
  61. self.notify.debug('added job: %s' % job.getJobName())
  62. def remove(self, job):
  63. jobId = job._getJobId()
  64. # look up the job's priority
  65. pri = self._jobId2pri.pop(jobId)
  66. # TODO: this removal is a linear search
  67. self._pri2jobIds[pri].remove(jobId)
  68. # remove the job from the main table
  69. del self._pri2jobId2job[pri][jobId]
  70. # clean up the job's generator, if any
  71. job._cleanupGenerator()
  72. # remove the job's timeslice count
  73. self._jobId2timeslices.pop(jobId)
  74. # remove the overflow time
  75. self._jobId2overflowTime.pop(jobId)
  76. if len(self._pri2jobId2job[pri]) == 0:
  77. del self._pri2jobId2job[pri]
  78. if pri == self._highestPriority:
  79. if len(self._jobId2pri) > 0:
  80. # calculate a new highest priority
  81. # TODO: this is not very fast
  82. priorities = self._getSortedPriorities()
  83. self._highestPriority = priorities[-1]
  84. else:
  85. taskMgr.remove(JobManager.TaskName)
  86. self._highestPriority = 0
  87. self.notify.debug('removed job: %s' % job.getJobName())
  88. def finish(self, job):
  89. # run this job, right now, until it finishes
  90. assert self.notify.debugCall()
  91. jobId = job._getJobId()
  92. # look up the job's priority
  93. pri = self._jobId2pri[jobId]
  94. # grab the job
  95. job = self._pri2jobId2job[pri][jobId]
  96. gen = job._getGenerator()
  97. if __debug__:
  98. job._pstats.start()
  99. job.resume()
  100. while True:
  101. try:
  102. result = gen.next()
  103. except StopIteration:
  104. # Job didn't yield Job.Done, it ran off the end and returned
  105. # treat it as if it returned Job.Done
  106. self.notify.warning('job %s never yielded Job.Done' % job)
  107. result = Job.Done
  108. if result is Job.Done:
  109. job.suspend()
  110. self.remove(job)
  111. job._setFinished()
  112. messenger.send(job.getFinishedEvent())
  113. # job is done.
  114. break
  115. if __debug__:
  116. job._pstats.stop()
  117. # how long should we run per frame?
  118. @staticmethod
  119. def getDefaultTimeslice():
  120. # run for 1/2 millisecond per frame by default
  121. # config is in milliseconds, this func returns value in seconds
  122. return getBase().config.GetFloat('job-manager-timeslice-ms', .5) / 1000.
  123. def getTimeslice(self):
  124. if self._timeslice:
  125. return self._timeslice
  126. return self.getDefaultTimeslice()
  127. def setTimeslice(self, timeslice):
  128. self._timeslice = timeslice
  129. def _getSortedPriorities(self):
  130. # returns all job priorities in ascending order
  131. priorities = self._pri2jobId2job.keys()
  132. priorities.sort()
  133. return priorities
  134. def _process(self, task=None):
  135. if self._useOverflowTime is None:
  136. self._useOverflowTime = config.GetBool('job-use-overflow-time', 1)
  137. if len(self._pri2jobId2job):
  138. #assert self.notify.debugCall()
  139. # figure out how long we can run
  140. endT = globalClock.getRealTime() + (self.getTimeslice() * .9)
  141. while True:
  142. if self._jobIdGenerator is None:
  143. # round-robin the jobs, giving high-priority jobs more timeslices
  144. self._jobIdGenerator = flywheel(
  145. self._jobId2timeslices.keys(),
  146. countFunc = lambda jobId: self._jobId2timeslices[jobId])
  147. try:
  148. # grab the next jobId in the sequence
  149. jobId = self._jobIdGenerator.next()
  150. except StopIteration:
  151. self._jobIdGenerator = None
  152. continue
  153. # OK, we've selected a job to run
  154. pri = self._jobId2pri.get(jobId)
  155. if pri is None:
  156. # this job is no longer present
  157. continue
  158. # check if there's overflow time that we need to make up for
  159. if self._useOverflowTime:
  160. overflowTime = self._jobId2overflowTime[jobId]
  161. timeLeft = endT - globalClock.getRealTime()
  162. if overflowTime >= timeLeft:
  163. self._jobId2overflowTime[jobId] = max(0., overflowTime-timeLeft)
  164. # don't run any more jobs this frame, this makes up
  165. # for the extra overflow time that was used before
  166. break
  167. job = self._pri2jobId2job[pri][jobId]
  168. gen = job._getGenerator()
  169. if __debug__:
  170. job._pstats.start()
  171. job.resume()
  172. while globalClock.getRealTime() < endT:
  173. try:
  174. result = gen.next()
  175. except StopIteration:
  176. # Job didn't yield Job.Done, it ran off the end and returned
  177. # treat it as if it returned Job.Done
  178. self.notify.warning('job %s never yielded Job.Done' % job)
  179. result = Job.Done
  180. if result is Job.Sleep:
  181. job.suspend()
  182. if __debug__:
  183. job._pstats.stop()
  184. # grab the next job if there's time left
  185. break
  186. elif result is Job.Done:
  187. job.suspend()
  188. self.remove(job)
  189. job._setFinished()
  190. if __debug__:
  191. job._pstats.stop()
  192. messenger.send(job.getFinishedEvent())
  193. # grab the next job if there's time left
  194. break
  195. else:
  196. # we've run out of time
  197. #assert self.notify.debug('timeslice end: %s, %s' % (endT, globalClock.getRealTime()))
  198. job.suspend()
  199. overflowTime = globalClock.getRealTime() - endT
  200. if overflowTime > self.getTimeslice():
  201. self._jobId2overflowTime[jobId] += overflowTime
  202. if __debug__:
  203. job._pstats.stop()
  204. break
  205. if len(self._pri2jobId2job) == 0:
  206. # there's nothing left to do, all the jobs are done!
  207. break
  208. return task.cont
  209. def __repr__(self):
  210. s = '======================================================='
  211. s += '\nJobManager: active jobs in descending order of priority'
  212. s += '\n======================================================='
  213. pris = self._getSortedPriorities()
  214. if len(pris) == 0:
  215. s += '\n no jobs running'
  216. else:
  217. pris.reverse()
  218. for pri in pris:
  219. jobId2job = self._pri2jobId2job[pri]
  220. # run through the jobs at this priority in the order that they will run
  221. for jobId in self._pri2jobIds[pri]:
  222. job = jobId2job[jobId]
  223. s += '\n%5d: %s (jobId %s)' % (pri, job.getJobName(), jobId)
  224. s += '\n'
  225. return s