
DS_pickup

David Rose 17 years ago
parent commit 8bb89607ab

+ 14 - 2
direct/src/task/TaskNew.py

@@ -3,7 +3,7 @@ AsyncTaskManager interface.  It replaces the old full-Python
 implementation of the Task system. """
 
 __all__ = ['Task', 'TaskManager',
-           'exit', 'cont', 'done', 'again', 'restart']
+           'cont', 'done', 'again', 'pickup', 'restart']
 
 from direct.directnotify.DirectNotifyGlobal import *
 from direct.showbase import ExceptionVarDump
@@ -56,6 +56,7 @@ def print_exc_plus():
 done = AsyncTask.DSDone
 cont = AsyncTask.DSCont
 again = AsyncTask.DSAgain
+pickup = AsyncTask.DSPickup
 restart = AsyncTask.DSRestart
 
 # Alias PythonTask to Task for historical purposes.
@@ -67,6 +68,7 @@ Task = PythonTask
 Task.DtoolClassDict['done'] = done
 Task.DtoolClassDict['cont'] = cont
 Task.DtoolClassDict['again'] = again
+Task.DtoolClassDict['pickup'] = pickup
 Task.DtoolClassDict['restart'] = restart
 
 class TaskManager:
@@ -104,7 +106,8 @@ class TaskManager:
             signal.signal(signal.SIGINT, self.invokeDefaultHandler)
 
     def setupTaskChain(self, chainName, numThreads = None, tickClock = None,
-                       threadPriority = None, frameBudget = None):
+                       threadPriority = None, frameBudget = None,
+                       timeslicePriority = None):
         """Defines a new task chain.  Each task chain executes tasks
         potentially in parallel with all of the other task chains (if
         numThreads is more than zero).  When a new task is created, it
@@ -135,6 +138,13 @@ class TaskManager:
         allow this task chain to run per frame.  Set it to -1 to mean
         no limit (the default).  It's not directly related to
         threadPriority.
+
+        timeslicePriority is False in the default mode, in which each
+        task runs exactly once each frame, round-robin style,
+        regardless of the task's priority value; or True to change the
+        meaning of priority so that certain tasks are run less often,
+        in proportion to their time used and to their priority value.
+        See AsyncTaskChain.setTimeslicePriority() for more.
         """
 
         chain = self.mgr.makeTaskChain(chainName)
@@ -146,6 +156,8 @@ class TaskManager:
             chain.setThreadPriority(threadPriority)
         if frameBudget is not None:
             chain.setFrameBudget(frameBudget)
+        if timeslicePriority is not None:
+            chain.setTimeslicePriority(timeslicePriority)
 
     def hasTaskNamed(self, taskName):
         """Returns true if there is at least one task, active or

+ 23 - 9
panda/src/event/asyncTask.cxx

@@ -252,6 +252,17 @@ set_sort(int sort) {
 //               priorities may execute at the same time, if the
 //               AsyncTaskManager has more than one thread servicing
 //               tasks.
+//
+//               Also see AsyncTaskChain::set_timeslice_priority(),
+//               which changes the meaning of this value.  In the
+//               default mode, when the timeslice_priority flag is
+//               false, all tasks always run once per epoch,
+//               regardless of their priority values (that is, the
+//               priority controls the order of the task execution
+//               only, not the number of times it runs).  On the other
+//               hand, if you set the timeslice_priority flag to true,
+//               then changing a task's priority has an effect on the
+//               number of times it runs.
 ////////////////////////////////////////////////////////////////////
 void AsyncTask::
 set_priority(int priority) {
@@ -321,7 +332,6 @@ unlock_and_do_task() {
   _dt = end - start;
   _max_dt = max(_dt, _max_dt);
   _total_dt += _dt;
-  ++_num_frames;
 
   _chain->_time_in_frame += _dt;
 
@@ -343,16 +353,20 @@ unlock_and_do_task() {
 //               DS_again: put the task to sleep for get_delay()
 //               seconds, then put it back on the active queue.
 //
-//               DS_abort: abort the task, and interrupt the whole
-//               AsyncTaskManager.
+//               DS_pickup: like DS_cont, but if the task chain has a
+//               frame budget and that budget has not yet been met,
+//               re-run the task again without waiting for the next
+//               frame.  Otherwise, run it next epoch as usual.
 //
 //               DS_restart: like DS_cont, but next time call the
-//               function from the beginning.  This only has meaning
-//               to a PythonTask that has already used the yield
-//               expression to return a generator, in which case it
-//               provides a way to abort the generator and create a
-//               new one by calling the function again.  In other
-//               contexts, this behaves exactly the same as DS_cont.
+//               function from the beginning, almost as if it were
+//               freshly added to the task manager.  The task's
+//               get_start_time() will be reset to now, and its
+//               get_elapsed_time() will be reset to 0.  Timing
+//               accounting, however, is not reset.
+//
+//               DS_abort: abort the task, and interrupt the whole
+//               AsyncTaskManager.
 //
 //               This function is called with the lock *not* held.
 ////////////////////////////////////////////////////////////////////
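
A hedged Python-level illustration of the return-value semantics documented above; the constants come from the task module, while task.time, the 10-second threshold, and jobQueue are illustrative assumptions.

    def pollJobs(task):
        # Task.restart: start over as if freshly added; task.time (elapsed
        # time) drops back to 0, though timing accounting does not reset.
        if task.time > 10.0:
            return Task.restart
        job = jobQueue.getNextJob()       # stand-in for real application work
        if job is None:
            return Task.cont              # nothing more to do this frame
        job.step()
        # Task.pickup: re-run within this frame if the chain's frame budget
        # is not yet used up; otherwise equivalent to Task.cont.
        return Task.pickup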

+ 7 - 5
panda/src/event/asyncTask.h

@@ -49,11 +49,12 @@ public:
 
 PUBLISHED:
   enum DoneStatus {
-    DS_done,   // normal task completion
-    DS_cont,   // run task again next epoch
-    DS_again,  // run task again after get_delay() seconds
-    DS_abort,  // abort the task and interrupt the whole task manager
-    DS_restart, // like cont, but next time start the task from the beginning.  Only meaningful for a PythonTask which has yielded a generator.
+    DS_done,    // normal task completion
+    DS_cont,    // run task again next epoch
+    DS_again,   // run task again after at least get_delay() seconds
+    DS_pickup,  // run task again this frame, if frame budget allows
+    DS_restart, // start the task over from the beginning
+    DS_abort,   // abort the task and interrupt the whole task manager
   };
 
   enum State {
@@ -108,6 +109,7 @@ PUBLISHED:
 
 protected:
   DoneStatus unlock_and_do_task();
+
   virtual DoneStatus do_task();
   virtual void upon_birth();
   virtual void upon_death(bool clean_exit);

+ 295 - 81
panda/src/event/asyncTaskChain.cxx

@@ -39,6 +39,7 @@ AsyncTaskChain(AsyncTaskManager *manager, const string &name) :
   _manager(manager),
   _cvar(manager->_lock),
   _tick_clock(false),
+  _timeslice_priority(false),
   _num_threads(0),
   _thread_priority(TP_normal),
   _frame_budget(-1.0),
@@ -46,6 +47,7 @@ AsyncTaskChain(AsyncTaskManager *manager, const string &name) :
   _num_tasks(0),
   _state(S_initial),
   _current_sort(-INT_MAX),
+  _pickup_mode(false),
   _needs_cleanup(false),
   _current_frame(0),
   _time_in_frame(0.0)
@@ -77,12 +79,8 @@ AsyncTaskChain::
 ////////////////////////////////////////////////////////////////////
 void AsyncTaskChain::
 set_tick_clock(bool tick_clock) {
-  if (_manager != (AsyncTaskManager *)NULL) {
-    MutexHolder holder(_manager->_lock);
-    _tick_clock = tick_clock;
-  } else {
-    _tick_clock = tick_clock;
-  }
+  MutexHolder holder(_manager->_lock);
+  _tick_clock = tick_clock;
 }
 
 ////////////////////////////////////////////////////////////////////
@@ -92,12 +90,8 @@ set_tick_clock(bool tick_clock) {
 ////////////////////////////////////////////////////////////////////
 bool AsyncTaskChain::
 get_tick_clock() const {
-  if (_manager != (AsyncTaskManager *)NULL) {
-    MutexHolder holder(_manager->_lock);
-    return _tick_clock;
-  } else {
-    return _tick_clock;
-  }
+  MutexHolder holder(_manager->_lock);
+  return _tick_clock;
 }
 
 ////////////////////////////////////////////////////////////////////
@@ -199,12 +193,8 @@ get_thread_priority() const {
 ////////////////////////////////////////////////////////////////////
 void AsyncTaskChain::
 set_frame_budget(double frame_budget) {
-  if (_manager != (AsyncTaskManager *)NULL) {
-    MutexHolder holder(_manager->_lock);
-    _frame_budget = frame_budget;
-  } else {
-    _frame_budget = frame_budget;
-  }
+  MutexHolder holder(_manager->_lock);
+  _frame_budget = frame_budget;
 }
 
 ////////////////////////////////////////////////////////////////////
@@ -216,12 +206,52 @@ set_frame_budget(double frame_budget) {
 ////////////////////////////////////////////////////////////////////
 double AsyncTaskChain::
 get_frame_budget() const {
-  if (_manager != (AsyncTaskManager *)NULL) {
-    MutexHolder holder(_manager->_lock);
-    return _frame_budget;
-  } else {
-    return _frame_budget;
-  }
+  MutexHolder holder(_manager->_lock);
+  return _frame_budget;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::set_timeslice_priority
+//       Access: Published
+//  Description: Sets the timeslice_priority flag.  This changes
+//               the interpretation of priority, and the number of
+//               times per epoch each task will run.
+//
+//               When this flag is true, some tasks might not run in
+//               any given epoch.  Instead, tasks with priority higher
+//               than 1 will be given precedence, in proportion to the
+//               amount of time they have already used.  This gives
+//               higher-priority tasks more runtime than
+//               lower-priority tasks.  Each task gets the amount of
+//               time proportional to its priority value, so a task
+//               with priority 100 will get five times as much
+//               processing time as a task with priority 20.  For
+//               these purposes, priority values less than 1 are
+//               deemed to be equal to 1.
+//
+//               When this flag is false (the default), all tasks are
+//               run exactly once each epoch, round-robin style.
+//               Priority is only used to determine which task runs
+//               first within tasks of the same sort value.
+////////////////////////////////////////////////////////////////////
+void AsyncTaskChain::
+set_timeslice_priority(bool timeslice_priority) {
+  MutexHolder holder(_manager->_lock);
+  _timeslice_priority = timeslice_priority;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::get_timeslice_priority
+//       Access: Published
+//  Description: Returns the timeslice_priority flag.  This changes
+//               the interpretation of priority, and the number of
+//               times per epoch each task will run.  See
+//               set_timeslice_priority().
+////////////////////////////////////////////////////////////////////
+bool AsyncTaskChain::
+get_timeslice_priority() const {
+  MutexHolder holder(_manager->_lock);
+  return _timeslice_priority;
 }
 
 ////////////////////////////////////////////////////////////////////
@@ -504,8 +534,12 @@ do_remove(AsyncTask *task) {
         make_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
       } else {
         index = find_task_on_heap(_next_active, task);
-        nassertr(index != -1, false);
-        _next_active.erase(_next_active.begin() + index);
+        if (index != -1) {
+          _next_active.erase(_next_active.begin() + index);
+        } else {
+          index = find_task_on_heap(_this_active, task);
+          nassertr(index != -1, false);
+        }
       }
       removed = true;
       cleanup_task(task, false);
@@ -562,8 +596,9 @@ do_cleanup() {
   // iterating, because cleanup_task() might release the lock,
   // allowing the iterators to become invalid.
 
-  TaskHeap active, next_active, sleeping;
+  TaskHeap active, this_active, next_active, sleeping;
   active.swap(_active);
+  this_active.swap(_this_active);
   next_active.swap(_next_active);
   sleeping.swap(_sleeping);
 
@@ -574,6 +609,10 @@ do_cleanup() {
     AsyncTask *task = (*ti);
     cleanup_task(task, false);
   }
+  for (ti = this_active.begin(); ti != this_active.end(); ++ti) {
+    AsyncTask *task = (*ti);
+    cleanup_task(task, false);
+  }
   for (ti = next_active.begin(); ti != next_active.end(); ++ti) {
     AsyncTask *task = (*ti);
     cleanup_task(task, false);
@@ -596,7 +635,8 @@ bool AsyncTaskChain::
 do_has_task(AsyncTask *task) const {
   return (find_task_on_heap(_active, task) != -1 ||
           find_task_on_heap(_next_active, task) != -1 ||
-          find_task_on_heap(_sleeping, task) != -1);
+          find_task_on_heap(_sleeping, task) != -1 ||
+          find_task_on_heap(_this_active, task) != -1);
 }
 
 ////////////////////////////////////////////////////////////////////
@@ -669,8 +709,11 @@ service_one_task(AsyncTaskChain::AsyncTaskChainThread *thread) {
 
       } else {
         switch (ds) {
-        case AsyncTask::DS_cont:
         case AsyncTask::DS_restart:
+          task->_start_time = _manager->_clock->get_frame_time();
+          // Fall through.
+
+        case AsyncTask::DS_cont:
           // The task is still alive; put it on the next frame's active
           // queue.
           task->_state = AsyncTask::S_active;
@@ -695,6 +738,13 @@ service_one_task(AsyncTaskChain::AsyncTaskChainThread *thread) {
           }
           break;
 
+        case AsyncTask::DS_pickup:
+          // The task wants to run again this frame if possible.
+          task->_state = AsyncTask::S_active;
+          _this_active.push_back(task);
+          _cvar.signal_all();
+          break;
+
         case AsyncTask::DS_abort:
           // The task had an exception and wants to raise a big flag.
           cleanup_task(task, false);
@@ -763,55 +813,98 @@ finish_sort_group() {
   }
 
   // There are no more tasks in this epoch; advance to the next epoch.
-  if (task_cat.is_spam()) {
-    do_output(task_cat.spam());
-    task_cat.spam(false)
-      << ": next epoch\n";
-  }
 
-  if (_tick_clock) {
+  if (!_this_active.empty() && _frame_budget >= 0.0) {
+    // Enter pickup mode.  This is a special mode at the end of the
+    // epoch in which we are just re-running the tasks that think they
+    // can still run within the frame, in an attempt to use up our
+    // frame budget.
+
     if (task_cat.is_spam()) {
       do_output(task_cat.spam());
       task_cat.spam(false)
-        << ": tick clock\n";
+        << ": next epoch (pickup mode)\n";
+    }
+
+    _pickup_mode = true;
+    _active.swap(_this_active);
+
+  } else {
+    // Not in pickup mode.
+
+    if (task_cat.is_spam()) {
+      do_output(task_cat.spam());
+      task_cat.spam(false)
+        << ": next epoch\n";
+    }
+
+    _pickup_mode = false;
+
+    // Here, there's no difference between _this_active and
+    // _next_active.  Combine them.
+    _next_active.insert(_next_active.end(), _this_active.begin(), _this_active.end());
+    _this_active.clear();
+
+    _active.swap(_next_active);
+
+    // We only tick the clock and wake sleepers in normal mode, the
+    // first time through the task list; not in pickup mode when we
+    // are re-running the stragglers just to use up our frame budget.
+
+    if (_tick_clock) {
+      if (task_cat.is_spam()) {
+        do_output(task_cat.spam());
+        task_cat.spam(false)
+          << ": tick clock\n";
+      }
+      _manager->_clock->tick();
+      _manager->_frame_cvar.signal_all();
+    }
+    if (!_threads.empty()) {
+      PStatClient::thread_tick(get_name());
     }
-    _manager->_clock->tick();
-    _manager->_frame_cvar.signal_all();
-  }
-  if (!_threads.empty()) {
-    PStatClient::thread_tick(get_name());
-  }
     
-  _active.swap(_next_active);
+    // Check for any sleeping tasks that need to be woken.
+    double now = _manager->_clock->get_frame_time();
+    while (!_sleeping.empty() && _sleeping.front()->get_wake_time() <= now) {
+      PT(AsyncTask) task = _sleeping.front();
+      if (task_cat.is_spam()) {
+        task_cat.spam()
+          << "Waking " << *task << ", wake time at " 
+          << task->get_wake_time() - now << "\n";
+      }
+      pop_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
+      _sleeping.pop_back();
+      task->_state = AsyncTask::S_active;
+      _active.push_back(task);
+    }
 
-  // Check for any sleeping tasks that need to be woken.
-  double now = _manager->_clock->get_frame_time();
-  while (!_sleeping.empty() && _sleeping.front()->get_wake_time() <= now) {
-    PT(AsyncTask) task = _sleeping.front();
     if (task_cat.is_spam()) {
-      task_cat.spam()
-        << "Waking " << *task << ", wake time at " 
-        << task->get_wake_time() - now << "\n";
+      if (_sleeping.empty()) {
+        task_cat.spam()
+          << "No more tasks on sleeping queue.\n";
+      } else {
+        task_cat.spam()
+          << "Next sleeper: " << *_sleeping.front() << ", wake time at " 
+          << _sleeping.front()->get_wake_time() - now << "\n";
+      }
     }
-    pop_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
-    _sleeping.pop_back();
-    task->_state = AsyncTask::S_active;
-    _active.push_back(task);
-  }
 
-  if (task_cat.is_spam()) {
-    if (_sleeping.empty()) {
-      task_cat.spam()
-        << "No more tasks on sleeping queue.\n";
-    } else {
-      task_cat.spam()
-        << "Next sleeper: " << *_sleeping.front() << ", wake time at " 
-        << _sleeping.front()->get_wake_time() - now << "\n";
+    // Any tasks that are on the active queue at the beginning of the
+    // epoch are deemed to have run one frame (or to be about to).
+    TaskHeap::const_iterator ti;
+    for (ti = _active.begin(); ti != _active.end(); ++ti) {
+      AsyncTask *task = (*ti);
+      ++task->_num_frames;
     }
   }
 
+  if (_timeslice_priority) {
+    filter_timeslice_priority();
+  }
+
+  nassertr(_num_tasks == _active.size() + _this_active.size() + _next_active.size() + _sleeping.size(), true);
   make_heap(_active.begin(), _active.end(), AsyncTaskSortPriority());
-  nassertr(_num_tasks == _active.size() + _sleeping.size(), true);
 
   _current_sort = -INT_MAX;
 
@@ -822,9 +915,90 @@ finish_sort_group() {
   }
 
   // There are no tasks to be had anywhere.  Chill.
+  _pickup_mode = false;
+  nassertr(_this_active.empty(), false);
   return false;
 }
 
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::filter_timeslice_priority
+//       Access: Protected
+//  Description: Called to filter the _active tasks list when we are
+//               in the special timeslice_priority mode.  In this
+//               mode, go through and postpone any tasks that have
+//               already exceeded their priority budget for this
+//               epoch.
+//
+//               Assumes the lock is already held.
+////////////////////////////////////////////////////////////////////
+void AsyncTaskChain::
+filter_timeslice_priority() {
+  if (_active.empty()) {
+    return;
+  }
+  nassertv(_timeslice_priority);
+
+  // We must first sum up the average per-epoch runtime of each task.
+  double net_runtime = 0.0;
+  int net_priority = 0;
+
+  TaskHeap::iterator ti;
+  for (ti = _active.begin(); ti != _active.end(); ++ti) {
+    AsyncTask *task = (*ti);
+    double runtime = max(task->get_average_dt(), 0.0);
+    int priority = max(task->_priority, 1);
+    net_runtime += runtime;
+    net_priority += priority;
+  }
+
+  // That gives us a timeslice budget per priority value.
+  double average_budget = net_runtime / (double)net_priority;
+
+  TaskHeap keep, postpone;
+  for (ti = _active.begin(); ti != _active.end(); ++ti) {
+    AsyncTask *task = (*ti);
+    double runtime = max(task->get_average_dt(), 0.0);
+    int priority = max(task->_priority, 1);
+    double consumed = runtime / (double)priority;
+    //    cerr << *task << " consumed " << consumed << " vs. " << average_budget << "\n";
+    if (consumed > average_budget) {
+      // Postpone.  Run this task next epoch.
+      postpone.push_back(task);
+    } else {
+      // Keep, and run this task this epoch.
+      keep.push_back(task);
+    }
+  }
+
+  if (keep.empty()) {
+    // Hmm, nothing to keep.  Grab the postponed task with the highest
+    // priority and keep that instead.
+    nassertv(!postpone.empty());
+    ti = postpone.begin();
+    TaskHeap::iterator max_ti = ti;
+    ++ti;
+    while (ti != postpone.end()) {
+      if ((*ti)->_priority > (*max_ti)->_priority) {
+        max_ti = ti;
+      }
+    }
+
+    //    cerr << "Nothing to keep, keeping " << *(*max_ti) << " instead\n";
+
+    keep.push_back(*max_ti);
+    postpone.erase(max_ti);
+  }
+
+  _active.swap(keep);
+  if (_pickup_mode) {
+    _this_active.insert(_this_active.end(), postpone.begin(), postpone.end());
+  } else {
+    _next_active.insert(_next_active.end(), postpone.begin(), postpone.end());
+  }
+
+  nassertv(!_active.empty());
+}
+
 ////////////////////////////////////////////////////////////////////
 //     Function: AsyncTaskChain::do_stop_threads
 //       Access: Protected
@@ -907,6 +1081,10 @@ do_get_active_tasks() const {
     AsyncTask *task = (*ti);
     result.add_task(task);
   }
+  for (ti = _this_active.begin(); ti != _this_active.end(); ++ti) {
+    AsyncTask *task = (*ti);
+    result.add_task(task);
+  }
   for (ti = _next_active.begin(); ti != _next_active.end(); ++ti) {
     AsyncTask *task = (*ti);
     result.add_task(task);
@@ -948,25 +1126,56 @@ do_poll() {
   if (!_threads.empty()) {
     return;
   }
-  
-  while (!_active.empty()) {
-    if (_state == S_shutdown || _state == S_aborting) {
-      return;
-    }
-    int frame = _manager->_clock->get_frame_count();
-    if (_current_frame != frame) {
-      _current_frame = frame;
-      _time_in_frame = 0.0;
-    }
-    if (_frame_budget >= 0.0 && _time_in_frame > _frame_budget) {
-      return;
+
+  nassertv(!_pickup_mode);
+
+  do {
+    while (!_active.empty()) {
+      if (_state == S_shutdown || _state == S_aborting) {
+        return;
+      }
+      int frame = _manager->_clock->get_frame_count();
+      if (_current_frame != frame) {
+        _current_frame = frame;
+        _time_in_frame = 0.0;
+      }
+      if (_frame_budget >= 0.0 && _time_in_frame >= _frame_budget) {
+        // If we've exceeded our budget, stop here.  We'll resume from
+        // this point at the next call to poll().
+        cleanup_pickup_mode();
+        return;
+      }
+      
+      _current_sort = _active.front()->get_sort();
+      service_one_task(NULL);
     }
+    
+    finish_sort_group();
+  } while (_pickup_mode);
+}
 
-    _current_sort = _active.front()->get_sort();
-    service_one_task(NULL);
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskChain::cleanup_pickup_mode
+//       Access: Protected
+//  Description: Clean up the damage from setting pickup mode.  This
+//               means we restore the _active and _next_active lists
+//               as they should have been without pickup mode, for
+//               next frame.  Assumes the lock is held.
+////////////////////////////////////////////////////////////////////
+void AsyncTaskChain::
+cleanup_pickup_mode() {
+  if (_pickup_mode) {
+    _pickup_mode = false;
+
+    // Move everything to the _next_active queue.
+    _next_active.insert(_next_active.end(), _this_active.begin(), _this_active.end());
+    _this_active.clear();
+    _next_active.insert(_next_active.end(), _active.begin(), _active.end());
+    _active.clear();
+
+    // Now finish the epoch properly.
+    finish_sort_group();
   }
-
-  finish_sort_group();
 }
 
 ////////////////////////////////////////////////////////////////////
@@ -1018,6 +1227,10 @@ do_write(ostream &out, int indent_level) const {
     indent(out, indent_level + 2) 
       << "frame budget " << _frame_budget << " s\n";
   }
+  if (_timeslice_priority) {
+    indent(out, indent_level + 2) 
+      << "timeslice priority\n";
+  }
   if (_tick_clock) {
     indent(out, indent_level + 2) 
       << "tick clock\n";
@@ -1160,7 +1373,8 @@ thread_main() {
 
       // If we've exceeded our frame budget, sleep until the next
      // frame.
-      while (_chain->_frame_budget >= 0.0 && _chain->_time_in_frame > _chain->_frame_budget) {
+      while (_chain->_frame_budget >= 0.0 && _chain->_time_in_frame >= _chain->_frame_budget) {
+        _chain->cleanup_pickup_mode();
         _chain->_manager->_frame_cvar.wait();
         frame = _chain->_manager->_clock->get_frame_count();
         if (_chain->_current_frame != frame) {
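
The postpone rule implemented in filter_timeslice_priority() above can be restated in a few lines of Python; this is only a sketch of the arithmetic, using hypothetical (name, average_dt, priority) tuples rather than the real C++ heap types.

    def split_by_timeslice_priority(tasks):
        # tasks: list of (name, average_dt_seconds, priority) tuples.
        if not tasks:
            return [], []
        net_runtime = sum(max(dt, 0.0) for _, dt, _ in tasks)
        net_priority = sum(max(pri, 1) for _, _, pri in tasks)
        average_budget = net_runtime / net_priority   # budget per priority point
        keep, postpone = [], []
        for name, dt, pri in tasks:
            consumed = max(dt, 0.0) / max(pri, 1)
            (postpone if consumed > average_budget else keep).append(name)
        return keep, postpone

    keep, postpone = split_by_timeslice_priority(
        [('render', 0.010, 100), ('ai', 0.004, 20)])
    # keep == ['render'], postpone == ['ai']: the priority-100 task is allowed
    # roughly five times the runtime of the priority-20 task per epoch, as the
    # set_timeslice_priority() comment describes.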

+ 9 - 1
panda/src/event/asyncTaskChain.h

@@ -74,6 +74,9 @@ PUBLISHED:
   void set_frame_budget(double frame_budget);
   double get_frame_budget() const;
 
+  void set_timeslice_priority(bool timeslice_priority);
+  bool get_timeslice_priority() const;
+
   BLOCKING void stop_threads();
   void start_threads();
   INLINE bool is_started() const;
@@ -108,11 +111,13 @@ protected:
   void service_one_task(AsyncTaskChainThread *thread);
   void cleanup_task(AsyncTask *task, bool clean_exit);
   bool finish_sort_group();
+  void filter_timeslice_priority();
   void do_stop_threads();
   void do_start_threads();
   AsyncTaskCollection do_get_active_tasks() const;
   AsyncTaskCollection do_get_sleeping_tasks() const;
   void do_poll();
+  void cleanup_pickup_mode();
   double do_get_next_wake_time() const;
   void do_output(ostream &out) const;
   void do_write(ostream &out, int indent_level) const;
@@ -153,7 +158,7 @@ protected:
 
 
   AsyncTaskManager *_manager;
 
-  ConditionVarFull _cvar;  // signaled when _active, _next_active, _sleeping, _state, or _current_sort changes, or a task finishes.
+  ConditionVarFull _cvar;  // signaled when one of the task heaps, _state, or _current_sort changes, or a task finishes.
 
   enum State {
     S_initial,  // no threads yet
@@ -163,6 +168,7 @@ protected:
   };
 
   bool _tick_clock;
+  bool _timeslice_priority;
   int _num_threads;
   ThreadPriority _thread_priority;
   Threads _threads;
@@ -170,10 +176,12 @@ protected:
   int _num_busy_threads;
   int _num_tasks;
   TaskHeap _active;
+  TaskHeap _this_active;
   TaskHeap _next_active;
   TaskHeap _sleeping;
   State _state;
   int _current_sort;
+  bool _pickup_mode;
   bool _needs_cleanup;
 
   int _current_frame;

+ 1 - 0
panda/src/event/pythonTask.cxx

@@ -378,6 +378,7 @@ do_task() {
     case DS_done:
     case DS_cont:
     case DS_again:
+    case DS_pickup:
       // Legitimate value.
       Py_DECREF(result);
       return (DoneStatus)retval;
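
Since PythonTask::do_task() now accepts DS_pickup as a legitimate return value, a task written in Python can hand back the new constant; the sketch below also assumes the generator-style tasks referenced in the DS_restart comments, and messageQueue is a placeholder for real application work.

    def drainMessages(task):
        # Generator-style task: each yield hands control back to the task
        # manager, which resumes the generator on a later pass.
        while True:
            if messageQueue.isEmpty():        # stand-in for real work
                yield Task.cont               # nothing to do; wait for next frame
            else:
                messageQueue.processOneMessage()
                yield Task.pickup             # keep going while frame budget remains

    taskMgr.add(drainMessages, 'drainMessages')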