
more threading issues

David Rose 17 years ago
parent
commit
c59bfe5459

+ 8 - 2
direct/src/task/TaskNew.py

@@ -267,6 +267,10 @@ class TaskManager:
        return self.mgr.remove(tasks)

    def step(self):
+        self.__doStep()
+        self.mgr.stopThreads()
+
+    def __doStep(self):
        # Replace keyboard interrupt handler during task list processing
        # so we catch the keyboard interrupt but don't handle it until
        # after task list processing is complete.
@@ -299,12 +303,12 @@ class TaskManager:
            self.resumeFunc()

        if self.stepping:
-            self.step()
+            self.__doStep()
        else:
            self.running = True
            while self.running:
                try:
-                    self.step()
+                    self.__doStep()
                except KeyboardInterrupt:
                    self.stop()
                except IOError, ioError:
@@ -335,6 +339,8 @@ class TaskManager:
                    else:
                        raise

+        self.mgr.stopThreads()
+
    def _unpackIOError(self, ioError):
        # IOError unpack from http://www.python.org/doc/essays/stdexceptions/
        # this needs to be in its own method, exceptions that occur inside

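Note on the split above: step() stays the public single-frame entry point but now parks the task-chain threads after each manual step, while the run loop calls the private __doStep() directly and stops the threads once, when the loop exits. A minimal runnable sketch of the pattern (hypothetical stand-alone names, not the real TaskManager API):

    class MiniTaskManager:
        # Hypothetical sketch of the step()/__doStep() split; the real
        # TaskManager delegates thread control to a C++ AsyncTaskManager.
        def __init__(self, frames=3):
            self.running = False
            self._framesLeft = frames

        def _doStep(self):
            # One frame of task processing (stub: a counter stands in for
            # a task that calls self.stop() after a few frames).
            self._framesLeft -= 1
            if self._framesLeft <= 0:
                self.stop()

        def step(self):
            # Public single-step: run one frame, then stop worker threads
            # so a manually-stepped app is not left with threads spinning.
            self._doStep()
            self.stopThreads()

        def run(self):
            # The loop uses the private step, so threads are stopped once
            # on exit rather than after every frame.
            self.running = True
            while self.running:
                self._doStep()
            self.stopThreads()

        def stop(self):
            self.running = False

        def stopThreads(self):
            print("threads stopped")

    MiniTaskManager().run()   # prints "threads stopped" exactly once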
+ 20 - 9
panda/src/event/asyncTask.cxx

@@ -173,15 +173,7 @@ set_task_chain(const string &chain_name) {
        chain_a->do_remove(this);
        _chain_name = chain_name;

-        AsyncTaskChain *chain_b = manager->do_find_task_chain(_chain_name);
-        if (chain_b == (AsyncTaskChain *)NULL) {
-          task_cat.warning()
-            << "Creating implicit AsyncTaskChain " << _chain_name
-            << " for " << manager->get_type() << " "
-            << manager->get_name() << "\n";
-          chain_b = manager->do_make_task_chain(_chain_name);
-        }
-        chain_b->do_add(this);
+        jump_to_task_chain(manager);

      } else {
        // If it's sleeping, currently being serviced, or something
@@ -305,6 +297,25 @@ output(ostream &out) const {
  }
}

+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTask::jump_to_task_chain
+//       Access: Protected
+//  Description: Switches the AsyncTask to its new task chain, named
+//               by _chain_name.  Called internally only.
+////////////////////////////////////////////////////////////////////
+void AsyncTask::
+jump_to_task_chain(AsyncTaskManager *manager) {
+  AsyncTaskChain *chain_b = manager->do_find_task_chain(_chain_name);
+  if (chain_b == (AsyncTaskChain *)NULL) {
+    task_cat.warning()
+      << "Creating implicit AsyncTaskChain " << _chain_name
+      << " for " << manager->get_type() << " "
+      << manager->get_name() << "\n";
+    chain_b = manager->do_make_task_chain(_chain_name);
+  }
+  chain_b->do_add(this);
+}
+
////////////////////////////////////////////////////////////////////
//     Function: AsyncTask::unlock_and_do_task
//       Access: Protected

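The helper extracted here is the find-or-create logic that set_task_chain() previously inlined: look up the destination chain by name, create it implicitly (with a warning) if absent, then add the task. Roughly, in Python, with hypothetical names mirroring the C++:

    import warnings

    class MiniManager:
        # Hypothetical stand-in for AsyncTaskManager's chain table.
        def __init__(self):
            self._chains = {}

        def find_task_chain(self, name):
            return self._chains.get(name)

        def make_task_chain(self, name):
            return self._chains.setdefault(name, [])

    class MiniTask:
        def __init__(self, chain_name):
            self.chain_name = chain_name

        def jump_to_task_chain(self, manager):
            # Find the destination chain; implicitly create it, with a
            # warning as the C++ version does, if it does not exist yet.
            chain = manager.find_task_chain(self.chain_name)
            if chain is None:
                warnings.warn("creating implicit task chain %r" % self.chain_name)
                chain = manager.make_task_chain(self.chain_name)
            chain.append(self)

    mgr = MiniManager()
    MiniTask("loader").jump_to_task_chain(mgr)   # warns, then adds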
+ 1 - 0
panda/src/event/asyncTask.h

@@ -108,6 +108,7 @@ PUBLISHED:
  virtual void output(ostream &out) const;

protected:
+  void jump_to_task_chain(AsyncTaskManager *manager);
  DoneStatus unlock_and_do_task();

  virtual DoneStatus do_task();

+ 47 - 19
panda/src/event/asyncTaskChain.cxx

@@ -485,7 +485,6 @@ do_add(AsyncTask *task) {
  ++(_manager->_num_tasks);
  _needs_cleanup = true;

-  task->upon_birth();
  _cvar.signal_all();
}

@@ -521,7 +520,7 @@ do_remove(AsyncTask *task) {
      _sleeping.erase(_sleeping.begin() + index);
      make_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
      removed = true;
-      cleanup_task(task, false);
+      cleanup_task(task);
    }
    break;

@@ -542,7 +541,7 @@ do_remove(AsyncTask *task) {
        }
      }
      removed = true;
-      cleanup_task(task, false);
+      cleanup_task(task);
    }
  }

@@ -607,19 +606,23 @@ do_cleanup() {
  TaskHeap::const_iterator ti;
  for (ti = active.begin(); ti != active.end(); ++ti) {
    AsyncTask *task = (*ti);
-    cleanup_task(task, false);
+    cleanup_task(task);
+    task->upon_death(false);
  }
  for (ti = this_active.begin(); ti != this_active.end(); ++ti) {
    AsyncTask *task = (*ti);
-    cleanup_task(task, false);
+    cleanup_task(task);
+    task->upon_death(false);
  }
  for (ti = next_active.begin(); ti != next_active.end(); ++ti) {
    AsyncTask *task = (*ti);
-    cleanup_task(task, false);
+    cleanup_task(task);
+    task->upon_death(false);
  }
  for (ti = sleeping.begin(); ti != sleeping.end(); ++ti) {
    AsyncTask *task = (*ti);
-    cleanup_task(task, false);
+    cleanup_task(task);
+    task->upon_death(false);
  }
}

@@ -700,12 +703,16 @@ service_one_task(AsyncTaskChain::AsyncTaskChainThread *thread) {
    task->_servicing_thread = NULL;

    if (task->_chain == this) {
-      // TODO: check task->_chain_name to see if the task wants to
-      // jump chains.
-
      if (task->_state == AsyncTask::S_servicing_removed) {
        // This task wants to kill itself.
-        cleanup_task(task, false);
+        cleanup_task(task);
+        task->upon_death(false);
+
+      } else if (task->_chain_name != get_name()) {
+        // The task wants to jump to a different chain.
+        PT(AsyncTask) hold_task = task;
+        cleanup_task(task);
+        task->jump_to_task_chain(_manager);

      } else {
        switch (ds) {
@@ -747,7 +754,8 @@ service_one_task(AsyncTaskChain::AsyncTaskChainThread *thread) {

        case AsyncTask::DS_abort:
          // The task had an exception and wants to raise a big flag.
-          cleanup_task(task, false);
+          cleanup_task(task);
+          task->upon_death(false);
          if (_state == S_started) {
            _state = S_aborting;
            _cvar.signal_all();
@@ -756,7 +764,8 @@ service_one_task(AsyncTaskChain::AsyncTaskChainThread *thread) {

        default:
          // The task has finished.
-          cleanup_task(task, true);
+          cleanup_task(task);
+          task->upon_death(true);
        }
      }
    }
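Note the division of labor these hunks establish: cleanup_task() is now pure bookkeeping (unlink the task, fix the counters), and each call site decides what follows: upon_death() for removed, aborted, or finished tasks, or jump_to_task_chain() for a task whose _chain_name no longer matches its current chain. The local PT(AsyncTask) reference keeps the task alive across the unlink. A sketch of that shape, with duck-typed stand-ins rather than the real API:

    def retire_task(chain, task, clean_exit):
        # 'chain' and 'task' are hypothetical stand-ins for
        # AsyncTaskChain and AsyncTask.
        hold = task                  # mirrors PT(AsyncTask) hold_task: keep
                                     # the task alive while it is unlinked
        chain.cleanup_task(task)     # bookkeeping only; no callbacks here
        if task.chain_name != chain.name:
            # The task asked to move: re-home it instead of reporting death.
            task.jump_to_task_chain(chain.manager)
        else:
            task.upon_death(clean_exit)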
@@ -774,7 +783,7 @@ service_one_task(AsyncTaskChain::AsyncTaskChainThread *thread) {
//               this method.
////////////////////////////////////////////////////////////////////
void AsyncTaskChain::
-cleanup_task(AsyncTask *task, bool clean_exit) {
+cleanup_task(AsyncTask *task) {
  nassertv(task->_chain == this);
  PT(AsyncTask) hold_task = task;

@@ -785,7 +794,6 @@ cleanup_task(AsyncTask *task, bool clean_exit) {
  --(_manager->_num_tasks);

  _manager->remove_task_by_name(task);
-  task->upon_death(clean_exit);
}

////////////////////////////////////////////////////////////////////
@@ -1008,8 +1016,16 @@ filter_timeslice_priority() {
void AsyncTaskChain::
do_stop_threads() {
  if (_state == S_started || _state == S_aborting) {
+    if (task_cat.is_debug() && !_threads.empty()) {
+      task_cat.debug()
+        << "Stopping " << _threads.size()
+        << " threads for " << _manager->get_name()
+        << " chain " << get_name() << "\n";
+    }
+
    _state = S_shutdown;
    _cvar.signal_all();
+    _manager->_frame_cvar.signal_all();

    Threads wait_threads;
    wait_threads.swap(_threads);
@@ -1019,12 +1035,18 @@ do_stop_threads() {
    _manager->_lock.release();
    Threads::iterator ti;
    for (ti = wait_threads.begin(); ti != wait_threads.end(); ++ti) {
+      if (task_cat.is_debug()) {
+        task_cat.debug()
+          << "Waiting for " << *(*ti) << " in "
+          << *Thread::get_current_thread() << "\n";
+      }
      (*ti)->join();
    }
    _manager->_lock.lock();

    _state = S_initial;
    nassertv(_num_busy_threads == 0);
+    cleanup_pickup_mode();
  }
}

@@ -1043,7 +1065,12 @@ do_start_threads() {
  if (_state == S_initial) {
    _state = S_started;
    _num_busy_threads = 0;
-    if (Thread::is_threading_supported()) {
+    if (Thread::is_threading_supported() && _num_threads > 0) {
+      if (task_cat.is_debug()) {
+        task_cat.debug()
+          << "Starting " << _num_threads << " threads for "
+          << _manager->get_name() << " chain " << get_name() << "\n";
+      }
      _needs_cleanup = true;
      _threads.reserve(_num_threads);
      for (int i = 0; i < _num_threads; ++i) {
@@ -1257,13 +1284,13 @@ do_write(ostream &out, int indent_level) const {
  // Collect a list of all active tasks, then sort them into order for
  // output.
  TaskHeap tasks = _active;
+  tasks.insert(tasks.end(), _this_active.begin(), _this_active.end());
  tasks.insert(tasks.end(), _next_active.begin(), _next_active.end());

  Threads::const_iterator thi;
  for (thi = _threads.begin(); thi != _threads.end(); ++thi) {
    AsyncTask *task = (*thi)->_servicing;
-    if (task != (AsyncTask *)NULL && 
-        task->_state != AsyncTask::S_servicing_removed) {
+    if (task != (AsyncTask *)NULL) {
      tasks.push_back(task);
    }
  }
@@ -1374,7 +1401,8 @@ thread_main() {
      // If we've exceeded our frame budget, sleep until the next
      // frame.
      if (_chain->_frame_budget >= 0.0 && _chain->_time_in_frame >= _chain->_frame_budget) {
-        while (_chain->_frame_budget >= 0.0 && _chain->_time_in_frame >= _chain->_frame_budget) {
+        while (_chain->_frame_budget >= 0.0 && _chain->_time_in_frame >= _chain->_frame_budget &&
+               _chain->_state != S_shutdown && _chain->_state != S_aborting) {
          _chain->cleanup_pickup_mode();
          _chain->_manager->_frame_cvar.wait();
          frame = _chain->_manager->_clock->get_frame_count();

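Three shutdown fixes interlock in this file: do_stop_threads() now also signals _frame_cvar so a worker parked on the frame budget wakes up; the worker's budget-wait loop re-checks for S_shutdown and S_aborting so that wakeup actually ends the wait; and the joins still happen with the manager lock released so exiting workers can take the lock to finish up. A condensed sketch of the handshake using Python threading primitives (simplified state model, not the real classes):

    import threading

    class MiniChain:
        def __init__(self):
            self.lock = threading.Lock()
            self.cvar = threading.Condition(self.lock)        # "task available"
            self.frame_cvar = threading.Condition(self.lock)  # "next frame"
            self.state = "started"
            self.threads = []

        def stop_threads(self):
            with self.lock:
                self.state = "shutdown"
                # Wake BOTH kinds of sleeper: threads waiting for work and
                # threads parked on the frame budget (the newly added signal).
                self.cvar.notify_all()
                self.frame_cvar.notify_all()
                waiting, self.threads = self.threads, []
            # Join outside the lock so the workers can acquire it to finish.
            for t in waiting:
                t.join()

        def wait_for_next_frame(self):
            # Worker side: the budget wait now also exits on shutdown/abort,
            # so stop_threads() cannot hang on a budget-sleeping worker.
            with self.lock:
                while self.over_budget() and self.state not in ("shutdown", "aborting"):
                    self.frame_cvar.wait()

        def over_budget(self):
            return False   # stub for the _time_in_frame / _frame_budget test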
+ 1 - 1
panda/src/event/asyncTaskChain.h

@@ -109,7 +109,7 @@ protected:
  int find_task_on_heap(const TaskHeap &heap, AsyncTask *task) const;

  void service_one_task(AsyncTaskChainThread *thread);
-  void cleanup_task(AsyncTask *task, bool clean_exit);
+  void cleanup_task(AsyncTask *task);
  bool finish_sort_group();
  void filter_timeslice_priority();
  void do_stop_threads();

+ 28 - 26
panda/src/event/asyncTaskManager.cxx

@@ -62,15 +62,14 @@ void AsyncTaskManager::
cleanup() {
  MutexHolder holder(_lock);

-  TaskChains::iterator tci;
-  for (tci = _task_chains.begin();
-       tci != _task_chains.end();
-       ++tci) {
-    AsyncTaskChain *chain = (*tci);
+  // Iterate carefully in case the tasks adjust the chain list within
+  // cleanup().
+  while (!_task_chains.empty()) {
+    PT(AsyncTaskChain) chain = _task_chains[_task_chains.size() - 1];
+    _task_chains.pop_back();
    chain->do_cleanup();
  }

-  _task_chains.clear();
  nassertv(_num_tasks == 0 && _tasks_by_name.empty());
}

@@ -196,6 +195,7 @@ add(AsyncTask *task) {
    chain = do_make_task_chain(task->_chain_name);
  }
  chain->do_add(task);
+  task->upon_birth();
}

////////////////////////////////////////////////////////////////////
@@ -333,6 +333,7 @@ remove(const AsyncTaskCollection &tasks) {
          << "Removing " << *task << "\n";
      }
      if (task->_chain->do_remove(task)) {
+        task->upon_death(false);
        ++num_removed;
      } else {
        if (task_cat.is_debug()) {
@@ -357,11 +358,11 @@ wait_for_tasks() {

  // Wait for each of our task chains to finish.
  while (_num_tasks > 0) {
-    TaskChains::iterator tci;
-    for (tci = _task_chains.begin();
-         tci != _task_chains.end();
-         ++tci) {
-      AsyncTaskChain *chain = (*tci);
+    // We iterate through with an index, rather than with an iterator,
+    // because it's possible for a task to adjust the task_chain list
+    // during its execution.
+    for (unsigned int i = 0; i < _task_chains.size(); ++i) {
+      AsyncTaskChain *chain = _task_chains[i];
      chain->do_wait_for_tasks();
    }
  }
@@ -379,11 +380,11 @@ void AsyncTaskManager::
stop_threads() {
  MutexHolder holder(_lock);

-  TaskChains::iterator tci;
-  for (tci = _task_chains.begin();
-       tci != _task_chains.end();
-       ++tci) {
-    AsyncTaskChain *chain = (*tci);
+  // We iterate through with an index, rather than with an iterator,
+  // because it's possible for a task to adjust the task_chain list
+  // during its execution.
+  for (unsigned int i = 0; i < _task_chains.size(); ++i) {
+    AsyncTaskChain *chain = _task_chains[i];
    chain->do_stop_threads();
  }
}
@@ -399,11 +400,12 @@ void AsyncTaskManager::
start_threads() {
  MutexHolder holder(_lock);

-  TaskChains::iterator tci;
-  for (tci = _task_chains.begin();
-       tci != _task_chains.end();
-       ++tci) {
-    AsyncTaskChain *chain = (*tci);
+  // We iterate through with an index, rather than with an iterator,
+  // because it's possible for a task to adjust the task_chain list
+  // during its execution.
+  for (unsigned int i = 0; i < _task_chains.size(); ++i) {
+    AsyncTaskChain *chain = _task_chains[i];
+
    chain->do_start_threads();
  }
}
@@ -490,11 +492,11 @@ void AsyncTaskManager::
poll() {
  MutexHolder holder(_lock);

-  TaskChains::iterator tci;
-  for (tci = _task_chains.begin();
-       tci != _task_chains.end();
-       ++tci) {
-    AsyncTaskChain *chain = (*tci);
+  // We iterate through with an index, rather than with an iterator,
+  // because it's possible for a task to adjust the task_chain list
+  // during its execution.
+  for (unsigned int i = 0; i < _task_chains.size(); ++i) {
+    AsyncTaskChain *chain = _task_chains[i];
    chain->do_poll();
  }

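The index-based loops matter because do_poll(), do_stop_threads(), and friends can run task code that adds or removes chains, which would invalidate a live iterator into _task_chains; re-reading size() on each pass tolerates appends. The same hazard in miniature:

    chains = ["default", "loader"]

    def do_poll(name):
        # A task executed during polling may create a new chain mid-loop.
        if name == "default":
            chains.append("streamer")

    # Mirror of the C++ loop: re-check len() on every pass, so a chain
    # appended during do_poll() is still visited and nothing is skipped.
    i = 0
    while i < len(chains):
        do_poll(chains[i])
        i += 1
    print(chains)   # ['default', 'loader', 'streamer'] -- all three polled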

+ 1 - 0
panda/src/event/asyncTaskManager.h

@@ -149,6 +149,7 @@ private:
  friend class AsyncTaskChain;
  friend class AsyncTaskChain::AsyncTaskChainThread;
  friend class AsyncTask;
+  friend class PythonTask;
};

INLINE ostream &operator << (ostream &out, const AsyncTaskManager &manager) {

+ 23 - 15
panda/src/event/pythonTask.cxx

@@ -444,17 +444,21 @@ void PythonTask::
upon_birth() {
  AsyncTask::upon_birth();

+  if (_owner != Py_None) {
+    _manager->_lock.release();
#if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
-  // Use PyGILState to protect this asynchronous call.
-  PyGILState_STATE gstate;
-  gstate = PyGILState_Ensure();
+    // Use PyGILState to protect this asynchronous call.
+    PyGILState_STATE gstate;
+    gstate = PyGILState_Ensure();
#endif
-
-  call_owner_method("_addTask");
-
+
+    call_owner_method("_addTask");
+
#if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
-  PyGILState_Release(gstate);
+    PyGILState_Release(gstate);
#endif
+    _manager->_lock.lock();
+  }
}

////////////////////////////////////////////////////////////////////
@@ -478,18 +482,22 @@ void PythonTask::
upon_death(bool clean_exit) {
  AsyncTask::upon_death(clean_exit);

+  if (_owner != Py_None && _upon_death != Py_None) {
+    _manager->_lock.release();
#if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
-  // Use PyGILState to protect this asynchronous call.
-  PyGILState_STATE gstate;
-  gstate = PyGILState_Ensure();
+    // Use PyGILState to protect this asynchronous call.
+    PyGILState_STATE gstate;
+    gstate = PyGILState_Ensure();
#endif
-
-  call_owner_method("_clearTask");
-  call_function(_upon_death);
-
+
+    call_owner_method("_clearTask");
+    call_function(_upon_death);
+
#if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
-  PyGILState_Release(gstate);
+    PyGILState_Release(gstate);
#endif
+    _manager->_lock.lock();
+  }
}

////////////////////////////////////////////////////////////////////

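The key change in both callbacks is lock ordering: the manager lock is released before the GIL is acquired and retaken afterwards. Otherwise a deadlock is possible: this thread holds the manager lock and waits for the GIL while a Python thread holds the GIL and calls into the task manager, waiting for the lock. The discipline, sketched with a plain Python lock standing in for _manager->_lock (the GIL half has no Python-level equivalent):

    import threading

    class MiniPythonTask:
        # Hypothetical sketch of the callback discipline in upon_birth().
        def __init__(self, manager_lock, owner=None):
            self.manager_lock = manager_lock   # stands in for _manager->_lock
            self.owner = owner                 # stands in for _owner / Py_None

        def upon_birth(self):
            # The caller holds manager_lock.  Drop it before running
            # arbitrary user code (which may re-enter the manager and
            # try to take the lock itself), then retake it on the way out.
            if self.owner is not None:
                self.manager_lock.release()
                try:
                    self.owner._addTask(self)   # user callback
                finally:
                    self.manager_lock.acquire()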
+ 1 - 1
panda/src/pgraph/loader.cxx

@@ -41,7 +41,7 @@ Loader::
Loader(const string &name, int num_threads) :
  AsyncTaskManager(name)
{
-  PT(AsyncTaskChain) chain = make_task_chain("");
+  PT(AsyncTaskChain) chain = make_task_chain("default");
  if (num_threads < 0) {
    // -1 means the default number of threads.