Browse Source

pstats: Switch from AtomicAdjust to C++11-style atomics

rdb 3 years ago
parent
commit
77b0d2d6a7

+ 8 - 10
panda/src/pstatclient/pStatClient.I

@@ -16,8 +16,7 @@
  */
 INLINE int PStatClient::
 get_num_collectors() const {
-  ReMutexHolder holder(_lock);
-  return (int)_num_collectors;
+  return _num_collectors.load(std::memory_order_relaxed);
 }
 
 /**
@@ -25,7 +24,7 @@ get_num_collectors() const {
  */
 INLINE PStatCollectorDef *PStatClient::
 get_collector_def(int index) const {
-  nassertr(index >= 0 && index < _num_collectors, nullptr);
+  nassertr(index >= 0 && index < get_num_collectors(), nullptr);
 
   return get_collector_ptr(index)->get_def(this, index);
 }
@@ -35,8 +34,7 @@ get_collector_def(int index) const {
  */
 INLINE int PStatClient::
 get_num_threads() const {
-  ReMutexHolder holder(_lock);
-  return (int)_num_threads;
+  return _num_threads.load(std::memory_order_relaxed);
 }
 
 /**
@@ -44,7 +42,7 @@ get_num_threads() const {
  */
 INLINE std::string PStatClient::
 get_thread_name(int index) const {
-  nassertr(index >= 0 && index < AtomicAdjust::get(_num_threads), std::string());
+  nassertr(index >= 0 && index < get_num_threads(), std::string());
   return get_thread_ptr(index)->_name;
 }
 
@@ -53,7 +51,7 @@ get_thread_name(int index) const {
  */
 INLINE std::string PStatClient::
 get_thread_sync_name(int index) const {
-  nassertr(index >= 0 && index < AtomicAdjust::get(_num_threads), std::string());
+  nassertr(index >= 0 && index < get_num_threads(), std::string());
   return get_thread_ptr(index)->_sync_name;
 }
 
@@ -62,7 +60,7 @@ get_thread_sync_name(int index) const {
  */
 INLINE PT(Thread) PStatClient::
 get_thread_object(int index) const {
-  nassertr(index >= 0 && index < AtomicAdjust::get(_num_threads), nullptr);
+  nassertr(index >= 0 && index < get_num_threads(), nullptr);
   InternalThread *thread = get_thread_ptr(index);
   return thread->_thread.lock();
 }
@@ -144,7 +142,7 @@ get_impl() const {
  */
 INLINE PStatClient::Collector *PStatClient::
 get_collector_ptr(int collector_index) const {
-  CollectorPointer *collectors = (CollectorPointer *)AtomicAdjust::get_ptr(_collectors);
+  CollectorPointer *collectors = _collectors.load(std::memory_order_consume);
   return collectors[collector_index];
 }
 
@@ -153,7 +151,7 @@ get_collector_ptr(int collector_index) const {
  */
 INLINE PStatClient::InternalThread *PStatClient::
 get_thread_ptr(int thread_index) const {
-  ThreadPointer *threads = (ThreadPointer *)AtomicAdjust::get_ptr(_threads);
+  ThreadPointer *threads = _threads.load(std::memory_order_consume);
   return threads[thread_index];
 }
 

+ 77 - 83
panda/src/pstatclient/pStatClient.cxx

@@ -77,14 +77,6 @@ PStatClient() :
   _lock("PStatClient::_lock"),
   _impl(nullptr)
 {
-  _collectors = nullptr;
-  _collectors_size = 0;
-  _num_collectors = 0;
-
-  _threads = nullptr;
-  _threads_size = 0;
-  _num_threads = 0;
-
   // We always have a collector at index 0 named "Frame".  This tracks the
   // total frame time and is the root of all other collectors.  We have to
   // make this one by hand since it's the root.
@@ -152,7 +144,7 @@ get_max_rate() const {
  */
 PStatCollector PStatClient::
 get_collector(int index) const {
-  nassertr(index >= 0 && index < AtomicAdjust::get(_num_collectors), PStatCollector());
+  nassertr(index >= 0 && index < get_num_collectors(), PStatCollector());
   return PStatCollector((PStatClient *)this, index);
 }
 
@@ -161,7 +153,7 @@ get_collector(int index) const {
  */
 string PStatClient::
 get_collector_name(int index) const {
-  nassertr(index >= 0 && index < AtomicAdjust::get(_num_collectors), string());
+  nassertr(index >= 0 && index < get_num_collectors(), string());
 
   return get_collector_ptr(index)->get_name();
 }
@@ -173,7 +165,7 @@ get_collector_name(int index) const {
  */
 string PStatClient::
 get_collector_fullname(int index) const {
-  nassertr(index >= 0 && index < AtomicAdjust::get(_num_collectors), string());
+  nassertr(index >= 0 && index < get_num_collectors(), string());
 
   Collector *collector = get_collector_ptr(index);
   int parent_index = collector->get_parent_index();
@@ -191,7 +183,7 @@ get_collector_fullname(int index) const {
 PStatThread PStatClient::
 get_thread(int index) const {
   ReMutexHolder holder(_lock);
-  nassertr(index >= 0 && index < _num_threads, PStatThread());
+  nassertr(index >= 0 && index < get_num_threads(), PStatThread());
   return PStatThread((PStatClient *)this, index);
 }
 
@@ -462,8 +454,9 @@ client_disconnect() {
     _impl = nullptr;
   }
 
-  ThreadPointer *threads = (ThreadPointer *)_threads;
-  for (int ti = 0; ti < _num_threads; ++ti) {
+  // These can be relaxed loads because we hold the lock.
+  ThreadPointer *threads = _threads.load(std::memory_order_relaxed);
+  for (int ti = 0; ti < get_num_threads(); ++ti) {
     InternalThread *thread = threads[ti];
     thread->_frame_number = 0;
     thread->_is_active = false;
@@ -471,14 +464,11 @@ client_disconnect() {
     thread->_frame_data.clear();
   }
 
-  CollectorPointer *collectors = (CollectorPointer *)_collectors;
-  for (int ci = 0; ci < _num_collectors; ++ci) {
+  CollectorPointer *collectors = _collectors.load(std::memory_order_relaxed);
+  for (int ci = 0; ci < get_num_collectors(); ++ci) {
     Collector *collector = collectors[ci];
-    PerThread::iterator ii;
-    for (ii = collector->_per_thread.begin();
-         ii != collector->_per_thread.end();
-         ++ii) {
-      (*ii)._nested_count = 0;
+    for (PerThreadData &per_thread : collector->_per_thread) {
+      per_thread._nested_count = 0;
     }
   }
 }
@@ -576,7 +566,8 @@ PStatCollector PStatClient::
 make_collector_with_name(int parent_index, const string &name) {
   ReMutexHolder holder(_lock);
 
-  nassertr(parent_index >= 0 && parent_index < _num_collectors,
+  int num_collectors = get_num_collectors();
+  nassertr(parent_index >= 0 && parent_index < num_collectors,
            PStatCollector());
 
   Collector *parent = get_collector_ptr(parent_index);
@@ -593,26 +584,25 @@ make_collector_with_name(int parent_index, const string &name) {
   if (ni != parent->_children.end()) {
     // We already had a collector by this name; return it.
     int index = (*ni).second;
-    nassertr(index >= 0 && index < _num_collectors, PStatCollector());
+    nassertr(index >= 0 && index < num_collectors, PStatCollector());
     return PStatCollector(this, (*ni).second);
   }
 
   // Create a new collector for this name.
-  int new_index = _num_collectors;
-  parent->_children.insert(ThingsByName::value_type(name, new_index));
+  parent->_children.insert(ThingsByName::value_type(name, num_collectors));
 
   Collector *collector = new Collector(parent_index, name);
-  // collector->_def = new PStatCollectorDef(new_index, name);
+  // collector->_def = new PStatCollectorDef(num_collectors, name);
   // collector->_def->set_parent(*_collectors[parent_index]._def);
   // initialize_collector_def(this, collector->_def);
 
   // We need one PerThreadData for each thread.
-  while ((int)collector->_per_thread.size() < _num_threads) {
+  while ((int)collector->_per_thread.size() < get_num_threads()) {
     collector->_per_thread.push_back(PerThreadData());
   }
   add_collector(collector);
 
-  return PStatCollector(this, new_index);
+  return PStatCollector(this, num_collectors);
 }
 
 /**
@@ -662,8 +652,8 @@ do_make_thread(Thread *thread) {
          vi != indices.end();
          ++vi) {
       int index = (*vi);
-      nassertr(index >= 0 && index < _num_threads, PStatThread());
-      ThreadPointer *threads = (ThreadPointer *)_threads;
+      nassertr(index >= 0 && index < get_num_threads(), PStatThread());
+      ThreadPointer *threads = _threads.load(std::memory_order_relaxed);
       if (threads[index]->_thread.was_deleted() &&
           threads[index]->_sync_name == thread->get_sync_name()) {
         // Yes, re-use this one.
@@ -676,7 +666,7 @@ do_make_thread(Thread *thread) {
   }
 
   // Create a new PStatsThread for this thread pointer.
-  int new_index = _num_threads;
+  int new_index = get_num_threads();
   thread->set_pstats_index(new_index);
   thread->set_pstats_callback(this);
 
@@ -693,7 +683,7 @@ do_make_thread(Thread *thread) {
 PStatThread PStatClient::
 make_gpu_thread(const string &name) {
   ReMutexHolder holder(_lock);
-  int new_index = _num_threads;
+  int new_index = get_num_threads();
 
   InternalThread *pthread = new InternalThread(name, "GPU");
   add_thread(pthread);
@@ -710,8 +700,8 @@ make_gpu_thread(const string &name) {
  */
 bool PStatClient::
 is_active(int collector_index, int thread_index) const {
-  nassertr(collector_index >= 0 && collector_index < AtomicAdjust::get(_num_collectors), false);
-  nassertr(thread_index >= 0 && thread_index < AtomicAdjust::get(_num_threads), false);
+  nassertr(collector_index >= 0 && collector_index < get_num_collectors(), false);
+  nassertr(thread_index >= 0 && thread_index < get_num_threads(), false);
 
   return (client_is_connected() &&
           get_collector_ptr(collector_index)->is_active() &&
@@ -727,8 +717,8 @@ is_active(int collector_index, int thread_index) const {
  */
 bool PStatClient::
 is_started(int collector_index, int thread_index) const {
-  nassertr(collector_index >= 0 && collector_index < AtomicAdjust::get(_num_collectors), false);
-  nassertr(thread_index >= 0 && thread_index < AtomicAdjust::get(_num_threads), false);
+  nassertr(collector_index >= 0 && collector_index < get_num_collectors(), false);
+  nassertr(thread_index >= 0 && thread_index < get_num_threads(), false);
 
   Collector *collector = get_collector_ptr(collector_index);
   InternalThread *thread = get_thread_ptr(thread_index);
@@ -758,8 +748,8 @@ start(int collector_index, int thread_index) {
   }
 
 #ifdef _DEBUG
-  nassertv(collector_index >= 0 && collector_index < AtomicAdjust::get(_num_collectors));
-  nassertv(thread_index >= 0 && thread_index < AtomicAdjust::get(_num_threads));
+  nassertv(collector_index >= 0 && collector_index < get_num_collectors());
+  nassertv(thread_index >= 0 && thread_index < get_num_threads());
 #endif
 
   Collector *collector = get_collector_ptr(collector_index);
@@ -789,8 +779,8 @@ start(int collector_index, int thread_index, double as_of) {
   }
 
 #ifdef _DEBUG
-  nassertv(collector_index >= 0 && collector_index < AtomicAdjust::get(_num_collectors));
-  nassertv(thread_index >= 0 && thread_index < AtomicAdjust::get(_num_threads));
+  nassertv(collector_index >= 0 && collector_index < get_num_collectors());
+  nassertv(thread_index >= 0 && thread_index < get_num_threads());
 #endif
 
   Collector *collector = get_collector_ptr(collector_index);
@@ -820,8 +810,8 @@ stop(int collector_index, int thread_index) {
   }
 
 #ifdef _DEBUG
-  nassertv(collector_index >= 0 && collector_index < AtomicAdjust::get(_num_collectors));
-  nassertv(thread_index >= 0 && thread_index < AtomicAdjust::get(_num_threads));
+  nassertv(collector_index >= 0 && collector_index < get_num_collectors());
+  nassertv(thread_index >= 0 && thread_index < get_num_threads());
 #endif
 
   Collector *collector = get_collector_ptr(collector_index);
@@ -862,8 +852,8 @@ stop(int collector_index, int thread_index, double as_of) {
   }
 
 #ifdef _DEBUG
-  nassertv(collector_index >= 0 && collector_index < AtomicAdjust::get(_num_collectors));
-  nassertv(thread_index >= 0 && thread_index < AtomicAdjust::get(_num_threads));
+  nassertv(collector_index >= 0 && collector_index < get_num_collectors());
+  nassertv(thread_index >= 0 && thread_index < get_num_threads());
 #endif
 
   Collector *collector = get_collector_ptr(collector_index);
@@ -905,8 +895,8 @@ clear_level(int collector_index, int thread_index) {
   }
 
 #ifdef _DEBUG
-  nassertv(collector_index >= 0 && collector_index < AtomicAdjust::get(_num_collectors));
-  nassertv(thread_index >= 0 && thread_index < AtomicAdjust::get(_num_threads));
+  nassertv(collector_index >= 0 && collector_index < get_num_collectors());
+  nassertv(thread_index >= 0 && thread_index < get_num_threads());
 #endif
 
   Collector *collector = get_collector_ptr(collector_index);
@@ -930,8 +920,8 @@ set_level(int collector_index, int thread_index, double level) {
   }
 
 #ifdef _DEBUG
-  nassertv(collector_index >= 0 && collector_index < AtomicAdjust::get(_num_collectors));
-  nassertv(thread_index >= 0 && thread_index < AtomicAdjust::get(_num_threads));
+  nassertv(collector_index >= 0 && collector_index < get_num_collectors());
+  nassertv(thread_index >= 0 && thread_index < get_num_threads());
 #endif
 
   Collector *collector = get_collector_ptr(collector_index);
@@ -963,8 +953,8 @@ add_level(int collector_index, int thread_index, double increment) {
   }
 
 #ifdef _DEBUG
-  nassertv(collector_index >= 0 && collector_index < AtomicAdjust::get(_num_collectors));
-  nassertv(thread_index >= 0 && thread_index < AtomicAdjust::get(_num_threads));
+  nassertv(collector_index >= 0 && collector_index < get_num_collectors());
+  nassertv(thread_index >= 0 && thread_index < get_num_threads());
 #endif
 
   Collector *collector = get_collector_ptr(collector_index);
@@ -991,8 +981,8 @@ get_level(int collector_index, int thread_index) const {
   }
 
 #ifdef _DEBUG
-  nassertr(collector_index >= 0 && collector_index < AtomicAdjust::get(_num_collectors), 0.0f);
-  nassertr(thread_index >= 0 && thread_index < AtomicAdjust::get(_num_threads), 0.0f);
+  nassertr(collector_index >= 0 && collector_index < get_num_collectors(), 0.0f);
+  nassertr(thread_index >= 0 && thread_index < get_num_threads(), 0.0f);
 #endif
 
   Collector *collector = get_collector_ptr(collector_index);
@@ -1050,17 +1040,19 @@ stop_clock_wait() {
  */
 void PStatClient::
 add_collector(PStatClient::Collector *collector) {
-  if (_num_collectors >= _collectors_size) {
+  int num_collectors = get_num_collectors();
+  if (num_collectors >= _collectors_size) {
     // We need to grow the array.  We have to be careful here, because there
     // might be clients accessing the array right now who are not protected by
     // the lock.
-    int new_collectors_size = (_collectors_size == 0) ? 128 : _collectors_size * 2;
+    size_t new_collectors_size = (_collectors_size == 0) ? 128 : _collectors_size * 2;
+    CollectorPointer *old_collectors = _collectors.load(std::memory_order_relaxed);
     CollectorPointer *new_collectors = new CollectorPointer[new_collectors_size];
-    if (_collectors != nullptr) {
-      memcpy(new_collectors, _collectors, _num_collectors * sizeof(CollectorPointer));
+    if (old_collectors != nullptr) {
+      memcpy(new_collectors, old_collectors, num_collectors * sizeof(CollectorPointer));
     }
-    AtomicAdjust::set_ptr(_collectors, new_collectors);
-    AtomicAdjust::set(_collectors_size, new_collectors_size);
+    _collectors_size = new_collectors_size;
+    _collectors.store(new_collectors, std::memory_order_release);
 
     // Now, we still have the old array, which we allow to leak.  We should
     // delete it, but there might be a thread out there that's still trying to
@@ -1068,14 +1060,14 @@ add_collector(PStatClient::Collector *collector) {
     // much, since it's not a big leak.  (We will only reallocate the array so
     // many times in an application, and then no more.)
 
-    new_collectors[_num_collectors] = collector;
-    AtomicAdjust::inc(_num_collectors);
-
-  } else {
-    CollectorPointer *collectors = (CollectorPointer *)_collectors;
-    collectors[_num_collectors] = collector;
-    AtomicAdjust::inc(_num_collectors);
+    new_collectors[num_collectors] = collector;
+  }
+  else {
+    CollectorPointer *collectors = _collectors.load(std::memory_order_relaxed);
+    collectors[num_collectors] = collector;
   }
+
+  _num_collectors.fetch_add(1, std::memory_order_release);
 }
 
 /**
@@ -1084,21 +1076,22 @@ add_collector(PStatClient::Collector *collector) {
  */
 void PStatClient::
 add_thread(PStatClient::InternalThread *thread) {
-  _threads_by_name[thread->_name].push_back(_num_threads);
-  _threads_by_sync_name[thread->_sync_name].push_back(_num_threads);
+  int num_threads = get_num_threads();
+  _threads_by_name[thread->_name].push_back(num_threads);
+  _threads_by_sync_name[thread->_sync_name].push_back(num_threads);
 
-  if (_num_threads >= _threads_size) {
+  if (num_threads >= _threads_size) {
     // We need to grow the array.  We have to be careful here, because there
     // might be clients accessing the array right now who are not protected by
     // the lock.
-    int new_threads_size = (_threads_size == 0) ? 128 : _threads_size * 2;
+    size_t new_threads_size = (_threads_size == 0) ? 128 : _threads_size * 2;
+    ThreadPointer *old_threads = _threads.load(std::memory_order_relaxed);
     ThreadPointer *new_threads = new ThreadPointer[new_threads_size];
-    if (_threads != nullptr) {
-      memcpy(new_threads, _threads, _num_threads * sizeof(ThreadPointer));
+    if (old_threads != nullptr) {
+      memcpy(new_threads, old_threads, num_threads * sizeof(ThreadPointer));
     }
-    // We assume that assignment to a pointer and to an int are each atomic.
-    AtomicAdjust::set_ptr(_threads, new_threads);
-    AtomicAdjust::set(_threads_size, new_threads_size);
+    _threads_size = new_threads_size;
+    _threads.store(new_threads, std::memory_order_release);
 
     // Now, we still have the old array, which we allow to leak.  We should
     // delete it, but there might be a thread out there that's still trying to
@@ -1106,22 +1099,23 @@ add_thread(PStatClient::InternalThread *thread) {
     // much, since it's not a big leak.  (We will only reallocate the array so
     // many times in an application, and then no more.)
 
-    new_threads[_num_threads] = thread;
-
-  } else {
-    ThreadPointer *threads = (ThreadPointer *)_threads;
-    threads[_num_threads] = thread;
+    new_threads[num_threads] = thread;
+  }
+  else {
+    ThreadPointer *threads = _threads.load(std::memory_order_relaxed);
+    threads[num_threads] = thread;
   }
 
-  AtomicAdjust::inc(_num_threads);
+  _num_threads.fetch_add(1, std::memory_order_release);
+  ++num_threads;
 
   // We need an additional PerThreadData for this thread in all of the
   // collectors.
-  CollectorPointer *collectors = (CollectorPointer *)_collectors;
-  for (int ci = 0; ci < _num_collectors; ++ci) {
+  CollectorPointer *collectors = _collectors.load(std::memory_order_relaxed);
+  for (int ci = 0; ci < get_num_collectors(); ++ci) {
     Collector *collector = collectors[ci];
     collector->_per_thread.push_back(PerThreadData());
-    nassertv((int)collector->_per_thread.size() == _num_threads);
+    nassertv((int)collector->_per_thread.size() == num_threads);
   }
 }
 

+ 7 - 7
panda/src/pstatclient/pStatClient.h

@@ -26,7 +26,7 @@
 #include "thread.h"
 #include "weakPointerTo.h"
 #include "vector_int.h"
-#include "atomicAdjust.h"
+#include "patomic.h"
 #include "numeric_types.h"
 #include "bitArray.h"
 
@@ -195,9 +195,9 @@ private:
     PerThread _per_thread;
   };
   typedef Collector *CollectorPointer;
-  AtomicAdjust::Pointer _collectors;  // CollectorPointer *_collectors;
-  AtomicAdjust::Integer _collectors_size;  // size of the allocated array
-  AtomicAdjust::Integer _num_collectors;   // number of in-use elements within the array
+  patomic<CollectorPointer *> _collectors {nullptr};
+  size_t _collectors_size {0};  // size of the allocated array
+  patomic<int> _num_collectors {0};   // number of in-use elements within the array
 
   // This defines a single thread, i.e.  a separate chain of execution,
   // independent of all other threads.  Timing and level data are maintained
@@ -224,9 +224,9 @@ private:
     LightMutex _thread_lock;
   };
   typedef InternalThread *ThreadPointer;
-  AtomicAdjust::Pointer _threads;  // ThreadPointer *_threads;
-  AtomicAdjust::Integer _threads_size;  // size of the allocated array
-  AtomicAdjust::Integer _num_threads;   // number of in-use elements within the array
+  patomic<ThreadPointer *> _threads {nullptr};
+  size_t _threads_size {0};  // size of the allocated array
+  patomic<int> _num_threads {0};   // number of in-use elements within the array
 
   mutable PStatClientImpl *_impl;