2
0
Эх сурвалжийг харах

start and stop pstats info from suspended tasks (SIMPLE_THREADS only)

David Rose 18 жил өмнө
parent
commit
5deb8767f9

+ 39 - 12
panda/src/pipeline/thread.I

@@ -43,6 +43,7 @@ Thread(const string &name, const string &sync_name) :
 {
   _started = false;
   _pstats_index = -1;
+  _pstats_callback = NULL;
   _pipeline_stage = 0;
 
 #ifdef DEBUG_THREADS
@@ -110,18 +111,6 @@ get_pstats_index() const {
   return _pstats_index;
 }
 
-////////////////////////////////////////////////////////////////////
-//     Function: Thread::set_pstats_index
-//       Access: Published
-//  Description: Stores a PStats index to be associated with this
-//               thread.  This is used internally by the PStatClient;
-//               you should not need to call this directly.
-////////////////////////////////////////////////////////////////////
-INLINE void Thread::
-set_pstats_index(int pstats_index) {
-  _pstats_index = pstats_index;
-}
-
 ////////////////////////////////////////////////////////////////////
 //     Function: Thread::get_pipeline_stage
 //       Access: Published
@@ -348,6 +337,44 @@ prepare_for_exit() {
   ThreadImpl::prepare_for_exit();
 }
 
+////////////////////////////////////////////////////////////////////
+//     Function: Thread::set_pstats_index
+//       Access: Public
+//  Description: Stores a PStats index to be associated with this
+//               thread.  This is used internally by the PStatClient;
+//               you should not need to call this directly.
+////////////////////////////////////////////////////////////////////
+INLINE void Thread::
+set_pstats_index(int pstats_index) {
+  _pstats_index = pstats_index;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: Thread::set_pstats_callback
+//       Access: Public
+//  Description: Stores a PStats callback to be associated with this
+//               thread.  This is used internally by the PStatClient;
+//               you should not need to call this directly.
+////////////////////////////////////////////////////////////////////
+INLINE void Thread::
+set_pstats_callback(Thread::PStatsCallback *pstats_callback) {
+  _pstats_callback = pstats_callback;
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: Thread::get_pstats_callback
+//       Access: Public
+//  Description: Returns the PStats callback associated with this thread,
+//               or NULL if no callback has yet been associated with
+//               this thread.  This is used internally by the
+//               PStatClient; you should not need to call this
+//               directly.
+////////////////////////////////////////////////////////////////////
+INLINE Thread::PStatsCallback *Thread::
+get_pstats_callback() const {
+  return _pstats_callback;
+}
+
 INLINE ostream &
 operator << (ostream &out, const Thread &thread) {
   thread.output(out);

+ 25 - 0
panda/src/pipeline/thread.cxx

@@ -201,3 +201,28 @@ init_external_thread() {
     _external_thread->ref();
   }
 }
+
+////////////////////////////////////////////////////////////////////
+//     Function: Thread::PStatsCallback::deactivate_hook
+//       Access: Public, Virtual
+//  Description: Called when the thread is deactivated (swapped for
+//               another running thread).  This is intended to provide
+//               a callback hook for PStats to assign time to
+//               individual threads properly, particularly in the
+//               SIMPLE_THREADS case.
+////////////////////////////////////////////////////////////////////
+void Thread::PStatsCallback::
+deactivate_hook(Thread *) {
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: Thread::PStatsCallback::activate_hook
+//       Access: Public, Virtual
+//  Description: Called when the thread is activated (resumes
+//               execution).  This is intended to provide a callback
+//               hook for PStats to assign time to individual threads
+//               properly, particularly in the SIMPLE_THREADS case.
+////////////////////////////////////////////////////////////////////
+void Thread::PStatsCallback::
+activate_hook(Thread *) {
+}

+ 14 - 1
panda/src/pipeline/thread.h

@@ -64,7 +64,6 @@ PUBLISHED:
   INLINE const string &get_sync_name() const;
 
   INLINE int get_pstats_index() const;
-  INLINE void set_pstats_index(int pstats_index);
 
   INLINE int get_pipeline_stage() const;
   void set_pipeline_stage(int pipeline_stage);
@@ -92,6 +91,19 @@ PUBLISHED:
 
   INLINE static void prepare_for_exit();
 
+public:
+  // This class allows integration with PStats, particularly in the
+  // SIMPLE_THREADS case.
+  class PStatsCallback {
+  public:
+    virtual void deactivate_hook(Thread *thread);
+    virtual void activate_hook(Thread *thread);
+  };
+
+  INLINE void set_pstats_index(int pstats_index);
+  INLINE void set_pstats_callback(PStatsCallback *pstats_callback);
+  INLINE PStatsCallback *get_pstats_callback() const;
+
 private:
   static void init_main_thread();
   static void init_external_thread();
@@ -105,6 +117,7 @@ private:
   ThreadImpl _impl;
   int _pstats_index;
   int _pipeline_stage;
+  PStatsCallback *_pstats_callback;
 
 #ifdef DEBUG_THREADS
   MutexDebug *_blocked_on_mutex;

+ 16 - 6
panda/src/pipeline/threadSimpleImpl.I

@@ -66,8 +66,12 @@ is_true_threads() {
 INLINE void ThreadSimpleImpl::
 sleep(double seconds) {
   ThreadSimpleManager *manager = ThreadSimpleManager::get_global_ptr();
-  ThreadSimpleImpl *thread = manager->get_current_thread();
-  thread->sleep_this(seconds);
+  if (manager->is_same_system_thread()) {
+    ThreadSimpleImpl *thread = manager->get_current_thread();
+    thread->sleep_this(seconds);
+  } else {
+    manager->system_sleep(seconds);
+  }
 }
 
 ////////////////////////////////////////////////////////////////////
@@ -78,8 +82,12 @@ sleep(double seconds) {
 INLINE void ThreadSimpleImpl::
 yield() {
   ThreadSimpleManager *manager = ThreadSimpleManager::get_global_ptr();
-  ThreadSimpleImpl *thread = manager->get_current_thread();
-  thread->yield_this();
+  if (manager->is_same_system_thread()) {
+    ThreadSimpleImpl *thread = manager->get_current_thread();
+    thread->yield_this();
+  } else {
+    manager->system_yield();
+  }
 }
 
 ////////////////////////////////////////////////////////////////////
@@ -90,8 +98,10 @@ yield() {
 INLINE void ThreadSimpleImpl::
 consider_yield() {
   ThreadSimpleManager *manager = ThreadSimpleManager::get_global_ptr();
-  ThreadSimpleImpl *thread = manager->get_current_thread();
-  thread->consider_yield_this();
+  if (manager->is_same_system_thread()) {
+    ThreadSimpleImpl *thread = manager->get_current_thread();
+    thread->consider_yield_this();
+  }
 }
 
 ////////////////////////////////////////////////////////////////////

+ 19 - 0
panda/src/pipeline/threadSimpleManager.I

@@ -27,6 +27,25 @@ get_current_thread() {
   return _current_thread;
 }
 
+////////////////////////////////////////////////////////////////////
+//     Function: ThreadSimpleManager::is_same_system_thread
+//       Access: Public
+//  Description: Returns true if we are still running within the same
+//               OS-level thread that created the ThreadSimpleManager,
+//               or false if this appears to be running in a different
+//               thread.
+////////////////////////////////////////////////////////////////////
+INLINE bool ThreadSimpleManager::
+is_same_system_thread() const {
+#ifdef HAVE_POSIX_THREADS
+  return pthread_equal(_posix_system_thread_id, pthread_self());
+#endif
+#ifdef WIN32
+  return (_win32_system_thread_id == GetCurrentThreadId());
+#endif
+  return true;
+}
+
 ////////////////////////////////////////////////////////////////////
 //     Function: ThreadSimpleManager::get_current_time
 //       Access: Public

+ 60 - 39
panda/src/pipeline/threadSimpleManager.cxx

@@ -42,6 +42,13 @@ ThreadSimpleManager() {
   _clock = TrueClock::get_global_ptr();
   _waiting_for_exit = NULL;
 
+#ifdef HAVE_POSIX_THREADS
+  _posix_system_thread_id = pthread_self();
+#endif
+#ifdef WIN32
+  _win32_system_thread_id = GetCurrentThreadId();
+#endif
+
   // Install these global pointers so very low-level code (code
   // defined before the pipeline directory) can yield when necessary.
   global_thread_yield = &Thread::force_yield;
@@ -199,9 +206,23 @@ next_context() {
   _current_thread->_python_state = PyThreadState_Swap(NULL);
 #endif  // HAVE_PYTHON
 
+#ifdef DO_PSTATS
+  Thread::PStatsCallback *pstats_callback = _current_thread->_parent_obj->get_pstats_callback();
+  if (pstats_callback != NULL) {
+    pstats_callback->deactivate_hook(_current_thread->_parent_obj);
+  }
+#endif  // DO_PSTATS
+
   save_thread_context(&_current_thread->_context, st_choose_next_context, this);
   // Pass 2: we have returned into the context, and are now resuming
   // the current thread.
+
+#ifdef DO_PSTATS
+  if (pstats_callback != NULL) {
+    pstats_callback->activate_hook(_current_thread->_parent_obj);
+  }
+#endif  // DO_PSTATS
+
 #ifdef HAVE_PYTHON
   PyThreadState_Swap(_current_thread->_python_state);
 #endif  // HAVE_PYTHON
@@ -261,6 +282,26 @@ set_current_thread(ThreadSimpleImpl *current_thread) {
   _current_thread = current_thread;
 }
 
+////////////////////////////////////////////////////////////////////
+//     Function: ThreadSimpleManager::system_sleep
+//       Access: Public, Static
+//  Description: Calls the appropriate system sleep function (Sleep
+//               on WIN32, nanosleep otherwise) to block the calling
+//               OS thread for the indicated number of seconds.
+////////////////////////////////////////////////////////////////////
+void ThreadSimpleManager::
+system_sleep(double seconds) {
+#ifdef WIN32
+  Sleep((int)(seconds * 1000));
+
+#else
+  struct timespec rqtp;
+  rqtp.tv_sec = time_t(seconds);
+  rqtp.tv_nsec = long((seconds - (double)rqtp.tv_sec) * 1000000000.0);
+  nanosleep(&rqtp, NULL);
+#endif  // WIN32
+}
+
 ////////////////////////////////////////////////////////////////////
 //     Function: ThreadSimpleManager::write_status
 //       Access: Public
@@ -303,6 +344,25 @@ write_status(ostream &out) const {
   }
 }
 
+////////////////////////////////////////////////////////////////////
+//     Function: ThreadSimpleManager::system_yield
+//       Access: Public, Static
+//  Description: Calls the appropriate system function to yield
+//               the calling OS thread to the rest of the system.
+////////////////////////////////////////////////////////////////////
+void ThreadSimpleManager::
+system_yield() {
+#ifdef WIN32
+  Sleep(0);
+
+#else
+  struct timespec rqtp;
+  rqtp.tv_sec = 0;
+  rqtp.tv_nsec = 0;
+  nanosleep(&rqtp, NULL);
+#endif  // WIN32
+}
+
 ////////////////////////////////////////////////////////////////////
 //     Function: ThreadSimpleManager::init_pointers
 //       Access: Private, Static
@@ -447,45 +507,6 @@ wake_sleepers(double now) {
   }
 }
 
-////////////////////////////////////////////////////////////////////
-//     Function: ThreadSimpleManager::system_sleep
-//       Access: Private, Static
-//  Description: Calls the appropriate system sleep function to sleep
-//               the whole process for the indicated number of
-//               seconds.
-////////////////////////////////////////////////////////////////////
-void ThreadSimpleManager::
-system_sleep(double seconds) {
-#ifdef WIN32
-  Sleep((int)(seconds * 1000));
-
-#else
-  struct timespec rqtp;
-  rqtp.tv_sec = time_t(seconds);
-  rqtp.tv_nsec = long((seconds - (double)rqtp.tv_sec) * 1000000000.0);
-  nanosleep(&rqtp, NULL);
-#endif  // WIN32
-}
-
-////////////////////////////////////////////////////////////////////
-//     Function: ThreadSimpleManager::system_yield
-//       Access: Private, Static
-//  Description: Calls the appropriate system function to yield
-//               the whole process to any other system processes.
-////////////////////////////////////////////////////////////////////
-void ThreadSimpleManager::
-system_yield() {
-#ifdef WIN32
-  Sleep(0);
-
-#else
-  struct timespec rqtp;
-  rqtp.tv_sec = 0;
-  rqtp.tv_nsec = 0;
-  nanosleep(&rqtp, NULL);
-#endif  // WIN32
-}
-
 ////////////////////////////////////////////////////////////////////
 //     Function: ThreadSimpleManager::report_deadlock
 //       Access: Private

+ 24 - 2
panda/src/pipeline/threadSimpleManager.h

@@ -30,6 +30,14 @@
 #include "trueClock.h"
 #include <algorithm>
 
+#ifdef HAVE_POSIX_THREADS
+#include <pthread.h>  // for pthread_t, below
+#endif
+#ifdef WIN32
+#define WIN32_LEAN_AND_MEAN
+#include <windows.h>  // for DWORD, below
+#endif
+
 class Thread;
 class ThreadSimpleImpl;
 class BlockerSimple;
@@ -66,6 +74,9 @@ public:
 
   INLINE ThreadSimpleImpl *get_current_thread();
   void set_current_thread(ThreadSimpleImpl *current_thread);
+  INLINE bool is_same_system_thread() const;
+  static void system_sleep(double seconds);
+  static void system_yield();
 
   INLINE double get_current_time() const;
   INLINE static ThreadSimpleManager *get_global_ptr();
@@ -78,8 +89,6 @@ private:
   static void st_choose_next_context(void *data);
   void choose_next_context();
   void wake_sleepers(double now);
-  static void system_sleep(double seconds);
-  static void system_yield();
   void report_deadlock();
 
   // STL function object to sort the priority queue of sleeping threads.
@@ -113,6 +122,19 @@ private:
 
   TrueClock *_clock;
 
+  // We may not mix-and-match OS threads with Panda's SIMPLE_THREADS.
+  // If we ever get a Panda context switch request from a different OS
+  // thread than the original thread, that's a serious error that may
+  // cause major consequences.  For this reason, we record the ID of
+  // the OS thread that constructs the manager, so that
+  // is_same_system_thread() can detect calls from any other thread.
+#ifdef HAVE_POSIX_THREADS
+  pthread_t _posix_system_thread_id;
+#endif
+#ifdef WIN32
+  DWORD _win32_system_thread_id;
+#endif
+
   static bool _pointers_initialized;
   static ThreadSimpleManager *_global_ptr;
 };

+ 90 - 3
panda/src/pstatclient/pStatClient.cxx

@@ -580,6 +580,7 @@ do_make_thread(Thread *thread) {
         // Yes, re-use this one.
         threads[index]->_thread = thread;
         thread->set_pstats_index(index);
+        thread->set_pstats_callback(this);
         return PStatThread(this, index);
       }
     }
@@ -588,6 +589,7 @@ do_make_thread(Thread *thread) {
   // Create a new PStatsThread for this thread pointer.
   int new_index = _num_threads;
   thread->set_pstats_index(new_index);
+  thread->set_pstats_callback(this);
   _threads_by_name[thread->get_name()].push_back(new_index);
   _threads_by_sync_name[thread->get_sync_name()].push_back(new_index);
         
@@ -679,7 +681,10 @@ start(int collector_index, int thread_index) {
     if (collector->_per_thread[thread_index]._nested_count == 0) {
       // This collector wasn't already started in this thread; record
       // a new data point.
-      thread->_frame_data.add_start(collector_index, get_real_time());
+      if (thread->_thread_active) {
+        thread->_frame_data.add_start(collector_index, get_real_time());
+      }
+      thread->_active_collectors.set_bit(collector_index);
     }
     collector->_per_thread[thread_index]._nested_count++;
   }
@@ -707,7 +712,10 @@ start(int collector_index, int thread_index, float as_of) {
     if (collector->_per_thread[thread_index]._nested_count == 0) {
       // This collector wasn't already started in this thread; record
       // a new data point.
-      thread->_frame_data.add_start(collector_index, as_of);
+      if (thread->_thread_active) {
+        thread->_frame_data.add_start(collector_index, as_of);
+      }
+      thread->_active_collectors.set_bit(collector_index);
     }
     collector->_per_thread[thread_index]._nested_count++;
   }
@@ -747,7 +755,10 @@ stop(int collector_index, int thread_index) {
     if (collector->_per_thread[thread_index]._nested_count == 0) {
       // This collector has now been completely stopped; record a new
       // data point.
-      thread->_frame_data.add_stop(collector_index, get_real_time());
+      if (thread->_thread_active) {
+        thread->_frame_data.add_stop(collector_index, get_real_time());
+      }
+      thread->_active_collectors.clear_bit(collector_index);
     }
   }
 }
@@ -1025,6 +1036,81 @@ add_thread(PStatClient::InternalThread *thread) {
   }
 }
 
+////////////////////////////////////////////////////////////////////
+//     Function: PStatClient::deactivate_hook
+//       Access: Public, Virtual
+//  Description: Called when the thread is deactivated (swapped for
+//               another running thread).  This is intended to provide
+//               a callback hook for PStats to assign time to
+//               individual threads properly, particularly in the
+//               SIMPLE_THREADS case.
+////////////////////////////////////////////////////////////////////
+void PStatClient::
+deactivate_hook(Thread *thread) {
+  // We shouldn't use a mutex here, because this code is only called
+  // during the SIMPLE_THREADS case, so a mutex isn't necessary; and
+  // because we are called during a context switch, so a mutex might
+  // be dangerous.
+
+  InternalThread *ithread = get_thread_ptr(thread->get_pstats_index());
+
+  if (ithread->_thread_active) {
+    // Stop all of the active collectors for this thread.
+    double now = get_real_time();
+    int off_bit = -1;
+    int on_bit = ithread->_active_collectors.get_lowest_on_bit();
+    while (off_bit != on_bit) {
+      off_bit = ithread->_active_collectors.get_next_higher_different_bit(on_bit);
+      nassertv(off_bit != on_bit);
+      while (on_bit < off_bit) {
+        // Here's an active collector.  Record a data point indicating
+        // it stops here.
+        ithread->_frame_data.add_stop(on_bit, now);
+        ++on_bit;
+      }
+      on_bit = ithread->_active_collectors.get_next_higher_different_bit(off_bit);
+    }
+    ithread->_thread_active = false;
+  }
+}
+
+////////////////////////////////////////////////////////////////////
+//     Function: PStatClient::activate_hook
+//       Access: Public, Virtual
+//  Description: Called when the thread is activated (resumes
+//               execution).  This is intended to provide a callback
+//               hook for PStats to assign time to individual threads
+//               properly, particularly in the SIMPLE_THREADS case.
+////////////////////////////////////////////////////////////////////
+void PStatClient::
+activate_hook(Thread *thread) {
+  // We shouldn't use a mutex here, because this code is only called
+  // during the SIMPLE_THREADS case, so a mutex isn't necessary; and
+  // because we are called during a context switch, so a mutex might
+  // be dangerous.
+
+  InternalThread *ithread = get_thread_ptr(thread->get_pstats_index());
+
+  if (!ithread->_thread_active) {
+    // Resume all of the active collectors for this thread.
+    double now = get_real_time();
+    int off_bit = -1;
+    int on_bit = ithread->_active_collectors.get_lowest_on_bit();
+    while (off_bit != on_bit) {
+      off_bit = ithread->_active_collectors.get_next_higher_different_bit(on_bit);
+      nassertv(off_bit != on_bit);
+      while (on_bit < off_bit) {
+        // Here's an active collector.  Record a data point indicating
+        // it resumes here.
+        ithread->_frame_data.add_start(on_bit, now);
+        ++on_bit;
+      }
+      on_bit = ithread->_active_collectors.get_next_higher_different_bit(off_bit);
+    }
+    ithread->_thread_active = true;
+  }
+}
+
 ////////////////////////////////////////////////////////////////////
 //     Function: PStatClient::Collector::make_def
 //       Access: Private
@@ -1057,6 +1143,7 @@ InternalThread(Thread *thread) :
   _is_active(false),
   _frame_number(0),
   _next_packet(0.0),
+  _thread_active(true),
   _thread_lock(string("PStatClient::InternalThread ") + thread->get_name())
 {
 }

+ 8 - 1
panda/src/pstatclient/pStatClient.h

@@ -34,6 +34,7 @@
 #include "vector_int.h"
 #include "atomicAdjust.h"
 #include "numeric_types.h"
+#include "bitArray.h"
 
 class PStatCollector;
 class PStatCollectorDef;
@@ -57,7 +58,7 @@ class PStatThread;
 //               stub class.
 ////////////////////////////////////////////////////////////////////
 #ifdef DO_PSTATS
-class EXPCL_PANDA PStatClient : public ConnectionManager {
+class EXPCL_PANDA PStatClient : public ConnectionManager, public Thread::PStatsCallback {
 public:
   PStatClient();
   ~PStatClient();
@@ -140,6 +141,9 @@ private:
   INLINE Collector *get_collector_ptr(int collector_index) const;
   INLINE InternalThread *get_thread_ptr(int thread_index) const;
 
+  virtual void deactivate_hook(Thread *thread);
+  virtual void activate_hook(Thread *thread);
+
 private:
   // This mutex protects everything in this class.
   ReMutex _lock;
@@ -208,6 +212,9 @@ private:
     int _frame_number;
     float _next_packet;
 
+    bool _thread_active;
+    BitArray _active_collectors;
+
     // This mutex is used to protect writes to _frame_data for this
     // particular thread, as well as writes to the _per_thread data
     // for this particular thread in the Collector class, above.