Browse Source

add stop_threads() etc

David Rose 18 years ago
parent
commit
197032bd70

+ 19 - 7
panda/src/event/asyncTaskManager.I

@@ -17,20 +17,32 @@
 ////////////////////////////////////////////////////////////////////
 
 
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskManager::is_started
+//       Access: Published
+//  Description: Returns true if the thread(s) have been started and
+//               are ready to service requests, false otherwise.  If
+//               this is false, the next call to add() or add_and_do()
+//               will automatically start the threads.
+////////////////////////////////////////////////////////////////////
+INLINE bool AsyncTaskManager::
+is_started() const {
+  return (_state == S_started);
+}
+
 ////////////////////////////////////////////////////////////////////
 //     Function: AsyncTaskManager::get_num_threads
 //       Access: Published
 //  Description: Returns the number of threads that have been created
-//               to service the tasks within this task manager.
+//               to service the tasks within this task manager.  This
+//               will return 0 before the threads have been started;
+//               it will also return 0 if thread support is not
+//               available.
 ////////////////////////////////////////////////////////////////////
 INLINE int AsyncTaskManager::
 get_num_threads() const {
-#ifdef HAVE_THREADS
-  return _num_threads;
-#else
-  // Without threading support, this is always 0 threads.
-  return 0;
-#endif
+  MutexHolder holder(_lock);
+  return _threads.size();
 }
 
 ////////////////////////////////////////////////////////////////////

+ 70 - 33
panda/src/event/asyncTaskManager.cxx

@@ -53,19 +53,58 @@ AsyncTaskManager(const string &name, int num_threads) :
 ////////////////////////////////////////////////////////////////////
 AsyncTaskManager::
 ~AsyncTaskManager() {
+  stop_threads();
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskManager::stop_threads
+//       Access: Published
+//  Description: Stops any threads that are currently running.  If any
+//               tasks are still pending and have not yet been picked
+//               up by a thread, they will not be serviced unless
+//               poll() or start_threads() is later called.
+////////////////////////////////////////////////////////////////////
+void AsyncTaskManager::
+stop_threads() {
   if (_state == S_started) {
     // Clean up all of the threads.
     MutexHolder holder(_lock);
-    _state = S_shutdown;
-    _cvar.signal_all();
+    if (_state == S_started) {
+      _state = S_shutdown;
+      _cvar.signal_all();
+
+      Threads wait_threads;
+      wait_threads.swap(_threads);
+      
+      // We have to release the lock while we join, so the threads can
+      // wake up and see that we're shutting down.
+      _lock.release();
+      Threads::iterator ti;
+      for (ti = wait_threads.begin(); ti != wait_threads.end(); ++ti) {
+        (*ti)->join();
+      }
+      _lock.lock();
 
-    Threads::iterator ti;
-    for (ti = _threads.begin(); ti != _threads.end(); ++ti) {
-      (*ti)->join();
+      _state = S_initial;
     }
   }
 }
 
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskManager::start_threads
+//       Access: Published
+//  Description: Starts any requested threads to service the tasks on
+//               the queue.  This is normally not necessary, since
+//               adding a task will start the threads automatically.
+////////////////////////////////////////////////////////////////////
+void AsyncTaskManager::
+start_threads() {
+  if (_state == S_initial) {
+    MutexHolder holder(_lock);
+    do_start_threads();
+  }
+}
+
 ////////////////////////////////////////////////////////////////////
 //     Function: AsyncTaskManager::add
 //       Access: Published
@@ -81,19 +120,7 @@ add(AsyncTask *task) {
            task->_state == AsyncTask::S_inactive);
   nassertv(find_task(task) == -1);
 
-  // Attempt to start the threads, if we haven't already.
-  if (_state == S_initial) {
-    _state = S_started;
-    if (Thread::is_threading_supported()) {
-      _threads.reserve(_num_threads);
-      for (int i = 0; i < _num_threads; ++i) {
-        PT(AsyncTaskManagerThread) thread = new AsyncTaskManagerThread(this);
-        if (thread->start(TP_low, true)) {
-          _threads.push_back(thread);
-        }
-      }
-    }
-  }
+  do_start_threads();
 
   task->_manager = this;
   task->_state = AsyncTask::S_active;
@@ -118,7 +145,7 @@ add(AsyncTask *task) {
 //               execute the task before returning in the non-threaded
 //               case.  In the threaded case, this method behaves
 //               exactly the same as add().
-
+//
 //               The return value is true if the task has been added
 //               and is still pending, false if it has completed.
 ////////////////////////////////////////////////////////////////////
@@ -130,19 +157,7 @@ add_and_do(AsyncTask *task) {
            task->_state == AsyncTask::S_inactive, false);
   nassertr(find_task(task) == -1, false);
 
-  // Attempt to start the threads, if we haven't already.
-  if (_state == S_initial) {
-    _state = S_started;
-    if (Thread::is_threading_supported()) {
-      _threads.reserve(_num_threads);
-      for (int i = 0; i < _num_threads; ++i) {
-        PT(AsyncTaskManagerThread) thread = new AsyncTaskManagerThread(this);
-        if (thread->start(TP_low, true)) {
-          _threads.push_back(thread);
-        }
-      }
-    }
-  }
+  do_start_threads();
 
   task->_manager = this;
   task->_state = AsyncTask::S_active;
@@ -232,7 +247,7 @@ has_task(AsyncTask *task) const {
 void AsyncTaskManager::
 poll() {
   MutexHolder holder(_lock);
-  if (_threads.empty()) {
+  if (!_threads.empty()) {
     return;
   }
 
@@ -389,6 +404,28 @@ task_done(AsyncTask *task) {
   }
 }
 
+////////////////////////////////////////////////////////////////////
+//     Function: AsyncTaskManager::do_start_threads
+//       Access: Protected
+//  Description: The private implementation of start_threads; assumes
+//               the lock is already held.
+////////////////////////////////////////////////////////////////////
+void AsyncTaskManager::
+do_start_threads() {
+  if (_state == S_initial) {
+    _state = S_started;
+    if (Thread::is_threading_supported()) {
+      _threads.reserve(_num_threads);
+      for (int i = 0; i < _num_threads; ++i) {
+        PT(AsyncTaskManagerThread) thread = new AsyncTaskManagerThread(this);
+        if (thread->start(TP_low, true)) {
+          _threads.push_back(thread);
+        }
+      }
+    }
+  }
+}
+
 ////////////////////////////////////////////////////////////////////
 //     Function: AsyncTaskManager::AsyncTaskManagerThread::Constructor
 //       Access: Public

+ 5 - 0
panda/src/event/asyncTaskManager.h

@@ -53,6 +53,9 @@ PUBLISHED:
   virtual ~AsyncTaskManager();
 
   INLINE int get_num_threads() const;
+  BLOCKING void stop_threads();
+  void start_threads();
+  INLINE bool is_started() const;
 
   void add(AsyncTask *task);
   bool add_and_do(AsyncTask *task);
@@ -72,7 +75,9 @@ protected:
   int find_task(AsyncTask *task) const;
   void service_one_task(AsyncTaskManagerThread *thread);
   void task_done(AsyncTask *task);
+  void do_start_threads();
 
+protected:
   class AsyncTaskManagerThread : public Thread {
   public:
     AsyncTaskManagerThread(AsyncTaskManager *manager);