// testThreadSafeDeque.cpp
  1. //-----------------------------------------------------------------------------
  2. // Copyright (c) 2012 GarageGames, LLC
  3. //
  4. // Permission is hereby granted, free of charge, to any person obtaining a copy
  5. // of this software and associated documentation files (the "Software"), to
  6. // deal in the Software without restriction, including without limitation the
  7. // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  8. // sell copies of the Software, and to permit persons to whom the Software is
  9. // furnished to do so, subject to the following conditions:
  10. //
  11. // The above copyright notice and this permission notice shall be included in
  12. // all copies or substantial portions of the Software.
  13. //
  14. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  15. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  16. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  17. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  18. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  19. // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
  20. // IN THE SOFTWARE.
  21. //-----------------------------------------------------------------------------
  22. #include "unit/test.h"
  23. #include "platform/threads/threadSafeDeque.h"
  24. #include "platform/threads/thread.h"
  25. #include "core/util/tVector.h"
  26. #include "console/console.h"
  27. #ifndef TORQUE_SHIPPING
  28. using namespace UnitTesting;
  29. #define TEST( x ) test( ( x ), "FAIL: " #x )
  30. #define XTEST( t, x ) t->test( ( x ), "FAIL: " #x )
  31. // Test deque without concurrency.
  32. CreateUnitTest( TestThreadSafeDequeSerial, "Platform/ThreadSafeDeque/Serial" )
  33. {
  34. void test1()
  35. {
  36. ThreadSafeDeque< char > deque;
  37. String str = "teststring";
  38. for( U32 i = 0; i < str.length(); ++ i )
  39. deque.pushBack( str[ i ] );
  40. TEST( !deque.isEmpty() );
  41. for( U32 i = 0; i < str.length(); ++ i )
  42. {
  43. char ch;
  44. TEST( deque.tryPopFront( ch ) && ch == str[ i ] );
  45. }
  46. }
  47. void test2()
  48. {
  49. ThreadSafeDeque< char > deque;
  50. String str = "teststring";
  51. const char* p1 = str.c_str() + 4;
  52. const char* p2 = p1 + 1;
  53. while( *p2 )
  54. {
  55. deque.pushFront( *p1 );
  56. deque.pushBack( *p2 );
  57. -- p1;
  58. ++ p2;
  59. }
  60. #ifdef TORQUE_DEBUG
  61. deque.dumpDebug();
  62. #endif
  63. for( U32 i = 0; i < str.length(); ++ i )
  64. {
  65. char ch;
  66. TEST( deque.tryPopFront( ch ) && ch == str[ i ] );
  67. }
  68. }
  69. void test3()
  70. {
  71. ThreadSafeDeque< char > deque;
  72. String str = "teststring";
  73. const char* p1 = str.c_str() + 4;
  74. const char* p2 = p1 + 1;
  75. while( *p2 )
  76. {
  77. deque.pushFront( *p1 );
  78. deque.pushBack( *p2 );
  79. -- p1;
  80. ++ p2;
  81. }
  82. #ifdef TORQUE_DEBUG
  83. deque.dumpDebug();
  84. #endif
  85. for( S32 i = ( str.length() - 1 ); i >= 0; -- i )
  86. {
  87. char ch;
  88. TEST( deque.tryPopBack( ch ) && ch == str[ i ] );
  89. }
  90. }
  91. void test4()
  92. {
  93. ThreadSafeDeque< char > deque;
  94. char ch;
  95. TEST( deque.isEmpty() );
  96. deque.pushFront( 'a' );
  97. TEST( !deque.isEmpty() );
  98. TEST( deque.tryPopFront( ch ) );
  99. TEST( ch == 'a' );
  100. deque.pushBack( 'a' );
  101. TEST( !deque.isEmpty() );
  102. TEST( deque.tryPopFront( ch ) );
  103. TEST( ch == 'a' );
  104. deque.pushBack( 'a' );
  105. TEST( !deque.isEmpty() );
  106. TEST( deque.tryPopBack( ch ) );
  107. TEST( ch == 'a' );
  108. deque.pushFront( 'a' );
  109. TEST( !deque.isEmpty() );
  110. TEST( deque.tryPopBack( ch ) );
  111. TEST( ch == 'a' );
  112. }
  113. void run()
  114. {
  115. test1();
  116. test2();
  117. test3();
  118. test4();
  119. }
  120. };
  121. // Test deque in a concurrent setting.
  122. CreateUnitTest( TestThreadSafeDequeConcurrentSimple, "Platform/ThreadSafeDeque/ConcurrentSimple" )
  123. {
  124. public:
  125. typedef TestThreadSafeDequeConcurrentSimple TestType;
  126. enum
  127. {
  128. DEFAULT_NUM_VALUES = 100000,
  129. };
  130. struct Value : public ThreadSafeRefCount< Value >
  131. {
  132. U32 mIndex;
  133. U32 mTick;
  134. Value() {}
  135. Value( U32 index, U32 tick )
  136. : mIndex( index ), mTick( tick ) {}
  137. };
  138. typedef ThreadSafeRef< Value > ValueRef;
  139. struct Deque : public ThreadSafeDeque< ValueRef >
  140. {
  141. typedef ThreadSafeDeque<ValueRef> Parent;
  142. U32 mPushIndex;
  143. U32 mPopIndex;
  144. Deque()
  145. : mPushIndex( 0 ), mPopIndex( 0 ) {}
  146. void pushBack( const ValueRef& value )
  147. {
  148. AssertFatal( value->mIndex == mPushIndex, "index out of line" );
  149. mPushIndex ++;
  150. Parent::pushBack( value );
  151. }
  152. bool tryPopFront( ValueRef& outValue )
  153. {
  154. if( Parent::tryPopFront( outValue ) )
  155. {
  156. AssertFatal( outValue->mIndex == mPopIndex, "index out of line" );
  157. mPopIndex ++;
  158. return true;
  159. }
  160. else
  161. return false;
  162. }
  163. };
  164. Deque mDeque;
  165. Vector< U32 > mValues;
  166. struct ProducerThread : public Thread
  167. {
  168. ProducerThread( TestType* test )
  169. : Thread( 0, test ) {}
  170. virtual void run( void* arg )
  171. {
  172. _setName( "ProducerThread" );
  173. Platform::outputDebugString( "Starting ProducerThread" );
  174. TestType* test = ( TestType* ) arg;
  175. for( U32 i = 0; i < test->mValues.size(); ++ i )
  176. {
  177. U32 tick = Platform::getRealMilliseconds();
  178. test->mValues[ i ] = tick;
  179. ValueRef val = new Value( i, tick );
  180. test->mDeque.pushBack( val );
  181. }
  182. Platform::outputDebugString( "Stopping ProducerThread" );
  183. }
  184. };
  185. struct ConsumerThread : public Thread
  186. {
  187. ConsumerThread( TestType* test )
  188. : Thread( 0, test ) {}
  189. virtual void run( void* arg )
  190. {
  191. _setName( "ConsumerThread" );
  192. Platform::outputDebugString( "Starting CosumerThread" );
  193. TestType* t = ( TestType* ) arg;
  194. for( U32 i = 0; i < t->mValues.size(); ++ i )
  195. {
  196. ValueRef value;
  197. while( !t->mDeque.tryPopFront( value ) );
  198. XTEST( t, value->mIndex == i );
  199. XTEST( t, t->mValues[ i ] == value->mTick );
  200. }
  201. Platform::outputDebugString( "Stopping ConsumerThread" );
  202. }
  203. };
  204. void run()
  205. {
  206. U32 numValues = Con::getIntVariable( "$testThreadSafeDeque::numValues", DEFAULT_NUM_VALUES );
  207. mValues.setSize( numValues );
  208. ProducerThread pThread( this );
  209. ConsumerThread cThread( this );
  210. pThread.start();
  211. cThread.start();
  212. pThread.join();
  213. cThread.join();
  214. mValues.clear();
  215. }
  216. };
  217. CreateUnitTest( TestThreadSafeDequeConcurrent, "Platform/ThreadSafeDeque/Concurrent" )
  218. {
  219. public:
  220. typedef TestThreadSafeDequeConcurrent TestType;
  221. enum
  222. {
  223. DEFAULT_NUM_VALUES = 100000,
  224. DEFAULT_NUM_CONSUMERS = 10,
  225. DEFAULT_NUM_PRODUCERS = 10
  226. };
  227. struct Value : public ThreadSafeRefCount< Value >
  228. {
  229. U32 mIndex;
  230. U32 mTick;
  231. Value() {}
  232. Value( U32 index, U32 tick )
  233. : mIndex( index ), mTick( tick ) {}
  234. };
  235. typedef ThreadSafeRef< Value > ValueRef;
  236. U32 mProducerIndex;
  237. U32 mConsumerIndex;
  238. ThreadSafeDeque< ValueRef > mDeque;
  239. Vector< U32 > mValues;
  240. struct ProducerThread : public Thread
  241. {
  242. ProducerThread( TestType* test )
  243. : Thread( 0, test ) {}
  244. virtual void run( void* arg )
  245. {
  246. _setName( "ProducerThread" );
  247. Platform::outputDebugString( "Starting ProducerThread" );
  248. TestType* test = ( TestType* ) arg;
  249. while( 1 )
  250. {
  251. U32 index = test->mProducerIndex;
  252. if( index == test->mValues.size() )
  253. break;
  254. if( dCompareAndSwap( test->mProducerIndex, index, index + 1 ) )
  255. {
  256. U32 tick = Platform::getRealMilliseconds();
  257. test->mValues[ index ] = tick;
  258. ValueRef val = new Value( index, tick );
  259. test->mDeque.pushBack( val );
  260. }
  261. }
  262. Platform::outputDebugString( "Stopping ProducerThread" );
  263. }
  264. };
  265. struct ConsumerThread : public Thread
  266. {
  267. ConsumerThread( TestType* test )
  268. : Thread( 0, test ) {}
  269. virtual void run( void* arg )
  270. {
  271. _setName( "ConsumerThread" );
  272. Platform::outputDebugString( "Starting ConsumerThread" );
  273. TestType* t = ( TestType* ) arg;
  274. while( t->mConsumerIndex < t->mValues.size() )
  275. {
  276. ValueRef value;
  277. if( t->mDeque.tryPopFront( value ) )
  278. {
  279. dFetchAndAdd( t->mConsumerIndex, 1 );
  280. XTEST( t, t->mValues[ value->mIndex ] == value->mTick );
  281. t->mValues[ value->mIndex ] = 0;
  282. }
  283. }
  284. Platform::outputDebugString( "Stopping ConsumerThread" );
  285. }
  286. };
  287. void run()
  288. {
  289. U32 numValues = Con::getIntVariable( "$testThreadSafeDeque::numValues", DEFAULT_NUM_VALUES );
  290. U32 numConsumers = Con::getIntVariable( "$testThreadSafeDeque::numConsumers", DEFAULT_NUM_CONSUMERS );
  291. U32 numProducers = Con::getIntVariable( "$testThreadSafeDeque::numProducers", DEFAULT_NUM_PRODUCERS );
  292. mProducerIndex = 0;
  293. mConsumerIndex = 0;
  294. mValues.setSize( numValues );
  295. U32 tick = Platform::getRealMilliseconds();
  296. for( U32 i = 0; i < numValues; ++ i )
  297. mValues[ i ] = tick;
  298. Vector< ProducerThread* > producers;
  299. Vector< ConsumerThread* > consumers;
  300. producers.setSize( numProducers );
  301. consumers.setSize( numConsumers );
  302. for( U32 i = 0; i < numProducers; ++ i )
  303. {
  304. producers[ i ] = new ProducerThread( this );
  305. producers[ i ]->start();
  306. }
  307. for( U32 i = 0; i < numConsumers; ++ i )
  308. {
  309. consumers[ i ] = new ConsumerThread( this );
  310. consumers[ i ]->start();
  311. }
  312. for( U32 i = 0; i < numProducers; ++ i )
  313. {
  314. producers[ i ]->join();
  315. delete producers[ i ];
  316. }
  317. for( U32 i = 0; i < numConsumers; ++ i )
  318. {
  319. consumers[ i ]->join();
  320. delete consumers[ i ];
  321. }
  322. for( U32 i = 0; i < mValues.size(); ++ i )
  323. TEST( mValues[ i ] == 0 );
  324. mValues.clear();
  325. }
  326. };
  327. #endif // !TORQUE_SHIPPING