tracy_concurrentqueue.h

  1. // Provides a C++11 implementation of a multi-producer, multi-consumer lock-free queue.
  2. // An overview, including benchmark results, is provided here:
  3. // http://moodycamel.com/blog/2014/a-fast-general-purpose-lock-free-queue-for-c++
  4. // The full design is also described in excruciating detail at:
  5. // http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue
  6. // Simplified BSD license:
  7. // Copyright (c) 2013-2016, Cameron Desrochers.
  8. // All rights reserved.
  9. //
  10. // Redistribution and use in source and binary forms, with or without modification,
  11. // are permitted provided that the following conditions are met:
  12. //
  13. // - Redistributions of source code must retain the above copyright notice, this list of
  14. // conditions and the following disclaimer.
  15. // - Redistributions in binary form must reproduce the above copyright notice, this list of
  16. // conditions and the following disclaimer in the documentation and/or other materials
  17. // provided with the distribution.
  18. //
  19. // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
  20. // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
  21. // MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
  22. // THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  23. // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
  24. // OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
  25. // HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
  26. // TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
  27. // EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  28. #pragma once
  29. #include "../common/TracyAlloc.hpp"
  30. #include "../common/TracyForceInline.hpp"
  31. #include "../common/TracySystem.hpp"
  32. #if defined(__GNUC__)
  33. // Disable -Wconversion warnings (spuriously triggered when Traits::size_t and
  34. // Traits::index_t are set to < 32 bits, causing integer promotion, causing warnings
  35. // upon assigning any computed values)
  36. #pragma GCC diagnostic push
  37. #pragma GCC diagnostic ignored "-Wconversion"
  38. #endif
  39. #if defined(__APPLE__)
  40. #include "TargetConditionals.h"
  41. #endif
  42. #include <atomic> // Requires C++11. Sorry VS2010.
  43. #include <cassert>
  44. #include <cstddef> // for max_align_t
  45. #include <cstdint>
  46. #include <cstdlib>
  47. #include <type_traits>
  48. #include <algorithm>
  49. #include <utility>
  50. #include <limits>
  51. #include <climits> // for CHAR_BIT
  52. #include <array>
  53. #include <thread> // partly for __WINPTHREADS_VERSION if on MinGW-w64 w/ POSIX threading
  54. namespace tracy
  55. {
  56. // Compiler-specific likely/unlikely hints
  57. namespace moodycamel { namespace details {
  58. #if defined(__GNUC__)
  59. inline bool cqLikely(bool x) { return __builtin_expect((x), true); }
  60. inline bool cqUnlikely(bool x) { return __builtin_expect((x), false); }
  61. #else
  62. inline bool cqLikely(bool x) { return x; }
  63. inline bool cqUnlikely(bool x) { return x; }
  64. #endif
  65. } }
  66. namespace
  67. {
  68. // to avoid MSVC warning 4127: conditional expression is constant
  69. template <bool>
  70. struct compile_time_condition
  71. {
  72. static const bool value = false;
  73. };
  74. template <>
  75. struct compile_time_condition<true>
  76. {
  77. static const bool value = true;
  78. };
  79. }
  80. namespace moodycamel {
  81. namespace details {
  82. template<typename T>
  83. struct const_numeric_max {
  84. static_assert(std::is_integral<T>::value, "const_numeric_max can only be used with integers");
  85. static const T value = std::numeric_limits<T>::is_signed
  86. ? (static_cast<T>(1) << (sizeof(T) * CHAR_BIT - 1)) - static_cast<T>(1)
  87. : static_cast<T>(-1);
  88. };
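// For illustration: the result is equivalent to std::numeric_limits<T>::max(), expressed as a
// plain compile-time constant; e.g. const_numeric_max<std::uint32_t>::value == 0xFFFFFFFFu
// (unsigned: all bits set) and const_numeric_max<std::int32_t>::value == 0x7FFFFFFF (2^31 - 1).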
  89. #if defined(__GLIBCXX__)
  90. typedef ::max_align_t std_max_align_t; // libstdc++ forgot to add it to std:: for a while
  91. #else
  92. typedef std::max_align_t std_max_align_t; // Others (e.g. MSVC) insist it can *only* be accessed via std::
  93. #endif
  94. // Some platforms have incorrectly set max_align_t to a type with <8 bytes alignment even while supporting
  95. // 8-byte aligned scalar values (*cough* 32-bit iOS). Work around this with our own union. See issue #64.
  96. typedef union {
  97. std_max_align_t x;
  98. long long y;
  99. void* z;
  100. } max_align_t;
  101. }
  102. // Default traits for the ConcurrentQueue. To change some of the
  103. // traits without re-implementing all of them, inherit from this
  104. // struct and shadow the declarations you wish to be different;
  105. // since the traits are used as a template type parameter, the
  106. // shadowed declarations will be used where defined, and the defaults
  107. // otherwise.
  108. struct ConcurrentQueueDefaultTraits
  109. {
  110. // General-purpose size type. std::size_t is strongly recommended.
  111. typedef std::size_t size_t;
  112. // The type used for the enqueue and dequeue indices. Must be at least as
  113. // large as size_t. Should be significantly larger than the number of elements
  114. // you expect to hold at once, especially if you have a high turnover rate;
  115. // for example, on 32-bit x86, if you expect to have over a hundred million
  116. // elements or pump several million elements through your queue in a very
  117. // short space of time, using a 32-bit type *may* trigger a race condition.
  118. // A 64-bit int type is recommended in that case, and in practice will
  119. // prevent a race condition no matter the usage of the queue. Note that
  120. // whether the queue is lock-free with a 64-bit type depends on whether
  121. // std::atomic<std::uint64_t> is lock-free, which is platform-specific.
  122. typedef std::size_t index_t;
  123. // Internally, all elements are enqueued and dequeued from multi-element
  124. // blocks; this is the smallest controllable unit. If you expect few elements
  125. // but many producers, a smaller block size should be favoured. For few producers
  126. // and/or many elements, a larger block size is preferred. A sane default
  127. // is provided. Must be a power of 2.
  128. static const size_t BLOCK_SIZE = 64*1024;
  129. // For explicit producers (i.e. when using a producer token), the block is
  130. // checked for being empty by iterating through a list of flags, one per element.
  131. // For large block sizes, this is too inefficient, and switching to an atomic
  132. // counter-based approach is faster. The switch is made for block sizes strictly
  133. // larger than this threshold.
  134. static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = 32;
  135. // How many full blocks can be expected for a single explicit producer? For optimal
  136. // performance, this should reflect the maximum you expect to need. Must be a power of 2.
  137. static const size_t EXPLICIT_INITIAL_INDEX_SIZE = 32;
  138. // Controls the number of items that an explicit consumer (i.e. one with a token)
  139. // must consume before it causes all consumers to rotate and move on to the next
  140. // internal queue.
  141. static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = 256;
  142. // The maximum number of elements (inclusive) that can be enqueued to a sub-queue.
  143. // Enqueue operations that would cause this limit to be surpassed will fail. Note
  144. // that this limit is enforced at the block level (for performance reasons), i.e.
  145. // it's rounded up to the nearest block size.
  146. static const size_t MAX_SUBQUEUE_SIZE = details::const_numeric_max<size_t>::value;
  147. // Memory allocation can be customized if needed.
  148. // malloc should return nullptr on failure, and handle alignment like std::malloc.
  149. #if defined(malloc) || defined(free)
  150. // Gah, this is 2015, stop defining macros that break standard code already!
  151. // Work around malloc/free being special macros:
  152. static inline void* WORKAROUND_malloc(size_t size) { return malloc(size); }
  153. static inline void WORKAROUND_free(void* ptr) { return free(ptr); }
  154. static inline void* (malloc)(size_t size) { return WORKAROUND_malloc(size); }
  155. static inline void (free)(void* ptr) { return WORKAROUND_free(ptr); }
  156. #else
  157. static inline void* malloc(size_t size) { return tracy::tracy_malloc(size); }
  158. static inline void free(void* ptr) { return tracy::tracy_free(ptr); }
  159. #endif
  160. };
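// The shadowing pattern described above, as a minimal sketch (the trait-struct and queue names
// here are hypothetical; only BLOCK_SIZE is overridden, everything else keeps the defaults, and
// the override must still satisfy the power-of-2 requirement):
//
//   struct SmallBlockTraits : public tracy::moodycamel::ConcurrentQueueDefaultTraits
//   {
//       static const size_t BLOCK_SIZE = 256;
//   };
//
//   tracy::moodycamel::ConcurrentQueue<int, SmallBlockTraits> smallBlockQueue;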
  161. // When producing or consuming many elements, the most efficient way is to:
  162. // 1) Use one of the bulk-operation methods of the queue with a token
  163. // 2) Failing that, use the bulk-operation methods without a token
  164. // 3) Failing that, create a token and use that with the single-item methods
  165. // 4) Failing that, use the single-parameter methods of the queue
  166. // Having said that, don't create tokens willy-nilly -- ideally there should be
  167. // a maximum of one token per thread (of each kind).
  168. struct ProducerToken;
  169. struct ConsumerToken;
  170. template<typename T, typename Traits> class ConcurrentQueue;
  171. namespace details
  172. {
  173. struct ConcurrentQueueProducerTypelessBase
  174. {
  175. ConcurrentQueueProducerTypelessBase* next;
  176. std::atomic<bool> inactive;
  177. ProducerToken* token;
  178. uint64_t threadId;
  179. ConcurrentQueueProducerTypelessBase()
  180. : next(nullptr), inactive(false), token(nullptr), threadId(0)
  181. {
  182. }
  183. };
  184. template<typename T>
  185. static inline bool circular_less_than(T a, T b)
  186. {
  187. #ifdef _MSC_VER
  188. #pragma warning(push)
  189. #pragma warning(disable: 4554)
  190. #endif
  191. static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed, "circular_less_than is intended to be used only with unsigned integer types");
  192. return static_cast<T>(a - b) > static_cast<T>(static_cast<T>(1) << static_cast<T>(sizeof(T) * CHAR_BIT - 1));
  193. #ifdef _MSC_VER
  194. #pragma warning(pop)
  195. #endif
  196. }
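// In other words, a is "less than" b when it trails b by less than half the type's range, so
// comparisons stay correct even after the indices wrap around. Illustrative values with
// std::uint8_t (range 256, half = 128):
//   circular_less_than<std::uint8_t>(3, 10)  == true    ((uint8_t)(3 - 10)  == 249 > 128)
//   circular_less_than<std::uint8_t>(250, 5) == true    ((uint8_t)(250 - 5) == 245 > 128; 250 trails 5 across the wrap)
//   circular_less_than<std::uint8_t>(5, 250) == false   ((uint8_t)(5 - 250) == 11, not > 128)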
  197. template<typename U>
  198. static inline char* align_for(char* ptr)
  199. {
  200. const std::size_t alignment = std::alignment_of<U>::value;
  201. return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
  202. }
  203. template<typename T>
  204. static inline T ceil_to_pow_2(T x)
  205. {
  206. static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed, "ceil_to_pow_2 is intended to be used only with unsigned integer types");
  207. // Adapted from http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
  208. --x;
  209. x |= x >> 1;
  210. x |= x >> 2;
  211. x |= x >> 4;
  212. for (std::size_t i = 1; i < sizeof(T); i <<= 1) {
  213. x |= x >> (i << 3);
  214. }
  215. ++x;
  216. return x;
  217. }
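// Illustrative results: ceil_to_pow_2<std::uint32_t>(5) == 8, ceil_to_pow_2<std::uint32_t>(17) == 32,
// and inputs that are already powers of two (e.g. 16) are returned unchanged.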
  218. template<typename T>
  219. static inline void swap_relaxed(std::atomic<T>& left, std::atomic<T>& right)
  220. {
  221. T temp = std::move(left.load(std::memory_order_relaxed));
  222. left.store(std::move(right.load(std::memory_order_relaxed)), std::memory_order_relaxed);
  223. right.store(std::move(temp), std::memory_order_relaxed);
  224. }
  225. template<typename T>
  226. static inline T const& nomove(T const& x)
  227. {
  228. return x;
  229. }
  230. template<bool Enable>
  231. struct nomove_if
  232. {
  233. template<typename T>
  234. static inline T const& eval(T const& x)
  235. {
  236. return x;
  237. }
  238. };
  239. template<>
  240. struct nomove_if<false>
  241. {
  242. template<typename U>
  243. static inline auto eval(U&& x)
  244. -> decltype(std::forward<U>(x))
  245. {
  246. return std::forward<U>(x);
  247. }
  248. };
  249. template<typename It>
  250. static inline auto deref_noexcept(It& it) noexcept -> decltype(*it)
  251. {
  252. return *it;
  253. }
  254. #if defined(__clang__) || !defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)
  255. template<typename T> struct is_trivially_destructible : std::is_trivially_destructible<T> { };
  256. #else
  257. template<typename T> struct is_trivially_destructible : std::has_trivial_destructor<T> { };
  258. #endif
  259. template<typename T> struct static_is_lock_free_num { enum { value = 0 }; };
  260. template<> struct static_is_lock_free_num<signed char> { enum { value = ATOMIC_CHAR_LOCK_FREE }; };
  261. template<> struct static_is_lock_free_num<short> { enum { value = ATOMIC_SHORT_LOCK_FREE }; };
  262. template<> struct static_is_lock_free_num<int> { enum { value = ATOMIC_INT_LOCK_FREE }; };
  263. template<> struct static_is_lock_free_num<long> { enum { value = ATOMIC_LONG_LOCK_FREE }; };
  264. template<> struct static_is_lock_free_num<long long> { enum { value = ATOMIC_LLONG_LOCK_FREE }; };
  265. template<typename T> struct static_is_lock_free : static_is_lock_free_num<typename std::make_signed<T>::type> { };
  266. template<> struct static_is_lock_free<bool> { enum { value = ATOMIC_BOOL_LOCK_FREE }; };
  267. template<typename U> struct static_is_lock_free<U*> { enum { value = ATOMIC_POINTER_LOCK_FREE }; };
  268. }
  269. struct ProducerToken
  270. {
  271. template<typename T, typename Traits>
  272. explicit ProducerToken(ConcurrentQueue<T, Traits>& queue);
  273. ProducerToken(ProducerToken&& other) noexcept
  274. : producer(other.producer)
  275. {
  276. other.producer = nullptr;
  277. if (producer != nullptr) {
  278. producer->token = this;
  279. }
  280. }
  281. inline ProducerToken& operator=(ProducerToken&& other) noexcept
  282. {
  283. swap(other);
  284. return *this;
  285. }
  286. void swap(ProducerToken& other) noexcept
  287. {
  288. std::swap(producer, other.producer);
  289. if (producer != nullptr) {
  290. producer->token = this;
  291. }
  292. if (other.producer != nullptr) {
  293. other.producer->token = &other;
  294. }
  295. }
  296. // A token is always valid unless:
  297. // 1) Memory allocation failed during construction
  298. // 2) It was moved via the move constructor
  299. // (Note: assignment does a swap, leaving both potentially valid)
  300. // 3) The associated queue was destroyed
  301. // Note that if valid() returns true, that only indicates
  302. // that the token is valid for use with a specific queue,
  303. // but not which one; that's up to the user to track.
  304. inline bool valid() const { return producer != nullptr; }
  305. ~ProducerToken()
  306. {
  307. if (producer != nullptr) {
  308. producer->token = nullptr;
  309. producer->inactive.store(true, std::memory_order_release);
  310. }
  311. }
  312. // Disable copying and assignment
  313. ProducerToken(ProducerToken const&) = delete;
  314. ProducerToken& operator=(ProducerToken const&) = delete;
  315. private:
  316. template<typename T, typename Traits> friend class ConcurrentQueue;
  317. protected:
  318. details::ConcurrentQueueProducerTypelessBase* producer;
  319. };
  320. struct ConsumerToken
  321. {
  322. template<typename T, typename Traits>
  323. explicit ConsumerToken(ConcurrentQueue<T, Traits>& q);
  324. ConsumerToken(ConsumerToken&& other) noexcept
  325. : initialOffset(other.initialOffset), lastKnownGlobalOffset(other.lastKnownGlobalOffset), itemsConsumedFromCurrent(other.itemsConsumedFromCurrent), currentProducer(other.currentProducer), desiredProducer(other.desiredProducer)
  326. {
  327. }
  328. inline ConsumerToken& operator=(ConsumerToken&& other) noexcept
  329. {
  330. swap(other);
  331. return *this;
  332. }
  333. void swap(ConsumerToken& other) noexcept
  334. {
  335. std::swap(initialOffset, other.initialOffset);
  336. std::swap(lastKnownGlobalOffset, other.lastKnownGlobalOffset);
  337. std::swap(itemsConsumedFromCurrent, other.itemsConsumedFromCurrent);
  338. std::swap(currentProducer, other.currentProducer);
  339. std::swap(desiredProducer, other.desiredProducer);
  340. }
  341. // Disable copying and assignment
  342. ConsumerToken(ConsumerToken const&) = delete;
  343. ConsumerToken& operator=(ConsumerToken const&) = delete;
  344. private:
  345. template<typename T, typename Traits> friend class ConcurrentQueue;
  346. private: // but shared with ConcurrentQueue
  347. std::uint32_t initialOffset;
  348. std::uint32_t lastKnownGlobalOffset;
  349. std::uint32_t itemsConsumedFromCurrent;
  350. details::ConcurrentQueueProducerTypelessBase* currentProducer;
  351. details::ConcurrentQueueProducerTypelessBase* desiredProducer;
  352. };
  353. template<typename T, typename Traits = ConcurrentQueueDefaultTraits>
  354. class ConcurrentQueue
  355. {
  356. public:
  357. struct ExplicitProducer;
  358. typedef moodycamel::ProducerToken producer_token_t;
  359. typedef moodycamel::ConsumerToken consumer_token_t;
  360. typedef typename Traits::index_t index_t;
  361. typedef typename Traits::size_t size_t;
  362. static const size_t BLOCK_SIZE = static_cast<size_t>(Traits::BLOCK_SIZE);
  363. static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = static_cast<size_t>(Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD);
  364. static const size_t EXPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::EXPLICIT_INITIAL_INDEX_SIZE);
  365. static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = static_cast<std::uint32_t>(Traits::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE);
  366. #ifdef _MSC_VER
  367. #pragma warning(push)
  368. #pragma warning(disable: 4307) // + integral constant overflow (that's what the ternary expression is for!)
  369. #pragma warning(disable: 4309) // static_cast: Truncation of constant value
  370. #endif
  371. static const size_t MAX_SUBQUEUE_SIZE = (details::const_numeric_max<size_t>::value - static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) < BLOCK_SIZE) ? details::const_numeric_max<size_t>::value : ((static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) + (BLOCK_SIZE - 1)) / BLOCK_SIZE * BLOCK_SIZE);
  372. #ifdef _MSC_VER
  373. #pragma warning(pop)
  374. #endif
  375. static_assert(!std::numeric_limits<size_t>::is_signed && std::is_integral<size_t>::value, "Traits::size_t must be an unsigned integral type");
  376. static_assert(!std::numeric_limits<index_t>::is_signed && std::is_integral<index_t>::value, "Traits::index_t must be an unsigned integral type");
  377. static_assert(sizeof(index_t) >= sizeof(size_t), "Traits::index_t must be at least as wide as Traits::size_t");
  378. static_assert((BLOCK_SIZE > 1) && !(BLOCK_SIZE & (BLOCK_SIZE - 1)), "Traits::BLOCK_SIZE must be a power of 2 (and at least 2)");
  379. static_assert((EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD > 1) && !(EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD & (EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD - 1)), "Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD must be a power of 2 (and greater than 1)");
  380. static_assert((EXPLICIT_INITIAL_INDEX_SIZE > 1) && !(EXPLICIT_INITIAL_INDEX_SIZE & (EXPLICIT_INITIAL_INDEX_SIZE - 1)), "Traits::EXPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");
  381. public:
  382. // Creates a queue with at least `capacity` element slots; note that the
  383. // actual number of elements that can be inserted without additional memory
  384. // allocation depends on the number of producers and the block size (e.g. if
  385. // the block size is equal to `capacity`, only a single block will be allocated
  386. // up-front, which means only a single producer will be able to enqueue elements
  387. // without an extra allocation -- blocks aren't shared between producers).
  388. // This method is not thread safe -- it is up to the user to ensure that the
  389. // queue is fully constructed before it starts being used by other threads (this
  390. // includes making the memory effects of construction visible, possibly with a
  391. // memory barrier).
  392. explicit ConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE)
  393. : producerListTail(nullptr),
  394. producerCount(0),
  395. initialBlockPoolIndex(0),
  396. nextExplicitConsumerId(0),
  397. globalExplicitConsumerOffset(0)
  398. {
  399. populate_initial_block_list(capacity / BLOCK_SIZE + ((capacity & (BLOCK_SIZE - 1)) == 0 ? 0 : 1));
  400. }
  401. // Computes the correct number of pre-allocated blocks for you based
  402. // on the minimum number of elements you want available at any given
  403. // time, and the maximum concurrent number of each type of producer.
  404. ConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers)
  405. : producerListTail(nullptr),
  406. producerCount(0),
  407. initialBlockPoolIndex(0),
  408. nextExplicitConsumerId(0),
  409. globalExplicitConsumerOffset(0)
  410. {
  411. size_t blocks = (((minCapacity + BLOCK_SIZE - 1) / BLOCK_SIZE) - 1) * (maxExplicitProducers + 1) + 2 * (maxExplicitProducers);
  412. populate_initial_block_list(blocks);
  413. }
  414. // Note: The queue should not be accessed concurrently while it's
  415. // being deleted. It's up to the user to synchronize this.
  416. // This method is not thread safe.
  417. ~ConcurrentQueue()
  418. {
  419. // Destroy producers
  420. auto ptr = producerListTail.load(std::memory_order_relaxed);
  421. while (ptr != nullptr) {
  422. auto next = ptr->next_prod();
  423. if (ptr->token != nullptr) {
  424. ptr->token->producer = nullptr;
  425. }
  426. destroy(ptr);
  427. ptr = next;
  428. }
  429. // Destroy global free list
  430. auto block = freeList.head_unsafe();
  431. while (block != nullptr) {
  432. auto next = block->freeListNext.load(std::memory_order_relaxed);
  433. if (block->dynamicallyAllocated) {
  434. destroy(block);
  435. }
  436. block = next;
  437. }
  438. // Destroy initial free list
  439. destroy_array(initialBlockPool, initialBlockPoolSize);
  440. }
  441. // Disable copying and copy assignment
  442. ConcurrentQueue(ConcurrentQueue const&) = delete;
  443. ConcurrentQueue(ConcurrentQueue&& other) = delete;
  444. ConcurrentQueue& operator=(ConcurrentQueue const&) = delete;
  445. ConcurrentQueue& operator=(ConcurrentQueue&& other) = delete;
  446. public:
  447. tracy_force_inline T* enqueue_begin(producer_token_t const& token, index_t& currentTailIndex)
  448. {
  449. return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::enqueue_begin(currentTailIndex);
  450. }
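// A sketch of the construct-in-place enqueue this trimmed-down API appears to expect. The publish
// step is inferred rather than spelled out in this header: enqueue_begin() only reserves a slot
// (it never advances tailIndex), and dequeue_bulk() reads tailIndex with acquire ordering, so the
// caller presumably makes the element visible by release-storing the incremented index obtained
// here. The names Item, q and token are hypothetical:
//
//   tracy::moodycamel::ConcurrentQueue<Item> q;
//   tracy::moodycamel::ProducerToken token( q );
//
//   auto producer = q.get_explicit_producer( token );
//   auto& tail = producer->get_tail_index();
//
//   tracy::moodycamel::ConcurrentQueue<Item>::index_t idx;
//   Item* slot = q.enqueue_begin( token, idx );        // reserve the slot at index idx
//   new( slot ) Item( /* ... */ );                     // construct the element in place
//   tail.store( idx + 1, std::memory_order_release );  // publish it to consumers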
  451. template<class NotifyThread, class ProcessData>
  452. size_t try_dequeue_bulk_single(consumer_token_t& token, NotifyThread notifyThread, ProcessData processData )
  453. {
  454. if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
  455. if (!update_current_producer_after_rotation(token)) {
  456. return 0;
  457. }
  458. }
  459. size_t count = static_cast<ProducerBase*>(token.currentProducer)->dequeue_bulk(notifyThread, processData);
  460. token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(count);
  461. auto tail = producerListTail.load(std::memory_order_acquire);
  462. auto ptr = static_cast<ProducerBase*>(token.currentProducer)->next_prod();
  463. if (ptr == nullptr) {
  464. ptr = tail;
  465. }
  466. if( count == 0 )
  467. {
  468. while (ptr != static_cast<ProducerBase*>(token.currentProducer)) {
  469. auto dequeued = ptr->dequeue_bulk(notifyThread, processData);
  470. if (dequeued != 0) {
  471. token.currentProducer = ptr;
  472. token.itemsConsumedFromCurrent = static_cast<std::uint32_t>(dequeued);
  473. return dequeued;
  474. }
  475. ptr = ptr->next_prod();
  476. if (ptr == nullptr) {
  477. ptr = tail;
  478. }
  479. }
  480. return 0;
  481. }
  482. else
  483. {
  484. token.currentProducer = ptr;
  485. token.itemsConsumedFromCurrent = 0;
  486. return count;
  487. }
  488. }
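// A consumer-side sketch (q, Item and Process are hypothetical, carried over from the sketch
// above). NotifyThread receives the owning producer's threadId; ProcessData receives a pointer to
// a contiguous run of items plus its length and may be invoked several times per call (once per
// block segment). The return value is the number of items consumed:
//
//   tracy::moodycamel::ConsumerToken ctoken( q );
//   size_t consumed = q.try_dequeue_bulk_single( ctoken,
//       [] ( uint64_t threadId ) { /* note which producer thread this batch belongs to */ },
//       [] ( Item* data, size_t count ) { for( size_t i=0; i<count; i++ ) Process( data[i] ); } );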
  489. // Returns an estimate of the total number of elements currently in the queue. This
  490. // estimate is only accurate if the queue has completely stabilized before it is called
  491. // (i.e. all enqueue and dequeue operations have completed and their memory effects are
  492. // visible on the calling thread, and no further operations start while this method is
  493. // being called).
  494. // Thread-safe.
  495. size_t size_approx() const
  496. {
  497. size_t size = 0;
  498. for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
  499. size += ptr->size_approx();
  500. }
  501. return size;
  502. }
  503. // Returns true if the underlying atomic variables used by
  504. // the queue are lock-free (they should be on most platforms).
  505. // Thread-safe.
  506. static bool is_lock_free()
  507. {
  508. return
  509. details::static_is_lock_free<bool>::value == 2 &&
  510. details::static_is_lock_free<size_t>::value == 2 &&
  511. details::static_is_lock_free<std::uint32_t>::value == 2 &&
  512. details::static_is_lock_free<index_t>::value == 2 &&
  513. details::static_is_lock_free<void*>::value == 2;
  514. }
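// The ATOMIC_*_LOCK_FREE macros are 2 only when the corresponding std::atomic specialization is
// always lock-free, so this returns true only if every atomic the queue relies on is guaranteed
// lock-free on the target platform. A hypothetical use (Item as above):
//
//   if( !tracy::moodycamel::ConcurrentQueue<Item>::is_lock_free() )
//   {
//       // some std::atomic the queue relies on may fall back to a lock on this platform
//   }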
  515. private:
  516. friend struct ProducerToken;
  517. friend struct ConsumerToken;
  518. friend struct ExplicitProducer;
  519. ///////////////////////////////
  520. // Queue methods
  521. ///////////////////////////////
  522. inline bool update_current_producer_after_rotation(consumer_token_t& token)
  523. {
  524. // Ah, there's been a rotation, figure out where we should be!
  525. auto tail = producerListTail.load(std::memory_order_acquire);
  526. if (token.desiredProducer == nullptr && tail == nullptr) {
  527. return false;
  528. }
  529. auto prodCount = producerCount.load(std::memory_order_relaxed);
  530. auto globalOffset = globalExplicitConsumerOffset.load(std::memory_order_relaxed);
  531. if (details::cqUnlikely(token.desiredProducer == nullptr)) {
  532. // Aha, first time we're dequeueing anything.
  533. // Figure out our local position
  534. // Note: offset is from start, not end, but we're traversing from end -- subtract from count first
  535. std::uint32_t offset = prodCount - 1 - (token.initialOffset % prodCount);
  536. token.desiredProducer = tail;
  537. for (std::uint32_t i = 0; i != offset; ++i) {
  538. token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod();
  539. if (token.desiredProducer == nullptr) {
  540. token.desiredProducer = tail;
  541. }
  542. }
  543. }
  544. std::uint32_t delta = globalOffset - token.lastKnownGlobalOffset;
  545. if (delta >= prodCount) {
  546. delta = delta % prodCount;
  547. }
  548. for (std::uint32_t i = 0; i != delta; ++i) {
  549. token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod();
  550. if (token.desiredProducer == nullptr) {
  551. token.desiredProducer = tail;
  552. }
  553. }
  554. token.lastKnownGlobalOffset = globalOffset;
  555. token.currentProducer = token.desiredProducer;
  556. token.itemsConsumedFromCurrent = 0;
  557. return true;
  558. }
  559. ///////////////////////////
  560. // Free list
  561. ///////////////////////////
  562. template <typename N>
  563. struct FreeListNode
  564. {
  565. FreeListNode() : freeListRefs(0), freeListNext(nullptr) { }
  566. std::atomic<std::uint32_t> freeListRefs;
  567. std::atomic<N*> freeListNext;
  568. };
  569. // A simple CAS-based lock-free free list. Not the fastest thing in the world under heavy contention, but
  570. // simple and correct (assuming nodes are never freed until after the free list is destroyed), and fairly
  571. // speedy under low contention.
  572. template<typename N> // N must inherit FreeListNode or have the same fields (and initialization of them)
  573. struct FreeList
  574. {
  575. FreeList() : freeListHead(nullptr) { }
  576. FreeList(FreeList&& other) : freeListHead(other.freeListHead.load(std::memory_order_relaxed)) { other.freeListHead.store(nullptr, std::memory_order_relaxed); }
  577. void swap(FreeList& other) { details::swap_relaxed(freeListHead, other.freeListHead); }
  578. FreeList(FreeList const&) = delete;
  579. FreeList& operator=(FreeList const&) = delete;
  580. inline void add(N* node)
  581. {
  582. // We know that the should-be-on-freelist bit is 0 at this point, so it's safe to
  583. // set it using a fetch_add
  584. if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST, std::memory_order_acq_rel) == 0) {
  585. // Oh look! We were the last ones referencing this node, and we know
  586. // we want to add it to the free list, so let's do it!
  587. add_knowing_refcount_is_zero(node);
  588. }
  589. }
  590. inline N* try_get()
  591. {
  592. auto head = freeListHead.load(std::memory_order_acquire);
  593. while (head != nullptr) {
  594. auto prevHead = head;
  595. auto refs = head->freeListRefs.load(std::memory_order_relaxed);
  596. if ((refs & REFS_MASK) == 0 || !head->freeListRefs.compare_exchange_strong(refs, refs + 1, std::memory_order_acquire, std::memory_order_relaxed)) {
  597. head = freeListHead.load(std::memory_order_acquire);
  598. continue;
  599. }
  600. // Good, reference count has been incremented (it wasn't at zero), which means we can read the
  601. // next and not worry about it changing between now and the time we do the CAS
  602. auto next = head->freeListNext.load(std::memory_order_relaxed);
  603. if (freeListHead.compare_exchange_strong(head, next, std::memory_order_acquire, std::memory_order_relaxed)) {
  604. // Yay, got the node. This means it was on the list, which means shouldBeOnFreeList must be false no
  605. // matter the refcount (because nobody else knows it's been taken off yet, it can't have been put back on).
  606. assert((head->freeListRefs.load(std::memory_order_relaxed) & SHOULD_BE_ON_FREELIST) == 0);
  607. // Decrease refcount twice, once for our ref, and once for the list's ref
  608. head->freeListRefs.fetch_sub(2, std::memory_order_release);
  609. return head;
  610. }
  611. // OK, the head must have changed on us, but we still need to decrease the refcount we increased.
  612. // Note that we don't need to release any memory effects, but we do need to ensure that the reference
  613. // count decrement happens-after the CAS on the head.
  614. refs = prevHead->freeListRefs.fetch_sub(1, std::memory_order_acq_rel);
  615. if (refs == SHOULD_BE_ON_FREELIST + 1) {
  616. add_knowing_refcount_is_zero(prevHead);
  617. }
  618. }
  619. return nullptr;
  620. }
  621. // Useful for traversing the list when there's no contention (e.g. to destroy remaining nodes)
  622. N* head_unsafe() const { return freeListHead.load(std::memory_order_relaxed); }
  623. private:
  624. inline void add_knowing_refcount_is_zero(N* node)
  625. {
  626. // Since the refcount is zero, and nobody can increase it once it's zero (except us, and we run
  627. // only one copy of this method per node at a time, i.e. the single thread case), then we know
  628. // we can safely change the next pointer of the node; however, once the refcount is back above
  629. // zero, then other threads could increase it (happens under heavy contention, when the refcount
  630. // goes to zero in between a load and a refcount increment of a node in try_get, then back up to
  631. // something non-zero, then the refcount increment is done by the other thread) -- so, if the CAS
  632. // to add the node to the actual list fails, decrease the refcount and leave the add operation to
  633. // the next thread who puts the refcount back at zero (which could be us, hence the loop).
  634. auto head = freeListHead.load(std::memory_order_relaxed);
  635. while (true) {
  636. node->freeListNext.store(head, std::memory_order_relaxed);
  637. node->freeListRefs.store(1, std::memory_order_release);
  638. if (!freeListHead.compare_exchange_strong(head, node, std::memory_order_release, std::memory_order_relaxed)) {
  639. // Hmm, the add failed, but we can only try again when the refcount goes back to zero
  640. if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST - 1, std::memory_order_release) == 1) {
  641. continue;
  642. }
  643. }
  644. return;
  645. }
  646. }
  647. private:
  648. // Implemented like a stack, but where node order doesn't matter (nodes are inserted out of order under contention)
  649. std::atomic<N*> freeListHead;
  650. static const std::uint32_t REFS_MASK = 0x7FFFFFFF;
  651. static const std::uint32_t SHOULD_BE_ON_FREELIST = 0x80000000;
  652. };
  653. ///////////////////////////
  654. // Block
  655. ///////////////////////////
  656. struct Block
  657. {
  658. Block()
  659. : next(nullptr), elementsCompletelyDequeued(0), freeListRefs(0), freeListNext(nullptr), shouldBeOnFreeList(false), dynamicallyAllocated(true)
  660. {
  661. }
  662. inline bool is_empty() const
  663. {
  664. if (compile_time_condition<BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD>::value) {
  665. // Check flags
  666. for (size_t i = 0; i < BLOCK_SIZE; ++i) {
  667. if (!emptyFlags[i].load(std::memory_order_relaxed)) {
  668. return false;
  669. }
  670. }
  671. // Aha, empty; make sure we have all other memory effects that happened before the empty flags were set
  672. std::atomic_thread_fence(std::memory_order_acquire);
  673. return true;
  674. }
  675. else {
  676. // Check counter
  677. if (elementsCompletelyDequeued.load(std::memory_order_relaxed) == BLOCK_SIZE) {
  678. std::atomic_thread_fence(std::memory_order_acquire);
  679. return true;
  680. }
  681. assert(elementsCompletelyDequeued.load(std::memory_order_relaxed) <= BLOCK_SIZE);
  682. return false;
  683. }
  684. }
  685. // Returns true if the block is now empty (does not apply in explicit context)
  686. inline bool set_empty(index_t i)
  687. {
  688. if (BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
  689. // Set flag
  690. assert(!emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].load(std::memory_order_relaxed));
  691. emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].store(true, std::memory_order_release);
  692. return false;
  693. }
  694. else {
  695. // Increment counter
  696. auto prevVal = elementsCompletelyDequeued.fetch_add(1, std::memory_order_release);
  697. assert(prevVal < BLOCK_SIZE);
  698. return prevVal == BLOCK_SIZE - 1;
  699. }
  700. }
  701. // Sets multiple contiguous item statuses to 'empty' (assumes no wrapping and count > 0).
  702. // Returns true if the block is now empty (does not apply in explicit context).
  703. inline bool set_many_empty(index_t i, size_t count)
  704. {
  705. if (compile_time_condition<BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD>::value) {
  706. // Set flags
  707. std::atomic_thread_fence(std::memory_order_release);
  708. i = BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1)) - count + 1;
  709. for (size_t j = 0; j != count; ++j) {
  710. assert(!emptyFlags[i + j].load(std::memory_order_relaxed));
  711. emptyFlags[i + j].store(true, std::memory_order_relaxed);
  712. }
  713. return false;
  714. }
  715. else {
  716. // Increment counter
  717. auto prevVal = elementsCompletelyDequeued.fetch_add(count, std::memory_order_release);
  718. assert(prevVal + count <= BLOCK_SIZE);
  719. return prevVal + count == BLOCK_SIZE;
  720. }
  721. }
  722. inline void set_all_empty()
  723. {
  724. if (BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
  725. // Set all flags
  726. for (size_t i = 0; i != BLOCK_SIZE; ++i) {
  727. emptyFlags[i].store(true, std::memory_order_relaxed);
  728. }
  729. }
  730. else {
  731. // Reset counter
  732. elementsCompletelyDequeued.store(BLOCK_SIZE, std::memory_order_relaxed);
  733. }
  734. }
  735. inline void reset_empty()
  736. {
  737. if (compile_time_condition<BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD>::value) {
  738. // Reset flags
  739. for (size_t i = 0; i != BLOCK_SIZE; ++i) {
  740. emptyFlags[i].store(false, std::memory_order_relaxed);
  741. }
  742. }
  743. else {
  744. // Reset counter
  745. elementsCompletelyDequeued.store(0, std::memory_order_relaxed);
  746. }
  747. }
  748. inline T* operator[](index_t idx) noexcept { return static_cast<T*>(static_cast<void*>(elements)) + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1)); }
  749. inline T const* operator[](index_t idx) const noexcept { return static_cast<T const*>(static_cast<void const*>(elements)) + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1)); }
  750. private:
  751. // IMPORTANT: This must be the first member in Block, so that if T depends on the alignment of
  752. // addresses returned by malloc, that alignment will be preserved. Apparently clang actually
  753. // generates code that uses this assumption for AVX instructions in some cases. Ideally, we
  754. // should also align Block to the alignment of T in case it's higher than malloc's 16-byte
  755. // alignment, but this is hard to do in a cross-platform way. Assert for this case:
  756. static_assert(std::alignment_of<T>::value <= std::alignment_of<details::max_align_t>::value, "The queue does not support super-aligned types at this time");
  757. // Additionally, we need the alignment of Block itself to be a multiple of max_align_t since
  758. // otherwise the appropriate padding will not be added at the end of Block in order to make
  759. // arrays of Blocks all be properly aligned (not just the first one). We use a union to force
  760. // this.
  761. union {
  762. char elements[sizeof(T) * BLOCK_SIZE];
  763. details::max_align_t dummy;
  764. };
  765. public:
  766. Block* next;
  767. std::atomic<size_t> elementsCompletelyDequeued;
  768. std::atomic<bool> emptyFlags[BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD ? BLOCK_SIZE : 1];
  769. public:
  770. std::atomic<std::uint32_t> freeListRefs;
  771. std::atomic<Block*> freeListNext;
  772. std::atomic<bool> shouldBeOnFreeList;
  773. bool dynamicallyAllocated; // Perhaps a better name for this would be 'isNotPartOfInitialBlockPool'
  774. };
  775. static_assert(std::alignment_of<Block>::value >= std::alignment_of<details::max_align_t>::value, "Internal error: Blocks must be at least as aligned as the type they are wrapping");
  776. ///////////////////////////
  777. // Producer base
  778. ///////////////////////////
  779. struct ProducerBase : public details::ConcurrentQueueProducerTypelessBase
  780. {
  781. ProducerBase(ConcurrentQueue* parent_) :
  782. tailIndex(0),
  783. headIndex(0),
  784. dequeueOptimisticCount(0),
  785. dequeueOvercommit(0),
  786. tailBlock(nullptr),
  787. parent(parent_)
  788. {
  789. }
  790. virtual ~ProducerBase() { }
  791. template<class NotifyThread, class ProcessData>
  792. inline size_t dequeue_bulk(NotifyThread notifyThread, ProcessData processData)
  793. {
  794. return static_cast<ExplicitProducer*>(this)->dequeue_bulk(notifyThread, processData);
  795. }
  796. inline ProducerBase* next_prod() const { return static_cast<ProducerBase*>(next); }
  797. inline size_t size_approx() const
  798. {
  799. auto tail = tailIndex.load(std::memory_order_relaxed);
  800. auto head = headIndex.load(std::memory_order_relaxed);
  801. return details::circular_less_than(head, tail) ? static_cast<size_t>(tail - head) : 0;
  802. }
  803. inline index_t getTail() const { return tailIndex.load(std::memory_order_relaxed); }
  804. protected:
  805. std::atomic<index_t> tailIndex; // Where to enqueue to next
  806. std::atomic<index_t> headIndex; // Where to dequeue from next
  807. std::atomic<index_t> dequeueOptimisticCount;
  808. std::atomic<index_t> dequeueOvercommit;
  809. Block* tailBlock;
  810. public:
  811. ConcurrentQueue* parent;
  812. };
  813. public:
  814. ///////////////////////////
  815. // Explicit queue
  816. ///////////////////////////
  817. struct ExplicitProducer : public ProducerBase
  818. {
  819. explicit ExplicitProducer(ConcurrentQueue* _parent) :
  820. ProducerBase(_parent),
  821. blockIndex(nullptr),
  822. pr_blockIndexSlotsUsed(0),
  823. pr_blockIndexSize(EXPLICIT_INITIAL_INDEX_SIZE >> 1),
  824. pr_blockIndexFront(0),
  825. pr_blockIndexEntries(nullptr),
  826. pr_blockIndexRaw(nullptr)
  827. {
  828. size_t poolBasedIndexSize = details::ceil_to_pow_2(_parent->initialBlockPoolSize) >> 1;
  829. if (poolBasedIndexSize > pr_blockIndexSize) {
  830. pr_blockIndexSize = poolBasedIndexSize;
  831. }
  832. new_block_index(0); // This creates an index with double the number of current entries, i.e. EXPLICIT_INITIAL_INDEX_SIZE
  833. }
  834. ~ExplicitProducer()
  835. {
  836. // Destruct any elements not yet dequeued.
  837. // Since we're in the destructor, we can assume all elements
  838. // are either completely dequeued or completely not (no halfways).
  839. if (this->tailBlock != nullptr) { // Note this means there must be a block index too
  840. // First find the block that's partially dequeued, if any
  841. Block* halfDequeuedBlock = nullptr;
  842. if ((this->headIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) != 0) {
  843. // The head's not on a block boundary, meaning a block somewhere is partially dequeued
  844. // (or the head block is the tail block and was fully dequeued, but the head/tail are still not on a boundary)
  845. size_t i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & (pr_blockIndexSize - 1);
  846. while (details::circular_less_than<index_t>(pr_blockIndexEntries[i].base + BLOCK_SIZE, this->headIndex.load(std::memory_order_relaxed))) {
  847. i = (i + 1) & (pr_blockIndexSize - 1);
  848. }
  849. assert(details::circular_less_than<index_t>(pr_blockIndexEntries[i].base, this->headIndex.load(std::memory_order_relaxed)));
  850. halfDequeuedBlock = pr_blockIndexEntries[i].block;
  851. }
  852. // Start at the head block (note the first line in the loop gives us the head from the tail on the first iteration)
  853. auto block = this->tailBlock;
  854. do {
  855. block = block->next;
  856. if (block->ConcurrentQueue::Block::is_empty()) {
  857. continue;
  858. }
  859. size_t i = 0; // Offset into block
  860. if (block == halfDequeuedBlock) {
  861. i = static_cast<size_t>(this->headIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1));
  862. }
  863. // Walk through all the items in the block; if this is the tail block, we need to stop when we reach the tail index
  864. auto lastValidIndex = (this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 ? BLOCK_SIZE : static_cast<size_t>(this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1));
  865. while (i != BLOCK_SIZE && (block != this->tailBlock || i != lastValidIndex)) {
  866. (*block)[i++]->~T();
  867. }
  868. } while (block != this->tailBlock);
  869. }
  870. // Destroy all blocks that we own
  871. if (this->tailBlock != nullptr) {
  872. auto block = this->tailBlock;
  873. do {
  874. auto nextBlock = block->next;
  875. if (block->dynamicallyAllocated) {
  876. destroy(block);
  877. }
  878. else {
  879. this->parent->add_block_to_free_list(block);
  880. }
  881. block = nextBlock;
  882. } while (block != this->tailBlock);
  883. }
  884. // Destroy the block indices
  885. auto header = static_cast<BlockIndexHeader*>(pr_blockIndexRaw);
  886. while (header != nullptr) {
  887. auto prev = static_cast<BlockIndexHeader*>(header->prev);
  888. header->~BlockIndexHeader();
  889. (Traits::free)(header);
  890. header = prev;
  891. }
  892. }
  893. inline void enqueue_begin_alloc(index_t currentTailIndex)
  894. {
  895. // We reached the end of a block, start a new one
  896. if (this->tailBlock != nullptr && this->tailBlock->next->ConcurrentQueue::Block::is_empty()) {
  897. // We can re-use the block ahead of us, it's empty!
  898. this->tailBlock = this->tailBlock->next;
  899. this->tailBlock->ConcurrentQueue::Block::reset_empty();
  900. // We'll put the block on the block index (guaranteed to be room since we're conceptually removing the
  901. // last block from it first -- except instead of removing then adding, we can just overwrite).
  902. // Note that there must be a valid block index here, since even if allocation failed in the ctor,
  903. // it would have been re-attempted when adding the first block to the queue; since there is such
  904. // a block, a block index must have been successfully allocated.
  905. }
  906. else {
  907. // We're going to need a new block; check that the block index has room
  908. if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize) {
  909. // Hmm, the circular block index is already full -- we'll need
  910. // to allocate a new index. Note pr_blockIndexRaw can only be nullptr if
  911. // the initial allocation failed in the constructor.
  912. new_block_index(pr_blockIndexSlotsUsed);
  913. }
  914. // Insert a new block in the circular linked list
  915. auto newBlock = this->parent->ConcurrentQueue::requisition_block();
  916. newBlock->ConcurrentQueue::Block::reset_empty();
  917. if (this->tailBlock == nullptr) {
  918. newBlock->next = newBlock;
  919. }
  920. else {
  921. newBlock->next = this->tailBlock->next;
  922. this->tailBlock->next = newBlock;
  923. }
  924. this->tailBlock = newBlock;
  925. ++pr_blockIndexSlotsUsed;
  926. }
  927. // Add block to block index
  928. auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
  929. entry.base = currentTailIndex;
  930. entry.block = this->tailBlock;
  931. blockIndex.load(std::memory_order_relaxed)->front.store(pr_blockIndexFront, std::memory_order_release);
  932. pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
  933. }
  934. tracy_force_inline T* enqueue_begin(index_t& currentTailIndex)
  935. {
  936. currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
  937. if (details::cqUnlikely((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0)) {
  938. this->enqueue_begin_alloc(currentTailIndex);
  939. }
  940. return (*this->tailBlock)[currentTailIndex];
  941. }
  942. tracy_force_inline std::atomic<index_t>& get_tail_index()
  943. {
  944. return this->tailIndex;
  945. }
  946. template<class NotifyThread, class ProcessData>
  947. size_t dequeue_bulk(NotifyThread notifyThread, ProcessData processData)
  948. {
  949. auto tail = this->tailIndex.load(std::memory_order_relaxed);
  950. auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
  951. auto desiredCount = static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit));
  952. if (details::circular_less_than<size_t>(0, desiredCount)) {
  953. desiredCount = desiredCount < 8192 ? desiredCount : 8192;
  954. std::atomic_thread_fence(std::memory_order_acquire);
  955. auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed);
  956. assert(overcommit <= myDequeueCount);
  957. tail = this->tailIndex.load(std::memory_order_acquire);
  958. auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit));
  959. if (details::circular_less_than<size_t>(0, actualCount)) {
  960. actualCount = desiredCount < actualCount ? desiredCount : actualCount;
  961. if (actualCount < desiredCount) {
  962. this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
  963. }
  964. // Get the first index. Note that since there's guaranteed to be at least actualCount elements, this
  965. // will never exceed tail.
  966. auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
  967. // Determine which block the first element is in
  968. auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
  969. auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);
  970. auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
  971. auto firstBlockBaseIndex = firstIndex & ~static_cast<index_t>(BLOCK_SIZE - 1);
  972. auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(firstBlockBaseIndex - headBase) / BLOCK_SIZE);
  973. auto indexIndex = (localBlockIndexHead + offset) & (localBlockIndex->size - 1);
  974. notifyThread( this->threadId );
  975. // Iterate the blocks and dequeue
  976. auto index = firstIndex;
  977. do {
  978. auto firstIndexInBlock = index;
  979. auto endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
  980. endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
  981. auto block = localBlockIndex->entries[indexIndex].block;
  982. const auto sz = endIndex - index;
  983. processData( (*block)[index], sz );
  984. index += sz;
  985. block->ConcurrentQueue::Block::set_many_empty(firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock));
  986. indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
  987. } while (index != firstIndex + actualCount);
  988. return actualCount;
  989. }
  990. else {
  991. // Wasn't anything to dequeue after all; make the effective dequeue count eventually consistent
  992. this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
  993. }
  994. }
  995. return 0;
  996. }
  997. private:
  998. struct BlockIndexEntry
  999. {
  1000. index_t base;
  1001. Block* block;
  1002. };
  1003. struct BlockIndexHeader
  1004. {
  1005. size_t size;
  1006. std::atomic<size_t> front; // Current slot (not next, like pr_blockIndexFront)
  1007. BlockIndexEntry* entries;
  1008. void* prev;
  1009. };
  1010. bool new_block_index(size_t numberOfFilledSlotsToExpose)
  1011. {
  1012. auto prevBlockSizeMask = pr_blockIndexSize - 1;
  1013. // Create the new block
  1014. pr_blockIndexSize <<= 1;
  1015. auto newRawPtr = static_cast<char*>((Traits::malloc)(sizeof(BlockIndexHeader) + std::alignment_of<BlockIndexEntry>::value - 1 + sizeof(BlockIndexEntry) * pr_blockIndexSize));
  1016. if (newRawPtr == nullptr) {
  1017. pr_blockIndexSize >>= 1; // Reset to allow graceful retry
  1018. return false;
  1019. }
  1020. auto newBlockIndexEntries = reinterpret_cast<BlockIndexEntry*>(details::align_for<BlockIndexEntry>(newRawPtr + sizeof(BlockIndexHeader)));
  1021. // Copy in all the old indices, if any
  1022. size_t j = 0;
  1023. if (pr_blockIndexSlotsUsed != 0) {
  1024. auto i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & prevBlockSizeMask;
  1025. do {
  1026. newBlockIndexEntries[j++] = pr_blockIndexEntries[i];
  1027. i = (i + 1) & prevBlockSizeMask;
  1028. } while (i != pr_blockIndexFront);
  1029. }
  1030. // Update everything
  1031. auto header = new (newRawPtr) BlockIndexHeader;
  1032. header->size = pr_blockIndexSize;
  1033. header->front.store(numberOfFilledSlotsToExpose - 1, std::memory_order_relaxed);
  1034. header->entries = newBlockIndexEntries;
  1035. header->prev = pr_blockIndexRaw; // we link the new block to the old one so we can free it later
  1036. pr_blockIndexFront = j;
  1037. pr_blockIndexEntries = newBlockIndexEntries;
  1038. pr_blockIndexRaw = newRawPtr;
  1039. blockIndex.store(header, std::memory_order_release);
  1040. return true;
  1041. }
  1042. private:
  1043. std::atomic<BlockIndexHeader*> blockIndex;
  1044. // To be used by the producer only -- consumers must use the ones referenced by blockIndex
  1045. size_t pr_blockIndexSlotsUsed;
  1046. size_t pr_blockIndexSize;
  1047. size_t pr_blockIndexFront; // Next slot (not current)
  1048. BlockIndexEntry* pr_blockIndexEntries;
  1049. void* pr_blockIndexRaw;
  1050. };
  1051. ExplicitProducer* get_explicit_producer(producer_token_t const& token)
  1052. {
  1053. return static_cast<ExplicitProducer*>(token.producer);
  1054. }
  1055. private:
  1056. //////////////////////////////////
  1057. // Block pool manipulation
  1058. //////////////////////////////////
  1059. void populate_initial_block_list(size_t blockCount)
  1060. {
  1061. initialBlockPoolSize = blockCount;
  1062. if (initialBlockPoolSize == 0) {
  1063. initialBlockPool = nullptr;
  1064. return;
  1065. }
  1066. initialBlockPool = create_array<Block>(blockCount);
  1067. if (initialBlockPool == nullptr) {
  1068. initialBlockPoolSize = 0;
  1069. }
  1070. for (size_t i = 0; i < initialBlockPoolSize; ++i) {
  1071. initialBlockPool[i].dynamicallyAllocated = false;
  1072. }
  1073. }
  1074. inline Block* try_get_block_from_initial_pool()
  1075. {
  1076. if (initialBlockPoolIndex.load(std::memory_order_relaxed) >= initialBlockPoolSize) {
  1077. return nullptr;
  1078. }
  1079. auto index = initialBlockPoolIndex.fetch_add(1, std::memory_order_relaxed);
  1080. return index < initialBlockPoolSize ? (initialBlockPool + index) : nullptr;
  1081. }
  1082. inline void add_block_to_free_list(Block* block)
  1083. {
  1084. freeList.add(block);
  1085. }
  1086. inline void add_blocks_to_free_list(Block* block)
  1087. {
  1088. while (block != nullptr) {
  1089. auto next = block->next;
  1090. add_block_to_free_list(block);
  1091. block = next;
  1092. }
  1093. }
  1094. inline Block* try_get_block_from_free_list()
  1095. {
  1096. return freeList.try_get();
  1097. }
  1098. // Gets a free block from one of the memory pools, or allocates a new one (if applicable)
  1099. Block* requisition_block()
  1100. {
  1101. auto block = try_get_block_from_initial_pool();
  1102. if (block != nullptr) {
  1103. return block;
  1104. }
  1105. block = try_get_block_from_free_list();
  1106. if (block != nullptr) {
  1107. return block;
  1108. }
  1109. return create<Block>();
  1110. }
  1111. //////////////////////////////////
  1112. // Producer list manipulation
  1113. //////////////////////////////////
  1114. ProducerBase* recycle_or_create_producer()
  1115. {
  1116. bool recycled;
  1117. return recycle_or_create_producer(recycled);
  1118. }
  1119. ProducerBase* recycle_or_create_producer(bool& recycled)
  1120. {
  1121. // Try to re-use one first
  1122. for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
  1123. if (ptr->inactive.load(std::memory_order_relaxed)) {
  1124. if( ptr->size_approx() == 0 )
  1125. {
  1126. bool expected = true;
  1127. if (ptr->inactive.compare_exchange_strong(expected, /* desired */ false, std::memory_order_acquire, std::memory_order_relaxed)) {
  1128. // We caught one! It's been marked as activated, the caller can have it
  1129. recycled = true;
  1130. return ptr;
  1131. }
  1132. }
  1133. }
  1134. }
  1135. recycled = false;
  1136. return add_producer(static_cast<ProducerBase*>(create<ExplicitProducer>(this)));
  1137. }
  1138. ProducerBase* add_producer(ProducerBase* producer)
  1139. {
  1140. // Handle failed memory allocation
  1141. if (producer == nullptr) {
  1142. return nullptr;
  1143. }
  1144. producerCount.fetch_add(1, std::memory_order_relaxed);
  1145. // Add it to the lock-free list
  1146. auto prevTail = producerListTail.load(std::memory_order_relaxed);
  1147. do {
  1148. producer->next = prevTail;
  1149. } while (!producerListTail.compare_exchange_weak(prevTail, producer, std::memory_order_release, std::memory_order_relaxed));
  1150. return producer;
  1151. }
  1152. void reown_producers()
  1153. {
  1154. // After another instance is moved-into/swapped-with this one, all the
  1155. // producers we stole still think their parents are the other queue.
  1156. // So fix them up!
  1157. for (auto ptr = producerListTail.load(std::memory_order_relaxed); ptr != nullptr; ptr = ptr->next_prod()) {
  1158. ptr->parent = this;
  1159. }
  1160. }
  1161. //////////////////////////////////
  1162. // Utility functions
  1163. //////////////////////////////////
  1164. template<typename U>
  1165. static inline U* create_array(size_t count)
  1166. {
  1167. assert(count > 0);
  1168. return static_cast<U*>((Traits::malloc)(sizeof(U) * count));
  1169. }
  1170. template<typename U>
  1171. static inline void destroy_array(U* p, size_t count)
  1172. {
  1173. ((void)count);
  1174. if (p != nullptr) {
  1175. assert(count > 0);
  1176. (Traits::free)(p);
  1177. }
  1178. }
  1179. template<typename U>
  1180. static inline U* create()
  1181. {
  1182. auto p = (Traits::malloc)(sizeof(U));
  1183. return new (p) U;
  1184. }
  1185. template<typename U, typename A1>
  1186. static inline U* create(A1&& a1)
  1187. {
  1188. auto p = (Traits::malloc)(sizeof(U));
  1189. return new (p) U(std::forward<A1>(a1));
  1190. }
  1191. template<typename U>
  1192. static inline void destroy(U* p)
  1193. {
  1194. if (p != nullptr) {
  1195. p->~U();
  1196. }
  1197. (Traits::free)(p);
  1198. }
  1199. private:
  1200. std::atomic<ProducerBase*> producerListTail;
  1201. std::atomic<std::uint32_t> producerCount;
  1202. std::atomic<size_t> initialBlockPoolIndex;
  1203. Block* initialBlockPool;
  1204. size_t initialBlockPoolSize;
  1205. FreeList<Block> freeList;
  1206. std::atomic<std::uint32_t> nextExplicitConsumerId;
  1207. std::atomic<std::uint32_t> globalExplicitConsumerOffset;
  1208. };
  1209. template<typename T, typename Traits>
  1210. ProducerToken::ProducerToken(ConcurrentQueue<T, Traits>& queue)
  1211. : producer(queue.recycle_or_create_producer())
  1212. {
  1213. if (producer != nullptr) {
  1214. producer->token = this;
  1215. producer->threadId = detail::GetThreadHandleImpl();
  1216. }
  1217. }
  1218. template<typename T, typename Traits>
  1219. ConsumerToken::ConsumerToken(ConcurrentQueue<T, Traits>& queue)
  1220. : itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr)
  1221. {
  1222. initialOffset = queue.nextExplicitConsumerId.fetch_add(1, std::memory_order_release);
  1223. lastKnownGlobalOffset = static_cast<std::uint32_t>(-1);
  1224. }
  1225. template<typename T, typename Traits>
  1226. inline void swap(ConcurrentQueue<T, Traits>& a, ConcurrentQueue<T, Traits>& b) noexcept
  1227. {
  1228. a.swap(b);
  1229. }
  1230. inline void swap(ProducerToken& a, ProducerToken& b) noexcept
  1231. {
  1232. a.swap(b);
  1233. }
  1234. inline void swap(ConsumerToken& a, ConsumerToken& b) noexcept
  1235. {
  1236. a.swap(b);
  1237. }
  1238. }
  1239. } /* namespace tracy */
  1240. #if defined(__GNUC__)
  1241. #pragma GCC diagnostic pop
  1242. #endif