# JobManager.py
from panda3d.core import ClockObject, ConfigVariableBool, ConfigVariableDouble

from direct.directnotify.DirectNotifyGlobal import directNotify
from direct.showbase.Job import Job
from direct.showbase.MessengerGlobal import messenger
from direct.showbase.PythonUtil import flywheel
from direct.task.TaskManagerGlobal import taskMgr
  7. class JobManager:
  8. """
  9. Similar to the taskMgr but designed for tasks that are CPU-intensive and/or
  10. not time-critical. Jobs run in a fixed timeslice that the JobManager is
  11. allotted each frame.
  12. """
  13. notify = directNotify.newCategory("JobManager")
  14. # there's one task for the JobManager, all jobs run in this task
  15. TaskName = 'jobManager'
  16. def __init__(self, timeslice=None):
  17. # how long do we run per frame
  18. self._timeslice = timeslice
  19. # store the jobs in these structures to allow fast lookup by various keys
  20. # priority -> jobId -> job
  21. self._pri2jobId2job = {}
  22. # priority -> chronological list of jobIds
  23. self._pri2jobIds = {}
  24. # jobId -> priority
  25. self._jobId2pri = {}
  26. # how many timeslices to give each job; this is used to efficiently implement
  27. # the relative job priorities
  28. self._jobId2timeslices = {}
  29. # how much time did the job use beyond the allotted timeslice, used to balance
  30. # out CPU usage
  31. self._jobId2overflowTime = {}
  32. self._useOverflowTime = None
  33. # this is a generator that we use to give high-priority jobs more timeslices,
  34. # it yields jobIds in a sequence that includes high-priority jobIds more often
  35. # than low-priority
  36. self._jobIdGenerator = None
  37. self._highestPriority = Job.Priorities.Normal
  38. def destroy(self):
  39. taskMgr.remove(JobManager.TaskName)
  40. del self._pri2jobId2job
  41. def add(self, job):
  42. pri = job.getPriority()
  43. jobId = job._getJobId()
  44. # store the job in the main table
  45. self._pri2jobId2job.setdefault(pri, {})
  46. self._pri2jobId2job[pri][jobId] = job
  47. # and also store a direct mapping from the job's ID to its priority
  48. self._jobId2pri[jobId] = pri
  49. # add the jobId onto the end of the list of jobIds for this priority
  50. self._pri2jobIds.setdefault(pri, [])
  51. self._pri2jobIds[pri].append(jobId)
  52. # record the job's relative timeslice count
  53. self._jobId2timeslices[jobId] = pri
  54. # init the overflow time tracking
  55. self._jobId2overflowTime[jobId] = 0.
  56. # reset the jobId round-robin
  57. self._jobIdGenerator = None
  58. if len(self._jobId2pri) == 1:
  59. taskMgr.add(self._process, JobManager.TaskName)
  60. self._highestPriority = pri
  61. elif pri > self._highestPriority:
  62. self._highestPriority = pri
  63. self.notify.debug('added job: %s' % job.getJobName())
  64. def remove(self, job):
  65. jobId = job._getJobId()
  66. # look up the job's priority
  67. pri = self._jobId2pri.pop(jobId)
  68. # TODO: this removal is a linear search
  69. self._pri2jobIds[pri].remove(jobId)
  70. # remove the job from the main table
  71. del self._pri2jobId2job[pri][jobId]
  72. # clean up the job's generator, if any
  73. job._cleanupGenerator()
  74. # remove the job's timeslice count
  75. self._jobId2timeslices.pop(jobId)
  76. # remove the overflow time
  77. self._jobId2overflowTime.pop(jobId)
  78. if len(self._pri2jobId2job[pri]) == 0:
  79. del self._pri2jobId2job[pri]
  80. if pri == self._highestPriority:
  81. if len(self._jobId2pri) > 0:
  82. # calculate a new highest priority
  83. # TODO: this is not very fast
  84. priorities = self._getSortedPriorities()
  85. self._highestPriority = priorities[-1]
  86. else:
  87. taskMgr.remove(JobManager.TaskName)
  88. self._highestPriority = 0
  89. self.notify.debug('removed job: %s' % job.getJobName())
  90. def finish(self, job):
  91. # run this job, right now, until it finishes
  92. assert self.notify.debugCall()
  93. jobId = job._getJobId()
  94. # look up the job's priority
  95. pri = self._jobId2pri[jobId]
  96. # grab the job
  97. job = self._pri2jobId2job[pri][jobId]
  98. gen = job._getGenerator()
  99. if __debug__:
  100. job._pstats.start()
  101. job.resume()
  102. while True:
  103. try:
  104. result = next(gen)
  105. except StopIteration:
  106. # Job didn't yield Job.Done, it ran off the end and returned
  107. # treat it as if it returned Job.Done
  108. self.notify.warning('job %s never yielded Job.Done' % job)
  109. result = Job.Done
  110. if result is Job.Done:
  111. job.suspend()
  112. self.remove(job)
  113. job._setFinished()
  114. messenger.send(job.getFinishedEvent())
  115. # job is done.
  116. break
  117. if __debug__:
  118. job._pstats.stop()
  119. # how long should we run per frame?
  120. @staticmethod
  121. def getDefaultTimeslice():
  122. # run for 1/2 millisecond per frame by default
  123. # config is in milliseconds, this func returns value in seconds
  124. return ConfigVariableDouble('job-manager-timeslice-ms', .5).value / 1000.
  125. def getTimeslice(self):
  126. if self._timeslice:
  127. return self._timeslice
  128. return self.getDefaultTimeslice()
  129. def setTimeslice(self, timeslice):
  130. self._timeslice = timeslice
  131. def _getSortedPriorities(self):
  132. # returns all job priorities in ascending order
  133. priorities = list(self._pri2jobId2job.keys())
  134. priorities.sort()
  135. return priorities
  136. def _process(self, task=None):
  137. if self._useOverflowTime is None:
  138. self._useOverflowTime = ConfigVariableBool('job-use-overflow-time', 1).value
  139. if len(self._pri2jobId2job) > 0:
  140. clock = ClockObject.getGlobalClock()
  141. #assert self.notify.debugCall()
  142. # figure out how long we can run
  143. endT = clock.getRealTime() + (self.getTimeslice() * .9)
  144. while True:
  145. if self._jobIdGenerator is None:
  146. # round-robin the jobs, giving high-priority jobs more timeslices
  147. self._jobIdGenerator = flywheel(
  148. list(self._jobId2timeslices.keys()),
  149. countFunc = lambda jobId: self._jobId2timeslices[jobId])
  150. try:
  151. # grab the next jobId in the sequence
  152. jobId = next(self._jobIdGenerator)
  153. except StopIteration:
  154. self._jobIdGenerator = None
  155. continue
  156. # OK, we've selected a job to run
  157. pri = self._jobId2pri.get(jobId)
  158. if pri is None:
  159. # this job is no longer present
  160. continue
  161. # check if there's overflow time that we need to make up for
  162. if self._useOverflowTime:
  163. overflowTime = self._jobId2overflowTime[jobId]
  164. timeLeft = endT - clock.getRealTime()
  165. if overflowTime >= timeLeft:
  166. self._jobId2overflowTime[jobId] = max(0., overflowTime-timeLeft)
  167. # don't run any more jobs this frame, this makes up
  168. # for the extra overflow time that was used before
  169. break
  170. job = self._pri2jobId2job[pri][jobId]
  171. gen = job._getGenerator()
  172. if __debug__:
  173. job._pstats.start()
  174. job.resume()
  175. while clock.getRealTime() < endT:
  176. try:
  177. result = next(gen)
  178. except StopIteration:
  179. # Job didn't yield Job.Done, it ran off the end and returned
  180. # treat it as if it returned Job.Done
  181. self.notify.warning('job %s never yielded Job.Done' % job)
  182. result = Job.Done
  183. if result is Job.Sleep:
  184. job.suspend()
  185. if __debug__:
  186. job._pstats.stop()
  187. # grab the next job if there's time left
  188. break
  189. elif result is Job.Done:
  190. job.suspend()
  191. self.remove(job)
  192. job._setFinished()
  193. if __debug__:
  194. job._pstats.stop()
  195. messenger.send(job.getFinishedEvent())
  196. # grab the next job if there's time left
  197. break
  198. else:
  199. # we've run out of time
  200. #assert self.notify.debug('timeslice end: %s, %s' % (endT, clock.getRealTime()))
  201. job.suspend()
  202. overflowTime = clock.getRealTime() - endT
  203. if overflowTime > self.getTimeslice():
  204. self._jobId2overflowTime[jobId] += overflowTime
  205. if __debug__:
  206. job._pstats.stop()
  207. break
  208. if len(self._pri2jobId2job) == 0:
  209. # there's nothing left to do, all the jobs are done!
  210. break
  211. return task.cont
  212. def __repr__(self):
  213. s = '======================================================='
  214. s += '\nJobManager: active jobs in descending order of priority'
  215. s += '\n======================================================='
  216. pris = self._getSortedPriorities()
  217. if len(pris) == 0:
  218. s += '\n no jobs running'
  219. else:
  220. pris.reverse()
  221. for pri in pris:
  222. jobId2job = self._pri2jobId2job[pri]
  223. # run through the jobs at this priority in the order that they will run
  224. for jobId in self._pri2jobIds[pri]:
  225. job = jobId2job[jobId]
  226. s += '\n%5d: %s (jobId %s)' % (pri, job.getJobName(), jobId)
  227. s += '\n'
  228. return s