
fix error at shutdown

David Rose 17 years ago
parent
commit
58f3f8f322

+ 88 - 22
panda/src/event/asyncTaskChain.cxx

@@ -105,6 +105,12 @@ void AsyncTaskChain::
 set_num_threads(int num_threads) {
   nassertv(num_threads >= 0);

+  if (task_cat.is_debug()) {
+    do_output(task_cat.debug());
+    task_cat.debug(false)
+      << ": set_num_threads(" << num_threads << ")\n";
+  }
+
   if (!Thread::is_threading_supported()) {
     num_threads = 0;
   }
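
The do_output()/debug(false) pairing above is a two-step way of building one log line: the first call emits the prefixed stream and the chain's state summary, and debug(false) continues the same line without repeating the category prefix. A standalone sketch of the idiom, with a hypothetical stub in place of Panda3D's NotifyCategory:

#include <iostream>

// Hypothetical stand-in for Panda3D's NotifyCategory.
struct CategoryStub {
  bool is_debug() const { return true; }
  // debug() prefixes the line with the category name;
  // debug(false) returns the same stream without the prefix.
  std::ostream &debug(bool prefix = true) {
    if (prefix) std::cout << "task: ";
    return std::cout;
  }
};

// Mirrors the pattern in set_num_threads() above: do_output() would
// print the chain's state summary onto the prefixed stream, and the
// message then continues on the same line.
void log_set_num_threads(CategoryStub &cat, int num_threads) {
  if (cat.is_debug()) {
    cat.debug() << "AsyncTaskChain state";   // stands in for do_output()
    cat.debug(false) << ": set_num_threads(" << num_threads << ")\n";
  }
}

int main() {
  CategoryStub cat;
  log_set_num_threads(cat, 4);
  // prints: task: AsyncTaskChain state: set_num_threads(4)
  return 0;
}
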
@@ -591,36 +597,62 @@ do_wait_for_tasks() {
 ////////////////////////////////////////////////////////////////////
 void AsyncTaskChain::
 do_cleanup() {
+  if (task_cat.is_spam()) {
+    do_output(task_cat.spam());
+    task_cat.spam(false)
+      << ": do_cleanup()\n";
+  }
   do_stop_threads();
+  _num_threads = 0;

-  // Move aside the task lists first.  We must do this before we start
-  // iterating, because cleanup_task() might release the lock,
-  // allowing the iterators to become invalid.
+  // Don't call the upon_death functions while we clean up the tasks.
+  // Instead, store all the tasks in a list as we clean them up, and
+  // then call the upon_death functions all at once.  We do this
+  // because calling upon_death will release the lock, allowing the
+  // iterators to become invalid.

-  TaskHeap active, this_active, next_active, sleeping;
-  active.swap(_active);
-  this_active.swap(_this_active);
-  next_active.swap(_next_active);
-  sleeping.swap(_sleeping);
+  TaskHeap dead;
+  dead.reserve(_num_tasks);

   _needs_cleanup = false;

   TaskHeap::const_iterator ti;
-  for (ti = active.begin(); ti != active.end(); ++ti) {
+  for (ti = _active.begin(); ti != _active.end(); ++ti) {
     AsyncTask *task = (*ti);
-    cleanup_task(task, true, false);
+    dead.push_back(task);
+    cleanup_task(task, false, false);
   }
-  for (ti = this_active.begin(); ti != this_active.end(); ++ti) {
+  for (ti = _this_active.begin(); ti != _this_active.end(); ++ti) {
     AsyncTask *task = (*ti);
-    cleanup_task(task, true, false);
+    dead.push_back(task);
+    cleanup_task(task, false, false);
   }
-  for (ti = next_active.begin(); ti != next_active.end(); ++ti) {
+  for (ti = _next_active.begin(); ti != _next_active.end(); ++ti) {
     AsyncTask *task = (*ti);
-    cleanup_task(task, true, false);
+    dead.push_back(task);
+    cleanup_task(task, false, false);
   }
-  for (ti = sleeping.begin(); ti != sleeping.end(); ++ti) {
+  for (ti = _sleeping.begin(); ti != _sleeping.end(); ++ti) {
     AsyncTask *task = (*ti);
-    cleanup_task(task, true, false);
+    dead.push_back(task);
+    cleanup_task(task, false, false);
+  }
+
+  // There might still be one task remaining: the currently-executing
+  // task.
+  nassertv(_num_tasks == 0 || _num_tasks == 1);
+  
+  // Now go back and call the upon_death functions.
+  _manager->_lock.release();
+  for (ti = dead.begin(); ti != dead.end(); ++ti) {
+    (*ti)->upon_death(false);
+  }
+  _manager->_lock.lock();
+
+  if (task_cat.is_spam()) {
+    do_output(task_cat.spam());
+    task_cat.spam(false)
+      << ": done do_cleanup()\n";
   }
 }

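The rewritten do_cleanup() collects every task into a local dead list while the lock is held and only then runs the upon_death callbacks, because a callback can release the lock and mutate the very heaps being walked. A minimal self-contained sketch of that collect-then-notify pattern, with simplified stand-ins for AsyncTask and the manager lock:

#include <mutex>
#include <vector>

// Simplified stand-in for AsyncTask.
struct Task {
  void upon_death(bool clean_exit) {
    // User callback; may re-enter the chain and add/remove tasks.
    (void)clean_exit;
  }
};

struct Chain {
  std::mutex lock;               // stands in for _manager->_lock
  std::vector<Task *> active;

  // Assumes the caller already holds 'lock', as do_cleanup() does.
  void do_cleanup() {
    // Phase 1: under the lock, detach tasks without running callbacks.
    // Iterating is safe because nothing re-entrant happens here.
    std::vector<Task *> dead;
    dead.reserve(active.size());
    for (Task *task : active) {
      dead.push_back(task);
    }
    active.clear();

    // Phase 2: release the lock and run the callbacks in a batch.
    // Any mutation they cause hits the (now empty) live containers,
    // not the iterators we already finished with.
    lock.unlock();
    for (Task *task : dead) {
      task->upon_death(false);
    }
    lock.lock();
  }
};

int main() {
  Chain chain;
  Task t;
  chain.active.push_back(&t);
  chain.lock.lock();
  chain.do_cleanup();
  chain.lock.unlock();
  return 0;
}
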
@@ -684,8 +716,8 @@ service_one_task(AsyncTaskChain::AsyncTaskChainThread *thread) {

     if (task_cat.is_spam()) {
       task_cat.spam()
-        << "Servicing " << *task << " in " << *Thread::get_current_thread()
-        << "\n";
+        << "Servicing " << *task << " in "
+        << *Thread::get_current_thread() << "\n";
     }

     nassertv(task->get_sort() == _current_sort);
@@ -755,13 +787,22 @@ service_one_task(AsyncTaskChain::AsyncTaskChainThread *thread) {
             _cvar.signal_all();
           }
           break;
-
           
         default:
           // The task has finished.
           cleanup_task(task, true, true);
         }
       }
+    } else {
+      task_cat.error()
+        << "Task is no longer on chain " << get_name() 
+        << ": " << *task << "\n";
+    }
+
+    if (task_cat.is_spam()) {
+      task_cat.spam()
+        << "Done servicing " << *task << " in "
+        << *Thread::get_current_thread() << "\n";
     }
   }
 }
@@ -782,6 +823,13 @@ service_one_task(AsyncTaskChain::AsyncTaskChainThread *thread) {
 ////////////////////////////////////////////////////////////////////
 void AsyncTaskChain::
 cleanup_task(AsyncTask *task, bool upon_death, bool clean_exit) {
+  if (task_cat.is_spam()) {
+    do_output(task_cat.spam());
+    task_cat.spam(false)
+      << ": cleanup_task(" << *task << ", " << upon_death << ", " << clean_exit
+      << ")\n";
+  }
+
   nassertv(task->_chain == this);
   PT(AsyncTask) hold_task = task;

@@ -1026,7 +1074,8 @@ do_stop_threads() {
       task_cat.debug()
         << "Stopping " << _threads.size()
         << " threads for " << _manager->get_name()
-        << " chain " << get_name() << "\n";
+        << " chain " << get_name() 
+        << " in " << *Thread::get_current_thread() << "\n";
     }

     _state = S_shutdown;
@@ -1047,11 +1096,18 @@ do_stop_threads() {
           << *Thread::get_current_thread() << "\n";
       }
       (*ti)->join();
+      if (task_cat.is_spam()) {
+        task_cat.spam()
+          << "Done waiting for " << *(*ti) << " in " 
+          << *Thread::get_current_thread() << "\n";
+      }
     }
     _manager->_lock.lock();
     
     _state = S_initial;
-    nassertv(_num_busy_threads == 0);
+
+    // There might be one busy "thread" still: the main thread.
+    nassertv(_num_busy_threads == 0 || _num_busy_threads == 1);
     cleanup_pickup_mode();
   }
 }
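
do_stop_threads() releases the manager lock before joining the workers and re-acquires it afterwards: each worker must take that same lock to observe S_shutdown and exit, so joining while still holding it would deadlock. A condensed sketch of the handshake (not the Panda3D code; a bare flag stands in for the chain state):

#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

std::mutex lock;
std::condition_variable cvar;
bool shutdown_requested = false;
std::vector<std::thread> threads;

void worker() {
  std::unique_lock<std::mutex> holder(lock);
  // Workers block on the condition variable and exit once they
  // observe the shutdown flag -- which requires taking 'lock'.
  cvar.wait(holder, [] { return shutdown_requested; });
}

void stop_threads() {
  {
    std::lock_guard<std::mutex> holder(lock);
    shutdown_requested = true;   // equivalent of _state = S_shutdown
  }
  cvar.notify_all();
  // The lock is NOT held here: joining while holding it would
  // deadlock, since each worker needs it to see the flag and exit.
  for (std::thread &t : threads) {
    t.join();
  }
  // Re-acquire before touching shared state again, as the real code
  // does with _manager->_lock.lock().
  std::lock_guard<std::mutex> holder(lock);
  threads.clear();
}

int main() {
  threads.emplace_back(worker);
  stop_threads();
  return 0;
}
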
@@ -1070,7 +1126,6 @@ do_start_threads() {

   if (_state == S_initial) {
     _state = S_started;
-    _num_busy_threads = 0;
     if (Thread::is_threading_supported() && _num_threads > 0) {
       if (task_cat.is_debug()) {
         task_cat.debug()
@@ -1184,7 +1239,18 @@ do_poll() {
       }
       
       _current_sort = _active.front()->get_sort();
+
+      // Normally, there won't be any threads running at the same time
+      // we're in poll().  But it's possible, if someone calls
+      // set_num_threads() while we're processing.
+      _num_busy_threads++;
       service_one_task(NULL);
+      _num_busy_threads--;
+      _cvar.signal_all();
+
+      if (!_threads.empty()) {
+        return;
+      }
     }
     
     finish_sort_group();

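The new do_poll() logic counts the polling (main) thread as a busy thread while it services a task, signals the condition variable afterwards, and bails out if set_num_threads() spawned real threads mid-poll. That is also why the assert in do_stop_threads() now tolerates one remaining busy thread. A rough model of the bookkeeping, with hypothetical simplified members:

#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

struct ChainModel {
  std::mutex lock;
  std::condition_variable cvar;
  int num_busy_threads = 0;
  std::vector<std::thread> threads;   // populated by set_num_threads()

  void service_one_task() { /* runs one task; may drop the lock */ }

  // Assumes 'lock' is held on entry, as in do_poll().  Returns true
  // if synchronous polling should continue.
  bool poll_step() {
    ++num_busy_threads;        // the main thread is now a busy "thread"
    service_one_task();
    --num_busy_threads;
    cvar.notify_all();         // wake anyone waiting on the busy count

    // If set_num_threads() started workers while we were servicing,
    // stop polling synchronously and let them take over.
    return threads.empty();
  }
};

int main() {
  ChainModel chain;
  std::lock_guard<std::mutex> holder(chain.lock);
  chain.poll_step();  // services one task, keeping the count honest
  return 0;
}
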
+ 6 - 0
panda/src/event/asyncTaskManager.cxx

@@ -66,6 +66,12 @@ void AsyncTaskManager::
 cleanup() {
   MutexHolder holder(_lock);

+  if (task_cat.is_spam()) {
+    do_output(task_cat.spam());
+    task_cat.spam(false)
+      << ": cleanup()\n";
+  }
+
   // Iterate carefully in case the tasks adjust the chain list within
   // cleanup().
   while (!_task_chains.empty()) {
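
The while (!_task_chains.empty()) loop re-fetches the first element on every pass instead of caching iterators, because a chain's cleanup() can add or remove chains and invalidate them. A minimal sketch of the pattern with hypothetical stand-in types:

#include <map>
#include <string>

struct ChainStub;
using ChainMap = std::map<std::string, ChainStub *>;

struct ChainStub {
  void cleanup(ChainMap &chains) {
    // May insert or erase chains, invalidating outstanding iterators.
    (void)chains;
  }
};

void cleanup_all(ChainMap &chains) {
  // Re-check the container each pass; never hold an iterator across
  // a re-entrant cleanup() call.
  while (!chains.empty()) {
    ChainMap::iterator it = chains.begin();
    ChainStub *chain = it->second;
    chains.erase(it);          // detach before the re-entrant call
    chain->cleanup(chains);
  }
}

int main() {
  ChainStub a, b;
  ChainMap chains{{"default", &a}, {"cull", &b}};
  cleanup_all(chains);
  return 0;
}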