Browse Source

added JobManager

Darren Ranalli 19 years ago
parent
commit
8a8b59d484

+ 65 - 0
direct/src/showbase/Job.py

@@ -0,0 +1,65 @@
+class Job:
+    # Base class for cpu-intensive or non-time-critical operations that
+    # are run through the JobManager.
+    Done = object()
+    Continue = object()
+    Priorities = ScratchPad(Low=-100, Normal=0, High=100)
+    _SerialGen = SerialNumGen()
+    
+    def __init__(self, name):
+        self._name = name
+        self._generator = None
+        self._id = Job._SerialGen.next()
+
+    def destroy(self):
+        del self._name
+        del self._generator
+
+    def getPriority(self):
+        # override if you want a different priority
+        # you can use numbers other than those in Job.Priorities
+        return Job.Priorities.Normal
+
+    def run(self):
+        # override and yield Job.Continue when possible/reasonable
+        # try not to run longer than the JobManager's timeslice between yields
+        # when done, yield Job.Done
+        raise "don't call down"
+
+    def _getJobId(self):
+        return self._id
+
+    def _getGenerator(self):
+        if self._generator is None:
+            self._generator = self.run()
+        return self._generator
+
+if __debug__: # __dev__ not yet available at this point
+    from direct.showbase.Job import Job
+    class TestJob(Job):
+        def __init__(self):
+            Job.__init__(self, 'TestJob')
+            self._counter = 0
+            self._accum = 0
+            self._finished = False
+
+        def run(self):
+            while True:
+                while self._accum < 100:
+                    self._accum += 1
+                    print 'counter = %s, accum = %s' % (self._counter, self._accum)
+                    yield Job.Continue
+
+                self._accum = 0
+                self._counter += 1
+
+                if self._counter >= 100:
+                    print 'Job.Done'
+                    yield Job.Done
+                else:
+                    yield Job.Continue
+
+    def addTestJob():
+        t = TestJob()
+        jobMgr.add(t)
+        

+ 108 - 0
direct/src/showbase/JobManager.py

@@ -0,0 +1,108 @@
+from direct.directnotify.DirectNotifyGlobal import directNotify
+from direct.task.TaskManagerGlobal import taskMgr
+from direct.showbase.Job import Job
+
+class JobManager:
+    """
+    Similar to the taskMgr but designed for tasks that are CPU-intensive and/or
+    not time-critical. Jobs run one at a time, in order of priority, in
+    the timeslice that the JobManager is allowed to run each frame.
+    """
+    notify = directNotify.newCategory("JobManager")
+
+    # there's one main task for the JobManager, all jobs run in this task
+    TaskName = 'jobManager'
+    # run for one millisecond per frame by default
+    DefTimeslice = .001
+
+    def __init__(self, timeslice=None):
+        if timeslice is None:
+            timeslice = JobManager.DefTimeslice
+        # how long do we run per frame
+        self._timeslice = timeslice
+        # store the jobs in these structures to allow fast lookup by various keys
+        # priority -> jobId -> job
+        self._pri2jobId2job = {}
+        # priority -> chronological list of jobIds
+        self._pri2jobIds = {}
+        # jobId -> priority
+        self._jobId2pri = {}
+        self._highestPriority = Job.Priorities.Normal
+
+    def destroy(self):
+        taskMgr.remove(JobManager.TaskName)
+        del self._pri2jobId2job
+
+    def add(self, job):
+        assert self.notify.debugCall()
+        pri = job.getPriority()
+        jobId = job._getJobId()
+        # store the job in the main table
+        self._pri2jobId2job.setdefault(pri, {})
+        self._pri2jobId2job[pri][jobId] = job
+        # and also store a direct mapping from the job's ID to its priority
+        self._jobId2pri[jobId] = pri
+        # add the jobId onto the end of the list of jobIds for this priority
+        self._pri2jobIds.setdefault(pri, [])
+        self._pri2jobIds[pri].append(jobId)
+        if pri > self._highestPriority:
+            self._highestPriority = pri
+        if len(self._jobId2pri) == 1:
+            taskMgr.add(self._process, JobManager.TaskName)
+        
+    def remove(self, job):
+        assert self.notify.debugCall()
+        jobId = job._getJobId()
+        # look up the job's priority
+        pri = self._jobId2pri.pop(jobId)
+        # TODO: this removal is a linear search
+        self._pri2jobIds[pri].remove(jobId)
+        # remove the job from the main table
+        del self._pri2jobId2job[pri][jobId]
+        if len(self._pri2jobId2job[pri]) == 0:
+            del self._pri2jobId2job[pri]
+            if pri == self._highestPriority:
+                if len(self._jobId2pri) > 0:
+                    # calculate a new highest priority
+                    # TODO: this is not very fast
+                    priorities = self._pri2jobId2job.keys()
+                    priorities.sort()
+                    self._highestPriority = priorities[-1]
+                else:
+                    taskMgr.remove(JobManager.TaskName)
+                    self._highestPriority = 0
+
+    # how long should we run per frame?
+    def getTimeslice(self):
+        return self._timeslice
+    def setTimeslice(self, timeslice):
+        self._timeslice = timeslice
+
+    def _process(self, task=None):
+        if len(self._pri2jobId2job):
+            assert self.notify.debugCall()
+            # figure out how long we can run
+            endT = globalClock.getRealTime() + (self._timeslice * .9)
+            while True:
+                # always process the highest priority first
+                jobId2job = self._pri2jobId2job[self._highestPriority]
+                # process jobs with equal priority in the order they came in
+                jobId = self._pri2jobIds[self._highestPriority][-1]
+                job = jobId2job[jobId]
+                gen = job._getGenerator()
+                while globalClock.getRealTime() < endT:
+                    result = gen.next()
+                    if result is Job.Done:
+                        self.remove(job)
+                        # highest-priority job is done.
+                        # grab the next one if there's time left
+                        break
+                else:
+                    # we've run out of time
+                    assert self.notify.debug('out of time: %s, %s' % (endT, globalClock.getRealTime()))
+                    break
+                
+                if len(self._pri2jobId2job) == 0:
+                    # there's nothing left to do
+                    break
+        return task.cont

+ 5 - 0
direct/src/showbase/JobManagerGlobal.py

@@ -0,0 +1,5 @@
+__all__ = ['jobMgr']
+
+import JobManager
+
+jobMgr = JobManager.JobManager()